diff --git a/VERSION b/VERSION index 5592e6e..181ca8c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.7.35 \ No newline at end of file +0.7.55 \ No newline at end of file diff --git a/cmd/bridge/run_cli.go b/cmd/bridge/run_cli.go index 4f6cce9..c94e9e0 100644 --- a/cmd/bridge/run_cli.go +++ b/cmd/bridge/run_cli.go @@ -125,7 +125,11 @@ Ex: /dev/:/dev/ttyUSB0 errChan <- err }() - // Main event loop - just handle shutdown and errors + // Health check ticker to monitor MQTT connection + healthTicker := time.NewTicker(30 * time.Second) + defer healthTicker.Stop() + + // Main event loop - handle shutdown, errors, and health checks for { select { case <-sigChan: @@ -134,6 +138,24 @@ Ex: /dev/:/dev/ttyUSB0 time.Sleep(500 * time.Millisecond) log.Println("Bridge stopped") return + case <-healthTicker.C: + // Check if MQTT is stuck in a failed reconnection state + if b.IsMQTTReconnecting() { + duration, attempts := b.GetMQTTReconnectionInfo() + // If MQTT has been trying to reconnect for more than 5 minutes, force full restart + if duration > 5*time.Minute { + log.Printf("MQTT reconnection stuck after %v (%d attempts). Forcing full reconnection...", duration, attempts) + errChan <- nil // Trigger reconnection + goto reconnectLoop + } else if attempts > 0 { + log.Printf("MQTT reconnecting... (disconnected for %v, %d attempts)", duration, attempts) + } + } else if !b.IsHealthy() { + // Bridge is unhealthy and not reconnecting - force restart + log.Println("Bridge unhealthy (MQTT not connected and not reconnecting). Forcing reconnection...") + errChan <- nil + goto reconnectLoop + } case err := <-errChan: connectionDuration := time.Since(lastSuccessTime) diff --git a/pkg/bridge/bridge.go b/pkg/bridge/bridge.go index 003a089..09e9678 100644 --- a/pkg/bridge/bridge.go +++ b/pkg/bridge/bridge.go @@ -165,13 +165,15 @@ func NewMQTTClient(broker, clientID, lwtTopic string, lwtPayload []byte) *MQTTCl AddBroker(broker). SetClientID(clientID). SetAutoReconnect(true). 
- SetMaxReconnectInterval(30 * time.Second). // Increased from 10s - SetConnectRetryInterval(5 * time.Second). // Add initial retry interval - SetKeepAlive(60 * time.Second). // Increase keep alive - SetPingTimeout(10 * time.Second). // Increase ping timeout - SetConnectTimeout(30 * time.Second). // Add connect timeout - SetWriteTimeout(10 * time.Second). // Add write timeout - SetMessageChannelDepth(1000) // Increase message buffer + SetMaxReconnectInterval(1 * time.Minute). // Longer max interval + SetConnectRetryInterval(5 * time.Second). // Initial retry interval + SetKeepAlive(30 * time.Second). // More frequent keepalive to detect issues faster + SetPingTimeout(10 * time.Second). // Ping timeout + SetConnectTimeout(30 * time.Second). // Connect timeout + SetWriteTimeout(10 * time.Second). // Write timeout + SetMessageChannelDepth(1000). // Message buffer + SetCleanSession(false). // Persist session to survive reconnects + SetResumeSubs(true) // Resume stored (un)subscribe messages on connect when CleanSession is false if lwtTopic != "" && lwtPayload != nil { opts.SetWill(lwtTopic, string(lwtPayload), 0, true) @@ -190,20 +192,32 @@ func NewMQTTClient(broker, clientID, lwtTopic string, lwtPayload []byte) *MQTTCl } }) - // Track reconnection attempts + // Track reconnection attempts with logging opts.SetReconnectingHandler(func(c mqtt.Client, options *mqtt.ClientOptions) { m.mu.Lock() m.reconnectionAttempts++ + attempts := m.reconnectionAttempts m.isReconnecting = true m.mu.Unlock() + + // Log the first attempt and every 10th after it (1, 11, 21, ...) to avoid spam + if attempts%10 == 1 { + fmt.Printf("MQTT reconnection attempt %d...\n", attempts) + } }) - // Track successful reconnection + // Track successful reconnection with logging opts.SetOnConnectHandler(func(c mqtt.Client) { m.mu.Lock() + wasReconnecting := m.isReconnecting + attempts := m.reconnectionAttempts m.isReconnecting = false m.reconnectionAttempts = 0 m.mu.Unlock() + + if wasReconnecting && attempts > 0 { + fmt.Printf("MQTT reconnected successfully after %d attempts\n", attempts) 
+ } }) m.client = mqtt.NewClient(opts)