diff --git a/.dockerignore b/.dockerignore
index 638e37f..ce8a783 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -1,3 +1,37 @@
+# Include any files or directories that you don't want to be copied to your
+# container here (e.g., local build artifacts, temporary files, etc.).
+#
+# For more help, visit the .dockerignore file reference guide at
+# https://docs.docker.com/engine/reference/builder/#dockerignore-file
+
+**/.DS_Store
+**/__pycache__
+**/.venv
+**/.classpath
+**/.dockerignore
+**/.env
+**/.git
+**/.gitignore
+**/.project
+**/.settings
+**/.toolstarget
+**/.vs
+**/.vscode
+**/*.*proj.user
+**/*.dbmdl
+**/*.jfm
+**/bin
+**/charts
+**/docker-compose*
+**/compose*
+**/Dockerfile*
+**/node_modules
+**/npm-debug.log
+**/obj
+**/secrets.dev.yaml
+**/values.dev.yaml
+LICENSE
+README.md
# UV and Python cache directories
**/__pycache__/
**/*.py[cod]
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..5122f9e
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,9 @@
+{
+ "chat.tools.terminal.autoApprove": {
+ "/^cd H:\\\\Works\\\\Code-Migration\\\\Container-Migration-Solution-Accelerator\\\\src\\\\backend-api ; python -m ruff check src/ --fix 2>&1$/": {
+ "approve": true,
+ "matchCommandLine": true
+ },
+ "npx eslint": true
+ }
+}
\ No newline at end of file
diff --git a/infra/main.bicep b/infra/main.bicep
index b4efa0f..16a2440 100644
--- a/infra/main.bicep
+++ b/infra/main.bicep
@@ -78,6 +78,16 @@ param aiModelVersion string = '2025-04-16'
@description('Optional. AI model deployment token capacity. Lower this if initial provisioning fails due to capacity. Defaults to 50K tokens per minute to improve regional success rate.')
param aiModelCapacity int = 500
+@minLength(1)
+@description('Optional. Name of the embedding model to deploy. Defaults to text-embedding-3-large.')
+param aiEmbeddingModelName string = 'text-embedding-3-large'
+
+@description('Optional. Version of the embedding model. Defaults to 1.')
+param aiEmbeddingModelVersion string = '1'
+
+@description('Optional. Embedding model deployment token capacity. Defaults to 500.')
+param aiEmbeddingModelCapacity int = 500
+
@description('Optional. The tags to apply to all deployed Azure resources.')
param tags resourceInput<'Microsoft.Resources/resourceGroups@2025-04-01'>.tags = {}
@@ -761,6 +771,18 @@ module existingAiFoundryAiServicesDeployments 'modules/ai-services-deployments.b
capacity: aiModelCapacity
}
}
+ {
+ name: aiEmbeddingModelName
+ model: {
+ format: 'OpenAI'
+ name: aiEmbeddingModelName
+ version: aiEmbeddingModelVersion
+ }
+ sku: {
+ name: 'Standard'
+ capacity: aiEmbeddingModelCapacity
+ }
+ }
]
roleAssignments: [
// Service Principal permissions
@@ -857,6 +879,18 @@ module aiFoundry 'br/public:avm/ptn/ai-ml/ai-foundry:0.4.0' = if(!useExistingAiF
capacity: aiModelCapacity
}
}
+ {
+ name: aiEmbeddingModelName
+ model: {
+ format: 'OpenAI'
+ name: aiEmbeddingModelName
+ version: aiEmbeddingModelVersion
+ }
+ sku: {
+ name: 'Standard'
+ capacity: aiEmbeddingModelCapacity
+ }
+ }
]
tags: allTags
enableTelemetry: enableTelemetry
@@ -905,6 +939,10 @@ module appConfiguration 'br/public:avm/res/app-configuration/configuration-store
name: 'AZURE_OPENAI_CHAT_DEPLOYMENT_NAME'
value: aiModelDeploymentName
}
+ {
+ name: 'AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME'
+ value: aiEmbeddingModelName
+ }
{
name: 'AZURE_OPENAI_ENDPOINT'
value: 'https://${aiServicesName}.cognitiveservices.azure.com/'
diff --git a/src/frontend/src/pages/batchView.tsx b/src/frontend/src/pages/batchView.tsx
index f0c2e75..1ec56c8 100644
--- a/src/frontend/src/pages/batchView.tsx
+++ b/src/frontend/src/pages/batchView.tsx
@@ -713,14 +713,14 @@ const BatchStoryPage = () => {
Step Timeline
{(() => {
- const stepOrder = ['analysis', 'design', 'yaml', 'yaml_conversion', 'documentation'];
+ const stepOrder = ['analysis', 'design', 'yaml', 'documentation'];
const stepLabels: Record = {
'analysis': 'Analysis', 'design': 'Design', 'yaml': 'YAML Conversion',
- 'yaml_conversion': 'YAML Conversion', 'documentation': 'Documentation'
+ 'documentation': 'Documentation'
};
const stepIcons: Record = {
'analysis': '🔍', 'design': '📐', 'yaml': '📄',
- 'yaml_conversion': '📄', 'documentation': '📝'
+ 'documentation': '📝'
};
const timings = telemetryData.step_timings;
const totalElapsed = Object.values(timings).reduce((sum: number, t: any) => sum + (t?.elapsed_seconds || 0), 0);
@@ -747,7 +747,7 @@ const BatchStoryPage = () => {
const r = Array.isArray(stepResult.result) ? stepResult.result[0] : stepResult.result;
if (key === 'analysis') {
summary = `${r?.output?.platform_detected || ''} detected (${r?.output?.confidence_score || ''})`;
- } else if (key === 'yaml' || key === 'yaml_conversion') {
+ } else if (key === 'yaml') {
const metrics = r?.termination_output?.overall_conversion_metrics;
if (metrics) summary = `${metrics.successful_conversions}/${metrics.total_files} files converted (${metrics.overall_accuracy})`;
} else if (key === 'design') {
@@ -800,7 +800,7 @@ const BatchStoryPage = () => {
};
const stepLabels: Record = {
'analysis': 'Analysis', 'design': 'Design', 'yaml': 'YAML',
- 'yaml_conversion': 'YAML', 'documentation': 'Docs'
+ 'documentation': 'Docs'
};
return Object.entries(agents)
diff --git a/src/processor/src/libs/agent_framework/azure_openai_response_retry.py b/src/processor/src/libs/agent_framework/azure_openai_response_retry.py
index 251e029..2b76768 100644
--- a/src/processor/src/libs/agent_framework/azure_openai_response_retry.py
+++ b/src/processor/src/libs/agent_framework/azure_openai_response_retry.py
@@ -31,9 +31,9 @@ def _format_exc_brief(exc: BaseException) -> str:
@dataclass(frozen=True)
class RateLimitRetryConfig:
- max_retries: int = 5
- base_delay_seconds: float = 2.0
- max_delay_seconds: float = 30.0
+ max_retries: int = 8
+ base_delay_seconds: float = 5.0
+ max_delay_seconds: float = 120.0
@staticmethod
def from_env(
@@ -54,9 +54,9 @@ def _float(name: str, default: float) -> float:
return default
return RateLimitRetryConfig(
- max_retries=max(0, _int(max_retries_env, 5)),
- base_delay_seconds=max(0.0, _float(base_delay_env, 2.0)),
- max_delay_seconds=max(0.0, _float(max_delay_env, 30.0)),
+ max_retries=max(0, _int(max_retries_env, 8)),
+ base_delay_seconds=max(0.0, _float(base_delay_env, 5.0)),
+ max_delay_seconds=max(0.0, _float(max_delay_env, 120.0)),
)
@@ -69,6 +69,15 @@ def _looks_like_rate_limit(error: BaseException) -> bool:
if status == 429:
return True
+ # Treat empty error messages as transient (likely connection reset or
+ # incomplete response from Azure front-end) — worth retrying.
+ if not msg or msg == str(type(error).__name__).lower():
+ return True
+
+ # Server errors (5xx) are transient and should be retried.
+ if isinstance(status, int) and 500 <= status < 600:
+ return True
+
cause = getattr(error, "__cause__", None)
if cause and cause is not error:
return _looks_like_rate_limit(cause)
@@ -246,14 +255,14 @@ class ContextTrimConfig:
"""
enabled: bool = True
- # GPT-5.1 supports 272K input tokens (~800K chars). These defaults stay well
- # within that budget while guarding against accidental large blob injection.
- # Progressive trimming on retry will reduce these further if needed.
- max_total_chars: int = 600_000
- max_message_chars: int = 40_000
- keep_last_messages: int = 50
- keep_head_chars: int = 15_000
- keep_tail_chars: int = 5_000
+ # GPT-5.1 supports 272K input tokens (~800K chars). With workspace context
+ # injected into system instructions (never trimmed) and Qdrant shared memory
+ # providing cross-step context, we can keep fewer conversation messages.
+ max_total_chars: int = 400_000
+ max_message_chars: int = 0 # Disabled — with keep_last_messages=15, per-message truncation is unnecessary
+ keep_last_messages: int = 15
+ keep_head_chars: int = 12_000
+ keep_tail_chars: int = 4_000
keep_system_messages: bool = True
retry_on_context_error: bool = True
@@ -284,7 +293,7 @@ def _bool(name: str, default: bool) -> bool:
enabled=_bool(enabled_env, True),
max_total_chars=max(0, _int(max_total_chars_env, 240_000)),
max_message_chars=max(0, _int(max_message_chars_env, 20_000)),
- keep_last_messages=max(1, _int(keep_last_messages_env, 40)),
+ keep_last_messages=max(1, _int(keep_last_messages_env, 15)),
keep_head_chars=max(0, _int(keep_head_chars_env, 10_000)),
keep_tail_chars=max(0, _int(keep_tail_chars_env, 3_000)),
keep_system_messages=_bool(keep_system_messages_env, True),
@@ -299,42 +308,18 @@ def _trim_messages(
return list(messages)
# ──────────────────────────────────────────────────────────────────────
- # Phase 0: Smart tool-result compression.
- # Tool outputs (read_blob_content, save_content_to_blob, etc.) are the
- # largest context consumers. Once an agent has responded after a tool
- # call, the raw output is redundant — the agent's response is the
- # distilled intelligence. We compress old tool results aggressively
- # while keeping the most recent ones intact for the current agent turn.
+ # Phase 0: Summarize large save_content_to_blob calls.
+ # Write payloads are redundant once persisted — replace with a short
+ # summary. Read tool results are never truncated so the model always
+ # has the full file content to reason about.
# ──────────────────────────────────────────────────────────────────────
- KEEP_RECENT_TOOL_RESULTS = 4 # Keep the N most recent tool results in full
- TOOL_RESULT_MAX_CHARS = 500 # Truncate older tool results to this size
SAVE_ARG_MAX_CHARS = 200 # Truncate save_content_to_blob arguments
- tool_result_indices: list[int] = []
for i, m in enumerate(messages):
- role = _get_message_role(m)
text = _estimate_message_text(m)
- if role == "tool" or (role is None and _looks_like_tool_result(text)):
- tool_result_indices.append(i)
- # Also detect save_content_to_blob in assistant/function messages
- elif _looks_like_save_blob_call(text):
- if len(text) > SAVE_ARG_MAX_CHARS:
- # Extract just the blob name and byte count
- summary = _summarize_save_blob(text, SAVE_ARG_MAX_CHARS)
- messages[i] = _set_message_text(m, summary)
-
- # Compress older tool results, keep recent ones in full
- if len(tool_result_indices) > KEEP_RECENT_TOOL_RESULTS:
- old_indices = tool_result_indices[:-KEEP_RECENT_TOOL_RESULTS]
- for idx in old_indices:
- m = messages[idx]
- text = _estimate_message_text(m)
- if len(text) > TOOL_RESULT_MAX_CHARS:
- truncated = (
- text[:TOOL_RESULT_MAX_CHARS]
- + f"\n[... tool output truncated from {len(text)} chars ...]"
- )
- messages[idx] = _set_message_text(m, truncated)
+ if _looks_like_save_blob_call(text) and len(text) > SAVE_ARG_MAX_CHARS:
+ summary = _summarize_save_blob(text, SAVE_ARG_MAX_CHARS)
+ messages[i] = _set_message_text(m, summary)
# Keep last N messages; optionally keep system messages from the head.
system_messages: list[Any] = []
@@ -354,14 +339,21 @@ def _trim_messages(
seen_fingerprints: set[tuple[str, str]] = set()
cleaned: list[Any] = []
- for m in tail:
+ for idx, m in enumerate(tail):
text = _estimate_message_text(m)
fp = (text[:200], text[-200:])
if fp in seen_fingerprints:
continue
seen_fingerprints.add(fp)
- if cfg.max_message_chars > 0 and len(text) > cfg.max_message_chars:
+ # Never truncate the last message — the agent needs it in full
+ # to reason about the most recent tool result or instruction.
+ is_last = idx == len(tail) - 1
+ if (
+ not is_last
+ and cfg.max_message_chars > 0
+ and len(text) > cfg.max_message_chars
+ ):
text = _truncate_text(
text,
max_chars=cfg.max_message_chars,
@@ -584,6 +576,14 @@ async def _inner_get_response(
len(messages),
len(trimmed),
)
+ # Cool down before retrying to avoid triggering 429s immediately.
+ trim_delay = self._retry_config.base_delay_seconds
+ trim_delay = min(trim_delay, self._retry_config.max_delay_seconds)
+ logger.info(
+ "[AOAI_CTX_TRIM] sleeping %ss before retry",
+ round(trim_delay, 1),
+ )
+ await asyncio.sleep(trim_delay)
return await _retry_call(
lambda: parent_inner_get_response(
messages=trimmed, chat_options=chat_options, **kwargs
@@ -690,6 +690,18 @@ async def _tail():
if attempt_index >= attempts - 1:
# No more retries available.
raise
+
+ # Cool down before retrying — immediate retries after trimming
+ # tend to trigger 429s because the API hasn't recovered yet.
+ trim_delay = self._retry_config.base_delay_seconds * (
+ 2**attempt_index
+ )
+ trim_delay = min(trim_delay, self._retry_config.max_delay_seconds)
+ logger.info(
+ "[AOAI_CTX_TRIM_STREAM] sleeping %ss before retry",
+ round(trim_delay, 1),
+ )
+ await asyncio.sleep(trim_delay)
continue
if not _looks_like_rate_limit(e) or attempt_index >= attempts - 1:
diff --git a/src/processor/src/libs/agent_framework/mem0_async_memory.py b/src/processor/src/libs/agent_framework/mem0_async_memory.py
index 35b18c7..5740790 100644
--- a/src/processor/src/libs/agent_framework/mem0_async_memory.py
+++ b/src/processor/src/libs/agent_framework/mem0_async_memory.py
@@ -3,6 +3,8 @@
"""Lazy-initialized async wrapper around the Mem0 vector-store memory backend."""
+import os
+
from mem0 import AsyncMemory
@@ -17,6 +19,13 @@ async def get_memory(self):
return self._memory_instance
async def _create_memory(self):
+ endpoint = os.getenv("AZURE_OPENAI_ENDPOINT", "")
+ chat_deployment = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME", "gpt-5.1")
+ embedding_deployment = os.getenv(
+ "AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME", "text-embedding-3-large"
+ )
+ api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-12-01-preview")
+
config = {
"vector_store": {
"provider": "redis",
@@ -29,27 +38,24 @@ async def _create_memory(self):
"llm": {
"provider": "azure_openai",
"config": {
- "model": "gpt-5.1",
+ "model": chat_deployment,
"temperature": 0.1,
- "max_tokens": 100000,
+ "max_tokens": 4000,
"azure_kwargs": {
- "azure_deployment": "gpt-5.1",
- "api_version": "2024-12-01-preview",
- "azure_endpoint": "https://aifappframework.cognitiveservices.azure.com/",
+ "azure_deployment": chat_deployment,
+ "api_version": api_version,
+ "azure_endpoint": endpoint,
},
},
},
"embedder": {
"provider": "azure_openai",
"config": {
- "model": "text-embedding-3-large",
+ "model": embedding_deployment,
"azure_kwargs": {
- "api_version": "2024-02-01",
- "azure_deployment": "text-embedding-3-large",
- "azure_endpoint": "https://aifappframework.openai.azure.com/",
- "default_headers": {
- "CustomHeader": "container migration",
- },
+ "api_version": api_version,
+ "azure_deployment": embedding_deployment,
+ "azure_endpoint": endpoint,
},
},
},
diff --git a/src/processor/src/libs/agent_framework/qdrant_memory_store.py b/src/processor/src/libs/agent_framework/qdrant_memory_store.py
new file mode 100644
index 0000000..b71d937
--- /dev/null
+++ b/src/processor/src/libs/agent_framework/qdrant_memory_store.py
@@ -0,0 +1,327 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+"""Qdrant-backed shared memory store for multi-agent context sharing.
+
+This module provides a vector memory store using Qdrant (in-process embedded mode)
+that enables agents to share relevant context without carrying full conversation
+history. Each migration process gets its own isolated collection.
+
+Usage:
+ store = QdrantMemoryStore(process_id="abc-123")
+    await store.initialize(embedding_client, "text-embedding-3-large")
+
+ # Store a memory
+ await store.add("AKS supports node auto-provisioning via Karpenter",
+ agent_name="AKS Expert", step="analysis", turn=3)
+
+ # Retrieve relevant memories
+ memories = await store.search("How should we handle node scaling?", top_k=5)
+
+ # Cleanup when process completes
+ await store.close()
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import time
+import uuid
+from dataclasses import dataclass
+
+from openai import AsyncAzureOpenAI
+from qdrant_client import AsyncQdrantClient, models
+
+logger = logging.getLogger(__name__)
+
+# Qdrant collection settings
+EMBEDDING_DIM = 3072 # text-embedding-3-large dimension
+DISTANCE_METRIC = models.Distance.COSINE
+
+
+@dataclass
+class MemoryEntry:
+ """A single memory retrieved from the store."""
+
+ content: str
+ agent_name: str
+ step: str
+ turn: int
+ score: float
+ memory_id: str
+
+
+class QdrantMemoryStore:
+ """Qdrant-backed vector memory store for sharing context across agents.
+
+ Uses Qdrant embedded (in-process) mode — no external server needed.
+ Each migration process gets its own collection for isolation.
+ """
+
+ def __init__(self, process_id: str):
+ self.process_id = process_id
+ self.collection_name = f"migration_{process_id.replace('-', '_')}"
+ self._client: AsyncQdrantClient | None = None
+ self._embedding_client: AsyncAzureOpenAI | None = None
+ self._embedding_deployment: str | None = None
+ self._initialized = False
+ self._turn_counter = 0
+
+ async def initialize(
+ self,
+ embedding_client: AsyncAzureOpenAI,
+ embedding_deployment: str,
+ ) -> None:
+ """Initialize the Qdrant client and create the collection.
+
+ Args:
+ embedding_client: Azure OpenAI async client for generating embeddings.
+ embedding_deployment: Deployment name for the embedding model.
+ """
+ if self._initialized:
+ return
+
+ self._embedding_client = embedding_client
+ self._embedding_deployment = embedding_deployment
+
+ # In-memory Qdrant — no server, no persistence, auto-cleanup
+ self._client = AsyncQdrantClient(":memory:")
+
+ await self._client.create_collection(
+ collection_name=self.collection_name,
+ vectors_config=models.VectorParams(
+ size=EMBEDDING_DIM,
+ distance=DISTANCE_METRIC,
+ ),
+ )
+
+ self._initialized = True
+ logger.info(
+ "[MEMORY] QdrantMemoryStore initialized for process %s (collection: %s)",
+ self.process_id,
+ self.collection_name,
+ )
+
+ async def add(
+ self,
+ content: str,
+ *,
+ agent_name: str,
+ step: str,
+ turn: int | None = None,
+ metadata: dict | None = None,
+ ) -> str:
+ """Store a memory entry with its embedding.
+
+ Args:
+ content: The text content to store.
+ agent_name: Name of the agent that produced this content.
+ step: Migration step (analysis, design, convert, documentation).
+ turn: Conversation turn number (auto-incremented if None).
+ metadata: Optional additional metadata.
+
+ Returns:
+ The unique ID of the stored memory.
+ """
+ if not self._initialized:
+ raise RuntimeError(
+ "QdrantMemoryStore not initialized. Call initialize() first."
+ )
+
+ if not content or not content.strip():
+ return ""
+
+ if turn is None:
+ self._turn_counter += 1
+ turn = self._turn_counter
+
+ # Generate embedding
+ embedding = await self._embed(content)
+ if embedding is None:
+ logger.warning("[MEMORY] Failed to generate embedding, skipping store")
+ return ""
+
+ memory_id = str(uuid.uuid4())
+ payload = {
+ "content": content,
+ "agent_name": agent_name,
+ "step": step,
+ "turn": turn,
+ "process_id": self.process_id,
+ "timestamp": time.time(),
+ }
+ if metadata:
+ payload["metadata"] = metadata
+
+ await self._client.upsert(
+ collection_name=self.collection_name,
+ points=[
+ models.PointStruct(
+ id=memory_id,
+ vector=embedding,
+ payload=payload,
+ )
+ ],
+ )
+
+ logger.debug(
+ "[MEMORY] Stored memory from %s (step=%s, turn=%d, %d chars)",
+ agent_name,
+ step,
+ turn,
+ len(content),
+ )
+ return memory_id
+
+ async def search(
+ self,
+ query: str,
+ *,
+ top_k: int = 10,
+ step_filter: str | None = None,
+ agent_filter: str | None = None,
+ score_threshold: float = 0.3,
+ ) -> list[MemoryEntry]:
+ """Search for relevant memories using semantic similarity.
+
+ Args:
+ query: The search query text.
+ top_k: Maximum number of results to return.
+ step_filter: Optional filter by migration step.
+ agent_filter: Optional filter by agent name.
+ score_threshold: Minimum similarity score (0-1).
+
+ Returns:
+ List of MemoryEntry objects sorted by relevance.
+ """
+ if not self._initialized:
+ return []
+
+ embedding = await self._embed(query)
+ if embedding is None:
+ return []
+
+ # Build optional filters
+ conditions = []
+ if step_filter:
+ conditions.append(
+ models.FieldCondition(
+ key="step",
+ match=models.MatchValue(value=step_filter),
+ )
+ )
+ if agent_filter:
+ conditions.append(
+ models.FieldCondition(
+ key="agent_name",
+ match=models.MatchValue(value=agent_filter),
+ )
+ )
+
+ query_filter = models.Filter(must=conditions) if conditions else None
+
+ results = await self._client.query_points(
+ collection_name=self.collection_name,
+ query=embedding,
+ query_filter=query_filter,
+ limit=top_k,
+ score_threshold=score_threshold,
+ )
+
+ memories = []
+ for point in results.points:
+ payload = point.payload or {}
+ memories.append(
+ MemoryEntry(
+ content=payload.get("content", ""),
+ agent_name=payload.get("agent_name", ""),
+ step=payload.get("step", ""),
+ turn=payload.get("turn", 0),
+ score=point.score,
+ memory_id=str(point.id),
+ )
+ )
+
+ logger.debug(
+ "[MEMORY] Search returned %d results (query: %.80s...)",
+ len(memories),
+ query,
+ )
+ return memories
+
+ async def get_count(self) -> int:
+ """Return the number of memories stored."""
+ if not self._initialized:
+ return 0
+ info = await self._client.get_collection(self.collection_name)
+ return info.points_count
+
+ async def close(self) -> None:
+ """Close the Qdrant client and release resources."""
+ if self._client:
+ try:
+ await self._client.delete_collection(self.collection_name)
+ except Exception:
+ pass
+ await self._client.close()
+ self._client = None
+ self._initialized = False
+ logger.info("[MEMORY] QdrantMemoryStore closed for process %s", self.process_id)
+
+ # Embedding retry config (lighter than chat — embeddings are fast and cheap)
+ _EMBED_MAX_RETRIES = 3
+ _EMBED_BASE_DELAY = 2.0
+ _EMBED_MAX_DELAY = 30.0
+
+ async def _embed(self, text: str) -> list[float] | None:
+ """Generate an embedding vector for the given text with retry."""
+ if not self._embedding_client or not self._embedding_deployment:
+ logger.warning(
+ "[MEMORY] _embed skipped — client=%s, deployment=%s",
+ "set" if self._embedding_client else "None",
+ self._embedding_deployment or "None",
+ )
+ return None
+
+ last_error: Exception | None = None
+ for attempt in range(self._EMBED_MAX_RETRIES + 1):
+ try:
+ response = await self._embedding_client.embeddings.create(
+ input=text,
+ model=self._embedding_deployment,
+ )
+ return response.data[0].embedding
+ except Exception as e:
+ last_error = e
+ msg = str(e).lower()
+ is_retryable = any(
+ s in msg
+ for s in ["429", "too many requests", "rate limit", "throttle",
+ "timeout", "connection", "server error", "502", "503", "504"]
+ ) or (not msg) # empty error message = transient
+
+ if not is_retryable or attempt >= self._EMBED_MAX_RETRIES:
+ logger.warning(
+ "[MEMORY] Embedding call failed (attempt %d/%d, not retrying): %s",
+ attempt + 1,
+ self._EMBED_MAX_RETRIES + 1,
+ e,
+ )
+ return None
+
+ delay = min(
+ self._EMBED_BASE_DELAY * (2 ** attempt),
+ self._EMBED_MAX_DELAY,
+ )
+ logger.warning(
+ "[MEMORY] Embedding call failed (attempt %d/%d), retrying in %.1fs: %s",
+ attempt + 1,
+ self._EMBED_MAX_RETRIES + 1,
+ delay,
+ e,
+ )
+ await asyncio.sleep(delay)
+
+ logger.warning("[MEMORY] Embedding exhausted all retries: %s", last_error)
+ return None
diff --git a/src/processor/src/libs/agent_framework/shared_memory_context_provider.py b/src/processor/src/libs/agent_framework/shared_memory_context_provider.py
new file mode 100644
index 0000000..a143a88
--- /dev/null
+++ b/src/processor/src/libs/agent_framework/shared_memory_context_provider.py
@@ -0,0 +1,316 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+"""ContextProvider that injects shared Qdrant-backed memories into agent context.
+
+This provider is attached to each agent in a GroupChat. Before each LLM call,
+it queries the shared QdrantMemoryStore for relevant memories and injects them
+as additional context. After each LLM response, it stores the agent's response
+back into the shared memory for other agents to discover.
+
+This enables agents to share knowledge without carrying the full conversation
+history in their context window.
+"""
+
+from __future__ import annotations
+
+import logging
+from collections.abc import MutableSequence, Sequence
+from typing import TYPE_CHECKING
+
+from agent_framework import ChatMessage, Context, ContextProvider
+
+if TYPE_CHECKING:
+ from libs.agent_framework.qdrant_memory_store import QdrantMemoryStore
+
+logger = logging.getLogger(__name__)
+
+# Maximum characters of memory context to inject (prevents context bloat)
+MAX_MEMORY_CONTEXT_CHARS = 15_000
+
+# Minimum content length to store (skip trivial messages)
+MIN_CONTENT_LENGTH_TO_STORE = 50
+
+
+# Step order for determining cross-step queries
+_STEP_ORDER = ["analysis", "design", "convert", "documentation"]
+
+
+class SharedMemoryContextProvider(ContextProvider):
+ """ContextProvider that reads/writes shared memory via Qdrant.
+
+ Attached to each agent individually, but all agents share the same
+ QdrantMemoryStore instance, enabling cross-agent knowledge sharing.
+
+ Optimized for cross-step memory sharing:
+ - invoking(): only searches memories from PREVIOUS steps (within-step context
+ is already available via GroupChat conversation broadcast)
+ - invoked(): only stores the LAST response per agent per step (avoids
+ redundant embedding calls for intermediate turns)
+ """
+
+ def __init__(
+ self,
+ memory_store: QdrantMemoryStore,
+ agent_name: str,
+ step: str,
+ top_k: int = 10,
+ score_threshold: float = 0.3,
+ ):
+ """Initialize the shared memory context provider.
+
+ Args:
+ memory_store: Shared QdrantMemoryStore instance (same across all agents).
+ agent_name: Name of the agent this provider is attached to.
+ step: Current migration step (analysis, design, convert, documentation).
+ top_k: Number of relevant memories to retrieve per turn.
+ score_threshold: Minimum similarity score for memory retrieval.
+ """
+ self._memory_store = memory_store
+ self._agent_name = agent_name
+ self._step = step
+ self._top_k = top_k
+ self._score_threshold = score_threshold
+ self._turn_counter = 0
+ self._last_content: str | None = (
+ None # Track last response for deferred storage
+ )
+
+ # Determine which prior steps to search (skip current step)
+ step_lower = step.lower()
+ step_idx = None
+ for i, s in enumerate(_STEP_ORDER):
+ if s == step_lower:
+ step_idx = i
+ break
+ self._prior_steps = _STEP_ORDER[:step_idx] if step_idx else []
+
+ async def invoking(
+ self,
+ messages: ChatMessage | MutableSequence[ChatMessage],
+ **kwargs,
+ ) -> Context:
+ """Called before the agent's LLM call. Injects relevant shared memories.
+
+ Only searches memories from PREVIOUS steps. Within the current step,
+ agents already see all messages via GroupChat broadcast.
+ """
+ # Skip if this is the first step (no prior memories exist)
+ if not self._prior_steps:
+ return Context()
+
+ # Extract query from the most recent messages
+ query = self._extract_query(messages)
+ if not query:
+ return Context()
+
+ try:
+ memories = await self._memory_store.search(
+ query=query,
+ top_k=self._top_k,
+ score_threshold=self._score_threshold,
+ )
+ except Exception as e:
+ logger.warning(
+ "[MEMORY] Failed to search memories for %s: %s",
+ self._agent_name,
+ e,
+ )
+ return Context()
+
+ if not memories:
+ return Context()
+
+ # Format memories into context instructions
+ formatted = self._format_memories(memories)
+ if not formatted:
+ return Context()
+
+ instructions = f"{self.DEFAULT_CONTEXT_PROMPT}\n\n{formatted}"
+
+ logger.info(
+ "[MEMORY] Injecting %d memories for %s (step=%s, %d chars)",
+ len(memories),
+ self._agent_name,
+ self._step,
+ len(instructions),
+ )
+
+ return Context(instructions=instructions)
+
+ async def invoked(
+ self,
+ request_messages: ChatMessage | Sequence[ChatMessage],
+ response_messages: ChatMessage | Sequence[ChatMessage] | None = None,
+ invoke_exception: Exception | None = None,
+ **kwargs,
+ ) -> None:
+        """Called after the agent's LLM response. Buffers the response for storage.
+
+        Storage is deferred by one turn: the previously buffered response is
+        flushed when the next invocation arrives, and the final buffered
+        response is flushed at step completion via flush(). Every substantive
+        response is eventually stored, just not synchronously.
+        """
+ if invoke_exception is not None:
+ logger.debug(
+ "[MEMORY] invoked() skipped for %s — exception: %s",
+ self._agent_name,
+ invoke_exception,
+ )
+ return
+
+ if response_messages is None:
+ logger.debug(
+ "[MEMORY] invoked() skipped for %s — no response_messages",
+ self._agent_name,
+ )
+ return
+
+ # Extract text from response
+ content = self._extract_text(response_messages)
+ if not content or len(content) < MIN_CONTENT_LENGTH_TO_STORE:
+ logger.debug(
+ "[MEMORY] invoked() skipped for %s — content too short (%d chars)",
+ self._agent_name,
+ len(content) if content else 0,
+ )
+ return
+
+ logger.info(
+ "[MEMORY] invoked() buffering for %s (step=%s, %d chars)",
+ self._agent_name,
+ self._step,
+ len(content),
+ )
+
+ # Store previous buffered content before replacing
+ if self._last_content is not None:
+ await self._flush_memory()
+
+ self._last_content = content
+ self._turn_counter += 1
+
+ async def flush(self) -> None:
+ """Flush any buffered memory to the store.
+
+ Called at step completion to ensure the last agent response is stored.
+ """
+ if self._last_content is not None:
+ logger.info(
+ "[MEMORY] flush() called for %s (step=%s, buffered=%d chars)",
+ self._agent_name,
+ self._step,
+ len(self._last_content),
+ )
+ await self._flush_memory()
+ else:
+ logger.debug(
+ "[MEMORY] flush() called for %s (step=%s) — nothing buffered",
+ self._agent_name,
+ self._step,
+ )
+
+ async def _flush_memory(self) -> None:
+ """Store the buffered content into the memory store."""
+ content = self._last_content
+ self._last_content = None
+ if not content:
+ return
+
+ # Guard: skip if memory store is no longer available
+ if not getattr(self._memory_store, "_initialized", False):
+ logger.warning(
+ "[MEMORY] _flush_memory skipped for %s — memory store not initialized (initialized=%s)",
+ self._agent_name,
+ getattr(self._memory_store, "_initialized", "missing"),
+ )
+ return
+
+ try:
+ await self._memory_store.add(
+ content=content,
+ agent_name=self._agent_name,
+ step=self._step,
+ turn=self._turn_counter,
+ )
+ logger.info(
+ "[MEMORY] Stored memory from %s (step=%s, turn=%d, %d chars)",
+ self._agent_name,
+ self._step,
+ self._turn_counter,
+ len(content),
+ )
+ except Exception as e:
+ logger.warning(
+ "[MEMORY] Failed to store memory for %s: %s",
+ self._agent_name,
+ e,
+ )
+
+ def _extract_query(
+ self, messages: ChatMessage | MutableSequence[ChatMessage]
+ ) -> str:
+ """Extract a search query from the input messages.
+
+ Uses the last non-system message as the query, truncated for embedding.
+ """
+ # Single message (not a list/sequence)
+ if not isinstance(messages, (list, MutableSequence)):
+ return self._get_text(messages)[:2000]
+
+ if not messages:
+ return ""
+
+ # Search from the end for the most recent substantive message
+ for msg in reversed(messages):
+ text = self._get_text(msg)
+ if text and len(text) > 20:
+ return text[:2000]
+
+ return ""
+
+ def _format_memories(self, memories: list) -> str:
+ """Format retrieved memories into a readable context block."""
+ if not memories:
+ return ""
+
+ lines = []
+ total_chars = 0
+
+ for mem in memories:
+ # Truncate individual memories to prevent a single one from dominating
+ content = mem.content[:3000] if len(mem.content) > 3000 else mem.content
+ entry = f"- [{mem.agent_name} / {mem.step}] {content}"
+
+ if total_chars + len(entry) > MAX_MEMORY_CONTEXT_CHARS:
+ break
+
+ lines.append(entry)
+ total_chars += len(entry)
+
+ return "\n".join(lines)
+
+ @staticmethod
+ def _get_text(message: ChatMessage) -> str:
+ """Extract text content from a ChatMessage."""
+ if hasattr(message, "text") and message.text:
+ return message.text
+ if hasattr(message, "content"):
+ return str(message.content) if message.content else ""
+ return str(message) if message else ""
+
+ @staticmethod
+ def _extract_text(
+ messages: ChatMessage | Sequence[ChatMessage],
+ ) -> str:
+ """Extract text content from response message(s)."""
+ if not isinstance(messages, (list, Sequence)) or isinstance(messages, str):
+ return SharedMemoryContextProvider._get_text(messages)
+
+ parts = []
+ for msg in messages:
+ text = SharedMemoryContextProvider._get_text(msg)
+ if text:
+ parts.append(text)
+ return "\n".join(parts)
diff --git a/src/processor/src/libs/base/orchestrator_base.py b/src/processor/src/libs/base/orchestrator_base.py
index c58ebb3..46dce8c 100644
--- a/src/processor/src/libs/base/orchestrator_base.py
+++ b/src/processor/src/libs/base/orchestrator_base.py
@@ -20,6 +20,10 @@
AgentResponseStream,
OrchestrationResult,
)
+from libs.agent_framework.qdrant_memory_store import QdrantMemoryStore
+from libs.agent_framework.shared_memory_context_provider import (
+ SharedMemoryContextProvider,
+)
from utils.agent_telemetry import TelemetryManager
from utils.console_util import format_agent_message
@@ -29,10 +33,15 @@
ResultT = TypeVar("ResultT")
+logger = logging.getLogger(__name__)
+
+
class OrchestratorBase(AgentBase, Generic[TaskParamT, ResultT]):
def __init__(self, app_context=None):
super().__init__(app_context)
self.initialized = False
+ self.memory_store: QdrantMemoryStore | None = None
+ self.step_name: str = ""
def is_console_summarization_enabled(self) -> bool:
"""Return True if console summarization (extra LLM call per turn) is enabled.
@@ -57,9 +66,46 @@ async def initialize(self, process_id: str):
| Sequence[ToolProtocol | Callable[..., Any] | MutableMapping[str, Any]]
) = await self.prepare_mcp_tools()
self.agentinfos = await self.prepare_agent_infos()
+
+ # Resolve workflow-level shared memory store from AppContext (if registered)
+ if self.app_context.is_registered(QdrantMemoryStore):
+ try:
+ self.memory_store = self.app_context.get_service(QdrantMemoryStore)
+ logger.info(
+ "[MEMORY] Resolved memory store for step=%s, initialized=%s, id=%s",
+ self.step_name,
+ getattr(self.memory_store, "_initialized", "?"),
+ id(self.memory_store),
+ )
+ except Exception:
+ self.memory_store = None
+
self.agents = await self.create_agents(self.agentinfos, process_id=process_id)
self.initialized = True
+ async def flush_agent_memories(self) -> None:
+ """Flush buffered memories from all agent context providers.
+
+ Called at step completion to ensure each agent's last response
+ is stored in the shared memory before the next step begins.
+ """
+ for agent in (self.agents or {}).values():
+ # ChatAgent stores providers in agent.context_provider (AggregateContextProvider)
+ # which has a .providers list of individual ContextProvider instances
+ agg_provider = getattr(agent, "context_provider", None)
+ if agg_provider is None:
+ continue
+ inner_providers = getattr(agg_provider, "providers", None)
+ if not inner_providers:
+ continue
+ for provider in inner_providers:
+ flush = getattr(provider, "flush", None)
+ if callable(flush):
+ try:
+ await flush()
+ except Exception as e:
+ logger.warning("[MEMORY] flush failed: %s", e)
+
def load_platform_registry(self, registry_path: str) -> list[dict[str, Any]]:
with open(registry_path, "r", encoding="utf-8") as f:
data = json.load(f)
@@ -101,11 +147,25 @@ async def create_agents(
) -> list[ChatAgent]:
agents = dict[str, ChatAgent]()
agent_client = await self.get_client(thread_id=process_id)
+
+ # Workspace context — injected into every agent's system instructions
+ # so it survives context trimming (system messages are never trimmed)
+ workspace_context = (
+ f"\n\n## WORKSPACE CONTEXT\n"
+ f"- Process ID: {process_id}\n"
+ f"- Container: processes\n"
+ f"- Source folder: {process_id}/source\n"
+ f"- Output folder: {process_id}/converted\n"
+ )
+
for agent_info in agent_infos:
+ # Append workspace context to every agent's instruction
+ instruction = agent_info.agent_instruction + workspace_context
+
builder = (
AgentBuilder(agent_client)
.with_name(agent_info.agent_name)
- .with_instructions(agent_info.agent_instruction)
+ .with_instructions(instruction)
)
# Only attach tools when provided. (Coordinator should typically have none.)
@@ -134,6 +194,20 @@ async def create_agents(
.with_max_tokens(12_000)
.with_tool_choice("none")
)
+
+ # Attach shared memory context provider to expert agents
+ # (not Coordinator, not ResultGenerator — they don't need memory)
+ if (
+ self.memory_store is not None
+ and agent_info.agent_name not in ("Coordinator", "ResultGenerator")
+ ):
+ memory_provider = SharedMemoryContextProvider(
+ memory_store=self.memory_store,
+ agent_name=agent_info.agent_name,
+ step=self.step_name,
+ )
+ builder = builder.with_context_providers(memory_provider)
+
agent = builder.build()
agents[agent_info.agent_name] = agent
@@ -160,7 +234,7 @@ async def get_client(self, thread_id: str = None):
).api_version,
thread_id=thread_id,
retry_config=RateLimitRetryConfig(
- max_retries=5, base_delay_seconds=3.0, max_delay_seconds=60.0
+ max_retries=8, base_delay_seconds=5.0, max_delay_seconds=120.0
),
)
self._client_cache[thread_id] = client
@@ -241,8 +315,11 @@ async def on_agent_response(self, response: AgentResponse):
summarized_response = await summarizer_agent.run(
f"speak as {response.agent_name} : {coordinator_response.instruction} to {coordinator_response.selected_participant}"
)
- print(
- f"{response.agent_name}: {summarized_response.text} ({response.elapsed_time:.2f}s)\n\n"
+ logger.info(
+ "%s: %s (%.2fs)",
+ response.agent_name,
+ summarized_response.text,
+ response.elapsed_time,
)
await telemetry.update_agent_activity(
process_id=self.task_param.process_id,
@@ -253,18 +330,15 @@ async def on_agent_response(self, response: AgentResponse):
)
except Exception as e:
logging.error(f"Error in summarization: {e}")
- print(f"{response.agent_name}: {response.message}\n\n")
+ logger.info("%s: %s", response.agent_name, response.message)
else:
- # print(
- # f"{response.agent_name}: {coordinator_response.selected_participant} ← {coordinator_response.instruction} ({response.elapsed_time:.2f}s)\n\n"
- # )
- # use format_agent_message
- print(
+ logger.info(
+ "%s",
format_agent_message(
name=response.agent_name,
content=f"{response.agent_name}: {coordinator_response.selected_participant} ← {coordinator_response.instruction}",
timestamp=f"{response.elapsed_time:.2f}s",
- )
+ ),
)
await telemetry.update_agent_activity(
@@ -279,7 +353,7 @@ async def on_agent_response(self, response: AgentResponse):
# something wrong with deserialization, ignore
pass
elif response.agent_name == "ResultGenerator":
- print("Step results has been generated")
+ logger.info("Step results has been generated")
else:
# print(f"{response.agent_name}: {response.message} ({response.elapsed_time:.2f}s)\n\n")
if self.is_console_summarization_enabled():
@@ -288,8 +362,11 @@ async def on_agent_response(self, response: AgentResponse):
summarized_response = await summarizer_agent.run(
f"speak as {response.agent_name} : {response.message}"
)
- print(
- f"{response.agent_name}: {summarized_response.text} ({response.elapsed_time:.2f}s)\n\n"
+ logger.info(
+ "%s: %s (%.2fs)",
+ response.agent_name,
+ summarized_response.text,
+ response.elapsed_time,
)
await telemetry.update_agent_activity(
@@ -301,17 +378,15 @@ async def on_agent_response(self, response: AgentResponse):
except Exception as e:
logging.error(f"Error in summarization: {e}")
- print(f"{response.agent_name}: {response.message}\n\n")
+ logger.info("%s: %s", response.agent_name, response.message)
else:
- # print(
- # f"{response.agent_name}: {response.message} ({response.elapsed_time:.2f}s)\n\n"
- # )
- print(
+ logger.info(
+ "%s",
format_agent_message(
name=response.agent_name,
content=f"{response.agent_name}: {response.message}",
timestamp=f"{response.elapsed_time:.2f}s",
- )
+ ),
)
await telemetry.update_agent_activity(
@@ -329,12 +404,13 @@ async def on_agent_response_stream(self, response: AgentResponseStream):
if response.response_type == "message":
# GroupChatOrchestrator emits this when an agent starts streaming a new message.
# print(f"{response.agent_name} is thinking...\n")
- print(
+ logger.info(
+ "%s",
format_agent_message(
name=response.agent_name,
content=f"{response.agent_name} is thinking...",
timestamp="",
- )
+ ),
)
await telemetry.update_agent_activity(
@@ -361,12 +437,13 @@ async def on_agent_response_stream(self, response: AgentResponseStream):
preview_suffix = f"({args_preview})" if args_preview else "()"
# print(f"{response.agent_name} is invoking {tool_name}{preview_suffix}...\n")
- print(
+ logger.info(
+ "%s",
format_agent_message(
name=response.agent_name,
content=f"{response.agent_name} is invoking {tool_name}{preview_suffix}...",
timestamp="",
- )
+ ),
)
await telemetry.update_agent_activity(
diff --git a/src/processor/src/libs/mcp_server/blob_io_operation/mcp_blob_io_operation.py b/src/processor/src/libs/mcp_server/blob_io_operation/mcp_blob_io_operation.py
index c8093a8..bb20d86 100644
--- a/src/processor/src/libs/mcp_server/blob_io_operation/mcp_blob_io_operation.py
+++ b/src/processor/src/libs/mcp_server/blob_io_operation/mcp_blob_io_operation.py
@@ -368,7 +368,12 @@ def list_blobs_in_container(
container_client = client.get_container_client(container_name)
# Set up name prefix for folder filtering
- name_starts_with = folder_path if folder_path else None
+ # Ensure prefix ends with / so relative_path computation is correct
+ # (without trailing /, relative_path starts with "/" and gets wrongly
+ # excluded by the non-recursive subfolder check)
+ name_starts_with = None
+ if folder_path:
+ name_starts_with = folder_path if folder_path.endswith("/") else folder_path + "/"
try:
blobs = container_client.list_blobs(name_starts_with=name_starts_with)
@@ -382,8 +387,8 @@ def list_blobs_in_container(
continue
# Skip if not recursive and blob is in a subfolder
- if not recursive and folder_path:
- relative_path = blob.name[len(folder_path) :]
+ if not recursive and name_starts_with:
+ relative_path = blob.name[len(name_starts_with) :]
if "/" in relative_path:
continue
elif not recursive and not folder_path:
diff --git a/src/processor/src/libs/mcp_server/mermaid/mcp_mermaid.py b/src/processor/src/libs/mcp_server/mermaid/mcp_mermaid.py
index fde7dc7..7652ca7 100644
--- a/src/processor/src/libs/mcp_server/mermaid/mcp_mermaid.py
+++ b/src/processor/src/libs/mcp_server/mermaid/mcp_mermaid.py
@@ -373,7 +373,7 @@ def _mermaid_render_check(code: str, timeout: int = 10) -> tuple[bool, str]:
# Extract just the error message
lines = stderr.split("\n")
error_line = next(
- (l for l in lines if "Error" in l or "error" in l), lines[0]
+ (line for line in lines if "Error" in line or "error" in line), lines[0]
)
return False, error_line[:200]
return True, ""
diff --git a/src/processor/src/main.py b/src/processor/src/main.py
index c1748c2..79531ff 100644
--- a/src/processor/src/main.py
+++ b/src/processor/src/main.py
@@ -8,6 +8,7 @@
"""
import asyncio
+import logging
import os
from libs.agent_framework.agent_framework_helper import AgentFrameworkHelper
@@ -22,6 +23,8 @@
from steps.migration_processor import MigrationProcessor
from utils.agent_telemetry import TelemetryManager
+logger = logging.getLogger(__name__)
+
class Application(ApplicationBase):
"""
@@ -37,8 +40,8 @@ def initialize(self):
Initialize the application.
This method can be overridden by subclasses to perform any necessary setup.
"""
- print(
- "Application initialized with configuration:",
+ logger.info(
+ "Application initialized with configuration: %s",
self.application_context.configuration,
)
@@ -88,10 +91,8 @@ def register_services(self):
),
)
except Exception as e:
- # Keep it as a print to match the current style of this entrypoint.
- print(
- "[WARN] Cosmos checkpoint storage disabled due to import/config error:",
- e,
+ logger.warning(
+ "Cosmos checkpoint storage disabled due to import/config error: %s", e
)
async def run(self):
diff --git a/src/processor/src/main_service.py b/src/processor/src/main_service.py
index 1bbf8d8..ba79d56 100644
--- a/src/processor/src/main_service.py
+++ b/src/processor/src/main_service.py
@@ -90,7 +90,7 @@ def _configure_logging(self):
configure_application_logging(debug_mode=self.debug_mode)
if self.debug_mode:
- print("🐛 Debug logging enabled - level set to DEBUG")
+ logger.debug("Debug logging enabled - level set to DEBUG")
logger.debug("🔇 Verbose third-party logging suppressed to reduce noise")
def initialize(self):
@@ -101,8 +101,8 @@ def initialize(self):
(agent framework helpers, telemetry, process control, and the migration
processor).
"""
- print(
- "Application initialized with configuration:",
+ logger.info(
+ "Application initialized with configuration: %s",
self.application_context.configuration,
)
self.register_services()
@@ -166,10 +166,8 @@ def register_services(self):
),
)
except Exception as e:
- # Keep it as a print to match the current style of this entrypoint.
- print(
- "[WARN] Cosmos checkpoint storage disabled due to import/config error:",
- e,
+ logger.warning(
+ "Cosmos checkpoint storage disabled due to import/config error: %s", e
)
# Only log initialization if debug mode is explicitly enabled
if self.debug_mode:
@@ -272,18 +270,18 @@ def _build_service_config(
# Debug print to see what we're getting (only if debug mode is enabled)
if self.debug_mode:
- print("DEBUG - Environment variables:")
- print(
- f" VISIBILITY_TIMEOUT_MINUTES: {visibility_timeout} (type: {type(visibility_timeout)})"
+ logger.debug("DEBUG - Environment variables:")
+ logger.debug(
+ " VISIBILITY_TIMEOUT_MINUTES: %s (type: %s)", visibility_timeout, type(visibility_timeout)
)
- print(
- f" POLL_INTERVAL_SECONDS: {poll_interval} (type: {type(poll_interval)})"
+ logger.debug(
+ " POLL_INTERVAL_SECONDS: %s (type: %s)", poll_interval, type(poll_interval)
)
- print(
- f" MESSAGE_TIMEOUT_MINUTES: {message_timeout} (type: {type(message_timeout)})"
+ logger.debug(
+ " MESSAGE_TIMEOUT_MINUTES: %s (type: %s)", message_timeout, type(message_timeout)
)
- print(
- f" CONCURRENT_WORKERS: {concurrent_workers} (type: {type(concurrent_workers)})"
+ logger.debug(
+ " CONCURRENT_WORKERS: %s (type: %s)", concurrent_workers, type(concurrent_workers)
)
config = QueueServiceConfig(
diff --git a/src/processor/src/steps/analysis/orchestration/analysis_orchestrator.py b/src/processor/src/steps/analysis/orchestration/analysis_orchestrator.py
index 40aeba0..93f8f2f 100644
--- a/src/processor/src/steps/analysis/orchestration/analysis_orchestrator.py
+++ b/src/processor/src/steps/analysis/orchestration/analysis_orchestrator.py
@@ -92,6 +92,7 @@ async def execute(
on_agent_response_stream=self.on_agent_response_stream,
on_workflow_complete=self.on_orchestration_complete,
)
+ await self.flush_agent_memories()
return orchestration_result
async def prepare_mcp_tools(
@@ -209,9 +210,9 @@ async def prepare_agent_infos(self) -> list[AgentInfo]:
# Render coordinator prompt with the current participant list.
participant_names = [ai.agent_name for ai in agent_infos]
- valid_participants_block = "\n".join(
- [f'- "{name}"' for name in participant_names]
- )
+ valid_participants_block = "\n".join([
+ f'- "{name}"' for name in participant_names
+ ])
coordinator_agent_info.render(
**self.task_param.model_dump(),
current_timestamp=get_current_timestamp_utc(),
@@ -223,34 +224,9 @@ async def prepare_agent_infos(self) -> list[AgentInfo]:
agent_infos.append(coordinator_agent_info)
# ResultGenerator: serializes the completed conversation into the output schema.
- result_generator_instruction = """
- You are a Result Generator.
-
- ROLE & RESPONSIBILITY (do not exceed scope):
- - You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers.
- - The step outcome has already happened via stakeholder discussion and coordinator termination.
- - Your only job is to serialize the final outcome into the required schema exactly.
-
- STRICT JSON RULES:
- - Output MUST be valid JSON only (no markdown, no prose).
- - Do NOT call tools.
- - Do NOT verify file existence.
- - Do NOT invent termination codes or blockers.
-
- HARD-TERMINATION SERIALIZATION RULE (IMPORTANT):
- - Set `is_hard_terminated=true` ONLY if a participant explicitly provided a hard-termination decision with a termination code
- from this exact set: NO_YAML_FILES, NO_KUBERNETES_CONTENT, ALL_CORRUPTED, SECURITY_POLICY_VIOLATION, RAI_POLICY_VIOLATION, PROFANITY_DETECTED, MIXED_PLATFORM_DETECTED.
- - If hard-terminated, `blocking_issues` must be a list of those exact codes ONLY (no extra explanation text inside the list).
-
- EVIDENCE PRESERVATION (when hard-terminated):
- - The `reason` MUST include a short **Evidence** section listing which file(s) triggered the termination and what was detected.
- - NEVER include secret values (tokens/passwords/private keys/base64 blobs). For Secrets, include only key names + resource metadata.
-
- WHAT TO DO:
- 1) Review the conversation (excluding the Coordinator).
- 2) Extract the final agreed facts and any explicit PASS/FAIL sign-offs exactly as stated.
- 3) Emit JSON that conforms exactly to `Analysis_ExtendedBooleanResult`.
- """
+ result_generator_instruction = self.read_prompt_file(
+ registry_dir / "prompt_resultgenerator.txt"
+ )
result_generator_info = AgentInfo(
agent_name="ResultGenerator",
agent_instruction=result_generator_instruction,
diff --git a/src/processor/src/steps/analysis/orchestration/prompt_resultgenerator.txt b/src/processor/src/steps/analysis/orchestration/prompt_resultgenerator.txt
new file mode 100644
index 0000000..a82f201
--- /dev/null
+++ b/src/processor/src/steps/analysis/orchestration/prompt_resultgenerator.txt
@@ -0,0 +1,26 @@
+You are a Result Generator.
+
+ROLE & RESPONSIBILITY (do not exceed scope):
+- You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers.
+- The step outcome has already happened via stakeholder discussion and coordinator termination.
+- Your only job is to serialize the final outcome into the required schema exactly.
+
+STRICT JSON RULES:
+- Output MUST be valid JSON only (no markdown, no prose).
+- Do NOT call tools.
+- Do NOT verify file existence.
+- Do NOT invent termination codes or blockers.
+
+HARD-TERMINATION SERIALIZATION RULE (IMPORTANT):
+- Set `is_hard_terminated=true` ONLY if a participant explicitly provided a hard-termination decision with a termination code
+ from this exact set: NO_YAML_FILES, NO_KUBERNETES_CONTENT, ALL_CORRUPTED, SECURITY_POLICY_VIOLATION, RAI_POLICY_VIOLATION, PROFANITY_DETECTED, MIXED_PLATFORM_DETECTED.
+- If hard-terminated, `blocking_issues` must be a list of those exact codes ONLY (no extra explanation text inside the list).
+
+EVIDENCE PRESERVATION (when hard-terminated):
+- The `reason` MUST include a short **Evidence** section listing which file(s) triggered the termination and what was detected.
+- NEVER include secret values (tokens/passwords/private keys/base64 blobs). For Secrets, include only key names + resource metadata.
+
+WHAT TO DO:
+1) Review the conversation (excluding the Coordinator).
+2) Extract the final agreed facts and any explicit PASS/FAIL sign-offs exactly as stated.
+3) Emit JSON that conforms exactly to `Analysis_ExtendedBooleanResult`.
diff --git a/src/processor/src/steps/analysis/workflow/analysis_executor.py b/src/processor/src/steps/analysis/workflow/analysis_executor.py
index 35bd355..924a928 100644
--- a/src/processor/src/steps/analysis/workflow/analysis_executor.py
+++ b/src/processor/src/steps/analysis/workflow/analysis_executor.py
@@ -7,8 +7,9 @@
records step lifecycle events via `TelemetryManager`.
"""
+import logging
+
from agent_framework import Executor, WorkflowContext, handler
-from art import text2art
from libs.application.application_context import AppContext
from utils.agent_telemetry import TelemetryManager
@@ -17,6 +18,8 @@
from ..models.step_param import Analysis_TaskParam
from ..orchestration.analysis_orchestrator import AnalysisOrchestrator
+logger = logging.getLogger(__name__)
+
class AnalysisExecutor(Executor):
"""Workflow executor that runs the analysis orchestrator."""
@@ -45,15 +48,15 @@ async def handle_execute(
# Start to logging the process
# Due to the bug, first Executor's ExecutorInvokedEvent is not fired so I had to put it here
#########################################################################################################
- print("Executor invoked (analysis)")
+ logger.info("Executor invoked (analysis)")
telemetry: TelemetryManager = await self.app_context.get_service_async(
TelemetryManager
)
await telemetry.transition_to_phase(
- process_id=message.process_id, step="analysis", phase="Analysis"
+ process_id=message.process_id, step="analysis", phase="Initializing Analysis"
)
- print(text2art("Analysis"))
+ logger.info("Starting Analysis step")
#######################################################################################################
result = await analysis_orchestrator.execute(task_param=message)
diff --git a/src/processor/src/steps/convert/orchestration/prompt_coordinator.txt b/src/processor/src/steps/convert/orchestration/prompt_coordinator.txt
index 8378023..4a510a1 100644
--- a/src/processor/src/steps/convert/orchestration/prompt_coordinator.txt
+++ b/src/processor/src/steps/convert/orchestration/prompt_coordinator.txt
@@ -67,7 +67,9 @@ STATE-AWARE ROUTING:
- Chief Architect MUST merge/de-duplicate blockers into one prioritized fix plan
5) Else if there are any Open blockers → select YAML Expert
- YAML Expert MUST apply merged fix plan in one pass, re-save YAMLs, update file_converting_result.md
-6) Else select Chief Architect to finalize
+6) Else if YAML Expert's sign-off is PENDING → select YAML Expert
+ - Instruction: "All reviewers have provided PASS. Update your own SIGN-OFF from PENDING to PASS in file_converting_result.md and populate the ## References section with validated Microsoft Learn URLs."
+7) Else select Chief Architect to finalize
PHASE 3: CONFLICT RESOLUTION (Chief Architect, if any FAIL)
- Chief Architect reconciles conflicts and de-duplicates blockers
diff --git a/src/processor/src/steps/convert/orchestration/prompt_resultgenerator.txt b/src/processor/src/steps/convert/orchestration/prompt_resultgenerator.txt
new file mode 100644
index 0000000..8393f43
--- /dev/null
+++ b/src/processor/src/steps/convert/orchestration/prompt_resultgenerator.txt
@@ -0,0 +1,19 @@
+You are a Result Generator.
+
+ROLE & RESPONSIBILITY (do not exceed scope):
+- You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers.
+- The step outcome has already happened via stakeholder discussion and coordinator termination.
+- Your only job is to serialize the final outcome into the required schema exactly.
+
+RULES:
+- Output MUST be valid JSON only.
+- Do NOT call tools.
+- Do NOT verify file existence.
+- Do NOT invent missing files, blockers, or metrics.
+- Only summarize what participants explicitly stated.
+- Keep `reason` short (one sentence).
+
+WHAT TO DO:
+1) Review the conversation (excluding the Coordinator).
+2) Extract conversion results (converted files, concerns, metrics, and report path) as stated.
+3) Emit JSON that conforms exactly to `Yaml_ExtendedBooleanResult`.
diff --git a/src/processor/src/steps/convert/orchestration/yaml_convert_orchestrator.py b/src/processor/src/steps/convert/orchestration/yaml_convert_orchestrator.py
index 6ecf5b2..f1fe8b4 100644
--- a/src/processor/src/steps/convert/orchestration/yaml_convert_orchestrator.py
+++ b/src/processor/src/steps/convert/orchestration/yaml_convert_orchestrator.py
@@ -7,6 +7,7 @@
to produce a structured `Yaml_ExtendedBooleanResult`.
"""
+import logging
import os
import re
from pathlib import Path
@@ -30,6 +31,8 @@
from utils.datetime_util import get_current_timestamp_utc
from utils.prompt_util import TemplateUtility
+logger = logging.getLogger(__name__)
+
class YamlConvertOrchestrator(
OrchestratorBase[Design_ExtendedBooleanResult, Yaml_ExtendedBooleanResult]
@@ -98,6 +101,7 @@ async def execute(
on_workflow_complete=self.on_orchestration_complete,
on_agent_response_stream=self.on_agent_response_stream,
)
+ await self.flush_agent_memories()
return orchestration_result
async def prepare_mcp_tools(
@@ -212,27 +216,9 @@ async def prepare_agent_infos(self) -> list[Any]:
agent_infos.append(coordinator_agent_info)
# Result generator
- result_generator_instruction = """
- You are a Result Generator.
-
- ROLE & RESPONSIBILITY (do not exceed scope):
- - You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers.
- - The step outcome has already happened via stakeholder discussion and coordinator termination.
- - Your only job is to serialize the final outcome into the required schema exactly.
-
- RULES:
- - Output MUST be valid JSON only.
- - Do NOT call tools.
- - Do NOT verify file existence.
- - Do NOT invent missing files, blockers, or metrics.
- - Only summarize what participants explicitly stated.
- - Keep `reason` short (one sentence).
-
- WHAT TO DO:
- 1) Review the conversation (excluding the Coordinator).
- 2) Extract conversion results (converted files, concerns, metrics, and report path) as stated.
- 3) Emit JSON that conforms exactly to `Yaml_ExtendedBooleanResult`.
-"""
+ result_generator_instruction = self.read_prompt_file(
+ str(Path(__file__).parent / "prompt_resultgenerator.txt")
+ )
result_generator_info = AgentInfo(
agent_name="ResultGenerator",
agent_instruction=result_generator_instruction,
@@ -250,11 +236,10 @@ async def on_orchestration_complete(
self, result: OrchestrationResult[Yaml_ExtendedBooleanResult]
):
"""Handle orchestration completion (console summary)."""
- print("*" * 40)
- print("Yaml Convert Orchestration complete.")
- print(f"Elapsed: {result.execution_time_seconds:.2f}s")
- print(f"Final Result: {result}")
- print("*" * 40)
+ logger.info(
+ "Yaml Convert Orchestration complete. Elapsed: %.2fs",
+ result.execution_time_seconds,
+ )
async def on_agent_response_stream(self, response):
"""Forward streaming agent output to base hooks."""
diff --git a/src/processor/src/steps/convert/workflow/yaml_convert_executor.py b/src/processor/src/steps/convert/workflow/yaml_convert_executor.py
index ae5b744..7a9e283 100644
--- a/src/processor/src/steps/convert/workflow/yaml_convert_executor.py
+++ b/src/processor/src/steps/convert/workflow/yaml_convert_executor.py
@@ -34,7 +34,7 @@ async def handle_execute(
TelemetryManager
)
await telemetry.transition_to_phase(
- process_id=message.process_id, step="yaml_conversion", phase="YAML"
+ process_id=message.process_id, step="yaml", phase="YAML"
)
result = await yaml_convert_orchestrator.execute(task_param=message)
diff --git a/src/processor/src/steps/design/orchestration/design_orchestrator.py b/src/processor/src/steps/design/orchestration/design_orchestrator.py
index 379b0a1..d2dd47f 100644
--- a/src/processor/src/steps/design/orchestration/design_orchestrator.py
+++ b/src/processor/src/steps/design/orchestration/design_orchestrator.py
@@ -92,6 +92,7 @@ async def execute(
on_workflow_complete=self.on_orchestration_complete,
on_agent_response_stream=self.on_agent_response_stream,
)
+ await self.flush_agent_memories()
return orchestration_result
async def prepare_mcp_tools(
@@ -225,27 +226,9 @@ async def prepare_agent_infos(self) -> list[Any]:
agent_infos.append(coordinator_agent_info)
# ResultGenerator: Generates structured Design_ExtendedBooleanResult AFTER GroupChat completes
- result_generator_instruction = """
- You are a Result Generator.
-
- ROLE & RESPONSIBILITY (do not exceed scope):
- - You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers.
- - The step outcome has already happened via stakeholder discussion and coordinator termination.
- - Your only job is to serialize the final outcome into the required schema exactly.
-
- RULES:
- - Output MUST be valid JSON only (no markdown, no prose).
- - Do NOT call tools.
- - Do NOT verify file existence.
- - Do NOT add new requirements.
- - Only summarize what participants explicitly said/did.
- - Keep `reason` short (one sentence).
-
- WHAT TO DO:
- 1) Review the conversation (excluding the Coordinator).
- 2) Extract the final, agreed design summary, key decisions, and the expected output artifact paths.
- 3) Emit JSON that conforms exactly to `Design_ExtendedBooleanResult`.
-"""
+ result_generator_instruction = self.read_prompt_file(
+ str(Path(__file__).parent / "prompt_resultgenerator.txt")
+ )
result_generator_info = AgentInfo(
agent_name="ResultGenerator",
agent_instruction=result_generator_instruction,
diff --git a/src/processor/src/steps/design/orchestration/prompt_resultgenerator.txt b/src/processor/src/steps/design/orchestration/prompt_resultgenerator.txt
new file mode 100644
index 0000000..62481fb
--- /dev/null
+++ b/src/processor/src/steps/design/orchestration/prompt_resultgenerator.txt
@@ -0,0 +1,19 @@
+You are a Result Generator.
+
+ROLE & RESPONSIBILITY (do not exceed scope):
+- You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers.
+- The step outcome has already happened via stakeholder discussion and coordinator termination.
+- Your only job is to serialize the final outcome into the required schema exactly.
+
+RULES:
+- Output MUST be valid JSON only (no markdown, no prose).
+- Do NOT call tools.
+- Do NOT verify file existence.
+- Do NOT add new requirements.
+- Only summarize what participants explicitly said/did.
+- Keep `reason` short (one sentence).
+
+WHAT TO DO:
+1) Review the conversation (excluding the Coordinator).
+2) Extract the final, agreed design summary, key decisions, and the expected output artifact paths.
+3) Emit JSON that conforms exactly to `Design_ExtendedBooleanResult`.
diff --git a/src/processor/src/steps/documentation/agents/prompt_technical_writer.txt b/src/processor/src/steps/documentation/agents/prompt_technical_writer.txt
index 76d812b..d897956 100644
--- a/src/processor/src/steps/documentation/agents/prompt_technical_writer.txt
+++ b/src/processor/src/steps/documentation/agents/prompt_technical_writer.txt
@@ -259,7 +259,8 @@ Replace `[CURRENT_TIMESTAMP]` with the actual current date and time in this form
```
## CITATIONS (GENERAL RULE)
-- If you reference Microsoft documentation or Azure service behavior in the report body, include inline citations where appropriate.
+- If you reference Microsoft documentation or Azure service behavior in the report body, use inline hyperlinks (e.g., `[AKS overview](https://learn.microsoft.com/...)`).
+- Do NOT use Markdown footnote syntax (`[^1]`, `[^2]`, etc.) or generate a "Footnotes" section. Footnotes clutter the report and duplicate the References section.
- Maintain the `## References` section as the authoritative list of all external documentation used.
## URL VALIDATION FOR NON-AZURE REFERENCES
diff --git a/src/processor/src/steps/documentation/orchestration/documentation_orchestrator.py b/src/processor/src/steps/documentation/orchestration/documentation_orchestrator.py
index 5470d13..0aa6c44 100644
--- a/src/processor/src/steps/documentation/orchestration/documentation_orchestrator.py
+++ b/src/processor/src/steps/documentation/orchestration/documentation_orchestrator.py
@@ -10,6 +10,7 @@
from __future__ import annotations
+import logging
import os
from pathlib import Path
from typing import Any, Callable, MutableMapping, Sequence
@@ -35,6 +36,8 @@
from utils.datetime_util import get_current_timestamp_utc
from utils.prompt_util import TemplateUtility
+logger = logging.getLogger(__name__)
+
class DocumentationOrchestrator(
OrchestratorBase[Yaml_ExtendedBooleanResult, Documentation_ExtendedBooleanResult]
@@ -103,6 +106,7 @@ async def execute(
on_workflow_complete=self.on_orchestration_complete,
on_agent_response_stream=self.on_agent_response_stream,
)
+ await self.flush_agent_memories()
return orchestration_result
async def prepare_mcp_tools(
@@ -250,27 +254,9 @@ async def prepare_agent_infos(self) -> list[Any]:
)
agent_infos.append(coordinator_info)
- result_generator_instruction = """
- You are a Result Generator.
-
- ROLE & RESPONSIBILITY (do not exceed scope):
- - You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers.
- - The step outcome has already happened via stakeholder discussion and coordinator termination.
- - Your only job is to serialize the final outcome into the required schema exactly.
-
- RULES:
- - Output MUST be valid JSON only.
- - Do NOT call tools.
- - Do NOT verify file existence.
- - Do NOT invent metrics, blockers, or sign-offs.
- - Only summarize what participants explicitly stated.
- - Keep `reason` short (one sentence).
-
- WHAT TO DO:
- 1) Review the conversation (excluding the Coordinator).
- 2) Extract roll-up metrics, expert collaboration/consensus notes, and generated file references as stated.
- 3) Emit JSON that conforms exactly to `Documentation_ExtendedBooleanResult`.
- """
+ result_generator_instruction = self.read_prompt_file(
+ str(Path(__file__).parent / "prompt_resultgenerator.txt")
+ )
result_generator_info = AgentInfo(
agent_name="ResultGenerator",
agent_instruction=result_generator_instruction,
@@ -288,9 +274,10 @@ async def on_orchestration_complete(
self, result: OrchestrationResult[Documentation_ExtendedBooleanResult]
):
"""Handle orchestration completion (console summary)."""
- print("Orchestration complete.")
- print(f"Elapsed: {result.execution_time_seconds:.2f}s")
- print(f"Final Result: {result}")
+ logger.info(
+ "Documentation Orchestration complete. Elapsed: %.2fs",
+ result.execution_time_seconds,
+ )
async def on_agent_response_stream(self, response):
"""Forward streaming agent output to base hooks."""
diff --git a/src/processor/src/steps/documentation/orchestration/prompt_resultgenerator.txt b/src/processor/src/steps/documentation/orchestration/prompt_resultgenerator.txt
new file mode 100644
index 0000000..11df6bf
--- /dev/null
+++ b/src/processor/src/steps/documentation/orchestration/prompt_resultgenerator.txt
@@ -0,0 +1,19 @@
+You are a Result Generator.
+
+ROLE & RESPONSIBILITY (do not exceed scope):
+- You do NOT decide whether the step succeeded/failed and you do NOT introduce new blockers.
+- The step outcome has already happened via stakeholder discussion and coordinator termination.
+- Your only job is to serialize the final outcome into the required schema exactly.
+
+RULES:
+- Output MUST be valid JSON only.
+- Do NOT call tools.
+- Do NOT verify file existence.
+- Do NOT invent metrics, blockers, or sign-offs.
+- Only summarize what participants explicitly stated.
+- Keep `reason` short (one sentence).
+
+WHAT TO DO:
+1) Review the conversation (excluding the Coordinator).
+2) Extract roll-up metrics, expert collaboration/consensus notes, and generated file references as stated.
+3) Emit JSON that conforms exactly to `Documentation_ExtendedBooleanResult`.
diff --git a/src/processor/src/steps/migration_processor.py b/src/processor/src/steps/migration_processor.py
index c7bf859..66d232c 100644
--- a/src/processor/src/steps/migration_processor.py
+++ b/src/processor/src/steps/migration_processor.py
@@ -26,6 +26,8 @@
"""
import json
+import logging
+import os
import time
from datetime import datetime
from typing import Any
@@ -40,9 +42,12 @@
WorkflowOutputEvent,
WorkflowStartedEvent,
)
-from art import text2art
+from openai import AsyncAzureOpenAI
+
+from libs.agent_framework.qdrant_memory_store import QdrantMemoryStore
from libs.application.application_context import AppContext
+from libs.base.orchestrator_base import OrchestratorBase
from libs.reporting import (
MigrationReportCollector,
MigrationReportGenerator,
@@ -50,6 +55,7 @@
)
from libs.reporting.models.failure_context import FailureType
from utils.agent_telemetry import TelemetryManager
+from utils.credential_util import get_bearer_token_provider
from .analysis.models.step_param import Analysis_TaskParam
from .analysis.workflow.analysis_executor import AnalysisExecutor
@@ -57,6 +63,8 @@
from .design.workflow.design_executor import DesignExecutor
from .documentation.workflow.documentation_executor import DocumentationExecutor
+logger = logging.getLogger(__name__)
+
class WorkflowExecutorFailedException(Exception):
"""Raised when an executor fails, preserving WorkflowErrorDetails payload."""
@@ -187,6 +195,62 @@ def _init_workflow(self) -> Workflow:
return workflow
+    async def _create_memory_store(
+        self, process_id: str
+    ) -> QdrantMemoryStore | None:
+        """Create a workflow-scoped shared memory store (best-effort).
+
+        The store lives for the entire workflow (analysis → design → convert →
+        documentation) so earlier steps' memories are visible to later steps.
+        Returns None if disabled or misconfigured (workflow proceeds without memory).
+        """
+        enabled = os.getenv("SHARED_MEMORY_ENABLED", "true").strip().lower()  # opt-out flag; on by default
+        if enabled not in ("1", "true", "yes", "on"):
+            logger.info("[MEMORY] Shared memory disabled via SHARED_MEMORY_ENABLED")
+            return None
+
+        try:
+            from libs.agent_framework.agent_framework_helper import AgentFrameworkHelper  # NOTE(review): local import — presumably avoids a module-load cycle; confirm
+
+            helper: AgentFrameworkHelper = self.app_context.get_service(
+                AgentFrameworkHelper
+            )
+            service_config = helper.settings.get_service_config("default")
+            if not service_config:
+                logger.warning("[MEMORY] No default service config — skipping memory")
+                return None
+
+            embedding_deployment = service_config.embedding_deployment_name
+            if not embedding_deployment:
+                logger.warning(
+                    "[MEMORY] No embedding deployment configured — skipping memory. "
+                    "Set AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME to enable."
+                )
+                return None
+
+            token_provider = get_bearer_token_provider()  # Entra ID token auth (no API key)
+            embedding_client = AsyncAzureOpenAI(
+                azure_endpoint=service_config.endpoint,
+                azure_ad_token_provider=token_provider,
+                api_version=service_config.api_version,
+            )
+
+            store = QdrantMemoryStore(process_id=process_id)
+            await store.initialize(
+                embedding_client=embedding_client,
+                embedding_deployment=embedding_deployment,
+            )
+            logger.info(
+                "[MEMORY] Workflow-level shared memory store initialized (process=%s)",
+                process_id,
+            )
+            return store
+        except Exception as e:  # broad by design: memory is best-effort, never fatal
+            logger.warning(
+                "[MEMORY] Failed to create memory store: %s — continuing without", e
+            )
+            return None
+
async def run(self, input_data: Analysis_TaskParam) -> Any:
"""Run the migration workflow.
@@ -224,6 +288,11 @@ async def run(self, input_data: Analysis_TaskParam) -> Any:
report_generator = MigrationReportGenerator(report_collector)
step_start_perf: dict[str, float] = {}
+ # Initialize shared memory store at workflow level (shared across all 4 steps)
+ memory_store = await self._create_memory_store(input_data.process_id)
+ if memory_store is not None:
+ self.app_context.add_singleton(QdrantMemoryStore, memory_store)
+
try:
telemetry: TelemetryManager = await self.app_context.get_service_async(
TelemetryManager
@@ -296,7 +365,7 @@ async def _generate_report_summary(
async for event in self.workflow.run_stream(input_data):
if isinstance(event, WorkflowStartedEvent):
- print(f"Workflow started ({event.origin.value})")
+ logger.info("Workflow started (%s)", event.origin.value)
report_collector.set_current_step("analysis", step_phase="start")
step_start_perf["analysis"] = time.perf_counter()
@@ -457,7 +526,7 @@ async def _generate_report_summary(
return event.data
# Normal completion
- print(f"Workflow output ({event.origin.value}): {event.data}")
+ logger.info("Workflow output (%s): %s", event.origin.value, event.data)
await telemetry.record_step_result(
process_id=input_data.process_id,
step_name=event.source_executor_id,
@@ -503,10 +572,13 @@ async def _generate_report_summary(
pass
# will handle in WorkflowFailedEvent
elif isinstance(event, WorkflowFailedEvent):
- print(
- f"Executor failed ({event.origin.value}): "
- f"{event.details.executor_id} [{event.details.error_type}]: {event.details.message}"
- f" (traceback: {event.details.traceback})"
+ logger.error(
+ "Executor failed (%s): %s [%s]: %s (traceback: %s)",
+ event.origin.value,
+ event.details.executor_id,
+ event.details.error_type,
+ event.details.message,
+ event.details.traceback,
)
report_collector.set_current_step(event.details.executor_id)
@@ -574,21 +646,21 @@ async def _generate_report_summary(
telemetry: TelemetryManager = (
await self.app_context.get_service_async(TelemetryManager)
)
- # Map executor IDs to human-readable phase names
- phase_names = {
+ # Map executor IDs to human-readable step names
+ step_display_names = {
"design": "Design",
- "yaml_conversion": "YAML",
+ "yaml": "YAML",
"documentation": "Documentation",
}
+ step_display = step_display_names.get(
+ event.executor_id, event.executor_id.capitalize()
+ )
await telemetry.transition_to_phase(
process_id=event.data.process_id,
step=event.executor_id,
- phase=phase_names.get(
- event.executor_id, event.executor_id.capitalize()
- ),
+ phase=f"Initializing {step_display}",
)
- print(f"Executor invoked ({event.executor_id})")
- print(text2art(event.executor_id.capitalize()))
+ logger.info("Executor invoked (%s)", event.executor_id)
report_collector.set_current_step(
event.executor_id, step_phase="start"
@@ -602,6 +674,18 @@ async def _generate_report_summary(
elif isinstance(event, ExecutorCompletedEvent):
# print(f"Executor completed ({event.executor_id}): {event.data}")
+ # Log shared memory stats after each step
+ if memory_store is not None:
+ try:
+ mem_count = await memory_store.get_count()
+ logger.info(
+ "[MEMORY] Step '%s' completed — %d total memories in store",
+ event.executor_id,
+ mem_count,
+ )
+ except Exception:
+ pass
+
# step name -> executor_id
# output result -> event.data => if event.data is not None
if event.data:
@@ -626,10 +710,29 @@ async def _generate_report_summary(
# print(f"{event.__class__.__name__} ({event.origin.value}): {event}")
pass
finally:
+ # Clean up shared memory store
+ if memory_store is not None:
+ try:
+ count = await memory_store.get_count()
+ logger.info(
+ "[MEMORY] Workflow complete — closing memory store (%d memories)",
+ count,
+ )
+ await memory_store.close()
+ except Exception as e:
+ logger.warning("[MEMORY] Error closing memory store: %s", e)
+
+            # Clear the class-level Azure OpenAI client cache (shared across ALL
+            # runs, not just this process) to free connections between workflows.
+ OrchestratorBase._client_cache.clear()
+
elapsed_seconds = time.perf_counter() - start_perf
end_dt = datetime.now()
elapsed_mins, elapsed_secs = divmod(int(elapsed_seconds), 60)
- print(
- f"Workflow elapsed time: {elapsed_mins:d} mins {elapsed_secs:d} sec "
- f"(start={start_dt.isoformat(timespec='seconds')}, end={end_dt.isoformat(timespec='seconds')})"
+ logger.info(
+ "Workflow elapsed time: %d mins %d sec (start=%s, end=%s)",
+ elapsed_mins,
+ elapsed_secs,
+ start_dt.isoformat(timespec="seconds"),
+ end_dt.isoformat(timespec="seconds"),
)
diff --git a/src/processor/src/tests/unit/libs/agent_framework/test_qdrant_memory_store.py b/src/processor/src/tests/unit/libs/agent_framework/test_qdrant_memory_store.py
new file mode 100644
index 0000000..a891cca
--- /dev/null
+++ b/src/processor/src/tests/unit/libs/agent_framework/test_qdrant_memory_store.py
@@ -0,0 +1,385 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+"""Unit tests for QdrantMemoryStore."""
+
+from __future__ import annotations
+
+import asyncio
+from unittest.mock import AsyncMock, MagicMock
+
+from libs.agent_framework.qdrant_memory_store import QdrantMemoryStore
+
+
+def _make_embedding_client():
+    """Mock Azure OpenAI embeddings client; every call returns one fixed vector."""
+    client = AsyncMock()
+    embedding_obj = MagicMock()
+    embedding_obj.embedding = [0.1] * 3072  # text-embedding-3-large dimensionality
+    response = MagicMock()
+    response.data = [embedding_obj]
+    client.embeddings.create = AsyncMock(return_value=response)
+    return client
+
+
+def _make_failing_embedding_client():
+    """Mock embeddings client whose create() always raises."""
+    client = AsyncMock()
+    client.embeddings.create = AsyncMock(side_effect=Exception("API error"))
+    return client
+
+
+# ---------------------------------------------------------------------------
+# Initialization & Lifecycle
+# ---------------------------------------------------------------------------
+
+
+def test_initialize_creates_collection():
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="test-001")
+        assert not store._initialized  # nothing set up until initialize()
+
+        await store.initialize(
+            embedding_client=client, embedding_deployment="text-embedding-3-large"
+        )
+        assert store._initialized
+        assert store._client is not None
+        assert await store.get_count() == 0  # fresh collection starts empty
+
+        await store.close()
+
+    asyncio.run(_run())
+
+
+def test_initialize_idempotent():
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="test-002")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+        qdrant_before = store._client
+
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+        assert store._client is qdrant_before  # second call must not rebuild client
+
+        await store.close()
+
+    asyncio.run(_run())
+
+
+def test_close_releases_resources():
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="test-003")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+        await store.close()
+
+        assert store._client is None
+        assert not store._initialized  # close() fully resets lifecycle state
+
+    asyncio.run(_run())
+
+
+def test_close_idempotent():
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="test-004")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+        await store.close()
+        await store.close()  # Should not raise
+
+    asyncio.run(_run())
+
+
+def test_collection_name_from_process_id():
+    store = QdrantMemoryStore(process_id="abc-def-123")
+    assert store.collection_name == "migration_abc_def_123"  # hyphens → underscores
+
+
+# ---------------------------------------------------------------------------
+# Add
+# ---------------------------------------------------------------------------
+
+
+def test_add_stores_memory():
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="add-001")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+
+        memory_id = await store.add(
+            "AKS supports Karpenter", agent_name="AKS Expert", step="analysis", turn=1
+        )
+        assert memory_id  # non-empty id signals a successful write
+        assert await store.get_count() == 1
+
+        await store.close()
+
+    asyncio.run(_run())
+
+
+def test_add_multiple_memories():
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="add-002")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+
+        await store.add("Mem 1", agent_name="A", step="analysis", turn=1)
+        await store.add("Mem 2", agent_name="B", step="analysis", turn=2)
+        await store.add("Mem 3", agent_name="C", step="design", turn=3)
+        assert await store.get_count() == 3
+
+        await store.close()
+
+    asyncio.run(_run())
+
+
+def test_add_empty_content_skipped():
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="add-003")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+
+        result = await store.add("", agent_name="A", step="analysis")
+        assert result == ""  # empty content: no id returned, nothing stored
+        assert await store.get_count() == 0
+
+        await store.close()
+
+    asyncio.run(_run())
+
+
+def test_add_whitespace_content_skipped():
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="add-004")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+
+        result = await store.add("   ", agent_name="A", step="analysis")
+        assert result == ""  # whitespace-only content treated like empty
+
+        await store.close()
+
+    asyncio.run(_run())
+
+
+def test_add_auto_increments_turn():
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="add-005")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+
+        await store.add("First", agent_name="A", step="analysis")
+        await store.add("Second", agent_name="B", step="analysis")
+        assert store._turn_counter == 2  # turn omitted → store auto-increments
+
+        await store.close()
+
+    asyncio.run(_run())
+
+
+def test_add_without_initialization_raises():
+    async def _run():
+        store = QdrantMemoryStore(process_id="add-006")
+        try:
+            await store.add("test", agent_name="A", step="analysis")
+            assert False, "Should have raised RuntimeError"
+        except RuntimeError as e:
+            assert "not initialized" in str(e)
+
+    asyncio.run(_run())
+
+
+def test_add_with_embedding_failure_returns_empty():
+    async def _run():
+        client = _make_failing_embedding_client()
+        store = QdrantMemoryStore(process_id="add-007")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+
+        result = await store.add("content", agent_name="A", step="analysis")
+        assert result == ""  # embedding failure degrades to a no-op, not an error
+
+        await store.close()
+
+    asyncio.run(_run())
+
+
+# ---------------------------------------------------------------------------
+# Search
+# ---------------------------------------------------------------------------
+
+
+def test_search_returns_results():
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="search-001")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+
+        await store.add("GKE Filestore CSI", agent_name="GKE", step="analysis", turn=1)
+        await store.add("AKS Azure Files", agent_name="AKS", step="analysis", turn=2)
+
+        results = await store.search("storage drivers", top_k=5)
+        assert len(results) == 2  # mock embeds everything identically → both match
+        assert all(r.content for r in results)
+        assert all(r.score > 0 for r in results)
+
+        await store.close()
+
+    asyncio.run(_run())
+
+
+def test_search_empty_store():
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="search-002")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+
+        results = await store.search("anything")
+        assert results == []
+
+        await store.close()
+
+    asyncio.run(_run())
+
+
+def test_search_respects_top_k():
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="search-003")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+
+        for i in range(5):
+            await store.add(f"Entry {i}", agent_name="A", step="analysis", turn=i)
+
+        results = await store.search("entry", top_k=3)
+        assert len(results) <= 3  # top_k caps the result size
+
+        await store.close()
+
+    asyncio.run(_run())
+
+
+def test_search_uninitialized_returns_empty():
+    async def _run():
+        store = QdrantMemoryStore(process_id="search-004")
+        results = await store.search("anything")
+        assert results == []  # search (unlike add) is silent when uninitialized
+
+    asyncio.run(_run())
+
+
+def test_search_with_embedding_failure():
+    async def _run():
+        embedding_obj = MagicMock()
+        embedding_obj.embedding = [0.1] * 3072
+        ok_response = MagicMock()
+        ok_response.data = [embedding_obj]
+
+        client = AsyncMock()
+        client.embeddings.create = AsyncMock(
+            side_effect=[ok_response, Exception("API error")]
+        )
+
+        store = QdrantMemoryStore(process_id="search-005")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+        await store.add("content", agent_name="A", step="analysis", turn=1)
+
+        results = await store.search("query")
+        assert results == []  # query-embedding failure → empty result, no raise
+
+        await store.close()
+
+    asyncio.run(_run())
+
+
+def test_search_result_fields():
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="search-006")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+
+        await store.add(
+            "Karpenter for scaling", agent_name="AKS Expert", step="design", turn=5
+        )
+
+        results = await store.search("scaling")
+        assert len(results) == 1
+        entry = results[0]
+        assert entry.content == "Karpenter for scaling"
+        assert entry.agent_name == "AKS Expert"
+        assert entry.step == "design"
+        assert entry.turn == 5
+        assert entry.memory_id
+        assert isinstance(entry.score, float)
+
+        await store.close()
+
+    asyncio.run(_run())
+
+
+# ---------------------------------------------------------------------------
+# Workflow Lifecycle
+# ---------------------------------------------------------------------------
+
+
+def test_memories_persist_across_steps():
+    """Analysis adds memories, design reads them — simulating workflow scope."""
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="lifecycle-001")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+
+        # Analysis step writes its findings
+        await store.add("GKE 3 node pools", agent_name="GKE", step="analysis", turn=1)
+        await store.add("GPU training nodes", agent_name="AKS", step="analysis", turn=2)
+
+        # Design step reads what analysis stored
+        results = await store.search("node pools", top_k=5)
+        assert len(results) == 2
+
+        # Design adds its own memory on top
+        await store.add("Use NC6s_v3 for GPU", agent_name="Arch", step="design", turn=3)
+        assert await store.get_count() == 3
+
+        # Convert step sees memories from both earlier steps
+        results = await store.search("GPU", top_k=10)
+        assert len(results) == 3
+
+        await store.close()
+
+    asyncio.run(_run())
+
+
+def test_fresh_store_per_process():
+    """Different process IDs get independent stores."""
+    async def _run():
+        client = _make_embedding_client()
+        s1 = QdrantMemoryStore(process_id="proc-1")
+        s2 = QdrantMemoryStore(process_id="proc-2")
+
+        await s1.initialize(embedding_client=client, embedding_deployment="emb")
+        await s2.initialize(embedding_client=client, embedding_deployment="emb")
+
+        await s1.add("Only in proc 1", agent_name="A", step="analysis")
+        assert await s1.get_count() == 1
+        assert await s2.get_count() == 0  # no cross-process leakage
+
+        await s1.close()
+        await s2.close()
+
+    asyncio.run(_run())
+
+
+def test_close_disposes_all_memories():
+    async def _run():
+        client = _make_embedding_client()
+        store = QdrantMemoryStore(process_id="dispose-001")
+        await store.initialize(embedding_client=client, embedding_deployment="emb")
+        await store.add("content", agent_name="A", step="analysis")
+        assert await store.get_count() == 1
+
+        await store.close()
+        assert await store.get_count() == 0  # closed store reports empty
+        assert await store.search("anything") == []
+
+    asyncio.run(_run())
diff --git a/src/processor/src/tests/unit/libs/agent_framework/test_shared_memory_context_provider.py b/src/processor/src/tests/unit/libs/agent_framework/test_shared_memory_context_provider.py
new file mode 100644
index 0000000..1d75ee7
--- /dev/null
+++ b/src/processor/src/tests/unit/libs/agent_framework/test_shared_memory_context_provider.py
@@ -0,0 +1,306 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+"""Unit tests for SharedMemoryContextProvider."""
+
+from __future__ import annotations
+
+import asyncio
+from unittest.mock import AsyncMock, MagicMock
+
+from libs.agent_framework.qdrant_memory_store import MemoryEntry
+from libs.agent_framework.shared_memory_context_provider import (
+ MAX_MEMORY_CONTEXT_CHARS,
+ MIN_CONTENT_LENGTH_TO_STORE,
+ SharedMemoryContextProvider,
+)
+
+
+def _make_chat_message(text: str, role: str = "assistant") -> MagicMock:  # minimal chat-message stand-in
+    msg = MagicMock()
+    msg.text = text
+    msg.content = text  # mirror .text — provider may read either attribute
+    msg.role = MagicMock()
+    msg.role.value = role
+    return msg
+
+
+def _make_memory_entry(
+    content: str,
+    agent_name: str = "Agent",
+    step: str = "analysis",
+    turn: int = 1,
+    score: float = 0.9,
+) -> MemoryEntry:  # convenience factory with sensible defaults
+    return MemoryEntry(
+        content=content,
+        agent_name=agent_name,
+        step=step,
+        turn=turn,
+        score=score,
+        memory_id="test-id",
+    )
+
+
+def _make_mock_store():
+    store = AsyncMock()  # mimics QdrantMemoryStore's async surface
+    store.search = AsyncMock(return_value=[])
+    store.add = AsyncMock(return_value="test-id")
+    return store
+
+
+def _make_provider(store=None):  # returns (provider, backing mock store)
+    if store is None:
+        store = _make_mock_store()
+    return SharedMemoryContextProvider(
+        memory_store=store,
+        agent_name="AKS Expert",
+        step="design",
+        top_k=5,
+        score_threshold=0.3,
+    ), store
+
+
+# ---------------------------------------------------------------------------
+# invoking() — Pre-LLM memory injection
+# ---------------------------------------------------------------------------
+
+
+def test_invoking_injects_memories():
+    async def _run():
+        provider, store = _make_provider()
+        store.search.return_value = [
+            _make_memory_entry("GKE Filestore CSI", agent_name="GKE Expert"),
+            _make_memory_entry("Azure Files for AKS", agent_name="AKS Expert"),
+        ]
+        messages = [_make_chat_message("How should we handle storage configuration?")]
+
+        context = await provider.invoking(messages)
+
+        assert context.instructions is not None
+        assert "GKE Filestore CSI" in context.instructions
+        assert "Azure Files for AKS" in context.instructions
+        store.search.assert_called_once()
+
+    asyncio.run(_run())
+
+
+def test_invoking_empty_messages_returns_empty():
+    async def _run():
+        provider, _ = _make_provider()
+        context = await provider.invoking([])
+        assert context.instructions is None
+        assert context.messages == []
+
+    asyncio.run(_run())
+
+
+def test_invoking_no_memories_returns_empty():
+    async def _run():
+        provider, store = _make_provider()
+        store.search.return_value = []
+        messages = [_make_chat_message("What is the overall migration plan for AKS?")]
+
+        context = await provider.invoking(messages)
+        assert context.instructions is None  # nothing relevant → nothing injected
+
+    asyncio.run(_run())
+
+
+def test_invoking_search_failure_graceful():
+    async def _run():
+        provider, store = _make_provider()
+        store.search.side_effect = Exception("search failed")
+        messages = [_make_chat_message("What is the networking plan for AKS?")]
+
+        context = await provider.invoking(messages)
+        assert context.instructions is None  # search failure degrades to "no memories"
+
+    asyncio.run(_run())
+
+
+def test_invoking_truncates_long_query():
+    async def _run():
+        provider, store = _make_provider()
+        long_text = "x" * 5000
+        messages = [_make_chat_message(long_text)]
+
+        await provider.invoking(messages)
+
+        query = store.search.call_args.kwargs["query"]
+        assert len(query) <= 2000  # query text is capped before embedding
+
+    asyncio.run(_run())
+
+
+def test_invoking_uses_last_message_as_query():
+    async def _run():
+        provider, store = _make_provider()
+        messages = [
+            _make_chat_message("First"),
+            _make_chat_message("Second"),
+            _make_chat_message("Latest question about storage"),
+        ]
+
+        await provider.invoking(messages)
+
+        query = store.search.call_args.kwargs["query"]
+        assert "Latest question about storage" in query
+
+    asyncio.run(_run())
+
+
+def test_invoking_respects_max_context_chars():
+    async def _run():
+        provider, store = _make_provider()
+        large_memories = [
+            _make_memory_entry("x" * 4000, agent_name=f"Agent{i}") for i in range(10)
+        ]
+        store.search.return_value = large_memories
+        messages = [_make_chat_message("What storage configuration should we use for persistent volumes?")]
+
+        context = await provider.invoking(messages)
+
+        assert context.instructions is not None
+        assert len(context.instructions) <= MAX_MEMORY_CONTEXT_CHARS + 200  # slack for header text
+
+    asyncio.run(_run())
+
+
+def test_invoking_formats_with_agent_and_step():
+    async def _run():
+        provider, store = _make_provider()
+        store.search.return_value = [
+            _make_memory_entry("Use Premium SSD", agent_name="Chief Architect", step="design"),
+        ]
+        messages = [_make_chat_message("What storage class should we choose for the cluster?")]
+
+        context = await provider.invoking(messages)
+
+        assert "Chief Architect" in context.instructions
+        assert "design" in context.instructions
+
+    asyncio.run(_run())
+
+
+def test_invoking_with_single_message():
+    async def _run():
+        provider, store = _make_provider()
+        store.search.return_value = [_make_memory_entry("some memory")]
+        single = _make_chat_message("What about networking configuration for AKS?")
+
+        context = await provider.invoking(single)  # bare message, not a list
+
+        assert context.instructions is not None
+        store.search.assert_called_once()
+
+    asyncio.run(_run())
+
+
+# ---------------------------------------------------------------------------
+# invoked() — Post-LLM memory storage
+# ---------------------------------------------------------------------------
+
+
+def test_invoked_stores_response():
+    async def _run():
+        provider, store = _make_provider()
+        request = [_make_chat_message("What is the networking plan for AKS?")]
+        response = [_make_chat_message("We should use Azure CNI for networking configuration in the AKS cluster")]
+
+        await provider.invoked(request, response)
+        await provider.flush()  # writes are deferred until flush
+
+        store.add.assert_called_once()
+        kwargs = store.add.call_args
+        assert kwargs.kwargs["agent_name"] == "AKS Expert"
+        assert kwargs.kwargs["step"] == "design"
+
+    asyncio.run(_run())
+
+
+def test_invoked_skips_on_exception():
+    async def _run():
+        provider, store = _make_provider()
+        request = [_make_chat_message("Q")]
+        response = [_make_chat_message("A" * 100)]
+
+        await provider.invoked(request, response, invoke_exception=Exception("fail"))
+        store.add.assert_not_called()  # failed turns are never memorized
+
+    asyncio.run(_run())
+
+
+def test_invoked_skips_none_response():
+    async def _run():
+        provider, store = _make_provider()
+        request = [_make_chat_message("Q")]
+
+        await provider.invoked(request, None)
+        store.add.assert_not_called()
+
+    asyncio.run(_run())
+
+
+def test_invoked_skips_short_response():
+    async def _run():
+        provider, store = _make_provider()
+        request = [_make_chat_message("Q")]
+        short = [_make_chat_message("x" * (MIN_CONTENT_LENGTH_TO_STORE - 1))]
+
+        await provider.invoked(request, short)
+        store.add.assert_not_called()  # one char below threshold → skipped
+
+    asyncio.run(_run())
+
+
+def test_invoked_stores_long_response():
+    async def _run():
+        provider, store = _make_provider()
+        request = [_make_chat_message("Q")]
+        long_resp = [_make_chat_message("x" * (MIN_CONTENT_LENGTH_TO_STORE + 1))]
+
+        await provider.invoked(request, long_resp)
+        await provider.flush()
+        store.add.assert_called_once()  # one char above threshold → stored
+
+    asyncio.run(_run())
+
+
+def test_invoked_increments_turn_counter():
+    async def _run():
+        provider, store = _make_provider()
+        request = [_make_chat_message("Q")]
+        response = [_make_chat_message("A" * 100)]
+
+        await provider.invoked(request, response)
+        await provider.invoked(request, response)
+        assert provider._turn_counter == 2
+
+    asyncio.run(_run())
+
+
+def test_invoked_store_failure_does_not_raise():
+    async def _run():
+        provider, store = _make_provider()
+        store.add.side_effect = Exception("store failed")
+        request = [_make_chat_message("Q")]
+        response = [_make_chat_message("A" * 100)]
+
+        await provider.invoked(request, response)
+        await provider.flush()  # Should not raise
+
+    asyncio.run(_run())
+
+
+def test_invoked_with_single_message():
+    async def _run():
+        provider, store = _make_provider()
+        request = _make_chat_message("What is the question about networking?")
+        response = _make_chat_message("We should use Azure CNI Overlay for the networking configuration in AKS")
+
+        await provider.invoked(request, response)  # bare messages, not lists
+        await provider.flush()
+        store.add.assert_called_once()
+
+    asyncio.run(_run())
diff --git a/src/processor/src/tests/unit/steps/analysis/test_analysis_executor.py b/src/processor/src/tests/unit/steps/analysis/test_analysis_executor.py
index 0cf18a1..c0f2691 100644
--- a/src/processor/src/tests/unit/steps/analysis/test_analysis_executor.py
+++ b/src/processor/src/tests/unit/steps/analysis/test_analysis_executor.py
@@ -82,7 +82,7 @@ async def execute(self, task_param=None):
await executor.handle_execute(message, ctx) # type: ignore[arg-type]
- assert telemetry.transitions == [("p1", "analysis", "Analysis")]
+ assert telemetry.transitions == [("p1", "analysis", "Initializing Analysis")]
assert len(ctx.sent) == 1
assert len(ctx.yielded) == 0
assert isinstance(ctx.sent[0], Analysis_BooleanExtendedResult)
@@ -133,7 +133,7 @@ async def execute(self, task_param=None):
await executor.handle_execute(message, ctx) # type: ignore[arg-type]
- assert telemetry.transitions == [("p1", "analysis", "Analysis")]
+ assert telemetry.transitions == [("p1", "analysis", "Initializing Analysis")]
assert len(ctx.sent) == 0
assert len(ctx.yielded) == 1
assert isinstance(ctx.yielded[0], Analysis_BooleanExtendedResult)
diff --git a/src/processor/src/tests/unit/steps/convert/test_yaml_convert_executor.py b/src/processor/src/tests/unit/steps/convert/test_yaml_convert_executor.py
index 0684d96..d195728 100644
--- a/src/processor/src/tests/unit/steps/convert/test_yaml_convert_executor.py
+++ b/src/processor/src/tests/unit/steps/convert/test_yaml_convert_executor.py
@@ -71,7 +71,7 @@ async def execute(self, task_param=None):
message = Design_ExtendedBooleanResult(process_id="p1")
await executor.handle_execute(message, ctx) # type: ignore[arg-type]
- assert telemetry.transitions == [("p1", "yaml_conversion", "YAML")]
+ assert telemetry.transitions == [("p1", "yaml", "YAML")]
assert len(ctx.sent) == 1
assert len(ctx.yielded) == 0
assert isinstance(ctx.sent[0], Yaml_ExtendedBooleanResult)
@@ -112,7 +112,7 @@ async def execute(self, task_param=None):
message = Design_ExtendedBooleanResult(process_id="p1")
await executor.handle_execute(message, ctx) # type: ignore[arg-type]
- assert telemetry.transitions == [("p1", "yaml_conversion", "YAML")]
+ assert telemetry.transitions == [("p1", "yaml", "YAML")]
assert len(ctx.sent) == 0
assert len(ctx.yielded) == 1
assert isinstance(ctx.yielded[0], Yaml_ExtendedBooleanResult)
diff --git a/src/processor/src/utils/logging_utils.py b/src/processor/src/utils/logging_utils.py
index 11b87e2..29da222 100644
--- a/src/processor/src/utils/logging_utils.py
+++ b/src/processor/src/utils/logging_utils.py
@@ -36,7 +36,7 @@ def configure_application_logging(debug_mode: bool = False):
# Use force=True to ensure our settings actually apply.
if debug_mode:
logging.basicConfig(level=logging.DEBUG, force=True)
- print("🐛 Debug logging enabled")
+ logging.getLogger(__name__).debug("Debug logging enabled")
else:
logging.basicConfig(level=logging.INFO, force=True)
@@ -125,9 +125,9 @@ def configure_application_logging(debug_mode: bool = False):
os.environ.setdefault("AZURE_CORE_ENABLE_HTTP_LOGGER", "false")
if debug_mode:
- print("🔇 Verbose logging suppressed (debug mode: some INFO logging allowed)")
+ logging.getLogger(__name__).info("Verbose logging suppressed (debug mode: some INFO logging allowed)")
else:
- print("🔇 All verbose logging suppressed (production mode)")
+ logging.getLogger(__name__).info("All verbose logging suppressed (production mode)")
def create_migration_logger(name: str, level: int = logging.INFO) -> logging.Logger: