Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
module github.com/luno/reflex

go 1.21
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To support for i := range N.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why we can't go for 1.23? If people are still using 1.22, they'd just have to use the version before this update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason, just that I didn't need it.

go 1.22

require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.35.0
github.com/go-sql-driver/mysql v1.7.1
github.com/luno/jettison v0.0.0-20230912135954-09d6084f5df9
github.com/prometheus/client_golang v1.15.0
github.com/prometheus/client_model v0.3.0
github.com/sebdah/goldie/v2 v2.5.3
github.com/stretchr/testify v1.8.3
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/sdk v1.14.0
Expand Down Expand Up @@ -52,6 +53,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/sergi/go-diff v1.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/net v0.11.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1962,6 +1962,7 @@ github.com/sebdah/goldie/v2 v2.5.3 h1:9ES/mNN+HNUbNWpVAlrzuZ7jE+Nrczbj8uFRjM7624
github.com/sebdah/goldie/v2 v2.5.3/go.mod h1:oZ9fp0+se1eapSRjfYbsV/0Hqhbuu3bJVvKI/NNtssI=
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shoenig/test v0.6.3/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
Expand Down
108 changes: 86 additions & 22 deletions rsql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,82 @@ func (t eventType) ReflexType() int {

// makeDefaultInserter returns the default sql inserter configured via WithEventsXField options.
func makeDefaultInserter(schema eTableSchema) inserter {
return func(ctx context.Context, tx *sql.Tx,
foreignID string, typ reflex.EventType, metadata []byte,
ins := makeDefaultManyInserter(schema)
return func(
ctx context.Context,
tx *sql.Tx,
foreignID string,
typ reflex.EventType,
metadata []byte,
) error {
q := "insert into " + schema.name +
" set " + schema.foreignIDField + "=?, " + schema.timeField + "=now(6), " + schema.typeField + "=?"
args := []interface{}{foreignID, typ.ReflexType()}
return ins(ctx, tx, EventToInsert{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice I like that the singular is using the multiple inserter with just 1 event.

ForeignID: foreignID,
Type: typ,
Metadata: metadata,
})
}
}

if schema.metadataField != "" {
q += ", " + schema.metadataField + "=?"
args = append(args, metadata)
} else if metadata != nil {
return errors.New("metadata not enabled")
// makeDefaultManyInserter returns the default sql manyInserter configured via WithEventsXField options.
func makeDefaultManyInserter(schema eTableSchema) manyInserter {
return func(ctx context.Context, tx *sql.Tx, events ...EventToInsert) error {
if len(events) == 0 {
return nil
}
q, args, err := makeInsertManyQuery(ctx, schema, events)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, q, args...)
return errors.Wrap(err, "insert error")
}
}

func makeInsertManyQuery(
ctx context.Context,
schema eTableSchema,
events []EventToInsert,
) (query string, args []any, err error) {
spanCtx, hasTrace := tracing.Extract(ctx)
var traceData []byte
if schema.traceField != "" && hasTrace {
d, err := tracing.Marshal(spanCtx)
if err != nil {
return "", nil, err
}
traceData = d
}

spanCtx, hasTrace := tracing.Extract(ctx)
if schema.traceField != "" && hasTrace {
traceData, err := tracing.Marshal(spanCtx)
if err != nil {
return err
}
cols := schema.foreignIDField + ", " + schema.typeField + ", " + schema.timeField
if schema.metadataField != "" {
cols += ", " + schema.metadataField
}
if traceData != nil {
cols += ", " + schema.traceField
}

q += ", " + schema.traceField + "=?"
q := "insert into " + schema.name + " (" + cols + ") values"

for i, e := range events {
vals := "?, ?, now(6)"
args = append(args, e.ForeignID, e.Type.ReflexType())
if schema.metadataField != "" {
vals += ", ?"
args = append(args, e.Metadata)
} else if e.Metadata != nil {
return "", nil, errors.New("metadata not enabled")
}
if traceData != nil {
vals += ", ?"
args = append(args, traceData)
}

_, err := tx.ExecContext(ctx, q, args...)
return errors.Wrap(err, "insert error")
if i > 0 {
q += ","
}
q += " (" + vals + ")"
}

return q, args, nil
}

type row interface {
Expand Down Expand Up @@ -158,7 +206,14 @@ func getNextEvents(ctx context.Context, dbc *sql.DB, schema eTableSchema,
}

// GetNextEventsForTesting fetches a bunch of events from the event table
func GetNextEventsForTesting(ctx context.Context, _ *testing.T, dbc *sql.DB, table *EventsTable, after int64, lag time.Duration) ([]*reflex.Event, error) {
func GetNextEventsForTesting(
ctx context.Context,
_ *testing.T,
dbc *sql.DB,
table *EventsTable,
after int64,
lag time.Duration,
) ([]*reflex.Event, error) {
return getNextEvents(ctx, dbc, table.schema, after, lag)
}

Expand Down Expand Up @@ -287,7 +342,16 @@ func makeDefaultErrorInserter(schema errTableSchema) ErrorInserter {
// NB: See the documentation is the following link on the behaviour of "on last_insert_id(<expr>)" https://dev.mysql.com/doc/refman/5.7/en/information-functions.html#function_last-insert-id
q := fmt.Sprintf(
"insert into %s set %s=?, %s=?, %s=?, %s=now(6), %s=now(6), %s=? on duplicate key update %s=last_insert_id(%s)",
schema.name, schema.eventConsumerField, schema.eventIDField, schema.errorMsgField, schema.errorCreatedAtField, schema.errorUpdatedAtField, schema.errorStatusField, schema.idField, schema.idField)
schema.name,
schema.eventConsumerField,
schema.eventIDField,
schema.errorMsgField,
schema.errorCreatedAtField,
schema.errorUpdatedAtField,
schema.errorStatusField,
schema.idField,
schema.idField,
)
return func(ctx context.Context, tx *sql.Tx, consumer string, eventID string, errMsg string, errStatus reflex.ErrorStatus) (string, error) {
r, err := tx.ExecContext(ctx, q, consumer, eventID, errMsg, errStatus)
// If the error has already been written then we can ignore the error
Expand Down
117 changes: 117 additions & 0 deletions rsql/db_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package rsql

import (
"bytes"
"context"
"fmt"
"testing"

"github.com/luno/jettison/jtest"
"github.com/sebdah/goldie/v2"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"

"github.com/luno/reflex/internal/tracing"
)

//go:generate go test . -run Test_makeInsertManyQuery -update -clean

func Test_makeInsertManyQuery(t *testing.T) {
ctx := context.Background()

defaultSchema := eTableSchema{
name: "events",
idField: "id",
timeField: "timestamp",
typeField: "type",
foreignIDField: "foreign_id",
}

assert := func(t *testing.T, q string, args []any) {
buf := new(bytes.Buffer)
buf.WriteString(q)
buf.WriteString("\n")
for _, arg := range args {
buf.WriteString("\n")
buf.WriteString(fmt.Sprint(arg))
}
goldie.New(t).Assert(t, t.Name(), buf.Bytes())
}

t.Run("empty", func(t *testing.T) {
q, args, err := makeInsertManyQuery(ctx, defaultSchema, nil)
jtest.RequireNil(t, err)
assert(t, q, args)
})

t.Run("one", func(t *testing.T) {
q, args, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{
{"fid", testEventType(1), nil},
})
jtest.RequireNil(t, err)
assert(t, q, args)
})

t.Run("two", func(t *testing.T) {
q, args, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{
{"fid1", testEventType(1), nil},
{"fid2", testEventType(2), nil},
})
jtest.RequireNil(t, err)
assert(t, q, args)
})

t.Run("more", func(t *testing.T) {
var events []EventToInsert
for i := range 5 {
events = append(events, EventToInsert{
ForeignID: fmt.Sprintf("fid%d", i+1),
Type: testEventType(i),
})
}
q, args, err := makeInsertManyQuery(ctx, defaultSchema, events)
jtest.RequireNil(t, err)
assert(t, q, args)
})

t.Run("metadata_error", func(t *testing.T) {
_, _, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{
{"fid", testEventType(1), []byte("metadata")},
})
require.ErrorContains(t, err, "metadata not enabled")
})

t.Run("with_metadata", func(t *testing.T) {
schemaWithMetadata := defaultSchema
schemaWithMetadata.metadataField = "metadata"
q, args, err := makeInsertManyQuery(ctx, schemaWithMetadata, []EventToInsert{
{"fid", testEventType(1), []byte("metadata")},
})
jtest.RequireNil(t, err)
assert(t, q, args)
})

t.Run("with_trace", func(t *testing.T) {
schemaWithTrace := defaultSchema
schemaWithTrace.traceField = "trace"
traceID, err := trace.TraceIDFromHex("00000000000000000000000000000009")
jtest.RequireNil(t, err)
spanID, err := trace.SpanIDFromHex("0000000000000002")
jtest.RequireNil(t, err)
data, err := tracing.Marshal(trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
SpanID: spanID,
}))
jtest.RequireNil(t, err)
ctx := tracing.Inject(ctx, data)
q, args, err := makeInsertManyQuery(ctx, schemaWithTrace, []EventToInsert{
{"fid", testEventType(1), nil},
})
jtest.RequireNil(t, err)
assert(t, q, args)
})
}

type testEventType int

func (t testEventType) ReflexType() int { return int(t) }
58 changes: 56 additions & 2 deletions rsql/eventstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,20 @@ func NewEventsTable(name string, opts ...EventsOption) *EventsTable {
table.inserter = makeDefaultInserter(table.schema)
}

if table.manyInserter == nil {
table.manyInserter = makeDefaultManyInserter(table.schema)
}

table.gapCh = make(chan Gap)
table.gapListeners = make(chan GapListenFunc)
table.gapListenDone = make(chan struct{})
table.currentLoader = buildLoader(table.baseLoader, table.gapCh, table.disableCache, table.schema, table.includeNoopEvents)
table.currentLoader = buildLoader(
table.baseLoader,
table.gapCh,
table.disableCache,
table.schema,
table.includeNoopEvents,
)

return table
}
Expand Down Expand Up @@ -175,10 +185,28 @@ func WithEventsInserter(inserter inserter) EventsOption {
}
}

