Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions server/pkg/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,12 @@ func processSPYTDirectSubmitOperation(op ytsdk.OperationStatus) ([]Task, error)
}
return []Task{
{
operationID: op.ID.String(),
taskName: "driver",
service: "ui",
jobs: []HostPort{*hostPort},
protocol: HTTP,
operationID: op.ID.String(),
operationAlias: parseOperationAlias(op),
taskName: "driver",
service: "ui",
jobs: []HostPort{*hostPort},
protocol: HTTP,
},
}, nil
}
Expand Down Expand Up @@ -178,11 +179,12 @@ func (d *taskDiscovery) processSPYTStandaloneClusterOperation(ctx context.Contex
}

tasks = append(tasks, Task{
operationID: op.ID.String(),
taskName: t.taskName,
service: t.service,
jobs: jobs,
protocol: HTTP,
operationID: op.ID.String(),
operationAlias: parseOperationAlias(op),
taskName: t.taskName,
service: t.service,
jobs: jobs,
protocol: HTTP,
})
}
return tasks, nil
Expand Down Expand Up @@ -231,17 +233,18 @@ func (d *taskDiscovery) processTaskProxyAnnotatedOperation(ctx context.Context,
}
if serviceInfo == nil {
serviceInfo = &taskServiceInfo{
service: fmt.Sprintf("port_%d", i),
service: fmt.Sprintf("port%d", i),
protocol: HTTP,
}
}
hostParts := strings.Split(job.Address, ":") // job address contains port also

taskProto := Task{
operationID: op.ID.String(),
taskName: job.TaskName,
service: serviceInfo.service,
protocol: serviceInfo.protocol,
operationID: op.ID.String(),
operationAlias: parseOperationAlias(op),
taskName: job.TaskName,
service: serviceInfo.service,
protocol: serviceInfo.protocol,
}
if _, ok := idToTask[taskProto.ID()]; !ok {
idToTask[taskProto.ID()] = &taskProto
Expand All @@ -257,7 +260,11 @@ func (d *taskDiscovery) processTaskProxyAnnotatedOperation(ctx context.Context,

var tasks []Task
for _, task := range idToTask {
tasks = append(tasks, *task)
if err := task.Validate(); err != nil {
d.logger.Warnf("invalid task %v: %v", task, err)
} else {
tasks = append(tasks, *task)
}
}
return tasks, nil
}
Expand All @@ -283,7 +290,7 @@ func (d *taskDiscovery) save(ctx context.Context, hashToTask map[string]Task) er
TaskName: task.taskName,
Service: task.service,
Protocol: string(task.protocol),
Domain: getTaskDomain(hash, d.baseDomain),
Domain: getTaskHashDomain(hash, d.baseDomain),
})
if err != nil {
return err
Expand Down Expand Up @@ -446,3 +453,15 @@ func parseOperationTitle(op ytsdk.OperationStatus) string {
}
return title
}

func parseOperationAlias(op ytsdk.OperationStatus) string {
aliasAny, ok := op.BriefSpec["alias"]
if !ok {
return ""
}
alias, ok := aliasAny.(string)
if !ok {
return ""
}
return alias[1:] // YT operation alias must start with '*', but we skip it to use alias in domains
}
51 changes: 44 additions & 7 deletions server/pkg/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pkg
import (
"crypto/sha256"
"fmt"
"regexp"
"strings"
)

Expand All @@ -19,13 +20,16 @@ type HostPort struct {
}

type Task struct {
operationID string
taskName string
service string
protocol Protocol
jobs []HostPort
operationID string
operationAlias string
taskName string
service string
protocol Protocol
jobs []HostPort
}

var valueRegexp = regexp.MustCompile(`^[a-z0-9]+$`)

// Identifies task, for sorting and domain hash
func (t *Task) ID() string {
return t.operationID + t.taskName + t.service
Expand All @@ -42,6 +46,35 @@ func (t *Task) IDWithHostPort() string {
return sb.String()
}

func (t *Task) Validate() error {
if t.operationAlias == "" {
return nil
}
// to avoid collisions in alias domains, we should check some fields on regexp
for _, f := range []struct {
value string
name string
}{
{
value: t.operationAlias,
name: "operationAlias",
},
{
value: t.taskName,
name: "taskName",
},
{
value: t.service,
name: "service",
},
} {
if !valueRegexp.MatchString(f.value) {
return fmt.Errorf("field %q value %q does not match regexp %q", f.name, f.value, valueRegexp.String())
}
}
return nil
}

type TaskRow struct {
OperationID string `yson:"operation_id"`
TaskName string `yson:"task_name"`
Expand All @@ -50,8 +83,12 @@ type TaskRow struct {
Domain string `yson:"domain"`
}

func getTaskDomain(taskHash, baseDomain string) string {
return taskHash + "." + baseDomain
func getTaskHashDomain(taskHash, baseDomain string) string {
return fmt.Sprintf("%s.%s", taskHash, baseDomain)
}

func getTaskAliasDomain(task Task, baseDomain string) string {
return fmt.Sprintf("%s-%s-%s.%s", task.operationAlias, task.taskName, task.service, baseDomain)
}

func Hash(source []byte) string {
Expand Down
68 changes: 68 additions & 0 deletions server/pkg/task_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package pkg

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestValidateTask(t *testing.T) {
for _, tt := range []struct {
name string
task Task
err error
}{
{
name: "valid",
task: Task{
operationID: "123",
operationAlias: "alias",
taskName: "task",
service: "service",
},
},
{
name: "invalid alias",
task: Task{
operationID: "123",
operationAlias: "ali-as",
taskName: "task",
service: "service",
},
err: errors.New("field \"operationAlias\" value \"ali-as\" does not match regexp \"^[a-z0-9]+$\""),
},
{
name: "invalid task name",
task: Task{
operationID: "123",
operationAlias: "alias",
taskName: "Task",
service: "service",
},
err: errors.New("field \"taskName\" value \"Task\" does not match regexp \"^[a-z0-9]+$\""),
},
{
name: "invalid service",
task: Task{
operationID: "123",
operationAlias: "alias",
taskName: "task",
service: "$ervice",
},
err: errors.New("field \"service\" value \"$ervice\" does not match regexp \"^[a-z0-9]+$\""),
},
{
name: "do not check if no alias",
task: Task{
operationID: "123-456",
taskName: "Task",
service: "$ervice",
},
},
} {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.err, tt.task.Validate())
})
}
}
8 changes: 6 additions & 2 deletions server/pkg/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,14 @@ func makeSnapshot(hashToTask map[string]Task, version string, baseDomain string,
},
},
}
// route either by domain
domains := []string{getTaskHashDomain(hash, baseDomain)}
if task.operationAlias != "" {
domains = append(domains, getTaskAliasDomain(task, baseDomain))
}
// route either by domain(-s)
vhosts = append(vhosts, &routev3.VirtualHost{
Name: vhostName,
Domains: []string{getTaskDomain(hash, baseDomain)},
Domains: domains,
Routes: []*routev3.Route{{
Match: &routev3.RouteMatch{PathSpecifier: &routev3.RouteMatch_Prefix{Prefix: "/"}},
Action: action,
Expand Down
Loading