diff --git a/README.md b/README.md
index 0b03624..f3a16e0 100644
--- a/README.md
+++ b/README.md
@@ -487,6 +487,9 @@ OTLP distribution.
   `upstream_url`)
 - `workspace_id` - Workspace identifier for policy sync
 - `log_level` - Logging level (trace, debug, info, warn, err)
+- `worker_count` - Number of I/O worker threads (default: 1). Higher values
+  improve throughput on multi-core systems. Can be overridden by the
+  `TERO_MAX_WORKERS` environment variable.
 - `policy_providers` - List of policy sources (file/http)
 - `max_body_size` - Request/response body limits
 
@@ -518,6 +521,9 @@ OTLP distribution.
 ### Environment Variables:
 
 - `TERO_LOG_LEVEL` - Override log level (trace, debug, info, warn, err)
+- `TERO_MAX_WORKERS` - Override the number of I/O worker threads (default: 1,
+  or the value from the config file). Higher values improve throughput on
+  multi-core systems.
 
 ## Design Principles
 
diff --git a/bench/scaling/generate-policies.sh b/bench/scaling/generate-policies.sh
index f6e5738..bfbf2ec 100755
--- a/bench/scaling/generate-policies.sh
+++ b/bench/scaling/generate-policies.sh
@@ -222,14 +222,12 @@ receivers:
   datadog:
     endpoint: 127.0.0.1:4319
     read_timeout: 60s
-
-processors:
-  batch: {}
 YAML_HEAD
 
-  local log_processors="batch"
-  local metric_processors="batch"
-  local trace_processors="batch"
+  local log_processors=""
+  local metric_processors=""
+  local trace_processors=""
+  local has_processors=false
 
   if [[ $count -gt 0 ]]; then
     # Split: ~34% logs, ~33% metrics, ~33% traces
@@ -242,14 +240,22 @@ YAML_HEAD
     local log_transform_count=$((log_count - log_drop_count))
 
     # Build processor lists based on what we actually have
-    local log_proc_list="batch"
-    local metric_proc_list="batch"
-    local trace_proc_list="batch"
+    local log_proc_list=""
+    local metric_proc_list=""
+    local trace_proc_list=""
 
     # Filter processor for logs - drop rules (only if we have drops)
    if [[ $log_drop_count -gt 0 ]]; then
-      log_proc_list="$log_proc_list, filter/logs"
-      echo ""
+      if [[ -z "$log_proc_list" ]]; then
+        log_proc_list="filter/logs"
+      else
+        log_proc_list="$log_proc_list, filter/logs"
+      fi
+      if [[ "$has_processors" == "false" ]]; then
+        echo ""
+        echo "processors:"
+        has_processors=true
+      fi
       echo "  filter/logs:"
       echo "    error_mode: ignore"
       echo "    logs:"
@@ -268,8 +274,16 @@ YAML_HEAD
 
     # Transform processor for logs - add attributes (only if we have transforms)
     if [[ $log_transform_count -gt 0 ]]; then
-      log_proc_list="$log_proc_list, transform/logs"
-      echo ""
+      if [[ -z "$log_proc_list" ]]; then
+        log_proc_list="transform/logs"
+      else
+        log_proc_list="$log_proc_list, transform/logs"
+      fi
+      if [[ "$has_processors" == "false" ]]; then
+        echo ""
+        echo "processors:"
+        has_processors=true
+      fi
       echo "  transform/logs:"
       echo "    error_mode: ignore"
       echo "    log_statements:"
@@ -286,8 +300,16 @@ YAML_HEAD
 
     # Filter processor for metrics - drop rules (only if we have metrics)
     if [[ $metric_count -gt 0 ]]; then
-      metric_proc_list="$metric_proc_list, filter/metrics"
-      echo ""
+      if [[ -z "$metric_proc_list" ]]; then
+        metric_proc_list="filter/metrics"
+      else
+        metric_proc_list="$metric_proc_list, filter/metrics"
+      fi
+      if [[ "$has_processors" == "false" ]]; then
+        echo ""
+        echo "processors:"
+        has_processors=true
+      fi
       echo "  filter/metrics:"
       echo "    error_mode: ignore"
       echo "    metrics:"
@@ -308,9 +330,17 @@ YAML_HEAD
     # otelcol's probabilistic_sampler applies a single sampling rate to all traces
     # We use the average of our sampling percentages for comparison
     if [[ $trace_count -gt 0 ]]; then
-      trace_proc_list="$trace_proc_list, probabilistic_sampler"
+      if [[ -z "$trace_proc_list" ]]; then
+        trace_proc_list="probabilistic_sampler"
+      else
+        trace_proc_list="$trace_proc_list, probabilistic_sampler"
+      fi
+      if [[ "$has_processors" == "false" ]]; then
+        echo ""
+        echo "processors:"
+        has_processors=true
+      fi
       # Use 50% as average sampling rate (matches our 10/25/50/75/100 distribution)
-      echo ""
       echo "  probabilistic_sampler:"
       echo "    sampling_percentage: 50"
     fi
@@ -320,6 +350,12 @@ YAML_HEAD
     trace_processors="$trace_proc_list"
   fi
 
+  # If no processors were added, output an empty processors block
+  if [[ "$has_processors" == "false" ]]; then
+    echo ""
+    echo "processors: {}"
+  fi
+
   cat >> "$config_file" <<YAML_TAIL
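
Aside: the `worker_count` / `TERO_MAX_WORKERS` precedence described in the README hunk above could be resolved with a small helper. This is a minimal sketch, not code from this diff — `resolveWorkerCount` and its `config_worker_count` parameter are hypothetical names; only the variable name `TERO_MAX_WORKERS` and the clamp-to-one behavior come from the changes here:

```zig
const std = @import("std");

/// Hypothetical helper: pick the I/O worker count, letting the
/// TERO_MAX_WORKERS environment variable override the config value.
fn resolveWorkerCount(allocator: std.mem.Allocator, config_worker_count: u32) u32 {
    const env = std.process.getEnvVarOwned(allocator, "TERO_MAX_WORKERS") catch {
        // Variable unset (or lookup failed): use the config file value.
        return @max(1, config_worker_count);
    };
    defer allocator.free(env);
    const parsed = std.fmt.parseInt(u32, env, 10) catch {
        // Unparseable override: ignore it rather than fail at startup.
        return @max(1, config_worker_count);
    };
    // Clamp to at least one worker, mirroring the @max(1, worker_count)
    // guard in the server diff below.
    return @max(1, parsed);
}
```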
probabilistic_sampler" + if [[ -z "$trace_proc_list" ]]; then + trace_proc_list="probabilistic_sampler" + else + trace_proc_list="$trace_proc_list, probabilistic_sampler" + fi + if [[ "$has_processors" == "false" ]]; then + echo "" + echo "processors:" + has_processors=true + fi # Use 50% as average sampling rate (matches our 10/25/50/75/100 distribution) - echo "" echo " probabilistic_sampler:" echo " sampling_percentage: 50" fi @@ -320,6 +350,12 @@ YAML_HEAD trace_processors="$trace_proc_list" fi + # If no processors were added, output empty processors block + if [[ "$has_processors" == "false" ]]; then + echo "" + echo "processors: {}" + fi + cat < "$config_file" <64KB are dynamically allocated. + // - thread_pool.count: Number of threads for request processing. These threads + // call the handler and make upstream requests. Keep moderate to avoid + // contention on the shared upstream HTTP client connection pool. + // - thread_pool.backlog: Queue size before rejecting requests. + // + // The upstream HTTP client has a mutex-protected connection pool. Too many + // handler threads cause contention when acquiring/releasing connections. + // Match thread_pool.count roughly to expected concurrent upstream connections. const server = try allocator.create(httpz.Server(*ServerContext)); errdefer allocator.destroy(server); server.* = try httpz.Server(*ServerContext).init(allocator, .{ @@ -315,6 +330,14 @@ pub const ProxyServer = struct { .request = .{ .max_body_size = max_body_size, }, + .workers = .{ + .count = @intCast(@max(1, worker_count)), + .large_buffer_size = 64 * 1024, // 64KB - avoid pre-allocating based on max_body_size + }, + .thread_pool = .{ + .count = 32, // Moderate count to reduce upstream connection pool contention + .backlog = 512, // Queue for burst handling + }, }, ctx); return .{ @@ -413,18 +436,34 @@ fn compressIfNeeded( }; } +/// Headers to skip when forwarding requests (case-insensitive hash lookup) +const skip_request_headers = std.StaticStringMapWithEql( + void, + std.static_string_map.eqlAsciiIgnoreCase, +).initComptime(.{ + .{ "host", {} }, + .{ "connection", {} }, + .{ "content-length", {} }, + .{ "transfer-encoding", {} }, +}); + +/// Headers to skip when forwarding responses (case-insensitive hash lookup) +const skip_response_headers = std.StaticStringMapWithEql( + void, + std.static_string_map.eqlAsciiIgnoreCase, +).initComptime(.{ + .{ "content-length", {} }, + .{ "transfer-encoding", {} }, +}); + /// Check if header should be skipped when forwarding request fn shouldSkipRequestHeader(name: []const u8) bool { - return std.ascii.eqlIgnoreCase(name, "host") or - std.ascii.eqlIgnoreCase(name, "connection") or - std.ascii.eqlIgnoreCase(name, "content-length") or - std.ascii.eqlIgnoreCase(name, "transfer-encoding"); + return skip_request_headers.has(name); } /// Check if header should be skipped when forwarding response fn shouldSkipResponseHeader(name: []const u8) bool { - return std.ascii.eqlIgnoreCase(name, "content-length") or - std.ascii.eqlIgnoreCase(name, "transfer-encoding"); + return skip_response_headers.has(name); } /// Build headers array in single pass @@ -601,19 +640,17 @@ fn proxyToUpstreamOnce( module: ProxyModule, body_to_send: []const u8, ) !usize { - // Build upstream URI using pre-allocated buffer - const upstream_uri_str = try ctx.upstreams.buildUpstreamUri( + // Build std.Uri directly from pre-parsed components (avoids Uri.parse overhead) + const uri = try ctx.upstreams.buildUri( module_id, req.url.path, req.url.query, ); - - const uri = try 
diff --git a/src/proxy/upstream_client.zig b/src/proxy/upstream_client.zig
index 2b9d9fc..3478d69 100644
--- a/src/proxy/upstream_client.zig
+++ b/src/proxy/upstream_client.zig
@@ -203,6 +203,59 @@ pub const UpstreamClientManager = struct {
         return fbs.getWritten();
     }
+
+    /// Build a std.Uri directly from pre-parsed components (zero parsing overhead).
+    /// The combined path is written into the pre-allocated uri_buffer.
+    /// This avoids the cost of std.Uri.parse() on every request.
+    pub fn buildUri(
+        self: *UpstreamClientManager,
+        module_id: ModuleId,
+        request_path: []const u8,
+        query_string: []const u8,
+    ) !std.Uri {
+        const idx = @intFromEnum(module_id);
+        const slice = self.upstreams.slice();
+
+        const scheme = slice.items(.scheme)[idx];
+        const host = slice.items(.host)[idx];
+        const port = slice.items(.port)[idx];
+        const base_path = slice.items(.base_path)[idx];
+        const uri_buffer = slice.items(.uri_buffer)[idx];
+
+        // Build combined path into buffer
+        var fbs = std.io.fixedBufferStream(uri_buffer);
+        const writer = fbs.writer();
+
+        // Add base path if present and not just "/"
+        if (base_path.len > 0 and !std.mem.eql(u8, base_path, "/")) {
+            try writer.writeAll(base_path);
+        }
+
+        // Add "/" only if the written path doesn't end with one and request_path doesn't start with one
+        if (request_path.len > 0) {
+            const written = fbs.getWritten();
+            const needs_separator = (written.len == 0 or written[written.len - 1] != '/') and
+                request_path[0] != '/';
+            if (needs_separator) {
+                try writer.writeAll("/");
+            }
+            try writer.writeAll(request_path);
+        }
+
+        const combined_path = fbs.getWritten();
+
+        // Build Uri struct directly without parsing
+        return .{
+            .scheme = scheme,
+            .host = .{ .raw = host },
+            .port = port,
+            .path = .{ .raw = if (combined_path.len > 0) combined_path else "/" },
+            .query = if (query_string.len > 0) .{ .raw = query_string } else null,
+            .fragment = null,
+            .user = null,
+            .password = null,
+        };
+    }
 };
 
 // =============================================================================
 
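The core of `buildUri` is the buffer-backed path join. The same separator rules in isolation, as a hypothetical `joinPath` helper (not part of this diff), including the `base_path == "/"` edge case:

```zig
const std = @import("std");

// Join a base path and request path into a caller-owned buffer,
// inserting "/" only when neither side provides one. Falls back to
// "/" when nothing was written, as buildUri does.
fn joinPath(buf: []u8, base: []const u8, request: []const u8) ![]const u8 {
    var fbs = std.io.fixedBufferStream(buf);
    const w = fbs.writer();
    if (base.len > 0 and !std.mem.eql(u8, base, "/")) try w.writeAll(base);
    if (request.len > 0) {
        const written = fbs.getWritten();
        if ((written.len == 0 or written[written.len - 1] != '/') and request[0] != '/')
            try w.writeAll("/");
        try w.writeAll(request);
    }
    const out = fbs.getWritten();
    return if (out.len > 0) out else "/";
}

test "joinPath inserts at most one separator" {
    var buf: [64]u8 = undefined;
    try std.testing.expectEqualStrings("/v2/logs", try joinPath(&buf, "/v2", "/logs"));
    try std.testing.expectEqualStrings("/v2/logs", try joinPath(&buf, "/v2", "logs"));
    try std.testing.expectEqualStrings("/logs", try joinPath(&buf, "/", "logs"));
    try std.testing.expectEqualStrings("/", try joinPath(&buf, "", ""));
}
```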
std.testing.expectEqualStrings("/v2", uri3.path.raw); +} + +test "UpstreamClientManager buildUri with non-standard port" { + const allocator = std.testing.allocator; + + var manager = UpstreamClientManager.init(allocator); + defer manager.deinit(); + + const module_id = try manager.createUpstream( + "http://localhost:9999", + 2048, + 1024, + 1024, + ); + + const uri = try manager.buildUri(module_id, "/test", ""); + try std.testing.expectEqualStrings("http", uri.scheme); + try std.testing.expectEqualStrings("localhost", uri.host.?.raw); + try std.testing.expectEqual(@as(u16, 9999), uri.port.?); + try std.testing.expectEqualStrings("/test", uri.path.raw); +}