Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions deconstructor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import re
from typing import Dict, List

# Common English function words to exclude from noun extraction
_STOPWORDS = {
'a', 'an', 'the', 'with', 'in', 'on', 'at', 'to', 'for',
'of', 'and', 'or', 'but', 'is', 'are', 'was', 'were', 'be',
'been', 'being', 'do', 'does', 'did', 'has', 'have', 'had',
'will', 'would', 'could', 'should', 'may', 'might', 'shall',
'can', 'not', 'no', 'nor', 'so', 'yet', 'both', 'either',
'that', 'this', 'these', 'those', 'it', 'its', 'my', 'your',
'his', 'her', 'our', 'their', 'we', 'you', 'he', 'she', 'they',
'i', 'me', 'him', 'us', 'them', 'who', 'which', 'what', 'how',
'when', 'where', 'why', 'all', 'each', 'every', 'some', 'any',
}


def deconstruct(query: str) -> Dict[str, List[str]]:
    """
    Prompt Analyzer: break a user query down into its semantic components.

    The query is lower-cased and split into runs of ASCII letters. Runs that
    are two characters or shorter, or that appear in the stopword set, are
    discarded. The survivors are returned in their original order (repeats
    included) under the 'nouns' key, ready for graph lookup.
    """
    nouns: List[str] = []
    for word in re.findall(r'[a-zA-Z]+', query.lower()):
        # Guard clause: skip anything too short or purely functional.
        if len(word) <= 2 or word in _STOPWORDS:
            continue
        nouns.append(word)
    return {'nouns': nouns}
42 changes: 42 additions & 0 deletions integrity_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import hashlib
from typing import Dict
from pydantic import BaseModel


class ReliabilityReport(BaseModel):
    """Outcome of a source-quality evaluation, as built by evaluate_source_quality."""
    # Fraction of the quality checks that passed.
    score: float
    # Label derived from the score by the evaluator: "high", "medium" or "low".
    assessment: str
    # Result of each individual check, keyed by the check's name.
    indicators: Dict[str, bool]


def evaluate_source_quality(content: str, source_count: int) -> ReliabilityReport:
    """
    Ported from poc-v2: Logic & Constraint Checker logic.

    Applies four yes/no heuristics to the content: whether it contains both
    square brackets, whether it contains any digit, whether it is longer than
    200 characters, and whether more than one source backs it. The score is
    the fraction of heuristics that passed; above 0.7 is rated "high", above
    0.4 "medium", anything else "low".
    """
    checks = {
        "has_citations": "[" in content and "]" in content,
        "has_specific_data": any(map(str.isdigit, content)),
        "reasonable_length": len(content) > 200,
        "has_multiple_sources": source_count > 1,
    }

    # Every value is a bool, so summing them counts the passed checks.
    score = sum(checks.values()) / len(checks)

    if score > 0.7:
        assessment = "high"
    elif score > 0.4:
        assessment = "medium"
    else:
        assessment = "low"

    return ReliabilityReport(score=score, assessment=assessment, indicators=checks)


def verify_worm_integrity(file_path: str, expected_hash: str) -> bool:
    """
    Resource Manager: Immutable storage verification.
    Ensures Path B (Context) is untampered.

    Streams the file through SHA-256 and compares the resulting hex digest
    with ``expected_hash``.

    Args:
        file_path: Path of the file to verify.
        expected_hash: Expected SHA-256 hex digest. Letter case and
            surrounding whitespace are ignored, so a digest copied from a
            manifest or environment variable in upper case, or with a
            trailing newline, still matches.

    Returns:
        True when the file's digest equals the expected one, False otherwise
        (an empty expected hash never matches).

    Raises:
        OSError: If the file cannot be opened or read.
    """
    digest = hashlib.sha256()
    with open(file_path, "rb") as f:
        # Fixed-size chunks keep memory use flat regardless of file size.
        for chunk in iter(lambda: f.read(4096), b""):
            digest.update(chunk)
    # hexdigest() yields lower-case hex; normalise the expected value the same
    # way so formatting differences are not reported as tampering.
    return digest.hexdigest() == expected_hash.strip().lower()
40 changes: 40 additions & 0 deletions kg_navigator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import logging
from neo4j import GraphDatabase


