Native async support for PEP 249 interface (replace to_thread wrapper) #95

@ajshedivy

Summary

The current mapepire_python.asyncio module provides an async PEP 249 interface (AsyncConnection, AsyncCursor), but it is a thread-based wrapper around the synchronous Connection/Cursor, not a native async implementation. Every async operation delegates to the sync code via asyncio.to_thread() / loop.run_in_executor(), so each call occupies a worker thread from the default executor for as long as the blocking call runs.

Meanwhile, the pool/ module already has a production-quality native async architecture (PoolJob, PoolQuery, AsyncWebSocketConnection) using websockets.asyncio.client and pyee.asyncio.AsyncIOEventEmitter for non-blocking, multiplexed request handling.

This issue proposes unifying these two worlds: back the PEP 249 async interface with the native async WebSocket infrastructure instead of wrapping sync code in threads.

Current Architecture

mapepire_python/
├── asyncio/                    # PEP 249 async — wraps sync via to_thread()
│   ├── connection.py           #   AsyncConnection → Connection (sync)
│   ├── cursor.py               #   AsyncCursor → Cursor (sync) → SQLJob (sync)
│   └── utils.py                #   to_thread() shim
│
├── client/                     # Synchronous core
│   ├── sql_job.py              #   SQLJob — blocking send/recv
│   ├── query.py                #   Query — sync state machine
│   └── websocket_client.py     #   websockets.sync.client
│
├── pool/                       # Native async (already working)
│   ├── pool_job.py             #   PoolJob — async with event-driven message handler
│   ├── pool_query.py           #   PoolQuery — async query
│   ├── pool_client.py          #   Pool — async connection pool
│   └── async_websocket_client.py  # websockets.asyncio.client

The Problem

In asyncio/cursor.py, every method delegates to the sync cursor in a thread:

async def execute(self, operation, parameters=None):
    await to_thread(self._cursor.execute, operation, parameters)  # thread pool!
    return self

async def fetchone(self):
    return await to_thread(self._cursor.fetchone)  # thread pool!

This means:

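  • Thread overhead: Each DB call is dispatched to a worker thread in the default executor and holds that thread until the blocking call returns
  • No true concurrency: The sync SQLJob uses a blocking socket.recv(), so only one query can execute at a time per connection, even when called from async code
  • GIL contention: Worker threads release the GIL while blocked on I/O, but must reacquire it to parse responses and build result rows, so they still compete with the event loop thread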
  • No multiplexing: Unlike PoolJob, which can handle multiple concurrent requests on a single WebSocket via event-driven message routing, the wrapped sync connection is single-request-at-a-time
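
To make the difference concrete, here is a minimal sketch (not code from mapepire_python) that records which thread actually runs a wrapped blocking call versus a native coroutine. The names blocking_fetch and native_fetch are hypothetical stand-ins for a sync Cursor.fetchone and a native async equivalent.

```python
import asyncio
import threading


def blocking_fetch():
    # Hypothetical stand-in for a sync Cursor.fetchone(); reports its thread.
    return threading.get_ident()


async def native_fetch():
    # Hypothetical stand-in for a native async fetch; never leaves the loop thread.
    await asyncio.sleep(0)
    return threading.get_ident()


async def compare():
    loop_thread = threading.get_ident()
    wrapped_thread = await asyncio.to_thread(blocking_fetch)
    native_thread = await native_fetch()
    return {
        "wrapped_on_loop_thread": wrapped_thread == loop_thread,
        "native_on_loop_thread": native_thread == loop_thread,
    }


result = asyncio.run(compare())
```

The wrapped call runs on an executor thread, while the native coroutine stays on the event loop thread, which is the property the proposal relies on.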

Proposed Changes

1. Create AsyncSQLJob — a native async single-connection job

A new class modeled after PoolJob but designed for single-connection use (no pool required):

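# mapepire_python/client/async_sql_job.py
import asyncio
import json

# BaseJob, DaemonServer, AsyncWebSocketConnection, AsyncQuery: project imports omitted

class AsyncSQLJob(BaseJob):
    """Native async SQL job using async WebSocket, with no thread delegation."""

    async def connect(self, db2_server: DaemonServer):
        self.socket = await AsyncWebSocketConnection(...).connect()
        # Keep a reference so the handler task is not garbage-collected
        self._handler_task = asyncio.create_task(self._message_handler())
        # ...

    async def send(self, content: str) -> dict:
        # Assumes each request payload is JSON carrying its own "id"
        req_id = json.loads(content)["id"]
        await self.socket.send(content)
        return await self._wait_for_response(req_id)

    async def query(self, sql: str, opts=None) -> AsyncQuery:
        return AsyncQuery(self, sql, opts)

    async def query_and_run(self, sql: str, opts=None) -> dict:
        # query() is a coroutine, so await it before entering the context
        async with await self.query(sql, opts) as q:
            return await q.run()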

This reuses the proven PoolJob patterns (event emitter, message handler, futures-based response waiting) without requiring a pool.
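
The futures-based response waiting mentioned above can be sketched in isolation. This is an illustrative model only: it uses a plain dict of futures instead of pyee, and FakeSocket, Multiplexer, and the {"id", "sql", "echo"} message shape are all hypothetical, not the PoolJob implementation or the mapepire wire format.

```python
import asyncio
import json


class FakeSocket:
    """In-memory stand-in for a WebSocket; replies in reverse order on purpose."""

    def __init__(self):
        self._inbox = asyncio.Queue()
        self._pending = []

    async def send(self, message):
        self._pending.append(json.loads(message))
        if len(self._pending) == 2:
            # Answer the second request first to show routing is by id, not order.
            for req in reversed(self._pending):
                reply = {"id": req["id"], "echo": req["sql"]}
                await self._inbox.put(json.dumps(reply))
            self._pending.clear()

    async def recv(self):
        return await self._inbox.get()


class Multiplexer:
    """Routes each incoming message to the future registered under its id."""

    def __init__(self, socket):
        self._socket = socket
        self._waiters = {}
        self._reader = None

    def start(self):
        # Keep a reference to the reader task so it is not garbage-collected.
        self._reader = asyncio.create_task(self._read_loop())

    async def _read_loop(self):
        while True:
            message = json.loads(await self._socket.recv())
            waiter = self._waiters.pop(message["id"], None)
            if waiter is not None and not waiter.done():
                waiter.set_result(message)

    async def request(self, req_id, sql):
        waiter = asyncio.get_running_loop().create_future()
        self._waiters[req_id] = waiter  # register before sending
        await self._socket.send(json.dumps({"id": req_id, "sql": sql}))
        return await waiter

    async def close(self):
        self._reader.cancel()
        try:
            await self._reader
        except asyncio.CancelledError:
            pass


async def demo():
    mux = Multiplexer(FakeSocket())
    mux.start()
    try:
        return await asyncio.gather(
            mux.request("a", "select 1"),
            mux.request("b", "select 2"),
        )
    finally:
        await mux.close()


replies = asyncio.run(demo())
```

Both requests are in flight on the same socket at once, and each caller still receives its own reply even though the replies arrive out of order.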

2. Rewrite AsyncConnection to use AsyncSQLJob

# mapepire_python/asyncio/connection.py
class AsyncConnection(aiopep249.AsyncConnection):
    def __init__(self, database, opts=None, **kwargs):  # None avoids a shared mutable default
        super().__init__()
        self._job = AsyncSQLJob()  # Native async, not wrapping sync
        self._database = database

    async def _ensure_connected(self):
        if self._job.status == JobStatus.NotStarted:
            await self._job.connect(self._database)

    async def close(self):
        await self._job.close()  # Direct async, no to_thread()

    async def execute(self, operation, parameters=None):
        await self._ensure_connected()  # connect lazily on first use
        cursor = await self.cursor()
        return await cursor.execute(operation, parameters)
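
One detail a lazy-connect helper like _ensure_connected may need to handle is several coroutines calling it at the same time before the first connect finishes. A minimal sketch of one way to guard against a double connect, assuming an asyncio.Lock is acceptable here; LazyConnector and fake_connect are hypothetical names, not part of mapepire_python:

```python
import asyncio


class LazyConnector:
    """Runs a connect coroutine at most once, even under concurrent callers."""

    def __init__(self, connect):
        self._connect = connect  # hypothetical coroutine function that opens the connection
        self._lock = asyncio.Lock()
        self._connected = False

    async def ensure_connected(self):
        if self._connected:
            return
        async with self._lock:
            # Re-check inside the lock: another caller may have connected meanwhile.
            if not self._connected:
                await self._connect()
                self._connected = True


async def demo():
    calls = 0

    async def fake_connect():
        nonlocal calls
        calls += 1
        await asyncio.sleep(0)  # simulate an awaitable handshake

    connector = LazyConnector(fake_connect)
    await asyncio.gather(*(connector.ensure_connected() for _ in range(5)))
    return calls


connect_calls = asyncio.run(demo())
```

Five concurrent callers result in a single connect call; without the lock, each caller that observed the not-yet-connected state could start its own handshake.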

3. Rewrite AsyncCursor to use native async queries

# mapepire_python/asyncio/cursor.py
class AsyncCursor(aiopep249.TransactionalAsyncCursor):
    async def execute(self, operation, parameters=None):
        opts = QueryOptions(parameters=parameters) if parameters else None
        self._query = await self._job.query(operation, opts=opts)
        self._result = await self._query.run()
        return self

    async def fetchone(self):
        # Direct result access, no thread delegation
        ...

    async def fetchall(self):
        # Accumulate via async fetch_more loop
        ...
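
The "accumulate via async fetch_more loop" idea for fetchall could look roughly like the sketch below. It assumes each result is a dict with "data" (a list of rows) and "is_done" (a bool); PagedQuery is a hypothetical in-memory stand-in for the query object, so the real AsyncQuery/PoolQuery interface may differ.

```python
import asyncio


class PagedQuery:
    """Hypothetical query that serves a fixed row list one page at a time."""

    def __init__(self, rows, page_size):
        self._rows = list(rows)
        self._page_size = page_size
        self._offset = 0

    def _next_page(self):
        page = self._rows[self._offset:self._offset + self._page_size]
        self._offset += len(page)
        return {"data": page, "is_done": self._offset >= len(self._rows)}

    async def run(self):
        await asyncio.sleep(0)  # stands in for the network round trip
        return self._next_page()

    async def fetch_more(self):
        await asyncio.sleep(0)
        return self._next_page()


async def fetch_all(query):
    # First page comes from run(); keep fetching until the server says it is done.
    result = await query.run()
    rows = list(result["data"])
    while not result["is_done"]:
        result = await query.fetch_more()
        rows.extend(result["data"])
    return rows


all_rows = asyncio.run(fetch_all(PagedQuery(range(5), page_size=2)))
```

Each await in the loop yields to the event loop, so other coroutines can make progress between pages without any thread involved.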

4. Remove to_thread() dependency

Once AsyncConnection/AsyncCursor are backed by native async I/O, the asyncio/utils.py to_thread() shim is no longer needed for core operations.

5. Async iterator support for streaming results

class AsyncCursor:
    def __aiter__(self):
        return self

    async def __anext__(self):
        row = await self.fetchone()
        if row is None:
            # fetchone() returns None when the result set is exhausted
            raise StopAsyncIteration
        return row
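
From the caller's side, streaming would then be consumed with async for. The sketch below uses a hypothetical InMemoryCursor over a fixed list of rows (no database involved) to show the iteration protocol end to end, assuming fetchone() returns None once the rows are exhausted.

```python
import asyncio


class InMemoryCursor:
    """Hypothetical cursor over a fixed list of rows."""

    def __init__(self, rows):
        self._rows = list(rows)
        self._pos = 0

    async def fetchone(self):
        await asyncio.sleep(0)  # yield to the event loop, as a real fetch would
        if self._pos >= len(self._rows):
            return None
        row = self._rows[self._pos]
        self._pos += 1
        return row

    def __aiter__(self):
        return self

    async def __anext__(self):
        row = await self.fetchone()
        if row is None:
            raise StopAsyncIteration
        return row


async def collect_names():
    cursor = InMemoryCursor([{"name": "a"}, {"name": "b"}])
    # Async comprehensions use the same protocol as an `async for` loop.
    return [row["name"] async for row in cursor]


names = asyncio.run(collect_names())
```

Raising StopAsyncIteration on the first None ends the loop cleanly, so iteration cannot spin if the cursor's done flag lags behind the last row.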

Scope & Checklist

  • Create AsyncSQLJob (native async single-connection job, reusing PoolJob patterns)
  • Create AsyncQuery for use with AsyncSQLJob (or refactor PoolQuery to be reusable)
  • Rewrite AsyncConnection to use AsyncSQLJob instead of wrapping sync Connection
  • Rewrite AsyncCursor to use native async queries instead of to_thread(self._cursor.*)
  • Add __aiter__ / __anext__ to AsyncCursor for async for row in cursor streaming
  • Update from mapepire_python.asyncio import connect to use the new implementation
  • Ensure PEP 249 async compliance (aiopep249 ABC contracts still satisfied)
  • Add/update async tests in tests/pep249_async_test.py to cover native async paths
  • Maintain backward compatibility — from mapepire_python.asyncio import connect API unchanged
  • Consider extracting shared async logic from PoolJob so both AsyncSQLJob and PoolJob can reuse it (DRY)
  • Update documentation and examples

Benefits

| Aspect       | Current (to_thread)  | Proposed (native async)         |
|--------------|----------------------|---------------------------------|
| I/O model    | Thread pool executor | Non-blocking event loop         |
| Concurrency  | 1 query/connection   | Multiplexed via event emitter   |
| Overhead     | Thread per call      | Zero threads for I/O            |
| GIL          | Contended            | Not applicable (pure async)     |
| Streaming    | Not supported        | async for row in cursor         |
| Architecture | Wrapper around sync  | Shared infrastructure with Pool |

Non-Goals

  • Removing the sync API: SQLJob, Connection, Cursor remain unchanged
  • Changing the Pool API: Pool, PoolJob, PoolQuery remain unchanged
  • Breaking the public API: from mapepire_python.asyncio import connect keeps the same signature

Related

  • The pool/ module already proves the async WebSocket + event emitter pattern works well at scale
  • The websockets>=14.0 dependency already provides both sync and async clients
  • pyee is already a dependency for event-driven response routing

Labels: enhancement
