-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinstaller.py
More file actions
359 lines (295 loc) · 13.8 KB
/
installer.py
File metadata and controls
359 lines (295 loc) · 13.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
#!/usr/bin/env python3
"""WCM app installer.
Registry-first dev-time tool for adding WaveLang app-policy packages.
Current responsibilities:
1. Compile .wl -> .wpk.json.
2. Deploy the package into sdk_apps/system_policy/.
3. Update the canonical policy registry.
4. Regenerate derived registry artifacts and validate sync.
This tool does not patch Python runtime tables anymore. Runtime consumption of
boot packages, launcher metadata, and app-policy dispatch is now driven by the
policy registry outputs.
Operational rule:
If a new app kind requires additional static wiring, the fix should land in
`policy_registry.json` or `policy_registry.py`, not as a new handwritten
patch step here. This file is intentionally narrow: it stages package build
and deploy work, updates canonical metadata, then defers derivation to the
registry generator.
"""
import argparse
import json
import os
import re
import shutil
import subprocess
import sys
# Directory containing this installer script; all other paths are built from it.
ROOT = os.path.dirname(os.path.abspath(__file__))
# Directory that receives deployed packages and holds the registry/storage files.
SYSTEM_POLICY_DIR = os.path.join(ROOT, "sdk_apps", "system_policy")
# Canonical policy registry that step_policy_registry() reads and rewrites.
POLICY_REGISTRY = os.path.join(SYSTEM_POLICY_DIR, "policy_registry.json")
# Script invoked with the "generate" and "check" sub-commands after a registry update.
POLICY_REGISTRY_TOOL = os.path.join(ROOT, "policy_registry.py")
# WaveLang compiler script invoked by step_compile().
WAVELANG = os.path.join(ROOT, "wavelang.py")
# Storage allocation registry used for zone overlap checks.
STORAGE_MAP = os.path.join(SYSTEM_POLICY_DIR, "storage_map.json")
# Interpreter used for child processes: the one running this installer.
PYTHON = sys.executable
def read_text(path):
    """Return the entire contents of the UTF-8 encoded text file at `path`."""
    with open(path, encoding="utf-8") as handle:
        contents = handle.read()
    return contents
def read_json(path):
    """Parse the UTF-8 encoded JSON document at `path` and return the result."""
    with open(path, encoding="utf-8") as handle:
        raw = handle.read()
    return json.loads(raw)
def write_json(path, obj):
    """Write `obj` to `path` as tab-indented UTF-8 JSON with a trailing newline.

    The payload is serialised to a string *before* the destination is opened.
    Opening with mode "w" truncates the file immediately, so serialising while
    the file is open would leave an empty or half-written file behind whenever
    `obj` contains something JSON cannot encode. Serialising first means such a
    failure leaves the previous file contents untouched.

    Raises:
        TypeError / ValueError: if `obj` is not JSON-serialisable (raised by
            `json.dumps` before the file is touched).
        OSError: if the destination cannot be opened or written.
    """
    payload = json.dumps(obj, indent="\t", ensure_ascii=False)
    with open(path, "w", encoding="utf-8") as f:
        f.write(payload)
        f.write("\n")
def extract_package_name(wl_path):
    """Return the package id declared by the first `package "..."` line.

    Exits via `SystemExit` when the source contains no such declaration.
    """
    source = read_text(wl_path)
    declaration = re.search(r'^\s*package\s+"([^"]+)"', source, re.MULTILINE)
    if declaration is None:
        raise SystemExit(f"ERROR: No 'package \"...\"' declaration found in {wl_path}")
    return declaration[1]
def safe_var(kind):
    """Lower-case `kind` and turn every hyphen into an underscore."""
    lowered = kind.lower()
    return "_".join(lowered.split("-"))
def bind_flow_name(kind):
    """Return the bind-flow identifier recorded in the registry for `kind`."""
    return "policy_bind_{}_flow".format(safe_var(kind))
def tick_flow_name(kind):
    """Return the tick-flow identifier recorded in the registry for `kind`."""
    return "policy_tick_{}_flow".format(safe_var(kind))
def tick_out_reg_name(kind):
    """Return the default tick output register name for `kind`."""
    return "pt_{}_ok".format(safe_var(kind))
