Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
require (
github.com/armon/go-metrics v0.4.1 // indirect
github.com/boltdb/bolt v1.3.1 // indirect
github.com/brianvoe/gofakeit/v7 v7.6.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cilium/ebpf v0.16.0 // indirect
github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ github.com/blocklessnetwork/b7s-attributes v0.0.0/go.mod h1:0c+ZemB4kfylI14IERH4
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/brianvoe/gofakeit/v7 v7.6.0 h1:M3RUb5CuS2IZmF/cP+O+NdLxJEuDAZxNQBwPbbqR6h4=
github.com/brianvoe/gofakeit/v7 v7.6.0/go.mod h1:QXuPeBw164PJCzCUZVmgpgHJ3Llj49jSLVkKPMtxtxA=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/cavaliergopher/grab/v3 v3.0.1 h1:4z7TkBfmPjmLAAmkkAZNX/6QJ1nNFdv3SdIHXju0Fr4=
github.com/cavaliergopher/grab/v3 v3.0.1/go.mod h1:1U/KNnD+Ft6JJiYoYBAimKH2XrYptb8Kl3DFGmsjpq4=
Expand Down
173 changes: 173 additions & 0 deletions node/head/batch_store_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package head

import (
"context"
"slices"
"strings"
"testing"

"github.com/blessnetwork/b7s/models/execute"
"github.com/blessnetwork/b7s/models/request"
batchstore "github.com/blessnetwork/b7s/stores/batch-store"
"github.com/blessnetwork/b7s/testing/mocks"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)

func TestHead_SaveBatch(t *testing.T) {

var (
head = createHeadNode(t)
bs = mocks.BaselineBatchStore(t)

batchID = newRequestID()
req = generateExecuteBatch(t, 4, 3)
)

head.cfg.BatchStore = bs

bs.CreateBatchFunc = func(_ context.Context, er *batchstore.ExecuteBatchRecord) error {
require.NotNil(t, er)
require.Equal(t, batchID, er.ID)
require.Equal(t, req.Template.FunctionID, er.CID)
require.Equal(t, req.Template.Method, er.Method)
require.Equal(t, req.Template.Config, er.Config)
require.Equal(t, req.MaxAttempts, er.MaxAttempts)

return nil
}

bs.CreateWorkItemsFunc = func(_ context.Context, items ...*batchstore.WorkItemRecord) error {

require.Len(t, items, len(req.Arguments))
for i, item := range items {
require.Equal(t, batchID, item.BatchID)
require.Empty(t, item.ChunkID)

require.Equal(t, req.Arguments[i], item.Arguments)
require.Equal(t, batchstore.StatusCreated, int(item.Status))
require.Empty(t, item.Output)
require.Zero(t, item.Attempts)
}

return nil
}

err := head.saveBatch(batchID, req)
require.NoError(t, err)
}

func TestHead_SaveChunk(t *testing.T) {

var (
head = createHeadNode(t)
bs = mocks.BaselineBatchStore(t)

batchID = newRequestID()

template = request.ExecutionRequestTemplate{
FunctionID: mocks.GenericFunctionID,
Method: mocks.GenericFunctionMethod,
}

parts = []*request.WorkOrderBatch{
{
Template: template,
RequestID: batchID,
ChunkID: newRequestID(),
Arguments: [][]string{
{"10", "11", "12"},
{"20", "21", "22"},
{"30", "31", "32"},
},
},
{
Template: template,
RequestID: batchID,
ChunkID: newRequestID(),
Arguments: [][]string{
{"40", "41", "42"},
{"50", "51", "52"},
{"60", "61", "62"},
},
},
}

assignments = map[peer.ID]*request.WorkOrderBatch{
mocks.GenericPeerIDs[0]: parts[0],
mocks.GenericPeerIDs[1]: parts[1],
}
)

head.cfg.BatchStore = bs

// Enable lookup for easier verification.
lookup := map[string]struct {
worker peer.ID
wo *request.WorkOrderBatch
}{
parts[0].ChunkID: {
worker: mocks.GenericPeerIDs[0],
wo: parts[0],
},
parts[1].ChunkID: {
worker: mocks.GenericPeerIDs[1],
wo: parts[1],
},
}

bs.CreateChunksFunc = func(_ context.Context, chunks ...*batchstore.ChunkRecord) error {

// Require we have precisely the number of chunks we expect.
require.Len(t, chunks, len(parts))
for _, chunk := range chunks {

// We MUST have this exact chunk.
orig, ok := lookup[chunk.ID]
require.True(t, ok)

require.Equal(t, batchID, chunk.BatchID)
require.Equal(t, orig.wo.ChunkID, chunk.ID)
require.Equal(t, orig.worker.String(), chunk.Worker)
require.Equal(t, batchstore.StatusCreated, int(chunk.Status))
}

return nil
}

// For each chunk we should assign the work items.
assignmentsDone := 0
bs.AssignWorkItemsFunc = func(_ context.Context, chunkID string, ids ...string) error {
assignmentsDone += 1

orig, ok := lookup[chunkID]
require.True(t, ok)
require.Len(t, ids, len(orig.wo.Arguments))

// Each list of arguments will produce one work item.
var expectedIDs []string
for _, args := range orig.wo.Arguments {
expectedIDs = append(expectedIDs,
workItemID(batchID, string(execute.ExecutionID(orig.wo.Template.FunctionID, orig.wo.Template.Method, args))))
}

// We need these sorted so comparison returns success.
slices.Sort(ids)
slices.Sort(expectedIDs)
require.Equal(t, expectedIDs, ids)

// Verify ID is in format <batch-id>/<work-item-id>
for _, id := range ids {
fields := strings.Split(id, "/")
require.Len(t, fields, 2)
require.Equal(t, batchID, fields[0])
}

return nil
}

err := head.saveChunkInfo(batchID, assignments)
require.NoError(t, err)

require.Equal(t, len(parts), assignmentsDone)
}
13 changes: 7 additions & 6 deletions node/head/batch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ func batchRecordToRequest(batch *batchstore.ExecuteBatchRecord, items []*batchst
func requestToBatchRecord(id string, req request.ExecuteBatch) (*batchstore.ExecuteBatchRecord, []*batchstore.WorkItemRecord) {

batch := &batchstore.ExecuteBatchRecord{
ID: id,
CID: req.Template.FunctionID,
Method: req.Template.Method,
Config: req.Template.Config,
Status: batchstore.StatusCreated,
CreatedAt: time.Now().UTC(),
ID: id,
CID: req.Template.FunctionID,
Method: req.Template.Method,
Config: req.Template.Config,
Status: batchstore.StatusCreated,
MaxAttempts: req.MaxAttempts,
CreatedAt: time.Now().UTC(),
}

items := make([]*batchstore.WorkItemRecord, len(req.Arguments))
Expand Down
15 changes: 12 additions & 3 deletions node/head/work_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,30 @@ import (
// In the future, we may have different criteria for what gets assigned to each peer. Right now we do round robin.
func partitionWorkBatch(peers []peer.ID, requestID string, req request.ExecuteBatch) map[peer.ID]*request.WorkOrderBatch {

n := len(peers)
assignments := make(map[peer.ID]*request.WorkOrderBatch)

if n == 0 {
return assignments
}

variants := req.Arguments

// TODO: Do this in one go, not two maps.

// Assign arguments to a list of peers in a round robin fashion
n := len(peers)
a := make(map[peer.ID][][]string)
for i, args := range variants {
target := peers[i%n]

a[target] = append(a[target], args)
}

assignments := make(map[peer.ID]*request.WorkOrderBatch)
for _, peer := range peers {
// If we have more peers than items to execute - we could have peers without work.
// In this case skip them.
if len(a[peer]) == 0 {
continue
}

chunkID := newChunkID()
assignments[peer] = req.WorkOrderBatch(requestID, chunkID, a[peer]...)
Expand Down
Loading