Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 173 additions & 2 deletions api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,43 @@
* SPDX-License-Identifier: MIT
*/

/*
Client certificate authentication for secure MQTT connections.

Basic MQTT settings:
- brokerAddress: MQTT broker URL (required)
- clientID: MQTT client identifier (required)
- requestTopic: Topic for sending requests (required)
- responseTopic: Topic for receiving responses (required)
- timeoutSeconds: Request timeout in seconds (default: 8)
- keepAliveSeconds: Keep-alive interval in seconds (default: 2)
- pingTimeoutSeconds: Ping timeout in seconds (default: 1)

Authentication settings:
- username: MQTT username for basic authentication
- password: MQTT password for basic authentication

TLS/Certificate settings:
- useTLS: Enable TLS connection (default: false)
- caCertPath: Path to CA certificate file for server verification
- clientCertPath: Path to client certificate file for mutual TLS authentication
- clientKeyPath: Path to client private key file for mutual TLS authentication
- insecureSkipVerify: Skip TLS certificate verification (default: false, use with caution)

*/

package mqtt

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"encoding/pem"
"fmt"
"io/ioutil"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -49,6 +79,14 @@ type MQTTTargetProviderConfig struct {
TimeoutSeconds int `json:"timeoutSeconds,omitempty"`
KeepAliveSeconds int `json:"keepAliveSeconds,omitempty"`
PingTimeoutSeconds int `json:"pingTimeoutSeconds,omitempty"`
// TLS/Certificate configuration fields
UseTLS bool `json:"useTLS,omitempty"`
CACertPath string `json:"caCertPath,omitempty"`
ClientCertPath string `json:"clientCertPath,omitempty"`
ClientKeyPath string `json:"clientKeyPath,omitempty"`
InsecureSkipVerify bool `json:"insecureSkipVerify,omitempty"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
}

var lock sync.Mutex
Expand Down Expand Up @@ -121,6 +159,30 @@ func MQTTTargetProviderConfigFromMap(properties map[string]string) (MQTTTargetPr
if ret.TimeoutSeconds <= 0 {
ret.TimeoutSeconds = 8
}

// Handle TLS/Certificate configuration
if v, ok := properties["useTLS"]; ok {
ret.UseTLS = v == "true"
}
if v, ok := properties["caCertPath"]; ok {
ret.CACertPath = v
}
if v, ok := properties["clientCertPath"]; ok {
ret.ClientCertPath = v
}
if v, ok := properties["clientKeyPath"]; ok {
ret.ClientKeyPath = v
}
if v, ok := properties["insecureSkipVerify"]; ok {
ret.InsecureSkipVerify = v == "true"
}
if v, ok := properties["username"]; ok {
ret.Username = v
}
if v, ok := properties["password"]; ok {
ret.Password = v
}

return ret, nil
}

Expand Down Expand Up @@ -164,10 +226,44 @@ func (i *MQTTTargetProvider) Init(config providers.IProviderConfig) error {
opts.SetKeepAlive(time.Duration(i.Config.KeepAliveSeconds) * time.Second)
opts.SetPingTimeout(time.Duration(i.Config.PingTimeoutSeconds) * time.Second)
opts.CleanSession = true

// Configure authentication
if i.Config.Username != "" {
opts.SetUsername(i.Config.Username)
}
if i.Config.Password != "" {
opts.SetPassword(i.Config.Password)
}

// Configure TLS if enabled
if i.Config.UseTLS {
tlsConfig, err := i.createTLSConfig(ctx)
if err != nil {
sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to create TLS config - %+v", err)
return v1alpha2.NewCOAError(err, "failed to create TLS config", v1alpha2.InternalError)
}
opts.SetTLSConfig(tlsConfig)
}

i.MQTTClient = gmqtt.NewClient(opts)
if token := i.MQTTClient.Connect(); token.Wait() && token.Error() != nil {
sLog.ErrorfCtx(ctx, " P (MQTT Target): faild to connect to MQTT broker - %+v", err)
return v1alpha2.NewCOAError(token.Error(), "failed to connect to MQTT broker", v1alpha2.InternalError)
connErr := token.Error()
sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to connect to MQTT broker - %+v", connErr)

// Provide specific guidance for common TLS errors
if strings.Contains(connErr.Error(), "certificate signed by unknown authority") {
sLog.ErrorfCtx(ctx, " P (MQTT Target): TLS certificate verification failed. Common solutions:")
sLog.ErrorfCtx(ctx, " P (MQTT Target): 1. Set 'caCertPath' to the path of your broker's CA certificate")
sLog.ErrorfCtx(ctx, " P (MQTT Target): 2. Set 'insecureSkipVerify' to 'true' for testing (not recommended for production)")
sLog.ErrorfCtx(ctx, " P (MQTT Target): 3. Ensure your broker certificate is issued by a trusted CA")
} else if strings.Contains(connErr.Error(), "tls:") {
sLog.ErrorfCtx(ctx, " P (MQTT Target): TLS connection error. Check your TLS configuration:")
sLog.ErrorfCtx(ctx, " P (MQTT Target): - Broker address should use 'ssl://' or 'tls://' prefix for TLS connections")
sLog.ErrorfCtx(ctx, " P (MQTT Target): - Verify CA certificate path and format")
sLog.ErrorfCtx(ctx, " P (MQTT Target): - Check client certificate and key paths if using mutual TLS")
}

return v1alpha2.NewCOAError(connErr, "failed to connect to MQTT broker", v1alpha2.InternalError)
}

if token := i.MQTTClient.Subscribe(i.Config.ResponseTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) {
Expand Down Expand Up @@ -517,6 +613,81 @@ func (i *MQTTTargetProvider) Apply(ctx context.Context, deployment model.Deploym
return ret, nil
}

// createTLSConfig creates a TLS configuration for MQTT client authentication
func (i *MQTTTargetProvider) createTLSConfig(ctx context.Context) (*tls.Config, error) {
tlsConfig := &tls.Config{
InsecureSkipVerify: i.Config.InsecureSkipVerify,
}

// Load CA certificate if provided
if i.Config.CACertPath != "" {
sLog.InfofCtx(ctx, " P (MQTT Target): attempting to load CA certificate from %s", i.Config.CACertPath)

caCert, err := ioutil.ReadFile(i.Config.CACertPath)
if err != nil {
sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to read CA certificate - %+v", err)
return nil, fmt.Errorf("failed to read CA certificate: %w", err)
}

// Verify the CA cert content
sLog.InfofCtx(ctx, " P (MQTT Target): CA certificate file size: %d bytes", len(caCert))
if len(caCert) == 0 {
return nil, fmt.Errorf("CA certificate file is empty")
}

// Validate that the file contains valid PEM data
if !isCertificatePEM(caCert) {
sLog.ErrorfCtx(ctx, " P (MQTT Target): CA certificate file does not contain valid PEM data")
return nil, fmt.Errorf("CA certificate file does not contain valid PEM data")
}

caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to parse CA certificate - invalid PEM format or corrupted certificate")
return nil, fmt.Errorf("failed to parse CA certificate - invalid PEM format or corrupted certificate")
}
tlsConfig.RootCAs = caCertPool
sLog.InfofCtx(ctx, " P (MQTT Target): successfully loaded CA certificate from %s", i.Config.CACertPath)
} else {
if !i.Config.InsecureSkipVerify {
sLog.WarnCtx(ctx, " P (MQTT Target): no CA certificate path provided - using system CA pool. If connection fails with 'certificate signed by unknown authority', either provide a CA certificate or set insecureSkipVerify to true")
} else {
sLog.InfofCtx(ctx, " P (MQTT Target): TLS certificate verification disabled (insecureSkipVerify=true)")
}
}

// Load client certificate and key if provided
if i.Config.ClientCertPath != "" && i.Config.ClientKeyPath != "" {
clientCert, err := tls.LoadX509KeyPair(i.Config.ClientCertPath, i.Config.ClientKeyPath)
if err != nil {
sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to load client certificate and key - %+v", err)
return nil, fmt.Errorf("failed to load client certificate and key: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{clientCert}
sLog.InfofCtx(ctx, " P (MQTT Target): loaded client certificate from %s and key from %s",
i.Config.ClientCertPath, i.Config.ClientKeyPath)
} else if i.Config.ClientCertPath != "" || i.Config.ClientKeyPath != "" {
// Both cert and key must be provided together
return nil, fmt.Errorf("both clientCertPath and clientKeyPath must be provided for client certificate authentication")
}

return tlsConfig, nil
}

// isCertificatePEM checks if the given data contains valid PEM formatted certificate data
func isCertificatePEM(data []byte) bool {
// Check if the data contains PEM headers
dataStr := string(data)
if !strings.Contains(dataStr, "-----BEGIN CERTIFICATE-----") ||
!strings.Contains(dataStr, "-----END CERTIFICATE-----") {
return false
}

// Try to decode the PEM block
block, _ := pem.Decode(data)
return block != nil && block.Type == "CERTIFICATE"
}

func (*MQTTTargetProvider) GetValidationRule(ctx context.Context) model.ValidationRule {
return model.ValidationRule{
AllowSidecar: false,
Expand Down
Loading
Loading