Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/04-go-based.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
run: |
#!/bin/bash
oopstctl ripple --logtostderr --test="getUsers"
74 changes: 68 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/dchest/uniuri"
lssqs "github.com/flowerinthenight/longsub/awssqs"
lspubsub "github.com/flowerinthenight/longsub/gcppubsub"
yaml "github.com/goccy/go-yaml"
"github.com/google/uuid"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -64,6 +65,10 @@ type cmd struct {

// The file to process. Sent together with the 'process' code.
Scenario string `json:"scenario"`

// Optional tags to filter scenarios. Format: ["key=value", "key2=value2"]
// When provided with 'start' code, only scenarios matching ALL tags will be distributed.
Tags []string `json:"tags,omitempty"`
}

func runE(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -113,10 +118,64 @@ func combineFilesAndDir() []string {
return final
}

func distributePubsub(app *appctx) {
func filterScenariosByTags(files []string, tagFilters []string) []string {
if len(tagFilters) == 0 {
return files
}

var filtered []string
for _, f := range files {
yml, err := os.ReadFile(f)
if err != nil {
log.Printf("failed to read file %v: %v", f, err)
continue
}

var s Scenario
err = yaml.Unmarshal(yml, &s)
if err != nil {
log.Printf("failed to unmarshal yaml %v: %v", f, err)
continue
}

if isAllowedWithTags(&s, tagFilters) {
filtered = append(filtered, f)
} else {
log.Printf("%v filtered out by tags", f)
}
}

return filtered
}

func isAllowedWithTags(s *Scenario, tagFilters []string) bool {
if len(tagFilters) == 0 {
return true
}

var matched int
for _, t := range tagFilters {
tt := strings.Split(t, "=")
if len(tt) != 2 {
continue
}

for k, v := range s.Tags {
if k == tt[0] && v == tt[1] {
matched++
}
}
}

return matched == len(tagFilters)
}

func distributePubsub(app *appctx, tagFilters []string) {
id := uuid.NewString()
final := combineFilesAndDir()
for _, f := range final {
filtered := filterScenariosByTags(final, tagFilters)
log.Printf("distributing %d/%d scenarios matching tags %v", len(filtered), len(final), tagFilters)
for _, f := range filtered {
nc := cmd{
Code: "process",
ID: id,
Expand All @@ -131,7 +190,7 @@ func distributePubsub(app *appctx) {
}
}

func distributeSQS(app *appctx) {
func distributeSQS(app *appctx, tagFilters []string) {
sess, _ := session.NewSession(&aws.Config{
Region: aws.String(region),
Credentials: credentials.NewStaticCredentials(key, secret, ""),
Expand All @@ -147,7 +206,9 @@ func distributeSQS(app *appctx) {

id := uuid.NewString()
final := combineFilesAndDir()
for _, f := range final {
filtered := filterScenariosByTags(final, tagFilters)
log.Printf("distributing %d/%d scenarios matching tags %v", len(filtered), len(final), tagFilters)
for _, f := range filtered {
nc := cmd{
Code: "process",
ID: id,
Expand Down Expand Up @@ -192,13 +253,14 @@ func process(ctx any, data []byte) error {

switch c.Code {
case "start":
log.Printf("received start command with tags: %v", c.Tags)
var dist string
switch {
case pubsub != "":
distributePubsub(app)
distributePubsub(app, c.Tags)
dist = fmt.Sprintf("pubsub=%v", pubsub)
case snssqs != "":
distributeSQS(app)
distributeSQS(app, c.Tags)
dist = snssqs
dist = fmt.Sprintf("sns/sqs=%v", snssqs)
}
Expand Down