def step_compile(wl_path):
    """Compile one WaveLang source file and return the path of its `.wpk.json`.

    Runs the WaveLang compiler script as a child process with UTF-8 stdio and
    then checks that the expected artifact exists next to the source.

    Args:
        wl_path: Path to the `.wl` source file.

    Returns:
        The path of the compiled `.wpk.json` package.

    Raises:
        SystemExit: if the compiler exits non-zero or the artifact is missing.
    """
    print(f"[1/4] Compiling {wl_path} ...")
    env = dict(os.environ, PYTHONIOENCODING="utf-8")
    result = subprocess.run(
        [PYTHON, WAVELANG, wl_path],
        capture_output=True,
        text=True,
        encoding="utf-8",
        env=env,
    )
    if result.returncode != 0:
        # Show both streams: diagnostics may have been written to either one.
        if result.stdout.strip():
            print(result.stdout)
        print(result.stderr)
        raise SystemExit("Compilation failed.")
    print(f" {result.stdout.strip()}")

    # Swap only the trailing ".wl" suffix. A plain str.replace() would also
    # rewrite ".wl" inside directory names, and for a source without ".wl" it
    # would return the source path itself, which then passes the isfile()
    # check below and gets deployed as if it were the compiled package.
    if wl_path.endswith(".wl"):
        wpk_path = wl_path[: -len(".wl")] + ".wpk.json"
    else:
        # NOTE(review): the compiler's output name for non-".wl" sources is
        # not visible from here; confirm against wavelang.py.
        wpk_path = wl_path + ".wpk.json"
    if not os.path.isfile(wpk_path):
        raise SystemExit(f"Expected output {wpk_path} not found.")
    return wpk_path
def step_deploy(wpk_path, package_name):
    """Copy the compiled package into the system policy directory.

    Returns the destination path of the deployed `<package_name>.wpk.json`.
    """
    target = os.path.join(SYSTEM_POLICY_DIR, "{}.wpk.json".format(package_name))
    shown = os.path.relpath(target, ROOT)
    print(f"[2/4] Deploying -> {shown}")
    shutil.copy2(wpk_path, target)
    return target
def step_policy_registry(package_name, kind, icon, description, domain, launcher_name, max_instructions):
    """Insert or replace the app-policy record for `package_name`.

    When a record with the same `package_id` already exists, its launcher
    `enabled` flag, its non-empty tick `out_reg`, and its `typed_opcodes` /
    `hardware` dictionaries are carried over into the replacement record;
    every other field is rebuilt from the arguments. The registry file is
    rewritten in place.
    """
    print("[3/4] Updating policy_registry.json ...")
    registry = read_json(POLICY_REGISTRY)
    packages = registry.get("packages", [])
    if not isinstance(packages, list):
        raise SystemExit("policy_registry.json has invalid 'packages' payload")

    # First dict record whose package_id matches, or (-1, None) when absent.
    slot, previous = next(
        (
            (position, candidate)
            for position, candidate in enumerate(packages)
            if isinstance(candidate, dict)
            and str(candidate.get("package_id", "")) == package_name
        ),
        (-1, None),
    )
    is_update = previous is not None

    enabled = True
    out_reg = tick_out_reg_name(kind)
    if is_update:
        prior_launcher = previous.get("launcher")
        if not isinstance(prior_launcher, dict):
            prior_launcher = {}
        enabled = bool(prior_launcher.get("enabled", True))

        prior_tick = previous.get("tick_runtime")
        if not isinstance(prior_tick, dict):
            prior_tick = {}
        # An empty or whitespace-only stored register falls back to the default.
        out_reg = str(prior_tick.get("out_reg", "")).strip() or out_reg

    # Key order is kept stable because it determines the layout of the JSON file.
    entry = {
        "package_id": package_name,
        "package_file": f"{package_name}.wpk.json",
        "package_role": "app-policy",
        "kind": kind,
        "boot_required": True,
        "bind_flow": bind_flow_name(kind),
        "tick_flow": tick_flow_name(kind),
        "tick_runtime": {
            "out_reg": out_reg,
            "max_instructions": int(max_instructions),
        },
        "launcher": {
            "name": launcher_name,
            "icon": icon,
            "description": description,
            "default_domain": domain,
            "enabled": enabled,
        },
    }

    if not is_update:
        packages.append(entry)
        print(f" Added '{package_name}'.")
    else:
        for carried in ("typed_opcodes", "hardware"):
            if isinstance(previous.get(carried), dict):
                entry[carried] = previous[carried]
        packages[slot] = entry
        print(f" Updated '{package_name}'.")

    registry["packages"] = packages
    write_json(POLICY_REGISTRY, registry)
def load_storage_map():
    """Return the registered storage zones, or an empty dict if no map file exists."""
    if not os.path.isfile(STORAGE_MAP):
        return {}
    document = read_json(STORAGE_MAP)
    return document.get("zones", {})
def save_storage_map(zones):
    """Write `zones` to the storage map file under the top-level "zones" key."""
    document = {"zones": zones}
    write_json(STORAGE_MAP, document)
