Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Package marker
140 changes: 140 additions & 0 deletions api/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""
RootAI FastAPI application.

Endpoints
---------
GET /health – liveness probe
POST /authority/verify – check whether an action is authorised
POST /pipeline/execute – run the unified RootAI pipeline
"""

from __future__ import annotations

import os
import sys
from contextlib import asynccontextmanager

# Ensure the repo root is on sys.path so sibling modules are importable
# when the app is launched via `uvicorn api.main:app`.
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))

from typing import Optional

import yaml
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from authority_gate import AuthorityGate
from deconstructor import deconstruct
from integrity_manager import evaluate_source_quality, verify_worm_integrity
from kg_navigator import KnowledgeGraphNavigator

# ---------------------------------------------------------------------------
# Application lifespan – initialise shared resources once at startup
# ---------------------------------------------------------------------------

_kg: Optional[KnowledgeGraphNavigator] = None


@asynccontextmanager
async def lifespan(application: FastAPI):
global _kg
neo4j_uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
neo4j_user = os.getenv("NEO4J_USER", "neo4j")
neo4j_password = os.getenv("NEO4J_PASSWORD", "rootai")
_kg = KnowledgeGraphNavigator(neo4j_uri, neo4j_user, neo4j_password)
yield
if _kg is not None:
_kg.close()


# ---------------------------------------------------------------------------
# App
# ---------------------------------------------------------------------------

app = FastAPI(title="RootAI API", version="1.0.0", lifespan=lifespan)

# ---------------------------------------------------------------------------
# Pydantic schemas
# ---------------------------------------------------------------------------


class HealthResponse(BaseModel):
status: str


class ActionRequest(BaseModel):
action: str
token: Optional[str] = None


class AuthorityResponse(BaseModel):
action: str
authorized: bool


class PipelineRequest(BaseModel):
query: str
worm_path: str = "packs/secure-code.yaml"
expected_hash: Optional[str] = None


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------


@app.get("/health", response_model=HealthResponse)
def health() -> HealthResponse:
"""Liveness probe."""
return HealthResponse(status="ok")


@app.post("/authority/verify", response_model=AuthorityResponse)
def authority_verify(req: ActionRequest) -> AuthorityResponse:
"""Check whether *action* is authorised with the supplied *token*."""
gate = AuthorityGate()
authorized = gate.verify_authority(req.action, req.token)
return AuthorityResponse(action=req.action, authorized=authorized)


@app.post("/pipeline/execute")
def pipeline_execute(req: PipelineRequest) -> dict:
"""
Run the RootAI unified pipeline:

1. Deconstruct the query (Prompt Analyzer).
2. Map terms to the Knowledge Graph (Path A).
3. Load WORM constraints — skip integrity check when no hash is supplied
so that the API remains useful without a pre-computed hash (Path B).
4. Score grounding quality and return a Verified Execution Plan.
"""
# Step 1 – Prompt Analyzer
analysis = deconstruct(req.query)
terms = analysis["nouns"]

# Step 2 – Path A: Knowledge Graph Navigator (shared instance)
kg = _kg
semantic_map = kg.get_semantic_context(terms) if kg is not None else {}

# Step 3 – Path B: Resource Manager / WORM
if req.expected_hash:
if not verify_worm_integrity(req.worm_path, req.expected_hash):
raise HTTPException(status_code=409, detail="WORM integrity check failed")

try:
with open(req.worm_path, "r") as fh:
grounding_data = yaml.safe_load(fh)
except FileNotFoundError:
raise HTTPException(status_code=404, detail=f"WORM file not found: {req.worm_path}")

# Step 4 – Reasoning Bridge & Constraint Checker
quality_report = evaluate_source_quality(str(grounding_data), len(grounding_data))

return {
"intent": req.query,
"semantic_roots": semantic_map,
"hard_constraints": grounding_data.get("constraints", {}).get("hard", []),
"reliability": quality_report.score,
"hallucination_risk": "low" if quality_report.score > 0.7 else "high",
}
37 changes: 37 additions & 0 deletions deconstructor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""
Prompt Analyzer – deconstructs a natural-language query into its
structural components (nouns, verbs, original text).

Uses spaCy when the 'en_core_web_sm' model is available and falls
back to simple whitespace tokenisation otherwise. The spaCy model
is loaded once at module level to avoid repeated initialisation overhead.
"""

_nlp = None


def _get_nlp():
"""Return a cached spaCy model, loading it on first call."""
global _nlp
if _nlp is None:
try:
import spacy
_nlp = spacy.load("en_core_web_sm")
except Exception:
_nlp = False # sentinel: spaCy unavailable
return _nlp


def deconstruct(text: str) -> dict:
"""Return a dict with 'nouns', 'verbs', and 'original' keys."""
nlp = _get_nlp()
if nlp:
doc = nlp(text)
nouns = [token.text.lower() for token in doc if token.pos_ in ("NOUN", "PROPN")]
verbs = [token.text.lower() for token in doc if token.pos_ == "VERB"]
else:
# Graceful fallback: treat every whitespace-separated word as a noun
nouns = text.lower().split()
verbs = []

return {"nouns": nouns, "verbs": verbs, "original": text}
2 changes: 1 addition & 1 deletion docker_RootAi_fastAPI
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ COPY . .
EXPOSE 8000

# Start the API with uvicorn
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
CMD ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000"]
42 changes: 42 additions & 0 deletions integrity_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""
Integrity Manager – Resource Manager utilities.

