-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcluster.py
More file actions
187 lines (157 loc) · 6 KB
/
cluster.py
File metadata and controls
187 lines (157 loc) · 6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""Launch a local Raft cluster for testing/demo.

Usage:
    python cluster.py --nodes 3
    python cluster.py --nodes 5

Spawns each node as a subprocess.
Port scheme: node n gets RPC=5000+n, HTTP=8000+n.

Interactive commands:
    kill <node_id>   - Kill a node's process
    start <node_id>  - Restart a killed node
    status           - Show status of all nodes
    quit             - Kill all and exit
"""
import subprocess
import sys
import signal
import os
import json
import urllib.request
import argparse
class ClusterManager:
def __init__(self, num_nodes: int):
self.num_nodes = num_nodes
self.processes: dict[str, subprocess.Popen] = {}
self._nodes: dict[str, dict] = {}
self._build_config()
def _build_config(self) -> None:
for i in range(1, self.num_nodes + 1):
node_id = f"node{i}"
rpc_port = 5000 + i
http_port = 8000 + i
self._nodes[node_id] = {
"id": node_id,
"host": "127.0.0.1",
"rpc_port": rpc_port,
"http_port": http_port,
}
def _build_peers_str(self, node_id: str) -> str:
peers = []
for nid, cfg in self._nodes.items():
if nid != node_id:
peers.append(f"{nid}={cfg['host']}:{cfg['rpc_port']}")
return ",".join(peers)
def start_all(self) -> None:
for node_id in self._nodes:
self.start_node(node_id)
def start_node(self, node_id: str) -> None:
if node_id in self.processes:
proc = self.processes[node_id]
if proc.poll() is None:
print(f"{node_id} is already running (PID {proc.pid})")
return
cfg = self._nodes[node_id]
peers_str = self._build_peers_str(node_id)
data_dir = os.path.join("data", node_id)
os.makedirs(data_dir, exist_ok=True)
cmd = [
sys.executable, "server.py",
"--id", node_id,
"--host", cfg["host"],
"--port", str(cfg["rpc_port"]),
"--http-port", str(cfg["http_port"]),
"--peers", peers_str,
"--data-dir", data_dir,
]
proc = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
self.processes[node_id] = proc
print(f"Started {node_id} (PID {proc.pid}) RPC={cfg['rpc_port']} HTTP={cfg['http_port']}")
def kill_node(self, node_id: str) -> None:
proc = self.processes.get(node_id)
if proc is None or proc.poll() is not None:
print(f"{node_id} is not running")
return
proc.terminate()
proc.wait(timeout=5)
print(f"Killed {node_id} (PID {proc.pid})")
def stop_all(self) -> None:
for node_id, proc in self.processes.items():
if proc.poll() is None:
proc.terminate()
for node_id, proc in self.processes.items():
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
print("All nodes stopped")
def get_status(self, node_id: str) -> dict | None:
cfg = self._nodes.get(node_id)
if cfg is None:
return None
try:
url = f"http://{cfg['host']}:{cfg['http_port']}/status"
req = urllib.request.Request(url, method="GET")
with urllib.request.urlopen(req, timeout=2) as resp:
return json.loads(resp.read().decode("utf-8"))
except Exception:
return None
def print_status(self) -> None:
print(f"\n{'Node':<8} {'PID':<8} {'Running':<9} {'State':<12} {'Term':<6} {'Leader':<10} {'Log':<5} {'Commit':<7}")
print("-" * 70)
for node_id in sorted(self._nodes.keys()):
proc = self.processes.get(node_id)
running = proc is not None and proc.poll() is None
pid = proc.pid if proc else "-"
status = self.get_status(node_id) if running else None
state = status["state"] if status else "-"
term = status["term"] if status else "-"
leader = status["leader_id"] if status else "-"
log_len = status["log_length"] if status else "-"
commit = status["commit_index"] if status else "-"
print(f"{node_id:<8} {str(pid):<8} {'yes' if running else 'no':<9} {str(state):<12} {str(term):<6} {str(leader):<10} {str(log_len):<5} {str(commit):<7}")
print()
def interactive_loop(self) -> None:
print("\nCommands: kill <id>, start <id>, status, quit")
while True:
try:
line = input("cluster> ").strip()
except (EOFError, KeyboardInterrupt):
print()
break
if not line:
continue
parts = line.split()
cmd = parts[0].lower()
if cmd == "quit":
break
elif cmd == "status":
self.print_status()
elif cmd == "kill" and len(parts) >= 2:
self.kill_node(parts[1])
elif cmd == "start" and len(parts) >= 2:
self.start_node(parts[1])
else:
print("Unknown command. Use: kill <id>, start <id>, status, quit")
def main():
    """Parse arguments, launch the cluster and run the interactive prompt.

    All node subprocesses are stopped when the prompt ends, on Ctrl-C, and
    also if anything in between raises.
    """
    import time  # only needed for the startup delay below

    parser = argparse.ArgumentParser(description="Launch a local Raft cluster")
    parser.add_argument("--nodes", type=int, default=3, choices=[3, 5], help="Number of nodes")
    args = parser.parse_args()
    cluster = ClusterManager(args.nodes)

    # Handle SIGINT gracefully: the handler only requests exit; the actual
    # teardown happens exactly once in the ``finally`` below.
    def sigint_handler(sig, frame):
        print("\nShutting down cluster...")
        sys.exit(0)

    signal.signal(signal.SIGINT, sigint_handler)
    try:
        print(f"Launching {args.nodes}-node Raft cluster...")
        cluster.start_all()
        time.sleep(2)  # Let nodes start up and elect a leader
        cluster.print_status()
        cluster.interactive_loop()
    finally:
        # A second Ctrl-C must not abort the teardown half-way through.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        cluster.stop_all()
# Run the launcher only when executed as a script, not when imported.
if __name__ == "__main__":
    main()