Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
d79fee0
working on the pump
vanDeventer May 19, 2025
af08bb6
forgetten save
vanDeventer May 19, 2025
41dca5f
redo configuration to have configurable services and working on pump …
vanDeventer May 27, 2025
56a852a
upgrading kGrapher
vanDeventer May 27, 2025
5a292c4
adding local ontologies
vanDeventer Jun 7, 2025
494f96c
update a few system with new configuration process
vanDeventer Jun 8, 2025
562d2c9
adding configuration update to Orchestrator
vanDeventer Jun 8, 2025
1b9b6e4
adding the filmer while addressing new configuration method
vanDeventer Jun 23, 2025
287b898
Fixes warning about time parsing problem for unregistered service
lmas Jul 9, 2025
7abd0cd
Adds go.mod files to systems
lmas Jul 8, 2025
97f67a0
Adds initial messenger system
lmas Jul 8, 2025
a770f7d
Adds basic "beacon" registration of messenger
lmas Jul 9, 2025
1dae075
Use the new log functions
lmas Jul 11, 2025
6010d7e
Set shorter beacon time to minimise the amount of missed msgs
lmas Jul 14, 2025
8d1d95f
Allows the messenger to recieve log msgs from other systems
lmas Jul 14, 2025
2d9e309
Added html files to .gitignore file
gabaxh Jul 14, 2025
11d6297
TODO: temporary testing logging for ds18 system, COMMIT TO BE REMOVED
lmas Jul 14, 2025
a438d1a
Start tracking log messages and adds simplistic dashboard
lmas Jul 15, 2025
c3789a7
Started testing the orchestrator system, starting with orchestrate fu…
gabaxh Jul 10, 2025
09707ed
Continued with tests of orchestrator system
gabaxh Jul 11, 2025
f0edfc6
Cleaned up some code
gabaxh Jul 14, 2025
f96b562
Added the possibility to get several services from the orchestrator i…
gabaxh Jul 14, 2025
cb01aa6
Continued working on tests of orchestrator system
gabaxh Jul 15, 2025
67128da
Continued with tests on orchestrator system
gabaxh Jul 16, 2025
c230b3f
Done with unit tests of orchestrator system
gabaxh Jul 18, 2025
93ea24b
Fixed some missed spellchecks
gabaxh Jul 18, 2025
cc923e0
Adds go.mod files as they are required for Go projects
lmas Jul 18, 2025
5d16ec2
Fixed some PR comments
gabaxh Jul 18, 2025
203438e
Removes old and misleading comments
lmas Jul 18, 2025
f2de22d
Removes spammy prints and the rest were made uniform, fixes tests/lin…
lmas Jul 18, 2025
b0d7d03
Removes commented out integration test, it will be handled separately
lmas Jul 18, 2025
d1562f7
Fixed PR comments
gabaxh Jul 21, 2025
61f0df2
Removed the use of testing variable in functions creating forms
gabaxh Jul 21, 2025
b77f405
Clean ups
lmas Jul 22, 2025
3ad416d
Adds tests for messenger.go
lmas Jul 25, 2025
301d267
Replaces single letter variables with better names
lmas Jul 25, 2025
69421e8
Fixed typos
Pake-TU Jul 13, 2025
e5a929a
Added test for initTemplate()
Pake-TU Jul 13, 2025
d4e3d5f
Removed unnecessary logic
Pake-TU Jul 14, 2025
66319c0
Added test for getUniqueSystems()
Pake-TU Jul 14, 2025
b8fbb5a
Refactored and simplified FilterByServiceDefinitionAndDetails()
Pake-TU Jul 14, 2025
7a4b27c
Added help functions etc for testing FilterByServiceDefinitionAndDeta…
Pake-TU Jul 14, 2025
6d93b1b
Finished test for FilterByServiceDefinitionAndDetails()
Pake-TU Jul 15, 2025
15c9cd8
Added test for newResource
Pake-TU Jul 15, 2025
c202f19
Added CoreS to createTestSystem()
Pake-TU Jul 15, 2025
0acfb7c
Added tests for roleStatus() and peersList()
Pake-TU Jul 15, 2025
05f8581
Added tests for systemList()
Pake-TU Jul 15, 2025
96d8109
Changed helpfunctions returned unitasset to pointer
Pake-TU Jul 16, 2025
4741f9f
Fixed linter errors: added error handlers and var declaration
Pake-TU Jul 16, 2025
c981f21
added missing mutex locks & unlocks
Pake-TU Jul 16, 2025
4d930ba
added first test for serviceRegistryHandler()
Pake-TU Jul 16, 2025
b84651a
Added tests for serviceRegistryHandler()
Pake-TU Jul 17, 2025
06e5ddc
Adds go.mod files as they are required for Go projects
lmas Jul 18, 2025
cabbf97
Added tests for updateDB()
Pake-TU Jul 20, 2025
22e07ac
Added tests for queryDB and cleanDB()
Pake-TU Jul 21, 2025
ebd706b
Fixed linter errors
Pake-TU Jul 21, 2025
2ec23d9
Fixed spellchecker errors
Pake-TU Jul 21, 2025
9101c8e
Simplified scheduler
Pake-TU Jul 21, 2025
3a4ad21
Added tests for scheduler
Pake-TU Jul 22, 2025
c461acd
Fixed some errorhandlers, spellings and locks in newResource
Pake-TU Jul 22, 2025
ec116f8
fixed test after changes in esr.go
Pake-TU Jul 22, 2025
fb87c6a
Fixed comments and removed unused struct
Pake-TU Jul 22, 2025
42a07f3
Added delete test for serviceRegistryHandler()
Pake-TU Jul 22, 2025
1791ee6
Added missing http.Error() in error handlers for updateDB()
Pake-TU Jul 22, 2025
351285e
fixed errorhandlers in esr.go, and added testcase in esr_test.go
Pake-TU Jul 22, 2025
61a2d97
Added functioncall in checkExpiration() to remove task from scheduler
Pake-TU Jul 23, 2025
8ef2d6b
Fixed PR comments
Pake-TU Jul 23, 2025
0348965
changed errorhandlers in testcases
Pake-TU Jul 23, 2025
84527a6
Fixed PR comments
Pake-TU Jul 23, 2025
ed709cb
Added returns for RemoveTask() and Stop()
Pake-TU Jul 25, 2025
59892d6
Finished tests, and comments from PR
Pake-TU Jul 25, 2025
0a6f146
Small fixes from the review
lmas Jul 25, 2025
609c109
Moved and slightly changed the ID check to ensure an old service will…
Pake-TU Jul 25, 2025
a471f02
Removed unnecessary testcase
Pake-TU Jul 28, 2025
9cd0864
Fixed linter error
Pake-TU Jul 28, 2025
9801d9d
Adds initial tests for thing.go
lmas Jul 25, 2025
7152e0c
Avoid periodically spamming warnings for a minor error
lmas Aug 8, 2025
c5ddf51
Finishes the tests for messenger
lmas Aug 8, 2025
14cc890
Removes unrelated go.mod files
lmas Aug 8, 2025
282a851
Revert "TODO: temporary testing logging for ds18 system, COMMIT TO BE…
lmas Aug 8, 2025
c963039
Removes more go.mod files
lmas Aug 8, 2025
ae1843c
Merge pull request #4 from lmas/messenger
lmas Aug 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*_amac
*_ilin
*.sh
*.html
Makefile