def zone_bounds(zone):
    """Return `(row_lo, row_hi, col_lo, col_hi)` for `zone`, guard included.

    The declared rectangle starts at (`r0`, `c0`) and spans `rows` x `cols`;
    it is widened on every side by `guard_width` (0 when absent).
    """
    pad = zone.get("guard_width", 0)

    row_start = zone["r0"]
    row_lo = row_start - pad
    row_hi = row_start + zone["rows"] + pad

    col_start = zone["c0"]
    col_lo = col_start - pad
    col_hi = col_start + zone["cols"] + pad

    return row_lo, row_hi, col_lo, col_hi
def zones_overlap(a, b):
    """Tell whether the guarded rectangles of zones `a` and `b` intersect.

    Bounds are treated as half-open, so rectangles that merely touch along an
    edge do not count as overlapping.
    """
    a_top, a_bottom, a_left, a_right = zone_bounds(a)
    b_top, b_bottom, b_left, b_right = zone_bounds(b)

    if not (a_top < b_bottom and b_top < a_bottom):
        return False
    return a_left < b_right and b_left < a_right
def print_storage_map(zones):
    """Print a table of storage zones ordered by their starting row.

    The row and column ranges shown are the guarded bounds of each zone.
    """
    if not zones:
        print(" (no zones registered)")
        return

    print(f" {'Zone':<20s} {'Rows':>12s} {'Cols':>12s} Source")
    print(" " + " ".join("─" * width for width in (20, 12, 12, 30)))

    ordered = sorted(zones.items(), key=lambda item: item[1]["r0"])
    for label, spec in ordered:
        row_lo, row_hi, col_lo, col_hi = zone_bounds(spec)
        origin = spec.get("source", "?")
        print(f" {label:<20s} {row_lo:4d} – {row_hi:<4d} {col_lo:4d} – {col_hi:<4d} {origin}")
def extract_storage_allocs(wl_path):
    """Parse `storage_alloc(...)` declarations from a WaveLang source file.

    Each declaration is scanned for `key = value` pairs where the value is
    either a double-quoted string or a run of digits. Declarations lacking
    `name` or `r0` are ignored.

    Args:
        wl_path: Path to the `.wl` source file.

    Returns:
        A dict mapping zone name (always a `str`) to a zone record with the
        keys `source`, `r0`, `c0`, `rows`, `cols` and `guard_width`. Missing
        geometry fields other than `r0` default to 0.
    """
    text = read_text(wl_path)
    allocs = {}
    for match in re.finditer(r"storage_alloc\(([^)]+)\)", text):
        payload = {}
        for kv in re.finditer(r'(\w+)\s*=\s*("[^"]*"|\d+)', match.group(1)):
            key, value = kv.group(1), kv.group(2).strip('"')
            # Zone names stay strings even when they look numeric: an int name
            # would later break the sorted()/join() over zone names and would
            # not round-trip through the JSON storage map as the same key.
            if key != "name":
                try:
                    value = int(value)
                except ValueError:
                    pass
            payload[key] = value
        if "name" in payload and "r0" in payload:
            allocs[payload["name"]] = {
                # Forward slashes keep the recorded source identical across platforms.
                "source": os.path.relpath(wl_path, ROOT).replace("\\", "/"),
                "r0": payload["r0"],
                "c0": payload.get("c0", 0),
                "rows": payload.get("rows", 0),
                "cols": payload.get("cols", 0),
                "guard_width": payload.get("guard_width", 0),
            }
    return allocs
def step_storage_map(wl_path):
    """Check the source's storage allocations for overlaps, then record them.

    Does nothing when the source declares no allocations. A declared zone is
    compared against every registered zone except those with the same name or
    those recorded from this same source file. Any overlap aborts with
    `SystemExit` before the storage map is modified.
    """
    declared = extract_storage_allocs(wl_path)
    if not declared:
        return

    registered = load_storage_map()
    origin = os.path.relpath(wl_path, ROOT).replace("\\", "/")

    conflicts = []
    for new_name, new_zone in declared.items():
        for old_name, old_zone in registered.items():
            # Re-declarations and zones owned by this source may be replaced freely.
            replaceable = old_name == new_name or old_zone.get("source") == origin
            if replaceable or not zones_overlap(new_zone, old_zone):
                continue
            n_r0, n_r1, n_c0, n_c1 = zone_bounds(new_zone)
            o_r0, o_r1, o_c0, o_c1 = zone_bounds(old_zone)
            conflicts.append(
                f" OVERLAP: '{new_name}' rows {n_r0}–{n_r1} cols {n_c0}–{n_c1}"
                f" vs '{old_name}' rows {o_r0}–{o_r1} cols {o_c0}–{o_c1}"
                f" (from {old_zone.get('source', '?')})"
            )

    if conflicts:
        print(" Storage overlap detected!")
        print(*conflicts, sep="\n")
        raise SystemExit("Fix storage_alloc regions to avoid overlaps.")

    registered.update(declared)
    save_storage_map(registered)
    names = ", ".join(sorted(declared))
    print(f" Storage map updated: {names}")
