diff --git a/.gitignore b/.gitignore index 96a52ddb..baf7406e 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,7 @@ data/ *.faiss *.pkl index/cache/ +index/llamaindex/ # --- Model files --- models/ diff --git a/scripts/build_index.sh b/scripts/build_index.sh new file mode 100644 index 00000000..90e0b5ff --- /dev/null +++ b/scripts/build_index.sh @@ -0,0 +1 @@ +nohup python -u -m src.main index > out.log 2>&1 & \ No newline at end of file diff --git a/src/llamaindex/__init__.py b/src/llamaindex/__init__.py new file mode 100644 index 00000000..073e91f1 --- /dev/null +++ b/src/llamaindex/__init__.py @@ -0,0 +1,9 @@ +""" +LlamaIndex-based RAG pipeline for TokenSmith. + +Behaviorally equivalent to the original src/ pipeline: +- Qwen/Qwen3-Embedding-4B via HuggingFace for embeddings +- Qwen2.5-1.5B GGUF model for generation +- Vector + BM25 retrieval with RRF fusion (LlamaIndex built-in modules) +- Cross-encoder reranking (ms-marco-MiniLM-L6-v2) +""" diff --git a/src/llamaindex/__main__.py b/src/llamaindex/__main__.py new file mode 100644 index 00000000..456ecb3d --- /dev/null +++ b/src/llamaindex/__main__.py @@ -0,0 +1,4 @@ +"""Allow running as: python -m src.llamaindex """ +from .main import main + +main() diff --git a/src/llamaindex/config.py b/src/llamaindex/config.py new file mode 100644 index 00000000..2a0fc6dc --- /dev/null +++ b/src/llamaindex/config.py @@ -0,0 +1,63 @@ +"""Configuration for the LlamaIndex RAG pipeline. + +Defaults match the original TokenSmith config/config.yaml. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + +import yaml + + +@dataclass +class LlamaIndexConfig: + + # ── Paths ──────────────────────────────────────────────────────────── + data_dir: str = "data" + persist_dir: str = "index/llamaindex" + log_dir: str = "logs/llamaindex" + + # ── Embedding (same GGUF model as original pipeline) ─────────────── + embed_model: str = "models/Qwen3-Embedding-4B-Q5_K_M.gguf" + embed_n_ctx: int = 4096 + + # ── Generation (same GGUF model as original pipeline) ──────────────── + gen_model: str = "models/qwen2.5-1.5b-instruct-q5_k_m.gguf" + gen_context_window: int = 4096 + max_gen_tokens: int = 400 + gen_temperature: float = 0.2 + n_gpu_layers: int = -1 # -1 = offload all to GPU + + # ── Chunking (matches original: 2000 / 200) ───────────────────────── + chunk_size: int = 2000 + chunk_overlap: int = 200 + + # ── Retrieval (matches original: RRF fusion) ───────────────────────── + num_candidates: int = 50 # per-retriever pool size + top_k: int = 5 # final chunks after reranking + + # ── Reranking (same cross-encoder as original) ─────────────────────── + rerank_model: str = "cross-encoder/ms-marco-MiniLM-L6-v2" + use_reranker: bool = True + + # ── System prompt ──────────────────────────────────────────────────── + system_prompt: str = ( + "You are a helpful assistant. Answer the question using the provided " + "context excerpts. If the context doesn't contain the answer, say so. " + "Be concise and accurate." + ) + + # ── Factory ────────────────────────────────────────────────────────── + @classmethod + def from_yaml(cls, path: os.PathLike) -> "LlamaIndexConfig": + with open(path, "r") as f: + data = yaml.safe_load(f) + valid = {k: v for k, v in data.items() if k in cls.__dataclass_fields__} + return cls(**valid) + + def __post_init__(self) -> None: + Path(self.persist_dir).mkdir(parents=True, exist_ok=True) + Path(self.log_dir).mkdir(parents=True, exist_ok=True) diff --git a/src/llamaindex/indexer.py b/src/llamaindex/indexer.py new file mode 100644 index 00000000..0d80d197 --- /dev/null +++ b/src/llamaindex/indexer.py @@ -0,0 +1,106 @@ +""" +Document ingestion, parsing, and index management. + +Pipeline: + 1. Load markdown docs from data/ + 2. Parse with MarkdownNodeParser (header-aware splitting) + 3. Apply SentenceSplitter for size-consistent chunks (2000 chars / 200 overlap) + 4. Build VectorStoreIndex with GGUF embeddings + 5. Persist to disk for fast reload +""" + +from __future__ import annotations + +import time +from pathlib import Path + +from llama_index.core import ( + Document, + SimpleDirectoryReader, + StorageContext, + VectorStoreIndex, + load_index_from_storage, +) +from llama_index.core.node_parser import MarkdownNodeParser, SentenceSplitter +from llama_index.core.ingestion import IngestionPipeline + +from .config import LlamaIndexConfig + + +def load_markdown_documents(data_dir: str) -> list[Document]: + """Load all markdown files from data_dir.""" + data_path = Path(data_dir) + md_files = sorted(data_path.glob("*.md")) + if not md_files: + raise FileNotFoundError( + f"No markdown files found in {data_dir}/. " + "Run extraction first or place .md files there." + ) + print(f"Found {len(md_files)} markdown file(s): {[f.name for f in md_files]}") + reader = SimpleDirectoryReader(input_files=[str(f) for f in md_files]) + return reader.load_data(show_progress=True) + + +def build_ingestion_pipeline(cfg: LlamaIndexConfig) -> IngestionPipeline: + """MarkdownNodeParser → SentenceSplitter (matches original recursive_sections chunking).""" + return IngestionPipeline( + transformations=[ + MarkdownNodeParser(), + SentenceSplitter( + chunk_size=cfg.chunk_size, + chunk_overlap=cfg.chunk_overlap, + ), + ] + ) + + +def build_index(cfg: LlamaIndexConfig) -> VectorStoreIndex: + """Build a fresh VectorStoreIndex from documents and persist it.""" + print("=" * 60) + print("Building LlamaIndex VectorStoreIndex ...") + print(f" Data dir : {cfg.data_dir}") + print(f" Persist dir : {cfg.persist_dir}") + print(f" Embed model : {cfg.embed_model}") + print(f" Chunk size : {cfg.chunk_size} overlap: {cfg.chunk_overlap}") + print("=" * 60) + + t0 = time.time() + + documents = load_markdown_documents(cfg.data_dir) + print(f"Loaded {len(documents)} document(s) in {time.time() - t0:.1f}s") + + pipeline = build_ingestion_pipeline(cfg) + nodes = pipeline.run(documents=documents, show_progress=True) + print(f"Created {len(nodes)} nodes after parsing + chunking") + + t1 = time.time() + index = VectorStoreIndex(nodes, show_progress=True) + print(f"Index built in {time.time() - t1:.1f}s") + + index.storage_context.persist(persist_dir=cfg.persist_dir) + print(f"Index persisted to {cfg.persist_dir}") + print(f"Total indexing time: {time.time() - t0:.1f}s") + + return index + + +def load_index(cfg: LlamaIndexConfig) -> VectorStoreIndex: + """Load a previously persisted index from disk.""" + persist_path = Path(cfg.persist_dir) + if not persist_path.exists(): + raise FileNotFoundError( + f"No persisted index at {cfg.persist_dir}. Run indexing first." + ) + print(f"Loading index from {cfg.persist_dir} ...") + storage_context = StorageContext.from_defaults(persist_dir=cfg.persist_dir) + index = load_index_from_storage(storage_context) + print("Index loaded successfully.") + return index + + +def get_or_build_index(cfg: LlamaIndexConfig, force_rebuild: bool = False) -> VectorStoreIndex: + """Load existing index or build a new one.""" + persist_path = Path(cfg.persist_dir) + if not force_rebuild and persist_path.exists() and any(persist_path.iterdir()): + return load_index(cfg) + return build_index(cfg) diff --git a/src/llamaindex/logger.py b/src/llamaindex/logger.py new file mode 100644 index 00000000..f3437c65 --- /dev/null +++ b/src/llamaindex/logger.py @@ -0,0 +1,61 @@ +""" +JSON query logger. + +Writes one pretty-printed .json file per session to logs/llamaindex/.json +""" + +from __future__ import annotations + +import json +from datetime import datetime +from pathlib import Path +from typing import Any + +from .config import LlamaIndexConfig + + +class QueryLogger: + """Pretty-printed JSON logger for query diagnostics.""" + + def __init__(self, cfg: LlamaIndexConfig) -> None: + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + self._path = Path(cfg.log_dir) / f"run_{ts}.json" + self._data = { + "session_start": ts, + "config": { + "embed_model": cfg.embed_model, + "gen_model": cfg.gen_model, + "chunk_size": cfg.chunk_size, + "chunk_overlap": cfg.chunk_overlap, + "num_candidates": cfg.num_candidates, + "top_k": cfg.top_k, + "rerank_model": cfg.rerank_model if cfg.use_reranker else None, + }, + "queries": [], + } + self._flush() + + def log_query( + self, + question: str, + answer: str, + chunks: list[dict[str, Any]], + retrieval_time_s: float, + generation_time_s: float, + ) -> None: + """Log a single query with its chunks and timings.""" + self._data["queries"].append({ + "timestamp": datetime.now().isoformat(), + "question": question, + "answer": answer, + "num_chunks": len(chunks), + "chunks": chunks, + "retrieval_time_s": round(retrieval_time_s, 3), + "generation_time_s": round(generation_time_s, 3), + "total_time_s": round(retrieval_time_s + generation_time_s, 3), + }) + self._flush() + + def _flush(self) -> None: + with open(self._path, "w") as f: + json.dump(self._data, f, indent=2, ensure_ascii=False) diff --git a/src/llamaindex/main.py b/src/llamaindex/main.py new file mode 100644 index 00000000..d941c88e --- /dev/null +++ b/src/llamaindex/main.py @@ -0,0 +1,245 @@ +""" +CLI entry point for the LlamaIndex RAG pipeline. + +Mirrors the flow of src/main.py: + 1. Parse args -> load config + 2. Index mode : load docs -> chunk -> embed -> persist + 3. Chat mode : load artifacts once -> chat loop (retrieve -> rerank -> generate) + +Usage: + python -m src.llamaindex index + python -m src.llamaindex index --rebuild + python -m src.llamaindex chat + python -m src.llamaindex query "What is normalization?" +""" + +from __future__ import annotations + +import argparse +import sys +import time +from pathlib import Path +from typing import Dict, Optional + +from .config import LlamaIndexConfig + +ANSWER_NOT_FOUND = "I'm sorry, but I don't have enough information to answer that question." + + +# ── Argument parsing ───────────────────────────────────────────────────── + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="LlamaIndex RAG Pipeline") + parser.add_argument("mode", choices=["index", "chat", "query"]) + parser.add_argument("question", nargs="?", default=None) + parser.add_argument("--rebuild", action="store_true") + parser.add_argument("--data-dir", default=None) + parser.add_argument("--gen-model", default=None) + parser.add_argument("--embed-model", default=None) + parser.add_argument("--no-rerank", action="store_true") + parser.add_argument("--gpu-layers", type=int, default=None) + return parser.parse_args() + + +_CONFIG_PATH = "config/config.yaml" + + +def build_config(args: argparse.Namespace) -> LlamaIndexConfig: + cfg = LlamaIndexConfig.from_yaml(_CONFIG_PATH) + if args.data_dir: + cfg.data_dir = args.data_dir + if args.gen_model: + cfg.gen_model = args.gen_model + if args.embed_model: + cfg.embed_model = args.embed_model + if args.no_rerank: + cfg.use_reranker = False + if args.gpu_layers is not None: + cfg.n_gpu_layers = args.gpu_layers + return cfg + + +# ── Index mode ─────────────────────────────────────────────────────────── + + +def run_index_mode(cfg: LlamaIndexConfig, rebuild: bool) -> None: + from llama_index.core import Settings + from .models import build_llm, build_embed_model + from .indexer import get_or_build_index, build_index + + Settings.llm = build_llm(cfg) + Settings.embed_model = build_embed_model(cfg) + Settings.chunk_size = cfg.chunk_size + Settings.chunk_overlap = cfg.chunk_overlap + + if rebuild: + build_index(cfg) + else: + get_or_build_index(cfg) + print("\nIndexing complete.") + + +# ── Core query function ────────────────────────────────────────────────── + + +def get_answer( + question: str, + cfg: LlamaIndexConfig, + artifacts: Dict, + logger: Optional["QueryLogger"] = None, +) -> str: + """ + Run a single query through the pipeline. Mirrors src/main.py:get_answer(). + + Flow: retrieve (vector + BM25 via RRF) -> rerank -> generate (stream). + Logs question + chunks + answer to JSON. + """ + from llama_index.core import QueryBundle + from llama_index.core.response_synthesizers import get_response_synthesizer + + retriever = artifacts["retriever"] + reranker = artifacts["reranker"] + llm = artifacts["llm"] + + # Step 1: Retrieve (RRF fusion of vector + BM25) + t0 = time.time() + query_bundle = QueryBundle(query_str=question) + nodes = retriever.retrieve(query_bundle) + + # Step 2: Rerank with cross-encoder + if reranker is not None: + nodes = reranker.postprocess_nodes(nodes, query_bundle) + + retrieval_time = time.time() - t0 + + if not nodes: + print(f"\n{ANSWER_NOT_FOUND}\n") + return ANSWER_NOT_FOUND + + # Collect chunk info for logging + chunks_for_log = [] + for rank, node in enumerate(nodes, 1): + chunks_for_log.append({ + "rank": rank, + "score": round(float(node.score), 4) if node.score is not None else None, + "text": node.text, + "metadata": {k: str(v) for k, v in node.metadata.items()}, + "char_len": len(node.text), + }) + + # Step 3: Generate (streaming) + t1 = time.time() + synthesizer = get_response_synthesizer( + llm=llm, + response_mode="compact", + streaming=True, + ) + streaming_response = synthesizer.synthesize(question, nodes) + + print("\n" + "=" * 60) + print(" ANSWER") + print("=" * 60 + "\n") + + answer_text = "" + for token in streaming_response.response_gen: + print(token, end="", flush=True) + answer_text += token + print("\n\n" + "=" * 60 + "\n") + + generation_time = time.time() - t1 + + # Log to JSON + if logger: + logger.log_query( + question=question, + answer=answer_text, + chunks=chunks_for_log, + retrieval_time_s=retrieval_time, + generation_time_s=generation_time, + ) + + return answer_text + + +# ── Initialization helper ──────────────────────────────────────────────── + + +def init_artifacts(cfg: LlamaIndexConfig) -> tuple: + """Load models + index + retriever + reranker. Fail fast on any error.""" + from llama_index.core import Settings + from .models import build_llm, build_embed_model + from .indexer import load_index + from .retriever import build_retriever, build_reranker + from .logger import QueryLogger + + llm = build_llm(cfg) + embed_model = build_embed_model(cfg) + Settings.llm = llm + Settings.embed_model = embed_model + + index = load_index(cfg) + retriever = build_retriever(index, cfg) + reranker = build_reranker(cfg) + + artifacts = {"llm": llm, "retriever": retriever, "reranker": reranker} + logger = QueryLogger(cfg) + + return artifacts, logger + + +# ── Chat session ───────────────────────────────────────────────────────── + + +def run_chat_session(cfg: LlamaIndexConfig) -> None: + """Load artifacts once, then run the interactive chat loop.""" + print("Initializing LlamaIndex pipeline...") + artifacts, logger = init_artifacts(cfg) + print("Initialization complete. Type 'exit' or 'quit' to end.\n") + + while True: + try: + q = input("Ask > ").strip() + if not q: + continue + if q.lower() in {"exit", "quit"}: + print("Goodbye!") + break + get_answer(q, cfg, artifacts, logger) + except KeyboardInterrupt: + print("\nGoodbye!") + break + except Exception as e: + print(f"\nError: {e}", file=sys.stderr) + raise + + +# ── Single query mode ──────────────────────────────────────────────────── + + +def run_query_mode(cfg: LlamaIndexConfig, question: str) -> None: + """Run a single query and exit.""" + artifacts, logger = init_artifacts(cfg) + get_answer(question, cfg, artifacts, logger) + + +# ── Entry point ────────────────────────────────────────────────────────── + + +def main() -> None: + args = parse_args() + cfg = build_config(args) + + if args.mode == "index": + run_index_mode(cfg, rebuild=args.rebuild) + elif args.mode == "chat": + run_chat_session(cfg) + elif args.mode == "query": + if not args.question: + print("Error: 'query' mode requires a question.", file=sys.stderr) + sys.exit(1) + run_query_mode(cfg, args.question) + + +if __name__ == "__main__": + main() diff --git a/src/llamaindex/models.py b/src/llamaindex/models.py new file mode 100644 index 00000000..0992ab5f --- /dev/null +++ b/src/llamaindex/models.py @@ -0,0 +1,103 @@ +""" +Model factories for the LlamaIndex pipeline. + +- LLM: Qwen 2.5 1.5B GGUF via llama-cpp-python (same as original) +- Embeddings: Qwen3-Embedding-4B Q5_K_M GGUF via llama-cpp-python (same as original) +""" + +from __future__ import annotations + +from typing import Any, List + +import numpy as np +from llama_index.core.bridge.pydantic import PrivateAttr +from llama_index.core.embeddings import BaseEmbedding +from llama_index.llms.llama_cpp import LlamaCPP + +from .config import LlamaIndexConfig + + +# ── Prompt formatting for Qwen / ChatML ────────────────────────────────── + + +def _messages_to_prompt(messages) -> str: + prompt = "" + for m in messages: + role = m.role.value if hasattr(m.role, "value") else m.role + prompt += f"<|im_start|>{role}\n{m.content}<|im_end|>\n" + prompt += "<|im_start|>assistant\n" + return prompt + + +def _completion_to_prompt(completion: str) -> str: + return ( + "<|im_start|>user\n" + f"{completion}<|im_end|>\n" + "<|im_start|>assistant\n" + ) + + +# ── GGUF Embedding model ──────────────────────────────────────────────── + + +class LlamaCppEmbedding(BaseEmbedding): + """Wraps llama-cpp-python for GGUF embedding models.""" + + _model: Any = PrivateAttr() + + def __init__(self, model_path: str, n_gpu_layers: int = -1, n_ctx: int = 4096, **kwargs): + super().__init__(**kwargs) + from llama_cpp import Llama + + self._model = Llama( + model_path=model_path, + embedding=True, + n_gpu_layers=n_gpu_layers, + n_ctx=n_ctx, + verbose=False, + ) + + def _get_query_embedding(self, query: str) -> List[float]: + return self._embed(query) + + async def _aget_query_embedding(self, query: str) -> List[float]: + return self._get_query_embedding(query) + + def _get_text_embedding(self, text: str) -> List[float]: + return self._embed(text) + + def _get_text_embeddings(self, texts: List[str]) -> List[List[float]]: + return [self._embed(t) for t in texts] + + def _embed(self, text: str) -> List[float]: + output = self._model.embed(text) + vec = np.array(output, dtype=np.float32).flatten() + norm = np.linalg.norm(vec) + if norm > 0: + vec = vec / norm + return vec.tolist() + + +# ── Factories ──────────────────────────────────────────────────────────── + + +def build_llm(cfg: LlamaIndexConfig) -> LlamaCPP: + return LlamaCPP( + model_path=cfg.gen_model, + temperature=cfg.gen_temperature, + max_new_tokens=cfg.max_gen_tokens, + context_window=cfg.gen_context_window, + model_kwargs={"n_gpu_layers": cfg.n_gpu_layers}, + messages_to_prompt=_messages_to_prompt, + completion_to_prompt=_completion_to_prompt, + verbose=False, + ) + + +def build_embed_model(cfg: LlamaIndexConfig) -> LlamaCppEmbedding: + return LlamaCppEmbedding( + model_path=cfg.embed_model, + n_gpu_layers=cfg.n_gpu_layers, + n_ctx=cfg.embed_n_ctx, + n_batch=512 + ) diff --git a/src/llamaindex/q.txt b/src/llamaindex/q.txt new file mode 100644 index 00000000..0ec90ebf --- /dev/null +++ b/src/llamaindex/q.txt @@ -0,0 +1,6 @@ +In ARIES, what is PageLSN? +In ARIES, what is recLSN and how does ARIES use it in redo? +Why does redo start from the smallest recLSN? Couldn’t we just start from the checkpoint? +If a page’s pageLSN is greater than the smallest recLSN, does ARIES still have to start redo from that smallest recLSN? + +conda run -n tokensmith env CUDACXX=/usr/local/cuda/bin/nvcc CMAKE_ARGS="-DGGML_CUDA=on" pip install llama-cpp-python --force-reinstall --no-cache-dir \ No newline at end of file diff --git a/src/llamaindex/retriever.py b/src/llamaindex/retriever.py new file mode 100644 index 00000000..40a86f47 --- /dev/null +++ b/src/llamaindex/retriever.py @@ -0,0 +1,54 @@ +""" +Retriever and reranker factories. + +Uses LlamaIndex built-in modules to match the original TokenSmith pipeline: + - VectorIndexRetriever (equivalent to FAISS) + - BM25Retriever (llama-index-retrievers-bm25) + - QueryFusionRetriever (reciprocal rank fusion) + - SentenceTransformerRerank (cross-encoder/ms-marco-MiniLM-L6-v2) +""" + +from __future__ import annotations + +from llama_index.core import VectorStoreIndex +from llama_index.core.postprocessor import SentenceTransformerRerank +from llama_index.core.retrievers import QueryFusionRetriever +from llama_index.retrievers.bm25 import BM25Retriever + +from .config import LlamaIndexConfig + + +def build_retriever( + index: VectorStoreIndex, + cfg: LlamaIndexConfig, +) -> QueryFusionRetriever: + """ + Build a hybrid retriever: vector + BM25, fused with RRF. + + Equivalent to the original pipeline's: + FAISSRetriever + BM25Retriever -> EnsembleRanker(method="rrf") + """ + vector_retriever = index.as_retriever(similarity_top_k=cfg.num_candidates) + + bm25_retriever = BM25Retriever.from_defaults( + docstore=index.docstore, + similarity_top_k=cfg.num_candidates, + ) + + return QueryFusionRetriever( + retrievers=[vector_retriever, bm25_retriever], + similarity_top_k=cfg.num_candidates, + num_queries=1, # no query generation, just fuse + mode="reciprocal_rerank", # RRF + use_async=False, + ) + + +def build_reranker(cfg: LlamaIndexConfig) -> SentenceTransformerRerank | None: + """Build cross-encoder reranker (same model as original pipeline).""" + if not cfg.use_reranker: + return None + return SentenceTransformerRerank( + model=cfg.rerank_model, + top_n=cfg.top_k, + )