diff --git a/example/plugins/examplePlugin1.go b/example/plugins/examplePlugin1.go index deca414..568bd98 100644 --- a/example/plugins/examplePlugin1.go +++ b/example/plugins/examplePlugin1.go @@ -38,7 +38,7 @@ func (x *ExamplePlugin1) Subscribe() []string { } } -func (x *ExamplePlugin1) Process(e transistor.Event) error { +func (x *ExamplePlugin1) Process(e transistor.Event, workerID string) error { if e.Event() == "examplePlugin1:create" { hello := e.Payload.(Hello) log.Info("ExamplePlugin1 received a message:", hello) diff --git a/example/plugins/examplePlugin2.go b/example/plugins/examplePlugin2.go index ff1035c..7f3ee38 100644 --- a/example/plugins/examplePlugin2.go +++ b/example/plugins/examplePlugin2.go @@ -39,7 +39,7 @@ func (x *ExamplePlugin2) Subscribe() []string { } } -func (x *ExamplePlugin2) Process(e transistor.Event) error { +func (x *ExamplePlugin2) Process(e transistor.Event, workerID string) error { if e.Event() == "examplePlugin2:create" { hello := e.Payload.(Hello) log.Info("ExamplePlugin2 received a message:", hello) diff --git a/plugin.go b/plugin.go index 8066f78..dbab56f 100644 --- a/plugin.go +++ b/plugin.go @@ -13,7 +13,7 @@ type Plugin interface { Subscribe() []string // Process takes in an event message and tries to process it - Process(Event) error + Process(Event, string) error } type RunningPlugin struct { diff --git a/transistor.go b/transistor.go index 0b79ec5..de7b380 100644 --- a/transistor.go +++ b/transistor.go @@ -123,9 +123,8 @@ func (t *Transistor) addPlugin(name string) error { log.Fatal(fmt.Errorf("PayloadModel not found: %s. Did you add it to ApiRegistry?", event.PayloadModel)) } - //event.Dump() - - plugin.Process(event) + workerID := uuid.NewV4() + plugin.Process(event, workerID.String()) } wc := t.Config.Plugins[name].(map[string]interface{}) @@ -198,7 +197,8 @@ func (t *Transistor) flusher() { workers.EnqueueWithOptions(plugin.Name, "Event", e, options) } else { go func() { - plugin.Plugin.Process(e) + workerID := uuid.NewV4() + plugin.Plugin.Process(e, workerID.String()) }() } }