diff --git a/cfglogging.go b/cfglogging.go
index 6651e2b..808f33b 100644
--- a/cfglogging.go
+++ b/cfglogging.go
@@ -3,6 +3,7 @@ package veracity
 import (
 	"github.com/datatrails/go-datatrails-common/logger"
 	"github.com/urfave/cli/v2"
+	"go.uber.org/zap"
 )
 
 // cfgLogging establishes the logger
@@ -12,7 +13,14 @@ func cfgLogging(cmd *CmdCtx, cCtx *cli.Context) error {
 	if logLevel == "" {
 		logLevel = "INFO"
 	}
-	logger.New(logLevel, logger.WithConsole())
-	cmd.log = logger.Sugar
+	if logLevel == "NOOP" {
+		cmd.log = &logger.WrappedLogger{
+			SugaredLogger: zap.NewNop().Sugar(),
+		}
+	} else {
+		logger.New(logLevel, logger.WithConsole())
+		cmd.log = logger.Sugar
+	}
+
 	return nil
 }
diff --git a/replicatelogs.go b/replicatelogs.go
index 012446f..f36f576 100644
--- a/replicatelogs.go
+++ b/replicatelogs.go
@@ -12,6 +12,7 @@ import (
 	"time"
 
 	"github.com/datatrails/go-datatrails-common/cbor"
+	"github.com/datatrails/go-datatrails-common/cose"
 	"github.com/datatrails/go-datatrails-common/logger"
 	"github.com/datatrails/go-datatrails-merklelog/massifs"
 	"github.com/datatrails/go-datatrails-merklelog/massifs/watcher"
@@ -38,17 +39,35 @@ const (
 )
 
 var (
-	ErrChangesFlagIsExclusive         = errors.New("use --changes Or --massif and --tenant, not both")
-	ErrNewReplicaNotEmpty             = errors.New("the local directory for a new replica already exists")
-	ErrSealNotFound                   = errors.New("seal not found")
-	ErrSealVerifyFailed               = errors.New("the seal signature verification failed")
-	ErrFailedCheckingConsistencyProof = errors.New("failed to check a consistency proof")
-	ErrFailedToCreateReplicaDir       = errors.New("failed to create a directory needed for local replication")
-	ErrRequiredOption                 = errors.New("a required option was not provided")
-	ErrRemoteLogTruncated             = errors.New("the local replica indicates the remote log has been truncated")
-	ErrRemoteLogInconsistentRootState = errors.New("the local replica root state disagrees with the remote")
+	ErrChangesFlagIsExclusive          = errors.New("use --changes Or --massif and --tenant, not both")
+	ErrNewReplicaNotEmpty              = errors.New("the local directory for a new replica already exists")
+	ErrSealNotFound                    = errors.New("seal not found")
+	ErrSealVerifyFailed                = errors.New("the seal signature verification failed")
+	ErrFailedCheckingConsistencyProof  = errors.New("failed to check a consistency proof")
+	ErrFailedToCreateReplicaDir        = errors.New("failed to create a directory needed for local replication")
+	ErrRequiredOption                  = errors.New("a required option was not provided")
+	ErrRemoteLogTruncated              = errors.New("the local replica indicates the remote log has been truncated")
+	ErrRemoteLogInconsistentRootState  = errors.New("the local replica root state disagrees with the remote")
+	ErrInconsistentUseOfPrefetchedSeal = errors.New("prefetching signed root reader used inconsistently")
 )
 
+// prefetchingSealReader pre-fetches the seal for the massif to avoid racing with the
+// sealer. If the massif is read first, the log can grow and a new seal can be
+// applied to the *longer* log, at which point the previously read copy of the
+// massif will be "too short" for the seal.
+// See Bug#10530
+type prefetchingSealReader struct {
+	msg            *cose.CoseSign1Message
+	state          massifs.MMRState
+	tenantIdentity string
+	massifIndex    uint32
+}
+
+type changeCollector struct {
+	log         logger.Logger
+	watchOutput string
+}
+
 // NewReplicateLogsCmd updates a local replica of a remote log, verifying the mutual consistency of the two before making any changes.
 //
 //nolint:gocognit
@@ -266,6 +285,7 @@ type VerifiedReplica struct {
 	writeOpener  massifs.WriteAppendOpener
 	localReader  massifs.ReplicaReader
 	remoteReader MassifReader
+	rootReader   massifs.SealGetter
 	cborCodec    cbor.CBORCodec
 }
 
@@ -324,7 +344,6 @@ func NewVerifiedReplica(
 
 	remoteReader := massifs.NewMassifReader(
 		logger.Sugar, reader,
-		massifs.WithSealGetter(&cmd.rootReader),
 	)
 
 	return &VerifiedReplica{
@@ -333,6 +352,7 @@
 		writeOpener:  NewFileWriteOpener(),
 		localReader:  &localReader,
 		remoteReader: &remoteReader,
+		rootReader:   &cmd.rootReader,
 		cborCodec:    cmd.cborCodec,
 	}, nil
 }
@@ -441,10 +461,20 @@ func (v *VerifiedReplica) ReplicateVerifiedUpdates(
 	for i := startMassif; i <= endMassif; i++ {
 
-		remoteVerifyOpts := []massifs.ReaderOption{massifs.WithCBORCodec(v.cborCodec)}
+		// Note: we have to fetch the seal before the massif, otherwise we can lose a race with the builder.
+		// See bug#10530
+		remoteSealReader, err := NewPrefetchingSealReader(ctx, v.rootReader, tenantIdentity, i)
+		if err != nil {
+			return err
+		}
+		remoteVerifyOpts := []massifs.ReaderOption{
+			massifs.WithCBORCodec(v.cborCodec),
+			massifs.WithSealGetter(remoteSealReader),
+		}
 
 		if local != nil {
+			var baseState massifs.MMRState
 			// Promote the trusted base state to a V1 state if it is a V0 state.
-			baseState, err := trustedBaseState(local)
+			baseState, err = trustedBaseState(local)
 			if err != nil {
 				return err
 			}
@@ -595,11 +625,6 @@ func peakBaggedRoot(state massifs.MMRState) []byte {
 	return mmr.HashPeaksRHS(sha256.New(), state.Peaks)
 }
 
-type changeCollector struct {
-	log         logger.Logger
-	watchOutput string
-}
-
 func (c *changeCollector) Logf(msg string, args ...any) {
 	if c.log == nil {
 		return
@@ -677,3 +702,28 @@ func readTenantMassifChanges(ctx context.Context, cCtx *cli.Context, cmd *CmdCtx
 	// No explicit config and --all not set, read from stdin
 	return stdinToDecodedTenantMassifs()
 }
+
+func NewPrefetchingSealReader(ctx context.Context, sealGetter massifs.SealGetter, tenantIdentity string, massifIndex uint32) (*prefetchingSealReader, error) {
+
+	msg, state, err := sealGetter.GetSignedRoot(ctx, tenantIdentity, massifIndex)
+	if err != nil {
+		return nil, err
+	}
+	reader := prefetchingSealReader{
+		msg:            msg,
+		state:          state,
+		tenantIdentity: tenantIdentity,
+		massifIndex:    massifIndex,
+	}
+	return &reader, nil
+}
+
+func (r *prefetchingSealReader) GetSignedRoot(ctx context.Context, tenantIdentity string, massifIndex uint32, opts ...massifs.ReaderOption) (*cose.CoseSign1Message, massifs.MMRState, error) {
+	if tenantIdentity != r.tenantIdentity {
+		return nil, massifs.MMRState{}, fmt.Errorf("%w: tenant requested: %s, tenant prefetched: %s", ErrInconsistentUseOfPrefetchedSeal, tenantIdentity, r.tenantIdentity)
+	}
+	if massifIndex != r.massifIndex {
+		return nil, massifs.MMRState{}, fmt.Errorf("%w: massif requested: %d, massif prefetched: %d", ErrInconsistentUseOfPrefetchedSeal, massifIndex, r.massifIndex)
+	}
+	return r.msg, r.state, nil
+}
diff --git a/tests/replicatelogs/replicatelogs_azurite_test.go b/tests/replicatelogs/replicatelogs_azurite_test.go
index 3e0864f..4f797e2 100644
--- a/tests/replicatelogs/replicatelogs_azurite_test.go
+++ b/tests/replicatelogs/replicatelogs_azurite_test.go
@@ -7,11 +7,13 @@ import (
 	"crypto/elliptic"
 	"crypto/sha256"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"os"
 	"path/filepath"
 	"testing"
+	"time"
 
 	"github.com/datatrails/go-datatrails-common/logger"
 	"github.com/datatrails/go-datatrails-merklelog/massifs"
@@ -43,6 +45,109 @@ func mustHashFile(t *testing.T, filename string) []byte {
 	return hash
 }
 
+// TestRegression10530 covers the case where the seal fetched for the massif is
+// ahead of the massif. Essentially, this can only happen when the seal is
+// read *after* the massif, and between reading the massif and generating the
+// seal that was fetched, more items are added to the massif. Thus the massif
+// data fetched does not contain all the items that are covered by the seal.
+//
+// NOTICE: this test is unavoidably *theoretically* flaky on the PASS side, in
+// that it can fail to trigger the race condition and so PASS. However, if the
+// test is successful in triggering the race condition, it will always FAIL.
+// Further, in development against the known broken code, we have never seen
+// this test take more than 3 attempts to cause the race as configured.
+//
+// The implication of this is that there is a small chance this test will not
+// immediately catch the regression, but it will soon after. This means that a
+// FAIL from this test is 100% a return of the race condition and should be
+// investigated. There is a tiny chance the race may have been re-introduced by
+// an earlier change than the one associated with the fail.
+func (s *ReplicateLogsCmdSuite) TestRegression10530() {
+
+	tests := []struct {
+		name string
+		// attempts mitigates the inherent flakiness of detecting a race bug
+		attempts     int
+		massifHeight uint8
+		leafBatch    int
+		batchCount   int
+		activeMassif string
+	}{
+		// note: we only need a single massif to catch the race condition, and
+		// having more makes it much harder to configure the replication run.
+		// these numbers are tuned to balance run time against the reliability
+		// of catching the error. A pass unfortunately takes tens of seconds.
+		{"one by one", 3, 14, 1, 250, "0"},
+	}
+	key := massifs.TestGenerateECKey(s.T(), elliptic.P256())
+	tc := massifs.NewLocalMassifReaderTestContext(s.T(), logger.Sugar, "TestRegression10530")
+	tenantId0 := tc.G.NewTenantIdentity()
+	tc.AzuriteContext.DeleteBlobsByPrefix(massifs.TenantMassifPrefix(tenantId0))
+
+	for _, tt := range tests {
+
+		s.Run(tt.name, func() {
+
+			for attempt := range tt.attempts {
+
+				ctx, cancel := context.WithCancel(context.Background())
+				go func(cancel context.CancelFunc, massifHeight uint8, leafBatch, batchcount int) {
+					defer cancel()
+
+					for range batchcount {
+						tc.AddLeavesToLog(
+							tenantId0, massifHeight, leafBatch,
+							massifs.TestWithSealKey(&key),
+						)
+					}
+				}(cancel, tt.massifHeight, tt.leafBatch, tt.batchCount)
+
+				// Replicate the log
+				// note: VERACITY_IKWID is set in main, we need it to enable --envauth so we force it here
+				app := veracity.NewApp("tests", true)
+				veracity.AddCommands(app, true)
+
+				replicaDir := s.T().TempDir()
+				veracityRuns := 1
+				done := false
+				for !done {
+
+					err := app.Run([]string{
+						"veracity",
+						"--loglevel", "NOOP", // sets the zap noop logger which avoids a race with our logging package.
+						"--envauth", // uses the emulator
+						"--container", tc.TestConfig.Container,
+						"--data-url", s.Env.AzuriteVerifiableDataURL,
+						"--tenant", tenantId0,
+						"--height", fmt.Sprintf("%d", tt.massifHeight),
+						"replicate-logs",
+						// "--ancestors", fmt.Sprintf("%d", tt.ancestors),
+						"--replicadir", replicaDir,
+						"--massif", tt.activeMassif,
+					})
+					if err != nil {
+						// We want to fatal out on the first instance of state size exceeds data
+						if errors.Is(err, massifs.ErrStateSizeExceedsData) {
+							s.T().Fatalf("seal race detected on run %d, in attempt %d. %v", veracityRuns, attempt, err)
%v", veracityRuns, attempt, err) + } + fmt.Printf("run %d: %v\n", veracityRuns, err) + } + time.Sleep(1 * time.Second) + + select { + case <-ctx.Done(): + done = true + default: + continue + } + veracityRuns++ + } + } + }) + } +} + // TestReplicateMassifUpdate ensures that an extension to a previously replicated // massif is handled correctly func (s *ReplicateLogsCmdSuite) TestReplicateMassifUpdate() {