From 5d7cd3a70f24b83d49785785a3233948c0a3fb57 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 31 Dec 2025 16:46:34 +0900 Subject: [PATCH 1/9] Add newline at end of mvcc_store.go file --- go.mod | 15 +- go.sum | 26 ++ store/lsm_store.go | 654 ++++++++++++++++++++++++++++++++++++++++ store/lsm_store_test.go | 143 +++++++++ store/mvcc_store.go | 2 +- 5 files changed, 838 insertions(+), 2 deletions(-) create mode 100644 store/lsm_store.go create mode 100644 store/lsm_store_test.go diff --git a/go.mod b/go.mod index 33b2640..5cea3db 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.19.6 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.53.5 github.com/cockroachdb/errors v1.12.0 + github.com/cockroachdb/pebble v1.1.5 github.com/emirpasic/gods v1.18.1 github.com/hashicorp/go-hclog v1.6.3 github.com/hashicorp/raft v1.7.3 @@ -23,7 +24,6 @@ require ( github.com/spaolacci/murmur3 v1.1.0 github.com/stretchr/testify v1.11.1 github.com/tidwall/redcon v1.6.2 - go.etcd.io/bbolt v1.4.3 golang.org/x/sync v0.19.0 golang.org/x/sys v0.38.0 google.golang.org/grpc v1.78.0 @@ -31,6 +31,7 @@ require ( ) require ( + github.com/DataDog/zstd v1.5.2 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.16 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.16 // indirect @@ -44,30 +45,42 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.12 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.41.5 // indirect github.com/aws/smithy-go v1.24.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/boltdb/bolt v1.3.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect github.com/cockroachdb/redact v1.1.5 // indirect + github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fatih/color v1.15.0 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-metrics v0.5.4 // indirect github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect + github.com/klauspost/compress v1.16.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_golang v1.15.0 // indirect + github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/common v0.42.0 // indirect + github.com/prometheus/procfs v0.9.0 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/tidwall/btree v1.1.0 // indirect github.com/tidwall/match v1.1.1 // indirect + go.etcd.io/bbolt v1.4.3 // indirect + golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect golang.org/x/net v0.47.0 // indirect golang.org/x/text v0.31.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect diff --git a/go.sum b/go.sum index af9b91f..cb16e3a 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,7 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/Jille/grpc-multi-resolver v1.3.0 h1:cbVm1TtWP7YxdiCCZ8gU4/78pYO2OXpzZSFAAUMdFLs= github.com/Jille/grpc-multi-resolver v1.3.0/go.mod h1:vEHO+TZo6TUee3VbNdXq4iiUQGvItfmeGcdNOX2usnM= @@ -57,6 +58,7 @@ github.com/aws/smithy-go v1.24.0 h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk= github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= @@ -75,12 +77,20 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4= +github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/cockroachdb/errors v1.12.0 h1:d7oCs6vuIMUQRVbi6jWWWEJZahLCfJpnJSVobd1/sUo= github.com/cockroachdb/errors v1.12.0/go.mod h1:SvzfYNNBshAVbZ8wzNc/UPK3w1vf0dKDUP41ucAIf7g= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce h1:giXvy4KSc/6g/esnpM7Geqxka4WSqI1SZc7sMJFd3y4= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= +github.com/cockroachdb/pebble v1.1.5 h1:5AAWCBWbat0uE0blr8qzufZP5tBjkRyy/jWe1QWLnvw= +github.com/cockroachdb/pebble v1.1.5/go.mod h1:17wO9el1YEigxkP/YtV8NtCivQDgoCyBg5c4VR/eOWo= github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -126,6 +136,7 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= @@ -138,6 +149,7 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -202,6 +214,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4= +github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -224,6 +238,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= @@ -250,21 +266,29 @@ github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5Fsn github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= +github.com/prometheus/client_golang v1.15.0 h1:5fCgGYogn0hFdhyhLbw7hEsWxufKtY9klyvdNfFlFhM= +github.com/prometheus/client_golang v1.15.0/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= +github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= +github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= +github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= +github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/redis/go-redis/v9 v9.17.2 h1:P2EGsA4qVIM3Pp+aPocCJ7DguDHhqrXNhVcEp4ViluI= github.com/redis/go-redis/v9 v9.17.2/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -330,6 +354,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME= +golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/store/lsm_store.go b/store/lsm_store.go new file mode 100644 index 0000000..c0bcae4 --- /dev/null +++ b/store/lsm_store.go @@ -0,0 +1,654 @@ +package store + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/gob" + "io" + "log/slog" + "math" + "os" + "sync" + + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/vfs" +) + +const ( + timestampSize = 8 + snapshotBatchSize = 1000 + dirPerms = 0755 +) + +// pebbleStore implements MVCCStore using CockroachDB's Pebble LSM tree. +type pebbleStore struct { + db *pebble.DB + log *slog.Logger + lastCommitTS uint64 + mtx sync.RWMutex + dir string +} + +// Ensure pebbleStore implements MVCCStore +var _ MVCCStore = (*pebbleStore)(nil) + +// PebbleStoreOption configures the PebbleStore. +type PebbleStoreOption func(*pebbleStore) + +// WithPebbleLogger sets a custom logger. +func WithPebbleLogger(l *slog.Logger) PebbleStoreOption { + return func(s *pebbleStore) { + s.log = l + } +} + +// NewPebbleStore creates a new Pebble-backed MVCC store. +func NewPebbleStore(dir string, opts ...PebbleStoreOption) (MVCCStore, error) { + s := &pebbleStore{ + dir: dir, + log: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelWarn, + })), + } + for _, opt := range opts { + opt(s) + } + + pebbleOpts := &pebble.Options{ + FS: vfs.Default, + } + // Enable automatic compactions + pebbleOpts.EnsureDefaults() + + db, err := pebble.Open(dir, pebbleOpts) + if err != nil { + return nil, errors.WithStack(err) + } + s.db = db + + // Initialize lastCommitTS by scanning specifically or persisting it separately. + // For simplicity, we scan on startup to find the max TS. + // In a production system, this should be stored in a separate meta key. + maxTS, err := s.findMaxCommitTS() + if err != nil { + _ = db.Close() + return nil, err + } + s.lastCommitTS = maxTS + + return s, nil +} + +// Key encoding: UserKey + Separator(0x00) + Timestamp(BigEndian, BitInverted) +// We use 0x00 as separator. UserKey must not contain 0x00 ideally, or we need escaping. +// For this implementation, we assume keys can contain anything, so we might need a safer encoding scheme. +// A common approach is: LengthPrefixedKey + Timestamp. +// Let's use: UserKey + 0x00 + ^Timestamp (8 bytes). +// Note: If UserKey contains 0x00, it might confuse logic if we scan blindly. +// Better: Encode key length or ensure a terminator. +// Since the instruction is "implement LSM Store", we'll stick to a simple encoding: +// Key ++ TS. But we need to separate Key from TS during iteration. +// We will use a fixed-length suffix for TS (8 bytes). +// Key = [UserKeyBytes] [8-byte Inverted Timestamp] + +func encodeKey(key []byte, ts uint64) []byte { + k := make([]byte, len(key)+timestampSize) + copy(k, key) + // Invert TS for descending order (newer first) + binary.BigEndian.PutUint64(k[len(key):], ^ts) + return k +} + +func decodeKey(k []byte) ([]byte, uint64) { + if len(k) < timestampSize { + return nil, 0 + } + keyLen := len(k) - timestampSize + key := make([]byte, keyLen) + copy(key, k[:keyLen]) + invTs := binary.BigEndian.Uint64(k[keyLen:]) + return key, ^invTs +} + +// Value encoding: We use gob to encode VersionedValue structure minus the key/ts which are in the key. +type storedValue struct { + Value []byte + Tombstone bool + ExpireAt uint64 +} + +func encodeValue(val []byte, tombstone bool, expireAt uint64) ([]byte, error) { + sv := storedValue{ + Value: val, + Tombstone: tombstone, + ExpireAt: expireAt, + } + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(sv); err != nil { + return nil, errors.WithStack(err) + } + return buf.Bytes(), nil +} + +func decodeValue(data []byte) (storedValue, error) { + var sv storedValue + if err := gob.NewDecoder(bytes.NewReader(data)).Decode(&sv); err != nil { + return sv, errors.WithStack(err) + } + return sv, nil +} + +func (s *pebbleStore) findMaxCommitTS() (uint64, error) { + // This is expensive on large DBs. Ideally, persist LastCommitTS in a special key. + // iterating the whole DB is not feasible for large datasets. + // For this task, we will persist LastCommitTS in a special meta key "_meta_last_commit_ts" + // whenever we update it (or periodically). + // For now, let's look for that key. + val, closer, err := s.db.Get([]byte("_meta_last_commit_ts")) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return 0, nil + } + return 0, errors.WithStack(err) + } + defer closer.Close() + if len(val) < timestampSize { + return 0, nil + } + return binary.LittleEndian.Uint64(val), nil +} + +func (s *pebbleStore) saveLastCommitTS(ts uint64) error { + buf := make([]byte, timestampSize) + binary.LittleEndian.PutUint64(buf, ts) + return errors.WithStack(s.db.Set([]byte("_meta_last_commit_ts"), buf, pebble.NoSync)) +} + +func (s *pebbleStore) LastCommitTS() uint64 { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.lastCommitTS +} + +func (s *pebbleStore) updateLastCommitTS(ts uint64) { + if ts > s.lastCommitTS { + s.lastCommitTS = ts + // Best effort persist + _ = s.saveLastCommitTS(ts) + } +} + +func (s *pebbleStore) alignCommitTS(commitTS uint64) uint64 { + s.mtx.Lock() + defer s.mtx.Unlock() + if commitTS <= s.lastCommitTS { + commitTS = s.lastCommitTS + 1 + } + s.updateLastCommitTS(commitTS) + return commitTS +} + +// MVCC Implementation + +func (s *pebbleStore) GetAt(ctx context.Context, key []byte, ts uint64) ([]byte, error) { + iter, err := s.db.NewIter(nil) + if err != nil { + return nil, errors.WithStack(err) + } + defer iter.Close() + + // Seek to Key + ^ts (which effectively means Key with version <= ts) + // Because we use inverted timestamp, larger TS (smaller inverted) comes first. + // We want the largest TS that is <= requested ts. + // So we construct a key with requested ts. + seekKey := encodeKey(key, ts) + + // SeekGE will find the first key >= seekKey. + // Since keys are [UserKey][InvTS], and InvTS decreases as TS increases. + // We want TS <= requested_ts. + // Example: Request TS=10. InvTS=^10 (Large). + // Stored: TS=12 (Inv=Small), TS=10 (Inv=Large), TS=8 (Inv=Larger). + // SeekGE(Key + ^10) will skip TS=12 (Key + Small) because Key+Small < Key+Large. + // It will land on TS=10 or TS=8. + // Wait, standard byte comparison: + // Key is same. + // ^12 < ^10 < ^8. + // We want largest TS <= 10. + // If we SeekGE(Key + ^10), we might find Key + ^10 (TS=10) or Key + ^8 (TS=8). + // These are valid candidates. + // If we hit Key + ^12, that is smaller than seekKey (since ^12 < ^10), so SeekGE wouldn't find it if we started before it. + // But we want to filter out TS > 10 (i.e. ^TS < ^10). + // So SeekGE(Key + ^10) is correct. It skips anything with ^TS < ^10 (meaning TS > 10). + + if iter.SeekGE(seekKey) { + k := iter.Key() + userKey, _ := decodeKey(k) + + if !bytes.Equal(userKey, key) { + // Moved to next user key + return nil, ErrKeyNotFound + } + + // Found a version. Check if valid. + valBytes := iter.Value() + sv, err := decodeValue(valBytes) + if err != nil { + return nil, errors.WithStack(err) + } + + if sv.Tombstone { + return nil, ErrKeyNotFound + } + if sv.ExpireAt != 0 && sv.ExpireAt <= ts { + return nil, ErrKeyNotFound + } + + return sv.Value, nil + } + + return nil, ErrKeyNotFound +} + +func (s *pebbleStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool, error) { + val, err := s.GetAt(ctx, key, ts) + if errors.Is(err, ErrKeyNotFound) { + return false, nil + } + if err != nil { + return false, err + } + return val != nil, nil +} + +func (s *pebbleStore) scanProcessKey(iter *pebble.Iterator, ts uint64, lastUserKey *[]byte) (*KVPair, bool) { + k := iter.Key() + userKey, version := decodeKey(k) + + if bytes.Equal(userKey, *lastUserKey) { + iter.Next() + return nil, true // continue loop + } + + if version <= ts { + valBytes := iter.Value() + sv, _ := decodeValue(valBytes) + + *lastUserKey = append([]byte(nil), userKey...) + + if !sv.Tombstone && (sv.ExpireAt == 0 || sv.ExpireAt > ts) { + iter.Next() + return &KVPair{ + Key: userKey, + Value: sv.Value, + }, false + } + iter.Next() + return nil, false // processed but filtered out + } + + return nil, false // Need seek +} + +func (s *pebbleStore) seekNextVersion(iter *pebble.Iterator, userKey []byte, ts uint64) bool { + target := encodeKey(userKey, ts) + return iter.SeekGE(target) +} + +func (s *pebbleStore) scanLoop(iter *pebble.Iterator, end []byte, limit int, ts uint64) []*KVPair { + result := make([]*KVPair, 0, limit) + var lastUserKey []byte + + for iter.Valid() { + if len(result) >= limit { + break + } + + k := iter.Key() + userKey, _ := decodeKey(k) + + if end != nil && bytes.Compare(userKey, end) > 0 { + break + } + + kv, cont := s.scanProcessKey(iter, ts, &lastUserKey) + if kv != nil { + result = append(result, kv) + } + if cont { + continue + } + + // Seek forward + if !s.seekNextVersion(iter, userKey, ts) { + break + } + } + return result +} + +func (s *pebbleStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*KVPair, error) { + if limit <= 0 { + return []*KVPair{}, nil + } + + iter, err := s.db.NewIter(&pebble.IterOptions{ + LowerBound: encodeKey(start, math.MaxUint64), + }) + if err != nil { + return nil, errors.WithStack(err) + } + defer iter.Close() + + // We want to scan keys >= start. + iter.SeekGE(encodeKey(start, math.MaxUint64)) + + return s.scanLoop(iter, end, limit, ts), nil +} + +func (s *pebbleStore) PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error { + commitTS = s.alignCommitTS(commitTS) + + k := encodeKey(key, commitTS) + v, err := encodeValue(value, false, expireAt) + if err != nil { + return err + } + + if err := s.db.Set(k, v, pebble.Sync); err != nil { //nolint:wrapcheck + return errors.WithStack(err) + } + s.log.InfoContext(ctx, "put_at", slog.String("key", string(key)), slog.Uint64("ts", commitTS)) + return nil +} + +func (s *pebbleStore) DeleteAt(ctx context.Context, key []byte, commitTS uint64) error { + commitTS = s.alignCommitTS(commitTS) + + k := encodeKey(key, commitTS) + v, err := encodeValue(nil, true, 0) + if err != nil { + return err + } + + if err := s.db.Set(k, v, pebble.Sync); err != nil { + return errors.WithStack(err) + } + s.log.InfoContext(ctx, "delete_at", slog.String("key", string(key)), slog.Uint64("ts", commitTS)) + return nil +} + +func (s *pebbleStore) PutWithTTLAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error { + return s.PutAt(ctx, key, value, commitTS, expireAt) +} + +func (s *pebbleStore) ExpireAt(ctx context.Context, key []byte, expireAt uint64, commitTS uint64) error { + // Must read latest value first to preserve it + val, err := s.GetAt(ctx, key, commitTS) + if err != nil { + return err + } + + commitTS = s.alignCommitTS(commitTS) + k := encodeKey(key, commitTS) + v, err := encodeValue(val, false, expireAt) + if err != nil { + return err + } + if err := s.db.Set(k, v, pebble.Sync); err != nil { + return errors.WithStack(err) + } + return nil +} + +func (s *pebbleStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error) { + // Peek latest version (SeekGE key + ^MaxUint64) + iter, err := s.db.NewIter(nil) + if err != nil { + return 0, false, errors.WithStack(err) + } + defer iter.Close() + + seekKey := encodeKey(key, math.MaxUint64) + if iter.SeekGE(seekKey) { + k := iter.Key() + userKey, version := decodeKey(k) + if bytes.Equal(userKey, key) { + return version, true, nil + } + } + return 0, false, nil +} + +func (s *pebbleStore) checkConflicts(ctx context.Context, mutations []*KVPairMutation, startTS uint64) error { + for _, mut := range mutations { + ts, exists, err := s.LatestCommitTS(ctx, mut.Key) + if err != nil { + return err + } + if exists && ts > startTS { + return errors.Wrapf(ErrWriteConflict, "key: %s", string(mut.Key)) + } + } + return nil +} + +func (s *pebbleStore) applyMutationsBatch(b *pebble.Batch, mutations []*KVPairMutation, commitTS uint64) error { + for _, mut := range mutations { + k := encodeKey(mut.Key, commitTS) + var v []byte + var err error + + switch mut.Op { + case OpTypePut: + v, err = encodeValue(mut.Value, false, mut.ExpireAt) + case OpTypeDelete: + v, err = encodeValue(nil, true, 0) + default: + return ErrUnknownOp + } + if err != nil { + return err + } + if err := b.Set(k, v, nil); err != nil { + return errors.WithStack(err) + } + } + return nil +} + +func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMutation, startTS, commitTS uint64) error { + // Write Batch + b := s.db.NewBatch() + defer b.Close() + + if err := s.checkConflicts(ctx, mutations, startTS); err != nil { + return err + } + + commitTS = s.alignCommitTS(commitTS) + + if err := s.applyMutationsBatch(b, mutations, commitTS); err != nil { + return err + } + + return errors.WithStack(b.Commit(pebble.Sync)) +} + +func (s *pebbleStore) Compact(ctx context.Context, minTS uint64) error { + // Real Compaction in LSM is hard to trigger precisely for MVCC GC. + // We can set a CompactionFilter in Options that drops keys with TS < minTS + // IF there is a newer version > minTS. + // But Options are set at Open. + // Updating options dynamically is not fully supported for CompactionFilter in standard Pebble API simply. + // However, we can use DeleteRange for VERY old data if we knew the keys. + // For this assignment, we will simply log. + s.log.Info("Compact requested", slog.Uint64("minTS", minTS)) + return nil +} + +func (s *pebbleStore) Snapshot() (io.ReadWriter, error) { + // Pebble Snapshots are point-in-time views. + // But `Snapshot()` interface here implies "serialize state to io.ReadWriter" for Raft snapshotting. + // We need to dump the *current visible state* or *all state*. + // Usually Raft snapshots include everything needed to restore the FSM. + // So we should dump ALL keys in the DB. + + snap := s.db.NewSnapshot() + defer snap.Close() + + iter, err := snap.NewIter(nil) + + if err != nil { + + return nil, errors.WithStack(err) + + } + + defer iter.Close() + + buf := &bytes.Buffer{} + + // Format: [LastCommitTS] [Count] [KeyLen] [Key] [ValLen] [Val] ... + + // We need to persist s.lastCommitTS too. + + if err := binary.Write(buf, binary.LittleEndian, s.LastCommitTS()); err != nil { + + return nil, errors.WithStack(err) + + } + + if err := s.writeSnapshotEntries(snap, buf); err != nil { + return nil, err + } + return buf, nil +} + +func (s *pebbleStore) writeSnapshotEntries(snap *pebble.Snapshot, w io.Writer) error { + iter, err := snap.NewIter(nil) + if err != nil { + return errors.WithStack(err) + } + defer iter.Close() + + // Iterate all raw keys + for iter.First(); iter.Valid(); iter.Next() { + // We store Raw Key and Raw Value + k := iter.Key() + v := iter.Value() + + // Let's just write Key, Value length-prefixed. + kLen := uint64(len(k)) + vLen := uint64(len(v)) + + if err := binary.Write(w, binary.LittleEndian, kLen); err != nil { + return errors.WithStack(err) + } + if _, err := w.Write(k); err != nil { + return errors.WithStack(err) + } + if err := binary.Write(w, binary.LittleEndian, vLen); err != nil { + return errors.WithStack(err) + } + if _, err := w.Write(v); err != nil { + return errors.WithStack(err) + } + } + return nil +} + +func (s *pebbleStore) restoreOneEntry(r io.Reader, batch *pebble.Batch) (bool, error) { + var kLen uint64 + if err := binary.Read(r, binary.LittleEndian, &kLen); err != nil { + if errors.Is(err, io.EOF) { + return true, nil + } + return false, errors.WithStack(err) + } + key := make([]byte, kLen) + if _, err := io.ReadFull(r, key); err != nil { + return false, errors.WithStack(err) + } + + var vLen uint64 + if err := binary.Read(r, binary.LittleEndian, &vLen); err != nil { + return false, errors.WithStack(err) + } + val := make([]byte, vLen) + if _, err := io.ReadFull(r, val); err != nil { + return false, errors.WithStack(err) + } + + if err := batch.Set(key, val, nil); err != nil { + return false, errors.WithStack(err) + } + return false, nil +} + +func (s *pebbleStore) restoreBatchLoop(r io.Reader) error { + batch := s.db.NewBatch() + batchCnt := 0 + + for { + eof, err := s.restoreOneEntry(r, batch) + if err != nil { + return err + } + if eof { + break + } + + batchCnt++ + if batchCnt > snapshotBatchSize { + if err := batch.Commit(pebble.NoSync); err != nil { + return errors.WithStack(err) + } + batch = s.db.NewBatch() + batchCnt = 0 + } + } + return errors.WithStack(batch.Commit(pebble.Sync)) +} + +func (s *pebbleStore) Restore(r io.Reader) error { + // Close current DB and reopen fresh? + // Or DeleteAll and apply? + // Pebble doesn't support "DeleteAll" natively fast. + // Better to Close, Remove Dir, Open fresh. + + if s.db != nil { + _ = s.db.Close() + } + + // Clear directory + if err := os.RemoveAll(s.dir); err != nil { + return errors.WithStack(err) + } + if err := os.MkdirAll(s.dir, dirPerms); err != nil { + return errors.WithStack(err) + } + + db, err := pebble.Open(s.dir, &pebble.Options{FS: vfs.Default}) + if err != nil { + return errors.WithStack(err) + } + s.db = db + + // Read LastCommitTS + var ts uint64 + if err := binary.Read(r, binary.LittleEndian, &ts); err != nil { + return errors.WithStack(err) + } + s.lastCommitTS = ts + if err := s.saveLastCommitTS(ts); err != nil { + return err + } + + return s.restoreBatchLoop(r) +} + +func (s *pebbleStore) Close() error { + return errors.WithStack(s.db.Close()) +} diff --git a/store/lsm_store_test.go b/store/lsm_store_test.go new file mode 100644 index 0000000..5eb5118 --- /dev/null +++ b/store/lsm_store_test.go @@ -0,0 +1,143 @@ +package store + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPebbleStore_Basic(t *testing.T) { + dir, err := os.MkdirTemp("", "pebble-test") + require.NoError(t, err) + defer os.RemoveAll(dir) + + s, err := NewPebbleStore(dir) + require.NoError(t, err) + defer s.Close() + + ctx := context.Background() + key := []byte("key1") + + // Put at TS 10 + err = s.PutAt(ctx, key, []byte("val10"), 10, 0) + require.NoError(t, err) + + // Get at TS 10 + val, err := s.GetAt(ctx, key, 10) + require.NoError(t, err) + assert.Equal(t, []byte("val10"), val) + + // Get at TS 5 (should be not found) + _, err = s.GetAt(ctx, key, 5) + assert.Equal(t, ErrKeyNotFound, err) + + // Put at TS 20 + err = s.PutAt(ctx, key, []byte("val20"), 20, 0) + require.NoError(t, err) + + // Get at TS 20 -> val20 + val, err = s.GetAt(ctx, key, 20) + require.NoError(t, err) + assert.Equal(t, []byte("val20"), val) + + // Get at TS 15 -> val10 (latest <= 15) + val, err = s.GetAt(ctx, key, 15) + require.NoError(t, err) + assert.Equal(t, []byte("val10"), val) + + // Delete at TS 30 + err = s.DeleteAt(ctx, key, 30) + require.NoError(t, err) + + // Get at TS 35 -> NotFound (Tombstone) + _, err = s.GetAt(ctx, key, 35) + assert.Equal(t, ErrKeyNotFound, err) + + // Get at TS 25 -> val20 + val, err = s.GetAt(ctx, key, 25) + require.NoError(t, err) + assert.Equal(t, []byte("val20"), val) +} + +func TestPebbleStore_Scan(t *testing.T) { + dir, err := os.MkdirTemp("", "pebble-scan-test") + require.NoError(t, err) + defer os.RemoveAll(dir) + + s, err := NewPebbleStore(dir) + require.NoError(t, err) + defer s.Close() + + ctx := context.Background() + + // k2: v10@10 + // k1: v10@11 (aligned), v20@20 + // k3: v30@30 + require.NoError(t, s.PutAt(ctx, []byte("k2"), []byte("v10"), 10, 0)) + require.NoError(t, s.PutAt(ctx, []byte("k1"), []byte("v10"), 10, 0)) + require.NoError(t, s.PutAt(ctx, []byte("k1"), []byte("v20"), 20, 0)) + require.NoError(t, s.PutAt(ctx, []byte("k3"), []byte("v30"), 30, 0)) + + // Scan at TS 25 + // Expect: k1=v20, k2=v10 (k3 is at 30, so invisible) + pairs, err := s.ScanAt(ctx, []byte("k"), nil, 10, 25) + require.NoError(t, err) + assert.Len(t, pairs, 2) + assert.Equal(t, []byte("k1"), pairs[0].Key) + assert.Equal(t, []byte("v20"), pairs[0].Value) + assert.Equal(t, []byte("k2"), pairs[1].Key) + assert.Equal(t, []byte("v10"), pairs[1].Value) + + // Scan at TS 15 + // Expect: k1=v10, k2=v10 + pairs, err = s.ScanAt(ctx, []byte("k"), nil, 10, 15) + require.NoError(t, err) + assert.Len(t, pairs, 2) + assert.Equal(t, []byte("k1"), pairs[0].Key) + assert.Equal(t, []byte("v10"), pairs[0].Value) + + // Scan at TS 5 + // Expect: empty + pairs, err = s.ScanAt(ctx, []byte("k"), nil, 10, 5) + require.NoError(t, err) + assert.Len(t, pairs, 0) +} + +func TestPebbleStore_SnapshotRestore(t *testing.T) { + dir, err := os.MkdirTemp("", "pebble-snap-test") + require.NoError(t, err) + defer os.RemoveAll(dir) + + s, err := NewPebbleStore(dir) + require.NoError(t, err) + + ctx := context.Background() + require.NoError(t, s.PutAt(ctx, []byte("k1"), []byte("v1"), 100, 0)) + + // Snapshot + buf, err := s.Snapshot() + require.NoError(t, err) + + s.Close() + + // Restore to new dir + dir2, err := os.MkdirTemp("", "pebble-restore-test") + require.NoError(t, err) + defer os.RemoveAll(dir2) + + s2, err := NewPebbleStore(dir2) + require.NoError(t, err) + defer s2.Close() + + err = s2.Restore(buf) + require.NoError(t, err) + + val, err := s2.GetAt(ctx, []byte("k1"), 100) + require.NoError(t, err) + assert.Equal(t, []byte("v1"), val) + + assert.Equal(t, uint64(100), s2.LastCommitTS()) // aligned 100 -> 100 (if started from 0) +} diff --git a/store/mvcc_store.go b/store/mvcc_store.go index b5626c2..19e589a 100644 --- a/store/mvcc_store.go +++ b/store/mvcc_store.go @@ -506,4 +506,4 @@ func (s *mvccStore) Compact(ctx context.Context, minTS uint64) error { func (s *mvccStore) Close() error { return nil -} \ No newline at end of file +} From d9c7dfa66c8a784a6c1358d7922e75a3d1403ed1 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 31 Dec 2025 17:43:36 +0900 Subject: [PATCH 2/9] Remove trailing whitespace in test files --- store/lsm_store_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/store/lsm_store_test.go b/store/lsm_store_test.go index 5eb5118..73cca9f 100644 --- a/store/lsm_store_test.go +++ b/store/lsm_store_test.go @@ -98,7 +98,7 @@ func TestPebbleStore_Scan(t *testing.T) { assert.Len(t, pairs, 2) assert.Equal(t, []byte("k1"), pairs[0].Key) assert.Equal(t, []byte("v10"), pairs[0].Value) - + // Scan at TS 5 // Expect: empty pairs, err = s.ScanAt(ctx, []byte("k"), nil, 10, 5) @@ -113,31 +113,31 @@ func TestPebbleStore_SnapshotRestore(t *testing.T) { s, err := NewPebbleStore(dir) require.NoError(t, err) - + ctx := context.Background() require.NoError(t, s.PutAt(ctx, []byte("k1"), []byte("v1"), 100, 0)) - + // Snapshot buf, err := s.Snapshot() require.NoError(t, err) - + s.Close() - + // Restore to new dir dir2, err := os.MkdirTemp("", "pebble-restore-test") require.NoError(t, err) defer os.RemoveAll(dir2) - + s2, err := NewPebbleStore(dir2) require.NoError(t, err) defer s2.Close() - + err = s2.Restore(buf) require.NoError(t, err) - + val, err := s2.GetAt(ctx, []byte("k1"), 100) require.NoError(t, err) assert.Equal(t, []byte("v1"), val) - + assert.Equal(t, uint64(100), s2.LastCommitTS()) // aligned 100 -> 100 (if started from 0) } From c00fe9d5d8304ca9e9988d4b4dceb8661a8648da Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 31 Dec 2025 18:43:07 +0900 Subject: [PATCH 3/9] Refactor Redis proxy and transaction handling --- adapter/redis.go | 207 +++++++++++++++++++++++++++++++++-------- adapter/redis_proxy.go | 25 +++++ adapter/test_util.go | 10 +- kv/coordinator.go | 5 + main.go | 10 ++ 5 files changed, 214 insertions(+), 43 deletions(-) create mode 100644 adapter/redis_proxy.go diff --git a/adapter/redis.go b/adapter/redis.go index 2c8fbff..ff233e8 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -3,6 +3,7 @@ package adapter import ( "bytes" "context" + "fmt" "math" "net" "sort" @@ -210,23 +211,8 @@ func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) { return } - if r.coordinator.IsLeader() { - readTS := r.readTS() - v, err := r.store.GetAt(context.Background(), cmd.Args[1], readTS) - if err != nil { - switch { - case errors.Is(err, store.ErrKeyNotFound): - conn.WriteNull() - default: - conn.WriteError(err.Error()) - } - return - } - conn.WriteBulk(v) - return - } - - v, err := r.tryLeaderGet(cmd.Args[1]) + readTS := r.readTS() + v, err := r.readValueAt(cmd.Args[1], readTS) if err != nil { switch { case errors.Is(err, store.ErrKeyNotFound): @@ -269,6 +255,21 @@ func (r *RedisServer) del(conn redcon.Conn, cmd redcon.Command) { } func (r *RedisServer) exists(conn redcon.Conn, cmd redcon.Command) { + if !r.coordinator.IsLeader() { + res, err := r.proxyExists(cmd.Args[1]) + if err != nil { + conn.WriteError(err.Error()) + return + } + conn.WriteInt(res) + return + } + + if err := r.coordinator.VerifyLeader(); err != nil { + conn.WriteError(err.Error()) + return + } + if ok, err := r.isListKey(context.Background(), cmd.Args[1]); err != nil { conn.WriteError(err.Error()) return @@ -295,6 +296,10 @@ func (r *RedisServer) keys(conn redcon.Conn, cmd redcon.Command) { pattern := cmd.Args[1] if r.coordinator.IsLeader() { + if err := r.coordinator.VerifyLeader(); err != nil { + conn.WriteError(err.Error()) + return + } keys, err := r.localKeys(pattern) if err != nil { conn.WriteError(err.Error()) @@ -429,6 +434,15 @@ func (r *RedisServer) exec(conn redcon.Conn, _ redcon.Command) { return } + if !r.coordinator.IsLeader() { + if err := r.proxyExec(conn, state.queue); err != nil { + conn.WriteError(err.Error()) + } + state.inTxn = false + state.queue = nil + return + } + results, err := r.runTransaction(state.queue) state.inTxn = false state.queue = nil @@ -451,6 +465,7 @@ type txnContext struct { server *RedisServer working map[string]*txnValue listStates map[string]*listTxnState + startTS uint64 } type listTxnState struct { @@ -466,7 +481,7 @@ func (t *txnContext) load(key []byte) (*txnValue, error) { return tv, nil } tv := &txnValue{} - val, err := t.server.getValue(key) + val, err := t.server.readValueAt(key, t.startTS) if err != nil && !errors.Is(err, store.ErrKeyNotFound) { return nil, errors.WithStack(err) } @@ -482,7 +497,7 @@ func (t *txnContext) loadListState(key []byte) (*listTxnState, error) { return st, nil } - meta, exists, err := t.server.loadListMeta(context.Background(), key) + meta, exists, err := t.server.loadListMetaAt(context.Background(), key, t.startTS) if err != nil { return nil, err } @@ -520,7 +535,7 @@ func (t *txnContext) apply(cmd redcon.Command) (redisResult, error) { } func (t *txnContext) applySet(cmd redcon.Command) (redisResult, error) { - if isList, err := t.server.isListKey(context.Background(), cmd.Args[1]); err != nil { + if isList, err := t.server.isListKeyAt(context.Background(), cmd.Args[1], t.startTS); err != nil { return redisResult{}, err } else if isList { return redisResult{typ: resultError, err: errors.New("WRONGTYPE Operation against a key holding the wrong kind of value")}, nil @@ -538,7 +553,7 @@ func (t *txnContext) applySet(cmd redcon.Command) (redisResult, error) { func (t *txnContext) applyDel(cmd redcon.Command) (redisResult, error) { // handle list delete separately - if isList, err := t.server.isListKey(context.Background(), cmd.Args[1]); err != nil { + if isList, err := t.server.isListKeyAt(context.Background(), cmd.Args[1], t.startTS); err != nil { return redisResult{}, err } else if isList { st, err := t.loadListState(cmd.Args[1]) @@ -560,7 +575,7 @@ func (t *txnContext) applyDel(cmd redcon.Command) (redisResult, error) { } func (t *txnContext) applyGet(cmd redcon.Command) (redisResult, error) { - if isList, err := t.server.isListKey(context.Background(), cmd.Args[1]); err != nil { + if isList, err := t.server.isListKeyAt(context.Background(), cmd.Args[1], t.startTS); err != nil { return redisResult{}, err } else if isList { return redisResult{typ: resultError, err: errors.New("WRONGTYPE Operation against a key holding the wrong kind of value")}, nil @@ -577,7 +592,7 @@ func (t *txnContext) applyGet(cmd redcon.Command) (redisResult, error) { } func (t *txnContext) applyExists(cmd redcon.Command) (redisResult, error) { - if isList, err := t.server.isListKey(context.Background(), cmd.Args[1]); err != nil { + if isList, err := t.server.isListKeyAt(context.Background(), cmd.Args[1], t.startTS); err != nil { return redisResult{}, err } else if isList { return redisResult{typ: resultInt, integer: 1}, nil @@ -646,11 +661,11 @@ func (t *txnContext) listRangeValues(key []byte, st *listTxnState, s, e int) ([] switch { case e < persistedLen: - return t.server.fetchListRange(context.Background(), key, st.meta, int64(s), int64(e)) + return t.server.fetchListRange(context.Background(), key, st.meta, int64(s), int64(e), t.startTS) case s >= persistedLen: return appendValues(st.appends, s-persistedLen, e-persistedLen), nil default: - head, err := t.server.fetchListRange(context.Background(), key, st.meta, int64(s), int64(persistedLen-1)) + head, err := t.server.fetchListRange(context.Background(), key, st.meta, int64(s), int64(persistedLen-1), t.startTS) if err != nil { return nil, err } @@ -680,7 +695,7 @@ func (t *txnContext) commit() error { return nil } - group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: elems} + group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: elems, StartTS: t.startTS} if _, err := t.server.coordinator.Dispatch(group); err != nil { return errors.WithStack(err) } @@ -755,10 +770,20 @@ func (t *txnContext) buildListElems() ([]*kv.Elem[kv.OP], error) { } func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, error) { + if err := r.coordinator.VerifyLeader(); err != nil { + return nil, errors.WithStack(err) + } + + startTS := r.coordinator.Clock().Next() + if last := r.store.LastCommitTS(); last > startTS { + startTS = last + } + ctx := &txnContext{ server: r, working: map[string]*txnValue{}, listStates: map[string]*listTxnState{}, + startTS: startTS, } results := make([]redisResult, 0, len(queue)) @@ -777,6 +802,54 @@ func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, err return results, nil } +func (r *RedisServer) proxyExec(conn redcon.Conn, queue []redcon.Command) error { + leader := r.coordinator.RaftLeader() + if leader == "" { + return ErrLeaderNotFound + } + leaderAddr, ok := r.leaderRedis[leader] + if !ok || leaderAddr == "" { + return errors.WithStack(errors.Newf("leader redis address unknown for %s", leader)) + } + + cli := redis.NewClient(&redis.Options{Addr: leaderAddr}) + defer func() { _ = cli.Close() }() + + ctx := context.Background() + cmds := make([]redis.Cmder, len(queue)) + names := make([]string, len(queue)) + _, err := cli.TxPipelined(ctx, func(pipe redis.Pipeliner) error { + for i, c := range queue { + name := strings.ToUpper(string(c.Args[0])) + names[i] = name + args := make([]string, 0, len(c.Args)-1) + for _, a := range c.Args[1:] { + args = append(args, string(a)) + } + cmd := newProxyCmd(name, args, ctx) + _ = pipe.Process(ctx, cmd) + cmds[i] = cmd + } + return nil + }) + if err != nil { + return errors.WithStack(err) + } + + results := make([]redisResult, 0, len(cmds)) + for i, cmd := range cmds { + res, err := buildProxyResult(names[i], cmd) + if err != nil { + results = append(results, redisResult{typ: resultError, err: err}) + continue + } + results = append(results, res) + } + + r.writeResults(conn, results) + return nil +} + func (r *RedisServer) writeResults(conn redcon.Conn, results []redisResult) { conn.WriteArray(len(results)) for _, res := range results { @@ -803,6 +876,51 @@ func (r *RedisServer) writeResults(conn redcon.Conn, results []redisResult) { } // --- list helpers ---------------------------------------------------- +func buildProxyResult(_ string, cmd redis.Cmder) (redisResult, error) { + switch c := cmd.(type) { + case *redis.StatusCmd: + s, err := c.Result() + return redisResult{typ: resultString, str: s}, errors.WithStack(err) + case *redis.IntCmd: + i, err := c.Result() + return redisResult{typ: resultInt, integer: i}, errors.WithStack(err) + case *redis.StringCmd: + b, err := c.Bytes() + if errors.Is(err, redis.Nil) { + return redisResult{typ: resultNil}, nil + } + return redisResult{typ: resultBulk, bulk: b}, errors.WithStack(err) + case *redis.StringSliceCmd: + arr, err := c.Result() + return redisResult{typ: resultArray, arr: arr}, errors.WithStack(err) + case *redis.Cmd: + v, err := c.Result() + return redisResult{typ: resultString, str: fmt.Sprint(v)}, errors.WithStack(err) + default: + return redisResult{typ: resultError, err: errors.Newf("unsupported command result type %T", cmd)}, nil + } +} + +func newProxyCmd(name string, args []string, ctx context.Context) redis.Cmder { + argv := make([]any, 0, len(args)+1) + argv = append(argv, name) + for _, a := range args { + argv = append(argv, a) + } + + switch name { + case "SET": + return redis.NewStatusCmd(ctx, argv...) + case "DEL", "EXISTS", "RPUSH": + return redis.NewIntCmd(ctx, argv...) + case "GET": + return redis.NewStringCmd(ctx, argv...) + case "LRANGE": + return redis.NewStringSliceCmd(ctx, argv...) + default: + return redis.NewCmd(ctx, argv...) + } +} func listMetaKey(userKey []byte) []byte { return store.ListMetaKey(userKey) @@ -832,7 +950,10 @@ func clampRange(start, end, length int) (int, int) { } func (r *RedisServer) loadListMeta(ctx context.Context, key []byte) (store.ListMeta, bool, error) { - readTS := r.readTS() + return r.loadListMetaAt(ctx, key, r.readTS()) +} + +func (r *RedisServer) loadListMetaAt(ctx context.Context, key []byte, readTS uint64) (store.ListMeta, bool, error) { val, err := r.store.GetAt(ctx, store.ListMetaKey(key), readTS) if err != nil { if errors.Is(err, store.ErrKeyNotFound) { @@ -848,7 +969,12 @@ func (r *RedisServer) loadListMeta(ctx context.Context, key []byte) (store.ListM } func (r *RedisServer) isListKey(ctx context.Context, key []byte) (bool, error) { - _, exists, err := r.loadListMeta(ctx, key) + _, exists, err := r.loadListMetaAt(ctx, key, r.readTS()) + return exists, err +} + +func (r *RedisServer) isListKeyAt(ctx context.Context, key []byte, readTS uint64) (bool, error) { + _, exists, err := r.loadListMetaAt(ctx, key, readTS) return exists, err } @@ -931,7 +1057,7 @@ func (r *RedisServer) deleteList(ctx context.Context, key []byte) error { return errors.WithStack(err) } -func (r *RedisServer) fetchListRange(ctx context.Context, key []byte, meta store.ListMeta, startIdx, endIdx int64) ([]string, error) { +func (r *RedisServer) fetchListRange(ctx context.Context, key []byte, meta store.ListMeta, startIdx, endIdx int64, readTS uint64) ([]string, error) { if endIdx < startIdx { return []string{}, nil } @@ -942,7 +1068,6 @@ func (r *RedisServer) fetchListRange(ctx context.Context, key []byte, meta store startKey := listItemKey(key, startSeq) endKey := listItemKey(key, endSeq+1) // exclusive - readTS := r.readTS() kvs, err := r.store.ScanAt(ctx, startKey, endKey, int(endIdx-startIdx+1), readTS) if err != nil { return nil, errors.WithStack(err) @@ -956,11 +1081,16 @@ func (r *RedisServer) fetchListRange(ctx context.Context, key []byte, meta store } func (r *RedisServer) rangeList(key []byte, startRaw, endRaw []byte) ([]string, error) { + readTS := r.readTS() if !r.coordinator.IsLeader() { return r.proxyLRange(key, startRaw, endRaw) } - meta, exists, err := r.loadListMeta(context.Background(), key) + if err := r.coordinator.VerifyLeader(); err != nil { + return nil, errors.WithStack(err) + } + + meta, exists, err := r.loadListMetaAt(context.Background(), key, readTS) if err != nil { return nil, err } @@ -982,7 +1112,7 @@ func (r *RedisServer) rangeList(key []byte, startRaw, endRaw []byte) ([]string, return []string{}, nil } - return r.fetchListRange(context.Background(), key, meta, int64(s), int64(e)) + return r.fetchListRange(context.Background(), key, meta, int64(s), int64(e), readTS) } func (r *RedisServer) proxyLRange(key []byte, startRaw, endRaw []byte) ([]string, error) { @@ -1024,7 +1154,7 @@ func (r *RedisServer) proxyRPush(key []byte, values [][]byte) (int64, error) { cli := redis.NewClient(&redis.Options{Addr: leaderAddr}) defer func() { _ = cli.Close() }() - args := make([]interface{}, 0, len(values)) + args := make([]any, 0, len(values)) for _, v := range values { args = append(args, string(v)) } @@ -1040,7 +1170,7 @@ func parseInt(b []byte) (int, error) { // tryLeaderGet proxies a GET to the current Raft leader, returning the value and // whether the proxy succeeded. -func (r *RedisServer) tryLeaderGet(key []byte) ([]byte, error) { +func (r *RedisServer) tryLeaderGetAt(key []byte, ts uint64) ([]byte, error) { addr := r.coordinator.RaftLeader() if addr == "" { return nil, ErrLeaderNotFound @@ -1056,7 +1186,6 @@ func (r *RedisServer) tryLeaderGet(key []byte) ([]byte, error) { defer conn.Close() cli := pb.NewRawKVClient(conn) - ts := r.readTS() resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key, Ts: ts}) if err != nil { return nil, errors.WithStack(err) @@ -1065,13 +1194,15 @@ func (r *RedisServer) tryLeaderGet(key []byte) ([]byte, error) { return resp.Value, nil } -func (r *RedisServer) getValue(key []byte) ([]byte, error) { - readTS := r.readTS() +func (r *RedisServer) readValueAt(key []byte, readTS uint64) ([]byte, error) { if r.coordinator.IsLeader() { + if err := r.coordinator.VerifyLeader(); err != nil { + return nil, errors.WithStack(err) + } v, err := r.store.GetAt(context.Background(), key, readTS) return v, errors.WithStack(err) } - return r.tryLeaderGet(key) + return r.tryLeaderGetAt(key, readTS) } func (r *RedisServer) rpush(conn redcon.Conn, cmd redcon.Command) { diff --git a/adapter/redis_proxy.go b/adapter/redis_proxy.go new file mode 100644 index 0000000..dfed012 --- /dev/null +++ b/adapter/redis_proxy.go @@ -0,0 +1,25 @@ +package adapter + +import ( + "context" + + "github.com/cockroachdb/errors" + "github.com/redis/go-redis/v9" +) + +func (r *RedisServer) proxyExists(key []byte) (int, error) { + leader := r.coordinator.RaftLeader() + if leader == "" { + return 0, ErrLeaderNotFound + } + leaderAddr, ok := r.leaderRedis[leader] + if !ok || leaderAddr == "" { + return 0, errors.WithStack(errors.Newf("leader redis address unknown for %s", leader)) + } + + cli := redis.NewClient(&redis.Options{Addr: leaderAddr}) + defer func() { _ = cli.Close() }() + + res, err := cli.Exists(context.Background(), string(key)).Result() + return int(res), errors.WithStack(err) +} diff --git a/adapter/test_util.go b/adapter/test_util.go index 9d616e4..6d18987 100644 --- a/adapter/test_util.go +++ b/adapter/test_util.go @@ -320,6 +320,10 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) ( } cfg := buildRaftConfig(n, ports) + leaderRedisMap := make(map[raft.ServerAddress]string, len(ports)) + for _, p := range ports { + leaderRedisMap[raft.ServerAddress(p.raftAddress)] = p.redisAddress + } for i := 0; i < n; i++ { st := store.NewMVCCStore() @@ -330,10 +334,6 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) ( redisSock := lis[i].redis dynamoSock := lis[i].dynamo - leaderRedis := map[raft.ServerAddress]string{ - raft.ServerAddress(ports[i].raftAddress): ports[i].redisAddress, - } - // リーダーが先に投票を開始させる electionTimeout := leaderElectionTimeout if i != 0 { @@ -361,7 +361,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) ( assert.NoError(t, srv.Serve(lis)) }(s, grpcSock) - rd := NewRedisServer(redisSock, st, coordinator, leaderRedis) + rd := NewRedisServer(redisSock, st, coordinator, leaderRedisMap) go func(server *RedisServer) { assert.NoError(t, server.Run()) }(rd) diff --git a/kv/coordinator.go b/kv/coordinator.go index edb9779..8340105 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -33,6 +33,7 @@ var _ Coordinator = (*Coordinate)(nil) type Coordinator interface { Dispatch(reqs *OperationGroup[OP]) (*CoordinateResponse, error) IsLeader() bool + VerifyLeader() error RaftLeader() raft.ServerAddress Clock() *HLC } @@ -58,6 +59,10 @@ func (c *Coordinate) IsLeader() bool { return c.raft.State() == raft.Leader } +func (c *Coordinate) VerifyLeader() error { + return errors.WithStack(c.raft.VerifyLeader().Error()) +} + // RaftLeader returns the current leader's address as known by this node. func (c *Coordinate) RaftLeader() raft.ServerAddress { addr, _ := c.raft.LeaderWithID() diff --git a/main.go b/main.go index 034576c..fdf00a1 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "net" "os" "path/filepath" + "time" "github.com/Jille/raft-grpc-leader-rpc/leaderhealth" transport "github.com/Jille/raft-grpc-transport" @@ -25,6 +26,12 @@ import ( "google.golang.org/grpc/reflection" ) +const ( + heartbeatTimeout = 200 * time.Millisecond + electionTimeout = 2000 * time.Millisecond + leaderLease = 100 * time.Millisecond +) + var ( myAddr = flag.String("address", "localhost:50051", "TCP host+port for this node") redisAddr = flag.String("redis_address", "localhost:6379", "TCP host+port for redis") @@ -101,6 +108,9 @@ const snapshotRetainCount = 3 func NewRaft(_ context.Context, myID, myAddress string, fsm raft.FSM) (*raft.Raft, *transport.Manager, error) { c := raft.DefaultConfig() c.LocalID = raft.ServerID(myID) + c.HeartbeatTimeout = heartbeatTimeout + c.ElectionTimeout = electionTimeout + c.LeaderLeaseTimeout = leaderLease baseDir := filepath.Join(*raftDir, myID) From a0088e51241f59667e43b54c98aa960602ff30c3 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 31 Dec 2025 18:46:47 +0900 Subject: [PATCH 4/9] Improve demo cluster start and stop handling --- .github/workflows/jepsen-test.yml | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/.github/workflows/jepsen-test.yml b/.github/workflows/jepsen-test.yml index 4624a56..399569d 100644 --- a/.github/workflows/jepsen-test.yml +++ b/.github/workflows/jepsen-test.yml @@ -33,14 +33,26 @@ jobs: working-directory: jepsen run: ~/lein test - name: Launch demo cluster + env: + GOTMPDIR: /tmp/go-tmp run: | - mkdir -p "$GOCACHE" + set -euo pipefail + mkdir -p "$GOCACHE" "$GOTMPDIR" nohup go run cmd/server/demo.go > /tmp/elastickv-demo.log 2>&1 & echo $! > /tmp/elastickv-demo.pid - for i in {1..30}; do - nc -z 127.0.0.1 63791 && nc -z 127.0.0.1 63792 && nc -z 127.0.0.1 63793 && break + + echo "Waiting for redis listeners on 63791-63793..." + for i in {1..45}; do + if nc -z 127.0.0.1 63791 && nc -z 127.0.0.1 63792 && nc -z 127.0.0.1 63793; then + echo "Cluster is up" + exit 0 + fi sleep 1 done + + echo "Demo cluster failed to start; dumping log:" + tail -n 200 /tmp/elastickv-demo.log || true + exit 1 - name: Run Redis Jepsen workload against elastickv working-directory: jepsen run: | @@ -48,5 +60,7 @@ jobs: - name: Stop demo cluster run: | if [ -f /tmp/elastickv-demo.pid ]; then - kill "$(cat /tmp/elastickv-demo.pid)" || true + pid=$(cat /tmp/elastickv-demo.pid) + kill "$pid" 2>/dev/null || true + wait "$pid" 2>/dev/null || true fi From 94f7eaf283cceb00c576659baddc7dce6124dde3 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 31 Dec 2025 18:51:24 +0900 Subject: [PATCH 5/9] Add Go module pre-fetching to Jepsen tests --- .github/workflows/jepsen-test.yml | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/.github/workflows/jepsen-test.yml b/.github/workflows/jepsen-test.yml index 399569d..80354d5 100644 --- a/.github/workflows/jepsen-test.yml +++ b/.github/workflows/jepsen-test.yml @@ -29,20 +29,25 @@ jobs: curl -L https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein > ~/lein chmod +x ~/lein ~/lein version + - name: Pre-fetch Go modules + run: | + mkdir -p "$GOCACHE" /tmp/go-tmp + GOPATH=$(go env GOPATH) + export GOCACHE GOTMPDIR=/tmp/go-tmp + go mod download - name: Run Jepsen unit tests working-directory: jepsen run: ~/lein test - name: Launch demo cluster - env: - GOTMPDIR: /tmp/go-tmp run: | set -euo pipefail - mkdir -p "$GOCACHE" "$GOTMPDIR" + mkdir -p "$GOCACHE" /tmp/go-tmp + export GOTMPDIR=/tmp/go-tmp nohup go run cmd/server/demo.go > /tmp/elastickv-demo.log 2>&1 & echo $! > /tmp/elastickv-demo.pid echo "Waiting for redis listeners on 63791-63793..." - for i in {1..45}; do + for i in {1..90}; do if nc -z 127.0.0.1 63791 && nc -z 127.0.0.1 63792 && nc -z 127.0.0.1 63793; then echo "Cluster is up" exit 0 From 2bd4025b6e82b07139d7834312dfddfa9e616f93 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 31 Dec 2025 19:10:38 +0900 Subject: [PATCH 6/9] fix(store): handle decodeValue error in scanProcessKey --- store/lsm_store.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/store/lsm_store.go b/store/lsm_store.go index c0bcae4..417c5c3 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -273,7 +273,12 @@ func (s *pebbleStore) scanProcessKey(iter *pebble.Iterator, ts uint64, lastUserK if version <= ts { valBytes := iter.Value() - sv, _ := decodeValue(valBytes) + sv, err := decodeValue(valBytes) + if err != nil { + s.log.Error("failed to decode value", slog.Any("error", err), slog.String("key", string(k))) + iter.Next() + return nil, true // continue loop + } *lastUserKey = append([]byte(nil), userKey...) From 30da938e215571dd2a62c59e1dafde2ee325a608 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 31 Dec 2025 19:15:20 +0900 Subject: [PATCH 7/9] fix(store): handle saveLastCommitTS error and refactor ScanAt --- store/lsm_store.go | 100 ++++++++++++++++++++++++--------------------- 1 file changed, 53 insertions(+), 47 deletions(-) diff --git a/store/lsm_store.go b/store/lsm_store.go index 417c5c3..a83041d 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -176,7 +176,9 @@ func (s *pebbleStore) updateLastCommitTS(ts uint64) { if ts > s.lastCommitTS { s.lastCommitTS = ts // Best effort persist - _ = s.saveLastCommitTS(ts) + if err := s.saveLastCommitTS(ts); err != nil { + s.log.Error("failed to persist last commit timestamp", slog.Any("error", err)) + } } } @@ -262,75 +264,82 @@ func (s *pebbleStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool return val != nil, nil } -func (s *pebbleStore) scanProcessKey(iter *pebble.Iterator, ts uint64, lastUserKey *[]byte) (*KVPair, bool) { +func (s *pebbleStore) seekToVisibleVersion(iter *pebble.Iterator, userKey []byte, currentVersion, ts uint64) bool { + if currentVersion <= ts { + return true + } + seekKey := encodeKey(userKey, ts) + if !iter.SeekGE(seekKey) { + return false + } k := iter.Key() - userKey, version := decodeKey(k) + currentUserKey, _ := decodeKey(k) + return bytes.Equal(currentUserKey, userKey) +} - if bytes.Equal(userKey, *lastUserKey) { - iter.Next() - return nil, true // continue loop +func (s *pebbleStore) processFoundValue(iter *pebble.Iterator, userKey []byte, ts uint64) (*KVPair, error) { + valBytes := iter.Value() + sv, err := decodeValue(valBytes) + if err != nil { + return nil, err } - if version <= ts { - valBytes := iter.Value() - sv, err := decodeValue(valBytes) - if err != nil { - s.log.Error("failed to decode value", slog.Any("error", err), slog.String("key", string(k))) - iter.Next() - return nil, true // continue loop - } - - *lastUserKey = append([]byte(nil), userKey...) - - if !sv.Tombstone && (sv.ExpireAt == 0 || sv.ExpireAt > ts) { - iter.Next() - return &KVPair{ - Key: userKey, - Value: sv.Value, - }, false - } - iter.Next() - return nil, false // processed but filtered out + if !sv.Tombstone && (sv.ExpireAt == 0 || sv.ExpireAt > ts) { + return &KVPair{ + Key: userKey, + Value: sv.Value, + }, nil } - - return nil, false // Need seek + return nil, nil } -func (s *pebbleStore) seekNextVersion(iter *pebble.Iterator, userKey []byte, ts uint64) bool { - target := encodeKey(userKey, ts) - return iter.SeekGE(target) +func (s *pebbleStore) skipToNextUserKey(iter *pebble.Iterator, userKey []byte) bool { + if !iter.SeekGE(encodeKey(userKey, 0)) { + return false + } + k := iter.Key() + u, _ := decodeKey(k) + if bytes.Equal(u, userKey) { + return iter.Next() + } + return true } -func (s *pebbleStore) scanLoop(iter *pebble.Iterator, end []byte, limit int, ts uint64) []*KVPair { +func (s *pebbleStore) collectScanResults(iter *pebble.Iterator, start, end []byte, limit int, ts uint64) ([]*KVPair, error) { result := make([]*KVPair, 0, limit) - var lastUserKey []byte - for iter.Valid() { + for iter.SeekGE(encodeKey(start, math.MaxUint64)); iter.Valid(); { if len(result) >= limit { break } k := iter.Key() - userKey, _ := decodeKey(k) + userKey, version := decodeKey(k) if end != nil && bytes.Compare(userKey, end) > 0 { break } - kv, cont := s.scanProcessKey(iter, ts, &lastUserKey) + // Find the correct version for userKey + if !s.seekToVisibleVersion(iter, userKey, version, ts) { + continue + } + + // Now iter is at the latest visible version for userKey. + kv, err := s.processFoundValue(iter, userKey, ts) + if err != nil { + return nil, err + } if kv != nil { result = append(result, kv) } - if cont { - continue - } - // Seek forward - if !s.seekNextVersion(iter, userKey, ts) { - break + // Move to the next user key. + if !s.skipToNextUserKey(iter, userKey) { + break // No more keys } } - return result + return result, nil } func (s *pebbleStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*KVPair, error) { @@ -346,10 +355,7 @@ func (s *pebbleStore) ScanAt(ctx context.Context, start []byte, end []byte, limi } defer iter.Close() - // We want to scan keys >= start. - iter.SeekGE(encodeKey(start, math.MaxUint64)) - - return s.scanLoop(iter, end, limit, ts), nil + return s.collectScanResults(iter, start, end, limit, ts) } func (s *pebbleStore) PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error { From 9e5673da96234aec38269d19c1228aba35d59bc7 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 31 Dec 2025 19:28:45 +0900 Subject: [PATCH 8/9] perf(store): replace gob with binary encoding for values --- store/lsm_store.go | 59 ++++++++++++++++++++-------------------------- 1 file changed, 26 insertions(+), 33 deletions(-) diff --git a/store/lsm_store.go b/store/lsm_store.go index a83041d..d4d5a6c 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/binary" - "encoding/gob" "io" "log/slog" "math" @@ -18,6 +17,7 @@ import ( const ( timestampSize = 8 + valueHeaderSize = 9 // 1 byte tombstone + 8 bytes expireAt snapshotBatchSize = 1000 dirPerms = 0755 ) @@ -119,25 +119,31 @@ type storedValue struct { ExpireAt uint64 } -func encodeValue(val []byte, tombstone bool, expireAt uint64) ([]byte, error) { - sv := storedValue{ - Value: val, - Tombstone: tombstone, - ExpireAt: expireAt, +func encodeValue(val []byte, tombstone bool, expireAt uint64) []byte { + // Format: [Tombstone(1)] [ExpireAt(8)] [Value(...)] + buf := make([]byte, valueHeaderSize+len(val)) + if tombstone { + buf[0] = 1 } - var buf bytes.Buffer - if err := gob.NewEncoder(&buf).Encode(sv); err != nil { - return nil, errors.WithStack(err) - } - return buf.Bytes(), nil + binary.LittleEndian.PutUint64(buf[1:], expireAt) + copy(buf[valueHeaderSize:], val) + return buf } func decodeValue(data []byte) (storedValue, error) { - var sv storedValue - if err := gob.NewDecoder(bytes.NewReader(data)).Decode(&sv); err != nil { - return sv, errors.WithStack(err) + if len(data) < valueHeaderSize { + return storedValue{}, errors.New("invalid value length") } - return sv, nil + tombstone := data[0] != 0 + expireAt := binary.LittleEndian.Uint64(data[1:]) + val := make([]byte, len(data)-valueHeaderSize) + copy(val, data[valueHeaderSize:]) + + return storedValue{ + Value: val, + Tombstone: tombstone, + ExpireAt: expireAt, + }, nil } func (s *pebbleStore) findMaxCommitTS() (uint64, error) { @@ -362,10 +368,7 @@ func (s *pebbleStore) PutAt(ctx context.Context, key []byte, value []byte, commi commitTS = s.alignCommitTS(commitTS) k := encodeKey(key, commitTS) - v, err := encodeValue(value, false, expireAt) - if err != nil { - return err - } + v := encodeValue(value, false, expireAt) if err := s.db.Set(k, v, pebble.Sync); err != nil { //nolint:wrapcheck return errors.WithStack(err) @@ -378,10 +381,7 @@ func (s *pebbleStore) DeleteAt(ctx context.Context, key []byte, commitTS uint64) commitTS = s.alignCommitTS(commitTS) k := encodeKey(key, commitTS) - v, err := encodeValue(nil, true, 0) - if err != nil { - return err - } + v := encodeValue(nil, true, 0) if err := s.db.Set(k, v, pebble.Sync); err != nil { return errors.WithStack(err) @@ -403,10 +403,7 @@ func (s *pebbleStore) ExpireAt(ctx context.Context, key []byte, expireAt uint64, commitTS = s.alignCommitTS(commitTS) k := encodeKey(key, commitTS) - v, err := encodeValue(val, false, expireAt) - if err != nil { - return err - } + v := encodeValue(val, false, expireAt) if err := s.db.Set(k, v, pebble.Sync); err != nil { return errors.WithStack(err) } @@ -449,19 +446,15 @@ func (s *pebbleStore) applyMutationsBatch(b *pebble.Batch, mutations []*KVPairMu for _, mut := range mutations { k := encodeKey(mut.Key, commitTS) var v []byte - var err error switch mut.Op { case OpTypePut: - v, err = encodeValue(mut.Value, false, mut.ExpireAt) + v = encodeValue(mut.Value, false, mut.ExpireAt) case OpTypeDelete: - v, err = encodeValue(nil, true, 0) + v = encodeValue(nil, true, 0) default: return ErrUnknownOp } - if err != nil { - return err - } if err := b.Set(k, v, nil); err != nil { return errors.WithStack(err) } From e7b394c0b5f654783750455d09523d2109942ccc Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 31 Dec 2025 19:34:18 +0900 Subject: [PATCH 9/9] Refactor compaction logic with TODO comments --- store/lsm_store.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/store/lsm_store.go b/store/lsm_store.go index d4d5a6c..b592493 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -481,13 +481,12 @@ func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMut } func (s *pebbleStore) Compact(ctx context.Context, minTS uint64) error { - // Real Compaction in LSM is hard to trigger precisely for MVCC GC. - // We can set a CompactionFilter in Options that drops keys with TS < minTS - // IF there is a newer version > minTS. - // But Options are set at Open. - // Updating options dynamically is not fully supported for CompactionFilter in standard Pebble API simply. - // However, we can use DeleteRange for VERY old data if we knew the keys. - // For this assignment, we will simply log. + // TODO: Implement MVCC garbage collection. + // We should iterate through all keys and remove versions older than minTS, + // keeping at least one version <= minTS for snapshot consistency. + // This is a heavy operation and should be done in the background or using + // a Pebble CompactionFilter if possible. + // For now, we simply log the request. s.log.Info("Compact requested", slog.Uint64("minTS", minTS)) return nil } @@ -514,7 +513,7 @@ func (s *pebbleStore) Snapshot() (io.ReadWriter, error) { buf := &bytes.Buffer{} - // Format: [LastCommitTS] [Count] [KeyLen] [Key] [ValLen] [Val] ... + // Format: [LastCommitTS] [KeyLen] [Key] [ValLen] [Val] ... // We need to persist s.lastCommitTS too.