Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,9 @@ OTLP distribution.
`upstream_url`)
- `workspace_id` - Workspace identifier for policy sync
- `log_level` - Logging level (trace, debug, info, warn, err)
- `worker_count` - Number of I/O worker threads (default: 1). Higher values
improve throughput on multi-core systems. Can be overridden by
`TERO_MAX_WORKERS` environment variable.
- `policy_providers` - List of policy sources (file/http)
- `max_body_size` - Request/response body limits

Expand Down Expand Up @@ -518,6 +521,9 @@ OTLP distribution.
### Environment Variables:

- `TERO_LOG_LEVEL` - Override log level (trace, debug, info, warn, err)
- `TERO_MAX_WORKERS` - Override number of I/O worker threads (default: 1, or
value from config file). Higher values improve throughput on multi-core
systems.

## Design Principles

Expand Down
112 changes: 65 additions & 47 deletions bench/scaling/generate-policies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,12 @@ receivers:
datadog:
endpoint: 127.0.0.1:4319
read_timeout: 60s

processors:
batch: {}
YAML_HEAD

local log_processors="batch"
local metric_processors="batch"
local trace_processors="batch"
local log_processors=""
local metric_processors=""
local trace_processors=""
local has_processors=false

if [[ $count -gt 0 ]]; then
# Split: ~34% logs, ~33% metrics, ~33% traces
Expand All @@ -242,14 +240,22 @@ YAML_HEAD
local log_transform_count=$((log_count - log_drop_count))

# Build processor lists based on what we actually have
local log_proc_list="batch"
local metric_proc_list="batch"
local trace_proc_list="batch"
local log_proc_list=""
local metric_proc_list=""
local trace_proc_list=""

# Filter processor for logs - drop rules (only if we have drops)
if [[ $log_drop_count -gt 0 ]]; then
log_proc_list="$log_proc_list, filter/logs"
echo ""
if [[ -z "$log_proc_list" ]]; then
log_proc_list="filter/logs"
else
log_proc_list="$log_proc_list, filter/logs"
fi
if [[ "$has_processors" == "false" ]]; then
echo ""
echo "processors:"
has_processors=true
fi
echo " filter/logs:"
echo " error_mode: ignore"
echo " logs:"
Expand All @@ -268,8 +274,16 @@ YAML_HEAD

# Transform processor for logs - add attributes (only if we have transforms)
if [[ $log_transform_count -gt 0 ]]; then
log_proc_list="$log_proc_list, transform/logs"
echo ""
if [[ -z "$log_proc_list" ]]; then
log_proc_list="transform/logs"
else
log_proc_list="$log_proc_list, transform/logs"
fi
if [[ "$has_processors" == "false" ]]; then
echo ""
echo "processors:"
has_processors=true
fi
echo " transform/logs:"
echo " error_mode: ignore"
echo " log_statements:"
Expand All @@ -286,8 +300,16 @@ YAML_HEAD

# Filter processor for metrics - drop rules (only if we have metrics)
if [[ $metric_count -gt 0 ]]; then
metric_proc_list="$metric_proc_list, filter/metrics"
echo ""
if [[ -z "$metric_proc_list" ]]; then
metric_proc_list="filter/metrics"
else
metric_proc_list="$metric_proc_list, filter/metrics"
fi
if [[ "$has_processors" == "false" ]]; then
echo ""
echo "processors:"
has_processors=true
fi
echo " filter/metrics:"
echo " error_mode: ignore"
echo " metrics:"
Expand All @@ -308,9 +330,17 @@ YAML_HEAD
# otelcol's probabilistic_sampler applies a single sampling rate to all traces
# We use the average of our sampling percentages for comparison
if [[ $trace_count -gt 0 ]]; then
trace_proc_list="$trace_proc_list, probabilistic_sampler"
if [[ -z "$trace_proc_list" ]]; then
trace_proc_list="probabilistic_sampler"
else
trace_proc_list="$trace_proc_list, probabilistic_sampler"
fi
if [[ "$has_processors" == "false" ]]; then
echo ""
echo "processors:"
has_processors=true
fi
# Use 50% as average sampling rate (matches our 10/25/50/75/100 distribution)
echo ""
echo " probabilistic_sampler:"
echo " sampling_percentage: 50"
fi
Expand All @@ -320,6 +350,12 @@ YAML_HEAD
trace_processors="$trace_proc_list"
fi

# If no processors were added, output empty processors block
if [[ "$has_processors" == "false" ]]; then
echo ""
echo "processors: {}"
fi

cat <<YAML_TAIL

exporters:
Expand All @@ -328,23 +364,35 @@ exporters:
endpoint: http://127.0.0.1:9999
compression: none
encoding: proto
sending_queue:
queue_size: 100000
batch: {}

