From cdf84e8f4f993814341d01186368f1311563c723 Mon Sep 17 00:00:00 2001 From: Shreyas Date: Thu, 30 Aug 2018 10:09:18 -0700 Subject: [PATCH 1/3] Add workerID to Process params --- plugin.go | 2 +- transistor.go | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/plugin.go b/plugin.go index 8066f78..dbab56f 100644 --- a/plugin.go +++ b/plugin.go @@ -13,7 +13,7 @@ type Plugin interface { Subscribe() []string // Process takes in an event message and tries to process it - Process(Event) error + Process(Event, string) error } type RunningPlugin struct { diff --git a/transistor.go b/transistor.go index 0b79ec5..af4f40e 100644 --- a/transistor.go +++ b/transistor.go @@ -125,7 +125,10 @@ func (t *Transistor) addPlugin(name string) error { //event.Dump() - plugin.Process(event) + //event.Dump() + workerID := uuid.NewV4() + + plugin.Process(event, workerID.String()) } wc := t.Config.Plugins[name].(map[string]interface{}) @@ -198,7 +201,9 @@ func (t *Transistor) flusher() { workers.EnqueueWithOptions(plugin.Name, "Event", e, options) } else { go func() { - plugin.Plugin.Process(e) + workerID := uuid.NewV4() + + plugin.Plugin.Process(e, workerID.String()) }() } } From 97687d104d5b919704759e11892faa3ede93d970 Mon Sep 17 00:00:00 2001 From: Shreyas Date: Thu, 30 Aug 2018 17:14:56 -0700 Subject: [PATCH 2/3] fix exampleplugins --- example/plugins/examplePlugin1.go | 2 +- example/plugins/examplePlugin2.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/example/plugins/examplePlugin1.go b/example/plugins/examplePlugin1.go index deca414..568bd98 100644 --- a/example/plugins/examplePlugin1.go +++ b/example/plugins/examplePlugin1.go @@ -38,7 +38,7 @@ func (x *ExamplePlugin1) Subscribe() []string { } } -func (x *ExamplePlugin1) Process(e transistor.Event) error { +func (x *ExamplePlugin1) Process(e transistor.Event, workerID string) error { if e.Event() == "examplePlugin1:create" { hello := e.Payload.(Hello) log.Info("ExamplePlugin1 received a message:", hello) diff --git a/example/plugins/examplePlugin2.go b/example/plugins/examplePlugin2.go index ff1035c..7f3ee38 100644 --- a/example/plugins/examplePlugin2.go +++ b/example/plugins/examplePlugin2.go @@ -39,7 +39,7 @@ func (x *ExamplePlugin2) Subscribe() []string { } } -func (x *ExamplePlugin2) Process(e transistor.Event) error { +func (x *ExamplePlugin2) Process(e transistor.Event, workerID string) error { if e.Event() == "examplePlugin2:create" { hello := e.Payload.(Hello) log.Info("ExamplePlugin2 received a message:", hello) From bacbf34ed6b4e6c430fd564e05c4f3e9e0d478ae Mon Sep 17 00:00:00 2001 From: Shreyas Date: Thu, 30 Aug 2018 17:21:03 -0700 Subject: [PATCH 3/3] nits --- transistor.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/transistor.go b/transistor.go index af4f40e..de7b380 100644 --- a/transistor.go +++ b/transistor.go @@ -123,11 +123,7 @@ func (t *Transistor) addPlugin(name string) error { log.Fatal(fmt.Errorf("PayloadModel not found: %s. Did you add it to ApiRegistry?", event.PayloadModel)) } - //event.Dump() - - //event.Dump() workerID := uuid.NewV4() - plugin.Process(event, workerID.String()) } @@ -202,7 +198,6 @@ func (t *Transistor) flusher() { } else { go func() { workerID := uuid.NewV4() - plugin.Plugin.Process(e, workerID.String()) }() }