# ---------------------------------------------------------------------------
# Application lifespan – initialise shared resources once at startup
# ---------------------------------------------------------------------------

# Process-wide navigator shared by every request. It is None until the
# lifespan handler has run, and None again once shutdown has completed.
_kg: Optional[KnowledgeGraphNavigator] = None


@asynccontextmanager
async def lifespan(application: FastAPI):
    """Create the shared KnowledgeGraphNavigator at startup; close it at shutdown.

    Connection settings are read from the ``NEO4J_URI``, ``NEO4J_USER`` and
    ``NEO4J_PASSWORD`` environment variables, with local-development
    defaults when a variable is unset.

    Args:
        application: The FastAPI application being started (unused here, but
            required by the lifespan protocol).
    """
    global _kg
    neo4j_uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
    neo4j_user = os.getenv("NEO4J_USER", "neo4j")
    # NOTE(review): the fallback password is a hardcoded development default;
    # production deployments should always set NEO4J_PASSWORD explicitly.
    neo4j_password = os.getenv("NEO4J_PASSWORD", "rootai")
    _kg = KnowledgeGraphNavigator(neo4j_uri, neo4j_user, neo4j_password)
    try:
        yield
    finally:
        # Run the cleanup even when an exception is thrown into the generator
        # at the yield point, and drop the global reference so that nothing
        # can reach the navigator once it has been closed.
        navigator, _kg = _kg, None
        if navigator is not None:
            navigator.close()
app = FastAPI(title="RootAI API", version="1.0.0", lifespan=lifespan)

# ---------------------------------------------------------------------------
# Pydantic schemas
# ---------------------------------------------------------------------------


# Response body of GET /health.
class HealthResponse(BaseModel):
    status: str


# Request body of POST /authority/verify; the token is optional.
class ActionRequest(BaseModel):
    action: str
    token: Optional[str] = None


# Response body of POST /authority/verify.
class AuthorityResponse(BaseModel):
    action: str
    authorized: bool


# Request body of POST /pipeline/execute. When expected_hash is omitted the
# integrity check on the WORM file is skipped.
class PipelineRequest(BaseModel):
    query: str
    worm_path: str = "packs/secure-code.yaml"
    expected_hash: Optional[str] = None


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------


@app.get("/health", response_model=HealthResponse)
def health() -> HealthResponse:
    """Liveness probe."""
    return HealthResponse(status="ok")


@app.post("/authority/verify", response_model=AuthorityResponse)
def authority_verify(req: ActionRequest) -> AuthorityResponse:
    """Check whether *action* is authorised with the supplied *token*."""
    gate = AuthorityGate()
    authorized = gate.verify_authority(req.action, req.token)
    return AuthorityResponse(action=req.action, authorized=authorized)


