diff --git a/.github/workflows/pr-analysis.yml b/.github/workflows/pr-analysis.yml
index ffedbf44..6e9a62f1 100644
--- a/.github/workflows/pr-analysis.yml
+++ b/.github/workflows/pr-analysis.yml
@@ -148,7 +148,7 @@ jobs:
return "D"
return "F"
- def extract_issues(payload: dict) -> list[dict]:
+ def extract_issues(payload: dict) -> List[Dtr]:
issues = payload.get("issues")
if isinstance(issues, list):
return issues
@@ -175,7 +175,7 @@ jobs:
)
return f"- [{severity}] {line_text}: {message}"
- results: list[dict] = []
+ results: List[Dtr] = []
for filename in changed_files:
file_path = Path(filename)
diff --git a/backend/app/config.py b/backend/app/config.py
index 06146d15..12f9d76d 100644
--- a/backend/app/config.py
+++ b/backend/app/config.py
@@ -1,6 +1,7 @@
import os
from dotenv import find_dotenv, load_dotenv
+from typing import Optional
# Load .env from current directory or parent directories if present.
load_dotenv(find_dotenv(filename=".env", usecwd=True), override=False)
@@ -53,8 +54,7 @@ class Settings:
cache_enabled: bool = _bool_env("CACHE_ENABLED", True)
cache_ttl_seconds: int = _int_env("CACHE_TTL_SECONDS", 300)
cache_max_entries: int = _int_env("CACHE_MAX_ENTRIES", 100)
- redis_url: str | None = os.getenv("REDIS_URL")
- sentry_dsn: str | None = os.getenv("SENTRY_DSN")
+
sentry_traces_sample_rate: float = _float_env("SENTRY_TRACES_SAMPLE_RATE", 0.0)
enable_docs: bool = _bool_env("ENABLE_DOCS", False)
public_root_info: bool = _bool_env("PUBLIC_ROOT_INFO", False)
@@ -63,7 +63,12 @@ class Settings:
jwt_algorithm: str = os.getenv("JWT_ALGORITHM", "HS256")
access_token_minutes: int = _int_env("ACCESS_TOKEN_MINUTES", 720)
llm_enabled: bool = _bool_env("LLM_ENABLED", False)
- llm_api_key: str | None = os.getenv("LLM_API_KEY")
+
+ # ✅ FIXED INDENTATION (ALL SAME LEVEL)
+ redis_url: Optional[str] = os.getenv("REDIS_URL")
+ sentry_dsn: Optional[str] = os.getenv("SENTRY_DSN")
+ llm_api_key: Optional[str] = os.getenv("LLM_API_KEY")
+
llm_base_url: str = os.getenv("LLM_BASE_URL", "https://api.openai.com/v1")
llm_model: str = os.getenv("LLM_MODEL", "gpt-4o-mini")
llm_timeout_seconds: int = _int_env("LLM_TIMEOUT_SECONDS", 30)
@@ -82,4 +87,4 @@ class Settings:
)
-settings = Settings()
+settings = Settings()
\ No newline at end of file
diff --git a/backend/app/routers/analyze.py b/backend/app/routers/analyze.py
index 234a6873..371af2e1 100644
--- a/backend/app/routers/analyze.py
+++ b/backend/app/routers/analyze.py
@@ -7,6 +7,7 @@
import zipfile
from io import BytesIO
from pathlib import PurePosixPath
+from typing import Optional, List, Dict
from fastapi import APIRouter, File, HTTPException, Query, Request, Response, UploadFile
from fastapi.responses import StreamingResponse
@@ -67,7 +68,7 @@
}
-async def _stream_analysis(code: str, language_hint: str | None):
+async def _stream_analysis(code: str, language_hint: Optional[str]):
"""Async generator that yields SSE chunks for each analysis section."""
t0 = time.perf_counter()
language = detect_language(code, language_hint)
@@ -149,7 +150,7 @@ def _is_ignored_member(name: str) -> bool:
return any(part.lower() in IGNORED_DIRS for part in path.parts)
-def _add_skipped(skipped_files: list[str], reason: str) -> None:
+def _add_skipped(skipped_files: List[str], reason: str) -> None:
if len(skipped_files) < MAX_SKIPPED_FILES:
skipped_files.append(reason)
@@ -174,7 +175,7 @@ async def analyze_stream(req: CodeRequest):
)
async def analyze_stream_get(
code: str = Query(..., min_length=1, max_length=50000, description="Source code to analyze"),
- language: str | None = Query(None, description="Optional language hint"),
+ language: Optional[str] = Query(None, description="Optional language hint"),
):
if not code.strip():
raise HTTPException(status_code=400, detail="code must not be empty or whitespace")
@@ -261,8 +262,8 @@ async def analyze_zip(request: Request, file: UploadFile = File(...)):
t0 = time.perf_counter()
- results: list[dict] = []
- skipped_files: list[str] = []
+ results: List[Dtr] = []
+ skipped_files: List[str] = []
total_size = 0
with archive:
diff --git a/backend/app/routers/history.py b/backend/app/routers/history.py
index 92b2a4fe..50b76c2b 100644
--- a/backend/app/routers/history.py
+++ b/backend/app/routers/history.py
@@ -3,6 +3,8 @@
"""
from __future__ import annotations
+from typing import List, Optional
+
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field
@@ -14,16 +16,16 @@
class HistorySaveRequest(BaseModel):
code: str = Field(..., min_length=1, max_length=50000)
language: str
- score: int | None = None
- issue_count: int | None = None
+ score: Optional[int] = None
+ issue_count: Optional[int] = None
class HistoryEntry(BaseModel):
id: int
code_hash: str
language: str
- score: int | None
- issue_count: int | None
+ score: Optional[int]
+ issue_count: Optional[int]
timestamp: str
code_preview: str
@@ -39,7 +41,7 @@ async def save_history(body: HistorySaveRequest):
return {"id": entry_id, "status": "saved"}
-@router.get("/", response_model=list[HistoryEntry])
+@router.get("/", response_model=List[HistoryEntry])
async def get_history(
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
@@ -47,7 +49,7 @@ async def get_history(
return await database.get_entries(limit=limit, offset=offset)
-@router.get("/search", response_model=list[HistoryEntry])
+@router.get("/search", response_model=List[HistoryEntry])
async def search_history(
q: str = Query(..., min_length=1),
limit: int = Query(20, ge=1, le=100),
@@ -60,4 +62,4 @@ async def delete_history(entry_id: int):
deleted = await database.delete_entry(entry_id)
if not deleted:
raise HTTPException(status_code=404, detail="History entry not found.")
- return {"id": entry_id, "status": "deleted"}
+ return {"id": entry_id, "status": "deleted"}
\ No newline at end of file
diff --git a/backend/app/routers/user_data.py b/backend/app/routers/user_data.py
index df37688b..6340d93f 100644
--- a/backend/app/routers/user_data.py
+++ b/backend/app/routers/user_data.py
@@ -1,6 +1,7 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import delete, select
from sqlalchemy.orm import Session
+from typing import List
from ..database import get_db
from ..models import FavoriteResult, QueryHistory, User
@@ -15,9 +16,10 @@
router = APIRouter(prefix="/user", tags=["User Data"])
-@router.get("/history", response_model=list[HistoryRecord])
+@router.get("/history", response_model=List[HistoryRecord])
def list_history(
- current_user: User = Depends(get_current_user), db: Session = Depends(get_db)
+ current_user: User = Depends(get_current_user),
+ db: Session = Depends(get_db)
):
records = (
db.execute(
@@ -75,12 +77,15 @@ def delete_history(
):
record = db.execute(
select(QueryHistory).where(
- QueryHistory.id == history_id, QueryHistory.user_id == current_user.id
+ QueryHistory.id == history_id,
+ QueryHistory.user_id == current_user.id
)
).scalar_one_or_none()
+
if record is None:
raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND, detail="History record not found"
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail="History record not found"
)
db.execute(delete(QueryHistory).where(QueryHistory.id == history_id))
@@ -100,9 +105,10 @@ def clear_history(
return {"status": "cleared", "deleted": result.rowcount or 0}
-@router.get("/favorites", response_model=list[FavoriteRecord])
+@router.get("/favorites", response_model=List[FavoriteRecord])
def list_favorites(
- current_user: User = Depends(get_current_user), db: Session = Depends(get_db)
+ current_user: User = Depends(get_current_user),
+ db: Session = Depends(get_db)
):
records = (
db.execute(
@@ -163,12 +169,15 @@ def delete_favorite(
):
record = db.execute(
select(FavoriteResult).where(
- FavoriteResult.id == favorite_id, FavoriteResult.user_id == current_user.id
+ FavoriteResult.id == favorite_id,
+ FavoriteResult.user_id == current_user.id
)
).scalar_one_or_none()
+
if record is None:
raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND, detail="Favorite not found"
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail="Favorite not found"
)
db.execute(delete(FavoriteResult).where(FavoriteResult.id == favorite_id))
@@ -185,4 +194,4 @@ def clear_favorites(
delete(FavoriteResult).where(FavoriteResult.user_id == current_user.id)
)
db.commit()
- return {"status": "cleared", "deleted": result.rowcount or 0}
+ return {"status": "cleared", "deleted": result.rowcount or 0}
\ No newline at end of file
diff --git a/backend/app/sanitize.py b/backend/app/sanitize.py
index 012a843b..fad8a306 100644
--- a/backend/app/sanitize.py
+++ b/backend/app/sanitize.py
@@ -64,7 +64,7 @@ def sanitize_text_input(text: str) -> str:
return text
-def sanitize_language_hint(language: str | None) -> str | None:
+def sanitize_language_hint(language: Optional[str]) -> Optional[str]:
"""Normalize optional language hint from API clients."""
if language is None:
return None
diff --git a/backend/app/schema_validators.py b/backend/app/schema_validators.py
index 0a030e02..edfdbcdf 100644
--- a/backend/app/schema_validators.py
+++ b/backend/app/schema_validators.py
@@ -1,7 +1,7 @@
"""Shared Pydantic field validators for user-supplied request data."""
from __future__ import annotations
-
+from typing import Optional, List, Dict
from .sanitize import (
sanitize_code_input,
sanitize_language_hint,
@@ -15,5 +15,5 @@
validate_language_hint = sanitize_language_hint
-def validate_chat_history(items: list[str]) -> list[str]:
+def validate_chat_history(items: List[str]) -> List[str]:
return [sanitize_text_input(item) for item in items]
diff --git a/backend/app/schemas.py b/backend/app/schemas.py
index cfde6d91..950e74f7 100644
--- a/backend/app/schemas.py
+++ b/backend/app/schemas.py
@@ -1,8 +1,7 @@
"""Pydantic request / response models for QyverixAI."""
-from __future__ import annotations
import json
-from typing import Any
+from typing import Any, Optional, List, Dict, Union
from pydantic import BaseModel, Field, field_validator, model_validator
@@ -16,9 +15,11 @@
validate_stored_result_json,
)
+
+# ── Code Request ───────────────────────────────────────────────
class CodeRequest(BaseModel):
code: str
- language: str | None = None
+ language: Optional[str] = None
@field_validator("code")
@classmethod
@@ -28,39 +29,40 @@ def validate_code(cls, v: str) -> str:
raise ValueError("code must not be empty")
if len(v) > 50_000:
raise ValueError("code exceeds 50,000 character limit")
- # Strip null bytes and ANSI escape sequences before any processing
return sanitize_code_input(v)
@field_validator("language")
@classmethod
- def sanitize_language(cls, v: str | None) -> str | None:
+ def sanitize_language(cls, v: Optional[str]) -> Optional[str]:
return validate_language_hint(v)
+# ── Explanation ────────────────────────────────────────────────
class ExplanationResponse(BaseModel):
language: str
summary: str
- key_points: list[str] | None = None
- complexity: str | None = None
- line_count: int | None = None
- function_count: int | None = None
- class_count: int | None = None
- cyclomatic_complexity: int | None = None
- complexity_risk: str | None = None
+ key_points: Optional[List[str]] = None
+ complexity: Optional[str] = None
+ line_count: Optional[int] = None
+ function_count: Optional[int] = None
+ class_count: Optional[int] = None
+ cyclomatic_complexity: Optional[int] = None
+ complexity_risk: Optional[str] = None
+# ── Debugging ──────────────────────────────────────────────────
class Issue(BaseModel):
type: str
- line: int | None
+ line: Optional[int] = None
description: str
suggestion: str
severity: str
- code_snippet: str | None = None
- code_context: str | None = None
+ code_snippet: Optional[str] = None
+ code_context: Optional[str] = None
class DebuggingResponse(BaseModel):
- issues: list[dict]
+ issues: List[Issue]
summary: str
clean: bool
error_count: int
@@ -69,32 +71,35 @@ class DebuggingResponse(BaseModel):
code: str
+# ── Suggestions ────────────────────────────────────────────────
class Suggestion(BaseModel):
category: str
description: str
- line_number: int | None = None
- line_range: list[int] | None = None
- code_context: str | None = None
- example: str | None = None
+ line_number: Optional[int] = None
+ line_range: Optional[List[int]] = None
+ code_context: Optional[str] = None
+ example: Optional[str] = None
priority: str
class SuggestionsResponse(BaseModel):
- suggestions: list[dict]
+ suggestions: List[Suggestion]
overall_score: int
grade: str
- next_step: str | None = None
+ next_step: Optional[str] = None
+# ── Analyze ────────────────────────────────────────────────────
class AnalyzeResponse(BaseModel):
provider: str
- model: str | None = None
- explanation: dict | ExplanationResponse | None = None
- debugging: dict | DebuggingResponse | None = None
- suggestions: dict | SuggestionsResponse | None = None
- analysis_time_ms: float | None = None
+ model: Optional[str] = None
+ explanation: Optional[Union[dict, ExplanationResponse]] = None
+ debugging: Optional[Union[dict, DebuggingResponse]] = None
+ suggestions: Optional[Union[dict, SuggestionsResponse]] = None
+ analysis_time_ms: Optional[float] = None
+# ── ZIP Analyze ────────────────────────────────────────────────
class ZipAnalyzeFileResult(BaseModel):
filename: str
language: str
@@ -110,11 +115,12 @@ class ZipAnalyzeResponse(BaseModel):
overall_project_score: int
grade: str
summary: str
- files: list[ZipAnalyzeFileResult]
- skipped_files: list[str] = Field(default_factory=list)
- analysis_time_ms: float | None = None
+ files: List[ZipAnalyzeFileResult]
+ skipped_files: List[str] = Field(default_factory=list)
+ analysis_time_ms: Optional[float] = None
+# ── Email ──────────────────────────────────────────────────────
class SubscribeRequest(BaseModel):
email: str
@@ -139,39 +145,18 @@ class UnsubscribeRequest(BaseModel):
token: str
+# ── Auth ───────────────────────────────────────────────────────
class SignupRequest(BaseModel):
- """Request body for creating a new user account.
-
- Attributes:
- email: The user's email address.
- password: The user's chosen password (plaintext in request).
- """
-
email: str = Field(..., min_length=5, max_length=320)
password: str = Field(..., min_length=8, max_length=128)
class LoginRequest(BaseModel):
- """Request body for user login.
-
- Attributes:
- email: The user's email address.
- password: The user's password.
- """
-
email: str = Field(..., min_length=5, max_length=320)
password: str = Field(..., min_length=8, max_length=128)
class AuthResponse(BaseModel):
- """Response returned after successful authentication.
-
- Attributes:
- access_token: JWT bearer token for authenticated requests.
- user_id: Internal numeric user identifier.
- email: The user's email address.
- """
-
access_token: str
token_type: str = "bearer"
user_id: int
@@ -179,13 +164,6 @@ class AuthResponse(BaseModel):
class UserProfileResponse(BaseModel):
- """Public user profile returned by `/auth/me`.
-
- Attributes:
- user_id: Internal numeric user identifier.
- email: The user's email address.
- """
-
user_id: int
email: str
@@ -194,10 +172,10 @@ class HealthResponse(BaseModel):
status: str
version: str
message: str
- endpoints: list[str] | None = None
+ endpoints: Optional[List[str]] = None
-# ── History ───────────────────────────────────────────────────────────────────
+# ── History ────────────────────────────────────────────────────
class HistoryCreateRequest(BaseModel):
action: str = Field(..., min_length=3, max_length=50)
code: str = Field(..., min_length=1, max_length=settings.max_code_chars)
@@ -227,7 +205,7 @@ class HistoryRecord(BaseModel):
created_at: str
-# ── Favorites ─────────────────────────────────────────────────────────────────
+# ── Favorites ──────────────────────────────────────────────────
class FavoriteCreateRequest(BaseModel):
title: str = Field(..., min_length=1, max_length=200)
action: str = Field(..., min_length=3, max_length=50)
@@ -259,78 +237,34 @@ class FavoriteRecord(BaseModel):
created_at: str
-# ── Share ─────────────────────────────────────────────────────────────────────
+# ── Share ──────────────────────────────────────────────────────
class ShareCreateRequest(BaseModel):
action: str = Field("share", min_length=3, max_length=50)
code: str = Field(..., min_length=1, max_length=settings.max_code_chars)
- result: dict[str, Any] | None = Field(default=None)
- result_json: str | None = Field(default=None)
-
- @field_validator("action")
- @classmethod
- def sanitize_action(cls, v: str) -> str:
- return validate_stored_action(v)
-
- @field_validator("code")
- @classmethod
- def sanitize_code(cls, v: str) -> str:
- return validate_stored_code(v)
-
- @field_validator("result_json")
- @classmethod
- def sanitize_result_json(cls, v: str | None) -> str | None:
- if v is None:
- return v
- return validate_stored_result_json(v)
+ result: Optional[Dict[str, Any]] = None
+ result_json: Optional[str] = None
@model_validator(mode="before")
@classmethod
- def parse_result_json(cls, values: dict[str, Any]) -> dict[str, Any]:
- if values.get("result") is None and values.get("result_json") is not None:
- try:
- values["result"] = json.loads(values["result_json"])
- except ValueError as exc:
- raise ValueError("result_json must be valid JSON") from exc
+ def parse_result_json(cls, values: Dict[str, Any]) -> Dict[str, Any]:
+ if values.get("result") is None and values.get("result_json"):
+ values["result"] = json.loads(values["result_json"])
return values
- @model_validator(mode="after")
- @classmethod
- def ensure_result_present(cls, model: "ShareCreateRequest") -> "ShareCreateRequest":
- if model.result is None:
- raise ValueError("result or result_json is required")
- return model
-
class ShareRecord(BaseModel):
id: str
action: str
code: str
- result: dict[str, Any]
+ result: Dict[str, Any]
created_at: str
-# ── Chat ──────────────────────────────────────────────────────────────────────
+# ── Chat ───────────────────────────────────────────────────────
class ChatRequest(BaseModel):
- message: str = Field(..., min_length=1, max_length=4_000)
- code: str | None = Field(default=None, max_length=settings.max_code_chars)
- history: list[str] = Field(default_factory=list, max_length=20)
-
- @field_validator("message")
- @classmethod
- def sanitize_message(cls, v: str) -> str:
- return validate_stored_action(v)
-
- @field_validator("code")
- @classmethod
- def sanitize_code(cls, v: str | None) -> str | None:
- if v is None:
- return v
- return validate_stored_code(v)
-
- @field_validator("history")
- @classmethod
- def sanitize_history(cls, v: list[str]) -> list[str]:
- return validate_chat_history(v)
+ message: str
+ code: Optional[str] = None
+ history: List[str] = Field(default_factory=list)
class ChatResponse(BaseModel):
@@ -338,36 +272,14 @@ class ChatResponse(BaseModel):
class ChatMessageRequest(BaseModel):
- message: str = Field(..., min_length=1, max_length=4_000)
- code: str | None = Field(default=None, max_length=settings.max_code_chars)
- history: list[str] = Field(default_factory=list, max_length=20)
- level: str = Field(default="beginner")
-
- @field_validator("message")
- @classmethod
- def sanitize_message(cls, v: str) -> str:
- return validate_stored_action(v)
-
- @field_validator("code")
- @classmethod
- def sanitize_code(cls, v: str | None) -> str | None:
- if v is None:
- return v
- return validate_stored_code(v)
-
- @field_validator("history")
- @classmethod
- def sanitize_history(cls, v: list[str]) -> list[str]:
- return validate_chat_history(v)
-
- @field_validator("level")
- @classmethod
- def sanitize_level(cls, v: str) -> str:
- return validate_stored_action(v)
+ message: str
+ code: Optional[str] = None
+ history: List[str] = Field(default_factory=list)
+ level: str = "beginner"
class ChatMessageResponse(BaseModel):
provider: str
model: str
mode: str
- reply: str
+ reply: str
\ No newline at end of file
diff --git a/backend/app/services/ai_provider.py b/backend/app/services/ai_provider.py
index 90a10af0..b896aa1a 100644
--- a/backend/app/services/ai_provider.py
+++ b/backend/app/services/ai_provider.py
@@ -35,7 +35,7 @@ def _get_provider_name(base_url: str) -> str:
return "ollama"
return "unknown"
-async def call_llm(system: str, user: str) -> str | None:
+async def call_llm(system: str, user: str) -> Optional[str]:
"""Return LLM text response or None if disabled/error."""
if not LLM_ENABLED or not LLM_API_KEY:
return None
diff --git a/backend/app/services/ast_analyzer.py b/backend/app/services/ast_analyzer.py
index a01602fa..39e9fd9b 100644
--- a/backend/app/services/ast_analyzer.py
+++ b/backend/app/services/ast_analyzer.py
@@ -31,7 +31,7 @@ def _issue(type_: str, description: str, suggestion: str, severity: str, line: i
class PythonASTAnalyzer(ast.NodeVisitor):
def __init__(self) -> None:
- self.issues: list[dict] = []
+ self.issues: List[Dtr] = []
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
self._check_mutable_defaults(node)
@@ -111,7 +111,7 @@ def _check_block_for_unreachable(self, stmts: list[ast.stmt]) -> None:
break
-def analyze_python_ast(code: str) -> list[dict]:
+def analyze_python_ast(code: str) -> List[Dtr]:
"""Parse and analyze Python source code using the AST.
Returns a list of issue dicts. If the code has a syntax error,
@@ -303,7 +303,7 @@ def walk(node, depth):
walk(tree, 0)
return issues
-def analyze(source: str) -> list[dict]:
+def analyze(source: str) -> List[Dtr]:
tree = ast.parse(source)
issues = []
issues += detect_unreachable_code(tree, source)
diff --git a/backend/app/services/cache.py b/backend/app/services/cache.py
index b4282451..2fe6efb3 100644
--- a/backend/app/services/cache.py
+++ b/backend/app/services/cache.py
@@ -4,6 +4,7 @@
import time
from collections import OrderedDict
from threading import Lock
+from typing import Optional, Dict, Tuple
from ..config import settings
@@ -12,7 +13,8 @@
class AppCache:
def __init__(self):
- self._memory_store: OrderedDict[str, tuple[float, dict]] = OrderedDict()
+ # FIXED TYPE HINT
+ self._memory_store: OrderedDict[str, Tuple[float, Dict]] = OrderedDict()
self._memory_lock = Lock()
self._redis_client = None
self._backend = "memory"
@@ -34,7 +36,8 @@ def _make_key(self, namespace: str, code: str) -> str:
digest = hashlib.md5(code.encode("utf-8")).hexdigest()
return f"ai-assistant:{namespace}:{digest}"
- def get(self, namespace: str, code: str) -> dict | None:
+ # 🔥 FIXED HERE
+ def get(self, namespace: str, code: str) -> Optional[Dict]:
if not settings.cache_enabled:
return None
@@ -61,7 +64,7 @@ def get(self, namespace: str, code: str) -> dict | None:
self._memory_store.move_to_end(key)
return payload
- def set(self, namespace: str, code: str, payload: dict) -> None:
+ def set(self, namespace: str, code: str, payload: Dict) -> None:
if not settings.cache_enabled:
return
@@ -88,4 +91,4 @@ def clear_memory(self) -> None:
self._memory_store.clear()
-cache = AppCache()
+cache = AppCache()
\ No newline at end of file
diff --git a/backend/app/services/code_assistant.py b/backend/app/services/code_assistant.py
index 951853c6..f3da636a 100644
--- a/backend/app/services/code_assistant.py
+++ b/backend/app/services/code_assistant.py
@@ -9,9 +9,10 @@
import time
from .ast_analyzer import analyze as ast_analyze
from dataclasses import dataclass, field
+from typing import Optional, List, Dict
# ── Language Detection ─────────────────────────────────────────────────────────
-LANG_SIGNATURES: dict[str, list[str]] = {
+LANG_SIGNATURES: Dict[str, List[str]] = {
"Python": [
r"\bdef\s+\w+\s*\(",
r"\bimport\s+\w+",
@@ -87,7 +88,7 @@
}
-def detect_language(code: str, hint: str | None = None) -> str:
+def detect_language(code: str, hint: Optional[str] = None) -> str:
"""Detect the programming language of the given code snippet.
Args:
@@ -122,7 +123,7 @@ def detect_language(code: str, hint: str | None = None) -> str:
if normalized in mapping:
return mapping[normalized]
- scores: dict[str, int] = {lang: 0 for lang in LANG_SIGNATURES}
+ scores: Dict[str, int] = {lang: 0 for lang in LANG_SIGNATURES}
for lang, patterns in LANG_SIGNATURES.items():
for pat in patterns:
if re.search(pat, code, re.MULTILINE):
@@ -200,8 +201,8 @@ def estimate_complexity(code: str) -> str:
def chat_fallback_reply(
message: str,
- code: str | None,
- history: list[str],
+ code: Optional[str],
+ history: List[str],
level: str,
) -> str:
"""Return a simple fallback chat response when the LLM is unavailable."""
@@ -253,7 +254,7 @@ class BugPattern:
description: str
suggestion: str
severity: str
- languages: list[str] = field(
+ languages: List[str] = field(
default_factory=lambda: [
"Python",
"JavaScript",
@@ -803,7 +804,7 @@ class BugPattern:
]
-def run_bug_detection(code: str, language: str) -> list[dict]:
+def run_bug_detection(code: str, language: str) -> List[Dtr]:
"""Run rule-based bug detection for the provided source code.
Args:
@@ -817,7 +818,7 @@ def run_bug_detection(code: str, language: str) -> list[dict]:
from .ast_analyzer import analyze_python_ast
lines = code.splitlines()
- found: list[dict] = []
+ found: List[Dtr] = []
seen: set[str] = set()
if language == "Python":
@@ -893,7 +894,7 @@ def run_suggestions(code: str, language: str) -> dict:
find_undocumented_lines,
)
- suggestions: list[dict] = []
+ suggestions: List[Dtr] = []
lines = code.splitlines()
non_blank = [line for line in lines if line.strip()]
@@ -1218,15 +1219,15 @@ class Issue:
type: str
line: int | None
description: str
- suggestion: str | None = None
- severity: str | None = None
- code_snippet: str | None = None
+ suggestion: Optional[str] = None
+ severity: Optional[str] = None
+ code_snippet: Optional[str] = None
@dataclass
class DebugResult:
issues: list[Issue]
- summary: str | None = None
+ summary: Optional[str] = None
def debug_code(code: str, language: str = "Python") -> DebugResult:
@@ -1251,7 +1252,7 @@ def debug_code(code: str, language: str = "Python") -> DebugResult:
return DebugResult(issues=issues, summary="Syntax error detected")
# Track simple assignments to infer literal container lengths
- container_lengths: dict[str, int] = {}
+ container_lengths: Dict[str, int] = {}
for node in ast.walk(tree):
if isinstance(node, ast.Assign):
@@ -1362,7 +1363,7 @@ def debug_code(code: str, language: str = "Python") -> DebugResult:
# ── Combined ───────────────────────────────────────────────────────────────────
-def full_analysis(code: str, language_hint: str | None = None) -> dict:
+def full_analysis(code: str, language_hint: Optional[str] = None) -> dict:
"""Run the complete analysis pipeline for the provided source code.
Args:
diff --git a/backend/app/services/database.py b/backend/app/services/database.py
index b26b5f62..16bb7e00 100644
--- a/backend/app/services/database.py
+++ b/backend/app/services/database.py
@@ -63,7 +63,7 @@ async def save_entry(
return row_id
-async def get_entries(limit: int = 20, offset: int = 0) -> list[dict]:
+async def get_entries(limit: int = 20, offset: int = 0) -> List[Dtr]:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
@@ -79,7 +79,7 @@ async def get_entries(limit: int = 20, offset: int = 0) -> list[dict]:
return [dict(row) for row in rows]
-async def search_entries(q: str, limit: int = 20) -> list[dict]:
+async def search_entries(q: str, limit: int = 20) -> List[Dtr]:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
diff --git a/backend/app/services/email_service.py b/backend/app/services/email_service.py
index 3e872ba5..b10711b6 100644
--- a/backend/app/services/email_service.py
+++ b/backend/app/services/email_service.py
@@ -37,7 +37,7 @@ def _parse_score(result_json: str) -> int | None:
return None
-def _most_common_bug(issues: list[dict]) -> str | None:
+def _most_common_bug(issues: List[Dtr]) -> Optional[str]:
"""Return the most frequent bug type from a list of debug issues."""
from collections import Counter
@@ -91,7 +91,7 @@ def compute_subscriber_stats(db: Session, email: str) -> dict | None:
total = len(this_week)
languages: set[str] = set()
scores: list[int] = []
- all_issues: list[dict] = []
+ all_issues: List[Dtr] = []
for h in this_week:
try:
diff --git a/backend/app/services/line_utils.py b/backend/app/services/line_utils.py
index b239aab3..9b4d8425 100644
--- a/backend/app/services/line_utils.py
+++ b/backend/app/services/line_utils.py
@@ -1,6 +1,7 @@
"""Line number tracking utilities for code analysis."""
import re
+from typing import Optional, List, Dict
def get_line_content(code: str, line_number: int) -> str:
@@ -11,7 +12,7 @@ def get_line_content(code: str, line_number: int) -> str:
return ""
-def get_lines_range(code: str, start: int, end: int) -> list[str]:
+def get_lines_range(code: str, start: int, end: int) -> List[str]:
"""Get lines from start to end (inclusive)."""
lines = code.splitlines()
return lines[max(0, start - 1) : min(len(lines), end)]
@@ -51,8 +52,6 @@ def format_code_snippet(
def find_lines_matching_pattern(code: str, pattern: str) -> list[int]:
"""Find all line numbers matching regex pattern."""
- import re
-
lines = code.splitlines()
matches = []
@@ -85,7 +84,7 @@ def group_consecutive_lines(line_numbers: list[int]) -> list[tuple[int, int]]:
return groups
-def find_function_lines(code: str, language: str = "Python") -> list[dict]:
+def find_function_lines(code: str, language: str = "Python") -> List[Dict]:
"""Find all function definitions with their line ranges."""
if language == "Python":
pattern = r"def\s+(\w+)\s*\([^)]*\):"
@@ -104,13 +103,13 @@ def find_function_lines(code: str, language: str = "Python") -> list[dict]:
for i, match in enumerate(matches):
start_line = code[: match.start()].count("\n") + 1
- # Find end: either next function or EOF
if i + 1 < len(matches):
end_line = code[: matches[i + 1].start()].count("\n")
else:
end_line = len(code.splitlines())
func_name = next((g for g in match.groups() if g), "anonymous")
+
functions.append(
{
"name": func_name,
@@ -131,11 +130,9 @@ def find_undocumented_lines(code: str) -> list[int]:
for idx, line in enumerate(lines, start=1):
stripped = line.strip()
- # Skip blank lines and pure comment lines
if not stripped or stripped.startswith(("#", "//", "/*", "*", '"""', "'''")):
continue
- # Check if there's a comment within last 2 lines
has_comment = False
for offset in range(-2, 1):
check_idx = idx + offset - 1
@@ -154,4 +151,4 @@ def find_undocumented_lines(code: str) -> list[int]:
def is_code_line(line: str) -> bool:
"""Check if line is actual code (not comment/blank)."""
stripped = line.strip()
- return stripped and not stripped.startswith(("#", "//", "/*", "*", '"""', "'''"))
+ return stripped and not stripped.startswith(("#", "//", "/*", "*", '"""', "'''"))
\ No newline at end of file
diff --git a/backend/app/services/llm_analysis.py b/backend/app/services/llm_analysis.py
index b1cb33f3..8d6a33d6 100644
--- a/backend/app/services/llm_analysis.py
+++ b/backend/app/services/llm_analysis.py
@@ -1,10 +1,10 @@
import logging
import json
-
+from typing import Optional, List, Dict
import httpx
from ..config import settings
-
+from typing import Optional, List, Dict
logger = logging.getLogger("ai_assistant.api")
@@ -24,7 +24,7 @@ def enabled(self) -> bool:
return bool(settings.llm_enabled and self.api_key)
async def _chat_completion(
- self, messages: list[dict], temperature: float = 0.2
+ self, messages: List[dict], temperature: float = 0.2
) -> str:
if not self.enabled:
raise LLMAnalysisError("llm_disabled")
@@ -124,7 +124,7 @@ async def analyze_code_structured(self, code: str, language_guess: str) -> dict:
raise LLMAnalysisError(str(exc)) from exc
async def chat_reply(
- self, message: str, code: str | None, history: list[str], level: str
+ self, message: str, code: Optional[str], history: List[str], level: str
) -> str:
prompt = (
"You are QyverixAI coding assistant in chat mode. "
diff --git a/backend/test-results/.last-run.json b/backend/test-results/.last-run.json
new file mode 100644
index 00000000..5fca3f84
--- /dev/null
+++ b/backend/test-results/.last-run.json
@@ -0,0 +1,4 @@
+{
+ "status": "failed",
+ "failedTests": []
+}
\ No newline at end of file
diff --git a/backend/tests/security_payloads.py b/backend/tests/security_payloads.py
index 8900a089..6adadcf8 100644
--- a/backend/tests/security_payloads.py
+++ b/backend/tests/security_payloads.py
@@ -85,7 +85,7 @@
def assert_no_raw_script_tag(data: dict | list) -> None:
"""Fail if any response value contains a literal ',
description: '
',
});
+
assertPlainTextHtml(html, 'issue card');
assert.ok(html.includes('issue-card info') || html.includes('issue-card error'));
assert.ok(!html.includes('onmouseover='));
@@ -169,6 +183,7 @@ describe('buildIssueCardHtml / buildSuggestCardHtml', () => {
priority: 'high">',
description: '${alert(1)}',
});
+
assertPlainTextHtml(html, 'suggest card');
assert.ok(html.includes('priority-medium') || html.includes('priority-high'));
assert.ok(!html.includes('