From 209dfe4a698ce248002c55e6a7b8f55c37a22dbd Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Thu, 22 Feb 2024 15:53:51 +0400 Subject: [PATCH 1/4] Accept endpoint_id and path_params from client --- elastic_transport/_otel.py | 14 +++++++++++--- elastic_transport/_transport.py | 12 +++++++++++- tests/test_otel.py | 30 ++++++++++++++++++++++++++++-- 3 files changed, 50 insertions(+), 6 deletions(-) diff --git a/elastic_transport/_otel.py b/elastic_transport/_otel.py index 8f19ea3..1591c2c 100644 --- a/elastic_transport/_otel.py +++ b/elastic_transport/_otel.py @@ -19,7 +19,7 @@ import contextlib import os -import typing +from typing import Generator, Mapping, Optional try: from opentelemetry import trace @@ -40,13 +40,21 @@ def __init__(self, enabled: bool | None = None, tracer: trace.Tracer | None = No self.enabled = enabled and self.tracer is not None @contextlib.contextmanager - def span(self, method: str) -> typing.Generator[None, None, None]: + def span( + self, + method: str, + *, + endpoint_id: Optional[str], + path_parts: Mapping[str, str], + ) -> Generator[None, None, None]: if not self.enabled or self.tracer is None: yield return - span_name = method + span_name = endpoint_id or method with self.tracer.start_as_current_span(span_name) as span: span.set_attribute("http.request.method", method) span.set_attribute("db.system", "elasticsearch") + for key, value in path_parts.items(): + span.set_attribute(f"db.elasticsearch.path_parts.{key}", value) yield diff --git a/elastic_transport/_transport.py b/elastic_transport/_transport.py index 584f809..37f6c9b 100644 --- a/elastic_transport/_transport.py +++ b/elastic_transport/_transport.py @@ -268,6 +268,8 @@ def perform_request( retry_on_timeout: Union[bool, DefaultType] = DEFAULT, request_timeout: Union[Optional[float], DefaultType] = DEFAULT, client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT, + endpoint_id: Union[str, DefaultType] = DEFAULT, + path_parts: Union[Mapping[str, str], DefaultType] = DEFAULT, ) -> TransportApiResponse: """ Perform the actual request. Retrieve a node from the node @@ -291,9 +293,17 @@ def perform_request( :arg retry_on_timeout: Set to true to retry after timeout errors. :arg request_timeout: Amount of time to wait for a response to fail with a timeout error. :arg client_meta: Extra client metadata key-value pairs to send in the client meta header. + :arg endpoint_id: The endpoint id of the request, such as `ml.close_job`. + Used for OpenTelemetry instrumentation. + :arg path_parths: Dictionary with all dynamic value in the url path. + Used for OpenTelemetry instrumentation. :returns: Tuple of the :class:`elastic_transport.ApiResponseMeta` with the deserialized response. """ - with self.otel.span(method): + with self.otel.span( + method, + endpoint_id=resolve_default(endpoint_id, None), + path_parts=resolve_default(path_parts, {}), + ): return self._perform_request( method, target, diff --git a/tests/test_otel.py b/tests/test_otel.py index 44fecfb..f65ba50 100644 --- a/tests/test_otel.py +++ b/tests/test_otel.py @@ -22,20 +22,46 @@ from elastic_transport._otel import OpenTelemetry -def test_span(): +def setup_tracing(): tracer_provider = TracerProvider() memory_exporter = InMemorySpanExporter() span_processor = export.SimpleSpanProcessor(memory_exporter) tracer_provider.add_span_processor(span_processor) tracer = tracer_provider.get_tracer(__name__) + return tracer, memory_exporter + + +def test_minimal_span(): + tracer, memory_exporter = setup_tracing() + + otel = OpenTelemetry(enabled=True, tracer=tracer) + with otel.span("GET", path_parts={}): + pass + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "GET" + assert spans[0].attributes == { + "http.request.method": "GET", + "db.system": "elasticsearch", + } + + +def test_detailed_span(): + tracer, memory_exporter = setup_tracing() otel = OpenTelemetry(enabled=True, tracer=tracer) - with otel.span("GET"): + with otel.span( + "GET", "ml.close_job", path_parts={"job_id": "my-job", "foo": "bar"} + ): pass spans = memory_exporter.get_finished_spans() assert len(spans) == 1 + assert spans[0].name == "ml.close_job" assert spans[0].attributes == { "http.request.method": "GET", "db.system": "elasticsearch", + "db.elasticsearch.path_parts.job_id": "my-job", + "db.elasticsearch.path_parts.foo": "bar", } From d64be3fd8b87419d4adcc66395bafba7a383880d Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Thu, 22 Feb 2024 16:16:49 +0400 Subject: [PATCH 2/4] Support OpenTelemetry in AsyncTransport --- elastic_transport/_async_transport.py | 38 ++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/elastic_transport/_async_transport.py b/elastic_transport/_async_transport.py index 67db393..8913fd2 100644 --- a/elastic_transport/_async_transport.py +++ b/elastic_transport/_async_transport.py @@ -173,7 +173,7 @@ def __init__( # sniffing. Uses '_sniffing_task' instead. self._sniffing_lock = None # type: ignore[assignment] - async def perform_request( # type: ignore[override,return] + async def perform_request( # type: ignore[override] self, method: str, target: str, @@ -185,6 +185,8 @@ async def perform_request( # type: ignore[override,return] retry_on_timeout: Union[bool, DefaultType] = DEFAULT, request_timeout: Union[Optional[float], DefaultType] = DEFAULT, client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT, + endpoint_id: Union[str, DefaultType] = DEFAULT, + path_parts: Union[Mapping[str, str], DefaultType] = DEFAULT, ) -> TransportApiResponse: """ Perform the actual request. Retrieve a node from the node @@ -208,8 +210,42 @@ async def perform_request( # type: ignore[override,return] :arg retry_on_timeout: Set to true to retry after timeout errors. :arg request_timeout: Amount of time to wait for a response to fail with a timeout error. :arg client_meta: Extra client metadata key-value pairs to send in the client meta header. + :arg endpoint_id: The endpoint id of the request, such as `ml.close_job`. + Used for OpenTelemetry instrumentation. + :arg path_parths: Dictionary with all dynamic value in the url path. + Used for OpenTelemetry instrumentation. :returns: Tuple of the :class:`elastic_transport.ApiResponseMeta` with the deserialized response. """ + with self.otel.span( + method, + endpoint_id=resolve_default(endpoint_id, None), + path_parts=resolve_default(path_parts, {}), + ): + return await self._perform_request( + method, + target, + body=body, + headers=headers, + max_retries=max_retries, + retry_on_status=retry_on_status, + retry_on_timeout=retry_on_timeout, + request_timeout=request_timeout, + client_meta=client_meta, + ) + + async def _perform_request( # type: ignore[override,return] + self, + method: str, + target: str, + *, + body: Optional[Any] = None, + headers: Union[Mapping[str, Any], DefaultType] = DEFAULT, + max_retries: Union[int, DefaultType] = DEFAULT, + retry_on_status: Union[Collection[int], DefaultType] = DEFAULT, + retry_on_timeout: Union[bool, DefaultType] = DEFAULT, + request_timeout: Union[Optional[float], DefaultType] = DEFAULT, + client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT, + ) -> TransportApiResponse: await self._async_call() if headers is DEFAULT: From fcf4127e6e08fbd29a8fda6758e167ea3fce8882 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Thu, 22 Feb 2024 16:28:30 +0400 Subject: [PATCH 3/4] Fix otel unit tests --- tests/test_otel.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_otel.py b/tests/test_otel.py index f65ba50..ed2ba91 100644 --- a/tests/test_otel.py +++ b/tests/test_otel.py @@ -36,7 +36,7 @@ def test_minimal_span(): tracer, memory_exporter = setup_tracing() otel = OpenTelemetry(enabled=True, tracer=tracer) - with otel.span("GET", path_parts={}): + with otel.span("GET", endpoint_id=None, path_parts={}): pass spans = memory_exporter.get_finished_spans() @@ -52,7 +52,7 @@ def test_detailed_span(): tracer, memory_exporter = setup_tracing() otel = OpenTelemetry(enabled=True, tracer=tracer) with otel.span( - "GET", "ml.close_job", path_parts={"job_id": "my-job", "foo": "bar"} + "GET", endpoint_id="ml.close_job", path_parts={"job_id": "my-job", "foo": "bar"} ): pass From bd6ae62550806e96691022def9a07aa929bf3386 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Fri, 23 Feb 2024 13:10:55 +0400 Subject: [PATCH 4/4] Fix path_parts typo --- elastic_transport/_async_transport.py | 2 +- elastic_transport/_transport.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/elastic_transport/_async_transport.py b/elastic_transport/_async_transport.py index 8913fd2..7c20e7b 100644 --- a/elastic_transport/_async_transport.py +++ b/elastic_transport/_async_transport.py @@ -212,7 +212,7 @@ async def perform_request( # type: ignore[override] :arg client_meta: Extra client metadata key-value pairs to send in the client meta header. :arg endpoint_id: The endpoint id of the request, such as `ml.close_job`. Used for OpenTelemetry instrumentation. - :arg path_parths: Dictionary with all dynamic value in the url path. + :arg path_paths: Dictionary with all dynamic value in the url path. Used for OpenTelemetry instrumentation. :returns: Tuple of the :class:`elastic_transport.ApiResponseMeta` with the deserialized response. """ diff --git a/elastic_transport/_transport.py b/elastic_transport/_transport.py index 37f6c9b..23afadc 100644 --- a/elastic_transport/_transport.py +++ b/elastic_transport/_transport.py @@ -295,7 +295,7 @@ def perform_request( :arg client_meta: Extra client metadata key-value pairs to send in the client meta header. :arg endpoint_id: The endpoint id of the request, such as `ml.close_job`. Used for OpenTelemetry instrumentation. - :arg path_parths: Dictionary with all dynamic value in the url path. + :arg path_paths: Dictionary with all dynamic value in the url path. Used for OpenTelemetry instrumentation. :returns: Tuple of the :class:`elastic_transport.ApiResponseMeta` with the deserialized response. """