diff --git a/backend/app/routers/analyze.py b/backend/app/routers/analyze.py
index 234a687..45a4cd4 100644
--- a/backend/app/routers/analyze.py
+++ b/backend/app/routers/analyze.py
@@ -1,4 +1,5 @@
 """Full analysis router - POST /analyze/, /analyze/stream/, GET /analyze/stream, and /analyze/zip/."""
+
 from __future__ import annotations
 
 import asyncio
@@ -11,7 +12,16 @@
 from fastapi import APIRouter, File, HTTPException, Query, Request, Response, UploadFile
 from fastapi.responses import StreamingResponse
 
-from ..schemas import AnalyzeResponse, CodeRequest, ZipAnalyzeResponse
+from ..schemas import (
+    AnalyzeResponse,
+    CodeRequest,
+    IncrementalAnalyzeRequest,
+    IncrementalAnalyzeResponse,
+    ZipAnalyzeResponse,
+)
+
+from ..services.incremental_analysis import build_incremental_plan
+
 from ..services.cache import cache
 from ..services.code_assistant import (
     detect_language,
@@ -137,11 +147,7 @@ def _safe_zip_name(name: str) -> str:
 def _is_safe_member(name: str) -> bool:
     path = PurePosixPath(name.replace("\\", "/"))
     has_drive = bool(path.parts and path.parts[0].endswith(":"))
-    return (
-        not path.is_absolute()
-        and ".." not in path.parts
-        and not has_drive
-    )
+    return not path.is_absolute() and ".." not in path.parts and not has_drive
 
 
 def _is_ignored_member(name: str) -> bool:
@@ -173,11 +179,15 @@ async def analyze_stream(req: CodeRequest):
     response_class=StreamingResponse,
 )
 async def analyze_stream_get(
-    code: str = Query(..., min_length=1, max_length=50000, description="Source code to analyze"),
+    code: str = Query(
+        ..., min_length=1, max_length=50000, description="Source code to analyze"
+    ),
     language: str | None = Query(None, description="Optional language hint"),
 ):
     if not code.strip():
