A pure Kotlin Multiplatform implementation of the MQTT v5.0 protocol, using Ktor for networking. No third-party MQTT libraries are used.
- All 15 packet types: CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT, AUTH
- All QoS levels: QoS 0 (At most once), QoS 1 (At least once), QoS 2 (Exactly once)
- Full v5.0 Properties: Session Expiry, Receive Maximum, Maximum Packet Size, Topic Alias, User Properties, Content Type, Response Topic, Correlation Data, Subscription Identifier, and more
- Will Messages: with Will Delay Interval and all Will Properties
- Keep Alive: automatic PINGREQ/PINGRESP handling
- Session Management: Clean Start, Session Expiry Interval, Session Present
- Topic Aliases: bidirectional topic alias support for reduced bandwidth
- Flow Control: Receive Maximum-based send quota management
- Enhanced Authentication: challenge/response authentication framework (AUTH packets)
- Re-Authentication: initiate re-authentication on an active connection
- Reason Codes: comprehensive reason code support on all acknowledgment packets
- Server Capability Discovery: Maximum QoS, Retain Available, Wildcard/Shared/Subscription-ID availability
- Plain TCP & TLS/SSL: supports both unencrypted (port 1883) and secure (port 8883) connections with custom TLS configuration (custom CA, mTLS)
- Auto-Reconnect: pluggable reconnection strategy (exponential backoff, constant delay, linear backoff, or fully custom) upon unexpected disconnection; re-subscribes to previously active topics on successful reconnect
- QoS 1/2 Retry on Reconnect: per MQTT v5 Section 4.4, unacknowledged QoS 1/2 messages are automatically resent with the DUP flag when the session is resumed after reconnect
- Offline Message Queue: publish while disconnected -- messages are buffered and sent automatically when the connection is restored (configurable capacity, drops oldest when full)
- Connection State Flow: reactive `StateFlow<ConnectionState>` (DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING, RECONNECTING) for driving UI in Compose / SwiftUI
- Connect Timeout: configurable timeout for the initial TCP/TLS connection handshake
- Logging: optional callback-based logger with configurable log levels (DEBUG, INFO, WARN, ERROR) and lazy message evaluation for zero-cost when disabled
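
To illustrate the flow-control item in the list above, here is a minimal, standalone sketch of Receive Maximum send-quota bookkeeping as the MQTT v5 specification describes it. The `SendQuota` class and its members are hypothetical names chosen for this example and are not the library's API; the library's internal implementation may differ.

```kotlin
// Hypothetical helper (not part of the library): tracks the MQTT v5 send quota.
// Only QoS 1 and QoS 2 PUBLISH packets count against the quota; QoS 0 does not.
class SendQuota(private val receiveMaximum: Int) {
    init {
        require(receiveMaximum in 1..65_535) { "Receive Maximum must be in 1..65535" }
    }

    // Number of QoS 1/2 PUBLISH packets that may still be sent without waiting.
    var available: Int = receiveMaximum
        private set

    // Call before sending a QoS 1/2 PUBLISH. Returns false if the sender must wait.
    fun tryAcquire(): Boolean {
        if (available == 0) return false
        available--
        return true
    }

    // Call when a PUBACK or PUBCOMP arrives, or a PUBREC carrying an error reason code.
    // The quota never grows beyond the Receive Maximum announced by the peer.
    fun release() {
        if (available < receiveMaximum) available++
    }
}
```

With `SendQuota(receiveMaximum = 2)`, two calls to `tryAcquire()` succeed and a third returns `false` until `release()` is called for one of the in-flight messages.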
| Platform | Targets |
|---|---|
| JVM | JVM 17+, Android |
| iOS | iosArm64, iosX64, iosSimulatorArm64 |
| macOS | macosArm64 (Apple Silicon), macosX64 (Intel) |
| Linux | linuxX64 |
- Ktor Networking: uses `ktor-network` for raw TCP sockets and `ktor-network-tls` for TLS/SSL with custom configuration support
- Coroutine-based: fully suspending API built on Kotlin coroutines
- Reactive State: `StateFlow<ConnectionState>` for UI binding + `SharedFlow` for messages
- Spec-compliant Session Resumption: QoS 1/2 message retry with DUP flag per MQTT v5 Section 4.4
- Dual Message Delivery: `SharedFlow`-based reactive API and callback-based listener
- Zero third-party MQTT dependencies: the entire protocol is implemented from scratch
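
The DUP flag mentioned above lives in the first byte of the PUBLISH fixed header. The sketch below shows how that byte is laid out according to the MQTT v5 specification (Section 3.3.1). `publishHeaderByte` is a hypothetical function written for this illustration; it is not taken from the library.

```kotlin
// Hypothetical helper: builds the first byte of a PUBLISH fixed header.
// Layout: bits 7-4 = packet type (3), bit 3 = DUP, bits 2-1 = QoS, bit 0 = RETAIN.
fun publishHeaderByte(qos: Int, dup: Boolean, retain: Boolean): Int {
    require(qos in 0..2) { "QoS must be 0, 1 or 2" }
    // The specification requires DUP to be 0 for all QoS 0 messages.
    require(!(dup && qos == 0)) { "DUP must not be set for QoS 0" }
    val typeBits = 3 shl 4
    val dupBit = if (dup) 1 shl 3 else 0
    val retainBit = if (retain) 1 else 0
    return typeBits or dupBit or (qos shl 1) or retainBit
}
```

For a QoS 1 message without RETAIN, the first transmission yields `0x32`, and a retransmission after session resumption yields `0x3A`; the only difference is the DUP bit.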
Add the following to your build.gradle.kts:
repositories {
mavenCentral()
}
dependencies {
implementation("io.github.baole:kmqtt5:1.0.0")
}

For Kotlin Multiplatform projects, add the dependency in the `commonMain` source set:
kotlin {
sourceSets {
commonMain.dependencies {
implementation("io.github.baole:kmqtt5:1.0.0")
}
}
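}

If you use a Gradle version catalog (`gradle/libs.versions.toml` by default), declare the library there:

[versions]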
kmqtt5 = "1.0.0"
[libraries]
kmqtt5 = { module = "io.github.baole:kmqtt5", version.ref = "kmqtt5" }

Then in your `build.gradle.kts`:
dependencies {
implementation(libs.kmqtt5)
}

import io.github.mqtt5.*
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration.Companion.seconds
fun main() = runBlocking {
// Create client
val client = MqttClient {
host = "broker.example.com"
port = 1883
clientId = "my-app"
cleanStart = true
keepAlive = 60.seconds
}
// Connect
client.connect()
// Receive messages via callback; register it before subscribing so that
// messages delivered right after the subscription is acknowledged are not missed
client.onMessage = { message ->
println("${message.topic}: ${message.payloadAsString}")
}
// Or via Kotlin Flow
// client.messages.collect { message -> ... }
// Subscribe
client.subscribe("sensor/#", QoS.AT_LEAST_ONCE)
// Publish
client.publish("sensor/temp", "22.5", QoS.AT_LEAST_ONCE)
// Disconnect
client.disconnect()
}

// StateFlow: collect in Compose, SwiftUI, or any coroutine scope
client.connectionState.collect { state ->
when (state) {
ConnectionState.CONNECTED -> showOnlineIndicator()
ConnectionState.CONNECTING -> showSpinner()
ConnectionState.RECONNECTING -> showReconnectingBanner()
ConnectionState.DISCONNECTING -> showDisconnecting()
ConnectionState.DISCONNECTED -> showOfflineIndicator()
}
}
// Or read the current value synchronously
if (client.connectionState.value == ConnectionState.CONNECTED) { /* ... */ }

When the connection drops unexpectedly, the client can automatically reconnect using a configurable reconnect strategy. On success it re-subscribes to all previously active topics and flushes any messages that were queued while offline.
val client = MqttClient {
host = "broker.example.com"
clientId = "reliable-client"
autoReconnect = true
// Shorthand — builds an ExponentialBackoff under the hood
reconnectDelay = 1.seconds // initial delay (doubles each attempt)
maxReconnectDelay = 60.seconds // cap for exponential backoff
maxReconnectAttempts = 0 // 0 = unlimited
}

Set `reconnectStrategy` for full control. When set, it takes precedence over the shorthand properties above.
| Strategy | Behaviour |
|---|---|
| `ExponentialBackoff` | Delay doubles each attempt with optional random jitter (default) |
| `ConstantDelay` | Fixed delay between every attempt |
| `LinearBackoff` | Delay grows by a fixed step each attempt |
| `ReconnectStrategy.None` | Never reconnect (useful with offline queue only) |
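
To make the table concrete, the following standalone sketch computes the delay schedules the strategies describe. The function names, the 1-based `attempt` parameter, and the additive jitter formula are assumptions made for this illustration; the library's own classes may compute delays differently.

```kotlin
import kotlin.math.min
import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

// Exponential backoff: initial, 2x, 4x, ... capped at max. `attempt` is 1-based (assumption).
fun exponentialDelay(attempt: Int, initial: Duration, max: Duration): Duration {
    require(attempt >= 1) { "attempt is 1-based" }
    // Cap the exponent so the shift cannot overflow an Int.
    val factor = 1 shl min(attempt - 1, 30)
    return (initial * factor).coerceAtMost(max)
}

// Linear backoff: initial, initial + step, initial + 2 * step, ... capped at max.
fun linearDelay(attempt: Int, initial: Duration, step: Duration, max: Duration): Duration {
    require(attempt >= 1) { "attempt is 1-based" }
    return (initial + step * (attempt - 1)).coerceAtMost(max)
}

// Adds a random extra of up to jitterFactor * delay (assumed additive jitter).
fun withJitter(delay: Duration, jitterFactor: Double, random: Random = Random.Default): Duration =
    delay + delay * (random.nextDouble() * jitterFactor)
```

For example, `exponentialDelay(4, 1.seconds, 60.seconds)` gives 8 seconds, and `linearDelay(3, 1.seconds, 2.seconds, 30.seconds)` gives 5 seconds. Jitter spreads reconnect attempts out so that many clients dropped at the same moment do not all retry simultaneously.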
// Exponential backoff with jitter — avoids thundering herd in IoT fleets
val client = MqttClient {
host = "broker.example.com"
autoReconnect = true
reconnectStrategy = ExponentialBackoff(
initialDelay = 1.seconds,
maxDelay = 30.seconds,
maxAttempts = 20,
jitterFactor = 0.25, // adds up to 25 % random jitter
)
}
// Constant 5-second delay, max 10 attempts
val client = MqttClient {
host = "broker.example.com"
autoReconnect = true
reconnectStrategy = ConstantDelay(delay = 5.seconds, maxAttempts = 10)
}
// Linear backoff: 1 s, 3 s, 5 s, 7 s, … capped at 30 s
val client = MqttClient {
host = "broker.example.com"
autoReconnect = true
reconnectStrategy = LinearBackoff(
initialDelay = 1.seconds,
step = 2.seconds,
maxDelay = 30.seconds,
)
}

Implement `ReconnectStrategy` with a single lambda. Return the delay before the next attempt, or `null` to stop reconnecting.
val client = MqttClient {
host = "broker.example.com"
autoReconnect = true
reconnectStrategy = ReconnectStrategy { attempt, cause ->
// Server explicitly refused — don't bother retrying
if (cause is MqttConnectException) null
// Transient error — linear 2 s, 4 s, 6 s … capped at 30 s
else (attempt * 2).seconds.coerceAtMost(30.seconds)
}
}

Messages published while disconnected are buffered and sent automatically once the connection is restored.
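
The drop-oldest buffering behaviour can be pictured with a small standalone sketch. `QueuedMessage` and `DropOldestQueue` are hypothetical types invented for this example, and the convention that a capacity of 0 means unlimited mirrors the configuration comment in this README; the library's actual queue is not shown here and may work differently.

```kotlin
// Hypothetical stand-in for a buffered publish; the library's real type may differ.
data class QueuedMessage(val topic: String, val payload: String)

// A bounded FIFO buffer that evicts the oldest entry when full.
// Not thread-safe: a real client would guard it with a lock or confine it to one coroutine.
class DropOldestQueue(private val capacity: Int) {
    private val items = ArrayDeque<QueuedMessage>()

    val size: Int get() = items.size

    // Adds a message; returns the evicted oldest message, or null if nothing was dropped.
    // A capacity of 0 is treated as unlimited.
    fun offer(message: QueuedMessage): QueuedMessage? {
        val dropped = if (capacity > 0 && items.size >= capacity) items.removeFirst() else null
        items.addLast(message)
        return dropped
    }

    // Removes and returns all buffered messages in the order they were published.
    fun drain(): List<QueuedMessage> {
        val all = items.toList()
        items.clear()
        return all
    }
}
```

On reconnect, a client following this pattern would call `drain()` and publish each message in order. Dropping the oldest entry favours fresh data, which usually suits telemetry such as sensor readings.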
val client = MqttClient {
host = "broker.example.com"
autoReconnect = true
offlineQueueCapacity = 100 // 0 = unlimited; drops oldest when full
}
// Works even while disconnected
client.publish("sensor/temp", "22.5", QoS.AT_LEAST_ONCE)
println("Queued: ${client.offlineQueueSize}")

Use the `onReconnecting` and `onReconnected` callbacks to observe reconnection progress:

client.onReconnecting = { attempt ->
println("Reconnecting… attempt $attempt")
}
client.onReconnected = {
println("Reconnected! Subscriptions and queued messages restored.")
}

Set `connectTimeout` to bound the initial TCP/TLS connection handshake:

val client = MqttClient {
host = "broker.example.com"
connectTimeout = 10.seconds // fail fast if broker is unreachable
}

Pass an `MqttLogger` to receive log output at or above the chosen minimum level:

val client = MqttClient {
host = "broker.example.com"
logger = MqttLogger(minLevel = MqttLogLevel.DEBUG) { level, tag, message ->
println("[$level] $tag: $message")
}
}

To connect over TLS with username/password credentials:

val client = MqttClient {
host = "broker.example.com"
port = 8883
useTls = true
clientId = "secure-client"
credentials("username", "password")
}

The `tls {}` helper enables TLS and gives access to Ktor's `TLSConfigBuilder` for platform-specific settings such as custom trust managers and client certificates.
// Basic TLS (system trust store)
val client = MqttClient {
host = "broker.example.com"
port = 8883
tls() // enables TLS with default (system) trust store
}
// Custom CA certificate (JVM) — e.g., for AWS IoT Core, Azure IoT Hub
val client = MqttClient {
host = "iot.example.com"
port = 8883
tls {
trustManager = myCustomTrustManager // JVM: javax.net.ssl.TrustManager
}
}
// Mutual TLS (mTLS) with client certificates (JVM)
val client = MqttClient {
host = "iot.example.com"
port = 8883
tls {
trustManager = myCustomTrustManager
// Add client certificate for mutual authentication
addKeyStore(keyStore, keyPassword)
}
}

To register a Will Message with a Will Delay Interval and Will Properties:

val client = MqttClient {
host = "broker.example.com"
clientId = "monitored-client"
will("status/monitored-client", "offline".encodeToByteArray(), QoS.AT_LEAST_ONCE) {
retain = true
delayInterval = 30 // seconds
payloadFormatIndicator = 1 // UTF-8
contentType = "text/plain"
}
}

For request/response messaging, use the Response Topic and Correlation Data properties:

import io.github.mqtt5.MqttProperties
// Requester
val requestProps = MqttProperties().apply {
responseTopic = "response/my-client"
correlationData = "req-001".encodeToByteArray()
}
client.publish("service/request", payload, QoS.AT_LEAST_ONCE, properties = requestProps)
// Responder (on receiving the request)
client.onMessage = { message ->
val responseTopic = message.properties.responseTopic ?: return@onMessage
val corrData = message.properties.correlationData
val responseProps = MqttProperties().apply {
correlationData = corrData
}
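// Note: runBlocking blocks the calling thread until the publish completes.
// If this callback runs on the client's receive loop, consider launching
// the publish from your own CoroutineScope instead.
runBlocking {
client.publish(responseTopic, "response-payload", QoS.AT_LEAST_ONCE, properties = responseProps)
}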
}

For enhanced authentication (challenge/response via AUTH packets):

val client = MqttClient {
host = "broker.example.com"
authenticationMethod = "SCRAM-SHA-256"
authenticationData = initialClientData
}
client.onAuth = { authPacket ->
// Process server challenge and return client response
val serverData = authPacket.properties.authenticationData
val responseData = processChallenge(serverData)
AuthPacket(
reasonCode = ReasonCode.CONTINUE_AUTHENTICATION,
properties = MqttProperties().apply {
authenticationMethod = "SCRAM-SHA-256"
authenticationData = responseData
}
)
}

To subscribe with per-subscription options:

client.subscribe(listOf(
Subscription("sensor/#", SubscriptionOptions(
qos = QoS.EXACTLY_ONCE,
noLocal = false,
retainAsPublished = true,
retainHandling = 1, // Send retained only if subscription doesn't exist
)),
Subscription("cmd/+", SubscriptionOptions(
qos = QoS.AT_LEAST_ONCE,
noLocal = true, // Don't receive own messages
)),
))

To publish with properties:

val props = MqttProperties().apply {
contentType = "application/json"
messageExpiryInterval = 3600 // 1 hour
payloadFormatIndicator = 1 // UTF-8
userProperties.add("trace-id" to "abc-123")
userProperties.add("version" to "1.0")
}
client.publish(
topic = "events/order",
payload = """{"orderId": "12345", "status": "created"}""",
qos = QoS.EXACTLY_ONCE,
properties = props,
)

# Build the library
./gradlew :library:build
# Run tests
./gradlew :library:allTests
# Run the sample
./gradlew :sample:run

Requirements:

- Kotlin 2.3.10+
- Ktor 3.4.0+
- JDK 17+ (for JVM target)
The library implements the full MQTT v5 wire protocol:
- `MqttCodec`: Low-level primitives for Variable Byte Integers, UTF-8 Encoded Strings, Binary Data, and String Pairs
- `MqttProperties`: Encodes/decodes all 27 MQTT v5 property types
- `MqttPacket`: Sealed class hierarchy for all 15 packet types
- `PacketEncoder`: Serializes packets to wire-format byte arrays
- `PacketDecoder`: Deserializes wire-format bytes into packet objects
- `MqttConnection`: Manages TCP/TLS sockets via Ktor's `aSocket()` API
  - Provides `sendPacket()` and `readPacket()` as suspend functions
  - Thread-safe write operations via Mutex
- `MqttClient`: The main public API
  - Manages packet ID allocation, topic aliases, session state, and keep-alive
  - Handles the QoS 1 and QoS 2 acknowledgment flows automatically
  - Delivers messages via `SharedFlow` and/or callbacks
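
As an illustration of the lowest layer, the sketch below encodes and decodes a Variable Byte Integer following the algorithm in the MQTT v5 specification (Section 1.5.5). The function names are hypothetical and chosen for this example; they are not `MqttCodec`'s actual signatures.

```kotlin
// Encodes a value in 0..268,435,455 as 1 to 4 bytes, 7 data bits per byte,
// with the high bit set on every byte except the last.
fun encodeVariableByteInteger(value: Int): ByteArray {
    require(value in 0..268_435_455) { "Value out of range for a Variable Byte Integer" }
    val out = ArrayList<Byte>(4)
    var remaining = value
    do {
        var digit = remaining and 0x7F          // low 7 bits
        remaining = remaining ushr 7
        if (remaining > 0) digit = digit or 0x80 // continuation bit: more bytes follow
        out.add(digit.toByte())
    } while (remaining > 0)
    return out.toByteArray()
}

// Decodes a Variable Byte Integer starting at `offset`.
// Returns the decoded value paired with the number of bytes consumed.
fun decodeVariableByteInteger(bytes: ByteArray, offset: Int = 0): Pair<Int, Int> {
    var value = 0
    for (i in 0 until 4) {
        val pos = offset + i
        require(pos < bytes.size) { "Truncated Variable Byte Integer" }
        val b = bytes[pos].toInt() and 0xFF
        value = value or ((b and 0x7F) shl (7 * i))
        if ((b and 0x80) == 0) return value to (i + 1)
    }
    throw IllegalArgumentException("Variable Byte Integer longer than 4 bytes")
}
```

The value 128 encodes to the two bytes `0x80 0x01`, and the largest representable value, 268,435,455, encodes to `0xFF 0xFF 0xFF 0x7F`. This encoding is used for the Remaining Length field of every packet and for the Property Length field.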
Contributions are welcome! Please open issues or submit pull requests.
This project is licensed under the Apache License 2.0. See the LICENSE file for details.