diff --git a/server/pkg/discovery.go b/server/pkg/discovery.go index 8b9a1af..81a1762 100644 --- a/server/pkg/discovery.go +++ b/server/pkg/discovery.go @@ -101,11 +101,12 @@ func processSPYTDirectSubmitOperation(op ytsdk.OperationStatus) ([]Task, error) } return []Task{ { - operationID: op.ID.String(), - taskName: "driver", - service: "ui", - jobs: []HostPort{*hostPort}, - protocol: HTTP, + operationID: op.ID.String(), + operationAlias: parseOperationAlias(op), + taskName: "driver", + service: "ui", + jobs: []HostPort{*hostPort}, + protocol: HTTP, }, }, nil } @@ -178,11 +179,12 @@ func (d *taskDiscovery) processSPYTStandaloneClusterOperation(ctx context.Contex } tasks = append(tasks, Task{ - operationID: op.ID.String(), - taskName: t.taskName, - service: t.service, - jobs: jobs, - protocol: HTTP, + operationID: op.ID.String(), + operationAlias: parseOperationAlias(op), + taskName: t.taskName, + service: t.service, + jobs: jobs, + protocol: HTTP, }) } return tasks, nil @@ -231,17 +233,18 @@ func (d *taskDiscovery) processTaskProxyAnnotatedOperation(ctx context.Context, } if serviceInfo == nil { serviceInfo = &taskServiceInfo{ - service: fmt.Sprintf("port_%d", i), + service: fmt.Sprintf("port%d", i), protocol: HTTP, } } hostParts := strings.Split(job.Address, ":") // job address contains port also taskProto := Task{ - operationID: op.ID.String(), - taskName: job.TaskName, - service: serviceInfo.service, - protocol: serviceInfo.protocol, + operationID: op.ID.String(), + operationAlias: parseOperationAlias(op), + taskName: job.TaskName, + service: serviceInfo.service, + protocol: serviceInfo.protocol, } if _, ok := idToTask[taskProto.ID()]; !ok { idToTask[taskProto.ID()] = &taskProto @@ -257,7 +260,11 @@ func (d *taskDiscovery) processTaskProxyAnnotatedOperation(ctx context.Context, var tasks []Task for _, task := range idToTask { - tasks = append(tasks, *task) + if err := task.Validate(); err != nil { + d.logger.Warnf("invalid task %v: %v", task, err) + } else { + tasks = append(tasks, *task) + } } return tasks, nil } @@ -283,7 +290,7 @@ func (d *taskDiscovery) save(ctx context.Context, hashToTask map[string]Task) er TaskName: task.taskName, Service: task.service, Protocol: string(task.protocol), - Domain: getTaskDomain(hash, d.baseDomain), + Domain: getTaskHashDomain(hash, d.baseDomain), }) if err != nil { return err @@ -446,3 +453,15 @@ func parseOperationTitle(op ytsdk.OperationStatus) string { } return title } + +func parseOperationAlias(op ytsdk.OperationStatus) string { + aliasAny, ok := op.BriefSpec["alias"] + if !ok { + return "" + } + alias, ok := aliasAny.(string) + if !ok { + return "" + } + return alias[1:] // YT operation alias must start with '*', but we skip it to use alias in domains +} diff --git a/server/pkg/task.go b/server/pkg/task.go index b3f8860..7e36381 100644 --- a/server/pkg/task.go +++ b/server/pkg/task.go @@ -3,6 +3,7 @@ package pkg import ( "crypto/sha256" "fmt" + "regexp" "strings" ) @@ -19,13 +20,16 @@ type HostPort struct { } type Task struct { - operationID string - taskName string - service string - protocol Protocol - jobs []HostPort + operationID string + operationAlias string + taskName string + service string + protocol Protocol + jobs []HostPort } +var valueRegexp = regexp.MustCompile(`^[a-z0-9]+$`) + // Identifies task, for sorting and domain hash func (t *Task) ID() string { return t.operationID + t.taskName + t.service @@ -42,6 +46,35 @@ func (t *Task) IDWithHostPort() string { return sb.String() } +func (t *Task) Validate() error { + if t.operationAlias == "" { + return nil + } + // to avoid collisions in alias domains, we should check some fields on regexp + for _, f := range []struct { + value string + name string + }{ + { + value: t.operationAlias, + name: "operationAlias", + }, + { + value: t.taskName, + name: "taskName", + }, + { + value: t.service, + name: "service", + }, + } { + if !valueRegexp.MatchString(f.value) { + return fmt.Errorf("field %q value %q does not match regexp %q", f.name, f.value, valueRegexp.String()) + } + } + return nil +} + type TaskRow struct { OperationID string `yson:"operation_id"` TaskName string `yson:"task_name"` @@ -50,8 +83,12 @@ type TaskRow struct { Domain string `yson:"domain"` } -func getTaskDomain(taskHash, baseDomain string) string { - return taskHash + "." + baseDomain +func getTaskHashDomain(taskHash, baseDomain string) string { + return fmt.Sprintf("%s.%s", taskHash, baseDomain) +} + +func getTaskAliasDomain(task Task, baseDomain string) string { + return fmt.Sprintf("%s-%s-%s.%s", task.operationAlias, task.taskName, task.service, baseDomain) } func Hash(source []byte) string { diff --git a/server/pkg/task_test.go b/server/pkg/task_test.go new file mode 100644 index 0000000..6d290f6 --- /dev/null +++ b/server/pkg/task_test.go @@ -0,0 +1,68 @@ +package pkg + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestValidateTask(t *testing.T) { + for _, tt := range []struct { + name string + task Task + err error + }{ + { + name: "valid", + task: Task{ + operationID: "123", + operationAlias: "alias", + taskName: "task", + service: "service", + }, + }, + { + name: "invalid alias", + task: Task{ + operationID: "123", + operationAlias: "ali-as", + taskName: "task", + service: "service", + }, + err: errors.New("field \"operationAlias\" value \"ali-as\" does not match regexp \"^[a-z0-9]+$\""), + }, + { + name: "invalid task name", + task: Task{ + operationID: "123", + operationAlias: "alias", + taskName: "Task", + service: "service", + }, + err: errors.New("field \"taskName\" value \"Task\" does not match regexp \"^[a-z0-9]+$\""), + }, + { + name: "invalid service", + task: Task{ + operationID: "123", + operationAlias: "alias", + taskName: "task", + service: "$ervice", + }, + err: errors.New("field \"service\" value \"$ervice\" does not match regexp \"^[a-z0-9]+$\""), + }, + { + name: "do not check if no alias", + task: Task{ + operationID: "123-456", + taskName: "Task", + service: "$ervice", + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.err, tt.task.Validate()) + }) + } +} diff --git a/server/pkg/xds.go b/server/pkg/xds.go index 24d8740..e367bcf 100644 --- a/server/pkg/xds.go +++ b/server/pkg/xds.go @@ -86,10 +86,14 @@ func makeSnapshot(hashToTask map[string]Task, version string, baseDomain string, }, }, } - // route either by domain + domains := []string{getTaskHashDomain(hash, baseDomain)} + if task.operationAlias != "" { + domains = append(domains, getTaskAliasDomain(task, baseDomain)) + } + // route either by domain(-s) vhosts = append(vhosts, &routev3.VirtualHost{ Name: vhostName, - Domains: []string{getTaskDomain(hash, baseDomain)}, + Domains: domains, Routes: []*routev3.Route{{ Match: &routev3.RouteMatch{PathSpecifier: &routev3.RouteMatch_Prefix{Prefix: "/"}}, Action: action,