diff --git a/workflows/README.md b/workflows/README.md index 0a5deb2..edff388 100644 --- a/workflows/README.md +++ b/workflows/README.md @@ -57,12 +57,32 @@ python cli.py run-workflow-no-ai my_workflow.json - **Semantic Targeting**: Use `{variable}` in `target_text` - **Auto-Extraction**: LLM suggests variables automatically +### 📊 Real-time Progress Tracking (NEW!) +- **Step-by-Step Visibility**: See each browser action as it's recorded +- **Status Updates**: Track workflow processing phases in real-time +- **Cloud Integration Ready**: Store progress in database for live UI updates +- **Debug Friendly**: Know exactly where workflow generation fails +- **Zero Overhead**: Optional callbacks, fully backward compatible + +```python +# Track workflow generation progress in real-time +workflow = await service.generate_workflow_from_prompt( + prompt="Search for Python docs", + agent_llm=llm, + extraction_llm=llm, + on_step_recorded=lambda s: print(f"Step {s['step_number']}: {s['description']}"), + on_status_update=lambda msg: print(f"Status: {msg}"), +) +``` + --- ## Documentation - **[docs/DETERMINISTIC.md](docs/DETERMINISTIC.md)** - Deterministic workflow generation - **[docs/VARIABLES.md](docs/VARIABLES.md)** - Variables guide +- **[docs/PROGRESS_TRACKING.md](docs/PROGRESS_TRACKING.md)** - Real-time progress tracking ⭐ NEW +- **[QUICK_START_PROGRESS_TRACKING.md](QUICK_START_PROGRESS_TRACKING.md)** - 5-minute integration guide - **[examples/README.md](examples/README.md)** - Example scripts --- @@ -98,6 +118,7 @@ workflows/ │ │ ├── variables/ # Variable feature examples │ │ ├── demos/ # Advanced demos │ │ └── runner.py # Generic workflow runner +│ ├── progress_tracking_example.py # ⭐ NEW: Real-time progress tracking │ └── workflows/ # Example workflow JSON files │ ├── basic/ # Basic workflow examples │ ├── form_filling/ # Form filling examples diff --git a/workflows/examples/progress_tracking_example.py 
"""
Example: Real-time Progress Tracking for Workflow Generation

This example demonstrates how to use the new on_step_recorded and on_status_update
callbacks to track workflow generation progress in real-time.

Usage:
    python examples/progress_tracking_example.py
"""

import asyncio
from datetime import datetime, timezone

from browser_use.llm import ChatBrowserUse

from workflow_use.healing.service import HealingService

# Task used by every example.
EXAMPLE_PROMPT = 'Go to example.com and extract the page title'

# Layout of the console progress indicator in example 3.
PROGRESS_BAR_WIDTH = 10
MAX_DESCRIPTION_CHARS = 40


def _build_service():
    """Create the LLM and the HealingService configuration shared by all examples.

    Returns:
        A ``(llm, healing_service)`` tuple.
    """
    llm = ChatBrowserUse(model='bu-latest')
    healing_service = HealingService(
        llm=llm,
        use_deterministic_conversion=True,
        enable_variable_extraction=True,
    )
    return llm, healing_service


def _track(tasks: list, coroutine) -> asyncio.Task:
    """Schedule *coroutine* on the running loop and remember the task in *tasks*.

    The progress callbacks are invoked synchronously, so async handlers have to be
    wrapped in tasks. Keeping the task in a list gives it a strong reference (the
    asyncio docs warn that unreferenced tasks may be garbage collected mid-flight)
    and lets the caller await every handler before reading what it stored.
    """
    task = asyncio.create_task(coroutine)
    tasks.append(task)
    return task


def _render_progress_bar(step_number: int, width: int = PROGRESS_BAR_WIDTH) -> str:
    """Return a fixed-width bar with ``step_number`` filled cells, clamped to ``width``."""
    filled = max(0, min(step_number, width))
    return '█' * filled + '░' * (width - filled)


def _shorten(text: str, limit: int = MAX_DESCRIPTION_CHARS) -> str:
    """Truncate *text* to *limit* characters, adding an ellipsis only when something was cut."""
    if len(text) <= limit:
        return text
    return f'{text[:limit]}...'


