From 3f6b0660e67e9c2f9e25b9508550fd36c393168a Mon Sep 17 00:00:00 2001 From: Jesus Hernandez Date: Thu, 18 Nov 2021 11:12:53 +0000 Subject: [PATCH 1/5] Make sure we handle non-errors --- middleware/recover/recover.go | 14 +++++++--- middleware/recover/recover_test.go | 43 +++++++++++++++++++++++++----- 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/middleware/recover/recover.go b/middleware/recover/recover.go index b6df643..75adf85 100644 --- a/middleware/recover/recover.go +++ b/middleware/recover/recover.go @@ -33,7 +33,7 @@ func (o Middleware) PublisherMsgInterceptor(serviceName string, next pubsub.Publ return func(ctx context.Context, topic string, m *pubsub.Msg) (err error) { defer func() { if r := recover(); r != nil { - err = recoverFrom(r, "pubsub: publish error", o.RecoveryHandlerFunc) + err = recoverFrom(r, "pubsub: publisher panic \n", o.RecoveryHandlerFunc) } }() err = next(ctx, topic, m) @@ -43,8 +43,16 @@ func (o Middleware) PublisherMsgInterceptor(serviceName string, next pubsub.Publ func recoverFrom(p interface{}, wrap string, r RecoveryHandlerFunc) error { if r == nil { - return errors.Wrap(p.(error), wrap) + var e error + switch val := p.(type) { + case string: + e = errors.New(val) + case error: + e = val + default: + e = errors.New("unknown error occurred") + } + return errors.Wrap(e, wrap) } - return r(p) } diff --git a/middleware/recover/recover_test.go b/middleware/recover/recover_test.go index 592eecf..5e84d92 100644 --- a/middleware/recover/recover_test.go +++ b/middleware/recover/recover_test.go @@ -17,17 +17,40 @@ type TestSubscriber struct { T *testing.T } -func (ts *TestSubscriber) DoSomething(ctx context.Context, t *test.Account, msg *pubsub.Msg) error { +func (ts *TestSubscriber) PanicWithError(ctx context.Context, t *test.Account, msg *pubsub.Msg) error { assert.True(ts.T, len(msg.Data) > 0) - panic(errors.New("ahhhhhhhh")) - return nil + panic(errors.New("this is an error")) +} + +func (ts *TestSubscriber) PanicWithString(ctx context.Context, t *test.Account, msg *pubsub.Msg) error { + assert.True(ts.T, len(msg.Data) > 0) + panic("this is a panic") +} + +func (ts *TestSubscriber) PanicUnknown(ctx context.Context, t *test.Account, msg *pubsub.Msg) error { + assert.True(ts.T, len(msg.Data) > 0) + panic(struct{}{}) } func (ts *TestSubscriber) Setup(c *pubsub.Client) { c.On(pubsub.HandlerOptions{ - Topic: "test_topic", - Name: "do_something", - Handler: ts.DoSomething, + Topic: "with_error", + Name: "test", + Handler: ts.PanicWithError, + JSON: ts.JSON, + }) + + c.On(pubsub.HandlerOptions{ + Topic: "with_string", + Name: "test", + Handler: ts.PanicWithString, + JSON: ts.JSON, + }) + + c.On(pubsub.HandlerOptions{ + Topic: "with_unknown", + Name: "test", + Handler: ts.PanicUnknown, JSON: ts.JSON, }) } @@ -51,7 +74,13 @@ func TestRecoverMiddleware(t *testing.T) { Name: "smth", } - err := c.Publish(context.Background(), "test_topic", &ps, false) + err := c.Publish(context.Background(), "with_error", &ps, false) + assert.Nil(t, err) + + err = c.Publish(context.Background(), "with_string", &ps, false) + assert.Nil(t, err) + + err = c.Publish(context.Background(), "with_unknown", &ps, false) assert.Nil(t, err) ts := TestSubscriber{T: t} From eb37ebd8fd13a351df39b291519582bcccd54812 Mon Sep 17 00:00:00 2001 From: Matt Schiros Date: Fri, 31 Dec 2021 08:59:21 -0800 Subject: [PATCH 2/5] Updated the memory provider to act like a very basic queue --- providers/memory/memory.go | 74 +++++++++++++++++++++++++++++--------- 1 file changed, 58 insertions(+), 16 deletions(-) diff --git a/providers/memory/memory.go b/providers/memory/memory.go index f65cb70..f37bc11 100644 --- a/providers/memory/memory.go +++ b/providers/memory/memory.go @@ -3,40 +3,82 @@ package memory import ( "context" "fmt" - "github.com/lileio/pubsub/v2" + "sync" ) type MemoryProvider struct { - Msgs map[string][]*pubsub.Msg - ErrorHandler func(err error) + *sync.Mutex + Msgs map[string]chan *pubsub.Msg + Subscribers map[string][]pubsub.MsgHandler + Errors chan error } -func (mp *MemoryProvider) Publish(ctx context.Context, topic string, m *pubsub.Msg) error { - if mp.Msgs == nil { - mp.Msgs = make(map[string][]*pubsub.Msg, 0) +func NewMemoryProvider() *MemoryProvider { + mp := &MemoryProvider{ + &sync.Mutex{}, + make(map[string]chan *pubsub.Msg), + make(map[string][]pubsub.MsgHandler), + make(chan error, 101), } + go mp.ProcessErrors() + return mp +} - mp.Msgs[topic] = append(mp.Msgs[topic], m) +func (mp *MemoryProvider) ProcessErrors() { + for err := range mp.Errors { + fmt.Println(err) + } +} +func (mp *MemoryProvider) SetupTopic(topic string) { + mp.Lock() + defer mp.Unlock() + if _, ok := mp.Msgs[topic]; !ok { + mp.Msgs[topic] = make(chan *pubsub.Msg, 100) + mp.Subscribers[topic] = make([]pubsub.MsgHandler, 0, 0) + go mp.process(topic) + } +} + +func (mp *MemoryProvider) Publish(ctx context.Context, topic string, m *pubsub.Msg) error { + if _, ok := mp.Msgs[topic]; !ok { + mp.SetupTopic(topic) + } + mp.Msgs[topic] <- m return nil } func (mp *MemoryProvider) Subscribe(opts pubsub.HandlerOptions, h pubsub.MsgHandler) { - for _, v := range mp.Msgs[opts.Topic] { - err := h(context.Background(), *v) - - if err != nil { - if mp.ErrorHandler != nil { - mp.ErrorHandler(err) - } else { - fmt.Print(err.Error()) + mp.Lock() + defer mp.Unlock() + topic := opts.Topic + if _, ok := mp.Subscribers[topic]; !ok { + mp.SetupTopic(topic) + } + mp.Subscribers[topic] = append(mp.Subscribers[topic], h) + return +} + +func (mp *MemoryProvider) process(topic string) { + var err error + for msg := range mp.Msgs[topic] { + for _, handler := range mp.Subscribers[topic] { + err = handler(context.Background(), *msg) + if err != nil { + mp.Errors <- err } } } - return } func (mp *MemoryProvider) Shutdown() { + mp.Lock() + defer mp.Unlock() + + for _, c := range mp.Msgs { + close(c) + } + close(mp.Errors) return } From 5a200744ff10194a41207b0033f880a01f449a9d Mon Sep 17 00:00:00 2001 From: Matt Schiros Date: Fri, 31 Dec 2021 18:11:11 -0800 Subject: [PATCH 3/5] now with sync.Map{}, b/c keeping the maps straight with mutexes was turning into a nightmare --- providers/memory/memory.go | 49 +++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/providers/memory/memory.go b/providers/memory/memory.go index f37bc11..644b6fc 100644 --- a/providers/memory/memory.go +++ b/providers/memory/memory.go @@ -8,17 +8,17 @@ import ( ) type MemoryProvider struct { - *sync.Mutex - Msgs map[string]chan *pubsub.Msg - Subscribers map[string][]pubsub.MsgHandler + Msgs *sync.Map +// map[string]chan *pubsub.Msg + Subscribers *sync.Map +//map[string][]pubsub.MsgHandler Errors chan error } func NewMemoryProvider() *MemoryProvider { mp := &MemoryProvider{ - &sync.Mutex{}, - make(map[string]chan *pubsub.Msg), - make(map[string][]pubsub.MsgHandler), + &sync.Map{}, + &sync.Map{}, make(chan error, 101), } go mp.ProcessErrors() @@ -32,38 +32,38 @@ func (mp *MemoryProvider) ProcessErrors() { } func (mp *MemoryProvider) SetupTopic(topic string) { - mp.Lock() - defer mp.Unlock() - if _, ok := mp.Msgs[topic]; !ok { - mp.Msgs[topic] = make(chan *pubsub.Msg, 100) - mp.Subscribers[topic] = make([]pubsub.MsgHandler, 0, 0) + if _, ok := mp.Msgs.Load(topic); !ok { + mp.Msgs.Store(topic, make(chan *pubsub.Msg, 100)) + mp.Subscribers.Store(topic, make([]pubsub.MsgHandler, 0, 0)) go mp.process(topic) } } func (mp *MemoryProvider) Publish(ctx context.Context, topic string, m *pubsub.Msg) error { - if _, ok := mp.Msgs[topic]; !ok { + if _, ok := mp.Msgs.Load(topic); !ok { mp.SetupTopic(topic) } - mp.Msgs[topic] <- m + c, _ := mp.Msgs.Load(topic) + c.(chan *pubsub.Msg) <- m return nil } func (mp *MemoryProvider) Subscribe(opts pubsub.HandlerOptions, h pubsub.MsgHandler) { - mp.Lock() - defer mp.Unlock() topic := opts.Topic - if _, ok := mp.Subscribers[topic]; !ok { + if _, ok := mp.Subscribers.Load(topic); !ok { mp.SetupTopic(topic) } - mp.Subscribers[topic] = append(mp.Subscribers[topic], h) + s, _ := mp.Subscribers.Load(topic) + mp.Subscribers.Store(topic, append(s.([]pubsub.MsgHandler), h)) return } func (mp *MemoryProvider) process(topic string) { var err error - for msg := range mp.Msgs[topic] { - for _, handler := range mp.Subscribers[topic] { + c, _ := mp.Msgs.Load(topic) + for msg := range c.(chan *pubsub.Msg) { + s, _ := mp.Subscribers.Load(topic) + for _, handler := range s.([]pubsub.MsgHandler){ err = handler(context.Background(), *msg) if err != nil { mp.Errors <- err @@ -73,12 +73,11 @@ func (mp *MemoryProvider) process(topic string) { } func (mp *MemoryProvider) Shutdown() { - mp.Lock() - defer mp.Unlock() - - for _, c := range mp.Msgs { - close(c) - } + + mp.Msgs.Range(func (k, v interface{}) bool { + close(v.(chan *pubsub.Msg)) + return true + }) close(mp.Errors) return } From b2154cd30e4d2807e6bdd952127f025ad38467bc Mon Sep 17 00:00:00 2001 From: Matt Schiros Date: Fri, 7 Jan 2022 10:13:46 -0800 Subject: [PATCH 4/5] removed comments --- providers/memory/memory.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/providers/memory/memory.go b/providers/memory/memory.go index 644b6fc..4bc8e58 100644 --- a/providers/memory/memory.go +++ b/providers/memory/memory.go @@ -9,9 +9,7 @@ import ( type MemoryProvider struct { Msgs *sync.Map -// map[string]chan *pubsub.Msg Subscribers *sync.Map -//map[string][]pubsub.MsgHandler Errors chan error } From 794bf2cc4b193c1999a6216705959b119986c137 Mon Sep 17 00:00:00 2001 From: Matt Schiros Date: Fri, 7 Jan 2022 10:14:14 -0800 Subject: [PATCH 5/5] go fmt --- providers/memory/memory.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/providers/memory/memory.go b/providers/memory/memory.go index 4bc8e58..f30b867 100644 --- a/providers/memory/memory.go +++ b/providers/memory/memory.go @@ -8,9 +8,9 @@ import ( ) type MemoryProvider struct { - Msgs *sync.Map - Subscribers *sync.Map - Errors chan error + Msgs *sync.Map + Subscribers *sync.Map + Errors chan error } func NewMemoryProvider() *MemoryProvider { @@ -61,7 +61,7 @@ func (mp *MemoryProvider) process(topic string) { c, _ := mp.Msgs.Load(topic) for msg := range c.(chan *pubsub.Msg) { s, _ := mp.Subscribers.Load(topic) - for _, handler := range s.([]pubsub.MsgHandler){ + for _, handler := range s.([]pubsub.MsgHandler) { err = handler(context.Background(), *msg) if err != nil { mp.Errors <- err @@ -71,8 +71,8 @@ func (mp *MemoryProvider) process(topic string) { } func (mp *MemoryProvider) Shutdown() { - - mp.Msgs.Range(func (k, v interface{}) bool { + + mp.Msgs.Range(func(k, v interface{}) bool { close(v.(chan *pubsub.Msg)) return true })