@app.post("/pipeline/execute")
def pipeline_execute(req: PipelineRequest) -> dict:
    """
    Run the RootAI unified pipeline:

    1. Deconstruct the query (Prompt Analyzer).
    2. Map terms to the Knowledge Graph (Path A).
    3. Load WORM constraints — skip integrity check when no hash is supplied
       so that the API remains useful without a pre-computed hash (Path B).
    4. Score grounding quality and return a Verified Execution Plan.

    Raises:
        HTTPException(404): the WORM file does not exist.
        HTTPException(409): a hash was supplied and the file does not match it.
        HTTPException(422): the WORM file is not valid YAML or is not a mapping.
    """
    # Step 1 – Prompt Analyzer
    analysis = deconstruct(req.query)
    terms = analysis["nouns"]

    # Step 2 – Path A: Knowledge Graph Navigator (shared instance).
    # The navigator is None when the lifespan handler has not run.
    kg = _kg
    semantic_map = kg.get_semantic_context(terms) if kg is not None else {}

    # Step 3 – Path B: Resource Manager / WORM
    # NOTE(review): worm_path comes straight from the request body, so a
    # client can point this endpoint at any file readable by the process.
    # Consider restricting it to a known directory.
    try:
        # The integrity check opens the file too, so it lives inside the same
        # try block: a missing file yields 404 with or without a hash.
        if req.expected_hash and not verify_worm_integrity(req.worm_path, req.expected_hash):
            raise HTTPException(status_code=409, detail="WORM integrity check failed")

        with open(req.worm_path, "r", encoding="utf-8") as fh:
            grounding_data = yaml.safe_load(fh)
    except FileNotFoundError:
        raise HTTPException(
            status_code=404, detail=f"WORM file not found: {req.worm_path}"
        ) from None
    except (yaml.YAMLError, UnicodeDecodeError) as exc:
        raise HTTPException(
            status_code=422, detail=f"WORM file is not valid YAML: {req.worm_path}"
        ) from exc

    # safe_load returns None for an empty document and may return a list or a
    # scalar; the steps below need a mapping.
    if not isinstance(grounding_data, dict):
        raise HTTPException(
            status_code=422, detail=f"WORM file must contain a YAML mapping: {req.worm_path}"
        )

    # Step 4 – Reasoning Bridge & Constraint Checker
    quality_report = evaluate_source_quality(str(grounding_data), len(grounding_data))

    # Tolerate a missing or empty "constraints" / "hard" entry.
    constraints = grounding_data.get("constraints") or {}
    hard_constraints = constraints.get("hard") if isinstance(constraints, dict) else None

    return {
        "intent": req.query,
        "semantic_roots": semantic_map,
        "hard_constraints": hard_constraints or [],
        "reliability": quality_report.score,
        "hallucination_risk": "low" if quality_report.score > 0.7 else "high",
    }
# Cached spaCy pipeline: None = not tried yet, False = spaCy unavailable.
_nlp = None


def _get_nlp():
    """Return a cached spaCy model, loading it on first call.

    Returns:
        The loaded ``en_core_web_sm`` pipeline, or ``False`` when spaCy or
        the model could not be loaded. The outcome is cached either way, so
        the load is attempted only once per process.
    """
    global _nlp
    if _nlp is None:
        try:
            import spacy
            _nlp = spacy.load("en_core_web_sm")
        except Exception as exc:
            # Deliberate best-effort: any failure selects the whitespace
            # fallback. Log it so that the degraded mode is visible.
            import logging
            logging.getLogger(__name__).warning(
                "spaCy model 'en_core_web_sm' unavailable, using whitespace fallback: %s",
                exc,
            )
            _nlp = False  # sentinel: spaCy unavailable
    return _nlp


def deconstruct(text: str) -> dict:
    """Return a dict with 'nouns', 'verbs', and 'original' keys.

    With spaCy available, 'nouns' holds the lower-cased NOUN/PROPN tokens and
    'verbs' the lower-cased VERB tokens. Without it, every
    whitespace-separated word counts as a noun and 'verbs' is empty.

    Args:
        text: The natural-language query to analyse.

    Returns:
        ``{"nouns": [...], "verbs": [...], "original": text}``.
    """
    nlp = _get_nlp()
    if nlp:
        doc = nlp(text)
        nouns = [token.text.lower() for token in doc if token.pos_ in ("NOUN", "PROPN")]
        verbs = [token.text.lower() for token in doc if token.pos_ == "VERB"]
    else:
        # Graceful fallback: treat every whitespace-separated word as a noun.
        # Surrounding punctuation is stripped so that "session," and
        # "session" yield the same term; words that were only punctuation
        # are dropped.
        import string
        stripped = (word.strip(string.punctuation) for word in text.lower().split())
        nouns = [word for word in stripped if word]
        verbs = []

    return {"nouns": nouns, "verbs": verbs, "original": text}
# Result of evaluate_source_quality:
#   score       – fraction of indicators that passed (0.0 to 1.0)
#   assessment  – "high", "medium" or "low", derived from score
#   indicators  – outcome of each individual check, keyed by name
class ReliabilityReport(BaseModel):
    score: float
    assessment: str
    indicators: Dict[str, bool]