# Example 1: Simple console logging
async def simple_console_example():
    """Basic example: print each step and status update to the console as it arrives."""
    print('=' * 80)
    print('EXAMPLE 1: Simple Console Logging')
    print('=' * 80)

    def step_callback(step_data: dict):
        """Called each time a step is recorded."""
        print(f'\n📍 Step {step_data["step_number"]}: {step_data["description"]}')
        print(f' Type: {step_data["action_type"]}')
        print(f' URL: {step_data["url"]}')
        # Optional fields are only printed when the step actually carries them.
        if step_data.get('target_text'):
            print(f' Target: {step_data["target_text"]}')
        if step_data.get('extracted_data'):
            print(f' Extracted: {step_data["extracted_data"]}')

    def status_callback(status: str):
        """Called for general status updates."""
        print(f'\n🔄 {status}')

    llm, healing_service = _build_service()

    # Generate workflow with callbacks
    workflow = await healing_service.generate_workflow_from_prompt(
        prompt=EXAMPLE_PROMPT,
        agent_llm=llm,
        extraction_llm=llm,
        use_cloud=False,
        on_step_recorded=step_callback,
        on_status_update=status_callback,
    )

    print(f'\n✅ Generated workflow with {len(workflow.steps)} steps!')


# Example 2: Store steps in a list (for database storage)
async def database_storage_example():
    """Example: store steps in memory through async handlers (simulates database storage)."""
    print('\n' + '=' * 80)
    print('EXAMPLE 2: Database Storage Pattern')
    print('=' * 80)

    # Simulated database storage
    stored_steps = []
    status_history = []
    # Every async handler scheduled from a callback; awaited before the data is read.
    pending_tasks = []

    async def step_callback(step_data: dict):
        """Store step in database (simulated with list)."""
        stored_steps.append(step_data)
        print(f'✓ Stored step {step_data["step_number"]} in database')

    async def status_callback(status: str):
        """Store status update in database."""
        # UTC-aware, so it can be compared with the UTC step timestamps.
        status_history.append({'timestamp': datetime.now(timezone.utc).isoformat(), 'status': status})
        print(f'ℹ️ {status}')

    llm, healing_service = _build_service()

    # Generate workflow with async callbacks
    await healing_service.generate_workflow_from_prompt(
        prompt=EXAMPLE_PROMPT,
        agent_llm=llm,
        extraction_llm=llm,
        use_cloud=False,
        on_step_recorded=lambda data: _track(pending_tasks, step_callback(data)),
        on_status_update=lambda status: _track(pending_tasks, status_callback(status)),
    )

    # The last handlers may still be pending when generation returns.
    await asyncio.gather(*pending_tasks)

    # Display stored data
    print(f'\n📊 Stored {len(stored_steps)} steps and {len(status_history)} status updates')
    print('\nStored Steps:')
    for step in stored_steps:
        print(f' {step["step_number"]}. {step["description"]}')

    print('\nStatus History:')
    for entry in status_history:
        print(f' [{entry["timestamp"]}] {entry["status"]}')


# Example 3: Real-time progress bar
async def progress_bar_example():
    """Example: show progress with a simple in-place progress indicator."""
    print('\n' + '=' * 80)
    print('EXAMPLE 3: Progress Bar')
    print('=' * 80)

    recorded_steps = 0

    def step_callback(step_data: dict):
        """Update progress bar as steps are recorded."""
        nonlocal recorded_steps
        recorded_steps = step_data['step_number']
        bar = _render_progress_bar(recorded_steps)
        # '\r' with end='' redraws the indicator on the same console line.
        print(f'\rProgress: [{bar}] Step {recorded_steps}: {_shorten(step_data["description"])}', end='')

    def status_callback(status: str):
        """Display status updates."""
        print(f'\n\n🔄 {status}')

    llm, healing_service = _build_service()

    # Generate workflow with callbacks
    await healing_service.generate_workflow_from_prompt(
        prompt=EXAMPLE_PROMPT,
        agent_llm=llm,
        extraction_llm=llm,
        use_cloud=False,
        on_step_recorded=step_callback,
        on_status_update=status_callback,
    )

    print(f'\n\n✅ Completed! Generated workflow with {recorded_steps} steps')