// WithEventsManyInserter provides an option to set the event inserter
// which inserts many events into a sql table. The default inserter is
// configured with the WithEventsXField options.
func WithEventsManyInserter(manyInserter manyInserter) EventsOption {
return func(table *EventsTable) {
table.manyInserter = manyInserter
}
}

// inserter abstracts the insertion of an event into a sql table.
type inserter func(ctx context.Context, tx *sql.Tx,
foreignID string, typ reflex.EventType, metadata []byte) error

type EventToInsert struct {
ForeignID string
Type reflex.EventType
Metadata []byte
}

// manyInserter abstracts the insertion of many events into a sql table.
type manyInserter func(ctx context.Context, tx *sql.Tx, events ...EventToInsert) error

// EventsTable provides reflex event insertion and streaming
// for a sql db table.
type EventsTable struct {
Expand All @@ -189,6 +217,7 @@ type EventsTable struct {
includeNoopEvents bool
baseLoader loader
inserter inserter
manyInserter manyInserter

// Stateful fields not cloned
currentLoader filterLoader
Expand Down Expand Up @@ -230,6 +259,25 @@ func (t *EventsTable) InsertWithMetadata(ctx context.Context, tx *sql.Tx, foreig
return t.notifier.Notify, nil
}

// InsertMany inserts a many events into the EventsTable.
func (t *EventsTable) InsertMany(
ctx context.Context,
tx *sql.Tx,
events []EventToInsert,
) (NotifyFunc, error) {
for _, e := range events {
if isNoop(e.ForeignID, e.Type) {
return nil, errors.New("inserting invalid noop event")
}
}
err := t.manyInserter(ctx, tx, events...)
if err != nil {
return noopFunc, err
}

return t.notifier.Notify, nil
}

// Clone returns a new events table generated from the config of t with the new options applied.
// Note that non-config fields are not copied, so things like the cache and inmemnotifier
// are not shared.
Expand Down Expand Up @@ -312,7 +360,13 @@ func (t *EventsTable) getSchema() eTableSchema {
}

// buildLoader returns a new layered event loader.
func buildLoader(baseLoader loader, ch chan<- Gap, disableCache bool, schema eTableSchema, withNoopEvents bool) filterLoader {
func buildLoader(
baseLoader loader,
ch chan<- Gap,
disableCache bool,
schema eTableSchema,
withNoopEvents bool,
) filterLoader {
if baseLoader == nil {
baseLoader = makeBaseLoader(schema)
}
Expand Down
Loading