diff --git a/tablestore/FleetDBTableSpec.go b/tablestore/FleetDBTableSpec.go new file mode 100644 index 0000000..6497ef8 --- /dev/null +++ b/tablestore/FleetDBTableSpec.go @@ -0,0 +1,16 @@ +package tablestore + +import ( + "fmt" +) +type FleetDBTableSpec struct { + columnSpecs map[string]*FleetDbColumnSpec +} + +func (s *FleetDBTableSpec) GetColumnSpec(column string) *FleetDbColumnSpec { + if column == ""{ + fmt.Println("Table name is Nil") + return nil + } + return s.columnSpecs[column] +} diff --git a/tablestore/FleetDBValue.go b/tablestore/FleetDBValue.go new file mode 100644 index 0000000..00bc596 --- /dev/null +++ b/tablestore/FleetDBValue.go @@ -0,0 +1,232 @@ +package tablestore + +import ( + "io" + "encoding/binary" + "fmt" + "bytes" + "io/ioutil" +) +type FleetDBValue interface{ + Serialize() []byte // this translates to []bytes + String() string + getType() FleetDBType +} + +type FleetDBType uint8 +const ( + Int FleetDBType = iota + 1 + BigInt + Float + Double + Text + Boolean +) + +type ComparatorType uint8 +const ( + LESSTHAN = iota + 1 + LESSTHANEQUAL + GREATERTHAN + GREATERTHANEQUAL + BETWEEN + EQUAL +) + +type ColKeyType uint8 +const ( + PRIMARY ColKeyType = iota + 1 + CLUSTERING + COLUMN +) + +func (f FleetDBType) getVal() uint8{ + return uint8(f) +} + +type IntValue struct{ + val int32 +} + +type FloatValue struct{ + val float32 +} + +type BigIntValue struct{ + val int64 +} + +type DoubleValue struct{ + val float64 +} + +type TextValue struct{ + val string +} + +type BooleanValue struct{ + val bool +} + +func (i IntValue) String() string { + return fmt.Sprintf("Integer Value = %v", i.val) +} + +func (i IntValue) Serialize() []byte{ + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.LittleEndian, i.val) + if err != nil { + fmt.Println("binary.Write failed:", err) + } + return buf.Bytes() +} + +func NewIntValue(reader io.Reader) *IntValue{ + var i IntValue + err := binary.Read(reader, binary.LittleEndian, &(i.val)) + if err != nil { + fmt.Println("binary.Read failed:", err) + } + return &i +} + +func (i IntValue) getType() FleetDBType{ + return Int; +} + +func (f FloatValue) String() string{ + return fmt.Sprintf("FloatValue = %v", f.val) +} + +func (f FloatValue) Serialize() []byte{ + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.LittleEndian, f.val) + if err != nil { + fmt.Println("binary.Write failed:", err) + } + return buf.Bytes() +} + +func NewFloatValue(reader io.Reader) *FloatValue{ + var f FloatValue + err := binary.Read(reader, binary.LittleEndian, &(f.val)) + if err != nil { + fmt.Println("binary.Read failed:", err) + } + return &f +} + +func (f FloatValue) getType() FleetDBType{ + return Float +} + +func (l BigIntValue) String() string { + return fmt.Sprintf("BigIntValue Value = %v", l.val) +} + +func (l BigIntValue) Serialize() []byte{ + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.LittleEndian, l.val) + if err != nil { + fmt.Println("binary.Write failed:", err) + } + return buf.Bytes() +} + +func NewBigIntValue(reader io.Reader) *BigIntValue{ + var l BigIntValue + err := binary.Read(reader, binary.LittleEndian, &(l.val)) + if err != nil { + fmt.Println("binary.Read failed:", err) + } + return &l +} + +func (l BigIntValue) getType() FleetDBType{ + return BigInt +} + +func (d DoubleValue) String() string{ + return fmt.Sprintf("DoubleValue = %v", d.val) +} + +func (d DoubleValue) Serialize() []byte{ + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.LittleEndian, d.val) + if err != nil { + fmt.Println("binary.Write failed:", err) + } + return buf.Bytes() +} + +func NewDoubleValue(reader io.Reader) *DoubleValue{ + var d DoubleValue + err := binary.Read(reader, binary.LittleEndian, &(d.val)) + if err != nil { + fmt.Println("binary.Read failed:", err) + } + return &d +} + +func (d DoubleValue) getType() FleetDBType{ + return Double +} + +func (s TextValue) String() string{ + return fmt.Sprintf(s.val) +} + +func (s TextValue) Serialize() []byte{ + return []byte(s.val) +} + +func NewTextValue(reader io.Reader) *TextValue{ + b , err := ioutil.ReadAll(reader) + if err != nil { + fmt.Println("readAll failed:", err) + } + ans := TextValue{string(b)} + return &ans +} + +func NewTextValueFromNullTerminatedStream(r io.Reader) *TextValue { + p := []byte{} + one := make([]byte, 1) + _ , err := r.Read(one) + for err != io.EOF && string(one) != "\000" { + p = append(p,one...) + _ , err = r.Read(one) + } + ans := TextValue{string(p)} + return &ans +} + +func (s TextValue) getType() FleetDBType{ + return Text +} + +func (b BooleanValue) String() string{ + return fmt.Sprintf("BooleanValue = %v", b.val) +} + +func (b BooleanValue) Serialize() []byte{ + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.LittleEndian, b.val) + if err != nil { + fmt.Println("binary.Write failed:", err) + } + return buf.Bytes() +} + +func NewBooleanValue(reader io.Reader) *BooleanValue{ + var b BooleanValue + err := binary.Read(reader, binary.LittleEndian, &(b.val)) + if err != nil { + fmt.Println("binary.Read failed:", err) + } + return &b +} + +func (b BooleanValue) getType() FleetDBType{ + return Boolean +} \ No newline at end of file diff --git a/tablestore/database.go b/tablestore/database.go new file mode 100644 index 0000000..3e464ae --- /dev/null +++ b/tablestore/database.go @@ -0,0 +1,234 @@ +package tablestore + +import ( + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/util" + "github.com/google/uuid" + "sync" + "fmt" + "io" + "bytes" +) + +// StateMachine interface provides execution of command against database +// the implementation should be thread safe +type Store interface { + Save(tableid uuid.UUID, items []KVItem) error + GetPartition(tableid uuid.UUID, partitionKeys []FleetDBValue, clusteringKyes []FleetDBValue, clusteringComparators []ComparatorType, columns []string, tableSpec FleetDBTableSpec) ([]Row, error) +} + +// database maintains the key-value datastore +type database struct { + lock *sync.RWMutex + leveldbs map[uuid.UUID] *leveldb.DB // map of leveldb shards +} + +// NewStore get the instance of LevelDB Wrapper +func NewStore() Store { + db := new(database) + db.lock = new(sync.RWMutex) + db.leveldbs = make(map[uuid.UUID] *leveldb.DB) + return db +} + +func (db *database) getStore(tableid uuid.UUID) *leveldb.DB{ + lvldb := db.leveldbs[tableid] + dir := "/tmp/lvldb" + if lvldb == nil { + lvlDBName := dir + "/" + tableid.String() + lvldb, _ = leveldb.OpenFile(lvlDBName,nil) + db.leveldbs[tableid] = lvldb; + } + return lvldb + +} + +func (db *database) Save(tableid uuid.UUID, items []KVItem) error { + storedb := db.getStore(tableid) + var err error + batch := new(leveldb.Batch) + for _, item := range items{ + if(item.Value == nil){ + fmt.Printf("Deleting Key %s", item.Key) + batch.Delete(item.Key) + }else{ + batch.Put(item.Key,item.Value) + } + } + err = storedb.Write(batch, nil) + if err != nil { + return err; + } + return nil +} + + +func buildKey(keyType uint8, key FleetDBValue) []byte{ + searchKey := make([]byte,0) + searchKey = append(searchKey, keyType) + coltype := key.getType() + searchKey = append(searchKey, coltype.getVal()) + if coltype == Text{ + searchKey = append(searchKey,[]byte(key.String()+ nullchar)...) + }else{ + searchKey = append(searchKey,key.Serialize()...) + } + return searchKey +} + +func (db *database) GetPartition(tableid uuid.UUID, partitionKeys []FleetDBValue , clusteringKyes []FleetDBValue, clusteringComparators []ComparatorType, columns []string, tableSpec FleetDBTableSpec) ([]Row, error) { + var rows []Row + var err error + storedb := db.getStore(tableid) + oldCKeyvalues := []FleetDBValue{} + oldColvalues := []FleetDBValue{} + seqMap := make(map[uint8]string) + for colName , columnSpec := range tableSpec.columnSpecs{ + seqMap[columnSpec.seqNo] = colName + } + + valMap := make(map[string]FleetDBValue) + oldValMap := make(map[string]FleetDBValue) + searchKey := make([]byte,0) + for _ , key := range partitionKeys{ + searchKey = append(searchKey,buildKey(PartitionType, key)...) + } + index := 0 + for index < len(clusteringComparators) && clusteringComparators[index] == EQUAL{ + searchKey = append(searchKey,buildKey(ClusteringType, clusteringKyes[index])...) + index++ + } + isAtFirst := false + iter := storedb.NewIterator(util.BytesPrefix(searchKey), nil) + if(index < len(clusteringComparators)){ + switch(clusteringComparators[index]){ + case GREATERTHANEQUAL: + lowerLimit := append(searchKey,buildKey(ClusteringType, clusteringKyes[index])...) + iter = storedb.NewIterator(nil, nil) + iter.Seek(lowerLimit) + isAtFirst = true + case GREATERTHAN: + lowerLimit := append(searchKey,buildKey(ClusteringType, clusteringKyes[index])...) + iter = storedb.NewIterator(nil, nil) + iter.Seek(lowerLimit) + isAtFirst = true + for (isAtFirst || iter.Next()) && bytes.HasPrefix(iter.Key(), lowerLimit){ + isAtFirst = false + } + isAtFirst = true + case LESSTHAN: + lowerLimit := append(searchKey, ClusteringType) + lowerLimit = append(lowerLimit, clusteringKyes[index].getType().getVal()) + upperLimit := append(searchKey,buildKey(ClusteringType, clusteringKyes[index])...) + iter = storedb.NewIterator(&util.Range{Start: lowerLimit, Limit: upperLimit}, nil) + //fmt.Printf("LessThanEqualLower: %s LessThanEqualUpper: %s ", NewTextValue(bytes.NewReader(lowerLimit)).val, NewTextValue(bytes.NewReader(upperLimit)).val) + + case LESSTHANEQUAL: + lowerLimit := append(searchKey, ClusteringType) + lowerLimit = append(lowerLimit, clusteringKyes[index].getType().getVal()) + upperLimit := append(searchKey,buildKey(ClusteringType, clusteringKyes[index])...) + iterTemp := storedb.NewIterator(nil, nil) + iterTemp.Seek(upperLimit) + isAtFirst = true + for (isAtFirst || iterTemp.Next()) && bytes.HasPrefix(iterTemp.Key(), upperLimit){ + isAtFirst = false + } + iter = storedb.NewIterator(&util.Range{Start: lowerLimit, Limit: iterTemp.Key()}, nil) + isAtFirst = false + + case BETWEEN: + lowerLimit := append(searchKey,buildKey(ClusteringType, clusteringKyes[index])...) + index++ + upperLimit := append(searchKey,buildKey(ClusteringType, clusteringKyes[index])...) + iter = storedb.NewIterator(&util.Range{Start: lowerLimit , Limit: upperLimit}, nil) + } + } + i := 0 + for isAtFirst || iter.Next() { + isAtFirst = false + if iter.Key() == nil{ + break + } + cKeyvalues := []FleetDBValue{} + colvalues := []FleetDBValue{} + //fmt.Printf("key: %s | value: %s\n", item.Key, item.Value) + r := bytes.NewReader(iter.Key()) + p := make([]byte, 1) + _ , err := r.Read(p) + j := uint8(1) + for err != io.EOF{ + var val FleetDBValue + keyType := ColKeyType(p[0]) + r.Read(p) + dataType := FleetDBType(p[0]) + switch dataType { + case Int: + val = NewIntValue(r) + case BigInt: + val = NewBigIntValue(r) + case Float: + val = NewFloatValue(r) + case Double: + val = NewDoubleValue(r) + case Text: + val = NewTextValueFromNullTerminatedStream(r) + case Boolean: + val = NewBooleanValue(r) + } + if(keyType == PRIMARY){ + valMap[seqMap[j]] = val + } + if(keyType == CLUSTERING){ + valMap[seqMap[j]] = val + cKeyvalues = append(cKeyvalues, val) + }else if(keyType == COLUMN) { + colvalues = append(colvalues,NewTextValue(bytes.NewReader(iter.Value()))) + valMap[val.String()] = NewTextValue(bytes.NewReader(iter.Value())) + } + _ , err = r.Read(p) + j += 1 + } + isEq := equal(oldCKeyvalues, cKeyvalues) + if i != 0 && (!isEq){ + myrow := createRowFromMap(oldValMap, columns) + rows = append(rows,myrow) + oldColvalues = colvalues + oldValMap = valMap + + }else{ + oldColvalues = append(oldColvalues,colvalues...) + for k,v := range valMap { + oldValMap[k] = v + } + } + oldCKeyvalues = cKeyvalues + valMap = make(map[string]FleetDBValue) + //oldValMap = valMap + //valMap := make(map[string]FleetDBValue) + //oldValMap := make(map[string]FleetDBValue) + i++ + } + myrow := createRowFromMap(oldValMap, columns) + rows = append(rows,myrow) + return rows,err +} + +func createRowFromMap(valMap map[string]FleetDBValue, columns []string) Row{ + row := []FleetDBValue{} + for _ , colName := range columns{ + row = append(row, valMap[colName]) + } + return Row{row} +} + +func equal(val1 []FleetDBValue, val2 []FleetDBValue) bool{ + if(len(val1) != len(val2)){ + return false + } + for i, _ := range val1{ + if val1[i].String() != val2[i].String(){ + return false + } + } + return true +} \ No newline at end of file diff --git a/tablestore/db_test.go b/tablestore/db_test.go new file mode 100644 index 0000000..07a4831 --- /dev/null +++ b/tablestore/db_test.go @@ -0,0 +1,415 @@ +package tablestore + +import ( + "testing" + "github.com/google/uuid" + // "fmt" + "github.com/stretchr/testify/assert" +) + +func createSampleDB()(uuid.UUID, []FleetDBValue, []string, FleetDBTableSpec, Store){ + db := NewStore() + tableid := uuid.New() + tableMap := make(map[string][]FleetDbColumnSpec) + tableName := "crossfit_gyms" + var myText TextValue + + //Test With One Partition Key and One Clustering Key + myschema := make([]FleetDbColumnSpec, 4) + myschema[0] = FleetDbColumnSpec{"country_code", myText, true, false, 1} + myschema[1] = FleetDbColumnSpec{"state_province", myText, false, true, 2} + myschema[2] = FleetDbColumnSpec{"city", myText, false, false, 3} + myschema[3] = FleetDbColumnSpec{"gym_name", myText, false, false, 4} + tableMap[tableName] = myschema + + insertCommand1 := "INSERT INTO crossfit_gyms (country_code, state_province, city, gym_name) VALUES ('US', ‘NY’, ‘Buffalo’, 'University Avenue');"; + rowData, myTableName := decodeInsertCommand(insertCommand1) + res := TranslateToKV(tableMap[myTableName], rowData) + data := TranslateToKV(tableMap[tableName], []FleetDBValue{TextValue{"US"}, TextValue{"Ohio"},TextValue{"Columbus"},TextValue{"Englewood Avenue"}}) + res = append(res,data...) + db.Save(tableid, res) + + columnSpecs := make(map[string]*FleetDbColumnSpec) + columnSpecs["country_code"] = &myschema[0] + columnSpecs["state_province"] = &myschema[1] + columnSpecs["city"] = &myschema[2] + columnSpecs["gym_name"] = &myschema[3] + tableSpec := FleetDBTableSpec{columnSpecs} + partitionKeys := []FleetDBValue{TextValue{"US"}} + columns := []string{"country_code","state_province", "city", "gym_name"} + return tableid,partitionKeys,columns,tableSpec,db +} + +func TestBETWEEN(t *testing.T) { + tableid,partitionKeys,columns,tableSpec,db := createSampleDB() + //fmt.Println("Testing BETWEEN") + expectedVal := [][]string{{"US", "NY", "Buffalo", "University Avenue"}} + clusteringKyes := []FleetDBValue {TextValue{"ABC"}, TextValue{"OA"}} + clusteringComparators := []ComparatorType{BETWEEN} + rows, _ := db.GetPartition(tableid, partitionKeys, clusteringKyes, clusteringComparators, columns, tableSpec) + rowNum := 0 + for _, row := range rows{ + //ch := "" + index := 0 + for _, val := range row.Values{ + assert.Equal(t, val.String(), expectedVal[rowNum][index]) + if(val != nil){ + //fmt.Printf(ch + val.String()) + } + //ch = " || " + index += 1 + } + //fmt.Println() + rowNum++ + } +} + +func TestGREATERTHANEQUAL(t *testing.T) { + tableid,partitionKeys,columns,tableSpec,db := createSampleDB() + //fmt.Println("Testing GREATERTHANEQUAL 1") + expectedVal := [][]string{{"US", "NY", "Buffalo", "University Avenue"}, {"US", "Ohio", "Columbus", "Englewood Avenue"}} + clusteringKyes := []FleetDBValue {TextValue{"NY"}} + clusteringComparators := []ComparatorType{GREATERTHANEQUAL} + rows, _ := db.GetPartition(tableid, partitionKeys, clusteringKyes, clusteringComparators, columns, tableSpec) + rowNum := 0 + assert.Equal(t, len(rows), 2) + for _, row := range rows{ + //ch := "" + index := 0 + for _, val := range row.Values{ + + assert.Equal(t, val.String(), expectedVal[rowNum][index]) + if(val != nil){ + //fmt.Printf(ch + val.String()) + } + //ch = " || " + index += 1 + } + //fmt.Println() + rowNum++ + } + + //fmt.Println("Testing GREATERTHANEQUAL 2") + expectedVal = [][]string{{"US", "Ohio", "Columbus", "Englewood Avenue"}} + clusteringKyes = []FleetDBValue {TextValue{"NYA"}} + clusteringComparators = []ComparatorType{GREATERTHANEQUAL} + rows, _ = db.GetPartition(tableid, partitionKeys, clusteringKyes, clusteringComparators, columns, tableSpec) + rowNum = 0 + assert.Equal(t, len(rows), 1) + for _, row := range rows{ + //ch := "" + index := 0 + for _, val := range row.Values{ + assert.Equal(t, val.String(), expectedVal[rowNum][index]) + if(val != nil){ + //fmt.Printf(ch + val.String()) + } + //ch = " || " + index += 1 + } + //fmt.Println() + rowNum++ + } +} + +func TestGREATERTHAN(t *testing.T) { + tableid,partitionKeys,columns,tableSpec,db := createSampleDB() + //fmt.Println("Testing GREATERTHAN 1") + expectedVal := [][]string{{"US", "Ohio", "Columbus", "Englewood Avenue"}} + clusteringKyes := []FleetDBValue {TextValue{"NY"}} + clusteringComparators := []ComparatorType{GREATERTHAN} + rows, _ := db.GetPartition(tableid, partitionKeys, clusteringKyes, clusteringComparators, columns, tableSpec) + rowNum := 0 + assert.Equal(t, len(rows), 1) + for _, row := range rows{ + //ch := "" + index := 0 + for _, val := range row.Values{ + assert.Equal(t, val.String(), expectedVal[rowNum][index]) + if(val != nil){ + //fmt.Printf(ch + val.String()) + } + //ch = " || " + index += 1 + } + //fmt.Println() + rowNum++ + } + + //fmt.Println("Testing GREATERTHAN 2") + expectedVal = [][]string{{"US", "NY", "Buffalo", "University Avenue"}, {"US", "Ohio", "Columbus", "Englewood Avenue"}} + clusteringKyes = []FleetDBValue {TextValue{"ABC"}} + clusteringComparators = []ComparatorType{GREATERTHAN} + rows, _ = db.GetPartition(tableid, partitionKeys, clusteringKyes, clusteringComparators, columns, tableSpec) + rowNum = 0 + assert.Equal(t, len(rows), 2) + for _, row := range rows{ + //ch := "" + index := 0 + for _, val := range row.Values{ + assert.Equal(t, val.String(), expectedVal[rowNum][index]) + if(val != nil){ + //fmt.Printf(ch + val.String()) + } + //ch = " || " + index += 1 + } + //fmt.Println() + rowNum++ + } +} + +func TestLESSERTHANEQUAL(t *testing.T) { + tableid,partitionKeys,columns,tableSpec,db := createSampleDB() + //fmt.Println("Testing LESSERTHANEQUAL 1") + clusteringKyes := []FleetDBValue {TextValue{"Ohio"}} + expectedVal := [][]string{{"US", "NY", "Buffalo", "University Avenue"}, {"US", "Ohio", "Columbus", "Englewood Avenue"}} + clusteringComparators := []ComparatorType{LESSTHANEQUAL} + rows, _ := db.GetPartition(tableid, partitionKeys, clusteringKyes, clusteringComparators, columns, tableSpec) + rowNum := 0 + //fmt.Println(len(rows)) + assert.Equal(t, len(rows), 2) + for _, row := range rows{ + //ch := "" + index := 0 + for _, val := range row.Values{ + assert.Equal(t, val.String(), expectedVal[rowNum][index]) + if(val != nil){ + //fmt.Printf(ch + val.String()) + } + //ch = " || " + index += 1 + } + //fmt.Println() + rowNum++ + } + + //fmt.Println("Testing LESSERTHANEQUAL 2") + clusteringKyes = []FleetDBValue {TextValue{"NY"}} + expectedVal = [][]string{{"US", "NY", "Buffalo", "University Avenue"}} + clusteringComparators = []ComparatorType{LESSTHANEQUAL} + rows, _ = db.GetPartition(tableid, partitionKeys, clusteringKyes, clusteringComparators, columns, tableSpec) + rowNum = 0 + assert.Equal(t, len(rows), 1) + for _, row := range rows{ + //ch := "" + index := 0 + for _, val := range row.Values{ + assert.Equal(t, val.String(), expectedVal[rowNum][index]) + if(val != nil){ + //fmt.Printf(ch + val.String()) + } + //ch = " || " + index += 1 + } + //fmt.Println() + rowNum++ + } +} + +func TestLESSERTHAN(t *testing.T) { + tableid,partitionKeys,columns,tableSpec,db := createSampleDB() + //fmt.Println("Testing LESSERTHAN 1") + clusteringKyes := []FleetDBValue {TextValue{"Ohio"}} + expectedVal := [][]string{{"US", "NY", "Buffalo", "University Avenue"}} + clusteringComparators := []ComparatorType{LESSTHAN} + rows, _ := db.GetPartition(tableid, partitionKeys, clusteringKyes, clusteringComparators, columns, tableSpec) + rowNum := 0 + assert.Equal(t, len(rows), 1) + for _, row := range rows{ + //ch := "" + index := 0 + for _, val := range row.Values{ + assert.Equal(t, val.String(), expectedVal[rowNum][index]) + if(val != nil){ + //fmt.Printf(ch + val.String()) + } + //ch = " || " + index += 1 + } + //fmt.Println() + rowNum++ + } + + //fmt.Println("Testing LESSERTHAN 2") + clusteringKyes = []FleetDBValue {TextValue{"ZAB"}} + expectedVal = [][]string{{"US", "NY", "Buffalo", "University Avenue"}, {"US", "Ohio", "Columbus", "Englewood Avenue"}} + clusteringComparators = []ComparatorType{LESSTHAN} + rows, _ = db.GetPartition(tableid, partitionKeys, clusteringKyes, clusteringComparators, columns, tableSpec) + rowNum = 0 + assert.Equal(t, len(rows), 2) + for _, row := range rows{ + //ch := "" + index := 0 + for _, val := range row.Values{ + assert.Equal(t, val.String(), expectedVal[rowNum][index]) + if(val != nil){ + //fmt.Printf(ch + val.String()) + } + //ch = " || " + index += 1 + } + //fmt.Println() + rowNum++ + } +} + +func TestCreateRowFromMap(t *testing.T) { + columns := []string{"country_code","state_province", "city", "gym_name"} + valMap := make(map[string]FleetDBValue) + valMap["country_code"] = TextValue{"US"} + valMap["state_province"] = TextValue{"NY"} + valMap["city"] = TextValue{"Buffalo"} + valMap["gym_name"] = TextValue{"University Avenue"} + + //All rows required + row := createRowFromMap(valMap, columns) + assert.Equal(t, row.Values[0], TextValue{"US"}) + assert.Equal(t, row.Values[1], TextValue{"NY"}) + assert.Equal(t, row.Values[2], TextValue{"Buffalo"}) + assert.Equal(t, row.Values[3], TextValue{"University Avenue"}) + + //Only 2 row required + columns = []string{"country_code", "gym_name"} + row = createRowFromMap(valMap, columns) + assert.Equal(t, row.Values[0], TextValue{"US"}) + assert.Equal(t, row.Values[1], TextValue{"University Avenue"}) + + //when column value is not present in database + columns = []string{"country_code", "street_name"} + row = createRowFromMap(valMap, columns) + assert.Equal(t, row.Values[0], TextValue{"US"}) + assert.Equal(t, row.Values[1], nil) + +} + +func TestGetPartition(t *testing.T) { + tableid,partitionKeys,columns,tableSpec,db := createSampleDB() + + //Retrieving only 2 columns + columns = []string{"state_province", "city"} + clusteringKyes := []FleetDBValue {} + clusteringComparators := []ComparatorType{} + expectedVal := [][]string{{"NY", "Buffalo"}, {"Ohio", "Columbus"}} + rows, _ := db.GetPartition(tableid, partitionKeys, clusteringKyes, clusteringComparators, columns, tableSpec) + rowNum := 0 + assert.Equal(t, len(rows), 2) + for _, row := range rows{ + //ch := "" + index := 0 + for _, val := range row.Values{ + assert.Equal(t, val.String(), expectedVal[rowNum][index]) + if(val != nil){ + //fmt.Printf(ch + val.String()) + } + //ch = " || " + index += 1 + } + //fmt.Println() + rowNum++ + } + + + //Retrieving All columns + columns = []string{"country_code", "state_province", "city", "gym_name"} + clusteringKyes = []FleetDBValue {} + clusteringComparators = []ComparatorType{} + expectedVal = [][]string{{"US", "NY", "Buffalo", "University Avenue"}, {"US", "Ohio", "Columbus", "Englewood Avenue"}} + rows, _ = db.GetPartition(tableid, partitionKeys, clusteringKyes, clusteringComparators, columns, tableSpec) + rowNum = 0 + assert.Equal(t, len(rows), 2) + for _, row := range rows{ + //ch := "" + index := 0 + for _, val := range row.Values{ + assert.Equal(t, val.String(), expectedVal[rowNum][index]) + if(val != nil){ + //fmt.Printf(ch + val.String()) + } + //ch = " || " + index += 1 + } + //fmt.Println() + rowNum++ + } +} + +func TestSave(t *testing.T){ + + //Test Insert + db := NewStore() + tableid := uuid.New() + tableMap := make(map[string][]FleetDbColumnSpec) + tableName := "crossfit_gyms" + var myText TextValue + + myschema := make([]FleetDbColumnSpec, 4) + myschema[0] = FleetDbColumnSpec{"country_code", myText, true, false, 1} + myschema[1] = FleetDbColumnSpec{"state_province", myText, false, true, 2} + myschema[2] = FleetDbColumnSpec{"city", myText, false, false, 3} + myschema[3] = FleetDbColumnSpec{"gym_name", myText, false, false, 4} + tableMap[tableName] = myschema + + insertCommand1 := "INSERT INTO crossfit_gyms (country_code, state_province, city, gym_name) VALUES ('US', ‘NY’, ‘Buffalo’, 'University Avenue');"; + rowData, myTableName := decodeInsertCommand(insertCommand1) + res := TranslateToKV(tableMap[myTableName], rowData) + data := TranslateToKV(tableMap[tableName], []FleetDBValue{TextValue{"US"}, TextValue{"Ohio"},TextValue{"Columbus"},TextValue{"Englewood Avenue"}}) + res = append(res,data...) + db.Save(tableid, res) + + + columnSpecs := make(map[string]*FleetDbColumnSpec) + columnSpecs["country_code"] = &myschema[0] + columnSpecs["state_province"] = &myschema[1] + columnSpecs["city"] = &myschema[2] + columnSpecs["gym_name"] = &myschema[3] + tableSpec := FleetDBTableSpec{columnSpecs} + partitionKeys := []FleetDBValue{TextValue{"US"}} + columns := []string{"country_code", "state_province", "city", "gym_name"} + clusteringKyes := []FleetDBValue {} + clusteringComparators := []ComparatorType{} + expectedVal := [][]string{{"US", "NY", "Buffalo", "University Avenue"}, {"US", "Ohio", "Columbus", "Englewood Avenue"}} + rows, _ := db.GetPartition(tableid, partitionKeys, clusteringKyes, clusteringComparators, columns, tableSpec) + rowNum := 0 + assert.Equal(t, len(rows), 2) + for _, row := range rows{ + //ch := "" + index := 0 + for _, val := range row.Values{ + assert.Equal(t, val.String(), expectedVal[rowNum][index]) + if(val != nil){ + //fmt.Printf(ch + val.String()) + } + //ch = " || " + index += 1 + } + //fmt.Println() + rowNum++ + } + + // Test Update + + data = TranslateToKV(tableMap[tableName], []FleetDBValue{TextValue{"US"}, TextValue{"Ohio"},TextValue{"Columbus"},TextValue{"Englewood New Avenue"}}) + db.Save(tableid, data) + + expectedVal = [][]string{{"US", "NY", "Buffalo", "University Avenue"}, {"US", "Ohio", "Columbus", "Englewood New Avenue"}} + rows, _ = db.GetPartition(tableid, partitionKeys, clusteringKyes, clusteringComparators, columns, tableSpec) + rowNum = 0 + assert.Equal(t, len(rows), 2) + for _, row := range rows{ + //ch := "" + index := 0 + for _, val := range row.Values{ + assert.Equal(t, val.String(), expectedVal[rowNum][index]) + if(val != nil){ + //fmt.Printf(ch + val.String()) + } + //ch = " || " + index += 1 + } + //fmt.Println() + rowNum++ + } + +} + diff --git a/tablestore/fleetDBValue_test.go b/tablestore/fleetDBValue_test.go new file mode 100644 index 0000000..fe5087c --- /dev/null +++ b/tablestore/fleetDBValue_test.go @@ -0,0 +1,52 @@ +package tablestore + +import ( + "testing" + // "fmt" + "bytes" + "github.com/stretchr/testify/assert" +) + +func TestSerialize(t *testing.T) { + myInt := IntValue{200000} + byteSlice := myInt.Serialize() + //fmt.Println(byteSlice) + reader := bytes.NewReader(byteSlice) + assert.Equal(t, int32(200000), NewIntValue(reader).val) + + myFloat := FloatValue{3.14444} + byteSlice = myFloat.Serialize() + //fmt.Println(byteSlice) + reader = bytes.NewReader(byteSlice) + assert.Equal(t, float32(3.14444), NewFloatValue(reader).val) + + myLong := BigIntValue{20000000000} + byteSlice = myLong.Serialize() + //fmt.Println(byteSlice) + reader = bytes.NewReader(byteSlice) + assert.Equal(t, int64(20000000000), NewBigIntValue(reader).val) + + myDouble := DoubleValue{3.14444454543546} + byteSlice = myDouble.Serialize() + //fmt.Println(byteSlice) + reader = bytes.NewReader(byteSlice) + assert.Equal(t, float64(3.14444454543546), NewDoubleValue(reader).val) + + myString := TextValue{"Hello World"} + byteSlice = myString.Serialize() + reader = bytes.NewReader(byteSlice) + assert.Equal(t, "Hello World", NewTextValue(reader).val) + + myNullTerminatedString := TextValue{"Hello World\000"} + byteSlice = myNullTerminatedString.Serialize() + reader = bytes.NewReader(byteSlice) + //fmt.Println(NewTextValueFromNullTerminatedStream(reader).val) + assert.Equal(t, "Hello World", NewTextValueFromNullTerminatedStream(reader).val) + + myBoolean := BooleanValue{true} + byteSlice = myBoolean.Serialize() + //fmt.Println(byteSlice) + reader = bytes.NewReader(byteSlice) + assert.Equal(t, true, NewBooleanValue(reader).val) +} + diff --git a/tablestore/kv_item.go b/tablestore/kv_item.go new file mode 100644 index 0000000..ca6eb40 --- /dev/null +++ b/tablestore/kv_item.go @@ -0,0 +1,9 @@ +package tablestore + +import ( + +) +type KVItem struct { + Key []byte + Value []byte +} \ No newline at end of file diff --git a/tablestore/myDB/25431cf8-3e99-460e-be81-f784a1400873/000001.log b/tablestore/myDB/25431cf8-3e99-460e-be81-f784a1400873/000001.log new file mode 100644 index 0000000..0408b76 Binary files /dev/null and b/tablestore/myDB/25431cf8-3e99-460e-be81-f784a1400873/000001.log differ diff --git a/tablestore/myDB/25431cf8-3e99-460e-be81-f784a1400873/CURRENT b/tablestore/myDB/25431cf8-3e99-460e-be81-f784a1400873/CURRENT new file mode 100644 index 0000000..feda7d6 --- /dev/null +++ b/tablestore/myDB/25431cf8-3e99-460e-be81-f784a1400873/CURRENT @@ -0,0 +1 @@ +MANIFEST-000000 diff --git a/tablestore/myDB/25431cf8-3e99-460e-be81-f784a1400873/LOCK b/tablestore/myDB/25431cf8-3e99-460e-be81-f784a1400873/LOCK new file mode 100644 index 0000000..e69de29 diff --git a/tablestore/myDB/25431cf8-3e99-460e-be81-f784a1400873/LOG b/tablestore/myDB/25431cf8-3e99-460e-be81-f784a1400873/LOG new file mode 100644 index 0000000..4a64a0a --- /dev/null +++ b/tablestore/myDB/25431cf8-3e99-460e-be81-f784a1400873/LOG @@ -0,0 +1,6 @@ +=============== Oct 12, 2019 (EDT) =============== +15:30:07.296659 log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed +15:30:07.320460 db@open opening +15:30:07.323607 version@stat F·[] S·0B[] Sc·[] +15:30:07.338370 db@janitor F·2 G·0 +15:30:07.339100 db@open done T·17.301754ms diff --git a/tablestore/myDB/25431cf8-3e99-460e-be81-f784a1400873/MANIFEST-000000 b/tablestore/myDB/25431cf8-3e99-460e-be81-f784a1400873/MANIFEST-000000 new file mode 100644 index 0000000..9d54f67 Binary files /dev/null and b/tablestore/myDB/25431cf8-3e99-460e-be81-f784a1400873/MANIFEST-000000 differ diff --git a/tablestore/myDB/ad50d105-cdec-44f5-9eba-e2537944cd26/000001.log b/tablestore/myDB/ad50d105-cdec-44f5-9eba-e2537944cd26/000001.log new file mode 100644 index 0000000..0408b76 Binary files /dev/null and b/tablestore/myDB/ad50d105-cdec-44f5-9eba-e2537944cd26/000001.log differ diff --git a/tablestore/myDB/ad50d105-cdec-44f5-9eba-e2537944cd26/CURRENT b/tablestore/myDB/ad50d105-cdec-44f5-9eba-e2537944cd26/CURRENT new file mode 100644 index 0000000..feda7d6 --- /dev/null +++ b/tablestore/myDB/ad50d105-cdec-44f5-9eba-e2537944cd26/CURRENT @@ -0,0 +1 @@ +MANIFEST-000000 diff --git a/tablestore/myDB/ad50d105-cdec-44f5-9eba-e2537944cd26/LOCK b/tablestore/myDB/ad50d105-cdec-44f5-9eba-e2537944cd26/LOCK new file mode 100644 index 0000000..e69de29 diff --git a/tablestore/myDB/ad50d105-cdec-44f5-9eba-e2537944cd26/LOG b/tablestore/myDB/ad50d105-cdec-44f5-9eba-e2537944cd26/LOG new file mode 100644 index 0000000..ea688b5 --- /dev/null +++ b/tablestore/myDB/ad50d105-cdec-44f5-9eba-e2537944cd26/LOG @@ -0,0 +1,6 @@ +=============== Oct 12, 2019 (EDT) =============== +15:24:06.380136 log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed +15:24:06.406744 db@open opening +15:24:06.409675 version@stat F·[] S·0B[] Sc·[] +15:24:06.426362 db@janitor F·2 G·0 +15:24:06.427099 db@open done T·19.337227ms diff --git a/tablestore/myDB/ad50d105-cdec-44f5-9eba-e2537944cd26/MANIFEST-000000 b/tablestore/myDB/ad50d105-cdec-44f5-9eba-e2537944cd26/MANIFEST-000000 new file mode 100644 index 0000000..9d54f67 Binary files /dev/null and b/tablestore/myDB/ad50d105-cdec-44f5-9eba-e2537944cd26/MANIFEST-000000 differ diff --git a/tablestore/row.go b/tablestore/row.go new file mode 100644 index 0000000..458b184 --- /dev/null +++ b/tablestore/row.go @@ -0,0 +1,9 @@ +package tablestore + +import ( + +) +type Row struct { + Values []FleetDBValue +} + diff --git a/tablestore/table_schema.go b/tablestore/table_schema.go new file mode 100644 index 0000000..9a50e71 --- /dev/null +++ b/tablestore/table_schema.go @@ -0,0 +1,13 @@ +package tablestore + +import ( + +) + +type FleetDbColumnSpec struct { + colname string + coltype FleetDBValue + isPartition bool + isClustering bool + seqNo uint8 +} diff --git a/tablestore/translatetoKV_test.go b/tablestore/translatetoKV_test.go new file mode 100644 index 0000000..3a44376 --- /dev/null +++ b/tablestore/translatetoKV_test.go @@ -0,0 +1,116 @@ +package tablestore + +import ( + "testing" + "github.com/stretchr/testify/assert" + //"fmt" +) + +func TestTranslateToKV(t *testing.T) { + tableMap := make(map[string][]FleetDbColumnSpec) + tableName := "crossfit_gyms" + + var myText TextValue + + + //Test With One Partition Key and One Clustering Key + myschema := make([]FleetDbColumnSpec, 4) + myschema[0] = FleetDbColumnSpec{"country_code", myText, true, false, 1} + myschema[1] = FleetDbColumnSpec{"state_province", myText, false, true, 2} + myschema[2] = FleetDbColumnSpec{"city", myText, false, false, 3} + myschema[3] = FleetDbColumnSpec{"gym_name", myText, false, false, 4} + tableMap[tableName] = myschema + insertCommand := "INSERT INTO crossfit_gyms (country_code, state_province, city, gym_name) VALUES ('US', ‘NY’, ‘Buffalo’, 'University Avenue');"; + rowData, myTableName := decodeInsertCommand(insertCommand) + res := TranslateToKV(tableMap[myTableName], rowData) + + expectedByteKey := [][][]byte{{[]byte{1},[]byte{5},[]byte("US\000"), []byte{2},[]byte{5},[]byte("NY\000"), []byte{3},[]byte{5},[]byte("city\000")}, + {[]byte{1},[]byte{5},[]byte("US\000"), []byte{2},[]byte{5},[]byte("NY\000"), []byte{3},[]byte{5},[]byte("gym_name\000")}} + + expectedSingeDKey := [][]byte{} + for i, _:= range expectedByteKey{ + oneKey := make([]byte,0) + for j, _:= range expectedByteKey[i]{ + oneKey = append(oneKey, expectedByteKey[i][j]...) + } + expectedSingeDKey = append(expectedSingeDKey, oneKey) + } + expectedVal := []string{"Buffalo","University Avenue"} + for i ,_ := range res { + assert.Equal(t, expectedSingeDKey[i], res[i].Key) + assert.Equal(t, expectedVal[i], string(res[i].Value)) + } + + + //Test With One Partition Key and Zero Clustering Key + myschema[1] = FleetDbColumnSpec{"state_province", myText, false, false, 2} + res = TranslateToKV(tableMap[myTableName], rowData) + + expectedByteKey = [][][]byte{{[]byte{1},[]byte{5},[]byte("US\000"), []byte{3},[]byte{5},[]byte("state_province\000")}, + {[]byte{1},[]byte{5},[]byte("US\000"), []byte{3},[]byte{5},[]byte("city\000")}, + {[]byte{1},[]byte{5},[]byte("US\000"), []byte{3},[]byte{5},[]byte("gym_name\000")}} + + expectedSingeDKey = [][]byte{} + for i, _:= range expectedByteKey{ + oneKey := make([]byte,0) + for j, _:= range expectedByteKey[i]{ + oneKey = append(oneKey, expectedByteKey[i][j]...) + } + expectedSingeDKey = append(expectedSingeDKey, oneKey) + } + expectedVal = []string{"NY", "Buffalo","University Avenue"} + for i ,_ := range res { + assert.Equal(t, expectedSingeDKey[i], res[i].Key) + assert.Equal(t, expectedVal[i], string(res[i].Value)) + } + + + //All columns are either PK or CK + myschema[1] = FleetDbColumnSpec{"state_province", myText, false, true, 2} + myschema[2] = FleetDbColumnSpec{"city", myText, false, true, 3} + myschema[3] = FleetDbColumnSpec{"gym_name", myText, false, true, 4} + res = TranslateToKV(tableMap[myTableName], rowData) + + expectedByteKey = [][][]byte{{[]byte{1},[]byte{5},[]byte("US\000"), []byte{2},[]byte{5},[]byte("NY\000"), []byte{2},[]byte{5},[]byte("Buffalo\000"), []byte{2},[]byte{5},[]byte("University Avenue\000")}} + + expectedSingeDKey = [][]byte{} + for i, _:= range expectedByteKey{ + oneKey := make([]byte,0) + for j, _:= range expectedByteKey[i]{ + oneKey = append(oneKey, expectedByteKey[i][j]...) + } + expectedSingeDKey = append(expectedSingeDKey, oneKey) + } + for i ,_ := range res { + assert.Equal(t, expectedSingeDKey[i], res[i].Key) + assert.Equal(t,[]byte(nil), res[i].Value) + } + + // Composite Key + myschema[1] = FleetDbColumnSpec{"state_province", myText, true, false, 2} + myschema[2] = FleetDbColumnSpec{"city", myText, false, false, 3} + myschema[3] = FleetDbColumnSpec{"gym_name", myText, false, false, 4} + res = TranslateToKV(tableMap[myTableName], rowData) + + expectedByteKey = [][][]byte{{[]byte{1},[]byte{5},[]byte("US\000"), []byte{1},[]byte{5},[]byte("NY\000"), []byte{3},[]byte{5},[]byte("city\000")}, + {[]byte{1},[]byte{5},[]byte("US\000"), []byte{1},[]byte{5},[]byte("NY\000"), []byte{3},[]byte{5},[]byte("gym_name\000")}} + + expectedSingeDKey = [][]byte{} + for i, _:= range expectedByteKey{ + oneKey := make([]byte,0) + for j, _:= range expectedByteKey[i]{ + oneKey = append(oneKey, expectedByteKey[i][j]...) + } + expectedSingeDKey = append(expectedSingeDKey, oneKey) + } + expectedVal = []string{"Buffalo","University Avenue"} + for i ,_ := range res { + assert.Equal(t, expectedSingeDKey[i], res[i].Key) + assert.Equal(t, expectedVal[i], string(res[i].Value)) + } +} + +func decodeInsertCommand(query string)([]FleetDBValue , string){ + val := []FleetDBValue{TextValue{"US"}, TextValue{"NY"},TextValue{"Buffalo"},TextValue{"University Avenue"}} + return val, "crossfit_gyms" +} diff --git a/tablestore/utility.go b/tablestore/utility.go new file mode 100644 index 0000000..da4cfa0 --- /dev/null +++ b/tablestore/utility.go @@ -0,0 +1,75 @@ +package tablestore + +import ( +) + +const( + PartitionType = iota + 1; + ClusteringType + ColumnType +) + +var nullchar string = "\000" + +func TranslateToKV(columnSpecs []FleetDbColumnSpec, values []FleetDBValue) []KVItem{ + var pKey []byte + var cKey []byte + pKeyCount := 0 + cKeyCount := 0 + for i, colSpec := range columnSpecs { + if colSpec.isPartition{ + pKey = append(pKey,PartitionType) + coltype := colSpec.coltype.getType() + pKey = append(pKey, coltype.getVal()) + if coltype == Text{ + pKey = append(pKey,[]byte(values[i].String()+ nullchar)...) + }else{ + pKey = append(pKey,values[i].Serialize()...) + } + pKeyCount = pKeyCount + 1 + } + + if colSpec.isClustering{ + cKey = append(cKey,ClusteringType) + coltype := colSpec.coltype.getType() + cKey = append(cKey, coltype.getVal()) + if coltype == Text{ + cKey = append(cKey,[]byte(values[i].String() + nullchar)...) + }else{ + cKey = append(cKey,values[i].Serialize()...) + } + cKeyCount = cKeyCount + 1 + } + } + kvItems := []KVItem{} + var prefix []byte + prefix = pKey + + if cKeyCount > 0{ + prefix = append(prefix,cKey...) + } + if len(columnSpecs) == (pKeyCount + cKeyCount){ + key := prefix + var val []byte + kvItems = append(kvItems, KVItem{key, val}) + }else{ + for i, colSpec := range columnSpecs { + if !colSpec.isPartition && !colSpec.isClustering{ + //fmt.Println(index) + key := []byte{} + key = append(key,prefix...) + key = append(key,ColumnType) + key = append(key, Text.getVal()) + key = append(key,[]byte(colSpec.colname + nullchar)...) + var val []byte + if(values[i] != nil){ + val = values[i].Serialize() + }else { + val = nil + } + kvItems = append(kvItems, KVItem{key, val}) + } + } + } + return kvItems; +}