From feb09527c990ad17f03755168311fbddc730ce9a Mon Sep 17 00:00:00 2001 From: Damien Whitten Date: Wed, 22 Oct 2025 18:09:14 -0700 Subject: [PATCH 1/5] Message Handler Split --- apps/queueworker/sqslink/worker.go | 100 +++++++++++++----------- apps/queueworker/sqslink/worker_test.go | 24 +++++- 2 files changed, 77 insertions(+), 47 deletions(-) diff --git a/apps/queueworker/sqslink/worker.go b/apps/queueworker/sqslink/worker.go index 2f8f58a..da641e8 100644 --- a/apps/queueworker/sqslink/worker.go +++ b/apps/queueworker/sqslink/worker.go @@ -35,12 +35,7 @@ func (hf HandlerFunc) HandleMessage(ctx context.Context, msg *messaging_pb.Messa return hf(ctx, msg) } -type Worker struct { - SQSClient SQSAPI - QueueURL string - deadLetterHandler DeadLetterHandler - resendChance int - +type Router struct { handlers map[string]Handler fallbackHandler Handler } @@ -73,17 +68,32 @@ func randomlySelected(ctx context.Context, pct int) bool { return false } +func NewRouter(deadLetters DeadLetterHandler) *Router { + return &Router{ + handlers: make(map[string]Handler), + } +} + +type Worker struct { + router *Router + resendChance int + SQSClient SQSAPI + QueueURL string + deadLetterHandler DeadLetterHandler +} + func NewWorker(sqs SQSAPI, queueURL string, deadLetters DeadLetterHandler, resendChance int) *Worker { + router := NewRouter(deadLetters) return &Worker{ SQSClient: sqs, QueueURL: queueURL, - handlers: make(map[string]Handler), - deadLetterHandler: deadLetters, + router: router, resendChance: resendChance, + deadLetterHandler: deadLetters, } } -func (ww *Worker) RegisterService(ctx context.Context, service protoreflect.ServiceDescriptor, invoker AppLink) error { +func (ww *Router) RegisterService(ctx context.Context, service protoreflect.ServiceDescriptor, invoker AppLink) error { methods := service.Methods() for ii := 0; ii < methods.Len(); ii++ { method := methods.Get(ii) @@ -94,7 +104,7 @@ func (ww *Worker) RegisterService(ctx context.Context, service protoreflect.Serv return nil } -func (ww *Worker) registerMethod(ctx context.Context, method protoreflect.MethodDescriptor, invoker AppLink) error { +func (ww *Router) registerMethod(ctx context.Context, method protoreflect.MethodDescriptor, invoker AppLink) error { serviceName := method.Parent().(protoreflect.ServiceDescriptor).FullName() fullName := fmt.Sprintf("/%s/%s", serviceName, method.Name()) @@ -116,10 +126,39 @@ func (ww *Worker) registerMethod(ctx context.Context, method protoreflect.Method return nil } -func (ww *Worker) RegisterHandler(fullMethod string, handler Handler) { +func (ww *Router) RegisterHandler(fullMethod string, handler Handler) { ww.handlers[fullMethod] = handler } +type ErrNoHandlerMatched string + +func (e ErrNoHandlerMatched) Error() string { + return fmt.Sprintf("no handler matched for %q", string(e)) +} + +func (ww *Router) HandleMessage(ctx context.Context, parsed *messaging_pb.Message) error { + ctx = log.WithFields(ctx, map[string]any{ + "grpc-service": parsed.GrpcService, + "grpc-method": parsed.GrpcMethod, + "message-id": parsed.MessageId, + "topic": parsed.DestinationTopic, + }) + log.Debug(ctx, "Message Handler: Begin") + + fullServiceName := fmt.Sprintf("/%s/%s", parsed.GrpcService, parsed.GrpcMethod) + handler, ok := ww.handlers[fullServiceName] + if !ok { + if ww.fallbackHandler != nil { + log.Debug(ctx, "Message Handler: Using fallback handler") + handler = ww.fallbackHandler + } else { + return ErrNoHandlerMatched(fullServiceName) + } + } + + return handler.HandleMessage(ctx, parsed) +} + func (ww *Worker) Run(ctx context.Context) error { for { if err := ww.FetchOnce(ctx); err != nil { @@ -128,6 +167,10 @@ func (ww *Worker) Run(ctx context.Context) error { } } +func (ww *Worker) RegisterService(ctx context.Context, service protoreflect.ServiceDescriptor, invoker AppLink) error { + return ww.router.RegisterService(ctx, service, invoker) +} + func (ww *Worker) FetchOnce(ctx context.Context) error { out, err := ww.SQSClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ QueueUrl: &ww.QueueURL, @@ -196,40 +239,9 @@ func (ww *Worker) handleMessage(ctx context.Context, msg types.Message) { return } - ctx = log.WithFields(ctx, map[string]any{ - "grpc-service": parsed.GrpcService, - "grpc-method": parsed.GrpcMethod, - "message-id": parsed.MessageId, - "topic": parsed.DestinationTopic, - "sqs-message-id": msg.MessageId, - }) - log.Debug(ctx, "Message Handler: Begin") - - fullServiceName := fmt.Sprintf("/%s/%s", parsed.GrpcService, parsed.GrpcMethod) - handler, ok := ww.handlers[fullServiceName] - if !ok { - if ww.fallbackHandler != nil { - log.Debug(ctx, "Message Handler: Using fallback handler") - handler = ww.fallbackHandler - } else { - log.Error(ctx, "no handler matched") - if ww.deadLetterHandler == nil && getReceiveCount(msg) <= 3 { - log.Error(ctx, "Error handling message, leaving in queue") - return - } - err := ww.killMessage(ctx, msg, parsed, fmt.Errorf("no handler for %s", fullServiceName)) - if err != nil { - log.WithField(ctx, "killError", err.Error()). - Error("Message Worker: Error killing message, leaving in queue") - return - } - log.Debug(ctx, "Message Handler: Killed") - return - } - } - - err = handler.HandleMessage(ctx, parsed) + ctx = log.WithField(ctx, "sqs-message-id", msg.MessageId) + err = ww.router.HandleMessage(ctx, parsed) if err != nil { ctx = log.WithError(ctx, err) log.Error(ctx, "Message Handler: Error") diff --git a/apps/queueworker/sqslink/worker_test.go b/apps/queueworker/sqslink/worker_test.go index aa475c6..6521bcb 100644 --- a/apps/queueworker/sqslink/worker_test.go +++ b/apps/queueworker/sqslink/worker_test.go @@ -5,11 +5,14 @@ import ( "testing" "github.com/pentops/j5/gen/j5/messaging/v1/messaging_j5pb" + "github.com/pentops/j5/lib/j5codec" "github.com/pentops/o5-messaging/gen/o5/messaging/v1/messaging_pb" "github.com/pentops/o5-messaging/o5msg" "github.com/pentops/o5-runtime-sidecar/testproto/gen/test/v1/test_tpb" + "google.golang.org/grpc" "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" ) func TestChance(t *testing.T) { @@ -23,7 +26,7 @@ func TestChance(t *testing.T) { } func TestDynamicParse(t *testing.T) { - ww := NewWorker(nil, "https://test.com/queue", nil, 0) + ww := NewRouter(nil) fd := test_tpb.File_test_v1_topic_test_p_j5s_proto.Services().Get(0) if err := ww.RegisterService(context.Background(), fd, nil); err != nil { @@ -56,15 +59,27 @@ func TestDynamicParse(t *testing.T) { } } +type encoderInvoker struct { + invoke func(context.Context, string, any, any, ...grpc.CallOption) error +} + +func (ei encoderInvoker) Invoke(ctx context.Context, method string, req any, res any, opts ...grpc.CallOption) error { + return ei.invoke(ctx, method, req, res, opts...) +} + +func (ei encoderInvoker) JSONToProto(body []byte, msg protoreflect.Message) error { + return j5codec.Global.JSONToProto(body, msg) +} + func TestRequestMetadata(t *testing.T) { - ww := NewWorker(nil, "https://test.com/queue", nil, 0) + ww := NewRouter(nil) fd := test_tpb.File_test_v1_topic_test_p_j5s_proto.Services().ByName("TestReqResRequestTopic") if fd == nil { t.Fatal("no service found") } - if err := ww.RegisterService(context.Background(), fd, nil); err != nil { + if err := ww.RegisterService(context.Background(), fd, encoderInvoker{}); err != nil { t.Fatal(err.Error()) } @@ -72,6 +87,9 @@ func TestRequestMetadata(t *testing.T) { if !ok || handler == nil { t.Fatal("handler is nil") } + if handler.invoker == nil { + t.Fatal("invoker is nil") + } input := &test_tpb.TestReqResRequestMessage{ Request: &messaging_j5pb.RequestMetadata{ From 80bf31745a4b73629d0b7d4dfae0d83961f765cc Mon Sep 17 00:00:00 2001 From: Damien Whitten Date: Wed, 22 Oct 2025 18:28:13 -0700 Subject: [PATCH 2/5] Organize --- apps/queueworker/sqslink/router.go | 87 +++++++++++++++++++++++++ apps/queueworker/sqslink/worker.go | 79 +--------------------- apps/queueworker/sqslink/worker_test.go | 4 +- 3 files changed, 90 insertions(+), 80 deletions(-) create mode 100644 apps/queueworker/sqslink/router.go diff --git a/apps/queueworker/sqslink/router.go b/apps/queueworker/sqslink/router.go new file mode 100644 index 0000000..c223bb1 --- /dev/null +++ b/apps/queueworker/sqslink/router.go @@ -0,0 +1,87 @@ +package sqslink + +import ( + "context" + "fmt" + + "github.com/pentops/log.go/log" + "github.com/pentops/o5-messaging/gen/o5/messaging/v1/messaging_pb" + "google.golang.org/protobuf/reflect/protoreflect" +) + +type ErrNoHandlerMatched string + +func (e ErrNoHandlerMatched) Error() string { + return fmt.Sprintf("no handler matched for %q", string(e)) +} + +type Router struct { + handlers map[string]Handler + fallbackHandler Handler +} + +func NewRouter() *Router { + return &Router{ + handlers: make(map[string]Handler), + } +} + +func (rr *Router) RegisterService(ctx context.Context, service protoreflect.ServiceDescriptor, invoker AppLink) error { + methods := service.Methods() + for ii := range methods.Len() { + method := methods.Get(ii) + if err := rr.registerMethod(ctx, method, invoker); err != nil { + return err + } + } + return nil +} + +func (rr *Router) registerMethod(ctx context.Context, method protoreflect.MethodDescriptor, invoker AppLink) error { + serviceName := method.Parent().(protoreflect.ServiceDescriptor).FullName() + fullName := fmt.Sprintf("/%s/%s", serviceName, method.Name()) + + if fullName == GenericTopic { + log.WithField(ctx, "service", fullName).Info("Registering Generic Fallback") + rr.fallbackHandler = &genericHandler{ + invoker: invoker, + } + + } else { + log.WithField(ctx, "service", fullName).Info("Registering Worker Service") + ss := &service{ + requestMessage: method.Input(), + fullName: fullName, + invoker: invoker, + } + rr.handlers[ss.fullName] = ss + } + return nil +} + +func (rr *Router) RegisterHandler(fullMethod string, handler Handler) { + rr.handlers[fullMethod] = handler +} + +func (rr *Router) HandleMessage(ctx context.Context, parsed *messaging_pb.Message) error { + ctx = log.WithFields(ctx, map[string]any{ + "grpc-service": parsed.GrpcService, + "grpc-method": parsed.GrpcMethod, + "message-id": parsed.MessageId, + "topic": parsed.DestinationTopic, + }) + log.Debug(ctx, "Message Handler: Begin") + + fullServiceName := fmt.Sprintf("/%s/%s", parsed.GrpcService, parsed.GrpcMethod) + handler, ok := rr.handlers[fullServiceName] + if !ok { + if rr.fallbackHandler != nil { + log.Debug(ctx, "Message Handler: Using fallback handler") + handler = rr.fallbackHandler + } else { + return ErrNoHandlerMatched(fullServiceName) + } + } + + return handler.HandleMessage(ctx, parsed) +} diff --git a/apps/queueworker/sqslink/worker.go b/apps/queueworker/sqslink/worker.go index da641e8..0ad4eec 100644 --- a/apps/queueworker/sqslink/worker.go +++ b/apps/queueworker/sqslink/worker.go @@ -35,11 +35,6 @@ func (hf HandlerFunc) HandleMessage(ctx context.Context, msg *messaging_pb.Messa return hf(ctx, msg) } -type Router struct { - handlers map[string]Handler - fallbackHandler Handler -} - // Is this message is randomly selected based on percent received? func randomlySelected(ctx context.Context, pct int) bool { if pct == 0 { @@ -68,12 +63,6 @@ func randomlySelected(ctx context.Context, pct int) bool { return false } -func NewRouter(deadLetters DeadLetterHandler) *Router { - return &Router{ - handlers: make(map[string]Handler), - } -} - type Worker struct { router *Router resendChance int @@ -83,7 +72,7 @@ type Worker struct { } func NewWorker(sqs SQSAPI, queueURL string, deadLetters DeadLetterHandler, resendChance int) *Worker { - router := NewRouter(deadLetters) + router := NewRouter() return &Worker{ SQSClient: sqs, QueueURL: queueURL, @@ -93,72 +82,6 @@ func NewWorker(sqs SQSAPI, queueURL string, deadLetters DeadLetterHandler, resen } } -func (ww *Router) RegisterService(ctx context.Context, service protoreflect.ServiceDescriptor, invoker AppLink) error { - methods := service.Methods() - for ii := 0; ii < methods.Len(); ii++ { - method := methods.Get(ii) - if err := ww.registerMethod(ctx, method, invoker); err != nil { - return err - } - } - return nil -} - -func (ww *Router) registerMethod(ctx context.Context, method protoreflect.MethodDescriptor, invoker AppLink) error { - serviceName := method.Parent().(protoreflect.ServiceDescriptor).FullName() - fullName := fmt.Sprintf("/%s/%s", serviceName, method.Name()) - - if fullName == GenericTopic { - log.WithField(ctx, "service", fullName).Info("Registering Generic Fallback") - ww.fallbackHandler = &genericHandler{ - invoker: invoker, - } - - } else { - log.WithField(ctx, "service", fullName).Info("Registering Worker Service") - ss := &service{ - requestMessage: method.Input(), - fullName: fullName, - invoker: invoker, - } - ww.handlers[ss.fullName] = ss - } - return nil -} - -func (ww *Router) RegisterHandler(fullMethod string, handler Handler) { - ww.handlers[fullMethod] = handler -} - -type ErrNoHandlerMatched string - -func (e ErrNoHandlerMatched) Error() string { - return fmt.Sprintf("no handler matched for %q", string(e)) -} - -func (ww *Router) HandleMessage(ctx context.Context, parsed *messaging_pb.Message) error { - ctx = log.WithFields(ctx, map[string]any{ - "grpc-service": parsed.GrpcService, - "grpc-method": parsed.GrpcMethod, - "message-id": parsed.MessageId, - "topic": parsed.DestinationTopic, - }) - log.Debug(ctx, "Message Handler: Begin") - - fullServiceName := fmt.Sprintf("/%s/%s", parsed.GrpcService, parsed.GrpcMethod) - handler, ok := ww.handlers[fullServiceName] - if !ok { - if ww.fallbackHandler != nil { - log.Debug(ctx, "Message Handler: Using fallback handler") - handler = ww.fallbackHandler - } else { - return ErrNoHandlerMatched(fullServiceName) - } - } - - return handler.HandleMessage(ctx, parsed) -} - func (ww *Worker) Run(ctx context.Context) error { for { if err := ww.FetchOnce(ctx); err != nil { diff --git a/apps/queueworker/sqslink/worker_test.go b/apps/queueworker/sqslink/worker_test.go index 6521bcb..e3de2b8 100644 --- a/apps/queueworker/sqslink/worker_test.go +++ b/apps/queueworker/sqslink/worker_test.go @@ -26,7 +26,7 @@ func TestChance(t *testing.T) { } func TestDynamicParse(t *testing.T) { - ww := NewRouter(nil) + ww := NewRouter() fd := test_tpb.File_test_v1_topic_test_p_j5s_proto.Services().Get(0) if err := ww.RegisterService(context.Background(), fd, nil); err != nil { @@ -72,7 +72,7 @@ func (ei encoderInvoker) JSONToProto(body []byte, msg protoreflect.Message) erro } func TestRequestMetadata(t *testing.T) { - ww := NewRouter(nil) + ww := NewRouter() fd := test_tpb.File_test_v1_topic_test_p_j5s_proto.Services().ByName("TestReqResRequestTopic") if fd == nil { From ac91fcc2a90961d495aee9b99087b0d1fa0fbdeb Mon Sep 17 00:00:00 2001 From: Damien Whitten Date: Wed, 22 Oct 2025 18:33:28 -0700 Subject: [PATCH 3/5] More split --- apps/queueworker/sqslink/worker.go | 12 +++--------- apps/queueworker/worker.go | 9 +++++++-- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/apps/queueworker/sqslink/worker.go b/apps/queueworker/sqslink/worker.go index 0ad4eec..05df353 100644 --- a/apps/queueworker/sqslink/worker.go +++ b/apps/queueworker/sqslink/worker.go @@ -14,7 +14,6 @@ import ( "github.com/pentops/o5-messaging/gen/o5/messaging/v1/messaging_pb" "github.com/pentops/o5-messaging/gen/o5/messaging/v1/messaging_tpb" "github.com/pentops/o5-runtime-sidecar/apps/queueworker/awsmsg" - "google.golang.org/protobuf/reflect/protoreflect" ) const RawMessageName = "/o5.messaging.v1.topic.RawMessageTopic/Raw" @@ -64,19 +63,18 @@ func randomlySelected(ctx context.Context, pct int) bool { } type Worker struct { - router *Router + router Handler resendChance int SQSClient SQSAPI QueueURL string deadLetterHandler DeadLetterHandler } -func NewWorker(sqs SQSAPI, queueURL string, deadLetters DeadLetterHandler, resendChance int) *Worker { - router := NewRouter() +func NewWorker(sqs SQSAPI, queueURL string, deadLetters DeadLetterHandler, resendChance int, handler Handler) *Worker { return &Worker{ SQSClient: sqs, QueueURL: queueURL, - router: router, + router: handler, resendChance: resendChance, deadLetterHandler: deadLetters, } @@ -90,10 +88,6 @@ func (ww *Worker) Run(ctx context.Context) error { } } -func (ww *Worker) RegisterService(ctx context.Context, service protoreflect.ServiceDescriptor, invoker AppLink) error { - return ww.router.RegisterService(ctx, service, invoker) -} - func (ww *Worker) FetchOnce(ctx context.Context) error { out, err := ww.SQSClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ QueueUrl: &ww.QueueURL, diff --git a/apps/queueworker/worker.go b/apps/queueworker/worker.go index f2eb703..0ccf07d 100644 --- a/apps/queueworker/worker.go +++ b/apps/queueworker/worker.go @@ -17,6 +17,7 @@ type WorkerConfig struct { type App struct { queueWorker *sqslink.Worker + router *sqslink.Router } type Publisher interface { @@ -32,8 +33,12 @@ func NewApp(config WorkerConfig, info sidecar.AppInfo, publisher sqslink.Publish dlh = sqslink.NewO5MessageDeadLetterHandler(publisher, info) } - ww := sqslink.NewWorker(sqs, config.SQSURL, dlh, config.ResendChance) + + router := sqslink.NewRouter() + + ww := sqslink.NewWorker(sqs, config.SQSURL, dlh, config.ResendChance, router) return &App{ + router: router, queueWorker: ww, }, nil @@ -44,5 +49,5 @@ func (app *App) Run(ctx context.Context) error { } func (app *App) RegisterService(ctx context.Context, service protoreflect.ServiceDescriptor, invoker sqslink.AppLink) error { - return app.queueWorker.RegisterService(ctx, service, invoker) + return app.router.RegisterService(ctx, service, invoker) } From d8e13fb1845704793ac135df47f99f23706136f4 Mon Sep 17 00:00:00 2001 From: Damien Whitten Date: Wed, 22 Oct 2025 18:36:35 -0700 Subject: [PATCH 4/5] Middleware-like pattern --- apps/queueworker/sqslink/worker.go | 31 ++++++++++++++++++++++++------ apps/queueworker/worker.go | 6 +++++- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/apps/queueworker/sqslink/worker.go b/apps/queueworker/sqslink/worker.go index 05df353..65aed78 100644 --- a/apps/queueworker/sqslink/worker.go +++ b/apps/queueworker/sqslink/worker.go @@ -62,20 +62,42 @@ func randomlySelected(ctx context.Context, pct int) bool { return false } +type ResendHandler struct { + resendChance int + handler Handler +} + +func NewResendHandler(handler Handler, resendChance int) *ResendHandler { + return &ResendHandler{ + handler: handler, + resendChance: resendChance, + } +} + +func (ww *ResendHandler) HandleMessage(ctx context.Context, msg *messaging_pb.Message) error { + if err := ww.handler.HandleMessage(ctx, msg); err != nil { + return err + } + if randomlySelected(ctx, ww.resendChance) { + if err := ww.handler.HandleMessage(ctx, msg); err != nil { + return err + } + } + return nil +} + type Worker struct { router Handler - resendChance int SQSClient SQSAPI QueueURL string deadLetterHandler DeadLetterHandler } -func NewWorker(sqs SQSAPI, queueURL string, deadLetters DeadLetterHandler, resendChance int, handler Handler) *Worker { +func NewWorker(sqs SQSAPI, queueURL string, deadLetters DeadLetterHandler, handler Handler) *Worker { return &Worker{ SQSClient: sqs, QueueURL: queueURL, router: handler, - resendChance: resendChance, deadLetterHandler: deadLetters, } } @@ -116,9 +138,6 @@ func (ww *Worker) FetchOnce(ctx context.Context) error { for _, msg := range out.Messages { ww.handleMessage(ctx, msg) - if randomlySelected(ctx, ww.resendChance) { - ww.handleMessage(ctx, msg) - } } return nil } diff --git a/apps/queueworker/worker.go b/apps/queueworker/worker.go index 0ccf07d..dbbc950 100644 --- a/apps/queueworker/worker.go +++ b/apps/queueworker/worker.go @@ -35,8 +35,12 @@ func NewApp(config WorkerConfig, info sidecar.AppInfo, publisher sqslink.Publish } router := sqslink.NewRouter() + var handler sqslink.Handler = router + if config.ResendChance > 0 { + handler = sqslink.NewResendHandler(router, config.ResendChance) + } - ww := sqslink.NewWorker(sqs, config.SQSURL, dlh, config.ResendChance, router) + ww := sqslink.NewWorker(sqs, config.SQSURL, dlh, handler) return &App{ router: router, queueWorker: ww, From 10252dbff278a8dd039d1a139a2a16592ca05481 Mon Sep 17 00:00:00 2001 From: Damien Whitten Date: Thu, 23 Oct 2025 15:43:38 -0700 Subject: [PATCH 5/5] Refactor Message Handler --- adapters/pgclient/aurora.go | 22 +++-- adapters/pgclient/postgres.go | 4 +- .../awsmsg => adapters/sqsmsg}/sqs_parse.go | 2 +- .../sqsmsg}/sqs_parse_test.go | 2 +- .../sqslink => adapters/sqsmsg}/worker.go | 79 ++------------- .../{sqslink => messaging}/death.go | 2 +- apps/queueworker/messaging/middleware.go | 62 ++++++++++++ .../{sqslink => messaging}/router.go | 14 ++- .../{sqslink => messaging}/service_handler.go | 2 +- .../{sqslink => messaging}/worker_test.go | 2 +- apps/queueworker/worker.go | 26 ++--- cmd/server/main.go | 2 +- entrypoint/aws.go | 98 +++++++++++++------ entrypoint/builder.go | 22 ++++- entrypoint/runtime.go | 16 +-- entrypoint/runtime_test.go | 22 ++--- 16 files changed, 217 insertions(+), 160 deletions(-) rename {apps/queueworker/awsmsg => adapters/sqsmsg}/sqs_parse.go (99%) rename {apps/queueworker/awsmsg => adapters/sqsmsg}/sqs_parse_test.go (99%) rename {apps/queueworker/sqslink => adapters/sqsmsg}/worker.go (72%) rename apps/queueworker/{sqslink => messaging}/death.go (98%) create mode 100644 apps/queueworker/messaging/middleware.go rename apps/queueworker/{sqslink => messaging}/router.go (86%) rename apps/queueworker/{sqslink => messaging}/service_handler.go (99%) rename apps/queueworker/{sqslink => messaging}/worker_test.go (99%) diff --git a/adapters/pgclient/aurora.go b/adapters/pgclient/aurora.go index 363de55..bf0fc03 100644 --- a/adapters/pgclient/aurora.go +++ b/adapters/pgclient/aurora.go @@ -6,7 +6,6 @@ import ( "strconv" "strings" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/rds/auth" ) @@ -38,16 +37,17 @@ func (ac *auroraConnector) DSN(ctx context.Context) (string, error) { } type CredBuilder struct { - creds aws.CredentialsProvider + //creds aws.CredentialsProvider configs map[string]*AuroraConfig - region string + //region string + provider AWSProvider } -func NewCredBuilder(creds aws.CredentialsProvider, region string) *CredBuilder { +// func NewCredBuilder(creds aws.CredentialsProvider, region string) *CredBuilder { +func NewCredBuilder(provider AWSProvider) *CredBuilder { cb := &CredBuilder{ - creds: creds, - region: region, - configs: make(map[string]*AuroraConfig), + provider: provider, + configs: make(map[string]*AuroraConfig), } return cb @@ -105,8 +105,14 @@ func (cb *CredBuilder) NewToken(ctx context.Context, lookupName string) (string, } func (cb *CredBuilder) newToken(ctx context.Context, config *AuroraConfig) (string, error) { + region := cb.provider.Region() + creds, err := cb.provider.Credentials(ctx) + if err != nil { + return "", fmt.Errorf("failed to get aws credentials: %w", err) + } + authenticationToken, err := auth.BuildAuthToken( - ctx, fmt.Sprintf("%s:%d", config.Endpoint, config.Port), cb.region, config.DBUser, cb.creds) + ctx, fmt.Sprintf("%s:%d", config.Endpoint, config.Port), region, config.DBUser, creds) if err != nil { return "", fmt.Errorf("failed to create authentication token: %w", err) } diff --git a/adapters/pgclient/postgres.go b/adapters/pgclient/postgres.go index 2b65e30..d98f719 100644 --- a/adapters/pgclient/postgres.go +++ b/adapters/pgclient/postgres.go @@ -29,7 +29,7 @@ func looksLikeJSONString(name string) bool { var reValidDBEnvName = regexp.MustCompile(`^[A-Z0-9_]+$`) type AWSProvider interface { - Credentials() aws.CredentialsProvider + Credentials(context.Context) (aws.CredentialsProvider, error) Region() string } @@ -82,7 +82,7 @@ func (ss *pgConnSet) aurora(name string, config *AuroraConfig) (PGConnector, err } if ss.credBuilder == nil { - ss.credBuilder = NewCredBuilder(ss.awsProvider.Credentials(), ss.awsProvider.Region()) + ss.credBuilder = NewCredBuilder(ss.awsProvider) } if err := ss.credBuilder.AddConfig(name, config); err != nil { diff --git a/apps/queueworker/awsmsg/sqs_parse.go b/adapters/sqsmsg/sqs_parse.go similarity index 99% rename from apps/queueworker/awsmsg/sqs_parse.go rename to adapters/sqsmsg/sqs_parse.go index 2a0f250..ce6c2ed 100644 --- a/apps/queueworker/awsmsg/sqs_parse.go +++ b/adapters/sqsmsg/sqs_parse.go @@ -1,4 +1,4 @@ -package awsmsg +package sqsmsg import ( "bytes" diff --git a/apps/queueworker/awsmsg/sqs_parse_test.go b/adapters/sqsmsg/sqs_parse_test.go similarity index 99% rename from apps/queueworker/awsmsg/sqs_parse_test.go rename to adapters/sqsmsg/sqs_parse_test.go index cf44dca..aedee72 100644 --- a/apps/queueworker/awsmsg/sqs_parse_test.go +++ b/adapters/sqsmsg/sqs_parse_test.go @@ -1,4 +1,4 @@ -package awsmsg +package sqsmsg import ( "testing" diff --git a/apps/queueworker/sqslink/worker.go b/adapters/sqsmsg/worker.go similarity index 72% rename from apps/queueworker/sqslink/worker.go rename to adapters/sqsmsg/worker.go index 65aed78..b105464 100644 --- a/apps/queueworker/sqslink/worker.go +++ b/adapters/sqsmsg/worker.go @@ -1,10 +1,8 @@ -package sqslink +package sqsmsg import ( "context" - "crypto/rand" "fmt" - "math/big" "strconv" "github.com/aws/aws-sdk-go-v2/service/sqs" @@ -13,87 +11,24 @@ import ( "github.com/pentops/log.go/log" "github.com/pentops/o5-messaging/gen/o5/messaging/v1/messaging_pb" "github.com/pentops/o5-messaging/gen/o5/messaging/v1/messaging_tpb" - "github.com/pentops/o5-runtime-sidecar/apps/queueworker/awsmsg" + "github.com/pentops/o5-runtime-sidecar/apps/queueworker/messaging" ) const RawMessageName = "/o5.messaging.v1.topic.RawMessageTopic/Raw" -const GenericTopic = "/o5.messaging.v1.topic.GenericMessageTopic/Generic" type SQSAPI interface { ReceiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput, opts ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) DeleteMessage(ctx context.Context, input *sqs.DeleteMessageInput, opts ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) } -type Handler interface { - HandleMessage(context.Context, *messaging_pb.Message) error -} - -type HandlerFunc func(context.Context, *messaging_pb.Message) error - -func (hf HandlerFunc) HandleMessage(ctx context.Context, msg *messaging_pb.Message) error { - return hf(ctx, msg) -} - -// Is this message is randomly selected based on percent received? -func randomlySelected(ctx context.Context, pct int) bool { - if pct == 0 { - return false - } - - if pct == 100 { - return true - } - - if pct > 100 || pct < 0 { - log.Infof(ctx, "Received invalid percent for randomly selecting a message: %v", pct) - return false - } - - r, err := rand.Int(rand.Reader, big.NewInt(100)) - if err != nil { - log.WithError(ctx, err).Error("couldn't generate random number for selecting message") - return false - } - - if r.Int64() <= big.NewInt(int64(pct)).Int64() { - log.Infof(ctx, "Message randomly selected: rand of %v and percent of %v", r.Int64(), pct) - return true - } - return false -} - -type ResendHandler struct { - resendChance int - handler Handler -} - -func NewResendHandler(handler Handler, resendChance int) *ResendHandler { - return &ResendHandler{ - handler: handler, - resendChance: resendChance, - } -} - -func (ww *ResendHandler) HandleMessage(ctx context.Context, msg *messaging_pb.Message) error { - if err := ww.handler.HandleMessage(ctx, msg); err != nil { - return err - } - if randomlySelected(ctx, ww.resendChance) { - if err := ww.handler.HandleMessage(ctx, msg); err != nil { - return err - } - } - return nil -} - type Worker struct { - router Handler + router messaging.Handler SQSClient SQSAPI QueueURL string - deadLetterHandler DeadLetterHandler + deadLetterHandler messaging.DeadLetterHandler } -func NewWorker(sqs SQSAPI, queueURL string, deadLetters DeadLetterHandler, handler Handler) *Worker { +func NewWorker(sqs SQSAPI, queueURL string, deadLetters messaging.DeadLetterHandler, handler messaging.Handler) *Worker { return &Worker{ SQSClient: sqs, QueueURL: queueURL, @@ -125,7 +60,7 @@ func (ww *Worker) FetchOnce(ctx context.Context) error { // retrieve requests after being retrieved by a ReceiveMessage request. VisibilityTimeout: 30, - MessageAttributeNames: awsmsg.SQSMessageAttributes, + MessageAttributeNames: SQSMessageAttributes, AttributeNames: []types.QueueAttributeName{ // this type conversion is probably a bug in the SDK @@ -156,7 +91,7 @@ func getReceiveCount(msg types.Message) int { } func (ww *Worker) handleMessage(ctx context.Context, msg types.Message) { - parsed, err := awsmsg.ParseSQSMessage(msg) + parsed, err := ParseSQSMessage(msg) if err != nil { // Leave it for retry unless we keep failing at parsing it log.WithError(ctx, err).Error("Message Worker: Failed to parse message") diff --git a/apps/queueworker/sqslink/death.go b/apps/queueworker/messaging/death.go similarity index 98% rename from apps/queueworker/sqslink/death.go rename to apps/queueworker/messaging/death.go index 42166c4..6991484 100644 --- a/apps/queueworker/sqslink/death.go +++ b/apps/queueworker/messaging/death.go @@ -1,4 +1,4 @@ -package sqslink +package messaging import ( "context" diff --git a/apps/queueworker/messaging/middleware.go b/apps/queueworker/messaging/middleware.go new file mode 100644 index 0000000..c6745f0 --- /dev/null +++ b/apps/queueworker/messaging/middleware.go @@ -0,0 +1,62 @@ +package messaging + +import ( + "context" + "crypto/rand" + "math/big" + + "github.com/pentops/log.go/log" + "github.com/pentops/o5-messaging/gen/o5/messaging/v1/messaging_pb" +) + +type ResendHandler struct { + resendChance int + handler Handler +} + +func NewResendHandler(handler Handler, resendChance int) *ResendHandler { + return &ResendHandler{ + handler: handler, + resendChance: resendChance, + } +} + +func (ww *ResendHandler) HandleMessage(ctx context.Context, msg *messaging_pb.Message) error { + if err := ww.handler.HandleMessage(ctx, msg); err != nil { + return err + } + if randomlySelected(ctx, ww.resendChance) { + if err := ww.handler.HandleMessage(ctx, msg); err != nil { + return err + } + } + return nil +} + +// Is this message is randomly selected based on percent received? +func randomlySelected(ctx context.Context, pct int) bool { + if pct == 0 { + return false + } + + if pct == 100 { + return true + } + + if pct > 100 || pct < 0 { + log.Infof(ctx, "Received invalid percent for randomly selecting a message: %v", pct) + return false + } + + r, err := rand.Int(rand.Reader, big.NewInt(100)) + if err != nil { + log.WithError(ctx, err).Error("couldn't generate random number for selecting message") + return false + } + + if r.Int64() <= big.NewInt(int64(pct)).Int64() { + log.Infof(ctx, "Message randomly selected: rand of %v and percent of %v", r.Int64(), pct) + return true + } + return false +} diff --git a/apps/queueworker/sqslink/router.go b/apps/queueworker/messaging/router.go similarity index 86% rename from apps/queueworker/sqslink/router.go rename to apps/queueworker/messaging/router.go index c223bb1..6873682 100644 --- a/apps/queueworker/sqslink/router.go +++ b/apps/queueworker/messaging/router.go @@ -1,4 +1,4 @@ -package sqslink +package messaging import ( "context" @@ -9,12 +9,24 @@ import ( "google.golang.org/protobuf/reflect/protoreflect" ) +const GenericTopic = "/o5.messaging.v1.topic.GenericMessageTopic/Generic" + type ErrNoHandlerMatched string func (e ErrNoHandlerMatched) Error() string { return fmt.Sprintf("no handler matched for %q", string(e)) } +type Handler interface { + HandleMessage(context.Context, *messaging_pb.Message) error +} + +type HandlerFunc func(context.Context, *messaging_pb.Message) error + +func (hf HandlerFunc) HandleMessage(ctx context.Context, msg *messaging_pb.Message) error { + return hf(ctx, msg) +} + type Router struct { handlers map[string]Handler fallbackHandler Handler diff --git a/apps/queueworker/sqslink/service_handler.go b/apps/queueworker/messaging/service_handler.go similarity index 99% rename from apps/queueworker/sqslink/service_handler.go rename to apps/queueworker/messaging/service_handler.go index d1b38db..08476b0 100644 --- a/apps/queueworker/sqslink/service_handler.go +++ b/apps/queueworker/messaging/service_handler.go @@ -1,4 +1,4 @@ -package sqslink +package messaging import ( "context" diff --git a/apps/queueworker/sqslink/worker_test.go b/apps/queueworker/messaging/worker_test.go similarity index 99% rename from apps/queueworker/sqslink/worker_test.go rename to apps/queueworker/messaging/worker_test.go index e3de2b8..03ad221 100644 --- a/apps/queueworker/sqslink/worker_test.go +++ b/apps/queueworker/messaging/worker_test.go @@ -1,4 +1,4 @@ -package sqslink +package messaging import ( "context" diff --git a/apps/queueworker/worker.go b/apps/queueworker/worker.go index dbbc950..54fe73a 100644 --- a/apps/queueworker/worker.go +++ b/apps/queueworker/worker.go @@ -4,9 +4,9 @@ import ( "context" "fmt" - "github.com/pentops/o5-runtime-sidecar/apps/queueworker/sqslink" + "github.com/pentops/o5-runtime-sidecar/adapters/sqsmsg" + "github.com/pentops/o5-runtime-sidecar/apps/queueworker/messaging" "github.com/pentops/o5-runtime-sidecar/sidecar" - "google.golang.org/protobuf/reflect/protoreflect" ) type WorkerConfig struct { @@ -16,33 +16,29 @@ type WorkerConfig struct { } type App struct { - queueWorker *sqslink.Worker - router *sqslink.Router + queueWorker *sqsmsg.Worker } type Publisher interface { - sqslink.Publisher + messaging.Publisher } -func NewApp(config WorkerConfig, info sidecar.AppInfo, publisher sqslink.Publisher, sqs sqslink.SQSAPI) (*App, error) { - var dlh sqslink.DeadLetterHandler +func NewApp(config WorkerConfig, info sidecar.AppInfo, publisher messaging.Publisher, sqs sqsmsg.SQSAPI, handler messaging.Handler) (*App, error) { + var dlh messaging.DeadLetterHandler if !config.NoDeadLetters { if publisher == nil { return nil, fmt.Errorf("SQS queue worker requires a sender for dead letters (set EVENTBRIDGE_ARN)") } - dlh = sqslink.NewO5MessageDeadLetterHandler(publisher, info) + dlh = messaging.NewO5MessageDeadLetterHandler(publisher, info) } - router := sqslink.NewRouter() - var handler sqslink.Handler = router if config.ResendChance > 0 { - handler = sqslink.NewResendHandler(router, config.ResendChance) + handler = messaging.NewResendHandler(handler, config.ResendChance) } - ww := sqslink.NewWorker(sqs, config.SQSURL, dlh, handler) + ww := sqsmsg.NewWorker(sqs, config.SQSURL, dlh, handler) return &App{ - router: router, queueWorker: ww, }, nil @@ -51,7 +47,3 @@ func NewApp(config WorkerConfig, info sidecar.AppInfo, publisher sqslink.Publish func (app *App) Run(ctx context.Context) error { return app.queueWorker.Run(ctx) } - -func (app *App) RegisterService(ctx context.Context, service protoreflect.ServiceDescriptor, invoker sqslink.AppLink) error { - return app.router.RegisterService(ctx, service, invoker) -} diff --git a/cmd/server/main.go b/cmd/server/main.go index 9de6117..2e5cebf 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -45,7 +45,7 @@ func run(ctx context.Context, envConfig entrypoint.Config) error { return fmt.Errorf("failed to create AWS config builder: %w", err) } - runtime, err := entrypoint.FromConfig(envConfig, awsBuilder) + runtime, err := entrypoint.FromConfig(ctx, envConfig, awsBuilder) if err != nil { return fmt.Errorf("from config: %w", err) } diff --git a/entrypoint/aws.go b/entrypoint/aws.go index 95f13e1..3ad537f 100644 --- a/entrypoint/aws.go +++ b/entrypoint/aws.go @@ -13,62 +13,96 @@ import ( "github.com/pentops/log.go/log" ) -type AWSConfigBuilder struct { - config aws.Config -} +type configLoaderFunc func(ctx context.Context) (aws.Config, error) -func (acb *AWSConfigBuilder) SNS() SNSAPI { - return sns.NewFromConfig(acb.config) +type AWSConfigBuilder struct { + config *aws.Config + configLoader configLoaderFunc } -func (acb *AWSConfigBuilder) SQS() SQSAPI { - return sqs.NewFromConfig(acb.config) -} +var _ AWSProvider = (*AWSConfigBuilder)(nil) -func (acb *AWSConfigBuilder) EventBridge() EventBridgeAPI { - return eventbridge.NewFromConfig(acb.config) +func NewAWSConfigBuilder(provided configLoaderFunc) *AWSConfigBuilder { + return &AWSConfigBuilder{configLoader: provided} } -func (acb *AWSConfigBuilder) Region() string { - return acb.config.Region +func (acb *AWSConfigBuilder) getConfig(ctx context.Context) (aws.Config, error) { + if acb.config == nil { + cfg, err := acb.configLoader(ctx) + if err != nil { + return aws.Config{}, fmt.Errorf("couldn't load aws config: %w", err) + } + acb.config = &cfg + } + return *acb.config, nil } -func (acb *AWSConfigBuilder) Credentials() aws.CredentialsProvider { - return acb.config.Credentials +func (acb *AWSConfigBuilder) SNS(ctx context.Context) (SNSAPI, error) { + config, err := acb.getConfig(ctx) + if err != nil { + return nil, err + } + return sns.NewFromConfig(config), nil } -func NewAWSConfigBuilder(provided aws.Config) *AWSConfigBuilder { - return &AWSConfigBuilder{config: provided} +func (acb *AWSConfigBuilder) SQS(ctx context.Context) (SQSAPI, error) { + config, err := acb.getConfig(ctx) + if err != nil { + return nil, err + } + return sqs.NewFromConfig(config), nil } -func NewDefaultAWSConfigBuilder(ctx context.Context) (*AWSConfigBuilder, error) { - cfg, err := config.LoadDefaultConfig(ctx) +func (acb *AWSConfigBuilder) EventBridge(ctx context.Context) (EventBridgeAPI, error) { + config, err := acb.getConfig(ctx) if err != nil { - return nil, fmt.Errorf("couldn't load aws config: %w", err) + return nil, err } + return eventbridge.NewFromConfig(config), nil +} - stsClient := sts.NewFromConfig(cfg) - callerIdentity, err := stsClient.GetCallerIdentity(ctx, &sts.GetCallerIdentityInput{}) +func (acb *AWSConfigBuilder) Region() string { + return acb.config.Region +} + +func (acb *AWSConfigBuilder) Credentials(ctx context.Context) (aws.CredentialsProvider, error) { + config, err := acb.getConfig(ctx) if err != nil { return nil, err } + return config.Credentials, nil +} - log.WithFields(ctx, map[string]any{ - "account": aws.ToString(callerIdentity.Account), - "arn": aws.ToString(callerIdentity.Arn), - "user": aws.ToString(callerIdentity.UserId), - }).Info("Sidecar AWS Identity") - - return NewAWSConfigBuilder(cfg), nil +func NewDefaultAWSConfigBuilder(ctx context.Context) (*AWSConfigBuilder, error) { + return NewAWSConfigBuilder(func(ctx context.Context) (aws.Config, error) { + + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + return aws.Config{}, fmt.Errorf("couldn't load aws config: %w", err) + } + + stsClient := sts.NewFromConfig(cfg) + callerIdentity, err := stsClient.GetCallerIdentity(ctx, &sts.GetCallerIdentityInput{}) + if err != nil { + return aws.Config{}, err + } + + log.WithFields(ctx, map[string]any{ + "account": aws.ToString(callerIdentity.Account), + "arn": aws.ToString(callerIdentity.Arn), + "user": aws.ToString(callerIdentity.UserId), + }).Info("Sidecar AWS Identity") + return cfg, nil + }), nil } type AWSProvider interface { - SNS() SNSAPI - SQS() SQSAPI - EventBridge() EventBridgeAPI + SNS(context.Context) (SNSAPI, error) + SQS(context.Context) (SQSAPI, error) + EventBridge(context.Context) (EventBridgeAPI, error) Region() string - Credentials() aws.CredentialsProvider + Credentials(context.Context) (aws.CredentialsProvider, error) } // SNSAPI is an interface for the SNS client which satisfies the interfaces of diff --git a/entrypoint/builder.go b/entrypoint/builder.go index 0bc8658..79001df 100644 --- a/entrypoint/builder.go +++ b/entrypoint/builder.go @@ -1,6 +1,7 @@ package entrypoint import ( + "context" "fmt" "github.com/pentops/o5-runtime-sidecar/adapters/eventbridge" @@ -11,6 +12,7 @@ import ( "github.com/pentops/o5-runtime-sidecar/apps/pgoutbox" "github.com/pentops/o5-runtime-sidecar/apps/pgproxy" "github.com/pentops/o5-runtime-sidecar/apps/queueworker" + "github.com/pentops/o5-runtime-sidecar/apps/queueworker/messaging" "github.com/pentops/o5-runtime-sidecar/sidecar" ) @@ -35,7 +37,7 @@ type Publisher interface { queueworker.Publisher } -func FromConfig(envConfig Config, awsConfig AWSProvider) (*Runtime, error) { +func FromConfig(ctx context.Context, envConfig Config, awsConfig AWSProvider) (*Runtime, error) { srcConfig := sidecar.AppInfo{ SourceApp: envConfig.AppName, SourceEnv: envConfig.EnvironmentName, @@ -47,7 +49,11 @@ func FromConfig(envConfig Config, awsConfig AWSProvider) (*Runtime, error) { runtime.msgConverter = msgconvert.NewConverter(srcConfig) if envConfig.EventBridgeConfig.BusARN != "" { - s, err := eventbridge.NewEventBridgePublisher(awsConfig.EventBridge(), envConfig.EventBridgeConfig) + eventBridge, err := awsConfig.EventBridge(ctx) + if err != nil { + return nil, fmt.Errorf("getting eventbridge client: %w", err) + } + s, err := eventbridge.NewEventBridgePublisher(eventBridge, envConfig.EventBridgeConfig) if err != nil { return nil, fmt.Errorf("creating eventbridge publisher: %w", err) } @@ -83,7 +89,15 @@ func FromConfig(envConfig Config, awsConfig AWSProvider) (*Runtime, error) { // Subscribe to SQS messages if envConfig.WorkerConfig.SQSURL != "" { - w, err := queueworker.NewApp(envConfig.WorkerConfig, srcConfig, runtime.sender, awsConfig.SQS()) + sqs, err := awsConfig.SQS(ctx) + if err != nil { + return nil, err + } + + router := messaging.NewRouter() + runtime.queueRouter = router + + w, err := queueworker.NewApp(envConfig.WorkerConfig, srcConfig, runtime.sender, sqs, router) if err != nil { return nil, fmt.Errorf("creating queue worker: %w", err) } @@ -107,7 +121,7 @@ func FromConfig(envConfig Config, awsConfig AWSProvider) (*Runtime, error) { return nil, fmt.Errorf("creating router: %w", err) } - runtime.routerServer = r + runtime.serviceRouter = r } return runtime, nil diff --git a/entrypoint/runtime.go b/entrypoint/runtime.go index ad044b0..6a18bd4 100644 --- a/entrypoint/runtime.go +++ b/entrypoint/runtime.go @@ -15,6 +15,7 @@ import ( "github.com/pentops/o5-runtime-sidecar/apps/pgoutbox" "github.com/pentops/o5-runtime-sidecar/apps/pgproxy" "github.com/pentops/o5-runtime-sidecar/apps/queueworker" + "github.com/pentops/o5-runtime-sidecar/apps/queueworker/messaging" "github.com/pentops/runner" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -27,7 +28,8 @@ type Runtime struct { queueWorker *queueworker.App adapter *bridge.App - routerServer *httpserver.Router + queueRouter *messaging.Router + serviceRouter *httpserver.Router outboxListeners []*pgoutbox.App postgresProxy *pgproxy.App @@ -106,11 +108,11 @@ func (rt *Runtime) Run(ctx context.Context) error { <-rt.endpointWait - if rt.routerServer != nil { + if rt.serviceRouter != nil { // TODO: Metrics didAnything = true - runGroup.Add("router", rt.routerServer.Run) + runGroup.Add("router", rt.serviceRouter.Run) } @@ -152,20 +154,20 @@ func (rt *Runtime) registerEndpoint(ctx context.Context, prClient *grpcreflect.R switch { case strings.HasSuffix(name, "Service"), strings.HasSuffix(name, "Sandbox"): - if rt.routerServer == nil { + if rt.serviceRouter == nil { return fmt.Errorf("service %s requires a public port", name) } - if err := rt.routerServer.RegisterService(ctx, s, prClient); err != nil { + if err := rt.serviceRouter.RegisterService(ctx, s, prClient); err != nil { return fmt.Errorf("register service %s: %w", name, err) } case strings.HasSuffix(name, "Topic"): - if rt.queueWorker == nil { + if rt.queueRouter == nil { return fmt.Errorf("topic %s requires an SQS URL", name) } - if err := rt.queueWorker.RegisterService(ctx, s, prClient); err != nil { + if err := rt.queueRouter.RegisterService(ctx, s, prClient); err != nil { return fmt.Errorf("register worker %s: %w", name, err) } diff --git a/entrypoint/runtime_test.go b/entrypoint/runtime_test.go index 741d205..8f87ddb 100644 --- a/entrypoint/runtime_test.go +++ b/entrypoint/runtime_test.go @@ -14,28 +14,28 @@ import ( type TestAWS struct{} -func (ta TestAWS) SNS() SNSAPI { - return nil +func (ta TestAWS) SNS(ctx context.Context) (SNSAPI, error) { + return nil, fmt.Errorf("Test Not Implemented") } -func (ta TestAWS) SQS() SQSAPI { - return nil +func (ta TestAWS) SQS(ctx context.Context) (SQSAPI, error) { + return nil, fmt.Errorf("Test Not Implemented") } -func (ta TestAWS) EventBridge() EventBridgeAPI { - return nil +func (ta TestAWS) EventBridge(ctx context.Context) (EventBridgeAPI, error) { + return nil, fmt.Errorf("Test Not Implemented") } func (ta TestAWS) Region() string { return "local" } -func (ta TestAWS) Credentials() aws.CredentialsProvider { - return nil +func (ta TestAWS) Credentials(ctx context.Context) (aws.CredentialsProvider, error) { + return nil, fmt.Errorf("Test Not Implemented") } func TestLoader(t *testing.T) { - runtime, err := FromConfig(Config{}, TestAWS{}) + runtime, err := FromConfig(t.Context(), Config{}, TestAWS{}) assert.NoError(t, err) err = runtime.Run(context.Background()) @@ -49,7 +49,7 @@ func TestLoader(t *testing.T) { } func TestLoadEverything(t *testing.T) { - runtime, err := FromConfig(Config{ + runtime, err := FromConfig(t.Context(), Config{ ServerConfig: httpserver.ServerConfig{ PublicAddr: ":0", CORSOrigins: []string{"*"}, @@ -63,7 +63,7 @@ func TestLoadEverything(t *testing.T) { exitErr <- err }() - httpAddr := runtime.routerServer.Addr() + httpAddr := runtime.serviceRouter.Addr() t.Logf("HTTP Addr: %s", httpAddr) t.Run("HTTP CORS", func(t *testing.T) {