def evaluate_source_quality(content: str, source_count: int) -> ReliabilityReport:
    """Analyse quality based on specific data-points and structure.

    Four boolean indicators are evaluated; the score is the fraction that
    passed.

    Args:
        content: Text of the grounding document.
        source_count: Number of sources behind the content.

    Returns:
        A ReliabilityReport with the score, a "high" (> 0.7) / "medium"
        (> 0.4) / "low" assessment and the individual indicator results.
    """
    indicators = {
        "has_citations": "[" in content and "]" in content,
        "has_specific_data": any(char.isdigit() for char in content),
        "reasonable_length": len(content) > 200,
        "has_multiple_sources": source_count > 1,
    }

    # Every value is a bool, so summing them counts the passed checks.
    passed_checks = sum(indicators.values())
    score = passed_checks / len(indicators)
    assessment = "high" if score > 0.7 else "medium" if score > 0.4 else "low"

    return ReliabilityReport(score=score, assessment=assessment, indicators=indicators)


def verify_worm_integrity(file_path: str, expected_hash: str) -> bool:
    """Return True only when the file's SHA-256 digest matches expected_hash.

    The expected value is compared case-insensitively and with surrounding
    whitespace ignored, so an upper-case digest or one with a trailing
    newline still matches.

    Args:
        file_path: Path of the file to hash.
        expected_hash: Expected SHA-256 digest as a hexadecimal string.

    Returns:
        True when the digests match, False otherwise (including when
        expected_hash is empty or None).

    Raises:
        OSError: The file cannot be opened or read (e.g. FileNotFoundError).
    """
    import hmac

    sha256_hash = hashlib.sha256()
    with open(file_path, "rb") as f:
        # Hash in chunks so that large files are never held in memory at once.
        for byte_block in iter(lambda: f.read(65536), b""):
            sha256_hash.update(byte_block)

    expected = (expected_hash or "").strip().lower()
    # Compare as bytes: compare_digest rejects non-ASCII str arguments, and
    # the constant-time comparison avoids leaking how many leading characters
    # of a guessed digest were correct.
    return hmac.compare_digest(
        sha256_hash.hexdigest().encode("ascii"), expected.encode("utf-8")
    )
logger = logging.getLogger(__name__)


class KnowledgeGraphNavigator:
    """Look up terms in the Neo4j etymological graph (Path A).

    When the driver cannot be created, or after ``close()``, the navigator
    is degraded: lookups return an empty relation list for every term
    instead of raising.
    """

    # Loop-invariant Cypher text, built once instead of once per term.
    _CONTEXT_QUERY = """
        MATCH (n:EtymologicalRoot {name: $term})
        OPTIONAL MATCH (n)-[:DEFINES|RELATED_TO]-(related)
        RETURN n.name as root, collect(related.name) as relations
    """

    def __init__(self, uri: str, user: str, password: str):
        """Create the Neo4j driver, or mark the navigator as degraded.

        Args:
            uri: Neo4j connection URI.
            user: Neo4j user name.
            password: Neo4j password.
        """
        # Always define the attribute so that it exists in degraded mode too.
        self.driver = None
        self._available = False
        try:
            from neo4j import GraphDatabase  # type: ignore
            self.driver = GraphDatabase.driver(uri, auth=(user, password))
            self._available = True
        except Exception as exc:
            logger.warning("Neo4j unavailable – KnowledgeGraphNavigator degraded: %s", exc)

    def close(self):
        """Close the driver if one is open; safe to call more than once."""
        if self._available:
            # Flip the flag first so that a second close() is a no-op and
            # later lookups take the degraded path, not a closed driver.
            self._available = False
            self.driver.close()

    def get_semantic_context(self, terms: list) -> dict:
        """
        Path A: map tokens to the Etymological Graph.
        Returns the 'Core Concept Map' for the Reasoning Bridge.
        Falls back to an empty mapping when Neo4j is not reachable.

        Args:
            terms: Terms to look up; each is lower-cased before the query.

        Returns:
            When degraded: ``{term: []}`` for every given term.
            Otherwise: ``{root_name: [related names]}`` for the terms that
            matched a root; unmatched terms are omitted. If a query fails,
            the error is logged and the entries gathered so far are returned.
        """
        if not self._available:
            return {term: [] for term in terms}

        context_map = {}
        try:
            with self.driver.session() as session:
                seen = set()
                for term in terms:
                    key = term.lower()
                    # A repeated term would return the same record, so skip
                    # the extra round-trip.
                    if key in seen:
                        continue
                    seen.add(key)
                    record = session.run(self._CONTEXT_QUERY, term=key).single()
                    if record:
                        context_map[record["root"]] = record["relations"]
        except Exception as exc:
            logger.warning("Error querying knowledge graph: %s", exc)

        return context_map