# Example 4: Real-world pattern for Browser-Use Cloud backend
async def cloud_backend_pattern():
    """
    Example: Pattern for Browser-Use Cloud backend integration.

    This shows how to integrate with your database to store steps
    in real-time for frontend polling.
    """
    print('\n' + '=' * 80)
    print('EXAMPLE 4: Browser-Use Cloud Backend Pattern')
    print('=' * 80)

    # Simulated workflow_id (would come from your database)
    workflow_id = 'wf_123abc'
    generation_metadata = {'steps': [], 'status_history': []}
    # Every async handler scheduled from a callback; awaited before the final report.
    pending_tasks = []

    async def step_callback(step_data: dict):
        """
        Store step immediately in database for real-time display.

        In your actual implementation, this would be:
            async with await database.get_session() as session:
                workflow = await get_workflow(session, workflow_id)
                if workflow and workflow.generation_metadata:
                    steps = workflow.generation_metadata.get('steps', [])
                    steps.append(step_data)
                    workflow.generation_metadata['steps'] = steps
                    await session.commit()
        """
        # Simulated database storage
        generation_metadata['steps'].append(step_data)

        print(f'💾 Stored step {step_data["step_number"]} to workflow {workflow_id}')
        print(f' Description: {step_data["description"]}')
        print(f' Type: {step_data["action_type"]}')
        print(f' Timestamp: {step_data["timestamp"]}')

    async def status_callback(status: str):
        """Store status updates for display in the frontend."""
        # UTC-aware, so both timelines printed below share one clock.
        status_entry = {'timestamp': datetime.now(timezone.utc).isoformat(), 'message': status}
        generation_metadata['status_history'].append(status_entry)

        print(f'ℹ️ Status update: {status}')

    llm, healing_service = _build_service()

    # Generate workflow with progress tracking
    print(f'\n🚀 Starting workflow generation for {workflow_id}...')

    workflow = await healing_service.generate_workflow_from_prompt(
        prompt=EXAMPLE_PROMPT,
        agent_llm=llm,
        extraction_llm=llm,
        use_cloud=False,
        on_step_recorded=lambda data: _track(pending_tasks, step_callback(data)),
        on_status_update=lambda status: _track(pending_tasks, status_callback(status)),
    )

    # Flush outstanding handlers so the report reflects everything that was emitted.
    await asyncio.gather(*pending_tasks)

    # Display final metadata (what would be in your database)
    print('\n' + '=' * 80)
    print('FINAL DATABASE STATE')
    print('=' * 80)
    print(f'\nWorkflow ID: {workflow_id}')
    print(f'Total Steps Recorded: {len(generation_metadata["steps"])}')
    print(f'Total Status Updates: {len(generation_metadata["status_history"])}')

    print('\n📋 Steps Timeline:')
    for step in generation_metadata['steps']:
        print(f' [{step["timestamp"]}] Step {step["step_number"]}: {step["description"]}')

    print('\n📊 Status Timeline:')
    for entry in generation_metadata['status_history']:
        print(f' [{entry["timestamp"]}] {entry["message"]}')

    print(f'\n✅ Workflow generation complete! Final workflow has {len(workflow.steps)} steps')


# Run all examples
async def main():
    """Run all examples (commented out to avoid actual API calls)."""
    print('Progress Tracking Examples')
    print('=' * 80)
    print('\nThese examples demonstrate different patterns for tracking')
    print('workflow generation progress in real-time.')
    print('\nNote: Examples are commented out to avoid actual API calls.')
    print('Uncomment the examples you want to run.\n')

    # Uncomment the examples you want to run:

    # await simple_console_example()
    # await database_storage_example()
    # await progress_bar_example()
    # await cloud_backend_pattern()

    print('\n✅ Examples completed!')


if __name__ == '__main__':
    asyncio.run(main())
"""
Unit tests for progress tracking callbacks in HealingService.

Usage:
    python tests/test_progress_tracking.py
"""

import asyncio
import inspect
from datetime import datetime, timezone
from typing import Any, Dict, Optional, get_args
from unittest.mock import AsyncMock, Mock

try:
    import pytest

    HAS_PYTEST = True
except ImportError:
    HAS_PYTEST = False

    # Mock pytest.mark.skip for standalone execution
    class _MockPytest:
        class mark:
            @staticmethod
            def skip(*args, **kwargs):
                def decorator(func):
                    return func

                return decorator

    pytest = _MockPytest()

from workflow_use.healing.service import HealingService, StatusUpdateCallback, StepRecordedCallback

# Keys the tests require on every step payload handed to on_step_recorded.
REQUIRED_STEP_KEYS = ('step_number', 'action_type', 'description', 'url', 'timestamp')

# Action types the step callback is expected to report.
KNOWN_ACTION_TYPES = ('navigation', 'click', 'input_text', 'extract', 'keypress', 'scroll')

# Prefix of the description produced for action types without a dedicated branch.
FALLBACK_DESCRIPTION_PREFIX = 'Execute action:'


