Skip to content
Merged

AMQP #86

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ The sidecar trusts the application's reflection proto, this is probably not
great but have to start somewhere. Comparing the reflection proto to some
central registry could be interesting but not in the initial scope.

## Boot Process

```mermaid
sequenceDiagram
Environment
-----------

Baseline Required:

`APP_NAME string`
`ENVIRONMENT_NAME string`

EventBridge

`EVENTBRIDGE_ARN string` - The ARN of the EventBridge bus to use

```
96 changes: 96 additions & 0 deletions adapters/amqp/ampq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package amqp

import (
"fmt"
"strings"
"sync"

"github.com/pentops/o5-messaging/gen/o5/messaging/v1/messaging_pb"
amqp "github.com/rabbitmq/amqp091-go"
)

type AMQPConfig struct {
URI string `env:"AMQP_URI" default:""`
Exchange string `env:"AMQP_EXCHANGE" default:""`
Queue string `env:"AMQP_QUEUE" default:""`
}

type Connection struct {
connLock sync.Mutex
connection *amqp.Connection
channel *amqp.Channel
exchange string
}

func NewConnection(config AMQPConfig, envName string) (*Connection, error) {
conn, err := amqp.Dial(config.URI)
if err != nil {
return nil, err
}

exchange := config.Exchange
if exchange == "" {
exchange = fmt.Sprintf("o5.exchange.%s", envName)
}

cw := &Connection{
connection: conn,
exchange: exchange,
}
return cw, nil
}

func (p *Connection) getChannel() (*amqp.Channel, error) {
p.connLock.Lock()
defer p.connLock.Unlock()
if p.channel != nil {
if p.channel.IsClosed() {
p.channel = nil
} else {
return p.channel, nil
}
}
ch, err := p.connection.Channel()
if err != nil {
return nil, err
}
p.channel = ch
return ch, nil
}

/* Topic Breakdown:

These are defined in o5 configs:

o5-infra/{topicName} is an SNS subscription only, not compatible.
global/{globalType}

/{package}.{topic}
/{package}.{topic}/{endpoint}
{topicName}

/{package}.{topic} becomes service.{package}.{topic}

// when published
// foo.v1.FooService/FooMethod becomes service.foo/v1/FooService.FooMethod

// when subscribing:
// a config of /foo.v1.FooService
// will become service.foo/v1/FooService.*

// a config of /foo.v1.FooService/FooMethod
// will become service.foo/v1/FooService.FooMethod

*/

func messageToRoutingKey(message *messaging_pb.Message) string {

/*
if message.DestinationTopic != "" {
return fmt.Sprintf("topic.%s", message.DestinationTopic)
}*/

serviceShash := strings.ReplaceAll(message.GrpcService, ".", "/")

return fmt.Sprintf("service.%s.%s", serviceShash, message.GrpcMethod)
}
69 changes: 69 additions & 0 deletions adapters/amqp/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package amqp

import (
"context"
"errors"

"github.com/pentops/j5/lib/j5codec"
"github.com/pentops/log.go/log"
"github.com/pentops/o5-messaging/gen/o5/messaging/v1/messaging_pb"
amqp "github.com/rabbitmq/amqp091-go"
)

type Publisher struct {
conn *Connection
}

func NewPublisher(conn *Connection) (*Publisher, error) {
cw := &Publisher{
conn: conn,
}
return cw, nil
}

func (p *Publisher) Publish(ctx context.Context, message *messaging_pb.Message) error {
routingKey := messageToRoutingKey(message)

ch, err := p.conn.getChannel()
if err != nil {
return err
}

// eventbridge requires JSON bodies.
detail, err := j5codec.Global.ProtoToJSON(message.ProtoReflect())
if err != nil {
return err
}

log.WithFields(ctx, "exchange", p.conn.exchange, "routing_key", routingKey).Info("Publishing message to AMQP")
return ch.PublishWithContext(ctx,
p.conn.exchange, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/o5-message",
Body: detail,
},
)

}

func (p *Publisher) PublishBatch(ctx context.Context, messages []*messaging_pb.Message) ([]string, error) {

errs := make([]error, 0)
ids := make([]string, 0, len(messages))
for _, msg := range messages {
err := p.Publish(ctx, msg)
if err != nil {
errs = append(errs, err)
continue
}
ids = append(ids, msg.MessageId)
}
if len(errs) > 0 {
return ids, errors.Join(errs...)
}

return ids, nil
}
100 changes: 100 additions & 0 deletions adapters/amqp/subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package amqp

import (
"context"
"fmt"

"github.com/pentops/j5/lib/j5codec"
"github.com/pentops/log.go/log"
"github.com/pentops/o5-messaging/gen/o5/messaging/v1/messaging_pb"
"github.com/pentops/o5-runtime-sidecar/apps/queueworker/messaging"
amqp "github.com/rabbitmq/amqp091-go"
)

type Subscriber struct {
queueName string
conn *Connection
handler messaging.Handler
}

func NewSubscriber(conn *Connection, queueName string, router messaging.Handler) (*Subscriber, error) {
sub := &Subscriber{
conn: conn,
queueName: queueName,
handler: router,
}
return sub, nil

}

