diff --git a/README.md b/README.md index 3985d38..2fd1acc 100644 --- a/README.md +++ b/README.md @@ -1 +1,137 @@ -"# TM-software-H11" +# TM-software-H11 + +Go backend that exposes telemetry via WebSockets and REST, receives commands from the frontend, and logs activity. It can simulate sensors or receive telemetry over TCP from an external client. + +### Features +- WebSocket (gorilla/websocket) for real-time streaming (`/api/stream`). +- REST for recent history (`/api/messages`) and receiving commands (`/api/commands`). +- Sensor simulation (temperature, pressure, 6 battery cells) or TCP server mode to receive telemetry. +- Batch processing (mean/min/max) and broadcast summaries to WS clients. +- Hub with configurable history and fan-out to multiple clients. +- Simple log rotation in `logs/`. + + + +## Configuration +Edit `config.toml`: +- `[network]` sets `address` (TCP) and `http_address` (HTTP/WS). +- `[processor]` sets `batch_size` for statistics. +- `[logger]` sets directory, prefix, and rotation by line count. +- `[websocket]` sets `history_size` (recent messages kept by the hub). +- `[sensor.*]` defines simulated sensors in `client` mode. + +Defaults of interest: +- History (`websocket.history_size`): 100 +- Batch (`processor.batch_size`): 5 + +## Run + +From the repo root: + +### Client mode (simulates sensors and optionally serves HTTP/WS) +```powershell +# Also serve HTTP/WS (default true) - for single-node demo +go run ./cmd/main.go --mode client --serve-http=true + +# Only simulate and send over TCP (no local HTTP/WS) - for multi-node setup +go run ./cmd/main.go --mode client --serve-http=false +``` + +### Server mode (receives telemetry over TCP and serves HTTP/WS) +```powershell +go run ./cmd/main.go --mode server +``` + +**When to use each mode:** +- **Single-node demo**: Use client mode with `--serve-http=true`. This processes telemetry locally, logs locally, and serves the dashboard locally. No server needed. +- **Multi-node setup**: Use server mode + client mode with `--serve-http=false`. The server centralizes telemetry processing, logging, and dashboard. Clients send data to the server. + +**Important for multi-node setup:** +- Only the server should use the HTTP port (default 8080 in the toml). +- If you get "bind: Only one usage of each socket address" error, it means both processes are trying to use the same port. +- Solution: Use different `http_address` in `config.toml` for clients, or only run clients with `--serve-http=false`. + +Addresses and ports are taken from `config.toml`: +- Telemetry TCP: `network.address` (example., `localhost:4040`) +- HTTP/WS: `network.http_address` (ex., `localhost:8080`) + +## Endpoints +- `GET /api/messages` — returns a JSON array with the hub's latest envelopes. +- `GET /api/stream` — WebSocket endpoint for real-time envelopes. +- `POST /api/commands` — accepts `{ "action": "pause|resume|launch|set_batch_size|snapshot", ... }` Emits `command_ack` over WS. +- `GET /` — serves `web/index.html` (sample dashboard, super simple). + +## Message format (Envelope) +All WS messages follow: + +```json +{ + "type": "telemetry|command_ack|snapshot", + "ts": "RFC3339", + "seq": 1, + "payload": {} +} +``` + +Aggregated telemetry example: + +```json +{ + "type": "telemetry", + "ts": "2025-10-17T18:30:00Z", + "seq": 123, + "payload": { + "sensor": "Temperature", + "unit": "°C", + "mean": 23.5, + "min": 22.8, + "max": 24.1 + } +} +``` + +Command ACK example: + +```json +{ + "type": "command_ack", + "ts": "2025-10-17T18:31:01Z", + "payload": { + "id": "", + "command": { "action": "pause" }, + "status": "ok" + } +} +``` + +## Supported commands +- `launch` — sample action (logs an execution message). +- `pause` / `resume` — pause/resume batch processing. +- `set_batch_size` — requires `{ "value": "10" }` (positive integer in string form). +- `snapshot` — logs the command and broadcasts current state `{ paused, batchSize }` over WS. + + + +## Sample dashboard +- **Single-node**: Open `http://localhost:8080/` (served by client with `--serve-http=true`). +- **Multi-node**: Open `http://localhost:8080/` (served by server, clients send data to server). + +**Multi-node setup steps:** +1. Terminal A: `go run ./cmd/main.go --mode server` +2. Terminal B: `go run ./cmd/main.go --mode client --serve-http=false` +3. Open `http://localhost:8080/` (server's dashboard) + +Use the buttons to: +- Connect/disconnect WS. +- Send commands using the input JSON. +- See charts (Chart.js) for temperature, pressure, and cells. + +## Logs +Logs rotate after `logger.max_lines`. Files are stored in `logs/` using the configured prefix. + +## Dev and tests +- Console WS client: `go run ./tools/wsclient` (with server running at `localhost:8080`. Nvm, now it's extracted from the toml). Basically shows you raw messages. +- Run all tests: `go test ./...` +- Test files: `internal/websocket/hub_test.go`, `internal/integration/ws_contract_test.go`, `internal/netsender/integration_test.go`. + + diff --git a/cmd/main.go b/cmd/main.go index f173b75..559e46e 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,21 +1,30 @@ package main import ( + "context" "flag" "fmt" "os" "os/signal" + "sync" "syscall" + "time" + + "github.com/google/uuid" "backend/config" + "backend/internal/api" "backend/internal/logger" "backend/internal/netreceiver" + "backend/internal/netsender" "backend/internal/sensor" + "backend/internal/websocket" ) func main() { // Parse mode from command line mode := flag.String("mode", "", "Mode to run: server or client") + serveHTTP := flag.Bool("serve-http", true, "In client mode: also serve HTTP/WS (true|false)") flag.Parse() if *mode != "server" && *mode != "client" { fmt.Println("Usage: go run ./cmd/main.go --mode [server|client]") @@ -38,46 +47,177 @@ func main() { defer log.Close() dataCh := make(chan sensor.Data) - stopCh := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + + // websocket hub and command channel (history size from config) + hist := cfg.Websocket.HistorySize + if hist <= 0 { + hist = 100 + } + hub := websocket.NewHub(hist) + commandCh := make(chan map[string]string, 32) switch *mode { case "server": // Only server opens TCP listener - if err := netreceiver.StartTCP(cfg.Network.Address, dataCh, stopCh); err != nil { + if err := netreceiver.StartTCP(ctx, &wg, cfg.Network.Address, dataCh); err != nil { fmt.Println("Error starting TCP listener:", err) + cancel() + wg.Wait() return } + + fmt.Println("Telemetry address:", cfg.Network.Address) + fmt.Println("HTTP address:", cfg.Network.HTTPAddr) + // start HTTP server for API/WS in background + if err := api.StartHTTP(ctx, &wg, cfg.Network.HTTPAddr, hub, log.Printf, commandCh); err != nil { + fmt.Println("HTTP server error:", err) + } fmt.Println("Server mode: listening for incoming telemetry...") case "client": // Only client simulates sensors - startSimulatedClient(cfg, dataCh, stopCh) - fmt.Println("Client mode: generating and sending telemetry...") + // create an intermediate channel so we can fan-out sensor data + sensorOut := make(chan sensor.Data) + netsenderCh := make(chan sensor.Data, 256) + startSimulatedClient(cfg, sensorOut, ctx) + + go func() { + defer close(netsenderCh) + for { + select { + case <-ctx.Done(): + return + case d := <-sensorOut: + // non-blocking send to local processor + select { + case dataCh <- d: + default: + } + // non-blocking send to netsender buffer + select { + case netsenderCh <- d: + default: + } + } + } + }() + + nsCfg := netsender.NetsenderCfg{ + DialTimeout: time.Duration(cfg.Netsender.DialTimeoutMS) * time.Millisecond, + WriteTimeout: time.Duration(cfg.Netsender.WriteTimeoutMS) * time.Millisecond, + InitialBackoff: time.Duration(cfg.Netsender.InitialBackoffMS) * time.Millisecond, + MaxBackoff: time.Duration(cfg.Netsender.MaxBackoffMS) * time.Millisecond, + } + + logf := func(format string, v ...interface{}) { + log.Printf(format, v...) + } + if err := netsender.StartSender(ctx, &wg, cfg.Network.Address, nsCfg, netsenderCh, logf); err != nil { + fmt.Println("netsender error:", err) + } + + if *serveHTTP { + if err := api.StartHTTP(ctx, &wg, cfg.Network.HTTPAddr, hub, log.Printf, commandCh); err != nil { + fmt.Println("HTTP server error:", err) + } + fmt.Println("Client mode: generating telemetry and serving HTTP/WS...") + } else { + fmt.Println("Client mode: generating telemetry (HTTP server disabled)") + } } - batch := make([]float64, 0, cfg.Processor.BatchSize) + // per-sensor batches: collect values per sensor name and compute stats independently + batches := make(map[string][]float64) + paused := false + batchSize := cfg.Processor.BatchSize sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) for { select { case d := <-dataCh: - batch = append(batch, d.Value) - if len(batch) >= cfg.Processor.BatchSize { - stats := sensor.Process(batch) + if paused { + // ignore data while paused + continue + } + arr := batches[d.Name] + arr = append(arr, d.Value) + if len(arr) >= batchSize { + stats := sensor.Process(arr) log.Printf("%s stats [%s] -> Mean: %.2f, Min: %.2f, Max: %.2f", d.Name, d.Unit, stats.Mean, stats.Min, stats.Max) - batch = batch[:0] + // broadcast telemetry summary to websocket clients + payload := map[string]interface{}{ + "timestamp": d.Timestamp.Format(time.RFC3339), + "sensor": d.Name, + "unit": d.Unit, + "mean": stats.Mean, + "min": stats.Min, + "max": stats.Max, + } + hub.BroadcastEnvelope("telemetry", payload) + // reset the batch slice for this sensor + arr = arr[:0] + } + batches[d.Name] = arr + case cmd := <-commandCh: + + log.Printf("command received: %v", cmd) + // simple command execution + action := cmd["action"] + status := "ok" + switch action { + case "launch": + + log.Printf("executing launch action") + case "pause": + paused = true + log.Printf("processing paused") + case "resume": + paused = false + log.Printf("processing resumed") + case "set_batch_size": + if v, ok := cmd["value"]; ok { + // try parse int + var n int + if _, err := fmt.Sscanf(v, "%d", &n); err == nil && n > 0 { + batchSize = n + log.Printf("batch size set to %d", n) + } else { + status = "invalid_value" + } + } else { + status = "missing_value" + } + case "snapshot": + //current state + snap := map[string]interface{}{ + "paused": paused, + "batchSize": batchSize, + } + hub.BroadcastEnvelope("snapshot", snap) + default: + status = "unknown_command" + } + // broadcast ack/status to websocket clients (with timestamp and id) + ack := map[string]interface{}{ + "id": uuid.New().String(), + "command": cmd, + "status": status, } + hub.BroadcastEnvelope("command_ack", ack) case <-sigCh: - close(stopCh) fmt.Println("Shutting down ...") + cancel() + wg.Wait() return } } } -func startSimulatedClient(cfg *config.Config, out chan<- sensor.Data, stop <-chan struct{}) { +func startSimulatedClient(cfg *config.Config, out chan<- sensor.Data, ctx context.Context) { tempGen := sensor.Generator{ Name: cfg.Sensor.Temperature.Name, Unit: cfg.Sensor.Temperature.Unit, @@ -92,6 +232,54 @@ func startSimulatedClient(cfg *config.Config, out chan<- sensor.Data, stop <-cha Max: cfg.Sensor.Pressure.Max, Period: cfg.Sensor.Pressure.Period(), } - tempGen.Start(out, stop) - pressGen.Start(out, stop) + cell1 := sensor.Generator{ + Name: cfg.Sensor.Cell1.Name, + Unit: cfg.Sensor.Cell1.Unit, + Min: cfg.Sensor.Cell1.Min, + Max: cfg.Sensor.Cell1.Max, + Period: cfg.Sensor.Cell1.Period(), + } + cell2 := sensor.Generator{ + Name: cfg.Sensor.Cell2.Name, + Unit: cfg.Sensor.Cell2.Unit, + Min: cfg.Sensor.Cell2.Min, + Max: cfg.Sensor.Cell2.Max, + Period: cfg.Sensor.Cell2.Period(), + } + cell3 := sensor.Generator{ + Name: cfg.Sensor.Cell3.Name, + Unit: cfg.Sensor.Cell3.Unit, + Min: cfg.Sensor.Cell3.Min, + Max: cfg.Sensor.Cell3.Max, + Period: cfg.Sensor.Cell3.Period(), + } + cell4 := sensor.Generator{ + Name: cfg.Sensor.Cell4.Name, + Unit: cfg.Sensor.Cell4.Unit, + Min: cfg.Sensor.Cell4.Min, + Max: cfg.Sensor.Cell4.Max, + Period: cfg.Sensor.Cell4.Period(), + } + cell5 := sensor.Generator{ + Name: cfg.Sensor.Cell5.Name, + Unit: cfg.Sensor.Cell5.Unit, + Min: cfg.Sensor.Cell5.Min, + Max: cfg.Sensor.Cell5.Max, + Period: cfg.Sensor.Cell5.Period(), + } + cell6 := sensor.Generator{ + Name: cfg.Sensor.Cell6.Name, + Unit: cfg.Sensor.Cell6.Unit, + Min: cfg.Sensor.Cell6.Min, + Max: cfg.Sensor.Cell6.Max, + Period: cfg.Sensor.Cell6.Period(), + } + tempGen.Start(ctx, out) + pressGen.Start(ctx, out) + cell1.Start(ctx, out) + cell2.Start(ctx, out) + cell3.Start(ctx, out) + cell4.Start(ctx, out) + cell5.Start(ctx, out) + cell6.Start(ctx, out) } diff --git a/config.toml b/config.toml index 8999646..3d8120d 100644 --- a/config.toml +++ b/config.toml @@ -12,6 +12,48 @@ min = 0.0 max = 10.0 period_ms = 200 +[sensor.cell_1] +name = "Cell 1" +unit = "V" +min = 3.0 +max = 4.2 +period_ms = 500 + +[sensor.cell_2] +name = "Cell 2" +unit = "V" +min = 3.0 +max = 4.2 +period_ms = 500 + +[sensor.cell_3] +name = "Cell 3" +unit = "V" +min = 3.0 +max = 4.2 +period_ms = 500 + +[sensor.cell_4] +name = "Cell 4" +unit = "V" +min = 3.0 +max = 4.2 +period_ms = 500 + +[sensor.cell_5] +name = "Cell 5" +unit = "V" +min = 3.0 +max = 4.2 +period_ms = 500 + +[sensor.cell_6] +name = "Cell 6" +unit = "V" +min = 3.0 +max = 4.2 +period_ms = 500 + [processor] batch_size = 5 @@ -24,3 +66,13 @@ max_lines = 100 mode = "server" # or "client" protocol = "tcp" # currently only tpc, not udp address = "localhost:4040" +http_address = "localhost:8080" + +[netsender] +dial_timeout_ms = 3000 +write_timeout_ms = 5000 +initial_backoff_ms = 1000 +max_backoff_ms = 30000 + +[websocket] +history_size = 100 diff --git a/config/config.go b/config/config.go index cbf773d..a1c3f4c 100644 --- a/config/config.go +++ b/config/config.go @@ -31,10 +31,22 @@ type Config struct { Sensor struct { Temperature SensorCfg `toml:"temperature"` Pressure SensorCfg `toml:"pressure"` + Cell1 SensorCfg `toml:"cell_1"` + Cell2 SensorCfg `toml:"cell_2"` + Cell3 SensorCfg `toml:"cell_3"` + Cell4 SensorCfg `toml:"cell_4"` + Cell5 SensorCfg `toml:"cell_5"` + Cell6 SensorCfg `toml:"cell_6"` } `toml:"sensor"` Processor ProcessorCfg `toml:"processor"` Logger LoggerCfg `toml:"logger"` Network NetworkCfg `toml:"network"` + + Netsender NetsenderCfg `toml:"netsender"` + + Websocket struct { + HistorySize int `toml:"history_size"` + } `toml:"websocket"` } // Load reads configuration from a TOML file. @@ -51,7 +63,16 @@ func (s SensorCfg) Period() time.Duration { } type NetworkCfg struct { - Mode string `toml:"mode"` // "server" or "client" - Protocol string `toml:"protocol"` // "tcp" or "udp" - Address string `toml:"address"` // ... + Mode string `toml:"mode"` // "server" or "client" + Protocol string `toml:"protocol"` // "tcp" or "udp" ... + Address string `toml:"address"` // ... + HTTPAddr string `toml:"http_address"` // address for HTTP/WS server +} + +// NetsenderCfg controls dialing and backoff behavior for the netsender. +type NetsenderCfg struct { + DialTimeoutMS int `toml:"dial_timeout_ms"` + WriteTimeoutMS int `toml:"write_timeout_ms"` + InitialBackoffMS int `toml:"initial_backoff_ms"` + MaxBackoffMS int `toml:"max_backoff_ms"` } diff --git a/go.mod b/go.mod index 0fedf66..b6a83c1 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,8 @@ module backend go 1.25.1 -require github.com/BurntSushi/toml v1.5.0 +require ( + github.com/BurntSushi/toml v1.5.0 + github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 +) diff --git a/go.sum b/go.sum index ff7fd09..625f7ef 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,6 @@ github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg= github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/internal/api/http.go b/internal/api/http.go new file mode 100644 index 0000000..31aae7a --- /dev/null +++ b/internal/api/http.go @@ -0,0 +1,54 @@ +package api + +import ( + "context" + "fmt" + "net/http" + "sync" + "time" + + "backend/internal/websocket" +) + +func StartHTTP(ctx context.Context, wg *sync.WaitGroup, addr string, hub *websocket.Hub, logf func(string, ...interface{}), commandCh chan<- map[string]string) error { + mux := http.NewServeMux() + // serve static files from the web/ directory at root + fs := http.FileServer(http.Dir("./web")) + mux.Handle("/", fs) + mux.HandleFunc("/api/stream", func(w http.ResponseWriter, r *http.Request) { + websocket.HandleWS(hub, w, r) + }) + mux.HandleFunc("/api/commands", websocket.HandleCommands(logf, commandCh)) + mux.HandleFunc("/api/messages", websocket.HandleMessages(hub)) + + logged := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // minimal log to stdout; caller provided logf can be nil + println(time.Now().Format(time.RFC3339), "http: recv", r.RemoteAddr, r.Method, r.URL.Path) + mux.ServeHTTP(w, r) + }) + + srv := &http.Server{ + Addr: addr, + Handler: logged, + } + + fmt.Printf("http: starting server on %s\n", addr) + wg.Add(1) + go func() { + defer wg.Done() + // run server + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + + fmt.Printf("http server error: %v\n", err) + } + }() + + go func() { + <-ctx.Done() + ctxShutdown, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = srv.Shutdown(ctxShutdown) + }() + + return nil +} diff --git a/internal/integration/ws_contract_test.go b/internal/integration/ws_contract_test.go new file mode 100644 index 0000000..98fac31 --- /dev/null +++ b/internal/integration/ws_contract_test.go @@ -0,0 +1,84 @@ +package integration + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + gw "github.com/gorilla/websocket" + + "backend/internal/websocket" +) + +func TestWebSocketCommandAck(t *testing.T) { + + hub := websocket.NewHub(10) + commandCh := make(chan map[string]string, 4) + + // Instead of trying to reuse StartHTTP (which starts its own listener), create the mux + mux := http.NewServeMux() + fs := http.FileServer(http.Dir("./web")) + mux.Handle("/", fs) + mux.HandleFunc("/api/stream", func(w http.ResponseWriter, r *http.Request) { websocket.HandleWS(hub, w, r) }) + mux.HandleFunc("/api/commands", websocket.HandleCommands(nil, commandCh)) + mux.HandleFunc("/api/messages", websocket.HandleMessages(hub)) + ts := httptest.NewServer(mux) + defer ts.Close() + + go func() { + for cmd := range commandCh { + + hub.BroadcastEnvelope("command_ack", map[string]string{"status": "ok", "action": cmd["action"]}) + + hub.BroadcastEnvelope("snapshot", map[string]string{"note": "snapshot after command"}) + } + }() + + // connect a websocket client + + u := ts.URL + "/api/stream" + if strings.HasPrefix(ts.URL, "https://") { + u = "wss" + strings.TrimPrefix(u, "https") + } else { + u = "ws" + strings.TrimPrefix(u, "http") + } + dialer := gw.DefaultDialer + c, resp, err := dialer.Dial(u, nil) + if err != nil { + if resp != nil { + t.Fatalf("dial error: %v status=%s", err, resp.Status) + } else { + t.Fatalf("dial error: %v", err) + } + } + defer c.Close() + + cmd := map[string]string{"action": "snapshot"} + b, _ := json.Marshal(cmd) + res, err := http.Post(ts.URL+"/api/commands", "application/json", bytes.NewReader(b)) + if err != nil { + t.Fatalf("post error: %v", err) + } + if res.StatusCode != http.StatusAccepted { + t.Fatalf("expected 202 Accepted, got %d", res.StatusCode) + } + + // wait and check that websocket receives an ack or snapshot + c.SetReadDeadline(time.Now().Add(5 * time.Second)) + _, msg, err := c.ReadMessage() + if err != nil { + t.Fatalf("read message error: %v", err) + } + var env map[string]interface{} + if err := json.Unmarshal(msg, &env); err != nil { + t.Fatalf("invalid json: %v", err) + } + if env["type"] != "command_ack" && env["type"] != "snapshot" { + t.Fatalf("expected command_ack or snapshot, got %v", env["type"]) + } + +} diff --git a/internal/netreceiver/receiver.go b/internal/netreceiver/receiver.go index e312b14..cfb635b 100644 --- a/internal/netreceiver/receiver.go +++ b/internal/netreceiver/receiver.go @@ -2,10 +2,12 @@ package netreceiver import ( "backend/internal/sensor" + "context" "encoding/json" "fmt" "io" "net" + "sync" "time" ) @@ -16,18 +18,28 @@ type Telemetry struct { Timestamp string `json:"timestamp"` } -// starts a tpc listener that decodes incoming JSON messages into sensor Data -func StartTCP(address string, out chan<- sensor.Data, stop <-chan struct{}) error { +// StartTCP starts a tcp listener that decodes incoming JSON messages into sensor Data. +// It returns immediately and runs the listener in background. Cancel the provided ctx to stop. +func StartTCP(ctx context.Context, wg *sync.WaitGroup, address string, out chan<- sensor.Data) error { ln, err := net.Listen("tcp", address) if err != nil { return fmt.Errorf("listen error: %w", err) } fmt.Println("Listening on", address) + wg.Add(1) go func() { + defer wg.Done() defer ln.Close() for { + conn, err := ln.Accept() if err != nil { + select { + case <-ctx.Done(): + return + default: + } + // temporary error continue } go handleConn(conn, out) diff --git a/internal/netsender/integration_test.go b/internal/netsender/integration_test.go new file mode 100644 index 0000000..29ddefa --- /dev/null +++ b/internal/netsender/integration_test.go @@ -0,0 +1,79 @@ +package netsender + +import ( + "context" + "encoding/json" + "net" + "sync" + "testing" + "time" + + "backend/internal/sensor" +) + +// TestSenderReceiverIntegration creates a local listener in the test, accepts the +// connection from StartSender and decodes the JSON telemetry messages. This +// avoids racing. +func TestSenderReceiverIntegration(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") //... + if err != nil { + t.Fatalf("listen failed: %v", err) + } + defer ln.Close() + addr := ln.Addr().String() + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + + // channel to receive Telemetry decoded from the connection + telemCh := make(chan Telemetry, 4) + + // accept connection and decode JSON messages + go func() { + conn, err := ln.Accept() + if err != nil { + return + } + defer conn.Close() + dec := json.NewDecoder(conn) + for { + var tmsg Telemetry + if err := dec.Decode(&tmsg); err != nil { + return + } + telemCh <- tmsg + } + }() + + // start sender (it will dial the addr above) + nsCfg := NetsenderCfg{ + DialTimeout: 500 * time.Millisecond, + WriteTimeout: 500 * time.Millisecond, + InitialBackoff: 100 * time.Millisecond, + MaxBackoff: 1 * time.Second, + } + in := make(chan sensor.Data, 4) + if err := StartSender(ctx, &wg, addr, nsCfg, in, t.Logf); err != nil { + t.Fatalf("StartSender failed: %v", err) + } + + // send some samples + in <- sensor.Data{Name: "T", Unit: "C", Value: 1.0, Timestamp: time.Now()} + in <- sensor.Data{Name: "T", Unit: "C", Value: 2.0, Timestamp: time.Now()} + + select { + case <-time.After(3 * time.Second): + cancel() + wg.Wait() + t.Fatalf("timed out waiting for telemetry") + case tm := <-telemCh: + if tm.Name == "" { + t.Fatalf("received empty telemetry") + } + // pass + _ = tm + } + + cancel() + wg.Wait() +} diff --git a/internal/netsender/netsender.go b/internal/netsender/netsender.go new file mode 100644 index 0000000..7c0dae2 --- /dev/null +++ b/internal/netsender/netsender.go @@ -0,0 +1,137 @@ +package netsender + +import ( + "backend/internal/sensor" + "context" + "encoding/json" + "fmt" + "math/rand" + "net" + "sync" + "time" +) + +type Telemetry struct { + Name string `json:"name"` + Value float64 `json:"value"` + Unit string `json:"unit"` + Timestamp string `json:"timestamp"` +} + +type NetsenderCfg struct { + DialTimeout time.Duration + WriteTimeout time.Duration + InitialBackoff time.Duration + MaxBackoff time.Duration +} + +// StartSender connects to address and sends telemetry JSON messages from in channel. +// It reconnects if the connection is lost. Stops when stop is closed.. +func StartSender(ctx context.Context, wg *sync.WaitGroup, address string, cfg NetsenderCfg, in <-chan sensor.Data, logf func(string, ...interface{})) error { + wg.Add(1) + go func() { + defer wg.Done() + var conn net.Conn + var err error + backoff := cfg.InitialBackoff + if backoff <= 0 { + backoff = 1 * time.Second + } + maxBackoff := cfg.MaxBackoff + if maxBackoff <= 0 { + maxBackoff = 30 * time.Second + } + // use math/rand for jitter + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + for { + select { + case <-ctx.Done(): + if conn != nil { + conn.Close() + } + return + default: + } + + if conn == nil { + dialer := net.Dialer{Timeout: cfg.DialTimeout} + if dialer.Timeout == 0 { + dialer.Timeout = 3 * time.Second + } + conn, err = dialer.Dial("tcp", address) + if err != nil { + if logf != nil { + logf("%s netsender: dial error: %v", time.Now().Format(time.RFC3339), err) + } else { + fmt.Printf("%s netsender: dial error: %v\n", time.Now().Format(time.RFC3339), err) + } + // backoff with jitter + jitter := time.Duration(r.Int63n(500)) * time.Millisecond + if r.Intn(2) == 0 { + jitter = -jitter + } + wait := backoff + jitter + if wait < 0 { + wait = backoff + } + select { + case <-ctx.Done(): + return + case <-time.After(wait): + } + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + continue + } + // reset backoff if connected=true + backoff = cfg.InitialBackoff + if backoff <= 0 { + backoff = 1 * time.Second + } + if logf != nil { + logf("%s netsender: connected to %s", time.Now().Format(time.RFC3339), address) + } else { + fmt.Printf("%s netsender: connected to %s\n", time.Now().Format(time.RFC3339), address) + } + } + + select { + case <-ctx.Done(): + if conn != nil { + conn.Close() + } + return + case d := <-in: + t := Telemetry{ + Name: d.Name, + Value: d.Value, + Unit: d.Unit, + Timestamp: d.Timestamp.Format(time.RFC3339), + } + if b, err := json.Marshal(t); err == nil { + b = append(b, '\n') + // set write deadline + wt := cfg.WriteTimeout + if wt <= 0 { + wt = 5 * time.Second + } + _ = conn.SetWriteDeadline(time.Now().Add(wt)) + if _, err := conn.Write(b); err != nil { + // close conn and attempt reconnect + if logf != nil { + logf("%s netsender: write error: %v", time.Now().Format(time.RFC3339), err) + } else { + fmt.Printf("%s netsender: write error: %v\n", time.Now().Format(time.RFC3339), err) + } + conn.Close() + conn = nil + } + } + } + } + }() + return nil +} diff --git a/internal/sensor/generator.go b/internal/sensor/generator.go index 5d2c0f4..2bd88b0 100644 --- a/internal/sensor/generator.go +++ b/internal/sensor/generator.go @@ -1,6 +1,7 @@ package sensor import ( + "context" "math/rand" "time" ) @@ -23,8 +24,8 @@ type Generator struct { } // Start launches a goroutine that produces readings into the out channel. -// It stops when the context is canceled. -func (g Generator) Start(out chan<- Data, stop <-chan struct{}) { +// It stops when ctx is canceled. +func (g Generator) Start(ctx context.Context, out chan<- Data) { go func() { ticker := time.NewTicker(g.Period) defer ticker.Stop() @@ -39,7 +40,7 @@ func (g Generator) Start(out chan<- Data, stop <-chan struct{}) { Name: g.Name, Unit: g.Unit, } - case <-stop: + case <-ctx.Done(): return } } diff --git a/internal/websocket/handler.go b/internal/websocket/handler.go new file mode 100644 index 0000000..9f9d34d --- /dev/null +++ b/internal/websocket/handler.go @@ -0,0 +1,153 @@ +package websocket + +import ( + "encoding/json" + "fmt" + "net/http" + "sync/atomic" + "time" + + gw "github.com/gorilla/websocket" +) + +var upgrader = gw.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, +} + +type Client struct { + conn *gw.Conn + send chan []byte + hub *Hub + id uint64 +} + +var clientID uint64 + +func (c *Client) readPump() { + defer func() { + c.hub.unregister <- c + c.conn.Close() + }() + c.conn.SetReadLimit(512) + c.conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + c.conn.SetPongHandler(func(string) error { + c.conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + return nil + }) + for { + _, _, err := c.conn.ReadMessage() + if err != nil { + break + } + // ignore client messages for now + } +} + +func (c *Client) writePump() { + ticker := time.NewTicker(54 * time.Second) + defer func() { + ticker.Stop() + c.conn.Close() + }() + for { + select { + case msg, ok := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if !ok { + + c.conn.WriteMessage(gw.CloseMessage, []byte{}) + return + } + if err := c.conn.WriteMessage(gw.TextMessage, msg); err != nil { + return + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := c.conn.WriteMessage(gw.PingMessage, nil); err != nil { + return + } + } + } +} + +// HandleWS upgrades HTTP connection to websocket and registers client +func HandleWS(hub *Hub, w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + + fmt.Printf("%s handler: websocket upgrade failed from %s: %v\n", time.Now().Format(time.RFC3339), r.RemoteAddr, err) + for k, v := range r.Header { + fmt.Printf("%s handler: header %s=%v\n", time.Now().Format(time.RFC3339), k, v) + } + return + } + c := &Client{ + conn: conn, + send: make(chan []byte, 256), + hub: hub, + id: atomic.AddUint64(&clientID, 1), + } + + fmt.Printf("%s handler: new WS client connected (id=%p)\n", time.Now().Format(time.RFC3339), c) + hub.register <- c + + for _, m := range hub.LastMessages() { + select { + case c.send <- m: + default: + // drop + } + } + + go c.writePump() + c.readPump() +} + +// processes POST /api/commands +func HandleCommands(logf func(string, ...interface{}), commandCh chan<- map[string]string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + var cmd map[string]string + dec := json.NewDecoder(r.Body) + if err := dec.Decode(&cmd); err != nil { + http.Error(w, "invalid json", http.StatusBadRequest) + return + } + // log command + if logf != nil { + logf("command received: %v", cmd) + } + + select { + case commandCh <- cmd: + default: + + http.Error(w, "command queue full", http.StatusServiceUnavailable) + return + } + // ack + w.WriteHeader(http.StatusAccepted) + _ = json.NewEncoder(w).Encode(map[string]string{"status": "queued"}) + } +} + +// GET /api/messages returning the hub history +func HandleMessages(hub *Hub) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + msgs := hub.LastMessages() + + out := make([]json.RawMessage, len(msgs)) + for i, m := range msgs { + out[i] = json.RawMessage(m) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(out) + } +} diff --git a/internal/websocket/hub.go b/internal/websocket/hub.go new file mode 100644 index 0000000..f04f283 --- /dev/null +++ b/internal/websocket/hub.go @@ -0,0 +1,147 @@ +package websocket + +import ( + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" +) + +// Hub manages websocket clients, broadcasting messages and keeping a history +type Hub struct { + // registered clients + register chan *Client + unregister chan *Client + broadcast chan []byte + + clients map[*Client]bool + + // history of last messages + history [][]byte + capHist int + + // internal control + mu sync.RWMutex + seq uint64 +} + +// Envelope is the standard message wrapper sent to clients. +type Envelope struct { + Type string `json:"type"` + Timestamp string `json:"ts"` + Seq uint64 `json:"seq,omitempty"` + Payload json.RawMessage `json:"payload"` +} + +// BroadcastEnvelope marshals the payload and sends a standardized envelope. +func (h *Hub) BroadcastEnvelope(typ string, payload interface{}) { + p, err := json.Marshal(payload) + if err != nil { + return + } + env := Envelope{ + Type: typ, + Timestamp: time.Now().Format(time.RFC3339), + Seq: atomic.AddUint64(&h.seq, 1), + Payload: json.RawMessage(p), + } + b, err := json.Marshal(env) + if err != nil { + return + } + + fmt.Printf("%s hub: BroadcastEnvelope type=%s seq=%d payload_len=%d\n", time.Now().Format(time.RFC3339), env.Type, env.Seq, len(env.Payload)) + h.Broadcast(b) +} + +// NewHub creates a Hub with history capacity n +func NewHub(historyCap int) *Hub { + h := &Hub{ + register: make(chan *Client), + unregister: make(chan *Client), + broadcast: make(chan []byte, 256), + clients: make(map[*Client]bool), + history: make([][]byte, 0, historyCap), + capHist: historyCap, + } + go h.run() + return h +} + +func (h *Hub) Broadcast(msg []byte) { + + h.appendHistory(msg) + + select { + case h.broadcast <- msg: + default: + // drop if full to avoid blocking producer + } +} + +// returns a copy of the stored history (most recent first) +func (h *Hub) LastMessages() [][]byte { + h.mu.RLock() + defer h.mu.RUnlock() + fmt.Printf("%s hub: LastMessages called, history_len=%d\n", time.Now().Format(time.RFC3339), len(h.history)) + out := make([][]byte, len(h.history)) + for i := range h.history { + out[i] = make([]byte, len(h.history[i])) + copy(out[i], h.history[i]) + } + return out +} + +func (h *Hub) run() { + for { + select { + case msg := <-h.broadcast: + // forward to clients (non-blocking per client) + + sent := 0 + for c := range h.clients { + select { + case c.send <- msg: + sent++ + default: + // if client's send buffer full, drop message for that client + } + } + if sent == 0 { + fmt.Printf("%s hub: broadcasted message but no clients received it (clients=%d)\n", time.Now().Format(time.RFC3339), len(h.clients)) + } else { + fmt.Printf("%s hub: broadcasted message to %d clients\n", time.Now().Format(time.RFC3339), sent) + } + case c := <-h.register: + h.clients[c] = true + fmt.Printf("%s hub: client registered (id=%p) clients=%d\n", time.Now().Format(time.RFC3339), c, len(h.clients)) + case c := <-h.unregister: + if _, ok := h.clients[c]; ok { + delete(h.clients, c) + close(c.send) + fmt.Printf("%s hub: client unregistered (id=%p) clients=%d\n", time.Now().Format(time.RFC3339), c, len(h.clients)) + } + } + } +} + +func (h *Hub) appendHistory(msg []byte) { + h.mu.Lock() + defer h.mu.Unlock() + + m := make([]byte, len(msg)) + copy(m, msg) + if h.capHist <= 0 { + return + } + if len(h.history) < h.capHist { + + h.history = append([][]byte{m}, h.history...) + fmt.Printf("%s hub: appendHistory now len=%d\n", time.Now().Format(time.RFC3339), len(h.history)) + return + } + + h.history = append([][]byte{m}, h.history[:h.capHist-1]...) + fmt.Printf("%s hub: appendHistory rotated len=%d\n", time.Now().Format(time.RFC3339), len(h.history)) +} diff --git a/internal/websocket/hub_test.go b/internal/websocket/hub_test.go new file mode 100644 index 0000000..88e1f54 --- /dev/null +++ b/internal/websocket/hub_test.go @@ -0,0 +1,53 @@ +package websocket + +import ( + "testing" + "time" +) + +// simple test +func TestHubHistoryAppend(t *testing.T) { + h := NewHub(3) + + h.Broadcast([]byte("one")) + h.Broadcast([]byte("two")) + h.Broadcast([]byte("three")) + + // give hub goroutine a moment to process + time.Sleep(10 * time.Millisecond) + + msgs := h.LastMessages() + if len(msgs) != 3 { + t.Fatalf("expected 3 messages, got %d", len(msgs)) + } + if string(msgs[0]) != "three" { + t.Errorf("expected most recent 'three', got '%s'", string(msgs[0])) + } + if string(msgs[2]) != "one" { + t.Errorf("expected oldest 'one', got '%s'", string(msgs[2])) + } +} + +func TestHubHistoryRotation(t *testing.T) { + h := NewHub(2) + h.Broadcast([]byte("a")) + h.Broadcast([]byte("b")) + h.Broadcast([]byte("c")) + time.Sleep(10 * time.Millisecond) + msgs := h.LastMessages() + if len(msgs) != 2 { + t.Fatalf("expected 2 messages, got %d", len(msgs)) + } + if string(msgs[0]) != "c" || string(msgs[1]) != "b" { + t.Fatalf("unexpected rotation: %+v", msgs) + } +} + +func TestBroadcastNonBlocking(t *testing.T) { + h := NewHub(1) + // fill the broadcast channel by sending many messages quickly + for i := 0; i < 1000; i++ { + h.Broadcast([]byte("x")) + } + // if Broadcast blocked, test would hang; reach here means non-blocking +} diff --git a/tools/wsclient/main.go b/tools/wsclient/main.go new file mode 100644 index 0000000..1d7ba9c --- /dev/null +++ b/tools/wsclient/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "fmt" + "log" + "net/url" + "os" + "time" + + "backend/config" + gw "github.com/gorilla/websocket" +) + +func main() { + // Load configuration + cfg, err := config.Load("config.toml") + if err != nil { + log.Fatal("Error loading config:", err) + } + + u := url.URL{Scheme: "ws", Host: cfg.Network.HTTPAddr, Path: "/api/stream"} + log.Printf("connecting to %s", u.String()) + dialer := gw.DefaultDialer + c, resp, err := dialer.Dial(u.String(), nil) + if err != nil { + if resp != nil { + fmt.Fprintf(os.Stderr, "dial error: %v, status: %s\n", err, resp.Status) + } else { + fmt.Fprintf(os.Stderr, "dial error: %v\n", err) + } + os.Exit(1) + } + defer c.Close() + + done := make(chan struct{}) + go func() { + defer close(done) + for { + _, message, err := c.ReadMessage() + if err != nil { + log.Printf("read error: %v", err) + return + } + log.Printf("recv: %s", string(message)) + } + }() + + // keep alive for a while to observe traffic + t := time.NewTimer(30 * time.Second) + select { + case <-t.C: + log.Println("timeout, closing") + case <-done: + log.Println("connection closed by server") + } +} diff --git a/web/index.html b/web/index.html new file mode 100644 index 0000000..96eb722 --- /dev/null +++ b/web/index.html @@ -0,0 +1,196 @@ + + + + + Telemetry Dashboard + + + +

Telemetry Dashboard

+
+ + + + + +
+
+ +
+

Overview

+
+

Temperature

+

Pressure

+
+
+ +

Battery Cells

+
+ +
+ + + + + + + + + \ No newline at end of file