Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func New(h host.Host, cfg *Config) (*DHT, error) {
coordCfg.Routing.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
coordCfg.Routing.Meter = cfg.MeterProvider.Meter(tele.MeterName)

coordCfg.Network.Clock = cfg.Clock
coordCfg.Network.Logger = cfg.Logger.With("behaviour", "network")
coordCfg.Network.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
coordCfg.Network.Meter = cfg.MeterProvider.Meter(tele.MeterName)

rtr := &router{
host: h,
protocolID: cfg.ProtocolID,
Expand Down
51 changes: 36 additions & 15 deletions internal/coord/behaviour.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,17 @@ type WorkQueueFunc[E BehaviourEvent] func(context.Context, E) bool
// WorkQueueFunc for each work item, passing the original context
// and event.
type WorkQueue[E BehaviourEvent] struct {
pending chan CtxEvent[E]
fn WorkQueueFunc[E]
done atomic.Bool
once sync.Once
pending chan CtxEvent[E]
fn WorkQueueFunc[E]
done chan struct{}
startOnce sync.Once
closeOnce sync.Once
}

func NewWorkQueue[E BehaviourEvent](fn WorkQueueFunc[E]) *WorkQueue[E] {
w := &WorkQueue[E]{
pending: make(chan CtxEvent[E], 1),
done: make(chan struct{}),
fn: fn,
}
return w
Expand All @@ -70,19 +72,33 @@ type CtxEvent[E any] struct {
// blocking it will return a context cancellation error if the work
// item's context is cancelled.
func (w *WorkQueue[E]) Enqueue(ctx context.Context, cmd E) error {
if w.done.Load() {
select {
case <-w.done:
// the queue is not accepting any more work
return nil
default:
}
w.once.Do(func() {

w.startOnce.Do(func() {
go func() {
defer w.done.Store(true)
for cc := range w.pending {
if cc.Ctx.Err() != nil {
return
}
if done := w.fn(cc.Ctx, cc.Event); done {
w.done.Store(true)
return
for {
select {
case <-w.done:
// drain pending then exit the goroutine
for {
select {
case cc := <-w.pending:
if done := w.fn(cc.Ctx, cc.Event); done {
return
}
default:
return
}
}
case cc := <-w.pending:
if done := w.fn(cc.Ctx, cc.Event); done {
return
}
}
}
}()
Expand All @@ -96,10 +112,15 @@ func (w *WorkQueue[E]) Enqueue(ctx context.Context, cmd E) error {
Event: cmd,
}:
return nil

}
}

func (w *WorkQueue[E]) Close() {
w.closeOnce.Do(func() {
close(w.done)
})
}

// A Waiter is a Notifiee whose Notify method forwards the
// notified event to a channel which a client can wait on.
type Waiter[E BehaviourEvent] struct {
Expand Down
14 changes: 13 additions & 1 deletion internal/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ type CoordinatorConfig struct {
// Routing is the configuration used for the [RoutingBehaviour] which maintains the health of the routing table.
Routing RoutingConfig

// Network is the configuration used for the [NetworkBehaviour] which routes messages to peer nodes.
Network NetworkConfig

// Query is the configuration used for the [PooledQueryBehaviour] which manages the execution of user queries.
Query QueryConfig
}
Expand Down Expand Up @@ -149,6 +152,12 @@ func DefaultCoordinatorConfig() *CoordinatorConfig {
cfg.Routing.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
cfg.Routing.Meter = cfg.MeterProvider.Meter(tele.MeterName)

cfg.Network = *DefaultNetworkConfig()
cfg.Network.Clock = cfg.Clock
cfg.Network.Logger = cfg.Logger.With("behaviour", "network")
cfg.Network.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
cfg.Network.Meter = cfg.MeterProvider.Meter(tele.MeterName)

return cfg
}

Expand All @@ -175,7 +184,10 @@ func NewCoordinator(self kadt.PeerID, rtr coordt.Router[kadt.Key, kadt.PeerID, *
return nil, fmt.Errorf("routing behaviour: %w", err)
}

networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger, tele.Tracer)
networkBehaviour, err := NewNetworkBehaviour(rtr, &cfg.Network)
if err != nil {
return nil, fmt.Errorf("network behaviour: %w", err)
}

b, err := brdcst.NewPool[kadt.Key, kadt.PeerID, *pb.Message](self, nil)
if err != nil {
Expand Down
Loading