ensemble-client (Python)

Python client library for the Ensemble decentralized P2P messaging daemon. Wraps the RegisterService bidi gRPC stream so an external Python process can register a service against a local daemon and exchange chat messages with peers.

Installation

pip install ensemble-client

Requires Python 3.11+. Depends on grpcio, protobuf, cryptography.

For local development against an unpublished checkout:

git clone https://github.com/boxsie/ensemble.git
pip install -e ensemble/clients/python

Quick start

Connect to a daemon and inspect the service handle (address, onion) the moment the registration completes:

import asyncio
from ensemble import ACL, Client

async def main():
    async with Client(
        socket_path="/run/ensemble/sock",
        auth_seed="/etc/ensemble/admin.seed",
    ) as client:
        async with await client.register("echo", acl=ACL.CONTACTS) as svc:
            print(f"Registered: {svc.address} (onion={svc.onion})")

asyncio.run(main())

Service registration quickstart

Loop the events iterator and echo every inbound chat back to the sender:

import asyncio
from ensemble import ACL, ChatMessage, Client

async def main():
    async with Client(
        socket_path="/run/ensemble/sock",
        auth_seed="/etc/ensemble/admin.seed",
    ) as client:
        async with await client.register("echo", acl=ACL.CONTACTS) as svc:
            print(f"Registered: {svc.address} {svc.onion}")
            async for ev in svc.events():
                if isinstance(ev, ChatMessage):
                    await svc.send_message(ev.from_addr, f"echo: {ev.text}")

asyncio.run(main())

See examples/echo.py for a complete runnable example with argparse + connection-request handling.

Running examples against a loopback daemon

For local development, the daemon's --signaling=loopback mode (Ensemble T2) skips Tor bootstrap entirely and brings the gRPC and signaling layers up in under 200 ms, using a per-host Unix-socket rendezvous directory in place of the DHT. Two daemons on the same host pointed at the same rendezvous directory can discover each other and complete a contact handshake without ever touching Tor. This makes loopback mode well suited to iterating on examples/echo.py and examples/echo_rpc.py.

# Terminal 1 — start a daemon in loopback mode.
ensemble --headless --signaling=loopback \
  --data-dir /tmp/echo-daemon \
  --api-addr 127.0.0.1:9090

# Terminal 2 — register the echo service against it.
python examples/echo.py --addr localhost:9090

No --tor-path, no ENSEMBLE_ADMIN_KEY, no onion descriptor. The daemon's GetStatus still reports tor_state == "ready" once the loopback backend is up — that's a compat shim for SDK fixtures, not a Tor claim. Loopback is Linux-only (uses Unix domain sockets); pass --loopback-dir /custom/path if $XDG_RUNTIME_DIR/ensemble-loopback/ isn't a good fit. HTTP-transport services (the built-in ui service) are not supported under loopback — chat and RPC transports work unchanged.

Connecting

Pass exactly one of socket_path or addr to Client:

  • socket_path="/run/ensemble/sock" — Unix socket (the typical k8s sidecar setup).
  • addr="localhost:9090" — TCP, optionally with tls=True.

The auth_seed argument may be either raw bytes or a path to a file containing the seed (32 raw bytes, or 64 ASCII hex characters). It must match the daemon's configured admin key (see ensemble keygen in the main repo).

TLS

Pass tls=True to use TLS. The client uses the system trust store; self-signed daemon certs (e.g. behind a LAN CA) need either the CA installed in the trust store or GRPC_DEFAULT_SSL_ROOTS_FILE_PATH pointing at it. There is no clean equivalent to the Go CLI's --tls-insecure flag in grpc-python; the parameter exists for API parity but does not currently disable verification.

Supervised installs (Client.from_env())

When the daemon installs and supervises a service, it execs the child with a spawn contract in the environment and mints a per-spawn token. A supervised service must present that token on RegisterService (admin-signed auth alone is the registry/sidecar path, not the supervised one) — the daemon clamps the manifest to the install-time capability ceiling and binds the token to the installed name.

Client.from_env() builds the right client for either deployment, with this precedence:

  1. ENSEMBLE_SERVICE_TOKEN set → supervised: dial ENSEMBLE_SOCKET and attach the token as x-service-token metadata on RegisterService. No seed — the token is the auth.
  2. otherwise → standalone / sidecar: dial ENSEMBLE_SOCKET and, if ENSEMBLE_AUTH_SEED names a seed file, sign every RPC with it.

Read the name to register under (and the per-service data dir) from SpawnContext.from_env():

import asyncio
from ensemble import Client, SpawnContext

async def main():
    ctx = SpawnContext.from_env()           # socket / name / token / data_dir
    async with Client.from_env() as client:
        svc = await client.register(ctx.service_name or "my-service", ...)
        # ctx.data_dir is the writable dir the daemon allocated for this service.

asyncio.run(main())

The spawn-contract env vars (ENSEMBLE_SOCKET, ENSEMBLE_SERVICE_NAME, ENSEMBLE_SERVICE_TOKEN, ENSEMBLE_DATA_DIR) are set by the daemon's supervisor — you don't set them yourself. Passing service_token=... to Client(...) directly is the lower-level escape hatch from_env() wraps.

Events

ServiceHandle.events() yields decoded dataclasses, not raw protobuf:

  • ChatMessage(type, from_addr, text, ts) — inbound chat.
  • ConnectionRequest(type, request_id, from_addr) — inbound connection awaiting accept/reject. Respond with svc.accept_connection(request_id) or svc.reject_connection(request_id, reason).
  • UnknownEvent(type, payload) — forward-compat fallback for event types the client version doesn't recognise.

The daemon buffers events in a 256-deep per-stream queue. Under sustained load it drops the oldest events, with no on-wire signal that anything was lost, so consume events promptly.
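The drop-oldest behaviour can be pictured with a bounded deque. This is only a local model of the semantics described above, not the daemon's queue implementation; simulate_drop_oldest is a hypothetical name.

```python
from collections import deque


def simulate_drop_oldest(depth: int, produced: int) -> tuple[int, int, int]:
    """Push `produced` (>= 1) sequence numbers through a bounded queue.

    Returns (kept, dropped, oldest surviving sequence number).
    """
    queue: deque[int] = deque(maxlen=depth)
    for seq in range(produced):
        queue.append(seq)  # when full, the oldest entry is discarded silently
    kept = len(queue)
    return kept, produced - kept, queue[0]


kept, dropped, oldest = simulate_drop_oldest(depth=256, produced=300)
print(f"kept {kept}, dropped {dropped}, oldest surviving seq {oldest}")
# prints: kept 256, dropped 44, oldest surviving seq 44
```

Because nothing on the wire marks the gap, a consumer that does slow work per event should hand that work off (for example to a task or worker queue) so the read loop keeps draining.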

Operator chat vs service-identity dials

Two distinct identities can send a message, and picking the wrong one is the difference between an accepted dial and a rejected one:

  • Operator / node identity: Client.connect(addr), Client.send(addr, text), Client.events(). These dial and sign as the daemon's node master key, i.e. the operator's address. Use this when the peer gates on the operator (e.g. an allowlist admitting the operator). Replies arrive on Client.events() as chat_message events.
  • Service identity: ServiceHandle.connect_peer(addr) / send_message(addr, text) on a handle from register(...). These dial as the registered service's own ephemeral identity (ConnectAs, ticket 5fb8deb1). The peer's gate sees the service address, not the operator.

import asyncio
from ensemble import Client, DaemonChatMessage

async def main():
    async with Client(
        addr="me.example:443", tls=True,
        auth_seed="/etc/ensemble/admin.seed",
    ) as client:
        # Dial + chat as the operator (the peer's allowlist sees the operator).
        result = await client.connect("Ejeff...")
        if not result.accepted:
            print("dial refused:", result.message)
            return
        await client.send("Ejeff...", "hello from the operator")
        # Surface the reply.
        async for ev in client.events():
            msg = DaemonChatMessage.from_event(ev)
            if msg and msg.direction == "incoming":
                print(f"{msg.from_addr}: {msg.text}")
                break

asyncio.run(main())

Client.events() is the node-level stream — distinct from ServiceHandle.events() (the per-service bidi stream). It yields DaemonEvent(type, payload) where payload is the decoded JSON object; decode chat with DaemonChatMessage.from_event(...). The bus also echoes the node's own outbound sends, so filter on direction == "incoming" for replies only.
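The direction filter can be shown on plain JSON without a daemon. The event shape used here (top-level type and payload keys, payload keys direction, from_addr and text, and the "outgoing" value) is an assumption modelled on the attribute names above; incoming_chats is a hypothetical helper.

```python
import json
from typing import Iterable, Iterator


def incoming_chats(raw_events: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (from_addr, text) for inbound chat events only."""
    for raw in raw_events:
        event = json.loads(raw)
        if event.get("type") != "chat_message":
            continue  # some other node-level event
        payload = event.get("payload", {})
        if payload.get("direction") == "incoming":
            yield payload["from_addr"], payload["text"]


sample = [
    # Echo of the node's own outbound send; must be skipped.
    json.dumps({"type": "chat_message",
                "payload": {"direction": "outgoing", "from_addr": "Eme", "text": "hello"}}),
    json.dumps({"type": "chat_message",
                "payload": {"direction": "incoming", "from_addr": "Epeer", "text": "hi back"}}),
]
print(list(incoming_chats(sample)))
# prints: [('Epeer', 'hi back')]
```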

Public services (RPC transport + introductions)

For services that accept callers beyond the contact list, three primitives work together (see examples/matchmaker_stub.py):

  • transport=Transport.RPC on client.register(...) opts the service into raw protobuf bytes both directions. Reply via svc.send_bytes(to_addr, payload); receive via either an svc.on_rpc_message(handler) callback or RpcMessage items from svc.events().
  • svc.introduce_peers(to_addr, other_addr, session_id, expires_at_ms, role_hint="", payload=b"") asks the daemon to introduce two peers to each other. The receiving peer gets a PeerIntroduction event with a daemon-attested from_service_addr — provenance comes free; replay protection (session_id + expires_at) is consumer-side.
  • max_payload_bytes and rate_limit_per_minute / rate_limit_burst on the manifest cap inbound abuse. Oversize / throttled inbound envelopes surface as PayloadTooLargeError / RateLimitedError from events() — branch on the typed exception, do NOT string-match message.
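Since replay protection for introductions is left to the consumer, a receiving peer needs some record of sessions it has already honoured. The sketch below is one possible approach under stated assumptions: ReplayGuard is a hypothetical class, expiry is taken to be a Unix timestamp in milliseconds, and sessions are keyed by the attested from_service_addr together with session_id.

```python
import time
from typing import Callable


def _now_ms() -> int:
    return int(time.time() * 1000)


class ReplayGuard:
    """Accept each (from_service_addr, session_id) once, and only before expiry."""

    def __init__(self, clock: Callable[[], int] = _now_ms) -> None:
        self._clock = clock
        self._seen: dict[tuple[str, str], int] = {}  # key -> expires_at_ms

    def accept(self, from_service_addr: str, session_id: str, expires_at_ms: int) -> bool:
        now = self._clock()
        # Forget sessions that have expired; a replay of those fails the
        # expiry check below, so they no longer need to be remembered.
        self._seen = {k: exp for k, exp in self._seen.items() if exp > now}
        if expires_at_ms <= now:
            return False  # stale introduction
        key = (from_service_addr, session_id)
        if key in self._seen:
            return False  # replay of a live session
        self._seen[key] = expires_at_ms
        return True
```

Pruning on every call keeps memory bounded by the number of live sessions; injecting the clock makes the expiry path testable.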

Caveats

  • keypair_seed on the manifest is currently advisory: the daemon's keystore is append-only and ignores externally-supplied seeds. Pin to the server-issued address from ServiceRegistered for stability across restarts. (T07 limitation; tracked for follow-up.)
  • ServiceHandle.send_message / connect_peer dial as the registered service's own identity; for operator-identity chat use Client.send / Client.connect instead (see Operator chat vs service-identity dials). Mixing them up is the usual cause of a peer rejecting a dial it should accept.
  • Async-only API. There's no synchronous wrapper; use asyncio.run or embed in your existing event loop.

Regenerating the gRPC stubs

The ensemble/_proto/*.py files are checked in. Regenerate via the top-level Makefile (canonical entrypoint — regenerates Go and Python stubs together so they stay in lock-step):

make proto

Versioning

ensemble-client is versioned independently from the daemon and from the .NET client.

  • Tag pattern for PyPI releases: client-python/v<MAJOR>.<MINOR>.<PATCH> (e.g. client-python/v0.1.0). The publish workflow at ci/workflows/python-publish.yml is wired to trigger only on tags matching that prefix, so daemon (v*.*.*) and .NET (client-dotnet/v*.*.*) tags never accidentally cut a Python release.
  • Pre-1.0: minor bumps may include breaking changes. Pin the major+minor (e.g. ensemble-client~=0.1.0) if you depend on this in production.
  • Post-1.0: semver. Breaking changes bump the major.

Links