From 1f0cde045058c15c05b98b9daa4827d587ad68b8 Mon Sep 17 00:00:00 2001
From: Paulo Vital
Date: Tue, 28 Oct 2025 16:37:25 +0100
Subject: [PATCH 1/2] fix: IndexError in `confluent_kafka_python.py`

Fixed an IndexError in `confluent_kafka_python.py` by handling both
positional and keyword arguments for the topic parameter in the
`trace_kafka_produce` function. The issue occurred when the topic was
passed as a keyword argument, leaving the `args` tuple empty and
causing an IndexError on the `args[0]` access.

The solution:

1. Modified the `trace_kafka_produce` function to get the topic from
   either `args` or `kwargs`
2. Added safety checks to handle edge cases
3. Added two new test methods to verify the fix works with different
   argument patterns:
   - `test_trace_confluent_kafka_produce_with_keyword_topic`
   - `test_trace_confluent_kafka_produce_with_keyword_args`

This fix ensures that the Kafka instrumentation works correctly
regardless of how the `produce` method is called, improving the
robustness of the Python sensor.
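
A minimal sketch of the two call patterns, assuming a reachable broker
(the broker address and topic name below are illustrative):

    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": "localhost:9092"})

    # Traced correctly before this fix: the topic arrives positionally,
    # so the instrumentation's args[0] access succeeds.
    producer.produce("example-topic", value=b"raw_bytes")

    # Raised IndexError in trace_kafka_produce before this fix: args is
    # empty and the topic is only present in kwargs.
    producer.produce(topic="example-topic", value=b"raw_bytes")

    producer.flush(timeout=10)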

Signed-off-by: Paulo Vital
---
 .../kafka/confluent_kafka_python.py         | 17 ++--
 tests/clients/kafka/test_confluent_kafka.py | 80 ++++++++++++++++---
 2 files changed, 79 insertions(+), 18 deletions(-)

diff --git a/src/instana/instrumentation/kafka/confluent_kafka_python.py b/src/instana/instrumentation/kafka/confluent_kafka_python.py
index e5d991d2..f2f327f1 100644
--- a/src/instana/instrumentation/kafka/confluent_kafka_python.py
+++ b/src/instana/instrumentation/kafka/confluent_kafka_python.py
@@ -14,11 +14,8 @@
 from instana.log import logger
 from instana.propagators.format import Format
 from instana.singletons import get_tracer
-from instana.util.traceutils import (
-    get_tracer_tuple,
-    tracing_is_off,
-)
 from instana.span.span import InstanaSpan
+from instana.util.traceutils import get_tracer_tuple, tracing_is_off
 
 consumer_token = None
 consumer_span = contextvars.ContextVar("confluent_kafka_consumer_span")
@@ -69,16 +66,20 @@ def trace_kafka_produce(
     tracer, parent_span, _ = get_tracer_tuple()
     parent_context = parent_span.get_span_context() if parent_span else None
+
+    # Get the topic from either args or kwargs
+    topic = args[0] if args else kwargs.get("topic", "")
+
     is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
         "kafka",
         "produce",
-        args[0],
+        topic,
     )
 
     with tracer.start_as_current_span(
         "kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER
     ) as span:
-        span.set_attribute("kafka.service", args[0])
+        span.set_attribute("kafka.service", topic)
         span.set_attribute("kafka.access", "produce")
 
         # context propagation
@@ -89,6 +90,10 @@
         # dictionary. To maintain compatibility with the headers for the
         # Kafka Python library, we will use a list of tuples.
         headers = args[6] if len(args) > 6 else kwargs.get("headers", [])
+
+        # Initialize headers if it's None
+        if headers is None:
+            headers = []
 
         suppression_header = {"x_instana_l_s": "0" if is_suppressed else "1"}
         headers.append(suppression_header)
diff --git a/tests/clients/kafka/test_confluent_kafka.py b/tests/clients/kafka/test_confluent_kafka.py
index 61f31bce..a5c9b334 100644
--- a/tests/clients/kafka/test_confluent_kafka.py
+++ b/tests/clients/kafka/test_confluent_kafka.py
@@ -5,30 +5,26 @@
 from typing import Generator
 
 import pytest
-from confluent_kafka import (
-    Consumer,
-    KafkaException,
-    Producer,
-)
+from confluent_kafka import Consumer, KafkaException, Producer
 from confluent_kafka.admin import AdminClient, NewTopic
-from mock import patch, Mock
+from mock import Mock, patch
 from opentelemetry.trace import SpanKind
 from opentelemetry.trace.span import format_span_id
 
 from instana.configurator import config
-from instana.options import StandardOptions
-from instana.singletons import agent, tracer
-from instana.util.config import parse_ignored_endpoints_from_yaml
-from tests.helpers import get_first_span_by_filter, testenv
 from instana.instrumentation.kafka import confluent_kafka_python
 from instana.instrumentation.kafka.confluent_kafka_python import (
     clear_context,
-    save_consumer_span_into_context,
     close_consumer_span,
-    trace_kafka_close,
     consumer_span,
+    save_consumer_span_into_context,
+    trace_kafka_close,
 )
+from instana.options import StandardOptions
+from instana.singletons import agent, tracer
 from instana.span.span import InstanaSpan
+from instana.util.config import parse_ignored_endpoints_from_yaml
+from tests.helpers import get_first_span_by_filter, testenv
 
 
 class TestConfluentKafka:
@@ -120,6 +116,66 @@ def test_trace_confluent_kafka_produce(self) -> None:
         assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
         assert kafka_span.data["kafka"]["access"] == "produce"
 
+    def test_trace_confluent_kafka_produce_with_keyword_topic(self) -> None:
+        """Test that tracing works when topic is passed as a keyword argument."""
+        with tracer.start_as_current_span("test"):
+            # Pass topic as a keyword argument
+            self.producer.produce(topic=testenv["kafka_topic"], value=b"raw_bytes")
+            self.producer.flush(timeout=10)
+
+        spans = self.recorder.queued_spans()
+        assert len(spans) == 2
+
+        kafka_span = spans[0]
+        test_span = spans[1]
+
+        # Same traceId
+        assert test_span.t == kafka_span.t
+
+        # Parent relationships
+        assert kafka_span.p == test_span.s
+
+        # Error logging
+        assert not test_span.ec
+        assert not kafka_span.ec
+
+        assert kafka_span.n == "kafka"
+        assert kafka_span.k == SpanKind.CLIENT
+        assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
+        assert kafka_span.data["kafka"]["access"] == "produce"
+
+    def test_trace_confluent_kafka_produce_with_keyword_args(self) -> None:
+        """Test that tracing works when both topic and headers are passed as keyword arguments."""
+        with tracer.start_as_current_span("test"):
+            # Pass both topic and headers as keyword arguments
+            self.producer.produce(
+                topic=testenv["kafka_topic"],
+                value=b"raw_bytes",
+                headers=[("custom-header", b"header-value")],
+            )
+            self.producer.flush(timeout=10)
+
+        spans = self.recorder.queued_spans()
+        assert len(spans) == 2
+
+        kafka_span = spans[0]
+        test_span = spans[1]
+
+        # Same traceId
+        assert test_span.t == kafka_span.t
+
+        # Parent relationships
+        assert kafka_span.p == test_span.s
+
+        # Error logging
+        assert not test_span.ec
+        assert not kafka_span.ec
+
+        assert kafka_span.n == "kafka"
+        assert kafka_span.k == SpanKind.CLIENT
+        assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
+        assert kafka_span.data["kafka"]["access"] == "produce"
+
     def test_trace_confluent_kafka_consume(self) -> None:
         agent.options.set_trace_configurations()
         # Produce some events

From ee1db84541e33227e043f8fc15988006b19889c2 Mon Sep 17 00:00:00 2001
From: Paulo Vital
Date: Tue, 28 Oct 2025 16:49:16 +0100
Subject: [PATCH 2/2] fix: IndexError in `kafka_python.py`

Fixed a potential IndexError in `kafka_python.py` by handling both
positional and keyword arguments for the topic parameter in the
`trace_kafka_send` function. The issue is similar to the one fixed in
`confluent_kafka_python.py`: an IndexError could occur when the topic
was passed as a keyword argument, leaving the `args` tuple empty.

The solution:

1. Modified the `trace_kafka_send` function to get the topic from
   either `args` or `kwargs`
2. Added safety checks to handle edge cases
3. Added two new test methods to verify the fix works with different
   argument patterns:
   - `test_trace_kafka_python_send_with_keyword_topic`
   - `test_trace_kafka_python_send_with_keyword_args`

This fix ensures that the Kafka instrumentation works correctly
regardless of how the `send` method is called, improving the
robustness of the Python sensor.

Signed-off-by: Paulo Vital
---
 .../instrumentation/kafka/kafka_python.py | 13 ++--
 tests/clients/kafka/test_kafka_python.py  | 75 +++++++++++++++++--
 2 files changed, 76 insertions(+), 12 deletions(-)

diff --git a/src/instana/instrumentation/kafka/kafka_python.py b/src/instana/instrumentation/kafka/kafka_python.py
index 3b1423d3..307b7d52 100644
--- a/src/instana/instrumentation/kafka/kafka_python.py
+++ b/src/instana/instrumentation/kafka/kafka_python.py
@@ -14,11 +14,8 @@
 from instana.log import logger
 from instana.propagators.format import Format
 from instana.singletons import get_tracer
-from instana.util.traceutils import (
-    get_tracer_tuple,
-    tracing_is_off,
-)
 from instana.span.span import InstanaSpan
+from instana.util.traceutils import get_tracer_tuple, tracing_is_off
 
 if TYPE_CHECKING:
     from kafka.producer.future import FutureRecordMetadata
@@ -38,15 +35,19 @@ def trace_kafka_send(
     tracer, parent_span, _ = get_tracer_tuple()
     parent_context = parent_span.get_span_context() if parent_span else None
+
+    # Get the topic from either args or kwargs
+    topic = args[0] if args else kwargs.get("topic", "")
+
     is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
         "kafka",
         "send",
-        args[0],
+        topic,
     )
 
     with tracer.start_as_current_span(
         "kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER
     ) as span:
-        span.set_attribute("kafka.service", args[0])
+        span.set_attribute("kafka.service", topic)
         span.set_attribute("kafka.access", "send")
 
         # context propagation
diff --git a/tests/clients/kafka/test_kafka_python.py b/tests/clients/kafka/test_kafka_python.py
index eb3723e3..a1d0ccbb 100644
--- a/tests/clients/kafka/test_kafka_python.py
+++ b/tests/clients/kafka/test_kafka_python.py
@@ -12,19 +12,18 @@
 from opentelemetry.trace.span import format_span_id
 
 from instana.configurator import config
-from instana.options import StandardOptions
-from instana.singletons import agent, tracer
-from instana.util.config import parse_ignored_endpoints_from_yaml
-from tests.helpers import get_first_span_by_filter, testenv
-
 from instana.instrumentation.kafka import kafka_python
 from instana.instrumentation.kafka.kafka_python import (
     clear_context,
-    save_consumer_span_into_context,
     close_consumer_span,
     consumer_span,
+    save_consumer_span_into_context,
 )
+from instana.options import StandardOptions
+from instana.singletons import agent, tracer
 from instana.span.span import InstanaSpan
+from instana.util.config import parse_ignored_endpoints_from_yaml
+from tests.helpers import get_first_span_by_filter, testenv
 
 
 class TestKafkaPython:
@@ -122,6 +121,70 @@ def test_trace_kafka_python_send(self) -> None:
         assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
         assert kafka_span.data["kafka"]["access"] == "send"
 
+    def test_trace_kafka_python_send_with_keyword_topic(self) -> None:
+        """Test that tracing works when topic is passed as a keyword argument."""
+        with tracer.start_as_current_span("test"):
+            # Pass topic as a keyword argument
+            future = self.producer.send(
+                topic=testenv["kafka_topic"], value=b"raw_bytes"
+            )
+
+            _ = future.get(timeout=10)  # noqa: F841
+
+        spans = self.recorder.queued_spans()
+        assert len(spans) == 2
+
+        kafka_span = spans[0]
+        test_span = spans[1]
+
+        # Same traceId
+        assert test_span.t == kafka_span.t
+
+        # Parent relationships
+        assert kafka_span.p == test_span.s
+
+        # Error logging
+        assert not test_span.ec
+        assert not kafka_span.ec
+
+        assert kafka_span.n == "kafka"
+        assert kafka_span.k == SpanKind.CLIENT
+        assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
+        assert kafka_span.data["kafka"]["access"] == "send"
+
+    def test_trace_kafka_python_send_with_keyword_args(self) -> None:
+        """Test that tracing works when both topic and headers are passed as keyword arguments."""
+        with tracer.start_as_current_span("test"):
+            # Pass both topic and headers as keyword arguments
+            future = self.producer.send(
+                topic=testenv["kafka_topic"],
+                value=b"raw_bytes",
+                headers=[("custom-header", b"header-value")],
+            )
+
+            _ = future.get(timeout=10)  # noqa: F841
+
+        spans = self.recorder.queued_spans()
+        assert len(spans) == 2
+
+        kafka_span = spans[0]
+        test_span = spans[1]
+
+        # Same traceId
+        assert test_span.t == kafka_span.t
+
+        # Parent relationships
+        assert kafka_span.p == test_span.s
+
+        # Error logging
+        assert not test_span.ec
+        assert not kafka_span.ec
+
+        assert kafka_span.n == "kafka"
+        assert kafka_span.k == SpanKind.CLIENT
+        assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
+        assert kafka_span.data["kafka"]["access"] == "send"
+
     def test_trace_kafka_python_consume(self) -> None:
         # Produce some events
         self.producer.send(testenv["kafka_topic"], b"raw_bytes1")