def _describe_action(
    action_type: Optional[str], target_text: Optional[str], input_value: Optional[str], url: str
) -> str:
    """Mirror of the service's description logic.

    The real helper is a method of a controller class that is defined inside
    ``generate_workflow_from_prompt``, so it cannot be imported here; this copy
    has to be kept in sync with it by hand.
    """
    if action_type == 'navigation':
        return f'Navigate to {url}'
    if action_type == 'click':
        return f'Click on "{target_text}"' if target_text else 'Click element'
    if action_type == 'input_text':
        if target_text and input_value:
            return f'Enter "{input_value}" into {target_text}'
        if input_value:
            return f'Enter text: {input_value}'
        return 'Input text'
    if action_type == 'extract':
        return 'Extract page content'
    if action_type == 'keypress':
        return f'Press keys: {input_value}' if input_value else 'Press keys'
    if action_type == 'scroll':
        return 'Scroll page'
    return f'Execute action: {action_type or "unknown"}'


class TestProgressTracking:
    """Test suite for progress tracking callbacks."""

    def test_callback_signature(self):
        """The callback aliases take one argument (step dict / status string) and return None."""
        step_params, step_return = get_args(StepRecordedCallback)
        assert step_params == [Dict[str, Any]]
        assert step_return is type(None)

        status_params, status_return = get_args(StatusUpdateCallback)
        assert status_params == [str]
        assert status_return is type(None)

    def test_step_callback_data_structure(self):
        """Test that step callback receives correct data structure."""
        # Mock callback to capture data
        step_callback = Mock()

        # Simulate what the callback would receive
        expected_data = {
            'step_number': 1,
            'action_type': 'navigation',
            'description': 'Navigate to https://example.com',
            'url': 'https://example.com',
            'selector': None,
            'extracted_data': None,
            'timestamp': '2025-01-19T10:30:45.123456+00:00',
            'target_text': None,
        }

        step_callback(expected_data)

        # Verify callback was called once with a payload carrying every required key
        step_callback.assert_called_once()
        payload = step_callback.call_args[0][0]
        missing = [key for key in REQUIRED_STEP_KEYS if key not in payload]
        assert not missing, f'Missing keys in step payload: {missing}'

    def test_status_callback_messages(self):
        """Test that status callback receives expected messages, in order."""
        status_callback = Mock()

        # Expected status messages
        expected_statuses = [
            'Initializing browser...',
            'Creating browser agent...',
            'Recording workflow steps...',
            'Completed recording 3 steps',
            'Converting steps to workflow (deterministic)...',
            'Post-processing workflow (variable identification & cleanup)...',
            'Workflow generation complete!',
        ]

        # Simulate status updates
        for status in expected_statuses:
            status_callback(status)

        # Verify every status was delivered exactly once and in the emitted order
        received = [recorded.args[0] for recorded in status_callback.call_args_list]
        assert received == expected_statuses

    def test_callbacks_are_optional(self):
        """Test that callbacks are truly optional (backward compatibility)."""
        # Test signature only (don't instantiate LLM)
        parameters = inspect.signature(HealingService.generate_workflow_from_prompt).parameters

        # Check that callbacks are optional
        assert parameters['on_step_recorded'].default is None
        assert parameters['on_status_update'].default is None
        print(' Verified: on_step_recorded defaults to None')
        print(' Verified: on_status_update defaults to None')

    def test_callback_exception_handling(self):
        """A raising callback must be containable by a try/except guard like the service's."""

        def failing_callback(data: Dict[str, Any]):
            raise Exception('Callback error!')

        def fire_guarded(callback, data):
            """Invoke *callback* and return the exception it raised, or None."""
            try:
                callback(data)
            except Exception as exc:
                return exc
            return None

        error = fire_guarded(failing_callback, {'step_number': 1})

        # Fails loudly if the callback unexpectedly stops raising.
        assert error is not None, 'failing_callback should have raised'
        assert str(error) == 'Callback error!'

    def test_async_callback_pattern(self):
        """Test that async callbacks can be wrapped with create_task."""
        async_callback = AsyncMock()

        # Pattern used in examples
        def wrapper(data):
            return asyncio.create_task(async_callback(data))

        # Verify wrapper is callable
        assert callable(wrapper)

        # create_task needs a running loop, so exercise the wrapper inside a coroutine
        async def exercise():
            task = wrapper({'step_number': 1})
            assert isinstance(task, asyncio.Task)
            await task
            async_callback.assert_awaited_once_with({'step_number': 1})

        asyncio.run(exercise())

    def test_step_action_types(self):
        """Every expected action type has a dedicated (non-fallback) description."""
        for action_type in KNOWN_ACTION_TYPES:
            description = _describe_action(action_type, 'Target', 'value', 'https://example.com')
            assert not description.startswith(FALLBACK_DESCRIPTION_PREFIX), (
                f'{action_type} falls through to the generic description'
            )

    def test_timestamp_format(self):
        """Test that timestamp is timezone-aware ISO 8601."""
        # Generate timestamp like the implementation does
        timestamp = datetime.now(timezone.utc).isoformat()

        # Verify it's valid ISO 8601 and keeps its UTC offset
        parsed = datetime.fromisoformat(timestamp)
        assert isinstance(parsed, datetime)
        assert parsed.tzinfo is not None

    def test_description_generation(self):
        """Test human-readable description generation logic."""
        # (action_type, target_text, input_value, url, expected description)
        test_cases = [
            ('navigation', None, None, 'https://example.com', 'Navigate to https://example.com'),
            ('click', 'Search', None, 'https://example.com', 'Click on "Search"'),
            ('click', None, None, 'https://example.com', 'Click element'),
            ('input_text', 'Username', 'john_doe', 'https://example.com', 'Enter "john_doe" into Username'),
            ('input_text', None, 'john_doe', 'https://example.com', 'Enter text: john_doe'),
            ('extract', None, None, 'https://example.com', 'Extract page content'),
            ('keypress', None, 'Enter', 'https://example.com', 'Press keys: Enter'),
            ('scroll', None, None, 'https://example.com', 'Scroll page'),
            (None, None, None, 'https://example.com', 'Execute action: unknown'),
        ]

        # Verify descriptions
        for action_type, target_text, input_value, url, expected in test_cases:
            result = _describe_action(action_type, target_text, input_value, url)
            assert result == expected, f'Failed for {action_type}'


