Python client library for the Ensemble
decentralized P2P messaging daemon. Wraps the RegisterService bidi gRPC
stream so an external Python process can register a service against a local
daemon and exchange chat messages with peers.

    pip install ensemble-client

Requires Python 3.11+. Depends on grpcio, protobuf, and cryptography.
For local development against an unpublished checkout:

    git clone https://github.com/boxsie/ensemble.git
    pip install -e ensemble/clients/python

Connect to a daemon and inspect the service handle (address, onion) the
moment registration completes:

    import asyncio

    from ensemble import ACL, Client

    async def main():
        async with Client(
            socket_path="/run/ensemble/sock",
            auth_seed="/etc/ensemble/admin.seed",
        ) as client:
            async with await client.register("echo", acl=ACL.CONTACTS) as svc:
                print(f"Registered: {svc.address} (onion={svc.onion})")

    asyncio.run(main())

Loop the events iterator and echo every inbound chat back to the sender:

    import asyncio

    from ensemble import ACL, ChatMessage, Client

    async def main():
        async with Client(
            socket_path="/run/ensemble/sock",
            auth_seed="/etc/ensemble/admin.seed",
        ) as client:
            async with await client.register("echo", acl=ACL.CONTACTS) as svc:
                print(f"Registered: {svc.address} {svc.onion}")
                async for ev in svc.events():
                    if isinstance(ev, ChatMessage):
                        await svc.send_message(ev.from_addr, f"echo: {ev.text}")

    asyncio.run(main())

See examples/echo.py for a complete runnable example with argparse and
connection-request handling.

For local development the daemon's --signaling=loopback mode (Ensemble
T2) skips Tor bootstrap entirely and brings the gRPC + signaling layers
up in <200ms, using a per-host Unix-socket rendezvous directory in
place of the DHT. Two daemons on the same host pointed at the same
rendezvous directory can discover each other and complete a contact
handshake without ever touching Tor. Ideal for iterating on
examples/echo.py and examples/echo_rpc.py.

    # Terminal 1: start a daemon in loopback mode.
    ensemble --headless --signaling=loopback \
        --data-dir /tmp/echo-daemon \
        --api-addr 127.0.0.1:9090

    # Terminal 2: register the echo service against it.
    python examples/echo.py --addr localhost:9090

No --tor-path, no ENSEMBLE_ADMIN_KEY, no onion descriptor. The
daemon's GetStatus still reports tor_state == "ready" once the
loopback backend is up — that's a compat shim for SDK fixtures, not a
Tor claim. Loopback is Linux-only (uses Unix domain sockets); pass
--loopback-dir /custom/path if $XDG_RUNTIME_DIR/ensemble-loopback/
isn't a good fit. HTTP-transport services (the built-in ui service)
are not supported under loopback — chat and RPC transports work
unchanged.
Pass exactly one of socket_path or addr to Client:

- socket_path="/run/ensemble/sock": Unix socket (the typical k8s sidecar setup).
- addr="localhost:9090": TCP, optionally with tls=True.

The auth_seed argument may be either raw bytes or a path to a file
containing the seed (32 raw bytes, or 64 ASCII hex characters). It must match
the daemon's configured admin key (see ensemble keygen in the main repo).
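
The two accepted seed encodings can be told apart by length alone. The sketch below shows one way to normalise either form to 32 raw bytes. `load_seed` is a hypothetical helper written for illustration, not the client's actual parsing code, and tolerating surrounding whitespace in the hex form is an assumption.

```python
from pathlib import Path
from typing import Union

SEED_LEN = 32  # raw seed length in bytes, as stated in the text above


def load_seed(seed: Union[bytes, str, Path]) -> bytes:
    """Hypothetical helper: normalise an auth seed to 32 raw bytes.

    Accepts raw bytes, or a path to a file holding either 32 raw bytes
    or 64 ASCII hex characters.
    """
    data = seed if isinstance(seed, bytes) else Path(seed).read_bytes()
    if len(data) == SEED_LEN:
        return data  # already the raw form
    # Assumption: a hex seed file may carry a trailing newline.
    text = data.strip()
    if len(text) == 2 * SEED_LEN:
        try:
            return bytes.fromhex(text.decode("ascii"))
        except ValueError as exc:  # also covers UnicodeDecodeError
            raise ValueError("seed is 64 characters but not valid hex") from exc
    raise ValueError(
        f"expected 32 raw bytes or 64 hex characters, got {len(data)} bytes"
    )
```

Checking the length before decoding keeps a 32-byte raw seed from ever being misread as text.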
Pass tls=True to use TLS. The client uses the system trust store;
self-signed daemon certs (e.g. behind a LAN CA) need either the CA installed
in the trust store or GRPC_DEFAULT_SSL_ROOTS_FILE_PATH pointing at it.
There is no clean equivalent to the Go CLI's --tls-insecure flag in
grpc-python; the parameter exists for API parity but does not currently
disable verification.
When the daemon installs and supervises a service, it execs the child with a
spawn contract in the environment and mints a per-spawn token. A supervised
service must present that token on RegisterService (admin-signed auth alone
is the registry/sidecar path, not the supervised one) — the daemon clamps the
manifest to the install-time capability ceiling and binds the token to the
installed name.
Client.from_env() builds the right client for either deployment, with this
precedence:

- ENSEMBLE_SERVICE_TOKEN set → supervised: dial ENSEMBLE_SOCKET and attach the token as x-service-token metadata on RegisterService. No seed; the token is the auth.
- otherwise → standalone / sidecar: dial ENSEMBLE_SOCKET and, if ENSEMBLE_AUTH_SEED names a seed file, sign every RPC with it.

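
The precedence above can be sketched as a pure function over an environment mapping. This models the documented rules only and is not the body of `Client.from_env()`. `ResolvedConfig` and `resolve_from_env` are hypothetical names, and treating an empty token as unset is an assumption.

```python
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ResolvedConfig:
    """Hypothetical result type, for illustration only."""
    mode: str  # "supervised" or "standalone"
    socket_path: Optional[str]
    service_token: Optional[str] = None
    auth_seed_path: Optional[str] = None


def resolve_from_env(env: Mapping[str, str]) -> ResolvedConfig:
    socket_path = env.get("ENSEMBLE_SOCKET")
    token = env.get("ENSEMBLE_SERVICE_TOKEN")
    if token:
        # Supervised: the token is the auth, so any seed is ignored.
        return ResolvedConfig("supervised", socket_path, service_token=token)
    # Standalone / sidecar: sign with the seed only if one is configured.
    seed = env.get("ENSEMBLE_AUTH_SEED") or None
    return ResolvedConfig("standalone", socket_path, auth_seed_path=seed)
```

For example, `resolve_from_env(os.environ)` in a supervised child would report the supervised mode even if a seed path also happened to be set.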
Read the name to register under (and the per-service data dir) from
SpawnContext.from_env():

    import asyncio

    from ensemble import Client, SpawnContext

    async def main():
        ctx = SpawnContext.from_env()  # socket / name / token / data_dir
        async with Client.from_env() as client:
            svc = await client.register(ctx.service_name or "my-service", ...)
            # ctx.data_dir is the writable dir the daemon allocated for this service.

    asyncio.run(main())

The spawn-contract env vars (ENSEMBLE_SOCKET, ENSEMBLE_SERVICE_NAME,
ENSEMBLE_SERVICE_TOKEN, ENSEMBLE_DATA_DIR) are set by the daemon's
supervisor; you don't set them yourself. Passing service_token=... to
Client(...) directly is the lower-level escape hatch that from_env() wraps.

ServiceHandle.events() yields decoded dataclasses, not raw protobuf:

- ChatMessage(type, from_addr, text, ts): inbound chat.
- ConnectionRequest(type, request_id, from_addr): inbound connection awaiting accept/reject. Respond with svc.accept_connection(request_id) or svc.reject_connection(request_id, reason).
- UnknownEvent(type, payload): forward-compat fallback for event types the client version doesn't recognise.

The daemon enforces backpressure with a 256-deep per-stream queue and drops oldest events under sustained load (no on-wire signal). Consume events promptly.
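
To see what drop-oldest means for a slow consumer, the following sketch models a bounded queue with `collections.deque`. It is a local simulation of the behaviour described above, not the daemon's implementation, and `simulate_drop_oldest` is a hypothetical name.

```python
from collections import deque

QUEUE_DEPTH = 256  # per-stream depth quoted in the text above


def simulate_drop_oldest(events, depth=QUEUE_DEPTH):
    """Feed events into a bounded queue while nothing is consumed.

    Returns (events still queued, number silently dropped).
    """
    queue = deque(maxlen=depth)
    dropped = 0
    for ev in events:
        if len(queue) == depth:
            dropped += 1  # deque evicts the oldest item on append
        queue.append(ev)
    return list(queue), dropped
```

With 300 events and no consumer, the first 44 are gone by the time the consumer wakes up, and nothing in the stream says so. That is why the event loop should hand slow work off rather than block inside `async for`.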
Two distinct identities can send a message, and picking the wrong one is the difference between an accepted dial and a rejected one:

- Operator / node identity: Client.connect(addr), Client.send(addr, text), Client.events(). These dial and sign as the daemon's node master key, i.e. the operator's address. Use this when the peer gates on the operator (e.g. an allowlist admitting the operator). Replies arrive on Client.events() as chat_message events.
- Service identity: ServiceHandle.connect_peer(addr) / send_message(addr, text) on a handle from register(...). These dial as the registered service's own ephemeral identity (ConnectAs, ticket 5fb8deb1). The peer's gate sees the service address, not the operator.

    import asyncio

    from ensemble import Client, DaemonChatMessage

    async def main():
        async with Client(
            addr="me.example:443", tls=True,
            auth_seed="/etc/ensemble/admin.seed",
        ) as client:
            # Dial + chat as the operator (the peer's allowlist sees the operator).
            result = await client.connect("Ejeff...")
            if not result.accepted:
                print("dial refused:", result.message)
                return
            await client.send("Ejeff...", "hello from the operator")
            # Surface the reply.
            async for ev in client.events():
                msg = DaemonChatMessage.from_event(ev)
                if msg and msg.direction == "incoming":
                    print(f"{msg.from_addr}: {msg.text}")
                    break

    asyncio.run(main())

Client.events() is the node-level stream, distinct from
ServiceHandle.events() (the per-service bidi stream). It yields
DaemonEvent(type, payload) where payload is the decoded JSON object; decode
chat with DaemonChatMessage.from_event(...). The bus also echoes the node's
own outbound sends, so filter on direction == "incoming" for replies only.
For services that accept callers beyond the contact list, three primitives
work together (see examples/matchmaker_stub.py):

- transport=Transport.RPC on client.register(...) opts the service into raw protobuf bytes in both directions. Reply via svc.send_bytes(to_addr, payload); receive via either an svc.on_rpc_message(handler) callback or RpcMessage items from svc.events().
- svc.introduce_peers(to_addr, other_addr, session_id, expires_at_ms, role_hint="", payload=b"") asks the daemon to introduce two peers to each other. The receiving peer gets a PeerIntroduction event with a daemon-attested from_service_addr, so provenance comes free; replay protection (session_id + expires_at) is consumer-side.
- max_payload_bytes and rate_limit_per_minute / rate_limit_burst on the manifest cap inbound abuse. Oversize / throttled inbound envelopes surface as PayloadTooLargeError / RateLimitedError from events(); branch on the typed exception, do NOT string-match message.

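
Because replay protection is left to the consumer, a receiving peer needs some memory of the sessions it has already honoured. The sketch below is one minimal way to do that. `ReplayGuard` is a hypothetical helper, not part of ensemble-client, and it assumes the introduction event exposes the same `session_id` and `expires_at_ms` values that were passed to `introduce_peers`, and that a replayed introduction carries its original expiry.

```python
import time
from typing import Callable, Dict, Optional


class ReplayGuard:
    """Hypothetical consumer-side helper; not part of ensemble-client."""

    def __init__(self, now_ms: Optional[Callable[[], int]] = None) -> None:
        # Injectable clock so the guard can be tested deterministically.
        self._now_ms = now_ms or (lambda: int(time.time() * 1000))
        self._seen: Dict[str, int] = {}  # session_id -> expires_at_ms

    def accept(self, session_id: str, expires_at_ms: int) -> bool:
        """Return True only for a fresh, unexpired introduction."""
        now = self._now_ms()
        # Forget expired sessions: the expiry check below rejects them
        # anyway, so remembering them only grows the table.
        self._seen = {s: exp for s, exp in self._seen.items() if exp > now}
        if expires_at_ms <= now:
            return False  # already expired
        if session_id in self._seen:
            return False  # same session presented twice: replay
        self._seen[session_id] = expires_at_ms
        return True
```

A handler would call `guard.accept(...)` before acting on each introduction and ignore the event when it returns False.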
- keypair_seed on the manifest is currently advisory: the daemon's keystore is append-only and ignores externally-supplied seeds. Pin to the server-issued address from ServiceRegistered for stability across restarts. (T07 limitation; tracked for follow-up.)
- ServiceHandle.send_message / connect_peer dial as the registered service's own identity; for operator-identity chat use Client.send / Client.connect instead (see Operator chat vs service-identity dials). Mixing them up is the usual cause of a peer rejecting a dial it should accept.
- Async-only API. There's no synchronous wrapper; use asyncio.run or embed in your existing event loop.

The ensemble/_proto/*.py files are checked in. Regenerate via the
top-level Makefile (canonical entrypoint — regenerates Go and Python
stubs together so they stay in lock-step):

    make proto

ensemble-client is versioned independently from the daemon and from the
.NET client.

- Tag pattern for PyPI releases: client-python/v<MAJOR>.<MINOR>.<PATCH> (e.g. client-python/v0.1.0). The publish workflow at ci/workflows/python-publish.yml is wired to trigger only on tags matching that prefix, so daemon (v*.*.*) and .NET (client-dotnet/v*.*.*) tags never accidentally cut a Python release.
- Pre-1.0: minor bumps may include breaking changes. Pin the major+minor (e.g. ensemble-client~=0.1.0) if you depend on this in production.
- Post-1.0: semver. Breaking changes bump the major.

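
When scripting a release, it can help to check a tag locally before pushing it. The sketch below encodes the documented tag pattern as a regular expression. The regex is an assumed equivalent, since the workflow's actual filter is not shown here, and `python_release_version` is a hypothetical helper.

```python
import re
from typing import Optional, Tuple

# Assumed regex form of client-python/v<MAJOR>.<MINOR>.<PATCH>.
PY_TAG = re.compile(r"^client-python/v(\d+)\.(\d+)\.(\d+)$")


def python_release_version(tag: str) -> Optional[Tuple[int, int, int]]:
    """Return (major, minor, patch) for a Python-client tag, else None."""
    m = PY_TAG.match(tag)
    if m is None:
        return None  # daemon, .NET, or malformed tag
    major, minor, patch = (int(part) for part in m.groups())
    return (major, minor, patch)
```

A daemon tag such as `v1.2.3` or a .NET tag such as `client-dotnet/v0.1.0` returns None, so neither would be mistaken for a Python release.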
- Daemon + source: https://github.com/boxsie/ensemble
- Issues: https://github.com/boxsie/ensemble/issues
- PyPI: https://pypi.org/project/ensemble-client/