-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmatchmaker_stub.py
More file actions
185 lines (159 loc) · 6.75 KB
/
Copy pathmatchmaker_stub.py
File metadata and controls
185 lines (159 loc) · 6.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python3
"""Illustrative matchmaker stub — wires up all three T1-T4 primitives.
NOT production code. This sketches the shape a public matchmaker service
would take using:
* `transport=Transport.RPC` so peers send/receive raw bytes (T1)
* `introduce_peers(...)` to pair the two waiting callers (T2)
* `max_payload_bytes` + `rate_limit_per_minute` to bound abuse on a
PUBLIC-ACL service (T3 + T4)
Behaviour:
* Listens on `SERVICE_TRANSPORT_RPC`.
* Treats each inbound `RpcMessage` as a "join queue" request (payload is
ignored; a real matchmaker would parse a proto).
* Buffers join requests keyed by an arbitrary session bucket. When the
SECOND peer arrives for a bucket, calls `introduce_peers(...)` on both
with a 60-second `expires_at` horizon. The bucket is consumed.
A real matchmaker would:
* Parse a proto join-request with skill/role/game-mode and pick the
bucket from that, not "next two arrivals".
* Persist outstanding sessions so a daemon restart doesn't strand
waiters.
* Reject duplicate joins from the same peer in the same bucket.
* Use a fresh per-introduction session_id, NOT reuse the bucket key
(this stub already mints one with `secrets.token_hex`).
* Sign or wrap `payload` with a session ticket the peers verify on
direct dial.
Usage:
python matchmaker_stub.py --socket /run/ensemble/sock --auth-seed /path/to/seed
"""
from __future__ import annotations
import argparse
import asyncio
import secrets
import sys
import time
from ensemble import (
ACL,
Client,
ConnectionRequest,
PayloadTooLargeError,
RateLimitedError,
Transport,
)
# Pairing horizon in milliseconds (60 seconds). It is added to the current
# wall-clock time to form the `expires_at_ms` sent with each introduction:
# long enough for both peers to dial each other after being introduced,
# short enough that a stale waiter doesn't strand.
EXPIRY_MS = 60 * 1_000
class MatchQueue:
    """Hold at most one waiting peer per bucket and pair it with the next arrival."""

    def __init__(self) -> None:
        # Serialises every read-modify-write of the waiting map.
        self._lock = asyncio.Lock()
        # bucket key -> E-address of the peer currently waiting in it
        self._waiting: dict[str, str] = {}

    async def offer(self, bucket: str, peer_addr: str) -> str | None:
        """Register ``peer_addr`` in ``bucket`` and report a partner if one exists.

        Args:
            bucket: Key of the queue slot the peer wants to join.
            peer_addr: E-address of the arriving peer.

        Returns:
            The E-address of a *different* peer that was already waiting in
            ``bucket`` (that slot is then emptied), or ``None`` when the
            arriving peer was buffered instead.
        """
        async with self._lock:
            waiter = self._waiting.get(bucket)
            if waiter is not None and waiter != peer_addr:
                # A distinct peer is already waiting: consume the slot and
                # hand that peer back as the partner.
                del self._waiting[bucket]
                return waiter
            # Empty slot, or the same peer joining again: (re)buffer it.
            self._waiting[bucket] = peer_addr
            return None
