Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func main() {
logger.Error(fmt.Sprintf("Error in agent service: %s", err))
return
}
defer svc.Close()

svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
Expand Down Expand Up @@ -402,7 +403,7 @@ func StopSignalHandler(ctx context.Context, cancel context.CancelFunc, logger lo
shutdownCtx, shutdownCancel := context.WithTimeout(ctx, 5*time.Second)
defer shutdownCancel()
if err := server.Shutdown(shutdownCtx); err != nil {
return fmt.Errorf("Failed to shutdown %s server: %v", svcName, err)
return fmt.Errorf("failed to shutdown %s server: %v", svcName, err)
}
return fmt.Errorf("%s service shutdown by signal: %s", svcName, sig)
case <-ctx.Done():
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/api/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ func TestPublish(t *testing.T) {
{"publish data", data, http.StatusOK},
{"publish data with invalid data", "}", http.StatusInternalServerError},
}
t.Cleanup(func() {
assert.Nil(t, svc.Close())
})

for _, tc := range cases {
req := testRequest{
Expand Down
13 changes: 13 additions & 0 deletions pkg/agent/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,16 @@ func (lm loggingMiddleware) Terminal(uuid, cmdStr string) (err error) {

return lm.svc.Terminal(uuid, cmdStr)
}

func (lm loggingMiddleware) Close() (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method close took %s to complete", time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.Close()
}
13 changes: 11 additions & 2 deletions pkg/agent/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,18 @@ func (ms *metricsMiddleware) Publish(topic, payload string) error {

func (ms *metricsMiddleware) Terminal(topic, payload string) error {
defer func(begin time.Time) {
ms.counter.With("method", "publish").Add(1)
ms.latency.With("method", "publish").Observe(time.Since(begin).Seconds())
ms.counter.With("method", "terminal").Add(1)
ms.latency.With("method", "terminal").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.Terminal(topic, payload)
}

func (ms *metricsMiddleware) Close() error {
defer func(begin time.Time) {
ms.counter.With("method", "close").Add(1)
ms.latency.With("method", "close").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.Close()
}
18 changes: 12 additions & 6 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@ package agent
import (
"crypto/tls"
"encoding/json"
"fmt"
"os"
"time"

"github.com/mainflux/mainflux/pkg/errors"
"github.com/pelletier/go-toml"
)

var (
ErrWritingToml = errors.New("error writing to toml file")
errReadingFile = errors.New("error reading config file")
errUnmarshalToml = errors.New("error unmarshaling toml")
errMarshalToml = errors.New("error marshaling toml")
)

type ServerConfig struct {
Port string `toml:"port" json:"port"`
BrokerURL string `toml:"broker_url" json:"broker_url"`
Expand Down Expand Up @@ -86,24 +92,24 @@ func NewConfig(sc ServerConfig, cc ChanConfig, ec EdgexConfig, lc LogConfig, mc
func SaveConfig(c Config) error {
b, err := toml.Marshal(c)
if err != nil {
return errors.New(fmt.Sprintf("Error reading config file: %s", err))
return errors.Wrap(errMarshalToml, err)
}
if err := os.WriteFile(c.File, b, 0644); err != nil {
return errors.New(fmt.Sprintf("Error writing toml: %s", err))
return errors.Wrap(ErrWritingToml, err)
}
return nil
}

// Read - retrieve config from a file.
// ReadConfig - retrieve config from a file.
func ReadConfig(file string) (Config, error) {
data, err := os.ReadFile(file)
c := Config{}
if err != nil {
return c, errors.New(fmt.Sprintf("Error reading config file: %s", err))
return Config{}, errors.Wrap(errReadingFile, err)
}

if err := toml.Unmarshal(data, &c); err != nil {
return Config{}, errors.New(fmt.Sprintf("Error unmarshaling toml: %s", err))
return Config{}, errors.Wrap(errUnmarshalToml, err)
}
return c, nil
}
Expand Down
103 changes: 103 additions & 0 deletions pkg/agent/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package agent

import (
"fmt"
"os"
"strings"
"testing"

"github.com/mainflux/mainflux/pkg/errors"
"github.com/stretchr/testify/assert"
)

func TestReadConfig(t *testing.T) {
// Create a temporary config file for testing.
tempFile, err := os.CreateTemp("", "config.toml")
if err != nil {
t.Fatalf("Failed to create temporary file: %v", err)
}
defer os.Remove(tempFile.Name())
tempFile2, err := os.CreateTemp("", "invalid.toml")
if err != nil {
t.Fatalf("Failed to create temporary file: %v", err)
}
defer os.Remove(tempFile2.Name())

sampleConfig := `
File = "config.toml"

[channels]
control = ""
data = ""

[edgex]
url = "http://localhost:48090/api/v1/"

[heartbeat]
interval = "10s"

[log]
level = "info"

[mqtt]
ca_cert = ""
ca_path = "ca.crt"
cert_path = "thing.cert"
client_cert = ""
client_key = ""
mtls = false
password = ""
priv_key_path = "thing.key"
qos = 0
retain = false
skip_tls_ver = true
url = "localhost:1883"
username = ""

[server]
nats_url = "nats://127.0.0.1:4222"
port = "9999"

[terminal]
session_timeout = "1m0s"
`

if _, writeErr := tempFile.WriteString(sampleConfig); writeErr != nil {
t.Fatalf("Failed to write to temporary file: %v", writeErr)
}
tempFile.Close()

if _, writeErr := tempFile2.WriteString(strings.ReplaceAll(sampleConfig, "[", "")); writeErr != nil {
t.Fatalf("Failed to write to temporary file: %v", writeErr)
}
tempFile2.Close()

tests := []struct {
name string
fileName string
expectedErr error
}{
{
name: "failed to read file",
fileName: "invalidFile.toml",
expectedErr: errReadingFile,
},
{
name: "invalid toml",
fileName: tempFile2.Name(),
expectedErr: errUnmarshalToml,
},
{
name: "successful read",
fileName: tempFile.Name(),
expectedErr: nil,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, err := ReadConfig(test.fileName)
assert.True(t, errors.Contains(err, test.expectedErr), fmt.Sprintf("expected %v got %v", test.expectedErr, err))
})
}
}
26 changes: 19 additions & 7 deletions pkg/agent/heartbeat.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"context"
"sync"
"time"
)
Expand Down Expand Up @@ -33,11 +34,12 @@ type Info struct {
type Heartbeat interface {
Update()
Info() Info
Close()
}

// interval - duration of interval
// if service doesnt send heartbeat during interval it is marked offline.
func NewHeartbeat(name, svcType string, interval time.Duration) Heartbeat {
func NewHeartbeat(ctx context.Context, name, svcType string, interval time.Duration) Heartbeat {
ticker := time.NewTicker(interval)
s := svc{
info: Info{
Expand All @@ -49,22 +51,25 @@ func NewHeartbeat(name, svcType string, interval time.Duration) Heartbeat {
ticker: ticker,
interval: interval,
}
s.listen()
go s.listen(ctx)
return &s
}

func (s *svc) listen() {
go func() {
for range s.ticker.C {
func (s *svc) listen(ctx context.Context) {
for {
select {
case <-s.ticker.C:
// TODO - we can disable ticker when the status gets OFFLINE
// and on the next heartbeat enable it again.
s.mu.Lock()
if time.Now().After(s.info.LastSeen.Add(s.interval)) {
s.info.Status = offline
}
s.mu.Unlock()
case <-ctx.Done():
return
}
}()
}
}

func (s *svc) Update() {
Expand All @@ -75,5 +80,12 @@ func (s *svc) Update() {
}

func (s *svc) Info() Info {
return s.info
s.mu.Lock()
defer s.mu.Unlock()
info := s.info
return info
}

func (s *svc) Close() {
s.ticker.Stop()
}
72 changes: 72 additions & 0 deletions pkg/agent/heartbeat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package agent

import (
"context"
"testing"
"time"
)

const (
name = "TestService"
serviceType = "TestType"
interval = 2 * time.Second
)

func TestNewHeartbeat(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
heartbeat := NewHeartbeat(ctx, name, serviceType, interval)

// Check initial status and info
info := heartbeat.Info()
if info.Name != name {
t.Errorf("Expected name to be %s, but got %s", name, info.Name)
}
if info.Type != serviceType {
t.Errorf("Expected type to be %s, but got %s", serviceType, info.Type)
}
if info.Status != online {
t.Errorf("Expected initial status to be %s, but got %s", online, info.Status)
}
t.Cleanup(func() {
cancel()
heartbeat.Close()
})
}

func TestHeartbeat_Update(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
heartbeat := NewHeartbeat(ctx, name, serviceType, interval)

// Sleep for more than the interval to simulate an update
time.Sleep(3 * time.Second)

heartbeat.Update()

// Check if the status has been updated to online
info := heartbeat.Info()
if info.Status != online {
t.Errorf("Expected status to be %s, but got %s", online, info.Status)
}
t.Cleanup(func() {
cancel()
heartbeat.Close()
})
}

func TestHeartbeat_StatusOffline(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
heartbeat := NewHeartbeat(ctx, name, serviceType, interval)

// Sleep for more than two intervals to simulate offline status
time.Sleep(5 * time.Second)

// Check if the status has been updated to offline
info := heartbeat.Info()
if info.Status != offline {
t.Errorf("Expected status to be %s, but got %s", offline, info.Status)
}
t.Cleanup(func() {
cancel()
heartbeat.Close()
})
}
Loading