Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 38 additions & 7 deletions channel_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/devimteam/amqp/conn"
"github.com/devimteam/amqp/logger"
"github.com/devimteam/amqp/metrics"
)

const defaultChannelIdleDuration = time.Second * 15
Expand All @@ -20,7 +21,6 @@ type (
idle chan idleChan
lastRevision time.Time
options observerOpts
logger logger.Logger
ctx context.Context
}
observerOpts struct {
Expand All @@ -31,6 +31,9 @@ type (
count int
size int
}
idleMetric metrics.Gauge
allMetric metrics.Gauge
logger logger.Logger
}
ObserverOption func(opts *observerOpts)
)
Expand Down Expand Up @@ -75,11 +78,32 @@ func LimitSize(size int) ObserverOption {
}
}

func ObserverWithLogger(logger logger.Logger) ObserverOption {
return func(opts *observerOpts) {
opts.logger = logger
}
}

func AllMetric(counter metrics.Gauge) ObserverOption {
return func(opts *observerOpts) {
opts.allMetric = counter
}
}

func IdleMetric(counter metrics.Gauge) ObserverOption {
return func(opts *observerOpts) {
opts.idleMetric = counter
}
}

func newObserver(ctx context.Context, conn *conn.Connection, options ...ObserverOption) *observer {
opts := observerOpts{
idleDuration: defaultChannelIdleDuration,
min: 0,
max: math.MaxUint16, // From https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf, section 4.9 Limitations
max: math.MaxUint16,
idleMetric: metrics.NoopGauge,
allMetric: metrics.NoopGauge,
logger: logger.NoopLogger,
}
for _, o := range options {
o(&opts)
Expand All @@ -90,7 +114,6 @@ func newObserver(ctx context.Context, conn *conn.Connection, options ...Observer
counter: make(chan struct{}, opts.max),
lastRevision: time.Now(),
options: opts,
logger: logger.NoopLogger,
ctx: ctx,
}
go func() {
Expand Down Expand Up @@ -126,7 +149,7 @@ func (p *observer) channel() *Channel {
case p.counter <- struct{}{}:
ch := Channel{
conn: p.conn,
logger: p.logger,
logger: p.options.logger,
}
ch.callMx.Lock() // Lock to prevent calls on nil channel. Mutex should be unlocked in `keepalive` function.
go ch.keepalive(p.ctx, time.Minute)
Expand All @@ -141,19 +164,21 @@ func (p *observer) clear() {
revisionTime := time.Now()
Loop:
for {
// fetch all idle channels from queue
select {
case c := <-p.idle:
if c.ch.closed {
continue
}
if revisionTime.Sub(c.since) > p.options.idleDuration && len(p.counter) < p.options.min {
if p.shouldBeClosed(revisionTime, c) {
if e := c.ch.close(); e != nil {
_ = p.logger.Log(e)
_ = p.options.logger.Log(e)
}
continue
}
channels = append(channels, c)
default:
// there is no idle channels: break the loop
break Loop
}
}
Expand All @@ -162,12 +187,18 @@ Loop:
}
p.lastRevision = revisionTime
p.m.Unlock()
go p.updateMetrics() // non-blocking call
}

func (p *observer) shouldBeClosed(revisionTime time.Time, c *idleChan) bool {
func (p *observer) shouldBeClosed(revisionTime time.Time, c idleChan) bool {
return revisionTime.Sub(c.since) > p.options.idleDuration && len(p.counter) > p.options.min
}

func (p *observer) updateMetrics() {
p.options.idleMetric.Set(float64(len(p.idle)))
p.options.allMetric.Set(float64(len(p.counter)))
}

func (p *observer) release(ch *Channel) {
if ch != nil {
p.idle <- idleChan{since: time.Now(), ch: ch}
Expand Down
7 changes: 3 additions & 4 deletions conn/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ ConnectionLoop:
c.connect(dialer)
case <-c.ctx.Done():
_ = c.logger.Log(CanceledError)
e := connection.Close()
if e != nil {
if e := connection.Close(); e != nil {
_ = c.logger.Log(errors.Wrap(e, "connection closed"))
}
c.notifier.Notify()
Expand All @@ -109,12 +108,12 @@ ConnectionLoop:
break ConnectionLoop
}
}
defer c.state.connected()
if attemptNum == c.maxAttempts {
_ = c.logger.Log(MaxAttemptsError)
return
}
defer func() { _ = c.logger.Log(Connected) }()
_ = c.logger.Log(Connected)
defer c.state.connected()
}

// Common errors
Expand Down
2 changes: 1 addition & 1 deletion logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type Logger interface {
Log(v ...interface{}) error
}

var NoopLogger = noopLogger{}
var NoopLogger Logger = noopLogger{}

type noopLogger struct{}

Expand Down
25 changes: 25 additions & 0 deletions metrics/generic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package metrics

// Counter describes a metric that accumulates values monotonically.
// An example of a counter is the number of received HTTP requests.
// This interface is a subset of related go-kit Counter interface.
type Counter interface {
Add(delta float64)
}

// Gauge describes a metric that takes specific values over time.
// An example of a gauge is the current depth of a job queue.
// This interface is a subset of related go-kit Gauge interface.
type Gauge interface {
Set(value float64)
}

var (
NoopGauge Gauge = noop{}
NoopCounter Counter = noop{}
)

type noop struct{}

func (c noop) Set(float64) {}
func (c noop) Add(float64) {}
1 change: 1 addition & 0 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (p Publisher) publish(channel *Channel, ctx context.Context, exchangeName s
if err != nil {
return err
}
go p.opts.counterMetric.Add(1) // non-blocking call
for _, before := range p.opts.before {
before(ctx, &msg)
}
Expand Down
15 changes: 12 additions & 3 deletions publisher_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/devimteam/amqp/logger"
"github.com/devimteam/amqp/metrics"
)

type PublisherOption func(*Publisher)
Expand All @@ -15,9 +16,10 @@ type publisherOptions struct {
flag bool
timeout time.Duration
}
log logger.Logger
observerOpts []ObserverOption
workers int
log logger.Logger
observerOpts []ObserverOption
workers int
counterMetric metrics.Counter
}

func defaultPubOptions() publisherOptions {
Expand All @@ -27,6 +29,7 @@ func defaultPubOptions() publisherOptions {
opts.log = logger.NoopLogger
opts.defaultContentType = defaultContentType
opts.workers = defaultWorkers
opts.counterMetric = metrics.NoopCounter
return opts
}

Expand Down Expand Up @@ -75,3 +78,9 @@ func PublisherContentType(t string) PublisherOption {
publisher.opts.defaultContentType = t
}
}

func PublisherProcessedMetric(counter metrics.Counter) PublisherOption {
return func(publisher *Publisher) {
publisher.opts.counterMetric = counter
}
}
1 change: 1 addition & 0 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (s Subscriber) processEvents(
}

func (s Subscriber) processEvent(d amqp.Delivery, dataType interface{}, eventChan chan<- Event) {
go s.opts.counterMetric.Add(1) // non-blocking call
err := s.checkEvent(d)
if err != nil {
err = s.errorBefore(d, err)
Expand Down
21 changes: 15 additions & 6 deletions subscriber_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/devimteam/amqp/logger"
"github.com/devimteam/amqp/metrics"
)

type subscriberOptions struct {
Expand All @@ -12,12 +13,13 @@ type subscriberOptions struct {
flag bool
timeout time.Duration
}
workers int
processAll bool
log logger.Logger
msgOpts subMessageOptions
errorBefore []ErrorBefore
observerOpts []ObserverOption
workers int
processAll bool
log logger.Logger
msgOpts subMessageOptions
errorBefore []ErrorBefore
observerOpts []ObserverOption
counterMetric metrics.Counter
}

func defaultSubOptions() subscriberOptions {
Expand All @@ -31,6 +33,7 @@ func defaultSubOptions() subscriberOptions {
opts.msgOpts.defaultContentType = defaultContentType
opts.workers = defaultWorkers
opts.log = logger.NoopLogger
opts.counterMetric = metrics.NoopCounter
return opts
}

Expand Down Expand Up @@ -105,3 +108,9 @@ func SubWithObserverOptions(opts ...ObserverOption) SubscriberOption {
subscriber.opts.observerOpts = append(subscriber.opts.observerOpts, opts...)
}
}

func SubProcessedMetric(counter metrics.Counter) SubscriberOption {
return func(subscriber *Subscriber) {
subscriber.opts.counterMetric = counter
}
}