class TestProgressTrackingIntegration:
    """Integration tests (require actual workflow generation - run manually)."""

    @pytest.mark.skip(reason='Requires browser automation - run manually')
    async def test_full_workflow_with_callbacks(self):
        """Test complete workflow generation with callbacks (manual test)."""
        # Imported lazily so the unit tests above do not need an LLM client.
        from browser_use.llm import ChatBrowserUse

        steps_recorded = []
        statuses_recorded = []

        def step_callback(step_data):
            steps_recorded.append(step_data)
            print(f'Step {step_data["step_number"]}: {step_data["description"]}')

        def status_callback(status):
            statuses_recorded.append(status)
            print(f'Status: {status}')

        llm = ChatBrowserUse(model='bu-latest')
        service = HealingService(llm=llm, use_deterministic_conversion=True)

        workflow = await service.generate_workflow_from_prompt(
            prompt='Go to example.com and extract the page title',
            agent_llm=llm,
            extraction_llm=llm,
            use_cloud=False,
            on_step_recorded=step_callback,
            on_status_update=status_callback,
        )

        # Verify callbacks were called
        assert len(steps_recorded) > 0, 'No steps were recorded'
        assert len(statuses_recorded) > 0, 'No status updates were recorded'

        # Verify step data structure
        for step in steps_recorded:
            for key in REQUIRED_STEP_KEYS:
                assert key in step

        # Verify workflow was generated
        assert workflow is not None
        assert len(workflow.steps) > 0

        print('\n✅ Test complete!')
        print(f' Steps recorded: {len(steps_recorded)}')
        print(f' Status updates: {len(statuses_recorded)}')
        print(f' Workflow steps: {len(workflow.steps)}')


if __name__ == '__main__':
    # Run basic tests
    print('Running progress tracking tests...')

    suite = TestProgressTracking()
    checks = [
        ('callback signature', suite.test_callback_signature),
        ('step callback data structure', suite.test_step_callback_data_structure),
        ('status callback messages', suite.test_status_callback_messages),
        ('callbacks are optional', suite.test_callbacks_are_optional),
        ('callback exception handling', suite.test_callback_exception_handling),
        ('async callback pattern', suite.test_async_callback_pattern),
        ('step action types', suite.test_step_action_types),
        ('timestamp format', suite.test_timestamp_format),
        ('description generation', suite.test_description_generation),
    ]

    for number, (label, check) in enumerate(checks, start=1):
        print(f'\n{number}. Testing {label}...')
        check()
        print(' ✓ Passed')

    print('\n' + '=' * 80)
    print('✅ All unit tests passed!')
    print('=' * 80)
    print('\nNote: Integration tests are skipped (require API keys and browser).')
    print('To run integration tests manually, remove the skip marker and run with pytest.')
"""
Test to verify step counter works correctly without callbacks.

This test verifies the fix for the issue where step counter would remain at 0
when no callback was provided.
"""

from pathlib import Path

