Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ jobs:
env:
TEST_KUBECTL: yes
TEST_MINIKUBE_ENABLED: yes
TEST_MQTT_LOCAL_ENABLED: yes
steps:
- uses: actions/checkout@v3

Expand Down Expand Up @@ -49,6 +50,13 @@ jobs:
sudo mv minikube /usr/local/bin/
minikube start
kubectl config view

- name: Install Mqtt
run: |
sudo apt-get update
sudo apt-get install mosquitto mosquitto-clients
sudo service mosquitto start
sudo service mosquitto status

- name: COA Test
run: cd coa && go test -v ./... -run '^[^C]*$|^[^c][^o]*$|^[^c][^o]*o[^n][^f][^o][^r][^m][^a][^n][^c][^e][^C]*$'
Expand Down
20 changes: 19 additions & 1 deletion api/pkg/apis/v1alpha1/managers/devices/devices-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ import (
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/managers"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/states"
"github.com/eclipse-symphony/symphony/coa/pkg/logger"

observability "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/observability"
observ_utils "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/observability/utils"
)

var log = logger.NewLogger("coa.runtime")

type DevicesManager struct {
managers.Manager
StateProvider states.IStateProvider
Expand All @@ -29,10 +32,12 @@ type DevicesManager struct {
func (s *DevicesManager) Init(context *contexts.VendorContext, config managers.ManagerConfig, providers map[string]providers.IProvider) error {
err := s.Manager.Init(context, config, providers)
if err != nil {
log.Errorf("M (Devices): failed to initialize manager %+v", err)
return err
}
stateprovider, err := managers.GetStateProvider(config, providers)
if err == nil {
log.Errorf("M (Devices): failed to get state provider %+v", err)
s.StateProvider = stateprovider
} else {
return err
Expand All @@ -46,6 +51,7 @@ func (t *DevicesManager) DeleteSpec(ctx context.Context, name string) error {
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
log.Infof("M (Devices): DeleteSpec name %s, traceId: %s", name, span.SpanContext().TraceID().String())

err = t.StateProvider.Delete(ctx, states.DeleteRequest{
ID: name,
Expand All @@ -56,7 +62,11 @@ func (t *DevicesManager) DeleteSpec(ctx context.Context, name string) error {
"resource": "devices",
},
})
return err
if err != nil {
log.Errorf("M (Devices):failed to delete state %s, error: %v, traceId: %s", name, err, span.SpanContext().TraceID().String())
return err
}
return nil
}

func (t *DevicesManager) UpsertSpec(ctx context.Context, name string, spec model.DeviceSpec) error {
Expand All @@ -65,6 +75,7 @@ func (t *DevicesManager) UpsertSpec(ctx context.Context, name string, spec model
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
log.Infof("M (Devices): UpsertSpec name %s, traceId: %s", name, span.SpanContext().TraceID().String())

upsertRequest := states.UpsertRequest{
Value: states.StateEntry{
Expand All @@ -88,6 +99,7 @@ func (t *DevicesManager) UpsertSpec(ctx context.Context, name string, spec model
}
_, err = t.StateProvider.Upsert(ctx, upsertRequest)
if err != nil {
log.Errorf("M (Devices): failed to update state %s, error: %v, traceId: %s", name, err, span.SpanContext().TraceID().String())
return err
}
return nil
Expand All @@ -99,6 +111,7 @@ func (t *DevicesManager) ListSpec(ctx context.Context) ([]model.DeviceState, err
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
log.Infof("M (Devices): ListSpec, traceId: %s", span.SpanContext().TraceID().String())

listRequest := states.ListRequest{
Metadata: map[string]string{
Expand All @@ -109,13 +122,15 @@ func (t *DevicesManager) ListSpec(ctx context.Context) ([]model.DeviceState, err
}
solutions, _, err := t.StateProvider.List(ctx, listRequest)
if err != nil {
log.Errorf("M (Devices): failed to list state, error: %v, traceId: %s", err, span.SpanContext().TraceID().String())
return nil, err
}
ret := make([]model.DeviceState, 0)
for _, t := range solutions {
var rt model.DeviceState
rt, err = getDeviceState(t.ID, t.Body)
if err != nil {
log.Errorf("M (Devices): ListSpec failed to get device state %s, error: %v, traceId: %s", t.ID, err, span.SpanContext().TraceID().String())
return nil, err
}
ret = append(ret, rt)
Expand Down Expand Up @@ -146,6 +161,7 @@ func (t *DevicesManager) GetSpec(ctx context.Context, id string) (model.DeviceSt
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
log.Infof("M (Devices): GetSpec id %s, traceId: %s", id, span.SpanContext().TraceID().String())

getRequest := states.GetRequest{
ID: id,
Expand All @@ -157,11 +173,13 @@ func (t *DevicesManager) GetSpec(ctx context.Context, id string) (model.DeviceSt
}
target, err := t.StateProvider.Get(ctx, getRequest)
if err != nil {
log.Errorf("M (Devices): failed to get state %s, error: %v, traceId: %s", id, err, span.SpanContext().TraceID().String())
return model.DeviceState{}, err
}

ret, err := getDeviceState(id, target.Body)
if err != nil {
log.Errorf("M (Devices): GetSpec failed to get device state, error: %v, traceId: %s", err, span.SpanContext().TraceID().String())
return model.DeviceState{}, err
}
return ret, nil
Expand Down
27 changes: 25 additions & 2 deletions api/pkg/apis/v1alpha1/managers/reference/reference-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import (
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/reference"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/reporter"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/states"
"github.com/eclipse-symphony/symphony/coa/pkg/logger"
"github.com/oliveagle/jsonpath"
)

var log = logger.NewLogger("coa.runtime")

type ReferenceManager struct {
managers.Manager
ReferenceProviders map[string]reference.IReferenceProvider
Expand All @@ -39,29 +42,32 @@ type CachedItem struct {
}

func (s *ReferenceManager) Init(context *contexts.VendorContext, config managers.ManagerConfig, providers map[string]providers.IProvider) error {

err := s.Manager.Init(context, config, providers)
if err != nil {
log.Errorf("M (Reference): failed to initialize manager %+v", err)
return err
}

stateProvider, err := managers.GetStateProvider(config, providers)
if err == nil {
s.StateProvider = stateProvider
} else {
log.Errorf("M (Reference): failed to get state provider %+v", err)
return err
}

reportProvider, err := managers.GetReporter(config, providers)
if err == nil {
s.Reporter = reportProvider
} else {
log.Errorf("M (Reference): failed to get reporter %+v", err)
return err
}

ctx := contexts.ManagerContext{}
err = ctx.Init(context, nil)
if err != nil {
log.Errorf("M (Reference): failed to initialize manager context %+v", err)
return err
}

Expand All @@ -88,51 +94,63 @@ func (s *ReferenceManager) Init(context *contexts.VendorContext, config managers
}

func (s *ReferenceManager) GetExt(refType string, namespace string, id1 string, group1 string, kind1 string, version1 string, id2 string, group2 string, kind2 string, version2 string, iteration string, alias string) ([]byte, error) {
log.Infof("M (Reference): GetExt id1 - %s, id2 - %s, group2 - %s", id1, id2, group2)

if group2 != "download" {
data1, err := s.Get(refType, id1, namespace, group1, kind1, version1, "", "")
if err != nil {
log.Errorf("M (Reference): failed to get %s: %+v", id1, err)
return nil, err
}
data2, err := s.Get(refType, id2, namespace, group2, kind2, version2, "", "")
if err != nil {
log.Errorf("M (Reference): failed to get %s: %+v", id2, err)
return nil, err
}
return fillParameters(data1, data2, id1, alias)
} else {
data1, err := s.Get(refType, id1, namespace, group1, kind1, version1, "", "")
if err != nil {
log.Errorf("M (Reference): failed to get %s: %+v", id1, err)
return nil, err
}
obj := make(map[string]interface{}, 0)
err = json.Unmarshal(data1, &obj)
if err != nil {
log.Errorf("M (Reference): failed to unmarshall %s object: %+v", id1, err)
return nil, err
}
var specData []byte
if v, ok := obj["spec"]; ok {
specData, err = json.Marshal(v)
if err != nil {
log.Errorf("M (Reference): failed to unmarshall %s spec: %+v", id1, err)
return nil, err
}
} else {
log.Errorf("M (Reference): %s spec property not found", id1)
return nil, v1alpha2.NewCOAError(nil, "resolved object doesn't contain a 'spec' property", v1alpha2.InternalError)
}

model := model.ModelSpec{}
err = json.Unmarshal(specData, &model)
if err != nil {
log.Errorf("M (Reference): failed to unmarshall %s object spec: %+v", id1, err)
return nil, err
}
modelType := safeRead("model.type", model.Properties)
if modelType != "customvision" {
log.Errorf("M (Reference): failed to unmarshall %s object spec:", id1)
return nil, v1alpha2.NewCOAError(nil, "only 'customvision' model type is supported", v1alpha2.InternalError)
}
modelProject := safeRead("model.project", model.Properties)
if modelProject == "" {
log.Errorf("M (Reference): failed to read %s model.project property", id1)
return nil, v1alpha2.NewCOAError(nil, "property 'model.project' is not found", v1alpha2.InternalError)
}
modelEndpoint := safeRead("model.endpoint", model.Properties)
if modelEndpoint == "" {
log.Errorf("M (Reference): failed to read %s model.endpoint property", id1)
return nil, v1alpha2.NewCOAError(nil, "property 'model.endpoint' is not found", v1alpha2.InternalError)
}
modelVersions := make(map[string]string)
Expand All @@ -142,6 +160,7 @@ func (s *ReferenceManager) GetExt(refType string, namespace string, id1 string,
}
}
if len(modelVersions) == 0 {
log.Errorf("M (Reference): failed to read %s model.version property", id1)
return nil, v1alpha2.NewCOAError(nil, "no model version are found", v1alpha2.InternalError)
}
selection := ""
Expand All @@ -153,11 +172,13 @@ func (s *ReferenceManager) GetExt(refType string, namespace string, id1 string,
}
}
if selection == "" {
log.Errorf("M (Reference): failed to read %s model.version property", id1)
return nil, v1alpha2.NewCOAError(nil, fmt.Sprintf("requested version 'model.version.%s' is not found", iteration), v1alpha2.InternalError)
}

downloadData, err := s.Get("v1alpha2.CustomVision", modelProject, modelEndpoint, kind2, version2, selection, "", "")
if err != nil {
log.Errorf("M (Reference): failed to get %s: %+v", modelProject, err)
return nil, err
}
return downloadData, nil
Expand Down Expand Up @@ -191,6 +212,9 @@ func (s *ReferenceManager) Get(refType string, id string, namespace string, grou
} else {
entityId = fmt.Sprintf("%s-%s-%s-%s-%s-%s", refType, id, namespace, group, kind, version)
}

log.Infof("M (Reference): Get entityId- %s", entityId)

entity, err := s.StateProvider.Get(context.TODO(), states.GetRequest{
ID: entityId,
})
Expand Down Expand Up @@ -239,7 +263,6 @@ func (s *ReferenceManager) Get(refType string, id string, namespace string, grou
})
refData, _ := json.Marshal(ref)
return refData, nil

}

func (s *ReferenceManager) Report(id string, namespace string, group string, kind string, version string, properties map[string]string, overwrite bool) error {
Expand Down
9 changes: 8 additions & 1 deletion api/pkg/apis/v1alpha1/managers/target/target-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,32 +51,35 @@ type DeviceSpec struct {
}

func (s *TargetManager) Init(context *contexts.VendorContext, config managers.ManagerConfig, providers map[string]providers.IProvider) error {

probeProvider, err := managers.GetProbeProvider(config, providers)
if err == nil {
s.ProbeProvider = probeProvider
} else {
log.Errorf("M (Target): failed to get probe provider %+v", err)
return err
}

referenceProvider, err := managers.GetReferenceProvider(config, providers)
if err == nil {
s.ReferenceProvider = referenceProvider
} else {
log.Errorf("M (Target): failed to get reference provider %+v", err)
return err
}

uploaderProvider, err := managers.GetUploaderProvider(config, providers)
if err == nil {
s.UploaderProvider = uploaderProvider
} else {
log.Errorf("M (Target): failed to get upload provider %+v", err)
return err
}

reporterProvider, err := managers.GetReporter(config, providers)
if err == nil {
s.Reporter = reporterProvider
} else {
log.Errorf("M (Target): failed to get reporter %+v", err)
return err
}

Expand All @@ -97,6 +100,7 @@ func (s *TargetManager) Enabled() bool {
}
func (s *TargetManager) Poll() []error {
target := s.ReferenceProvider.TargetID()
log.Infof("M (Target): Poll target- %s", target)

ret, err := s.ReferenceProvider.List(target+"=true", "", "default", model.FabricGroup, "devices", "v1", "v1alpha2.ReferenceK8sCRD")
if err != nil {
Expand Down Expand Up @@ -170,6 +174,8 @@ func (s *TargetManager) Poll() []error {
return errors
}
func (s *TargetManager) reportStatus(deviceName string, targetName string, snapshot string, targetStatus string, deviceStatus string, overwrite bool, errStr string) []error {
log.Infof("M (Target): reportStatus deviceName- %s, targetName - %s, snapshot -%s targetStatus -%s, deviceStatus -%s, overwrite -%s", deviceName, targetName, snapshot, targetStatus, deviceStatus, overwrite)

ret := make([]error, 0)
report := make(map[string]string)
report[targetName+".status"] = targetStatus
Expand All @@ -194,6 +200,7 @@ func (s *TargetManager) reportStatus(deviceName string, targetName string, snaps
log.Debugf("failed to report target status: %s", err.Error())
ret = append(ret, err)
}

return ret
}
func (s *TargetManager) Reconcil() []error {
Expand Down
Loading