Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,38 @@ make fmt
```
hyperfleet-adapter/
├── cmd/
│ └── adapter/ # Main application entry point
│ └── adapter/ # Main application entry point
├── pkg/
│ ├── errors/ # Error handling utilities
│ └── logger/ # Structured logging with context support
│ ├── constants/ # Shared constants (annotations, labels)
│ ├── errors/ # Error handling utilities
│ ├── health/ # Health and metrics servers
│ ├── logger/ # Structured logging with context support
│ ├── otel/ # OpenTelemetry tracing utilities
│ ├── utils/ # General utility functions
│ └── version/ # Version information
├── internal/
│ ├── broker_consumer/ # Message broker consumer implementations
│ ├── config_loader/ # Configuration loading logic
│ ├── criteria/ # Precondition and CEL evaluation
│ ├── executor/ # Event execution engine
│ ├── hyperfleet_api/ # HyperFleet API client
│ └── k8s_client/ # Kubernetes client wrapper
├── test/ # Integration tests
├── charts/ # Helm chart for Kubernetes deployment
├── Dockerfile # Multi-stage Docker build
├── Makefile # Build and test automation
├── go.mod # Go module dependencies
└── README.md # This file
│ ├── config_loader/ # Configuration loading and validation
│ ├── criteria/ # Precondition and CEL evaluation
│ ├── executor/ # Event execution engine (phases pipeline)
│ ├── hyperfleet_api/ # HyperFleet API client
│ ├── k8s_client/ # Kubernetes client wrapper
│ ├── maestro_client/ # Maestro/OCM ManifestWork client
│ ├── manifest/ # Manifest utilities (generation, rendering)
│ └── resource_applier/ # ResourceApplier interface (unified apply)
├── test/
│ └── integration/ # Integration tests
│ ├── config-loader/ # Config loader integration tests
│ ├── executor/ # Executor integration tests
│ ├── k8s_client/ # K8s client integration tests
│ ├── maestro_client/ # Maestro client integration tests
│ └── testutil/ # Test utilities
├── charts/ # Helm chart for Kubernetes deployment
├── configs/ # Configuration templates and examples
├── scripts/ # Build and test scripts
├── Dockerfile # Multi-stage Docker build
├── Makefile # Build and test automation
├── go.mod # Go module dependencies
└── README.md # This file
```

### Available Make Targets
Expand Down
29 changes: 3 additions & 26 deletions internal/config_loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package config_loader
import (
"fmt"
"os"
"path/filepath"
"strings"

"github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/utils"
"gopkg.in/yaml.v3"
)