class KnowledgeGraphNavigator:
    """Wrapper around a Neo4j driver for looking up etymological roots.

    Can be used as a context manager; leaving the ``with`` block closes the
    underlying driver.
    """

    # Looks up one root by exact name and collects the names of the nodes
    # linked to it by DEFINES or RELATED_TO (either direction). Held at class
    # level so the text is not rebuilt for every term.
    _ROOT_QUERY = """
        MATCH (n:EtymologicalRoot {name: $term})
        OPTIONAL MATCH (n)-[:DEFINES|RELATED_TO]-(related)
        RETURN n.name as root, collect(related.name) as relations
    """

    def __init__(self, uri, user, password):
        """Create the Neo4j driver for ``uri`` using basic auth.

        Any failure while constructing the driver is logged and re-raised.
        """
        try:
            self.driver = GraphDatabase.driver(uri, auth=(user, password))
        except Exception as e:
            logging.error("Failed to initialize Neo4j driver: %s", e)
            raise

    def close(self):
        """Close the underlying Neo4j driver."""
        self.driver.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def get_semantic_context(self, terms: list) -> dict:
        """
        Path A: Map tokens to the Etymological Graph.
        Returns the 'Core Concept Map' for the Reasoning Bridge.

        Args:
            terms: Strings to look up. Each is lower-cased before matching;
                repeated terms are looked up only once.

        Returns:
            A dict mapping each matched root name to the list of related
            node names. Terms with no matching root are omitted. An empty
            ``terms`` yields an empty dict without opening a session.
        """
        # Lower-case once and drop repeats; dict.fromkeys keeps first-seen
        # order, so the resulting map is ordered as before.
        lookups = list(dict.fromkeys(term.lower() for term in terms))
        context_map = {}
        if not lookups:
            # Nothing to look up: avoid acquiring a session at all.
            return context_map

        with self.driver.session() as session:
            for term in lookups:
                # session.run() with a bound $term parameter; the value is
                # never interpolated into the Cypher text.
                record = session.run(self._ROOT_QUERY, term=term).single()
                if record:
                    context_map[record["root"]] = record["relations"]
        return context_map
94 changes: 94 additions & 0 deletions tests/test_unified_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import hashlib
import os
import tempfile
import pytest

from deconstructor import deconstruct
from integrity_manager import evaluate_source_quality, verify_worm_integrity
from unified_pipeline import SecurityError


class TestDeconstruct:
    """Behavioural checks for the prompt deconstructor."""

    def test_returns_nouns_key(self):
        result = deconstruct("Create an auth session with JWT")
        assert "nouns" in result

    def test_filters_stopwords(self):
        result = deconstruct("Create an auth session with JWT")
        assert "an" not in result["nouns"]
        assert "with" not in result["nouns"]

    def test_extracts_meaningful_terms(self):
        result = deconstruct("Create an auth session with JWT")
        assert "auth" in result["nouns"]
        assert "session" in result["nouns"]
        assert "jwt" in result["nouns"]

    def test_returns_lowercase_terms(self):
        result = deconstruct("JWT Auth Session")
        for noun in result["nouns"]:
            assert noun == noun.lower()

    def test_empty_query(self):
        result = deconstruct("")
        assert result["nouns"] == []

    def test_filters_short_tokens(self):
        # None of these words is a stopword, so only the minimum-length rule
        # can remove them. The previous input ("an is at to") consisted solely
        # of stopwords and would have passed even with the length rule gone.
        result = deconstruct("go up by ox")
        assert result["nouns"] == []


class TestEvaluateSourceQuality:
    """Behavioural checks for the heuristic source-quality scorer."""

    def test_high_quality_content(self):
        sentence = "This content references [source1] and has 42 data points. "
        report = evaluate_source_quality(sentence * 10, 3)
        assert report.score > 0.7
        assert report.assessment == "high"

    def test_low_quality_content(self):
        assert evaluate_source_quality("short", 1).assessment == "low"

    def test_score_between_zero_and_one(self):
        score = evaluate_source_quality("some content", 1).score
        assert 0.0 <= score <= 1.0

    def test_indicators_present(self):
        report = evaluate_source_quality("content [ref] with 5 items " * 20, 2)
        expected_names = (
            "has_citations",
            "has_specific_data",
            "reasonable_length",
            "has_multiple_sources",
        )
        for name in expected_names:
            assert name in report.indicators


class TestVerifyWormIntegrity:
    """Behavioural checks for SHA-256 verification of a file on disk."""

    @staticmethod
    def _temp_file_with(payload):
        """Write ``payload`` to a new temp file and return its path.

        The file is left on disk (delete=False); the caller removes it.
        """
        with tempfile.NamedTemporaryFile(delete=False) as handle:
            handle.write(payload)
            return handle.name

    def test_matching_hash_returns_true(self):
        payload = b"secure worm content"
        digest = hashlib.sha256(payload).hexdigest()
        path = self._temp_file_with(payload)
        try:
            assert verify_worm_integrity(path, digest) is True
        finally:
            os.unlink(path)

    def test_wrong_hash_returns_false(self):
        path = self._temp_file_with(b"some content")
        try:
            assert verify_worm_integrity(path, "deadbeef" * 8) is False
        finally:
            os.unlink(path)


