Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/code.cloudfoundry.org/gorouter/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
const (
LOAD_BALANCE_RR string = "round-robin"
LOAD_BALANCE_LC string = "least-connection"
LOAD_BALANCE_HB string = "hash"
AZ_PREF_NONE string = "none"
AZ_PREF_LOCAL string = "locally-optimistic"
SHARD_ALL string = "all"
Expand All @@ -38,7 +39,8 @@ const (
)

var (
LoadBalancingStrategies = []string{LOAD_BALANCE_RR, LOAD_BALANCE_LC}
GlobalLoadBalancingStrategies = []string{LOAD_BALANCE_RR, LOAD_BALANCE_LC}
LoadBalancingStrategies = []string{LOAD_BALANCE_RR, LOAD_BALANCE_LC, LOAD_BALANCE_HB}
AZPreferences = []string{AZ_PREF_NONE, AZ_PREF_LOCAL}
AllowedShardingModes = []string{SHARD_ALL, SHARD_SEGMENTS, SHARD_SHARED_AND_SEGMENTS}
AllowedForwardedClientCertModes = []string{ALWAYS_FORWARD, FORWARD, SANITIZE_SET}
Expand Down Expand Up @@ -595,6 +597,10 @@ func DefaultConfig() (*Config, error) {
return &c, nil
}

func IsGlobalLoadBalancingAlgorithmValid(lbAlgo string) bool {
return slices.Contains(GlobalLoadBalancingStrategies, lbAlgo)
}

func IsLoadBalancingAlgorithmValid(lbAlgo string) bool {
return slices.Contains(LoadBalancingStrategies, lbAlgo)
}
Expand Down Expand Up @@ -755,8 +761,8 @@ func (c *Config) Process() error {
c.RouteServiceEnabled = true
}

if !IsLoadBalancingAlgorithmValid(c.LoadBalance) {
return fmt.Errorf("Invalid load balancing algorithm %s. Allowed values are %s", c.LoadBalance, LoadBalancingStrategies)
if !IsGlobalLoadBalancingAlgorithmValid(c.LoadBalance) {
return fmt.Errorf("Invalid global load balancing algorithm %s. Allowed values are %s", c.LoadBalance, GlobalLoadBalancingStrategies)
}

validAZPref := false
Expand Down
36 changes: 32 additions & 4 deletions src/code.cloudfoundry.org/gorouter/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,31 @@ zone: meow-zone
})
})

Context("Global Load Balancing Algorithm Validator", func() {
It("Returns false if the value is not in the list of configured global load balancing strategies", func() {
Expect(IsGlobalLoadBalancingAlgorithmValid("wrong.algo")).To(BeFalse())
})

It("Returns true if the value is in the list of configured global load balancing strategies", func() {
Expect(IsGlobalLoadBalancingAlgorithmValid(LOAD_BALANCE_RR)).To(BeTrue())
})

It("Returns false for hash load balancing algorithm as global setting", func() {
Expect(IsGlobalLoadBalancingAlgorithmValid(LOAD_BALANCE_HB)).To(BeFalse())
})
})

Context("Load Balancing Algorithm Validator", func() {
It("Returns false if the value is not in the list of configured load balancing strategies", func() {
Expect(IsLoadBalancingAlgorithmValid("wrong.algo")).To(Equal(false))
Expect(IsLoadBalancingAlgorithmValid("wrong.algo")).To(BeFalse())
})

It("Returns true if the value is in the list of configured load balancing strategies", func() {
Expect(IsLoadBalancingAlgorithmValid(LOAD_BALANCE_RR)).To(Equal(true))
Expect(IsLoadBalancingAlgorithmValid(LOAD_BALANCE_RR)).To(BeTrue())
})

It("Returns true for hash load balancing algorithm", func() {
Expect(IsLoadBalancingAlgorithmValid(LOAD_BALANCE_HB)).To(BeTrue())
})
})

Expand All @@ -73,12 +91,22 @@ balancing_algorithm: least-connection
Expect(cfg.LoadBalance).To(Equal(LOAD_BALANCE_LC))
})

