diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d8e3476..26e54ed 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,10 +15,10 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Set up Python 3.10 + - name: Set up Python 3.11 uses: actions/setup-python@v5 with: - python-version: "3.10" + python-version: "3.11" cache: "pip" cache-dependency-path: | pyproject.toml diff --git a/README.md b/README.md index 4d0cff1..ee68ff6 100644 --- a/README.md +++ b/README.md @@ -13,8 +13,9 @@ Based on [socketpool](https://github.com/benoitc/socketpool). **Requires Python 3.8 or above.** ## Examples +You can run these examples, one after another, in a `python -m asyncio` shell. -Run a simple TCP echo server in a background thread, using the `asyncio` library. +1. Run a simple TCP echo server in a background thread, using the `asyncio` library. ```python import asyncio @@ -34,9 +35,19 @@ t.start() # run a tcp echo server using asyncio in the background event loop async def echo_handler(reader, writer): - writer.write(await reader.read(32)) - await writer.drain() - writer.close() + try: + while True: + try: + data = await reader.readuntil(b'\n') + except asyncio.IncompleteReadError: + break + + writer.write(data) + await writer.drain() + + finally: + writer.close() + await writer.wait_closed() async def echo_server(tcp_port): @@ -47,7 +58,7 @@ async def echo_server(tcp_port): asyncio.run_coroutine_threadsafe(echo_server(12345), loop) ``` -Create a new TCP connection pool in the main thread, get a connection, and send and receive data. +2. Create a new TCP connection pool in the main thread, get a connection, and send and receive data. ```python from aiosocketpool import AsyncConnectionPool, AsyncTcpConnector @@ -63,22 +74,20 @@ pool = AsyncConnectionPool( async def hello_world(): async with pool.connection(host="127.0.0.1", port=12345) as conn: - await conn.sendall(b"hello world") + await conn.sendall(b"hello world\n") print(await conn.recv(32)) await hello_world() ``` -Create a bunch of connections and run them all concurrently. +3. Create a bunch of connections and run them all concurrently. ```python -loop = asyncio.get_event_loop() - tasks = [] for _ in range(25): - tasks.append(loop.create_task(hello_world())) + tasks.append(asyncio.create_task(hello_world())) -loop.run_until_complete(asyncio.gather(*tasks)) +await asyncio.gather(*tasks) ``` diff --git a/aiosocketpool/__init__.py b/aiosocketpool/__init__.py index 253be4c..dae7288 100644 --- a/aiosocketpool/__init__.py +++ b/aiosocketpool/__init__.py @@ -168,7 +168,7 @@ async def connect(self) -> bool: if __debug__: logging.debug(f"AsyncTcpConnector: new connection to {self.host}:{self.port}") - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() await asyncio.wait_for( loop.sock_connect(self._socket, (self.host, self.port)), self.timeout @@ -226,7 +226,7 @@ async def sendall(self, data: bytes): ------- `None` on success """ - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() return await asyncio.wait_for(loop.sock_sendall(self._socket, data), self.timeout) @@ -244,7 +244,7 @@ async def recv(self, size: int = 1024) -> bytes: ------- `bytes` """ - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() return await asyncio.wait_for(loop.sock_recv(self._socket, size), self.timeout) @@ -342,8 +342,10 @@ def __init__( self.connections: weakref.WeakSet[BaseConnector] = weakref.WeakSet() self._reaper_task = None + self._reaper_should_be_running = False if reap_connections: + self._reaper_should_be_running = True self.reap_delay = reap_delay self.start_reaper() @@ -354,16 +356,29 @@ def too_old(self, conn: BaseConnector) -> bool: return time.time() - conn.get_lifetime() > self.max_lifetime def start_reaper(self): - loop = asyncio.get_event_loop() + """Start the background reaper task on the current running loop. + + If called when no event loop is running (e.g., pool constructed before + asyncio.run()), defer startup until a running loop is available. + """ + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return self._reaper_task = loop.create_task(self._reaper_loop()) - self.ensure_reaper_started() def stop_reaper(self): if self._reaper_task is not None: self._reaper_task.cancel() def ensure_reaper_started(self): + if self._reaper_task is None: + if self._reaper_should_be_running: + self.start_reaper() + + return + if self._reaper_task.cancelled() or self._reaper_task.done(): self.start_reaper() @@ -446,8 +461,7 @@ def release_connection(self, conn: BaseConnector): ----- If a connector is too old, or we are going to exceed `max_size`, drop it. """ - if self._reaper_task is not None: - self.ensure_reaper_started() + self.ensure_reaper_started() current_pool_size = self.pool.qsize() diff --git a/aiosocketpool/version.py b/aiosocketpool/version.py index b1a19e3..034f46c 100644 --- a/aiosocketpool/version.py +++ b/aiosocketpool/version.py @@ -1 +1 @@ -__version__ = "0.0.5" +__version__ = "0.0.6" diff --git a/pyproject.toml b/pyproject.toml index 3f0a107..cbfa6da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "aiosocketpool" description = "An asyncio-compatible socket pool" readme = "README.md" -requires-python = ">=3.8" +requires-python = ">=3.11" license = { file = "LICENSE" } authors = [ { name = "Roo Sczesnak", email = "andrewscz@gmail.com" },