class TestSecurityError:
    """Sanity checks for the pipeline's SecurityError exception type."""

    def test_security_error_is_exception(self):
        assert issubclass(SecurityError, Exception)

    def test_security_error_can_be_raised(self):
        message = "WORM Integrity Compromised!"
        with pytest.raises(SecurityError, match=message):
            raise SecurityError(message)
63 changes: 63 additions & 0 deletions unified_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import os
import yaml
from deconstructor import deconstruct
from integrity_manager import verify_worm_integrity, evaluate_source_quality
from kg_navigator import KnowledgeGraphNavigator


class SecurityError(Exception):
    """Signals that a security integrity check did not pass."""


class RootAIPipeline:
    """Combines query analysis, graph lookup and verified grounding data.

    Configuration is read from the environment at construction time:
    WORM_PATH, WORM_EXPECTED_HASH, NEO4J_URI, NEO4J_USER and NEO4J_PASSWORD.
    The instance owns a KnowledgeGraphNavigator; call close() (or use the
    pipeline as a context manager) to release it.
    """

    def __init__(self):
        # Resource Manager Configuration
        self.WORM_PATH = os.getenv("WORM_PATH", "secure-code.yaml")
        # Defined in Central Manifest. With the default of "" the integrity
        # check cannot succeed, so an unconfigured pipeline fails closed.
        self.EXPECTED_HASH = os.getenv("WORM_EXPECTED_HASH", "")

        # Knowledge Graph Navigator (Path A)
        neo4j_uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
        neo4j_user = os.getenv("NEO4J_USER", "neo4j")
        neo4j_password = os.getenv("NEO4J_PASSWORD", "")
        self.kg = KnowledgeGraphNavigator(neo4j_uri, neo4j_user, neo4j_password)

    def close(self):
        """Release the knowledge-graph connection held by this pipeline."""
        self.kg.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def execute(self, user_query: str):
        """Build the verified execution plan for ``user_query``.

        Args:
            user_query: Free-text request to analyse.

        Returns:
            A dict with the keys 'intent', 'semantic_roots',
            'hard_constraints', 'reliability' and 'hallucination_risk'.

        Raises:
            SecurityError: If the grounding file's hash does not match
                EXPECTED_HASH.
            ValueError: If the grounding file parses to something other
                than a mapping.
        """
        print(f"[*] Analyzing Query: {user_query}")

        # 1. Prompt Analyzer
        analysis = deconstruct(user_query)
        terms = analysis['nouns']

        # 2. Path A: Knowledge Graph Navigator (Meaning)
        # Pulls semantic context from the seeded etymological graph
        semantic_map = self.kg.get_semantic_context(terms)

        # 3. Path B: Resource Manager (Context & WORM)
        # Strict integrity check before accessing grounding data
        if not verify_worm_integrity(self.WORM_PATH, self.EXPECTED_HASH):
            raise SecurityError("WORM Integrity Compromised!")

        # NOTE(review): the file is re-opened after it was hashed, so a swap
        # between the two reads would go unnoticed. Hashing and parsing the
        # same bytes would close that window - confirm whether it matters for
        # the storage this runs against.
        # Explicit UTF-8 so parsing does not depend on the platform locale.
        with open(self.WORM_PATH, 'r', encoding='utf-8') as f:
            grounding_data = yaml.safe_load(f)

        # An empty document parses to None; treat it as an empty mapping.
        if grounding_data is None:
            grounding_data = {}
        elif not isinstance(grounding_data, dict):
            raise ValueError(
                f"Grounding data in {self.WORM_PATH!r} must be a mapping, "
                f"got {type(grounding_data).__name__}"
            )

        # 4. Reasoning Bridge & Constraint Checker (Synthesis)
        # Using poc-v2 quality scoring to evaluate Path B
        quality_report = evaluate_source_quality(yaml.dump(grounding_data), len(grounding_data))

        # 'constraints' may be absent or null; both mean no hard constraints.
        constraints = grounding_data.get('constraints')
        hard_constraints = constraints.get('hard') if isinstance(constraints, dict) else None
        if hard_constraints is None:
            hard_constraints = []

        # Construct the "Verified Execution Plan"
        verified_plan = {
            "intent": user_query,
            "semantic_roots": semantic_map,
            "hard_constraints": hard_constraints,
            "reliability": quality_report.score,
            "hallucination_risk": "low" if quality_report.score > 0.7 else "high"
        }

        return verified_plan


# Usage
if __name__ == "__main__":
    root_ai = RootAIPipeline()
    try:
        plan = root_ai.execute("Create an auth session with JWT")
    finally:
        # Release the Neo4j driver even when execute() raises.
        root_ai.kg.close()