Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/gen_worker/cozy_cas.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ async def _download_one_file(url: str, dst: Path, expected_size: int, expected_b
# are not killed. total=None lets multi-GB downloads run as long as data
# keeps flowing; sock_read=120 catches genuine stalls.
timeout = aiohttp.ClientTimeout(total=None, sock_connect=float(os.getenv("WORKER_MODEL_DOWNLOAD_SOCK_CONNECT_TIMEOUT_S", "60")),
sock_read=float(os.getenv("WORKER_MODEL_DOWNLOAD_SOCK_READ_TIMEOUT_S", "120")))
sock_read=float(os.getenv("WORKER_MODEL_DOWNLOAD_SOCK_READ_TIMEOUT_S", "180")))
tmp = dst.with_suffix(dst.suffix + ".part")
# If we have a partial file, try to resume via HTTP Range.
offset = 0
Expand Down
108 changes: 98 additions & 10 deletions src/gen_worker/cozy_snapshot_v2_downloader.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import asyncio
import json
import os
import re
import shutil
import threading
from pathlib import Path
Expand All @@ -21,6 +23,39 @@ def _blob_path(blobs_root: Path, digest: str) -> Path:
return blobs_root / "blake3" / digest[:2] / digest[2:4] / digest


_PART_FILE_RE = re.compile(r"\.part\d{4}$")


def _strip_blake3_prefix(digest: str) -> str:
"""Strip the 'blake3:' scheme prefix if present, returning the bare hex."""
d = (digest or "").strip().lower()
if d.startswith("blake3:"):
d = d[len("blake3:"):]
return d


def _is_part_file(path: str) -> bool:
"""Return True if the path is a chunked part file (e.g. foo.part0001)."""
return bool(_PART_FILE_RE.search(path))


def _is_parts_manifest(path: str) -> bool:
"""Return True if the path is a chunked-blob manifes (e.g. foo.parts.json)."""
return path.endswith(".parts.json")


def _resolve_field(obj: Any, *keys: str) -> Any:
"""Get a field from either a dict or an attribute-bearing object, trying keys in order."""
for k in keys:
if isinstance(obj, dict):
v = obj.get(k)
else:
v = getattr(obj, k, None)
if v is not None:
return v
return None


def _try_hardlink_or_copy(src: Path, dst: Path) -> None:
if dst.exists():
dst.unlink()
Expand All @@ -38,24 +73,34 @@ def _try_hardlink_or_copy(src: Path, dst: Path) -> None:


def _coerce_resolved_model(ref: CozyRef, resolved: Any) -> CozyHubResolveArtifactResult:
"""Coerce an orchestrator-provided resolved model object into CozyHubResolveArtifactResult."""
snapshot_digest = str(getattr(resolved, "snapshot_digest", "") or "").strip()
if not snapshot_digest:
snapshot_digest = str(getattr(resolved, "snapshotDigest", "") or "").strip()
"""Coerce an orchestrator-provided resolved model object into CozyHubResolveArtifactResult.

Handles two wire shapes:
- Legacy: .snapshot_digest (bare hex) + .files[]
- v2: .snapshot_digest ('blake3:...' prefixed) + .entries[] (new chunked-blob format)

Both protobuf attribute access and plain-dict access are supported.
"""
snapshot_digest = str(_resolve_field(resolved, "snapshot_digest", "snapshotDigest") or "").strip()
if not snapshot_digest:
raise ValueError("resolved model missing snapshot_digest")
# Strip scheme prefix so the digest is a bare hex string suitable for path use.
snapshot_digest = _strip_blake3_prefix(snapshot_digest) or snapshot_digest

files_raw = list(getattr(resolved, "files", []) or [])
# New format uses "entries"; legacy format uses "files".
files_raw = list(_resolve_field(resolved, "entries", "files") or [])
files: List[CozyHubSnapshotFile] = []
for ent in files_raw:
path = str(getattr(ent, "path", "") or "").strip()
path = str(_resolve_field(ent, "path") or "").strip()
if not path:
continue
blake3_hex = str(getattr(ent, "blake3", "") or "").strip().lower()
# Prefer bare "blake3" field; fall back to "digest" which may carry the prefix.
blake3_hex = str(_resolve_field(ent, "blake3", "BLAKE3") or "").strip().lower()
if not blake3_hex:
blake3_hex = str(getattr(ent, "BLAKE3", "") or "").strip().lower()
url = str(getattr(ent, "url", "") or "").strip() or None
size_bytes = int(getattr(ent, "size_bytes", 0) or 0)
digest_raw = str(_resolve_field(ent, "digest") or "").strip().lower()
blake3_hex = _strip_blake3_prefix(digest_raw)
url = str(_resolve_field(ent, "url") or "").strip() or None
size_bytes = int(_resolve_field(ent, "size_bytes") or 0)
if not blake3_hex or not url:
raise ValueError(f"resolved model file missing blake3/url: {path}")
files.append(CozyHubSnapshotFile(path=path, size_bytes=size_bytes, blake3=blake3_hex, url=url))
Expand Down Expand Up @@ -123,7 +168,50 @@ async def ensure_snapshot(

tmp = snaps_root / f"{res.snapshot_digest}.building"
tmp.mkdir(parents=True, exist_ok=True)

# Reassemble any chunked files (produced by chunkedblob on ingest).
# A ".parts.json" entry describes how to reassemble N part blobs into
# the original file. Part blobs and the manifest itself are all already
# in the blob store at this point.
parts_manifest_entries = [f for f in res.files if _is_parts_manifest(f.path)]
part_file_paths = {f.path for f in res.files if _is_part_file(f.path)}

for pm_entry in parts_manifest_entries:
parts_json_blob = _blob_path(blobs_root, pm_entry.blake3)
try:
parts_manifest = json.loads(parts_json_blob.read_bytes())
except Exception as exc:
raise ValueError(f"failed to parse parts manifest {pm_entry.path}: {exc}") from exc

original_path = str(parts_manifest.get("original_path") or "").strip()
if not original_path:
raise ValueError(f"parts manifest {pm_entry.path} missing original_path")
parts = parts_manifest.get("parts") or []
if not parts:
raise ValueError(f"parts manifest {pm_entry.path} has no parts")

rel = _norm_rel_path(original_path)
dst = tmp / rel
dst.parent.mkdir(parents=True, exist_ok=True)
if dst.exists():
dst.unlink()

with open(dst, "wb") as out_f:
for part in parts:
part_digest = _strip_blake3_prefix(
str(part.get("digest") or "").strip().lower()
)
if not part_digest:
raise ValueError(f"part entry in {pm_entry.path} missing digest")
part_blob = _blob_path(blobs_root, part_digest)
with open(part_blob, "rb") as in_f:
shutil.copyfileobj(in_f, out_f)

# Materialize regular files; skip part files and parts manifests since they
# have already been consumed above during reassembly.
for f in res.files:
if _is_parts_manifest(f.path) or f.path in part_file_paths:
continue
rel = _norm_rel_path(f.path)
dst = tmp / rel
dst.parent.mkdir(parents=True, exist_ok=True)
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.