It("can NOT override the load balance strategy to hash (not available as global default)", func() {
cfg, err := DefaultConfig()
Expect(err).ToNot(HaveOccurred())
var b = []byte(`
balancing_algorithm: hash
`)
cfg.Initialize(b)
Expect(cfg.Process()).To(MatchError("Invalid global load balancing algorithm hash. Allowed values are [round-robin least-connection]"))
})

It("does not allow an invalid load balance strategy", func() {
cfg, err := DefaultConfig()
Expect(err).ToNot(HaveOccurred())
cfgForSnippet.LoadBalance = "foo-bar"
cfg.Initialize(createYMLSnippet(cfgForSnippet))
Expect(cfg.Process()).To(MatchError("Invalid load balancing algorithm foo-bar. Allowed values are [round-robin least-connection]"))
Expect(cfg.Process()).To(MatchError("Invalid global load balancing algorithm foo-bar. Allowed values are [round-robin least-connection]"))
})
})

Expand Down Expand Up @@ -1805,7 +1833,7 @@ load_balancer_healthy_threshold: 10s
})
It("setting hop_by_hop_headers_to_filter succeeds", func() {
err := config.Initialize(createYMLSnippet(cfgForSnippet))
Expect(err).NotTo(HaveOccurred())
Expect(err).ToNot(HaveOccurred())
Expect(config.Process()).To(Succeed())
Expect(config.HopByHopHeadersToFilter).To(Equal([]string{"X-ME", "X-Foo"}))
})
Expand Down
6 changes: 5 additions & 1 deletion src/code.cloudfoundry.org/gorouter/mbus/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ type RegistryMessage struct {
}

type RegistryMessageOpts struct {
LoadBalancingAlgorithm string `json:"loadbalancing"`
LoadBalancingAlgorithm string `json:"loadbalancing"`
HashHeaderName string `json:"hash_header"`
HashBalance float64 `json:"hash_balance"`
}

func (rm *RegistryMessage) makeEndpoint(http2Enabled bool) (*route.Endpoint, error) {
Expand Down Expand Up @@ -76,6 +78,8 @@ func (rm *RegistryMessage) makeEndpoint(http2Enabled bool) (*route.Endpoint, err
UseTLS: useTLS,
UpdatedAt: updatedAt,
LoadBalancingAlgorithm: rm.Options.LoadBalancingAlgorithm,
HashHeaderName: rm.Options.HashHeaderName,
HashBalanceFactor: rm.Options.HashBalance,
}), nil
}

Expand Down
208 changes: 158 additions & 50 deletions src/code.cloudfoundry.org/gorouter/mbus/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,82 @@ var _ = Describe("Subscriber", func() {
Expect(originalEndpoint).To(Equal(expectedEndpoint))
})

Context("when HTTP/2 is disabled and the protocol is http2", func() {
BeforeEach(func() {
cfg.EnableHTTP2 = false
})
It("constructs the endpoint with protocol 'http1'", func() {
msg := mbus.RegistryMessage{
Host: "host",
App: "app",
Protocol: "http2",
Uris: []route.Uri{"test.example.com"},
}

data, err := json.Marshal(msg)
Expect(err).NotTo(HaveOccurred())

err = natsClient.Publish("router.register", data)
Expect(err).ToNot(HaveOccurred())

Eventually(registry.RegisterCallCount).Should(Equal(1))
_, originalEndpoint := registry.RegisterArgsForCall(0)
expectedEndpoint := route.NewEndpoint(&route.EndpointOpts{
Host: "host",
AppId: "app",
Protocol: "http1",
})

Expect(originalEndpoint).To(Equal(expectedEndpoint))
})
})
})