# Test binary, built with `go test -c`
Expand All @@ -24,7 +25,7 @@ vendor/

# mbaigo
*.json
go.mod
go.sum
serviceRegistry.db
*.pem
**/files/

5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ Many of the testing is done with the Raspberry Pi (3, 4, &5) with [GPIO](https:/
- 20103 Orchestrator
- 20104 Authorizer
- 20105 Modeler (local cloud semantics with GraphDB)
- 20106 Messenger
- 20150 ds18b20 (1-wire sensor)
- 20151 Parallax (PWM)
- 20152 Thermostat
- 20153 Revolutionary (Rev Pi PLC)
- 20154 Levler (Level control)
- 20160 Picam
- 20161 USB microphone
- 20170 UA client (OPC UA)
- 20171 Modboss (Modbus TCP)
- 20172 Telegrapher (MQTT)
- 20180 Influxer (Influx DB)
- 20180 Influxer (Influx DB)
23 changes: 11 additions & 12 deletions Influxer/README.md → collector/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# mbaigo System: influxer
# mbaigo System: Collector

The Influxer is a system that as for asset the time series database [InfluxDB](https://en.wikipedia.org/wiki/InfluxDB).
The Collector is a system that as for asset the time series database [InfluxDB](https://en.wikipedia.org/wiki/InfluxDB).

It offers one services, *squery*. squery provides a list of signals present in its bucket’s measurements.

Expand All @@ -12,33 +12,32 @@ As with the other systems, this is a prototype that shows that the mbaigo librar
## Compiling
To compile the code, one needs to get the AiGo module
```go get github.com/vanDeventer/mbaigo```
and initialize the *go.mod* file with ``` go mod init github.com/vanDeventer/arrowsys/inflxer``` before running *go mod tidy*.
and initialize the *go.mod* file with ``` go mod init github.com/vanDeventer/arrowsys/collector``` before running *go mod tidy*.

The reason the *go.mod* file is not included in the repository is that when developing the mbaigo module, a replace statement needs to be included to point to the development code.

To run the code, one just needs to type in ```go run influxer.go thing.go``` within a terminal or at a command prompt.
To run the code, one just needs to type in ```go run Collector.go thing.go``` within a terminal or at a command prompt.

It is **important** to start the program from within its own directory (and each system should have their own directory) because it looks for its configuration file there. If it does not find it there, it will generate one and shutdown to allow the configuration file to be updated.

The configuration and operation of the system can be verified using the system's web server using a standard web browser, whose address is provided by the system at startup.

To build the software for one's own machine,
```go build -o influxer```.
```go build -o Collector```.


## Cross compiling/building
The following commands enable one to build for different platforms:
- Intel Mac: ```GOOS=darwin GOARCH=amd64 go build -o influxer_imac influxer.go thing.go```
- ARM Mac: ```GOOS=darwin GOARCH=arm64 go build -o influxer_amac influxer.go thing.go```
- Windows 64: ```GOOS=windows GOARCH=amd64 go build -o influxer.exe influxer.go thing.go```
- Raspberry Pi 64: ```GOOS=linux GOARCH=arm64 go build -o influxer_rpi64 influxer.go thing.go```
- (new) Raspberry Pi 32: ```GOOS=linux GOARCH=arm GOARM=7 go build -o influxer_rpi32 influxer.go thing.go```
- Linux: ```GOOS=linux GOARCH=amd64 go build -o influxer_linux influxer.go thing.go```
- Intel Mac: ```GOOS=darwin GOARCH=amd64 go build -o Collector_imac```
- ARM Mac: ```GOOS=darwin GOARCH=arm64 go build -o Collector_amac ```
- Windows 64: ```GOOS=windows GOARCH=amd64 go build -o Collector.exe```
- Raspberry Pi 64: ```GOOS=linux GOARCH=arm64 go build -o Collector_rpi64```
- Linux: ```GOOS=linux GOARCH=amd64 go build -o Collector_linux```

One can find a complete list of platform by typing *‌go tool dist list* at the command prompt

If one wants to secure copy it to a Raspberry pi,
`scp influxer_rpi64 jan@192.168.1.6:Desktop/influxer/` where user is the *username* @ the *IP address* of the Raspberry Pi with a relative (to the user's home directory) target *Desktop/influxer/* directory.influxer
`scp Collector_rpi64 jan@192.168.1.10:rpiExec/Collector/` where user is the *username* @ the *IP address* of the Raspberry Pi with a relative (to the user's home directory) target *rpiExec/Collector/* directory.Collector


## Deployment of the asset
Expand Down
19 changes: 14 additions & 5 deletions Influxer/influxer.go → collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"context"
"crypto/x509/pkix"
"encoding/json"
"fmt"
"log"
Expand All @@ -34,14 +35,22 @@ func main() {
defer cancel() // make sure all paths cancel the context to avoid context leak

// instantiate the System
sys := components.NewSystem("influxer", ctx)
sys := components.NewSystem("Collector", ctx)

// Instatiate the Capusle
// Instantiate the husk
sys.Husk = &components.Husk{
Description: " is a system that ingests time signals into an Influx database",
Details: map[string][]string{"Developer": {"Synecdoque"}},
ProtoPort: map[string]int{"https": 0, "http": 20180, "coap": 0},
InfoLink: "https://github.com/sdoque/systems/tree/main/influxer",
DName: pkix.Name{
CommonName: sys.Name,
Organization: []string{"Synecdoque"},
OrganizationalUnit: []string{"Systems"},
Locality: []string{"Luleå"},
Province: []string{"Norrbotten"},
Country: []string{"SE"},
},
}

// instantiate a template unit asset
Expand All @@ -50,17 +59,17 @@ func main() {
sys.UAssets[assetName] = &assetTemplate

// Configure the system
rawResources, servsTemp, err := usecases.Configure(&sys)
rawResources, err := usecases.Configure(&sys)
if err != nil {
log.Fatalf("Configuration error: %v\n", err)
}
sys.UAssets = make(map[string]*components.UnitAsset) // clear the unit asset map (from the template)
for _, raw := range rawResources {
var uac UnitAsset
var uac usecases.ConfigurableAsset
if err := json.Unmarshal(raw, &uac); err != nil {
log.Fatalf("Resource configuration error: %+v\n", err)
}
ua, cleanup := newResource(uac, &sys, servsTemp)
ua, cleanup := newResource(uac, &sys)
defer cleanup()
sys.UAssets[ua.GetName()] = &ua
}
Expand Down
81 changes: 57 additions & 24 deletions Influxer/thing.go → collector/thing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
Expand All @@ -43,6 +44,15 @@ type MeasurementT struct {

//-------------------------------------Define the unit asset

// Traits are Asset-specific configurable parameters
type Traits struct {
FluxURL string `json:"db_url"`
Token string `json:"token"`
Org string `json:"organization"`
Bucket string `json:"bucket"`
Measurements []MeasurementT `json:"measurements"`
}

// UnitAsset type models the unit asset (interface) of the system
type UnitAsset struct {
Name string `json:"bucket_name"`
Expand All @@ -51,12 +61,8 @@ type UnitAsset struct {
ServicesMap components.Services `json:"-"`
CervicesMap components.Cervices `json:"-"`
//
FluxURL string `json:"db_url"`
Token string `json:"token"`
Org string `json:"organization"`
Bucket string `json:"bucket"`
Measurements []MeasurementT `json:"measurements"`
client influxdb2.Client // InfluxDB client
Traits
client influxdb2.Client // InfluxDB client
}

// GetName returns the name of the Resource.
Expand All @@ -79,6 +85,11 @@ func (ua *UnitAsset) GetDetails() map[string][]string {
return ua.Details
}

// GetTraits returns the traits of the Resource.
func (ua *UnitAsset) GetTraits() any {
return ua.Traits
}

// ensure UnitAsset implements components.UnitAsset (this check is done at during the compilation)
var _ components.UnitAsset = (*UnitAsset)(nil)

Expand All @@ -98,15 +109,17 @@ func initTemplate() components.UnitAsset {
uat := &UnitAsset{
Name: "demo",
Details: map[string][]string{"Database": {"InfluxDB"}},
FluxURL: "http://10.0.0.33:8086",
Token: "K1NTWNlToyUNXdii7IwNJ1W-kMsagUr8w1r4cRVYqK-N-R9vVT1MCJwHFBxOgiW85iKiMSsUpbrxQsQZJA8IzA==",
Org: "mbaigo",
Bucket: "demo",
Measurements: []MeasurementT{
{
Name: "temperature",
Details: map[string][]string{"Location": {"Kitchen"}},
Period: 3,
Traits: Traits{
FluxURL: "http://10.0.0.33:8086",
Token: "K1NTWNlToyUNXdii7IwNJ1W-kMsagUr8w1r4cRVYqK-N-R9vVT1MCJwHFBxOgiW85iKiMSsUpbrxQsQZJA8IzA==",
Org: "mbaigo",
Bucket: "demo",
Measurements: []MeasurementT{
{
Name: "temperature",
Details: map[string][]string{"Location": {"Kitchen"}},
Period: 3,
},
},
},
ServicesMap: components.Services{
Expand All @@ -119,19 +132,26 @@ func initTemplate() components.UnitAsset {
//-------------------------------------Instantiate the unit assets based on configuration

// newResource creates a new UnitAsset resource based on the configuration
func newResource(uac UnitAsset, sys *components.System, servs []components.Service) (components.UnitAsset, func()) {
func newResource(configuredAsset usecases.ConfigurableAsset, sys *components.System) (components.UnitAsset, func()) {
ua := &UnitAsset{
Name: uac.Name,
Name: configuredAsset.Name,
Owner: sys,
Details: uac.Details,
ServicesMap: components.CloneServices(servs),
FluxURL: uac.FluxURL,
Token: uac.Token,
Org: uac.Org,
Bucket: uac.Bucket,
Details: configuredAsset.Details,
ServicesMap: usecases.MakeServiceMap(configuredAsset.Services),
// FluxURL: uac.FluxURL,
// Token: uac.Token,
// Org: uac.Org,
// Bucket: uac.Bucket,
CervicesMap: make(map[string]*components.Cervice), // Initialize map
}

traits, err := UnmarshalTraits(configuredAsset.Traits)
if err != nil {
log.Println("Warning: could not unmarshal traits:", err)
} else if len(traits) > 0 {
ua.Traits = traits[0] // or handle multiple traits if needed
}

if ua.FluxURL == "" || ua.Token == "" || ua.Org == "" || ua.Bucket == "" {
log.Fatal("Invalid InfluxDB configuration: missing required parameters")
}
Expand All @@ -145,7 +165,7 @@ func newResource(uac UnitAsset, sys *components.System, servs []components.Servi
// Collect and ingest measurements
var wg sync.WaitGroup
sProtocols := components.SProtocols(sys.Husk.ProtoPort)
for _, measurement := range uac.Measurements {
for _, measurement := range ua.Traits.Measurements {
// determine the protocols that the system supports
cMeasurement := components.Cervice{
Definition: measurement.Name,
Expand Down Expand Up @@ -173,6 +193,19 @@ func newResource(uac UnitAsset, sys *components.System, servs []components.Servi
}
}

// UnmarshalTraits unmarshals a slice of json.RawMessage into a slice of Traits.
func UnmarshalTraits(rawTraits []json.RawMessage) ([]Traits, error) {
var traitsList []Traits
for _, raw := range rawTraits {
var t Traits
if err := json.Unmarshal(raw, &t); err != nil {
return nil, fmt.Errorf("failed to unmarshal trait: %w", err)
}
traitsList = append(traitsList, t)
}
return traitsList, nil
}

//-------------------------------------Unit asset's functionalities

// collectIngest
Expand Down
19 changes: 15 additions & 4 deletions ds18b20/ds18b20.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"context"
"crypto/x509/pkix"
"encoding/json"
"fmt"
"log"
Expand All @@ -43,6 +44,14 @@ func main() {
Details: map[string][]string{"Developer": {"Synecdoque"}},
ProtoPort: map[string]int{"https": 0, "http": 20150, "coap": 0},
InfoLink: "https://github.com/sdoque/systems/tree/main/ds18b20",
DName: pkix.Name{
CommonName: sys.Name,
Organization: []string{"Synecdoque"},
OrganizationalUnit: []string{"Systems"},
Locality: []string{"Luleå"},
Province: []string{"Norrbotten"},
Country: []string{"SE"},
},
}

// instantiate a template unit asset
Expand All @@ -51,18 +60,20 @@ func main() {
sys.UAssets[assetName] = &assetTemplate

// Configure the system
rawResources, servsTemp, err := usecases.Configure(&sys)
rawResources, err := usecases.Configure(&sys)
if err != nil {
log.Fatalf("configuration error: %v\n", err)
}
sys.UAssets = make(map[string]*components.UnitAsset) // clear the unit asset map (from the template)
var cleanups []func()
for _, raw := range rawResources {
var uac UnitAsset
var uac usecases.ConfigurableAsset
if err := json.Unmarshal(raw, &uac); err != nil {
log.Fatalf("resource configuration error: %+v\n", err)
}
ua, cleanup := newResource(uac, &sys, servsTemp)
defer cleanup()
ua, cleanup := newResource(uac, &sys)
cleanups = append(cleanups, cleanup)
defer cleanup() // ensure cleanup is called when the program exits
sys.UAssets[ua.GetName()] = &ua
}

Expand Down
Loading