Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions adapter/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ const updateSplitCount = 2

type DynamoDBServer struct {
listen net.Listener
store store.ScanStore
store store.MVCCStore
coordinator kv.Coordinator
dynamoTranscoder *dynamodbTranscoder
httpServer *http.Server
}

func NewDynamoDBServer(listen net.Listener, st store.ScanStore, coordinate *kv.Coordinate) *DynamoDBServer {
func NewDynamoDBServer(listen net.Listener, st store.MVCCStore, coordinate *kv.Coordinate) *DynamoDBServer {
d := &DynamoDBServer{
listen: listen,
store: st,
Expand Down Expand Up @@ -114,7 +114,8 @@ func (d *DynamoDBServer) getItem(w http.ResponseWriter, r *http.Request) {
http.Error(w, "missing key", http.StatusBadRequest)
return
}
v, err := d.store.Get(r.Context(), []byte(keyAttr.S))
readTS := snapshotTS(d.coordinator.Clock(), d.store)
v, err := d.store.GetAt(r.Context(), []byte(keyAttr.S), readTS)
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
w.Header().Set("Content-Type", "application/x-amz-json-1.0")
Expand Down Expand Up @@ -233,7 +234,8 @@ func (d *DynamoDBServer) validateCondition(ctx context.Context, expr string, nam
if expr == "" {
return nil
}
exists, err := d.store.Exists(ctx, key)
readTS := snapshotTS(d.coordinator.Clock(), d.store)
exists, err := d.store.ExistsAt(ctx, key, readTS)
if err != nil {
return errors.WithStack(err)
}
Expand Down
5 changes: 3 additions & 2 deletions adapter/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDynamoDB_PutItem_GetItem(t *testing.T) {
Expand Down Expand Up @@ -248,7 +249,7 @@ func TestDynamoDB_TransactWriteItems_Concurrent(t *testing.T) {
})
assert.NoError(t, err, "Get failed for key1 in goroutine %d", i)
value1Attr, ok := out1.Item["value"].(*types.AttributeValueMemberS)
assert.True(t, ok, "Type assertion failed for key1 in goroutine %d", i)
require.True(t, ok, "Type assertion failed for key1 in goroutine %d", i)
assert.Equal(t, value1, value1Attr.Value, "Value mismatch for key1 in goroutine %d", i)

out2, err := client.GetItem(context.Background(), &dynamodb.GetItemInput{
Expand All @@ -259,7 +260,7 @@ func TestDynamoDB_TransactWriteItems_Concurrent(t *testing.T) {
})
assert.NoError(t, err, "Get failed for key2 in goroutine %d", i)
value2Attr, ok := out2.Item["value"].(*types.AttributeValueMemberS)
assert.True(t, ok, "Type assertion failed for key2 in goroutine %d", i)
require.True(t, ok, "Type assertion failed for key2 in goroutine %d", i)
assert.Equal(t, value2, value2Attr.Value, "Value mismatch for key2 in goroutine %d", i)
}(i)
}
Expand Down
20 changes: 14 additions & 6 deletions adapter/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ type GRPCServer struct {
log *slog.Logger
grpcTranscoder *grpcTranscoder
coordinator kv.Coordinator
store store.ScanStore
store store.MVCCStore

pb.UnimplementedRawKVServer
pb.UnimplementedTransactionalKVServer
}

func NewGRPCServer(store store.ScanStore, coordinate *kv.Coordinate) *GRPCServer {
func NewGRPCServer(store store.MVCCStore, coordinate *kv.Coordinate) *GRPCServer {
return &GRPCServer{
log: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelWarn,
Expand All @@ -40,8 +40,13 @@ func NewGRPCServer(store store.ScanStore, coordinate *kv.Coordinate) *GRPCServer
}

func (r GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawGetResponse, error) {
readTS := req.GetTs()
if readTS == 0 {
readTS = snapshotTS(r.coordinator.Clock(), r.store)
}

if r.coordinator.IsLeader() {
v, err := r.store.Get(ctx, req.Key)
v, err := r.store.GetAt(ctx, req.Key, readTS)
if err != nil {
switch {
case errors.Is(err, store.ErrKeyNotFound):
Expand Down Expand Up @@ -93,7 +98,8 @@ func (r GRPCServer) tryLeaderGet(key []byte) ([]byte, error) {
defer conn.Close()

cli := pb.NewRawKVClient(conn)
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
ts := snapshotTS(r.coordinator.Clock(), r.store)
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key, Ts: ts})
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down Expand Up @@ -180,7 +186,8 @@ func (r GRPCServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetRespons
return nil, errors.WithStack(err)
}

v, err := r.store.Get(ctx, req.Key)
readTS := snapshotTS(r.coordinator.Clock(), r.store)
v, err := r.store.GetAt(ctx, req.Key, readTS)
if err != nil {
switch {
case errors.Is(err, store.ErrKeyNotFound):
Expand Down Expand Up @@ -227,7 +234,8 @@ func (r GRPCServer) Scan(ctx context.Context, req *pb.ScanRequest) (*pb.ScanResp
Kv: nil,
}, errors.WithStack(err)
}
res, err := r.store.Scan(ctx, req.StartKey, req.EndKey, limit)
readTS := snapshotTS(r.coordinator.Clock(), r.store)
res, err := r.store.ScanAt(ctx, req.StartKey, req.EndKey, limit, readTS)
if err != nil {
return &pb.ScanResponse{
Kv: nil,
Expand Down
46 changes: 34 additions & 12 deletions adapter/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,7 @@ func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.Forwa
return nil, errors.WithStack(ErrNotLeader)
}

// Ensure leader issues start_ts when followers forward txn groups without it.
if req.IsTxn {
var startTs uint64
for _, r := range req.Requests {
if r.Ts == 0 {
if startTs == 0 {
startTs = i.clock.Next()
}
r.Ts = startTs
}
}
}
i.stampTimestamps(req)

r, err := i.transactionManager.Commit(req.Requests)
if err != nil {
Expand All @@ -61,3 +50,36 @@ func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.Forwa
CommitIndex: r.CommitIndex,
}, nil
}

func (i *Internal) stampTimestamps(req *pb.ForwardRequest) {
if req == nil {
return
}
if req.IsTxn {
var startTs uint64
// All requests in a transaction must have the same timestamp.
// Find a timestamp from the requests, or generate a new one if none exist.
for _, r := range req.Requests {
if r.Ts != 0 {
startTs = r.Ts
break
}
}

if startTs == 0 && len(req.Requests) > 0 {
startTs = i.clock.Next()
}

// Assign the unified timestamp to all requests in the transaction.
for _, r := range req.Requests {
r.Ts = startTs
}
return
}

for _, r := range req.Requests {
if r.Ts == 0 {
r.Ts = i.clock.Next()
}
}
}
55 changes: 37 additions & 18 deletions adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ var argsLen = map[string]int{

type RedisServer struct {
listen net.Listener
store store.ScanStore
store store.MVCCStore
coordinator kv.Coordinator
redisTranscoder *redisTranscoder
listStore *store.ListStore
// TODO manage membership from raft log
leaderRedis map[raft.ServerAddress]string

Expand Down Expand Up @@ -72,15 +71,12 @@ type redisResult struct {
err error
}

func store2list(st store.ScanStore) *store.ListStore { return store.NewListStore(st) }

func NewRedisServer(listen net.Listener, store store.ScanStore, coordinate *kv.Coordinate, leaderRedis map[raft.ServerAddress]string) *RedisServer {
func NewRedisServer(listen net.Listener, store store.MVCCStore, coordinate *kv.Coordinate, leaderRedis map[raft.ServerAddress]string) *RedisServer {
r := &RedisServer{
listen: listen,
store: store,
coordinator: coordinate,
redisTranscoder: newRedisTranscoder(),
listStore: store2list(store),
leaderRedis: leaderRedis,
}

Expand Down Expand Up @@ -112,6 +108,10 @@ func getConnState(conn redcon.Conn) *connState {
return st
}

func (r *RedisServer) readTS() uint64 {
return snapshotTS(r.coordinator.Clock(), r.store)
}

func (r *RedisServer) Run() error {
err := redcon.Serve(r.listen,
func(conn redcon.Conn, cmd redcon.Command) {
Expand Down Expand Up @@ -211,7 +211,8 @@ func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) {
}

if r.coordinator.IsLeader() {
v, err := r.store.Get(context.Background(), cmd.Args[1])
readTS := r.readTS()
v, err := r.store.GetAt(context.Background(), cmd.Args[1], readTS)
if err != nil {
switch {
case errors.Is(err, store.ErrKeyNotFound):
Expand Down Expand Up @@ -276,7 +277,8 @@ func (r *RedisServer) exists(conn redcon.Conn, cmd redcon.Command) {
return
}

ok, err := r.store.Exists(context.Background(), cmd.Args[1])
readTS := r.readTS()
ok, err := r.store.ExistsAt(context.Background(), cmd.Args[1], readTS)
if err != nil {
conn.WriteError(err.Error())
return
Expand Down Expand Up @@ -325,7 +327,8 @@ func (r *RedisServer) localKeys(pattern []byte) ([][]byte, error) {
}

func (r *RedisServer) localKeysExact(pattern []byte) ([][]byte, error) {
res, err := r.store.Exists(context.Background(), pattern)
readTS := r.readTS()
res, err := r.store.ExistsAt(context.Background(), pattern, readTS)
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -338,7 +341,8 @@ func (r *RedisServer) localKeysExact(pattern []byte) ([][]byte, error) {
func (r *RedisServer) localKeysPattern(pattern []byte) ([][]byte, error) {
start := r.patternStart(pattern)

keys, err := r.store.Scan(context.Background(), start, nil, math.MaxInt)
readTS := r.readTS()
keys, err := r.store.ScanAt(context.Background(), start, nil, math.MaxInt, readTS)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down Expand Up @@ -828,13 +832,24 @@ func clampRange(start, end, length int) (int, int) {
}

func (r *RedisServer) loadListMeta(ctx context.Context, key []byte) (store.ListMeta, bool, error) {
meta, exists, err := r.listStore.LoadMeta(ctx, key)
return meta, exists, errors.WithStack(err)
readTS := r.readTS()
val, err := r.store.GetAt(ctx, store.ListMetaKey(key), readTS)
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
return store.ListMeta{}, false, nil
}
return store.ListMeta{}, false, errors.WithStack(err)
}
meta, err := store.UnmarshalListMeta(val)
if err != nil {
return store.ListMeta{}, false, errors.WithStack(err)
}
return meta, true, nil
}

func (r *RedisServer) isListKey(ctx context.Context, key []byte) (bool, error) {
isList, err := r.listStore.IsList(ctx, key)
return isList, errors.WithStack(err)
_, exists, err := r.loadListMeta(ctx, key)
return exists, err
}

func (r *RedisServer) buildRPushOps(meta store.ListMeta, key []byte, values [][]byte) ([]*kv.Elem[kv.OP], store.ListMeta, error) {
Expand Down Expand Up @@ -895,7 +910,8 @@ func (r *RedisServer) deleteList(ctx context.Context, key []byte) error {
start := listItemKey(key, math.MinInt64)
end := listItemKey(key, math.MaxInt64)

kvs, err := r.store.Scan(ctx, start, end, math.MaxInt)
readTS := r.readTS()
kvs, err := r.store.ScanAt(ctx, start, end, math.MaxInt, readTS)
if err != nil {
return errors.WithStack(err)
}
Expand Down Expand Up @@ -926,7 +942,8 @@ func (r *RedisServer) fetchListRange(ctx context.Context, key []byte, meta store
startKey := listItemKey(key, startSeq)
endKey := listItemKey(key, endSeq+1) // exclusive

kvs, err := r.store.Scan(ctx, startKey, endKey, int(endIdx-startIdx+1))
readTS := r.readTS()
kvs, err := r.store.ScanAt(ctx, startKey, endKey, int(endIdx-startIdx+1), readTS)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down Expand Up @@ -1039,7 +1056,8 @@ func (r *RedisServer) tryLeaderGet(key []byte) ([]byte, error) {
defer conn.Close()

cli := pb.NewRawKVClient(conn)
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
ts := r.readTS()
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key, Ts: ts})
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -1048,8 +1066,9 @@ func (r *RedisServer) tryLeaderGet(key []byte) ([]byte, error) {
}

func (r *RedisServer) getValue(key []byte) ([]byte, error) {
readTS := r.readTS()
if r.coordinator.IsLeader() {
v, err := r.store.Get(context.Background(), key)
v, err := r.store.GetAt(context.Background(), key, readTS)
return v, errors.WithStack(err)
}
return r.tryLeaderGet(key)
Expand Down
26 changes: 26 additions & 0 deletions adapter/ts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package adapter

import (
"github.com/bootjp/elastickv/kv"
"github.com/bootjp/elastickv/store"
)

// snapshotTS picks a safe snapshot timestamp:
// - uses the store's last commit watermark if available,
// - otherwise the coordinator's HLC current value,
// - and falls back to MaxUint64 if neither is set.
func snapshotTS(clock *kv.HLC, st store.MVCCStore) uint64 {
ts := uint64(0)
if st != nil {
ts = st.LastCommitTS()
}
if clock != nil {
if cur := clock.Current(); cur > ts {
ts = cur
}
}
if ts == 0 {
ts = ^uint64(0)
}
return ts
}
1 change: 1 addition & 0 deletions kv/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Coordinator interface {
Dispatch(reqs *OperationGroup[OP]) (*CoordinateResponse, error)
IsLeader() bool
RaftLeader() raft.ServerAddress
Clock() *HLC
}

func (c *Coordinate) Dispatch(reqs *OperationGroup[OP]) (*CoordinateResponse, error) {
Expand Down
Loading
Loading