Context("when the message contains a tls port for route", func() {
BeforeEach(func() {
sub = mbus.NewSubscriber(natsClient, registry, cfg, reconnected, logger.Logger)
process = ifrit.Invoke(sub)
Eventually(process.Ready()).Should(BeClosed())
})
It("endpoint is constructed with tls port instead of http", func() {
msg := mbus.RegistryMessage{
Host: "host",
App: "app",
TLSPort: 1999,
ServerCertDomainSAN: "san",
PrivateInstanceID: "id",
PrivateInstanceIndex: "index",
StaleThresholdInSeconds: 120,
Uris: []route.Uri{"test.example.com"},
Tags: map[string]string{"key": "value"},
}

data, err := json.Marshal(msg)
Expect(err).NotTo(HaveOccurred())

err = natsClient.Publish("router.register", data)
Expect(err).ToNot(HaveOccurred())

Eventually(registry.RegisterCallCount).Should(Equal(1))
_, originalEndpoint := registry.RegisterArgsForCall(0)
expectedEndpoint := route.NewEndpoint(&route.EndpointOpts{
Host: "host",
AppId: "app",
Port: 1999,
Protocol: "http1",
UseTLS: true,
ServerCertDomainSAN: "san",
PrivateInstanceId: "id",
PrivateInstanceIndex: "index",
StaleThresholdInSeconds: 120,
Tags: map[string]string{"key": "value"},
})

Expect(originalEndpoint).To(Equal(expectedEndpoint))
})
})

