From 25736b430d3445977c61a6b2eb23e2ebfbbbcc98 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Tue, 3 Mar 2026 12:55:37 +0000
Subject: [PATCH 1/2] Initial plan


From f2d29f5a90351b30f19c85adc895addc1dcdfbee Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Tue, 3 Mar 2026 13:01:53 +0000
Subject: [PATCH 2/2] Fix unified_pipeline: add missing Python modules,
 SecurityError, and env-based credentials

Co-authored-by: tattoosonmyskin <155841536+tattoosonmyskin@users.noreply.github.com>
---
Review notes (ignored by git am, not part of the commit message):
 * unified_pipeline: the WORM file is read once and the same bytes are
   verified and parsed; non-mapping YAML raises ValueError instead of
   crashing on len()/.get(); the pipeline can be closed / used in "with".
 * integrity_manager: digest comparison ignores case and surrounding
   whitespace; verify_bytes_integrity() added.
 * kg_navigator: query hoisted, duplicate terms collapsed, misleading
   "execute_read" comment corrected.
 * tests: short-token test now exercises the length rule; new coverage for
   the behaviours above.
 * Blob "index" lines are omitted because file contents were revised.

 deconstructor.py               |  31 +++++++
 integrity_manager.py           |  69 ++++++++++++++
 kg_navigator.py                |  56 ++++++++++++
 tests/test_unified_pipeline.py | 162 +++++++++++++++++++++++++++++++++
 unified_pipeline.py            |  94 +++++++++++++++++++
 5 files changed, 412 insertions(+)
 create mode 100644 deconstructor.py
 create mode 100644 integrity_manager.py
 create mode 100644 kg_navigator.py
 create mode 100644 tests/test_unified_pipeline.py
 create mode 100644 unified_pipeline.py

