From ddbf390d56e6f91b31e0b1fb0e19bb9869311e2f Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Mon, 2 Feb 2026 00:27:48 +0800 Subject: [PATCH 01/32] chore: stash code --- src/langbot/pkg/api/http/service/pipeline.py | 57 +++++++- .../pkg/pipeline/process/handlers/chat.py | 96 ++++++++++++- src/langbot/pkg/plugin/connector.py | 46 +++++++ src/langbot/pkg/plugin/handler.py | 129 ++++++++++++++++++ .../dynamic-form/DynamicFormComponent.tsx | 42 +++++- .../dynamic-form/DynamicFormItemComponent.tsx | 10 ++ web/src/app/infra/entities/form/dynamic.ts | 5 + 7 files changed, 374 insertions(+), 11 deletions(-) diff --git a/src/langbot/pkg/api/http/service/pipeline.py b/src/langbot/pkg/api/http/service/pipeline.py index 9175aba55..23d7d0b42 100644 --- a/src/langbot/pkg/api/http/service/pipeline.py +++ b/src/langbot/pkg/api/http/service/pipeline.py @@ -31,10 +31,65 @@ def __init__(self, ap: app.Application) -> None: self.ap = ap async def get_pipeline_metadata(self) -> list[dict]: + """Get pipeline metadata with dynamically loaded plugin runners""" + import copy + + # Deep copy AI metadata to avoid modifying the original + ai_metadata = copy.deepcopy(self.ap.pipeline_config_meta_ai) + + # Find the runner stage + runner_stage = None + for stage in ai_metadata.get('stages', []): + if stage.get('name') == 'runner': + runner_stage = stage + break + + if runner_stage: + # Find the runner select config + for config_item in runner_stage.get('config', []): + if config_item.get('name') == 'runner': + # Get plugin agent runners + try: + plugin_runners = await self.ap.plugin_connector.list_agent_runners() + + # Add plugin runners to options + for runner in plugin_runners: + manifest = runner.get('manifest', {}) + metadata = manifest.get('metadata', {}) + + # Format: plugin:author/plugin_name/runner_name + runner_value = ( + f'plugin:{runner["plugin_author"]}/{runner["plugin_name"]}/{runner["runner_name"]}' + ) + + # Add to options + config_item['options'].append( + { + 'name': runner_value, + 'label': metadata.get('label', {runner['runner_name']: runner['runner_name']}), + 'description': metadata.get('description'), + } + ) + + # Add corresponding stage configuration for this runner + spec_config = manifest.get('spec', {}).get('config', []) + if spec_config: + ai_metadata['stages'].append( + { + 'name': runner_value, + 'label': metadata.get('label', {runner['runner_name']: runner['runner_name']}), + 'description': metadata.get('description'), + 'config': spec_config, + } + ) + + except Exception as e: + self.ap.logger.warning(f'Failed to load plugin agent runners: {e}') + return [ self.ap.pipeline_config_meta_trigger, self.ap.pipeline_config_meta_safety, - self.ap.pipeline_config_meta_ai, + ai_metadata, self.ap.pipeline_config_meta_output, ] diff --git a/src/langbot/pkg/pipeline/process/handlers/chat.py b/src/langbot/pkg/pipeline/process/handlers/chat.py index 87f8d8ce4..76b55053b 100644 --- a/src/langbot/pkg/pipeline/process/handlers/chat.py +++ b/src/langbot/pkg/pipeline/process/handlers/chat.py @@ -17,11 +17,81 @@ import langbot_plugin.api.entities.builtin.provider.session as provider_session import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query import langbot_plugin.api.entities.builtin.provider.message as provider_message +from langbot_plugin.api.entities.builtin.agent_runner.context import AgentRunContext importutil.import_modules_in_pkg(runners) +class PluginAgentRunnerWrapper(runner_module.RequestRunner): + """Wrapper to run AgentRunner from plugin""" + + def __init__(self, ap, plugin_author: str, plugin_name: str, runner_name: str, pipeline_config: dict): + super().__init__(ap, pipeline_config) + self.plugin_author = plugin_author + self.plugin_name = plugin_name + self.runner_name = runner_name + self.name = f'plugin:{plugin_author}/{plugin_name}/{runner_name}' + + async def run( + self, query: pipeline_query.Query + ) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]: + """Run the plugin agent runner""" + + # Build AgentRunContext + context = AgentRunContext( + query_id=query.query_id, + session=query.session, + messages=query.messages, + user_message=query.user_message.content[0] + if isinstance(query.user_message.content, list) + else provider_message.ContentElement.from_text(query.user_message.content), + use_funcs=query.use_funcs, + extra_config=self.pipeline_config.get('ai', {}).get(self.runner_name, {}), + ) + + # Call plugin connector to run agent + async for result_dict in self.ap.plugin_connector.run_agent( + plugin_author=self.plugin_author, + plugin_name=self.plugin_name, + runner_name=self.runner_name, + context=context.model_dump(), + ): + # Convert result to Message/MessageChunk + result_type = result_dict.get('type') + + if result_type == 'chunk': + # Stream chunk + chunk_data = result_dict.get('message_chunk') + if chunk_data: + yield provider_message.MessageChunk.model_validate(chunk_data) + + elif result_type == 'text': + # Text content + content = result_dict.get('content', '') + yield provider_message.MessageChunk( + role='assistant', + content=content, + ) + + elif result_type == 'tool_call': + # Tool call notification (may not need to yield anything here) + pass + + elif result_type == 'finish': + # Final message + message_data = result_dict.get('message') + if message_data: + yield provider_message.Message.model_validate(message_data) + else: + # Fallback: create message from content + content = result_dict.get('content', '') + yield provider_message.Message( + role='assistant', + content=content, + ) + + class ChatMessageHandler(handler.MessageHandler): async def handle( self, @@ -83,12 +153,32 @@ async def handle( is_stream = False try: + runner_name = query.pipeline_config['ai']['runner']['runner'] + + # Check if it's a built-in runner + runner = None for r in runner_module.preregistered_runners: - if r.name == query.pipeline_config['ai']['runner']['runner']: + if r.name == runner_name: runner = r(self.ap, query.pipeline_config) break - else: - raise ValueError(f'Request Runner not found: {query.pipeline_config["ai"]["runner"]["runner"]}') + + # If not found in built-in runners, check plugin runners + if runner is None: + # Parse runner name: format is "plugin:author/plugin_name/runner_name" + if runner_name.startswith('plugin:'): + parts = runner_name[7:].split('/') # Remove "plugin:" prefix + if len(parts) == 3: + plugin_author, plugin_name, component_runner_name = parts + runner = PluginAgentRunnerWrapper( + self.ap, plugin_author, plugin_name, component_runner_name, query.pipeline_config + ) + else: + raise ValueError( + f'Invalid plugin runner name format: {runner_name}. Expected: plugin:author/name/runner' + ) + else: + raise ValueError(f'Request Runner not found: {runner_name}') + # Mark start time for telemetry start_ts = time.time() diff --git a/src/langbot/pkg/plugin/connector.py b/src/langbot/pkg/plugin/connector.py index 9e1b0ea8a..22cbb14d4 100644 --- a/src/langbot/pkg/plugin/connector.py +++ b/src/langbot/pkg/plugin/connector.py @@ -599,6 +599,52 @@ async def execute_command( yield cmd_ret + # AgentRunner methods + async def list_agent_runners(self, bound_plugins: list[str] | None = None) -> list[ComponentManifest]: + """List all available AgentRunner components.""" + if not self.is_enable_plugin: + return [] + + runners_data = await self.handler.list_agent_runners(include_plugins=bound_plugins) + runners = [ComponentManifest.model_validate(runner) for runner in runners_data] + return runners + + async def run_agent( + self, + plugin_author: str, + plugin_name: str, + runner_name: str, + context: dict[str, Any], + ) -> typing.AsyncGenerator[dict[str, Any], None]: + """Run an AgentRunner from a plugin. + + Args: + plugin_author: Plugin author + plugin_name: Plugin name + runner_name: AgentRunner component name + context: AgentRunContext as dict + + Yields: + AgentRunReturn results as dicts + """ + if not self.is_enable_plugin: + yield {'type': 'finish', 'finish_reason': 'error', 'content': 'Plugin system is disabled'} + return + + gen = self.handler.run_agent(plugin_author, plugin_name, runner_name, context) + + async for ret in gen: + yield ret + + # KnowledgeRetriever methods + async def list_knowledge_retrievers(self, bound_plugins: list[str] | None = None) -> list[dict[str, Any]]: + """List all available KnowledgeRetriever components.""" + if not self.is_enable_plugin: + return [] + + retrievers_data = await self.handler.list_knowledge_retrievers(include_plugins=bound_plugins) + return retrievers_data + async def retrieve_knowledge( self, plugin_author: str, diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index 60922003a..6952296d7 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -360,6 +360,135 @@ async def _placeholder_func(**kwargs): }, ) + @self.action(PluginToRuntimeAction.INVOKE_LLM_STREAM) + async def invoke_llm_stream(data: dict[str, Any]): + """Invoke llm with streaming response""" + llm_model_uuid = data['llm_model_uuid'] + messages = data['messages'] + funcs = data.get('funcs', []) + extra_args = data.get('extra_args', {}) + + llm_model = await self.ap.model_mgr.get_model_by_uuid(llm_model_uuid) + if llm_model is None: + yield handler.ActionResponse.error( + message=f'LLM model with llm_model_uuid {llm_model_uuid} not found', + ) + return + + messages_obj = [provider_message.Message.model_validate(message) for message in messages] + funcs_obj = [resource_tool.LLMTool.model_validate(func) for func in funcs] + + async for chunk in llm_model.provider.invoke_llm_stream( + query=None, + model=llm_model, + messages=messages_obj, + funcs=funcs_obj, + extra_args=extra_args, + ): + yield handler.ActionResponse.success( + data={ + 'chunk': chunk.model_dump(), + }, + ) + + @self.action(PluginToRuntimeAction.CALL_TOOL) + async def call_tool(data: dict[str, Any]) -> handler.ActionResponse: + """Call a tool""" + tool_name = data['tool_name'] + parameters = data['parameters'] + # session_data = data['session'] + # query_id = data['query_id'] + + # Convert session_data to Session object (simplified) + # In real implementation, you would reconstruct the full session + # For now, we'll call the tool manager's execute method + try: + result = await self.ap.tool_mgr.execute_func_call( + name=tool_name, + parameters=parameters, + query=None, # TODO: reconstruct query from session_data if needed + ) + return handler.ActionResponse.success( + data={ + 'result': result, + }, + ) + except Exception as e: + traceback.print_exc() + return handler.ActionResponse.error( + message=f'Failed to execute tool {tool_name}: {e}', + ) + + @self.action(PluginToRuntimeAction.RETRIEVE_KNOWLEDGE) + async def retrieve_knowledge(data: dict[str, Any]) -> handler.ActionResponse: + """Retrieve knowledge from a knowledge base""" + kb_uuid = data['kb_uuid'] + query = data['query'] + top_k = data.get('top_k', 5) + + try: + kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid) + if kb is None: + return handler.ActionResponse.error( + message=f'Knowledge base with uuid {kb_uuid} not found', + ) + + results = await kb.retrieve(query=query, top_k=top_k) + + # Convert results to dict format + results_data = [ + { + 'id': r.id, + 'content': [c.model_dump() for c in r.content], + 'metadata': r.metadata, + } + for r in results + ] + + return handler.ActionResponse.success( + data={ + 'results': results_data, + }, + ) + except Exception as e: + traceback.print_exc() + return handler.ActionResponse.error( + message=f'Failed to retrieve knowledge: {e}', + ) + + @self.action(PluginToRuntimeAction.INVOKE_EMBEDDING) + async def invoke_embedding(data: dict[str, Any]) -> handler.ActionResponse: + """Invoke an embedding model""" + embedding_model_uuid = data['embedding_model_uuid'] + texts = data['texts'] + + try: + embedding_model = await self.ap.model_mgr.get_embedding_model_by_uuid(embedding_model_uuid) + if embedding_model is None: + return handler.ActionResponse.error( + message=f'Embedding model with uuid {embedding_model_uuid} not found', + ) + + # Call embedding model to generate embeddings + embeddings = [] + for text in texts: + embedding = await embedding_model.provider.invoke_embedding( + model=embedding_model, + text=text, + ) + embeddings.append(embedding) + + return handler.ActionResponse.success( + data={ + 'embeddings': embeddings, + }, + ) + except Exception as e: + traceback.print_exc() + return handler.ActionResponse.error( + message=f'Failed to invoke embedding model: {e}', + ) + @self.action(RuntimeToLangBotAction.SET_BINARY_STORAGE) async def set_binary_storage(data: dict[str, Any]) -> handler.ActionResponse: """Set binary storage""" diff --git a/web/src/app/home/components/dynamic-form/DynamicFormComponent.tsx b/web/src/app/home/components/dynamic-form/DynamicFormComponent.tsx index ffea18d6e..9e2df54d5 100644 --- a/web/src/app/home/components/dynamic-form/DynamicFormComponent.tsx +++ b/web/src/app/home/components/dynamic-form/DynamicFormComponent.tsx @@ -1,4 +1,7 @@ -import { IDynamicFormItemSchema } from '@/app/infra/entities/form/dynamic'; +import { + IDynamicFormItemSchema, + DynamicFormItemType, +} from '@/app/infra/entities/form/dynamic'; import { useForm } from 'react-hook-form'; import { zodResolver } from '@hookform/resolvers/zod'; import { z } from 'zod'; @@ -190,6 +193,19 @@ function WebhookUrlField({ ); } +/** + * Normalize plugin manifest type names to frontend-compatible types + */ +function normalizeItemType(type: string): string { + const typeMap: Record = { + 'select-llm-model': DynamicFormItemType.LLM_MODEL_SELECTOR, + 'select-knowledge-bases': DynamicFormItemType.KNOWLEDGE_BASE_MULTI_SELECTOR, + number: DynamicFormItemType.FLOAT, + json: DynamicFormItemType.TEXT, + }; + return typeMap[type] || type; +} + export default function DynamicFormComponent({ itemConfigList, onSubmit, @@ -270,8 +286,11 @@ export default function DynamicFormComponent({ const formSchema = z.object( editableItems.reduce( (acc, item) => { + // Normalize type to handle plugin manifest type names + const normalizedType = normalizeItemType(item.type); + let fieldSchema; - switch (item.type) { + switch (normalizedType) { case 'integer': fieldSchema = z.number(); break; @@ -325,6 +344,9 @@ export default function DynamicFormComponent({ }), ); break; + case 'text': + fieldSchema = z.string(); + break; default: fieldSchema = z.string(); } @@ -478,6 +500,12 @@ export default function DynamicFormComponent({ /> {itemConfigList.map((config) => { + // Create a normalized config with type converted to frontend format + const normalizedConfig = { + ...config, + type: normalizeItemType(config.type), + }; + if (config.show_if) { const dependValue = resolveShowIfValue( config.show_if.field, @@ -511,7 +539,7 @@ export default function DynamicFormComponent({ const isFieldDisabled = !!isEditing; // Webhook URL fields are display-only; render outside of form binding - if (config.type === 'webhook-url') { + if (normalizedConfig.type === 'webhook-url') { const webhookUrl = (systemContext?.webhook_url as string) || ''; const extraWebhookUrl = (systemContext?.extra_webhook_url as string) || ''; @@ -533,7 +561,7 @@ export default function DynamicFormComponent({ ); } - if (config.type === 'embed-code') { + if (normalizedConfig.type === 'embed-code') { const botUuid = (systemContext?.bot_uuid as string) || ''; if (!botUuid) return null; @@ -624,7 +652,7 @@ export default function DynamicFormComponent({ } // Boolean fields use a special inline layout - if (config.type === 'boolean') { + if (normalizedConfig.type === 'boolean') { return ( @@ -681,7 +709,7 @@ export default function DynamicFormComponent({ } > diff --git a/web/src/app/home/components/dynamic-form/DynamicFormItemComponent.tsx b/web/src/app/home/components/dynamic-form/DynamicFormItemComponent.tsx index 51831bdeb..72499176b 100644 --- a/web/src/app/home/components/dynamic-form/DynamicFormItemComponent.tsx +++ b/web/src/app/home/components/dynamic-form/DynamicFormItemComponent.tsx @@ -248,6 +248,7 @@ export default function DynamicFormItemComponent({ switch (config.type) { case DynamicFormItemType.INT: case DynamicFormItemType.FLOAT: + case DynamicFormItemType.NUMBER: return ( ; + case DynamicFormItemType.JSON: + return ( +