From a6a4eacd650ed2e3d503be3163bfe2441e39b037 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Fri, 2 Jan 2026 10:50:53 +0000 Subject: [PATCH] docs: Issue 168 fix doc discrepancy --- docs/adapters.md | 72 +++++++++++++++++++++++++++----------------- docs/architecture.md | 12 ++++---- 2 files changed, 50 insertions(+), 34 deletions(-) diff --git a/docs/adapters.md b/docs/adapters.md index f1cfd3a..ae75e9a 100644 --- a/docs/adapters.md +++ b/docs/adapters.md @@ -28,17 +28,17 @@ Workflow uses the adapter pattern to decouple core workflow logic from infrastru **Interface**: ```go type EventStreamer interface { - NewProducer() (Producer, error) - NewConsumer(name string) (Consumer, error) + NewSender(ctx context.Context, topic string) (EventSender, error) + NewReceiver(ctx context.Context, topic string, name string, opts ...ReceiverOption) (EventReceiver, error) } -type Producer interface { - Produce(ctx context.Context, event OutboxEvent) error +type EventSender interface { + Send(ctx context.Context, foreignID string, statusType int, headers map[Header]string) error Close() error } -type Consumer interface { - Consume(ctx context.Context, fn func(*Event, Ack) error) error +type EventReceiver interface { + Recv(ctx context.Context) (*Event, Ack, error) Close() error } ``` @@ -281,46 +281,62 @@ type MyEventStreamer struct { config Config } -func (s *MyEventStreamer) NewProducer() (workflow.Producer, error) { - return &MyProducer{client: s.client}, nil +func (s *MyEventStreamer) NewSender(ctx context.Context, topic string) (workflow.EventSender, error) { + return &MySender{ + client: s.client, + topic: topic, + }, nil } -func (s *MyEventStreamer) NewConsumer(name string) (workflow.Consumer, error) { - return &MyConsumer{ +func (s *MyEventStreamer) NewReceiver(ctx context.Context, topic string, name string, opts ...workflow.ReceiverOption) (workflow.EventReceiver, error) { + return &MyReceiver{ client: s.client, + topic: topic, groupName: name, }, nil } -type MyProducer struct { +type MySender struct { client MyClient + topic string } -func (p *MyProducer) Produce(ctx context.Context, event workflow.OutboxEvent) error { - return p.client.Publish(ctx, event.Topic, event.Data) +func (s *MySender) Send(ctx context.Context, foreignID string, statusType int, headers map[workflow.Header]string) error { + return s.client.Publish(ctx, s.topic, foreignID, statusType, headers) } -func (p *MyProducer) Close() error { - return p.client.Close() +func (s *MySender) Close() error { + return s.client.Close() } -type MyConsumer struct { +type MyReceiver struct { client MyClient + topic string groupName string } -func (c *MyConsumer) Consume(ctx context.Context, fn func(*workflow.Event, workflow.Ack) error) error { - return c.client.Subscribe(ctx, c.groupName, func(msg Message) error { - event := &workflow.Event{ - ID: msg.ID, - ForeignID: msg.ForeignID, - Type: msg.Type, - // ... map other fields - } - - ack := &MyAck{msg: msg} - return fn(event, ack) - }) +func (r *MyReceiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, error) { + msg, err := r.client.PollMessage(ctx, r.topic, r.groupName) + if err != nil { + return nil, nil, err + } + + event := &workflow.Event{ + ID: msg.ID, + ForeignID: msg.ForeignID, + Type: msg.Type, + // ... map other fields + } + + ack := func() error { + return r.client.AckMessage(ctx, msg.ID) + } + + return event, ack, nil +} + +func (r *MyReceiver) Close() error { + return r.client.Close() } ``` diff --git a/docs/architecture.md b/docs/architecture.md index 667fbf4..b19a431 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -190,17 +190,17 @@ Adapters provide infrastructure abstraction: ```go type EventStreamer interface { - NewProducer() (Producer, error) - NewConsumer(name string) (Consumer, error) + NewSender(ctx context.Context, topic string) (EventSender, error) + NewReceiver(ctx context.Context, topic string, name string, opts ...ReceiverOption) (EventReceiver, error) } -type Producer interface { - Produce(ctx context.Context, event OutboxEvent) error +type EventSender interface { + Send(ctx context.Context, foreignID string, statusType int, headers map[Header]string) error Close() error } -type Consumer interface { - Consume(ctx context.Context, fn func(*Event, Ack) error) error +type EventReceiver interface { + Recv(ctx context.Context) (*Event, Ack, error) Close() error } ```