diff --git a/src/instana/instrumentation/kafka/confluent_kafka_python.py b/src/instana/instrumentation/kafka/confluent_kafka_python.py index e5d991d2..f2f327f1 100644 --- a/src/instana/instrumentation/kafka/confluent_kafka_python.py +++ b/src/instana/instrumentation/kafka/confluent_kafka_python.py @@ -14,11 +14,8 @@ from instana.log import logger from instana.propagators.format import Format from instana.singletons import get_tracer - from instana.util.traceutils import ( - get_tracer_tuple, - tracing_is_off, - ) from instana.span.span import InstanaSpan + from instana.util.traceutils import get_tracer_tuple, tracing_is_off consumer_token = None consumer_span = contextvars.ContextVar("confluent_kafka_consumer_span") @@ -69,16 +66,20 @@ def trace_kafka_produce( tracer, parent_span, _ = get_tracer_tuple() parent_context = parent_span.get_span_context() if parent_span else None + + # Get the topic from either args or kwargs + topic = args[0] if args else kwargs.get("topic", "") + is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored( "kafka", "produce", - args[0], + topic, ) with tracer.start_as_current_span( "kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER ) as span: - span.set_attribute("kafka.service", args[0]) + span.set_attribute("kafka.service", topic) span.set_attribute("kafka.access", "produce") # context propagation @@ -89,6 +90,10 @@ def trace_kafka_produce( # dictionary. To maintain compatibility with the headers for the # Kafka Python library, we will use a list of tuples. headers = args[6] if len(args) > 6 else kwargs.get("headers", []) + + # Initialize headers if it's None + if headers is None: + headers = [] suppression_header = {"x_instana_l_s": "0" if is_suppressed else "1"} headers.append(suppression_header) diff --git a/src/instana/instrumentation/kafka/kafka_python.py b/src/instana/instrumentation/kafka/kafka_python.py index 3b1423d3..307b7d52 100644 --- a/src/instana/instrumentation/kafka/kafka_python.py +++ b/src/instana/instrumentation/kafka/kafka_python.py @@ -14,11 +14,8 @@ from instana.log import logger from instana.propagators.format import Format from instana.singletons import get_tracer - from instana.util.traceutils import ( - get_tracer_tuple, - tracing_is_off, - ) from instana.span.span import InstanaSpan + from instana.util.traceutils import get_tracer_tuple, tracing_is_off if TYPE_CHECKING: from kafka.producer.future import FutureRecordMetadata @@ -38,15 +35,19 @@ def trace_kafka_send( tracer, parent_span, _ = get_tracer_tuple() parent_context = parent_span.get_span_context() if parent_span else None + + # Get the topic from either args or kwargs + topic = args[0] if args else kwargs.get("topic", "") + is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored( "kafka", "send", - args[0], + topic, ) with tracer.start_as_current_span( "kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER ) as span: - span.set_attribute("kafka.service", args[0]) + span.set_attribute("kafka.service", topic) span.set_attribute("kafka.access", "send") # context propagation diff --git a/tests/clients/kafka/test_confluent_kafka.py b/tests/clients/kafka/test_confluent_kafka.py index 61f31bce..a5c9b334 100644 --- a/tests/clients/kafka/test_confluent_kafka.py +++ b/tests/clients/kafka/test_confluent_kafka.py @@ -5,30 +5,26 @@ from typing import Generator import pytest -from confluent_kafka import ( - Consumer, - KafkaException, - Producer, -) +from confluent_kafka import Consumer, KafkaException, Producer from confluent_kafka.admin import AdminClient, NewTopic -from mock import patch, Mock +from mock import Mock, patch from opentelemetry.trace import SpanKind from opentelemetry.trace.span import format_span_id from instana.configurator import config -from instana.options import StandardOptions -from instana.singletons import agent, tracer -from instana.util.config import parse_ignored_endpoints_from_yaml -from tests.helpers import get_first_span_by_filter, testenv from instana.instrumentation.kafka import confluent_kafka_python from instana.instrumentation.kafka.confluent_kafka_python import ( clear_context, - save_consumer_span_into_context, close_consumer_span, - trace_kafka_close, consumer_span, + save_consumer_span_into_context, + trace_kafka_close, ) +from instana.options import StandardOptions +from instana.singletons import agent, tracer from instana.span.span import InstanaSpan +from instana.util.config import parse_ignored_endpoints_from_yaml +from tests.helpers import get_first_span_by_filter, testenv class TestConfluentKafka: @@ -120,6 +116,66 @@ def test_trace_confluent_kafka_produce(self) -> None: assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] assert kafka_span.data["kafka"]["access"] == "produce" + def test_trace_confluent_kafka_produce_with_keyword_topic(self) -> None: + """Test that tracing works when topic is passed as a keyword argument.""" + with tracer.start_as_current_span("test"): + # Pass topic as a keyword argument + self.producer.produce(topic=testenv["kafka_topic"], value=b"raw_bytes") + self.producer.flush(timeout=10) + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + kafka_span = spans[0] + test_span = spans[1] + + # Same traceId + assert test_span.t == kafka_span.t + + # Parent relationships + assert kafka_span.p == test_span.s + + # Error logging + assert not test_span.ec + assert not kafka_span.ec + + assert kafka_span.n == "kafka" + assert kafka_span.k == SpanKind.CLIENT + assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] + assert kafka_span.data["kafka"]["access"] == "produce" + + def test_trace_confluent_kafka_produce_with_keyword_args(self) -> None: + """Test that tracing works when both topic and headers are passed as keyword arguments.""" + with tracer.start_as_current_span("test"): + # Pass both topic and headers as keyword arguments + self.producer.produce( + topic=testenv["kafka_topic"], + value=b"raw_bytes", + headers=[("custom-header", b"header-value")], + ) + self.producer.flush(timeout=10) + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + kafka_span = spans[0] + test_span = spans[1] + + # Same traceId + assert test_span.t == kafka_span.t + + # Parent relationships + assert kafka_span.p == test_span.s + + # Error logging + assert not test_span.ec + assert not kafka_span.ec + + assert kafka_span.n == "kafka" + assert kafka_span.k == SpanKind.CLIENT + assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] + assert kafka_span.data["kafka"]["access"] == "produce" + def test_trace_confluent_kafka_consume(self) -> None: agent.options.set_trace_configurations() # Produce some events diff --git a/tests/clients/kafka/test_kafka_python.py b/tests/clients/kafka/test_kafka_python.py index eb3723e3..a1d0ccbb 100644 --- a/tests/clients/kafka/test_kafka_python.py +++ b/tests/clients/kafka/test_kafka_python.py @@ -12,19 +12,18 @@ from opentelemetry.trace.span import format_span_id from instana.configurator import config -from instana.options import StandardOptions -from instana.singletons import agent, tracer -from instana.util.config import parse_ignored_endpoints_from_yaml -from tests.helpers import get_first_span_by_filter, testenv - from instana.instrumentation.kafka import kafka_python from instana.instrumentation.kafka.kafka_python import ( clear_context, - save_consumer_span_into_context, close_consumer_span, consumer_span, + save_consumer_span_into_context, ) +from instana.options import StandardOptions +from instana.singletons import agent, tracer from instana.span.span import InstanaSpan +from instana.util.config import parse_ignored_endpoints_from_yaml +from tests.helpers import get_first_span_by_filter, testenv class TestKafkaPython: @@ -122,6 +121,70 @@ def test_trace_kafka_python_send(self) -> None: assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] assert kafka_span.data["kafka"]["access"] == "send" + def test_trace_kafka_python_send_with_keyword_topic(self) -> None: + """Test that tracing works when topic is passed as a keyword argument.""" + with tracer.start_as_current_span("test"): + # Pass topic as a keyword argument + future = self.producer.send( + topic=testenv["kafka_topic"], value=b"raw_bytes" + ) + + _ = future.get(timeout=10) # noqa: F841 + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + kafka_span = spans[0] + test_span = spans[1] + + # Same traceId + assert test_span.t == kafka_span.t + + # Parent relationships + assert kafka_span.p == test_span.s + + # Error logging + assert not test_span.ec + assert not kafka_span.ec + + assert kafka_span.n == "kafka" + assert kafka_span.k == SpanKind.CLIENT + assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] + assert kafka_span.data["kafka"]["access"] == "send" + + def test_trace_kafka_python_send_with_keyword_args(self) -> None: + """Test that tracing works when both topic and headers are passed as keyword arguments.""" + with tracer.start_as_current_span("test"): + # Pass both topic and headers as keyword arguments + future = self.producer.send( + topic=testenv["kafka_topic"], + value=b"raw_bytes", + headers=[("custom-header", b"header-value")], + ) + + _ = future.get(timeout=10) # noqa: F841 + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + kafka_span = spans[0] + test_span = spans[1] + + # Same traceId + assert test_span.t == kafka_span.t + + # Parent relationships + assert kafka_span.p == test_span.s + + # Error logging + assert not test_span.ec + assert not kafka_span.ec + + assert kafka_span.n == "kafka" + assert kafka_span.k == SpanKind.CLIENT + assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] + assert kafka_span.data["kafka"]["access"] == "send" + def test_trace_kafka_python_consume(self) -> None: # Produce some events self.producer.send(testenv["kafka_topic"], b"raw_bytes1")