diff --git a/examples/04-go-based.yaml b/examples/04-go-based.yaml new file mode 100644 index 0000000..5099b7a --- /dev/null +++ b/examples/04-go-based.yaml @@ -0,0 +1,3 @@ +run: | + #!/bin/bash + oopstctl ripple --logtostderr --test="getUsers" diff --git a/main.go b/main.go index 33c5565..2dc85a9 100644 --- a/main.go +++ b/main.go @@ -21,6 +21,7 @@ import ( "github.com/dchest/uniuri" lssqs "github.com/flowerinthenight/longsub/awssqs" lspubsub "github.com/flowerinthenight/longsub/gcppubsub" + yaml "github.com/goccy/go-yaml" "github.com/google/uuid" "github.com/spf13/cobra" ) @@ -64,6 +65,10 @@ type cmd struct { // The file to process. Sent together with the 'process' code. Scenario string `json:"scenario"` + + // Optional tags to filter scenarios. Format: ["key=value", "key2=value2"] + // When provided with 'start' code, only scenarios matching ALL tags will be distributed. + Tags []string `json:"tags,omitempty"` } func runE(cmd *cobra.Command, args []string) error { @@ -113,10 +118,64 @@ func combineFilesAndDir() []string { return final } -func distributePubsub(app *appctx) { +func filterScenariosByTags(files []string, tagFilters []string) []string { + if len(tagFilters) == 0 { + return files + } + + var filtered []string + for _, f := range files { + yml, err := os.ReadFile(f) + if err != nil { + log.Printf("failed to read file %v: %v", f, err) + continue + } + + var s Scenario + err = yaml.Unmarshal(yml, &s) + if err != nil { + log.Printf("failed to unmarshal yaml %v: %v", f, err) + continue + } + + if isAllowedWithTags(&s, tagFilters) { + filtered = append(filtered, f) + } else { + log.Printf("%v filtered out by tags", f) + } + } + + return filtered +} + +func isAllowedWithTags(s *Scenario, tagFilters []string) bool { + if len(tagFilters) == 0 { + return true + } + + var matched int + for _, t := range tagFilters { + tt := strings.Split(t, "=") + if len(tt) != 2 { + continue + } + + for k, v := range s.Tags { + if k == tt[0] && v == tt[1] { + matched++ + } + } + } + + return matched == len(tagFilters) +} + +func distributePubsub(app *appctx, tagFilters []string) { id := uuid.NewString() final := combineFilesAndDir() - for _, f := range final { + filtered := filterScenariosByTags(final, tagFilters) + log.Printf("distributing %d/%d scenarios matching tags %v", len(filtered), len(final), tagFilters) + for _, f := range filtered { nc := cmd{ Code: "process", ID: id, @@ -131,7 +190,7 @@ func distributePubsub(app *appctx) { } } -func distributeSQS(app *appctx) { +func distributeSQS(app *appctx, tagFilters []string) { sess, _ := session.NewSession(&aws.Config{ Region: aws.String(region), Credentials: credentials.NewStaticCredentials(key, secret, ""), @@ -147,7 +206,9 @@ func distributeSQS(app *appctx) { id := uuid.NewString() final := combineFilesAndDir() - for _, f := range final { + filtered := filterScenariosByTags(final, tagFilters) + log.Printf("distributing %d/%d scenarios matching tags %v", len(filtered), len(final), tagFilters) + for _, f := range filtered { nc := cmd{ Code: "process", ID: id, @@ -192,13 +253,14 @@ func process(ctx any, data []byte) error { switch c.Code { case "start": + log.Printf("received start command with tags: %v", c.Tags) var dist string switch { case pubsub != "": - distributePubsub(app) + distributePubsub(app, c.Tags) dist = fmt.Sprintf("pubsub=%v", pubsub) case snssqs != "": - distributeSQS(app) + distributeSQS(app, c.Tags) dist = snssqs dist = fmt.Sprintf("sns/sqs=%v", snssqs) }