Skip to content

Commit ce82b57

Browse files
authored
Accept endpoint_id and path_params from client (#151)
1 parent bf0884d commit ce82b57

File tree

4 files changed

+87
-7
lines changed

4 files changed

+87
-7
lines changed

elastic_transport/_async_transport.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ def __init__(
173173
# sniffing. Uses '_sniffing_task' instead.
174174
self._sniffing_lock = None # type: ignore[assignment]
175175

176-
async def perform_request( # type: ignore[override,return]
176+
async def perform_request( # type: ignore[override]
177177
self,
178178
method: str,
179179
target: str,
@@ -185,6 +185,8 @@ async def perform_request( # type: ignore[override,return]
185185
retry_on_timeout: Union[bool, DefaultType] = DEFAULT,
186186
request_timeout: Union[Optional[float], DefaultType] = DEFAULT,
187187
client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT,
188+
endpoint_id: Union[str, DefaultType] = DEFAULT,
189+
path_parts: Union[Mapping[str, str], DefaultType] = DEFAULT,
188190
) -> TransportApiResponse:
189191
"""
190192
Perform the actual request. Retrieve a node from the node
@@ -208,8 +210,42 @@ async def perform_request( # type: ignore[override,return]
208210
:arg retry_on_timeout: Set to true to retry after timeout errors.
209211
:arg request_timeout: Amount of time to wait for a response to fail with a timeout error.
210212
:arg client_meta: Extra client metadata key-value pairs to send in the client meta header.
213+
:arg endpoint_id: The endpoint id of the request, such as `ml.close_job`.
214+
Used for OpenTelemetry instrumentation.
215+
:arg path_paths: Dictionary with all dynamic value in the url path.
216+
Used for OpenTelemetry instrumentation.
211217
:returns: Tuple of the :class:`elastic_transport.ApiResponseMeta` with the deserialized response.
212218
"""
219+
with self.otel.span(
220+
method,
221+
endpoint_id=resolve_default(endpoint_id, None),
222+
path_parts=resolve_default(path_parts, {}),
223+
):
224+
return await self._perform_request(
225+
method,
226+
target,
227+
body=body,
228+
headers=headers,
229+
max_retries=max_retries,
230+
retry_on_status=retry_on_status,
231+
retry_on_timeout=retry_on_timeout,
232+
request_timeout=request_timeout,
233+
client_meta=client_meta,
234+
)
235+
236+
async def _perform_request( # type: ignore[override,return]
237+
self,
238+
method: str,
239+
target: str,
240+
*,
241+
body: Optional[Any] = None,
242+
headers: Union[Mapping[str, Any], DefaultType] = DEFAULT,
243+
max_retries: Union[int, DefaultType] = DEFAULT,
244+
retry_on_status: Union[Collection[int], DefaultType] = DEFAULT,
245+
retry_on_timeout: Union[bool, DefaultType] = DEFAULT,
246+
request_timeout: Union[Optional[float], DefaultType] = DEFAULT,
247+
client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT,
248+
) -> TransportApiResponse:
213249
await self._async_call()
214250

215251
if headers is DEFAULT:

elastic_transport/_otel.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import contextlib
2121
import os
22-
import typing
22+
from typing import Generator, Mapping, Optional
2323

2424
try:
2525
from opentelemetry import trace
@@ -40,13 +40,21 @@ def __init__(self, enabled: bool | None = None, tracer: trace.Tracer | None = No
4040
self.enabled = enabled and self.tracer is not None
4141

4242
@contextlib.contextmanager
43-
def span(self, method: str) -> typing.Generator[None, None, None]:
43+
def span(
44+
self,
45+
method: str,
46+
*,
47+
endpoint_id: Optional[str],
48+
path_parts: Mapping[str, str],
49+
) -> Generator[None, None, None]:
4450
if not self.enabled or self.tracer is None:
4551
yield
4652
return
4753

48-
span_name = method
54+
span_name = endpoint_id or method
4955
with self.tracer.start_as_current_span(span_name) as span:
5056
span.set_attribute("http.request.method", method)
5157
span.set_attribute("db.system", "elasticsearch")
58+
for key, value in path_parts.items():
59+
span.set_attribute(f"db.elasticsearch.path_parts.{key}", value)
5260
yield

elastic_transport/_transport.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,8 @@ def perform_request(
268268
retry_on_timeout: Union[bool, DefaultType] = DEFAULT,
269269
request_timeout: Union[Optional[float], DefaultType] = DEFAULT,
270270
client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT,
271+
endpoint_id: Union[str, DefaultType] = DEFAULT,
272+
path_parts: Union[Mapping[str, str], DefaultType] = DEFAULT,
271273
) -> TransportApiResponse:
272274
"""
273275
Perform the actual request. Retrieve a node from the node
@@ -291,9 +293,17 @@ def perform_request(
291293
:arg retry_on_timeout: Set to true to retry after timeout errors.
292294
:arg request_timeout: Amount of time to wait for a response to fail with a timeout error.
293295
:arg client_meta: Extra client metadata key-value pairs to send in the client meta header.
296+
:arg endpoint_id: The endpoint id of the request, such as `ml.close_job`.
297+
Used for OpenTelemetry instrumentation.
298+
:arg path_paths: Dictionary with all dynamic value in the url path.
299+
Used for OpenTelemetry instrumentation.
294300
:returns: Tuple of the :class:`elastic_transport.ApiResponseMeta` with the deserialized response.
295301
"""
296-
with self.otel.span(method):
302+
with self.otel.span(
303+
method,
304+
endpoint_id=resolve_default(endpoint_id, None),
305+
path_parts=resolve_default(path_parts, {}),
306+
):
297307
return self._perform_request(
298308
method,
299309
target,

tests/test_otel.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,46 @@
2222
from elastic_transport._otel import OpenTelemetry
2323

2424

25-
def test_span():
25+
def setup_tracing():
2626
tracer_provider = TracerProvider()
2727
memory_exporter = InMemorySpanExporter()
2828
span_processor = export.SimpleSpanProcessor(memory_exporter)
2929
tracer_provider.add_span_processor(span_processor)
3030
tracer = tracer_provider.get_tracer(__name__)
3131

32+
return tracer, memory_exporter
33+
34+
35+
def test_minimal_span():
36+
tracer, memory_exporter = setup_tracing()
37+
38+
otel = OpenTelemetry(enabled=True, tracer=tracer)
39+
with otel.span("GET", endpoint_id=None, path_parts={}):
40+
pass
41+
42+
spans = memory_exporter.get_finished_spans()
43+
assert len(spans) == 1
44+
assert spans[0].name == "GET"
45+
assert spans[0].attributes == {
46+
"http.request.method": "GET",
47+
"db.system": "elasticsearch",
48+
}
49+
50+
51+
def test_detailed_span():
52+
tracer, memory_exporter = setup_tracing()
3253
otel = OpenTelemetry(enabled=True, tracer=tracer)
33-
with otel.span("GET"):
54+
with otel.span(
55+
"GET", endpoint_id="ml.close_job", path_parts={"job_id": "my-job", "foo": "bar"}
56+
):
3457
pass
3558

3659
spans = memory_exporter.get_finished_spans()
3760
assert len(spans) == 1
61+
assert spans[0].name == "ml.close_job"
3862
assert spans[0].attributes == {
3963
"http.request.method": "GET",
4064
"db.system": "elasticsearch",
65+
"db.elasticsearch.path_parts.job_id": "my-job",
66+
"db.elasticsearch.path_parts.foo": "bar",
4167
}

0 commit comments

Comments
 (0)