diff --git a/channel_observer.go b/channel_observer.go index 0b91057..85aa042 100644 --- a/channel_observer.go +++ b/channel_observer.go @@ -8,6 +8,7 @@ import ( "github.com/devimteam/amqp/conn" "github.com/devimteam/amqp/logger" + "github.com/devimteam/amqp/metrics" ) const defaultChannelIdleDuration = time.Second * 15 @@ -20,7 +21,6 @@ type ( idle chan idleChan lastRevision time.Time options observerOpts - logger logger.Logger ctx context.Context } observerOpts struct { @@ -31,6 +31,9 @@ type ( count int size int } + idleMetric metrics.Gauge + allMetric metrics.Gauge + logger logger.Logger } ObserverOption func(opts *observerOpts) ) @@ -75,11 +78,32 @@ func LimitSize(size int) ObserverOption { } } +func ObserverWithLogger(logger logger.Logger) ObserverOption { + return func(opts *observerOpts) { + opts.logger = logger + } +} + +func AllMetric(counter metrics.Gauge) ObserverOption { + return func(opts *observerOpts) { + opts.allMetric = counter + } +} + +func IdleMetric(counter metrics.Gauge) ObserverOption { + return func(opts *observerOpts) { + opts.idleMetric = counter + } +} + func newObserver(ctx context.Context, conn *conn.Connection, options ...ObserverOption) *observer { opts := observerOpts{ idleDuration: defaultChannelIdleDuration, min: 0, - max: math.MaxUint16, // From https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf, section 4.9 Limitations + max: math.MaxUint16, + idleMetric: metrics.NoopGauge, + allMetric: metrics.NoopGauge, + logger: logger.NoopLogger, } for _, o := range options { o(&opts) @@ -90,7 +114,6 @@ func newObserver(ctx context.Context, conn *conn.Connection, options ...Observer counter: make(chan struct{}, opts.max), lastRevision: time.Now(), options: opts, - logger: logger.NoopLogger, ctx: ctx, } go func() { @@ -126,7 +149,7 @@ func (p *observer) channel() *Channel { case p.counter <- struct{}{}: ch := Channel{ conn: p.conn, - logger: p.logger, + logger: p.options.logger, } ch.callMx.Lock() // Lock to prevent calls on nil channel. Mutex should be unlocked in `keepalive` function. go ch.keepalive(p.ctx, time.Minute) @@ -141,19 +164,21 @@ func (p *observer) clear() { revisionTime := time.Now() Loop: for { + // fetch all idle channels from queue select { case c := <-p.idle: if c.ch.closed { continue } - if revisionTime.Sub(c.since) > p.options.idleDuration && len(p.counter) < p.options.min { + if p.shouldBeClosed(revisionTime, c) { if e := c.ch.close(); e != nil { - _ = p.logger.Log(e) + _ = p.options.logger.Log(e) } continue } channels = append(channels, c) default: + // there is no idle channels: break the loop break Loop } } @@ -162,12 +187,18 @@ Loop: } p.lastRevision = revisionTime p.m.Unlock() + go p.updateMetrics() // non-blocking call } -func (p *observer) shouldBeClosed(revisionTime time.Time, c *idleChan) bool { +func (p *observer) shouldBeClosed(revisionTime time.Time, c idleChan) bool { return revisionTime.Sub(c.since) > p.options.idleDuration && len(p.counter) > p.options.min } +func (p *observer) updateMetrics() { + p.options.idleMetric.Set(float64(len(p.idle))) + p.options.allMetric.Set(float64(len(p.counter))) +} + func (p *observer) release(ch *Channel) { if ch != nil { p.idle <- idleChan{since: time.Now(), ch: ch} diff --git a/conn/connection.go b/conn/connection.go index 5e1fd93..035b411 100644 --- a/conn/connection.go +++ b/conn/connection.go @@ -98,8 +98,7 @@ ConnectionLoop: c.connect(dialer) case <-c.ctx.Done(): _ = c.logger.Log(CanceledError) - e := connection.Close() - if e != nil { + if e := connection.Close(); e != nil { _ = c.logger.Log(errors.Wrap(e, "connection closed")) } c.notifier.Notify() @@ -109,12 +108,12 @@ ConnectionLoop: break ConnectionLoop } } - defer c.state.connected() if attemptNum == c.maxAttempts { _ = c.logger.Log(MaxAttemptsError) return } - defer func() { _ = c.logger.Log(Connected) }() + _ = c.logger.Log(Connected) + defer c.state.connected() } // Common errors diff --git a/logger/logger.go b/logger/logger.go index 1c280d5..9023148 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -7,7 +7,7 @@ type Logger interface { Log(v ...interface{}) error } -var NoopLogger = noopLogger{} +var NoopLogger Logger = noopLogger{} type noopLogger struct{} diff --git a/metrics/generic.go b/metrics/generic.go new file mode 100644 index 0000000..054f51a --- /dev/null +++ b/metrics/generic.go @@ -0,0 +1,25 @@ +package metrics + +// Counter describes a metric that accumulates values monotonically. +// An example of a counter is the number of received HTTP requests. +// This interface is a subset of related go-kit Counter interface. +type Counter interface { + Add(delta float64) +} + +// Gauge describes a metric that takes specific values over time. +// An example of a gauge is the current depth of a job queue. +// This interface is a subset of related go-kit Gauge interface. +type Gauge interface { + Set(value float64) +} + +var ( + NoopGauge Gauge = noop{} + NoopCounter Counter = noop{} +) + +type noop struct{} + +func (c noop) Set(float64) {} +func (c noop) Add(float64) {} diff --git a/publisher.go b/publisher.go index a59a9e4..8e1e77a 100644 --- a/publisher.go +++ b/publisher.go @@ -74,6 +74,7 @@ func (p Publisher) publish(channel *Channel, ctx context.Context, exchangeName s if err != nil { return err } + go p.opts.counterMetric.Add(1) // non-blocking call for _, before := range p.opts.before { before(ctx, &msg) } diff --git a/publisher_options.go b/publisher_options.go index c026292..e134bbe 100644 --- a/publisher_options.go +++ b/publisher_options.go @@ -4,6 +4,7 @@ import ( "time" "github.com/devimteam/amqp/logger" + "github.com/devimteam/amqp/metrics" ) type PublisherOption func(*Publisher) @@ -15,9 +16,10 @@ type publisherOptions struct { flag bool timeout time.Duration } - log logger.Logger - observerOpts []ObserverOption - workers int + log logger.Logger + observerOpts []ObserverOption + workers int + counterMetric metrics.Counter } func defaultPubOptions() publisherOptions { @@ -27,6 +29,7 @@ func defaultPubOptions() publisherOptions { opts.log = logger.NoopLogger opts.defaultContentType = defaultContentType opts.workers = defaultWorkers + opts.counterMetric = metrics.NoopCounter return opts } @@ -75,3 +78,9 @@ func PublisherContentType(t string) PublisherOption { publisher.opts.defaultContentType = t } } + +func PublisherProcessedMetric(counter metrics.Counter) PublisherOption { + return func(publisher *Publisher) { + publisher.opts.counterMetric = counter + } +} diff --git a/subscriber.go b/subscriber.go index 4cb7ec1..eb5a2e4 100644 --- a/subscriber.go +++ b/subscriber.go @@ -152,6 +152,7 @@ func (s Subscriber) processEvents( } func (s Subscriber) processEvent(d amqp.Delivery, dataType interface{}, eventChan chan<- Event) { + go s.opts.counterMetric.Add(1) // non-blocking call err := s.checkEvent(d) if err != nil { err = s.errorBefore(d, err) diff --git a/subscriber_options.go b/subscriber_options.go index 7f78831..179a706 100644 --- a/subscriber_options.go +++ b/subscriber_options.go @@ -4,6 +4,7 @@ import ( "time" "github.com/devimteam/amqp/logger" + "github.com/devimteam/amqp/metrics" ) type subscriberOptions struct { @@ -12,12 +13,13 @@ type subscriberOptions struct { flag bool timeout time.Duration } - workers int - processAll bool - log logger.Logger - msgOpts subMessageOptions - errorBefore []ErrorBefore - observerOpts []ObserverOption + workers int + processAll bool + log logger.Logger + msgOpts subMessageOptions + errorBefore []ErrorBefore + observerOpts []ObserverOption + counterMetric metrics.Counter } func defaultSubOptions() subscriberOptions { @@ -31,6 +33,7 @@ func defaultSubOptions() subscriberOptions { opts.msgOpts.defaultContentType = defaultContentType opts.workers = defaultWorkers opts.log = logger.NoopLogger + opts.counterMetric = metrics.NoopCounter return opts } @@ -105,3 +108,9 @@ func SubWithObserverOptions(opts ...ObserverOption) SubscriberOption { subscriber.opts.observerOpts = append(subscriber.opts.observerOpts, opts...) } } + +func SubProcessedMetric(counter metrics.Counter) SubscriberOption { + return func(subscriber *Subscriber) { + subscriber.opts.counterMetric = counter + } +}