Skip to content

Commit

Permalink
feat: Add instrumentation to kafka-python.
Browse files Browse the repository at this point in the history
Signed-off-by: Paulo Vital <[email protected]>
  • Loading branch information
pvital committed Feb 20, 2025
1 parent d1e45ab commit 65ecb98
Show file tree
Hide file tree
Showing 11 changed files with 348 additions and 11 deletions.
63 changes: 63 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ jobs:
environment:
PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
PUBSUB_PROJECT1: test-project,test-topic
- image: public.ecr.aws/bitnami/kafka:3.9.0
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
working_directory: ~/repo
steps:
- checkout
Expand Down Expand Up @@ -164,6 +173,15 @@ jobs:
environment:
PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
PUBSUB_PROJECT1: test-project,test-topic
- image: public.ecr.aws/bitnami/kafka:3.9.0
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
working_directory: ~/repo
steps:
- checkout
Expand Down Expand Up @@ -192,6 +210,15 @@ jobs:
environment:
PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
PUBSUB_PROJECT1: test-project,test-topic
- image: public.ecr.aws/bitnami/kafka:3.9.0
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
working_directory: ~/repo
steps:
- checkout
Expand Down Expand Up @@ -221,6 +248,15 @@ jobs:
environment:
PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
PUBSUB_PROJECT1: test-project,test-topic
- image: public.ecr.aws/bitnami/kafka:3.9.0
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
working_directory: ~/repo
steps:
- checkout
Expand Down Expand Up @@ -250,6 +286,15 @@ jobs:
environment:
PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
PUBSUB_PROJECT1: test-project,test-topic
- image: public.ecr.aws/bitnami/kafka:3.9.0
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
working_directory: ~/repo
steps:
- checkout
Expand Down Expand Up @@ -293,6 +338,15 @@ jobs:
environment:
PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
PUBSUB_PROJECT1: test-project,test-topic
- image: public.ecr.aws/bitnami/kafka:3.9.0
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
working_directory: ~/repo
steps:
- checkout
Expand Down Expand Up @@ -322,6 +376,15 @@ jobs:
environment:
PUBSUB_EMULATOR_HOST: 0.0.0.0:8681
PUBSUB_PROJECT1: test-project,test-topic
- image: public.ecr.aws/bitnami/kafka:3.9.0
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094
working_directory: ~/repo
steps:
- checkout
Expand Down
4 changes: 4 additions & 0 deletions .tekton/python-tracer-prepuller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ spec:
# public.ecr.aws/docker/library/postgres:16.2-bookworm
image: public.ecr.aws/docker/library/postgres@sha256:07572430dbcd821f9f978899c3ab3a727f5029be9298a41662e1b5404d5b73e0
command: ["sh", "-c", "'true'"]
- name: prepuller-kafka
# public.ecr.aws/bitnami/kafka:3.9.0
image: public.ecr.aws/docker/library/kafka@sha256:d2890d68f96b36da3c8413fa94294f018b2f95d87cf108cbf71eab510572d9be
command: ["sh", "-c", "'true'"]
- name: prepuller-38
# public.ecr.aws/docker/library/python:3.8.20-bookworm
image: public.ecr.aws/docker/library/python@
Expand Down
19 changes: 19 additions & 0 deletions .tekton/task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,25 @@ spec:
- name: rabbitmq
# public.ecr.aws/docker/library/rabbitmq:3.13.0
image: public.ecr.aws/docker/library/rabbitmq@sha256:39de1a4fc6c72d12bd5dfa23e8576536fd1c0cc8418344cd5a51addfc9a1145d
- name: kafka
# public.ecr.aws/bitnami/kafka:3.9.0
image: public.ecr.aws/bitnami/kafka@sha256:d2890d68f96b36da3c8413fa94294f018b2f95d87cf108cbf71eab510572d9be
env:
- name: KAFKA_CFG_NODE_ID
value: 0
- name: KAFKA_CFG_PROCESS_ROLES
value: controller,broker
- name: KAFKA_CFG_LISTENERS
value: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
value: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
value: 0@kafka:9093
- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
value: CONTROLLER
- name: KAFKA_CFG_ADVERTISED_LISTENERS
value: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094

params:
- name: imageDigest
type: string
Expand Down
14 changes: 14 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,17 @@ services:
ports:
- "8681:8681"
- "8682:8682"