Context("when the message contains per-route options", func() {
Context("when the message contains load balancing algorithm option", func() {
JustBeforeEach(func() {
sub = mbus.NewSubscriber(natsClient, registry, cfg, reconnected, logger.Logger)
Expand All @@ -498,7 +574,7 @@ var _ = Describe("Subscriber", func() {
err = natsClient.Publish("router.register", data)
Expect(err).ToNot(HaveOccurred())

Eventually(registry.RegisterCallCount).Should(Equal(2))
Eventually(registry.RegisterCallCount).Should(Equal(1))
_, originalEndpoint := registry.RegisterArgsForCall(0)
expectedEndpoint := route.NewEndpoint(&route.EndpointOpts{
Host: "host",
Expand Down Expand Up @@ -531,7 +607,7 @@ var _ = Describe("Subscriber", func() {
err = natsClient.Publish("router.register", data)
Expect(err).ToNot(HaveOccurred())

Eventually(registry.RegisterCallCount).Should(Equal(2))
Eventually(registry.RegisterCallCount).Should(Equal(1))
_, originalEndpoint := registry.RegisterArgsForCall(0)
expectedEndpoint := route.NewEndpoint(&route.EndpointOpts{
Host: "host",
Expand All @@ -544,18 +620,26 @@ var _ = Describe("Subscriber", func() {

})

Context("when HTTP/2 is disabled and the protocol is http2", func() {
BeforeEach(func() {
cfg.EnableHTTP2 = false
Context("when the message contains hash-based load balancing options", func() {
JustBeforeEach(func() {
sub = mbus.NewSubscriber(natsClient, registry, cfg, reconnected, logger.Logger)
process = ifrit.Invoke(sub)
Eventually(process.Ready()).Should(BeClosed())
})
It("constructs the endpoint with protocol 'http1'", func() {
msg := mbus.RegistryMessage{
It("constructs an endpoint with the correct load balancing algorithm (HashBalance is set)", func() {
var expectedLBAlgo = "hash"

var msg = mbus.RegistryMessage{
Host: "host",
App: "app",
Protocol: "http2",
Uris: []route.Uri{"test.example.com"},
Options: mbus.RegistryMessageOpts{
LoadBalancingAlgorithm: expectedLBAlgo,
HashHeaderName: "X-Header",
HashBalance: 1.5,
},
}

data, err := json.Marshal(msg)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -565,57 +649,82 @@ var _ = Describe("Subscriber", func() {
Eventually(registry.RegisterCallCount).Should(Equal(1))
_, originalEndpoint := registry.RegisterArgsForCall(0)
expectedEndpoint := route.NewEndpoint(&route.EndpointOpts{
Host: "host",
AppId: "app",
Protocol: "http1",
Host: "host",
AppId: "app",
Protocol: "http2",
LoadBalancingAlgorithm: expectedLBAlgo,
HashHeaderName: "X-Header",
HashBalanceFactor: 1.5,
})

Expect(originalEndpoint).To(Equal(expectedEndpoint))
})
})
})

Context("when the message contains a tls port for route", func() {
BeforeEach(func() {
sub = mbus.NewSubscriber(natsClient, registry, cfg, reconnected, logger.Logger)
process = ifrit.Invoke(sub)
Eventually(process.Ready()).Should(BeClosed())
})
It("endpoint is constructed with tls port instead of http", func() {
msg := mbus.RegistryMessage{
Host: "host",
App: "app",
TLSPort: 1999,
ServerCertDomainSAN: "san",
PrivateInstanceID: "id",
PrivateInstanceIndex: "index",
StaleThresholdInSeconds: 120,
Uris: []route.Uri{"test.example.com"},
Tags: map[string]string{"key": "value"},
}
It("constructs an endpoint with the correct load balancing algorithm (HashBalance is not set)", func() {
var expectedLBAlgo = "hash"

data, err := json.Marshal(msg)
Expect(err).NotTo(HaveOccurred())
var msg = mbus.RegistryMessage{
Host: "host",
App: "app",
Protocol: "http2",
Uris: []route.Uri{"test.example.com"},
Options: mbus.RegistryMessageOpts{
LoadBalancingAlgorithm: expectedLBAlgo,
HashHeaderName: "X-Header",
},
}
data, err := json.Marshal(msg)
Expect(err).NotTo(HaveOccurred())

err = natsClient.Publish("router.register", data)
Expect(err).ToNot(HaveOccurred())
err = natsClient.Publish("router.register", data)
Expect(err).ToNot(HaveOccurred())

Eventually(registry.RegisterCallCount).Should(Equal(1))
_, originalEndpoint := registry.RegisterArgsForCall(0)
expectedEndpoint := route.NewEndpoint(&route.EndpointOpts{
Host: "host",
AppId: "app",
Port: 1999,
Protocol: "http1",
UseTLS: true,
ServerCertDomainSAN: "san",
PrivateInstanceId: "id",
PrivateInstanceIndex: "index",
StaleThresholdInSeconds: 120,
Tags: map[string]string{"key": "value"},
Eventually(registry.RegisterCallCount).Should(Equal(1))
_, originalEndpoint := registry.RegisterArgsForCall(0)
expectedEndpoint := route.NewEndpoint(&route.EndpointOpts{
Host: "host",
AppId: "app",
Protocol: "http2",
LoadBalancingAlgorithm: expectedLBAlgo,
HashHeaderName: "X-Header",
HashBalanceFactor: 0.0,
})
Expect(expectedEndpoint.HashHeaderName).To(Equal("X-Header"))
Expect(expectedEndpoint.HashBalanceFactor).To(Equal(0.0))
Expect(originalEndpoint).To(Equal(expectedEndpoint))
})

Expect(originalEndpoint).To(Equal(expectedEndpoint))
It("ignores HashRoutingProperties if load balancing algorithm is not 'hash'", func() {
var expectedLBAlgo = "round-robin"

var msg = mbus.RegistryMessage{
Host: "host",
App: "app",
Protocol: "http2",
Uris: []route.Uri{"test.example.com"},
Options: mbus.RegistryMessageOpts{
LoadBalancingAlgorithm: expectedLBAlgo,
HashHeaderName: "X-Header",
},
}
data, err := json.Marshal(msg)
Expect(err).NotTo(HaveOccurred())

err = natsClient.Publish("router.register", data)
Expect(err).ToNot(HaveOccurred())

Eventually(registry.RegisterCallCount).Should(Equal(1))
_, originalEndpoint := registry.RegisterArgsForCall(0)
expectedEndpoint := route.NewEndpoint(&route.EndpointOpts{
Host: "host",
AppId: "app",
Protocol: "http2",
LoadBalancingAlgorithm: expectedLBAlgo,
})
Expect(expectedEndpoint.HashHeaderName).To(BeEmpty())
Expect(expectedEndpoint.HashBalanceFactor).To(Equal(0.0))
Expect(originalEndpoint).To(Equal(expectedEndpoint))
})
})
})

Expand Down Expand Up @@ -839,5 +948,4 @@ var _ = Describe("Subscriber", func() {
}
})
})

})
Loading