otlphttp/metrics:
endpoint: http://127.0.0.1:9999
compression: none
encoding: proto
sending_queue:
queue_size: 100000
batch: {}

otlphttp/traces:
endpoint: http://127.0.0.1:9999
compression: none
encoding: proto
sending_queue:
queue_size: 100000
batch: {}

# For Datadog input, convert to OTLP protobuf for consistent output format
# Note: otelcol's datadog exporter requires an API key, so we use otlphttp
otlphttp/datadog:
endpoint: http://127.0.0.1:9999
compression: none
encoding: proto
sending_queue:
queue_size: 100000
batch: {}

service:
telemetry:
Expand Down Expand Up @@ -561,9 +609,6 @@ sinks:
uri: "http://127.0.0.1:9999/v1/logs"
encoding:
codec: json
batch:
max_bytes: 1048576
timeout_secs: 1

otlp_metrics_out:
type: http
Expand All @@ -572,9 +617,6 @@ sinks:
uri: "http://127.0.0.1:9999/v1/metrics"
encoding:
codec: json
batch:
max_bytes: 1048576
timeout_secs: 1

otlp_traces_out:
type: http
Expand All @@ -583,9 +625,6 @@ sinks:
uri: "http://127.0.0.1:9999/v1/traces"
encoding:
codec: json
batch:
max_bytes: 1048576
timeout_secs: 1

datadog_logs_out:
type: http
Expand All @@ -594,9 +633,6 @@ sinks:
uri: "http://127.0.0.1:9999/api/v2/logs"
encoding:
codec: json
batch:
max_bytes: 1048576
timeout_secs: 1

datadog_metrics_out:
type: http
Expand All @@ -605,9 +641,6 @@ sinks:
uri: "http://127.0.0.1:9999/api/v2/series"
encoding:
codec: json
batch:
max_bytes: 1048576
timeout_secs: 1
YAML_SINKS
else
# No transforms - passthrough mode
Expand All @@ -622,9 +655,6 @@ sinks:
uri: "http://127.0.0.1:9999/v1/logs"
encoding:
codec: json
batch:
max_bytes: 1048576
timeout_secs: 1

otlp_metrics_out:
type: http
Expand All @@ -633,9 +663,6 @@ sinks:
uri: "http://127.0.0.1:9999/v1/metrics"
encoding:
codec: json
batch:
max_bytes: 1048576
timeout_secs: 1

otlp_traces_out:
type: http
Expand All @@ -644,9 +671,6 @@ sinks:
uri: "http://127.0.0.1:9999/v1/traces"
encoding:
codec: json
batch:
max_bytes: 1048576
timeout_secs: 1

datadog_logs_out:
type: http
Expand All @@ -655,9 +679,6 @@ sinks:
uri: "http://127.0.0.1:9999/api/v2/logs"
encoding:
codec: json
batch:
max_bytes: 1048576
timeout_secs: 1

