-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlock_utils.py
More file actions
193 lines (159 loc) · 7.03 KB
/
Copy pathlock_utils.py
File metadata and controls
193 lines (159 loc) · 7.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
r"""
lock_utils.py -- Canonical logic for project locks (LOCK*.txt)
Single source of truth for the LOCK file format and the scope/expiry logic
across all configured roots (see lock_roots.json).
Canonical spec (lifecycle, tiers, scripts): LOCK-SYSTEM.md (same directory).
Convention:
- LOCK.txt = entire project locked (scope = "project")
- LOCK.<scope>.txt = only this component locked (scope = "<scope>")
free scope name (sub-area/sub-folder),
e.g. LOCK.frontend.txt, LOCK.web.txt, LOCK.api.txt
- Detection regex: ^LOCK(\.[A-Za-z0-9_-]+(\.[A-Za-z0-9_-]+)*)?\.txt$
Matches: LOCK.txt, LOCK.api.txt, LOCK.team.LAPTOP.txt,
LOCK.team.frontend.LAPTOP.txt
- Legacy (deprecated, do not create): TEST.txt / TESTS.txt
File format (one setting per line, stdlib parser, no extra dependency):
- Lines starting with '#' = comment, blank lines = ignored.
- Otherwise split on the FIRST ':'; trim key/value; key lowercased.
Fields:
owner (required) Who holds the lock.
created (required) ISO YYYY-MM-DDTHH:MM (base for expiry).
expires_after (optional) e.g. "24h" / "48h" / "90m". Default = 24h.
release_condition (optional) Free-text release condition.
mode (optional) "hard" (default) | "soft".
purpose (optional) Free-text description.
scope (optional) Informational only; AUTHORITATIVE is the filename.
"""
from __future__ import annotations
import re
from datetime import datetime, timedelta
from pathlib import Path
# Current lock files: LOCK.txt, LOCK.<scope>.txt, LOCK.team.<host>.txt,
# LOCK.team.<scope>.<host>.txt (multi-segment names allowed)
LOCK_RE = re.compile(r"^LOCK(?:\.([A-Za-z0-9_-]+(?:\.[A-Za-z0-9_-]+)*))?\.txt$", re.IGNORECASE)
# Legacy locks (still recognised, but marked as deprecated)
LEGACY_LOCK_NAMES = ("TEST.txt", "TESTS.txt")
DEFAULT_EXPIRES = timedelta(hours=24)
# Duration strings: "24h", "48h", "90m", "30s", "2d"
_DURATION_RE = re.compile(r"^\s*(\d+)\s*([smhd])\s*$", re.IGNORECASE)
_DURATION_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}
def scope_from_name(name: str) -> str | None:
"""Derive scope from filename.
LOCK.txt -> 'project'
LOCK.api.txt -> 'api'
LOCK.team.LAPTOP.txt -> 'project' (team lock, whole project)
LOCK.team.frontend.LAPTOP.txt -> 'frontend' (team lock, scoped)
Returns None if not a lock filename.
Team locks are identified by a 'team.' prefix in the segment string;
use is_team_lock() to distinguish them from exclusive locks.
"""
m = LOCK_RE.match(name)
if not m:
return None
segments = m.group(1)
if not segments:
return "project"
parts = segments.split(".")
# Team lock: LOCK.team.<host>.txt or LOCK.team.<scope>.<host>.txt
if parts[0].lower() == "team":
if len(parts) == 2:
return "project"
# middle parts are the scope; last part is host
return ".".join(parts[1:-1])
return segments
def is_team_lock(name: str) -> bool:
"""Return True if filename is a Team Lock (LOCK.team.*.txt)."""
m = LOCK_RE.match(name)
if not m or not m.group(1):
return False
return m.group(1).lower().startswith("team.")
def is_lock_file(name: str) -> bool:
return LOCK_RE.match(name) is not None
def parse_lock_file(lock_path: Path) -> dict[str, str]:
"""Parse a LOCK file into a key:value dict (keys lowercased)."""
data: dict[str, str] = {}
try:
text = lock_path.read_text(encoding="utf-8", errors="replace")
except OSError:
return data
for line in text.splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
if ":" not in stripped:
continue
key, value = stripped.split(":", 1)
data[key.strip().lower()] = value.strip()
return data
def parse_duration(value: str | None) -> timedelta:
"""'24h'/'90m'/... -> timedelta. Defaults to 24h if missing or unparseable."""
if not value:
return DEFAULT_EXPIRES
m = _DURATION_RE.match(value)
if not m:
return DEFAULT_EXPIRES
amount = int(m.group(1))
unit = _DURATION_UNITS[m.group(2).lower()]
return timedelta(**{unit: amount})
def _parse_created(value: str | None) -> datetime | None:
"""Parse ISO timestamp from 'created' field (T or space separator,
seconds optional)."""
if not value:
return None
candidate = value.strip().replace("T", " ")
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
try:
return datetime.strptime(candidate, fmt)
except ValueError:
continue
return None
def lock_created_and_expiry(lock_path: Path) -> tuple[datetime, timedelta, str]:
"""Return (created, expires_after, source).
source = 'header' if created came from the file, else 'mtime' (fallback)."""
data = parse_lock_file(lock_path)
created = _parse_created(data.get("created"))
expires = parse_duration(data.get("expires_after"))
if created is not None:
return created, expires, "header"
mtime = datetime.fromtimestamp(lock_path.stat().st_mtime)
return mtime, expires, "mtime"
def is_expired(lock_path: Path, now: datetime | None = None) -> bool:
now = now or datetime.now()
created, expires, _ = lock_created_and_expiry(lock_path)
return now > created + expires
def lock_host(lock_path: Path) -> str | None:
"""Machine/hostname from the 'host' field of the LOCK file.
Identifies which system currently holds the lock (cross-system
coordination). Returns None when the field is absent (backwards
compatible)."""
return parse_lock_file(lock_path).get("host") or None
def find_lock_files(project_dir: Path, include_legacy: bool = True):
"""Find all lock files in a project root directory.
Returns: list of (name, scope, is_legacy)."""
results = []
for hit in sorted(project_dir.glob("*.txt")):
if not hit.is_file():
continue
scope = scope_from_name(hit.name)
if scope is not None:
results.append((hit.name, scope, False))
if include_legacy:
for legacy in LEGACY_LOCK_NAMES:
for hit in project_dir.glob(legacy):
if hit.is_file():
results.append((hit.name, "project", True))
return sorted(set(results))
def active_locks(project_dir: Path, now: datetime | None = None):
"""Non-expired lock files. Returns list of (name, scope, is_legacy).
Legacy locks (TEST.txt/TESTS.txt) have no expiry format -> always treated
as active (stale cleanup only applies to LOCK*.txt)."""
now = now or datetime.now()
out = []
for name, scope, is_legacy in find_lock_files(project_dir):
lock_path = project_dir / name
if is_legacy:
out.append((name, scope, is_legacy))
continue
if not is_expired(lock_path, now):
out.append((name, scope, is_legacy))
return out