Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/plugins/examplePlugin1.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (x *ExamplePlugin1) Subscribe() []string {
}
}

func (x *ExamplePlugin1) Process(e transistor.Event) error {
func (x *ExamplePlugin1) Process(e transistor.Event, workerID string) error {
if e.Event() == "examplePlugin1:create" {
hello := e.Payload.(Hello)
log.Info("ExamplePlugin1 received a message:", hello)
Expand Down
2 changes: 1 addition & 1 deletion example/plugins/examplePlugin2.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (x *ExamplePlugin2) Subscribe() []string {
}
}

func (x *ExamplePlugin2) Process(e transistor.Event) error {
func (x *ExamplePlugin2) Process(e transistor.Event, workerID string) error {
if e.Event() == "examplePlugin2:create" {
hello := e.Payload.(Hello)
log.Info("ExamplePlugin2 received a message:", hello)
Expand Down
2 changes: 1 addition & 1 deletion plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Plugin interface {
Subscribe() []string

// Process takes in an event message and tries to process it
Process(Event) error
Process(Event, string) error
}

type RunningPlugin struct {
Expand Down
8 changes: 4 additions & 4 deletions transistor.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,8 @@ func (t *Transistor) addPlugin(name string) error {
log.Fatal(fmt.Errorf("PayloadModel not found: %s. Did you add it to ApiRegistry?", event.PayloadModel))
}

//event.Dump()

plugin.Process(event)
workerID := uuid.NewV4()
plugin.Process(event, workerID.String())
}

wc := t.Config.Plugins[name].(map[string]interface{})
Expand Down Expand Up @@ -198,7 +197,8 @@ func (t *Transistor) flusher() {
workers.EnqueueWithOptions(plugin.Name, "Event", e, options)
} else {
go func() {
plugin.Plugin.Process(e)
workerID := uuid.NewV4()
plugin.Plugin.Process(e, workerID.String())
}()
}
}
Expand Down