From 329306224b8ded0712ae76f4219738ffb6973dc0 Mon Sep 17 00:00:00 2001 From: JovihanniCasenas Date: Fri, 12 Dec 2025 12:18:27 +0800 Subject: [PATCH 1/3] feat: add tags filtering --- main.go | 39 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 33c5565..f378d3e 100644 --- a/main.go +++ b/main.go @@ -21,6 +21,7 @@ import ( "github.com/dchest/uniuri" lssqs "github.com/flowerinthenight/longsub/awssqs" lspubsub "github.com/flowerinthenight/longsub/gcppubsub" + yaml "github.com/goccy/go-yaml" "github.com/google/uuid" "github.com/spf13/cobra" ) @@ -113,10 +114,42 @@ func combineFilesAndDir() []string { return final } +func filterScenariosByTags(files []string) []string { + if len(tags) == 0 { + return files + } + + var filtered []string + for _, f := range files { + yml, err := os.ReadFile(f) + if err != nil { + log.Printf("failed to read file %v: %v", f, err) + continue + } + + var s Scenario + err = yaml.Unmarshal(yml, &s) + if err != nil { + log.Printf("failed to unmarshal yaml %v: %v", f, err) + continue + } + + if isAllowed(&s) { + filtered = append(filtered, f) + } else { + log.Printf("%v filtered out by tags", f) + } + } + + return filtered +} + func distributePubsub(app *appctx) { id := uuid.NewString() final := combineFilesAndDir() - for _, f := range final { + filtered := filterScenariosByTags(final) + log.Printf("distributing %d/%d scenarios matching tags", len(filtered), len(final)) + for _, f := range filtered { nc := cmd{ Code: "process", ID: id, @@ -147,7 +180,9 @@ func distributeSQS(app *appctx) { id := uuid.NewString() final := combineFilesAndDir() - for _, f := range final { + filtered := filterScenariosByTags(final) + log.Printf("distributing %d/%d scenarios matching tags", len(filtered), len(final)) + for _, f := range filtered { nc := cmd{ Code: "process", ID: id, From e0169ec782282be47022f76bac8f3ae395a9f9fb Mon Sep 17 00:00:00 2001 From: JovihanniCasenas Date: Fri, 12 Dec 2025 12:19:38 +0800 Subject: [PATCH 2/3] feat: add oopstctl scenario example --- examples/04-go-based.yaml | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 examples/04-go-based.yaml diff --git a/examples/04-go-based.yaml b/examples/04-go-based.yaml new file mode 100644 index 0000000..5099b7a --- /dev/null +++ b/examples/04-go-based.yaml @@ -0,0 +1,3 @@ +run: | + #!/bin/bash + oopstctl ripple --logtostderr --test="getUsers" From b7d2e4a7e42d58dda285e10efbddd8d73bce30e8 Mon Sep 17 00:00:00 2001 From: JovihanniCasenas Date: Fri, 12 Dec 2025 13:11:03 +0800 Subject: [PATCH 3/3] feat: support tags in pubsub payload --- main.go | 49 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/main.go b/main.go index f378d3e..2dc85a9 100644 --- a/main.go +++ b/main.go @@ -65,6 +65,10 @@ type cmd struct { // The file to process. Sent together with the 'process' code. Scenario string `json:"scenario"` + + // Optional tags to filter scenarios. Format: ["key=value", "key2=value2"] + // When provided with 'start' code, only scenarios matching ALL tags will be distributed. + Tags []string `json:"tags,omitempty"` } func runE(cmd *cobra.Command, args []string) error { @@ -114,8 +118,8 @@ func combineFilesAndDir() []string { return final } -func filterScenariosByTags(files []string) []string { - if len(tags) == 0 { +func filterScenariosByTags(files []string, tagFilters []string) []string { + if len(tagFilters) == 0 { return files } @@ -134,7 +138,7 @@ func filterScenariosByTags(files []string) []string { continue } - if isAllowed(&s) { + if isAllowedWithTags(&s, tagFilters) { filtered = append(filtered, f) } else { log.Printf("%v filtered out by tags", f) @@ -144,11 +148,33 @@ func filterScenariosByTags(files []string) []string { return filtered } -func distributePubsub(app *appctx) { +func isAllowedWithTags(s *Scenario, tagFilters []string) bool { + if len(tagFilters) == 0 { + return true + } + + var matched int + for _, t := range tagFilters { + tt := strings.Split(t, "=") + if len(tt) != 2 { + continue + } + + for k, v := range s.Tags { + if k == tt[0] && v == tt[1] { + matched++ + } + } + } + + return matched == len(tagFilters) +} + +func distributePubsub(app *appctx, tagFilters []string) { id := uuid.NewString() final := combineFilesAndDir() - filtered := filterScenariosByTags(final) - log.Printf("distributing %d/%d scenarios matching tags", len(filtered), len(final)) + filtered := filterScenariosByTags(final, tagFilters) + log.Printf("distributing %d/%d scenarios matching tags %v", len(filtered), len(final), tagFilters) for _, f := range filtered { nc := cmd{ Code: "process", @@ -164,7 +190,7 @@ func distributePubsub(app *appctx) { } } -func distributeSQS(app *appctx) { +func distributeSQS(app *appctx, tagFilters []string) { sess, _ := session.NewSession(&aws.Config{ Region: aws.String(region), Credentials: credentials.NewStaticCredentials(key, secret, ""), @@ -180,8 +206,8 @@ func distributeSQS(app *appctx) { id := uuid.NewString() final := combineFilesAndDir() - filtered := filterScenariosByTags(final) - log.Printf("distributing %d/%d scenarios matching tags", len(filtered), len(final)) + filtered := filterScenariosByTags(final, tagFilters) + log.Printf("distributing %d/%d scenarios matching tags %v", len(filtered), len(final), tagFilters) for _, f := range filtered { nc := cmd{ Code: "process", @@ -227,13 +253,14 @@ func process(ctx any, data []byte) error { switch c.Code { case "start": + log.Printf("received start command with tags: %v", c.Tags) var dist string switch { case pubsub != "": - distributePubsub(app) + distributePubsub(app, c.Tags) dist = fmt.Sprintf("pubsub=%v", pubsub) case snssqs != "": - distributeSQS(app) + distributeSQS(app, c.Tags) dist = snssqs dist = fmt.Sprintf("sns/sqs=%v", snssqs) }