# Location of the inspected module, relative to the project root.
SERVICE_RELATIVE_PATH = Path('workflow_use') / 'healing' / 'service.py'

# Source fragments the checks look for.
ACT_SIGNATURE = 'async def act(self, action, browser_session'
COUNTER_INCREMENT = "step_counter['count'] += 1"
CALLBACK_CHECK = 'if self.on_step_recorded:'
STATUS_MESSAGE = 'Completed recording {step_counter["count"]} steps'


def _read_service_source() -> str:
    """Return the text of service.py.

    The file is looked up relative to this test file first (so the test works from
    any working directory) and then relative to the current directory, which is the
    only location the test used to support.

    Raises:
        FileNotFoundError: if service.py is in neither location.
    """
    candidates = (
        Path(__file__).resolve().parent.parent / SERVICE_RELATIVE_PATH,
        SERVICE_RELATIVE_PATH,
    )
    for candidate in candidates:
        if candidate.is_file():
            # Explicit encoding: the platform default may not be able to decode the file.
            return candidate.read_text(encoding='utf-8')
    raise FileNotFoundError(f'Could not find {SERVICE_RELATIVE_PATH} (looked in: {[str(c) for c in candidates]})')


def _indent_width(line: str) -> int:
    """Return the number of leading whitespace columns of *line* (tabs expanded)."""
    expanded = line.expandtabs()
    return len(expanded) - len(expanded.lstrip())


def _find_line(lines, needle, start):
    """Return the index of the first line at or after *start* containing *needle*, else None."""
    for index in range(start, len(lines)):
        if needle in lines[index]:
            return index
    return None


def _is_nested_in_callback_check(lines, increment_index, act_index):
    """Return True when the increment sits inside an ``if self.on_step_recorded:`` block.

    Walks upwards from the increment and looks only at lines that are indented less
    than the innermost block found so far, i.e. the headers of the enclosing blocks.
    """
    depth = _indent_width(lines[increment_index])
    for index in range(increment_index - 1, act_index, -1):
        stripped = lines[index].strip()
        if not stripped or stripped.startswith('#'):
            continue
        indent = _indent_width(lines[index])
        if indent < depth:
            if CALLBACK_CHECK in lines[index]:
                return True
            depth = indent
    return False


def test_step_counter_increments_without_callback():
    """
    Verify that step counter increments even when on_step_recorded is None.

    This tests the fix where the counter increment was moved outside the
    callback conditional block: inside the controller's ``act`` method the
    increment must not be nested in the callback check and must precede it.
    """
    lines = _read_service_source().splitlines()

    act_index = _find_line(lines, ACT_SIGNATURE, 0)
    assert act_index is not None, 'Could not find the act method'

    increment_index = _find_line(lines, COUNTER_INCREMENT, act_index + 1)
    assert increment_index is not None, 'Could not find step_counter increment'

    assert not _is_nested_in_callback_check(lines, increment_index, act_index), (
        'Step counter increment should be outside the callback check block. '
        'The counter should increment on every action regardless of callback availability.'
    )

    # Search from the top of act so that a check placed before the increment is detected.
    callback_index = _find_line(lines, CALLBACK_CHECK, act_index + 1)
    assert callback_index is not None, 'Could not find callback check'

    # Report 1-based line numbers so they match what an editor shows.
    increment_line = increment_index + 1
    callback_check_line = callback_index + 1
    assert increment_line < callback_check_line, (
        f'Step counter increment (line {increment_line}) should come BEFORE callback check (line {callback_check_line})'
    )

    print(f'✓ Step counter increments at line {increment_line}')
    print(f'✓ Callback check at line {callback_check_line}')
    print('✓ Counter increments BEFORE callback check (correct order)')


def test_status_update_uses_counter():
    """
    Verify that the status update message uses the step counter correctly.
    """
    # Look for the status update that reports step count
    assert STATUS_MESSAGE in _read_service_source(), (
        "Status update should use step_counter['count'] to report accurate step count"
    )

    print("✓ Status update correctly uses step_counter['count']")


