diff --git a/README.md b/README.md index 4e2d5f0..c219105 100644 --- a/README.md +++ b/README.md @@ -18,11 +18,16 @@ The sidecar trusts the application's reflection proto, this is probably not great but have to start somewhere. Comparing the reflection proto to some central registry could be interesting but not in the initial scope. -## Boot Process -```mermaid -sequenceDiagram +Environment +----------- +Baseline Required: +`APP_NAME string` +`ENVIRONMENT_NAME string` + +EventBridge + +`EVENTBRIDGE_ARN string` - The ARN of the EventBridge bus to use -``` diff --git a/adapters/amqp/ampq.go b/adapters/amqp/ampq.go new file mode 100644 index 0000000..cb45ec7 --- /dev/null +++ b/adapters/amqp/ampq.go @@ -0,0 +1,96 @@ +package amqp + +import ( + "fmt" + "strings" + "sync" + + "github.com/pentops/o5-messaging/gen/o5/messaging/v1/messaging_pb" + amqp "github.com/rabbitmq/amqp091-go" +) + +type AMQPConfig struct { + URI string `env:"AMQP_URI" default:""` + Exchange string `env:"AMQP_EXCHANGE" default:""` + Queue string `env:"AMQP_QUEUE" default:""` +} + +type Connection struct { + connLock sync.Mutex + connection *amqp.Connection + channel *amqp.Channel + exchange string +} + +func NewConnection(config AMQPConfig, envName string) (*Connection, error) { + conn, err := amqp.Dial(config.URI) + if err != nil { + return nil, err + } + + exchange := config.Exchange + if exchange == "" { + exchange = fmt.Sprintf("o5.exchange.%s", envName) + } + + cw := &Connection{ + connection: conn, + exchange: exchange, + } + return cw, nil +} + +func (p *Connection) getChannel() (*amqp.Channel, error) { + p.connLock.Lock() + defer p.connLock.Unlock() + if p.channel != nil { + if p.channel.IsClosed() { + p.channel = nil + } else { + return p.channel, nil + } + } + ch, err := p.connection.Channel() + if err != nil { + return nil, err + } + p.channel = ch + return ch, nil +} + +/* Topic Breakdown: + +These are defined in o5 configs: + +o5-infra/{topicName} is an SNS subscription only, not compatible. +global/{globalType} + +/{package}.{topic} +/{package}.{topic}/{endpoint} +{topicName} + +/{package}.{topic} becomes service.{package}.{topic} + +// when published +// foo.v1.FooService/FooMethod becomes service.foo/v1/FooService.FooMethod + +// when subscribing: +// a config of /foo.v1.FooService +// will become service.foo/v1/FooService.* + +// a config of /foo.v1.FooService/FooMethod +// will become service.foo/v1/FooService.FooMethod + +*/ + +func messageToRoutingKey(message *messaging_pb.Message) string { + + /* + if message.DestinationTopic != "" { + return fmt.Sprintf("topic.%s", message.DestinationTopic) + }*/ + + serviceShash := strings.ReplaceAll(message.GrpcService, ".", "/") + + return fmt.Sprintf("service.%s.%s", serviceShash, message.GrpcMethod) +} diff --git a/adapters/amqp/publisher.go b/adapters/amqp/publisher.go new file mode 100644 index 0000000..8e27bd0 --- /dev/null +++ b/adapters/amqp/publisher.go @@ -0,0 +1,69 @@ +package amqp + +import ( + "context" + "errors" + + "github.com/pentops/j5/lib/j5codec" + "github.com/pentops/log.go/log" + "github.com/pentops/o5-messaging/gen/o5/messaging/v1/messaging_pb" + amqp "github.com/rabbitmq/amqp091-go" +) + +type Publisher struct { + conn *Connection +} + +func NewPublisher(conn *Connection) (*Publisher, error) { + cw := &Publisher{ + conn: conn, + } + return cw, nil +} + +func (p *Publisher) Publish(ctx context.Context, message *messaging_pb.Message) error { + routingKey := messageToRoutingKey(message) + + ch, err := p.conn.getChannel() + if err != nil { + return err + } + + // eventbridge requires JSON bodies. + detail, err := j5codec.Global.ProtoToJSON(message.ProtoReflect()) + if err != nil { + return err + } + + log.WithFields(ctx, "exchange", p.conn.exchange, "routing_key", routingKey).Info("Publishing message to AMQP") + return ch.PublishWithContext(ctx, + p.conn.exchange, // exchange + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/o5-message", + Body: detail, + }, + ) + +} + +func (p *Publisher) PublishBatch(ctx context.Context, messages []*messaging_pb.Message) ([]string, error) { + + errs := make([]error, 0) + ids := make([]string, 0, len(messages)) + for _, msg := range messages { + err := p.Publish(ctx, msg) + if err != nil { + errs = append(errs, err) + continue + } + ids = append(ids, msg.MessageId) + } + if len(errs) > 0 { + return ids, errors.Join(errs...) + } + + return ids, nil +} diff --git a/adapters/amqp/subscriber.go b/adapters/amqp/subscriber.go new file mode 100644 index 0000000..902596b --- /dev/null +++ b/adapters/amqp/subscriber.go @@ -0,0 +1,100 @@ +package amqp + +import ( + "context" + "fmt" + + "github.com/pentops/j5/lib/j5codec" + "github.com/pentops/log.go/log" + "github.com/pentops/o5-messaging/gen/o5/messaging/v1/messaging_pb" + "github.com/pentops/o5-runtime-sidecar/apps/queueworker/messaging" + amqp "github.com/rabbitmq/amqp091-go" +) + +type Subscriber struct { + queueName string + conn *Connection + handler messaging.Handler +} + +func NewSubscriber(conn *Connection, queueName string, router messaging.Handler) (*Subscriber, error) { + sub := &Subscriber{ + conn: conn, + queueName: queueName, + handler: router, + } + return sub, nil + +} + +func (p *Subscriber) Run(ctx context.Context) error { + ch, err := p.conn.getChannel() + if err != nil { + return err + } + + delivery, err := ch.ConsumeWithContext(ctx, + p.queueName, // queue + "", // consumer + false, // autoAck + false, // exclusive + false, // noLocal + false, // noWait + nil, // args + ) + if err != nil { + return err + } + defer ch.Close() + for { + select { + case <-ctx.Done(): + return nil + case msg, ok := <-delivery: + if !ok { + return fmt.Errorf("AMQP delivery channel closed") + } + + err = p.handleDelivery(ctx, msg) + if err != nil { + return err + } + } + } + +} + +func (p *Subscriber) handleDelivery(ctx context.Context, delivery amqp.Delivery) error { + count, ok := delivery.Headers["x-delivery-count"].(int64) + if !ok { + count = 0 + } + + if delivery.ContentType != "application/o5-message" { + return fmt.Errorf("invalid content type: %s", delivery.ContentType) + } + + msg := &messaging_pb.Message{} + err := j5codec.Global.JSONToProto(delivery.Body, msg.ProtoReflect()) + if err != nil { + return err + } + + handlerError := p.handler.HandleMessage(ctx, msg) + if handlerError == nil { // LOGIC INVERSION + err = delivery.Ack(false) + if err != nil { + return err + } + return nil + } + log.WithError(ctx, handlerError).Error("Message Handler: Error") + if count >= 3 { + log.Info(ctx, "Message Handler: Killing after 3 attempts") + + } else { + log.Info(ctx, "Message Handler: Requeuing message") + err = delivery.Nack(false, true) // 'requeue', meaning a DLX should be set up + } + return err +} diff --git a/entrypoint/builder.go b/entrypoint/builder.go index 79001df..bd8878a 100644 --- a/entrypoint/builder.go +++ b/entrypoint/builder.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/pentops/o5-runtime-sidecar/adapters/amqp" "github.com/pentops/o5-runtime-sidecar/adapters/eventbridge" "github.com/pentops/o5-runtime-sidecar/adapters/msgconvert" "github.com/pentops/o5-runtime-sidecar/adapters/pgclient" @@ -28,6 +29,7 @@ type Config struct { OutboxConfig pgoutbox.OutboxConfig BridgeConfig bridge.BridgeConfig EventBridgeConfig eventbridge.EventBridgeConfig + AMQPConfig amqp.AMQPConfig ServiceEndpoints []string `env:"SERVICE_ENDPOINT" default:""` } @@ -48,6 +50,7 @@ func FromConfig(ctx context.Context, envConfig Config, awsConfig AWSProvider) (* runtime.endpoints = envConfig.ServiceEndpoints runtime.msgConverter = msgconvert.NewConverter(srcConfig) + // Publish to EventBridge if envConfig.EventBridgeConfig.BusARN != "" { eventBridge, err := awsConfig.EventBridge(ctx) if err != nil { @@ -61,6 +64,57 @@ func FromConfig(ctx context.Context, envConfig Config, awsConfig AWSProvider) (* runtime.sender = s } + // Subscribe to SQS messages + if envConfig.WorkerConfig.SQSURL != "" { + sqs, err := awsConfig.SQS(ctx) + if err != nil { + return nil, err + } + + router := messaging.NewRouter() + runtime.queueRouter = router + + w, err := queueworker.NewApp(envConfig.WorkerConfig, srcConfig, runtime.sender, sqs, router) + if err != nil { + return nil, fmt.Errorf("creating queue worker: %w", err) + } + + runtime.queueWorker = w + } + + if envConfig.AMQPConfig.URI != "" { + if runtime.sender != nil { + return nil, fmt.Errorf("cannot set both AMQP_URI and EVENTBRIDGE_ARN") + } + if runtime.queueWorker != nil { + return nil, fmt.Errorf("cannot set both AMQP_URI and SQS_URL") + } + + conn, err := amqp.NewConnection(envConfig.AMQPConfig, envConfig.EnvironmentName) + if err != nil { + return nil, fmt.Errorf("creating amqp connection: %w", err) + } + + publisher, err := amqp.NewPublisher(conn) + if err != nil { + return nil, fmt.Errorf("creating amqp publisher: %w", err) + } + runtime.sender = publisher + + router := messaging.NewRouter() + runtime.queueRouter = router + + worker, err := amqp.NewSubscriber(conn, envConfig.AMQPConfig.Queue, router) + if err != nil { + return nil, fmt.Errorf("creating amqp publisher: %w", err) + } + + runtime.queueWorker = worker + + } else if len(envConfig.AMQPConfig.Queue) > 0 { + return nil, fmt.Errorf("AMQP_QUEUES set but AMQP_URI is empty") + } + pgConfigs := pgclient.NewConnectorSet(awsConfig, pgclient.EnvProvider{}) // Listen to a Postgres outbox table @@ -87,24 +141,6 @@ func FromConfig(ctx context.Context, envConfig Config, awsConfig AWSProvider) (* runtime.postgresProxy = p } - // Subscribe to SQS messages - if envConfig.WorkerConfig.SQSURL != "" { - sqs, err := awsConfig.SQS(ctx) - if err != nil { - return nil, err - } - - router := messaging.NewRouter() - runtime.queueRouter = router - - w, err := queueworker.NewApp(envConfig.WorkerConfig, srcConfig, runtime.sender, sqs, router) - if err != nil { - return nil, fmt.Errorf("creating queue worker: %w", err) - } - - runtime.queueWorker = w - } - // Serve an internal gRPC server, for the app to use messaging without an outbox if envConfig.BridgeConfig.AdapterAddr != "" { if runtime.sender == nil { diff --git a/entrypoint/runtime.go b/entrypoint/runtime.go index 6a18bd4..a2ca797 100644 --- a/entrypoint/runtime.go +++ b/entrypoint/runtime.go @@ -14,7 +14,6 @@ import ( "github.com/pentops/o5-runtime-sidecar/apps/httpserver" "github.com/pentops/o5-runtime-sidecar/apps/pgoutbox" "github.com/pentops/o5-runtime-sidecar/apps/pgproxy" - "github.com/pentops/o5-runtime-sidecar/apps/queueworker" "github.com/pentops/o5-runtime-sidecar/apps/queueworker/messaging" "github.com/pentops/runner" "google.golang.org/grpc" @@ -23,10 +22,14 @@ import ( var ErrNothingToDo = errors.New("no services configured") +type Runner interface { + Run(ctx context.Context) error +} + type Runtime struct { - sender Publisher + sender Publisher + queueWorker Runner //*queueworker.App - queueWorker *queueworker.App adapter *bridge.App queueRouter *messaging.Router serviceRouter *httpserver.Router @@ -81,6 +84,7 @@ func (rt *Runtime) Run(ctx context.Context) error { runGroup.Add("register-endpoints", func(ctx context.Context) error { defer close(rt.endpointWait) for _, endpoint := range rt.endpoints { + ctx := log.WithField(ctx, "endpoint", endpoint) prClient, err := rt.connectEndpoint(endpoint) if err != nil { return fmt.Errorf("connect to endpoint %s: %w", endpoint, err) @@ -137,7 +141,7 @@ func (rt *Runtime) Run(ctx context.Context) error { func (rt *Runtime) connectEndpoint(endpoint string) (*grpcreflect.ReflectionClient, error) { conn, err := grpc.NewClient(endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - return nil, fmt.Errorf("dial: %w", err) + return nil, fmt.Errorf("dial: %s: %w", endpoint, err) } return grpcreflect.NewClient(conn), nil diff --git a/go.mod b/go.mod index dbcf8c7..d6f7f2a 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/pentops/o5-messaging v0.0.0-20250804192812-419352c5dd36 github.com/pentops/runner v0.0.0-20250619010747-2bb7a5385324 github.com/pressly/goose/v3 v3.24.3 + github.com/rabbitmq/amqp091-go v1.10.0 github.com/rs/cors v1.11.1 github.com/stretchr/testify v1.10.0 golang.org/x/sync v0.16.0 diff --git a/go.sum b/go.sum index 396e6ed..98cf76f 100644 --- a/go.sum +++ b/go.sum @@ -143,6 +143,8 @@ github.com/pquerna/cachecontrol v0.2.0/go.mod h1:NrUG3Z7Rdu85UNR3vm7SOsl1nFIeSiQ github.com/pressly/goose/v3 v3.24.3 h1:DSWWNwwggVUsYZ0X2VitiAa9sKuqtBfe+Jr9zFGwWlM= github.com/pressly/goose/v3 v3.24.3/go.mod h1:v9zYL4xdViLHCUUJh/mhjnm6JrK7Eul8AS93IxiZM4E= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= @@ -181,6 +183,8 @@ go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=