func (p *Subscriber) Run(ctx context.Context) error {
ch, err := p.conn.getChannel()
if err != nil {
return err
}

delivery, err := ch.ConsumeWithContext(ctx,
p.queueName, // queue
"", // consumer
false, // autoAck
false, // exclusive
false, // noLocal
false, // noWait
nil, // args
)
if err != nil {
return err
}
defer ch.Close()
for {
select {
case <-ctx.Done():
return nil
case msg, ok := <-delivery:
if !ok {
return fmt.Errorf("AMQP delivery channel closed")
}

err = p.handleDelivery(ctx, msg)
if err != nil {
return err
}
}
}

}

func (p *Subscriber) handleDelivery(ctx context.Context, delivery amqp.Delivery) error {
count, ok := delivery.Headers["x-delivery-count"].(int64)
if !ok {
count = 0
}

if delivery.ContentType != "application/o5-message" {
return fmt.Errorf("invalid content type: %s", delivery.ContentType)
}

msg := &messaging_pb.Message{}
err := j5codec.Global.JSONToProto(delivery.Body, msg.ProtoReflect())
if err != nil {
return err
}

handlerError := p.handler.HandleMessage(ctx, msg)
if handlerError == nil { // LOGIC INVERSION
err = delivery.Ack(false)
if err != nil {
return err
}
return nil
}
log.WithError(ctx, handlerError).Error("Message Handler: Error")
if count >= 3 {
log.Info(ctx, "Message Handler: Killing after 3 attempts")

} else {
log.Info(ctx, "Message Handler: Requeuing message")
err = delivery.Nack(false, true) // 'requeue', meaning a DLX should be set up
}
return err
}
72 changes: 54 additions & 18 deletions entrypoint/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/pentops/o5-runtime-sidecar/adapters/amqp"
"github.com/pentops/o5-runtime-sidecar/adapters/eventbridge"
"github.com/pentops/o5-runtime-sidecar/adapters/msgconvert"
"github.com/pentops/o5-runtime-sidecar/adapters/pgclient"
Expand All @@ -28,6 +29,7 @@ type Config struct {
OutboxConfig pgoutbox.OutboxConfig
BridgeConfig bridge.BridgeConfig
EventBridgeConfig eventbridge.EventBridgeConfig
AMQPConfig amqp.AMQPConfig

ServiceEndpoints []string `env:"SERVICE_ENDPOINT" default:""`
}
Expand All @@ -48,6 +50,7 @@ func FromConfig(ctx context.Context, envConfig Config, awsConfig AWSProvider) (*
runtime.endpoints = envConfig.ServiceEndpoints
runtime.msgConverter = msgconvert.NewConverter(srcConfig)

// Publish to EventBridge
if envConfig.EventBridgeConfig.BusARN != "" {
eventBridge, err := awsConfig.EventBridge(ctx)
if err != nil {
Expand All @@ -61,6 +64,57 @@ func FromConfig(ctx context.Context, envConfig Config, awsConfig AWSProvider) (*
runtime.sender = s
}

// Subscribe to SQS messages
if envConfig.WorkerConfig.SQSURL != "" {
sqs, err := awsConfig.SQS(ctx)
if err != nil {
return nil, err
}

router := messaging.NewRouter()
runtime.queueRouter = router

w, err := queueworker.NewApp(envConfig.WorkerConfig, srcConfig, runtime.sender, sqs, router)
if err != nil {
return nil, fmt.Errorf("creating queue worker: %w", err)
}

runtime.queueWorker = w
}

if envConfig.AMQPConfig.URI != "" {
if runtime.sender != nil {
return nil, fmt.Errorf("cannot set both AMQP_URI and EVENTBRIDGE_ARN")
}
if runtime.queueWorker != nil {
return nil, fmt.Errorf("cannot set both AMQP_URI and SQS_URL")
}

conn, err := amqp.NewConnection(envConfig.AMQPConfig, envConfig.EnvironmentName)
if err != nil {
return nil, fmt.Errorf("creating amqp connection: %w", err)
}

publisher, err := amqp.NewPublisher(conn)
if err != nil {
return nil, fmt.Errorf("creating amqp publisher: %w", err)
}
runtime.sender = publisher

router := messaging.NewRouter()
runtime.queueRouter = router

worker, err := amqp.NewSubscriber(conn, envConfig.AMQPConfig.Queue, router)
if err != nil {
return nil, fmt.Errorf("creating amqp publisher: %w", err)
}

runtime.queueWorker = worker

} else if len(envConfig.AMQPConfig.Queue) > 0 {
return nil, fmt.Errorf("AMQP_QUEUES set but AMQP_URI is empty")
}

pgConfigs := pgclient.NewConnectorSet(awsConfig, pgclient.EnvProvider{})

// Listen to a Postgres outbox table
Expand All @@ -87,24 +141,6 @@ func FromConfig(ctx context.Context, envConfig Config, awsConfig AWSProvider) (*
runtime.postgresProxy = p
}

// Subscribe to SQS messages
if envConfig.WorkerConfig.SQSURL != "" {
sqs, err := awsConfig.SQS(ctx)
if err != nil {
return nil, err
}

router := messaging.NewRouter()
runtime.queueRouter = router

w, err := queueworker.NewApp(envConfig.WorkerConfig, srcConfig, runtime.sender, sqs, router)
if err != nil {
return nil, fmt.Errorf("creating queue worker: %w", err)
}

runtime.queueWorker = w
}

// Serve an internal gRPC server, for the app to use messaging without an outbox
if envConfig.BridgeConfig.AdapterAddr != "" {
if runtime.sender == nil {
Expand Down
Loading