if __name__ == '__main__':
    print('=' * 80)
    print('Testing Step Counter Fix')
    print('=' * 80)
    print()

    print('Test 1: Verify counter increments without callback...')
    test_step_counter_increments_without_callback()
    print()

    print('Test 2: Verify status update uses counter...')
    test_status_update_uses_counter()
    print()

    print('=' * 80)
    print('✅ All step counter tests passed!')
    print('=' * 80)
} dependencies = [ { name = "aiofiles" }, diff --git a/workflows/workflow_use/healing/service.py b/workflows/workflow_use/healing/service.py index 6f876c9..1f78dc5 100644 --- a/workflows/workflow_use/healing/service.py +++ b/workflows/workflow_use/healing/service.py @@ -1,7 +1,8 @@ import hashlib import json +from datetime import datetime, timezone from pathlib import Path -from typing import Any, Dict, List, Sequence, Union +from typing import Any, Callable, Dict, List, Optional, Sequence, Union import aiofiles from browser_use import Agent, AgentHistoryList, Browser @@ -17,6 +18,10 @@ from workflow_use.healing.views import ParsedAgentStep, SimpleDomElement, SimpleResult from workflow_use.schema.views import WorkflowDefinitionSchema +# Type definitions for progress tracking callbacks +StepRecordedCallback = Callable[[Dict[str, Any]], None] +StatusUpdateCallback = Callable[[str], None] + # Get the absolute path to the prompts directory _PROMPTS_DIR = Path(__file__).parent / 'prompts' @@ -413,28 +418,62 @@ def __init__(self, data): # Generate workflow from prompt async def generate_workflow_from_prompt( - self, prompt: str, agent_llm: BaseChatModel, extraction_llm: BaseChatModel, use_cloud: bool = False + self, + prompt: str, + agent_llm: BaseChatModel, + extraction_llm: BaseChatModel, + use_cloud: bool = False, + on_step_recorded: Optional[StepRecordedCallback] = None, + on_status_update: Optional[StatusUpdateCallback] = None, ) -> WorkflowDefinitionSchema: """ Generate a workflow definition from a prompt by: 1. Running a browser agent to explore and complete the task 2. Converting the agent history into a workflow definition + + Args: + prompt: Natural language task description + agent_llm: LLM for agent decision-making + extraction_llm: LLM for page extraction + use_cloud: Whether to use cloud browser + on_step_recorded: Optional callback fired when a step is recorded. 
Receives: + - step_number: int (1-indexed) + - action_type: str (navigation, click, input_text, extract, etc.) + - description: str (human-readable description) + - url: str (current page URL) + - selector: Optional[str] (CSS/XPath selector if applicable) + - extracted_data: Optional[dict] (for extract steps) + - timestamp: str (ISO 8601 timestamp) + - target_text: Optional[str] (element text being interacted with) + on_status_update: Optional callback for non-step status updates """ browser = Browser(use_cloud=use_cloud) + # Status callback for initialization + if on_status_update: + on_status_update('Initializing browser...') + # Create a shared map to capture element text during agent execution element_text_map = {} # Maps index -> {'text': str, 'tag': str, 'xpath': str, etc.} + # Track step count for callbacks + step_counter = {'count': 0} + # Create a custom controller that captures element mappings from browser_use import Controller class CapturingController(Controller): """Controller that captures element text mapping during execution""" - def __init__(self, selector_generator: SelectorGenerator): + def __init__( + self, + selector_generator: SelectorGenerator, + on_step_recorded: Optional[StepRecordedCallback] = None, + ): super().__init__() self.selector_generator = selector_generator + self.on_step_recorded = on_step_recorded async def act(self, action, browser_session, *args, **kwargs): # Get the selector map before action @@ -630,8 +669,118 @@ async def act(self, action, browser_session, *args, **kwargs): # Execute the actual action result = await super().act(action, browser_session, *args, **kwargs) + + # Increment step counter (always, regardless of callback) + step_counter['count'] += 1 + step_number = step_counter['count'] + + # Fire callback after successful action execution + if self.on_step_recorded: + try: + # Extract action details + action_dict = action.model_dump() if hasattr(action, 'model_dump') else {} + action_type = None + target_index = 
None + input_value = None + extracted_data = None + + # Determine action type and extract relevant data + for key, value in action_dict.items(): + if key in ['go_to_url', 'navigate']: + action_type = 'navigation' + break + elif key == 'click_element': + action_type = 'click' + if isinstance(value, dict): + target_index = value.get('index') + break + elif key == 'input_text': + action_type = 'input_text' + if isinstance(value, dict): + target_index = value.get('index') + input_value = value.get('text', '') + break + elif key == 'extract_page_content': + action_type = 'extract' + # Extract data from result if available + if result and hasattr(result, 'extracted_content'): + extracted_data = result.extracted_content + break + elif key == 'send_keys': + action_type = 'keypress' + if isinstance(value, dict): + input_value = value.get('keys', '') + break + elif key == 'scroll': + action_type = 'scroll' + break + + # Get current URL + current_url = '' + try: + current_url = await browser_session.get_current_url() + except Exception: + pass + + # Get target text if available from captured elements + target_text = None + selector = None + if target_index is not None and target_index in element_text_map: + element_data = element_text_map[target_index] + target_text = element_data.get('text', '') + # Use xpath or css_selector as the selector + selector = element_data.get('xpath') or element_data.get('css_selector') + + # Generate human-readable description + description = self._generate_action_description(action_type, target_text, input_value, current_url) + + # Create callback data + callback_data = { + 'step_number': step_number, + 'action_type': action_type or 'unknown', + 'description': description, + 'url': current_url, + 'selector': selector, + 'extracted_data': extracted_data, + 'timestamp': datetime.now(timezone.utc).isoformat(), + 'target_text': target_text, + } + + # Fire the callback + self.on_step_recorded(callback_data) + + except Exception as e: + print(f'āš ļø 
def _generate_action_description(
    self, action_type: Optional[str], target_text: Optional[str], input_value: Optional[str], url: str
) -> str:
    """Generate a human-readable description of the action.

    Args:
        action_type: Normalised action name ('navigation', 'click', 'input_text',
            'extract', 'keypress', 'scroll'), or None when it could not be determined.
        target_text: Text of the element being interacted with, if known.
        input_value: Text typed or keys sent by the action, if any.
        url: Current page URL; may be empty.

    Returns:
        A short sentence describing the action. Unrecognised action types yield
        'Execute action: <action_type>' ('unknown' when the type is None or empty).
    """
    if action_type == 'navigation':
        # The caller passes '' when the URL lookup fails; avoid a dangling "Navigate to ".
        return f'Navigate to {url}' if url else 'Navigate to page'

    if action_type == 'click':
        return f'Click on "{target_text}"' if target_text else 'Click element'

    if action_type == 'input_text':
        if target_text and input_value:
            return f'Enter "{input_value}" into {target_text}'
        if input_value:
            return f'Enter text: {input_value}'
        return 'Input text'

    if action_type == 'extract':
        return 'Extract page content'

    if action_type == 'keypress':
        return f'Press keys: {input_value}' if input_value else 'Press keys'

    if action_type == 'scroll':
        return 'Scroll page'

    return f'Execute action: {action_type or "unknown"}'
This structured format is critical for generating a reusable workflow.""" + # Status callback for agent creation + if on_status_update: + on_status_update('Creating browser agent...') + agent = Agent( task=enhanced_prompt, browser_session=browser, llm=agent_llm, page_extraction_llm=extraction_llm, - controller=CapturingController(self.selector_generator), # Pass selector_generator to controller + controller=CapturingController( + self.selector_generator, on_step_recorded=on_step_recorded + ), # Pass callbacks to controller enable_memory=False, use_vision=True, max_failures=10, @@ -665,15 +820,27 @@ async def act(self, action, browser_session, *args, **kwargs): self.captured_element_text_map = element_text_map # Run the agent to get history + if on_status_update: + on_status_update('Recording workflow steps...') + print('šŸŽ¬ Starting agent with element capture enabled...') history = await agent.run() print(f'āœ… Agent completed. Captured {len(element_text_map)} element mappings total.') + if on_status_update: + on_status_update(f'Completed recording {step_counter["count"]} steps') + # Store the history so it can be accessed externally (for result caching) self._agent_history = history # Create workflow definition from the history # Route to deterministic or LLM-based conversion based on flag + if on_status_update: + if self.use_deterministic_conversion: + on_status_update('Converting steps to workflow (deterministic)...') + else: + on_status_update('Analyzing workflow with AI...') + if self.use_deterministic_conversion: # Pass the captured element map to the deterministic converter self.deterministic_converter.captured_element_text_map = element_text_map @@ -692,6 +859,9 @@ async def act(self, action, browser_session, *args, **kwargs): if not self.validator: self.validator = WorkflowValidator(llm=extraction_llm) + if on_status_update: + on_status_update('Validating workflow with AI...') + print('\nšŸ” Running AI validation on generated workflow...') try: 
validation_result = await self.validator.validate_workflow(workflow=workflow_definition, original_task=prompt) @@ -713,6 +883,12 @@ async def act(self, action, browser_session, *args, **kwargs): print('Continuing with original workflow...') # Post-process: Apply variable identification and YAML cleanup + if on_status_update: + on_status_update('Post-processing workflow (variable identification & cleanup)...') + workflow_definition = self._post_process_workflow(workflow_definition) + if on_status_update: + on_status_update('Workflow generation complete!') + return workflow_definition