def step_generate_policy_registry():
    """Regenerate the derived registry outputs, then validate them.

    Runs the registry tool's `generate` sub-command followed by `check`. The
    tool's standard output is echoed only after both commands have succeeded.

    Raises:
        SystemExit: if either sub-command exits with a non-zero status; both
            captured streams of the failing command are printed first.
    """
    print("[4/4] Regenerating derived registry outputs ...")
    # The captured output is decoded as UTF-8, so make the child encode its
    # stdio the same way (as step_compile does). Without this the child uses
    # the locale encoding, and non-ASCII output can fail to encode or decode.
    env = dict(os.environ, PYTHONIOENCODING="utf-8")
    stages = (
        ("generate", "policy_registry.py generate failed"),
        ("check", "policy_registry.py check failed after generation"),
    )
    outputs = []
    for command, failure in stages:
        result = subprocess.run(
            [PYTHON, POLICY_REGISTRY_TOOL, command],
            capture_output=True,
            text=True,
            encoding="utf-8",
            env=env,
        )
        if result.returncode != 0:
            print(result.stdout)
            print(result.stderr)
            raise SystemExit(failure)
        outputs.append(result.stdout.strip())
    for text in outputs:
        if text:
            print(f" {text}")
def main():
    """Parse the command line and install or update one WaveLang app package.

    `--show-storage` prints the storage map and returns. `--dry-run` prints
    the install summary and returns without touching any file. Otherwise the
    source is compiled, its storage allocations are validated and recorded,
    the package is deployed, and the registry is updated and regenerated.
    """
    cli = argparse.ArgumentParser(description="WCM App Installer")
    cli.add_argument("source", nargs="?", help="Path to .wl source file")
    cli.add_argument("--kind", help="App kind string (e.g. 'clock', 'my-app')")
    cli.add_argument("--icon", default="[?]", help="Launcher icon text")
    cli.add_argument("--description", default="", help="Launcher description")
    cli.add_argument("--domain", default="p03", help="Default launcher domain")
    cli.add_argument("--name", default=None, help="Launcher name (default: derived from kind)")
    cli.add_argument("--max-instructions", type=int, default=256, help="Max instructions per tick")
    cli.add_argument("--dry-run", action="store_true", help="Show what would be done without modifying files")
    cli.add_argument("--show-storage", action="store_true", help="Show storage map and exit")
    options = cli.parse_args()

    banner = "═══════════════════════════════════════════════════════"

    if options.show_storage:
        print(banner)
        print(" WCM Storage Map")
        print(banner)
        print_storage_map(load_storage_map())
        return

    # `source` is optional at the argparse level only so --show-storage can run without it.
    if not options.source:
        cli.error("source is required (unless --show-storage)")
    if not options.kind:
        cli.error("--kind is required")

    source_path = os.path.abspath(options.source)
    if not os.path.isfile(source_path):
        raise SystemExit(f"Source file not found: {source_path}")

    package_name = extract_package_name(source_path)
    kind = options.kind
    # Default launcher name: the kind with hyphens and underscores removed.
    launcher_name = options.name or kind.replace("-", "").replace("_", "")

    print(banner)
    print(" WCM App Installer")
    print(f" Source: {os.path.relpath(source_path, ROOT)}")
    print(f" Package: {package_name}")
    print(f" Kind: {kind}")
    print(f" Launcher: {launcher_name} {options.icon}")
    print(f" Domain: {options.domain}")
    print(f" Budget: {options.max_instructions} instructions/tick")
    print(banner)

    if options.dry_run:
        print("\n[DRY RUN] No files modified.")
        return

    compiled_path = step_compile(source_path)
    step_storage_map(source_path)
    step_deploy(compiled_path, package_name)
    step_policy_registry(
        package_name,
        kind,
        icon=options.icon,
        description=options.description,
        domain=options.domain,
        launcher_name=launcher_name,
        max_instructions=options.max_instructions,
    )
    step_generate_policy_registry()

    closing_rule = "═" * 55
    print(f"\n{closing_rule}")
    print(f" DONE — {package_name} installed as '{kind}'")
    print(" Restart the OS to activate.")
    print("")
    print(" NOTE: If you edit the .wl source later, re-run")
    print(" this installer to recompile and redeploy.")
    print(f"{closing_rule}")
# Run the installer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()