Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.7.35
0.7.55
24 changes: 23 additions & 1 deletion cmd/bridge/run_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ Ex: /dev/<YOUR_SERIAL_PORT>:/dev/ttyUSB0
errChan <- err
}()

// Main event loop - just handle shutdown and errors
// Health check ticker to monitor MQTT connection
healthTicker := time.NewTicker(30 * time.Second)
defer healthTicker.Stop()

// Main event loop - handle shutdown, errors, and health checks
for {
select {
case <-sigChan:
Expand All @@ -134,6 +138,24 @@ Ex: /dev/<YOUR_SERIAL_PORT>:/dev/ttyUSB0
time.Sleep(500 * time.Millisecond)
log.Println("Bridge stopped")
return
case <-healthTicker.C:
// Check if MQTT is stuck in a failed reconnection state
if b.IsMQTTReconnecting() {
duration, attempts := b.GetMQTTReconnectionInfo()
// If MQTT has been trying to reconnect for more than 5 minutes, force full restart
if duration > 5*time.Minute {
log.Printf("MQTT reconnection stuck after %v (%d attempts). Forcing full reconnection...", duration, attempts)
errChan <- nil // Trigger reconnection
goto reconnectLoop
} else if attempts > 0 {
log.Printf("MQTT reconnecting... (disconnected for %v, %d attempts)", duration, attempts)
}
} else if !b.IsHealthy() {
// Bridge is unhealthy and not reconnecting - force restart
log.Println("Bridge unhealthy (MQTT not connected and not reconnecting). Forcing reconnection...")
errChan <- nil
goto reconnectLoop
}
case err := <-errChan:
connectionDuration := time.Since(lastSuccessTime)

Expand Down
32 changes: 23 additions & 9 deletions pkg/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,15 @@ func NewMQTTClient(broker, clientID, lwtTopic string, lwtPayload []byte) *MQTTCl
AddBroker(broker).
SetClientID(clientID).
SetAutoReconnect(true).
SetMaxReconnectInterval(30 * time.Second). // Increased from 10s
SetConnectRetryInterval(5 * time.Second). // Add initial retry interval
SetKeepAlive(60 * time.Second). // Increase keep alive
SetPingTimeout(10 * time.Second). // Increase ping timeout
SetConnectTimeout(30 * time.Second). // Add connect timeout
SetWriteTimeout(10 * time.Second). // Add write timeout
SetMessageChannelDepth(1000) // Increase message buffer
SetMaxReconnectInterval(1 * time.Minute). // Longer max interval
SetConnectRetryInterval(5 * time.Second). // Initial retry interval
SetKeepAlive(30 * time.Second). // More frequent keepalive to detect issues faster
SetPingTimeout(10 * time.Second). // Ping timeout
SetConnectTimeout(30 * time.Second). // Connect timeout
SetWriteTimeout(10 * time.Second). // Write timeout
SetMessageChannelDepth(1000). // Message buffer
SetCleanSession(false). // Persist session to survive reconnects
SetResumeSubs(true) // Auto-resubscribe on reconnect

if lwtTopic != "" && lwtPayload != nil {
opts.SetWill(lwtTopic, string(lwtPayload), 0, true)
Expand All @@ -190,20 +192,32 @@ func NewMQTTClient(broker, clientID, lwtTopic string, lwtPayload []byte) *MQTTCl
}
})

// Track reconnection attempts
// Track reconnection attempts with logging
opts.SetReconnectingHandler(func(c mqtt.Client, options *mqtt.ClientOptions) {
m.mu.Lock()
m.reconnectionAttempts++
attempts := m.reconnectionAttempts
m.isReconnecting = true
m.mu.Unlock()

// Log every 10 attempts to avoid spam
if attempts%10 == 1 {
fmt.Printf("MQTT reconnection attempt %d...\n", attempts)
}
})

// Track successful reconnection
// Track successful reconnection with logging
opts.SetOnConnectHandler(func(c mqtt.Client) {
m.mu.Lock()
wasReconnecting := m.isReconnecting
attempts := m.reconnectionAttempts
m.isReconnecting = false
m.reconnectionAttempts = 0
m.mu.Unlock()

if wasReconnecting && attempts > 0 {
fmt.Printf("MQTT reconnected successfully after %d attempts\n", attempts)
}
})

m.client = mqtt.NewClient(opts)
Expand Down
Loading