From 9b59b62a9994849f66b0915a5c5080f3be57d21f Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Wed, 25 Oct 2023 15:53:50 +0100 Subject: [PATCH 1/2] fix: garbage collect node handlers --- dht.go | 5 + internal/coord/behaviour.go | 51 +++- internal/coord/coordinator.go | 14 +- internal/coord/network.go | 342 +++++++++++++++++++---- internal/coord/network_test.go | 40 +++ internal/coord/routing.go | 4 + internal/coord/routing/bootstrap.go | 12 + internal/coord/routing/bootstrap_test.go | 6 + internal/coord/routing/explore.go | 12 + internal/coord/routing/explore_test.go | 6 + internal/coord/routing/include.go | 12 + internal/coord/routing/include_test.go | 6 + internal/coord/routing/probe.go | 14 +- internal/coord/routing/probe_test.go | 6 + 14 files changed, 452 insertions(+), 78 deletions(-) create mode 100644 internal/coord/network_test.go diff --git a/dht.go b/dht.go index 29dd49c..9d9322e 100644 --- a/dht.go +++ b/dht.go @@ -129,6 +129,11 @@ func New(h host.Host, cfg *Config) (*DHT, error) { coordCfg.Routing.Tracer = cfg.TracerProvider.Tracer(tele.TracerName) coordCfg.Routing.Meter = cfg.MeterProvider.Meter(tele.MeterName) + coordCfg.Network.Clock = cfg.Clock + coordCfg.Network.Logger = cfg.Logger.With("behaviour", "network") + coordCfg.Network.Tracer = cfg.TracerProvider.Tracer(tele.TracerName) + coordCfg.Network.Meter = cfg.MeterProvider.Meter(tele.MeterName) + rtr := &router{ host: h, protocolID: cfg.ProtocolID, diff --git a/internal/coord/behaviour.go b/internal/coord/behaviour.go index 28819aa..ac268e9 100644 --- a/internal/coord/behaviour.go +++ b/internal/coord/behaviour.go @@ -44,15 +44,17 @@ type WorkQueueFunc[E BehaviourEvent] func(context.Context, E) bool // WorkQueueFunc for each work item, passing the original context // and event. 
type WorkQueue[E BehaviourEvent] struct { - pending chan CtxEvent[E] - fn WorkQueueFunc[E] - done atomic.Bool - once sync.Once + pending chan CtxEvent[E] + fn WorkQueueFunc[E] + done chan struct{} + startOnce sync.Once + closeOnce sync.Once } func NewWorkQueue[E BehaviourEvent](fn WorkQueueFunc[E]) *WorkQueue[E] { w := &WorkQueue[E]{ pending: make(chan CtxEvent[E], 1), + done: make(chan struct{}), fn: fn, } return w @@ -70,19 +72,33 @@ type CtxEvent[E any] struct { // blocking it will return a context cancellation error if the work // item's context is cancelled. func (w *WorkQueue[E]) Enqueue(ctx context.Context, cmd E) error { - if w.done.Load() { + select { + case <-w.done: + // the queue is not accepting any more work return nil + default: } - w.once.Do(func() { + + w.startOnce.Do(func() { go func() { - defer w.done.Store(true) - for cc := range w.pending { - if cc.Ctx.Err() != nil { - return - } - if done := w.fn(cc.Ctx, cc.Event); done { - w.done.Store(true) - return + for { + select { + case <-w.done: + // drain pending then exit the goroutine + for { + select { + case cc := <-w.pending: + if done := w.fn(cc.Ctx, cc.Event); done { + return + } + default: + return + } + } + case cc := <-w.pending: + if done := w.fn(cc.Ctx, cc.Event); done { + return + } } } }() @@ -96,10 +112,15 @@ func (w *WorkQueue[E]) Enqueue(ctx context.Context, cmd E) error { Event: cmd, }: return nil - } } +func (w *WorkQueue[E]) Close() { + w.closeOnce.Do(func() { + close(w.done) + }) +} + // A Waiter is a Notifiee whose Notify method forwards the // notified event to a channel which a client can wait on. type Waiter[E BehaviourEvent] struct { diff --git a/internal/coord/coordinator.go b/internal/coord/coordinator.go index 760384f..2802410 100644 --- a/internal/coord/coordinator.go +++ b/internal/coord/coordinator.go @@ -92,6 +92,9 @@ type CoordinatorConfig struct { // Routing is the configuration used for the [RoutingBehaviour] which maintains the health of the routing table. 
Routing RoutingConfig + // Network is the configuration used for the [NetworkBehaviour] which routes messages to peer nodes. + Network NetworkConfig + // Query is the configuration used for the [PooledQueryBehaviour] which manages the execution of user queries. Query QueryConfig } @@ -149,6 +152,12 @@ func DefaultCoordinatorConfig() *CoordinatorConfig { cfg.Routing.Tracer = cfg.TracerProvider.Tracer(tele.TracerName) cfg.Routing.Meter = cfg.MeterProvider.Meter(tele.MeterName) + cfg.Network = *DefaultNetworkConfig() + cfg.Network.Clock = cfg.Clock + cfg.Network.Logger = cfg.Logger.With("behaviour", "network") + cfg.Network.Tracer = cfg.TracerProvider.Tracer(tele.TracerName) + cfg.Network.Meter = cfg.MeterProvider.Meter(tele.MeterName) + return cfg } @@ -175,7 +184,10 @@ func NewCoordinator(self kadt.PeerID, rtr coordt.Router[kadt.Key, kadt.PeerID, * return nil, fmt.Errorf("routing behaviour: %w", err) } - networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger, tele.Tracer) + networkBehaviour, err := NewNetworkBehaviour(rtr, &cfg.Network) + if err != nil { + return nil, fmt.Errorf("network behaviour: %w", err) + } b, err := brdcst.NewPool[kadt.Key, kadt.PeerID, *pb.Message](self, nil) if err != nil { diff --git a/internal/coord/network.go b/internal/coord/network.go index 4dc81a7..6597ed1 100644 --- a/internal/coord/network.go +++ b/internal/coord/network.go @@ -1,117 +1,281 @@ package coord import ( + "container/heap" "context" "fmt" "sync" + "sync/atomic" + "time" + "github.com/benbjohnson/clock" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "golang.org/x/exp/slog" + "github.com/plprobelab/zikade/errs" "github.com/plprobelab/zikade/internal/coord/coordt" "github.com/plprobelab/zikade/kadt" "github.com/plprobelab/zikade/pb" + "github.com/plprobelab/zikade/tele" ) +type NetworkConfig struct { + // Clock is a clock that may replaced by a mock when testing + Clock clock.Clock + + // Logger is a structured logger that will be used when logging. 
+ Logger *slog.Logger + + // Tracer is the tracer that should be used to trace execution. + Tracer trace.Tracer + + // Meter is the meter that should be used to record metrics. + Meter metric.Meter +} + +// Validate checks the configuration options and returns an error if any have invalid values. +func (cfg *NetworkConfig) Validate() error { + if cfg.Clock == nil { + return &errs.ConfigurationError{ + Component: "NetworkConfig", + Err: fmt.Errorf("clock must not be nil"), + } + } + + if cfg.Logger == nil { + return &errs.ConfigurationError{ + Component: "NetworkConfig", + Err: fmt.Errorf("logger must not be nil"), + } + } + + if cfg.Tracer == nil { + return &errs.ConfigurationError{ + Component: "NetworkConfig", + Err: fmt.Errorf("tracer must not be nil"), + } + } + + if cfg.Meter == nil { + return &errs.ConfigurationError{ + Component: "NetworkConfig", + Err: fmt.Errorf("meter must not be nil"), + } + } + + return nil +} + +func DefaultNetworkConfig() *NetworkConfig { + return &NetworkConfig{ + Clock: clock.New(), + Logger: tele.DefaultLogger("coord"), + Tracer: tele.NoopTracer(), + Meter: tele.NoopMeter(), + } +} + type NetworkBehaviour struct { + // cfg is a copy of the optional configuration supplied to the behaviour + cfg NetworkConfig + + // performMu is held while Perform is executing to ensure sequential execution of work. 
+ performMu sync.Mutex + // rtr is the message router used to send messages + // it must only be accessed while performMu is held rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message] - nodeHandlersMu sync.Mutex - nodeHandlers map[kadt.PeerID]*NodeHandler // TODO: garbage collect node handlers + // pendingInboundMu guards access to pendingInbound + pendingInboundMu sync.Mutex + + // pendingInbound is a queue of inbound events that are awaiting processing + pendingInbound []CtxEvent[BehaviourEvent] - pendingMu sync.Mutex - pending []BehaviourEvent - ready chan struct{} + // nodeHandlers is a map of NodeHandler by peer id + // it must only be accessed while performMu is held + nodeHandlers map[kadt.PeerID]*nodeHandlerEntry - logger *slog.Logger - tracer trace.Tracer + // lastActive is a list of node handlers ordered by the time the node handler was last active. + // it must only be accessed while performMu is held + lastActive *lastActiveNodeHandlerList + + // gaugeNodeHandlerCount is a gauge that tracks the number of node handlers that are currently active. 
+ gaugeNodeHandlerCount metric.Int64UpDownCounter + + ready chan struct{} } -func NewNetworkBehaviour(rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message], logger *slog.Logger, tracer trace.Tracer) *NetworkBehaviour { +func NewNetworkBehaviour(rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message], cfg *NetworkConfig) (*NetworkBehaviour, error) { + if cfg == nil { + cfg = DefaultNetworkConfig() + } else if err := cfg.Validate(); err != nil { + return nil, err + } + b := &NetworkBehaviour{ + cfg: *cfg, rtr: rtr, - nodeHandlers: make(map[kadt.PeerID]*NodeHandler), + nodeHandlers: make(map[kadt.PeerID]*nodeHandlerEntry), + lastActive: new(lastActiveNodeHandlerList), ready: make(chan struct{}, 1), - logger: logger.With("behaviour", "network"), - tracer: tracer, } - return b + // initialise metrics + var err error + b.gaugeNodeHandlerCount, err = cfg.Meter.Int64UpDownCounter( + "node_handler_count", + metric.WithDescription("Total number of node handlers currently active"), + ) + if err != nil { + return nil, fmt.Errorf("create node_handler_count counter: %w", err) + } + + return b, nil } func (b *NetworkBehaviour) Notify(ctx context.Context, ev BehaviourEvent) { - ctx, span := b.tracer.Start(ctx, "NetworkBehaviour.Notify") + b.pendingInboundMu.Lock() + defer b.pendingInboundMu.Unlock() + + ctx, span := b.cfg.Tracer.Start(ctx, "NetworkBehaviour.Notify") defer span.End() - b.pendingMu.Lock() - defer b.pendingMu.Unlock() + b.pendingInbound = append(b.pendingInbound, CtxEvent[BehaviourEvent]{Ctx: ctx, Event: ev}) - switch ev := ev.(type) { - case *EventOutboundGetCloserNodes: - b.nodeHandlersMu.Lock() - nh, ok := b.nodeHandlers[ev.To] - if !ok { - nh = NewNodeHandler(ev.To, b.rtr, b.logger, b.tracer) - b.nodeHandlers[ev.To] = nh - } - b.nodeHandlersMu.Unlock() - nh.Notify(ctx, ev) - case *EventOutboundSendMessage: - b.nodeHandlersMu.Lock() - nh, ok := b.nodeHandlers[ev.To] - if !ok { - nh = NewNodeHandler(ev.To, b.rtr, b.logger, b.tracer) - b.nodeHandlers[ev.To] = nh - } - 
b.nodeHandlersMu.Unlock() - nh.Notify(ctx, ev) + select { + case b.ready <- struct{}{}: default: - panic(fmt.Sprintf("unexpected dht event: %T", ev)) } +} + +func (b *NetworkBehaviour) Ready() <-chan struct{} { + return b.ready +} + +func (b *NetworkBehaviour) Perform(ctx context.Context) (BehaviourEvent, bool) { + b.performMu.Lock() + defer b.performMu.Unlock() + + _, span := b.cfg.Tracer.Start(ctx, "NetworkBehaviour.Perform") + defer span.End() + + defer b.updateReadyStatus() + + // perform one piece of pending inbound work. + ev, ok := b.perfomNextInbound(ctx) + if ok { + return ev, true + } + + // perform some garbage collection on idle node handlers + b.garbageCollect(ctx) + + return nil, false +} + +func (b *NetworkBehaviour) nextPendingInbound() (CtxEvent[BehaviourEvent], bool) { + b.pendingInboundMu.Lock() + defer b.pendingInboundMu.Unlock() + if len(b.pendingInbound) == 0 { + return CtxEvent[BehaviourEvent]{}, false + } + var pev CtxEvent[BehaviourEvent] + pev, b.pendingInbound = b.pendingInbound[0], b.pendingInbound[1:] + return pev, true +} + +func (b *NetworkBehaviour) updateReadyStatus() { + b.pendingInboundMu.Lock() + hasPendingInbound := len(b.pendingInbound) != 0 + b.pendingInboundMu.Unlock() - if len(b.pending) > 0 { + if hasPendingInbound { select { case b.ready <- struct{}{}: default: } + return } } -func (b *NetworkBehaviour) Ready() <-chan struct{} { - return b.ready +func (b *NetworkBehaviour) perfomNextInbound(ctx context.Context) (BehaviourEvent, bool) { + ctx, span := b.cfg.Tracer.Start(ctx, "NetworkBehaviour.perfomNextInbound") + defer span.End() + pev, ok := b.nextPendingInbound() + if !ok { + return nil, false + } + + switch ev := pev.Event.(type) { + case *EventOutboundGetCloserNodes: + b.notifyNodeHandler(ctx, ev.To, ev) + case *EventOutboundSendMessage: + b.notifyNodeHandler(ctx, ev.To, ev) + default: + panic(fmt.Sprintf("unexpected event: %T", ev)) + } + + return nil, false } -func (b *NetworkBehaviour) Perform(ctx context.Context) 
(BehaviourEvent, bool) { - _, span := b.tracer.Start(ctx, "NetworkBehaviour.Perform") +func (b *NetworkBehaviour) notifyNodeHandler(ctx context.Context, id kadt.PeerID, ev NodeHandlerRequest) { + ctx, span := b.cfg.Tracer.Start(ctx, "NetworkBehaviour.notifyNodeHandler") defer span.End() - // No inbound work can be done until Perform is complete - b.pendingMu.Lock() - defer b.pendingMu.Unlock() - - // drain queued events. - if len(b.pending) > 0 { - var ev BehaviourEvent - ev, b.pending = b.pending[0], b.pending[1:] - - if len(b.pending) > 0 { - select { - case b.ready <- struct{}{}: - default: - } + nhe, ok := b.nodeHandlers[id] + if !ok { + nhe = &nodeHandlerEntry{ + nh: NewNodeHandler(id, b.rtr, b.cfg.Logger, b.cfg.Tracer), + index: -1, } - return ev, true + b.nodeHandlers[id] = nhe + b.gaugeNodeHandlerCount.Add(ctx, 1) + } + if nhe.index == -1 { + heap.Push(b.lastActive, nhe) } + nhe.lastActive = b.cfg.Clock.Now() + heap.Fix(b.lastActive, nhe.index) + nhe.nh.Notify(ctx, ev) +} - return nil, false +func (b *NetworkBehaviour) garbageCollect(ctx context.Context) { + ctx, span := b.cfg.Tracer.Start(ctx, "NetworkBehaviour.garbageCollect") + defer span.End() + + if len(*b.lastActive) == 0 { + return + } + + // attempt to garbage collect the node that has been inactive the longest + + // peek at the node handler with the oldest last notified time + nhe := (*b.lastActive)[0] + + // is the node handler still active? 
+ if nhe.nh.IsActive() { + nhe.lastActive = b.cfg.Clock.Now() + heap.Fix(b.lastActive, nhe.index) + return + } + + b.cfg.Logger.Debug("garbage collecting node handler", "peer_id", nhe.nh.self, "last_active", nhe.lastActive) + nhe.nh.Close() + delete(b.nodeHandlers, nhe.nh.self) + b.gaugeNodeHandlerCount.Add(ctx, -1) + if nhe.index >= 0 { + heap.Remove(b.lastActive, nhe.index) + } } type NodeHandler struct { - self kadt.PeerID - rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message] - queue *WorkQueue[NodeHandlerRequest] - logger *slog.Logger - tracer trace.Tracer + self kadt.PeerID + rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message] + queue *WorkQueue[NodeHandlerRequest] + logger *slog.Logger + tracer trace.Tracer + sending atomic.Bool } func NewNodeHandler(self kadt.PeerID, rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message], logger *slog.Logger, tracer trace.Tracer) *NodeHandler { @@ -133,7 +297,21 @@ func (h *NodeHandler) Notify(ctx context.Context, ev NodeHandlerRequest) { h.queue.Enqueue(ctx, ev) } +func (h *NodeHandler) IsActive() bool { + if len(h.queue.pending) > 0 { + return true + } + return h.sending.Load() +} + +func (h *NodeHandler) Close() { + h.queue.Close() +} + func (h *NodeHandler) send(ctx context.Context, ev NodeHandlerRequest) bool { + h.sending.Store(true) + defer h.sending.Store(false) + switch cmd := ev.(type) { case *EventOutboundGetCloserNodes: if cmd.Notify == nil { @@ -188,3 +366,45 @@ func (h *NodeHandler) send(ctx context.Context, ev NodeHandlerRequest) bool { func (h *NodeHandler) ID() kadt.PeerID { return h.self } + +type nodeHandlerEntry struct { + nh *NodeHandler + lastActive time.Time + index int +} + +// lastActiveNodeHandlerList is a min-heap of NodeHandlers ordered by time the node handler was last notified. +// The root node is the NodeHandler that has the earliest last notified time and has thus been +// inactive the longest. 
+type lastActiveNodeHandlerList []*nodeHandlerEntry + +func (o lastActiveNodeHandlerList) Len() int { return len(o) } +func (o lastActiveNodeHandlerList) Less(i, j int) bool { + return o[i].lastActive.Before(o[j].lastActive) +} + +func (o lastActiveNodeHandlerList) Swap(i, j int) { + o[i], o[j] = o[j], o[i] + o[i].index = i + o[j].index = j +} + +func (o *lastActiveNodeHandlerList) Push(x any) { + n := len(*o) + v := x.(*nodeHandlerEntry) + v.index = n + *o = append(*o, v) +} + +func (o *lastActiveNodeHandlerList) Pop() any { + if len(*o) == 0 { + return nil + } + old := *o + n := len(old) + v := old[n-1] + old[n-1] = nil + v.index = -1 + *o = old[0 : n-1] + return v +} diff --git a/internal/coord/network_test.go b/internal/coord/network_test.go new file mode 100644 index 0000000..91fca52 --- /dev/null +++ b/internal/coord/network_test.go @@ -0,0 +1,40 @@ +package coord + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNetworkConfigValidate(t *testing.T) { + t.Run("default is valid", func(t *testing.T) { + cfg := DefaultNetworkConfig() + + require.NoError(t, cfg.Validate()) + }) + + t.Run("clock is not nil", func(t *testing.T) { + cfg := DefaultNetworkConfig() + + cfg.Clock = nil + require.Error(t, cfg.Validate()) + }) + + t.Run("logger not nil", func(t *testing.T) { + cfg := DefaultNetworkConfig() + cfg.Logger = nil + require.Error(t, cfg.Validate()) + }) + + t.Run("tracer not nil", func(t *testing.T) { + cfg := DefaultNetworkConfig() + cfg.Tracer = nil + require.Error(t, cfg.Validate()) + }) + + t.Run("meter is not nil", func(t *testing.T) { + cfg := DefaultNetworkConfig() + cfg.Meter = nil + require.Error(t, cfg.Validate()) + }) +} diff --git a/internal/coord/routing.go b/internal/coord/routing.go index a62681a..9a6a311 100644 --- a/internal/coord/routing.go +++ b/internal/coord/routing.go @@ -333,6 +333,7 @@ func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key, bootstrapCfg.Clock = cfg.Clock 
bootstrapCfg.Tracer = cfg.Tracer bootstrapCfg.Meter = cfg.Meter + bootstrapCfg.Logger = cfg.Logger.With("statemachine", "bootstrap") bootstrapCfg.Timeout = cfg.BootstrapTimeout bootstrapCfg.RequestConcurrency = cfg.BootstrapRequestConcurrency bootstrapCfg.RequestTimeout = cfg.BootstrapRequestTimeout @@ -346,6 +347,7 @@ func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key, includeCfg.Clock = cfg.Clock includeCfg.Tracer = cfg.Tracer includeCfg.Meter = cfg.Meter + includeCfg.Logger = cfg.Logger.With("statemachine", "include") includeCfg.Timeout = cfg.ConnectivityCheckTimeout includeCfg.QueueCapacity = cfg.IncludeQueueCapacity includeCfg.Concurrency = cfg.IncludeRequestConcurrency @@ -359,6 +361,7 @@ func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key, probeCfg.Clock = cfg.Clock probeCfg.Tracer = cfg.Tracer probeCfg.Meter = cfg.Meter + probeCfg.Logger = cfg.Logger.With("statemachine", "probe") probeCfg.Timeout = cfg.ConnectivityCheckTimeout probeCfg.Concurrency = cfg.ProbeRequestConcurrency probeCfg.CheckInterval = cfg.ProbeCheckInterval @@ -372,6 +375,7 @@ func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key, exploreCfg.Clock = cfg.Clock exploreCfg.Tracer = cfg.Tracer exploreCfg.Meter = cfg.Meter + exploreCfg.Logger = cfg.Logger.With("statemachine", "explore") exploreCfg.Timeout = cfg.ExploreTimeout exploreCfg.RequestConcurrency = cfg.ExploreRequestConcurrency exploreCfg.RequestTimeout = cfg.ExploreRequestTimeout diff --git a/internal/coord/routing/bootstrap.go b/internal/coord/routing/bootstrap.go index 2047dce..1e1c7ba 100644 --- a/internal/coord/routing/bootstrap.go +++ b/internal/coord/routing/bootstrap.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" + "golang.org/x/exp/slog" "github.com/plprobelab/zikade/errs" "github.com/plprobelab/zikade/internal/coord/coordt" @@ -59,6 +60,9 @@ type BootstrapConfig 
struct { // Meter is the meter that should be used to record metrics. Meter metric.Meter + + // Logger is a structured logger that will be used when logging. + Logger *slog.Logger } // Validate checks the configuration options and returns an error if any have invalid values. @@ -105,6 +109,13 @@ func (cfg *BootstrapConfig) Validate() error { } } + if cfg.Logger == nil { + return &errs.ConfigurationError{ + Component: "BootstrapConfig", + Err: fmt.Errorf("logger must not be nil"), + } + } + return nil } @@ -115,6 +126,7 @@ func DefaultBootstrapConfig() *BootstrapConfig { Clock: clock.New(), // use standard time Tracer: tele.NoopTracer(), Meter: tele.NoopMeter(), + Logger: tele.DefaultLogger("routing"), Timeout: 5 * time.Minute, // MAGIC RequestConcurrency: 3, // MAGIC diff --git a/internal/coord/routing/bootstrap_test.go b/internal/coord/routing/bootstrap_test.go index 01c2ec9..242a2bb 100644 --- a/internal/coord/routing/bootstrap_test.go +++ b/internal/coord/routing/bootstrap_test.go @@ -59,6 +59,12 @@ func TestBootstrapConfigValidate(t *testing.T) { cfg.RequestTimeout = -1 require.Error(t, cfg.Validate()) }) + + t.Run("logger not nil", func(t *testing.T) { + cfg := DefaultBootstrapConfig() + cfg.Logger = nil + require.Error(t, cfg.Validate()) + }) } func TestBootstrapStartsIdle(t *testing.T) { diff --git a/internal/coord/routing/explore.go b/internal/coord/routing/explore.go index 62f4bed..b1522f0 100644 --- a/internal/coord/routing/explore.go +++ b/internal/coord/routing/explore.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" + "golang.org/x/exp/slog" "github.com/plprobelab/zikade/errs" "github.com/plprobelab/zikade/internal/coord/coordt" @@ -97,6 +98,9 @@ type ExploreConfig struct { // Meter is the meter that should be used to record metrics. Meter metric.Meter + // Logger is a structured logger that will be used when logging. 
+ Logger *slog.Logger + // Timeout is maximum time to allow for performing an explore for a CPL. Timeout time.Duration @@ -148,6 +152,13 @@ func (cfg *ExploreConfig) Validate() error { } } + if cfg.Logger == nil { + return &errs.ConfigurationError{ + Component: "ExploreConfig", + Err: fmt.Errorf("logger must not be nil"), + } + } + return nil } @@ -158,6 +169,7 @@ func DefaultExploreConfig() *ExploreConfig { Clock: clock.New(), // use standard time Tracer: tele.NoopTracer(), Meter: tele.NoopMeter(), + Logger: tele.DefaultLogger("routing"), Timeout: 10 * time.Minute, // MAGIC RequestConcurrency: 3, // MAGIC diff --git a/internal/coord/routing/explore_test.go b/internal/coord/routing/explore_test.go index 191ca7e..82d5af7 100644 --- a/internal/coord/routing/explore_test.go +++ b/internal/coord/routing/explore_test.go @@ -60,6 +60,12 @@ func TestExploreConfigValidate(t *testing.T) { cfg.RequestTimeout = -1 require.Error(t, cfg.Validate()) }) + + t.Run("logger not nil", func(t *testing.T) { + cfg := DefaultExploreConfig() + cfg.Logger = nil + require.Error(t, cfg.Validate()) + }) } // maxCpl is 7 since we are using tiny 8-bit keys diff --git a/internal/coord/routing/include.go b/internal/coord/routing/include.go index 1bd4a71..52dc1d8 100644 --- a/internal/coord/routing/include.go +++ b/internal/coord/routing/include.go @@ -11,6 +11,7 @@ import ( "github.com/plprobelab/go-libdht/kad/key" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" + "golang.org/x/exp/slog" "github.com/plprobelab/zikade/errs" "github.com/plprobelab/zikade/tele" @@ -67,6 +68,9 @@ type IncludeConfig struct { // Meter is the meter that should be used to record metrics. Meter metric.Meter + + // Logger is a structured logger that will be used when logging. + Logger *slog.Logger } // Validate checks the configuration options and returns an error if any have invalid values. 
@@ -113,6 +117,13 @@ func (cfg *IncludeConfig) Validate() error { } } + if cfg.Logger == nil { + return &errs.ConfigurationError{ + Component: "IncludeConfig", + Err: fmt.Errorf("logger must not be nil"), + } + } + return nil } @@ -123,6 +134,7 @@ func DefaultIncludeConfig() *IncludeConfig { Clock: clock.New(), // use standard time Tracer: tele.NoopTracer(), Meter: tele.NoopMeter(), + Logger: tele.DefaultLogger("routing"), Concurrency: 3, Timeout: time.Minute, diff --git a/internal/coord/routing/include_test.go b/internal/coord/routing/include_test.go index 6c89dc8..47ffe0a 100644 --- a/internal/coord/routing/include_test.go +++ b/internal/coord/routing/include_test.go @@ -59,6 +59,12 @@ func TestIncludeConfigValidate(t *testing.T) { cfg.QueueCapacity = -1 require.Error(t, cfg.Validate()) }) + + t.Run("logger not nil", func(t *testing.T) { + cfg := DefaultIncludeConfig() + cfg.Logger = nil + require.Error(t, cfg.Validate()) + }) } func TestIncludeStartsIdle(t *testing.T) { diff --git a/internal/coord/routing/probe.go b/internal/coord/routing/probe.go index 3d31214..d8b4183 100644 --- a/internal/coord/routing/probe.go +++ b/internal/coord/routing/probe.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" + "golang.org/x/exp/slog" "github.com/plprobelab/zikade/errs" "github.com/plprobelab/zikade/tele" @@ -96,6 +97,9 @@ type ProbeConfig struct { // Meter is the meter that should be used to record metrics. Meter metric.Meter + + // Logger is a structured logger that will be used when logging. + Logger *slog.Logger } // Validate checks the configuration options and returns an error if any have invalid values. 
@@ -121,6 +125,13 @@ func (cfg *ProbeConfig) Validate() error { } } + if cfg.Logger == nil { + return &errs.ConfigurationError{ + Component: "ProbeConfig", + Err: fmt.Errorf("logger must not be nil"), + } + } + if cfg.Concurrency < 1 { return &errs.ConfigurationError{ Component: "ProbeConfig", @@ -152,6 +163,7 @@ func DefaultProbeConfig() *ProbeConfig { Clock: clock.New(), // use standard time Tracer: tele.NoopTracer(), Meter: tele.NoopMeter(), + Logger: tele.DefaultLogger("routing"), Concurrency: 3, // MAGIC Timeout: time.Minute, // MAGIC @@ -487,7 +499,7 @@ func (l *nodeValueList[K, N]) nodeCount() int { } // Put removes a node value from the list, deleting its information. -// It is removed from the pending list andongoing list if it was already present in either. +// It is removed from the pending list and ongoing list if it was already present in either. func (l *nodeValueList[K, N]) Remove(n N) { mk := key.HexString(n.Key()) nve, ok := l.nodes[mk] diff --git a/internal/coord/routing/probe_test.go b/internal/coord/routing/probe_test.go index 1d42d6e..703e897 100644 --- a/internal/coord/routing/probe_test.go +++ b/internal/coord/routing/probe_test.go @@ -63,6 +63,12 @@ func TestProbeConfigValidate(t *testing.T) { cfg.CheckInterval = -1 require.Error(t, cfg.Validate()) }) + + t.Run("logger not nil", func(t *testing.T) { + cfg := DefaultProbeConfig() + cfg.Logger = nil + require.Error(t, cfg.Validate()) + }) } func TestProbeStartsIdle(t *testing.T) { From b11c889718c7cbf6ef439ec76dc9f696ac9dfa7b Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Thu, 26 Oct 2023 10:36:50 +0100 Subject: [PATCH 2/2] Adjust/restore probe and explore intervals --- internal/coord/routing.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/coord/routing.go b/internal/coord/routing.go index 9a6a311..969d17c 100644 --- a/internal/coord/routing.go +++ b/internal/coord/routing.go @@ -265,8 +265,8 @@ func DefaultRoutingConfig() 
*RoutingConfig { ConnectivityCheckTimeout: time.Minute, // MAGIC - ProbeRequestConcurrency: 3, // MAGIC - ProbeCheckInterval: 6 * time.Hour, // MAGIC + ProbeRequestConcurrency: 3, // MAGIC + ProbeCheckInterval: 15 * time.Minute, // MAGIC IncludeRequestConcurrency: 3, // MAGIC IncludeQueueCapacity: 128, // MAGIC