diff --git a/docs/03-how-to-add-new-route-option.md b/docs/03-how-to-add-new-route-option.md index 39ce25447..308213fcc 100644 --- a/docs/03-how-to-add-new-route-option.md +++ b/docs/03-how-to-add-new-route-option.md @@ -22,6 +22,11 @@ applications: - route: example2.com options: loadbalancing: least-connection + - route: example3.com + options: + loadbalancing: hash + hash_header: tenant-id + hash_balance: 1.25 ``` **NOTE**: In the implementation, the `options` property of a route represents per-route features. diff --git a/src/code.cloudfoundry.org/gorouter/handlers/helpers.go b/src/code.cloudfoundry.org/gorouter/handlers/helpers.go index f1f048819..d2716613f 100644 --- a/src/code.cloudfoundry.org/gorouter/handlers/helpers.go +++ b/src/code.cloudfoundry.org/gorouter/handlers/helpers.go @@ -63,13 +63,19 @@ func upgradeHeader(request *http.Request) string { return "" } -func EndpointIteratorForRequest(logger *slog.Logger, request *http.Request, stickySessionCookieNames config.StringSet, authNegotiateSticky bool, azPreference string, az string) (route.EndpointIterator, error) { +func EndpointIteratorForRequest(logger *slog.Logger, request *http.Request, stickySessionCookieNames config.StringSet, authNegotiateSticky bool, locallyOptimistic bool, az string, globalLB string) (route.EndpointIterator, error) { reqInfo, err := ContextRequestInfo(request) if err != nil { return nil, fmt.Errorf("could not find reqInfo in context") } stickyEndpointID, mustBeSticky := GetStickySession(request, stickySessionCookieNames, authNegotiateSticky) - return reqInfo.RoutePool.Endpoints(logger, stickyEndpointID, mustBeSticky, azPreference, az), nil + routingProperties := route.RoutingProperties{ + RequestHeaders: &request.Header, + LocallyOptimistic: locallyOptimistic, + GlobalLB: globalLB, + AZ: az, + } + return reqInfo.RoutePool.Endpoints(logger, stickyEndpointID, mustBeSticky, routingProperties), nil } func GetStickySession(request *http.Request, stickySessionCookieNames config.StringSet, authNegotiateSticky bool) (string, bool) { diff --git a/src/code.cloudfoundry.org/gorouter/handlers/max_request_size.go b/src/code.cloudfoundry.org/gorouter/handlers/max_request_size.go index d164b5e80..e88654c74 100644 --- a/src/code.cloudfoundry.org/gorouter/handlers/max_request_size.go +++ b/src/code.cloudfoundry.org/gorouter/handlers/max_request_size.go @@ -69,7 +69,8 @@ func (m *MaxRequestSize) ServeHTTP(rw http.ResponseWriter, r *http.Request, next if err != nil { logger.Error("request-info-err", log.ErrAttr(err)) } else { - endpointIterator, err := EndpointIteratorForRequest(logger, r, m.cfg.StickySessionCookieNames, m.cfg.StickySessionsForAuthNegotiate, m.cfg.LoadBalanceAZPreference, m.cfg.Zone) + locallyOptimistic := m.cfg.LoadBalanceAZPreference == config.AZ_PREF_LOCAL + endpointIterator, err := EndpointIteratorForRequest(logger, r, m.cfg.StickySessionCookieNames, m.cfg.StickySessionsForAuthNegotiate, locallyOptimistic, m.cfg.Zone, m.cfg.LoadBalance) if err != nil { logger.Error("failed-to-find-endpoint-for-req-during-431-short-circuit", log.ErrAttr(err)) } else { diff --git a/src/code.cloudfoundry.org/gorouter/mbus/subscriber.go b/src/code.cloudfoundry.org/gorouter/mbus/subscriber.go index af17a19b4..758a6c1a6 100644 --- a/src/code.cloudfoundry.org/gorouter/mbus/subscriber.go +++ b/src/code.cloudfoundry.org/gorouter/mbus/subscriber.go @@ -43,7 +43,7 @@ type RegistryMessage struct { type RegistryMessageOpts struct { LoadBalancingAlgorithm string `json:"loadbalancing"` HashHeaderName string `json:"hash_header"` - 
HashBalance float64 `json:"hash_balance"` + HashBalance float64 `json:"hash_balance,string"` } func (rm *RegistryMessage) makeEndpoint(http2Enabled bool) (*route.Endpoint, error) { diff --git a/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/proxy_round_tripper.go b/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/proxy_round_tripper.go index 88cfd20a5..48e67221c 100644 --- a/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/proxy_round_tripper.go +++ b/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/proxy_round_tripper.go @@ -126,7 +126,15 @@ func (rt *roundTripper) RoundTrip(originalRequest *http.Request) (*http.Response stickyEndpointID, mustBeSticky := handlers.GetStickySession(request, rt.config.StickySessionCookieNames, rt.config.StickySessionsForAuthNegotiate) numberOfEndpoints := reqInfo.RoutePool.NumEndpoints() - iter := reqInfo.RoutePool.Endpoints(rt.logger, stickyEndpointID, mustBeSticky, rt.config.LoadBalanceAZPreference, rt.config.Zone) + locallyOptimistic := rt.config.LoadBalanceAZPreference == config.AZ_PREF_LOCAL + routingProperties := route.RoutingProperties{ + RequestHeaders: &request.Header, + LocallyOptimistic: locallyOptimistic, + GlobalLB: rt.config.LoadBalance, + AZ: rt.config.Zone, + } + + iter := reqInfo.RoutePool.Endpoints(rt.logger, stickyEndpointID, mustBeSticky, routingProperties) // The selectEndpointErr needs to be tracked separately. If we get an error // while selecting an endpoint we might just have run out of routes. In diff --git a/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/proxy_round_tripper_test.go b/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/proxy_round_tripper_test.go index 9d270867c..dab18e3e4 100644 --- a/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/proxy_round_tripper_test.go +++ b/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/proxy_round_tripper_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math/rand" "net" "net/http" "net/http/httptest" @@ -273,8 +274,14 @@ var _ = Describe("ProxyRoundTripper", func() { It("logs the error and removes offending backend", func() { res, err := proxyRoundTripper.RoundTrip(req) Expect(err).NotTo(HaveOccurred()) + routingProps := route.RoutingProperties{ + LocallyOptimistic: false, + GlobalLB: cfg.LoadBalance, + AZ: AZ, + RequestHeaders: &req.Header, + } - iter := routePool.Endpoints(logger.Logger, "", false, AZPreference, AZ) + iter := routePool.Endpoints(logger.Logger, "", false, routingProps) ep1 := iter.Next(0) ep2 := iter.Next(1) Expect(ep1.PrivateInstanceId).To(Equal(ep2.PrivateInstanceId)) @@ -602,13 +609,20 @@ var _ = Describe("ProxyRoundTripper", func() { PrivateInstanceIndex: "2", }) + routingProps := route.RoutingProperties{ + LocallyOptimistic: false, + GlobalLB: cfg.LoadBalance, + AZ: AZ, + RequestHeaders: &req.Header, + } + added := routePool.Put(endpoint) Expect(added).To(Equal(route.EndpointAdded)) _, err := proxyRoundTripper.RoundTrip(req) Expect(err).To(MatchError(ContainSubstring("tls: handshake failure"))) - iter := routePool.Endpoints(logger.Logger, "", false, AZPreference, AZ) + iter := routePool.Endpoints(logger.Logger, "", false, routingProps) ep1 := iter.Next(0) ep2 := iter.Next(1) Expect(ep1).To(Equal(ep2)) @@ -1700,6 +1714,167 @@ var _ = Describe("ProxyRoundTripper", func() { }) }) + Context("when load-balancing strategy is set to hash-based routing", func() { + JustBeforeEach(func() { + for i := 1; i <= 3; i++ { + endpoint = route.NewEndpoint(&route.EndpointOpts{ + AppId: fmt.Sprintf("appID%d", i), + Host: 
fmt.Sprintf("%d.%d.%d.%d", i, i, i, i), + Port: 9090, + PrivateInstanceId: fmt.Sprintf("instanceID%d", i), + PrivateInstanceIndex: fmt.Sprintf("%d", i), + AvailabilityZone: AZ, + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + HashHeaderName: "X-Hash", + }) + + _ = routePool.Put(endpoint) + Expect(routePool.HashLookupTable).ToNot(BeNil()) + + } + }) + + It("routes requests with same hash header value to the same endpoint", func() { + req.Header.Set("X-Hash", "value") + reqInfo, err := handlers.ContextRequestInfo(req) + Expect(err).ToNot(HaveOccurred()) + reqInfo.RoutePool = routePool + + var selectedEndpoints []*route.Endpoint + + // Make multiple requests with the same hash value + for i := 0; i < 5; i++ { + _, err = proxyRoundTripper.RoundTrip(req) + Expect(err).NotTo(HaveOccurred()) + selectedEndpoints = append(selectedEndpoints, reqInfo.RouteEndpoint) + } + + // All requests should go to the same endpoint + firstEndpoint := selectedEndpoints[0] + for _, ep := range selectedEndpoints[1:] { + Expect(ep.PrivateInstanceId).To(Equal(firstEndpoint.PrivateInstanceId)) + } + }) + + It("routes requests with different hash header values to potentially different endpoints", func() { + reqInfo, err := handlers.ContextRequestInfo(req) + Expect(err).ToNot(HaveOccurred()) + reqInfo.RoutePool = routePool + + endpointDistribution := make(map[string]int) + + // Make requests with different hash values + for i := 0; i < 10; i++ { + req.Header.Set("X-Hash", fmt.Sprintf("value-%d", i)) + _, err = proxyRoundTripper.RoundTrip(req) + Expect(err).NotTo(HaveOccurred()) + endpointDistribution[reqInfo.RouteEndpoint.PrivateInstanceId]++ + } + + // Should distribute across multiple endpoints (not all to one) + Expect(len(endpointDistribution)).To(BeNumerically(">", 1)) + }) + + It("falls back to default load balancing algorithm when hash header is missing", func() { + reqInfo, err := handlers.ContextRequestInfo(req) + Expect(err).ToNot(HaveOccurred()) + + reqInfo.RoutePool = routePool + + _, err = proxyRoundTripper.RoundTrip(req) + Expect(err).NotTo(HaveOccurred()) + + infoLogs := logger.Lines(zap.InfoLevel) + count := 0 + for i := 0; i < len(infoLogs); i++ { + if strings.Contains(infoLogs[i], "hash-based-routing-header-value-not-found") { + count++ + } + } + Expect(count).To(Equal(1)) + // Verify it still selects an endpoint + Expect(reqInfo.RouteEndpoint).ToNot(BeNil()) + }) + + Context("when sticky session cookies (JSESSIONID and VCAP_ID) are on the request", func() { + var ( + sessionCookie *http.Cookie + cookies []*http.Cookie + ) + + JustBeforeEach(func() { + sessionCookie = &http.Cookie{ + Name: StickyCookieKey, //JSESSIONID + } + transport.RoundTripStub = func(req *http.Request) (*http.Response, error) { + resp := &http.Response{StatusCode: http.StatusTeapot, Header: make(map[string][]string)} + //Attach the same JSESSIONID on to the response if it exists on the request + + if len(req.Cookies()) > 0 { + for _, cookie := range req.Cookies() { + if cookie.Name == StickyCookieKey { + resp.Header.Add(round_tripper.CookieHeader, cookie.String()) + return resp, nil + } + } + } + + sessionCookie.Value, _ = uuid.GenerateUUID() + resp.Header.Add(round_tripper.CookieHeader, sessionCookie.String()) + return resp, nil + } + resp, err := proxyRoundTripper.RoundTrip(req) + Expect(err).ToNot(HaveOccurred()) + + cookies = resp.Cookies() + Expect(cookies).To(HaveLen(2)) + + }) + + Context("when there is a JSESSIONID and __VCAP_ID__ set on the request", func() { + It("will always route to the instance specified with the 
__VCAP_ID__ cookie", func() {
+
+						// Generate 20 random values for the hash header, so the chance that
+						// they all land on instanceID1 by accident is 0.33^20
+						for i := 0; i < 20; i++ {
+							randomStr := make([]byte, 8)
+							for j := range randomStr {
+								randomStr[j] = byte('a' + rand.Intn(26))
+							}
+
+							req.Header.Set("X-Hash", string(randomStr))
+							reqInfo, err := handlers.ContextRequestInfo(req)
+							req.AddCookie(&http.Cookie{Name: round_tripper.VcapCookieId, Value: "instanceID1"})
+							req.AddCookie(&http.Cookie{Name: StickyCookieKey, Value: "abc"})
+
+							Expect(err).ToNot(HaveOccurred())
+							reqInfo.RoutePool = routePool
+
+							resp, err := proxyRoundTripper.RoundTrip(req)
+							Expect(err).ToNot(HaveOccurred())
+
+							newCookies := resp.Cookies()
+							Expect(newCookies).To(HaveLen(2))
+
+							for _, cookie := range newCookies {
+								Expect(cookie.Name).To(SatisfyAny(
+									Equal(StickyCookieKey),
+									Equal(round_tripper.VcapCookieId),
+								))
+								if cookie.Name == StickyCookieKey {
+									Expect(cookie.Value).To(Equal("abc"))
+								} else {
+									Expect(cookie.Value).To(Equal("instanceID1"))
+								}
+							}
+						}
+					})
+				})
+			})
+		})
+
 	Context("when endpoint timeout is not 0", func() {
 		var reqCh chan *http.Request
 		BeforeEach(func() {
diff --git a/src/code.cloudfoundry.org/gorouter/registry/registry_test.go b/src/code.cloudfoundry.org/gorouter/registry/registry_test.go
index 9ac5c1ce9..eda8710a3 100644
--- a/src/code.cloudfoundry.org/gorouter/registry/registry_test.go
+++ b/src/code.cloudfoundry.org/gorouter/registry/registry_test.go
@@ -25,10 +25,11 @@ var _ = Describe("RouteRegistry", func() {
 	var configObj *config.Config
 
 	var logger *test_util.TestLogger
-	var azPreference, az string
+	var locallyOptimistic bool
+	var az string
 
 	BeforeEach(func() {
-		azPreference = "none"
+		locallyOptimistic = false
 		az = "meow-zone"
 
 		logger = test_util.NewTestLogger("test")
@@ -391,14 +392,20 @@ var _ = Describe("RouteRegistry", func() {
 
 	Context("Modification Tags", func() {
 		var (
-			endpoint *route.Endpoint
-			modTag   models.ModificationTag
+			endpoint     *route.Endpoint
+			modTag       models.ModificationTag
+			routingProps route.RoutingProperties
 		)
 
 		BeforeEach(func() {
 			modTag = models.ModificationTag{Guid: "abc"}
 			endpoint = route.NewEndpoint(&route.EndpointOpts{ModificationTag: modTag})
 			r.Register("foo.com", endpoint)
+			routingProps = route.RoutingProperties{
+				LocallyOptimistic: locallyOptimistic,
+				GlobalLB:          config.LOAD_BALANCE_RR,
+				AZ:                az,
+			}
 		})
 
 		Context("registering a new route", func() {
@@ -407,7 +414,7 @@ var _ = Describe("RouteRegistry", func() {
 				Expect(r.NumEndpoints()).To(Equal(1))
 
 				p := r.Lookup("foo.com")
-				Expect(p.Endpoints(logger.Logger, "", false, azPreference, az).Next(0).ModificationTag).To(Equal(modTag))
+				Expect(p.Endpoints(logger.Logger, "", false, routingProps).Next(0).ModificationTag).To(Equal(modTag))
 			})
 		})
 
@@ -429,7 +436,7 @@ var _ = Describe("RouteRegistry", func() {
 				Expect(r.NumEndpoints()).To(Equal(1))
 
 				p := r.Lookup("foo.com")
-				Expect(p.Endpoints(logger.Logger, "", false, azPreference, az).Next(0).ModificationTag).To(Equal(modTag))
+				Expect(p.Endpoints(logger.Logger, "", false, routingProps).Next(0).ModificationTag).To(Equal(modTag))
 			})
 
 			Context("updating an existing route with an older modification tag", func() {
@@ -449,7 +456,7 @@ var _ = Describe("RouteRegistry", func() {
 				Expect(r.NumEndpoints()).To(Equal(1))
 
 				p := r.Lookup("foo.com")
-				ep := p.Endpoints(logger.Logger, "", false, azPreference, az).Next(0)
+				ep := p.Endpoints(logger.Logger, "", false, routingProps).Next(0)
 				Expect(ep.ModificationTag).To(Equal(modTag))
Expect(ep).To(Equal(endpoint2)) }) @@ -468,7 +475,7 @@ var _ = Describe("RouteRegistry", func() { Expect(r.NumEndpoints()).To(Equal(1)) p := r.Lookup("foo.com") - Expect(p.Endpoints(logger.Logger, "", false, azPreference, az).Next(0).ModificationTag).To(Equal(modTag)) + Expect(p.Endpoints(logger.Logger, "", false, routingProps).Next(0).ModificationTag).To(Equal(modTag)) }) }) }) @@ -813,7 +820,7 @@ var _ = Describe("RouteRegistry", func() { Expect(r.NumUris()).To(Equal(1)) p1 := r.Lookup("foo/bar") - iter := p1.Endpoints(logger.Logger, "", false, azPreference, az) + iter := p1.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}) Expect(iter.Next(0).CanonicalAddr()).To(Equal("192.168.1.1:1234")) p2 := r.Lookup("foo") @@ -917,7 +924,7 @@ var _ = Describe("RouteRegistry", func() { p2 := r.Lookup("FOO") Expect(p1).To(Equal(p2)) - iter := p1.Endpoints(logger.Logger, "", false, azPreference, az) + iter := p1.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}) Expect(iter.Next(0).CanonicalAddr()).To(Equal("192.168.1.1:1234")) }) @@ -936,7 +943,7 @@ var _ = Describe("RouteRegistry", func() { p := r.Lookup("bar") Expect(p).ToNot(BeNil()) - e := p.Endpoints(logger.Logger, "", false, azPreference, az).Next(0) + e := p.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}).Next(0) Expect(e).ToNot(BeNil()) Expect(e.CanonicalAddr()).To(MatchRegexp("192.168.1.1:123[4|5]")) @@ -951,13 +958,13 @@ var _ = Describe("RouteRegistry", func() { p := r.Lookup("foo.wild.card") Expect(p).ToNot(BeNil()) - e := p.Endpoints(logger.Logger, "", false, azPreference, az).Next(0) + e := p.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}).Next(0) Expect(e).ToNot(BeNil()) Expect(e.CanonicalAddr()).To(Equal("192.168.1.2:1234")) p = r.Lookup("foo.space.wild.card") Expect(p).ToNot(BeNil()) - e = p.Endpoints(logger.Logger, "", false, azPreference, az).Next(0) + e = p.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}).Next(0) Expect(e).ToNot(BeNil()) Expect(e.CanonicalAddr()).To(Equal("192.168.1.2:1234")) }) @@ -971,7 +978,7 @@ var _ = Describe("RouteRegistry", func() { p := r.Lookup("not.wild.card") Expect(p).ToNot(BeNil()) - e := p.Endpoints(logger.Logger, "", false, azPreference, az).Next(0) + e := p.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}).Next(0) Expect(e).ToNot(BeNil()) Expect(e.CanonicalAddr()).To(Equal("192.168.1.1:1234")) }) @@ -1003,7 +1010,7 @@ var _ = Describe("RouteRegistry", func() { p := r.Lookup("dora.app.com/env?foo=bar") Expect(p).ToNot(BeNil()) - iter := p.Endpoints(logger.Logger, "", false, azPreference, az) + iter := p.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}) Expect(iter.Next(0).CanonicalAddr()).To(Equal("192.168.1.1:1234")) }) @@ -1012,7 +1019,7 @@ var _ = Describe("RouteRegistry", func() { p := r.Lookup("dora.app.com/env/abc?foo=bar&baz=bing") Expect(p).ToNot(BeNil()) - iter := p.Endpoints(logger.Logger, "", false, azPreference, az) + iter := p.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}) Expect(iter.Next(0).CanonicalAddr()).To(Equal("192.168.1.1:1234")) }) }) @@ -1032,7 +1039,7 @@ var _ = Describe("RouteRegistry", func() { p1 := 
r.Lookup("foo/extra/paths") Expect(p1).ToNot(BeNil()) - iter := p1.Endpoints(logger.Logger, "", false, azPreference, az) + iter := p1.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}) Expect(iter.Next(0).CanonicalAddr()).To(Equal("192.168.1.1:1234")) }) @@ -1044,7 +1051,7 @@ var _ = Describe("RouteRegistry", func() { p1 := r.Lookup("foo?fields=foo,bar") Expect(p1).ToNot(BeNil()) - iter := p1.Endpoints(logger.Logger, "", false, azPreference, az) + iter := p1.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}) Expect(iter.Next(0).CanonicalAddr()).To(Equal("192.168.1.1:1234")) }) @@ -1131,7 +1138,7 @@ var _ = Describe("RouteRegistry", func() { Expect(r.NumEndpoints()).To(Equal(2)) p := r.LookupWithAppInstance("bar.com/foo", appId, appIndex) - e := p.Endpoints(logger.Logger, "", false, azPreference, az).Next(0) + e := p.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}).Next(0) Expect(e).ToNot(BeNil()) Expect(e.CanonicalAddr()).To(MatchRegexp("192.168.1.1:1234")) @@ -1152,7 +1159,7 @@ var _ = Describe("RouteRegistry", func() { Expect(r.NumEndpoints()).To(Equal(2)) p := r.LookupWithAppInstance("bar.com/foo", appId, appIndex) - e := p.Endpoints(logger.Logger, "", false, azPreference, az).Next(0) + e := p.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}).Next(0) Expect(e).ToNot(BeNil()) Expect(e.CanonicalAddr()).To(MatchRegexp("192.168.1.1:1234")) @@ -1260,7 +1267,7 @@ var _ = Describe("RouteRegistry", func() { p := r.LookupWithProcessInstance("bar.com/foo", processId, processIndex) Expect(p.NumEndpoints()).To(Equal(2)) - es := p.Endpoints(logger.Logger, "", false, azPreference, az) + es := p.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}) e1 := es.Next(0) Expect(e1).ToNot(BeNil()) e2 := es.Next(0) @@ -1299,7 +1306,7 @@ var _ = Describe("RouteRegistry", func() { Expect(r.NumEndpoints()).To(Equal(5)) p := r.LookupWithProcessInstance("bar.com/foo", processId, processIndex) - e := p.Endpoints(logger.Logger, "", false, azPreference, az).Next(0) + e := p.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}).Next(0) Expect(e).ToNot(BeNil()) Expect(e.CanonicalAddr()).To(MatchRegexp("192.168.1.4:1237")) @@ -1506,7 +1513,7 @@ var _ = Describe("RouteRegistry", func() { p := r.Lookup("foo") Expect(p).ToNot(BeNil()) - Expect(p.Endpoints(logger.Logger, "", false, azPreference, az).Next(0)).To(Equal(endpoint)) + Expect(p.Endpoints(logger.Logger, "", false, route.RoutingProperties{LocallyOptimistic: locallyOptimistic, AZ: az}).Next(0)).To(Equal(endpoint)) p = r.Lookup("bar") Expect(p).To(BeNil()) diff --git a/src/code.cloudfoundry.org/gorouter/route/hash_based.go b/src/code.cloudfoundry.org/gorouter/route/hash_based.go new file mode 100644 index 000000000..752a8b012 --- /dev/null +++ b/src/code.cloudfoundry.org/gorouter/route/hash_based.go @@ -0,0 +1,224 @@ +package route + +import ( + "errors" + "log/slog" + "sync" + + log "code.cloudfoundry.org/gorouter/logger" +) + +// HashBased load balancing algorithm distributes requests based on a hash of a specific header value. +// The sticky session cookie has precedence over hash-based routing and the request should be routed to the instance stored in the cookie. 
+// If requests do not contain the hash-related header set configured for the hash-based route option, use the default load-balancing algorithm. +type HashBased struct { + lock *sync.Mutex + + logger *slog.Logger + pool *EndpointPool + lastEndpoint *Endpoint + lastLookupTableIndex uint64 + + stickyEndpointID string + mustBeSticky bool + + HeaderValue string +} + +// NewHashBased initializes an endpoint iterator that selects endpoints based on a hash of a header value. +// The global properties locallyOptimistic and localAvailabilityZone will be ignored when using Hash-Based Routing. +func NewHashBased(logger *slog.Logger, p *EndpointPool, initial string, mustBeSticky bool, headerValue string) EndpointIterator { + return &HashBased{ + logger: logger, + pool: p, + lock: &sync.Mutex{}, + stickyEndpointID: initial, + mustBeSticky: mustBeSticky, + HeaderValue: headerValue, + } +} + +// Next selects the next endpoint based on the hash of the header value. +// If a sticky session endpoint is available and not overloaded, it will be returned. +// If the request must be sticky and the sticky endpoint is unavailable or overloaded, nil will be returned. +// If no sticky session is present, the endpoint will be selected based on the hash of the header value. +// It returns the same endpoint for the same header value consistently. +// If the hash lookup fails or the endpoint is not found, nil will be returned. +func (h *HashBased) Next(attempt int) *Endpoint { + h.lock.Lock() + defer h.lock.Unlock() + + endpoint := h.pool.FindStickyEndpoint(h.logger, &h.stickyEndpointID, h.mustBeSticky) + if endpoint != nil { + h.lastEndpoint = endpoint + return endpoint + } + + if h.mustBeSticky { + return nil + } + + // Check for empty pool + if len(h.pool.endpoints) == 0 { + h.logger.Warn("hash-based-routing-pool-empty", slog.String("host", h.pool.host)) + return nil + } + + endpoint = h.getSingleEndpoint() + if endpoint != nil { + h.lastEndpoint = endpoint + return endpoint + } + + // Perform hash-based selection + endpoint = h.selectHashBasedEndpoint(attempt) + if endpoint != nil { + h.lastEndpoint = endpoint + } + return endpoint +} + +// selectHashBasedEndpoint performs hash-based endpoint selection using the lookup table. +func (h *HashBased) selectHashBasedEndpoint(attempt int) *Endpoint { + if h.pool.HashLookupTable == nil { + h.logger.Error("hash-based-routing-failed", slog.String("host", h.pool.host), log.ErrAttr(errors.New("lookup table is empty"))) + return nil + } + + startIndex, err := h.getStartingIndex(attempt) + if err != nil { + h.logger.Error("hash-based-routing-failed", slog.String("host", h.pool.host), log.ErrAttr(err)) + return nil + } + + return h.findEndpoint(startIndex, attempt) +} + +// getStartingIndex determines the starting index in the lookup table based on the attempt number. +// For the initial attempt, it uses the hash of the header value. +// For retries, it uses the next index after the last lookup. 
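+//
+// Illustrative example (values assumed, not taken from a real table): if hash("tenant-1") % tableSize == 42,
+// attempt 0 starts at slot 42; after a failed attempt the next lookup starts one slot further,
+// wrapping around at the end of the table.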
+func (h *HashBased) getStartingIndex(attempt int) (uint64, error) { + if attempt == 0 || h.lastLookupTableIndex == 0 { + index, _, err := h.pool.HashLookupTable.GetInstanceForHashHeader(h.HeaderValue) + return index, err + } + + // On retries, start from the next index in the lookup table + nextIndex := (h.lastLookupTableIndex + 1) % h.pool.HashLookupTable.GetLookupTableSize() + return nextIndex, nil +} + +func (h *HashBased) findEndpoint(index uint64, attempt int) *Endpoint { + // Ensure we don't exceed the lookup table size + lookupTableSize := h.pool.HashLookupTable.GetLookupTableSize() + + // Normalize index + currentIndex := index % lookupTableSize + // Keep track of endpoints already visited, to avoid visiting them twice + visitedEndpoints := make(map[string]bool) + + numberOfEndpoints := len(h.pool.HashLookupTable.GetEndpointList()) + + lastEndpointPrivateId := "" + if attempt > 0 && h.lastEndpoint != nil { + lastEndpointPrivateId = h.lastEndpoint.PrivateInstanceId + } + + // abort when we have visited all available endpoints unsuccessfully + for len(visitedEndpoints) < numberOfEndpoints { + id := h.pool.HashLookupTable.GetEndpointId(currentIndex) + + if visitedEndpoints[id] || id == lastEndpointPrivateId { + currentIndex = (currentIndex + 1) % lookupTableSize + continue + } + visitedEndpoints[id] = true + + endpointElem := h.pool.findById(id) + if endpointElem == nil { + h.logger.Error("hash-based-routing-failed", slog.String("host", h.pool.host), log.ErrAttr(errors.New("endpoint not found in pool")), slog.String("endpoint-id", id)) + currentIndex = (currentIndex + 1) % lookupTableSize + continue + } + + lastEndpointPrivateId = id + + if endpointElem.isOverloaded() { + // If the selected endpoint has reached the limit of max request per backend, log the info about it and try the next one in the lookup table + h.logger.Info("hash-based-routing-endpoint-overloaded", slog.String("host", h.pool.host), slog.String("endpoint-id", endpointElem.endpoint.PrivateInstanceId)) + } else if !h.IsImbalanced(endpointElem.endpoint) { + h.lastLookupTableIndex = currentIndex + return endpointElem.endpoint + } + + currentIndex = (currentIndex + 1) % lookupTableSize + } + // All endpoints checked and overloaded or not found + h.logger.Error("hash-based-routing-failed", slog.String("host", h.pool.host), log.ErrAttr(errors.New("all endpoints are overloaded"))) + return nil +} + +func (h *HashBased) IsImbalanced(endpoint *Endpoint) bool { + // endpoint cannot be imbalanced if balance factor is not set + if h.pool.HashRoutingProperties.BalanceFactor <= 0 { + return false + } + + avgNumberOfInFlightRequests := h.CalculateAverageLoad() + // Check if avgNumberOfInFlightRequests is 0 to avoid division by 0 in the next if-condition + if avgNumberOfInFlightRequests == 0 { + return false + } + + currentInFlightRequestCount := endpoint.Stats.NumberConnections.Count() + balanceFactor := h.pool.HashRoutingProperties.BalanceFactor + + if float64(currentInFlightRequestCount)/avgNumberOfInFlightRequests > balanceFactor { + h.logger.Debug("hash-based-routing-endpoint-imbalanced", slog.String("host", h.pool.host), slog.String("endpoint-id", endpoint.PrivateInstanceId), slog.Int64("endpoint-connections", currentInFlightRequestCount), slog.Float64("average-load", avgNumberOfInFlightRequests)) + return true + } + return false +} + +// EndpointFailed notifies the endpoint pool that the last selected endpoint has failed. 
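+// Typical caller flow (sketch of how the proxy drives an EndpointIterator): pick an endpoint via
+// Next(attempt), call PreRequest before proxying, report a transport error via EndpointFailed before
+// retrying with attempt+1, and call PostRequest once the request finishes.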
+func (h *HashBased) EndpointFailed(err error) { + if h.lastEndpoint != nil { + h.pool.EndpointFailed(h.lastEndpoint, err) + } +} + +// PreRequest increments the in-flight request count for the selected endpoint from current Gorouter. +func (h *HashBased) PreRequest(e *Endpoint) { + e.Stats.NumberConnections.Increment() +} + +// PostRequest decrements the in-flight request count for the selected endpoint from current Gorouter. +func (h *HashBased) PostRequest(e *Endpoint) { + e.Stats.NumberConnections.Decrement() +} + +// CalculateAverageLoad computes the average number of in-flight requests across all endpoints in the pool. +func (h *HashBased) CalculateAverageLoad() float64 { + if len(h.pool.endpoints) == 0 { + return 0 + } + + var currentInFlightRequestCount int64 + for _, endpointElem := range h.pool.endpoints { + currentInFlightRequestCount += endpointElem.endpoint.Stats.NumberConnections.Count() + } + + return float64(currentInFlightRequestCount) / float64(len(h.pool.endpoints)) +} + +func (h *HashBased) getSingleEndpoint() *Endpoint { + if len(h.pool.endpoints) == 1 { + e := h.pool.endpoints[0] + if e.isOverloaded() { + return nil + } + + return e.endpoint + } + return nil +} diff --git a/src/code.cloudfoundry.org/gorouter/route/hash_based_test.go b/src/code.cloudfoundry.org/gorouter/route/hash_based_test.go new file mode 100644 index 000000000..3d5d70a0e --- /dev/null +++ b/src/code.cloudfoundry.org/gorouter/route/hash_based_test.go @@ -0,0 +1,441 @@ +package route_test + +import ( + _ "errors" + "hash/fnv" + "time" + + "code.cloudfoundry.org/gorouter/config" + "code.cloudfoundry.org/gorouter/route" + "code.cloudfoundry.org/gorouter/test_util" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/gbytes" +) + +var _ = Describe("HashBased", func() { + var ( + pool *route.EndpointPool + logger *test_util.TestLogger + ) + + BeforeEach(func() { + logger = test_util.NewTestLogger("test") + pool = route.NewPool(&route.PoolOpts{ + Logger: logger.Logger, + RetryAfterFailure: 2 * time.Minute, + Host: "", + ContextPath: "", + MaxConnsPerBackend: 500, + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + HashHeader: "tenant-id", + }) + }) + + Describe("Next", func() { + + Context("when pool is empty", func() { + It("does not select an endpoint", func() { + iter := route.NewHashBased(logger.Logger, pool, "", false, "tenant-1") + Expect(iter.Next(0)).To(BeNil()) + }) + }) + + Context("when pool has endpoints", func() { + var ( + endpoints []*route.Endpoint + ) + BeforeEach(func() { + e1 := route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", PrivateInstanceId: "ID1"}) + e2 := route.NewEndpoint(&route.EndpointOpts{Host: "2.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", PrivateInstanceId: "ID2"}) + endpoints = []*route.Endpoint{e1, e2} + for _, e := range endpoints { + pool.Put(e) + } + + }) + It("It returns the same endpoint for the same header value", func() { + iter := route.NewHashBased(logger.Logger, pool, "", false, "tenant-1") + first := iter.Next(0) + second := iter.Next(0) + Expect(first).NotTo(BeNil()) + Expect(second).NotTo(BeNil()) + Expect(first).To(Equal(second)) + }) + }) + + Context("when endpoint overloaded", func() { + var ( + endpoints []*route.Endpoint + e1 *route.Endpoint + e2 *route.Endpoint + e3 *route.Endpoint + ) + It("It returns the next endpoint for the same header value when balancer factor set", func() { + e1 = 
route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", HashBalanceFactor: 1.2, PrivateInstanceId: "ID1"}) + e2 = route.NewEndpoint(&route.EndpointOpts{Host: "2.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", HashBalanceFactor: 1.2, PrivateInstanceId: "ID2"}) + e3 = route.NewEndpoint(&route.EndpointOpts{Host: "3.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", HashBalanceFactor: 1.2, PrivateInstanceId: "ID3"}) + endpoints = []*route.Endpoint{e1, e2, e3} + for _, e := range endpoints { + pool.Put(e) + } + iter := route.NewHashBased(logger.Logger, pool, "", false, "tenant-1") + first := iter.Next(0) + Expect(iter.Next(0)).To(Equal(first)) + for i := 0; i < 6; i++ { + iter.PreRequest(first) + } + second := iter.Next(0) + Expect(second).NotTo(Equal(first)) + }) + It("It returns the same overloaded endpoint for the same header value when balancer factor not set", func() { + e1 = route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", HashBalanceFactor: 0, PrivateInstanceId: "ID1"}) + e2 = route.NewEndpoint(&route.EndpointOpts{Host: "2.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", HashBalanceFactor: 0, PrivateInstanceId: "ID2"}) + e3 = route.NewEndpoint(&route.EndpointOpts{Host: "3.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", HashBalanceFactor: 0, PrivateInstanceId: "ID3"}) + endpoints = []*route.Endpoint{e1, e2, e3} + for _, e := range endpoints { + pool.Put(e) + } + iter := route.NewHashBased(logger.Logger, pool, "", false, "tenant-1") + iter.(*route.HashBased).HeaderValue = "tenant-1" + first := iter.Next(0) + Expect(iter.Next(0)).To(Equal(first)) + for i := 0; i < 6; i++ { + iter.PreRequest(first) + } + second := iter.Next(0) + Expect(second).To(Equal(first)) + }) + + }) + + Context("with retries", func() { + var ( + endpoints []*route.Endpoint + e1 *route.Endpoint + e2 *route.Endpoint + e3 *route.Endpoint + e4 *route.Endpoint + MaglevLookupTable = []int16{2, 2, 1, 0, 1, 0, 0, 0, 2, 0, 1, 3, 1, 0, 1, 0, 3, 0, 3, 0, 0, 0, 1, 0, 1, 2, 2, 0, 3, 2, 3, 0, 1, 0, 1, 0, 3, 3, 2, 0, 3, 1, 2, 0, 3, 0, 1, 0, 2, 3, 2, 3, 2, 0, 1, 2, 1, 0, 3, 2, 2, 1, 1, 2, 1, 3, 1, 2, 2, 0, 3, 2, 3, 1, 1, 3, 1, 3, 1, 0, 2, 1, 3, 1, 2, 2, 1, 3, 2, 2, 2, 3, 3, 1, 3, 0, 3, 2, 3, 3, 0} + ) + It("It returns next endpoint from maglev lookup table", func() { + e1 = route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", PrivateInstanceId: "ID1"}) + e2 = route.NewEndpoint(&route.EndpointOpts{Host: "2.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", PrivateInstanceId: "ID2"}) + e3 = route.NewEndpoint(&route.EndpointOpts{Host: "3.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", PrivateInstanceId: "ID3"}) + e4 = route.NewEndpoint(&route.EndpointOpts{Host: "4.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", PrivateInstanceId: "ID4"}) + + endpoints = []*route.Endpoint{e1, e2, e3, e4} + endpointIDList := make([]string, 0, 4) + for _, e := range endpoints { + pool.Put(e) + endpointIDList = append(endpointIDList, e.PrivateInstanceId) + } + maglevMock := NewMockHashLookupTable(MaglevLookupTable, endpointIDList) + pool.HashLookupTable = maglevMock + iter := route.NewHashBased(logger.Logger, pool, 
"", false, "tenant-1") + // The returned endpoint has always ID3 according to the Maglev lookup table + first := iter.Next(0) + Expect(first).To(Equal(e4)) + second := iter.Next(1) + Expect(second).To(Equal(e1)) + third := iter.Next(2) + Expect(third).To(Equal(e4)) + }) + It("It returns the next not overloaded endpoint for the second attempt", func() { + e1 = route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", HashBalanceFactor: 1.2, PrivateInstanceId: "ID1"}) + e2 = route.NewEndpoint(&route.EndpointOpts{Host: "2.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", HashBalanceFactor: 1.2, PrivateInstanceId: "ID2"}) + e3 = route.NewEndpoint(&route.EndpointOpts{Host: "3.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", HashBalanceFactor: 1.2, PrivateInstanceId: "ID3"}) + e4 = route.NewEndpoint(&route.EndpointOpts{Host: "4.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", PrivateInstanceId: "ID3"}) + + endpoints = []*route.Endpoint{e1, e2, e3, e4} + for _, e := range endpoints { + pool.Put(e) + } + iter := route.NewHashBased(logger.Logger, pool, "", false, "tenant-1") + firstAttemptResult := iter.Next(0) + Expect(iter.Next(0)).To(Equal(firstAttemptResult)) + for i := 0; i < 6; i++ { + // Simulate requests to overload the endpoints + iter.PreRequest(e1) + iter.PreRequest(e2) + } + secondAttemptResult := iter.Next(1) + Expect(secondAttemptResult).NotTo(Equal(firstAttemptResult)) + Expect(secondAttemptResult).NotTo(Equal(e1)) + Expect(secondAttemptResult).NotTo(Equal(e2)) + }) + }) + + Context("when using sticky sessions", func() { + var ( + endpoints []*route.Endpoint + iter route.EndpointIterator + ) + + BeforeEach(func() { + e1 := route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", PrivateInstanceId: "ID1"}) + e2 := route.NewEndpoint(&route.EndpointOpts{Host: "2.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", PrivateInstanceId: "ID2"}) + e3 := route.NewEndpoint(&route.EndpointOpts{Host: "3.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", PrivateInstanceId: "ID3"}) + endpoints = []*route.Endpoint{e1, e2, e3} + for _, e := range endpoints { + pool.Put(e) + } + }) + + Context("when mustBeSticky is true", func() { + It("returns the sticky endpoint when it exists", func() { + iter = route.NewHashBased(logger.Logger, pool, "ID1", true, "abc") + endpoint := iter.Next(0) + Expect(endpoint).NotTo(BeNil()) + Expect(endpoint.PrivateInstanceId).To(Equal("ID1")) + }) + + It("returns nil when sticky endpoint doesn't exist", func() { + iter = route.NewHashBased(logger.Logger, pool, "nonexistent-id", true, "abc") + Expect(iter.Next(0)).To(BeNil()) + }) + It("returns nil when sticky endpoint is overloaded and mustBeSticky is true", func() { + iter = route.NewHashBased(logger.Logger, pool, "ID1", true, "abc") + for i := 0; i < 1000; i++ { + iter.PreRequest(endpoints[0]) + } + Expect(iter.Next(0)).To(BeNil()) + }) + }) + + Context("when mustBeSticky is false", func() { + BeforeEach(func() { + iter = route.NewHashBased(logger.Logger, pool, "ID1", false, "some-value") + }) + + It("returns the sticky endpoint when it exists", func() { + endpoint := iter.Next(0) + Expect(endpoint).NotTo(BeNil()) + Expect(endpoint.PrivateInstanceId).To(Equal("ID1")) + }) + + It("falls back to hash-based routing when sticky endpoint doesn't exist", func() { + iter = 
route.NewHashBased(logger.Logger, pool, "nonexistent-id", false, "some-value") + endpoint := iter.Next(0) + Expect(endpoint).NotTo(BeNil()) + }) + }) + }) + }) + + Context("when testing PreRequest and PostRequest", func() { + var ( + endpoint *route.Endpoint + iter route.EndpointIterator + ) + + BeforeEach(func() { + endpoint = route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", PrivateInstanceId: "ID1"}) + pool.Put(endpoint) + iter = route.NewHashBased(logger.Logger, pool, "", false, "abc") + }) + + It("increments connection count on PreRequest", func() { + initialCount := endpoint.Stats.NumberConnections.Count() + iter.PreRequest(endpoint) + Expect(endpoint.Stats.NumberConnections.Count()).To(Equal(initialCount + 1)) + }) + + It("decrements connection count on PostRequest", func() { + iter.PreRequest(endpoint) + initialCount := endpoint.Stats.NumberConnections.Count() + iter.PostRequest(endpoint) + Expect(endpoint.Stats.NumberConnections.Count()).To(Equal(initialCount - 1)) + }) + }) + Describe("IsImbalancedOrOverloaded", func() { + var iter *route.HashBased + var endpoints []*route.Endpoint + + BeforeEach(func() { + iter = route.NewHashBased(logger.Logger, pool, "", false, "abc").(*route.HashBased) + }) + + Context("when endpoints have a lot of in-flight requests", func() { + var e1, e2, e3 *route.Endpoint + BeforeEach(func() { + e1 = route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", HashBalanceFactor: 1.2, PrivateInstanceId: "ID1"}) + e2 = route.NewEndpoint(&route.EndpointOpts{Host: "2.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", HashBalanceFactor: 1.2, PrivateInstanceId: "ID2"}) + e3 = route.NewEndpoint(&route.EndpointOpts{Host: "3.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", HashHeaderName: "tenant-id", HashBalanceFactor: 1.2, PrivateInstanceId: "ID3"}) + endpoints = []*route.Endpoint{e1, e2, e3} + for _, e := range endpoints { + pool.Put(e) + } + + }) + It("do not mark as imbalanced if every endpoint has 499 in-flight requests", func() { + for i := 0; i < 498; i++ { + iter.PreRequest(e1) + } + for i := 0; i < 498; i++ { + iter.PreRequest(e2) + } + for i := 0; i < 498; i++ { + iter.PreRequest(e3) + } + // in general 500 in flight requests counted by e1 + Expect(iter.IsImbalanced(e1)).To(BeFalse()) + }) + It("mark as imbalanced if it has more in-flight requests", func() { + for i := 0; i < 300; i++ { + iter.PreRequest(e1) + } + for i := 0; i < 200; i++ { + iter.PreRequest(e2) + } + for i := 0; i < 200; i++ { + iter.PreRequest(e3) + } + Expect(iter.IsImbalanced(e1)).To(BeTrue()) + Eventually(logger).Should(gbytes.Say("hash-based-routing-endpoint-imbalanced")) + Expect(iter.IsImbalanced(e2)).To(BeFalse()) + Expect(iter.IsImbalanced(e3)).To(BeFalse()) + }) + }) + }) + + Describe("CalculateAverageNumberOfConnections", func() { + var iter *route.HashBased + var endpoints []*route.Endpoint + + BeforeEach(func() { + iter = route.NewHashBased(logger.Logger, pool, "", false, "abc").(*route.HashBased) + }) + + Context("when there are no endpoints", func() { + It("returns 0", func() { + Expect(iter.CalculateAverageLoad()).To(Equal(float64(0))) + }) + }) + + Context("when all endpoints have zero connections", func() { + BeforeEach(func() { + pool.Put(route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", PrivateInstanceId: "ID1"})) + pool.Put(route.NewEndpoint(&route.EndpointOpts{Host: 
"2.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", PrivateInstanceId: "ID2"})) + }) + It("returns 0", func() { + Expect(iter.CalculateAverageLoad()).To(Equal(float64(0))) + }) + }) + + Context("when endpoints have varying connection counts", func() { + var e1, e2, e3 *route.Endpoint + BeforeEach(func() { + e1 = route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", PrivateInstanceId: "ID1"}) + e2 = route.NewEndpoint(&route.EndpointOpts{Host: "2.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", PrivateInstanceId: "ID2"}) + e3 = route.NewEndpoint(&route.EndpointOpts{Host: "3.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", PrivateInstanceId: "ID3"}) + endpoints = []*route.Endpoint{e1, e2, e3} + for _, e := range endpoints { + pool.Put(e) + } + for i := 0; i < 2; i++ { + iter.PreRequest(e1) + } + for i := 0; i < 4; i++ { + iter.PreRequest(e2) + } + for i := 0; i < 6; i++ { + iter.PreRequest(e3) + } + }) + It("returns the correct average", func() { + // in general 12 in flight requests + Expect(iter.CalculateAverageLoad()).To(Equal(float64(4))) + }) + }) + + Context("when one endpoint has many connections", func() { + var e1, e2 *route.Endpoint + BeforeEach(func() { + e1 = route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", PrivateInstanceId: "ID1"}) + e2 = route.NewEndpoint(&route.EndpointOpts{Host: "2.2.3.4", Port: 5678, LoadBalancingAlgorithm: "hash", PrivateInstanceId: "ID2"}) + endpoints = []*route.Endpoint{e1, e2} + for _, e := range endpoints { + pool.Put(e) + } + for i := 0; i < 10; i++ { + iter.PreRequest(e1) + } + }) + It("returns the correct average", func() { + Expect(iter.CalculateAverageLoad()).To(Equal(float64(5))) + }) + }) + }) + +}) + +// MockHashLookupTable provides a simple mock implementation of MaglevLookup interface for testing. +type MockHashLookupTable struct { + lookupTable []int16 + endpointList []string +} + +// NewMockHashLookupTable creates a new mock lookup table with predefined mappings +func NewMockHashLookupTable(lookupTable []int16, endpointList []string) *MockHashLookupTable { + return &MockHashLookupTable{ + lookupTable: lookupTable, + endpointList: endpointList, + } +} + +func (m *MockHashLookupTable) GetInstanceForHashHeader(hashHeaderValue string) (uint64, string, error) { + if len(m.endpointList) == 0 { + return 0, "", nil + } + h := fnv.New64a() + _, _ = h.Write([]byte(hashHeaderValue)) + key := h.Sum64() + index := key % m.GetLookupTableSize() + return index, m.endpointList[m.lookupTable[index]], nil + +} + +func (m *MockHashLookupTable) GetLookupTableSize() uint64 { + return uint64(len(m.lookupTable)) +} + +func (m *MockHashLookupTable) GetEndpointId(lookupTableIndex uint64) string { + return m.endpointList[m.lookupTable[lookupTableIndex]] +} + +func (m *MockHashLookupTable) Add(endpoint string) { + // Check if endpoint already exists + for _, existing := range m.endpointList { + if existing == endpoint { + return + } + } + m.endpointList = append(m.endpointList, endpoint) +} + +func (m *MockHashLookupTable) Remove(endpoint string) { + for i, existing := range m.endpointList { + if existing == endpoint { + m.endpointList = append(m.endpointList[:i], m.endpointList[i+1:]...) + return + } + } +} + +func (m *MockHashLookupTable) GetEndpointList() []string { + return append([]string(nil), m.endpointList...) 
// return a copy
+}
+
+// GetLookupTable returns a copy of the current lookup table (for testing)
+func (m *MockHashLookupTable) GetLookupTable() []int16 {
+	return append([]int16(nil), m.lookupTable...) // return a copy
+}
+
+// GetPermutationTable is not implemented in the mock and always returns nil
+func (m *MockHashLookupTable) GetPermutationTable() [][]uint16 {
+	return nil
+}
+
+// Compile-time check to ensure MockHashLookupTable implements the MaglevLookup interface
+var _ route.MaglevLookup = (*MockHashLookupTable)(nil)
diff --git a/src/code.cloudfoundry.org/gorouter/route/leastconnection.go b/src/code.cloudfoundry.org/gorouter/route/leastconnection.go
index d538b65a4..ee288f2ac 100644
--- a/src/code.cloudfoundry.org/gorouter/route/leastconnection.go
+++ b/src/code.cloudfoundry.org/gorouter/route/leastconnection.go
@@ -1,7 +1,6 @@
 package route
 
 import (
-	"context"
 	"log/slog"
 	"math/rand"
 	"time"
@@ -31,43 +30,22 @@ func NewLeastConnection(logger *slog.Logger, p *EndpointPool, initial string, mu
 }
 
 func (r *LeastConnection) Next(attempt int) *Endpoint {
-	var e *endpointElem
-	if r.initialEndpoint != "" {
-		e = r.pool.findById(r.initialEndpoint)
-		if e != nil && e.isOverloaded() {
-			if r.mustBeSticky {
-				if r.logger.Enabled(context.Background(), slog.LevelDebug) {
-					r.logger.Debug("endpoint-overloaded-but-request-must-be-sticky", e.endpoint.ToLogData()...)
-				}
-				return nil
-			}
-			e = nil
-		}
-
-		if e == nil && r.mustBeSticky {
-			r.logger.Debug("endpoint-missing-but-request-must-be-sticky", slog.String("requested-endpoint", r.initialEndpoint))
-			return nil
-		}
-
-		if !r.mustBeSticky {
-			r.logger.Debug("endpoint-missing-choosing-alternate", slog.String("requested-endpoint", r.initialEndpoint))
-			r.initialEndpoint = ""
-		}
+	e := r.pool.FindStickyEndpoint(r.logger, &r.initialEndpoint, r.mustBeSticky)
+	if e != nil {
+		r.lastEndpoint = e
+		return e
 	}
 
-	if e != nil {
-		e.RLock()
-		defer e.RUnlock()
-		r.lastEndpoint = e.endpoint
-		return e.endpoint
+	if r.mustBeSticky {
+		return nil
 	}
 
-	e = r.next(attempt)
-	if e != nil {
-		e.RLock()
-		defer e.RUnlock()
-		r.lastEndpoint = e.endpoint
-		return e.endpoint
+	endpointElem := r.next(attempt)
+	if endpointElem != nil {
+		endpointElem.RLock()
+		defer endpointElem.RUnlock()
+		r.lastEndpoint = endpointElem.endpoint
+		return endpointElem.endpoint
 	}
 
 	r.lastEndpoint = nil
diff --git a/src/code.cloudfoundry.org/gorouter/route/maglev.go b/src/code.cloudfoundry.org/gorouter/route/maglev.go
new file mode 100644
index 000000000..225dcde49
--- /dev/null
+++ b/src/code.cloudfoundry.org/gorouter/route/maglev.go
@@ -0,0 +1,272 @@
+package route
+
+// Original https://github.com/kkdai/maglev
+//
+// Copyright (c) 2019 Evan Lin (github.com/kkdai)
+//
+// This program and the accompanying materials are made available under
+// the terms of the Apache License, Version 2.0 which is available at
+// http://www.apache.org/licenses/LICENSE-2.0.
+// +// CHANGES: +// - Modified for integration with CF GoRouter +// - Added MaglevLookup interface for testability and abstraction +// - Enhanced with structured logging using slog +// - Added thread-safe operations +// - Extended with getter methods for unit testing +// - Added error handling and safety checks +// - Customized for hash-based routing requirements + +import ( + "context" + "errors" + "hash/fnv" + "log/slog" + "sort" + "sync" +) + +const ( + // lookupTableSize is prime number for the size of the maglev lookup table, which should be approximately 100x + // the number of expected endpoints + lookupTableSize uint64 = 1801 +) + +// permutationParams stores the parameters needed to compute permutation values on-the-fly +type permutationParams struct { + offset uint64 + skip uint64 +} + +// MaglevLookup defines the interface for consistent hashing lookup table implementations. +// This interface allows for different implementations of the Maglev algorithm and +// enables easy testing with mock implementations. +type MaglevLookup interface { + // Add a new endpoint to the lookup table + Add(endpoint string) + + // Remove an endpoint from the lookup table + Remove(endpoint string) + + // GetInstanceForHashHeader endpoint by specified request header value + GetInstanceForHashHeader(hashHeaderValue string) (uint64, string, error) + + // GetEndpointId returns the endpoint ID by specified lookup table index + GetEndpointId(lookupTableIndex uint64) string + + // GetLookupTableSize returns the size of the lookup table + GetLookupTableSize() uint64 + + // GetEndpointList returns a copy of the current endpoint list (for testing) + GetEndpointList() []string + + // GetLookupTable returns a copy of the current lookup table (for testing) + GetLookupTable() []int16 + + // GetPermutationTable returns a copy of the current permutation table (for testing) + GetPermutationTable() [][]uint16 +} + +// Maglev implementation of consistent hashing algorithm described in "Maglev: A Fast and Reliable Software Network +// Load Balancer" (https://storage.googleapis.com/gweb-research2023-media/pubtools/2904.pdf) +type Maglev struct { + logger *slog.Logger + permutations []permutationParams // Stores offset and skip for computing permutations on-the-fly + lookupTable []int16 + endpointList []string + lock *sync.RWMutex +} + +// NewMaglev initializes an empty maglev lookupTable table +func NewMaglev(logger *slog.Logger) *Maglev { + return &Maglev{ + lock: &sync.RWMutex{}, + lookupTable: make([]int16, lookupTableSize), + endpointList: make([]string, 0, 2), + permutations: make([]permutationParams, 0, 2), + logger: logger, + } +} + +// Add a new endpoint to lookupTable if it's not already contained. 
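+// A worked example of the permutation math used below (illustrative toy table with M = 7 instead of 1801):
+// offset = hash(endpoint) % M and skip = hash(endpoint) % (M-1) + 1, so with offset = 3 and skip = 2 the
+// preference order is slots 3, 5, 0, 2, 4, 6, 1. Every slot is visited exactly once, because M is prime
+// and therefore any skip in [1, M-1] is coprime to M.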
+func (m *Maglev) Add(endpoint string) { + m.lock.Lock() + defer m.lock.Unlock() + + if lookupTableSize == uint64(len(m.endpointList)) { + m.logger.Warn("maglev-add-lookuptable-capacity-exceeded", slog.String("endpoint-id", endpoint)) + return + } + + index := sort.SearchStrings(m.endpointList, endpoint) + if index < len(m.endpointList) && m.endpointList[index] == endpoint { + if m.logger.Enabled(context.Background(), slog.LevelDebug) { + m.logger.Debug("maglev-add-lookuptable-endpoint-exists", slog.String("endpoint-id", endpoint), slog.Int("current-endpoints", len(m.endpointList))) + } + return + } + + m.endpointList = append(m.endpointList, "") + copy(m.endpointList[index+1:], m.endpointList[index:]) + m.endpointList[index] = endpoint + m.logger.Info("maglev-add-endpoint", slog.String("endpoint-id", endpoint), slog.Int("current-endpoints", len(m.endpointList))) + + m.generatePermutation(endpoint) + m.fillLookupTable() +} + +// Remove an endpoint from lookupTable if it's contained. +func (m *Maglev) Remove(endpoint string) { + m.lock.Lock() + defer m.lock.Unlock() + + index := sort.SearchStrings(m.endpointList, endpoint) + if index >= len(m.endpointList) || m.endpointList[index] != endpoint { + m.logger.Debug("maglev-remove-endpoint-not-found", slog.String("endpoint-id", endpoint)) + return + } + + m.endpointList = append(m.endpointList[:index], m.endpointList[index+1:]...) + m.permutations = append(m.permutations[:index], m.permutations[index+1:]...) + + m.fillLookupTable() +} + +func (m *Maglev) hashKey(headerValue string) uint64 { + return m.calculateFNVHash64(headerValue) +} + +// GetInstanceForHashHeader lookup table index and private instance ID for the specified request header value +func (m *Maglev) GetInstanceForHashHeader(hashHeaderValue string) (uint64, string, error) { + m.lock.RLock() + defer m.lock.RUnlock() + + if len(m.endpointList) == 0 { + return 0, "", errors.New("no endpoint available") + } + key := m.hashKey(hashHeaderValue) + index := key % lookupTableSize + return index, m.endpointList[m.lookupTable[key%lookupTableSize]], nil +} + +// GetEndpointId by specified lookup table index +func (m *Maglev) GetEndpointId(lookupTableIndex uint64) string { + m.lock.RLock() + defer m.lock.RUnlock() + + return m.endpointList[m.lookupTable[lookupTableIndex]] +} + +// generatePermutation stores the permutation parameters (offset and skip) for the endpoint +func (m *Maglev) generatePermutation(endpoint string) { + pos := sort.SearchStrings(m.endpointList, endpoint) + if pos == len(m.endpointList) { + m.logger.Debug("maglev-permutation-no-endpoints") + return + } + + endpointHash := m.calculateFNVHash64(endpoint) + permutationParameters := permutationParams{ + offset: endpointHash % lookupTableSize, + skip: (endpointHash % (lookupTableSize - 1)) + 1, + } + + // insert params at position pos, shifting the rest to the right + m.permutations = append(m.permutations, permutationParams{}) + copy(m.permutations[pos+1:], m.permutations[pos:]) + m.permutations[pos] = permutationParameters +} + +// computePermutation calculates the permutation value for endpoint i at position j on-the-fly +func (m *Maglev) computePermutation(i int, j int) uint16 { + params := m.permutations[i] + return uint16((params.offset + uint64(j)*params.skip) % lookupTableSize) +} + +func (m *Maglev) fillLookupTable() { + if len(m.endpointList) == 0 { + return + } + + numberOfEndpoints := len(m.endpointList) + next := make([]int, numberOfEndpoints) + entry := make([]int16, lookupTableSize) + for j := range entry { + 
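+		// -1 marks a lookup slot that has not been assigned to any endpoint yet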
entry[j] = -1 + } + + for n := uint64(0); n <= lookupTableSize; { + for i := 0; i < numberOfEndpoints; i++ { + candidate := m.findNextAvailableSlot(i, next, entry) + entry[candidate] = int16(i) + next[i] = next[i] + 1 + n++ + + if n == lookupTableSize { + m.lookupTable = entry + return + } + } + } +} + +func (m *Maglev) findNextAvailableSlot(i int, next []int, entry []int16) uint16 { + candidate := m.computePermutation(i, next[i]) + for entry[candidate] >= 0 { + next[i]++ + if next[i] >= int(lookupTableSize) { + // This should not happen in a properly functioning Maglev algorithm, + // but we add this safety check to prevent panic + m.logger.Error("maglev-permutation-table-exhausted", + slog.Int("endpoint-index", i), + slog.Int("next-value", next[i]), + slog.Int("table-size", int(lookupTableSize))) + // Reset to beginning of permutation table as fallback + next[i] = 0 + } + candidate = m.computePermutation(i, next[i]) + } + return candidate +} + +// Getters for unit tests +func (m *Maglev) GetEndpointList() []string { + m.lock.RLock() + defer m.lock.RUnlock() + return append([]string(nil), m.endpointList...) +} + +func (m *Maglev) GetLookupTable() []int16 { + m.lock.RLock() + defer m.lock.RUnlock() + return append([]int16(nil), m.lookupTable...) +} + +func (m *Maglev) GetPermutationTable() [][]uint16 { + m.lock.RLock() + defer m.lock.RUnlock() + + // Compute permutation table on-the-fly for testing + copied := make([][]uint16, len(m.permutations)) + for i := range m.permutations { + copied[i] = make([]uint16, lookupTableSize) + for j := uint64(0); j < lookupTableSize; j++ { + copied[i][j] = m.computePermutation(i, int(j)) + } + } + return copied +} + +func (m *Maglev) GetLookupTableSize() uint64 { + return lookupTableSize +} + +// calculateFNVHash64 computes a hash using the non-cryptographic FNV hash algorithm. +func (m *Maglev) calculateFNVHash64(key string) uint64 { + h := fnv.New64a() + _, _ = h.Write([]byte(key)) + return h.Sum64() +} + +// Compile-time check to ensure Maglev implements MaglevLookup interface +var _ MaglevLookup = (*Maglev)(nil) diff --git a/src/code.cloudfoundry.org/gorouter/route/maglev_test.go b/src/code.cloudfoundry.org/gorouter/route/maglev_test.go new file mode 100644 index 000000000..b72d13b3d --- /dev/null +++ b/src/code.cloudfoundry.org/gorouter/route/maglev_test.go @@ -0,0 +1,345 @@ +package route_test + +import ( + "fmt" + "strconv" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + + "code.cloudfoundry.org/gorouter/route" + "code.cloudfoundry.org/gorouter/test_util" +) + +var _ = Describe("Maglev", func() { + var ( + logger *test_util.TestLogger + maglev *route.Maglev + ) + + BeforeEach(func() { + logger = test_util.NewTestLogger("test") + + maglev = route.NewMaglev(logger.Logger) + }) + + Describe("NewMaglev", func() { + It("should create a new Maglev instance", func() { + Expect(maglev).NotTo(BeNil()) + }) + }) + + Describe("Add", func() { + Context("when adding a new backend", func() { + It("should add the backend successfully", func() { + maglev.Add("backend1") + + Expect(maglev.GetEndpointList()).To(HaveLen(1)) + Expect(maglev.GetLookupTable()).To(HaveLen(int(maglev.GetLookupTableSize()))) + Expect(maglev.GetPermutationTable()).To(HaveLen(1)) + Expect(maglev.GetPermutationTable()[0]).To(HaveLen(int(maglev.GetLookupTableSize()))) + + _, backend, err := maglev.GetInstanceForHashHeader("test-key") + Expect(err).NotTo(HaveOccurred()) + Expect(backend).To(Equal("backend1")) + }) + }) + + Context("when adding a backend twice", func() { + It("should skip adding subsequent adds", func() { + maglev.Add("backend1") + maglev.Add("backend1") + + Expect(maglev.GetEndpointList()).To(HaveLen(1)) + Expect(maglev.GetLookupTable()).To(HaveLen(int(maglev.GetLookupTableSize()))) + Expect(maglev.GetPermutationTable()).To(HaveLen(1)) + Expect(maglev.GetPermutationTable()[0]).To(HaveLen(int(maglev.GetLookupTableSize()))) + + _, backend, err := maglev.GetInstanceForHashHeader("test-key") + Expect(err).NotTo(HaveOccurred()) + Expect(backend).To(Equal("backend1")) + }) + }) + + Context("when adding multiple backends", func() { + It("should make all backends reachable", func() { + maglev.Add("backend1") + maglev.Add("backend2") + maglev.Add("backend3") + + Expect(maglev.GetEndpointList()).To(HaveLen(3)) + Expect(maglev.GetLookupTable()).To(HaveLen(int(maglev.GetLookupTableSize()))) + Expect(maglev.GetPermutationTable()).To(HaveLen(len(maglev.GetEndpointList()))) + for i := range len(maglev.GetEndpointList()) { + Expect(maglev.GetPermutationTable()[i]).To(HaveLen(int(maglev.GetLookupTableSize()))) + } + + backends := make(map[string]bool) + for i := 0; i < 1000; i++ { + _, backend, err := maglev.GetInstanceForHashHeader(string(rune(i))) + Expect(err).NotTo(HaveOccurred()) + backends[backend] = true + } + + Expect(backends["backend1"]).To(BeTrue()) + Expect(backends["backend2"]).To(BeTrue()) + Expect(backends["backend3"]).To(BeTrue()) + }) + }) + }) + + Describe("Remove", func() { + Context("when removing an existing backend", func() { + It("should remove the backend successfully", func() { + maglev.Add("backend1") + maglev.Add("backend2") + + maglev.Remove("backend1") + + Expect(maglev.GetEndpointList()).To(HaveLen(1)) + Expect(maglev.GetLookupTable()).To(HaveLen(int(maglev.GetLookupTableSize()))) + Expect(maglev.GetPermutationTable()).To(HaveLen(1)) + Expect(maglev.GetPermutationTable()[0]).To(HaveLen(int(maglev.GetLookupTableSize()))) + + }) + }) + + Context("when removing a non-existent backend", func() { + It("should handle gracefully without error", func() { + maglev.Add("backend1") + + Expect(func() { maglev.Remove("non-existent") }).NotTo(Panic()) + + Expect(maglev.GetEndpointList()).To(HaveLen(1)) + Expect(maglev.GetLookupTable()).To(HaveLen(int(maglev.GetLookupTableSize()))) + Expect(maglev.GetPermutationTable()).To(HaveLen(1)) + Expect(maglev.GetPermutationTable()[0]).To(HaveLen(int(maglev.GetLookupTableSize()))) + }) + }) + }) + + Describe("Get", 
func() { + Context("when no backends were added", func() { + It("should return an error", func() { + _, _, err := maglev.GetInstanceForHashHeader("test-key") + Expect(err).To(HaveOccurred()) + }) + }) + + Context("when backends are added", func() { + BeforeEach(func() { + maglev.Add("backend1") + maglev.Add("backend2") + }) + + It("should return consistent results for the same key", func() { + var counter = make(map[string]int) + var result string + var err error + for range 100 { + _, result, err = maglev.GetInstanceForHashHeader("consistent-key") + Expect(err).NotTo(HaveOccurred()) + counter[result]++ + } + + Expect(counter[result]).To(Equal(100)) + }) + + It("should distribute keys across backends", func() { + maglev.Add("backend1") + maglev.Add("backend2") + maglev.Add("backend3") + + distribution := make(map[string]int) + for i := range 1000 { + _, backend, err := maglev.GetInstanceForHashHeader(string(rune(i))) + Expect(err).NotTo(HaveOccurred()) + distribution[backend]++ + } + + Expect(distribution["backend1"]).To(BeNumerically(">", 0)) + Expect(distribution["backend2"]).To(BeNumerically(">", 0)) + Expect(distribution["backend3"]).To(BeNumerically(">", 0)) + }) + }) + + Context("when backends are removed", func() { + BeforeEach(func() { + maglev.Add("backend1") + maglev.Add("backend2") + maglev.Remove("backend1") + }) + + It("should not return the removed backend", func() { + for range 100 { + _, backend, err := maglev.GetInstanceForHashHeader("consistent-key") + Expect(err).NotTo(HaveOccurred()) + Expect(backend).To(Equal("backend2")) + } + }) + }) + }) + + Describe("GetInstanceForHashHeader", func() { + Context("when no backends were added", func() { + It("should return an error", func() { + _, _, err := maglev.GetInstanceForHashHeader("test-key") + Expect(err).To(HaveOccurred()) + }) + }) + + Context("when backends are added", func() { + BeforeEach(func() { + maglev.Add("backend1") + maglev.Add("backend2") + }) + + It("should return consistent results for the same key", func() { + var counter = make(map[uint64]int) + var lookupTableIndex uint64 + var err error + for range 100 { + lookupTableIndex, _, err = maglev.GetInstanceForHashHeader("consistent-key") + Expect(err).NotTo(HaveOccurred()) + counter[lookupTableIndex]++ + } + + Expect(counter[lookupTableIndex]).To(Equal(100)) + }) + }) + }) + + Describe("GetEndpointId", func() { + Context("when backends are added", func() { + BeforeEach(func() { + maglev.Add("app_instance_1") + maglev.Add("app_instance_2") + }) + + It("should return consistent results for the same key", func() { + var counter = make(map[string]int) + var endpointID string + for range 100 { + lookupTableIndex, _, err := maglev.GetInstanceForHashHeader("consistent-key") + Expect(err).NotTo(HaveOccurred()) + endpointID = maglev.GetEndpointId(lookupTableIndex) + Expect(err).NotTo(HaveOccurred()) + counter[endpointID]++ + } + + Expect(counter[endpointID]).To(Equal(100)) + }) + + It("should distribute keys across backends", func() { + maglev.Add("app_instance_1") + maglev.Add("app_instance_2") + maglev.Add("app_instance_3") + + distribution := make(map[string]int) + for i := range 1000 { + lookupTableIndex, _, err := maglev.GetInstanceForHashHeader(string(rune(i))) + Expect(err).NotTo(HaveOccurred()) + endpointID := maglev.GetEndpointId(lookupTableIndex) + Expect(err).NotTo(HaveOccurred()) + distribution[endpointID]++ + } + + Expect(distribution["app_instance_1"]).To(BeNumerically(">", 0)) + Expect(distribution["app_instance_2"]).To(BeNumerically(">", 0)) + 
Expect(distribution["app_instance_3"]).To(BeNumerically(">", 0)) + }) + }) + + Context("when backends are removed", func() { + BeforeEach(func() { + maglev.Add("app_instance_1") + maglev.Add("app_instance_2") + maglev.Remove("app_instance_1") + }) + + It("should not return the removed backend", func() { + for i := range 1000 { + lookupTableIndex, _, err := maglev.GetInstanceForHashHeader(string(rune(i))) + Expect(err).NotTo(HaveOccurred()) + endpointID := maglev.GetEndpointId(lookupTableIndex) + Expect(endpointID).To(Equal("app_instance_2")) + } + }) + }) + }) + + Describe("Consistency", func() { + // We test that at most half the keys are reassigned to new backends, when one backend is added. + // This ensures a minimal level of consistency. + It("should minimize disruption when adding backends", func() { + for i := range 10 { + maglev.Add(fmt.Sprintf("backend%d", i+1)) + } + keys := make([]string, 1000) + for i := range keys { + keys[i] = fmt.Sprintf("key%d", i+1) + } + + initialMappings := make(map[string]string) + + for _, key := range keys { + _, backend, err := maglev.GetInstanceForHashHeader(key) + Expect(err).NotTo(HaveOccurred()) + initialMappings[key] = backend + } + + maglev.Add("newbackend") + + changedMappings := 0 + for _, key := range keys { + _, backend, err := maglev.GetInstanceForHashHeader(key) + Expect(err).NotTo(HaveOccurred()) + if initialMappings[key] != backend { + changedMappings++ + } + } + + Expect(changedMappings).To(BeNumerically("<=", len(keys)/2)) + }) + }) + + Describe("Concurrency", func() { + It("should handle concurrent reads safely", func() { + maglev.Add("backend1") + + done := make(chan bool, 10) + for i := 0; i < 10; i++ { + go func() { + defer GinkgoRecover() + for j := 0; j < 100; j++ { + _, _, err := maglev.GetInstanceForHashHeader("test-key") + Expect(err).NotTo(HaveOccurred()) + } + done <- true + }() + } + + for i := 0; i < 10; i++ { + Eventually(done).Should(Receive()) + } + }) + It("should handle concurrent endpoint registrations safely", func() { + done := make(chan bool, 10) + for i := 0; i < 10; i++ { + go func() { + defer GinkgoRecover() + for j := 0; j < 100; j++ { + Expect(func() { maglev.Add("endpoint" + strconv.Itoa(j)) }).NotTo(Panic()) + } + done <- true + }() + } + + for i := 0; i < 10; i++ { + Eventually(done).Should(Receive()) + } + Expect(len(maglev.GetEndpointList())).To(Equal(100)) + }) + + }) +}) diff --git a/src/code.cloudfoundry.org/gorouter/route/pool.go b/src/code.cloudfoundry.org/gorouter/route/pool.go index f089fc15b..a044e4905 100644 --- a/src/code.cloudfoundry.org/gorouter/route/pool.go +++ b/src/code.cloudfoundry.org/gorouter/route/pool.go @@ -1,6 +1,7 @@ package route import ( + "context" "encoding/json" "fmt" "log/slog" @@ -74,6 +75,28 @@ type ProxyRoundTripper interface { CancelRequest(*http.Request) } +type RoutingProperties struct { + RequestHeaders *http.Header + LocallyOptimistic bool + GlobalLB string + AZ string +} + +type HashRoutingProperties struct { + Header string + BalanceFactor float64 +} + +func (hrp *HashRoutingProperties) Equal(hrp2 *HashRoutingProperties) bool { + if hrp == nil && hrp2 == nil { + return true + } + if hrp == nil || hrp2 == nil { + return false + } + return hrp.Header == hrp2.Header && hrp.BalanceFactor == hrp2.BalanceFactor +} + type Endpoint struct { ApplicationId string AvailabilityZone string @@ -186,6 +209,8 @@ type EndpointPool struct { logger *slog.Logger updatedAt time.Time LoadBalancingAlgorithm string + HashRoutingProperties *HashRoutingProperties + HashLookupTable 
diff --git a/src/code.cloudfoundry.org/gorouter/route/pool.go b/src/code.cloudfoundry.org/gorouter/route/pool.go
index f089fc15b..a044e4905 100644
--- a/src/code.cloudfoundry.org/gorouter/route/pool.go
+++ b/src/code.cloudfoundry.org/gorouter/route/pool.go
@@ -1,6 +1,7 @@
 package route
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"log/slog"
@@ -74,6 +75,28 @@ type ProxyRoundTripper interface {
 	CancelRequest(*http.Request)
 }
 
+type RoutingProperties struct {
+	RequestHeaders    *http.Header
+	LocallyOptimistic bool
+	GlobalLB          string
+	AZ                string
+}
+
+type HashRoutingProperties struct {
+	Header        string
+	BalanceFactor float64
+}
+
+func (hrp *HashRoutingProperties) Equal(hrp2 *HashRoutingProperties) bool {
+	if hrp == nil && hrp2 == nil {
+		return true
+	}
+	if hrp == nil || hrp2 == nil {
+		return false
+	}
+	return hrp.Header == hrp2.Header && hrp.BalanceFactor == hrp2.BalanceFactor
+}
+
 type Endpoint struct {
 	ApplicationId     string
 	AvailabilityZone  string
@@ -186,6 +209,8 @@ type EndpointPool struct {
 	logger                 *slog.Logger
 	updatedAt              time.Time
 	LoadBalancingAlgorithm string
+	HashRoutingProperties  *HashRoutingProperties
+	HashLookupTable        MaglevLookup
 }
 
 type EndpointOpts struct {
@@ -248,10 +273,12 @@ type PoolOpts struct {
 	MaxConnsPerBackend     int64
 	Logger                 *slog.Logger
 	LoadBalancingAlgorithm string
+	HashHeader             string
+	HashBalanceFactor      float64
 }
 
 func NewPool(opts *PoolOpts) *EndpointPool {
-	return &EndpointPool{
+	pool := &EndpointPool{
 		endpoints:         make([]*endpointElem, 0, 1),
 		index:             make(map[string]*endpointElem),
 		retryAfterFailure: opts.RetryAfterFailure,
@@ -264,6 +291,14 @@ func NewPool(opts *PoolOpts) *EndpointPool {
 		updatedAt:              time.Now(),
 		LoadBalancingAlgorithm: opts.LoadBalancingAlgorithm,
 	}
+	if pool.LoadBalancingAlgorithm == config.LOAD_BALANCE_HB {
+		pool.HashLookupTable = NewMaglev(opts.Logger)
+		pool.HashRoutingProperties = &HashRoutingProperties{
+			Header:        opts.HashHeader,
+			BalanceFactor: opts.HashBalanceFactor,
+		}
+	}
+	return pool
 }
 
 func PoolsMatch(p1, p2 *EndpointPool) bool {
@@ -336,6 +371,9 @@ func (p *EndpointPool) Put(endpoint *Endpoint) PoolPutResult {
 			p.RouteSvcUrl = e.endpoint.RouteServiceUrl
 			p.setPoolLoadBalancingAlgorithm(e.endpoint)
 			e.updated = time.Now()
+			if p.LoadBalancingAlgorithm == config.LOAD_BALANCE_HB {
+				p.HashLookupTable.Add(e.endpoint.PrivateInstanceId)
+			}
 			p.Update()
 
 			return EndpointUpdated
@@ -348,7 +386,6 @@ func (p *EndpointPool) Put(endpoint *Endpoint) PoolPutResult {
 		updated:            time.Now(),
 		maxConnsPerBackend: p.maxConnsPerBackend,
 	}
-
 	p.endpoints = append(p.endpoints, e)
 	p.index[endpoint.CanonicalAddr()] = e
 
@@ -356,6 +393,9 @@ func (p *EndpointPool) Put(endpoint *Endpoint) PoolPutResult {
 	p.RouteSvcUrl = e.endpoint.RouteServiceUrl
 	p.setPoolLoadBalancingAlgorithm(e.endpoint)
 
+	if p.LoadBalancingAlgorithm == config.LOAD_BALANCE_HB {
+		p.HashLookupTable.Add(e.endpoint.PrivateInstanceId)
+	}
 	p.Update()
 
 	return EndpointAdded
@@ -433,23 +473,55 @@ func (p *EndpointPool) removeEndpoint(e *endpointElem) {
 	delete(p.index, e.endpoint.CanonicalAddr())
 	delete(p.index, e.endpoint.PrivateInstanceId)
 	p.Update()
+
+	if p.LoadBalancingAlgorithm == config.LOAD_BALANCE_HB {
+		p.HashLookupTable.Remove(e.endpoint.PrivateInstanceId)
+	}
+
 }
 
-func (p *EndpointPool) Endpoints(logger *slog.Logger, initial string, mustBeSticky bool, azPreference string, az string) EndpointIterator {
-	switch p.LoadBalancingAlgorithm {
+func (p *EndpointPool) Endpoints(logger *slog.Logger, initial string, mustBeSticky bool, routingProps RoutingProperties) EndpointIterator {
+	lbAlgo := p.LoadBalancingAlgorithm
+	// Handle hash-based routing as a special case
+	if lbAlgo == config.LOAD_BALANCE_HB {
+		headerValue := p.GetValidHashHeaderValue(routingProps.RequestHeaders, logger)
+		if headerValue != "" {
+			return NewHashBased(logger, p, initial, mustBeSticky, headerValue)
+		}
+		lbAlgo = routingProps.GlobalLB
+	}
+
+	switch lbAlgo {
 	case config.LOAD_BALANCE_LC:
-		logger.Debug("endpoint-iterator-with-least-connection-lb-algo")
-		return NewLeastConnection(logger, p, initial, mustBeSticky, azPreference == config.AZ_PREF_LOCAL, az)
+		logDebugIfEnabled(logger, "endpoint-iterator-with-least-connection-lb-algo")
+		return NewLeastConnection(logger, p, initial, mustBeSticky, routingProps.LocallyOptimistic, routingProps.AZ)
 	case config.LOAD_BALANCE_RR:
-		logger.Debug("endpoint-iterator-with-round-robin-lb-algo")
-		return NewRoundRobin(logger, p, initial, mustBeSticky, azPreference == config.AZ_PREF_LOCAL, az)
+		logDebugIfEnabled(logger, "endpoint-iterator-with-round-robin-lb-algo")
+		return NewRoundRobin(logger, p, initial, mustBeSticky, routingProps.LocallyOptimistic, routingProps.AZ)
 	default:
 		logger.Error("invalid-pool-load-balancing-algorithm",
-			slog.String("poolLBAlgorithm", p.LoadBalancingAlgorithm),
+			slog.String("poolLBAlgorithm", lbAlgo),
+			slog.String("Host", p.host),
+			slog.String("Path", p.contextPath))
+		logDebugIfEnabled(logger, "endpoint-iterator-with-round-robin-lb-algo")
+		return NewRoundRobin(logger, p, initial, mustBeSticky, routingProps.LocallyOptimistic, routingProps.AZ)
+	}
+}
+
+func (p *EndpointPool) GetValidHashHeaderValue(header *http.Header, logger *slog.Logger) string {
+	if p.HashRoutingProperties == nil || p.HashRoutingProperties.Header == "" {
+		logger.Error("hash-routing-properties-missing", slog.String("host", p.Host()))
+		return ""
+	}
+
+	hashHeader := header.Get(p.HashRoutingProperties.Header)
+	if hashHeader == "" {
+		logger.Info("hash-based-routing-header-value-not-found", slog.String("Host", p.host), slog.String("Path", p.contextPath))
-		return NewRoundRobin(logger, p, initial, mustBeSticky, azPreference == config.AZ_PREF_LOCAL, az)
+		return ""
 	}
+	return hashHeader
 }
 
 func (p *EndpointPool) NumEndpoints() int {
@@ -464,6 +536,52 @@ func (p *EndpointPool) findById(id string) *endpointElem {
 	return p.index[id]
 }
 
+// FindStickyEndpoint attempts to find and return a sticky session endpoint.
+// If the endpoint is found and not overloaded, it returns the endpoint.
+// If mustBeSticky is true and the endpoint is missing or overloaded, it returns nil.
+// If mustBeSticky is false and the endpoint is missing or overloaded, it clears the stickyEndpointID and returns nil.
+// The stickyEndpointID pointer is cleared in place whenever mustBeSticky is false.
+func (p *EndpointPool) FindStickyEndpoint(logger *slog.Logger, stickyEndpointID *string, mustBeSticky bool) *Endpoint {
+	if *stickyEndpointID == "" {
+		return nil
+	}
+
+	e := p.findById(*stickyEndpointID)
+	if e != nil && e.isOverloaded() {
+		if mustBeSticky {
+			logDebugIfEnabled(logger, "endpoint-overloaded-but-request-must-be-sticky", e.endpoint.ToLogData()...)
+			return nil
+		}
+		e = nil
+	}
+
+	if e == nil && mustBeSticky {
+		logDebugIfEnabled(logger, "endpoint-missing-but-request-must-be-sticky", slog.String("requested-endpoint", *stickyEndpointID))
+		return nil
+	}
+
+	if !mustBeSticky {
+		if e == nil {
+			logDebugIfEnabled(logger, "endpoint-missing-choosing-alternate", slog.String("requested-endpoint", *stickyEndpointID))
+		}
+		*stickyEndpointID = ""
+	}
+
+	if e != nil {
+		e.RLock()
+		defer e.RUnlock()
+		return e.endpoint
+	}
+	return nil
+}
+
+// logDebugIfEnabled logs a debug message only if the debug level is enabled
+func logDebugIfEnabled(logger *slog.Logger, msg string, args ...any) {
+	if logger.Enabled(context.Background(), slog.LevelDebug) {
+		logger.Debug(msg, args...)
+	}
+}
+
 func (p *EndpointPool) IsEmpty() bool {
 	p.Lock()
 	l := len(p.endpoints)
@@ -561,18 +680,42 @@ func (p *EndpointPool) MarshalJSON() ([]byte, error) {
 
 // setPoolLoadBalancingAlgorithm overwrites the load balancing algorithm of a pool by that of a specified endpoint, if that is valid.
 func (p *EndpointPool) setPoolLoadBalancingAlgorithm(endpoint *Endpoint) {
-	if len(endpoint.LoadBalancingAlgorithm) > 0 && endpoint.LoadBalancingAlgorithm != p.LoadBalancingAlgorithm {
+	if endpoint.LoadBalancingAlgorithm == "" {
+		return
+	}
+
+	if endpoint.LoadBalancingAlgorithm != p.LoadBalancingAlgorithm {
 		if config.IsLoadBalancingAlgorithmValid(endpoint.LoadBalancingAlgorithm) {
 			p.LoadBalancingAlgorithm = endpoint.LoadBalancingAlgorithm
-			p.logger.Debug("setting-pool-load-balancing-algorithm-to-that-of-an-endpoint",
+			logDebugIfEnabled(p.logger, "setting-pool-load-balancing-algorithm-to-that-of-an-endpoint",
 				slog.String("endpointLBAlgorithm", endpoint.LoadBalancingAlgorithm),
 				slog.String("poolLBAlgorithm", p.LoadBalancingAlgorithm))
+
 		} else {
 			p.logger.Error("invalid-endpoint-load-balancing-algorithm-provided-keeping-pool-lb-algo",
 				slog.String("endpointLBAlgorithm", endpoint.LoadBalancingAlgorithm),
 				slog.String("poolLBAlgorithm", p.LoadBalancingAlgorithm))
 		}
 	}
+	p.prepareHashBasedRouting(endpoint)
+}
+
+func (p *EndpointPool) prepareHashBasedRouting(endpoint *Endpoint) {
+	if p.LoadBalancingAlgorithm != config.LOAD_BALANCE_HB {
+		return
+	}
+	if p.HashLookupTable == nil {
+		p.HashLookupTable = NewMaglev(p.logger)
+	}
+
+	newProps := &HashRoutingProperties{
+		Header:        endpoint.HashHeaderName,
+		BalanceFactor: endpoint.HashBalanceFactor,
+	}
+
+	if p.HashRoutingProperties == nil || !p.HashRoutingProperties.Equal(newProps) {
+		p.HashRoutingProperties = newProps
+	}
 }
 
 func (e *endpointElem) failed() {
@@ -602,7 +745,7 @@ func (e *Endpoint) MarshalJSON() ([]byte, error) {
 		ServerCertDomainSAN    string  `json:"server_cert_domain_san,omitempty"`
 		LoadBalancingAlgorithm string  `json:"load_balancing_algorithm,omitempty"`
 		HashHeader             string  `json:"hash_header,omitempty"`
-		HashBalance            float64 `json:"hash_balance,omitempty"`
+		HashBalance            float64 `json:"hash_balance,omitempty,string"`
 	}
 
 	jsonObj.Address = e.addr
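The `string` option added to the hash_balance tag above (and to RegistryMessageOpts in mbus) changes the wire format: encoding/json writes the factor as a quoted string and, when decoding, only accepts a quoted string. A standalone demonstration of that standard-library behavior:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Same tag shape as the HashBalance fields changed in this PR.
type opts struct {
	HashBalance float64 `json:"hash_balance,omitempty,string"`
}

func main() {
	out, _ := json.Marshal(opts{HashBalance: 1.25})
	fmt.Println(string(out)) // {"hash_balance":"1.25"}

	var in opts
	if err := json.Unmarshal([]byte(`{"hash_balance":"1.25"}`), &in); err != nil {
		panic(err)
	}
	fmt.Println(in.HashBalance) // 1.25

	// An unquoted number is now rejected by the decoder:
	err := json.Unmarshal([]byte(`{"hash_balance":1.25}`), &in)
	fmt.Println(err != nil) // true
}
```

This is exactly why the MarshalJSON expectation in pool_test.go below changes from `"hash_balance":1.25` to `"hash_balance":"1.25"`.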
diff --git a/src/code.cloudfoundry.org/gorouter/route/pool_test.go b/src/code.cloudfoundry.org/gorouter/route/pool_test.go
index 31da6c8d7..bba5a20eb 100644
--- a/src/code.cloudfoundry.org/gorouter/route/pool_test.go
+++ b/src/code.cloudfoundry.org/gorouter/route/pool_test.go
@@ -198,8 +198,8 @@ var _ = Describe("EndpointPool", func() {
 
 	Context("Put", func() {
 		var (
-			az           = "meow-zone"
-			azPreference = "none"
+			az                = "meow-zone"
+			locallyOptimistic = false
 		)
 
 		It("adds endpoints", func() {
@@ -233,6 +233,7 @@ var _ = Describe("EndpointPool", func() {
 		Context("with modification tags", func() {
 			var modTag models.ModificationTag
 			var modTag2 models.ModificationTag
+			var routingProps route.RoutingProperties
 
 			BeforeEach(func() {
 				modTag = models.ModificationTag{}
@@ -240,13 +241,19 @@ var _ = Describe("EndpointPool", func() {
 
 				endpoint1 := route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, ModificationTag: modTag})
 				Expect(pool.Put(endpoint1)).To(Equal(route.EndpointAdded))
+
+				routingProps = route.RoutingProperties{
+					LocallyOptimistic: locallyOptimistic,
+					GlobalLB:          config.LOAD_BALANCE_RR,
+					AZ:                az,
+				}
 			})
 
 			It("updates an endpoint with modification tag", func() {
 				endpoint := route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, ModificationTag: modTag2})
 				Expect(pool.Put(endpoint)).To(Equal(route.EndpointUpdated))
 
-				Expect(pool.Endpoints(logger.Logger, "", false, azPreference, az).Next(0).ModificationTag).To(Equal(modTag2))
+				Expect(pool.Endpoints(logger.Logger, "", false, routingProps).Next(0).ModificationTag).To(Equal(modTag2))
 			})
 
 			Context("when modification_tag is older", func() {
@@ -261,7 +268,7 @@ var _ = Describe("EndpointPool", func() {
 					endpoint := route.NewEndpoint(&route.EndpointOpts{Host: "1.2.3.4", Port: 5678, ModificationTag: olderModTag})
 					Expect(pool.Put(endpoint)).To(Equal(route.EndpointUnmodified))
 
-					Expect(pool.Endpoints(logger.Logger, "", false, azPreference, az).Next(0).ModificationTag).To(Equal(modTag2))
+					Expect(pool.Endpoints(logger.Logger, "", false, routingProps).Next(0).ModificationTag).To(Equal(modTag2))
 				})
 			})
 		})
@@ -297,7 +304,18 @@ var _ = Describe("EndpointPool", func() {
 		})
 	})
 
 	Context("Customizable Per Route Load Balancing", func() {
+		var (
+			locallyOptimistic = false
+			routingProps      route.RoutingProperties
+		)
+		BeforeEach(func() {
+			routingProps = route.RoutingProperties{
+				LocallyOptimistic: locallyOptimistic,
+				GlobalLB:          config.LOAD_BALANCE_RR,
+				AZ:                "az",
+			}
+		})
 		Context("Load Balancing Algorithm of a pool", func() {
 			It("has a value specified in the pool options", func() {
 				poolWithLBAlgo := route.NewPool(&route.PoolOpts{
@@ -312,7 +330,7 @@ var _ = Describe("EndpointPool", func() {
 					Logger:                 logger.Logger,
 					LoadBalancingAlgorithm: "wrong-lb-algo",
 				})
-				iterator := poolWithLBAlgo2.Endpoints(logger.Logger, "", false, "none", "zone")
+				iterator := poolWithLBAlgo2.Endpoints(logger.Logger, "", false, routingProps)
 				Expect(iterator).To(BeAssignableToTypeOf(&route.RoundRobin{}))
 				Eventually(logger).Should(gbytes.Say(`invalid-pool-load-balancing-algorithm`))
 			})
@@ -322,7 +340,7 @@ var _ = Describe("EndpointPool", func() {
 					Logger:                 logger.Logger,
 					LoadBalancingAlgorithm: config.LOAD_BALANCE_LC,
 				})
-				iterator := poolWithLBAlgoLC.Endpoints(logger.Logger, "", false, "none", "az")
+				iterator := poolWithLBAlgoLC.Endpoints(logger.Logger, "", false, routingProps)
 				Expect(iterator).To(BeAssignableToTypeOf(&route.LeastConnection{}))
 				Eventually(logger).Should(gbytes.Say(`endpoint-iterator-with-least-connection-lb-algo`))
 			})
@@ -332,7 +350,7 @@ var _ = Describe("EndpointPool", func() {
 					Logger:                 logger.Logger,
 					LoadBalancingAlgorithm: config.LOAD_BALANCE_RR,
 				})
-				iterator := poolWithLBAlgoLC.Endpoints(logger.Logger, "", false, "none", "az")
+				iterator := poolWithLBAlgoLC.Endpoints(logger.Logger, "", false, routingProps)
 				Expect(iterator).To(BeAssignableToTypeOf(&route.RoundRobin{}))
 				Eventually(logger).Should(gbytes.Say(`endpoint-iterator-with-round-robin-lb-algo`))
 			})
@@ -428,6 +446,46 @@ var _ = Describe("EndpointPool", func() {
 			Expect(pool.LoadBalancingAlgorithm).To(Equal(config.LOAD_BALANCE_RR))
 		})
 	})
+
+	Context("when switching to hash-based routing", func() {
+		It("will create the maglev table and add the endpoint", func() {
+			pool := route.NewPool(&route.PoolOpts{
+				Logger:                 logger.Logger,
+				LoadBalancingAlgorithm: config.LOAD_BALANCE_RR,
+			})
+
+			endpointOpts := route.EndpointOpts{
+				Host:                   "host-1",
+				Port:                   1234,
+				RouteServiceUrl:        "url",
+				LoadBalancingAlgorithm: config.LOAD_BALANCE_RR,
+			}
+
+			initialEndpoint := route.NewEndpoint(&endpointOpts)
+
+			pool.Put(initialEndpoint)
+			Expect(pool.LoadBalancingAlgorithm).To(Equal(config.LOAD_BALANCE_RR))
+
+			endpointOptsHash := route.EndpointOpts{
+				Host:                   "host-1",
+				Port:                   1234,
+				RouteServiceUrl:        "url",
+				LoadBalancingAlgorithm: config.LOAD_BALANCE_HB,
+				HashBalanceFactor:      1.25,
+				HashHeaderName:         "X-Tenant",
+			}
+
+			hashEndpoint := route.NewEndpoint(&endpointOptsHash)
+
+			pool.Put(hashEndpoint)
+			Expect(pool.LoadBalancingAlgorithm).To(Equal(config.LOAD_BALANCE_HB))
+			Expect(pool.HashLookupTable).ToNot(BeNil())
+			Expect(pool.HashLookupTable.GetEndpointList()).To(HaveLen(1))
+			Expect(pool.HashLookupTable.GetEndpointList()[0]).To(Equal(hashEndpoint.PrivateInstanceId))
+		})
+
+	})
+
 	})
 
 	Context("RouteServiceUrl", func() {
@@ -497,10 +555,15 @@ var _ = Describe("EndpointPool", func() {
 		Context("when a read connection is reset", func() {
 			It("marks the endpoint as failed", func() {
 				az := "meow-zone"
-				azPreference := "none"
+				locallyOptimistic := false
+				routingProps := route.RoutingProperties{
+					LocallyOptimistic: locallyOptimistic,
+					GlobalLB:          config.LOAD_BALANCE_RR,
+					AZ:                az,
+				}
 				connectionResetError := &net.OpError{Op: "read", Err: errors.New("read: connection reset by peer")}
 				pool.EndpointFailed(failedEndpoint, connectionResetError)
-				i := pool.Endpoints(logger.Logger, "", false, azPreference, az)
+				i := pool.Endpoints(logger.Logger, "", false, routingProps)
 				epOne := i.Next(0)
 				epTwo := i.Next(1)
 				Expect(epOne).To(Equal(epTwo))
@@ -927,7 +990,7 @@ var _ = Describe("EndpointPool", func() {
 			json, err := pool.MarshalJSON()
 			Expect(err).ToNot(HaveOccurred())
 
-			Expect(string(json)).To(Equal(`[{"address":"1.2.3.4:5678","availability_zone":"az-meow","protocol":"http1","tls":false,"ttl":-1,"route_service_url":"https://my-rs.com","tags":null},{"address":"5.6.7.8:5678","availability_zone":"","protocol":"http2","tls":true,"ttl":-1,"tags":null,"private_instance_id":"pvt_test_instance_id","server_cert_domain_san":"pvt_test_san","load_balancing_algorithm":"hash","hash_header":"X-Header","hash_balance":1.25}]`))
+			Expect(string(json)).To(Equal(`[{"address":"1.2.3.4:5678","availability_zone":"az-meow","protocol":"http1","tls":false,"ttl":-1,"route_service_url":"https://my-rs.com","tags":null},{"address":"5.6.7.8:5678","availability_zone":"","protocol":"http2","tls":true,"ttl":-1,"tags":null,"private_instance_id":"pvt_test_instance_id","server_cert_domain_san":"pvt_test_san","load_balancing_algorithm":"hash","hash_header":"X-Header","hash_balance":"1.25"}]`))
 		})
 
 		Context("when endpoints do not have empty tags", func() {
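The roundrobin.go diff that follows delegates sticky-session resolution to the new FindStickyEndpoint helper in pool.go. As orientation, here is a hypothetical caller-side sketch of its contract; pool and logger are assumed to come from test setup like the one above, and none of this code is part of the diff.

```go
package sketch

import (
	"log/slog"

	"code.cloudfoundry.org/gorouter/route"
)

// demoStickyContract spells out the FindStickyEndpoint behavior that
// RoundRobin.Next (below) now relies on.
func demoStickyContract(pool *route.EndpointPool, logger *slog.Logger) {
	stickyID := "some-private-instance-id" // e.g. taken from a sticky-session cookie
	ep := pool.FindStickyEndpoint(logger, &stickyID, false /* mustBeSticky */)

	switch {
	case ep != nil:
		// Healthy, non-overloaded sticky endpoint found. Because mustBeSticky
		// is false, the ID was cleared in place; later calls balance normally.
	case stickyID == "":
		// Endpoint missing or overloaded: the cleared ID tells the iterator
		// to pick an alternate endpoint instead.
	}
	// With mustBeSticky == true the helper returns nil (and keeps the ID) when
	// the endpoint is missing or overloaded: fail rather than break affinity.
}
```

Centralizing this logic also fixes a quirk of the old RoundRobin.Next, which logged "endpoint-missing-choosing-alternate" even when the sticky endpoint had actually been found.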
diff --git a/src/code.cloudfoundry.org/gorouter/route/roundrobin.go b/src/code.cloudfoundry.org/gorouter/route/roundrobin.go
index 9af2735a3..f9820fa0f 100644
--- a/src/code.cloudfoundry.org/gorouter/route/roundrobin.go
+++ b/src/code.cloudfoundry.org/gorouter/route/roundrobin.go
@@ -1,7 +1,6 @@
 package route
 
 import (
-	"context"
 	"log/slog"
 	"sync"
 	"time"
@@ -38,43 +37,22 @@ func (r *RoundRobin) Next(attempt int) *Endpoint {
 	r.lock.Lock()
 	defer r.lock.Unlock()
 
-	var e *endpointElem
-	if r.initialEndpoint != "" {
-		e = r.pool.findById(r.initialEndpoint)
-		if e != nil && e.isOverloaded() {
-			if r.mustBeSticky {
-				if r.logger.Enabled(context.Background(), slog.LevelDebug) {
-					r.logger.Debug("endpoint-overloaded-but-request-must-be-sticky", e.endpoint.ToLogData()...)
-				}
-				return nil
-			}
-			e = nil
-		}
-
-		if e == nil && r.mustBeSticky {
-			r.logger.Debug("endpoint-missing-but-request-must-be-sticky", slog.String("requested-endpoint", r.initialEndpoint))
-			return nil
-		}
-
-		if !r.mustBeSticky {
-			r.logger.Debug("endpoint-missing-choosing-alternate", slog.String("requested-endpoint", r.initialEndpoint))
-			r.initialEndpoint = ""
-		}
+	e := r.pool.FindStickyEndpoint(r.logger, &r.initialEndpoint, r.mustBeSticky)
+	if e != nil {
+		r.lastEndpoint = e
+		return e
 	}
 
-	if e != nil {
-		e.RLock()
-		defer e.RUnlock()
-		r.lastEndpoint = e.endpoint
-		return e.endpoint
+	if r.mustBeSticky {
+		return nil
 	}
 
-	e = r.next(attempt)
-	if e != nil {
-		e.RLock()
-		defer e.RUnlock()
-		r.lastEndpoint = e.endpoint
-		return e.endpoint
+	elem := r.next(attempt)
+	if elem != nil {
+		elem.RLock()
+		defer elem.RUnlock()
+		r.lastEndpoint = elem.endpoint
+		return elem.endpoint
 	}
 
 	r.lastEndpoint = nil
diff --git a/src/code.cloudfoundry.org/route-registrar/config/config.go b/src/code.cloudfoundry.org/route-registrar/config/config.go
index 63eb01101..1c130de25 100644
--- a/src/code.cloudfoundry.org/route-registrar/config/config.go
+++ b/src/code.cloudfoundry.org/route-registrar/config/config.go
@@ -75,6 +75,8 @@ type RouteSchema struct {
 
 type Options struct {
 	LoadBalancingAlgorithm LoadBalancingAlgorithm `json:"loadbalancing,omitempty" yaml:"loadbalancing,omitempty"`
+	HashHeader             string                 `json:"hash_header,omitempty" yaml:"hash_header,omitempty"`
+	HashBalance            float64                `json:"hash_balance,omitempty,string" yaml:"hash_balance,omitempty"`
 }
 
 type LoadBalancingAlgorithm string
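With the route-registrar Options extended, a registered route can opt into hash-based balancing from its config file. A sketch following route-registrar's usual route schema; the keys around `options` are illustrative, and only the `options` block corresponds to the struct above:

```yaml
routes:
  - name: tenant-service
    port: 8080
    registration_interval: 20s
    uris:
      - tenant-service.example.com
    options:
      loadbalancing: hash
      hash_header: tenant-id
      hash_balance: 1.25
```

In YAML the balance factor stays a plain number, since the yaml tag carries no `string` option. When the same Options are serialized as JSON for a registration message, the `json:"hash_balance,omitempty,string"` tag quotes the value, which matches what the gorouter-side RegistryMessageOpts decoder expects.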