Native async support for PEP 249 interface (replace to_thread wrapper) #95
Summary
The current mapepire_python.asyncio module provides an async PEP 249 interface (AsyncConnection, AsyncCursor), but it is implemented as a thread-based wrapper around the synchronous Connection/Cursor rather than natively. Every async operation delegates to the sync code via asyncio.to_thread() / loop.run_in_executor(), which hands each call to a worker thread from the event loop's default executor.
Meanwhile, the pool/ module already has a production-quality native async architecture (PoolJob, PoolQuery, AsyncWebSocketConnection) using websockets.asyncio.client and pyee.asyncio.AsyncIOEventEmitter for non-blocking, multiplexed request handling.
This issue proposes unifying these two worlds: back the PEP 249 async interface with the native async WebSocket infrastructure instead of wrapping sync code in threads.
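To make the to_thread() delegation described in the summary concrete, here is a minimal stdlib-only sketch. blocking_query is a hypothetical stand-in for the sync Cursor, not mapepire_python code; it only shows that each awaited call runs on an executor thread instead of the event-loop thread.

```python
import asyncio
import threading
import time


def blocking_query(sql: str) -> tuple:
    """Hypothetical stand-in for a sync Cursor.execute(): blocks, then reports its thread."""
    time.sleep(0.01)  # simulate waiting on a blocking socket read
    return sql, threading.current_thread().name


async def main() -> list:
    # Each await hands one call to a worker thread from the loop's default executor
    return [await asyncio.to_thread(blocking_query, f"SELECT {i}") for i in range(3)]


results = asyncio.run(main())
for sql, thread_name in results:
    print(f"{sql!r} ran on thread {thread_name!r}")
```

The calls here are awaited one after another, so the executor may reuse a single worker thread; the point is only that none of them runs on the event-loop thread.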
Current Architecture
mapepire_python/
├── asyncio/ # PEP 249 async — wraps sync via to_thread()
│ ├── connection.py # AsyncConnection → Connection (sync)
│ ├── cursor.py # AsyncCursor → Cursor (sync) → SQLJob (sync)
│ └── utils.py # to_thread() shim
│
├── client/ # Synchronous core
│ ├── sql_job.py # SQLJob — blocking send/recv
│ ├── query.py # Query — sync state machine
│ └── websocket_client.py # websockets.sync.client
│
├── pool/ # Native async (already working)
│ ├── pool_job.py # PoolJob — async with event-driven message handler
│ ├── pool_query.py # PoolQuery — async query
│ ├── pool_client.py # Pool — async connection pool
│ └── async_websocket_client.py # websockets.asyncio.client
The Problem
In asyncio/cursor.py, every method delegates to the sync cursor in a thread:
async def execute(self, operation, parameters=None):
    await to_thread(self._cursor.execute, operation, parameters)  # thread pool!
    return self
async def fetchone(self):
    return await to_thread(self._cursor.fetchone)  # thread pool!
This means:
- Thread overhead: each DB call is handed to a thread from the default executor, adding a thread hop and context switch per call
- No true concurrency: the sync SQLJob uses blocking socket.recv(), so only one query can execute at a time per connection, even when called from async code
- GIL contention: worker threads release the GIL while blocked on the socket, but still compete with the event-loop thread for it whenever they run Python code (e.g. parsing responses)
- No multiplexing: unlike PoolJob, which can handle multiple concurrent requests on a single WebSocket via event-driven message routing, the wrapped sync connection handles one request at a time
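The serialization described above can be reproduced with a hypothetical blocking job (blocking_request and its lock are illustrative, not mapepire_python code): three queries dispatched concurrently through asyncio.to_thread still complete one after another, because they share one blocking connection.

```python
import asyncio
import threading
import time

# Hypothetical sync job: one blocking socket, so send + recv must not interleave
_connection_lock = threading.Lock()


def blocking_request(sql: str) -> str:
    with _connection_lock:  # the whole round trip owns the connection
        time.sleep(0.02)  # simulated blocking recv() waiting for the server
        return f"result of {sql}"


async def run_concurrently(n: int) -> tuple:
    start = time.perf_counter()
    # n executor threads are used, but they queue up on the single connection
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_request, f"SELECT {i}") for i in range(n))
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(run_concurrently(3))
print(f"{len(results)} queries took {elapsed:.3f}s")  # at least 3 x 0.02s
```

Extra threads buy no overlap here: total latency is the sum of the round trips, not the maximum.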
Proposed Changes
1. Create AsyncSQLJob — a native async single-connection job
A new class modeled after PoolJob but designed for single-connection use (no pool required):
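# mapepire_python/client/async_sql_job.py
import asyncio
import json
# BaseJob, DaemonServer, AsyncWebSocketConnection, AsyncQuery: existing/planned mapepire_python types
class AsyncSQLJob(BaseJob):
    """Native async SQL job using an async WebSocket; no thread delegation."""
    async def connect(self, db2_server: DaemonServer):
        self.socket = await AsyncWebSocketConnection(...).connect()
        # Keep a reference so the handler task is not garbage-collected
        self._handler_task = asyncio.create_task(self._message_handler())
        # ...
    async def send(self, content: str) -> dict:
        req_id = json.loads(content)["id"]  # assumes every request payload carries an "id" field
        await self.socket.send(content)
        return await self._wait_for_response(req_id)
    async def query(self, sql: str, opts=None) -> AsyncQuery:
        return AsyncQuery(self, sql, opts)
    async def query_and_run(self, sql: str, opts=None, **kwargs) -> dict:
        # query() is a coroutine, so await it before entering the context manager
        async with await self.query(sql, opts) as q:
            return await q.run(**kwargs)
This reuses the proven PoolJob patterns (event emitter, message handler, futures-based response waiting) without requiring a pool.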
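A minimal sketch of the futures-based response routing that AsyncSQLJob would borrow from PoolJob, assuming responses are JSON objects that echo the request "id". It swaps pyee for a plain dict of asyncio futures, and an asyncio.Queue-backed FakeSocket stands in for AsyncWebSocketConnection; FakeSocket, MiniJob and their methods are hypothetical names, not the library's API.

```python
import asyncio
import json


class FakeSocket:
    """Hypothetical stand-in for an async WebSocket that answers out of order."""

    def __init__(self) -> None:
        self._incoming = asyncio.Queue()

    async def send(self, content: str) -> None:
        req = json.loads(content)
        # Later requests get shorter delays, so responses arrive in reverse order
        delay = 0.03 - 0.01 * req["id"]
        reply = json.dumps({"id": req["id"], "data": req["sql"]})
        asyncio.get_running_loop().call_later(delay, self._incoming.put_nowait, reply)

    async def recv(self) -> str:
        return await self._incoming.get()


class MiniJob:
    """Hypothetical job: one reader task routes each response to its waiting future."""

    def __init__(self, socket: FakeSocket) -> None:
        self.socket = socket
        self._pending = {}  # request id -> asyncio.Future
        self._next_id = 0
        self._handler_task = asyncio.create_task(self._message_handler())

    async def _message_handler(self) -> None:
        while True:
            msg = json.loads(await self.socket.recv())
            fut = self._pending.pop(msg["id"], None)
            if fut is not None and not fut.done():
                fut.set_result(msg)

    async def send(self, sql: str) -> dict:
        req_id = self._next_id
        self._next_id += 1
        fut = asyncio.get_running_loop().create_future()
        self._pending[req_id] = fut  # register before sending so no reply is missed
        await self.socket.send(json.dumps({"id": req_id, "sql": sql}))
        return await fut

    async def close(self) -> None:
        self._handler_task.cancel()


async def main() -> list:
    job = MiniJob(FakeSocket())
    # Three requests share one socket and are in flight at the same time
    results = await asyncio.gather(*(job.send(f"SELECT {i}") for i in range(3)))
    await job.close()
    return results


results = asyncio.run(main())
print([r["data"] for r in results])
```

Even though the replies arrive in reverse order, each caller receives its own response, which is what allows several queries to share one connection.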
2. Rewrite AsyncConnection to use AsyncSQLJob
# mapepire_python/asyncio/connection.py
class AsyncConnection(aiopep249.AsyncConnection):
    def __init__(self, database, opts=None, **kwargs):  # None instead of a shared mutable {} default
        super().__init__()
        self._job = AsyncSQLJob()  # Native async, not wrapping sync
        self._database = database
    async def _ensure_connected(self):
        if self._job.status == JobStatus.NotStarted:
            await self._job.connect(self._database)
    async def close(self):
        await self._job.close()  # Direct async, no to_thread()
    async def execute(self, operation, parameters=None):
        cursor = await self.cursor()
        return await cursor.execute(operation, parameters)
3. Rewrite AsyncCursor to use native async queries
# mapepire_python/asyncio/cursor.py
class AsyncCursor(aiopep249.TransactionalAsyncCursor):
    async def execute(self, operation, parameters=None):
        opts = QueryOptions(parameters=parameters) if parameters else None
        self._query = await self._job.query(operation, opts=opts)
        self._result = await self._query.run()
        return self
    async def fetchone(self):
        # Direct result access, no thread delegation
        ...
    async def fetchall(self):
        # Accumulate via async fetch_more loop
        ...
4. Remove to_thread() dependency
Once AsyncConnection/AsyncCursor are backed by native async I/O, the asyncio/utils.py to_thread() shim is no longer needed for core operations.
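With to_thread() gone, the fetchone() and fetchall() bodies left as "..." in step 3 have to page through results themselves. A minimal sketch, assuming a hypothetical query object whose run() and fetch_more() return dicts with "data" (a list of rows) and "is_done" keys; FakeQuery and SketchCursor only simulate that shape and are not the mapepire_python API, and execute() here skips SQL and parameters for brevity.

```python
import asyncio


class FakeQuery:
    """Hypothetical query: serves rows in pages, like run()/fetch_more()."""

    def __init__(self, rows: list, page_size: int) -> None:
        self._rows, self._page_size, self._pos = rows, page_size, 0

    async def _page(self) -> dict:
        await asyncio.sleep(0)  # a real job would await a WebSocket round trip here
        page = self._rows[self._pos:self._pos + self._page_size]
        self._pos += len(page)
        return {"data": page, "is_done": self._pos >= len(self._rows)}

    async def run(self) -> dict:
        return await self._page()

    async def fetch_more(self) -> dict:
        return await self._page()


class SketchCursor:
    """Hypothetical cursor body: buffers one page and fetches more on demand."""

    def __init__(self, query: FakeQuery) -> None:
        self._query = query
        self._buffer = []
        self._is_done = False

    async def execute(self) -> "SketchCursor":
        result = await self._query.run()
        self._buffer, self._is_done = list(result["data"]), result["is_done"]
        return self

    async def _fill(self) -> None:
        # Pull pages until there is a row to return or the server reports completion
        while not self._buffer and not self._is_done:
            result = await self._query.fetch_more()
            self._buffer.extend(result["data"])
            self._is_done = result["is_done"]

    async def fetchone(self):
        await self._fill()
        return self._buffer.pop(0) if self._buffer else None  # None = no more rows (PEP 249)

    async def fetchall(self) -> list:
        rows = list(self._buffer)
        self._buffer.clear()
        while not self._is_done:
            result = await self._query.fetch_more()
            rows.extend(result["data"])
            self._is_done = result["is_done"]
        return rows


async def main():
    cur = await SketchCursor(FakeQuery([(i,) for i in range(5)], page_size=2)).execute()
    first = await cur.fetchone()
    rest = await cur.fetchall()
    return first, rest


first, rest = asyncio.run(main())
print(first, rest)  # (0,) [(1,), (2,), (3,), (4,)]
```

Every wait is an await on the event loop, so no executor thread is involved at any point.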
5. Async iterator support for streaming results
class AsyncCursor:
    async def __aiter__(self):
        # Async generator: stop at the first None, which PEP 249 uses for "no more rows"
        while True:
            row = await self.fetchone()
            if row is None:
                return
            yield row
Scope & Checklist
- Create AsyncSQLJob (native async single-connection job, reusing PoolJob patterns)
- Create AsyncQuery for use with AsyncSQLJob (or refactor PoolQuery to be reusable)
- Rewrite AsyncConnection to use AsyncSQLJob instead of wrapping sync Connection
- Rewrite AsyncCursor to use native async queries instead of to_thread(self._cursor.*)
- Add __aiter__/__anext__ to AsyncCursor for async for row in cursor streaming
- Update connect (from mapepire_python.asyncio import connect) to use the new implementation
- Ensure PEP 249 async compliance (aiopep249 ABC contracts still satisfied)
- Add/update async tests in tests/pep249_async_test.py to cover native async paths
- Maintain backward compatibility: the from mapepire_python.asyncio import connect API stays unchanged
- Consider extracting shared async logic from PoolJob so both AsyncSQLJob and PoolJob can reuse it (DRY)
- Update documentation and examples
Benefits
| Aspect | Current (to_thread) | Proposed (native async) |
|---|---|---|
| I/O model | Thread pool executor | Non-blocking event loop |
| Concurrency | 1 query/connection | Multiplexed via event emitter |
| Overhead | Executor thread hop per call | No extra threads for I/O |
| GIL | Worker threads contend with the event-loop thread | No cross-thread contention (single event-loop thread) |
| Streaming | Not supported | async for row in cursor |
| Architecture | Wrapper around sync | Shared infrastructure with Pool |
Non-Goals
- Removing the sync API: SQLJob, Connection, Cursor remain unchanged
- Changing the Pool API: Pool, PoolJob, PoolQuery remain unchanged
- Breaking the public API: from mapepire_python.asyncio import connect keeps the same signature
Related
- The pool/ module already proves the async WebSocket + event emitter pattern works well at scale
- The websockets>=14.0 dependency already provides both sync and async clients
- pyee is already a dependency for event-driven response routing