async def main() -> int:
    """Run the illustrative matchmaker until its event stream ends.

    Parses the command line, connects to the daemon, registers a PUBLIC-ACL
    RPC service with payload-size and rate caps, and then:

    * treats every inbound RPC message as a join request for ``--bucket`` and
      introduces each second arrival to the peer already waiting there;
    * auto-accepts every ``ConnectionRequest`` seen on the event stream;
    * logs ``PayloadTooLargeError`` / ``RateLimitedError`` raised while
      consuming the event stream and resumes serving afterwards.

    Returns:
        ``0`` once ``svc.events()`` finishes without raising.

    Raises:
        Any exception other than the two typed errors above propagates to
        the caller unchanged.
    """
    parser = argparse.ArgumentParser()
    # Exactly one way of reaching the daemon must be given.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--socket", help="Path to the daemon's gRPC unix socket")
    group.add_argument("--addr", help="Daemon gRPC TCP address, e.g. localhost:9090")
    parser.add_argument("--auth-seed", help="Path to the admin-key seed file")
    parser.add_argument(
        "--name", default="matchmaker", help="Service name (default: matchmaker)"
    )
    parser.add_argument(
        "--bucket",
        default="lobby",
        help="Hard-coded bucket key for this illustrative stub",
    )
    parser.add_argument("--tls", action="store_true")
    args = parser.parse_args()

    queue = MatchQueue()

    async with Client(
        socket_path=args.socket,
        addr=args.addr,
        tls=args.tls,
        auth_seed=args.auth_seed,
    ) as client:
        # PUBLIC ACL is intentional: a matchmaker has to accept callers it
        # has never seen before. The size + rate caps below are the
        # mitigations that make that safe.
        async with await client.register(
            args.name,
            acl=ACL.PUBLIC,
            description="illustrative matchmaker stub",
            transport=Transport.RPC,
            max_payload_bytes=4 * 1024,  # 4 KiB cap on join payloads
            rate_limit_per_minute=30,  # 30 joins/min per source
            rate_limit_burst=5,
        ) as svc:
            print(
                f"matchmaker {svc.address} ({svc.onion}) listening; "
                f"bucket={args.bucket!r}",
                flush=True,
            )

            async def on_join(from_addr: str, payload: bytes, ts: int) -> None:
                """Handle one join request; ``payload`` and ``ts`` are unused."""
                partner = await queue.offer(args.bucket, from_addr)
                if partner is None:
                    print(f" buffered {from_addr} in bucket {args.bucket!r}", flush=True)
                    return

                # Pair found. Mint a fresh session id (NOT the bucket key —
                # bucket keys are predictable, session_id should be opaque).
                session_id = secrets.token_hex(16)
                expires_at = int(time.time() * 1000) + EXPIRY_MS
                print(
                    f" pairing {from_addr} <-> {partner} session={session_id}",
                    flush=True,
                )

                # Two one-way introductions. The daemon stamps
                # from_service_addr on each, so both peers can verify the
                # introduction came from THIS service without needing
                # extra crypto plumbing.
                await svc.introduce_peers(
                    to_addr=from_addr,
                    other_addr=partner,
                    session_id=session_id,
                    expires_at_ms=expires_at,
                    role_hint="host",
                )
                await svc.introduce_peers(
                    to_addr=partner,
                    other_addr=from_addr,
                    session_id=session_id,
                    expires_at_ms=expires_at,
                    role_hint="joiner",
                )

            svc.on_rpc_message(on_join)

            # The try sits INSIDE a loop so that a typed error is logged and
            # the stream is re-entered; with the try around a single
            # `async for`, the first such error would end the service.
            # NOTE(review): assumes svc.events() may be called again to
            # resume after one of these errors — confirm against the
            # ensemble client before relying on this in anything real.
            while True:
                try:
                    async for ev in svc.events():
                        if isinstance(ev, ConnectionRequest):
                            # PUBLIC ACL still goes through the
                            # connection-request accept gate; the
                            # matchmaker auto-accepts.
                            await svc.accept_connection(ev.request_id)
                except PayloadTooLargeError as e:
                    # A peer sent more than max_payload_bytes; the daemon
                    # dropped it and surfaced the typed error. Log and keep
                    # serving — a real matchmaker would also tarpit the
                    # offending source_addr.
                    print(
                        f" ! payload_too_large from peer: {e.message} "
                        f"(limit={e.limit_bytes})",
                        flush=True,
                    )
                except RateLimitedError as e:
                    print(
                        f" ! rate_limited: {e.message} "
                        f"(retry_after_ms={e.retry_after_ms})",
                        flush=True,
                    )
                else:
                    # Event stream ended without an error: shut down cleanly.
                    break
    return 0
if __name__ == "__main__":
    # Script entry point: run the async main() to completion and use its
    # return value as the process exit status. Ctrl-C exits with 130.
    try:
        exit_code = asyncio.run(main())
    except KeyboardInterrupt:
        exit_code = 130
    sys.exit(exit_code)