Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ on:
branches: ["main"]

env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
OPENAI_BASE_URL: ${{ secrets.OPENAI_BASE_URL }}
MONGODB_URI: ${{ secrets.MONGODB_URI }}
GOOGLE_API_KEY: ${{ secrets.GOOGLE_API_KEY }}
RABBITMQ_USER: ${{ secrets.RABBITMQ_USER }}
Expand All @@ -17,7 +19,6 @@ env:
POSTGRES_USER: ${{ secrets.POSTGRES_USER }}
POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }}


jobs:
spring-services:
name: Spring services
Expand Down
24 changes: 22 additions & 2 deletions services/py-intelligence/app/func.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
"shortened": "Gemini",
"cloud": True,
},
{
"name": "openai/gpt-oss-120b",
"provider": "openai",
"shortened": "GPT",
"cloud": True,
},
{
"name": "Qwen/Qwen2.5-Coder-3B-Instruct-GGUF",
"model_path": "/app/models/qwen2.5-coder-3b-instruct-q4_k_m.gguf",
Expand Down Expand Up @@ -159,7 +165,18 @@ def analyze(
"""
model = self.get_model_for_mode(mode)
prompt = self._build_analysis_prompt(content, mode, use_rag, context, retrieved_docs or [])
raw_response = model.generate(prompt)
try:
raw_response = model.generate(prompt)
except Exception as e:
# Fallback to OpenAI GPT model if Gemini cloud provider fails
if mode == "cloud":
try:
fallback_model = next(m for m in self.models if m.cloud and m.provider == "openai")
raw_response = fallback_model.generate(prompt)
except StopIteration:
raise e
else:
raise e
parsed_response = self._parse_model_response(raw_response)
return self._normalize_response(parsed_response, retrieved_docs or [], use_rag)

Expand Down Expand Up @@ -287,7 +304,10 @@ def _parse_model_response(self, raw_response: str) -> dict[str, Any]:
raise ValueError("Model response was not valid JSON.")

def _normalize_response(
self, response: dict[str, Any], retrieved_docs: list[dict[str, Any]], use_rag: bool
self,
response: dict[str, Any],
retrieved_docs: list[dict[str, Any]],
use_rag: bool,
) -> dict[str, Any]:
"""Ensures that the model's parsed JSON response is fully compliant and structurally sound.

Expand Down
35 changes: 35 additions & 0 deletions services/py-intelligence/app/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ def _load(self):
)
return self._client

if self.provider == "openai":
# Direct HTTP-based API calls; no heavy SDK setup required
self._client = True
return self._client

raise ValueError(f"Unknown provider: {self.provider}")

def generate(self, prompt: str) -> str:
Expand All @@ -72,6 +77,36 @@ def generate(self, prompt: str) -> str:
)
return result["choices"][0]["message"]["content"]

if self.provider == "openai":
import os
import httpx

api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise RuntimeError("OPENAI_API_KEY environment variable is not set.")

headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
payload = {
"model": self.model_name,
"messages": [
{
"role": "system",
"content": "You are DevPulse AI Insighter. Return valid JSON only.",
},
{"role": "user", "content": prompt},
],
"temperature": 0.1,
}
api_base = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
url = f"{api_base.rstrip('/')}/chat/completions"
response = httpx.post(url, json=payload, headers=headers, timeout=30.0)
response.raise_for_status()
result = response.json()
return result["choices"][0]["message"]["content"]

raise ValueError(f"Unknown provider: {self.provider}")

def __str__(self) -> str:
Expand Down
3 changes: 2 additions & 1 deletion services/py-intelligence/app/utils/db_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def __init__(self):
collection_name = os.getenv("COLLECTION_NAME", "rag_documents")