* verify_worm_integrity: SHA-256 hash check for immutable WORM storage.
* evaluate_source_quality: Reliability scoring for grounding documents.
"""

import hashlib
from typing import Dict

from pydantic import BaseModel


class ReliabilityReport(BaseModel):
score: float
assessment: str
indicators: Dict[str, bool]


def evaluate_source_quality(content: str, source_count: int) -> ReliabilityReport:
"""Analyse quality based on specific data-points and structure."""
indicators = {
"has_citations": "[" in content and "]" in content,
"has_specific_data": any(char.isdigit() for char in content),
"reasonable_length": len(content) > 200,
"has_multiple_sources": source_count > 1,
}

passed_checks = sum(1 for v in indicators.values() if v)
score = passed_checks / len(indicators)
assessment = "high" if score > 0.7 else "medium" if score > 0.4 else "low"

return ReliabilityReport(score=score, assessment=assessment, indicators=indicators)


def verify_worm_integrity(file_path: str, expected_hash: str) -> bool:
"""Return True only when the file's SHA-256 digest matches expected_hash."""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest() == expected_hash
54 changes: 54 additions & 0 deletions kg_navigator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""
Knowledge Graph Navigator – Path A (Semantic Meaning).

Queries the Neo4j etymological graph that is seeded by
Etymological_Seeder. When Neo4j is unavailable the navigator
degrades gracefully and returns empty context maps so that the
rest of the pipeline can continue.
"""

import logging

logger = logging.getLogger(__name__)


class KnowledgeGraphNavigator:
def __init__(self, uri: str, user: str, password: str):
self._available = False
try:
from neo4j import GraphDatabase # type: ignore
self.driver = GraphDatabase.driver(uri, auth=(user, password))
self._available = True
except Exception as exc:
logger.warning("Neo4j unavailable – KnowledgeGraphNavigator degraded: %s", exc)

def close(self):
if self._available:
self.driver.close()

def get_semantic_context(self, terms: list) -> dict:
"""
Path A: map tokens to the Etymological Graph.
Returns the 'Core Concept Map' for the Reasoning Bridge.
Falls back to an empty mapping when Neo4j is not reachable.
"""
if not self._available:
return {term: [] for term in terms}

context_map = {}
try:
with self.driver.session() as session:
for term in terms:
query = """
MATCH (n:EtymologicalRoot {name: $term})
OPTIONAL MATCH (n)-[:DEFINES|RELATED_TO]-(related)
RETURN n.name as root, collect(related.name) as relations
"""
result = session.run(query, term=term.lower())
record = result.single()
if record:
context_map[record["root"]] = record["relations"]
except Exception as exc:
logger.warning("Error querying knowledge graph: %s", exc)

return context_map
13 changes: 13 additions & 0 deletions packs/secure-code.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
version: "1.0"
constraints:
hard:
- "No use of eval() or exec()"
- "All inputs must be validated and sanitized"
- "Authentication required for protected actions"
- "Use prepared statements for database queries"
- "Secrets must not be hardcoded in source code"
- "Use HTTPS for all external communications"
soft:
- "Prefer immutable data structures where possible"
- "Use logging for audit trails of sensitive operations"
- "Follow the principle of least privilege"
8 changes: 8 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
fastapi>=0.111.0
uvicorn[standard]>=0.29.0
pydantic>=2.0.0
pyyaml>=6.0
pytest>=8.0.0
httpx>=0.27.0
spacy>=3.7.0
neo4j>=5.0.0
65 changes: 65 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import os
import pytest
from unittest.mock import patch
from fastapi.testclient import TestClient

from api.main import app

client = TestClient(app)


class TestHealth:
def test_health_returns_ok(self):
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "ok"}


class TestAuthorityVerify:
def test_low_risk_action_is_authorized(self):
response = client.post("/authority/verify", json={"action": "READ_FILE"})
assert response.status_code == 200
assert response.json()["authorized"] is True

def test_protected_action_without_token_is_denied(self):
response = client.post("/authority/verify", json={"action": "FILE_DELETE"})
assert response.status_code == 200
assert response.json()["authorized"] is False

def test_protected_action_with_correct_token_is_authorized(self):
with patch.dict(os.environ, {"ROOTAI_AUTH_TOKEN": "s3cr3t"}):
response = client.post(
"/authority/verify", json={"action": "FILE_DELETE", "token": "s3cr3t"}
)
assert response.status_code == 200
assert response.json()["authorized"] is True

def test_response_includes_action_field(self):
response = client.post("/authority/verify", json={"action": "LOG_EVENT"})
assert response.json()["action"] == "LOG_EVENT"


class TestPipelineExecute:
def test_pipeline_returns_expected_keys(self):
response = client.post("/pipeline/execute", json={"query": "Create an auth session"})
assert response.status_code == 200
body = response.json()
for key in ("intent", "semantic_roots", "hard_constraints", "reliability", "hallucination_risk"):
assert key in body

def test_pipeline_echoes_query_as_intent(self):
response = client.post("/pipeline/execute", json={"query": "validate input"})
assert response.json()["intent"] == "validate input"

def test_pipeline_returns_hard_constraints_from_yaml(self):
response = client.post("/pipeline/execute", json={"query": "test query"})
constraints = response.json()["hard_constraints"]
assert isinstance(constraints, list)
assert len(constraints) > 0

def test_pipeline_missing_worm_file_returns_404(self):
response = client.post(
"/pipeline/execute",
json={"query": "test", "worm_path": "nonexistent/path.yaml"},
)
assert response.status_code == 404
Loading