Expand Down Expand Up @@ -250,29 +249,7 @@ func loadYAMLFile(baseDir, refPath string) (map[string]interface{}, error) {

// resolvePath resolves a relative path against the base directory and validates
// that the resolved path does not escape the base directory.
// This delegates to utils.ResolveSecurePath.
func resolvePath(baseDir, refPath string) (string, error) {
baseAbs, err := filepath.Abs(baseDir)
if err != nil {
return "", fmt.Errorf("failed to resolve base directory: %w", err)
}
baseClean := filepath.Clean(baseAbs)

var targetPath string
if filepath.IsAbs(refPath) {
targetPath = filepath.Clean(refPath)
} else {
targetPath = filepath.Clean(filepath.Join(baseClean, refPath))
}

// Check if target path is within base directory
rel, err := filepath.Rel(baseClean, targetPath)
if err != nil {
return "", fmt.Errorf("path %q escapes base directory", refPath)
}

if strings.HasPrefix(rel, "..") {
return "", fmt.Errorf("path %q escapes base directory", refPath)
}

return targetPath, nil
return utils.ResolveSecurePath(baseDir, refPath)
}
185 changes: 70 additions & 115 deletions internal/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,39 @@ func NewExecutor(config *ExecutorConfig) (*Executor, error) {
return nil, err
}

log := config.Logger

// Create phases (each phase contains its own business logic)
paramExtractionPhase := NewParamExtractionPhase(config.Config, config.K8sClient, log)
preconditionsPhase := NewPreconditionsPhase(config.APIClient, config.Config, log)
resourcesPhase := NewResourcesPhase(config.K8sClient, config.Config, log)
postActionsPhase := NewPostActionsPhase(config.APIClient, config.Config, log)

// Create pipeline with all phases
pipeline := NewPipeline(log,
paramExtractionPhase,
preconditionsPhase,
resourcesPhase,
postActionsPhase,
)

return &Executor{
config: config,
precondExecutor: newPreconditionExecutor(config),
resourceExecutor: newResourceExecutor(config),
postActionExecutor: newPostActionExecutor(config),
log: config.Logger,
config: config,
log: log,
pipeline: pipeline,
paramExtractionPhase: paramExtractionPhase,
preconditionsPhase: preconditionsPhase,
resourcesPhase: resourcesPhase,
postActionsPhase: postActionsPhase,
Comment on lines 26 to 49
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, let's look at the phase Execute implementations
echo "=== Phase Execute implementations ==="
rg -n --type=go -C5 'func \(.*\*.*Phase\) Execute' internal/executor/

echo ""
echo "=== State assignments in executor package ==="
rg -n --type=go '\.(results|outcome|state)\s*=' internal/executor/

echo ""
echo "=== Phase struct definitions ==="
rg -n --type=go '^type.*Phase struct' internal/executor/

Repository: openshift-hyperfleet/hyperfleet-adapter

Length of output: 4105


🏁 Script executed:

#!/bin/bash
# Get phase struct definitions with their fields
echo "=== Phase struct definitions with fields ==="
rg -n --type=go -A10 '^type.*Phase struct' internal/executor/

echo ""
echo "=== Executor struct definition ==="
rg -n --type=go -A15 '^type Executor struct' internal/executor/

echo ""
echo "=== Executor.Execute method ==="
rg -n --type=go -A20 'func \(\s*e\s*\*Executor\) Execute\(' internal/executor/

Repository: openshift-hyperfleet/hyperfleet-adapter

Length of output: 6361


🏁 Script executed:

#!/bin/bash
# Get full Executor.Execute method
echo "=== Full Executor.Execute method ==="
rg -n --type=go -A100 'func \(\s*e\s*\*Executor\) Execute\(' internal/executor/executor.go | head -120

echo ""
echo "=== How results are retrieved (GetResults methods) ==="
rg -n --type=go 'func.*GetResults\|GetOutcome\|GetPreconditions' internal/executor/

echo ""
echo "=== Pipeline Execute method ==="
rg -n --type=go -A30 'func \(\s*p\s*\*Pipeline\) Execute\(' internal/executor/

Repository: openshift-hyperfleet/hyperfleet-adapter

Length of output: 6039


Guard shared phase state across concurrent Executor execution.

Lines 29-38 store phase instances on the Executor. These phases maintain mutable state fields (results, outcome) that are written during each phase Execute and read in buildExecutionResult (lines 157, 159, 166, 170). If the same Executor instance is used concurrently, results can race: one execution could be writing to a phase's state while another reads it. Consider instantiating phases per Execute call, adding synchronization (mutex), or documenting and enforcing single-threaded use.

🤖 Prompt for AI Agents
In `@internal/executor/executor.go` around lines 26 - 49, The stored phase
instances on Executor (paramExtractionPhase, preconditionsPhase, resourcesPhase,
postActionsPhase) are shareable mutable state and can race across concurrent
Execute runs; change the code to create fresh phase instances inside the
Executor.Execute method (call NewParamExtractionPhase, NewPreconditionsPhase,
NewResourcesPhase, NewPostActionsPhase at the start of Execute) and use those
local variables for pipeline execution and for buildExecutionResult reads
instead of the Executor fields; alternatively, if you must keep them on the
Executor, protect all phase Execute and buildExecutionResult accesses with a
mutex on the Executor to serialize runs—update references in
buildExecutionResult and Execute to use the new local instances or to acquire
the mutex accordingly.

}, nil
}

