Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ jobs:

steps:
- uses: actions/checkout@v4
- name: Set up Python 3.10
- name: Set up Python 3.11
uses: actions/setup-python@v5
with:
python-version: "3.10"
python-version: "3.11"
cache: "pip"
cache-dependency-path: |
pyproject.toml
Expand Down
31 changes: 20 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ Based on [socketpool](https://github.com/benoitc/socketpool).
**Requires Python 3.8 or above.**

## Examples
You can run these examples, one after another, in a `python -m asyncio` shell.

Run a simple TCP echo server in a background thread, using the `asyncio` library.
1. Run a simple TCP echo server in a background thread, using the `asyncio` library.

```python
import asyncio
Expand All @@ -34,9 +35,19 @@ t.start()

# run a tcp echo server using asyncio in the background event loop
async def echo_handler(reader, writer):
writer.write(await reader.read(32))
await writer.drain()
writer.close()
try:
while True:
try:
data = await reader.readuntil(b'\n')
except asyncio.IncompleteReadError:
break

writer.write(data)
await writer.drain()

finally:
writer.close()
await writer.wait_closed()


async def echo_server(tcp_port):
Expand All @@ -47,7 +58,7 @@ async def echo_server(tcp_port):
asyncio.run_coroutine_threadsafe(echo_server(12345), loop)
```

Create a new TCP connection pool in the main thread, get a connection, and send and receive data.
2. Create a new TCP connection pool in the main thread, get a connection, and send and receive data.

```python
from aiosocketpool import AsyncConnectionPool, AsyncTcpConnector
Expand All @@ -63,22 +74,20 @@ pool = AsyncConnectionPool(

async def hello_world():
async with pool.connection(host="127.0.0.1", port=12345) as conn:
await conn.sendall(b"hello world")
await conn.sendall(b"hello world\n")
print(await conn.recv(32))


await hello_world()
```

Create a bunch of connections and run them all concurrently.
3. Create a bunch of connections and run them all concurrently.

```python
loop = asyncio.get_event_loop()

tasks = []

for _ in range(25):
tasks.append(loop.create_task(hello_world()))
tasks.append(asyncio.create_task(hello_world()))

loop.run_until_complete(asyncio.gather(*tasks))
await asyncio.gather(*tasks)
```
28 changes: 21 additions & 7 deletions aiosocketpool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ async def connect(self) -> bool:
if __debug__:
logging.debug(f"AsyncTcpConnector: new connection to {self.host}:{self.port}")

loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()

await asyncio.wait_for(
loop.sock_connect(self._socket, (self.host, self.port)), self.timeout
Expand Down Expand Up @@ -226,7 +226,7 @@ async def sendall(self, data: bytes):
-------
`None` on success
"""
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()

return await asyncio.wait_for(loop.sock_sendall(self._socket, data), self.timeout)

Expand All @@ -244,7 +244,7 @@ async def recv(self, size: int = 1024) -> bytes:
-------
`bytes`
"""
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()

return await asyncio.wait_for(loop.sock_recv(self._socket, size), self.timeout)

Expand Down Expand Up @@ -342,8 +342,10 @@ def __init__(
self.connections: weakref.WeakSet[BaseConnector] = weakref.WeakSet()

self._reaper_task = None
self._reaper_should_be_running = False

if reap_connections:
self._reaper_should_be_running = True
self.reap_delay = reap_delay
self.start_reaper()

Expand All @@ -354,16 +356,29 @@ def too_old(self, conn: BaseConnector) -> bool:
return time.time() - conn.get_lifetime() > self.max_lifetime

def start_reaper(self):
loop = asyncio.get_event_loop()
"""Start the background reaper task on the current running loop.

If called when no event loop is running (e.g., pool constructed before
asyncio.run()), defer startup until a running loop is available.
"""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return

self._reaper_task = loop.create_task(self._reaper_loop())
self.ensure_reaper_started()

def stop_reaper(self):
if self._reaper_task is not None:
self._reaper_task.cancel()

def ensure_reaper_started(self):
if self._reaper_task is None:
if self._reaper_should_be_running:
self.start_reaper()

return

if self._reaper_task.cancelled() or self._reaper_task.done():
self.start_reaper()

Expand Down Expand Up @@ -446,8 +461,7 @@ def release_connection(self, conn: BaseConnector):
-----
If a connector is too old, or we are going to exceed `max_size`, drop it.
"""
if self._reaper_task is not None:
self.ensure_reaper_started()
self.ensure_reaper_started()

current_pool_size = self.pool.qsize()

Expand Down
2 changes: 1 addition & 1 deletion aiosocketpool/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.0.5"
__version__ = "0.0.6"
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
name = "aiosocketpool"
description = "An asyncio-compatible socket pool"
readme = "README.md"
requires-python = ">=3.8"
requires-python = ">=3.11"
license = { file = "LICENSE" }
authors = [
{ name = "Roo Sczesnak", email = "andrewscz@gmail.com" },
Expand Down