self.client = pymongo.MongoClient(
mongodb_uri, tlsCAFile=certifi.where() if "mongodb+srv" in mongodb_uri else None
mongodb_uri,
tlsCAFile=certifi.where() if "mongodb+srv" in mongodb_uri else None,
)
self.db = self.client[db_name]
self.collection = self.db[collection_name]
Expand Down
6 changes: 5 additions & 1 deletion services/py-intelligence/tests/test_db_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ def mock_db_config():
"""
with patch.dict(
"os.environ",
{"MONGODB_URI": "mongodb://localhost:27017", "DB_NAME": "test_db", "COLLECTION_NAME": "test_collection"},
{
"MONGODB_URI": "mongodb://localhost:27017",
"DB_NAME": "test_db",
"COLLECTION_NAME": "test_collection",
},
):
yield

Expand Down
16 changes: 13 additions & 3 deletions services/py-intelligence/tests/test_embedding_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
import pytest
import numpy as np
from unittest.mock import MagicMock, patch
from app.utils.embedding_utils import get_embedding, create_all_embeddings, similarity_search
from app.utils.embedding_utils import (
get_embedding,
create_all_embeddings,
similarity_search,
)


@pytest.fixture
Expand Down Expand Up @@ -52,7 +56,10 @@ def test_create_all_embeddings_success(mock_db, mock_sentence_transformer):
mock_db.return_value.db = {collection_name: mock_collection}

# Mock documents to be processed
mock_collection.find.return_value = [{"_id": "1", "content": "text 1"}, {"_id": "2", "content": "text 2"}]
mock_collection.find.return_value = [
{"_id": "1", "content": "text 1"},
{"_id": "2", "content": "text 2"},
]

# Mock embedding generation
mock_sentence_transformer.encode.return_value = np.array([0.5, 0.6])
Expand Down Expand Up @@ -93,7 +100,10 @@ def test_similarity_search(mock_db, mock_sentence_transformer):
mock_db.return_value.db = {collection_name: mock_collection}

# Mock aggregation results
mock_collection.aggregate.return_value = [{"title": "Result 1", "embedding": [0.1]}, {"title": "Result 2"}]
mock_collection.aggregate.return_value = [
{"title": "Result 1", "embedding": [0.1]},
{"title": "Result 2"},
]

# Mock query embedding
mock_sentence_transformer.encode.return_value = np.array([0.9, 0.8])
Expand Down
72 changes: 69 additions & 3 deletions services/py-intelligence/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,25 @@ def test_analyze_endpoint_uses_rag(mock_search, mock_get_model) -> None:
mock_model.generate.return_value = json.dumps(
{
**LOCAL_ANALYSIS_RESPONSE,
"sources": [{"id": "mock_id", "title": "Database timeout fix", "tags": ["db", "timeout"]}],
"sources": [
{
"id": "mock_id",
"title": "Database timeout fix",
"tags": ["db", "timeout"],
}
],
"confidence": "high",
}
)
mock_get_model.return_value = mock_model

response = client.post(
"/api/v1/analyze",
json={"content": "Deployment failed: database connection timeout", "mode": "local", "use_rag": True},
json={
"content": "Deployment failed: database connection timeout",
"mode": "local",
"use_rag": True,
},
)

assert response.status_code == 200
Expand Down Expand Up @@ -201,7 +211,14 @@ def test_normalize_response_defaults_and_rag_sources() -> None:
assert normalized["confidence"] == "low"

# RAG enabled, empty sources: should populate sources from retrieved docs
retrieved = [{"_id": "1a", "title": "Doc 1", "tags": ["tag1"], "content": "This is a detailed snippet of Doc 1."}]
retrieved = [
{
"_id": "1a",
"title": "Doc 1",
"tags": ["tag1"],
"content": "This is a detailed snippet of Doc 1.",
}
]
normalized_rag = intel._normalize_response({"problem_type": "x"}, retrieved_docs=retrieved, use_rag=True)

assert len(normalized_rag["sources"]) == 1
Expand Down Expand Up @@ -375,3 +392,52 @@ def test_model_load_raises_value_error_for_unknown_provider() -> None:

with pytest.raises(ValueError, match="Unknown provider: unknown_provider"):
model._load()


def test_openai_model_load_and_generate() -> None:
    """Verifies that the Model class loads and generates via the OpenAI provider.

    ``httpx.post`` is mocked, so no network traffic occurs. Beyond the returned
    completion text, the test pins the outgoing request (endpoint URL, bearer
    token, user message) so a malformed request cannot pass unnoticed.
    """
    from app.model import Model

    openai_cfg = {
        "name": "gpt-4o-mini",
        "provider": "openai",
        "shortened": "GPT",
        "cloud": True,
    }
    model = Model(openai_cfg)

    # The OpenAI provider needs no SDK client; _load() hands back a True sentinel.
    assert model._load() is True

    completion = '{"problem_type": "none"}'
    # Pin both variables: an ambient OPENAI_BASE_URL (CI exports one) would
    # otherwise decide which URL is built. The trailing slash checks that the
    # base URL is normalised before the path is appended.
    env = {
        "OPENAI_API_KEY": "test-key",
        "OPENAI_BASE_URL": "https://api.openai.com/v1/",
    }

    with patch("httpx.post") as mock_post, patch.dict("os.environ", env):
        mock_response = MagicMock()
        mock_response.json.return_value = {"choices": [{"message": {"content": completion}}]}
        mock_post.return_value = mock_response

        res = model.generate("test prompt")

    assert res == completion
    mock_post.assert_called_once()
    # HTTP errors must be surfaced rather than parsed as a completion.
    mock_response.raise_for_status.assert_called_once()

    args, kwargs = mock_post.call_args
    assert args[0] == "https://api.openai.com/v1/chat/completions"
    assert kwargs["headers"]["Authorization"] == "Bearer test-key"
    assert kwargs["json"]["messages"][-1] == {"role": "user", "content": "test prompt"}


@patch("app.apis.intelligence.models")
def test_analyze_fallback_to_openai(mock_models) -> None:
    """Verifies that analyze falls back to OpenAI if Gemini fails in cloud mode.

    The test asserts that the Gemini model was actually attempted and that the
    OpenAI model then received the same arguments. Without the first check the
    test would also pass if cloud mode picked the OpenAI model directly and the
    fallback branch never ran.
    """
    # NOTE(review): ``mock_models`` is not referenced in the body; the patch is
    # presumably kept to isolate the API module's model registry -- confirm.
    from app.func import Intelligence

    intel = Intelligence()

    # Locate the primary (Gemini) and fallback (OpenAI) model instances.
    gemini_model = next(m for m in intel.models if m.provider == "google")
    openai_model = next(m for m in intel.models if m.provider == "openai")

    # Gemini raises on generate; OpenAI answers with a valid JSON payload.
    with (
        patch.object(gemini_model, "generate", side_effect=Exception("Gemini Offline")) as mock_gemini_gen,
        patch.object(openai_model, "generate", return_value=json.dumps(LOCAL_ANALYSIS_RESPONSE)) as mock_openai_gen,
    ):
        res = intel.analyze(content="test logs", mode="cloud", use_rag=False)

    assert res["problem_type"] == LOCAL_ANALYSIS_RESPONSE["problem_type"]
    # The primary model must have been tried (and failed) before the fallback.
    mock_gemini_gen.assert_called_once()
    mock_openai_gen.assert_called_once()
    # The fallback must be handed exactly what the primary model was given.
    assert mock_openai_gen.call_args == mock_gemini_gen.call_args
Loading