From c8a4c65b18aae2ce004563e3daf7ce099ec3f0d9 Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Thu, 4 Sep 2025 16:48:48 +0300 Subject: [PATCH 01/17] improvments --- api/agents/analysis_agent.py | 1 + api/memory/graphiti_tool.py | 322 ++++++++++++++++++----------------- api/routes/graphs.py | 2 +- 3 files changed, 164 insertions(+), 161 deletions(-) diff --git a/api/agents/analysis_agent.py b/api/agents/analysis_agent.py index 661c3025..1642b41e 100644 --- a/api/agents/analysis_agent.py +++ b/api/agents/analysis_agent.py @@ -292,5 +292,6 @@ def _build_prompt( # pylint: disable=too-many-arguments, too-many-positional-a 12. For personal queries, FIRST check memory context for user identification. If user identity is found in memory context (user name, previous personal queries, etc.), the query IS translatable. 13. CRITICAL PERSONALIZATION CHECK: If missing user identification/personalization is a significant or primary component of the query (e.g., "show my orders", "my account balance", "my recent purchases", "how many employees I have", "products I own") AND no user identification is available in memory context or schema, set "is_sql_translatable" to false. However, if memory context contains user identification (like user name or previous successful personal queries), then personal queries ARE translatable even if they are the primary component of the query. + **Do not use the User ID in the generated SQL queries (especially not as an email).** Again: OUTPUT ONLY VALID JSON. No explanations outside the JSON block. """ # pylint: disable=line-too-long return prompt diff --git a/api/memory/graphiti_tool.py b/api/memory/graphiti_tool.py index 3fe2a5ef..61b046d9 100644 --- a/api/memory/graphiti_tool.py +++ b/api/memory/graphiti_tool.py @@ -6,7 +6,7 @@ import asyncio import os import uuid -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any, Optional, Tuple from datetime import datetime # Import Azure OpenAI components @@ -73,80 +73,6 @@ async def create(cls, user_id: str, graph_id: str, use_direct_entities: bool = T return self - async def _ensure_user_node(self, user_id: str) -> Optional[str]: - """Ensure user node exists in the memory graph.""" - user_node_name = f"User {user_id}" - try: - # First check if user node already exists - node_search_config = NODE_HYBRID_SEARCH_RRF.model_copy(deep=True) - node_search_config.limit = 1 # Limit to 1 results - - # Execute the node search - node_search_results = await self.graphiti_client.search_( - query=user_node_name, - config=node_search_config, - ) - - # If user node already exists, return the user_id - if node_search_results and len(node_search_results.nodes) > 0: - # Check if any result exactly matches the expected node name - for node in node_search_results.nodes: - if node.name == user_node_name: - print(f"User node for {user_id} already exists") - return user_id - - # Create new user node if it doesn't exist - await self.graphiti_client.add_episode( - name=user_node_name, - episode_body=f'User {user_id} is using QueryWeaver', - source=EpisodeType.text, - reference_time=datetime.now(), - source_description='User node creation' - ) - print(f"Created new user node for {user_id}") - return user_id - - except Exception as e: - print(f"Error creating user node for {user_id}: {e}") - return None - - async def _ensure_database_node(self, database_name: str, user_id: str) -> Optional[str]: - """Ensure database node exists in the memory graph.""" - database_node_name = f"Database {database_name}" - try: - # First check if database node already exists - node_search_config = NODE_HYBRID_SEARCH_RRF.model_copy(deep=True) - node_search_config.limit = 1 # Limit to 1 results - - # Execute the node search - node_search_results = await self.graphiti_client.search_( - query=database_node_name, - config=node_search_config, - ) - - # If database node already exists, return the database_name - if node_search_results and len(node_search_results.nodes) > 0: - # Check if any result exactly matches the expected node name - for node in node_search_results.nodes: - if node.name == database_node_name: - print(f"Database node for {database_name} already exists") - return database_name - - # Create new database node if it doesn't exist - await self.graphiti_client.add_episode( - name=database_node_name, - episode_body=f'User {user_id} has database {database_name} available for querying', - source=EpisodeType.text, - reference_time=datetime.now(), - source_description='Database node creation' - ) - print(f"Created new database node for {database_name}") - return database_name - - except Exception as e: - print(f"Error creating database node for {database_name}: {e}") - return None - async def _ensure_entity_nodes_direct(self, user_id: str, database_name: str) -> bool: """ Ensure user and database entity nodes exist using direct Cypher queries. @@ -156,7 +82,7 @@ async def _ensure_entity_nodes_direct(self, user_id: str, database_name: str) -> graph_driver = self.graphiti_client.driver # Check if user entity node already exists - user_node_name = f"User {user_id}" + user_node_name = f"{user_id}" check_user_query = """ MATCH (n:Entity {name: $name}) RETURN n.uuid AS uuid @@ -188,7 +114,7 @@ async def _ensure_entity_nodes_direct(self, user_id: str, database_name: str) -> """ await graph_driver.execute_query(user_cypher, node=user_node_data) - print(f"Created user entity node: {user_node_name} with UUID: {user_uuid}") + print(f"Created user entity node: {user_node_name} with UUID: {self.user_uuid}") else: print(f"User entity node already exists: {user_node_name}") @@ -225,7 +151,7 @@ async def _ensure_entity_nodes_direct(self, user_id: str, database_name: str) -> """ await graph_driver.execute_query(database_cypher, node=database_node_data) - print(f"Created database entity node: {database_node_name} with UUID: {database_uuid}") + print(f"Created database entity node: {database_node_name} with UUID: {self.database_uuid}") else: print(f"Database entity node already exists: {database_node_name}") @@ -255,9 +181,79 @@ async def _ensure_entity_nodes_direct(self, user_id: str, database_name: str) -> print(f"Error creating entity nodes directly for user {user_id} and database {database_name}: {e}") return False - async def add_new_memory(self, conversation: Dict[str, Any]) -> bool: + async def update_user_information(self, conversation: Dict[str, Any], history: Tuple[List[str], List[str]]) -> bool: + driver = self.graphiti_client.driver + query = """ + MATCH (u:Entity {name: $user_id}) + RETURN u.summary AS summary + """ + summary_result, __, _ = await driver.execute_query(query, user_id=self.user_id) + summary = summary_result[0].get("summary", "") if summary_result else "" + # Format conversation for summarization + conv_text = "" + conv_text += f"User: {conversation.get('question', '')}\n" + if conversation.get('generated_sql'): + conv_text += f"SQL: {conversation['generated_sql']}\n" + if conversation.get('error'): + conv_text += f"Error: {conversation['error']}\n" + if conversation.get('answer'): + conv_text += f"Assistant: {conversation['answer']}\n" + prompt = f""" + You are updating the personal memory of user "{self.user_id}". + + ### Inputs + 1. Existing user summary (overall + personal info): + {summary} + + 2. Latest Q&A conversational memory: + {conv_text} + + ### Task + - Produce a new user summary of his overall preferences and his personal information. + - *Important*: Ensure that the summary is contain any personal statements or preferences expressed by the user. + - Preserve existing personal information, preferences, and tendencies from the old summary. + - Integrate any **new insights** about the user’s interests, behaviors, or database usage patterns from the latest memory. + - If new info refines or corrects older info, update accordingly. + - Focus only on **overall and personal information** — do not include temporary query details. + - Write in **factual third-person style**, suitable for storage as a user node in a graph. + - Try to explicitly divide overall summary, usage preferences and personal information. + + ** Do not included the user-id in the content!** + + ### Output + An updated user summary for "{self.user_id}". + """ + try: + + if len(history[1]) == 0: + messages = [{"role": "user", "content": prompt}] + else: + messages = [] + for query, result in zip(history[0], history[1]): + messages.append({"role": "user", "content": query}) + messages.append({"role": "assistant", "content": result}) + messages.append({"role": "user", "content": prompt}) + response = completion( + model=Config.COMPLETION_MODEL, + messages=messages, + temperature=0.1 + ) + + # Parse the direct text response (no JSON parsing needed) + content = response.choices[0].message.content.strip() + query = """ + MATCH (u:Entity {name: $user_id}) + SET u.summary = $summary + RETURN u.summary AS summary + """ + await driver.execute_query(query, user_id=self.user_id, summary=content) + return True + except Exception as e: + return False + + async def add_new_memory(self, conversation: Dict[str, Any], history: List[Dict[str, Any]]) -> bool: # Use LLM to analyze and summarize the conversation with focus on graph-oriented database facts - analysis = await self.summarize_conversation(conversation) + analysis = await self.summarize_conversation(conversation, history) user_id = self.user_id database_name = self.graph_id @@ -265,24 +261,20 @@ async def add_new_memory(self, conversation: Dict[str, Any]) -> bool: database_summary = analysis.get("database_summary", "") try: - if database_summary: - await self.graphiti_client.add_episode( - name=f"Database_Facts_{user_id}_{database_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}", - episode_body=f"Database: {database_name}\n{database_summary}", - source=EpisodeType.message, - reference_time=datetime.now(), - source_description=f"Graph-oriented facts about Database: {database_name} from User: {user_id} interaction" - ) - - # Keep personal memory as it was originally (only question) - await self.graphiti_client.add_episode( - name=f"Personal_Memory_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}", - episode_body=f"User: {user_id}\n{conversation['question']}", + # Run episode addition and user information update concurrently + add_episode_task = self.graphiti_client.add_episode( + name=f"Database_Facts_{user_id}_{database_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}", + episode_body=f"Database {database_name}:\n{database_summary}", source=EpisodeType.message, reference_time=datetime.now(), - source_description=f"Personal memory for user {user_id}" + source_description=f"Graph-oriented facts about Database: {database_name} from User: {user_id} interaction" ) - + + update_user_task = self.update_user_information(conversation, history=history) + + # Wait for both operations to complete + await asyncio.gather(add_episode_task, update_user_task) + except Exception as e: print(f"Error adding new memory episodes: {e}") return False @@ -453,27 +445,43 @@ async def search_user_summary(self, limit: int = 5) -> str: List of user node summaries with metadata """ try: - # First, find the user node specifically - user_node_search_config = NODE_HYBRID_SEARCH_RRF.model_copy(deep=True) - user_node_search_config.limit = limit - - user_node_results = await self.graphiti_client.search_( - query=f'User {self.user_id}', - config=user_node_search_config, - ) - - for node in user_node_results.nodes: - if node.name == f"User {self.user_id}": - user_summary = node.summary - return user_summary + driver = self.graphiti_client.driver + query = """ + MATCH (e:Entity {name: $name}) + RETURN e.summary AS summary + """ + result, __, _ = await driver.execute_query(query, name=self.user_id) + + if result: + user_summary = result[0].get("summary", "") + return user_summary return "" except Exception as e: print(f"Error searching user node for {self.user_id}: {e}") return "" + + async def extract_episode_from_rel(self, rel_result): + """ + """ + driver = self.graphiti_client.driver + query = """ + MATCH (e:Episodic {uuid: $uuid}) + RETURN e.content AS content + """ + episodes_uuid = rel_result.episodes + + episode_contents = [] + for episode_uuid in episodes_uuid: + episode_content, _, _ = await driver.execute_query(query, uuid=episode_uuid) + if episode_content: + content = episode_content[0].get("content") + episode_contents.append(content) - async def search_database_facts(self, query: str, limit: int = 10) -> str: + return episode_contents + + async def search_database_facts(self, query: str, limit: int = 5, episode_limit: int = 3) -> str: """ Search for database-specific facts and interaction history using database node as center. @@ -485,26 +493,13 @@ async def search_database_facts(self, query: str, limit: int = 10) -> str: String containing all relevant database facts with time relevancy information """ try: - # First, find the database node to use as center for reranking - database_node_search_config = NODE_HYBRID_SEARCH_RRF.model_copy(deep=True) - database_node_search_config.limit = limit - - database_node_results = await self.graphiti_client.search_( - query=f'Database {self.graph_id}', - config=database_node_search_config, - ) - - center_node_uuid = None - for node in database_node_results.nodes: - if node.name == f"Database {self.graph_id}": - print(f'Found database node: {node.name} with UUID: {node.uuid}') - center_node_uuid = node.uuid - break - - if center_node_uuid is None: - return "" - - + driver = self.graphiti_client.driver + query = """ + MATCH (e:Entity {name: $name}) + RETURN e.uuid AS uuid + """ + result, __, _ = await driver.execute_query(query, name=f"Database {self.graph_id}") + center_node_uuid = result[0].get("uuid", "") reranked_results = await self.graphiti_client.search( query=query, center_node_uuid=center_node_uuid, @@ -513,13 +508,16 @@ async def search_database_facts(self, query: str, limit: int = 10) -> str: # Filter and format results for database-specific content into a single string database_facts_text = [] + episodes_contents = [] if reranked_results and len(reranked_results) > 0: - print(f'\nDatabase Facts Search Results for {self.graph_id}:') + print(f'\nPrevious session and facts for {self.graph_id}:') for i, result in enumerate(reranked_results, 1): if result.source_node_uuid != center_node_uuid and result.target_node_uuid != center_node_uuid: continue - - fact_entry = f"Fact {i}: {result.fact}" + if len(episodes_contents) < episode_limit: + episodes_content = await self.extract_episode_from_rel(result) + episodes_contents.extend(episodes_content) + fact_entry = f"{result.fact}" # Add time information if available time_info = [] @@ -532,10 +530,12 @@ async def search_database_facts(self, query: str, limit: int = 10) -> str: fact_entry += f" ({', '.join(time_info)})" database_facts_text.append(fact_entry) - + facts = "Session:\n".join(database_facts_text) if database_facts_text else "" + episodes = "\n".join(episodes_contents) if episodes_contents else "" + database_context = "Previous sessions:\n" + episodes + "\n\nFacts:\n" + facts # Join all facts into a single string - return "\n".join(database_facts_text) if database_facts_text else "" - + return database_context + except Exception as e: print(f"Error searching database facts for {self.graph_id}: {e}") return "" @@ -639,7 +639,8 @@ async def clean_memory(self, size: int = 10000) -> int: except Exception as e: print(f"Error cleaning memory: {e}") return 0 - async def summarize_conversation(self, conversation: Dict[str, Any]) -> Dict[str, Any]: + + async def summarize_conversation(self, conversation: Dict[str, Any], history: List[Dict[str, Any]]) -> Dict[str, Any]: """ Use LLM to summarize the conversation and extract database-oriented insights. @@ -665,37 +666,38 @@ async def summarize_conversation(self, conversation: Dict[str, Any]) -> Dict[str conv_text += "\n" prompt = f""" - Analyze this QueryWeaver question-answer interaction with database "{self.graph_id}". - Focus exclusively on extracting graph-oriented facts about the database and its entities, relationships, and structure. - - Your task is to extract database-specific facts that imply connections between database "{self.graph_id}" and entities within the conversation: - - Specific entities (tables, columns, data types) mentioned or discovered - - Relationships between entities in database "{self.graph_id}" - - Data patterns, constraints, or business rules learned about "{self.graph_id}" - - Query patterns that work well with "{self.graph_id}" structure - - Errors specific to "{self.graph_id}" schema or data - - ALWAYS be explicit about database name "{self.graph_id}" in all facts - - **Critical: Be very explicit about the database name in all facts. For example: "Database {self.graph_id} contains a customers table with columns id, name, revenue" instead of "The database contains a customers table"** - - Question-Answer Interaction: + Rewrite the following QueryWeaver question-answer interaction into a + database-oriented conversational summary for database "{self.graph_id}". + + ### Requirements + - Always explicitly say: "{self.graph_id} database" (not just "{self.graph_id}"). + - Always include the user id "{self.user_id}" in the summary. + - Capture the Q&A flow in natural, intuitive language (not just facts). + - Include the full **relevant query results** in the output, summarizing key fields if necessary. + - Keep it concise (2–6 sentences). + - Emphasize schema, entities, and queries relevant to "{self.graph_id} database". + - If no relevant database context exists, return an empty string. + + ### Input {conv_text} - Instructions: - - ALWAYS be explicit about database name "{self.graph_id}" in all facts - - Focus on graph relationships, entities, and structural facts about "{self.graph_id}" - - Include specific table names, column names, and data relationships discovered - - Document successful SQL patterns that work with "{self.graph_id}" structure - - Note any schema constraints or business rules specific to "{self.graph_id}" - - Emphasize connections between database "{self.graph_id}" and entities in the conversation - - Use empty string if no relevant database facts are discovered + ### Output + A conversational database-oriented summary mentioning both "{self.user_id}" and "{self.graph_id} database". """ - try: + + if len(history[1]) == 0: + messages = [{"role": "user", "content": prompt}] + else: + messages = [] + for query, result in zip(history[0], history[1]): + messages.append({"role": "user", "content": query}) + messages.append({"role": "assistant", "content": result}) + messages.append({"role": "user", "content": prompt}) response = completion( model=Config.COMPLETION_MODEL, - messages=[{"role": "user", "content": prompt}], + messages=messages, temperature=0.1 ) diff --git a/api/routes/graphs.py b/api/routes/graphs.py index 68d824d4..d10ae977 100644 --- a/api/routes/graphs.py +++ b/api/routes/graphs.py @@ -640,7 +640,7 @@ async def generate(): # pylint: disable=too-many-locals,too-many-branches,too-m ) # Save conversation with memory tool (run in background) - save_task = asyncio.create_task(memory_tool.add_new_memory(full_response)) + save_task = asyncio.create_task(memory_tool.add_new_memory(full_response, [queries_history, result_history])) # Add error handling callback to prevent silent failures save_task.add_done_callback( lambda t: logging.error("Memory save failed: %s", t.exception()) # nosemgrep From 52eebb9add4bef51be805f006f9e6eeb23fd6941 Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Sun, 7 Sep 2025 18:14:26 +0300 Subject: [PATCH 02/17] improvments --- api/agents/analysis_agent.py | 27 +++++++++++++++++---------- api/memory/graphiti_tool.py | 13 +++++-------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/api/agents/analysis_agent.py b/api/agents/analysis_agent.py index 1642b41e..e3225057 100644 --- a/api/agents/analysis_agent.py +++ b/api/agents/analysis_agent.py @@ -254,6 +254,12 @@ def _build_prompt( # pylint: disable=too-many-arguments, too-many-positional-a - CRITICAL: If missing personalization information is a significant part of the user query (e.g., the query is primarily about "my orders", "my account", "my data", "employees I have", "how many X do I have") AND no user identification exists in memory context or schema, set "is_sql_translatable" to false. - DO NOT assume general/company-wide interpretations for personal pronouns when NO user context is available. - Mark as translatable if sufficient user context exists in memory context to identify the specific user, even for primarily personal queries. + - If a query depends on personal context (e.g., "my", "me", "birthday", "account", "orders") + and the required information (user_id, birthday, etc.) is missing in memory context or schema: + - Set "is_sql_translatable" to false + - Add the required information to "missing_information" + - Leave "sql_query" as an empty string ("") + - Do NOT fabricate placeholders (e.g., , , ) Provide your output ONLY in the following JSON structure: @@ -281,17 +287,18 @@ def _build_prompt( # pylint: disable=too-many-arguments, too-many-positional-a 2. Check if the query's intent is clear enough for SQL translation. 3. Identify any ambiguities in the query or instructions. 4. List missing information explicitly if applicable. - 5. Confirm if necessary joins are possible. - 6. Consider if complex calculations are feasible in SQL. - 7. Identify multiple interpretations if they exist. - 8. Strictly apply instructions; explain and penalize if not possible. - 9. If the question is a follow-up, resolve references using the + 5. When critical information is missing make the is_sql_translatable false and add it to missing_information. + 6. Confirm if necessary joins are possible. + 7. If similar query have been failed before, consider using a different approach or modifying the query. + 8. Consider if complex calculations are feasible in SQL. + 9. Identify multiple interpretations if they exist. + 10. If the question is a follow-up, resolve references using the conversation history and previous answers. - 10. Use memory context to provide more personalized and informed SQL generation. - 11. Learn from successful query patterns in memory context and avoid failed approaches. - 12. For personal queries, FIRST check memory context for user identification. If user identity is found in memory context (user name, previous personal queries, etc.), the query IS translatable. - 13. CRITICAL PERSONALIZATION CHECK: If missing user identification/personalization is a significant or primary component of the query (e.g., "show my orders", "my account balance", "my recent purchases", "how many employees I have", "products I own") AND no user identification is available in memory context or schema, set "is_sql_translatable" to false. However, if memory context contains user identification (like user name or previous successful personal queries), then personal queries ARE translatable even if they are the primary component of the query. + 11. Use memory context to provide more personalized and informed SQL generation. + 12. Learn from successful query patterns in memory context and avoid failed approaches. + 13. For personal queries, FIRST check memory context for user identification. If user identity is found in memory context (user name, previous personal queries, etc.), the query IS translatable. + 14. CRITICAL PERSONALIZATION CHECK: If missing user identification/personalization is a significant or primary component of the query (e.g., "show my orders", "my account balance", "my recent purchases", "how many employees I have", "products I own") AND no user identification is available in memory context or schema, set "is_sql_translatable" to false. However, if memory context contains user identification (like user name or previous successful personal queries), then personal queries ARE translatable even if they are the primary component of the query. - **Do not use the User ID in the generated SQL queries (especially not as an email).** + **Do not use the User ID (email based) in the sql_query** Again: OUTPUT ONLY VALID JSON. No explanations outside the JSON block. """ # pylint: disable=line-too-long return prompt diff --git a/api/memory/graphiti_tool.py b/api/memory/graphiti_tool.py index 61b046d9..462c3ffa 100644 --- a/api/memory/graphiti_tool.py +++ b/api/memory/graphiti_tool.py @@ -99,7 +99,7 @@ async def _ensure_entity_nodes_direct(self, user_id: str, database_name: str) -> 'name': user_node_name, 'group_id': '\\_', 'created_at': datetime.now().isoformat(), - 'summary': f'User {user_id} is using QueryWeaver', + 'summary': f'The User is using QueryWeaver', 'name_embedding': user_name_embedding } @@ -199,8 +199,7 @@ async def update_user_information(self, conversation: Dict[str, Any], history: T if conversation.get('answer'): conv_text += f"Assistant: {conversation['answer']}\n" prompt = f""" - You are updating the personal memory of user "{self.user_id}". - + You are updating the personal memory of user. ### Inputs 1. Existing user summary (overall + personal info): {summary} @@ -218,10 +217,8 @@ async def update_user_information(self, conversation: Dict[str, Any], history: T - Write in **factual third-person style**, suitable for storage as a user node in a graph. - Try to explicitly divide overall summary, usage preferences and personal information. - ** Do not included the user-id in the content!** - ### Output - An updated user summary for "{self.user_id}". + An updated user summary. """ try: @@ -579,11 +576,11 @@ async def search_memories(self, query: str, user_limit: int = 5, database_limit: memory_context = "" if user_summary: - memory_context += f"{self.user_id} CONTEXT (Personal preferences and information):\n{user_summary}\n\n" + memory_context += f"(Personal preferences and information):\n{user_summary}\n\n" if database_facts: memory_context += f"{self.graph_id} INTERACTION HISTORY (Previous queries and learnings about this database):\n{database_facts}\n\n" - + # Add similar queries context if similar_queries: memory_context += "SIMILAR QUERIES HISTORY:\n" From 82f00f3291d77105fffe524dcedb86bf6ab0c4a7 Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Sun, 7 Sep 2025 18:18:11 +0300 Subject: [PATCH 03/17] lint --- api/routes/graphs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api/routes/graphs.py b/api/routes/graphs.py index d10ae977..e0a7fd06 100644 --- a/api/routes/graphs.py +++ b/api/routes/graphs.py @@ -640,7 +640,8 @@ async def generate(): # pylint: disable=too-many-locals,too-many-branches,too-m ) # Save conversation with memory tool (run in background) - save_task = asyncio.create_task(memory_tool.add_new_memory(full_response, [queries_history, result_history])) + save_task = asyncio.create_task(memory_tool.add_new_memory(full_response, [queries_history, + result_history])) # Add error handling callback to prevent silent failures save_task.add_done_callback( lambda t: logging.error("Memory save failed: %s", t.exception()) # nosemgrep From 40ef794cab8e81b923d8c6c0ff9c17884a8d3e39 Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Mon, 8 Sep 2025 13:44:59 +0300 Subject: [PATCH 04/17] updates --- api/agents/analysis_agent.py | 5 ++--- api/memory/graphiti_tool.py | 8 ++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/api/agents/analysis_agent.py b/api/agents/analysis_agent.py index e3225057..4fc0c16a 100644 --- a/api/agents/analysis_agent.py +++ b/api/agents/analysis_agent.py @@ -240,8 +240,7 @@ def _build_prompt( # pylint: disable=too-many-arguments, too-many-positional-a - If you CANNOT apply instructions in the SQL, explain why under "instructions_comments", "explanation" and reduce your confidence. - Penalize confidence appropriately if any part of the instructions is unmet. - - When there several tables that can be used to answer the question, - you can combine them in a single SQL query. + - When there several tables that can be used to answer the question, you can combine them in a single SQL query. - Use the memory context to inform your SQL generation, considering user preferences and previous database interactions. - For personal queries ("I", "my", "me", "I have"), FIRST check if user identification exists in memory context (user name, previous personal queries, etc.) before determining translatability. - NEVER assume general/company-wide interpretations for personal pronouns when NO user context is available. @@ -289,7 +288,7 @@ def _build_prompt( # pylint: disable=too-many-arguments, too-many-positional-a 4. List missing information explicitly if applicable. 5. When critical information is missing make the is_sql_translatable false and add it to missing_information. 6. Confirm if necessary joins are possible. - 7. If similar query have been failed before, consider using a different approach or modifying the query. + 7. If similar query have been failed before, learn the error and try to avoid it. 8. Consider if complex calculations are feasible in SQL. 9. Identify multiple interpretations if they exist. 10. If the question is a follow-up, resolve references using the diff --git a/api/memory/graphiti_tool.py b/api/memory/graphiti_tool.py index 462c3ffa..ca18bd5d 100644 --- a/api/memory/graphiti_tool.py +++ b/api/memory/graphiti_tool.py @@ -114,7 +114,7 @@ async def _ensure_entity_nodes_direct(self, user_id: str, database_name: str) -> """ await graph_driver.execute_query(user_cypher, node=user_node_data) - print(f"Created user entity node: {user_node_name} with UUID: {self.user_uuid}") + print(f"Created user entity node: {user_node_name} with UUID: {user_uuid}") else: print(f"User entity node already exists: {user_node_name}") @@ -151,7 +151,7 @@ async def _ensure_entity_nodes_direct(self, user_id: str, database_name: str) -> """ await graph_driver.execute_query(database_cypher, node=database_node_data) - print(f"Created database entity node: {database_node_name} with UUID: {self.database_uuid}") + print(f"Created database entity node: {database_node_name} with UUID: {database_uuid}") else: print(f"Database entity node already exists: {database_node_name}") @@ -579,7 +579,7 @@ async def search_memories(self, query: str, user_limit: int = 5, database_limit: memory_context += f"(Personal preferences and information):\n{user_summary}\n\n" if database_facts: - memory_context += f"{self.graph_id} INTERACTION HISTORY (Previous queries and learnings about this database):\n{database_facts}\n\n" + memory_context += f"{self.} INTERACTION HISTORY (Previous queries and learnings about this database):\n{database_facts}\n\n" # Add similar queries context if similar_queries: @@ -600,7 +600,7 @@ async def search_memories(self, query: str, user_limit: int = 5, database_limit: for i, query_data in enumerate(failed_queries, 1): memory_context += f"{i}. Query: \"{query_data.get('user_query', '')}\"\n" memory_context += f" Failed SQL: {query_data.get('sql_query', '')}\n" - if query_data.get('error'): + if query_data.gegraph_idt('error'): memory_context += f" Error: {query_data.get('error')}\n" memory_context += f" AVOID this approach.\n\n" From b1c3b508370c76405f9736cba760b71e734f0486 Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Mon, 8 Sep 2025 13:45:34 +0300 Subject: [PATCH 05/17] lint --- api/routes/graphs.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/api/routes/graphs.py b/api/routes/graphs.py index e0a7fd06..37c74da9 100644 --- a/api/routes/graphs.py +++ b/api/routes/graphs.py @@ -640,8 +640,10 @@ async def generate(): # pylint: disable=too-many-locals,too-many-branches,too-m ) # Save conversation with memory tool (run in background) - save_task = asyncio.create_task(memory_tool.add_new_memory(full_response, [queries_history, - result_history])) + save_task = asyncio.create_task( + memory_tool.add_new_memory(full_response, + [queries_history, result_history]) + ) # Add error handling callback to prevent silent failures save_task.add_done_callback( lambda t: logging.error("Memory save failed: %s", t.exception()) # nosemgrep From 8af36dc364f9a7e1473e47931d2f52c1263b2586 Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Mon, 8 Sep 2025 13:49:31 +0300 Subject: [PATCH 06/17] fix-error --- api/memory/graphiti_tool.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/memory/graphiti_tool.py b/api/memory/graphiti_tool.py index ca18bd5d..45873f95 100644 --- a/api/memory/graphiti_tool.py +++ b/api/memory/graphiti_tool.py @@ -248,7 +248,7 @@ async def update_user_information(self, conversation: Dict[str, Any], history: T except Exception as e: return False - async def add_new_memory(self, conversation: Dict[str, Any], history: List[Dict[str, Any]]) -> bool: + async def add_new_memory(self, conversation: Dict[str, Any], history: Tuple[List[str], List[str]]) -> bool: # Use LLM to analyze and summarize the conversation with focus on graph-oriented database facts analysis = await self.summarize_conversation(conversation, history) user_id = self.user_id @@ -579,8 +579,8 @@ async def search_memories(self, query: str, user_limit: int = 5, database_limit: memory_context += f"(Personal preferences and information):\n{user_summary}\n\n" if database_facts: - memory_context += f"{self.} INTERACTION HISTORY (Previous queries and learnings about this database):\n{database_facts}\n\n" - + memory_context += f"{self.graph_id} INTERACTION HISTORY (Previous queries and learnings about this database):\n{database_facts}\n\n" + # Add similar queries context if similar_queries: memory_context += "SIMILAR QUERIES HISTORY:\n" From 914de223515c79479a8e4b7f5f67352159b7707f Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Mon, 8 Sep 2025 14:01:43 +0300 Subject: [PATCH 07/17] Update api/memory/graphiti_tool.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- api/memory/graphiti_tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/memory/graphiti_tool.py b/api/memory/graphiti_tool.py index 45873f95..21aecfd2 100644 --- a/api/memory/graphiti_tool.py +++ b/api/memory/graphiti_tool.py @@ -600,7 +600,7 @@ async def search_memories(self, query: str, user_limit: int = 5, database_limit: for i, query_data in enumerate(failed_queries, 1): memory_context += f"{i}. Query: \"{query_data.get('user_query', '')}\"\n" memory_context += f" Failed SQL: {query_data.get('sql_query', '')}\n" - if query_data.gegraph_idt('error'): + if query_data.get('error'): memory_context += f" Error: {query_data.get('error')}\n" memory_context += f" AVOID this approach.\n\n" From ea6f0c4d5fde7055b77c3ae46e267ee94d0c827f Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Mon, 8 Sep 2025 14:02:02 +0300 Subject: [PATCH 08/17] Update api/memory/graphiti_tool.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- api/memory/graphiti_tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/memory/graphiti_tool.py b/api/memory/graphiti_tool.py index 21aecfd2..0a41fc61 100644 --- a/api/memory/graphiti_tool.py +++ b/api/memory/graphiti_tool.py @@ -527,7 +527,7 @@ async def search_database_facts(self, query: str, limit: int = 5, episode_limit: fact_entry += f" ({', '.join(time_info)})" database_facts_text.append(fact_entry) - facts = "Session:\n".join(database_facts_text) if database_facts_text else "" + facts = "\n".join(database_facts_text) if database_facts_text else "" episodes = "\n".join(episodes_contents) if episodes_contents else "" database_context = "Previous sessions:\n" + episodes + "\n\nFacts:\n" + facts # Join all facts into a single string From aa86029c88ee629b109c1f0e85b35cdd0908d08d Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Mon, 8 Sep 2025 14:02:13 +0300 Subject: [PATCH 09/17] Update api/memory/graphiti_tool.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- api/memory/graphiti_tool.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/api/memory/graphiti_tool.py b/api/memory/graphiti_tool.py index 0a41fc61..4079ba25 100644 --- a/api/memory/graphiti_tool.py +++ b/api/memory/graphiti_tool.py @@ -461,6 +461,13 @@ async def search_user_summary(self, limit: int = 5) -> str: async def extract_episode_from_rel(self, rel_result): """ + Extracts the content of episodes associated with a given relationship result. + + Args: + rel_result: An object containing an 'episodes' attribute, which is a list of episode UUIDs. + + Returns: + List of episode content strings corresponding to the provided episode UUIDs. """ driver = self.graphiti_client.driver query = """ From e0dcea87ceb4c8d9e3ae878471a962170112c231 Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Wed, 10 Sep 2025 09:26:30 +0300 Subject: [PATCH 10/17] imp-pipe --- api/agents/analysis_agent.py | 2 +- api/routes/graphs.py | 16 ++++++++-------- app/ts/modules/chat.ts | 2 -- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/api/agents/analysis_agent.py b/api/agents/analysis_agent.py index 4fc0c16a..0c677af7 100644 --- a/api/agents/analysis_agent.py +++ b/api/agents/analysis_agent.py @@ -237,6 +237,7 @@ def _build_prompt( # pylint: disable=too-many-arguments, too-many-positional-a - Analyze the query's translatability into SQL according to the instructions. - Apply the instructions explicitly. + - You MUST NEVER use application-level identifiers that are email-based or encoded emails. - If you CANNOT apply instructions in the SQL, explain why under "instructions_comments", "explanation" and reduce your confidence. - Penalize confidence appropriately if any part of the instructions is unmet. @@ -298,6 +299,5 @@ def _build_prompt( # pylint: disable=too-many-arguments, too-many-positional-a 13. For personal queries, FIRST check memory context for user identification. If user identity is found in memory context (user name, previous personal queries, etc.), the query IS translatable. 14. CRITICAL PERSONALIZATION CHECK: If missing user identification/personalization is a significant or primary component of the query (e.g., "show my orders", "my account balance", "my recent purchases", "how many employees I have", "products I own") AND no user identification is available in memory context or schema, set "is_sql_translatable" to false. However, if memory context contains user identification (like user name or previous successful personal queries), then personal queries ARE translatable even if they are the primary component of the query. - **Do not use the User ID (email based) in the sql_query** Again: OUTPUT ONLY VALID JSON. No explanations outside the JSON block. """ # pylint: disable=line-too-long return prompt diff --git a/api/routes/graphs.py b/api/routes/graphs.py index 37c74da9..ffa3da57 100644 --- a/api/routes/graphs.py +++ b/api/routes/graphs.py @@ -493,14 +493,14 @@ async def generate(): # pylint: disable=too-many-locals,too-many-branches,too-m answer_an["sql_query"], db_url ) - - yield json.dumps( - { - "type": "query_result", - "data": query_results, - "final_response": False - } - ) + MESSAGE_DELIMITER + if len(query_results) != 0: + yield json.dumps( + { + "type": "query_result", + "data": query_results, + "final_response": False + } + ) + MESSAGE_DELIMITER # If schema was modified, refresh the graph using the appropriate loader if is_schema_modifying: diff --git a/app/ts/modules/chat.ts b/app/ts/modules/chat.ts index 347417e3..6a7f60ce 100644 --- a/app/ts/modules/chat.ts +++ b/app/ts/modules/chat.ts @@ -172,8 +172,6 @@ function handleFinalResult(step: any, isQuery = false) { const message = step.message || JSON.stringify(step.data, null, 2); if (step.is_valid) { addMessage(message, "final-result", isQuery); - } else { - addMessage("Sorry, we couldn't generate a valid SQL query. Please try rephrasing your question or add more details. For help, check the explanation window.", "followup"); } } From 0f56a37a642633dc7d4c37b95a7fff80cf0e0bb8 Mon Sep 17 00:00:00 2001 From: Gal Shubeli Date: Wed, 10 Sep 2025 10:03:48 +0300 Subject: [PATCH 11/17] dis-chat --- app/templates/components/chat_input.j2 | 2 +- app/ts/app.ts | 38 ++++++++++++++++++++------ app/ts/modules/graphs.ts | 6 ++++ 3 files changed, 36 insertions(+), 10 deletions(-) diff --git a/app/templates/components/chat_input.j2 b/app/templates/components/chat_input.j2 index 6c873157..5e909db1 100644 --- a/app/templates/components/chat_input.j2 +++ b/app/templates/components/chat_input.j2 @@ -1,7 +1,7 @@ {# Chat input area with text input and action buttons #}
- +