diff --git a/go.mod b/go.mod index 6807208a..32496266 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( require ( github.com/armon/go-metrics v0.4.1 // indirect github.com/boltdb/bolt v1.3.1 // indirect + github.com/brianvoe/gofakeit/v7 v7.6.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cilium/ebpf v0.16.0 // indirect github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0 // indirect diff --git a/go.sum b/go.sum index 817add1d..5d0aa366 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,8 @@ github.com/blocklessnetwork/b7s-attributes v0.0.0/go.mod h1:0c+ZemB4kfylI14IERH4 github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= +github.com/brianvoe/gofakeit/v7 v7.6.0 h1:M3RUb5CuS2IZmF/cP+O+NdLxJEuDAZxNQBwPbbqR6h4= +github.com/brianvoe/gofakeit/v7 v7.6.0/go.mod h1:QXuPeBw164PJCzCUZVmgpgHJ3Llj49jSLVkKPMtxtxA= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/cavaliergopher/grab/v3 v3.0.1 h1:4z7TkBfmPjmLAAmkkAZNX/6QJ1nNFdv3SdIHXju0Fr4= github.com/cavaliergopher/grab/v3 v3.0.1/go.mod h1:1U/KNnD+Ft6JJiYoYBAimKH2XrYptb8Kl3DFGmsjpq4= diff --git a/node/head/batch_store_internal_test.go b/node/head/batch_store_internal_test.go new file mode 100644 index 00000000..825419ae --- /dev/null +++ b/node/head/batch_store_internal_test.go @@ -0,0 +1,173 @@ +package head + +import ( + "context" + "slices" + "strings" + "testing" + + "github.com/blessnetwork/b7s/models/execute" + "github.com/blessnetwork/b7s/models/request" + batchstore "github.com/blessnetwork/b7s/stores/batch-store" + "github.com/blessnetwork/b7s/testing/mocks" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" +) + +func TestHead_SaveBatch(t *testing.T) { + + var ( + head = createHeadNode(t) + bs = mocks.BaselineBatchStore(t) + + batchID = newRequestID() + req = generateExecuteBatch(t, 4, 3) + ) + + head.cfg.BatchStore = bs + + bs.CreateBatchFunc = func(_ context.Context, er *batchstore.ExecuteBatchRecord) error { + require.NotNil(t, er) + require.Equal(t, batchID, er.ID) + require.Equal(t, req.Template.FunctionID, er.CID) + require.Equal(t, req.Template.Method, er.Method) + require.Equal(t, req.Template.Config, er.Config) + require.Equal(t, req.MaxAttempts, er.MaxAttempts) + + return nil + } + + bs.CreateWorkItemsFunc = func(_ context.Context, items ...*batchstore.WorkItemRecord) error { + + require.Len(t, items, len(req.Arguments)) + for i, item := range items { + require.Equal(t, batchID, item.BatchID) + require.Empty(t, item.ChunkID) + + require.Equal(t, req.Arguments[i], item.Arguments) + require.Equal(t, batchstore.StatusCreated, int(item.Status)) + require.Empty(t, item.Output) + require.Zero(t, item.Attempts) + } + + return nil + } + + err := head.saveBatch(batchID, req) + require.NoError(t, err) +} + +func TestHead_SaveChunk(t *testing.T) { + + var ( + head = createHeadNode(t) + bs = mocks.BaselineBatchStore(t) + + batchID = newRequestID() + + template = request.ExecutionRequestTemplate{ + FunctionID: mocks.GenericFunctionID, + Method: mocks.GenericFunctionMethod, + } + + parts = []*request.WorkOrderBatch{ + { + Template: template, + RequestID: batchID, + ChunkID: newRequestID(), + Arguments: [][]string{ + {"10", "11", "12"}, + {"20", "21", "22"}, + {"30", "31", "32"}, + }, + }, + { + Template: template, + RequestID: batchID, + ChunkID: newRequestID(), + Arguments: [][]string{ + {"40", "41", "42"}, + {"50", "51", "52"}, + {"60", "61", "62"}, + }, + }, + } + + assignments = map[peer.ID]*request.WorkOrderBatch{ + mocks.GenericPeerIDs[0]: parts[0], + mocks.GenericPeerIDs[1]: parts[1], + } + ) + + head.cfg.BatchStore = bs + + // Enable lookup for easier verification. + lookup := map[string]struct { + worker peer.ID + wo *request.WorkOrderBatch + }{ + parts[0].ChunkID: { + worker: mocks.GenericPeerIDs[0], + wo: parts[0], + }, + parts[1].ChunkID: { + worker: mocks.GenericPeerIDs[1], + wo: parts[1], + }, + } + + bs.CreateChunksFunc = func(_ context.Context, chunks ...*batchstore.ChunkRecord) error { + + // Require we have precisely the number of chunks we expect. + require.Len(t, chunks, len(parts)) + for _, chunk := range chunks { + + // We MUST have this exact chunk. + orig, ok := lookup[chunk.ID] + require.True(t, ok) + + require.Equal(t, batchID, chunk.BatchID) + require.Equal(t, orig.wo.ChunkID, chunk.ID) + require.Equal(t, orig.worker.String(), chunk.Worker) + require.Equal(t, batchstore.StatusCreated, int(chunk.Status)) + } + + return nil + } + + // For each chunk we should assign the work items. + assignmentsDone := 0 + bs.AssignWorkItemsFunc = func(_ context.Context, chunkID string, ids ...string) error { + assignmentsDone += 1 + + orig, ok := lookup[chunkID] + require.True(t, ok) + require.Len(t, ids, len(orig.wo.Arguments)) + + // Each list of arguments will produce one work item. + var expectedIDs []string + for _, args := range orig.wo.Arguments { + expectedIDs = append(expectedIDs, + workItemID(batchID, string(execute.ExecutionID(orig.wo.Template.FunctionID, orig.wo.Template.Method, args)))) + } + + // We need these sorted so comparison returns success. + slices.Sort(ids) + slices.Sort(expectedIDs) + require.Equal(t, expectedIDs, ids) + + // Verify ID is in format / + for _, id := range ids { + fields := strings.Split(id, "/") + require.Len(t, fields, 2) + require.Equal(t, batchID, fields[0]) + } + + return nil + } + + err := head.saveChunkInfo(batchID, assignments) + require.NoError(t, err) + + require.Equal(t, len(parts), assignmentsDone) +} diff --git a/node/head/batch_types.go b/node/head/batch_types.go index 0530e660..92b509e4 100644 --- a/node/head/batch_types.go +++ b/node/head/batch_types.go @@ -34,12 +34,13 @@ func batchRecordToRequest(batch *batchstore.ExecuteBatchRecord, items []*batchst func requestToBatchRecord(id string, req request.ExecuteBatch) (*batchstore.ExecuteBatchRecord, []*batchstore.WorkItemRecord) { batch := &batchstore.ExecuteBatchRecord{ - ID: id, - CID: req.Template.FunctionID, - Method: req.Template.Method, - Config: req.Template.Config, - Status: batchstore.StatusCreated, - CreatedAt: time.Now().UTC(), + ID: id, + CID: req.Template.FunctionID, + Method: req.Template.Method, + Config: req.Template.Config, + Status: batchstore.StatusCreated, + MaxAttempts: req.MaxAttempts, + CreatedAt: time.Now().UTC(), } items := make([]*batchstore.WorkItemRecord, len(req.Arguments)) diff --git a/node/head/work_partition.go b/node/head/work_partition.go index 05f7a083..614f80a3 100644 --- a/node/head/work_partition.go +++ b/node/head/work_partition.go @@ -12,21 +12,30 @@ import ( // In the future, we may have different criteria for what gets assigned to each peer. Right now we do round robin. func partitionWorkBatch(peers []peer.ID, requestID string, req request.ExecuteBatch) map[peer.ID]*request.WorkOrderBatch { + n := len(peers) + assignments := make(map[peer.ID]*request.WorkOrderBatch) + + if n == 0 { + return assignments + } + variants := req.Arguments // TODO: Do this in one go, not two maps. // Assign arguments to a list of peers in a round robin fashion - n := len(peers) a := make(map[peer.ID][][]string) for i, args := range variants { target := peers[i%n] - a[target] = append(a[target], args) } - assignments := make(map[peer.ID]*request.WorkOrderBatch) for _, peer := range peers { + // If we have more peers than items to execute - we could have peers without work. + // In this case skip them. + if len(a[peer]) == 0 { + continue + } chunkID := newChunkID() assignments[peer] = req.WorkOrderBatch(requestID, chunkID, a[peer]...) diff --git a/node/head/work_partition_internal_test.go b/node/head/work_partition_internal_test.go new file mode 100644 index 00000000..b1dd7cda --- /dev/null +++ b/node/head/work_partition_internal_test.go @@ -0,0 +1,162 @@ +package head + +import ( + "math/rand/v2" + "testing" + + "github.com/brianvoe/gofakeit/v7" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + + "github.com/blessnetwork/b7s/models/execute" + "github.com/blessnetwork/b7s/models/request" + "github.com/blessnetwork/b7s/testing/mocks" +) + +func TestHead_PartitionWork(t *testing.T) { + + const ( + variants = 20 + arglen = 4 + peerCount = 4 + ) + + var ( + peers = mocks.GenericPeerIDs[:peerCount] + batchID = newRequestID() + req = generateExecuteBatch(t, variants, arglen) + ) + + assignments := partitionWorkBatch(peers, batchID, req) + + // Each peer gets a chunk + require.Len(t, assignments, peerCount) + + argsFound := make([][]string, 0) + for peer, woBatch := range assignments { + + require.Contains(t, peers, peer) // Peer must be one of the specified ones. + require.Nil(t, woBatch.Valid()) // Created work order batch must be valid. + + require.Equal(t, batchID, woBatch.RequestID) + require.NotEmpty(t, woBatch.ChunkID) + + require.Equal(t, req.Template, woBatch.Template) // Work Order Batch must originate from our original request. + + // We should not have chunks without work items. + require.NotEmpty(t, woBatch.Arguments) + + // Accounting so we verify that we have saved all variants we put in. + argsFound = append(argsFound, woBatch.Arguments...) + + // NOTE: This is not universally true, but for our test parameters it is: + // input variants should be as evenly as possible split between peers. + require.Len(t, woBatch.Arguments, variants/peerCount) + } + + // Each argument list from the batch should produce one work item. + // Make sure we have all of them assigned, but also have no more than we specified. + require.ElementsMatch(t, req.Arguments, argsFound) +} + +func TestHead_PartitionWorkDistribution(t *testing.T) { + + tests := []struct { + name string + variants int + peerCount int + distribution map[int]int // Map chunk size to number of nodes that has chunk of that size. + }{ + { + name: "even split", + variants: 20, + peerCount: 4, + // Variants can be evenly split between nodes - each node should get five items. + distribution: map[int]int{ + 5: 4, + }, + }, + { + name: "uneven split", + variants: 18, + peerCount: 5, + // Some nodes get more than others - three nodes will get 4 items, and the rest will get 3 each. + distribution: map[int]int{ + 4: 3, + 3: 2, + }, + }, + { + name: "more workers than items", + variants: 3, + peerCount: 10, + // Some workers don't get anything. + distribution: map[int]int{ + 1: 3, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + var ( + peers = mocks.GenericPeerIDs[:test.peerCount] + req = generateExecuteBatch(t, test.variants, 2) + ) + + assignments := partitionWorkBatch(peers, newRequestID(), req) + + counts := make(map[int]int) + for peer, woBatch := range assignments { + // We should not have chunks without work items. + require.NotEmpty(t, woBatch.Arguments) + + t.Logf("peer: %v items: %v", peer.String(), len(woBatch.Arguments)) + counts[len(woBatch.Arguments)]++ + } + + require.Equal(t, test.distribution, counts) + }) + } +} + +func TestHead_PartitionWorkHandlesErrors(t *testing.T) { + + t.Run("empty peer list", func(t *testing.T) { + + req := generateExecuteBatch(t, 1, 2) + + assignments := partitionWorkBatch([]peer.ID{}, "dummy-request-id", req) + require.Empty(t, assignments) + }) +} + +func generateExecuteBatch(t *testing.T, itemCount int, arglen int) request.ExecuteBatch { + t.Helper() + + req := request.ExecuteBatch{ + Template: request.ExecutionRequestTemplate{ + FunctionID: mocks.GenericFunctionID, + Method: mocks.GenericFunctionMethod, + Config: execute.Config{ + NodeCount: rand.Int(), + Environment: []execute.EnvVar{ + {Name: "env_var1", Value: "val1"}, + {Name: "env_var2", Value: "val2"}, + }, + }, + }, + MaxAttempts: rand.Uint32(), + } + + variants := make([][]string, itemCount) + for i := range variants { + variants[i] = make([]string, arglen) + gofakeit.Slice(&variants[i]) + } + + req.Arguments = variants + + return req +} diff --git a/stores/batch-store/store.go b/stores/batch-store/store.go index 343038eb..94084451 100644 --- a/stores/batch-store/store.go +++ b/stores/batch-store/store.go @@ -4,6 +4,7 @@ import ( "context" ) +// TODO: Perhaps update the code and use actual type for consts below. type Status int32 const ( diff --git a/testing/mocks/batch_store.go b/testing/mocks/batch_store.go index 988627cb..558cc610 100644 --- a/testing/mocks/batch_store.go +++ b/testing/mocks/batch_store.go @@ -37,7 +37,7 @@ type BatchStore struct { // TODO: Add actual types to be returned, not nils -func BaselineMockStore(t *testing.T) *BatchStore { +func BaselineBatchStore(t *testing.T) *BatchStore { t.Helper() return &BatchStore{ diff --git a/testing/mocks/generic.go b/testing/mocks/generic.go index 55a11df8..3e031410 100644 --- a/testing/mocks/generic.go +++ b/testing/mocks/generic.go @@ -102,6 +102,9 @@ var ( }, } + GenericFunctionID = "dummy-cid" + GenericFunctionMethod = "function.wasm" + GenericFunctionRecord = bls.FunctionRecord{ CID: "dummy-cid", URL: fmt.Sprintf("https://example.com/%v", GenericString),