Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
1988e9e
alert when blob missing from cl
syntrust Dec 4, 2025
8303198
update content
syntrust Dec 4, 2025
c75fe84
update content
syntrust Dec 4, 2025
aeb3dad
update content
syntrust Dec 4, 2025
f5d4f7d
update content
syntrust Dec 4, 2025
d5fdad8
check email config
syntrust Dec 4, 2025
c4dc8b4
check email config
syntrust Dec 4, 2025
8be4488
check email config
syntrust Dec 4, 2025
a1b0be6
check email config
syntrust Dec 4, 2025
3839512
fix config
syntrust Dec 5, 2025
efc5dc9
fix config
syntrust Dec 5, 2025
b593a43
fix config
syntrust Dec 5, 2025
7b75b7f
fix config
syntrust Dec 5, 2025
e7ce75f
refactor
syntrust Dec 5, 2025
5b0fd98
refactor
syntrust Dec 9, 2025
f8d0371
check shard exist
syntrust Dec 9, 2025
98efc3d
Merge branch 'cl-blob-miss' of https://github.com/ethstorage/es-node …
syntrust Dec 9, 2025
d5b8524
skip empty shards
syntrust Dec 9, 2025
b4d7e75
suport mode 3
syntrust Dec 10, 2025
cbc567e
refactor
syntrust Dec 11, 2025
2a69b55
refactor
syntrust Dec 11, 2025
433ab9f
minor
syntrust Dec 11, 2025
aa2cfcb
refactor
syntrust Dec 16, 2025
bb167af
report stats using timely local kv summary
syntrust Dec 16, 2025
71c023f
count local kvs on demand
syntrust Dec 17, 2025
6621274
support 2 intevals
syntrust Dec 17, 2025
3ffb9f0
update check stats using callback
syntrust Dec 18, 2025
7aa6373
retry download blob
syntrust Dec 18, 2025
51fc8c6
clean up
syntrust Dec 18, 2025
070f0e2
refactor
syntrust Dec 19, 2025
dfd6d40
minor
syntrust Dec 20, 2025
87dd045
Merge branch 'cl-blob-miss' of https://github.com/ethstorage/es-node …
syntrust Dec 20, 2025
d0f3f85
fix kv
syntrust Dec 22, 2025
2321088
scan latest updated
syntrust Dec 23, 2025
ddb2c7a
minor
syntrust Dec 25, 2025
a11236a
fix nil
syntrust Dec 30, 2025
c65e607
check blob exist before put cache
syntrust Dec 30, 2025
151feea
Merge branch 'cl-blob-miss' of https://github.com/ethstorage/es-node …
syntrust Dec 30, 2025
0eade27
filter updates
syntrust Dec 31, 2025
b43b008
Fix comments
syntrust Dec 31, 2025
0feff13
Merge branch 'main' of https://github.com/ethstorage/es-node into cl-…
syntrust Dec 31, 2025
13979c3
complete err msg
syntrust Jan 4, 2026
cd334db
better err msg
syntrust Jan 4, 2026
5cb1132
Merge branch 'cl-blob-miss' of https://github.com/ethstorage/es-node …
syntrust Jan 4, 2026
3dc133e
handle send email error
syntrust Jan 4, 2026
7338876
Merge branch 'cl-blob-miss' of https://github.com/ethstorage/es-node …
syntrust Jan 4, 2026
d40c01c
test and fix
syntrust Jan 5, 2026
4ec27f2
Merge branch 'main' of https://github.com/ethstorage/es-node into sca…
syntrust Jan 5, 2026
ddfa86c
refactor
syntrust Jan 6, 2026
02e49ca
config slot
syntrust Jan 6, 2026
06ab8e7
log err
syntrust Jan 6, 2026
4d1fb90
fix batch size for block check
syntrust Jan 6, 2026
e675b22
rm fixed/recovered
syntrust Jan 6, 2026
e1c9d77
fix scan permit
syntrust Jan 6, 2026
f5b3ecc
refactor
syntrust Jan 6, 2026
d7b54ed
update doc
syntrust Jan 9, 2026
a530f38
updates on fix
syntrust Jan 13, 2026
9099214
minor
syntrust Jan 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/es-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func NewConfig(ctx *cli.Context, lg log.Logger) (*node.Config, error) {
Storage: *storageConfig,
Mining: minerConfig,
Archiver: archiverConfig,
Scanner: scanner.NewConfig(ctx),
Scanner: scanner.NewConfig(ctx, l1Endpoint.L1BeaconSlotTime),
}
if err := cfg.Check(); err != nil {
return nil, err
Expand Down
11 changes: 3 additions & 8 deletions cmd/es-node/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package main
import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"math/big"
"net/http"
Expand All @@ -19,11 +18,11 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
es "github.com/ethstorage/go-ethstorage/ethstorage"
"github.com/ethstorage/go-ethstorage/ethstorage/blobs"
"github.com/ethstorage/go-ethstorage/ethstorage/flags"
"github.com/ethstorage/go-ethstorage/ethstorage/storage"
"github.com/urfave/cli"
Expand Down Expand Up @@ -261,15 +260,11 @@ func downloadBlobFromRPC(endpoint string, kvIndex uint64, hash common.Hash) ([]b
return nil, err
}

var blob kzg4844.Blob
copy(blob[:], result)
commitment, err := kzg4844.BlobToCommitment(&blob)
blobhash, err := blobs.BlobToVersionedHash(result)
if err != nil {
return nil, fmt.Errorf("blobToCommitment failed: %w", err)
}
blobhash := common.Hash(kzg4844.CalcBlobHashV1(sha256.New(), &commitment))
fmt.Printf("blobhash from blob: %x\n", blobhash)
if bytes.Compare(blobhash[:es.HashSizeInContract], hash[:es.HashSizeInContract]) != 0 {
if !bytes.Equal(blobhash[:es.HashSizeInContract], hash[:es.HashSizeInContract]) {
return nil, fmt.Errorf("invalid blobhash for %d want: %x, got: %x", kvIndex, hash, blobhash)
}

Expand Down
1 change: 1 addition & 0 deletions cmd/es-utils/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func SendBlobTx(
for i := 0; i <= maxRetries; i++ {
errRetry = client.SendTransaction(context.Background(), tx)
if errRetry == nil {
lg.Info("SendTransaction succeeded", "txHash", tx.Hash())
break
}
lg.Warn("SendTransaction failed", "retriesLeft", maxRetries-i, "error", errRetry)
Expand Down
14 changes: 14 additions & 0 deletions ethstorage/eth/polling_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,20 @@ func (w *PollingClient) FilterLogsByBlockRange(start *big.Int, end *big.Int, eve
return w.FilterLogs(context.Background(), query)
}

func (w *PollingClient) GetUpdatedKvIndices(startBlock, endBlock *big.Int) ([]uint64, error) {
events, err := w.FilterLogsByBlockRange(startBlock, endBlock, PutBlobEvent)
if err != nil {
return nil, err
}
var kvIndices []uint64
for _, event := range events {
kvIndices = append(kvIndices, new(big.Int).SetBytes(event.Topics[1][:]).Uint64())
var hash common.Hash
copy(hash[:], event.Topics[3][:])
}
return kvIndices, nil
}

func (w *PollingClient) GetStorageKvEntryCount(blockNumber int64) (uint64, error) {
h := crypto.Keccak256Hash([]byte(`kvEntryCount()`))

Expand Down
59 changes: 51 additions & 8 deletions ethstorage/scanner/README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,57 @@
# EthStorage Scanner

A data verification service periodically checks if the data hashes of the blobs in local storage files align with the key-value hashes in the storage contract. If any mismatch found, the service looks for the correct blob in the p2p network, and update the data in the local storage.
A data verification service that periodically checks whether locally stored KV blobs match the on-chain KV meta hash (commit) from the storage contract.

This service offers a lightweight yet effective way to maintain network-wide data consistency.
If a mismatch is detected, the scanner can attempt to repair the local data by re-fetching the blob from the network and rewriting it with meta validation.

### Usage
## Options

The scanner service is enabled with `check meta` mode by default:
- `--scanner.mode` Data scan mode, 0: disabled, 1: check meta, 2: check blob (default: 1)[`ES_NODE_SCANNER_MODE`].
| Flag | Default | Env var | Description |
| --- | --- | --- | --- |
| `--scanner.mode` | `1` | `ES_NODE_SCANNER_MODE` | Data scan mode (bitmask): `0`=disabled, `1`=meta, `2`=blob, `4`=block. Combine via sum/OR (e.g. `3`=`1+2`, `5`=`1+4`, `7`=`1+2+4`). |
| `--scanner.batch-size` | `8192` | `ES_NODE_SCANNER_BATCH_SIZE` | Data scan batch size. |
| `--scanner.interval.meta` | `3` (minutes) | `ES_NODE_SCANNER_INTERVAL_META` | Scan interval for `check-meta`. |
| `--scanner.interval.blob` | `60` (minutes) | `ES_NODE_SCANNER_INTERVAL_BLOB` | Scan interval for `check-blob`. |
| `--scanner.interval.block` | `1440` (minutes) | `ES_NODE_SCANNER_INTERVAL_BLOCK` | Scan interval for `check-block`. |

The following settings are required if the service is not disabled manually:
- `--scanner.batch-size` Data scan batch size (default: 8192) [`$ES_NODE_SCANNER_BATCH_SIZE`]
- `--scanner.interval` Data scan interval in minutes (default: 3) [`$ES_NODE_SCANNER_INTERVAL`]
## Scan modes explained

The flag / env `--scanner.mode` (default: `1`) [`ES_NODE_SCANNER_MODE`] is a bitmask:

- `0`: disabled
- `1`: check-meta (compare local meta with on-chain meta)
- `2`: check-blob (read local blob and validate its commit against on-chain meta)
- `4`: check-block (scan recently updated KVs from finalized blocks, then run check-blob on them)

### Quick comparison

| Name | `--scanner.mode` | What it does | Performance impact | Notes |
| --- | ---: | --- | --- | --- |
| check-meta | `1` | Read local meta and compare with on-chain meta | Low | Minimal impact; may miss some mismatches |
| check-blob | `2` | Compute the commit from a local blob and validate it against on-chain meta | High | Best precision; highest IO/CPU cost when many blobs |
| check-block | `4` | Scan recently finalized blocks for updated KVs, then run `check-blob` on them | High | Ensure newly updated blobs are fetched and verified within the Beacon node retention window |

### More choices

You can combine modes by summing/OR-ing them to get mixed behavior and balance precision, coverage vs performance:

- `3` = `1 + 2` = meta + blob
- `5` = `1 + 4` = meta + block
- `6` = `2 + 4` = blob + block
- `7` = meta + blob + block

`--scanner.batch-size` and `--scanner.interval.*` control the batch size and frequency of each scan mode, so you can tune the performance impact further based on the amount of data and hardware resources.

## Status tracking

When es-node starts, the scanner only starts after the node finishes syncing all shards from the P2P network.

After it starts, the scanner periodically logs summaries and statistics (mismatched/unfixed counts). These counts are also exposed in the node state as `scan_stats`.

## Repair behavior

A background repair loop periodically retries/fixes mismatched KVs by fetching blobs from the p2p network and rewriting them locally.

- If mismatches are detected for the first time, the KV is marked as `pending`, meaning it is scheduled for repair. Sometimes the mismatch is transient (e.g., due to download latency) and may be recovered automatically by the downloader.
- If the KV is repaired successfully or recovered, it is removed from the mismatch list.
- If the repair fails, it remains in the mismatch list and is marked as `failed` for future retries.
128 changes: 102 additions & 26 deletions ethstorage/scanner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,106 @@ package scanner

import (
"fmt"
"time"

"github.com/ethstorage/go-ethstorage/ethstorage/flags/utils"
"github.com/urfave/cli"
)

const (
ModeFlagName = "scanner.mode"
BatchSizeFlagName = "scanner.batch-size"
IntervalMetaFlagName = "scanner.interval.meta"
IntervalBlobFlagName = "scanner.interval.blob"
IntervalBlockFlagName = "scanner.interval.block"
)

// intervals in minutes
const defaultIntervalMeta = 3
const defaultIntervalBlob = 60
const defaultIntervalBlock = 24 * 60

func scannerEnv(name string) string {
return utils.PrefixEnvVar("SCANNER_" + name)
}

const (
modeDisabled = iota
// Compare local meta hashes with those in L1 contract
modeCheckMeta
// Compute meta hashes from local blobs and compare with those in L1 contract
modeCheckBlob
// Scan updated KVs from recent blocks and run "check-blob" on them
modeCheckBlock
)

// scanMode is an internal per-loop mode used by scan workers (meta/blob/block).
type scanMode int

func (m scanMode) String() string {
switch m {
case modeDisabled:
return "disabled"
case modeCheckMeta:
return "check-meta"
case modeCheckBlob:
return "check-blob"
case modeCheckBlock:
return "check-block"
default:
return fmt.Sprintf("unknown(%d)", int(m))
}
}

const (
ModeFlagName = "scanner.mode"
BatchSizeFlagName = "scanner.batch-size"
IntervalFlagName = "scanner.interval"
modeSetMeta scanModeSet = 1 << iota // 1
modeSetBlob // 2
modeSetBlock // 4
)

const defaultInterval = 3 // in minutes
const scanModeSetMask = modeSetMeta | modeSetBlob | modeSetBlock // 7

func scannerEnv(name string) string {
return utils.PrefixEnvVar("SCANNER_" + name)
// scanModeSet is a combination of scanMode values used for configuration purposes.
type scanModeSet uint8

func (m scanModeSet) String() string {
if m == 0 {
return "disabled"
}

out := ""
if m&modeSetMeta != 0 {
out = "check-meta"
}
if m&modeSetBlob != 0 {
if out != "" {
out += "+"
}
out += "check-blob"
}
if m&modeSetBlock != 0 {
if out != "" {
out += "+"
}
out += "check-block"
}
return out
}

type Config struct {
Mode int
BatchSize int
Interval int
Mode scanModeSet
BatchSize int
L1SlotTime time.Duration
IntervalMeta time.Duration
IntervalBlob time.Duration
IntervalBlock time.Duration
}

func CLIFlags() []cli.Flag {
flags := []cli.Flag{
cli.IntFlag{
Name: ModeFlagName,
Usage: "Data scan mode, 0: disabled, 1: check meta, 2: check blob",
Usage: "Data scan mode (bitmask) : 0=disabled, 1=meta, 2=blob, 4=block; combinations via sum/OR: 3=meta+blob, 5=meta+block, 6=blob+block, 7=all",
EnvVar: scannerEnv("MODE"),
Value: 1,
},
Expand All @@ -49,29 +115,39 @@ func CLIFlags() []cli.Flag {
Value: 8192,
},
cli.IntFlag{
Name: IntervalFlagName,
Usage: fmt.Sprintf("Data scan interval in minutes, minimum %d (default)", defaultInterval),
EnvVar: scannerEnv("INTERVAL"),
Value: defaultInterval,
Name: IntervalMetaFlagName,
Usage: fmt.Sprintf("Data scan interval of check-meta mode in minutes (default %d)", defaultIntervalMeta),
EnvVar: scannerEnv("INTERVAL_META"),
Value: defaultIntervalMeta,
},
cli.IntFlag{
Name: IntervalBlobFlagName,
Usage: fmt.Sprintf("Data scan interval of check-blob mode in minutes (default %d)", defaultIntervalBlob),
EnvVar: scannerEnv("INTERVAL_BLOB"),
Value: defaultIntervalBlob,
},
cli.IntFlag{
Name: IntervalBlockFlagName,
Usage: fmt.Sprintf("Data scan interval of check-block mode in minutes (default %d)", defaultIntervalBlock),
EnvVar: scannerEnv("INTERVAL_BLOCK"),
Value: defaultIntervalBlock,
},
}
return flags
}

func NewConfig(ctx *cli.Context) *Config {
mode := ctx.GlobalInt(ModeFlagName)
if mode == modeDisabled {
func NewConfig(ctx *cli.Context, slot uint64) *Config {
mode := scanModeSet(ctx.GlobalInt(ModeFlagName)) & scanModeSetMask

if mode == 0 {
return nil
}
if mode != modeCheckMeta && mode != modeCheckBlob {
panic(fmt.Sprintf("invalid scanner mode: %d", mode))
}
if interval := ctx.GlobalInt(IntervalFlagName); interval < defaultInterval {
panic(fmt.Sprintf("scanner interval must be at least %d minutes", defaultInterval))
}
return &Config{
Mode: mode,
BatchSize: ctx.GlobalInt(BatchSizeFlagName),
Interval: ctx.GlobalInt(IntervalFlagName),
Mode: mode,
BatchSize: ctx.GlobalInt(BatchSizeFlagName),
L1SlotTime: time.Second * time.Duration(slot),
IntervalMeta: time.Minute * time.Duration(ctx.GlobalInt(IntervalMetaFlagName)),
IntervalBlob: time.Minute * time.Duration(ctx.GlobalInt(IntervalBlobFlagName)),
IntervalBlock: time.Minute * time.Duration(ctx.GlobalInt(IntervalBlockFlagName)),
}
}
Loading
Loading