diff --git a/deconstructor.py b/deconstructor.py
new file mode 100644
--- /dev/null
+++ b/deconstructor.py
@@ -0,0 +1,31 @@
+import re
+from typing import Dict, List
+
+# Common English function words to exclude from noun extraction
+_STOPWORDS = {
+    'a', 'an', 'the', 'with', 'in', 'on', 'at', 'to', 'for',
+    'of', 'and', 'or', 'but', 'is', 'are', 'was', 'were', 'be',
+    'been', 'being', 'do', 'does', 'did', 'has', 'have', 'had',
+    'will', 'would', 'could', 'should', 'may', 'might', 'shall',
+    'can', 'not', 'no', 'nor', 'so', 'yet', 'both', 'either',
+    'that', 'this', 'these', 'those', 'it', 'its', 'my', 'your',
+    'his', 'her', 'our', 'their', 'we', 'you', 'he', 'she', 'they',
+    'i', 'me', 'him', 'us', 'them', 'who', 'which', 'what', 'how',
+    'when', 'where', 'why', 'all', 'each', 'every', 'some', 'any',
+}
+
+
+def deconstruct(query: str) -> Dict[str, List[str]]:
+    """
+    Prompt Analyzer: Deconstructs a user query into semantic components.
+
+    The query is lowercased and split into runs of ASCII letters; stopwords
+    and tokens of two characters or fewer are dropped. The survivors are a
+    heuristic stand-in for nouns (no part-of-speech tagging is performed).
+
+    Returns a dict with 'nouns' extracted from the query for graph lookup,
+    in order of appearance (duplicates are kept).
+    """
+    tokens = re.findall(r'[a-zA-Z]+', query.lower())
+    nouns = [t for t in tokens if t not in _STOPWORDS and len(t) > 2]
+    return {'nouns': nouns}
diff --git a/integrity_manager.py b/integrity_manager.py
new file mode 100644
--- /dev/null
+++ b/integrity_manager.py
@@ -0,0 +1,69 @@
+import hashlib
+from typing import Dict
+from pydantic import BaseModel
+
+
+class ReliabilityReport(BaseModel):
+    score: float
+    assessment: str
+    indicators: Dict[str, bool]
+
+
+def evaluate_source_quality(content: str, source_count: int) -> ReliabilityReport:
+    """
+    Ported from poc-v2: Logic & Constraint Checker logic.
+    Analyzes quality based on specific data points and structure.
+
+    The score is the fraction (0.0 to 1.0) of indicators that pass; the
+    assessment is "high" above 0.7, "medium" above 0.4, otherwise "low".
+    """
+    indicators = {
+        "has_citations": "[" in content and "]" in content,
+        "has_specific_data": any(char.isdigit() for char in content),
+        "reasonable_length": len(content) > 200,
+        "has_multiple_sources": source_count > 1
+    }
+
+    # Calculate score as the fraction of passed checks
+    passed_checks = sum(1 for v in indicators.values() if v)
+    score = passed_checks / len(indicators)
+
+    assessment = "high" if score > 0.7 else "medium" if score > 0.4 else "low"
+
+    return ReliabilityReport(score=score, assessment=assessment, indicators=indicators)
+
+
+def _digest_matches(actual_hex: str, expected_hash: str) -> bool:
+    """
+    Compare a computed SHA-256 hex digest with the configured one.
+
+    Surrounding whitespace and letter case in expected_hash are ignored,
+    since both are representation details of the same digest. An empty
+    expected_hash never matches because a computed digest is never empty.
+    """
+    return actual_hex == expected_hash.strip().lower()
+
+
+def verify_bytes_integrity(data: bytes, expected_hash: str) -> bool:
+    """
+    Resource Manager: verify content that has already been read.
+
+    Lets a caller hash and then parse the very same bytes, so the data
+    cannot change between the integrity check and its use.
+    """
+    return _digest_matches(hashlib.sha256(data).hexdigest(), expected_hash)
+
+
+def verify_worm_integrity(file_path: str, expected_hash: str) -> bool:
+    """
+    Resource Manager: Immutable storage verification.
+    Ensures Path B (Context) is untampered.
+
+    Streams the file in 4096-byte blocks and returns True only when its
+    SHA-256 hex digest equals expected_hash.
+    """
+    sha256_hash = hashlib.sha256()
+    with open(file_path, "rb") as f:
+        for byte_block in iter(lambda: f.read(4096), b""):
+            sha256_hash.update(byte_block)
+    return _digest_matches(sha256_hash.hexdigest(), expected_hash)
diff --git a/kg_navigator.py b/kg_navigator.py
new file mode 100644
--- /dev/null
+++ b/kg_navigator.py
@@ -0,0 +1,56 @@
+import logging
+from typing import Dict, Iterable
+
+from neo4j import GraphDatabase
+
+# Parameterized lookup: $term is passed to the driver as a query parameter
+_ROOT_CONTEXT_QUERY = """
+MATCH (n:EtymologicalRoot {name: $term})
+OPTIONAL MATCH (n)-[:DEFINES|RELATED_TO]-(related)
+RETURN n.name as root, collect(related.name) as relations
+"""
+
+
+class KnowledgeGraphNavigator:
+    """Wraps a Neo4j driver for EtymologicalRoot lookups."""
+
+    def __init__(self, uri, user, password):
+        try:
+            self.driver = GraphDatabase.driver(uri, auth=(user, password))
+        except Exception as e:
+            logging.error("Failed to initialize Neo4j driver: %s", e)
+            raise
+
+    def close(self):
+        """Close the underlying Neo4j driver."""
+        self.driver.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    def get_semantic_context(self, terms: Iterable[str]) -> Dict[str, list]:
+        """
+        Path A: Map tokens to the Etymological Graph.
+        Returns the 'Core Concept Map' for the Reasoning Bridge.
+
+        Each distinct term is lowercased and looked up once; terms with no
+        matching EtymologicalRoot node are absent from the result.
+        """
+        # Repeated terms would re-run the same query and overwrite the same
+        # key, so collapse them while keeping first-seen order
+        unique_terms = list(dict.fromkeys(term.lower() for term in terms))
+        context_map: Dict[str, list] = {}
+        if not unique_terms:
+            return context_map
+
+        with self.driver.session() as session:
+            for term in unique_terms:
+                # session.run() is an auto-commit query, not execute_read()
+                result = session.run(_ROOT_CONTEXT_QUERY, term=term)
+                record = result.single()
+                if record:
+                    context_map[record["root"]] = record["relations"]
+        return context_map
diff --git a/tests/test_unified_pipeline.py b/tests/test_unified_pipeline.py
new file mode 100644
--- /dev/null
+++ b/tests/test_unified_pipeline.py
@@ -0,0 +1,162 @@
+import hashlib
+import os
+import tempfile
+import pytest
+
+from deconstructor import deconstruct
+from integrity_manager import (
+    evaluate_source_quality,
+    verify_bytes_integrity,
+    verify_worm_integrity,
+)
+from unified_pipeline import RootAIPipeline, SecurityError
+
+
+class TestDeconstruct:
+    def test_returns_nouns_key(self):
+        result = deconstruct("Create an auth session with JWT")
+        assert "nouns" in result
+
+    def test_filters_stopwords(self):
+        result = deconstruct("Create an auth session with JWT")
+        assert "an" not in result["nouns"]
+        assert "with" not in result["nouns"]
+
+    def test_extracts_meaningful_terms(self):
+        result = deconstruct("Create an auth session with JWT")
+        assert "auth" in result["nouns"]
+        assert "session" in result["nouns"]
+        assert "jwt" in result["nouns"]
+
+    def test_returns_lowercase_terms(self):
+        result = deconstruct("JWT Auth Session")
+        for noun in result["nouns"]:
+            assert noun == noun.lower()
+
+    def test_empty_query(self):
+        result = deconstruct("")
+        assert result["nouns"] == []
+
+    def test_filters_short_tokens(self):
+        # None of these are stopwords, so only the length rule can drop them
+        result = deconstruct("go up ok db")
+        assert result["nouns"] == []
+
+
+class TestEvaluateSourceQuality:
+    def test_high_quality_content(self):
+        content = (
+            "This content references [source1] and has 42 data points. " * 10
+        )
+        report = evaluate_source_quality(content, 3)
+        assert report.score > 0.7
+        assert report.assessment == "high"
+
+    def test_low_quality_content(self):
+        report = evaluate_source_quality("short", 1)
+        assert report.assessment == "low"
+
+    def test_score_between_zero_and_one(self):
+        report = evaluate_source_quality("some content", 1)
+        assert 0.0 <= report.score <= 1.0
+
+    def test_indicators_present(self):
+        report = evaluate_source_quality("content [ref] with 5 items " * 20, 2)
+        assert "has_citations" in report.indicators
+        assert "has_specific_data" in report.indicators
+        assert "reasonable_length" in report.indicators
+        assert "has_multiple_sources" in report.indicators
+
+
+class TestVerifyWormIntegrity:
+    def test_matching_hash_returns_true(self):
+        content = b"secure worm content"
+        expected = hashlib.sha256(content).hexdigest()
+        with tempfile.NamedTemporaryFile(delete=False) as f:
+            f.write(content)
+            path = f.name
+        try:
+            assert verify_worm_integrity(path, expected) is True
+        finally:
+            os.unlink(path)
+
+    def test_wrong_hash_returns_false(self):
+        with tempfile.NamedTemporaryFile(delete=False) as f:
+            f.write(b"some content")
+            path = f.name
+        try:
+            assert verify_worm_integrity(path, "deadbeef" * 8) is False
+        finally:
+            os.unlink(path)
+
+    def test_hash_case_and_whitespace_ignored(self, tmp_path):
+        content = b"secure worm content"
+        worm_file = tmp_path / "worm.bin"
+        worm_file.write_bytes(content)
+        expected = f" {hashlib.sha256(content).hexdigest().upper()}\n"
+        assert verify_worm_integrity(str(worm_file), expected) is True
+
+    def test_empty_expected_hash_returns_false(self, tmp_path):
+        worm_file = tmp_path / "worm.bin"
+        worm_file.write_bytes(b"")
+        assert verify_worm_integrity(str(worm_file), "") is False
+
+
+class TestVerifyBytesIntegrity:
+    def test_matching_hash_returns_true(self):
+        content = b"secure worm content"
+        expected = hashlib.sha256(content).hexdigest()
+        assert verify_bytes_integrity(content, expected) is True
+
+    def test_wrong_hash_returns_false(self):
+        assert verify_bytes_integrity(b"some content", "deadbeef" * 8) is False
+
+
+class TestSecurityError:
+    def test_security_error_is_exception(self):
+        assert issubclass(SecurityError, Exception)
+
+    def test_security_error_can_be_raised(self):
+        with pytest.raises(SecurityError, match="WORM Integrity Compromised!"):
+            raise SecurityError("WORM Integrity Compromised!")
+
+
+class _StubGraph:
+    """Stands in for KnowledgeGraphNavigator so no Neo4j server is needed."""
+
+    def get_semantic_context(self, terms):
+        return {term: [] for term in terms}
+
+
+def _make_pipeline(tmp_path, content, expected_hash=None):
+    """Build a RootAIPipeline around a temp WORM file, bypassing __init__."""
+    worm_file = tmp_path / "secure-code.yaml"
+    worm_file.write_bytes(content)
+    pipeline = RootAIPipeline.__new__(RootAIPipeline)
+    pipeline.WORM_PATH = str(worm_file)
+    if expected_hash is None:
+        expected_hash = hashlib.sha256(content).hexdigest()
+    pipeline.EXPECTED_HASH = expected_hash
+    pipeline.kg = _StubGraph()
+    return pipeline
+
+
+class TestRootAIPipelineExecute:
+    def test_returns_hard_constraints(self, tmp_path):
+        content = b"constraints:\n  hard:\n    - no plaintext secrets\n"
+        plan = _make_pipeline(tmp_path, content).execute("Create an auth session")
+        assert plan["hard_constraints"] == ["no plaintext secrets"]
+        assert plan["semantic_roots"] == {"create": [], "auth": [], "session": []}
+
+    def test_hash_mismatch_raises_security_error(self, tmp_path):
+        pipeline = _make_pipeline(tmp_path, b"constraints: {}\n", "deadbeef" * 8)
+        with pytest.raises(SecurityError):
+            pipeline.execute("Create an auth session")
+
+    def test_empty_grounding_file_raises_value_error(self, tmp_path):
+        with pytest.raises(ValueError):
+            _make_pipeline(tmp_path, b"").execute("Create an auth session")
+
+    def test_null_constraints_yield_empty_list(self, tmp_path):
+        plan = _make_pipeline(tmp_path, b"constraints:\n").execute("auth")
+        assert plan["hard_constraints"] == []
diff --git a/unified_pipeline.py b/unified_pipeline.py
new file mode 100644
--- /dev/null
+++ b/unified_pipeline.py
@@ -0,0 +1,94 @@
+import os
+import yaml
+from deconstructor import deconstruct
+from integrity_manager import evaluate_source_quality, verify_bytes_integrity
+from kg_navigator import KnowledgeGraphNavigator
+
+
+class SecurityError(Exception):
+    """Raised when a security integrity check fails."""
+
+
+class RootAIPipeline:
+    """Merges graph context (Path A) with verified grounding data (Path B)."""
+
+    def __init__(self):
+        # Resource Manager Configuration
+        self.WORM_PATH = os.getenv("WORM_PATH", "secure-code.yaml")
+        self.EXPECTED_HASH = os.getenv("WORM_EXPECTED_HASH", "")  # Defined in Central Manifest
+
+        # Knowledge Graph Navigator (Path A)
+        neo4j_uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
+        neo4j_user = os.getenv("NEO4J_USER", "neo4j")
+        neo4j_password = os.getenv("NEO4J_PASSWORD", "")
+        self.kg = KnowledgeGraphNavigator(neo4j_uri, neo4j_user, neo4j_password)
+
+    def close(self):
+        """Close the Knowledge Graph Navigator held by this pipeline."""
+        self.kg.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    def execute(self, user_query: str):
+        """
+        Build the "Verified Execution Plan" for a user query.
+
+        Raises SecurityError if the WORM file does not match EXPECTED_HASH,
+        and ValueError if the file is not a YAML mapping or its
+        'constraints' entry is not a mapping.
+        """
+        print(f"[*] Analyzing Query: {user_query}")
+
+        # 1. Prompt Analyzer
+        analysis = deconstruct(user_query)
+        terms = analysis['nouns']
+
+        # 2. Path A: Knowledge Graph Navigator (Meaning)
+        # Pulls semantic context from the seeded etymological graph
+        semantic_map = self.kg.get_semantic_context(terms)
+
+        # 3. Path B: Resource Manager (Context & WORM)
+        # Read once, then verify and parse the same bytes so the file
+        # cannot change between the integrity check and its use
+        with open(self.WORM_PATH, 'rb') as f:
+            raw_grounding = f.read()
+
+        if not verify_bytes_integrity(raw_grounding, self.EXPECTED_HASH):
+            raise SecurityError("WORM Integrity Compromised!")
+
+        grounding_data = yaml.safe_load(raw_grounding)
+        if not isinstance(grounding_data, dict):
+            # An empty file parses to None and a top-level list has no .get()
+            raise ValueError(
+                f"Grounding data in {self.WORM_PATH} must be a YAML mapping"
+            )
+
+        # A bare "constraints:" key parses to None, so fall back explicitly
+        constraints = grounding_data.get('constraints') or {}
+        if not isinstance(constraints, dict):
+            raise ValueError("'constraints' in grounding data must be a mapping")
+
+        # 4. Reasoning Bridge & Constraint Checker (Synthesis)
+        # Using poc-v2 quality scoring to evaluate Path B
+        quality_report = evaluate_source_quality(yaml.dump(grounding_data), len(grounding_data))
+
+        # Construct the "Verified Execution Plan"
+        verified_plan = {
+            "intent": user_query,
+            "semantic_roots": semantic_map,
+            "hard_constraints": constraints.get('hard') or [],
+            "reliability": quality_report.score,
+            "hallucination_risk": "low" if quality_report.score > 0.7 else "high"
+        }
+
+        return verified_plan
+
+
+# Usage
+if __name__ == "__main__":
+    with RootAIPipeline() as root_ai:
+        plan = root_ai.execute("Create an auth session with JWT")