func validateExecutorConfig(config *ExecutorConfig) error {
if config == nil {
return fmt.Errorf("config is required")
}

if config.Config == nil {
return fmt.Errorf("config is required")
}

requiredFields := []string{
"Config",
"APIClient",
"Logger",
"K8sClient"}
Expand Down Expand Up @@ -89,107 +103,15 @@ func (e *Executor) Execute(ctx context.Context, data interface{}) *ExecutionResu

execCtx := NewExecutionContext(ctx, rawData, e.config.Config)

// Initialize execution result
result := &ExecutionResult{
Status: StatusSuccess,
Params: make(map[string]interface{}),
Errors: make(map[ExecutionPhase]error),
CurrentPhase: PhaseParamExtraction,
}

e.log.Info(ctx, "Processing event")

// Phase 1: Parameter Extraction
e.log.Infof(ctx, "Phase %s: RUNNING", result.CurrentPhase)
if err := e.executeParamExtraction(execCtx); err != nil {
result.Status = StatusFailed
result.Errors[PhaseParamExtraction] = err
execCtx.SetError("ParameterExtractionFailed", err.Error())
resErr := fmt.Errorf("resource execution failed: %w", err)
errCtx := logger.WithErrorField(ctx, resErr)
e.log.Errorf(errCtx, "Phase %s: FAILED", result.CurrentPhase)
return result
}
result.Params = execCtx.Params
e.log.Debugf(ctx, "Parameter extraction completed: extracted %d params", len(execCtx.Params))

// Phase 2: Preconditions
result.CurrentPhase = PhasePreconditions
preconditions := e.config.Config.Spec.Preconditions
e.log.Infof(ctx, "Phase %s: RUNNING - %d configured", result.CurrentPhase, len(preconditions))
precondOutcome := e.precondExecutor.ExecuteAll(ctx, preconditions, execCtx)
result.PreconditionResults = precondOutcome.Results

if precondOutcome.Error != nil {
// Process execution error: precondition evaluation failed
result.Status = StatusFailed
precondErr := fmt.Errorf("precondition evaluation failed: error=%w", precondOutcome.Error)
result.Errors[result.CurrentPhase] = precondErr
execCtx.SetError("PreconditionFailed", precondOutcome.Error.Error())
errCtx := logger.WithErrorField(ctx, precondOutcome.Error)
e.log.Errorf(errCtx, "Phase %s: FAILED", result.CurrentPhase)
result.ResourcesSkipped = true
result.SkipReason = "PreconditionFailed"
execCtx.SetSkipped("PreconditionFailed", precondOutcome.Error.Error())
// Continue to post actions for error reporting
} else if !precondOutcome.AllMatched {
// Business outcome: precondition not satisfied
result.ResourcesSkipped = true
result.SkipReason = precondOutcome.NotMetReason
execCtx.SetSkipped("PreconditionNotMet", precondOutcome.NotMetReason)
e.log.Infof(ctx, "Phase %s: SUCCESS - NOT_MET - %s", result.CurrentPhase, precondOutcome.NotMetReason)
} else {
// All preconditions matched
e.log.Infof(ctx, "Phase %s: SUCCESS - MET - %d passed", result.CurrentPhase, len(precondOutcome.Results))
}
// Execute all phases through the pipeline
phaseResults := e.pipeline.Execute(ctx, execCtx)

// Phase 3: Resources (skip if preconditions not met or previous error)
result.CurrentPhase = PhaseResources
resources := e.config.Config.Spec.Resources
e.log.Infof(ctx, "Phase %s: RUNNING - %d configured", result.CurrentPhase, len(resources))
if !result.ResourcesSkipped {
resourceResults, err := e.resourceExecutor.ExecuteAll(ctx, resources, execCtx)
result.ResourceResults = resourceResults

if err != nil {
result.Status = StatusFailed
resErr := fmt.Errorf("resource execution failed: %w", err)
result.Errors[result.CurrentPhase] = resErr
execCtx.SetError("ResourceFailed", err.Error())
errCtx := logger.WithErrorField(ctx, err)
e.log.Errorf(errCtx, "Phase %s: FAILED", result.CurrentPhase)
// Continue to post actions for error reporting
} else {
e.log.Infof(ctx, "Phase %s: SUCCESS - %d processed", result.CurrentPhase, len(resourceResults))
}
} else {
e.log.Infof(ctx, "Phase %s: SKIPPED - %s", result.CurrentPhase, result.SkipReason)
}

// Phase 4: Post Actions (always execute for error reporting)
result.CurrentPhase = PhasePostActions
postConfig := e.config.Config.Spec.Post
postActionCount := 0
if postConfig != nil {
postActionCount = len(postConfig.PostActions)
}
e.log.Infof(ctx, "Phase %s: RUNNING - %d configured", result.CurrentPhase, postActionCount)
postResults, err := e.postActionExecutor.ExecuteAll(ctx, postConfig, execCtx)
result.PostActionResults = postResults

if err != nil {
result.Status = StatusFailed
postErr := fmt.Errorf("post action execution failed: %w", err)
result.Errors[result.CurrentPhase] = postErr
errCtx := logger.WithErrorField(ctx, err)
e.log.Errorf(errCtx, "Phase %s: FAILED", result.CurrentPhase)
} else {
e.log.Infof(ctx, "Phase %s: SUCCESS - %d executed", result.CurrentPhase, len(postResults))
}

// Finalize
result.ExecutionContext = execCtx
// Build execution result from phase results
result := e.buildExecutionResult(phaseResults, execCtx)

// Log final status
if result.Status == StatusSuccess {
e.log.Infof(ctx, "Event execution finished: event_execution_status=success resources_skipped=%t reason=%s", result.ResourcesSkipped, result.SkipReason)
} else {
Expand All @@ -202,20 +124,53 @@ func (e *Executor) Execute(ctx context.Context, data interface{}) *ExecutionResu
errCtx := logger.WithErrorField(ctx, combinedErr)
e.log.Errorf(errCtx, "Event execution finished: event_execution_status=failed")
}

return result
}