-        raise HTTPException(status_code=400, detail="code must not be empty or whitespace")
+        raise HTTPException(
+            status_code=400, detail="code must not be empty or whitespace"
+        )
     return StreamingResponse(
         _stream_analysis(code.strip(), language),
         media_type="text/event-stream",
@@ -206,6 +216,68 @@ async def analyze(req: CodeRequest, response: Response):
     return payload
 
 
+@router.post(
+    "/incremental/",
+    response_model=IncrementalAnalyzeResponse,
+    summary="Run incremental analysis for changed files only",
+)
+async def analyze_incremental(req: IncrementalAnalyzeRequest):
+    """Analyze only changed files or changed hunks from a previous version.
+
+    Files whose plan carries a skip reason, or no code to analyze, are
+    reported with ``analysis`` set to ``None`` and a ``skipped_reason``; every
+    other file gets a full analysis of its planned code.
+    """
+    t0 = time.perf_counter()
+
+    results: list[dict] = []
+    analyzed_count = 0
+
+    for plan in build_incremental_plan(req.files):
+        # Fields shared by analyzed and skipped entries.
+        entry = {
+            "filename": plan.path,
+            "previous_filename": plan.previous_path,
+            "status": plan.status,
+            "language": plan.language,
+            "changed_line_ranges": plan.changed_line_ranges,
+            "changed_line_count": plan.changed_line_count,
+            "size_bytes": len(plan.content.encode("utf-8")) if plan.content else 0,
+            "analysis": None,
+            "skipped_reason": None,
+        }
+
+        if plan.skipped_reason or not plan.analysis_code:
+            # A plan may have empty analysis_code without a reason; never
+            # report a skipped file without saying why.
+            entry["skipped_reason"] = plan.skipped_reason or "no code to analyze"
+        else:
+            analysis = full_analysis(plan.analysis_code, plan.language)
+            analyzed_count += 1
+            entry["language"] = analysis["explanation"]["language"]
+            entry["analysis"] = analysis
+
+        results.append(entry)
+
+    elapsed_ms = round((time.perf_counter() - t0) * 1000, 2)
+    skipped_count = len(results) - analyzed_count
+
+    return {
+        "provider": "rule-based",
+        "model": "qyverix-engine-v3",
+        "file_count": len(results),
+        "analyzed_file_count": analyzed_count,
+        "skipped_file_count": skipped_count,
+        "files": results,
+        "summary": (
+            f"Incremental analysis completed. "
+            f"Analyzed {analyzed_count} changed file(s), "
+            f"skipped {skipped_count} unchanged/deleted file(s)."
+        ),
+        "analysis_time_ms": elapsed_ms,
+    }
+
+
 @router.post(
     "/zip/",
     response_model=ZipAnalyzeResponse,
@@ -266,10 +338,7 @@ async def analyze_zip(request: Request, file: UploadFile = File(...)):
     total_size = 0
 
     with archive:
-        members = [
-            info for info in archive.infolist()
-            if not info.is_dir()
-        ]
+        members = [info for info in archive.infolist() if not info.is_dir()]
 
         if not members:
             raise HTTPException(
@@ -352,10 +421,7 @@ async def analyze_zip(request: Request, file: UploadFile = File(...)):
             detail="ZIP file does not contain readable source files",
         )
 
-    scores = [
-        item["analysis"]["suggestions"]["overall_score"]
-        for item in results
-    ]
+    scores = [item["analysis"]["suggestions"]["overall_score"] for item in results]
     overall_score = round(sum(scores) / len(scores))
 
 
diff --git a/backend/app/schemas.py b/backend/app/schemas.py
index cfde6d9..a67f5de 100644
--- a/backend/app/schemas.py
+++ b/backend/app/schemas.py
@@ -2,7 +2,7 @@
 from __future__ import annotations
 
 import json
-from typing import Any
+from typing import Any, Literal
 
 from pydantic import BaseModel, Field, field_validator, model_validator
 
@@ -16,6 +16,7 @@
     validate_stored_result_json,
 )
 
+
 class CodeRequest(BaseModel):
     code: str
     language: str | None = None
@@ -95,6
+96,85 @@ class AnalyzeResponse(BaseModel):
     analysis_time_ms: float | None = None
 
 
+class IncrementalFileChange(BaseModel):
+    """Current and previous state of one file for incremental analysis."""
+
+    path: str = Field(..., min_length=1, max_length=500)
+    content: str | None = Field(default=None, max_length=50_000)
+    previous_path: str | None = Field(default=None, max_length=500)
+    previous_content: str | None = Field(default=None, max_length=50_000)
+    language: str | None = None
+    status: Literal["added", "modified", "renamed", "deleted"] | None = None
+
+    @field_validator("path", "previous_path")
+    @classmethod
+    def validate_path(cls, value: str | None) -> str | None:
+        """Normalize separators; reject absolute, traversal and drive paths."""
+        if value is None:
+            return value
+
+        cleaned = value.replace("\\", "/").strip()
+
+        if not cleaned:
+            raise ValueError("path must not be empty")
+
+        segments = cleaned.split("/")
+        # Absolute paths, parent traversal and Windows drive prefixes ("C:/x")
+        # are all unsafe; the drive check matches _is_safe_member in the
+        # analyze router.
+        if cleaned.startswith("/") or ".." in segments or segments[0].endswith(":"):
+            raise ValueError("path must be a safe relative path")
+
+        return cleaned
+
+    @field_validator("content", "previous_content")
+    @classmethod
+    def sanitize_file_content(cls, value: str | None) -> str | None:
+        """Run present file contents through ``sanitize_code_input``."""
+        if value is None:
+            return value
+        return sanitize_code_input(value)
+
+    @field_validator("language")
+    @classmethod
+    def sanitize_incremental_language(cls, value: str | None) -> str | None:
+        """Validate the optional language hint with ``validate_language_hint``."""
+        return validate_language_hint(value)
+
+
+class IncrementalAnalyzeRequest(BaseModel):
+    """Request body for the incremental endpoint: 1 to 50 file changes."""
+
+    files: list[IncrementalFileChange] = Field(..., min_length=1, max_length=50)
+
+
+class IncrementalAnalyzeFileResult(BaseModel):
+    """Per-file outcome: the analysis, or the reason the file was skipped."""
+
+    filename: str
+    previous_filename: str | None = None
+    status: str
+    language: str | None = None
+    changed_line_ranges: list[list[int]] = Field(default_factory=list)
+    changed_line_count: int = 0
+    size_bytes: int = 0
+    analysis: AnalyzeResponse | None = None
+    skipped_reason: str | None = None
+
+
+class IncrementalAnalyzeResponse(BaseModel):
+    """Response body for the incremental endpoint."""
+
+    provider: str
+    model: str
+    file_count: int
+    analyzed_file_count: int
+    skipped_file_count: int
+    files: list[IncrementalAnalyzeFileResult]
+    summary: str
+    analysis_time_ms: float | None = None
+
+
 class
ZipAnalyzeFileResult(BaseModel):
     filename: str
     language: str
diff --git a/backend/app/services/incremental_analysis.py b/backend/app/services/incremental_analysis.py
new file mode 100644
index 0000000..db66ca6
--- /dev/null
+++ b/backend/app/services/incremental_analysis.py
@@ -0,0 +1,210 @@
+"""Incremental analysis helpers for changed-file analysis."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from difflib import SequenceMatcher
+
+# Share of a file's lines that must change before the whole file, rather than
+# only the changed hunks, is sent for analysis.
+LARGE_CHANGE_RATIO = 0.75
+# Unchanged lines kept on each side of a changed hunk as analyzer context.
+CONTEXT_LINES = 2
+
+
+@dataclass
+class IncrementalPlan:
+    """What to analyze, or why to skip, for one submitted file change."""
+
+    path: str
+    previous_path: str | None
+    status: str
+    content: str | None
+    language: str | None
+    # Inclusive, 1-based [start, end] line ranges in the current content.
+    changed_line_ranges: list[list[int]]
+    changed_line_count: int
+    # Code handed to the analyzer; None when the file is skipped.
+    analysis_code: str | None
+    skipped_reason: str | None = None
+
+
+def _merge_ranges(ranges: list[list[int]]) -> list[list[int]]:
+    """Return the ranges sorted by start, merging overlapping/adjacent ones.
+
+    The result is built from fresh lists, so the caller's ranges are never
+    mutated.
+    """
+    merged: list[list[int]] = []
+
+    for start, end in sorted(ranges, key=lambda item: item[0]):
+        if merged and start <= merged[-1][1] + 1:
+            merged[-1][1] = max(merged[-1][1], end)
+        else:
+            merged.append([start, end])
+
+    return merged
+
+
+def _changed_ranges(previous_content: str, content: str) -> list[list[int]]:
+    """Return inclusive 1-based line ranges of ``content`` touched by the edit.
+
+    Replaced and inserted lines are reported as-is. A pure deletion leaves no
+    lines of its own in ``content``, so it is reported as the surviving
+    line(s) next to the removal point; without that, a delete-only edit would
+    be indistinguishable from an unchanged file.
+    """
+    previous_lines = previous_content.splitlines()
+    current_lines = content.splitlines()
+    total_lines = len(current_lines)
+
+    matcher = SequenceMatcher(
+        None,
+        previous_lines,
+        current_lines,
+        autojunk=False,
+    )
+
+    ranges: list[list[int]] = []
+
+    for tag, _old_start, _old_end, new_start, new_end in matcher.get_opcodes():
+        if tag in {"replace", "insert"} and new_start != new_end:
+            ranges.append([new_start + 1, new_end])
+        elif tag == "delete" and total_lines:
+            # new_start is the 0-based index of the line that now follows the
+            # removed block, so 1-based lines new_start and new_start + 1 are
+            # its neighbours. Clamp at the top and bottom of the file.
+            ranges.append([max(1, new_start), min(total_lines, new_start + 1)])
+
+    return _merge_ranges(ranges)
+
+
+def _expand_ranges(
+    ranges: list[list[int]],
+    total_lines: int,
+    context_lines: int = CONTEXT_LINES,
+) -> list[list[int]]:
+    """Widen each range by ``context_lines`` per side, clamped to the file."""
+    expanded = [
+        [
+            max(1, start - context_lines),
+            min(total_lines, end + context_lines),
+        ]
+        for start, end in ranges
+    ]
+    return _merge_ranges(expanded)
+
+
+def _extract_changed_code(content: str, ranges: list[list[int]]) -> str:
+    """Join the lines covered by ``ranges``, one blank line between chunks."""
+    lines = content.splitlines()
+
+    if not lines or not ranges:
+        return ""
+
+    chunks: list[str] = []
+
+    for start, end in ranges:
+        selected = lines[start - 1 : end]
+        chunks.append("\n".join(selected))
+
+    return "\n\n".join(chunk for chunk in chunks if chunk.strip()).strip()
+
+
+def _infer_status(
+    path: str,
+    content: str | None,
+    previous_path: str | None,
+    previous_content: str | None,
+    explicit_status: str | None,
+) -> str:
+    """Return the explicit status if given, else infer one from the inputs."""
+    if explicit_status:
+        return explicit_status
+
+    if previous_content is None and content is not None:
+        return "added"
+
+    if previous_content is not None and content is None:
+        return "deleted"
+
+    if previous_path and previous_path != path:
+        return "renamed"
+
+    if previous_content != content:
+        return "modified"
+
+    return "unchanged"
+
+
+def _plan_file_change(file_change) -> IncrementalPlan:
+    """Build the analysis plan for a single submitted file change."""
+    content = file_change.content
+    previous_content = file_change.previous_content
+
+    status = _infer_status(
+        path=file_change.path,
+        content=content,
+        previous_path=file_change.previous_path,
+        previous_content=previous_content,
+        explicit_status=file_change.status,
+    )
+
+    def make_plan(
+        plan_status: str,
+        ranges: list[list[int]],
+        analysis_code: str | None,
+        skipped_reason: str | None = None,
+    ) -> IncrementalPlan:
+        return IncrementalPlan(
+            path=file_change.path,
+            previous_path=file_change.previous_path,
+            status=plan_status,
+            content=content,
+            language=file_change.language,
+            changed_line_ranges=ranges,
+            changed_line_count=sum(end - start + 1 for start, end in ranges),
+            analysis_code=analysis_code,
+            skipped_reason=skipped_reason,
+        )
+
+    if status == "deleted":
+        return make_plan(
+            status, [], None, "deleted file has no new code to analyze"
+        )
+
+    if content is None:
+        return make_plan(status, [], None, "missing current file content")
+
+    current_lines = content.splitlines()
+
+    if status == "added" or previous_content is None:
+        # Nothing to diff against, so the whole file counts as new.
+        status = "added"
+        ranges = [[1, len(current_lines)]] if current_lines else []
+        analysis_code = content
+    else:
+        ranges = _changed_ranges(previous_content, content)
+        if not ranges:
+            return make_plan(status, [], None, "no changed lines detected")
+
+        changed_line_count = sum(end - start + 1 for start, end in ranges)
+        # ranges is non-empty here, so the file has at least one line.
+        if changed_line_count / len(current_lines) >= LARGE_CHANGE_RATIO:
+            analysis_code = content
+        else:
+            context_ranges = _expand_ranges(ranges, len(current_lines))
+            analysis_code = _extract_changed_code(content, context_ranges)
+
+    if not analysis_code.strip():
+        # Give a reason rather than handing the analyzer an empty snippet.
+        return make_plan(
+            status, ranges, None, "changed lines contain no code to analyze"
+        )
+
+    return make_plan(status, ranges, analysis_code)
+
+
+def build_incremental_plan(files: list) -> list[IncrementalPlan]:
+    """Return one IncrementalPlan per file change, in input order."""
+    return [_plan_file_change(file_change) for file_change in files]
diff --git a/backend/tests/test_endpoints.py b/backend/tests/test_endpoints.py
index c4d776b..967c38b 100644
--- a/backend/tests/test_endpoints.py
+++ b/backend/tests/test_endpoints.py
@@ -498,7 +498,6 @@ def test_debug_kotlin():
     assert d is not None
 
 
-
 def test_debug_cpp_syntax_errors():
     code = "void main() {\n
cout << 'Hello World'\n}"
     r = client.post("/debugging/", json={"code": code, "language": "cpp"})
@@ -579,15 +578,20 @@ def test_add():
     d = r.json()
     assert d["overall_score"] >= 60  # clean code should score reasonably
 
+
 def test_suggestions_observability_print_only_python():
     # Pasting code with print() in Java should NOT trigger the Observability suggestion
-    r_java = client.post("/suggestions/", json={"code": 'print("hello");', "language": "java"})
+    r_java = client.post(
+        "/suggestions/", json={"code": 'print("hello");', "language": "java"}
+    )
     assert r_java.status_code == 200
     s_java = [s["category"] for s in r_java.json()["suggestions"]]
     assert "Observability" not in s_java
 
     # Pasting code with print() in Python SHOULD trigger the Observability suggestion
-    r_py = client.post("/suggestions/", json={"code": 'print("hello")', "language": "python"})
+    r_py = client.post(
+        "/suggestions/", json={"code": 'print("hello")', "language": "python"}
+    )
     assert r_py.status_code == 200
     s_py = [s["category"] for s in r_py.json()["suggestions"]]
     assert "Observability" in s_py
@@ -645,6 +649,127 @@ def test_analyze_cache_expires(monkeypatch):
     cache.clear_memory()
 
 
+def test_incremental_analysis_analyzes_only_changed_file():
+    """A modified file is analyzed while an identical file is skipped."""
+    payload = {
+        "files": [
+            {
+                "path": "app.py",
+                "previous_content": "def divide(a, b):\n return a + b\n",
+                "content": "def divide(a, b):\n return a / b\n",
+                "language": "python",
+            },
+            {
+                "path": "unchanged.py",
+                "previous_content": "print('same')\n",
+                "content": "print('same')\n",
+                "language": "python",
+            },
+        ]
+    }
+
+    response = client.post("/analyze/incremental/", json=payload)
+
+    assert response.status_code == 200
+    data = response.json()
+
+    assert data["provider"] == "rule-based"
+    assert data["file_count"] == 2
+    assert data["analyzed_file_count"] == 1
+    assert data["skipped_file_count"] == 1
+
+    changed_file = data["files"][0]
+    unchanged_file = data["files"][1]
+
+    assert changed_file["filename"] == "app.py"
+    assert changed_file["status"] == "modified"
+    assert changed_file["changed_line_count"] >= 1
+    assert changed_file["analysis"] is not None
+
+    assert unchanged_file["filename"] == "unchanged.py"
+    assert unchanged_file["analysis"] is None
+    assert unchanged_file["skipped_reason"] == "no changed lines detected"
+
+
+def test_incremental_analysis_handles_added_file():
+    """A file with status "added" is analyzed over its full line range."""
+    payload = {
+        "files": [
+            {
+                "path": "new_app.py",
+                "content": "def divide(a, b):\n return a / b\n",
+                "language": "python",
+                "status": "added",
+            }
+        ]
+    }
+
+    response = client.post("/analyze/incremental/", json=payload)
+
+    assert response.status_code == 200
+    data = response.json()
+
+    assert data["file_count"] == 1
+    assert data["analyzed_file_count"] == 1
+    assert data["files"][0]["status"] == "added"
+    assert data["files"][0]["analysis"] is not None
+    assert data["files"][0]["changed_line_ranges"] == [[1, 2]]
+
+
+def test_incremental_analysis_skips_deleted_file():
+    """A file with status "deleted" is reported but not analyzed."""
+    payload = {
+        "files": [
+            {
+                "path": "old_app.py",
+                "previous_content": "print('remove me')\n",
+                "content": None,
+                "language": "python",
+                "status": "deleted",
+            }
+        ]
+    }
+
+    response = client.post("/analyze/incremental/", json=payload)
+
+    assert response.status_code == 200
+    data = response.json()
+
+    assert data["file_count"] == 1
+    assert data["analyzed_file_count"] == 0
+    assert data["skipped_file_count"] == 1
+    assert data["files"][0]["status"] == "deleted"
+    assert data["files"][0]["analysis"] is None
+
+
+def test_incremental_analysis_handles_rename_with_content_change():
+    """A renamed file with edited content keeps both names and is analyzed."""
+    payload = {
+        "files": [
+            {
+                "previous_path": "old_name.py",
+                "path": "new_name.py",
+                "previous_content": "def greet():\n print('hello')\n",
+                "content": "def greet():\n print('hello debug')\n",
+                "language": "python",
+                "status": "renamed",
+            }
+        ]
+    }
+
+    response = client.post("/analyze/incremental/", json=payload)
+
+    assert response.status_code == 200
+    data = response.json()
+
+    file_result = data["files"][0]
+    assert file_result["filename"] == "new_name.py"
+    assert file_result["previous_filename"] == "old_name.py"
+    assert file_result["status"] == "renamed"
+    assert file_result["analysis"] is not None
+    assert file_result["changed_line_count"] >= 1
+
+
 def test_memory_cache_evicts_least_recently_used_entries():
     from app.services.cache import cache
 
@@ -741,7 +866,9 @@ def test_get_stream_done_event_present():
 
 
 def test_get_stream_with_language_hint():
-    r = client.get("/analyze/stream", params={"code": JS_CODE, "language": "javascript"})
+    r = client.get(
+        "/analyze/stream", params={"code": JS_CODE, "language": "javascript"}
+    )
     assert r.status_code == 200
     events = _parse_sse_events(r.text)
     exp = next(e["data"] for e in events if e["type"] == "explanation")