Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cmd/testserver/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"k8s.io/client-go/util/flowcontrol"

"github.com/castai/cluster-controller/internal/helm"
"github.com/castai/cluster-controller/internal/metrics"
"github.com/castai/cluster-controller/loadtest"
"github.com/castai/cluster-controller/loadtest/scenarios"
)
Expand All @@ -28,6 +29,8 @@ func run(ctx context.Context) error {
cfg := loadtest.GetConfig()
logger.Info("creating test server")

loadtest.RegisterTestMetrics(metrics.GetRegistry())

testServer := loadtest.NewTestServer(logger, loadtest.TestServerConfig{
MaxActionsPerCall: 1000,
TimeoutWaitingForActions: 60 * time.Second,
Expand All @@ -51,7 +54,7 @@ func run(ctx context.Context) error {
// Choose scenarios below by adding/removing/etc. instances of scenarios.XXX()
// All scenarios in the list run in parallel (though not necessarily simultaneously, since preparation may take a different amount of time for each).
testScenarios := []scenarios.TestScenario{
scenarios.CheckNodeStatus(5000, logger),
scenarios.DrainNode(1, 10, logger),
}

logger.Info("Starting continuous test scenario execution")
Expand Down Expand Up @@ -88,8 +91,10 @@ func run(ctx context.Context) error {

if len(receivedErrors) > 0 {
logger.Error(fmt.Sprintf("Iteration %d completed with (%d) errors: %v", iteration, len(receivedErrors), errors.Join(receivedErrors...)))
loadtest.IncrementTestRunFailure(len(receivedErrors))
} else {
logger.Info(fmt.Sprintf("Iteration %d completed successfully", iteration))
loadtest.IncrementTestRunSuccess()
}

logger.Info("Waiting 1 minute before next iteration")
Expand Down
5 changes: 5 additions & 0 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,8 @@ func NewMetricsMux() *http.ServeMux {
// Gather returns the result of calling Gather on the package registry: the
// gathered metric families together with any error reported by the registry.
func Gather() ([]*dto.MetricFamily, error) {
	families, err := registry.Gather()
	return families, err
}

// GetRegistry exposes the package's prometheus registry as a Registerer so that
// callers can register additional metrics on it.
func GetRegistry() prometheus.Registerer {
	var r prometheus.Registerer = registry
	return r
}
17 changes: 13 additions & 4 deletions loadtest/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,19 @@ import (
"net/http"

"github.com/castai/cluster-controller/internal/castai"
"github.com/castai/cluster-controller/internal/metrics"
)

func NewHttpServer(ctx context.Context, cfg Config, testServer *CastAITestServer) error {
http.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions", func(w http.ResponseWriter, r *http.Request) {
mux := http.NewServeMux()

// Register the metrics endpoint. metrics.NewMetricsMux() already routes "/metrics" internally, so requests are forwarded to it with the path unchanged.
metricsMux := metrics.NewMetricsMux()
mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
metricsMux.ServeHTTP(w, r)
})

mux.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions", func(w http.ResponseWriter, r *http.Request) {
result, err := testServer.GetActions(r.Context(), "")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -29,7 +38,7 @@ func NewHttpServer(ctx context.Context, cfg Config, testServer *CastAITestServer
}
})

http.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions/{action_id}/ack", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions/{action_id}/ack", func(w http.ResponseWriter, r *http.Request) {
actionID := r.PathValue("action_id")
var req castai.AckClusterActionRequest
err := json.NewDecoder(r.Body).Decode(&req)
Expand All @@ -45,7 +54,7 @@ func NewHttpServer(ctx context.Context, cfg Config, testServer *CastAITestServer
}
})

http.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions/logs", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions/logs", func(w http.ResponseWriter, r *http.Request) {
var req castai.LogEntry
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
Expand All @@ -61,5 +70,5 @@ func NewHttpServer(ctx context.Context, cfg Config, testServer *CastAITestServer
})

//nolint:gosec // Missing timeouts are not a real issue here.
return http.ListenAndServe(fmt.Sprintf(":%d", cfg.Port), nil)
return http.ListenAndServe(fmt.Sprintf(":%d", cfg.Port), mux)
}
38 changes: 38 additions & 0 deletions loadtest/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package loadtest

import (
"github.com/prometheus/client_golang/prometheus"
)

// Prometheus collectors describing load test iterations.
var (
	// testRunCounter counts test run iterations, partitioned by the "status" label.
	testRunCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "test_run_total",
			Help: "Count of test run iterations by status (success/failed).",
		},
		[]string{"status"},
	)

	// testRunErrorsCounter accumulates the number of errors reported by failed test runs.
	testRunErrorsCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "test_run_errors_total",
		Help: "Total count of errors across all failed test runs.",
	})
)

// IncrementTestRunSuccess records one successful test run iteration by
// incrementing testRunCounter with status="success".
func IncrementTestRunSuccess() {
	successes := testRunCounter.WithLabelValues("success")
	successes.Inc()
}

// IncrementTestRunFailure records one failed test run iteration by
// incrementing testRunCounter with status="failed", and adds errorCount to
// testRunErrorsCounter.
//
// errorCount is the number of errors observed in the failed iteration. A
// non-positive value still counts the failed run but leaves the error total
// unchanged: a prometheus counter panics when asked to add a negative value,
// and adding zero is a no-op.
func IncrementTestRunFailure(errorCount int) {
	testRunCounter.WithLabelValues("failed").Inc()

	// Guard against a negative count so a caller bug cannot crash the test server.
	if errorCount <= 0 {
		return
	}
	testRunErrorsCounter.Add(float64(errorCount))
}

// RegisterTestMetrics registers the load test collectors (testRunCounter and
// testRunErrorsCounter) with the given registry via MustRegister.
func RegisterTestMetrics(registry prometheus.Registerer) {
	collectors := []prometheus.Collector{testRunCounter, testRunErrorsCounter}
	registry.MustRegister(collectors...)
}
8 changes: 5 additions & 3 deletions loadtest/scenarios/drain_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func (s *drainNodeScenario) Preparation(ctx context.Context, namespace string, c
deployment.Namespace = namespace
//nolint:gosec // Not afraid of overflow here.
deployment.Spec.Replicas = lo.ToPtr(int32(s.deploymentReplicas))
deployment.Spec.Template.Spec.NodeName = nodeName

_, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{})
if err != nil {
Expand All @@ -92,7 +91,9 @@ func (s *drainNodeScenario) Preparation(ctx context.Context, namespace string, c
})
}

return errGroup.Wait()
err := errGroup.Wait()

return err
}

func (s *drainNodeScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
Expand Down Expand Up @@ -160,7 +161,8 @@ func (s *drainNodeScenario) Run(ctx context.Context, _ string, _ kubernetes.Inte
CreatedAt: time.Now().UTC(),
ActionDrainNode: &castai.ActionDrainNode{
NodeName: node.Name,
NodeID: "",
ProviderId: node.Name,
NodeID: node.Name,
DrainTimeoutSeconds: 60,
Force: true,
},
Expand Down
Loading