Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 44 additions & 28 deletions docs/adapters.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ Workflow uses the adapter pattern to decouple core workflow logic from infrastru
**Interface**:
```go
type EventStreamer interface {
NewProducer() (Producer, error)
NewConsumer(name string) (Consumer, error)
NewSender(ctx context.Context, topic string) (EventSender, error)
NewReceiver(ctx context.Context, topic string, name string, opts ...ReceiverOption) (EventReceiver, error)
}

type Producer interface {
Produce(ctx context.Context, event OutboxEvent) error
type EventSender interface {
Send(ctx context.Context, foreignID string, statusType int, headers map[Header]string) error
Close() error
}

type Consumer interface {
Consume(ctx context.Context, fn func(*Event, Ack) error) error
type EventReceiver interface {
Recv(ctx context.Context) (*Event, Ack, error)
Close() error
}
```
Expand Down Expand Up @@ -281,46 +281,62 @@ type MyEventStreamer struct {
config Config
}

func (s *MyEventStreamer) NewProducer() (workflow.Producer, error) {
return &MyProducer{client: s.client}, nil
func (s *MyEventStreamer) NewSender(ctx context.Context, topic string) (workflow.EventSender, error) {
return &MySender{
client: s.client,
topic: topic,
}, nil
}

func (s *MyEventStreamer) NewConsumer(name string) (workflow.Consumer, error) {
return &MyConsumer{
func (s *MyEventStreamer) NewReceiver(ctx context.Context, topic string, name string, opts ...workflow.ReceiverOption) (workflow.EventReceiver, error) {
return &MyReceiver{
client: s.client,
topic: topic,
groupName: name,
}, nil
}

type MyProducer struct {
type MySender struct {
client MyClient
topic string
}

func (p *MyProducer) Produce(ctx context.Context, event workflow.OutboxEvent) error {
return p.client.Publish(ctx, event.Topic, event.Data)
func (s *MySender) Send(ctx context.Context, foreignID string, statusType int, headers map[workflow.Header]string) error {
return s.client.Publish(ctx, s.topic, foreignID, statusType, headers)
}

func (p *MyProducer) Close() error {
return p.client.Close()
func (s *MySender) Close() error {
return s.client.Close()
}

type MyConsumer struct {
type MyReceiver struct {
client MyClient
topic string
groupName string
}

func (c *MyConsumer) Consume(ctx context.Context, fn func(*workflow.Event, workflow.Ack) error) error {
return c.client.Subscribe(ctx, c.groupName, func(msg Message) error {
event := &workflow.Event{
ID: msg.ID,
ForeignID: msg.ForeignID,
Type: msg.Type,
// ... map other fields
}

ack := &MyAck{msg: msg}
return fn(event, ack)
})
func (r *MyReceiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, error) {
msg, err := r.client.PollMessage(ctx, r.topic, r.groupName)
if err != nil {
return nil, nil, err
}

event := &workflow.Event{
ID: msg.ID,
ForeignID: msg.ForeignID,
Type: msg.Type,
// ... map other fields
}

ack := func() error {
return r.client.AckMessage(ctx, msg.ID)
}

return event, ack, nil
}

func (r *MyReceiver) Close() error {
return r.client.Close()
}
```

Expand Down
12 changes: 6 additions & 6 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,17 @@ Adapters provide infrastructure abstraction:

```go
type EventStreamer interface {
NewProducer() (Producer, error)
NewConsumer(name string) (Consumer, error)
NewSender(ctx context.Context, topic string) (EventSender, error)
NewReceiver(ctx context.Context, topic string, name string, opts ...ReceiverOption) (EventReceiver, error)
}

type Producer interface {
Produce(ctx context.Context, event OutboxEvent) error
type EventSender interface {
Send(ctx context.Context, foreignID string, statusType int, headers map[Header]string) error
Close() error
}

type Consumer interface {
Consume(ctx context.Context, fn func(*Event, Ack) error) error
type EventReceiver interface {
Recv(ctx context.Context) (*Event, Ack, error)
Close() error
}
```
Expand Down
Loading