From 991da3a1e5dbb83278ab4e73df7efa7b502b0023 Mon Sep 17 00:00:00 2001 From: tithakka Date: Fri, 6 Feb 2026 22:57:04 +0530 Subject: [PATCH] Hyperfleet-361 : Use updated broker with logger context inheritance --- cmd/sentinel/main.go | 2 +- docs/testcontainers.md | 2 +- go.mod | 2 +- go.sum | 4 +- internal/config/config.go | 2 +- internal/metrics/metrics.go | 14 +-- internal/sentinel/sentinel.go | 2 +- internal/sentinel/sentinel_test.go | 75 ++++++++++++- pkg/logger/logger.go | 83 ++++++++++++-- pkg/logger/logger_test.go | 12 +- test/integration/integration_test.go | 160 +++++++++++++++++++++++++++ test/integration/testcontainer.go | 2 +- 12 files changed, 324 insertions(+), 36 deletions(-) diff --git a/cmd/sentinel/main.go b/cmd/sentinel/main.go index ec1e437..42ccc0f 100755 --- a/cmd/sentinel/main.go +++ b/cmd/sentinel/main.go @@ -168,7 +168,7 @@ func runServe(cfg *config.SentinelConfig, logCfg *logger.LogConfig) error { // Initialize publisher using hyperfleet-broker library // Configuration is loaded from broker.yaml or BROKER_CONFIG_FILE env var - pub, err := broker.NewPublisher() + pub, err := broker.NewPublisher(log) if err != nil { log.Errorf(ctx, "Failed to initialize broker publisher: %v", err) return fmt.Errorf("failed to initialize broker publisher: %w", err) diff --git a/docs/testcontainers.md b/docs/testcontainers.md index e9e743a..cb12cc0 100644 --- a/docs/testcontainers.md +++ b/docs/testcontainers.md @@ -72,7 +72,7 @@ sudo chmod a+xrw /var/run/podman sudo chmod a+xrw /var/run/podman/podman.sock ``` -**Warning**: This solution grants elevated permissions and should only be used in development environments. +**Warn**: This solution grants elevated permissions and should only be used in development environments. ## Supported Brokers diff --git a/go.mod b/go.mod index 5f79ad4..543bb87 100755 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/cloudevents/sdk-go/v2 v2.16.2 github.com/google/uuid v1.6.0 github.com/oapi-codegen/runtime v1.1.2 - github.com/openshift-hyperfleet/hyperfleet-broker v1.0.0 + github.com/openshift-hyperfleet/hyperfleet-broker v1.0.1 github.com/prometheus/client_golang v1.23.2 github.com/segmentio/ksuid v1.0.4 github.com/spf13/cobra v1.8.0 diff --git a/go.sum b/go.sum index 47ba112..ac894f1 100755 --- a/go.sum +++ b/go.sum @@ -187,8 +187,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= -github.com/openshift-hyperfleet/hyperfleet-broker v1.0.0 h1:JfIF9ZVFIaXmo0hKAN42SCwItpqEHXtc54+WHRKuYg4= -github.com/openshift-hyperfleet/hyperfleet-broker v1.0.0/go.mod h1:z7QpS2m6gaqTbgPazl1lYYy+JuyNDMkMtco12rM29nU= +github.com/openshift-hyperfleet/hyperfleet-broker v1.0.1 h1:Mvx0ojBvttYlwu3VfOHwvH+eEM1xA40GzNOZqv1cGyQ= +github.com/openshift-hyperfleet/hyperfleet-broker v1.0.1/go.mod h1:z7QpS2m6gaqTbgPazl1lYYy+JuyNDMkMtco12rM29nU= github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/internal/config/config.go b/internal/config/config.go index 9a9c14f..c166c27 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -123,7 +123,7 @@ func (c *SentinelConfig) ValidateTemplates() error { ctx := context.Background() if len(c.MessageData) == 0 { - log.Warning(ctx, "message_data is empty, CloudEvents will have minimal data payload") + log.Warn(ctx, "message_data is empty, CloudEvents will have minimal data payload") return nil } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 688fb98..60ebfb5 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -220,7 +220,7 @@ func ResetSentinelMetrics() { func UpdatePendingResourcesMetric(resourceType, resourceSelector string, count int) { // Validate inputs if resourceType == "" || resourceSelector == "" { - getLogger().Warningf(context.Background(), "Attempted to update pending_resources metric with empty parameters: resourceType=%q resourceSelector=%q", resourceType, resourceSelector) + getLogger().Warnf(context.Background(), "Attempted to update pending_resources metric with empty parameters: resourceType=%q resourceSelector=%q", resourceType, resourceSelector) return } if count < 0 { @@ -251,7 +251,7 @@ func UpdatePendingResourcesMetric(resourceType, resourceSelector string, count i func UpdateEventsPublishedMetric(resourceType, resourceSelector, reason string) { // Validate inputs if resourceType == "" || resourceSelector == "" || reason == "" { - getLogger().Warningf(context.Background(), "Attempted to update events_published metric with empty parameters: resourceType=%q resourceSelector=%q reason=%q", resourceType, resourceSelector, reason) + getLogger().Warnf(context.Background(), "Attempted to update events_published metric with empty parameters: resourceType=%q resourceSelector=%q reason=%q", resourceType, resourceSelector, reason) return } @@ -281,7 +281,7 @@ func UpdateEventsPublishedMetric(resourceType, resourceSelector, reason string) func UpdateResourcesSkippedMetric(resourceType, resourceSelector, reason string) { // Validate inputs if resourceType == "" || resourceSelector == "" || reason == "" { - getLogger().Warningf(context.Background(), "Attempted to update resources_skipped metric with empty parameters: resourceType=%q resourceSelector=%q reason=%q", resourceType, resourceSelector, reason) + getLogger().Warnf(context.Background(), "Attempted to update resources_skipped metric with empty parameters: resourceType=%q resourceSelector=%q reason=%q", resourceType, resourceSelector, reason) return } @@ -311,11 +311,11 @@ func UpdateResourcesSkippedMetric(resourceType, resourceSelector, reason string) func UpdatePollDurationMetric(resourceType, resourceSelector string, durationSeconds float64) { // Validate inputs if resourceType == "" || resourceSelector == "" { - getLogger().Warningf(context.Background(), "Attempted to update poll_duration metric with empty parameters: resourceType=%q resourceSelector=%q", resourceType, resourceSelector) + getLogger().Warnf(context.Background(), "Attempted to update poll_duration metric with empty parameters: resourceType=%q resourceSelector=%q", resourceType, resourceSelector) return } if durationSeconds < 0 { - getLogger().Warningf(context.Background(), "Attempted to update poll_duration metric with negative duration: %f", durationSeconds) + getLogger().Warnf(context.Background(), "Attempted to update poll_duration metric with negative duration: %f", durationSeconds) return } @@ -343,7 +343,7 @@ func UpdatePollDurationMetric(resourceType, resourceSelector string, durationSec func UpdateAPIErrorsMetric(resourceType, resourceSelector, errorType string) { // Validate inputs if resourceType == "" || resourceSelector == "" || errorType == "" { - getLogger().Warningf(context.Background(), "Attempted to update api_errors metric with empty parameters: resourceType=%q resourceSelector=%q errorType=%q", resourceType, resourceSelector, errorType) + getLogger().Warnf(context.Background(), "Attempted to update api_errors metric with empty parameters: resourceType=%q resourceSelector=%q errorType=%q", resourceType, resourceSelector, errorType) return } @@ -372,7 +372,7 @@ func UpdateAPIErrorsMetric(resourceType, resourceSelector, errorType string) { func UpdateBrokerErrorsMetric(resourceType, resourceSelector, errorType string) { // Validate inputs if resourceType == "" || resourceSelector == "" || errorType == "" { - getLogger().Warningf(context.Background(), "Attempted to update broker_errors metric with empty parameters: resourceType=%q resourceSelector=%q errorType=%q", resourceType, resourceSelector, errorType) + getLogger().Warnf(context.Background(), "Attempted to update broker_errors metric with empty parameters: resourceType=%q resourceSelector=%q errorType=%q", resourceType, resourceSelector, errorType) return } diff --git a/internal/sentinel/sentinel.go b/internal/sentinel/sentinel.go index e8e0a7f..1a6ed3a 100644 --- a/internal/sentinel/sentinel.go +++ b/internal/sentinel/sentinel.go @@ -130,7 +130,7 @@ func (s *Sentinel) trigger(ctx context.Context) error { } // Publish to broker using configured topic - if err := s.publisher.Publish(topic, &event); err != nil { + if err := s.publisher.Publish(eventCtx, topic, &event); err != nil { // Record broker error metrics.UpdateBrokerErrorsMetric(resourceType, resourceSelector, "publish_error") s.logger.Errorf(eventCtx, "Failed to publish event resource_id=%s error=%v", resource.ID, err) diff --git a/internal/sentinel/sentinel_test.go b/internal/sentinel/sentinel_test.go index fb0938d..f420155 100644 --- a/internal/sentinel/sentinel_test.go +++ b/internal/sentinel/sentinel_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "net/http" "net/http/httptest" "testing" @@ -81,7 +82,7 @@ type MockPublisher struct { publishError error } -func (m *MockPublisher) Publish(topic string, event *cloudevents.Event) error { +func (m *MockPublisher) Publish(ctx context.Context, topic string, event *cloudevents.Event) error { if m.publishError != nil { return m.publishError } @@ -94,6 +95,18 @@ func (m *MockPublisher) Close() error { return nil } +type MockPublisherWithLogger struct { + mockLogger *logger.MockLoggerWithContext +} + +func (m *MockPublisherWithLogger) Publish(ctx context.Context, topic string, event *cloudevents.Event) error { + // Simulate what broker does - log with the provided context + m.mockLogger.Info(ctx, fmt.Sprintf("broker publishing event to topic %s", topic)) + return nil +} + +func (m *MockPublisherWithLogger) Close() error { return nil } + // TestTrigger_Success tests successful event publishing func TestTrigger_Success(t *testing.T) { ctx := context.Background() @@ -305,10 +318,10 @@ func TestTrigger_MixedResources(t *testing.T) { // Create mock server server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { clusters := []map[string]interface{}{ - createMockCluster("cluster-1", 2, 2, true, now.Add(-31*time.Minute)), // Should publish (max age exceeded) - createMockCluster("cluster-2", 1, 1, true, now.Add(-5*time.Minute)), // Should skip (within max age) - createMockCluster("cluster-3", 3, 3, false, now.Add(-1*time.Minute)), // Should publish (not ready max age exceeded: 1min > 10s) - createMockCluster("cluster-4", 5, 4, true, now.Add(-5*time.Minute)), // Should publish (generation changed) + createMockCluster("cluster-1", 2, 2, true, now.Add(-31*time.Minute)), // Should publish (max age exceeded) + createMockCluster("cluster-2", 1, 1, true, now.Add(-5*time.Minute)), // Should skip (within max age) + createMockCluster("cluster-3", 3, 3, false, now.Add(-1*time.Minute)), // Should publish (not ready max age exceeded: 1min > 10s) + createMockCluster("cluster-4", 5, 4, true, now.Add(-5*time.Minute)), // Should publish (generation changed) } response := createMockClusterList(clusters) w.Header().Set("Content-Type", "application/json") @@ -359,3 +372,55 @@ func TestTrigger_MixedResources(t *testing.T) { } } } + +func TestTrigger_ContextPropagationToBroker(t *testing.T) { + var capturedLogs []string + var capturedContexts []context.Context + + mockLogger := &logger.MockLoggerWithContext{ + CapturedLogs: &capturedLogs, + CapturedContexts: &capturedContexts, + } + + // Create mock publisher that uses our mock logger + mockPublisherWithLogger := &MockPublisherWithLogger{ + mockLogger: mockLogger, + } + + ctx := context.Background() + ctx = logger.WithDecisionReason(ctx, "max_age_exceeded") + ctx = logger.WithTopic(ctx, "test-topic") + ctx = logger.WithSubset(ctx, "clusters") + ctx = logger.WithTraceID(ctx, "trace-123") + ctx = logger.WithSpanID(ctx, "span-456") + + event := cloudevents.NewEvent() + event.SetSpecVersion(cloudevents.VersionV1) + event.SetType("com.redhat.hyperfleet.cluster.reconcile") + event.SetSource("hyperfleet-sentinel") + event.SetID("test-id") + + err := mockPublisherWithLogger.Publish(ctx, "test-topic", &event) + if err != nil { + t.Fatalf("publish failed: %v", err) + } + + if len(capturedContexts) == 0 { + t.Fatal("no context captured by broker logger") + } + + brokerCtx := capturedContexts[0] + + // Test context values propagated to broker + if reason, ok := brokerCtx.Value(logger.DecisionReasonCtxKey).(string); !ok || reason != "max_age_exceeded" { + t.Errorf("decision_reason not propagated: got %v", reason) + } + + if topic, ok := brokerCtx.Value(logger.TopicCtxKey).(string); !ok || topic != "test-topic" { + t.Errorf("topic not propagated: got %v", topic) + } + + if traceID, ok := brokerCtx.Value(logger.TraceIDCtxKey).(string); !ok || traceID != "trace-123" { + t.Errorf("trace_id not propagated: got %v", traceID) + } +} diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index b883b73..d7c7067 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -59,8 +59,8 @@ type HyperFleetLogger interface { Debugf(ctx context.Context, format string, args ...interface{}) Info(ctx context.Context, message string) Infof(ctx context.Context, format string, args ...interface{}) - Warning(ctx context.Context, message string) - Warningf(ctx context.Context, format string, args ...interface{}) + Warn(ctx context.Context, message string) + Warnf(ctx context.Context, format string, args ...interface{}) Error(ctx context.Context, message string) Errorf(ctx context.Context, format string, args ...interface{}) Fatal(ctx context.Context, message string) @@ -457,11 +457,11 @@ func (l *logger) Infof(ctx context.Context, format string, args ...interface{}) l.log(ctx, LevelInfo, fmt.Sprintf(format, args...)) } -func (l *logger) Warning(ctx context.Context, message string) { +func (l *logger) Warn(ctx context.Context, message string) { l.log(ctx, LevelWarn, message) } -func (l *logger) Warningf(ctx context.Context, format string, args ...interface{}) { +func (l *logger) Warnf(ctx context.Context, format string, args ...interface{}) { l.log(ctx, LevelWarn, fmt.Sprintf(format, args...)) } @@ -522,14 +522,14 @@ func (l *logger) Extra(key string, value interface{}) HyperFleetLogger { // noopLogger is a logger that does nothing (used for verbosity filtering) type noopLogger struct{} -func (n *noopLogger) Debug(ctx context.Context, message string) {} -func (n *noopLogger) Debugf(ctx context.Context, format string, args ...interface{}) {} -func (n *noopLogger) Info(ctx context.Context, message string) {} -func (n *noopLogger) Infof(ctx context.Context, format string, args ...interface{}) {} -func (n *noopLogger) Warning(ctx context.Context, message string) {} -func (n *noopLogger) Warningf(ctx context.Context, format string, args ...interface{}) {} -func (n *noopLogger) Error(ctx context.Context, message string) {} -func (n *noopLogger) Errorf(ctx context.Context, format string, args ...interface{}) {} +func (n *noopLogger) Debug(ctx context.Context, message string) {} +func (n *noopLogger) Debugf(ctx context.Context, format string, args ...interface{}) {} +func (n *noopLogger) Info(ctx context.Context, message string) {} +func (n *noopLogger) Infof(ctx context.Context, format string, args ...interface{}) {} +func (n *noopLogger) Warn(ctx context.Context, message string) {} +func (n *noopLogger) Warnf(ctx context.Context, format string, args ...interface{}) {} +func (n *noopLogger) Error(ctx context.Context, message string) {} +func (n *noopLogger) Errorf(ctx context.Context, format string, args ...interface{}) {} func (n *noopLogger) Fatal(ctx context.Context, message string) { fmt.Fprintf(os.Stderr, "FATAL: %s\n", message) os.Exit(1) @@ -540,3 +540,62 @@ func (n *noopLogger) Fatalf(ctx context.Context, format string, args ...interfac } func (n *noopLogger) V(level int32) HyperFleetLogger { return n } func (n *noopLogger) Extra(key string, value interface{}) HyperFleetLogger { return n } + +type MockLoggerWithContext struct { + CapturedLogs *[]string + CapturedContexts *[]context.Context +} + +func (m *MockLoggerWithContext) Debug(ctx context.Context, message string) { + *m.CapturedLogs = append(*m.CapturedLogs, message) + *m.CapturedContexts = append(*m.CapturedContexts, ctx) +} + +func (m *MockLoggerWithContext) Debugf(ctx context.Context, format string, args ...interface{}) { + *m.CapturedLogs = append(*m.CapturedLogs, fmt.Sprintf(format, args...)) + *m.CapturedContexts = append(*m.CapturedContexts, ctx) +} + +func (m *MockLoggerWithContext) Info(ctx context.Context, message string) { + *m.CapturedLogs = append(*m.CapturedLogs, message) + *m.CapturedContexts = append(*m.CapturedContexts, ctx) +} + +func (m *MockLoggerWithContext) Infof(ctx context.Context, format string, args ...interface{}) { + *m.CapturedLogs = append(*m.CapturedLogs, fmt.Sprintf(format, args...)) + *m.CapturedContexts = append(*m.CapturedContexts, ctx) +} + +func (m *MockLoggerWithContext) Warn(ctx context.Context, message string) { + *m.CapturedLogs = append(*m.CapturedLogs, message) + *m.CapturedContexts = append(*m.CapturedContexts, ctx) +} + +func (m *MockLoggerWithContext) Warnf(ctx context.Context, format string, args ...interface{}) { + *m.CapturedLogs = append(*m.CapturedLogs, fmt.Sprintf(format, args...)) + *m.CapturedContexts = append(*m.CapturedContexts, ctx) +} + +func (m *MockLoggerWithContext) Error(ctx context.Context, message string) { + *m.CapturedLogs = append(*m.CapturedLogs, message) + *m.CapturedContexts = append(*m.CapturedContexts, ctx) +} + +func (m *MockLoggerWithContext) Errorf(ctx context.Context, format string, args ...interface{}) { + *m.CapturedLogs = append(*m.CapturedLogs, fmt.Sprintf(format, args...)) + *m.CapturedContexts = append(*m.CapturedContexts, ctx) +} + +func (m *MockLoggerWithContext) Fatal(ctx context.Context, message string) { + *m.CapturedLogs = append(*m.CapturedLogs, message) + *m.CapturedContexts = append(*m.CapturedContexts, ctx) + fmt.Fprintf(os.Stderr, "FATAL: %s\n", message) + os.Exit(1) +} + +func (m *MockLoggerWithContext) Fatalf(ctx context.Context, format string, args ...interface{}) { + *m.CapturedLogs = append(*m.CapturedLogs, fmt.Sprintf(format, args...)) + *m.CapturedContexts = append(*m.CapturedContexts, ctx) + fmt.Fprintf(os.Stderr, "FATAL: %s\n", fmt.Sprintf(format, args...)) + os.Exit(1) +} diff --git a/pkg/logger/logger_test.go b/pkg/logger/logger_test.go index 2e07d24..fe5a23f 100644 --- a/pkg/logger/logger_test.go +++ b/pkg/logger/logger_test.go @@ -222,7 +222,7 @@ func TestLoggerLevelFiltering(t *testing.T) { case "info": log.Info(ctx, testMessage) case "warn": - log.Warning(ctx, testMessage) + log.Warn(ctx, testMessage) case "error": log.Error(ctx, testMessage) } @@ -505,6 +505,10 @@ func TestLoggerSentinelFields(t *testing.T) { if entry.Subset != "clusters" { t.Errorf("expected subset 'clusters', got %q", entry.Subset) } + + if entry.Component != "sentinel" { + t.Errorf("expected component 'sentinel', got %q", entry.Component) + } } func TestLoggerCorrelationFields(t *testing.T) { @@ -641,8 +645,8 @@ func TestNoopLogger(t *testing.T) { noop.Debugf(ctx, "test %s", "arg") noop.Info(ctx, "test") noop.Infof(ctx, "test %s", "arg") - noop.Warning(ctx, "test") - noop.Warningf(ctx, "test %s", "arg") + noop.Warn(ctx, "test") + noop.Warnf(ctx, "test %s", "arg") noop.Error(ctx, "test") noop.Errorf(ctx, "test %s", "arg") @@ -677,7 +681,7 @@ func TestLoggerFormattedMethods(t *testing.T) { }{ {"Debugf", func() { log.Debugf(ctx, "debug %s %d", "test", 123) }, "debug test 123"}, {"Infof", func() { log.Infof(ctx, "info %s %d", "test", 456) }, "info test 456"}, - {"Warningf", func() { log.Warningf(ctx, "warn %s %d", "test", 789) }, "warn test 789"}, + {"Warningf", func() { log.Warnf(ctx, "warn %s %d", "test", 789) }, "warn test 789"}, {"Errorf", func() { log.Errorf(ctx, "error %s %d", "test", 101) }, "error test 101"}, } diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 774aed1..7656989 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -3,12 +3,15 @@ package integration import ( + "bytes" "context" "encoding/json" + "io" "net/http" "net/http/httptest" "os" "runtime" + "strings" "testing" "time" @@ -398,3 +401,160 @@ func TestIntegration_TSLSyntaxMultipleLabels(t *testing.T) { t.Logf("TSL syntax validation completed - received correct format: %s", receivedSearchParam) } + +func TextIntegration_BrokerLoggerContext(t *testing.T) { + // Buffer to observe logs + var logBuffer bytes.Buffer + now := time.Now() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Get globalConfig and assign multiWriter to observe output logs + globalConfig := logger.GetGlobalConfig() + multiWriter := io.MultiWriter(globalConfig.Output, &logBuffer) + + helper := NewHelper() + cfg := &logger.LogConfig{ + Level: logger.LevelInfo, + Format: logger.FormatJSON, // JSON for easy parsing + Output: multiWriter, + Component: "sentinel", + Version: "test", + Hostname: "testhost", + } + log := logger.NewHyperFleetLoggerWithConfig(cfg) + + // Mock server returns clusters that will trigger event publishing + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + clusters := []map[string]interface{}{ + // This cluster will trigger max_age_ready exceeded event + createMockCluster("cluster-old", 2, 2, true, now.Add(-35*time.Minute)), // Exceeds 30min + // This cluster will trigger max_age_not_ready exceeded event + createMockCluster("cluster-not-ready", 1, 1, false, now.Add(-15*time.Second)), // Exceeds 10sec + } + response := createMockClusterList(clusters) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + hyperfleetClient, _ := client.NewHyperFleetClient(server.URL, 10*time.Second) + decisionEngine := engine.NewDecisionEngine(10*time.Second, 30*time.Minute) + + sentinelConfig := &config.SentinelConfig{ + ResourceType: "clusters", + PollInterval: 100 * time.Millisecond, + MaxAgeNotReady: 10 * time.Second, + MaxAgeReady: 30 * time.Minute, + ResourceSelector: []config.LabelSelector{ + {Label: "region", Value: "us-east"}, + {Label: "env", Value: "production"}, + }, + } + + // Create Sentinel with our logger and real RabbitMQ broker + s := sentinel.NewSentinel(sentinelConfig, hyperfleetClient, decisionEngine, helper.RabbitMQ.Publisher(), log) + + // Run Sentinel + errChan := make(chan error, 1) + go func() { + errChan <- s.Start(ctx) + }() + + time.Sleep(500 * time.Millisecond) + cancel() + + // Wait for Sentinel to stop + select { + case err := <-errChan: + if err != nil && err != context.Canceled { + t.Fatalf("Sentinel failed: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Sentinel did not stop within timeout") + } + + // Analyze logs + logOutput := logBuffer.String() + t.Logf("Captured log output:\n%s", logOutput) + + logLines := strings.Split(strings.TrimSpace(logOutput), "\n") + + var foundSentinelEventLog bool + var foundBrokerOperationLog bool + + for _, line := range logLines { + if line == "" { + continue + } + + var entry map[string]interface{} + if err := json.Unmarshal([]byte(line), &entry); err != nil { + t.Logf("Skipping non-JSON line: %s", line) + continue + } + + msg, hasMsg := entry["message"].(string) + component, hasComponent := entry["component"].(string) + + // Look for Sentinel's own event publishing logs + if hasMsg && hasComponent && component == "sentinel" && + strings.Contains(msg, "Published event") { + foundSentinelEventLog = true + + // Verify Sentinel context fields are present + if entry["decision_reason"] == nil { + t.Errorf("Sentinel event log missing decision_reason: %v", entry) + } + if entry["topic"] == nil { + t.Errorf("Sentinel event log missing topic: %v", entry) + } + if entry["subset"] == nil { + t.Errorf("Sentinel event log missing subset: %v", entry) + } + if entry["trace_id"] == nil { + t.Errorf("Sentinel event log missing trace_id: %v", entry) + } + if entry["span_id"] == nil { + t.Errorf("Sentinel event log missing span_id: %v", entry) + } + + t.Logf("Found Sentinel event log with context: decision_reason=%v topic=%v subset=%v tr", + entry["decision_reason"], entry["topic"], entry["subset"]) + } + + // Look for broker operation logs (these should inherit Sentinel context) + if hasMsg && hasComponent && component == "sentinel" && + (strings.Contains(msg, "broker") || strings.Contains(msg, "publish") || + strings.Contains(msg, "Creating publisher") || strings.Contains(msg, "publisher initialized")) { + foundBrokerOperationLog = true + + // Broker operations should have Sentinel context + if entry["component"] != "sentinel" { + t.Errorf("Broker operation log missing component=sentinel: %v", entry) + } + + // Check for context inheritance (these fields should be present if context flowed through) + if entry["decision_reason"] != nil || entry["topic"] != nil || entry["subset"] != nil { + t.Logf("Broker operation inherits Sentinel context: decision_reason=%v topic=%v subset=%v", + entry["decision_reason"], entry["topic"], entry["subset"]) + } + + t.Logf("Found broker operation log with component=sentinel: %s", msg) + } + } + + if !foundSentinelEventLog { + t.Error("No Sentinel event publishing logs found - events may not have been published") + } + + if !foundBrokerOperationLog { + t.Error("No broker operation logs found - broker may not be logging") + } + + // Success criteria: Both Sentinel and broker logs should use component=sentinel + if foundSentinelEventLog && foundBrokerOperationLog { + t.Log("SUCCESS: Logger context inheritance verified - both Sentinel and broker operations log with component=sentinel") + } +} diff --git a/test/integration/testcontainer.go b/test/integration/testcontainer.go index 40f34af..991ee0c 100644 --- a/test/integration/testcontainer.go +++ b/test/integration/testcontainer.go @@ -58,7 +58,7 @@ func NewRabbitMQTestContainer(ctx context.Context) (*RabbitMQTestContainer, erro "broker.rabbitmq.url": amqpURL, } - publisher, err := broker.NewPublisher(configMap) + publisher, err := broker.NewPublisher(log, configMap) if err != nil { container.Terminate(ctx) return nil, fmt.Errorf("failed to create broker publisher: %w", err)