From ffb7b86a19bdef8707fc1ed6be6ede6fc8308ae7 Mon Sep 17 00:00:00 2001 From: HenriqueOtsuka Date: Thu, 24 Jul 2025 18:14:48 -0300 Subject: [PATCH] retry without the need to force new deployment --- amqp/consumer.go | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/amqp/consumer.go b/amqp/consumer.go index 64c5954..c2da020 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -548,29 +548,30 @@ func (c *consumer) Consume() { } rs := c.conn.NotifyReestablish() + for { + for !c.isClosed() { + if !c.conn.IsConnected() { + logger.Info("Connection not established. Waiting connection to be reestablished.") + <-rs + continue + } - for !c.isClosed() { - if !c.conn.IsConnected() { - logger.Info("Connection not established. Waiting connection to be reestablished.") - - <-rs - - continue - } - - err := c.doConsume() - - if err == nil { - logger.WithFields(logrus.Fields{ - "queue": c.queueName, - "closed": c.closed, - }).Info("Consumption finished.") - } else { - logger.WithFields(logrus.Fields{ - "queue": c.queueName, - "error": err, - }).Error("Error consuming events.") + err := c.doConsume() + + if err == nil { + logger.WithFields(logrus.Fields{ + "queue": c.queueName, + "closed": c.closed, + }).Info("Consumption finished.") + } else { + logger.WithFields(logrus.Fields{ + "queue": c.queueName, + "error": err, + }).Error("Error consuming events.") + } } + logger.Info("AMQP closed. Retrying in 30 seconds...") + time.Sleep(30 * time.Second) } }