bug: QuerySessionPool returns sessions with leaked transactions #807

@vladkolotvin

Description

QuerySessionPool returns sessions with leaked transactions when the automatic rollback in QueryTxContext.__aexit__ is interrupted by CancelledError

Environment

  • ydb (Python SDK): 3.21.2
  • Python: 3.12.6
  • YDB server: local docker image (same behavior reproduced against ydbplatform/local-ydb with default settings)
  • Platform: macOS 14 / Linux

Summary

QueryTxContext.__aexit__ runs an automatic rollback whenever the async with session.transaction(...) block exits while the transaction is in BEGINED state. That rollback is not protected from cancellation and the surrounding try/except only catches issues.Error. As a result, a single CancelledError that arrives during await self.rollback() leaves the transaction open on the server, while QuerySessionPool.release() still returns the session to the pool unconditionally.

This is not limited to double‑cancel / cancel storms. The automatic rollback is triggered by any exit path with state == BEGINED:

  • user handler raised a regular (non‑issues.Error) exception;
  • user handler raised asyncio.CancelledError;
  • await tx.commit() failed;
  • handler returned without calling commit() explicitly.

In any of these cases, a single outer asyncio.wait_for / asyncio.timeout / framework cancel that fires while the rollback is awaiting its RPC response is enough to interrupt it and silently leak the transaction.
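The mechanism can be reproduced without a YDB server. The sketch below is a toy model, not SDK code: FakeTx, its 0.2 s sleep standing in for the rollback RPC, and run_once are all hypothetical names. It only relies on the fact that asyncio.CancelledError derives from BaseException (Python 3.8+), so an except clause for a regular error class does not catch it.

```python
import asyncio


class FakeTx:
    """Toy stand-in for a transaction context manager (not the SDK class)."""

    def __init__(self) -> None:
        self.rolled_back = False

    async def rollback(self) -> None:
        await asyncio.sleep(0.2)  # stands in for the rollback RPC round trip
        self.rolled_back = True

    async def __aenter__(self) -> "FakeTx":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        try:
            await self.rollback()  # unshielded: a cancel lands here
        except Exception:
            # CancelledError is a BaseException, so it is NOT caught here.
            pass
        return False


async def run_once() -> tuple:
    tx = FakeTx()

    async def work() -> None:
        async with tx:
            raise RuntimeError("boom from handler")  # regular exception

    try:
        # A single outer timeout that expires while rollback is in flight.
        await asyncio.wait_for(work(), timeout=0.05)
        outcome = "completed"
    except asyncio.TimeoutError:
        outcome = "timeout"
    except RuntimeError:
        outcome = "runtime error"
    return outcome, tx.rolled_back


if __name__ == "__main__":
    print(asyncio.run(run_once()))
```

Under these assumptions the caller sees only a timeout, and rolled_back stays False: the handler's RuntimeError triggered the cleanup, but the cleanup itself was cut short.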

Affected code paths

ydb/aio/query/transaction.py:

async def __aexit__(self, *args, **kwargs):
    await self._ensure_prev_stream_finished()
    if self._tx_state._state == QueryTxStateEnum.BEGINED and self._external_error is None:
        logger.warning("Potentially leaked tx: %s", self._tx_state.tx_id)
        try:
            await self.rollback()         # <-- not shielded; can be interrupted by cancel
        except issues.Error:              # <-- does NOT catch CancelledError / BaseException
            logger.warning("Failed to rollback leaked tx: %s", self._tx_state.tx_id)

ydb/aio/query/pool.py:

async def release(self, session: QuerySession) -> None:
    self._queue.put_nowait(session)       # <-- no session state check
    logger.debug("Session returned to queue: %s", session.session_id)

Observed consequences:

  • CancelledError propagates out of __aexit__;
  • transaction remains open on the server until the server‑side tx timeout expires (for read‑write transactions this can contribute to lock/snapshot contention for concurrent workload on the same rows);
  • session._on_execute_stream_error is not invoked, session.is_active stays True;
  • pool.release() puts the session back into the queue and the next caller reuses it;
  • the only observable signal is a WARNING: Potentially leaked tx: <id> log, which the SDK also emits when the automatic rollback succeeds, so it does not distinguish a real leak from a clean rollback.

A related manifestation is the "heisen‑commit": a cancel hitting await tx.commit() inside QuerySessionPool.retry_tx_async. The commit may already have been applied on the server; the subsequent automatic rollback then fails with issues.Error, is just logged, and the client believes the transaction has not been applied.
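The commit ambiguity can be illustrated with a toy model. FakeServer and its timings are hypothetical and only simulate a request that is applied on the server before the client is cancelled while waiting for the response; no SDK calls are involved.

```python
import asyncio


class FakeServer:
    """Toy server: applies the commit, then takes a while to answer."""

    def __init__(self) -> None:
        self.committed = False

    async def commit(self) -> None:
        await asyncio.sleep(0.02)  # request travels to the server
        self.committed = True      # server applies the commit
        await asyncio.sleep(0.2)   # response is still on its way back


async def demo() -> tuple:
    server = FakeServer()
    client_saw_commit = False
    try:
        await asyncio.wait_for(server.commit(), timeout=0.1)
        client_saw_commit = True
    except asyncio.TimeoutError:
        pass  # the client cannot tell whether the commit was applied
    return server.committed, client_saw_commit


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this model the server state says "committed" while the client never observed success, which is the situation a follow-up automatic rollback cannot resolve.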

Steps to reproduce

This is the most realistic reproduction: a regular handler exception + a single outer asyncio.wait_for timeout whose cancel happens to land during rollback. No cancel storm, no artificial re‑cancelling.

To make the race deterministic, the script slows down _rollback_call by about 0.5 s (50 sleeps of 10 ms); in production, real network latency has the same effect.

# pip install ydb==3.21.2
import asyncio
import logging
import time

import ydb
from ydb.aio import Driver, QuerySessionPool
from ydb.aio.query import transaction as tx_module

logging.basicConfig(level=logging.INFO)

# ---- instrument rollback to verify it ran to completion ---------------------
_started, _finished = {}, {}
_orig = tx_module.QueryTxContext.rollback


async def traced_rollback(self, settings=None):
    tid = self._tx_state.tx_id or "?"
    _started[tid] = time.monotonic()
    try:
        r = await _orig(self, settings)
        _finished[tid] = "ok"
        return r
    except BaseException as e:
        _finished[tid] = f"exc: {type(e).__name__}"
        raise


tx_module.QueryTxContext.rollback = traced_rollback

# ---- slow down rollback RPC to stand in for real network latency ------------
_orig_rb_call = tx_module.QueryTxContext._rollback_call


async def slow_rb_call(self, settings=None):
    for _ in range(50):
        await asyncio.sleep(0.01)
    return await _orig_rb_call(self, settings)


tx_module.QueryTxContext._rollback_call = slow_rb_call


async def main():
    cfg = ydb.DriverConfig(
        endpoint="grpc://localhost:2136",
        database="/local",
        credentials=ydb.credentials.AnonymousCredentials(),
        disable_discovery=True,
    )
    async with Driver(driver_config=cfg) as driver:
        await driver.wait(5, fail_fast=True)
        async with QuerySessionPool(driver, 5) as pool:

            async def handler(tx):
                stream = await tx.execute("SELECT 1;")
                async for _ in stream:
                    pass
                # regular, non-cancel exception from user code
                raise RuntimeError("boom from handler")

            try:
                # a SINGLE outer timeout, not a cancel storm
                await asyncio.wait_for(pool.retry_tx_async(handler), timeout=0.1)
            except (asyncio.TimeoutError, RuntimeError):
                pass

            await asyncio.sleep(0.1)
            print("rollbacks started :", _started)
            print("rollbacks finished:", _finished)
            print("pool current_size :", pool._current_size)
            print("pool queue_size   :", pool._queue.qsize())


asyncio.run(main())

Observed output

rollbacks started : {'01kpr6r1fbcywer68gdxcxfgey': 713533.17...}
rollbacks finished: {'01kpr6r1fbcywer68gdxcxfgey': 'exc: CancelledError'}
pool current_size : 5
pool queue_size   : 5

  • rollback was entered but never completed on the server;
  • the session was nevertheless released back into the pool;
  • QuerySession.is_active stays True, so the next retry_tx_async call reuses it.

A broader harness with 10 scenarios (single cancel during execute / iteration / wait_for, saturated pool cancel, cancel‑storm, cancel at commit, cancel while waiting for a session, tight timeouts, handler exception + single timeout, …) confirms that every failing case reduces to the same root cause: CancelledError reaching await self.rollback() in __aexit__. Happy to share it if useful.

Expected behaviour

__aexit__ should guarantee that either:

  • the server‑side transaction is actually terminated (committed or rolled back) before the session is handed back to the pool, or
  • the session is considered unusable and is not reused by QuerySessionPool for subsequent callers.

Whichever guarantee is chosen, the user should not be left with an open transaction on a pooled session without any explicit error.
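One possible shape for such a guarantee, as a minimal sketch and not a proposed patch: shield the rollback with asyncio.shield, give it a bounded grace period if the caller is cancelled, and mark the session unusable when the rollback still has not finished so the pool drops it. FakeSession, FakePool, FakeTx, the usable flag and the grace parameter are all hypothetical; the SDK has no such names as far as this report knows.

```python
import asyncio


class FakeSession:
    """Toy session; `usable` is a hypothetical flag, not an SDK attribute."""

    def __init__(self) -> None:
        self.usable = True


class FakePool:
    """Toy pool that refuses to re-queue sessions marked unusable."""

    def __init__(self) -> None:
        self.idle = []

    def release(self, session: FakeSession) -> None:
        if session.usable:
            self.idle.append(session)
        # Otherwise the session is dropped; a real pool would also close it.


class FakeTx:
    def __init__(self, session: FakeSession, rollback_delay: float, grace: float):
        self.session = session
        self.rollback_delay = rollback_delay
        self.grace = grace
        self.rolled_back = False

    async def rollback(self) -> None:
        await asyncio.sleep(self.rollback_delay)  # stands in for the RPC
        self.rolled_back = True

    async def __aenter__(self) -> "FakeTx":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        task = asyncio.ensure_future(self.rollback())
        try:
            await asyncio.shield(task)  # a cancel here does not cancel `task`
        except asyncio.CancelledError:
            try:
                # Bounded grace period for the still-running rollback.
                await asyncio.wait({task}, timeout=self.grace)
            finally:
                if not self.rolled_back:
                    task.cancel()
                    self.session.usable = False  # never reuse this session
            raise  # keep cancellation semantics for the caller
        except Exception:
            self.session.usable = False  # the rollback itself failed
        return False


async def demo(rollback_delay: float, grace: float) -> tuple:
    session, pool = FakeSession(), FakePool()
    tx = FakeTx(session, rollback_delay, grace)

    async def work() -> None:
        try:
            async with tx:
                raise RuntimeError("boom from handler")
        finally:
            pool.release(session)

    try:
        await asyncio.wait_for(work(), timeout=0.05)
    except (asyncio.TimeoutError, RuntimeError):
        pass
    return tx.rolled_back, session.usable, len(pool.idle)


if __name__ == "__main__":
    print(asyncio.run(demo(rollback_delay=0.1, grace=1.0)))
    print(asyncio.run(demo(rollback_delay=0.5, grace=0.1)))
```

With a rollback that fits into the grace period, the session returns to the pool with the transaction closed; with a slow rollback, the session is discarded instead of being reused. The trade-off is that cancellation of the caller is delayed by at most the grace period.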

Minor related findings

  • QuerySessionPool._waiters is initialized to 0 and never mutated anywhere in the codebase — the field is effectively dead and any metric derived from it will always be zero.
  • QuerySessionPool.acquire() contains asyncio.ensure_future(asyncio.ensure_future(self._should_stop.wait())) — the double wrapping is a no‑op and looks accidental.
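The no-op claim about the double wrapping follows from documented asyncio behaviour: asyncio.ensure_future returns its argument unchanged when it is already a Future or Task. A short check, using a plain asyncio.Event as a stand-in for the pool's _should_stop:

```python
import asyncio


async def double_wrap_is_noop() -> bool:
    event = asyncio.Event()  # stand-in for the pool's stop event
    inner = asyncio.ensure_future(event.wait())  # coroutine -> Task
    outer = asyncio.ensure_future(inner)         # Task -> same Task
    same = outer is inner

    # Clean up the pending task so the loop shuts down quietly.
    inner.cancel()
    try:
        await inner
    except asyncio.CancelledError:
        pass
    return same


if __name__ == "__main__":
    print(asyncio.run(double_wrap_is_noop()))
```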

Impact

  • Silent server‑side transaction leaks whenever a handler raises any exception (not just CancelledError) and a single outer timeout/cancel lands while the SDK is rolling back.
  • No observable error surface — the caller only sees the original exception or TimeoutError; the SDK emits a single WARNING log that is identical to the happy‑path "potentially leaked tx" warning.
  • Leaked read‑write transactions continue to hold server resources and may cause additional optimistic‑lock conflicts for concurrent traffic until the server‑side tx timeout expires.
