diff --git a/docs/index.rst b/docs/index.rst
index 3e53104e..77efd8b3 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -26,6 +26,12 @@ Python client for `YDB `_ — a fault-tolerant distributed SQ
coordination
scheme
+.. toctree::
+ :hidden:
+ :caption: Observability
+
+ opentelemetry
+
.. toctree::
:hidden:
:caption: Reference
@@ -82,7 +88,7 @@ Distributed Coordination
------------------------
The :doc:`coordination` page covers distributed semaphores and leader election. If you
-need to limit concurrent access to a shared resource across multiple processes or hosts,
+need to limit concurrent access to a shared resource across multiple processes or hosts,
this is the service to use.
Schema Management
@@ -103,6 +109,15 @@ use the ``@ydb_retry`` decorator. Skipping this section is a common source of pr
incidents.
+Observability
+-------------
+
+The :doc:`opentelemetry` page explains how to add distributed tracing to your
+application using OpenTelemetry. One call to ``enable_tracing()`` instruments
+query sessions, transactions, and connection pool operations — so you can
+visualize request flow in Jaeger, Grafana, or any OpenTelemetry-compatible backend.
+
+
API Reference
-------------
diff --git a/docs/opentelemetry.rst b/docs/opentelemetry.rst
new file mode 100644
index 00000000..f924ec48
--- /dev/null
+++ b/docs/opentelemetry.rst
@@ -0,0 +1,233 @@
+OpenTelemetry Tracing
+=====================
+
+The SDK provides built-in distributed tracing via `OpenTelemetry <https://opentelemetry.io/>`_.
+When enabled, key YDB operations — such as session creation, query execution, transaction
+commit/rollback, and driver initialization — produce OpenTelemetry spans. Trace
+context is automatically propagated to the YDB server through gRPC metadata using the
+`W3C Trace Context <https://www.w3.org/TR/trace-context/>`_ standard.
+
+Tracing is **zero-cost when disabled**: the SDK uses no-op stubs by default, so there is
+no overhead unless you explicitly opt in.
+
+
+Installation
+------------
+
+OpenTelemetry packages are not included by default. Install the SDK with the
+``opentelemetry`` extra:
+
+.. code-block:: sh
+
+ pip install ydb[opentelemetry]
+
+This pulls in ``opentelemetry-api``. You will also need ``opentelemetry-sdk`` and an
+exporter for your tracing backend, for example:
+
+.. code-block:: sh
+
+ # OTLP/gRPC exporter (works with Jaeger, Tempo, and others)
+ pip install opentelemetry-exporter-otlp-proto-grpc
+
+
+Enabling Tracing
+----------------
+
+Call ``enable_tracing()`` once, **after** configuring your OpenTelemetry tracer provider
+and **before** creating a ``Driver``:
+
+.. code-block:: python
+
+ from opentelemetry import trace
+ from opentelemetry.sdk.trace import TracerProvider
+ from opentelemetry.sdk.trace.export import BatchSpanProcessor
+ from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
+ from opentelemetry.sdk.resources import Resource
+
+ import ydb
+ from ydb.opentelemetry import enable_tracing
+
+ # 1. Set up OpenTelemetry
+ resource = Resource(attributes={"service.name": "my-service"})
+ provider = TracerProvider(resource=resource)
+ provider.add_span_processor(
+ BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
+ )
+ trace.set_tracer_provider(provider)
+
+ # 2. Enable YDB tracing
+ enable_tracing()
+
+ # 3. Use the SDK as usual — spans are created automatically
+ with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
+ driver.wait(timeout=5)
+ with ydb.QuerySessionPool(driver) as pool:
+ pool.execute_with_retries("SELECT 1")
+
+ provider.shutdown()
+
+``enable_tracing()`` accepts an optional ``tracer`` argument. If omitted, the SDK
+obtains a tracer named ``"ydb.sdk"`` from the global tracer provider.
+
+
+What Is Instrumented
+--------------------
+
+The following operations produce spans:
+
+.. list-table::
+ :header-rows: 1
+ :widths: 35 20 45
+
+ * - Span Name
+ - Kind
+ - Description
+ * - ``ydb.Driver.Initialize``
+ - INTERNAL
+ - Driver wait / endpoint discovery.
+ * - ``ydb.CreateSession``
+ - CLIENT
+ - Creating a new query session.
+ * - ``ydb.ExecuteQuery``
+ - CLIENT
+ - Executing a query (including ``execute_with_retries``).
+ * - ``ydb.Commit``
+ - CLIENT
+ - Committing an explicit transaction.
+ * - ``ydb.Rollback``
+ - CLIENT
+ - Rolling back a transaction.
+ * - ``ydb.RunWithRetry``
+ - INTERNAL
+ - Umbrella span wrapping the whole retryable block (``retry_operation_*`` / ``retry_tx_*`` / ``execute_with_retries``).
+ * - ``ydb.Try``
+ - INTERNAL
+ - A single retry attempt. Carries ``ydb.retry.backoff_ms`` — how long the retrier slept before starting this attempt (``0`` for the first one).
+
+All spans are nested under the currently active span, so wrapping your application
+logic in a parent span produces a complete trace tree:
+
+.. code-block:: python
+
+ tracer = trace.get_tracer(__name__)
+
+ with tracer.start_as_current_span("handle-request"):
+ pool.execute_with_retries("SELECT 1")
+ # ↳ ydb.CreateSession (if a new session is needed)
+ # ↳ ydb.ExecuteQuery
+
+
+Span Attributes
+---------------
+
+Every YDB RPC (CLIENT-kind) span carries these semantic attributes:
+
+.. list-table::
+ :header-rows: 1
+ :widths: 30 70
+
+ * - Attribute
+ - Description
+ * - ``db.system.name``
+ - Always ``"ydb"``.
+ * - ``db.namespace``
+ - Database path (e.g. ``"/local"``).
+ * - ``server.address``
+ - Host from the connection string.
+ * - ``server.port``
+ - Port from the connection string.
+ * - ``network.peer.address``
+ - Actual node host from the discovery endpoint map (set once the session is attached to a node).
+ * - ``network.peer.port``
+ - Actual node port from the discovery endpoint map.
+ * - ``ydb.node.dc``
+ - Data-center / location reported by discovery for the node (e.g. ``"vla"``, ``"sas"``).
+
+Additional attributes are set when available:
+
+.. list-table::
+ :header-rows: 1
+ :widths: 30 70
+
+ * - Attribute
+ - Description
+ * - ``ydb.node.id``
+ - YDB node that handled the request.
+
+On errors, the span also records:
+
+- ``error.type`` — ``"ydb_error"``, ``"transport_error"``, or the Python exception class name.
+- ``db.response.status_code`` — the YDB status code name (e.g. ``"SCHEME_ERROR"``).
+
+
+Trace Context Propagation
+-------------------------
+
+When tracing is enabled, the SDK automatically injects trace context headers into
+every gRPC call to YDB using the globally configured OpenTelemetry propagator
+(``opentelemetry.propagate.inject``). By default, OpenTelemetry uses the
+`W3C Trace Context <https://www.w3.org/TR/trace-context/>`_ propagator, which adds
+``traceparent`` and ``tracestate`` headers.
+
+YDB server expects W3C Trace Context headers, so the default propagator configuration
+works out of the box. This allows the server to correlate client spans with
+server-side processing, enabling end-to-end trace visibility across the entire
+request path.
+
+
+Async Usage
+-----------
+
+Tracing works identically with the async driver. Call ``enable_tracing()`` once at
+startup:
+
+.. code-block:: python
+
+ import asyncio
+ import ydb
+ from ydb.opentelemetry import enable_tracing
+
+ enable_tracing()
+
+ async def main():
+ async with ydb.aio.Driver(
+ endpoint="grpc://localhost:2136",
+ database="/local",
+ ) as driver:
+ await driver.wait(timeout=5)
+ async with ydb.aio.QuerySessionPool(driver) as pool:
+ await pool.execute_with_retries("SELECT 1")
+
+ asyncio.run(main())
+
+
+
+Using a Custom Tracer
+---------------------
+
+To use a specific tracer instead of the global one:
+
+.. code-block:: python
+
+ from opentelemetry import trace
+
+ my_tracer = trace.get_tracer("my.custom.tracer")
+ enable_tracing(tracer=my_tracer)
+
+
+Running the Examples
+--------------------
+
+The ``examples/opentelemetry/`` directory contains ready-to-run examples with a Docker
+Compose setup that starts YDB, an OTLP collector, Tempo, Prometheus, and Grafana:
+
+.. code-block:: sh
+
+ cd examples/opentelemetry
+ docker compose -f compose-e2e.yaml up -d
+
+ # Run the example
+ python example.py
+
+Open `http://localhost:3000 <http://localhost:3000>`_ (Grafana) to explore the
+collected traces via the Tempo data source.
diff --git a/examples/opentelemetry/compose-e2e.yaml b/examples/opentelemetry/compose-e2e.yaml
new file mode 100644
index 00000000..933d9a38
--- /dev/null
+++ b/examples/opentelemetry/compose-e2e.yaml
@@ -0,0 +1,61 @@
+version: "3.3"
+services:
+ ydb:
+ image: ydbplatform/local-ydb:trunk
+ restart: always
+ hostname: localhost
+ platform: linux/amd64
+ environment:
+ YDB_DEFAULT_LOG_LEVEL: NOTICE
+ GRPC_TLS_PORT: "2135"
+ GRPC_PORT: "2136"
+ MON_PORT: "8765"
+ YDB_USE_IN_MEMORY_PDISKS: "true"
+ command: [ "--config-path", "/ydb_config/ydb-config-with-tracing.yaml" ]
+ ports:
+ - "2135:2135"
+ - "2136:2136"
+ - "8765:8765"
+ volumes:
+ - ./ydb_config:/ydb_config:ro
+
+ otel-collector:
+ image: otel/opentelemetry-collector-contrib:latest
+ command: [ "--config=/etc/otelcol/config.yaml" ]
+ volumes:
+ - ./otel-collector-config.yaml:/etc/otelcol/config.yaml:ro
+ ports:
+ - "4317:4317"
+ - "4318:4318"
+ - "9464:9464"
+ - "13133:13133"
+ - "13317:55679"
+
+ prometheus:
+ image: prom/prometheus:latest
+ volumes:
+ - ./prometheus.yaml:/etc/prometheus/prometheus.yml:ro
+ ports:
+ - "9090:9090"
+ depends_on: [ otel-collector ]
+
+ tempo:
+ image: grafana/tempo:2.4.1
+ command: [ "-config.file=/etc/tempo.yaml" ]
+ volumes:
+ - ./tempo.yaml:/etc/tempo.yaml:ro
+ ports:
+ - "3200:3200"
+ depends_on: [ otel-collector ]
+
+ grafana:
+ image: grafana/grafana:10.4.2
+ environment:
+ GF_AUTH_ANONYMOUS_ENABLED: "true"
+ GF_AUTH_ANONYMOUS_ORG_ROLE: "Admin"
+ volumes:
+ - ./grafana/provisioning:/etc/grafana/provisioning:ro
+ - ./grafana/dashboards:/var/lib/grafana/dashboards:ro
+ ports:
+ - "3000:3000"
+ depends_on: [ prometheus, tempo ]
diff --git a/examples/opentelemetry/example.py b/examples/opentelemetry/example.py
new file mode 100644
index 00000000..d36397c1
--- /dev/null
+++ b/examples/opentelemetry/example.py
@@ -0,0 +1,65 @@
+"""Minimal example: OpenTelemetry tracing for YDB Python SDK."""
+
+import asyncio
+
+from opentelemetry import trace
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.resources import Resource
+
+import ydb
+from ydb.opentelemetry import enable_tracing
+
+resource = Resource(attributes={"service.name": "ydb-example"})
+provider = TracerProvider(resource=resource)
+provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317")))
+trace.set_tracer_provider(provider)
+
+tracer = trace.get_tracer(__name__)
+enable_tracing(tracer)
+
+ENDPOINT = "grpc://localhost:2136"
+DATABASE = "/local"
+
+
+def sync_example():
+ """Sync: session execute and transaction execute + commit."""
+ with ydb.Driver(endpoint=ENDPOINT, database=DATABASE) as driver:
+ driver.wait(timeout=5)
+
+ with ydb.QuerySessionPool(driver) as pool:
+ with tracer.start_as_current_span("sync-example"):
+ pool.execute_with_retries("SELECT 1")
+
+ def tx_callee(session):
+ with session.transaction() as tx:
+ list(tx.execute("SELECT 1"))
+ tx.commit()
+
+ pool.retry_operation_sync(tx_callee)
+
+
+async def async_example():
+ """Async: session execute and transaction execute + commit."""
+ async with ydb.aio.Driver(endpoint=ENDPOINT, database=DATABASE) as driver:
+ await driver.wait(timeout=5)
+
+ async with ydb.aio.QuerySessionPool(driver) as pool:
+ with tracer.start_as_current_span("async-example"):
+ await pool.execute_with_retries("SELECT 1")
+
+ async def tx_callee(session):
+ async with session.transaction() as tx:
+ result = await tx.execute("SELECT 1")
+ async for _ in result:
+ pass
+ await tx.commit()
+
+ await pool.retry_operation_async(tx_callee)
+
+
+sync_example()
+asyncio.run(async_example())
+
+provider.shutdown()
diff --git a/examples/opentelemetry/grafana/dashboards/README.md b/examples/opentelemetry/grafana/dashboards/README.md
new file mode 100644
index 00000000..eb47493a
--- /dev/null
+++ b/examples/opentelemetry/grafana/dashboards/README.md
@@ -0,0 +1,5 @@
+This folder is intentionally left empty.
+
+Grafana is provisioned with Tempo + Prometheus datasources; use **Explore** to search traces.
+
+
diff --git a/examples/opentelemetry/grafana/provisioning/dashboards/dashboards.yaml b/examples/opentelemetry/grafana/provisioning/dashboards/dashboards.yaml
new file mode 100644
index 00000000..5ccefdc1
--- /dev/null
+++ b/examples/opentelemetry/grafana/provisioning/dashboards/dashboards.yaml
@@ -0,0 +1,13 @@
+apiVersion: 1
+
+providers:
+ - name: 'default'
+ orgId: 1
+ folder: ''
+ type: file
+ disableDeletion: true
+ editable: false
+ options:
+ path: /var/lib/grafana/dashboards
+
+
diff --git a/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml b/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml
new file mode 100644
index 00000000..05ba5bd9
--- /dev/null
+++ b/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml
@@ -0,0 +1,22 @@
+apiVersion: 1
+
+datasources:
+ - name: Prometheus
+ type: prometheus
+ access: proxy
+ url: http://prometheus:9090
+ isDefault: true
+ editable: false
+
+ - name: Tempo
+ type: tempo
+ access: proxy
+ url: http://tempo:3200
+ editable: false
+ jsonData:
+ tracesToMetrics:
+ datasourceUid: Prometheus
+ serviceMap:
+ datasourceUid: Prometheus
+
+
diff --git a/examples/opentelemetry/otel-collector-config.yaml b/examples/opentelemetry/otel-collector-config.yaml
new file mode 100644
index 00000000..7f784445
--- /dev/null
+++ b/examples/opentelemetry/otel-collector-config.yaml
@@ -0,0 +1,44 @@
+receivers:
+ otlp:
+ protocols:
+ grpc:
+ endpoint: 0.0.0.0:4317
+ http:
+ endpoint: 0.0.0.0:4318
+
+processors:
+ batch: { }
+
+exporters:
+ prometheus:
+ endpoint: 0.0.0.0:9464
+ resource_to_telemetry_conversion:
+ enabled: true
+
+ otlp/tempo:
+ endpoint: tempo:4317
+ tls:
+ insecure: true
+
+ debug:
+ verbosity: detailed
+
+extensions:
+ health_check:
+ endpoint: 0.0.0.0:13133
+
+ zpages:
+ endpoint: 0.0.0.0:55679
+
+service:
+ extensions: [ health_check, zpages ]
+ pipelines:
+ metrics:
+ receivers: [ otlp ]
+ processors: [ batch ]
+ exporters: [ prometheus ]
+
+ traces:
+ receivers: [ otlp ]
+ processors: [ batch ]
+ exporters: [ otlp/tempo, debug ]
diff --git a/examples/opentelemetry/prometheus.yaml b/examples/opentelemetry/prometheus.yaml
new file mode 100644
index 00000000..64b31821
--- /dev/null
+++ b/examples/opentelemetry/prometheus.yaml
@@ -0,0 +1,7 @@
+global:
+ scrape_interval: 5s
+
+scrape_configs:
+ - job_name: otel-collector
+ static_configs:
+ - targets: ["otel-collector:9464"]
diff --git a/examples/opentelemetry/tempo.yaml b/examples/opentelemetry/tempo.yaml
new file mode 100644
index 00000000..43dbb19c
--- /dev/null
+++ b/examples/opentelemetry/tempo.yaml
@@ -0,0 +1,15 @@
+server:
+ http_listen_port: 3200
+
+distributor:
+ receivers:
+ otlp:
+ protocols:
+ grpc:
+ endpoint: 0.0.0.0:4317
+
+storage:
+ trace:
+ backend: local
+ local:
+ path: /tmp/tempo
diff --git a/examples/opentelemetry/ydb_config/README.md b/examples/opentelemetry/ydb_config/README.md
new file mode 100644
index 00000000..cbffaaba
--- /dev/null
+++ b/examples/opentelemetry/ydb_config/README.md
@@ -0,0 +1,28 @@
+# YDB server-side tracing (OpenTelemetry)
+
+This folder is used to keep a **custom YDB config** that enables server-side OpenTelemetry tracing.
+
+## 1) Export the default config from a running container
+
+If YDB is running as `ydb-local`:
+
+```bash
+docker cp ydb-local:/ydb_data/cluster/kikimr_configs/config.yaml ./ydb_config/ydb-config.yaml
+```
+
+## 2) Enable OpenTelemetry exporter in the config
+
+Edit `ydb-config.yaml` and add the contents of `otel-tracing-snippet.yaml` (usually as a top-level section).
+
+Default OTLP endpoint (inside docker-compose network): `grpc://otel-collector:4317`
+Default service name (so you can find it in Tempo/Grafana): `ydb`
+
+## 3) Run with the overridden config
+
+Restart YDB (the main `compose-e2e.yaml` starts YDB with `--config-path /ydb_config/ydb-config-with-tracing.yaml`, so save your edited config under that name):
+
+```bash
+docker-compose -f compose-e2e.yaml up -d --force-recreate ydb
+```
+
+Now you should see additional server-side traces in Tempo/Grafana (service name defaults to `ydb` in the snippet).
diff --git a/examples/opentelemetry/ydb_config/otel-tracing-snippet.yaml b/examples/opentelemetry/ydb_config/otel-tracing-snippet.yaml
new file mode 100644
index 00000000..bd5978d2
--- /dev/null
+++ b/examples/opentelemetry/ydb_config/otel-tracing-snippet.yaml
@@ -0,0 +1,26 @@
+tracing_config:
+ backend:
+ opentelemetry:
+ collector_url: grpc://otel-collector:4317
+ service_name: ydb
+ external_throttling:
+ - scope:
+ database: /local
+ max_traces_per_minute: 60
+ max_traces_burst: 3
+ # Highest tracing detail for *sampled* traces (YDB-generated trace-id).
+ # Note: requests with an external `traceparent` are traced at level 13 (Detailed) per YDB docs.
+ sampling:
+ - scope:
+ database: /local
+ fraction: 1
+ level: 15
+ max_traces_per_minute: 1000
+ max_traces_burst: 100
+ uploader:
+ max_exported_spans_per_second: 30
+ max_spans_in_batch: 100
+ max_bytes_in_batch: 10485760 # 10 MiB
+ max_export_requests_inflight: 3
+ max_batch_accumulation_milliseconds: 5000
+ span_export_timeout_seconds: 120
diff --git a/examples/opentelemetry/ydb_config/ydb-config-with-tracing.yaml b/examples/opentelemetry/ydb_config/ydb-config-with-tracing.yaml
new file mode 100644
index 00000000..ef93d0e6
--- /dev/null
+++ b/examples/opentelemetry/ydb_config/ydb-config-with-tracing.yaml
@@ -0,0 +1,349 @@
+actor_system_config:
+ batch_executor: 2
+ executor:
+ - name: System
+ spin_threshold: 0
+ threads: 2
+ type: BASIC
+ - name: User
+ spin_threshold: 0
+ threads: 3
+ type: BASIC
+ - name: Batch
+ spin_threshold: 0
+ threads: 2
+ type: BASIC
+ - name: IO
+ threads: 1
+ time_per_mailbox_micro_secs: 100
+ type: IO
+ - name: IC
+ spin_threshold: 10
+ threads: 1
+ time_per_mailbox_micro_secs: 100
+ type: BASIC
+ io_executor: 3
+ scheduler:
+ progress_threshold: 10000
+ resolution: 1024
+ spin_threshold: 0
+ service_executor:
+ - executor_id: 4
+ service_name: Interconnect
+ sys_executor: 0
+ user_executor: 1
+blob_storage_config:
+ service_set:
+ availability_domains: 1
+ groups:
+ - erasure_species: 0
+ group_generation: 1
+ group_id: 0
+ rings:
+ - fail_domains:
+ - vdisk_locations:
+ - node_id: 1
+ pdisk_guid: 1
+ pdisk_id: 1
+ vdisk_slot_id: 0
+ pdisks:
+ - node_id: 1
+ path: SectorMap:1:64
+ pdisk_category: 0
+ pdisk_guid: 1
+ pdisk_id: 1
+ vdisks:
+ - vdisk_id:
+ domain: 0
+ group_generation: 1
+ group_id: 0
+ ring: 0
+ vdisk: 0
+ vdisk_location:
+ node_id: 1
+ pdisk_guid: 1
+ pdisk_id: 1
+ vdisk_slot_id: 0
+channel_profile_config:
+ profile:
+ - channel:
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ profile_id: 0
+ - channel:
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ profile_id: 1
+domains_config:
+ domain:
+ - domain_id: 1
+ name: local
+ storage_pool_types:
+ - kind: hdd
+ pool_config:
+ box_id: 1
+ erasure_species: none
+ kind: hdd
+ pdisk_filter:
+ - property:
+ - type: ROT
+ vdisk_kind: Default
+ - kind: hdd1
+ pool_config:
+ box_id: 1
+ erasure_species: none
+ kind: hdd
+ pdisk_filter:
+ - property:
+ - type: ROT
+ vdisk_kind: Default
+ - kind: hdd2
+ pool_config:
+ box_id: 1
+ erasure_species: none
+ kind: hdd
+ pdisk_filter:
+ - property:
+ - type: ROT
+ vdisk_kind: Default
+ - kind: hdde
+ pool_config:
+ box_id: 1
+ encryption_mode: 1
+ erasure_species: none
+ kind: hdd
+ pdisk_filter:
+ - property:
+ - type: ROT
+ vdisk_kind: Default
+ security_config:
+ default_users:
+ - name: root
+ password: '1234'
+ state_storage:
+ - ring:
+ nto_select: 1
+ ring:
+ - node:
+ - 1
+ use_ring_specific_node_selection: true
+ ssid: 1
+feature_flags:
+ enable_drain_on_shutdown: false
+ enable_mvcc_snapshot_reads: true
+ enable_persistent_query_stats: true
+ enable_public_api_external_blobs: false
+ enable_scheme_transactions_at_scheme_shard: true
+federated_query_config:
+ audit:
+ enabled: false
+ uaconfig:
+ uri: ''
+ checkpoint_coordinator:
+ checkpointing_period_millis: 1000
+ enabled: true
+ max_inflight: 1
+ storage:
+ endpoint: ''
+ common:
+ ids_prefix: pt
+ use_bearer_for_ydb: true
+ control_plane_proxy:
+ enabled: true
+ request_timeout: 30s
+ control_plane_storage:
+ available_binding:
+ - DATA_STREAMS
+ - OBJECT_STORAGE
+ available_connection:
+ - YDB_DATABASE
+ - CLICKHOUSE_CLUSTER
+ - DATA_STREAMS
+ - OBJECT_STORAGE
+ - MONITORING
+ enabled: true
+ storage:
+ endpoint: ''
+ db_pool:
+ enabled: true
+ storage:
+ endpoint: ''
+ enabled: false
+ gateways:
+ dq:
+ default_settings: []
+ enabled: true
+ pq:
+ cluster_mapping: []
+ solomon:
+ cluster_mapping: []
+ nodes_manager:
+ enabled: true
+ pending_fetcher:
+ enabled: true
+ pinger:
+ ping_period: 30s
+ private_api:
+ enabled: true
+ private_proxy:
+ enabled: true
+ resource_manager:
+ enabled: true
+ token_accessor:
+ enabled: true
+grpc_config:
+ ca: /ydb_certs/ca.pem
+ cert: /ydb_certs/cert.pem
+ host: '[::]'
+ key: /ydb_certs/key.pem
+ services:
+ - nbs
+ - legacy
+ - tablet_service
+ - yql
+ - discovery
+ - cms
+ - locking
+ - kesus
+ - pq
+ - pqcd
+ - pqv1
+ - topic
+ - datastreams
+ - scripting
+ - clickhouse_internal
+ - rate_limiter
+ - analytics
+ - export
+ - import
+ - yq
+ - keyvalue
+ - monitoring
+ - auth
+ - query_service
+ - view
+interconnect_config:
+ start_tcp: true
+kafka_proxy_config:
+ enable_kafka_proxy: true
+ listening_port: 9092
+kqpconfig:
+ settings:
+ - name: _ResultRowsLimit
+ value: '1000'
+log_config:
+ default_level: 5
+ entry: []
+ sys_log: false
+nameservice_config:
+ node:
+ - address: ::1
+ host: localhost
+ node_id: 1
+ port: 19001
+ walle_location:
+ body: 1
+ data_center: '1'
+ rack: '1'
+net_classifier_config:
+ cms_config_timeout_seconds: 30
+ net_data_file_path: /ydb_data/netData.tsv
+ updater_config:
+ net_data_update_interval_seconds: 60
+ retry_interval_seconds: 30
+pqcluster_discovery_config:
+ enabled: false
+pqconfig:
+ check_acl: false
+ cluster_table_path: ''
+ clusters_update_timeout_sec: 1
+ enable_proto_source_id_info: true
+ enabled: true
+ max_storage_node_port: 65535
+ meta_cache_timeout_sec: 1
+ quoting_config:
+ enable_quoting: false
+ require_credentials_in_new_protocol: false
+ root: ''
+ topics_are_first_class_citizen: true
+ version_table_path: ''
+sqs_config:
+ enable_dead_letter_queues: true
+ enable_sqs: false
+ force_queue_creation_v2: true
+ force_queue_deletion_v2: true
+ scheme_cache_hard_refresh_time_seconds: 0
+ scheme_cache_soft_refresh_time_seconds: 0
+static_erasure: none
+system_tablets:
+ default_node:
+ - 1
+ flat_schemeshard:
+ - info:
+ tablet_id: 72057594046678944
+ flat_tx_coordinator:
+ - node:
+ - 1
+ tx_allocator:
+ - node:
+ - 1
+ tx_mediator:
+ - node:
+ - 1
+table_service_config:
+ resource_manager:
+ channel_buffer_size: 262144
+ mkql_heavy_program_memory_limit: 1048576
+ mkql_light_program_memory_limit: 65536
+ verbose_memory_limit_exception: true
+ sql_version: 1
+tracing_config:
+ backend:
+ opentelemetry:
+ collector_url: grpc://otel-collector:4317
+ service_name: ydb
+ external_throttling:
+ - scope:
+ database: /local
+ max_traces_per_minute: 1000
+ max_traces_burst: 100
+ sampling:
+ - scope:
+ database: /local
+ fraction: 1.0
+ level: 15
+ max_traces_per_minute: 1000
+# max_traces_burst: 100
+ uploader:
+ max_exported_spans_per_second: 30
+ max_spans_in_batch: 100
+ max_bytes_in_batch: 10485760 # 10 MiB
+ max_export_requests_inflight: 3
+ max_batch_accumulation_milliseconds: 5000
+ span_export_timeout_seconds: 120
diff --git a/pyproject.toml b/pyproject.toml
index 41e7ef6f..0b08f0b2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -41,6 +41,7 @@ module = [
"requests.*",
"ydb.public.api.*",
"contrib.ydb.public.api.*",
+ "opentelemetry.*",
]
ignore_missing_imports = true
diff --git a/setup.py b/setup.py
index 55ce9029..0f850fbf 100644
--- a/setup.py
+++ b/setup.py
@@ -37,5 +37,6 @@
options={"bdist_wheel": {"universal": True}},
extras_require={
"yc": ["yandexcloud", ],
+ "opentelemetry": ["opentelemetry-api>=1.0.0"],
}
)
diff --git a/test-requirements.txt b/test-requirements.txt
index a5b65963..0976ce50 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -43,6 +43,8 @@ sqlalchemy==1.4.26
pylint-protobuf
cython
freezegun>=1.3.0
+opentelemetry-api>=1.0.0
+opentelemetry-sdk>=1.0.0
# pytest-cov
yandexcloud
-e .
diff --git a/tests/tracing/__init__.py b/tests/tracing/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/tracing/conftest.py b/tests/tracing/conftest.py
new file mode 100644
index 00000000..94f653b8
--- /dev/null
+++ b/tests/tracing/conftest.py
@@ -0,0 +1,54 @@
+"""Shared fixtures for OpenTelemetry tracing tests.
+
+Sets up an in-memory TracerProvider so that spans created by the SDK
+can be collected and inspected without any external backend.
+"""
+
+import pytest
+
+from opentelemetry import trace
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
+
+from ydb.opentelemetry.tracing import _registry
+
+_provider = TracerProvider()
+_exporter = InMemorySpanExporter()
+_provider.add_span_processor(SimpleSpanProcessor(_exporter))
+trace.set_tracer_provider(_provider)
+
+
+@pytest.fixture()
+def otel_setup():
+ """Enable SDK tracing, yield the exporter, then restore noop defaults.
+
+ Each test gets a clean exporter (cleared before and after).
+ """
+ import ydb.opentelemetry._plugin as _plugin
+
+ _exporter.clear()
+
+ _plugin._enabled = False
+ _plugin._tracer = None
+
+ from ydb.opentelemetry import enable_tracing
+
+ enable_tracing()
+
+ yield _exporter
+
+ # Restore noop state
+ _registry.set_create_span(None)
+ _registry.set_metadata_hook(None)
+ _plugin._enabled = False
+ _plugin._tracer = None
+ _exporter.clear()
+
+
+class FakeDriverConfig:
+ def __init__(self, endpoint="test_endpoint:1337", database="/test_database"):
+ self.endpoint = endpoint
+ self.database = database
+ self.query_client_settings = None
+ self.tracer = None
diff --git a/tests/tracing/test_tracing_async.py b/tests/tracing/test_tracing_async.py
new file mode 100644
index 00000000..2ff574a5
--- /dev/null
+++ b/tests/tracing/test_tracing_async.py
@@ -0,0 +1,318 @@
+"""Unit tests for OpenTelemetry tracing — asynchronous SDK operations.
+
+Mirrors the sync tests but exercises the async code paths in ydb.aio.query.
+"""
+
+from opentelemetry.trace import StatusCode, SpanKind
+from ydb.query.transaction import QueryTxStateEnum
+from .conftest import FakeDriverConfig
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import asyncio
+import pytest
+
+
+async def _empty_async_iter():
+ return
+ yield # noqa: makes this an async generator
+
+
+def _get_spans(exporter, name=None):
+ spans = exporter.get_finished_spans()
+ if name is not None:
+ spans = [s for s in spans if s.name == name]
+ return spans
+
+
+def _get_single_span(exporter, name):
+ spans = _get_spans(exporter, name)
+ assert (
+ len(spans) == 1
+ ), f"Expected 1 span named '{name}', got {len(spans)}: {[s.name for s in exporter.get_finished_spans()]}"
+ return spans[0]
+
+
+def _make_async_session_mock(driver_config=None, peer=None):
+ """Create a mock that behaves like an async QuerySession after create()."""
+ cfg = driver_config or FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+
+ session = MagicMock()
+ session._driver = driver
+ session._session_id = "test-session-id"
+ session._node_id = 12345
+ session._peer = peer
+ session.session_id = "test-session-id"
+ session.node_id = 12345
+ return session, driver
+
+
+def _make_async_tx(session, driver):
+ """Create a real async QueryTxContext wired to mocked session/driver."""
+ from ydb._grpc.grpcwrapper.ydb_query_public_types import QuerySerializableReadWrite
+ from ydb.aio.query.transaction import QueryTxContext
+
+ tx = QueryTxContext(driver, session, QuerySerializableReadWrite())
+ tx._tx_state._change_state(QueryTxStateEnum.BEGINED)
+ tx._tx_state.tx_id = "test-tx-id"
+ return tx
+
+
+class TestAsyncCreateSessionSpan:
+ @pytest.mark.asyncio
+ async def test_create_session_emits_span(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb.aio.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = None
+ qs._closed = False
+
+ with patch.object(QuerySession, "_create_call", new_callable=AsyncMock):
+ with patch.object(QuerySession, "_attach", new_callable=AsyncMock):
+ await qs.create()
+
+ span = _get_single_span(exporter, "ydb.CreateSession")
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["db.namespace"] == "/test_database"
+ assert attrs["server.address"] == "test_endpoint"
+ assert attrs["server.port"] == 1337
+
+
+class TestAsyncExecuteQuerySpan:
+ @pytest.mark.asyncio
+ async def test_session_execute_emits_span(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb.aio.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 12345
+ qs._peer = ("n1", 2136, "dc-a")
+ qs._closed = False
+
+ fake_stream = _empty_async_iter()
+ with patch.object(QuerySession, "_execute_call", new_callable=AsyncMock, return_value=fake_stream):
+ result = await qs.execute("SELECT 1;")
+ async for _ in result:
+ pass
+
+ span = _get_single_span(exporter, "ydb.ExecuteQuery")
+ attrs = dict(span.attributes)
+ assert attrs["ydb.node.id"] == 12345
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.session.id" not in attrs
+
+ @pytest.mark.asyncio
+ async def test_tx_execute_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_async_tx(session, driver)
+
+ fake_stream = _empty_async_iter()
+ with patch.object(type(tx), "_execute_call", new_callable=AsyncMock, return_value=fake_stream):
+ tx._prev_stream = None
+ result = await tx.execute("SELECT 1;")
+ async for _ in result:
+ pass
+
+ span = _get_single_span(exporter, "ydb.ExecuteQuery")
+ attrs = dict(span.attributes)
+ assert attrs["ydb.node.id"] == 12345
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.tx.id" not in attrs
+ assert "ydb.session.id" not in attrs
+
+
+class TestAsyncCommitSpan:
+ @pytest.mark.asyncio
+ async def test_commit_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_async_tx(session, driver)
+
+ with patch.object(type(tx), "_commit_call", new_callable=AsyncMock):
+ await tx.commit()
+
+ span = _get_single_span(exporter, "ydb.Commit")
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.tx.id" not in attrs
+ assert "ydb.session.id" not in attrs
+
+
+class TestAsyncRollbackSpan:
+ @pytest.mark.asyncio
+ async def test_rollback_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_async_tx(session, driver)
+
+ with patch.object(type(tx), "_rollback_call", new_callable=AsyncMock):
+ await tx.rollback()
+
+ span = _get_single_span(exporter, "ydb.Rollback")
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.tx.id" not in attrs
+ assert "ydb.session.id" not in attrs
+
+
+class TestAsyncErrorHandling:
+ @pytest.mark.asyncio
+ async def test_error_sets_error_status_and_attributes(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb import issues
+
+ exc = issues.SchemeError("Table not found")
+
+ from ydb.aio.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 12345
+ qs._closed = False
+
+ with patch.object(QuerySession, "_execute_call", new_callable=AsyncMock, side_effect=exc):
+ with pytest.raises(issues.SchemeError):
+ await qs.execute("SELECT * FROM non_existing_table")
+
+ span = _get_single_span(exporter, "ydb.ExecuteQuery")
+ assert span.status.status_code == StatusCode.ERROR
+ attrs = dict(span.attributes)
+ assert attrs["error.type"] == "ydb_error"
+ assert attrs["db.response.status_code"] == "SCHEME_ERROR"
+ assert len(span.events) > 0
+
+
+class TestAsyncRetryPolicySpans:
+ @pytest.mark.asyncio
+ async def test_success_emits_single_try(self, otel_setup):
+ from ydb.retries import retry_operation_async
+
+ exporter = otel_setup
+
+ async def callee():
+ return 7
+
+ assert await retry_operation_async(callee) == 7
+
+ run = _get_single_span(exporter, "ydb.RunWithRetry")
+ assert run.kind == SpanKind.INTERNAL
+
+ tries = _get_spans(exporter, "ydb.Try")
+ assert len(tries) == 1
+ assert tries[0].parent.span_id == run.context.span_id
+ assert dict(tries[0].attributes)["ydb.retry.backoff_ms"] == 0
+
+ @pytest.mark.asyncio
+ async def test_context_cancel_during_backoff_records_exception(self, otel_setup):
+        """Backoff sleep runs on the timeline of the next Try; a cancel hitting it
+ must be recorded on that Try span and propagate out through RunWithRetry.
+ """
+ from ydb import issues
+ from ydb.retries import retry_operation_async
+ from ydb.retries import BackoffSettings, RetrySettings
+
+ exporter = otel_setup
+ calls = {"n": 0}
+
+ async def flaky():
+ calls["n"] += 1
+ raise issues.Unavailable("transient")
+
+ retry_settings = RetrySettings(
+ max_retries=10,
+ fast_backoff_settings=BackoffSettings(ceiling=0, slot_duration=10.0),
+ slow_backoff_settings=BackoffSettings(ceiling=0, slot_duration=10.0),
+ )
+
+ task = asyncio.ensure_future(retry_operation_async(flaky, retry_settings))
+ # Let the first attempt fail and the backoff sleep start.
+ for _ in range(10):
+ await asyncio.sleep(0.01)
+ if calls["n"] >= 1:
+ break
+ task.cancel()
+ with pytest.raises(asyncio.CancelledError):
+ await task
+
+ run = _get_single_span(exporter, "ydb.RunWithRetry")
+ assert run.status.status_code == StatusCode.ERROR
+
+ tries = _get_spans(exporter, "ydb.Try")
+ assert len(tries) >= 2
+ # Try span that carried the cancelled backoff must be errored.
+ backoff_try = tries[-1]
+ assert backoff_try.status.status_code == StatusCode.ERROR
+ assert dict(backoff_try.attributes)["ydb.retry.backoff_ms"] > 0
+ error_types = {dict(s.attributes).get("error.type") for s in tries}
+ assert "CancelledError" in error_types
+
+
+class TestAsyncConcurrentSpansIsolation:
+ @pytest.mark.asyncio
+ async def test_parallel_executes_do_not_become_parent_child(self, otel_setup):
+ """Two concurrent execute calls must produce sibling spans, not parent-child."""
+ exporter = otel_setup
+
+ from ydb.aio.query.session import QuerySession
+
+ async def _slow_async_iter():
+ await asyncio.sleep(0.5)
+ return
+ yield # noqa
+
+ def _make_session():
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 1
+ qs._closed = False
+ return qs
+
+ async def do_execute(qs):
+ fake_stream = _slow_async_iter()
+ with patch.object(QuerySession, "_execute_call", new_callable=AsyncMock, return_value=fake_stream):
+ result = await qs.execute("SELECT 1")
+ async for _ in result:
+ pass
+
+ qs1 = _make_session()
+ qs2 = _make_session()
+ await asyncio.gather(do_execute(qs1), do_execute(qs2))
+
+ spans = _get_spans(exporter, "ydb.ExecuteQuery")
+ assert len(spans) == 2
+
+ ids = {s.context.span_id for s in spans}
+ for s in spans:
+ if s.parent is not None:
+ assert s.parent.span_id not in ids, "Concurrent spans must be siblings, not parent-child"
diff --git a/tests/tracing/test_tracing_sync.py b/tests/tracing/test_tracing_sync.py
new file mode 100644
index 00000000..37753462
--- /dev/null
+++ b/tests/tracing/test_tracing_sync.py
@@ -0,0 +1,434 @@
+"""Unit tests for OpenTelemetry tracing — synchronous SDK operations.
+
+Uses an in-memory span exporter to verify that correct spans, attributes,
+parent-child relationships, and error handling are produced by the SDK.
+No real YDB connection is needed.
+"""
+
+from unittest.mock import MagicMock, patch
+from opentelemetry import trace
+from opentelemetry.trace import StatusCode, SpanKind
+from ydb.opentelemetry.tracing import _registry, create_ydb_span
+from ydb.query.transaction import QueryTxStateEnum
+from .conftest import FakeDriverConfig
+
+import pytest
+
+
+def _get_spans(exporter, name=None):
+ spans = exporter.get_finished_spans()
+ if name is not None:
+ spans = [s for s in spans if s.name == name]
+ return spans
+
+
+def _get_single_span(exporter, name):
+ spans = _get_spans(exporter, name)
+ assert (
+ len(spans) == 1
+ ), f"Expected 1 span named '{name}', got {len(spans)}: {[s.name for s in exporter.get_finished_spans()]}"
+ return spans[0]
+
+
+def _make_session_mock(driver_config=None, peer=None):
+ """Create a mock that behaves like a sync QuerySession after create()."""
+ cfg = driver_config or FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+
+ session = MagicMock()
+ session._driver = driver
+ session._session_id = "test-session-id"
+ session._node_id = 12345
+ session._peer = peer
+ session.session_id = "test-session-id"
+ session.node_id = 12345
+ return session, driver
+
+
+def _make_tx(session, driver):
+ """Create a real QueryTxContext wired to mocked session/driver."""
+ from ydb._grpc.grpcwrapper.ydb_query_public_types import QuerySerializableReadWrite
+ from ydb.query.transaction import QueryTxContext
+
+ tx = QueryTxContext(driver, session, QuerySerializableReadWrite())
+ # Simulate that the transaction has been started (so commit/rollback create spans)
+ tx._tx_state._change_state(QueryTxStateEnum.BEGINED)
+ tx._tx_state.tx_id = "test-tx-id"
+ return tx
+
+
+class TestCreateSessionSpan:
+ def test_create_session_emits_span(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = None
+ qs._closed = False
+
+ with patch.object(QuerySession, "_create_call", return_value=None):
+ with patch.object(QuerySession, "_attach", return_value=None):
+ qs.create()
+
+ span = _get_single_span(exporter, "ydb.CreateSession")
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["db.namespace"] == "/test_database"
+ assert attrs["server.address"] == "test_endpoint"
+ assert attrs["server.port"] == 1337
+ assert span.status.status_code == StatusCode.UNSET
+
+
+class TestExecuteQuerySpan:
+ def test_session_execute_emits_span(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 12345
+ qs._peer = ("node-7.cluster", 2136, "dc-east")
+ qs._closed = False
+
+ fake_stream = iter([]) # empty stream that raises StopIteration immediately
+ with patch.object(QuerySession, "_execute_call", return_value=fake_stream):
+ result = qs.execute("SELECT 1;")
+ # Consume the iterator to finish the span
+ list(result)
+
+ span = _get_single_span(exporter, "ydb.ExecuteQuery")
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["db.namespace"] == "/test_database"
+ assert attrs["server.address"] == "test_endpoint"
+ assert attrs["server.port"] == 1337
+ assert attrs["network.peer.address"] == "node-7.cluster"
+ assert attrs["network.peer.port"] == 2136
+ assert attrs["ydb.node.dc"] == "dc-east"
+ assert attrs["ydb.node.id"] == 12345
+ assert "ydb.session.id" not in attrs
+ assert "ydb.tx.id" not in attrs
+
+ def test_tx_execute_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_tx(session, driver)
+
+ fake_stream = iter([])
+ with patch.object(type(tx), "_execute_call", return_value=fake_stream):
+ tx._prev_stream = None
+ result = tx.execute("SELECT 1;")
+ list(result)
+
+ span = _get_single_span(exporter, "ydb.ExecuteQuery")
+ attrs = dict(span.attributes)
+ assert attrs["ydb.node.id"] == 12345
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["network.peer.port"] == 2136
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.session.id" not in attrs
+ assert "ydb.tx.id" not in attrs
+
+
+class TestCommitSpan:
+ def test_commit_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_tx(session, driver)
+
+ with patch.object(type(tx), "_commit_call", return_value=None):
+ tx.commit()
+
+ span = _get_single_span(exporter, "ydb.Commit")
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["ydb.node.id"] == 12345
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.session.id" not in attrs
+ assert "ydb.tx.id" not in attrs
+
+
+class TestRollbackSpan:
+ def test_rollback_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_session_mock(peer=("n1", 2136, "dc-a"))
+ tx = _make_tx(session, driver)
+
+ with patch.object(type(tx), "_rollback_call", return_value=None):
+ tx.rollback()
+
+ span = _get_single_span(exporter, "ydb.Rollback")
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["ydb.node.id"] == 12345
+ assert attrs["network.peer.address"] == "n1"
+ assert attrs["ydb.node.dc"] == "dc-a"
+ assert "ydb.session.id" not in attrs
+ assert "ydb.tx.id" not in attrs
+
+
+class TestErrorHandling:
+ def test_error_sets_error_status_and_attributes(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb import issues
+
+ exc = issues.SchemeError("Table not found")
+
+ from ydb.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 12345
+ qs._closed = False
+
+ with patch.object(QuerySession, "_execute_call", side_effect=exc):
+ with pytest.raises(issues.SchemeError):
+ qs.execute("SELECT * FROM non_existing_table")
+
+ span = _get_single_span(exporter, "ydb.ExecuteQuery")
+ assert span.status.status_code == StatusCode.ERROR
+ attrs = dict(span.attributes)
+ assert attrs["error.type"] == "ydb_error"
+ assert attrs["db.response.status_code"] == "SCHEME_ERROR"
+ assert len(span.events) > 0
+
+
+class TestNoSpansWhenDisabled:
+ def test_no_spans_without_enable_tracing(self):
+ """Without enable_tracing(), the registry uses noop — no spans are created."""
+
+ from tests.tracing.conftest import _exporter
+
+ _registry.set_create_span(None)
+ _registry.set_metadata_hook(None)
+ _exporter.clear()
+
+ with create_ydb_span("ydb.CreateSession", FakeDriverConfig()):
+ pass
+
+ assert len(_exporter.get_finished_spans()) == 0
+
+
+class TestParentChildRelationship:
+ def test_sdk_span_is_child_of_user_span(self, otel_setup):
+ exporter = otel_setup
+
+ tracer = trace.get_tracer("test.tracer")
+
+ with tracer.start_as_current_span("user.operation"):
+ with create_ydb_span("ydb.ExecuteQuery", FakeDriverConfig(), node_id=1):
+ pass
+
+ spans = exporter.get_finished_spans()
+ ydb_span = next(s for s in spans if s.name == "ydb.ExecuteQuery")
+ user_span = next(s for s in spans if s.name == "user.operation")
+
+ assert ydb_span.parent is not None
+ assert ydb_span.parent.span_id == user_span.context.span_id
+ assert ydb_span.context.trace_id == user_span.context.trace_id
+
+
+class TestTraceMetadataInjection:
+ def test_get_trace_metadata_returns_traceparent(self, otel_setup):
+ from ydb.opentelemetry.tracing import get_trace_metadata
+
+ tracer = trace.get_tracer("test.tracer")
+
+ with tracer.start_as_current_span("test.span"):
+ metadata = get_trace_metadata()
+
+ keys = [k for k, v in metadata]
+ assert "traceparent" in keys
+
+
+class TestDriverInitializeSpan:
+ def test_driver_initialize_emits_internal_span(self, otel_setup):
+ exporter = otel_setup
+
+ cfg = FakeDriverConfig()
+
+ with create_ydb_span("ydb.Driver.Initialize", cfg, kind="internal"):
+ pass
+
+ span = _get_single_span(exporter, "ydb.Driver.Initialize")
+ assert span.kind == SpanKind.INTERNAL
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["db.namespace"] == "/test_database"
+
+
+class TestCommonAttributes:
+ @pytest.mark.parametrize(
+ "endpoint,expected_host,expected_port",
+ [
+ ("grpc://host.example.com:2136", "grpc://host.example.com", 2136),
+ ("localhost:2136", "localhost", 2136),
+ ],
+ )
+ def test_endpoint_parsing(self, otel_setup, endpoint, expected_host, expected_port):
+ exporter = otel_setup
+ cfg = FakeDriverConfig(endpoint=endpoint, database="/mydb")
+
+ with create_ydb_span("ydb.Test", cfg):
+ pass
+
+ span = _get_single_span(exporter, "ydb.Test")
+ attrs = dict(span.attributes)
+ assert attrs["server.address"] == expected_host
+ assert attrs["server.port"] == expected_port
+ assert attrs["db.namespace"] == "/mydb"
+
+ def test_peer_attributes_are_optional(self, otel_setup):
+ exporter = otel_setup
+ cfg = FakeDriverConfig()
+
+ with create_ydb_span("ydb.Test", cfg):
+ pass
+
+ span = _get_single_span(exporter, "ydb.Test")
+ attrs = dict(span.attributes)
+ assert "network.peer.address" not in attrs
+ assert "network.peer.port" not in attrs
+
+ def test_peer_attributes_emitted_when_known(self, otel_setup):
+ exporter = otel_setup
+ cfg = FakeDriverConfig()
+
+ with create_ydb_span("ydb.Test", cfg, peer=("peer.example.com", 2137, "dc-west")):
+ pass
+
+ span = _get_single_span(exporter, "ydb.Test")
+ attrs = dict(span.attributes)
+ assert attrs["network.peer.address"] == "peer.example.com"
+ assert attrs["network.peer.port"] == 2137
+ assert attrs["ydb.node.dc"] == "dc-west"
+
+
+class TestPeerFromEndpointMap:
+ def test_wrapper_create_session_pulls_peer_from_store(self, otel_setup):
+ """wrapper_create_session must resolve peer (host, port, dc) via the driver's
+ connections_by_node_id cache, not via the grpc target string of the rpc call.
+ """
+ from ydb.query.session import wrapper_create_session
+
+ connection = MagicMock()
+ connection.endpoint = "ipv4:10.0.0.1:2136"
+ connection.peer_address = "node-42.dc-west.example"
+ connection.peer_port = 2136
+ connection.peer_location = "dc-west"
+
+ driver = MagicMock()
+ driver._store.connections_by_node_id = {42: connection}
+
+ session = MagicMock()
+ session._driver = driver
+
+ rpc_state = MagicMock()
+ rpc_state.endpoint = "ipv4:10.0.0.1:2136" # grpc-target string — should be ignored
+
+ proto = MagicMock()
+ with patch("ydb.query.session._ydb_query.CreateSessionResponse.from_proto") as from_proto:
+ from_proto.return_value = MagicMock(session_id="s-1", node_id=42, status=MagicMock())
+ with patch("ydb.issues._process_response"):
+ wrapper_create_session(rpc_state, proto, session)
+
+ assert session._peer == ("node-42.dc-west.example", 2136, "dc-west")
+
+
+class TestRetryPolicySpans:
+ def test_success_on_first_try_emits_single_try(self, otel_setup):
+ from ydb.retries import retry_operation_sync
+
+ exporter = otel_setup
+
+ def callee():
+ return 42
+
+ assert retry_operation_sync(callee) == 42
+
+ run = _get_single_span(exporter, "ydb.RunWithRetry")
+ assert run.kind == SpanKind.INTERNAL
+ assert run.status.status_code == StatusCode.UNSET
+
+ tries = _get_spans(exporter, "ydb.Try")
+ assert len(tries) == 1
+ assert tries[0].kind == SpanKind.INTERNAL
+ assert dict(tries[0].attributes)["ydb.retry.backoff_ms"] == 0
+ assert tries[0].parent.span_id == run.context.span_id
+
+ def test_retry_backoff_ms_on_each_try(self, otel_setup):
+ from ydb import issues
+ from ydb.retries import retry_operation_sync
+ from ydb.retries import RetrySettings, BackoffSettings
+
+ exporter = otel_setup
+ counter = {"n": 0}
+
+ def flaky():
+ counter["n"] += 1
+ if counter["n"] < 3:
+ raise issues.Unavailable("transient")
+ return "ok"
+
+ retry_settings = RetrySettings(
+ max_retries=5,
+ fast_backoff_settings=BackoffSettings(ceiling=0, slot_duration=0.05),
+ slow_backoff_settings=BackoffSettings(ceiling=0, slot_duration=0.05),
+ )
+
+ assert retry_operation_sync(flaky, retry_settings) == "ok"
+
+ tries = _get_spans(exporter, "ydb.Try")
+ assert len(tries) == 3
+ # first attempt has no preceding backoff, later ones have a positive one
+ backoff_values = [dict(s.attributes)["ydb.retry.backoff_ms"] for s in tries]
+ assert backoff_values[0] == 0
+ assert all(v >= 0 for v in backoff_values)
+ assert any(v > 0 for v in backoff_values[1:])
+ # failed Try spans record the exception
+ assert tries[0].status.status_code == StatusCode.ERROR
+ assert tries[1].status.status_code == StatusCode.ERROR
+ assert tries[2].status.status_code == StatusCode.UNSET
+
+ def test_non_retryable_error_propagates_to_run_span(self, otel_setup):
+ from ydb import issues
+ from ydb.retries import retry_operation_sync
+
+ exporter = otel_setup
+
+ def broken():
+ raise issues.SchemeError("boom")
+
+ with pytest.raises(issues.SchemeError):
+ retry_operation_sync(broken)
+
+ run = _get_single_span(exporter, "ydb.RunWithRetry")
+ assert run.status.status_code == StatusCode.ERROR
+
+ tries = _get_spans(exporter, "ydb.Try")
+ assert len(tries) == 1
+ assert tries[0].status.status_code == StatusCode.ERROR
+ attrs = dict(tries[0].attributes)
+ assert attrs["error.type"] == "ydb_error"
+ assert attrs["db.response.status_code"] == "SCHEME_ERROR"
diff --git a/ydb/aio/connection.py b/ydb/aio/connection.py
index 9e03450d..a3cf2ffc 100644
--- a/ydb/aio/connection.py
+++ b/ydb/aio/connection.py
@@ -26,6 +26,7 @@
from ydb.driver import DriverConfig
from ydb.settings import BaseRequestSettings
from ydb import issues
+from ydb.opentelemetry.tracing import get_trace_metadata
# Workaround for good IDE and universal for runtime
if TYPE_CHECKING:
@@ -71,6 +72,9 @@ async def _construct_metadata(
metadata.append((YDB_REQUEST_TYPE_HEADER, settings.request_type))
metadata.append(_utilities.x_ydb_sdk_build_info_header(getattr(driver_config, "_additional_sdk_headers", ())))
+
+ metadata.extend(get_trace_metadata())
+
return metadata
@@ -157,7 +161,6 @@ def __init__(
driver_config: Optional[DriverConfig] = None,
endpoint_options: Optional[EndpointOptions] = None,
) -> None:
- global _stubs_list
self.endpoint = endpoint
self.endpoint_key = EndpointKey(self.endpoint, getattr(endpoint_options, "node_id", None))
self.node_id = getattr(endpoint_options, "node_id", None)
diff --git a/ydb/aio/pool.py b/ydb/aio/pool.py
index fe709133..4f1b0cdd 100644
--- a/ydb/aio/pool.py
+++ b/ydb/aio/pool.py
@@ -6,6 +6,7 @@
from typing import Any, Callable, Optional, Tuple, TYPE_CHECKING
from ydb import issues
+from ydb.opentelemetry.tracing import create_ydb_span
from ydb.pool import ConnectionsCache as _ConnectionsCache, IConnectionPool
from .connection import Connection, EndpointKey
@@ -285,7 +286,8 @@ async def __wrapper__() -> None:
return __wrapper__
async def wait(self, timeout: Optional[float] = 7.0, fail_fast: bool = False) -> None: # type: ignore[override] # async override of sync method
- await self._store.get(fast_fail=fail_fast, wait_timeout=timeout if timeout is not None else 7.0)
+ with create_ydb_span("ydb.Driver.Initialize", self._driver_config, kind="internal"):
+ await self._store.get(fast_fail=fail_fast, wait_timeout=timeout if timeout is not None else 7.0)
def discovery_debug_details(self) -> str:
if self._discovery:
diff --git a/ydb/aio/query/base.py b/ydb/aio/query/base.py
index 66df3703..cbf22e98 100644
--- a/ydb/aio/query/base.py
+++ b/ydb/aio/query/base.py
@@ -2,9 +2,10 @@
class AsyncResponseContextIterator(_utilities.AsyncResponseIterator):
- def __init__(self, it, wrapper, on_error=None):
+ def __init__(self, it, wrapper, on_error=None, span=None):
super().__init__(it, wrapper)
self._on_error = on_error
+ self._span = span
async def __aenter__(self) -> "AsyncResponseContextIterator":
return self
@@ -12,12 +13,27 @@ async def __aenter__(self) -> "AsyncResponseContextIterator":
async def _next(self):
try:
return await super()._next()
+ except StopAsyncIteration:
+ self._finish_span()
+ raise
except Exception as e:
if self._on_error:
self._on_error(e)
+ self._finish_span(e)
raise e
+ def _finish_span(self, exception=None):
+ if self._span is not None:
+ if exception is not None:
+ self._span.set_error(exception)
+ self._span.end()
+ self._span = None
+
+ def __del__(self):
+ self._finish_span()
+
async def __aexit__(self, exc_type, exc_val, exc_tb):
# To close stream on YDB it is necessary to scroll through it to the end
async for _ in self:
pass
+ self._finish_span()
diff --git a/ydb/aio/query/session.py b/ydb/aio/query/session.py
index 67e62ff6..d7e09419 100644
--- a/ydb/aio/query/session.py
+++ b/ydb/aio/query/session.py
@@ -19,6 +19,7 @@
from ...query import base
from ...query.session import BaseQuerySession
+from ...opentelemetry.tracing import create_ydb_span, set_peer_attributes
from ..._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT
@@ -105,8 +106,10 @@ async def create(self, settings: Optional[BaseRequestSettings] = None) -> "Query
if self._closed:
raise RuntimeError("Session is already closed")
- await self._create_call(settings=settings)
- await self._attach()
+ with create_ydb_span("ydb.CreateSession", self._driver_config) as span:
+ await self._create_call(settings=settings)
+ set_peer_attributes(span, self._peer)
+ await self._attach()
return self
@@ -159,30 +162,44 @@ async def execute(
"""
self._check_session_ready_to_use()
- stream_it = await self._execute_call(
- query=query,
- parameters=parameters,
- commit_tx=True,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ "ydb.ExecuteQuery",
+ self._driver_config,
+ node_id=self._node_id,
+ peer=self._peer,
)
- return AsyncResponseContextIterator(
- it=stream_it,
- wrapper=lambda resp: base.wrap_execute_query_response(
- rpc_state=None,
- response_pb=resp,
- session=self,
- settings=self._settings,
- ),
- on_error=self._on_execute_stream_error,
- )
+ try:
+ stream_it = await self._execute_call(
+ query=query,
+ parameters=parameters,
+ commit_tx=True,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
+
+ return AsyncResponseContextIterator(
+ it=stream_it,
+ wrapper=lambda resp: base.wrap_execute_query_response(
+ rpc_state=None,
+ response_pb=resp,
+ session=self,
+ settings=self._settings,
+ ),
+ on_error=self._on_execute_stream_error,
+ span=span,
+ )
+ except Exception as e:
+ if span is not None:
+ span.set_error(e)
+ span.end()
+ raise
async def explain(
self,
diff --git a/ydb/aio/query/transaction.py b/ydb/aio/query/transaction.py
index 69c77478..f935641d 100644
--- a/ydb/aio/query/transaction.py
+++ b/ydb/aio/query/transaction.py
@@ -12,6 +12,7 @@
BaseQueryTxContext,
QueryTxStateEnum,
)
+from ...opentelemetry.tracing import create_ydb_span
if TYPE_CHECKING:
from .session import QuerySession
@@ -106,13 +107,19 @@ async def commit(self, settings: Optional[BaseRequestSettings] = None) -> None:
await self._ensure_prev_stream_finished()
- try:
- await self._execute_callbacks_async(base.TxEvent.BEFORE_COMMIT)
- await self._commit_call(settings)
- await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=None)
- except BaseException as e:
- await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=e)
- raise e
+ with create_ydb_span(
+ "ydb.Commit",
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ):
+ try:
+ await self._execute_callbacks_async(base.TxEvent.BEFORE_COMMIT)
+ await self._commit_call(settings)
+ await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=None)
+ except BaseException as e:
+ await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=e)
+ raise e
async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None:
"""Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution
@@ -133,13 +140,19 @@ async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None
await self._ensure_prev_stream_finished()
- try:
- await self._execute_callbacks_async(base.TxEvent.BEFORE_ROLLBACK)
- await self._rollback_call(settings)
- await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=None)
- except BaseException as e:
- await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=e)
- raise e
+ with create_ydb_span(
+ "ydb.Rollback",
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ):
+ try:
+ await self._execute_callbacks_async(base.TxEvent.BEFORE_ROLLBACK)
+ await self._rollback_call(settings)
+ await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=None)
+ except BaseException as e:
+ await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=e)
+ raise e
async def execute(
self,
@@ -187,30 +200,44 @@ async def execute(
"""
await self._ensure_prev_stream_finished()
- stream_it = await self._execute_call(
- query=query,
- parameters=parameters,
- commit_tx=commit_tx,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ "ydb.ExecuteQuery",
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
)
- self._prev_stream = AsyncResponseContextIterator(
- it=stream_it,
- wrapper=lambda resp: base.wrap_execute_query_response(
- rpc_state=None,
- response_pb=resp,
- session=self.session,
- tx=self,
+ try:
+ stream_it = await self._execute_call(
+ query=query,
+ parameters=parameters,
commit_tx=commit_tx,
- settings=self.session._settings,
- ),
- on_error=self.session._on_execute_stream_error,
- )
- return self._prev_stream
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
+
+ self._prev_stream = AsyncResponseContextIterator(
+ it=stream_it,
+ wrapper=lambda resp: base.wrap_execute_query_response(
+ rpc_state=None,
+ response_pb=resp,
+ session=self.session,
+ tx=self,
+ commit_tx=commit_tx,
+ settings=self.session._settings,
+ ),
+ on_error=self.session._on_execute_stream_error,
+ span=span,
+ )
+ return self._prev_stream
+ except Exception as e:
+ if span is not None:
+ span.set_error(e)
+ span.end()
+ raise
diff --git a/ydb/connection.py b/ydb/connection.py
index 98fbd5aa..d64438ef 100644
--- a/ydb/connection.py
+++ b/ydb/connection.py
@@ -24,6 +24,7 @@
import grpc
from . import issues, _apis, _utilities
from . import default_pem
+from .opentelemetry.tracing import get_trace_metadata
_stubs_list = (
_apis.TableService.Stub,
@@ -179,6 +180,9 @@ def _construct_metadata(driver_config, settings):
metadata.extend(getattr(settings, "headers", []))
metadata.append(_utilities.x_ydb_sdk_build_info_header(getattr(driver_config, "_additional_sdk_headers", ())))
+
+ metadata.extend(get_trace_metadata())
+
return metadata
@@ -194,11 +198,14 @@ def _get_request_timeout(settings):
class EndpointOptions(object):
- __slots__ = ("ssl_target_name_override", "node_id")
+ __slots__ = ("ssl_target_name_override", "node_id", "address", "port", "location")
- def __init__(self, ssl_target_name_override=None, node_id=None):
+ def __init__(self, ssl_target_name_override=None, node_id=None, address=None, port=None, location=None):
self.ssl_target_name_override = ssl_target_name_override
self.node_id = node_id
+ self.address = address
+ self.port = port
+ self.location = location
def _construct_channel_options(driver_config, endpoint_options=None):
@@ -405,6 +412,9 @@ class Connection(object):
"closing",
"endpoint_key",
"node_id",
+ "peer_address",
+ "peer_port",
+ "peer_location",
)
def __init__(
@@ -419,9 +429,11 @@ def __init__(
discovered by the YDB endpoint discovery mechanism
:param driver_config: A driver config instance to be used for RPC call interception
"""
- global _stubs_list
self.endpoint = endpoint
self.node_id = getattr(endpoint_options, "node_id", None)
+ self.peer_address = getattr(endpoint_options, "address", None)
+ self.peer_port = getattr(endpoint_options, "port", None)
+ self.peer_location = getattr(endpoint_options, "location", None)
self.endpoint_key = EndpointKey(endpoint, getattr(endpoint_options, "node_id", None))
self._channel = channel_factory(self.endpoint, driver_config, endpoint_options=endpoint_options)
self._driver_config = driver_config
diff --git a/ydb/opentelemetry/__init__.py b/ydb/opentelemetry/__init__.py
new file mode 100644
index 00000000..1ea6d6c8
--- /dev/null
+++ b/ydb/opentelemetry/__init__.py
@@ -0,0 +1,18 @@
+def enable_tracing(tracer=None):
+ """Enable OpenTelemetry trace context propagation and span creation for all YDB gRPC calls.
+
+ Args:
+ tracer: Optional OTel tracer to use. If not provided, the default tracer from the global tracer provider will be used.
+ """
+ try:
+ from ydb.opentelemetry._plugin import _enable_tracing
+ except ImportError:
+ raise ImportError(
+ "OpenTelemetry packages are required for tracing support. "
+ "Install them with: pip install ydb[opentelemetry]"
+ ) from None
+
+ _enable_tracing(tracer)
+
+
+__all__ = ["enable_tracing"]
diff --git a/ydb/opentelemetry/_plugin.py b/ydb/opentelemetry/_plugin.py
new file mode 100644
index 00000000..f555ea1a
--- /dev/null
+++ b/ydb/opentelemetry/_plugin.py
@@ -0,0 +1,100 @@
+from opentelemetry import context, trace
+from opentelemetry.propagate import inject
+from opentelemetry.trace import StatusCode
+
+from ydb import issues
+from ydb.issues import StatusCode as YdbStatusCode
+from ydb.opentelemetry.tracing import _registry
+
# YDB status codes classified as transport-level failures when setting the
# ``error.type`` span attribute (everything else issues.Error-shaped becomes
# "ydb_error" — see _set_error_on_span).
_TRANSPORT_STATUSES = frozenset(
    {
        YdbStatusCode.CONNECTION_LOST,
        YdbStatusCode.CONNECTION_FAILURE,
        YdbStatusCode.DEADLINE_EXCEEDED,
        YdbStatusCode.CLIENT_INTERNAL_ERROR,
        YdbStatusCode.UNIMPLEMENTED,
    }
)

# Module state installed by _enable_tracing(); inert until tracing is enabled.
_tracer = None
_enabled = False

# Maps the SDK's string span-kind hints to OpenTelemetry SpanKind values;
# unknown hints fall back to CLIENT (see _create_span).
_KIND_MAP = {
    "client": trace.SpanKind.CLIENT,
    "internal": trace.SpanKind.INTERNAL,
}
+
+
def _otel_metadata_hook():
    """Produce gRPC metadata pairs carrying the current W3C Trace Context
    (``traceparent``/``tracestate``)."""
    carrier = {}
    inject(carrier)
    return [(key, value) for key, value in carrier.items()]
+
+
def _set_error_on_span(span, exception):
    """Record *exception* on *span*: classify it, set ERROR status, attach the event."""
    if isinstance(exception, issues.Error) and exception.status is not None:
        span.set_attribute("db.response.status_code", exception.status.name)
        if exception.status in _TRANSPORT_STATUSES:
            error_type = "transport_error"
        else:
            error_type = "ydb_error"
    else:
        # Non-YDB (or status-less) errors are labelled by their Python type.
        error_type = type(exception).__qualname__

    span.set_attribute("error.type", error_type)
    span.set_status(StatusCode.ERROR, str(exception))
    span.record_exception(exception)
+
+
class TracingSpan:
    """Pairs an OTel span with its attached context token and manages both.

    May be driven either as a context manager or via explicit
    ``set_error()`` / ``end()`` calls.
    """

    def __init__(self, span, token):
        self._span = span
        self._token = token

    def set_attribute(self, key, value):
        self._span.set_attribute(key, value)

    def set_error(self, exception):
        _set_error_on_span(self._span, exception)

    def end(self):
        """End the span and detach the context token if still attached."""
        self._span.end()
        token, self._token = self._token, None
        if token is not None:
            context.detach(token)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_val is not None:
            self.set_error(exc_val)
        self.end()
        # Never suppress the exception.
        return False
+
+
def _create_span(name, attributes=None, kind=None):
    """Start a span, attach it to the current context, and wrap it.

    The returned :class:`TracingSpan` can be used as a context manager or manually.
    """
    span_kind = _KIND_MAP.get(kind, trace.SpanKind.CLIENT)
    span = _tracer.start_span(name, kind=span_kind, attributes=attributes or {})
    token = context.attach(trace.set_span_in_context(span))
    return TracingSpan(span, token)
+
+
def _enable_tracing(tracer=None):
    """Install the OTel metadata hook and span factory into the registry (idempotent)."""
    global _enabled, _tracer

    if _enabled:
        return

    _tracer = trace.get_tracer("ydb.sdk") if tracer is None else tracer
    _enabled = True
    _registry.set_metadata_hook(_otel_metadata_hook)
    _registry.set_create_span(_create_span)
diff --git a/ydb/opentelemetry/tracing.py b/ydb/opentelemetry/tracing.py
new file mode 100644
index 00000000..a936b269
--- /dev/null
+++ b/ydb/opentelemetry/tracing.py
@@ -0,0 +1,108 @@
class _NoopSpan:
    """Inert span stand-in handed out while tracing is disabled.

    Offers the same surface as the real wrapper (``set_error``,
    ``set_attribute``, ``end``, context-manager protocol) but does nothing.
    """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Never suppress exceptions.
        return False

    def set_attribute(self, key, value):
        """Intentionally a no-op."""

    def set_error(self, exception):
        """Intentionally a no-op."""

    def end(self):
        """Intentionally a no-op."""


_NOOP_SPAN = _NoopSpan()
+
+
class OtelTracingRegistry:
    """Process-wide switchboard between the SDK and the optional OTel plugin.

    Everything stays no-op until :func:`enable_tracing` installs real hooks:
    ``create_span`` hands out the shared no-op span and ``get_trace_metadata``
    returns no headers.
    """

    def __init__(self):
        self._metadata_hook = None
        self._span_factory = None

    def create_span(self, name, attributes=None, kind=None):
        """Create a span. Returns a TracingSpan when enabled, else the no-op span."""
        factory = self._span_factory
        if factory is None:
            return _NOOP_SPAN
        return factory(name, attributes, kind=kind)

    def get_trace_metadata(self):
        """Return tracing metadata (e.g. W3C traceparent) for gRPC calls."""
        hook = self._metadata_hook
        return hook() if hook is not None else []

    def set_metadata_hook(self, hook):
        self._metadata_hook = hook

    def set_create_span(self, func):
        self._span_factory = func


_registry = OtelTracingRegistry()
+
+
def get_trace_metadata():
    """Return trace-propagation headers (e.g. the W3C ``traceparent``) to
    attach to outgoing gRPC calls; empty list while tracing is disabled.
    """
    return _registry.get_trace_metadata()
+
+
def _split_endpoint(endpoint):
    """Split an endpoint string into ``(host, port)``.

    Splits on the *last* ``:`` so scheme-prefixed forms like
    ``grpc://host:2135`` keep the scheme with the host. A missing or
    non-numeric port yields 0.

    Bug fix: a bare host with no colon used to come back as ``("", 0)``
    (``rpartition`` puts the whole string in the port slot); it is now
    returned as the host.
    """
    endpoint = endpoint or ""
    host, sep, port = endpoint.rpartition(":")
    if not sep:
        # No ":" at all — the whole string is the host.
        return endpoint, 0
    return host, int(port) if port.isdigit() else 0
+
+
def _build_ydb_attrs(driver_config, node_id=None, peer=None):
    """Build the standard YDB attribute dict for a span.

    :param driver_config: object providing ``endpoint`` and ``database``
    :param node_id: id of the node serving the call, if known
    :param peer: optional ``(address, port, location)`` tuple of the concrete
        peer; missing fields are skipped
    :return: dict of span attributes
    """
    host, port = _split_endpoint(getattr(driver_config, "endpoint", None))
    attrs = {
        "db.system.name": "ydb",
        "db.namespace": getattr(driver_config, "database", None) or "",
        "server.address": host,
        "server.port": port,
    }
    if peer is not None:
        address, peer_port, location = peer
        if address is not None:
            attrs["network.peer.address"] = address
        if peer_port is not None:
            attrs["network.peer.port"] = int(peer_port)
        if location:
            attrs["ydb.node.dc"] = location
    if node_id is not None:
        # node_id is known non-None here; the old "node_id or 0" fallback was
        # dead code (0 or 0 == 0 anyway).
        attrs["ydb.node.id"] = node_id
    return attrs
+
+
def create_ydb_span(name, driver_config, node_id=None, kind=None, peer=None):
    """Start a span carrying the standard YDB attribute set.

    ``peer`` is an optional ``(address, port, location)`` tuple describing the
    concrete node serving the call; absent fields are simply omitted.
    Usable as a context manager or ended manually.
    """
    return _registry.create_span(
        name,
        attributes=_build_ydb_attrs(driver_config, node_id, peer),
        kind=kind,
    )
+
+
def set_peer_attributes(span, peer):
    """Copy ``network.peer.address``/``network.peer.port`` and ``ydb.node.dc``
    from an ``(address, port, location)`` tuple onto an existing span.

    No-op when *peer* is None; None/empty fields are skipped.
    """
    if peer is None:
        return
    host, peer_port, dc = peer
    if host is not None:
        span.set_attribute("network.peer.address", host)
    if peer_port is not None:
        span.set_attribute("network.peer.port", int(peer_port))
    if dc:
        span.set_attribute("ydb.node.dc", dc)
diff --git a/ydb/pool.py b/ydb/pool.py
index 2901c573..4fef6377 100644
--- a/ydb/pool.py
+++ b/ydb/pool.py
@@ -10,6 +10,7 @@
from typing import Any, Callable, ContextManager, List, Optional, Set, Tuple, TYPE_CHECKING
from . import connection as connection_impl, issues, resolver, _utilities, tracing
+from .opentelemetry.tracing import create_ydb_span
from abc import abstractmethod
from .connection import Connection, EndpointKey
@@ -453,10 +454,11 @@ def wait(self, timeout: Optional[float] = None, fail_fast: bool = False) -> None
:param timeout: A timeout to wait in seconds
:return: None
"""
- if fail_fast:
- self._store.add_fast_fail().result(timeout)
- else:
- self._store.subscribe().result(timeout)
+ with create_ydb_span("ydb.Driver.Initialize", self._driver_config, kind="internal"):
+ if fail_fast:
+ self._store.add_fast_fail().result(timeout)
+ else:
+ self._store.subscribe().result(timeout)
def _on_disconnected(self, connection: Connection) -> None:
"""
diff --git a/ydb/query/base.py b/ydb/query/base.py
index e7764e1c..1aeb4f6b 100644
--- a/ydb/query/base.py
+++ b/ydb/query/base.py
@@ -27,7 +27,6 @@
from ydb._topic_common.common import CallFromSyncToAsync, _get_shared_event_loop
from ydb._grpc.grpcwrapper.common_utils import to_thread
-
if typing.TYPE_CHECKING:
from .transaction import BaseQueryTxContext
from .session import BaseQuerySession
@@ -73,9 +72,10 @@ class QueryResultSetFormat(enum.IntEnum):
class SyncResponseContextIterator(_utilities.SyncResponseIterator):
- def __init__(self, it, wrapper, on_error=None):
    def __init__(self, it, wrapper, on_error=None, span=None):
        # span: optional tracing span, closed when the stream is exhausted,
        # fails, or the iterator is garbage-collected (see _finish_span).
        super().__init__(it, wrapper)
        self._on_error = on_error
        self._span = span
def __enter__(self) -> "SyncResponseContextIterator":
return self
@@ -83,15 +83,30 @@ def __enter__(self) -> "SyncResponseContextIterator":
    def _next(self):
        # Advance the underlying stream, finishing the tracing span exactly
        # once when the stream ends — normally or with an error.
        try:
            return super()._next()
        except StopIteration:
            # Normal end of stream: close the span without marking an error.
            self._finish_span()
            raise
        except Exception as e:
            if self._on_error:
                self._on_error(e)
            # Mark the span as failed before re-raising to the caller.
            self._finish_span(e)
            raise e
+ def _finish_span(self, exception=None):
+ if self._span is not None:
+ if exception is not None:
+ self._span.set_error(exception)
+ self._span.end()
+ self._span = None
+
    def __del__(self):
        # Safety net: close the span even if the iterator is dropped without
        # being fully consumed (_finish_span is idempotent).
        self._finish_span()
+
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Drain the remaining responses so the server closes the stream, then end the span."""
        # To close stream on YDB it is necessary to scroll through it to the end
        for _ in self:
            pass
        self._finish_span()
class QueryClientSettings:
diff --git a/ydb/query/session.py b/ydb/query/session.py
index b21c6ba4..a7a32c40 100644
--- a/ydb/query/session.py
+++ b/ydb/query/session.py
@@ -18,6 +18,7 @@
from .base import QueryExplainResultFormat
from .. import _apis, issues, _utilities
+from ..opentelemetry.tracing import create_ydb_span, set_peer_attributes
from ..settings import BaseRequestSettings
from ..connection import _RpcState as RpcState, EndpointKey
from .._grpc.grpcwrapper import common_utils
@@ -30,7 +31,7 @@
from .._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT, DEFAULT_LONG_STREAM_TIMEOUT
if TYPE_CHECKING:
- from ..driver import Driver as SyncDriver
+ from ..driver import Driver as SyncDriver, DriverConfig
from ..aio.driver import Driver as AsyncDriver
@@ -46,9 +47,30 @@ def wrapper_create_session(
issues._process_response(message.status)
session._session_id = message.session_id
session._node_id = message.node_id
+ session._peer = _resolve_peer(session._driver, message.node_id)
return session
def _resolve_peer(driver, node_id):
    """Look up network.peer.* / ydb.node.dc for a node in the driver's endpoint map.

    :return: an ``(address, port, location)`` tuple, or None when the node is
        unknown or the driver exposes no connection store.
    """
    if node_id is None:
        return None
    store = getattr(driver, "_store", None)
    node_map = getattr(store, "connections_by_node_id", None) if store is not None else None
    if not node_map:
        return None
    connection = node_map.get(node_id)
    if connection is None:
        return None
    return tuple(
        getattr(connection, attr, None)
        for attr in ("peer_address", "peer_port", "peer_location")
    )
+
+
def wrapper_delete_session(
rpc_state: RpcState,
response_pb: _apis.ydb_query.DeleteSessionResponse,
@@ -70,6 +92,7 @@ class BaseQuerySession(abc.ABC, Generic[DriverT]):
# Session data
_session_id: Optional[str] = None
_node_id: Optional[int] = None
+ _peer: Optional[tuple] = None
_closed: bool = False
def __init__(self, driver: DriverT, settings: Optional[base.QueryClientSettings] = None):
@@ -84,6 +107,10 @@ def __init__(self, driver: DriverT, settings: Optional[base.QueryClientSettings]
self._last_query_stats = None
    @property
    def _driver_config(self) -> Optional["DriverConfig"]:
        # Driver config used to tag tracing spans; None when the driver
        # does not expose one.
        return getattr(self._driver, "_driver_config", None)
+
@property
def session_id(self) -> Optional[str]:
return self._session_id
@@ -368,8 +395,10 @@ def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySessio
if self._closed:
raise RuntimeError("Session is already closed.")
- self._create_call(settings=settings)
- self._attach()
+ with create_ydb_span("ydb.CreateSession", self._driver_config) as span:
+ self._create_call(settings=settings)
+ set_peer_attributes(span, self._peer)
+ self._attach()
return self
@@ -435,30 +464,44 @@ def execute(
"""
self._check_session_ready_to_use()
- stream_it = self._execute_call(
- query=query,
- parameters=parameters,
- commit_tx=True,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ "ydb.ExecuteQuery",
+ self._driver_config,
+ node_id=self._node_id,
+ peer=self._peer,
)
- return base.SyncResponseContextIterator(
- stream_it,
- lambda resp: base.wrap_execute_query_response(
- rpc_state=None,
- response_pb=resp,
- session=self,
- settings=self._settings,
- ),
- on_error=self._on_execute_stream_error,
- )
+ try:
+ stream_it = self._execute_call(
+ query=query,
+ parameters=parameters,
+ commit_tx=True,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
+
+ return base.SyncResponseContextIterator(
+ stream_it,
+ lambda resp: base.wrap_execute_query_response(
+ rpc_state=None,
+ response_pb=resp,
+ session=self,
+ settings=self._settings,
+ ),
+ on_error=self._on_execute_stream_error,
+ span=span,
+ )
+ except Exception as e:
+ if span is not None:
+ span.set_error(e)
+ span.end()
+ raise
def explain(
self,
diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py
index 687a5eaf..b1134c0e 100644
--- a/ydb/query/transaction.py
+++ b/ydb/query/transaction.py
@@ -17,6 +17,7 @@
_apis,
issues,
)
+from ..opentelemetry.tracing import create_ydb_span
from .._grpc.grpcwrapper import ydb_topic as _ydb_topic
from .._grpc.grpcwrapper import ydb_query as _ydb_query
from ..connection import _RpcState as RpcState
@@ -244,6 +245,10 @@ def __init__(self, driver: DriverT, session: "BaseQuerySession", tx_mode: base.B
self._external_error = None
self._last_query_stats = None
    @property
    def _driver_config(self):
        # Driver config used to tag tracing spans; None when the driver
        # does not expose one.
        return getattr(self._driver, "_driver_config", None)
+
@property
def session_id(self) -> Optional[str]:
"""
@@ -553,13 +558,19 @@ def commit(self, settings: Optional[BaseRequestSettings] = None) -> None:
self._ensure_prev_stream_finished()
- try:
- self._execute_callbacks_sync(base.TxEvent.BEFORE_COMMIT)
- self._commit_call(settings)
- self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=None)
- except BaseException as e: # TODO: probably should be less wide
- self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=e)
- raise e
+ with create_ydb_span(
+ "ydb.Commit",
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ):
+ try:
+ self._execute_callbacks_sync(base.TxEvent.BEFORE_COMMIT)
+ self._commit_call(settings)
+ self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=None)
+ except BaseException as e: # TODO: probably should be less wide
+ self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=e)
+ raise e
def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None:
"""Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution
@@ -579,13 +590,19 @@ def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None:
self._ensure_prev_stream_finished()
- try:
- self._execute_callbacks_sync(base.TxEvent.BEFORE_ROLLBACK)
- self._rollback_call(settings)
- self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=None)
- except BaseException as e: # TODO: probably should be less wide
- self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=e)
- raise e
+ with create_ydb_span(
+ "ydb.Rollback",
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ):
+ try:
+ self._execute_callbacks_sync(base.TxEvent.BEFORE_ROLLBACK)
+ self._rollback_call(settings)
+ self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=None)
+ except BaseException as e: # TODO: probably should be less wide
+ self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=e)
+ raise e
def execute(
self,
@@ -634,30 +651,44 @@ def execute(
"""
self._ensure_prev_stream_finished()
- stream_it = self._execute_call(
- query=query,
- commit_tx=commit_tx,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- parameters=parameters,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ "ydb.ExecuteQuery",
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
)
- self._prev_stream = base.SyncResponseContextIterator(
- stream_it,
- lambda resp: base.wrap_execute_query_response(
- rpc_state=None,
- response_pb=resp,
- session=self.session,
- tx=self,
+ try:
+ stream_it = self._execute_call(
+ query=query,
commit_tx=commit_tx,
- settings=self.session._settings,
- ),
- on_error=self.session._on_execute_stream_error,
- )
- return self._prev_stream
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ parameters=parameters,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
+
+ self._prev_stream = base.SyncResponseContextIterator(
+ stream_it,
+ lambda resp: base.wrap_execute_query_response(
+ rpc_state=None,
+ response_pb=resp,
+ session=self.session,
+ tx=self,
+ commit_tx=commit_tx,
+ settings=self.session._settings,
+ ),
+ on_error=self.session._on_execute_stream_error,
+ span=span,
+ )
+ return self._prev_stream
+ except Exception as e:
+ if span is not None:
+ span.set_error(e)
+ span.end()
+ raise
diff --git a/ydb/resolver.py b/ydb/resolver.py
index 5047f4e5..d55de389 100644
--- a/ydb/resolver.py
+++ b/ydb/resolver.py
@@ -54,7 +54,11 @@ def endpoints_with_options(self) -> typing.Generator[typing.Tuple[str, conn_impl
ssl_target_name_override = self.address
endpoint_options = conn_impl.EndpointOptions(
- ssl_target_name_override=ssl_target_name_override, node_id=self.node_id
+ ssl_target_name_override=ssl_target_name_override,
+ node_id=self.node_id,
+ address=self.address,
+ port=self.port,
+ location=self.location,
)
if self.ipv6_addrs or self.ipv4_addrs:
diff --git a/ydb/retries.py b/ydb/retries.py
index c151e3d2..bd4fc4ad 100644
--- a/ydb/retries.py
+++ b/ydb/retries.py
@@ -7,6 +7,16 @@
from . import issues
from ._errors import check_retriable_error
+from .opentelemetry.tracing import _registry as _tracing_registry
+
+
# Span name for the whole retry loop.
_RUN_WITH_RETRY_SPAN = "ydb.RunWithRetry"
# Span name for a single attempt inside the loop.
_TRY_SPAN = "ydb.Try"
# Attribute recording how long (ms) was slept before the attempt.
_BACKOFF_ATTR = "ydb.retry.backoff_ms"
+
+
def _start_try_span(backoff_ms: int):
    """Open a ``ydb.Try`` span for one retry attempt, tagged with the backoff
    (in milliseconds) that preceded it."""
    attributes = {_BACKOFF_ATTR: backoff_ms}
    return _tracing_registry.create_span(_TRY_SPAN, attributes=attributes, kind="internal")
class BackoffSettings:
@@ -72,8 +82,9 @@ def with_slow_backoff(self, backoff_settings: BackoffSettings) -> "RetrySettings
class YdbRetryOperationSleepOpt:
- def __init__(self, timeout: float) -> None:
+ def __init__(self, timeout: float, exception: Optional[BaseException] = None) -> None:
self.timeout = timeout
+ self.exception = exception
def __eq__(self, other: object) -> bool:
return (
@@ -142,7 +153,7 @@ def retry_operation_impl(
yield_sleep = False
if yield_sleep:
- yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds)
+ yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds, exception=e)
except Exception as e:
# you should provide your own handler you want
@@ -159,12 +170,29 @@ def retry_operation_sync(
*args: Any,
**kwargs: Any,
) -> Any:
- opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs)
- for next_opt in opt_generator:
- if isinstance(next_opt, YdbRetryOperationSleepOpt):
- time.sleep(next_opt.timeout)
- else:
- return next_opt.result
+ with _tracing_registry.create_span(_RUN_WITH_RETRY_SPAN, kind="internal"):
+ opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs)
+ try_span = _start_try_span(0)
+ try:
+ for next_opt in opt_generator:
+ if isinstance(next_opt, YdbRetryOperationSleepOpt):
+ exc = next_opt.exception
+ if exc is not None:
+ try_span.set_error(exc)
+ try_span.end()
+ try_span = _start_try_span(int(next_opt.timeout * 1000))
+ time.sleep(next_opt.timeout)
+ else:
+ try_span.end()
+ try_span = None
+ return next_opt.result
+ except BaseException as e:
+ if try_span is not None:
+ try_span.set_error(e)
+ try_span.end()
+ raise
+ if try_span is not None:
+ try_span.end()
return None
@@ -186,15 +214,33 @@ async def retry_operation_async( # pylint: disable=W1113
 Returns awaitable result of coroutine. If retries are not successful exception is raised.
"""
- opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs)
- for next_opt in opt_generator:
- if isinstance(next_opt, YdbRetryOperationSleepOpt):
- await asyncio.sleep(next_opt.timeout)
- else:
- try:
- return await next_opt.result
- except BaseException as e: # pylint: disable=W0703
- next_opt.set_exception(e)
+ with _tracing_registry.create_span(_RUN_WITH_RETRY_SPAN, kind="internal"):
+ opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs)
+ try_span = _start_try_span(0)
+ try:
+ for next_opt in opt_generator:
+ if isinstance(next_opt, YdbRetryOperationSleepOpt):
+ exc = next_opt.exception
+ if exc is not None:
+ try_span.set_error(exc)
+ try_span.end()
+ try_span = _start_try_span(int(next_opt.timeout * 1000))
+ await asyncio.sleep(next_opt.timeout)
+ else:
+ try:
+ result = await next_opt.result
+ try_span.end()
+ try_span = None
+ return result
+ except BaseException as e: # pylint: disable=W0703
+ next_opt.set_exception(e)
+ except BaseException as e:
+ if try_span is not None:
+ try_span.set_error(e)
+ try_span.end()
+ raise
+ if try_span is not None:
+ try_span.end()
return None