diff --git a/cmd/es-node/config.go b/cmd/es-node/config.go index 09ecfc55..b1d9f6e6 100644 --- a/cmd/es-node/config.go +++ b/cmd/es-node/config.go @@ -121,7 +121,7 @@ func NewConfig(ctx *cli.Context, lg log.Logger) (*node.Config, error) { Storage: *storageConfig, Mining: minerConfig, Archiver: archiverConfig, - Scanner: scanner.NewConfig(ctx), + Scanner: scanner.NewConfig(ctx, l1Endpoint.L1BeaconSlotTime), } if err := cfg.Check(); err != nil { return nil, err diff --git a/cmd/es-node/utils.go b/cmd/es-node/utils.go index 443fcd88..e89b96a2 100644 --- a/cmd/es-node/utils.go +++ b/cmd/es-node/utils.go @@ -6,7 +6,6 @@ package main import ( "bytes" "context" - "crypto/sha256" "fmt" "math/big" "net/http" @@ -19,11 +18,11 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" es "github.com/ethstorage/go-ethstorage/ethstorage" + "github.com/ethstorage/go-ethstorage/ethstorage/blobs" "github.com/ethstorage/go-ethstorage/ethstorage/flags" "github.com/ethstorage/go-ethstorage/ethstorage/storage" "github.com/urfave/cli" @@ -261,15 +260,11 @@ func downloadBlobFromRPC(endpoint string, kvIndex uint64, hash common.Hash) ([]b return nil, err } - var blob kzg4844.Blob - copy(blob[:], result) - commitment, err := kzg4844.BlobToCommitment(&blob) + blobhash, err := blobs.BlobToVersionedHash(result) if err != nil { return nil, fmt.Errorf("blobToCommitment failed: %w", err) } - blobhash := common.Hash(kzg4844.CalcBlobHashV1(sha256.New(), &commitment)) - fmt.Printf("blobhash from blob: %x\n", blobhash) - if bytes.Compare(blobhash[:es.HashSizeInContract], hash[:es.HashSizeInContract]) != 0 { + if !bytes.Equal(blobhash[:es.HashSizeInContract], hash[:es.HashSizeInContract]) { return nil, fmt.Errorf("invalid blobhash for %d want: %x, got: %x", kvIndex, hash, blobhash) } diff --git a/cmd/es-utils/utils/utils.go b/cmd/es-utils/utils/utils.go index d4e9562d..13141840 100644 --- a/cmd/es-utils/utils/utils.go +++ b/cmd/es-utils/utils/utils.go @@ -186,6 +186,7 @@ func SendBlobTx( for i := 0; i <= maxRetries; i++ { errRetry = client.SendTransaction(context.Background(), tx) if errRetry == nil { + lg.Info("SendTransaction succeeded", "txHash", tx.Hash()) break } lg.Warn("SendTransaction failed", "retriesLeft", maxRetries-i, "error", errRetry) diff --git a/ethstorage/eth/polling_client.go b/ethstorage/eth/polling_client.go index 95169cb7..618247ee 100644 --- a/ethstorage/eth/polling_client.go +++ b/ethstorage/eth/polling_client.go @@ -228,6 +228,20 @@ func (w *PollingClient) FilterLogsByBlockRange(start *big.Int, end *big.Int, eve return w.FilterLogs(context.Background(), query) } +func (w *PollingClient) GetUpdatedKvIndices(startBlock, endBlock *big.Int) ([]uint64, error) { + events, err := w.FilterLogsByBlockRange(startBlock, endBlock, PutBlobEvent) + if err != nil { + return nil, err + } + var kvIndices []uint64 + for _, event := range events { + kvIndices = append(kvIndices, new(big.Int).SetBytes(event.Topics[1][:]).Uint64()) + var hash common.Hash + copy(hash[:], event.Topics[3][:]) + } + return kvIndices, nil +} + func (w *PollingClient) GetStorageKvEntryCount(blockNumber int64) (uint64, error) { h := crypto.Keccak256Hash([]byte(`kvEntryCount()`)) diff --git a/ethstorage/scanner/README.md b/ethstorage/scanner/README.md index 970f73ee..c2496e0e 100644 --- 
a/ethstorage/scanner/README.md +++ b/ethstorage/scanner/README.md @@ -1,14 +1,57 @@ # EthStorage Scanner -A data verification service periodically checks if the data hashes of the blobs in local storage files align with the key-value hashes in the storage contract. If any mismatch found, the service looks for the correct blob in the p2p network, and update the data in the local storage. +A data verification service that periodically checks whether locally stored KV blobs match the on-chain KV meta hash (commit) from the storage contract. -This service offers a lightweight yet effective way to maintain network-wide data consistency. +If a mismatch is detected, the scanner can attempt to repair the local data by re-fetching the blob from the network and rewriting it with meta validation. -### Usage +## Options -The scanner service is enabled with `check meta` mode by default: -- `--scanner.mode` Data scan mode, 0: disabled, 1: check meta, 2: check blob (default: 1)[`ES_NODE_SCANNER_MODE`]. +| Flag | Default | Env var | Description | +| --- | --- | --- | --- | +| `--scanner.mode` | `1` | `ES_NODE_SCANNER_MODE` | Data scan mode (bitmask): `0`=disabled, `1`=meta, `2`=blob, `4`=block. Combine via sum/OR (e.g. `3`=`1+2`, `5`=`1+4`, `7`=`1+2+4`). | +| `--scanner.batch-size` | `8192` | `ES_NODE_SCANNER_BATCH_SIZE` | Data scan batch size. | +| `--scanner.interval.meta` | `3` (minutes) | `ES_NODE_SCANNER_INTERVAL_META` | Scan interval for `check-meta`. | +| `--scanner.interval.blob` | `60` (minutes) | `ES_NODE_SCANNER_INTERVAL_BLOB` | Scan interval for `check-blob`. | +| `--scanner.interval.block` | `1440` (minutes) | `ES_NODE_SCANNER_INTERVAL_BLOCK` | Scan interval for `check-block`. | -The following settings are required if the service is not disabled manually: -- `--scanner.batch-size` Data scan batch size (default: 8192) [`$ES_NODE_SCANNER_BATCH_SIZE`] -- `--scanner.interval` Data scan interval in minutes (default: 3) [`$ES_NODE_SCANNER_INTERVAL`] +## Scan modes explained + +The `--scanner.mode` flag (env `ES_NODE_SCANNER_MODE`, default `1`) is a bitmask: + +- `0`: disabled +- `1`: check-meta (compare local meta with on-chain meta) +- `2`: check-blob (read local blob and validate its commit against on-chain meta) +- `4`: check-block (scan recently updated KVs from finalized blocks, then run check-blob on them) + +### Quick comparison + +| Name | `--scanner.mode` | What it does | Performance impact | Notes | +| --- | ---: | --- | --- | --- | +| check-meta | `1` | Read local meta and compare with on-chain meta | Low | Minimal impact; may miss some mismatches | +| check-blob | `2` | Compute the commit from a local blob and validate it against on-chain meta | High | Best precision; highest I/O and CPU cost when many blobs are stored | +| check-block | `4` | Scan recently finalized blocks for updated KVs, then run `check-blob` on them | High | Ensures newly updated blobs are fetched and verified within the Beacon node's blob retention window | + +### More choices + +You can combine modes by summing/OR-ing them to balance precision and coverage against performance: + +- `3` = `1 + 2` = meta + blob +- `5` = `1 + 4` = meta + block +- `6` = `2 + 4` = blob + block +- `7` = meta + blob + block + +`--scanner.batch-size` and `--scanner.interval.*` control the batch size and frequency of each scan loop, so you can further tune the performance impact based on your data volume and hardware resources.
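+
+For example, `--scanner.mode 5` enables the meta and block scans while leaving the blob scan off. A minimal Go sketch of the bitmask arithmetic (illustrative only; the constant names below are not the ones used in the es-node source):
+
+```go
+package main
+
+import "fmt"
+
+const (
+	modeMeta  = 1 // check-meta
+	modeBlob  = 2 // check-blob
+	modeBlock = 4 // check-block
+)
+
+func main() {
+	mode := modeMeta | modeBlock // 5, i.e. --scanner.mode 5
+	fmt.Println(mode&modeMeta != 0)  // true:  meta scan enabled
+	fmt.Println(mode&modeBlob != 0)  // false: blob scan disabled
+	fmt.Println(mode&modeBlock != 0) // true:  block scan enabled
+}
+```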
+ +## Status tracking + +When es-node starts, the scanner only starts after the node finishes syncing all shards from the P2P network. + +After it starts, the scanner periodically logs summaries and statistics (mismatched/unfixed counts). These counts are also exposed in the node state as `scan_stats`. + +## Repair behavior + +A background repair loop periodically retries/fixes mismatched KVs by fetching blobs from the p2p network and rewriting them locally. + +- If mismatches are detected for the first time, the KV is marked as `pending`, meaning it is scheduled for repair. Sometimes the mismatch is transient (e.g., due to download latency) and may be recovered automatically by the downloader. +- If the KV is repaired successfully or recovered, it is removed from the mismatch list. +- If the repair fails, it remains in the mismatch list and is marked as `failed` for future retries. diff --git a/ethstorage/scanner/config.go b/ethstorage/scanner/config.go index 14057afb..49a4a8c4 100644 --- a/ethstorage/scanner/config.go +++ b/ethstorage/scanner/config.go @@ -5,40 +5,106 @@ package scanner import ( "fmt" + "time" "github.com/ethstorage/go-ethstorage/ethstorage/flags/utils" "github.com/urfave/cli" ) +const ( + ModeFlagName = "scanner.mode" + BatchSizeFlagName = "scanner.batch-size" + IntervalMetaFlagName = "scanner.interval.meta" + IntervalBlobFlagName = "scanner.interval.blob" + IntervalBlockFlagName = "scanner.interval.block" +) + +// intervals in minutes +const defaultIntervalMeta = 3 +const defaultIntervalBlob = 60 +const defaultIntervalBlock = 24 * 60 + +func scannerEnv(name string) string { + return utils.PrefixEnvVar("SCANNER_" + name) +} + const ( modeDisabled = iota + // Compare local meta hashes with those in L1 contract modeCheckMeta + // Compute meta hashes from local blobs and compare with those in L1 contract modeCheckBlob + // Scan updated KVs from recent blocks and run "check-blob" on them + modeCheckBlock ) +// scanMode is an internal per-loop mode used by scan workers (meta/blob/block). +type scanMode int + +func (m scanMode) String() string { + switch m { + case modeDisabled: + return "disabled" + case modeCheckMeta: + return "check-meta" + case modeCheckBlob: + return "check-blob" + case modeCheckBlock: + return "check-block" + default: + return fmt.Sprintf("unknown(%d)", int(m)) + } +} + const ( - ModeFlagName = "scanner.mode" - BatchSizeFlagName = "scanner.batch-size" - IntervalFlagName = "scanner.interval" + modeSetMeta scanModeSet = 1 << iota // 1 + modeSetBlob // 2 + modeSetBlock // 4 ) -const defaultInterval = 3 // in minutes +const scanModeSetMask = modeSetMeta | modeSetBlob | modeSetBlock // 7 -func scannerEnv(name string) string { - return utils.PrefixEnvVar("SCANNER_" + name) +// scanModeSet is a combination of scanMode values used for configuration purposes. 
+type scanModeSet uint8 + +func (m scanModeSet) String() string { + if m == 0 { + return "disabled" + } + + out := "" + if m&modeSetMeta != 0 { + out = "check-meta" + } + if m&modeSetBlob != 0 { + if out != "" { + out += "+" + } + out += "check-blob" + } + if m&modeSetBlock != 0 { + if out != "" { + out += "+" + } + out += "check-block" + } + return out } type Config struct { - Mode int - BatchSize int - Interval int + Mode scanModeSet + BatchSize int + L1SlotTime time.Duration + IntervalMeta time.Duration + IntervalBlob time.Duration + IntervalBlock time.Duration } func CLIFlags() []cli.Flag { flags := []cli.Flag{ cli.IntFlag{ Name: ModeFlagName, - Usage: "Data scan mode, 0: disabled, 1: check meta, 2: check blob", + Usage: "Data scan mode (bitmask) : 0=disabled, 1=meta, 2=blob, 4=block; combinations via sum/OR: 3=meta+blob, 5=meta+block, 6=blob+block, 7=all", EnvVar: scannerEnv("MODE"), Value: 1, }, @@ -49,29 +115,39 @@ func CLIFlags() []cli.Flag { Value: 8192, }, cli.IntFlag{ - Name: IntervalFlagName, - Usage: fmt.Sprintf("Data scan interval in minutes, minimum %d (default)", defaultInterval), - EnvVar: scannerEnv("INTERVAL"), - Value: defaultInterval, + Name: IntervalMetaFlagName, + Usage: fmt.Sprintf("Data scan interval of check-meta mode in minutes (default %d)", defaultIntervalMeta), + EnvVar: scannerEnv("INTERVAL_META"), + Value: defaultIntervalMeta, + }, + cli.IntFlag{ + Name: IntervalBlobFlagName, + Usage: fmt.Sprintf("Data scan interval of check-blob mode in minutes (default %d)", defaultIntervalBlob), + EnvVar: scannerEnv("INTERVAL_BLOB"), + Value: defaultIntervalBlob, + }, + cli.IntFlag{ + Name: IntervalBlockFlagName, + Usage: fmt.Sprintf("Data scan interval of check-block mode in minutes (default %d)", defaultIntervalBlock), + EnvVar: scannerEnv("INTERVAL_BLOCK"), + Value: defaultIntervalBlock, }, } return flags } -func NewConfig(ctx *cli.Context) *Config { - mode := ctx.GlobalInt(ModeFlagName) - if mode == modeDisabled { +func NewConfig(ctx *cli.Context, slot uint64) *Config { + mode := scanModeSet(ctx.GlobalInt(ModeFlagName)) & scanModeSetMask + + if mode == 0 { return nil } - if mode != modeCheckMeta && mode != modeCheckBlob { - panic(fmt.Sprintf("invalid scanner mode: %d", mode)) - } - if interval := ctx.GlobalInt(IntervalFlagName); interval < defaultInterval { - panic(fmt.Sprintf("scanner interval must be at least %d minutes", defaultInterval)) - } return &Config{ - Mode: mode, - BatchSize: ctx.GlobalInt(BatchSizeFlagName), - Interval: ctx.GlobalInt(IntervalFlagName), + Mode: mode, + BatchSize: ctx.GlobalInt(BatchSizeFlagName), + L1SlotTime: time.Second * time.Duration(slot), + IntervalMeta: time.Minute * time.Duration(ctx.GlobalInt(IntervalMetaFlagName)), + IntervalBlob: time.Minute * time.Duration(ctx.GlobalInt(IntervalBlobFlagName)), + IntervalBlock: time.Minute * time.Duration(ctx.GlobalInt(IntervalBlockFlagName)), } } diff --git a/ethstorage/scanner/scanner.go b/ethstorage/scanner/scanner.go index de3b7142..fa128312 100644 --- a/ethstorage/scanner/scanner.go +++ b/ethstorage/scanner/scanner.go @@ -5,6 +5,7 @@ package scanner import ( "context" + "maps" "sync" "time" @@ -15,16 +16,18 @@ import ( ) type Scanner struct { - worker *Worker - feed *event.Feed - interval time.Duration - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - running bool - mu sync.Mutex - lg log.Logger - scanStats ScanStats + worker *Worker + feed *event.Feed + cfg Config + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + running bool + mu sync.Mutex // 
protects running + lg log.Logger + scanPermit chan struct{} // to ensure only one scan at a time + sharedStats scannedKVs + statsMu sync.Mutex // protects sharedStats } func New( @@ -32,20 +35,22 @@ func New( cfg Config, sm *es.StorageManager, fetchBlob es.FetchBlobFunc, - l1 es.Il1Source, + l1 IL1, feed *event.Feed, lg log.Logger, ) *Scanner { cctx, cancel := context.WithCancel(ctx) scanner := &Scanner{ - worker: NewWorker(sm, fetchBlob, l1, cfg, lg), - feed: feed, - interval: time.Minute * time.Duration(cfg.Interval), - ctx: cctx, - cancel: cancel, - lg: lg, - scanStats: ScanStats{0, 0}, + worker: NewWorker(sm, fetchBlob, l1, uint64(cfg.BatchSize), lg), + feed: feed, + cfg: cfg, + ctx: cctx, + cancel: cancel, + lg: lg, + scanPermit: make(chan struct{}, 1), + sharedStats: scannedKVs{}, } + scanner.scanPermit <- struct{}{} scanner.wg.Add(1) go scanner.update() return scanner @@ -89,39 +94,176 @@ func (s *Scanner) start() { s.running = true s.mu.Unlock() - s.wg.Add(1) + if s.cfg.Mode == 0 { + s.lg.Info("Scanner is disabled") + return + } + + if s.cfg.Mode&modeSetMeta != 0 { + s.launchScanLoop(s.metaScanLoopRuntime()) + } + if s.cfg.Mode&modeSetBlob != 0 { + s.launchScanLoop(s.blobScanLoopRuntime()) + } + if s.cfg.Mode&modeSetBlock != 0 { + s.launchScanLoop(s.latestScanLoopRuntime()) + } + + s.lg.Info("Scanner started", "mode", s.cfg.Mode.String()) + + s.startReporter() + // Launch the scan loop to fix mismatched KVs every 12 minutes FIXME: adjust interval? + s.launchFixLoop(time.Minute * 12) +} +func (s *Scanner) latestScanLoopRuntime() *scanLoopRuntime { + return &scanLoopRuntime{ + mode: modeCheckBlock, + nextBatch: s.worker.latestUpdated, + interval: s.cfg.IntervalBlock, + batchSize: uint64(s.cfg.IntervalBlock / s.cfg.L1SlotTime), // number of slots in the interval + nextIndex: 0, + } +} + +func (s *Scanner) blobScanLoopRuntime() *scanLoopRuntime { + return &scanLoopRuntime{ + mode: modeCheckBlob, + nextBatch: s.worker.getKvsInBatch, + interval: s.cfg.IntervalBlob, + batchSize: uint64(s.cfg.BatchSize), + nextIndex: 0, + } +} + +func (s *Scanner) metaScanLoopRuntime() *scanLoopRuntime { + return &scanLoopRuntime{ + mode: modeCheckMeta, + nextBatch: s.worker.getKvsInBatch, + interval: s.cfg.IntervalMeta, + batchSize: uint64(s.cfg.BatchSize), + nextIndex: 0, + } +} + +func (s *Scanner) launchScanLoop(state *scanLoopRuntime) { + s.wg.Add(1) go func() { defer s.wg.Done() - s.lg.Info("Scanner started", "mode", s.worker.cfg.Mode, "interval", s.interval.String(), "batchSize", s.worker.cfg.BatchSize) + s.lg.Info("Launching scanner loop", "mode", state.mode, "interval", state.interval.String(), "batchSize", state.batchSize) - mainTicker := time.NewTicker(s.interval) - reportTicker := time.NewTicker(1 * time.Minute) + mainTicker := time.NewTicker(state.interval) defer mainTicker.Stop() - defer reportTicker.Stop() - sts, errCache, err := s.doWork(mismatchTracker{}) - if err != nil { - s.lg.Error("Initial scan failed", "error", err) - } + s.doScan(state) for { select { case <-mainTicker.C: - newSts, scanErrs, err := s.doWork(sts.mismatched.clone()) + s.doScan(state) + + case <-s.ctx.Done(): + return + } + } + }() +} + +func (s *Scanner) doScan(state *scanLoopRuntime) { + if !s.acquireScanPermit() { + return + } + err := s.worker.scanBatch(s.ctx, state, func(kvi uint64, m *scanned) { + s.updateStats(kvi, m) + }) + s.releaseScanPermit() + if err != nil { + s.lg.Error("Scan batch failed", "mode", state.mode, "error", err) + } +} + +func (s *Scanner) launchFixLoop(interval time.Duration) { + s.wg.Add(1) 
+ go func() { + defer s.wg.Done() + + s.lg.Info("Launching scan fix loop", "interval", interval.String()) + + fixTicker := time.NewTicker(interval) + defer fixTicker.Stop() + + for { + select { + case <-fixTicker.C: + s.lg.Info("Scanner fix batch triggered") + // hold until other possible ongoing scans finish + if !s.acquireScanPermit() { + return + } + s.statsMu.Lock() + kvIndices := s.sharedStats.needFix() + s.statsMu.Unlock() + s.lg.Info("Scanner fixing batch", "mismatches", kvIndices) + err := s.worker.fixBatch(s.ctx, kvIndices, func(kvi uint64, m *scanned) { + s.updateStats(kvi, m) + }) + s.releaseScanPermit() if err != nil { - s.lg.Error("Scanner: scan batch failed", "error", err) - continue + s.lg.Error("Fix scan batch failed", "error", err) } - sts = newSts - errCache.merge(scanErrs) - case <-reportTicker.C: - s.logStats(sts) - for i, e := range errCache { - s.lg.Info("Scanner error happened earlier", "kvIndex", i, "error", e) - } + case <-s.ctx.Done(): + return + } + } + }() +} + +func (s *Scanner) updateStats(kvi uint64, m *scanned) { + s.statsMu.Lock() + defer s.statsMu.Unlock() + + if m != nil { + if m.status == pending && s.sharedStats[kvi].status == failed { + // keep failed status until fixed + return + } + s.sharedStats[kvi] = *m + } else { + // fixed or recovered + delete(s.sharedStats, kvi) + } +} + +func (s *Scanner) acquireScanPermit() bool { + select { + case <-s.ctx.Done(): + return false + case <-s.scanPermit: + return true + } +} + +func (s *Scanner) releaseScanPermit() { + select { + case s.scanPermit <- struct{}{}: + default: + } +} + +func (s *Scanner) startReporter() { + s.wg.Add(1) + go func() { + defer s.wg.Done() + localKvCount, sum := s.worker.summaryLocalKvs() + s.lg.Info("Local storage summary", "localKvs", sum, "localKvCount", localKvCount) + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-ticker.C: + s.logStats() case <-s.ctx.Done(): return } @@ -129,29 +271,38 @@ func (s *Scanner) start() { }() } -func (s *Scanner) logStats(sts *stats) { - logFields := []any{ - "localKvs", sts.localKvs, - "localKvsCount", sts.total, +func (s *Scanner) logStats() { + localKvCount, sum := s.worker.summaryLocalKvs() + s.lg.Info("Local storage summary", "localKvs", sum, "localKvCount", localKvCount) + + s.statsMu.Lock() + mismatched := "[]" + if len(s.sharedStats) > 0 { + mismatched = s.sharedStats.String() + } + errSnapshot := make(map[uint64]error) + if s.sharedStats.hasError() { + maps.Copy(errSnapshot, s.sharedStats.withErrors()) } - if len(sts.mismatched) > 0 { - logFields = append(logFields, "mismatched", sts.mismatched.String()) + s.statsMu.Unlock() + + s.lg.Info("Scanner stats", "mode", s.cfg.Mode, "mismatched", mismatched) + for i, e := range errSnapshot { + s.lg.Info("Scanner error happened earlier", "kvIndex", i, "error", e) } - s.lg.Info("Scanner stats", logFields...) 
} func (s *Scanner) GetScanState() *ScanStats { - s.mu.Lock() - defer s.mu.Unlock() - snapshot := s.scanStats // Make a copy - return &snapshot // Return a pointer to the copy -} + if s == nil { + return &ScanStats{} + } + s.statsMu.Lock() + defer s.statsMu.Unlock() -func (s *Scanner) setScanState(sts *stats) { - s.mu.Lock() - defer s.mu.Unlock() - s.scanStats.MismatchedCount = len(sts.mismatched) - s.scanStats.UnfixedCount = len(sts.mismatched.failed()) + return &ScanStats{ + MismatchedCount: len(s.sharedStats), + UnfixedCount: len(s.sharedStats.failed()), + } } func (s *Scanner) Close() { @@ -167,17 +318,3 @@ func (s *Scanner) Close() { s.lg.Info("Scanner closed") s.wg.Wait() } - -func (s *Scanner) doWork(tracker mismatchTracker) (*stats, scanErrors, error) { - s.lg.Debug("Scan batch started") - start := time.Now() - defer func(stt time.Time) { - s.lg.Info("Scan batch done", "duration", time.Since(stt).String()) - }(start) - - sts, scanErrs, err := s.worker.ScanBatch(s.ctx, tracker) - if err == nil { - s.setScanState(sts) - } - return sts, scanErrs, err -} diff --git a/ethstorage/scanner/utils.go b/ethstorage/scanner/utils.go index ae6f6d40..4e515af9 100644 --- a/ethstorage/scanner/utils.go +++ b/ethstorage/scanner/utils.go @@ -5,42 +5,35 @@ package scanner import ( "fmt" - "maps" "slices" "strings" -) - -type scanErrors map[uint64]error + "time" -func (s scanErrors) add(kvIndex uint64, err error) { - s[kvIndex] = err -} + "github.com/ethereum/go-ethereum/common" +) -func (s scanErrors) nil(kvIndex uint64) { - s[kvIndex] = nil -} - -func (s scanErrors) merge(errs scanErrors) { - for k, v := range errs { - if v != nil { - s[k] = v - } else { - delete(s, k) - } - } +type scanned struct { + status + err error } type status int const ( - pending status = iota // first-time detected - fixed // by scanner - recovered // by downloader - failed // failed to fix + ok status = iota + err_read // read meta or blob error / not found + pending // mismatch detected + fixed // by scanner + recovered // by downloader + failed // failed to fix ) func (s status) String() string { switch s { + case ok: + return "ok" + case err_read: + return "err_read" case pending: return "pending" case recovered: @@ -54,9 +47,9 @@ func (s status) String() string { } } -type mismatchTracker map[uint64]status +type scannedKVs map[uint64]scanned -func (m mismatchTracker) String() string { +func (m scannedKVs) String() string { var items []string keys := make([]uint64, 0, len(m)) for k := range m { @@ -71,75 +64,87 @@ func (m mismatchTracker) String() string { return "[" + strings.Join(items, ",") + "]" } -func (m mismatchTracker) markPending(kvIndex uint64) { - m[kvIndex] = pending +func (m scannedKVs) hasError() bool { + for _, scanned := range m { + if scanned.err != nil { + return true + } + } + return false } -func (m mismatchTracker) markRecovered(kvIndex uint64) { - m[kvIndex] = recovered +func (m scannedKVs) withErrors() map[uint64]error { + res := make(map[uint64]error) + for kvIndex, scanned := range m { + if scanned.err != nil { + res[kvIndex] = scanned.err + } + } + return res } -func (m mismatchTracker) markFixed(kvIndex uint64) { - m[kvIndex] = fixed +// failed() returns all kvIndices that are failed to be fixed +func (m scannedKVs) failed() []uint64 { + var res []uint64 + for kvIndex, scanned := range m { + if scanned.status == failed { + res = append(res, kvIndex) + } + } + slices.Sort(res) + return res } -func (m mismatchTracker) markFailed(kvIndex uint64) { - m[kvIndex] = failed +// needFix() returns all 
kvIndices that need to be fixed or at least check again +func (m scannedKVs) needFix() []uint64 { + var res []uint64 + for kvIndex, scanned := range m { + if scanned.status == pending || scanned.status == failed || scanned.err != nil { + res = append(res, kvIndex) + } + } + slices.Sort(res) + return res } -func (m mismatchTracker) shouldFix(kvIndex uint64) bool { - status, exists := m[kvIndex] - return exists && (status == pending || status == failed) +type scanLoopRuntime struct { + mode scanMode + nextBatch nextBatchFn + interval time.Duration + batchSize uint64 + nextIndex uint64 } -// failed() returns all indices that are still mismatched -// since the first-time do not count as mismatched and the -// second-time will be fixed immediately if possible -func (m mismatchTracker) failed() []uint64 { - return m.filterByStatus(failed) +type scanUpdateFn func(uint64, *scanned) +type nextBatchFn func(uint64, uint64) ([]uint64, uint64) + +type scanMarker struct { + kvIndex uint64 + mark scanUpdateFn } -// fixed() returns only indices that have been fixed by the scanner -// add recovered() to get those fixed by downloader -func (m mismatchTracker) fixed() []uint64 { - return m.filterByStatus(fixed) +func newScanMarker(kvIndex uint64, fn scanUpdateFn) *scanMarker { + return &scanMarker{kvIndex: kvIndex, mark: fn} } -// recovered() returns indices fixed by downloader from failed status -// those recovered from pending status are no longer tracked -func (m mismatchTracker) recovered() []uint64 { - return m.filterByStatus(recovered) +func (m *scanMarker) markError(commit common.Hash, err error) { + m.mark(m.kvIndex, &scanned{status: err_read, err: fmt.Errorf("commit: %x, error reading kv: %w", commit, err)}) } -func (m mismatchTracker) filterByStatus(s status) []uint64 { - var res []uint64 - for kvIndex, status := range m { - if status == s { - res = append(res, kvIndex) - } - } - slices.Sort(res) - return res +func (m *scanMarker) markFailed(commit common.Hash, err error) { + m.mark(m.kvIndex, &scanned{status: failed, err: fmt.Errorf("commit: %x, error fixing kv: %w", commit, err)}) } -func (m mismatchTracker) clone() mismatchTracker { - clone := make(mismatchTracker) - maps.Copy(clone, m) - return clone +func (m *scanMarker) markMismatched() { + m.mark(m.kvIndex, &scanned{status: pending, err: nil}) } -type stats struct { - localKvs string // kv entries stored in local - total int // total number of kv entries stored in local - mismatched mismatchTracker // tracks all mismatched indices and their status +func (m *scanMarker) markFixed() { + m.mark(m.kvIndex, nil) } -func newStats() *stats { - return &stats{ - localKvs: "", - total: 0, - mismatched: mismatchTracker{}, - } +func (m *scanMarker) markRecovered() { + m.mark(m.kvIndex, nil) } func shortPrt(nums []uint64) string { @@ -162,25 +167,6 @@ func shortPrt(nums []uint64) string { return strings.Join(res, ",") } -func summaryLocalKvs(shards []uint64, kvEntries, lastKvIdx uint64) string { - var res []string - for _, shard := range shards { - if shard*kvEntries > lastKvIdx { - // skip empty shards - break - } - var lastEntry uint64 - if shard == lastKvIdx/kvEntries { - lastEntry = lastKvIdx - } else { - lastEntry = (shard+1)*kvEntries - 1 - } - shardView := fmt.Sprintf("shard%d%s", shard, formatRange(shard*kvEntries, lastEntry)) - res = append(res, shardView) - } - return strings.Join(res, ",") -} - func formatRange(start, end uint64) string { if start == end { return fmt.Sprintf("[%d]", start) diff --git a/ethstorage/scanner/worker.go 
b/ethstorage/scanner/worker.go index d754f491..dc6d371a 100644 --- a/ethstorage/scanner/worker.go +++ b/ethstorage/scanner/worker.go @@ -7,8 +7,12 @@ import ( "context" "errors" "fmt" + "math/big" + "strings" + "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" @@ -25,188 +29,241 @@ type IStorageManager interface { Shards() []uint64 } +type IL1 interface { + GetKvMetas(kvIndices []uint64, blockNumber int64) ([][32]byte, error) + GetUpdatedKvIndices(startBlock, endBlock *big.Int) ([]uint64, error) + HeaderByNumber(context.Context, *big.Int) (*types.Header, error) +} + type Worker struct { - sm IStorageManager - fetchBlob es.FetchBlobFunc - l1 es.Il1Source - cfg Config - nextIndexOfKvIdx uint64 - lg log.Logger + sm IStorageManager + fetchBlob es.FetchBlobFunc + l1 IL1 + lg log.Logger } func NewWorker( sm IStorageManager, fetch es.FetchBlobFunc, - l1 es.Il1Source, - cfg Config, + l1 IL1, + batchSize uint64, lg log.Logger, ) *Worker { return &Worker{ sm: sm, fetchBlob: fetch, l1: l1, - cfg: cfg, lg: lg, } } -func (s *Worker) ScanBatch(ctx context.Context, mismatched mismatchTracker) (*stats, scanErrors, error) { - // Never return nil stats and nil scanErrors - sts := newStats() - scanErrors := make(scanErrors) - - // Query local storage info - shards := s.sm.Shards() - kvEntries := s.sm.KvEntries() - entryCount := s.sm.KvEntryCount() - if entryCount == 0 { - s.lg.Info("Scanner: no KV entries found in local storage") - return sts, scanErrors, nil - } - lastKvIdx := entryCount - 1 - s.lg.Info("Scanner: local storage info", "lastKvIdx", lastKvIdx, "shards", shards, "kvEntriesPerShard", kvEntries) +func (s *Worker) scanBatch(ctx context.Context, runtime *scanLoopRuntime, onUpdate scanUpdateFn) error { + start := time.Now() + var kvsInBatch []uint64 + defer func(stt time.Time) { + if len(kvsInBatch) > 0 { + s.lg.Info("Scan batch done", + "mode", runtime.mode, + "scanned", shortPrt(kvsInBatch), + "count", len(kvsInBatch), + "nextIndexOfKvIdx", runtime.nextIndex, + "duration", time.Since(stt).String(), + ) + } + }(start) // Determine the batch of KV indices to scan - kvsInBatch, totalEntries, batchEndExclusive := getKvsInBatch(shards, kvEntries, lastKvIdx, uint64(s.cfg.BatchSize), s.nextIndexOfKvIdx, s.lg) - - sts.localKvs = summaryLocalKvs(shards, kvEntries, lastKvIdx) - sts.total = int(totalEntries) + kvsInBatch, batchEndExclusive := runtime.nextBatch(runtime.batchSize, runtime.nextIndex) + if len(kvsInBatch) == 0 { + s.lg.Info("No KV entries to scan in this batch", "mode", runtime.mode) + return nil + } + s.lg.Info("Scan batch started", "mode", runtime.mode, "startIndexOfKvIdx", runtime.nextIndex, "kvsInBatch", shortPrt(kvsInBatch)) // Query the metas from the L1 contract metas, err := s.l1.GetKvMetas(kvsInBatch, rpc.FinalizedBlockNumber.Int64()) if err != nil { - s.lg.Error("Scanner: failed to query KV metas", "error", err) - return sts, scanErrors, fmt.Errorf("failed to query KV metas: %w", err) + s.lg.Error("Failed to query KV metas for scan batch", "error", err) + return fmt.Errorf("failed to query KV metas: %w", err) } - s.lg.Debug("Scanner: query KV meta done", "kvsInBatch", shortPrt(kvsInBatch)) + s.lg.Debug("Query KV meta done", "kvsInBatch", shortPrt(kvsInBatch)) for i, meta := range metas { select { case <-ctx.Done(): s.lg.Warn("Scanner canceled, stopping scan", "ctx.Err", ctx.Err()) - return sts, scanErrors, ctx.Err() + return ctx.Err() default: } - var found bool + 
mode := runtime.mode + if mode == modeCheckBlock { + // check-block has already resolved updated KV indices from block events, so verify each KV as a blob + mode = modeCheckBlob + } var commit common.Hash copy(commit[:], meta[32-es.HashSizeInContract:32]) - kvIndex := kvsInBatch[i] - if s.cfg.Mode == modeCheckMeta { - // Check meta only - var metaLocal []byte - metaLocal, found, err = s.sm.TryReadMeta(kvIndex) - if err != nil { - s.lg.Error("Scanner: failed to read meta", "kvIndex", kvIndex, "error", err) - scanErrors.add(kvIndex, fmt.Errorf("failed to read meta: %w", err)) - continue - } - err = es.CompareCommits(commit.Bytes(), metaLocal) - } else if s.cfg.Mode == modeCheckBlob { - // Query blob and check meta from storage - _, found, err = s.sm.TryRead(kvIndex, int(s.sm.MaxKvSize()), commit) - } else { - s.lg.Error("Scanner: invalid scanner mode", "mode", s.cfg.Mode) - return sts, scanErrors, fmt.Errorf("invalid scanner mode: %d", s.cfg.Mode) - } + s.scanKv(mode, kvsInBatch[i], commit, onUpdate) + } - if found && err == nil { - - // Update status for previously mismatched entries that are now valid - if status, exists := mismatched[kvIndex]; exists { - switch status { - case failed: - mismatched.markRecovered(kvIndex) - // Clear the error state - scanErrors.nil(kvIndex) - s.lg.Info("Scanner: previously failed KV recovered", "kvIndex", kvIndex) - case pending: - delete(mismatched, kvIndex) - s.lg.Info("Scanner: previously pending KV recovered", "kvIndex", kvIndex) - } - } + runtime.nextIndex = batchEndExclusive + return nil +} - // Happy path - s.lg.Debug("Scanner: KV check completed successfully", "kvIndex", kvIndex, "commit", commit) - continue - } +func (s *Worker) getKvsInBatch(batchSize uint64, startIndexOfKvIdx uint64) ([]uint64, uint64) { + localKvCount, _ := s.summaryLocalKvs() + if localKvCount == 0 { + return []uint64{}, 0 + } + shards := s.sm.Shards() + kvEntries := s.sm.KvEntries() + return getKvsInBatch(shards, kvEntries, localKvCount, batchSize, startIndexOfKvIdx, s.lg) +} - if !found { - // The shard is not stored locally - scanErrors.add(kvIndex, fmt.Errorf("shard not found locally: commit=%x", commit)) - s.lg.Error("Scanner: blob not found locally", "kvIndex", kvIndex, "commit", commit) - continue +func (s *Worker) latestUpdated(blocksToScan uint64, lastScannedBlock uint64) ([]uint64, uint64) { + latestFinalized, err := s.l1.HeaderByNumber(context.Background(), big.NewInt(int64(rpc.FinalizedBlockNumber))) + if err != nil { + s.lg.Error("Failed to get latest finalized block header", "error", err) + return []uint64{}, lastScannedBlock + } + startBlock := lastScannedBlock + 1 + endBlock := latestFinalized.Number.Uint64() + if lastScannedBlock == 0 { + s.lg.Info(fmt.Sprintf("No last scanned block recorded, starting from %d slots ago", blocksToScan)) + startBlock = endBlock - blocksToScan + } + if startBlock > endBlock { + s.lg.Info("No new finalized blocks to scan", "lastScannedBlock", lastScannedBlock, "latestFinalized", endBlock) + return []uint64{}, lastScannedBlock + } + kvsIndices, err := s.l1.GetUpdatedKvIndices(big.NewInt(int64(startBlock)), big.NewInt(int64(endBlock))) + if err != nil { + s.lg.Error("Failed to get updated KV indices", "startBlock", startBlock, "endBlock", endBlock, "error", err) + return []uint64{}, lastScannedBlock + } + // filter out kv indices that are not stored in local storage + shardSet := make(map[uint64]struct{}) + for _, shard := range s.sm.Shards() { + shardSet[shard] = struct{}{} + } + kvEntries := s.sm.KvEntries() + var locallyStored []uint64 + for _, kvi := range kvsIndices { + shardIdx := 
kvi / kvEntries + if _, ok := shardSet[shardIdx]; ok { + locallyStored = append(locallyStored, kvi) } + } + s.lg.Info("Latest updated KV indices fetched", "startBlock", startBlock, "endBlock", endBlock, "totalUpdatedKvs", len(kvsIndices), "locallyStored", len(locallyStored)) + return locallyStored, endBlock +} - if err != nil { - var commitErr *es.CommitMismatchError - if errors.As(err, &commitErr) { - s.lg.Warn("Scanner: commit mismatch detected", "kvIndex", kvIndex, "error", err) - - // Only fix repeated mismatches - if mismatched.shouldFix(kvIndex) { - s.lg.Info("Scanner: mismatch again, attempting to fix blob", "kvIndex", kvIndex, "commit", commit) - if fixErr := s.fixKv(kvIndex, commit); fixErr != nil { - mismatched.markFailed(kvIndex) - s.lg.Error("Scanner: failed to fix blob", "kvIndex", kvIndex, "error", fixErr) - scanErrors.add(kvIndex, fmt.Errorf("failed to fix blob: %w", fixErr)) - } else { - s.lg.Info("Scanner: blob fixed successfully", "kvIndex", kvIndex) - mismatched.markFixed(kvIndex) - scanErrors.nil(kvIndex) - } - } else { - - // Mark but skip on the first occurrence as it may be caused by KV update and delayed download - mismatched.markPending(kvIndex) - s.lg.Info("Scanner: first-time mismatch, skipping fix attempt", "kvIndex", kvIndex) - } - } else { - s.lg.Error("Scanner: unexpected error occurred", "kvIndex", kvIndex, "error", err) - scanErrors.add(kvIndex, fmt.Errorf("unexpected error: %w", err)) +func (s *Worker) scanKv(mode scanMode, kvIndex uint64, commit common.Hash, onUpdate scanUpdateFn) { + var err error + switch mode { + case modeCheckMeta: + // Check meta only + metaLocal, found, readErr := s.sm.TryReadMeta(kvIndex) + if metaLocal != nil { + err = es.CompareCommits(commit.Bytes(), metaLocal) + } else { + if readErr != nil { + err = fmt.Errorf("failed to read meta: %w", readErr) + } else if !found { + err = fmt.Errorf("meta not found locally: %x", commit) } } + case modeCheckBlob: + // Query blob and check meta from storage + _, found, readErr := s.sm.TryRead(kvIndex, int(s.sm.MaxKvSize()), commit) + if readErr != nil { + // Could be CommitMismatchError + err = readErr + } else if !found { + err = fmt.Errorf("blob not found locally: %x", commit) + } + default: + // Other modes are handled outside + s.lg.Crit("Invalid scanner mode", "mode", mode) } - - s.nextIndexOfKvIdx = batchEndExclusive - if len(kvsInBatch) > 0 { - s.lg.Info("Scanner: scan batch done", "scanned", shortPrt(kvsInBatch), "count", len(kvsInBatch), "nextIndexOfKvIdx", s.nextIndexOfKvIdx) + if err != nil { + marker := newScanMarker(kvIndex, onUpdate) + var commitErr *es.CommitMismatchError + if errors.As(err, &commitErr) { + s.lg.Warn("Commit mismatch detected", "kvIndex", kvIndex, "error", err) + marker.markMismatched() + return + } + s.lg.Error("Failed to scan KV", "mode", mode, "kvIndex", kvIndex, "error", err) + marker.markError(commit, err) + return } - sts.mismatched = mismatched - - return sts, scanErrors, nil + // Happy path + s.lg.Debug("KV check completed successfully", "kvIndex", kvIndex, "commit", commit) } -func (s *Worker) fixKv(kvIndex uint64, commit common.Hash) error { - if err := s.sm.TryWriteWithMetaCheck(kvIndex, commit, s.fetchBlob); err != nil { - return fmt.Errorf("failed to write KV: kvIndex=%d, commit=%x, %w", kvIndex, commit, err) +func (s *Worker) fixBatch(ctx context.Context, kvIndices []uint64, onUpdate scanUpdateFn) error { + metas, err := s.l1.GetKvMetas(kvIndices, rpc.FinalizedBlockNumber.Int64()) + if err != nil { + s.lg.Error("Failed to query KV metas for scan 
batch", "error", err) + return fmt.Errorf("failed to query KV metas: %w", err) + } + s.lg.Debug("Query KV meta done", "kvsInBatch", shortPrt(kvIndices)) + + for i, meta := range metas { + var commit common.Hash + copy(commit[:], meta[32-es.HashSizeInContract:32]) + s.fixKv(kvIndices[i], commit, onUpdate) } return nil } -func getKvsInBatch(shards []uint64, kvEntries, lastKvIdx, batchSize, batchStartIndex uint64, lg log.Logger) ([]uint64, uint64, uint64) { - // Calculate the total number of KV entries stored locally - var totalEntries uint64 - // Shard indices are sorted but may not be continuous: e.g. [0, 1, 3, 4] indicates shard 2 is missing - for _, shardIndex := range shards { - // The last shard may contain fewer than the full kvEntries - if shardIndex == lastKvIdx/kvEntries { - totalEntries += lastKvIdx%kvEntries + 1 - break +func (s *Worker) fixKv(kvIndex uint64, commit common.Hash, onUpdate scanUpdateFn) { + marker := newScanMarker(kvIndex, onUpdate) + // check blob again before fix + _, found, err := s.sm.TryRead(kvIndex, int(s.sm.MaxKvSize()), commit) + if !found && err == nil { + err = fmt.Errorf("blob not found locally: %x", commit) + } + if err != nil { + var commitErr *es.CommitMismatchError + if errors.As(err, &commitErr) { + s.lg.Info("Fixing mismatched KV", "kvIndex", kvIndex) + if err := s.sm.TryWriteWithMetaCheck(kvIndex, commit, s.fetchBlob); err != nil { + fixErr := fmt.Errorf("failed to fix KV: kvIndex=%d, commit=%x, %w", kvIndex, commit, err) + marker.markFailed(commit, fixErr) + s.lg.Error("Failed to fix KV", "error", fixErr) + return + } + marker.markFixed() + s.lg.Info("KV fixed successfully", "kvIndex", kvIndex) + return } - // Complete shards - totalEntries += kvEntries + s.lg.Error("Failed to scan KV to fix", "kvIndex", kvIndex, "error", err) + marker.markError(commit, err) + return } - lg.Debug("Scanner: KV entries stored locally", "totalKvStored", totalEntries) + marker.markRecovered() + s.lg.Info("KV recovered", "kvIndex", kvIndex, "commit", commit) +} +func (s *Worker) summaryLocalKvs() (uint64, string) { + kvEntryCountOnChain := s.sm.KvEntryCount() + if kvEntryCountOnChain == 0 { + s.lg.Info("No KV entries found in local storage") + return 0, "[]" + } + return summaryLocalKvs(s.sm.Shards(), s.sm.KvEntries(), kvEntryCountOnChain-1) +} + +func getKvsInBatch(shards []uint64, kvEntries, localKvCount, batchSize, startKvIndex uint64, lg log.Logger) ([]uint64, uint64) { // Determine batch start and end KV indices - startKvIndex := batchStartIndex - if startKvIndex >= totalEntries { + if startKvIndex >= localKvCount { startKvIndex = 0 - lg.Debug("Scanner: restarting scan from the beginning") + lg.Debug("Restarting scan from the beginning") } - endKvIndexExclusive := min(startKvIndex+batchSize, totalEntries) + endKvIndexExclusive := min(startKvIndex+batchSize, localKvCount) // The actual batch range is [startKvIndex, endKvIndexExclusive) or [startKvIndex, endIndex] endIndex := endKvIndexExclusive - 1 @@ -235,6 +292,33 @@ func getKvsInBatch(shards []uint64, kvEntries, lastKvIdx, batchSize, batchStartI kvsInBatch = append(kvsInBatch, shards[i]*kvEntries+k) } } - lg.Debug("Scanner: batch index range determined", "batchStart", startKvIndex, "batchEnd(exclusive)", endKvIndexExclusive, "kvsInBatch", shortPrt(kvsInBatch)) - return kvsInBatch, totalEntries, endKvIndexExclusive + lg.Debug("Scan batch index range determined", "batchStart", startKvIndex, "batchEnd(exclusive)", endKvIndexExclusive, "kvsInBatch", shortPrt(kvsInBatch)) + return kvsInBatch, endKvIndexExclusive +} + 
+// Calculate the total number of KV entries stored locally +func summaryLocalKvs(shards []uint64, kvEntries, lastKvIdx uint64) (uint64, string) { + var totalEntries uint64 + var res []string + // Shard indices are sorted but may not be continuous: e.g. [0, 1, 3, 4] indicates shard 2 is missing + for _, shard := range shards { + shardOfLastKv := lastKvIdx / kvEntries + if shard > shardOfLastKv { + // Skip empty shards + break + } + var lastEntry uint64 + // The last shard may contain fewer than the full kvEntries + if shard == shardOfLastKv { + totalEntries += lastKvIdx%kvEntries + 1 + lastEntry = lastKvIdx + } else { + // Complete shards + totalEntries += kvEntries + lastEntry = (shard+1)*kvEntries - 1 + } + shardView := fmt.Sprintf("shard%d%s", shard, formatRange(shard*kvEntries, lastEntry)) + res = append(res, shardView) + } + return totalEntries, strings.Join(res, ",") } diff --git a/ethstorage/scanner/worker_test.go b/ethstorage/scanner/worker_test.go index e8fe6c66..b6191e84 100644 --- a/ethstorage/scanner/worker_test.go +++ b/ethstorage/scanner/worker_test.go @@ -151,6 +151,17 @@ func TestGetKvsInBatch(t *testing.T) { expectedTotal: 14, expectedBatchEnd: 14, }, + { + name: "Discontinuous shards missing current", + shards: []uint64{0, 2}, + kvEntries: 8, + lastKvIdx: 12, + batchSize: 100, + batchStartIndex: 0, + expectedKvs: []uint64{0, 1, 2, 3, 4, 5, 6, 7}, + expectedTotal: 8, + expectedBatchEnd: 8, + }, { name: "Boundary conditions 1 kv", shards: []uint64{0}, @@ -224,7 +235,8 @@ func TestGetKvsInBatch(t *testing.T) { t.Run(tt.name, func(t *testing.T) { lg := log.New() - kvs, total, batchEnd := getKvsInBatch(tt.shards, tt.kvEntries, tt.lastKvIdx, tt.batchSize, tt.batchStartIndex, lg) + total, _ := summaryLocalKvs(tt.shards, tt.kvEntries, tt.lastKvIdx) + kvs, batchEnd := getKvsInBatch(tt.shards, tt.kvEntries, total, tt.batchSize, tt.batchStartIndex, lg) assert.Equal(t, tt.expectedKvs, kvs, "KV indices do not match") assert.Equal(t, tt.expectedTotal, total, "Total entries do not match")