kafka:
image: public.ecr.aws/bitnami/kafka:latest
ports:
- '9092:9092'
- '9094:9094'
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
23 changes: 15 additions & 8 deletions src/instana/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,43 +169,50 @@ def boot_agent() -> None:
asyncio, # noqa: F401
boto3_inst, # noqa: F401
cassandra_inst, # noqa: F401
celery, # noqa: F401
couchbase_inst, # noqa: F401
fastapi_inst, # noqa: F401
flask, # noqa: F401
# gevent_inst, # noqa: F401
grpcio, # noqa: F401
logging, # noqa: F401
mysqlclient, # noqa: F401
pika, # noqa: F401
pep0249, # noqa: F401
pika, # noqa: F401
psycopg2, # noqa: F401
pymongo, # noqa: F401
pymysql, # noqa: F401
pyramid, # noqa: F401
redis, # noqa: F401
sanic_inst, # noqa: F401
sqlalchemy, # noqa: F401
starlette_inst, # noqa: F401
sanic_inst, # noqa: F401
urllib3, # noqa: F401
)
from instana.instrumentation.aiohttp import (
client, # noqa: F401
server, # noqa: F401
client as aiohttp_client, # noqa: F401
)
from instana.instrumentation.aiohttp import (
server as aiohttp_server, # noqa: F401
)
from instana.instrumentation.aws import lambda_inst # noqa: F401
from instana.instrumentation import celery # noqa: F401
from instana.instrumentation.django import middleware # noqa: F401
from instana.instrumentation.google.cloud import (
pubsub, # noqa: F401
storage, # noqa: F401
)
from instana.instrumentation.kafka import (
kafka_python, # noqa: F401
)
from instana.instrumentation.tornado import (
client as tornado_client, # noqa: F401
)
from instana.instrumentation.tornado import (
client, # noqa: F401
server, # noqa: F401
server as tornado_server, # noqa: F401
)

# Hooks
from instana.hooks import hook_uwsgi, hook_gunicorn # noqa: F401
from instana.hooks import hook_gunicorn, hook_uwsgi # noqa: F401


if "INSTANA_DISABLE" not in os.environ:
Expand Down
1 change: 1 addition & 0 deletions src/instana/instrumentation/kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# (c) Copyright IBM Corp. 2025
90 changes: 90 additions & 0 deletions src/instana/instrumentation/kafka/kafka_python.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# (c) Copyright IBM Corp. 2025

try:
from typing import TYPE_CHECKING, Any, Callable, Dict, Tuple

import kafka # noqa: F401
import wrapt
from opentelemetry.trace import SpanKind

from instana.log import logger
from instana.propagators.format import Format
from instana.util.traceutils import (
get_tracer_tuple,
tracing_is_off,
)

if TYPE_CHECKING:
from kafka.producer.future import FutureRecordMetadata

@wrapt.patch_function_wrapper("kafka", "KafkaProducer.send")
def trace_kafka_send(
wrapped: Callable[..., "kafka.KafkaProducer.send"],
instance: "kafka.KafkaProducer",
args: Tuple[int, str, Tuple[Any, ...]],
kwargs: Dict[str, Any],
) -> "FutureRecordMetadata":
if tracing_is_off():
return wrapped(*args, **kwargs)

tracer, parent_span, _ = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None

with tracer.start_as_current_span(
"kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER
) as span:
span.set_attribute("kafka.service", args[0])
span.set_attribute("kafka.access", "send")

# context propagation
tracer.inject(
span.context,
Format.KAFKA_HEADERS,
kwargs.get("headers", {}),
disable_w3c_trace_context=True,
)

try:
res = wrapped(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
else:
return res

@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.__next__")
def trace_kafka_consume(
wrapped: Callable[..., "kafka.KafkaConsumer.__next__"],
instance: "kafka.KafkaConsumer",
args: Tuple[int, str, Tuple[Any, ...]],
kwargs: Dict[str, Any],
) -> "FutureRecordMetadata":
if tracing_is_off():
return wrapped(*args, **kwargs)

tracer, parent_span, _ = get_tracer_tuple()

parent_context = (
parent_span.get_span_context()
if parent_span
else tracer.extract(
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
)
)

with tracer.start_as_current_span(
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
) as span:
topic = list(instance.subscription())[0]
span.set_attribute("kafka.service", topic)
span.set_attribute("kafka.access", "consume")

try:
res = wrapped(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
else:
return res

logger.debug("Instrumenting Kafka (kafka-python)")
except ImportError:
pass
Loading

0 comments on commit 65ecb98

Please sign in to comment.