// executeParamExtraction extracts parameters from the event and environment
func (e *Executor) executeParamExtraction(execCtx *ExecutionContext) error {
// Extract configured parameters
if err := extractConfigParams(e.config.Config, execCtx, e.config.K8sClient); err != nil {
return err
// buildExecutionResult converts phase results into the final ExecutionResult
func (e *Executor) buildExecutionResult(phaseResults []PhaseResult, execCtx *ExecutionContext) *ExecutionResult {
result := &ExecutionResult{
Status: StatusSuccess,
Params: execCtx.Params,
Errors: make(map[ExecutionPhase]error),
ExecutionContext: execCtx,
}

// Add metadata params
addMetadataParams(e.config.Config, execCtx)
// Track the last phase that ran
for _, pr := range phaseResults {
result.CurrentPhase = pr.Phase

return nil
if pr.Error != nil {
result.Status = StatusFailed
result.Errors[pr.Phase] = pr.Error
}

if pr.Skipped && pr.Phase == PhaseResources {
result.ResourcesSkipped = true
result.SkipReason = pr.SkipReason
}
}

// Collect results from individual phases
if e.preconditionsPhase != nil {
result.PreconditionResults = e.preconditionsPhase.Results()
// Update skip reason from preconditions outcome if available
if outcome := e.preconditionsPhase.Outcome(); outcome != nil && !outcome.AllMatched {
result.ResourcesSkipped = true
result.SkipReason = outcome.NotMetReason
}
}

if e.resourcesPhase != nil {
result.ResourceResults = e.resourcesPhase.Results()
}

if e.postActionsPhase != nil {
result.PostActionResults = e.postActionsPhase.Results()
}

return result
}

// startTracedExecution creates an OTel span and adds trace context to logs.
Expand Down
7 changes: 4 additions & 3 deletions internal/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func TestExecute_ParamExtraction(t *testing.T) {
}
}

func TestParamExtractor(t *testing.T) {
func TestParamExtractionPhase(t *testing.T) {
t.Setenv("TEST_ENV", "env-value")

evt := event.New()
Expand Down Expand Up @@ -361,8 +361,9 @@ func TestParamExtractor(t *testing.T) {
},
}

// Extract params using pure function
err := extractConfigParams(config, execCtx, nil)
// Create ParamExtractionPhase and extract params
phase := NewParamExtractionPhase(config, nil, logger.NewTestLogger())
err := phase.extractConfigParams(execCtx)

if tt.expectError {
assert.Error(t, err)
Expand Down
Loading