From 764707aaf7e216489e17b283612f481dd687082f Mon Sep 17 00:00:00 2001 From: Filipe Prezado <9027586+fprezado@users.noreply.github.com> Date: Tue, 1 Jul 2025 09:42:30 +0100 Subject: [PATCH 1/2] add client certificate auth for secure MQTT --- .../v1alpha1/providers/target/mqtt/mqtt.go | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go b/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go index 85778dc85..86d230316 100644 --- a/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go +++ b/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go @@ -4,12 +4,52 @@ * SPDX-License-Identifier: MIT */ +/* +Client certificate authentication for secure MQTT connections. + + +Basic MQTT settings: +- brokerAddress: MQTT broker URL (required) +- clientID: MQTT client identifier (required) +- requestTopic: Topic for sending requests (required) +- responseTopic: Topic for receiving responses (required) +- timeoutSeconds: Request timeout in seconds (default: 8) +- keepAliveSeconds: Keep-alive interval in seconds (default: 2) +- pingTimeoutSeconds: Ping timeout in seconds (default: 1) + +Authentication settings: +- username: MQTT username for basic authentication +- password: MQTT password for basic authentication + +TLS/Certificate settings: +- useTLS: Enable TLS connection (default: false) +- caCertPath: Path to CA certificate file for server verification +- clientCertPath: Path to client certificate file for mutual TLS authentication +- clientKeyPath: Path to client private key file for mutual TLS authentication +- insecureSkipVerify: Skip TLS certificate verification (default: false, use with caution) + +Example configuration with client certificate authentication: +{ + "brokerAddress": "ssl://mqtt.example.com:8883", + "clientID": "symphony-client", + "requestTopic": "symphony_request", + "responseTopic": "symphony_response", + "useTLS": "true", + "caCertPath": "/path/to/ca.crt", + "clientCertPath": "/path/to/client.crt", + "clientKeyPath": "/path/to/client.key" +} +*/ + package mqtt import ( "context" + "crypto/tls" + "crypto/x509" "encoding/json" "fmt" + "io/ioutil" "strconv" "sync" "time" @@ -49,6 +89,14 @@ type MQTTTargetProviderConfig struct { TimeoutSeconds int `json:"timeoutSeconds,omitempty"` KeepAliveSeconds int `json:"keepAliveSeconds,omitempty"` PingTimeoutSeconds int `json:"pingTimeoutSeconds,omitempty"` + // TLS/Certificate configuration fields + UseTLS bool `json:"useTLS,omitempty"` + CACertPath string `json:"caCertPath,omitempty"` + ClientCertPath string `json:"clientCertPath,omitempty"` + ClientKeyPath string `json:"clientKeyPath,omitempty"` + InsecureSkipVerify bool `json:"insecureSkipVerify,omitempty"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` } var lock sync.Mutex @@ -121,6 +169,30 @@ func MQTTTargetProviderConfigFromMap(properties map[string]string) (MQTTTargetPr if ret.TimeoutSeconds <= 0 { ret.TimeoutSeconds = 8 } + + // Handle TLS/Certificate configuration + if v, ok := properties["useTLS"]; ok { + ret.UseTLS = v == "true" + } + if v, ok := properties["caCertPath"]; ok { + ret.CACertPath = v + } + if v, ok := properties["clientCertPath"]; ok { + ret.ClientCertPath = v + } + if v, ok := properties["clientKeyPath"]; ok { + ret.ClientKeyPath = v + } + if v, ok := properties["insecureSkipVerify"]; ok { + ret.InsecureSkipVerify = v == "true" + } + if v, ok := properties["username"]; ok { + ret.Username = v + } + if v, ok := properties["password"]; ok { + ret.Password = v + } + return ret, nil } @@ -164,6 +236,25 @@ func (i *MQTTTargetProvider) Init(config providers.IProviderConfig) error { opts.SetKeepAlive(time.Duration(i.Config.KeepAliveSeconds) * time.Second) opts.SetPingTimeout(time.Duration(i.Config.PingTimeoutSeconds) * time.Second) opts.CleanSession = true + + // Configure authentication + if i.Config.Username != "" { + opts.SetUsername(i.Config.Username) + } + if i.Config.Password != "" { + opts.SetPassword(i.Config.Password) + } + + // Configure TLS if enabled + if i.Config.UseTLS { + tlsConfig, err := i.createTLSConfig(ctx) + if err != nil { + sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to create TLS config - %+v", err) + return v1alpha2.NewCOAError(err, "failed to create TLS config", v1alpha2.InternalError) + } + opts.SetTLSConfig(tlsConfig) + } + i.MQTTClient = gmqtt.NewClient(opts) if token := i.MQTTClient.Connect(); token.Wait() && token.Error() != nil { sLog.ErrorfCtx(ctx, " P (MQTT Target): faild to connect to MQTT broker - %+v", err) @@ -517,6 +608,47 @@ func (i *MQTTTargetProvider) Apply(ctx context.Context, deployment model.Deploym return ret, nil } +// createTLSConfig creates a TLS configuration for MQTT client authentication +func (i *MQTTTargetProvider) createTLSConfig(ctx context.Context) (*tls.Config, error) { + tlsConfig := &tls.Config{ + InsecureSkipVerify: i.Config.InsecureSkipVerify, + } + + // Load CA certificate if provided + if i.Config.CACertPath != "" { + caCert, err := ioutil.ReadFile(i.Config.CACertPath) + if err != nil { + sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to read CA certificate - %+v", err) + return nil, fmt.Errorf("failed to read CA certificate: %w", err) + } + + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to parse CA certificate") + return nil, fmt.Errorf("failed to parse CA certificate") + } + tlsConfig.RootCAs = caCertPool + sLog.InfofCtx(ctx, " P (MQTT Target): loaded CA certificate from %s", i.Config.CACertPath) + } + + // Load client certificate and key if provided + if i.Config.ClientCertPath != "" && i.Config.ClientKeyPath != "" { + clientCert, err := tls.LoadX509KeyPair(i.Config.ClientCertPath, i.Config.ClientKeyPath) + if err != nil { + sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to load client certificate and key - %+v", err) + return nil, fmt.Errorf("failed to load client certificate and key: %w", err) + } + tlsConfig.Certificates = []tls.Certificate{clientCert} + sLog.InfofCtx(ctx, " P (MQTT Target): loaded client certificate from %s and key from %s", + i.Config.ClientCertPath, i.Config.ClientKeyPath) + } else if i.Config.ClientCertPath != "" || i.Config.ClientKeyPath != "" { + // Both cert and key must be provided together + return nil, fmt.Errorf("both clientCertPath and clientKeyPath must be provided for client certificate authentication") + } + + return tlsConfig, nil +} + func (*MQTTTargetProvider) GetValidationRule(ctx context.Context) model.ValidationRule { return model.ValidationRule{ AllowSidecar: false, From 61ee60e014728ea0f28b67e67b7c6f348aade375 Mon Sep 17 00:00:00 2001 From: fprezado <9027586+fprezado@users.noreply.github.com> Date: Tue, 1 Jul 2025 16:13:57 +0000 Subject: [PATCH 2/2] client certificate auth for secure MQTT updates --- .../v1alpha1/providers/target/mqtt/mqtt.go | 73 +- api/symphony-api-no-k8s-secure-mqtt.json | 703 ++++++++++++++++++ coa/pkg/apis/v1alpha2/bindings/mqtt/mqtt.go | 150 +++- 3 files changed, 902 insertions(+), 24 deletions(-) create mode 100644 api/symphony-api-no-k8s-secure-mqtt.json diff --git a/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go b/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go index 86d230316..7c739f041 100644 --- a/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go +++ b/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go @@ -7,7 +7,6 @@ /* Client certificate authentication for secure MQTT connections. - Basic MQTT settings: - brokerAddress: MQTT broker URL (required) - clientID: MQTT client identifier (required) @@ -28,17 +27,6 @@ TLS/Certificate settings: - clientKeyPath: Path to client private key file for mutual TLS authentication - insecureSkipVerify: Skip TLS certificate verification (default: false, use with caution) -Example configuration with client certificate authentication: -{ - "brokerAddress": "ssl://mqtt.example.com:8883", - "clientID": "symphony-client", - "requestTopic": "symphony_request", - "responseTopic": "symphony_response", - "useTLS": "true", - "caCertPath": "/path/to/ca.crt", - "clientCertPath": "/path/to/client.crt", - "clientKeyPath": "/path/to/client.key" -} */ package mqtt @@ -48,9 +36,11 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" + "encoding/pem" "fmt" "io/ioutil" "strconv" + "strings" "sync" "time" @@ -257,8 +247,23 @@ func (i *MQTTTargetProvider) Init(config providers.IProviderConfig) error { i.MQTTClient = gmqtt.NewClient(opts) if token := i.MQTTClient.Connect(); token.Wait() && token.Error() != nil { - sLog.ErrorfCtx(ctx, " P (MQTT Target): faild to connect to MQTT broker - %+v", err) - return v1alpha2.NewCOAError(token.Error(), "failed to connect to MQTT broker", v1alpha2.InternalError) + connErr := token.Error() + sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to connect to MQTT broker - %+v", connErr) + + // Provide specific guidance for common TLS errors + if strings.Contains(connErr.Error(), "certificate signed by unknown authority") { + sLog.ErrorfCtx(ctx, " P (MQTT Target): TLS certificate verification failed. Common solutions:") + sLog.ErrorfCtx(ctx, " P (MQTT Target): 1. Set 'caCertPath' to the path of your broker's CA certificate") + sLog.ErrorfCtx(ctx, " P (MQTT Target): 2. Set 'insecureSkipVerify' to 'true' for testing (not recommended for production)") + sLog.ErrorfCtx(ctx, " P (MQTT Target): 3. Ensure your broker certificate is issued by a trusted CA") + } else if strings.Contains(connErr.Error(), "tls:") { + sLog.ErrorfCtx(ctx, " P (MQTT Target): TLS connection error. Check your TLS configuration:") + sLog.ErrorfCtx(ctx, " P (MQTT Target): - Broker address should use 'ssl://' or 'tls://' prefix for TLS connections") + sLog.ErrorfCtx(ctx, " P (MQTT Target): - Verify CA certificate path and format") + sLog.ErrorfCtx(ctx, " P (MQTT Target): - Check client certificate and key paths if using mutual TLS") + } + + return v1alpha2.NewCOAError(connErr, "failed to connect to MQTT broker", v1alpha2.InternalError) } if token := i.MQTTClient.Subscribe(i.Config.ResponseTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { @@ -616,19 +621,39 @@ func (i *MQTTTargetProvider) createTLSConfig(ctx context.Context) (*tls.Config, // Load CA certificate if provided if i.Config.CACertPath != "" { + sLog.InfofCtx(ctx, " P (MQTT Target): attempting to load CA certificate from %s", i.Config.CACertPath) + caCert, err := ioutil.ReadFile(i.Config.CACertPath) if err != nil { sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to read CA certificate - %+v", err) return nil, fmt.Errorf("failed to read CA certificate: %w", err) } + // Verify the CA cert content + sLog.InfofCtx(ctx, " P (MQTT Target): CA certificate file size: %d bytes", len(caCert)) + if len(caCert) == 0 { + return nil, fmt.Errorf("CA certificate file is empty") + } + + // Validate that the file contains valid PEM data + if !isCertificatePEM(caCert) { + sLog.ErrorfCtx(ctx, " P (MQTT Target): CA certificate file does not contain valid PEM data") + return nil, fmt.Errorf("CA certificate file does not contain valid PEM data") + } + caCertPool := x509.NewCertPool() if !caCertPool.AppendCertsFromPEM(caCert) { - sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to parse CA certificate") - return nil, fmt.Errorf("failed to parse CA certificate") + sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to parse CA certificate - invalid PEM format or corrupted certificate") + return nil, fmt.Errorf("failed to parse CA certificate - invalid PEM format or corrupted certificate") } tlsConfig.RootCAs = caCertPool - sLog.InfofCtx(ctx, " P (MQTT Target): loaded CA certificate from %s", i.Config.CACertPath) + sLog.InfofCtx(ctx, " P (MQTT Target): successfully loaded CA certificate from %s", i.Config.CACertPath) + } else { + if !i.Config.InsecureSkipVerify { + sLog.WarnCtx(ctx, " P (MQTT Target): no CA certificate path provided - using system CA pool. If connection fails with 'certificate signed by unknown authority', either provide a CA certificate or set insecureSkipVerify to true") + } else { + sLog.InfofCtx(ctx, " P (MQTT Target): TLS certificate verification disabled (insecureSkipVerify=true)") + } } // Load client certificate and key if provided @@ -649,6 +674,20 @@ func (i *MQTTTargetProvider) createTLSConfig(ctx context.Context) (*tls.Config, return tlsConfig, nil } +// isCertificatePEM checks if the given data contains valid PEM formatted certificate data +func isCertificatePEM(data []byte) bool { + // Check if the data contains PEM headers + dataStr := string(data) + if !strings.Contains(dataStr, "-----BEGIN CERTIFICATE-----") || + !strings.Contains(dataStr, "-----END CERTIFICATE-----") { + return false + } + + // Try to decode the PEM block + block, _ := pem.Decode(data) + return block != nil && block.Type == "CERTIFICATE" +} + func (*MQTTTargetProvider) GetValidationRule(ctx context.Context) model.ValidationRule { return model.ValidationRule{ AllowSidecar: false, diff --git a/api/symphony-api-no-k8s-secure-mqtt.json b/api/symphony-api-no-k8s-secure-mqtt.json new file mode 100644 index 000000000..86889f53f --- /dev/null +++ b/api/symphony-api-no-k8s-secure-mqtt.json @@ -0,0 +1,703 @@ +{ + "siteInfo": { + "siteId": "laptop", + "properties": { + "name": "My Laptop", + "address": "1 Main Street", + "city": "Carnation", + "state": "WA", + "zip": "98014", + "country": "USA", + "phone": "425-555-1212", + "version": "0.45.1" + }, + "currentSite": { + "baseUrl": "http://localhost:8082/v1alpha2/", + "username": "admin", + "password": "" + } + }, + "api": { + "pubsub": { + "shared": true, + "provider": { + "type": "providers.pubsub.memory", + "config": {} + } + }, + "keylock": { + "shared": true, + "provider": { + "type": "providers.keylock.memory", + "config": { + "mode": "Global", + "cleanInterval" : 30, + "purgeDuration" : 43200 + } + } + }, + "vendors": [ + { + "type": "vendors.settings", + "managers": [ + { + "name": "config-manager", + "type": "managers.symphony.configs", + "properties": { + "singleton": "true" + }, + "providers": { + "catalog": { + "type": "providers.config.catalog", + "config": { + "user": "admin", + "password": "" + } + } + } + } + ] + }, + { + "type": "vendors.stage", + "route": "stage", + "managers": [ + { + "name": "stage-manager", + "type": "managers.symphony.stage", + "properties": { + "user": "admin", + "password": "", + "providers.volatilestate": "memory" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + } + } + }, + { + "name": "campaigns-manager", + "type": "managers.symphony.campaigns", + "properties": { + "providers.persistentstate": "k8s-state", + "singleton": "true" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + }, + { + "name": "activations-manager", + "type": "managers.symphony.activations", + "properties": { + "providers.persistentstate": "k8s-state", + "singleton": "true" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ], + "properties": { + "wait.user": "admin", + "wait.password": "", + "wait.wait.interval": "15", + "wait.wait.count": "10" + } + }, + { + "type": "vendors.activations", + "route": "activations", + "managers": [ + { + "name": "activations-manager", + "type": "managers.symphony.activations", + "properties": { + "providers.persistentstate": "k8s-state", + "useJobManager": "true", + "singleton": "true" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.backgroundjob", + "route": "backgroundjob", + "loopInterval": 3600, + "managers": [ + { + "name": "activations-cleanup-manager", + "type": "managers.symphony.activationscleanup", + "properties": { + "providers.persistentstate": "k8s-state", + "singleton": "true", + "RetentionDuration": "4320h" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.campaigns", + "route": "campaigns", + "managers": [ + { + "name": "campaigns-manager", + "type": "managers.symphony.campaigns", + "properties": { + "providers.persistentstate": "k8s-state", + "singleton": "true" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.campaigncontainers", + "route": "campaigncontainers", + "managers": [ + { + "name": "campaign-container-manager", + "type": "managers.symphony.campaigncontainers", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.echo", + "route": "greetings", + "managers": [] + }, + { + "type": "vendors.jobs", + "route": "jobs", + "loopInterval": 15, + "managers": [ + { + "name": "jobs-manager", + "type": "managers.symphony.jobs", + "properties": { + "providers.volatilestate": "mem-state", + "providers.persistentstate": "mem-state", + "user": "admin", + "password": "", + "interval": "#15", + "poll.enabled": "true", + "schedule.enabled": "true" + }, + "providers": { + "mem-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.targets", + "loopInterval": 15, + "route": "targets", + "managers": [ + { + "name": "targets-manager", + "type": "managers.symphony.targets", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ], + "properties": { + "useJobManager": "true" + } + }, + { + "type": "vendors.solutions", + "loopInterval": 15, + "route": "solutions", + "managers": [ + { + "name": "solutions-manager", + "type": "managers.symphony.solutions", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.instances", + "loopInterval": 15, + "route": "instances", + "managers": [ + { + "name": "instances-manager", + "type": "managers.symphony.instances", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ], + "properties": { + "useJobManager": "true" + } + }, + { + "type": "vendors.solutioncontainers", + "route": "solutioncontainers", + "managers": [ + { + "name": "solution-container-manager", + "type": "managers.symphony.solutioncontainers", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.devices", + "loopInterval": 15, + "route": "devices", + "managers": [ + { + "name": "devices-manager", + "type": "managers.symphony.devices", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.models", + "loopInterval": 15, + "route": "models", + "managers": [ + { + "name": "models-manager", + "type": "managers.symphony.models", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.skills", + "loopInterval": 15, + "route": "skills", + "managers": [ + { + "name": "skills-manager", + "type": "managers.symphony.skills", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.users", + "loopInterval": 15, + "route": "users", + "properties": { + "test-users": "true" + }, + "managers": [ + { + "name": "users-manager", + "type": "managers.symphony.users", + "properties": { + "providers.volatilestate": "mem-state" + }, + "providers": { + "mem-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.solution", + "loopInterval": 15, + "route": "solution", + "managers": [ + { + "name": "solution-manager", + "type": "managers.symphony.solution", + "properties": { + "providers.persistentstate": "mem-state", + "providers.config": "mock-config", + "providers.secret": "mock-secret", + "providers.keylock": "mem-keylock" + }, + "providers": { + "mem-state": { + "type": "providers.state.memory", + "config": {} + }, + "mem-keylock": { + "type": "providers.keylock.memory", + "config": { + "mode" : "Shared" + } + }, + "mock-config": { + "type": "providers.config.mock", + "config": {} + }, + "mock-secret": { + "type": "providers.secret.mock", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.agent", + "loopInterval": 15, + "route": "agent", + "managers": [ + { + "name": "reference-manager", + "type": "managers.symphony.reference", + "properties": { + "providers.reference": "http-reference", + "providers.volatilestate": "memory", + "providers.reporter": "http-reporter" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + }, + "http-reference": { + "type": "providers.reference.http", + "config": {} + }, + "http-reporter": { + "type": "providers.reporter.http", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.federation", + "route": "federation", + "loopInterval": 15, + "managers": [ + { + "name": "trails-manager", + "type": "managers.symphony.trails", + "providers": { + "mock": { + "type": "providers.ledger.mock", + "config": {} + } + } + }, + { + "name": "sites-manager", + "type": "managers.symphony.sites", + "properties": { + "providers.persistentstate": "memory" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + } + } + }, + { + "name": "catalogs-manager", + "type": "managers.symphony.catalogs", + "properties": { + "providers.persistentstate": "memory", + "singleton": "true" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + } + } + }, + { + "name": "staging-manager", + "type": "managers.symphony.staging", + "properties": { + "poll.enabled": "true", + "interval": "#15", + "providers.queue": "memory-queue", + "providers.volatilestate": "memory-state" + }, + "providers": { + "memory-queue": { + "type": "providers.queue.memory", + "config": {} + }, + "memory-state": { + "type": "providers.state.memory", + "config": {} + } + } + }, + { + "name": "sync-manager", + "type": "managers.symphony.sync", + "properties": { + "baseUrl": "http://localhost:8080/v1alpha2/", + "user": "admin", + "password": "", + "interval": "#15", + "sync.enabled": "true" + } + } + ] + }, + { + "type": "vendors.catalogs", + "route": "catalogs", + "managers": [ + { + "name": "catalogs-manager", + "type": "managers.symphony.catalogs", + "properties": { + "providers.persistentstate": "memory", + "singleton": "true" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + }, + "graph": { + "type": "providers.graph.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.catalogcontainers", + "route": "catalogcontainers", + "managers": [ + { + "name": "catalog-container-manager", + "type": "managers.symphony.catalogcontainers", + "properties": { + "providers.persistentstate": "memory", + "singleton": "true" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.visualization", + "route": "visualization", + "managers": [ + { + "name": "catalogs-manager", + "type": "managers.symphony.catalogs", + "properties": { + "providers.persistentstate": "memory", + "singleton": "true" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + }, + "graph": { + "type": "providers.graph.memory", + "config": {} + } + } + } + ] + } + ] + }, + "bindings": [ + { + "type": "bindings.mqtt", + "config": { + "brokerAddress": "ssl://MQTT_BROKER_ADDRESS:8883", + "clientID": "symphony-client", + "requestTopic": "symphony-request", + "responseTopic": "symphony-response", + "useTLS": "true", + "caCertPath": "/path/to/ca/cert/ca.crt", + "clientCertPath": "/path/to/client/cert/client.crt", + "clientKeyPath": "/path/to/client/key/client.key" + } + }, + { + "type": "bindings.http", + "config": { + "port": 8082, + "pipeline": [ + { + "type": "middleware.http.cors", + "properties": { + "Access-Control-Allow-Headers": "authorization,Content-Type", + "Access-Control-Allow-Credentials": "true", + "Access-Control-Allow-Methods": "HEAD,GET,POST,PUT,DELETE,OPTIONS", + "Access-Control-Allow-Origin": "*" + } + }, + { + "type": "middleware.http.jwt", + "properties": { + "ignorePaths": ["/v1alpha2/users/auth", "/v1alpha2/solution/instances", "/v1alpha2/agent/references", "/v1alpha2/greetings"], + "verifyKey": "SymphonyKey", + "enableRBAC": true, + "roles": [ + { + "role": "administrator", + "claim": "user", + "value": "admin" + }, + { + "role": "reader", + "claim": "user", + "value": "*" + }, + { + "role": "solution-creator", + "claim": "user", + "value": "developer" + }, + { + "role": "target-manager", + "claim": "user", + "value": "device-manager" + }, + { + "role": "operator", + "claim": "user", + "value": "solution-operator" + } + ], + "policy": { + "administrator": { + "items": { + "*": "*" + } + }, + "reader": { + "items": { + "*": "GET" + } + }, + "solution-creator": { + "items": { + "/v1alpha2/solutions": "*" + } + }, + "target-manager": { + "items": { + "/v1alpha2/targets": "*" + } + }, + "solution-operator": { + "items": { + "/v1alpha2/instances": "*" + } + } + } + } + } + ] + } + } + ] +} diff --git a/coa/pkg/apis/v1alpha2/bindings/mqtt/mqtt.go b/coa/pkg/apis/v1alpha2/bindings/mqtt/mqtt.go index 7006eface..be7fb4934 100644 --- a/coa/pkg/apis/v1alpha2/bindings/mqtt/mqtt.go +++ b/coa/pkg/apis/v1alpha2/bindings/mqtt/mqtt.go @@ -8,7 +8,12 @@ package mqtt import ( "context" + "crypto/tls" + "crypto/x509" "encoding/json" + "encoding/pem" + "fmt" + "io/ioutil" "strings" "time" @@ -21,10 +26,21 @@ import ( var log = logger.NewLogger("coa.runtime") type MQTTBindingConfig struct { - BrokerAddress string `json:"brokerAddress"` - ClientID string `json:"clientID"` - RequestTopic string `json:"requestTopic"` - ResponseTopic string `json:"responseTopic"` + BrokerAddress string `json:"brokerAddress"` + ClientID string `json:"clientID"` + RequestTopic string `json:"requestTopic"` + ResponseTopic string `json:"responseTopic"` + TimeoutSeconds int `json:"timeoutSeconds,omitempty"` + KeepAliveSeconds int `json:"keepAliveSeconds,omitempty"` + PingTimeoutSeconds int `json:"pingTimeoutSeconds,omitempty"` + // TLS/Certificate configuration fields + UseTLS string `json:"useTLS,omitempty"` + CACertPath string `json:"caCertPath,omitempty"` + ClientCertPath string `json:"clientCertPath,omitempty"` + ClientKeyPath string `json:"clientKeyPath,omitempty"` + InsecureSkipVerify string `json:"insecureSkipVerify,omitempty"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` } type MQTTBinding struct { @@ -44,13 +60,56 @@ func (m *MQTTBinding) Launch(config MQTTBindingConfig, endpoints []v1alpha2.Endp routeTable[route] = endpoint } + // Set default values + if config.KeepAliveSeconds <= 0 { + config.KeepAliveSeconds = 2 + } + if config.PingTimeoutSeconds <= 0 { + config.PingTimeoutSeconds = 1 + } + opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID(config.ClientID) - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(time.Duration(config.KeepAliveSeconds) * time.Second) + opts.SetPingTimeout(time.Duration(config.PingTimeoutSeconds) * time.Second) opts.CleanSession = false + + // Configure authentication + if config.Username != "" { + opts.SetUsername(config.Username) + } + if config.Password != "" { + opts.SetPassword(config.Password) + } + + // Configure TLS if enabled + if config.UseTLS == "true" { + tlsConfig, err := m.createTLSConfig(config) + if err != nil { + log.Errorf("MQTT Binding: failed to create TLS config - %+v", err) + return v1alpha2.NewCOAError(err, "failed to create TLS config", v1alpha2.InternalError) + } + opts.SetTLSConfig(tlsConfig) + } + m.MQTTClient = gmqtt.NewClient(opts) if token := m.MQTTClient.Connect(); token.Wait() && token.Error() != nil { - return v1alpha2.NewCOAError(token.Error(), "failed to connect to MQTT broker", v1alpha2.InternalError) + connErr := token.Error() + log.Errorf("MQTT Binding: failed to connect to MQTT broker - %+v", connErr) + + // Provide specific guidance for common TLS errors + if strings.Contains(connErr.Error(), "certificate signed by unknown authority") { + log.Errorf("MQTT Binding: TLS certificate verification failed. Common solutions:") + log.Errorf("MQTT Binding: 1. Set 'caCertPath' to the path of your broker's CA certificate") + log.Errorf("MQTT Binding: 2. Set 'insecureSkipVerify' to 'true' for testing (not recommended for production)") + log.Errorf("MQTT Binding: 3. Ensure your broker certificate is issued by a trusted CA") + } else if strings.Contains(connErr.Error(), "tls:") { + log.Errorf("MQTT Binding: TLS connection error. Check your TLS configuration:") + log.Errorf("MQTT Binding: - Broker address should use 'ssl://' or 'tls://' prefix for TLS connections") + log.Errorf("MQTT Binding: - Verify CA certificate path and format") + log.Errorf("MQTT Binding: - Check client certificate and key paths if using mutual TLS") + } + + return v1alpha2.NewCOAError(connErr, "failed to connect to MQTT broker", v1alpha2.InternalError) } if token := m.MQTTClient.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { @@ -99,6 +158,83 @@ func (m *MQTTBinding) Launch(config MQTTBindingConfig, endpoints []v1alpha2.Endp return nil } +// createTLSConfig creates a TLS configuration for MQTT client authentication +func (m *MQTTBinding) createTLSConfig(config MQTTBindingConfig) (*tls.Config, error) { + insecureSkipVerify := config.InsecureSkipVerify == "true" + + tlsConfig := &tls.Config{ + InsecureSkipVerify: insecureSkipVerify, + } + + // Load CA certificate if provided + if config.CACertPath != "" { + log.Infof("MQTT Binding: attempting to load CA certificate from %s", config.CACertPath) + + caCert, err := ioutil.ReadFile(config.CACertPath) + if err != nil { + log.Errorf("MQTT Binding: failed to read CA certificate - %+v", err) + return nil, fmt.Errorf("failed to read CA certificate: %w", err) + } + + // Verify the CA cert content + log.Infof("MQTT Binding: CA certificate file size: %d bytes", len(caCert)) + if len(caCert) == 0 { + return nil, fmt.Errorf("CA certificate file is empty") + } + + // Validate that the file contains valid PEM data + if !isCertificatePEM(caCert) { + log.Errorf("MQTT Binding: CA certificate file does not contain valid PEM data") + return nil, fmt.Errorf("CA certificate file does not contain valid PEM data") + } + + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + log.Errorf("MQTT Binding: failed to parse CA certificate - invalid PEM format or corrupted certificate") + return nil, fmt.Errorf("failed to parse CA certificate - invalid PEM format or corrupted certificate") + } + tlsConfig.RootCAs = caCertPool + log.Infof("MQTT Binding: successfully loaded CA certificate from %s", config.CACertPath) + } else { + if !insecureSkipVerify { + log.Warn("MQTT Binding: no CA certificate path provided - using system CA pool. If connection fails with 'certificate signed by unknown authority', either provide a CA certificate or set insecureSkipVerify to true") + } else { + log.Infof("MQTT Binding: TLS certificate verification disabled (insecureSkipVerify=true)") + } + } + + // Load client certificate and key if provided + if config.ClientCertPath != "" && config.ClientKeyPath != "" { + clientCert, err := tls.LoadX509KeyPair(config.ClientCertPath, config.ClientKeyPath) + if err != nil { + log.Errorf("MQTT Binding: failed to load client certificate and key - %+v", err) + return nil, fmt.Errorf("failed to load client certificate and key: %w", err) + } + tlsConfig.Certificates = []tls.Certificate{clientCert} + log.Infof("MQTT Binding: loaded client certificate from %s and key from %s", + config.ClientCertPath, config.ClientKeyPath) + } else if config.ClientCertPath != "" || config.ClientKeyPath != "" { + // Both cert and key must be provided together + return nil, fmt.Errorf("both clientCertPath and clientKeyPath must be provided for client certificate authentication") + } + + return tlsConfig, nil +} + +// isCertificatePEM checks if the given data contains valid PEM formatted certificate data +func isCertificatePEM(data []byte) bool { + // Check if the data contains PEM headers + dataStr := string(data) + if !strings.Contains(dataStr, "-----BEGIN CERTIFICATE-----") || + !strings.Contains(dataStr, "-----END CERTIFICATE-----") { + return false + } + + // Try to decode the PEM block + block, _ := pem.Decode(data) + return block != nil && block.Type == "CERTIFICATE" +} + // Shutdown stops the MQTT binding func (m *MQTTBinding) Shutdown(ctx context.Context) error { m.MQTTClient.Disconnect(1000)