datadog_metrics_out:
type: http
Expand All @@ -666,9 +687,6 @@ sinks:
uri: "http://127.0.0.1:9999/api/v2/series"
encoding:
codec: json
batch:
max_bytes: 1048576
timeout_secs: 1
YAML_SINKS
fi
}
Expand Down
1 change: 1 addition & 0 deletions bench/scaling/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ create_edge_config() {
{
"listen_address": "127.0.0.1",
"listen_port": $port,
"worker_count": 4,
"upstream_url": "http://127.0.0.1:$ECHO_SERVER_PORT",
"workspace_id": "benchmark",
"log_level": "err",
Expand Down
1 change: 1 addition & 0 deletions bench/scaling/test-equivalence.sh
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ create_edge_config() {
cat > "$config_file" <<EOF
{
"listen_address": "127.0.0.1",
"worker_count": 4,
"listen_port": $EDGE_OTLP_PORT,
"upstream_url": "http://127.0.0.1:$ECHO_PORT",
"workspace_id": "equivalence-test",
Expand Down
1 change: 1 addition & 0 deletions config-otlp.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"upstream_url": "https://otlp.${TERO_DD_REGION}.datadoghq.com",
"log_level": "info",
"max_body_size": 1048576,
"worker_count": 4,
"policy_providers": [
{
"id": "file",
Expand Down
4 changes: 3 additions & 1 deletion config-prometheus.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
"upstream_url": "http://localhost:8888",
"log_level": "info",
"max_body_size": 52428800,
"worker_count": 4,
"prometheus": {
"max_bytes_per_scrape": 10000
"max_input_bytes_per_scrape": 52428800,
"max_output_bytes_per_scrape": 52428800
},
"policy_providers": [
{
Expand Down
1 change: 1 addition & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"metrics_url": "https://api.${TERO_DD_REGION}.datadoghq.com",
"log_level": "info",
"max_body_size": 1048576,
"worker_count": 4,
"policy_providers": [
{
"id": "file",
Expand Down
1 change: 1 addition & 0 deletions otelcol-test.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"path":"/v1/logs","content_type":"application/x-protobuf","data_base64":"Cr0EEtMDEpwBUggBAgMEBQYHC0oQAQIDBAUGBwgJCgsMDQ4PEDIgEhUKE2RiLXByaW1hcnkuaW50ZXJuYWwKB2RiX2hvc3QyGRILCglFVElNRURPVVQKCmVycm9yX2NvZGUqLwotRGF0YWJhc2UgY29ubmVjdGlvbiBmYWlsZWQ6IHRpbWVvdXQgYWZ0ZXIgMzBzGgVFUlJPUhARCQBe+uj+nJcXEnxSCAECAwQFBgcPShABAgMEBQYHCAkKCwwNDg8QMhYSAhgMChB2YWxpZGF0aW9uX3J1bGVzKjQKMlJlcXVlc3QgdmFsaWRhdGlvbiBwYXNzZWQgZm9yIGVuZHBvaW50IC9hcGkvb3JkZXJzGgVERUJVRxAFCQCGZdf/nJcXEpwBUggBAgMEBQYHEUoQAQIDBAUGBwgJCgsMDQ4PEDIlEhkKF2RhaWx5X3JlcG9ydF9nZW5lcmF0aW9uCghqb2JfbmFtZTISEgMYuCoKC2R1cmF0aW9uX21zKjIKMFNjaGVkdWxlZCBqb2IgY29tcGxldGVkOiBkYWlseV9yZXBvcnRfZ2VuZXJhdGlvbhoESU5GTxAJCQAam04AnZcXChUSBTEuMC4wCgxiZW5jaC1sb2dnZXIKZQofEg8KDWJlbmNoLXNlcnZpY2UKDHNlcnZpY2UubmFtZQoaEgcKBTEuMC4wCg9zZXJ2aWNlLnZlcnNpb24KJhIMCgpwcm9kdWN0aW9uChZkZXBsb3ltZW50LmVudmlyb25tZW50"}
8 changes: 8 additions & 0 deletions src/config/parser.zig
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const ConfigJson = struct {
log_level: []const u8,
max_body_size: u32,
max_upstream_retries: ?u8 = null,
worker_count: ?u32 = null,
policy_providers: ?[]ProviderJson = null,
service: ?ServiceJson = null,
prometheus: ?PrometheusJson = null,
Expand Down Expand Up @@ -123,6 +124,13 @@ pub fn parseConfigBytes(allocator: std.mem.Allocator, json_bytes: []const u8) !*
config.max_body_size = json_config.max_body_size;
config.max_upstream_retries = json_config.max_upstream_retries orelse 10;

// Parse worker_count: TERO_MAX_WORKERS env var takes precedence over config file
if (std.posix.getenv("TERO_MAX_WORKERS")) |env_val| {
config.worker_count = std.fmt.parseInt(u32, env_val, 10) catch 1;
} else if (json_config.worker_count) |count| {
config.worker_count = count;
}

// Parse policy providers if present
if (json_config.policy_providers) |json_providers| {
const providers = try parseProviders(allocator, json_providers);
Expand Down
6 changes: 6 additions & 0 deletions src/config/types.zig
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ pub const ProxyConfig = struct {
// Retry config
max_upstream_retries: u8,

// Worker config - number of I/O threads for connection handling
// Higher values improve throughput on multi-core systems
// Can be overridden by TERO_MAX_WORKERS environment variable
worker_count: u32 = 1,

// Policy providers
policy_providers: []ProviderConfig,

Expand All @@ -68,6 +73,7 @@ pub const ProxyConfig = struct {
.log_level = .info,
.max_body_size = 1024 * 1024, // 1MB
.max_upstream_retries = 10,
.worker_count = 1,
.policy_providers = &.{},
};
}
Expand Down
1 change: 1 addition & 0 deletions src/datadog_main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ pub fn main() !void {
config.listen_port,
config.max_upstream_retries,
config.max_body_size,
config.worker_count,
&module_registrations,
);
defer proxy.deinit();
Expand Down
1 change: 1 addition & 0 deletions src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ pub fn main() !void {
config.listen_port,
config.max_upstream_retries,
config.max_body_size,
config.worker_count,
&module_registrations,
);
defer proxy.deinit();
Expand Down
1 change: 1 addition & 0 deletions src/otlp_main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ pub fn main() !void {
config.listen_port,
config.max_upstream_retries,
config.max_body_size,
config.worker_count,
&module_registrations,
);
defer proxy.deinit();
Expand Down
1 change: 1 addition & 0 deletions src/prometheus_main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ pub fn main() !void {
config.listen_port,
config.max_upstream_retries,
config.max_body_size,
config.worker_count,
&module_registrations,
);
defer proxy.deinit();
Expand Down
Loading