diff --git a/.gitignore b/.gitignore index cef2285c5..8d309fee9 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,5 @@ docs-site/node_modules docs-site/.hugo_build.lock api/pkg/apis/v1alpha1/providers/target/rust/target -Cargo.lock \ No newline at end of file +Cargo.lock +**/mtls-certs/ \ No newline at end of file diff --git a/api/pkg/apis/v1alpha1/providers/target/mqtt/README.md b/api/pkg/apis/v1alpha1/providers/target/mqtt/README.md new file mode 100644 index 000000000..5c3e5580f --- /dev/null +++ b/api/pkg/apis/v1alpha1/providers/target/mqtt/README.md @@ -0,0 +1,221 @@ +### Running MQTT provider tests locally with Mosquitto (Docker) + +This guide shows how to run the unit/integration tests for the MQTT target provider against a local Mosquitto MQTT broker running in Docker. It covers both plain TCP (port 1883) and mutual TLS (mTLS, port 8883). + +Prerequisites +- Docker (or Docker Desktop) +- OpenSSL (for generating test certificates if you want to run the mTLS test) +- Go 1.21+ (matching this repo’s go.mod) + +Repository layout of interest +- `api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go` +- `api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt_test.go` + +By default the tests are gated behind environment variables and will be skipped unless explicitly enabled. + +Plain TCP broker (1883) +1) Create a Mosquitto config that enables a listener and allows anonymous access (required for Mosquitto 2.x): + +Create `mosquitto.conf` with: + +```conf +listener 1883 +protocol mqtt +allow_anonymous true +``` + +2) Start Mosquitto with the config mounted: + +```bash +docker run --rm -it --name mosquitto -p 1883:1883 -v $(pwd)/mosquitto.conf:/mosquitto/config/mosquitto.conf -v $(pwd)/mtls-certs:/certs eclipse-mosquitto:2 +``` + +3) In a separate shell, enable the tests that expect a locally running broker: + +- Linux/macOS (bash/zsh): + +```bash +export TEST_MQTT_LOCAL_ENABLED=1 +export TEST_MQTT=1 +``` + +- Windows PowerShell: + +```powershell +$env:TEST_MQTT_LOCAL_ENABLED = "1" +$env:TEST_MQTT = "1" +``` + +4) Run the tests for the MQTT provider package: + +```bash +cd api +go test ./pkg/apis/v1alpha1/providers/target/mqtt -v +``` + +Notes +- The tests publish and subscribe on topics `coa-request` and `coa-response` by default. +- The tests create a responder client within the test itself; Mosquitto simply routes messages. +- If Mosquitto logs show "Starting in local only mode... Create a configuration file which defines a listener", you did not mount a config; use the steps above. + +mTLS broker (8883) + +The file `mqtt_test.go` contains an optional mTLS integration-style test guarded by environment variables. To run it, stand up Mosquitto with TLS and client-certificate auth and point the test to your CA, client cert, and client key. + +1) Generate a simple CA, server, and client certificates (for local testing only): + +```bash +# Clean slate (optional) +rm -f ca.* server.* client.* *.srl + +# 1) CA +openssl genrsa -out ca.key 2048 +openssl req -x509 -new -nodes -key ca.key -sha256 -days 365 \ + -subj "/CN=test-ca" -out ca.crt + +# 2) Keys & CSR config with extensions +openssl genrsa -out server.key 2048 + +cat > server.cnf <<'EOF' +[ req ] +distinguished_name = dn +prompt = no +req_extensions = v3_req + +[ dn ] +CN = localhost + +[ v3_req ] +subjectAltName = @alt_names +extendedKeyUsage = serverAuth + +[ alt_names ] +DNS.1 = localhost +IP.1 = 127.0.0.1 +EOF + +# CSR with req extensions present +openssl req -new -key server.key -out server.csr -config server.cnf + +# Sign CSR and COPY THE SAME EXTENSIONS into the cert +openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial \ + -out server.crt -days 365 -sha256 \ + -extensions v3_req -extfile server.cnf + +openssl genrsa -out client.key 2048 + +cat > client.cnf <<'EOF' +[ req ] +distinguished_name = dn +prompt = no +req_extensions = v3_req + +[ dn ] +CN = mtls-client + +[ v3_req ] +extendedKeyUsage = clientAuth +EOF + +openssl req -new -key client.key -out client.csr -config client.cnf + +openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial \ + -out client.crt -days 365 -sha256 \ + -extensions v3_req -extfile client.cnf + +chmod 600 server.key client.key +chmod 644 ca.crt server.crt client.crt + +echo "---- SERVER ----" +openssl x509 -in server.crt -noout -text | sed -n '/Subject:/p;/Subject Alternative Name/,+1p;/Extended Key Usage/p' +echo "---- CLIENT ----" +openssl x509 -in client.crt -noout -text | sed -n '/Subject:/p;/Extended Key Usage/p' + +``` + +2) Create a Mosquitto config `mosquitto.conf` next to the certs with both 1883 and 8883 enabled and require client certs on 8883: + +```conf +# Plain TCP for anonymous tests +listener 1883 +protocol mqtt +allow_anonymous true + +# TLS (server-auth only) for TestGet_TLS +listener 8883 +protocol mqtt +cafile /certs/ca.crt +certfile /certs/server.crt +keyfile /certs/server.key +allow_anonymous true + +# mTLS (client certs required) for TestGet_mTLS +listener 8884 +protocol mqtt +cafile /certs/ca.crt +certfile /certs/server.crt +keyfile /certs/server.key +require_certificate true +use_identity_as_username true +allow_anonymous true +``` + +3) Start Mosquitto with the config and certs mounted: + +```bash +# From the folder containing mosquitto.conf and the *.crt/*.key files +docker run --rm -it \ + --name mosquitto \ + -p 1883:1883 \ + -p 8883:8883 \ + -p 8884:8884 \ + -v $(pwd)/mosquitto.conf:/mosquitto/config/mosquitto.conf \ + -v $(pwd)/mtls-certs:/certs \ + eclipse-mosquitto:2 +``` + +4) In a separate shell, export the mTLS test environment variables: + +```bash +export TEST_MQTT_TLS=1 +export TEST_MQTT_TLS_BROKER="ssl://127.0.0.1:8883" +export TEST_MQTT_TLS_CA="$(pwd)/mtls-certs/ca.crt" +export TEST_MQTT_TLS_REQUEST_TOPIC="coa-request" +export TEST_MQTT_TLS_RESPONSE_TOPIC="coa-response" + +export TEST_MQTT_MTLS=1 +export TEST_MQTT_MTLS_BROKER="ssl://127.0.0.1:8884" +export TEST_MQTT_MTLS_CA="$(pwd)/mtls-certs/ca.crt" +export TEST_MQTT_MTLS_CERT="$(pwd)/mtls-certs/client.crt" +export TEST_MQTT_MTLS_KEY="$(pwd)/mtls-certs/client.key" +export TEST_MQTT_MTLS_REQUEST_TOPIC="coa-request" +export TEST_MQTT_MTLS_RESPONSE_TOPIC="coa-response" +``` + +5) Run tests: + +```bash +cd api +go test ./pkg/apis/v1alpha1/providers/target/mqtt -v +``` + +Tips +- If you only want to run the mTLS test, use `-run` to filter: + +```bash +go test ./pkg/apis/v1alpha1/providers/target/mqtt -run TestGet_mTLS -v +``` + +- If you see certificate errors, double-check that: + - `TEST_MQTT_MTLS_CA` points to the CA that signed both the server and client certs. + - `server.crt`/`server.key` match, and CN/SAN includes `localhost` or you connect by the same name you issued. + - Mosquitto is actually listening on 8883 (check container logs). + +Environment variables used by tests +- Plain TCP tests (skip unless set): `TEST_MQTT_LOCAL_ENABLED=1`, `TEST_MQTT=1` +- mTLS test (skip unless set): `TEST_MQTT_MTLS=1`, `TEST_MQTT_MTLS_BROKER`, `TEST_MQTT_MTLS_CA`, `TEST_MQTT_MTLS_CERT`, `TEST_MQTT_MTLS_KEY`, `TEST_MQTT_MTLS_REQUEST_TOPIC`, `TEST_MQTT_MTLS_RESPONSE_TOPIC` + +Troubleshooting +- On Windows with WSL, run Docker Desktop and expose the ports to the host. The tests connect to `127.0.0.1`. +- If ports are in use, stop other MQTT brokers or change the exposed ports in `docker run` and env vars accordingly. + diff --git a/api/pkg/apis/v1alpha1/providers/target/mqtt/mosquitto.conf b/api/pkg/apis/v1alpha1/providers/target/mqtt/mosquitto.conf new file mode 100644 index 000000000..e95134ff3 --- /dev/null +++ b/api/pkg/apis/v1alpha1/providers/target/mqtt/mosquitto.conf @@ -0,0 +1,22 @@ +# Plain TCP for anonymous tests +listener 1883 +protocol mqtt +allow_anonymous true + +# TLS (server-auth only) for TestGet_TLS +listener 8883 +protocol mqtt +cafile /certs/ca.crt +certfile /certs/server.crt +keyfile /certs/server.key +allow_anonymous true + +# mTLS (client certs required) for TestGet_mTLS +listener 8884 +protocol mqtt +cafile /certs/ca.crt +certfile /certs/server.crt +keyfile /certs/server.key +require_certificate true +use_identity_as_username true +allow_anonymous true \ No newline at end of file diff --git a/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go b/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go index 7c739f041..0c52bf2ed 100644 --- a/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go +++ b/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go @@ -249,7 +249,7 @@ func (i *MQTTTargetProvider) Init(config providers.IProviderConfig) error { if token := i.MQTTClient.Connect(); token.Wait() && token.Error() != nil { connErr := token.Error() sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to connect to MQTT broker - %+v", connErr) - + // Provide specific guidance for common TLS errors if strings.Contains(connErr.Error(), "certificate signed by unknown authority") { sLog.ErrorfCtx(ctx, " P (MQTT Target): TLS certificate verification failed. Common solutions:") @@ -262,7 +262,7 @@ func (i *MQTTTargetProvider) Init(config providers.IProviderConfig) error { sLog.ErrorfCtx(ctx, " P (MQTT Target): - Verify CA certificate path and format") sLog.ErrorfCtx(ctx, " P (MQTT Target): - Check client certificate and key paths if using mutual TLS") } - + return v1alpha2.NewCOAError(connErr, "failed to connect to MQTT broker", v1alpha2.InternalError) } @@ -272,7 +272,7 @@ func (i *MQTTTargetProvider) Init(config providers.IProviderConfig) error { proxyResponse := ProxyResponse{ IsOK: response.State == v1alpha2.OK || response.State == v1alpha2.Accepted, State: response.State, - Payload: response.String(), + Payload: response.Body, } if !proxyResponse.IsOK { @@ -355,7 +355,7 @@ func (i *MQTTTargetProvider) Get(ctx context.Context, deployment model.Deploymen select { case resp := <-responseChan: if resp.IsOK { - data := []byte(resp.Payload.(string)) + data := resp.Payload.([]byte) var ret []model.ComponentSpec err = json.Unmarshal(data, &ret) if err != nil { @@ -507,7 +507,7 @@ func (i *MQTTTargetProvider) Apply(ctx context.Context, deployment model.Deploym select { case resp := <-responseChan: if resp.IsOK { - data := []byte(resp.Payload.(string)) + data := resp.Payload.([]byte) var summary model.SummarySpec err = json.Unmarshal(data, &summary) if err == nil { @@ -678,11 +678,11 @@ func (i *MQTTTargetProvider) createTLSConfig(ctx context.Context) (*tls.Config, func isCertificatePEM(data []byte) bool { // Check if the data contains PEM headers dataStr := string(data) - if !strings.Contains(dataStr, "-----BEGIN CERTIFICATE-----") || - !strings.Contains(dataStr, "-----END CERTIFICATE-----") { + if !strings.Contains(dataStr, "-----BEGIN CERTIFICATE-----") || + !strings.Contains(dataStr, "-----END CERTIFICATE-----") { return false } - + // Try to decode the PEM block block, _ := pem.Decode(data) return block != nil && block.Type == "CERTIFICATE" diff --git a/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt_test.go b/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt_test.go index 2910784b2..c928deed1 100644 --- a/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt_test.go +++ b/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt_test.go @@ -8,8 +8,16 @@ package mqtt import ( "context" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" "encoding/json" + "encoding/pem" + "math/big" "os" + "path/filepath" "testing" "time" @@ -160,32 +168,59 @@ func TestGet(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) - if token := c.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) + // Connect with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Connect() + if tok.Wait() && tok.Error() != nil { + if attempts == 9 { + t.Fatalf("failed to connect mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue + } + break } - if token := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { - var request v1alpha2.COARequest - err := json.Unmarshal(msg.Payload(), &request) - assert.Nil(t, err) - var response v1alpha2.COAResponse - ret := make([]model.ComponentSpec, 0) - data, _ := json.Marshal(ret) - response.State = v1alpha2.OK - response.Metadata = make(map[string]string) - response.Metadata["request-id"] = request.Metadata["request-id"] - response.Body = data - data, _ = json.Marshal(response) - token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work - token.Wait() - - }); token.Wait() && token.Error() != nil { - if token.Error().Error() != "subscription exists" { - panic(token.Error()) + // Wait until connected + for i := 0; i < 25 && !c.IsConnected(); i++ { + time.Sleep(100 * time.Millisecond) + } + if !c.IsConnected() { + t.Fatalf("mqtt responder not connected") + } + // Subscribe with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { + var request v1alpha2.COARequest + err := json.Unmarshal(msg.Payload(), &request) + assert.Nil(t, err) + var response v1alpha2.COAResponse + ret := make([]model.ComponentSpec, 0) + data, _ := json.Marshal(ret) + response.State = v1alpha2.OK + response.Metadata = make(map[string]string) + response.Metadata["request-id"] = request.Metadata["request-id"] + response.Body = data + data, _ = json.Marshal(response) + token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work + token.Wait() + + }) + if tok.Wait() && tok.Error() != nil { + if tok.Error().Error() == "subscription exists" { + break + } + if attempts == 9 { + t.Fatalf("failed to subscribe mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue } + break } arr, err := provider.Get(context.Background(), model.DeploymentSpec{ @@ -214,30 +249,57 @@ func TestGetBad(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) - if token := c.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) + // Connect with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Connect() + if tok.Wait() && tok.Error() != nil { + if attempts == 9 { + t.Fatalf("failed to connect mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue + } + break } - if token := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { - var request v1alpha2.COARequest - err := json.Unmarshal(msg.Payload(), &request) - assert.Nil(t, err) - var response v1alpha2.COAResponse - response.State = v1alpha2.InternalError - response.Metadata = make(map[string]string) - response.Metadata["request-id"] = request.Metadata["request-id"] - response.Body = []byte("BAD!!") - data, _ := json.Marshal(response) - token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work - token.Wait() - - }); token.Wait() && token.Error() != nil { - if token.Error().Error() != "subscription exists" { - panic(token.Error()) + // Wait until connected + for i := 0; i < 25 && !c.IsConnected(); i++ { + time.Sleep(100 * time.Millisecond) + } + if !c.IsConnected() { + t.Fatalf("mqtt responder not connected") + } + // Subscribe with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { + var request v1alpha2.COARequest + err := json.Unmarshal(msg.Payload(), &request) + assert.Nil(t, err) + var response v1alpha2.COAResponse + response.State = v1alpha2.InternalError + response.Metadata = make(map[string]string) + response.Metadata["request-id"] = request.Metadata["request-id"] + response.Body = []byte("didn't get response to Get() call over MQTT") + data, _ := json.Marshal(response) + token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work + token.Wait() + + }) + if tok.Wait() && tok.Error() != nil { + if tok.Error().Error() == "subscription exists" { + break + } + if attempts == 9 { + t.Fatalf("failed to subscribe mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue } + break } _, err = provider.Get(context.Background(), model.DeploymentSpec{ @@ -247,7 +309,7 @@ func TestGetBad(t *testing.T) { }, nil) assert.NotNil(t, err) - assert.Equal(t, "Internal Error: BAD!!", err.Error()) + assert.Equal(t, "Internal Error: didn't get response to Get() call over MQTT", err.Error()) } func TestApply(t *testing.T) { testMQTT := os.Getenv("TEST_MQTT") @@ -262,7 +324,7 @@ func TestApply(t *testing.T) { MQTTRequestTopic string = "coa-request" MQTTResponseTopic string = "coa-response" - TestTargetSuccessMessage string = "Success" + TestTargetSuccessMessage string = "" ) config := MQTTTargetProviderConfig{ @@ -277,14 +339,17 @@ func TestApply(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) + // Connect with simple retry to avoid transient broker readiness issues if token := c.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) + t.Fatalf("failed to connect mqtt responder: %v", token.Error()) } - if token := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { + // Subscribe with simple retry, tolerating existing subscription + token := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { var request v1alpha2.COARequest err := json.Unmarshal(msg.Payload(), &request) assert.Nil(t, err) @@ -315,10 +380,9 @@ func TestApply(t *testing.T) { token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work token.Wait() - }); token.Wait() && token.Error() != nil { - if token.Error().Error() != "subscription exists" { - panic(token.Error()) - } + }) + if token.Wait() && token.Error() != nil { + t.Fatalf("failed to subscribe mqtt responder: %v", token.Error()) } deploymentSpec := model.DeploymentSpec{ @@ -408,8 +472,9 @@ func TestApplyBad(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { @@ -468,29 +533,56 @@ func TestARemove(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) - if token := c.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) + // Connect with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Connect() + if tok.Wait() && tok.Error() != nil { + if attempts == 9 { + t.Fatalf("failed to connect mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue + } + break } - if token := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { - var request v1alpha2.COARequest - err := json.Unmarshal(msg.Payload(), &request) - assert.Nil(t, err) - var response v1alpha2.COAResponse - response.State = v1alpha2.OK - response.Metadata = make(map[string]string) - response.Metadata["request-id"] = request.Metadata["request-id"] - data, _ := json.Marshal(response) - token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work - token.Wait() - - }); token.Wait() && token.Error() != nil { - if token.Error().Error() != "subscription exists" { - panic(token.Error()) + // Wait until connected + for i := 0; i < 25 && !c.IsConnected(); i++ { + time.Sleep(100 * time.Millisecond) + } + if !c.IsConnected() { + t.Fatalf("mqtt responder not connected") + } + // Subscribe with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { + var request v1alpha2.COARequest + err := json.Unmarshal(msg.Payload(), &request) + assert.Nil(t, err) + var response v1alpha2.COAResponse + response.State = v1alpha2.OK + response.Metadata = make(map[string]string) + response.Metadata["request-id"] = request.Metadata["request-id"] + data, _ := json.Marshal(response) + token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work + token.Wait() + + }) + if tok.Wait() && tok.Error() != nil { + if tok.Error().Error() == "subscription exists" { + break + } + if attempts == 9 { + t.Fatalf("failed to subscribe mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue } + break } _, err = provider.Apply(context.Background(), model.DeploymentSpec{ @@ -526,8 +618,9 @@ func TestARemoveBad(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { @@ -585,36 +678,63 @@ func TestGetApply(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) - if token := c.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) - } - if token := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { - var request v1alpha2.COARequest - json.Unmarshal(msg.Payload(), &request) - var response v1alpha2.COAResponse - response.Metadata = make(map[string]string) - response.Metadata["request-id"] = request.Metadata["request-id"] - if request.Method == "GET" { - ret := make([]model.ComponentSpec, 0) - data, _ := json.Marshal(ret) - response.State = v1alpha2.OK - response.Body = data - } else { - response.State = v1alpha2.OK + // Connect with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Connect() + if tok.Wait() && tok.Error() != nil { + if attempts == 9 { + t.Fatalf("failed to connect mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue } - - data, _ := json.Marshal(response) - token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work - token.Wait() - - }); token.Wait() && token.Error() != nil { - if token.Error().Error() != "subscription exists" { - panic(token.Error()) + break + } + // Wait until connected + for i := 0; i < 25 && !c.IsConnected(); i++ { + time.Sleep(100 * time.Millisecond) + } + if !c.IsConnected() { + t.Fatalf("mqtt responder not connected") + } + // Subscribe with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { + var request v1alpha2.COARequest + json.Unmarshal(msg.Payload(), &request) + var response v1alpha2.COAResponse + response.Metadata = make(map[string]string) + response.Metadata["request-id"] = request.Metadata["request-id"] + if request.Method == "GET" { + ret := make([]model.ComponentSpec, 0) + data, _ := json.Marshal(ret) + response.State = v1alpha2.OK + response.Body = data + } else { + response.State = v1alpha2.OK + } + + data, _ := json.Marshal(response) + token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work + token.Wait() + + }) + if tok.Wait() && tok.Error() != nil { + if tok.Error().Error() == "subscription exists" { + break + } + if attempts == 9 { + t.Fatalf("failed to subscribe mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue } + break } arr, err := provider.Get(context.Background(), model.DeploymentSpec{ @@ -664,8 +784,9 @@ func TestLocalApplyGet(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { @@ -744,3 +865,260 @@ func TestConformanceSuite(t *testing.T) { // assert.Nil(t, err) okay if provider is not fully initialized conformance.ConformanceSuite(t, provider) } + +// --- TLS/mTLS unit tests --- + +// generateSelfSignedCert creates a temporary self-signed certificate and key. +// Returns paths to cert and key files and the certificate bytes. +func generateSelfSignedCert(t *testing.T) (string, string, []byte) { + t.Helper() + privKey, err := rsa.GenerateKey(rand.Reader, 2048) + assert.Nil(t, err) + + tmpl := x509.Certificate{ + SerialNumber: bigIntOne(t), + Subject: pkix.Name{CommonName: "localhost"}, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().Add(24 * time.Hour), + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + IsCA: true, + } + + certDER, err := x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, &privKey.PublicKey, privKey) + assert.Nil(t, err) + + // Write cert + certFile, err := os.CreateTemp("", "mtls-cert-*.pem") + assert.Nil(t, err) + defer certFile.Close() + assert.Nil(t, pem.Encode(certFile, &pem.Block{Type: "CERTIFICATE", Bytes: certDER})) + + // Write key + keyFile, err := os.CreateTemp("", "mtls-key-*.pem") + assert.Nil(t, err) + defer keyFile.Close() + assert.Nil(t, pem.Encode(keyFile, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privKey)})) + + return certFile.Name(), keyFile.Name(), pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}) +} + +func bigIntOne(t *testing.T) *big.Int { + t.Helper() + return big.NewInt(1) +} + +func TestCreateTLSConfig_InvalidCAPath(t *testing.T) { + provider := &MQTTTargetProvider{Config: MQTTTargetProviderConfig{ + UseTLS: true, + CACertPath: filepath.Join(os.TempDir(), "non-existent-ca.pem"), + }} + _, err := provider.createTLSConfig(context.Background()) + assert.NotNil(t, err) +} + +func TestCreateTLSConfig_InvalidCAPEM(t *testing.T) { + caFile, err := os.CreateTemp("", "invalid-ca-*.pem") + assert.Nil(t, err) + defer os.Remove(caFile.Name()) + defer caFile.Close() + _, _ = caFile.Write([]byte("not a pem")) + + provider := &MQTTTargetProvider{Config: MQTTTargetProviderConfig{ + UseTLS: true, + CACertPath: caFile.Name(), + }} + _, cfgErr := provider.createTLSConfig(context.Background()) + assert.NotNil(t, cfgErr) +} + +func TestCreateTLSConfig_ClientCertWithoutKey(t *testing.T) { + certPath, _, _ := generateSelfSignedCert(t) + defer os.Remove(certPath) + + provider := &MQTTTargetProvider{Config: MQTTTargetProviderConfig{ + UseTLS: true, + ClientCertPath: certPath, + // missing key path + }} + _, err := provider.createTLSConfig(context.Background()) + assert.NotNil(t, err) +} + +func TestCreateTLSConfig_ClientCertAndKey_Success(t *testing.T) { + certPath, keyPath, caBytes := generateSelfSignedCert(t) + defer os.Remove(certPath) + defer os.Remove(keyPath) + + // Use the same self-signed cert as CA to exercise RootCAs path + caFile, err := os.CreateTemp("", "ca-*.pem") + assert.Nil(t, err) + defer os.Remove(caFile.Name()) + defer caFile.Close() + _, _ = caFile.Write(caBytes) + + provider := &MQTTTargetProvider{Config: MQTTTargetProviderConfig{ + UseTLS: true, + CACertPath: caFile.Name(), + ClientCertPath: certPath, + ClientKeyPath: keyPath, + }} + cfg, err := provider.createTLSConfig(context.Background()) + assert.Nil(t, err) + assert.NotNil(t, cfg) + assert.True(t, len(cfg.Certificates) == 1) +} + +// Optional integration-style test to actually run MQTT with mTLS against a live broker. +// Requires environment variables: +// - TEST_MQTT_MTLS=1 (enables the test) +// - TEST_MQTT_MTLS_BROKER (e.g., ssl://127.0.0.1:8883) +// - TEST_MQTT_MTLS_CA, TEST_MQTT_MTLS_CERT, TEST_MQTT_MTLS_KEY (paths to PEM files) +// - TEST_MQTT_MTLS_REQUEST_TOPIC, TEST_MQTT_MTLS_RESPONSE_TOPIC +func TestGet_mTLS(t *testing.T) { + if os.Getenv("TEST_MQTT_MTLS") == "" { + t.Skip("Skipping mTLS test; set TEST_MQTT_MTLS and related env vars to enable") + } + broker := os.Getenv("TEST_MQTT_MTLS_BROKER") + ca := os.Getenv("TEST_MQTT_MTLS_CA") + cert := os.Getenv("TEST_MQTT_MTLS_CERT") + key := os.Getenv("TEST_MQTT_MTLS_KEY") + reqTopic := os.Getenv("TEST_MQTT_MTLS_REQUEST_TOPIC") + respTopic := os.Getenv("TEST_MQTT_MTLS_RESPONSE_TOPIC") + if broker == "" || ca == "" || cert == "" || key == "" || reqTopic == "" || respTopic == "" { + t.Skip("Skipping mTLS test; missing required TEST_MQTT_MTLS_* env vars") + } + + provider := &MQTTTargetProvider{} + err := provider.Init(MQTTTargetProviderConfig{ + Name: "mtls-test", + BrokerAddress: broker, + ClientID: "mtls-provider", + RequestTopic: reqTopic, + ResponseTopic: respTopic, + UseTLS: true, + CACertPath: ca, + ClientCertPath: cert, + ClientKeyPath: key, + }) + assert.Nil(t, err) + + // Separate client to respond to requests, also using mTLS + respTLS := newTLSConfigFromFiles(t, ca, cert, key) + + opts := gmqtt.NewClientOptions().AddBroker(broker).SetClientID("mtls-responder") + opts.SetTLSConfig(respTLS) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) + + c := gmqtt.NewClient(opts) + if token := c.Connect(); token.Wait() && token.Error() != nil { + t.Fatalf("failed to connect mtls responder: %v", token.Error()) + } + if token := c.Subscribe(reqTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { + var request v1alpha2.COARequest + _ = json.Unmarshal(msg.Payload(), &request) + var response v1alpha2.COAResponse + ret := make([]model.ComponentSpec, 0) + data, _ := json.Marshal(ret) + response.State = v1alpha2.OK + response.Metadata = map[string]string{"request-id": request.Metadata["request-id"]} + response.Body = data + data, _ = json.Marshal(response) + tok := c.Publish(respTopic, 0, false, data) + tok.Wait() + }); token.Wait() && token.Error() != nil { + if token.Error().Error() != "subscription exists" { + t.Fatalf("subscribe failed: %v", token.Error()) + } + } + + arr, err := provider.Get(context.Background(), model.DeploymentSpec{Instance: model.InstanceState{Spec: &model.InstanceSpec{}}}, nil) + assert.Nil(t, err) + assert.Equal(t, 0, len(arr)) +} + +// TLS server-auth only (no client cert). Requires a TLS listener without mTLS on the broker. +// Env vars: +// - TEST_MQTT_TLS=1 (enables the test) +// - TEST_MQTT_TLS_BROKER (e.g., ssl://127.0.0.1:8883) +// - TEST_MQTT_TLS_CA (path to broker CA cert) +// - TEST_MQTT_TLS_REQUEST_TOPIC, TEST_MQTT_TLS_RESPONSE_TOPIC +func TestGet_TLS(t *testing.T) { + if os.Getenv("TEST_MQTT_TLS") == "" { + t.Skip("Skipping TLS test; set TEST_MQTT_TLS and related env vars to enable") + } + broker := os.Getenv("TEST_MQTT_TLS_BROKER") + ca := os.Getenv("TEST_MQTT_TLS_CA") + reqTopic := os.Getenv("TEST_MQTT_TLS_REQUEST_TOPIC") + respTopic := os.Getenv("TEST_MQTT_TLS_RESPONSE_TOPIC") + if broker == "" || ca == "" || reqTopic == "" || respTopic == "" { + t.Skip("Skipping TLS test; missing required TEST_MQTT_TLS_* env vars") + } + + provider := &MQTTTargetProvider{} + err := provider.Init(MQTTTargetProviderConfig{ + Name: "tls-test", + BrokerAddress: broker, + ClientID: "tls-provider", + RequestTopic: reqTopic, + ResponseTopic: respTopic, + UseTLS: true, + CACertPath: ca, + }) + assert.Nil(t, err) + + // TLS responder without client certificate + caBytes, err := os.ReadFile(ca) + assert.Nil(t, err) + pool := x509.NewCertPool() + assert.True(t, pool.AppendCertsFromPEM(caBytes)) + tlsCfg := &tls.Config{RootCAs: pool} + + opts := gmqtt.NewClientOptions().AddBroker(broker).SetClientID("tls-responder") + opts.SetTLSConfig(tlsCfg) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) + + c := gmqtt.NewClient(opts) + if token := c.Connect(); token.Wait() && token.Error() != nil { + t.Fatalf("failed to connect tls responder: %v", token.Error()) + } + if token := c.Subscribe(reqTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { + var request v1alpha2.COARequest + _ = json.Unmarshal(msg.Payload(), &request) + var response v1alpha2.COAResponse + ret := make([]model.ComponentSpec, 0) + data, _ := json.Marshal(ret) + response.State = v1alpha2.OK + response.Metadata = map[string]string{"request-id": request.Metadata["request-id"]} + response.Body = data + data, _ = json.Marshal(response) + tok := c.Publish(respTopic, 0, false, data) + tok.Wait() + }); token.Wait() && token.Error() != nil { + if token.Error().Error() != "subscription exists" { + t.Fatalf("subscribe failed: %v", token.Error()) + } + } + + arr, err := provider.Get(context.Background(), model.DeploymentSpec{Instance: model.InstanceState{Spec: &model.InstanceSpec{}}}, nil) + assert.Nil(t, err) + assert.Equal(t, 0, len(arr)) +} + +func newTLSConfigFromFiles(t *testing.T, caPath, certPath, keyPath string) *tls.Config { + t.Helper() + caBytes, err := os.ReadFile(caPath) + assert.Nil(t, err) + pool := x509.NewCertPool() + assert.True(t, pool.AppendCertsFromPEM(caBytes)) + + crt, err := tls.LoadX509KeyPair(certPath, keyPath) + assert.Nil(t, err) + + return &tls.Config{RootCAs: pool, Certificates: []tls.Certificate{crt}} +}