# One client shared by every test in this module.
client = TestClient(app)


class TestHealth:
    """GET /health."""

    def test_health_returns_ok(self):
        resp = client.get("/health")
        assert resp.status_code == 200
        assert resp.json() == {"status": "ok"}


class TestAuthorityVerify:
    """POST /authority/verify."""

    def test_low_risk_action_is_authorized(self):
        payload = {"action": "READ_FILE"}
        resp = client.post("/authority/verify", json=payload)
        assert resp.status_code == 200
        assert resp.json()["authorized"] is True

    def test_protected_action_without_token_is_denied(self):
        payload = {"action": "FILE_DELETE"}
        resp = client.post("/authority/verify", json=payload)
        assert resp.status_code == 200
        assert resp.json()["authorized"] is False

    def test_protected_action_with_correct_token_is_authorized(self):
        payload = {"action": "FILE_DELETE", "token": "s3cr3t"}
        # The expected token is injected through the environment for the
        # duration of the request only.
        with patch.dict(os.environ, {"ROOTAI_AUTH_TOKEN": "s3cr3t"}):
            resp = client.post("/authority/verify", json=payload)
        assert resp.status_code == 200
        assert resp.json()["authorized"] is True

    def test_response_includes_action_field(self):
        resp = client.post("/authority/verify", json={"action": "LOG_EVENT"})
        assert resp.json()["action"] == "LOG_EVENT"


class TestPipelineExecute:
    """POST /pipeline/execute."""

    def test_pipeline_returns_expected_keys(self):
        resp = client.post("/pipeline/execute", json={"query": "Create an auth session"})
        assert resp.status_code == 200
        payload = resp.json()
        expected_keys = (
            "intent",
            "semantic_roots",
            "hard_constraints",
            "reliability",
            "hallucination_risk",
        )
        missing = [key for key in expected_keys if key not in payload]
        assert not missing

    def test_pipeline_echoes_query_as_intent(self):
        resp = client.post("/pipeline/execute", json={"query": "validate input"})
        assert resp.json()["intent"] == "validate input"

    def test_pipeline_returns_hard_constraints_from_yaml(self):
        resp = client.post("/pipeline/execute", json={"query": "test query"})
        hard = resp.json()["hard_constraints"]
        assert isinstance(hard, list)
        assert len(hard) > 0

    def test_pipeline_missing_worm_file_returns_404(self):
        request_body = {"query": "test", "worm_path": "nonexistent/path.yaml"}
        resp = client.post("/pipeline/execute", json=request_body)
        assert resp.status_code == 404
Prompt Analyzer analysis = deconstruct(user_query) terms = analysis['nouns'] @@ -24,9 +36,10 @@ class RootAIPipeline: semantic_map = self.kg.get_semantic_context(terms) # 3. Path B: Resource Manager (Context & WORM) - # Strict integrity check before accessing grounding data - if not verify_worm_integrity(self.WORM_PATH, self.EXPECTED_HASH): - raise SecurityError("WORM Integrity Compromised!") + # Integrity check is performed only when a hash is configured + if self.EXPECTED_HASH: + if not verify_worm_integrity(self.WORM_PATH, self.EXPECTED_HASH): + raise SecurityError("WORM Integrity Compromised!") with open(self.WORM_PATH, 'r') as f: grounding_data = yaml.safe_load(f) @@ -34,7 +47,7 @@ class RootAIPipeline: # 4. Reasoning Bridge & Constraint Checker (Synthesis) # Using poc-v2 quality scoring to evaluate Path B quality_report = evaluate_source_quality(str(grounding_data), len(grounding_data)) - + # Construct the "Verified Execution Plan" verified_plan = { "intent": user_query, @@ -45,7 +58,3 @@ class RootAIPipeline: } return verified_plan - -# Usage -root_ai = RootAIPipeline() -plan = root_ai.execute("Create an auth session with JWT")