diff --git a/src/gen_worker/cozy_cas.py b/src/gen_worker/cozy_cas.py index 6866c89..45836a0 100644 --- a/src/gen_worker/cozy_cas.py +++ b/src/gen_worker/cozy_cas.py @@ -324,7 +324,7 @@ async def _download_one_file(url: str, dst: Path, expected_size: int, expected_b # are not killed. total=None lets multi-GB downloads run as long as data - # keeps flowing; sock_read=120 catches genuine stalls. + # keeps flowing; sock_read (default 180) catches genuine stalls. timeout = aiohttp.ClientTimeout(total=None, sock_connect=float(os.getenv("WORKER_MODEL_DOWNLOAD_SOCK_CONNECT_TIMEOUT_S", "60")), - sock_read=float(os.getenv("WORKER_MODEL_DOWNLOAD_SOCK_READ_TIMEOUT_S", "120"))) + sock_read=float(os.getenv("WORKER_MODEL_DOWNLOAD_SOCK_READ_TIMEOUT_S", "180"))) tmp = dst.with_suffix(dst.suffix + ".part") # If we have a partial file, try to resume via HTTP Range. offset = 0 diff --git a/src/gen_worker/cozy_snapshot_v2_downloader.py b/src/gen_worker/cozy_snapshot_v2_downloader.py index cd74459..9399988 100644 --- a/src/gen_worker/cozy_snapshot_v2_downloader.py +++ b/src/gen_worker/cozy_snapshot_v2_downloader.py @@ -1,7 +1,9 @@ from __future__ import annotations import asyncio +import json import os +import re import shutil import threading from pathlib import Path @@ -21,6 +23,39 @@ def _blob_path(blobs_root: Path, digest: str) -> Path: return blobs_root / "blake3" / digest[:2] / digest[2:4] / digest +_PART_FILE_RE = re.compile(r"\.part\d{4}$") + + +def _strip_blake3_prefix(digest: str) -> str: + """Strip the 'blake3:' scheme prefix if present, returning the bare hex.""" + d = (digest or "").strip().lower() + if d.startswith("blake3:"): + d = d[len("blake3:"):] + return d + + +def _is_part_file(path: str) -> bool: + """Return True if the path is a chunked part file (e.g. foo.part0001).""" + return bool(_PART_FILE_RE.search(path)) + + +def _is_parts_manifest(path: str) -> bool: + """Return True if the path is a chunked-blob manifest (e.g. 
foo.parts.json).""" + return path.endswith(".parts.json") + + +def _resolve_field(obj: Any, *keys: str) -> Any: + """Get a field from a dict or attribute-bearing object, trying keys in order; None/empty-string values fall through to later keys.""" + for k in keys: + if isinstance(obj, dict): + v = obj.get(k) + else: + v = getattr(obj, k, None) + if v not in (None, ""): + return v + return None + + def _try_hardlink_or_copy(src: Path, dst: Path) -> None: if dst.exists(): dst.unlink() @@ -38,24 +73,34 @@ def _try_hardlink_or_copy(src: Path, dst: Path) -> None: def _coerce_resolved_model(ref: CozyRef, resolved: Any) -> CozyHubResolveArtifactResult: - """Coerce an orchestrator-provided resolved model object into CozyHubResolveArtifactResult.""" - snapshot_digest = str(getattr(resolved, "snapshot_digest", "") or "").strip() - if not snapshot_digest: - snapshot_digest = str(getattr(resolved, "snapshotDigest", "") or "").strip() + """Coerce an orchestrator-provided resolved model object into CozyHubResolveArtifactResult. + + Handles two wire shapes: + - Legacy: .snapshot_digest (bare hex) + .files[] + - v2: .snapshot_digest ('blake3:...' prefixed) + .entries[] (new chunked-blob format) + + Both protobuf attribute access and plain-dict access are supported. + """ + snapshot_digest = str(_resolve_field(resolved, "snapshot_digest", "snapshotDigest") or "").strip() if not snapshot_digest: raise ValueError("resolved model missing snapshot_digest") + # Strip scheme prefix so the digest is a bare hex string suitable for path use. + snapshot_digest = _strip_blake3_prefix(snapshot_digest) or snapshot_digest - files_raw = list(getattr(resolved, "files", []) or []) + # New format uses "entries"; legacy format uses "files". 
+ files_raw = list(_resolve_field(resolved, "entries", "files") or []) files: List[CozyHubSnapshotFile] = [] for ent in files_raw: - path = str(getattr(ent, "path", "") or "").strip() + path = str(_resolve_field(ent, "path") or "").strip() if not path: continue - blake3_hex = str(getattr(ent, "blake3", "") or "").strip().lower() + # Prefer bare "blake3" field; fall back to "digest" which may carry the prefix. + blake3_hex = str(_resolve_field(ent, "blake3", "BLAKE3") or "").strip().lower() if not blake3_hex: - blake3_hex = str(getattr(ent, "BLAKE3", "") or "").strip().lower() - url = str(getattr(ent, "url", "") or "").strip() or None - size_bytes = int(getattr(ent, "size_bytes", 0) or 0) + digest_raw = str(_resolve_field(ent, "digest") or "").strip().lower() + blake3_hex = _strip_blake3_prefix(digest_raw) + url = str(_resolve_field(ent, "url") or "").strip() or None + size_bytes = int(_resolve_field(ent, "size_bytes") or 0) if not blake3_hex or not url: raise ValueError(f"resolved model file missing blake3/url: {path}") files.append(CozyHubSnapshotFile(path=path, size_bytes=size_bytes, blake3=blake3_hex, url=url)) @@ -123,7 +168,50 @@ async def ensure_snapshot( tmp = snaps_root / f"{res.snapshot_digest}.building" tmp.mkdir(parents=True, exist_ok=True) + + # Reassemble any chunked files (produced by chunkedblob on ingest). + # A ".parts.json" entry describes how to reassemble N part blobs into + # the original file. Part blobs and the manifest itself are all already + # in the blob store at this point. 
+ parts_manifest_entries = [f for f in res.files if _is_parts_manifest(f.path)] + part_file_paths = {f.path for f in res.files if _is_part_file(f.path)} + + for pm_entry in parts_manifest_entries: + parts_json_blob = _blob_path(blobs_root, pm_entry.blake3) + try: + parts_manifest = json.loads(parts_json_blob.read_bytes()) + except Exception as exc: + raise ValueError(f"failed to parse parts manifest {pm_entry.path}: {exc}") from exc + + original_path = str(parts_manifest.get("original_path") or "").strip() + if not original_path: + raise ValueError(f"parts manifest {pm_entry.path} missing original_path") + parts = parts_manifest.get("parts") or [] + if not parts: + raise ValueError(f"parts manifest {pm_entry.path} has no parts") + + rel = _norm_rel_path(original_path) + dst = tmp / rel + dst.parent.mkdir(parents=True, exist_ok=True) + if dst.exists(): + dst.unlink() + + with open(dst, "wb") as out_f: + for part in parts: + part_digest = _strip_blake3_prefix( + str(part.get("digest") or "").strip().lower() + ) + if not part_digest: + raise ValueError(f"part entry in {pm_entry.path} missing digest") + part_blob = _blob_path(blobs_root, part_digest) + with open(part_blob, "rb") as in_f: + shutil.copyfileobj(in_f, out_f) + + # Materialize regular files; skip part files and parts manifests since they + # have already been consumed above during reassembly. for f in res.files: + if _is_parts_manifest(f.path) or f.path in part_file_paths: + continue rel = _norm_rel_path(f.path) dst = tmp / rel dst.parent.mkdir(parents=True, exist_ok=True) diff --git a/uv.lock b/uv.lock index 7db9fb9..056c247 100644 --- a/uv.lock +++ b/uv.lock @@ -671,7 +671,7 @@ wheels = [ [[package]] name = "gen-worker" -version = "0.3.8" +version = "0.3.9" source = { editable = "." } dependencies = [ { name = "aiohttp" },