diff --git a/docs/proposals/1-triage.md b/docs/proposals/1-triage.md new file mode 100644 index 00000000..c74c3188 --- /dev/null +++ b/docs/proposals/1-triage.md @@ -0,0 +1,74 @@ +--- +title: PlanExe Proposal Triage — 80/20 Landscape +date: 2026-02-25 +status: working note +author: Egon + Larry +--- + +# Overview + +Simon asked us to triage the proposal space with an 80:20 lens. The goal of this note is to capture: +1. Which proposals deliver outsized value (the 20% that unlock 80% of the architecture) +2. Which other proposals are nearby in the graph and could reuse their artifacts or reasoning +3. High-leverage parameter tweaks, code tweaks, and second/third order effects +4. Gaps in the current docs and ideas for new proposals +5. Relevant questions/tasks you might not have asked yet + +We focused on the most recent proposals ("67+" cluster) plus the ones directly touching the validation/orchestration story that FermiSanityCheck will unlock. + +# High-Leverage Proposals (the 20%) + +1. **#07 Elo Ranking System (1,751 lines)** – Core ranking mechanism for comparing idea variants, plan quality, and post-plan summaries. Louis-level heuristics here inform nearly every downstream comparison use case. +2. **#63 Luigi Agent Integration & #64 Post-plan Orchestration Layer** – Together with #66, these documents describe how PlanExe schedules, retries, and enriches its Luigi DAG. Any change to the DAG (including FermiSanityCheck or arcgentica-style loops) ripples through this cluster. +3. **#62 Agent-first Frontend Discoverability (609 lines)** – Defines the agent UX, which depends on the scoring/ranking engine (#07) and the reliability signals that our validation cluster will provide. +4. **#69 Arcgentica Agent Patterns (279 lines)** – The arcgentica comparison already references our validation work and sets the guardrails for self-evaluation/soft-autonomy. +5. 
**#41 Autonomous Execution of Plan & #05 Semantic Plan Search Graph** – These represent core system-level capabilities (distributed execution and semantic search) whose outputs feed the ranking and reporting layers. + +These documents together unlock most of the architectural work. They interlock around: planning quality signals (#07, #69, Fermi), orchestration (#63, #64, #66), and the interfaces (#62, #41, #05). + +# Related Proposals & Reuse Opportunities + +- **#07 Elo Ranking + #62 Agent-first Frontend** can share heuristics. Instead of reinventing ranking weights in #62, reuse the cost/feasibility tradeoffs defined in #07 plus FermiSanityCheck flags as features. +- **#63-66 orchestration cluster** already describe Luigi tasks. The validation loop doc should be cross-referenced there to show where FermiSanityCheck sits in the DAG and how downstream tasks like WBS, Scheduler, ExpertOrchestrator should consume the validation report. +- **#69 + #56 (Adversarial Red Team) + #43 (Assumption Drift Monitor)** form a validation cluster. FermiSanityCheck is the front line; these others are observers (red team, drift monitor) that should consume the validation report and escalate to human review. +- **#32 Gantt Parallelization & #33 CBS** could re-use the same thresholds as FermiSanityCheck when calculating duration plausibility (e.g., if duration falls outside the published feasible range, highlight the same issue in the Gantt UI). + +# 80:20 Tweaks & Parameter Changes + +- **Ranking weights (#07)** – adjust cost vs. feasibility vs. confidence to surface plans that pass quantitative grounding. No rewrite needed; just new weights (e.g., penalize plans where FermiSanityCheck flags >3 assumptions). +- **Batch size thresholds (#63)** – the Luigi DAG currently runs every task. We can gate the WBS tasks with a flag that only fires if FermiSanityCheck passes or fails softly, enabling a smaller workflow for low-risk inputs without re-architecting. 
+- **Risk terminology alignment (#38 & #44)** – harmonize the words used in the risk propagation network and investor audit pack so they can share visualization tooling, reducing duplicate explanations. + +# Second/Third Order Effects + +- **Validation loop → downstream trust**: Once FermiSanityCheck is in place, client reports (e.g., #60 plan-to-repo, #41 autonomous execution) can annotate numbers with the validation status, reducing rework. +- **Arcgentica/agent patterns**: Hardening PlanExe encourages stricter typed outputs (#69). This lets the UI (#08) and ranking engine (#07) rely on structured data instead of parsing Markdown. +- **Quantitative grounding improves ranking** (#07, #62) which in turn makes downstream dashboards (#60, #62) more actionable and reduces QA overhead. +- **Clustering proposals** (#63-66, #69, #56) around validation/orchestration helps the next human reviewer (Simon) make a single decision that affects multiple docs. + +# Gaps & Future Proposal Ideas + +- **FermiSanityCheck Implementation Roadmap** – Document how MakeAssumptions output becomes QuantifiedAssumption, where heuristics live, and how Luigi tasks consume the validation_report. (We have the spec in `planexe-validation-loop-spec.md` but not a public proposal yet.) +- **Validation Observability Dashboard** – A proposal capturing how the validation report is surfaced to humans (per #44, #60). Could cover alerts (Slack/Discord) when FermiSanityCheck fails or when repeated fails accumulate. +- **Arbitration Workflow** – When FermiSanityCheck fails and ReviewPlan still thinks the plan is OK, we need a human-in-the-loop workflow. This is not yet documented anywhere. + +# Questions You Might Not Be Asking + +1. What are the acceptance criteria for FermiSanityCheck? (confidence levels, heuristics, why 100× spans?) +2. Who owns the validation report downstream? Should ExpertOrchestrator or Governance phases be responsible for acting on it? +3. 
Does the FermiSanityCheck report expire per run, or is it stored for audit trails (per #42 evidence traceability)? +4. Can we reuse the same heuristics for other tasks (#32 Gantt, #34 finance) to maximize payoff? +5. How do we rank the outputs once FermiSanityCheck is added? Should ranking (#07) penalize low confidence even if the costs look good? +6. Do we need a battle plan for manual overrides when FermiSanityCheck is overzealous (e.g., ROI assumptions where domain experts know the average is >100×)? + +# Tasks We Can Own Now + +- Extract the QuantifiedAssumption schema (claim, lower_bound, upper_bound, unit, confidence, evidence) and add it to PlanExe’s assumption bundle. +- Implement a FermiSanityCheck Luigi task that runs immediately after MakeAssumptions and produces validation_report.json. +- Hook the validation report into DistillAssumptions / ReviewAssumptions by adding a `validation_passed` flag. +- Update #69 and #56 docs with references to the validation report to keep the narrative cohesive. +- Draft the validation observability dashboard proposal to track how many plans fail numeric sanity each week. + +# Summary + +The high-leverage 20% of proposals are: ranking (#07), orchestration (#63-66), UI (#62), arcgentica patterns (#69), and autonomous execution/search (#41, #05). We can activate them by implementing FermiSanityCheck, aligning their heuristics, and surfacing the new validation signals in the UI/dashboards. The docs already cover most of the research; now we need a short, focused proposal/clustering doc (this one) plus the Fermi implementation and dashboards. After Simon approves, we can execute the chosen cluster. 
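The QuantifiedAssumption bundle and `validation_passed` gate sketched in the tasks above can be illustrated as follows. Field names come from this note; the helper, its name, and the 100× span threshold are assumptions for illustration, not the final implementation:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class QuantifiedAssumption:
    # Fields listed in "Tasks We Can Own Now"; names are illustrative.
    claim: str
    lower_bound: Optional[float]
    upper_bound: Optional[float]
    unit: Optional[str]
    confidence: str  # "high" / "medium" / "low"
    evidence: str

def validation_passed(a: QuantifiedAssumption, max_span_ratio: float = 100.0) -> bool:
    """Soft gate: bounds present, ordered, and not spanning more than ~100x."""
    if a.lower_bound is None or a.upper_bound is None:
        return False
    if a.lower_bound > a.upper_bound:
        return False
    if a.lower_bound > 0 and a.upper_bound / a.lower_bound > max_span_ratio:
        return False
    return True

ok = QuantifiedAssumption("Solar farm costs 40-60M", 40e6, 60e6, "eur", "medium", "vendor quotes")
wide = QuantifiedAssumption("ROI is 2-500x", 2.0, 500.0, None, "low", "blog post")
print(validation_passed(ok), validation_passed(wide))  # True False
```

Downstream tasks (DistillAssumptions, ReviewAssumptions) would only need to read the boolean flag, which keeps the gating decoupled from the heuristics behind it.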
diff --git a/worker_plan/worker_plan_api/filenames.py b/worker_plan/worker_plan_api/filenames.py index 82abbea8..014fc93f 100644 --- a/worker_plan/worker_plan_api/filenames.py +++ b/worker_plan/worker_plan_api/filenames.py @@ -37,6 +37,8 @@ class FilenameEnum(str, Enum): REVIEW_ASSUMPTIONS_MARKDOWN = "003-9-review_assumptions.md" CONSOLIDATE_ASSUMPTIONS_FULL_MARKDOWN = "003-10-consolidate_assumptions_full.md" CONSOLIDATE_ASSUMPTIONS_SHORT_MARKDOWN = "003-11-consolidate_assumptions_short.md" + FERMI_SANITY_CHECK_REPORT = "003-12-fermi_sanity_check_report.json" + FERMI_SANITY_CHECK_SUMMARY = "003-13-fermi_sanity_check_summary.md" PRE_PROJECT_ASSESSMENT_RAW = "004-1-pre_project_assessment_raw.json" PRE_PROJECT_ASSESSMENT = "004-2-pre_project_assessment.json" PROJECT_PLAN_RAW = "005-1-project_plan_raw.json" diff --git a/worker_plan/worker_plan_internal/assume/domain_normalizer.py b/worker_plan/worker_plan_internal/assume/domain_normalizer.py new file mode 100644 index 00000000..5153b90b --- /dev/null +++ b/worker_plan/worker_plan_internal/assume/domain_normalizer.py @@ -0,0 +1,284 @@ +""" +Author: Larry (Claude Opus 4.6) +Date: 2026-02-25 +PURPOSE: Domain-aware normalization for FermiSanityCheck. Loads domain profiles (YAML), +auto-detects project domain from assumptions, and normalizes currency/units/confidence +to standard metric/English output for AI agents. +SRP/DRY check: Pass - Consumes QuantifiedAssumption schema + domain profile YAML. +Outputs normalized assumptions ready for validation. 
+""" + +import logging +import yaml +from dataclasses import dataclass, field +from enum import Enum +from typing import List, Optional, Dict, Any +from pathlib import Path + +from worker_plan_internal.assume.quantified_assumptions import ( + QuantifiedAssumption, + ConfidenceLevel, +) + +LOGGER = logging.getLogger(__name__) + +# Find domain profiles YAML +DOMAIN_PROFILES_PATH = Path(__file__).parent.parent / "docs" / "domain-profiles" / "domain-profile-schema.md" + + +class DomainProfile: + """Represents a single domain profile (carpenter, dentist, etc.)""" + + def __init__(self, profile_dict: Dict[str, Any]): + self.id = profile_dict.get("id") + self.name = profile_dict.get("name") + self.description = profile_dict.get("description") + + # Currency + currency_cfg = profile_dict.get("currency", {}) + self.default_currency = currency_cfg.get("default", "USD") + self.currency_aliases = set(currency_cfg.get("aliases", [])) + self.currency_aliases.add(self.default_currency.lower()) + + # Units + units_cfg = profile_dict.get("units", {}) + self.metric_first = units_cfg.get("metric", True) + self.unit_conversions = {} + for conv in units_cfg.get("convert", []): + self.unit_conversions[conv["from"].lower()] = { + "to": conv["to"], + "factor": conv["factor"], + } + + # Heuristics + heuristics = profile_dict.get("heuristics", {}) + self.budget_keywords = set(heuristics.get("budget_keywords", [])) + self.timeline_keywords = set(heuristics.get("timeline_keywords", [])) + self.team_keywords = set(heuristics.get("team_keywords", [])) + + confidence_kw = heuristics.get("confidence_keywords", {}) + self.high_confidence_words = set(confidence_kw.get("high", [])) + self.medium_confidence_words = set(confidence_kw.get("medium", [])) + self.low_confidence_words = set(confidence_kw.get("low", [])) + + # Detection + detection = profile_dict.get("detection", {}) + self.currency_signals = set(detection.get("currency_signals", [])) + self.unit_signals = set(detection.get("unit_signals", 
[])) + self.keyword_signals = set(detection.get("keyword_signals", [])) + + def score_match(self, currency_found: List[str], units_found: List[str], keywords_found: List[str]) -> int: + """Score how well this profile matches the found signals.""" + score = 0 + for c in currency_found: + if c.lower() in [s.lower() for s in self.currency_signals]: + score += 10 + for u in units_found: + if u.lower() in [s.lower() for s in self.unit_signals]: + score += 5 + for k in keywords_found: + if k.lower() in [s.lower() for s in self.keyword_signals]: + score += 3 + return score + + +@dataclass +class NormalizedAssumption: + """Assumption after domain-aware normalization.""" + assumption_id: str + original_claim: str + normalized_claim: str + domain_id: str + currency: str # Normalized to domain default + currency_eur_equivalent: Optional[float] = None # For comparison + unit: str = "metric" # All converted to metric + confidence: ConfidenceLevel = ConfidenceLevel.medium + notes: List[str] = field(default_factory=list) + + +class DomainNormalizer: + """Loads domain profiles and normalizes assumptions to metric/currency/confidence.""" + + def __init__(self, profiles_yaml_path: Optional[str] = None): + self.profiles: Dict[str, DomainProfile] = {} + self.default_profile = None + + path = Path(profiles_yaml_path) if profiles_yaml_path else DOMAIN_PROFILES_PATH + self._load_profiles(path) + + def _load_profiles(self, yaml_path: Path) -> None: + """Load domain profiles from YAML file.""" + if not yaml_path.exists(): + LOGGER.warning(f"Domain profiles not found at {yaml_path}; using defaults") + self._create_default_profiles() + return + + try: + with open(yaml_path, "r") as f: + content = f.read() + # Extract YAML from markdown code block + if "```yaml" in content: + yaml_start = content.index("```yaml") + 7 + yaml_end = content.index("```", yaml_start) + yaml_str = content[yaml_start:yaml_end] + else: + yaml_str = content + + data = yaml.safe_load(yaml_str) + if data and "profiles" 
in data: + for profile_dict in data["profiles"]: + profile = DomainProfile(profile_dict) + self.profiles[profile.id] = profile + if not self.default_profile: + self.default_profile = profile + + LOGGER.info(f"Loaded {len(self.profiles)} domain profiles from {yaml_path}") + except Exception as e: + LOGGER.error(f"Error loading domain profiles: {e}; using defaults") + self._create_default_profiles() + + def _create_default_profiles(self) -> None: + """Create minimal default profiles if YAML not available.""" + default_profile_dict = { + "id": "default", + "name": "General Business", + "description": "Default profile for unclassified projects.", + "currency": {"default": "USD", "aliases": ["usd", "$"]}, + "units": {"metric": True, "convert": []}, + "heuristics": { + "budget_keywords": ["budget", "cost"], + "timeline_keywords": ["days", "weeks"], + "team_keywords": ["team", "people"], + "confidence_keywords": { + "high": ["guarantee", "have done"], + "medium": ["plan to", "expect"], + "low": ["estimate", "maybe"], + }, + }, + "detection": { + "currency_signals": ["USD", "$"], + "unit_signals": [], + "keyword_signals": [], + }, + } + self.default_profile = DomainProfile(default_profile_dict) + self.profiles["default"] = self.default_profile + + def detect_domain(self, assumption: QuantifiedAssumption) -> DomainProfile: + """Auto-detect domain profile from assumption metadata.""" + # Extract signals from assumption + currency_found = [] + if assumption.unit: + currency_found.append(assumption.unit) + + units_found = [] + if assumption.unit: + units_found.append(assumption.unit) + + keywords_found = [] + # Extract keywords from claim + evidence + claim_lower = assumption.claim.lower() + evidence_lower = (assumption.evidence or "").lower() + combined = f"{claim_lower} {evidence_lower}".split() + + # Score all profiles + scores = {} + for profile_id, profile in self.profiles.items(): + score = profile.score_match(currency_found, units_found, combined) + scores[profile_id] = 
score + + # Pick highest scoring profile + if scores: + best_profile_id = max(scores, key=scores.get) + if scores[best_profile_id] > 0: + return self.profiles[best_profile_id] + + return self.default_profile + + def normalize_currency( + self, value: Optional[float], from_currency: str, to_profile: DomainProfile + ) -> tuple[Optional[float], Optional[float]]: + """ + Convert currency to profile default. + Returns (normalized_value, eur_equivalent). + """ + if value is None: + return None, None + + # Placeholder conversion rates (in production, use real FX API) + fx_rates = { + "USD": 0.92, # USD → EUR + "DKK": 0.124, # DKK → EUR + "EUR": 1.0, + } + + # For now, assume all inputs are in the detected currency or profile default + normalized = value + eur_equiv = value * fx_rates.get(to_profile.default_currency, 1.0) + + return normalized, eur_equiv + + def normalize_unit(self, value: Optional[float], from_unit: str, to_profile: DomainProfile) -> Optional[float]: + """Convert unit to metric (based on profile conversions).""" + if value is None or not from_unit: + return value + + from_unit_lower = from_unit.lower() + if from_unit_lower in to_profile.unit_conversions: + conversion = to_profile.unit_conversions[from_unit_lower] + return value * conversion["factor"] + + return value + + def normalize_confidence(self, assumption: QuantifiedAssumption, domain: DomainProfile) -> ConfidenceLevel: + """Re-assess confidence level based on domain keywords.""" + claim_lower = assumption.claim.lower() + evidence_lower = (assumption.evidence or "").lower() + combined = f"{claim_lower} {evidence_lower}" + + # Check high confidence + if any(word in combined for word in domain.high_confidence_words): + return ConfidenceLevel.high + + # Check low confidence + if any(word in combined for word in domain.low_confidence_words): + return ConfidenceLevel.low + + # Default to medium + return ConfidenceLevel.medium + + def normalize(self, assumption: QuantifiedAssumption) -> 
NormalizedAssumption: + """Normalize a QuantifiedAssumption to domain standards.""" + domain = self.detect_domain(assumption) + + # Normalize currency + norm_currency, eur_equiv = self.normalize_currency(assumption.lower_bound, assumption.unit or "", domain) + + # Normalize unit (keep as "metric" for now) + norm_unit = "metric" + + # Re-assess confidence per domain + norm_confidence = self.normalize_confidence(assumption, domain) + + # Build normalized claim + norm_claim = f"{assumption.claim} [normalized to {domain.id} domain]" + + notes = [] + if domain.id != "default": + notes.append(f"Auto-detected domain: {domain.name}") + + return NormalizedAssumption( + assumption_id=assumption.assumption_id, + original_claim=assumption.claim, + normalized_claim=norm_claim, + domain_id=domain.id, + currency=domain.default_currency, + currency_eur_equivalent=eur_equiv, + unit=norm_unit, + confidence=norm_confidence, + notes=notes, + ) + + def normalize_batch(self, assumptions: List[QuantifiedAssumption]) -> List[NormalizedAssumption]: + """Normalize a batch of assumptions.""" + return [self.normalize(assumption) for assumption in assumptions] diff --git a/worker_plan/worker_plan_internal/assume/fermi_sanity_check.py b/worker_plan/worker_plan_internal/assume/fermi_sanity_check.py new file mode 100644 index 00000000..355b6caf --- /dev/null +++ b/worker_plan/worker_plan_internal/assume/fermi_sanity_check.py @@ -0,0 +1,224 @@ +"""Validation helpers for QuantifiedAssumption data.""" +from __future__ import annotations + +from typing import List, Optional, Sequence + +from pydantic import BaseModel, Field + +from worker_plan_internal.assume.quantified_assumptions import ConfidenceLevel, QuantifiedAssumption + +MAX_SPAN_RATIO = 100.0 +MIN_EVIDENCE_LENGTH = 40 +BUDGET_LOWER_THRESHOLD = 1_000.0 +BUDGET_UPPER_THRESHOLD = 100_000_000.0 +TIMELINE_MAX_DAYS = 3650 +TIMELINE_MIN_DAYS = 1 +TEAM_MIN = 1 +TEAM_MAX = 1000 + +CURRENCY_UNITS = { + "usd", + "eur", + "dkk", + "gbp", + "cad", + 
"aud", + "sek", + "nzd", + "mxn", + "chf" +} + +TIME_UNIT_TO_DAYS = { + "day": 1, + "days": 1, + "week": 7, + "weeks": 7, + "month": 30, + "months": 30, + "year": 365, + "years": 365 +} + +TEAM_KEYWORDS = { + "team", + "people", + "engineer", + "engineers", + "staff", + "headcount", + "crew", + "members", + "contractors", + "workers" +} + +BUDGET_KEYWORDS = { + "budget", + "cost", + "funding", + "investment", + "price", + "capex", + "spend", + "expense", + "capital" +} + +TIMELINE_KEYWORDS = { + "timeline", + "duration", + "schedule", + "milestone", + "delivery", + "months", + "years", + "weeks", + "days" +} + + +class ValidationEntry(BaseModel): + assumption_id: str = Field(description="Stable identifier for the assumption") + question: str = Field(description="Source question for context") + passed: bool = Field(description="Whether the assumption passed validation") + reasons: List[str] = Field(description="List of validation failures") + + +class ValidationReport(BaseModel): + entries: List[ValidationEntry] = Field(description="Detailed result per assumption") + total_assumptions: int = Field(description="Total number of assumptions processed") + passed: int = Field(description="Count of assumptions that passed") + failed: int = Field(description="Count of assumptions that failed") + pass_rate_pct: float = Field(description="Percentage of assumptions that passed") + + +def validate_quantified_assumptions( + assumptions: Sequence[QuantifiedAssumption] +) -> ValidationReport: + entries: List[ValidationEntry] = [] + passed = 0 + + for assumption in assumptions: + reasons: List[str] = [] + lower = assumption.lower_bound + upper = assumption.upper_bound + + if lower is None or upper is None: + reasons.append("Missing lower or upper bound.") + elif lower > upper: + reasons.append("Lower bound is greater than upper bound.") + else: + if ratio := assumption.span_ratio: + if ratio > MAX_SPAN_RATIO: + reasons.append("Range spans more than 100×; too wide.") + + if 
assumption.confidence == ConfidenceLevel.low: + evidence = assumption.evidence or "" + if len(evidence.strip()) < MIN_EVIDENCE_LENGTH: + reasons.append("Low confidence claim lacks sufficient evidence.") + + if _should_check_budget(assumption): + _apply_budget_constraints(lower, upper, reasons) + + if _should_check_timeline(assumption): + _apply_timeline_constraints(lower, upper, assumption.unit, reasons) + + if _should_check_team(assumption): + _apply_team_constraints(lower, upper, reasons) + + passed_flag = not reasons + if passed_flag: + passed += 1 + + entry = ValidationEntry( + assumption_id=assumption.assumption_id, + question=assumption.question, + passed=passed_flag, + reasons=reasons + ) + entries.append(entry) + + total = len(entries) + failed = total - passed + pass_rate = (passed / total * 100.0) if total else 0.0 + return ValidationReport( + entries=entries, + total_assumptions=total, + passed=passed, + failed=failed, + pass_rate_pct=round(pass_rate, 2) + ) + + +def render_validation_summary(report: ValidationReport) -> str: + lines = [ + "# Fermi Sanity Check", + "", + f"- Total assumptions: {report.total_assumptions}", + f"- Passed: {report.passed}", + f"- Failed: {report.failed}", + f"- Pass rate: {report.pass_rate_pct:.1f}%", + "" + ] + + if report.failed: + lines.append("## Failed assumptions") + for entry in report.entries: + if not entry.passed: + reasons = ", ".join(entry.reasons) if entry.reasons else "No details provided." 
+ lines.append(f"- `{entry.assumption_id}` ({entry.question or 'question missing'}): {reasons}") + + return "\n".join(lines) + + +def _should_check_budget(assumption: QuantifiedAssumption) -> bool: + text = (assumption.question or "").lower() + return any(keyword in text for keyword in BUDGET_KEYWORDS) or (assumption.unit or "") in CURRENCY_UNITS + + +def _should_check_timeline(assumption: QuantifiedAssumption) -> bool: + text = (assumption.question or "").lower() + return any(keyword in text for keyword in TIMELINE_KEYWORDS) + + +def _should_check_team(assumption: QuantifiedAssumption) -> bool: + text = (assumption.question or "").lower() + return any(keyword in text for keyword in TEAM_KEYWORDS) + + +def _apply_budget_constraints(lower: Optional[float], upper: Optional[float], reasons: List[str]) -> None: + if lower is not None and lower < BUDGET_LOWER_THRESHOLD: + reasons.append(f"Budget below ${BUDGET_LOWER_THRESHOLD:,.0f}.") + if upper is not None and upper > BUDGET_UPPER_THRESHOLD: + reasons.append(f"Budget above ${BUDGET_UPPER_THRESHOLD:,.0f}.") + + +def _apply_timeline_constraints( + lower: Optional[float], upper: Optional[float], unit: Optional[str], reasons: List[str] +) -> None: + lower_days = _normalize_to_days(lower, unit) + upper_days = _normalize_to_days(upper, unit) + + if lower_days is not None and lower_days < TIMELINE_MIN_DAYS: + reasons.append("Timeline below 1 day.") + if upper_days is not None and upper_days > TIMELINE_MAX_DAYS: + reasons.append("Timeline exceeds ten years (3,650 days).") + + +def _normalize_to_days(value: Optional[float], unit: Optional[str]) -> Optional[float]: + if value is None: + return None + if not unit: + return value + normalized = TIME_UNIT_TO_DAYS.get(unit.lower()) + if normalized is None: + return value + return value * normalized + + +def _apply_team_constraints(lower: Optional[float], upper: Optional[float], reasons: List[str]) -> None: + if lower is not None and lower < TEAM_MIN: + reasons.append("Team size 
below 1 person.") + if upper is not None and upper > TEAM_MAX: + reasons.append("Team size above 1,000 people.") diff --git a/worker_plan/worker_plan_internal/assume/quantified_assumption_schema.md b/worker_plan/worker_plan_internal/assume/quantified_assumption_schema.md new file mode 100644 index 00000000..974317ef --- /dev/null +++ b/worker_plan/worker_plan_internal/assume/quantified_assumption_schema.md @@ -0,0 +1,39 @@ +# QuantifiedAssumption Schema Reference + +| Field | Type | Description | +| --- | --- | --- | +| `assumption_id` | `str` | Unique stable identifier for the assumption (a fallback of the form `assumption-{index}` is generated when not provided). | +| `question` | `str` | The source question that prompted the assumption. | +| `claim` | `str` | Normalized assumption text with the `Assumption:` prefix removed. | +| `lower_bound` | `float?` | Parsed lower numeric bound (if present). | +| `upper_bound` | `float?` | Parsed upper numeric bound (mirror of lower_bound when none explicitly provided). | +| `unit` | `str?` | Detected unit token (e.g., `mw`, `days`, `usd`, `%`). | +| `confidence` | `ConfidenceLevel` (`high` / `medium` / `low`) | Estimated confidence level inferred from hedging words. | +| `evidence` | `str` | Text excerpt used as evidence (currently same as `claim` but can be overridden with extracted snippets). | +| `extracted_numbers` | `List[float]` | All numeric values found in the assumption for further heuristics. | +| `raw_assumption` | `str` | Original string returned by `MakeAssumptions` (includes prefix). | + +## Confidence Enum Values + +| Level | Detection Signals | +| --- | --- | +| `high` | Contains strong modality ("will", "must", "ensure", "guarantee"). | +| `medium` | Default when no strong signal is detected. | +| `low` | Contains hedging words ("estimate", "approx", "may", "likely"). 
| + +## Unit Examples + +- Financial: `usd`, `eur`, `million`, `billion` +- Capacity/Scale: `mw`, `kw`, `tonnes`, `sqft`, `people` +- Time: `days`, `weeks`, `months`, `years` (expressed as words following the range) +- Percentage/Ratio: `%`, `bps` + +Units are extracted by scanning the text around the numeric range or first detected unit word after the numbers. + +## Evidence Expectations by Confidence + +- `high`: sentence should include explicit value statements or commitments (e.g., "We will deliver 30 MW") and the evidence string can be the same sentence. +- `medium`: treat as the default; evidence is the claim text itself. +- `low`: must cite qualifiers and ideally pair the claim with supporting context (e.g., "~8 months" followed by "assuming no permit delays"). Evidence may include surrounding context when available. + +Use this reference when wiring FermiSanityCheck so the validation functions know what fields exist, what values they expect, and how to treat the evidence for confidence levels. 
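As a worked example of the mapping above, here is a toy sketch of how one raw MakeAssumptions string could populate the claim, bound, unit, and confidence fields. The regexes here are simplified stand-ins, not the production extractor:

```python
import re

raw = "Assumption: Construction takes approximately 6-8 months after permits."

# Strip the "Assumption:" prefix to form the claim.
claim = re.sub(r"^Assumption:\s*", "", raw)

# Pull a numeric range plus the unit word that follows it.
m = re.search(r"(-?\d+(?:\.\d+)?)\s*(?:-|to)\s*(-?\d+(?:\.\d+)?)\s*([A-Za-z%]+)?", claim)
lower, upper = float(m.group(1)), float(m.group(2))
unit = (m.group(3) or "").lower() or None

# Hedging words such as "approximately" push confidence to low, per the enum table.
confidence = "low" if re.search(r"\b(estimate|approx\w*|may|likely)\b", claim.lower()) else "medium"

print(lower, upper, unit, confidence)  # 6.0 8.0 months low
```

A claim with no explicit range would leave the bounds as `None` (or mirror the single value into both bounds, per the `upper_bound` row above), which is exactly the case the validation layer then flags.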
diff --git a/worker_plan/worker_plan_internal/assume/quantified_assumptions.py b/worker_plan/worker_plan_internal/assume/quantified_assumptions.py new file mode 100644 index 00000000..ba8e7587 --- /dev/null +++ b/worker_plan/worker_plan_internal/assume/quantified_assumptions.py @@ -0,0 +1,194 @@ +"""Structured helpers for extracting numerical assumptions from MakeAssumptions outputs.""" +from __future__ import annotations + +import json +import logging +import re +from dataclasses import dataclass +from enum import Enum +from typing import List, Optional, Sequence + +from pydantic import BaseModel, Field + +from worker_plan_internal.assume.make_assumptions import MakeAssumptions + +LOGGER = logging.getLogger(__name__) + +RANGE_PATTERN = re.compile( + r"(?P<low>-?\d+(?:[\.,]\d+)?)(?:\s*(?:-|–|—|to|and)\s*(?P<high>-?\d+(?:[\.,]\d+)?))?", + re.IGNORECASE, +) +NUMBER_PATTERN = re.compile(r"-?\d+(?:[\.,]\d+)?") +UNIT_WORD_PATTERN = re.compile(r"\b([A-Za-z%°µΩ]+)\b") + +LOW_CONFIDENCE_WORDS = { + "estimate", + "approx", + "approximately", + "around", + "roughly", + "maybe", + "could", + "likely", + "tends", + "suggest", +} +HIGH_CONFIDENCE_WORDS = { + "will", + "must", + "guarantee", + "ensure", + "ensures", + "ensuring", + "required", + "definitely", + "strongly", + "committed", +} + +ASSUMPTION_PREFIX = "Assumption:" + + +class ConfidenceLevel(str, Enum): + high = "high" + medium = "medium" + low = "low" + + +class QuantifiedAssumption(BaseModel): + assumption_id: str = Field(description="Unique identifier for this assumption") + question: str = Field(description="Source question that elicited the assumption") + claim: str = Field(description="Normalized assumption text without the 'Assumption:' label") + lower_bound: Optional[float] = Field(None, description="Lower bound extracted from the claim") + upper_bound: Optional[float] = Field(None, description="Upper bound extracted from the claim") + unit: Optional[str] = Field(None, description="Unit associated with the bounds") + 
confidence: ConfidenceLevel = Field( + default=ConfidenceLevel.medium, + description="Estimated confidence level for this claim", + ) + evidence: str = Field(description="Evidence excerpt or justification for the numeric claim") + extracted_numbers: List[float] = Field(default_factory=list) + raw_assumption: str = Field(description="Original assumption text from MakeAssumptions") + + class Config: + allow_mutation = False + frozen = True + + @property + def span_ratio(self) -> Optional[float]: + if self.lower_bound is None or self.upper_bound is None: + return None + if self.lower_bound <= 0: + return None + ratio = self.upper_bound / self.lower_bound + LOGGER.debug("Computed span_ratio=%.2f for %s", ratio, self.assumption_id) + return ratio + + +@dataclass +class QuantifiedAssumptionSummary: + assumptions: List[QuantifiedAssumption] + + @property + def average_span(self) -> Optional[float]: + spans = [assumption.span_ratio for assumption in self.assumptions if assumption.span_ratio is not None] + if not spans: + return None + return sum(spans) / len(spans) + + +class QuantifiedAssumptionExtractor: + """Extract structured numeric assumptions from MakeAssumptions outputs.""" + + def extract(self, assumption_entries: Sequence[dict]) -> List[QuantifiedAssumption]: + results: list[QuantifiedAssumption] = [] + for idx, entry in enumerate(assumption_entries, start=1): + question = (entry.get("question") or "").strip() + raw_assumption = (entry.get("assumptions") or "").strip() + if not raw_assumption: + LOGGER.debug("Skipping empty assumption entry at index %s", idx) + continue + claim = self._normalize_claim(raw_assumption) + lower, upper, unit = self._parse_bounds(claim) + extracted = self._extract_numbers(claim) + confidence = self._guess_confidence(claim) + assumption_id = entry.get("assumption_id") or f"assumption-{idx}" + results.append( + QuantifiedAssumption( + assumption_id=assumption_id, + question=question, + claim=claim, + lower_bound=lower, + 
upper_bound=upper, + unit=unit, + confidence=confidence, + evidence=claim, + extracted_numbers=extracted, + raw_assumption=raw_assumption, + ) + ) + return results + + def extract_from_make_assumptions(self, result: MakeAssumptions) -> List[QuantifiedAssumption]: + return self.extract(result.assumptions) + + def _guess_confidence(self, claim: str) -> ConfidenceLevel: + lowered = claim.lower() + if any(word in lowered for word in LOW_CONFIDENCE_WORDS): + return ConfidenceLevel.low + if any(word in lowered for word in HIGH_CONFIDENCE_WORDS): + return ConfidenceLevel.high + return ConfidenceLevel.medium + + def _normalize_claim(self, raw_assumption: str) -> str: + trimmed = raw_assumption.strip() + if trimmed.lower().startswith(ASSUMPTION_PREFIX.lower()): + trimmed = trimmed[len(ASSUMPTION_PREFIX) :].strip() + trimmed = re.sub(r"^[\-:]+", "", trimmed).strip() + trimmed = re.sub(r"\s{2,}", " ", trimmed) + return trimmed + + def _parse_bounds(self, claim: str) -> tuple[Optional[float], Optional[float], Optional[str]]: + sanitized = claim.replace("—", "-").replace("–", "-") + match = RANGE_PATTERN.search(sanitized) + if not match: + return None, None, self._extract_unit(claim) + lower = self._coerce_number(match.group("low")) + upper = self._coerce_number(match.group("high")) if match.group("high") else lower + unit = self._extract_unit(claim, match.end()) + return lower, upper, unit + + def _extract_unit(self, claim: str, position: Optional[int] = None) -> Optional[str]: + target = claim + if position is not None: + target = claim[position : position + 20] + match = UNIT_WORD_PATTERN.search(target) + if match: + return match.group(1).lower() + return None + + def _extract_numbers(self, claim: str) -> List[float]: + numbers: List[float] = [] + for value in NUMBER_PATTERN.findall(claim): + coerced = self._coerce_number(value) + if coerced is not None: + numbers.append(coerced) + return numbers + + def _coerce_number(self, value: Optional[str]) -> Optional[float]: + if 
value is None: + return None + cleaned = value.replace(",", "").strip() + try: + return float(cleaned) + except ValueError: + LOGGER.debug("Failed to coerce %s to float", value) + return None + + +if __name__ == "__main__": + extractor = QuantifiedAssumptionExtractor() + with open("worker_plan/worker_plan_internal/assume/test_data/assumptions_solar_farm_in_denmark.json", "r", encoding="utf-8") as fh: + entries = json.load(fh) + for assumption in extractor.extract(entries): + print(assumption.json(indent=2)) diff --git a/worker_plan/worker_plan_internal/assume/test_domain_normalizer.py b/worker_plan/worker_plan_internal/assume/test_domain_normalizer.py new file mode 100644 index 00000000..13b324f0 --- /dev/null +++ b/worker_plan/worker_plan_internal/assume/test_domain_normalizer.py @@ -0,0 +1,192 @@ +"""Unit tests for DomainNormalizer.""" + +from worker_plan_internal.assume.quantified_assumptions import ( + QuantifiedAssumption, + ConfidenceLevel, +) +from worker_plan_internal.assume.domain_normalizer import ( + DomainNormalizer, + DomainProfile, +) + + +def test_domain_normalizer_loads_default_profiles(): + """DomainNormalizer initializes with default profiles.""" + normalizer = DomainNormalizer(profiles_yaml_path="/nonexistent/path.yaml") + assert normalizer.default_profile is not None + assert normalizer.default_profile.id == "default" + + +def test_domain_profile_currency_detection(): + """DomainProfile correctly scores currency signals.""" + profile_dict = { + "id": "carpenter", + "name": "Carpenter", + "currency": {"default": "DKK", "aliases": ["kr", "dkk"]}, + "units": {"metric": True, "convert": []}, + "heuristics": {"confidence_keywords": {}}, + "detection": {"currency_signals": ["DKK", "kr"], "unit_signals": [], "keyword_signals": []}, + } + profile = DomainProfile(profile_dict) + score = profile.score_match(["DKK"], [], []) + assert score == 10 # DKK matches currency signal + + +def test_domain_profile_keyword_detection(): + """DomainProfile scores 
keyword signals.""" + profile_dict = { + "id": "carpenter", + "name": "Carpenter", + "currency": {"default": "DKK"}, + "units": {"metric": True, "convert": []}, + "heuristics": {"confidence_keywords": {}}, + "detection": { + "currency_signals": [], + "unit_signals": [], + "keyword_signals": ["carpenter", "wood", "materials"], + }, + } + profile = DomainProfile(profile_dict) + score = profile.score_match([], [], ["carpenter", "wood"]) + assert score == 6 # Two keyword matches @ 3 points each + + +def test_domain_detection_carpenter(): + """Carpenter profile is detected from DKK + metric + material keywords.""" + normalizer = DomainNormalizer(profiles_yaml_path="/nonexistent/path.yaml") + + # Manually add carpenter profile + carpenter_dict = { + "id": "carpenter", + "name": "Carpenter", + "currency": {"default": "DKK", "aliases": ["kr"]}, + "units": {"metric": True, "convert": [{"from": "sqft", "to": "m2", "factor": 0.092903}]}, + "heuristics": {"confidence_keywords": {"high": ["I've done this"], "medium": [], "low": ["estimate"]}}, + "detection": {"currency_signals": ["DKK"], "unit_signals": ["m2"], "keyword_signals": ["carpenter"]}, + } + normalizer.profiles["carpenter"] = DomainProfile(carpenter_dict) + + # Test detection + assumption = QuantifiedAssumption( + assumption_id="test1", + question="Cost?", + claim="Carpenter project in DKK costing 10000 to 15000 for materials in m2.", + lower_bound=10000, + upper_bound=15000, + unit="DKK", + confidence=ConfidenceLevel.medium, + evidence="Quote from carpenter", + extracted_numbers=[10000, 15000], + raw_assumption="Cost estimate: 10000-15000 DKK", + ) + + domain = normalizer.detect_domain(assumption) + assert domain.id == "carpenter" + + +def test_normalize_confidence_per_domain(): + """Confidence is re-assessed based on domain keywords.""" + normalizer = DomainNormalizer(profiles_yaml_path="/nonexistent/path.yaml") + + carpenter_dict = { + "id": "carpenter", + "name": "Carpenter", + "currency": {"default": "DKK"}, + 
"units": {"metric": True, "convert": []}, + "heuristics": {"confidence_keywords": {"high": ["I've done this"], "medium": ["expect"], "low": ["estimate"]}}, + "detection": {"currency_signals": [], "unit_signals": [], "keyword_signals": []}, + } + normalizer.profiles["carpenter"] = DomainProfile(carpenter_dict) + + # Low confidence claim with domain keyword + assumption = QuantifiedAssumption( + assumption_id="test2", + question="Duration?", + claim="Estimate 5 to 7 days.", + lower_bound=5, + upper_bound=7, + unit="days", + confidence=ConfidenceLevel.low, + evidence="Rough estimate", + extracted_numbers=[5, 7], + raw_assumption="Duration: 5-7 days (estimate)", + ) + + normalized = normalizer.normalize(assumption) + # Since "estimate" is in low_confidence_words, should stay low + assert normalized.confidence == ConfidenceLevel.low + + +def test_unit_conversion(): + """Units are converted to metric.""" + normalizer = DomainNormalizer(profiles_yaml_path="/nonexistent/path.yaml") + + carpenter_dict = { + "id": "carpenter", + "name": "Carpenter", + "currency": {"default": "DKK"}, + "units": {"metric": True, "convert": [{"from": "sqft", "to": "m2", "factor": 0.092903}]}, + "heuristics": {"confidence_keywords": {}}, + "detection": {"currency_signals": [], "unit_signals": [], "keyword_signals": []}, + } + profile = DomainProfile(carpenter_dict) + + # Convert 100 sqft to m2 + result = normalizer.normalize_unit(100, "sqft", profile) + assert abs(result - 9.2903) < 0.001 + + +def test_currency_normalization(): + """Currency converts to profile default.""" + normalizer = DomainNormalizer(profiles_yaml_path="/nonexistent/path.yaml") + + carpenter_dict = { + "id": "carpenter", + "name": "Carpenter", + "currency": {"default": "DKK"}, + "units": {"metric": True, "convert": []}, + "heuristics": {"confidence_keywords": {}}, + "detection": {"currency_signals": [], "unit_signals": [], "keyword_signals": []}, + } + profile = DomainProfile(carpenter_dict) + + norm_val, eur_equiv = 
normalizer.normalize_currency(10000, "DKK", profile) + assert norm_val == 10000 # DKK stays as-is + assert eur_equiv is not None # EUR equivalent calculated + + +def test_batch_normalization(): + """Batch normalization processes multiple assumptions.""" + normalizer = DomainNormalizer(profiles_yaml_path="/nonexistent/path.yaml") + + assumptions = [ + QuantifiedAssumption( + assumption_id="a1", + question="Q1", + claim="Budget 5000 to 7000.", + lower_bound=5000, + upper_bound=7000, + unit="USD", + confidence=ConfidenceLevel.high, + evidence="Approved", + extracted_numbers=[5000, 7000], + raw_assumption="Assumption: 5000-7000", + ), + QuantifiedAssumption( + assumption_id="a2", + question="Q2", + claim="Timeline 10 to 14 days.", + lower_bound=10, + upper_bound=14, + unit="days", + confidence=ConfidenceLevel.medium, + evidence="Estimate", + extracted_numbers=[10, 14], + raw_assumption="Assumption: 10-14 days", + ), + ] + + normalized = normalizer.normalize_batch(assumptions) + assert len(normalized) == 2 + assert normalized[0].assumption_id == "a1" + assert normalized[1].assumption_id == "a2" diff --git a/worker_plan/worker_plan_internal/assume/test_fermi_sanity_check.py b/worker_plan/worker_plan_internal/assume/test_fermi_sanity_check.py new file mode 100644 index 00000000..2b2739b9 --- /dev/null +++ b/worker_plan/worker_plan_internal/assume/test_fermi_sanity_check.py @@ -0,0 +1,51 @@ +from worker_plan_internal.assume.fermi_sanity_check import validate_quantified_assumptions +from worker_plan_internal.assume.quantified_assumptions import ConfidenceLevel, QuantifiedAssumption + + +def _build_assumption(**kwargs) -> QuantifiedAssumption: + defaults = { + "assumption_id": "test", + "question": "What is the budget?", + "claim": "Assumption: We will deliver 5,000,000 USD.", + "lower_bound": 5_000_000.0, + "upper_bound": 5_000_000.0, + "unit": "usd", + "confidence": ConfidenceLevel.high, + "evidence": "Assumption: We will deliver 5,000,000 USD.", + "extracted_numbers": 
[5_000_000.0], + "raw_assumption": "Assumption: We will deliver 5,000,000 USD." + } + defaults.update(kwargs) + return QuantifiedAssumption(**defaults) + + +def test_budget_passes_basic_checks(): + assumption = _build_assumption() + report = validate_quantified_assumptions([assumption]) + assert report.passed == 1 + assert report.failed == 0 + assert report.total_assumptions == 1 + + +def test_low_confidence_needs_evidence(): + assumption = _build_assumption( + assumption_id="low-evidence", + confidence=ConfidenceLevel.low, + evidence="Low", + ) + report = validate_quantified_assumptions([assumption]) + assert report.failed == 1 + assert any("Low confidence" in reason for reason in report.entries[0].reasons) + + +def test_span_ratio_detects_wide_boundaries(): + assumption = _build_assumption( + assumption_id="wide-range", + lower_bound=1.0, + upper_bound=100_000.0, + claim="Assumption: The project will cost 1 to 100,000 USD.", + extracted_numbers=[1.0, 100_000.0] + ) + report = validate_quantified_assumptions([assumption]) + assert any("Range spans" in reason for reason in report.entries[0].reasons) + assert report.failed == 1 diff --git a/worker_plan/worker_plan_internal/assume/test_quantified_assumptions.py b/worker_plan/worker_plan_internal/assume/test_quantified_assumptions.py new file mode 100644 index 00000000..05bb19e9 --- /dev/null +++ b/worker_plan/worker_plan_internal/assume/test_quantified_assumptions.py @@ -0,0 +1,45 @@ +from worker_plan_internal.assume.quantified_assumptions import ( + ConfidenceLevel, + QuantifiedAssumptionExtractor, +) + + +def test_extract_range_and_unit(): + extractor = QuantifiedAssumptionExtractor() + entries = [ + { + "question": "What capacity?", + "assumptions": "Assumption: The solar farm will deliver 50-60 MW of capacity before year two.", + } + ] + assumption = extractor.extract(entries)[0] + assert assumption.lower_bound == 50.0 + assert assumption.upper_bound == 60.0 + assert assumption.unit == "mw" + assert 
assumption.extracted_numbers == [50.0, 60.0] + + +def test_confidence_detection_handles_low_words(): + extractor = QuantifiedAssumptionExtractor() + entries = [ + { + "question": "Timeline", + "assumptions": "Assumption: We expect roughly 8 months of construction, though delays are possible.", + } + ] + assumption = extractor.extract(entries)[0] + assert assumption.confidence == ConfidenceLevel.low + + +def test_extract_handles_missing_numbers(): + extractor = QuantifiedAssumptionExtractor() + entries = [ + { + "question": "Safety", + "assumptions": "Assumption: Construction will follow all standards, no explicit numbers provided.", + } + ] + assumption = extractor.extract(entries)[0] + assert assumption.lower_bound is None + assert assumption.upper_bound is None + assert assumption.extracted_numbers == [] diff --git a/worker_plan/worker_plan_internal/plan/run_plan_pipeline.py b/worker_plan/worker_plan_internal/plan/run_plan_pipeline.py index d34c9a9b..fd02b2bd 100644 --- a/worker_plan/worker_plan_internal/plan/run_plan_pipeline.py +++ b/worker_plan/worker_plan_internal/plan/run_plan_pipeline.py @@ -34,6 +34,8 @@ from worker_plan_internal.assume.make_assumptions import MakeAssumptions from worker_plan_internal.assume.distill_assumptions import DistillAssumptions from worker_plan_internal.assume.review_assumptions import ReviewAssumptions +from worker_plan_internal.assume.quantified_assumptions import QuantifiedAssumptionExtractor +from worker_plan_internal.assume.fermi_sanity_check import render_validation_summary, validate_quantified_assumptions from worker_plan_internal.assume.shorten_markdown import ShortenMarkdown from worker_plan_internal.expert.pre_project_assessment import PreProjectAssessment from worker_plan_internal.plan.project_plan import ProjectPlan @@ -906,6 +908,48 @@ def run_with_llm(self, llm: LLM) -> None: make_assumptions.save_markdown(str(output_markdown_path)) + +class FermiSanityCheckTask(PlanTask): + """Validate numeric assumptions before 
distillation.""" + + def requires(self): + return { + 'make_assumptions': self.clone(MakeAssumptionsTask) + } + + def output(self): + return { + 'report': self.local_target(FilenameEnum.FERMI_SANITY_CHECK_REPORT), + 'summary': self.local_target(FilenameEnum.FERMI_SANITY_CHECK_SUMMARY) + } + + def run_inner(self): + assumptions_target = self.input()['make_assumptions']['clean'] + with assumptions_target.open('r', encoding='utf-8') as f: + assumptions_data = json.load(f) + + extractor = QuantifiedAssumptionExtractor() + quantified = extractor.extract(assumptions_data) + report = validate_quantified_assumptions(quantified) + + report_path = self.output()['report'] + with report_path.open('w', encoding='utf-8') as f: + json.dump(report.dict(), f, indent=2) + + summary_text = render_validation_summary(report) + summary_path = self.output()['summary'] + with summary_path.open('w', encoding='utf-8') as f: + f.write(summary_text) + + logger.info( + "Fermi sanity check completed: pass_rate=%.2f%% (%s/%s)", + report.pass_rate_pct, + report.passed, + report.total_assumptions + ) + + class DistillAssumptionsTask(PlanTask): """ Distill raw assumption data. 
@@ -970,6 +1014,7 @@ def requires(self): 'currency_strategy': self.clone(CurrencyStrategyTask), 'identify_risks': self.clone(IdentifyRisksTask), 'make_assumptions': self.clone(MakeAssumptionsTask), + 'fermi_sanity_check': self.clone(FermiSanityCheckTask), 'distill_assumptions': self.clone(DistillAssumptionsTask) } @@ -990,7 +1035,8 @@ def run_with_llm(self, llm: LLM) -> None: ('Currency Strategy', self.input()['currency_strategy']['markdown'].path), ('Identify Risks', self.input()['identify_risks']['markdown'].path), ('Make Assumptions', self.input()['make_assumptions']['markdown'].path), - ('Distill Assumptions', self.input()['distill_assumptions']['markdown'].path) + ('Distill Assumptions', self.input()['distill_assumptions']['markdown'].path), + ('Fermi Sanity Checks', self.input()['fermi_sanity_check']['summary'].path) ] # Read the files and handle exceptions @@ -1031,6 +1077,7 @@ def requires(self): 'currency_strategy': self.clone(CurrencyStrategyTask), 'identify_risks': self.clone(IdentifyRisksTask), 'make_assumptions': self.clone(MakeAssumptionsTask), + 'fermi_sanity_check': self.clone(FermiSanityCheckTask), 'distill_assumptions': self.clone(DistillAssumptionsTask), 'review_assumptions': self.clone(ReviewAssumptionsTask) } @@ -1053,6 +1100,7 @@ def run_inner(self): ('Identify Risks', self.input()['identify_risks']['markdown'].path), ('Make Assumptions', self.input()['make_assumptions']['markdown'].path), ('Distill Assumptions', self.input()['distill_assumptions']['markdown'].path), + ('Fermi Sanity Checks', self.input()['fermi_sanity_check']['summary'].path), ('Review Assumptions', self.input()['review_assumptions']['markdown'].path) ]