diff --git a/elastic_transport/__init__.py b/elastic_transport/__init__.py index ca3b4e8..fc7114c 100644 --- a/elastic_transport/__init__.py +++ b/elastic_transport/__init__.py @@ -41,6 +41,7 @@ Urllib3HttpNode, ) from ._node_pool import NodePool, NodeSelector, RandomSelector, RoundRobinSelector +from ._otel import OpenTelemetrySpan from ._response import ApiResponse as ApiResponse from ._response import BinaryApiResponse as BinaryApiResponse from ._response import HeadApiResponse as HeadApiResponse @@ -79,6 +80,7 @@ "NodePool", "NodeSelector", "ObjectApiResponse", + "OpenTelemetrySpan", "RandomSelector", "RequestsHttpNode", "RoundRobinSelector", diff --git a/elastic_transport/_async_transport.py b/elastic_transport/_async_transport.py index 6e10a36..1e66f74 100644 --- a/elastic_transport/_async_transport.py +++ b/elastic_transport/_async_transport.py @@ -174,7 +174,7 @@ def __init__( # sniffing. Uses '_sniffing_task' instead. self._sniffing_lock = None # type: ignore[assignment] - async def perform_request( # type: ignore[override] + async def perform_request( # type: ignore[override, return] self, method: str, target: str, @@ -186,8 +186,7 @@ async def perform_request( # type: ignore[override] retry_on_timeout: Union[bool, DefaultType] = DEFAULT, request_timeout: Union[Optional[float], DefaultType] = DEFAULT, client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT, - endpoint_id: Optional[str] = None, - path_parts: Optional[Mapping[str, str]] = None, + otel_span: Union[OpenTelemetrySpan, DefaultType] = DEFAULT, ) -> TransportApiResponse: """ Perform the actual request. Retrieve a node from the node @@ -211,47 +210,9 @@ async def perform_request( # type: ignore[override] :arg retry_on_timeout: Set to true to retry after timeout errors. :arg request_timeout: Amount of time to wait for a response to fail with a timeout error. :arg client_meta: Extra client metadata key-value pairs to send in the client meta header. - :arg endpoint_id: The endpoint id of the request, such as `ml.close_job`. - Used for OpenTelemetry instrumentation. - :arg path_paths: Dictionary with all dynamic value in the url path. - Used for OpenTelemetry instrumentation. + :arg otel_span: OpenTelemetry span used to add metadata to the span. :returns: Tuple of the :class:`elastic_transport.ApiResponseMeta` with the deserialized response. """ - path_parts = path_parts if path_parts is not None else {} - with self.otel.span( - method, - endpoint_id=endpoint_id, - path_parts=path_parts, - ) as otel_span: - response = await self._perform_request( - method, - target, - body=body, - headers=headers, - max_retries=max_retries, - retry_on_status=retry_on_status, - retry_on_timeout=retry_on_timeout, - request_timeout=request_timeout, - client_meta=client_meta, - otel_span=otel_span, - ) - otel_span.set_elastic_cloud_metadata(response.meta.headers) - return response - - async def _perform_request( # type: ignore[override,return] - self, - method: str, - target: str, - *, - body: Optional[Any] = None, - headers: Union[Mapping[str, Any], DefaultType] = DEFAULT, - max_retries: Union[int, DefaultType] = DEFAULT, - retry_on_status: Union[Collection[int], DefaultType] = DEFAULT, - retry_on_timeout: Union[bool, DefaultType] = DEFAULT, - request_timeout: Union[Optional[float], DefaultType] = DEFAULT, - client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT, - otel_span: OpenTelemetrySpan, - ) -> TransportApiResponse: await self._async_call() if headers is DEFAULT: @@ -261,6 +222,7 @@ async def _perform_request( # type: ignore[override,return] max_retries = resolve_default(max_retries, self.max_retries) retry_on_timeout = resolve_default(retry_on_timeout, self.retry_on_timeout) retry_on_status = resolve_default(retry_on_status, self.retry_on_status) + otel_span = resolve_default(otel_span, OpenTelemetrySpan(None)) if self.meta_header: request_headers["x-elastic-client-meta"] = ",".join( diff --git a/elastic_transport/_otel.py b/elastic_transport/_otel.py index 74543c3..92972a0 100644 --- a/elastic_transport/_otel.py +++ b/elastic_transport/_otel.py @@ -17,27 +17,13 @@ from __future__ import annotations -import contextlib -import os -from typing import Generator, Mapping, Optional +from typing import TYPE_CHECKING, Mapping -try: - from opentelemetry import trace - from opentelemetry.trace import Span - - _tracer: trace.Tracer | None = trace.get_tracer("elastic-transport") -except ModuleNotFoundError: - _tracer = None +if TYPE_CHECKING: + from typing import Literal + from opentelemetry.trace import Span -# Valid values for the enabled config are 'true' and 'false'. Default is 'true'. -ENABLED_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED" -# Describes how to handle search queries in the request body when assigned to -# a span attribute. -# Valid values are 'omit' and 'raw'. -# Default is 'omit' as 'raw' has security implications. -BODY_STRATEGY_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY" -DEFAULT_BODY_STRATEGY = "omit" # A list of the Elasticsearch endpoints that qualify as "search" endpoints. The search query in # the request body may be captured for these endpoints, depending on the body capture strategy. @@ -57,9 +43,10 @@ class OpenTelemetrySpan: def __init__( self, - otel_span: Optional[Span], - endpoint_id: Optional[str] = None, - body_strategy: Optional[str] = None, + otel_span: Span | None, + endpoint_id: str | None = None, + # TODO import Literal at the top-level when dropping Python 3.7 + body_strategy: 'Literal["omit", "raw"]' = "omit", ): self.otel_span = otel_span self.body_strategy = body_strategy @@ -97,50 +84,3 @@ def set_db_statement(self, serialized_body: bytes) -> None: self.otel_span.set_attribute( "db.statement", serialized_body.decode("utf-8") ) - - -class OpenTelemetry: - def __init__( - self, - enabled: bool | None = None, - tracer: trace.Tracer | None = None, - body_strategy: str | None = None, - ): - if enabled is None: - enabled = os.environ.get(ENABLED_ENV_VAR, "false") != "false" - self.tracer = tracer or _tracer - self.enabled = enabled and self.tracer is not None - - if body_strategy is not None: - self.body_strategy = body_strategy - else: - self.body_strategy = os.environ.get( - BODY_STRATEGY_ENV_VAR, DEFAULT_BODY_STRATEGY - ) - - @contextlib.contextmanager - def span( - self, - method: str, - *, - endpoint_id: Optional[str], - path_parts: Mapping[str, str], - ) -> Generator[OpenTelemetrySpan, None, None]: - if not self.enabled or self.tracer is None: - yield OpenTelemetrySpan(None) - return - - span_name = endpoint_id or method - with self.tracer.start_as_current_span(span_name) as otel_span: - otel_span.set_attribute("http.request.method", method) - otel_span.set_attribute("db.system", "elasticsearch") - if endpoint_id is not None: - otel_span.set_attribute("db.operation", endpoint_id) - for key, value in path_parts.items(): - otel_span.set_attribute(f"db.elasticsearch.path_parts.{key}", value) - - yield OpenTelemetrySpan( - otel_span, - endpoint_id=endpoint_id, - body_strategy=self.body_strategy, - ) diff --git a/elastic_transport/_transport.py b/elastic_transport/_transport.py index 274b403..bf1de58 100644 --- a/elastic_transport/_transport.py +++ b/elastic_transport/_transport.py @@ -60,7 +60,7 @@ Urllib3HttpNode, ) from ._node_pool import NodePool, NodeSelector -from ._otel import OpenTelemetry, OpenTelemetrySpan +from ._otel import OpenTelemetrySpan from ._serializer import DEFAULT_SERIALIZERS, Serializer, SerializerCollection from ._version import __version__ from .client_utils import client_meta_version, resolve_default @@ -226,9 +226,6 @@ def __init__( self.retry_on_status = retry_on_status self.retry_on_timeout = retry_on_timeout - # Instrumentation - self.otel = OpenTelemetry() - # Build the NodePool from all the options node_pool_kwargs: Dict[str, Any] = {} if node_selector_class is not None: @@ -256,7 +253,7 @@ def __init__( if sniff_on_start: self.sniff(True) - def perform_request( + def perform_request( # type: ignore[return] self, method: str, target: str, @@ -268,8 +265,7 @@ def perform_request( retry_on_timeout: Union[bool, DefaultType] = DEFAULT, request_timeout: Union[Optional[float], DefaultType] = DEFAULT, client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT, - endpoint_id: Optional[str] = None, - path_parts: Optional[Mapping[str, str]] = None, + otel_span: Union[OpenTelemetrySpan, DefaultType] = DEFAULT, ) -> TransportApiResponse: """ Perform the actual request. Retrieve a node from the node @@ -293,47 +289,10 @@ def perform_request( :arg retry_on_timeout: Set to true to retry after timeout errors. :arg request_timeout: Amount of time to wait for a response to fail with a timeout error. :arg client_meta: Extra client metadata key-value pairs to send in the client meta header. - :arg endpoint_id: The endpoint id of the request, such as `ml.close_job`. - Used for OpenTelemetry instrumentation. - :arg path_paths: Dictionary with all dynamic value in the url path. - Used for OpenTelemetry instrumentation. + :arg otel_span: OpenTelemetry span used to add metadata to the span. + :returns: Tuple of the :class:`elastic_transport.ApiResponseMeta` with the deserialized response. """ - path_parts = path_parts if path_parts is not None else {} - with self.otel.span( - method, - endpoint_id=endpoint_id, - path_parts=path_parts, - ) as otel_span: - response = self._perform_request( - method, - target, - body=body, - headers=headers, - max_retries=max_retries, - retry_on_status=retry_on_status, - retry_on_timeout=retry_on_timeout, - request_timeout=request_timeout, - client_meta=client_meta, - otel_span=otel_span, - ) - otel_span.set_elastic_cloud_metadata(response.meta.headers) - return response - - def _perform_request( # type: ignore[return] - self, - method: str, - target: str, - *, - body: Optional[Any] = None, - headers: Union[Mapping[str, Any], DefaultType] = DEFAULT, - max_retries: Union[int, DefaultType] = DEFAULT, - retry_on_status: Union[Collection[int], DefaultType] = DEFAULT, - retry_on_timeout: Union[bool, DefaultType] = DEFAULT, - request_timeout: Union[Optional[float], DefaultType] = DEFAULT, - client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT, - otel_span: OpenTelemetrySpan, - ) -> TransportApiResponse: if headers is DEFAULT: request_headers = HttpHeaders() else: @@ -341,6 +300,7 @@ def _perform_request( # type: ignore[return] max_retries = resolve_default(max_retries, self.max_retries) retry_on_timeout = resolve_default(retry_on_timeout, self.retry_on_timeout) retry_on_status = resolve_default(retry_on_status, self.retry_on_status) + otel_span = resolve_default(otel_span, OpenTelemetrySpan(None)) if self.meta_header: request_headers["x-elastic-client-meta"] = ",".join( diff --git a/tests/test_otel.py b/tests/test_otel.py index f5a7e1d..e1666ed 100644 --- a/tests/test_otel.py +++ b/tests/test_otel.py @@ -15,13 +15,11 @@ # specific language governing permissions and limitations # under the License. -import os - from opentelemetry.sdk.trace import TracerProvider, export from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from elastic_transport import JsonSerializer -from elastic_transport._otel import ENABLED_ENV_VAR, OpenTelemetry +from elastic_transport._otel import OpenTelemetrySpan def setup_tracing(): @@ -36,56 +34,31 @@ def setup_tracing(): def test_no_span(): # With telemetry disabled, those calls should not raise - otel = OpenTelemetry(enabled=False) - with otel.span( - "GET", - endpoint_id="ml.open_job", - path_parts={"job_id": "my-job"}, - ) as span: - span.set_db_statement(JsonSerializer().dumps({"timeout": "1m"})) - span.set_node_metadata( - "localhost", - 9200, - "http://localhost:9200/", - "_ml/anomaly_detectors/my-job/_open", - ) - span.set_elastic_cloud_metadata( - { - "X-Found-Handling-Cluster": "e9106fc68e3044f0b1475b04bf4ffd5f", - "X-Found-Handling-Instance": "instance-0000000001", - } - ) - - -def test_enabled(): - otel = OpenTelemetry() - assert otel.enabled == (os.environ.get(ENABLED_ENV_VAR, "false") != "false") - - -def test_minimal_span(): - tracer, memory_exporter = setup_tracing() - - otel = OpenTelemetry(enabled=True, tracer=tracer) - with otel.span("GET", endpoint_id=None, path_parts={}): - pass - - spans = memory_exporter.get_finished_spans() - assert len(spans) == 1 - assert spans[0].name == "GET" - assert spans[0].attributes == { - "http.request.method": "GET", - "db.system": "elasticsearch", - } + span = OpenTelemetrySpan(None) + span.set_db_statement(JsonSerializer().dumps({"timeout": "1m"})) + span.set_node_metadata( + "localhost", + 9200, + "http://localhost:9200/", + "_ml/anomaly_detectors/my-job/_open", + ) + span.set_elastic_cloud_metadata( + { + "X-Found-Handling-Cluster": "e9106fc68e3044f0b1475b04bf4ffd5f", + "X-Found-Handling-Instance": "instance-0000000001", + } + ) def test_detailed_span(): tracer, memory_exporter = setup_tracing() - otel = OpenTelemetry(enabled=True, tracer=tracer) - with otel.span( - "GET", - endpoint_id="ml.open_job", - path_parts={"job_id": "my-job"}, - ) as span: + with tracer.start_as_current_span("ml.open_job") as otel_span: + span = OpenTelemetrySpan( + otel_span, + endpoint_id="my-job/_open", + body_strategy="omit", + ) + span.set_db_statement(JsonSerializer().dumps({"timeout": "1m"})) span.set_node_metadata( "localhost", @@ -104,13 +77,9 @@ def test_detailed_span(): assert len(spans) == 1 assert spans[0].name == "ml.open_job" assert spans[0].attributes == { - "http.request.method": "GET", "url.full": "http://localhost:9200/_ml/anomaly_detectors/my-job/_open", "server.address": "localhost", "server.port": 9200, - "db.system": "elasticsearch", - "db.operation": "ml.open_job", - "db.elasticsearch.path_parts.job_id": "my-job", "db.elasticsearch.cluster.name": "e9106fc68e3044f0b1475b04bf4ffd5f", "db.elasticsearch.node.name": "instance-0000000001", } @@ -118,16 +87,13 @@ def test_detailed_span(): def test_db_statement(): tracer, memory_exporter = setup_tracing() - otel = OpenTelemetry(enabled=True, tracer=tracer, body_strategy="raw") - with otel.span("GET", endpoint_id="search", path_parts={}) as span: + with tracer.start_as_current_span("search") as otel_span: + span = OpenTelemetrySpan(otel_span, endpoint_id="search", body_strategy="raw") span.set_db_statement(JsonSerializer().dumps({"query": {"match_all": {}}})) spans = memory_exporter.get_finished_spans() assert len(spans) == 1 assert spans[0].name == "search" assert spans[0].attributes == { - "http.request.method": "GET", - "db.system": "elasticsearch", - "db.operation": "search", "db.statement": '{"query":{"match_all":{}}}', }