diff --git a/workflows/cli.py b/workflows/cli.py index 2cb5419..4947085 100644 --- a/workflows/cli.py +++ b/workflows/cli.py @@ -1223,58 +1223,81 @@ async def _run_workflow(): input_definitions = workflow_obj.inputs_def # Access inputs_def from the Workflow instance if input_definitions: # Check if the list is not empty - typer.echo() # Add space - typer.echo(typer.style('Provide values for the following workflow inputs:', bold=True)) - typer.echo() # Add space - - for input_def in input_definitions: - var_name_styled = typer.style(input_def.name, fg=typer.colors.CYAN, bold=True) - prompt_question = typer.style(f'Enter value for {var_name_styled}', bold=True) - - var_type = input_def.type.lower() # type is a direct attribute - is_required = input_def.required - default_value = getattr(input_def, 'default', None) - - type_info_str = f'type: {var_type}' - if is_required: - status_str = typer.style('required', fg=typer.colors.RED) - else: - status_str = typer.style('optional', fg=typer.colors.YELLOW) - - # Add format information if available - format_info_str = '' - if hasattr(input_def, 'format') and input_def.format: - format_info_str = f', format: {typer.style(input_def.format, fg=typer.colors.GREEN)}' - - # Add default value information if available - default_info_str = '' - if default_value is not None: - default_info_str = f', default: {typer.style(str(default_value), fg=typer.colors.BLUE)}' - - full_prompt_text = f'{prompt_question} ({status_str}, {type_info_str}{format_info_str}{default_info_str})' - - input_val = None - if var_type == 'bool': - input_val = typer.confirm(full_prompt_text, default=default_value if default_value is not None else None) - elif var_type == 'number': - input_val = typer.prompt( - full_prompt_text, type=float, default=default_value if default_value is not None else ... - ) - elif var_type == 'string': # Default to string for other unknown types as well - input_val = typer.prompt( - full_prompt_text, type=str, default=default_value if default_value is not None else ... - ) - else: # Should ideally not happen if schema is validated, but good to have a fallback - typer.secho( - f"Warning: Unknown type '{var_type}' for variable '{input_def.name}'. Treating as string.", - fg=typer.colors.YELLOW, - ) - input_val = typer.prompt( - full_prompt_text, type=str, default=default_value if default_value is not None else ... - ) + # Check if all REQUIRED inputs have defaults (can skip prompting) + # Note: Optional inputs can be skipped, so we only check required ones + required_inputs = [inp for inp in input_definitions if inp.required] + all_required_have_defaults = all(getattr(input_def, 'default', None) is not None for input_def in required_inputs) + + if all_required_have_defaults: + # All required inputs have defaults, use defaults automatically + typer.echo() # Add space + typer.echo(typer.style('Using default values for workflow inputs:', bold=True)) + typer.echo() # Add space + + for input_def in input_definitions: + default_value = getattr(input_def, 'default', None) + if default_value is not None: + inputs[input_def.name] = default_value + var_name_styled = typer.style(input_def.name, fg=typer.colors.CYAN, bold=True) + typer.echo(f' • {var_name_styled} = {typer.style(str(default_value), fg=typer.colors.BLUE)}') + elif not input_def.required: + var_name_styled = typer.style(input_def.name, fg=typer.colors.CYAN, bold=True) + typer.echo(f' • {var_name_styled} = {typer.style("(not provided, optional)", fg=typer.colors.YELLOW)}') + typer.echo() + else: + # Some inputs need user input + typer.echo() # Add space + typer.echo(typer.style('Provide values for the following workflow inputs:', bold=True)) + typer.echo() # Add space + + for input_def in input_definitions: + var_name_styled = typer.style(input_def.name, fg=typer.colors.CYAN, bold=True) + prompt_question = typer.style(f'Enter value for {var_name_styled}', bold=True) + + var_type = input_def.type.lower() # type is a direct attribute + is_required = input_def.required + default_value = getattr(input_def, 'default', None) + + type_info_str = f'type: {var_type}' + if is_required: + status_str = typer.style('required', fg=typer.colors.RED) + else: + status_str = typer.style('optional', fg=typer.colors.YELLOW) + + # Add format information if available + format_info_str = '' + if hasattr(input_def, 'format') and input_def.format: + format_info_str = f', format: {typer.style(input_def.format, fg=typer.colors.GREEN)}' + + # Add default value information if available + default_info_str = '' + if default_value is not None: + default_info_str = f', default: {typer.style(str(default_value), fg=typer.colors.BLUE)}' + + full_prompt_text = f'{prompt_question} ({status_str}, {type_info_str}{format_info_str}{default_info_str})' + + input_val = None + if var_type == 'bool': + input_val = typer.confirm(full_prompt_text, default=default_value if default_value is not None else None) + elif var_type == 'number': + input_val = typer.prompt( + full_prompt_text, type=float, default=default_value if default_value is not None else ... + ) + elif var_type == 'string': # Default to string for other unknown types as well + input_val = typer.prompt( + full_prompt_text, type=str, default=default_value if default_value is not None else ... + ) + else: # Should ideally not happen if schema is validated, but good to have a fallback + typer.secho( + f"Warning: Unknown type '{var_type}' for variable '{input_def.name}'. Treating as string.", + fg=typer.colors.YELLOW, + ) + input_val = typer.prompt( + full_prompt_text, type=str, default=default_value if default_value is not None else ... + ) - inputs[input_def.name] = input_val - typer.echo() # Add space after each prompt + inputs[input_def.name] = input_val + typer.echo() # Add space after each prompt else: typer.echo('No input schema found in the workflow, or no properties defined. Proceeding without inputs.') @@ -1371,58 +1394,81 @@ async def _run_workflow_no_ai(): input_definitions = workflow_obj.inputs_def # Access inputs_def from the Workflow instance if input_definitions: # Check if the list is not empty - typer.echo() # Add space - typer.echo(typer.style('Provide values for the following workflow inputs:', bold=True)) - typer.echo() # Add space - - for input_def in input_definitions: - var_name_styled = typer.style(input_def.name, fg=typer.colors.CYAN, bold=True) - prompt_question = typer.style(f'Enter value for {var_name_styled}', bold=True) - - var_type = input_def.type.lower() # type is a direct attribute - is_required = input_def.required - default_value = getattr(input_def, 'default', None) - - type_info_str = f'type: {var_type}' - if is_required: - status_str = typer.style('required', fg=typer.colors.RED) - else: - status_str = typer.style('optional', fg=typer.colors.YELLOW) - - # Add format information if available - format_info_str = '' - if hasattr(input_def, 'format') and input_def.format: - format_info_str = f', format: {typer.style(input_def.format, fg=typer.colors.GREEN)}' - - # Add default value information if available - default_info_str = '' - if default_value is not None: - default_info_str = f', default: {typer.style(str(default_value), fg=typer.colors.BLUE)}' - - full_prompt_text = f'{prompt_question} ({status_str}, {type_info_str}{format_info_str}{default_info_str})' - - input_val = None - if var_type == 'bool': - input_val = typer.confirm(full_prompt_text, default=default_value if default_value is not None else None) - elif var_type == 'number': - input_val = typer.prompt( - full_prompt_text, type=float, default=default_value if default_value is not None else ... - ) - elif var_type == 'string': # Default to string for other unknown types as well - input_val = typer.prompt( - full_prompt_text, type=str, default=default_value if default_value is not None else ... - ) - else: # Should ideally not happen if schema is validated, but good to have a fallback - typer.secho( - f"Warning: Unknown type '{var_type}' for variable '{input_def.name}'. Treating as string.", - fg=typer.colors.YELLOW, - ) - input_val = typer.prompt( - full_prompt_text, type=str, default=default_value if default_value is not None else ... - ) + # Check if all REQUIRED inputs have defaults (can skip prompting) + # Note: Optional inputs can be skipped, so we only check required ones + required_inputs = [inp for inp in input_definitions if inp.required] + all_required_have_defaults = all(getattr(input_def, 'default', None) is not None for input_def in required_inputs) + + if all_required_have_defaults: + # All required inputs have defaults, use defaults automatically + typer.echo() # Add space + typer.echo(typer.style('Using default values for workflow inputs:', bold=True)) + typer.echo() # Add space + + for input_def in input_definitions: + default_value = getattr(input_def, 'default', None) + if default_value is not None: + inputs[input_def.name] = default_value + var_name_styled = typer.style(input_def.name, fg=typer.colors.CYAN, bold=True) + typer.echo(f' • {var_name_styled} = {typer.style(str(default_value), fg=typer.colors.BLUE)}') + elif not input_def.required: + var_name_styled = typer.style(input_def.name, fg=typer.colors.CYAN, bold=True) + typer.echo(f' • {var_name_styled} = {typer.style("(not provided, optional)", fg=typer.colors.YELLOW)}') + typer.echo() + else: + # Some inputs need user input + typer.echo() # Add space + typer.echo(typer.style('Provide values for the following workflow inputs:', bold=True)) + typer.echo() # Add space + + for input_def in input_definitions: + var_name_styled = typer.style(input_def.name, fg=typer.colors.CYAN, bold=True) + prompt_question = typer.style(f'Enter value for {var_name_styled}', bold=True) + + var_type = input_def.type.lower() # type is a direct attribute + is_required = input_def.required + default_value = getattr(input_def, 'default', None) + + type_info_str = f'type: {var_type}' + if is_required: + status_str = typer.style('required', fg=typer.colors.RED) + else: + status_str = typer.style('optional', fg=typer.colors.YELLOW) + + # Add format information if available + format_info_str = '' + if hasattr(input_def, 'format') and input_def.format: + format_info_str = f', format: {typer.style(input_def.format, fg=typer.colors.GREEN)}' + + # Add default value information if available + default_info_str = '' + if default_value is not None: + default_info_str = f', default: {typer.style(str(default_value), fg=typer.colors.BLUE)}' + + full_prompt_text = f'{prompt_question} ({status_str}, {type_info_str}{format_info_str}{default_info_str})' + + input_val = None + if var_type == 'bool': + input_val = typer.confirm(full_prompt_text, default=default_value if default_value is not None else None) + elif var_type == 'number': + input_val = typer.prompt( + full_prompt_text, type=float, default=default_value if default_value is not None else ... + ) + elif var_type == 'string': # Default to string for other unknown types as well + input_val = typer.prompt( + full_prompt_text, type=str, default=default_value if default_value is not None else ... + ) + else: # Should ideally not happen if schema is validated, but good to have a fallback + typer.secho( + f"Warning: Unknown type '{var_type}' for variable '{input_def.name}'. Treating as string.", + fg=typer.colors.YELLOW, + ) + input_val = typer.prompt( + full_prompt_text, type=str, default=default_value if default_value is not None else ... + ) - inputs[input_def.name] = input_val - typer.echo() # Add space after each prompt + inputs[input_def.name] = input_val + typer.echo() # Add space after each prompt else: typer.echo('No input schema found in the workflow, or no properties defined. Proceeding without inputs.') @@ -2460,24 +2506,67 @@ def run_stored_workflow( else: typer.echo(str(result)) else: - # Has inputs but no prompt - need to collect them - typer.secho('This workflow requires input parameters:', fg=typer.colors.YELLOW) - typer.echo() - for inp in workflow_definition.input_schema: - required = ( - typer.style('required', fg=typer.colors.RED) - if inp.required - else typer.style('optional', fg=typer.colors.YELLOW) - ) - default_value = getattr(inp, 'default', None) - default_str = ( - f', default: {typer.style(str(default_value), fg=typer.colors.BLUE)}' if default_value is not None else '' + # Check if all REQUIRED inputs have defaults (can run without user input) + # Note: Optional inputs can be skipped, so we only check required ones + required_inputs = [inp for inp in workflow_definition.input_schema if inp.required] + all_required_have_defaults = all(getattr(inp, 'default', None) is not None for inp in required_inputs) + + if all_required_have_defaults: + # All required inputs have defaults, run with defaults + typer.secho('▶️ Running workflow with default values...', fg=typer.colors.CYAN) + typer.echo() + typer.echo('Using default values:') + for inp in workflow_definition.input_schema: + default_value = getattr(inp, 'default', None) + if default_value is not None: + typer.echo(f' • {inp.name} = {typer.style(str(default_value), fg=typer.colors.BLUE)}') + elif not inp.required: + typer.echo(f' • {inp.name} = {typer.style("(not provided, optional)", fg=typer.colors.YELLOW)}') + typer.echo() + + # Build inputs dict with defaults (only include values that have defaults) + inputs = {} + for inp in workflow_definition.input_schema: + default_value = getattr(inp, 'default', None) + if default_value is not None: + inputs[inp.name] = default_value + # Optional parameters without defaults are simply not included + + workflow = Workflow.load_from_file( + temp_file, llm_instance, page_extraction_llm=page_extraction_llm, use_cloud=use_cloud ) - typer.echo(f' • {inp.name} ({inp.type}, {required}{default_str})') - typer.echo() - typer.echo('Options:') - typer.echo(f' 1. Run as tool: python cli.py run-stored-workflow {workflow_id} --prompt "Your task"') - typer.echo(f' 2. Run with inputs: python cli.py run-workflow {metadata.file_path if metadata else temp_file}') + result = asyncio.run(workflow.run(inputs=inputs)) + + typer.secho('✅ Workflow completed!', fg=typer.colors.GREEN, bold=True) + typer.echo() + if result: + typer.echo('Result:') + # Extract the actual data from step results + if hasattr(result, 'step_results'): + for i, step_result in enumerate(result.step_results, 1): + if hasattr(step_result, 'extracted_content') and step_result.extracted_content: + typer.echo(f'Step {i}: {step_result.extracted_content}') + else: + typer.echo(str(result)) + else: + # Has required inputs without defaults - need to collect them + typer.secho('This workflow requires input parameters:', fg=typer.colors.YELLOW) + typer.echo() + for inp in workflow_definition.input_schema: + required = ( + typer.style('required', fg=typer.colors.RED) + if inp.required + else typer.style('optional', fg=typer.colors.YELLOW) + ) + default_value = getattr(inp, 'default', None) + default_str = ( + f', default: {typer.style(str(default_value), fg=typer.colors.BLUE)}' if default_value is not None else '' + ) + typer.echo(f' • {inp.name} ({inp.type}, {required}{default_str})') + typer.echo() + typer.echo('Options:') + typer.echo(f' 1. Run as tool: python cli.py run-stored-workflow {workflow_id} --prompt "Your task"') + typer.echo(f' 2. Run with inputs: python cli.py run-workflow {metadata.file_path if metadata else temp_file}') finally: # Cleanup temp file diff --git a/workflows/test_max_alternatives_bug_fix.py b/workflows/test_max_alternatives_bug_fix.py new file mode 100644 index 0000000..a6436dc --- /dev/null +++ b/workflows/test_max_alternatives_bug_fix.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +""" +Test script to verify the max_alternatives bug fix. + +Bug: When max_alternatives=1, the function was returning 2 XPaths instead of 1. +Fix: Now correctly returns only the absolute xpath when max_alternatives <= 1. +""" + +from workflow_use.healing.xpath_optimizer import XPathOptimizer + +optimizer = XPathOptimizer() + +absolute_xpath = '/html/body/form/div[3]/table/tbody/tr[2]/td[3]/a' +element_info = { + 'tag': 'a', + 'text': 'License 12345', + 'attributes': {'class': 'license-link', 'href': '/license/12345'}, +} + +print('=' * 80) +print('Testing max_alternatives Bug Fix') +print('=' * 80) + +# Test max_alternatives = 1 +print('\n📋 Test 1: max_alternatives=1') +print(f' Input: {absolute_xpath}') +result_1 = optimizer.optimize_xpath(absolute_xpath, element_info, max_alternatives=1) +print(' Expected: 1 XPath (only absolute)') +print(f' Got: {len(result_1)} XPath(s)') +print(' XPaths:') +for i, xpath in enumerate(result_1, 1): + print(f' {i}. {xpath}') + +# Validate: Should be exactly 1 XPath and it should be the absolute one +test_1_pass = len(result_1) == 1 and result_1[0] == absolute_xpath +if test_1_pass: + print(' ✅ PASS: Exactly 1 XPath returned (absolute)') +else: + print(f' ❌ FAIL: Expected 1 absolute XPath, got {len(result_1)}') + if len(result_1) > 0 and result_1[0] != absolute_xpath: + print(' ❌ FAIL: First XPath is not the absolute one!') + +# Test max_alternatives = 2 +print('\n📋 Test 2: max_alternatives=2') +print(f' Input: {absolute_xpath}') +result_2 = optimizer.optimize_xpath(absolute_xpath, element_info, max_alternatives=2) +print(' Expected: 2 XPaths (1 optimized + 1 absolute)') +print(f' Got: {len(result_2)} XPath(s)') +print(' XPaths:') +for i, xpath in enumerate(result_2, 1): + is_absolute = xpath == absolute_xpath + print(f' {i}. {xpath[:60]}{"..." if len(xpath) > 60 else ""} {"(absolute)" if is_absolute else "(optimized)"}') + +# Validate: Should be 2 XPaths, last one should be absolute, first should be optimized +test_2_pass = ( + len(result_2) == 2 + and result_2[-1] == absolute_xpath # Last is absolute + and result_2[0] != absolute_xpath # First is optimized (different from absolute) +) +if test_2_pass: + print(' ✅ PASS: Exactly 2 XPaths (1 optimized + 1 absolute)') +else: + print(f' ❌ FAIL: Expected 2 XPaths (1 optimized + 1 absolute), got {len(result_2)}') + if len(result_2) >= 2 and result_2[-1] != absolute_xpath: + print(' ❌ FAIL: Last XPath is not the absolute fallback!') + if len(result_2) >= 1 and result_2[0] == absolute_xpath: + print(' ❌ FAIL: First XPath should be optimized, not absolute!') + +# Test max_alternatives = 3 +print('\n📋 Test 3: max_alternatives=3') +print(f' Input: {absolute_xpath}') +result_3 = optimizer.optimize_xpath(absolute_xpath, element_info, max_alternatives=3) +print(' Expected: 3 XPaths (2 optimized + 1 absolute)') +print(f' Got: {len(result_3)} XPath(s)') +print(' XPaths:') +for i, xpath in enumerate(result_3, 1): + is_absolute = xpath == absolute_xpath + print(f' {i}. {xpath[:60]}{"..." if len(xpath) > 60 else ""} {"(absolute)" if is_absolute else "(optimized)"}') + +# Validate: Should be 3 XPaths, last one absolute, first two optimized +optimized_count = sum(1 for xpath in result_3[:-1] if xpath != absolute_xpath) +test_3_pass = ( + len(result_3) == 3 + and result_3[-1] == absolute_xpath # Last is absolute + and optimized_count >= 1 # At least 1 optimized (ideally 2) + and all(xpath != absolute_xpath for xpath in result_3[:-1]) # All except last are different from absolute +) +if test_3_pass: + print(f' ✅ PASS: Exactly 3 XPaths ({optimized_count} optimized + 1 absolute)') +else: + print(f' ❌ FAIL: Expected 3 XPaths (2 optimized + 1 absolute), got {len(result_3)}') + if len(result_3) >= 3 and result_3[-1] != absolute_xpath: + print(' ❌ FAIL: Last XPath is not the absolute fallback!') + if len(result_3) >= 2: + non_optimized = sum(1 for xpath in result_3[:-1] if xpath == absolute_xpath) + if non_optimized > 0: + print(f' ❌ FAIL: Found {non_optimized} non-optimized XPath(s) before the final absolute!') + +# Summary +print('\n' + '=' * 80) +all_pass = test_1_pass and test_2_pass and test_3_pass +if all_pass: + print('🎉 All tests passed! Bug is fixed.') + print(' • max_alternatives=1: Returns only absolute XPath ✅') + print(' • max_alternatives=2: Returns 1 optimized + 1 absolute ✅') + print(' • max_alternatives=3: Returns 2 optimized + 1 absolute ✅') +else: + print('❌ Some tests failed. Bug still exists.') + if not test_1_pass: + print(' • Test 1 (max_alternatives=1) FAILED') + if not test_2_pass: + print(' • Test 2 (max_alternatives=2) FAILED') + if not test_3_pass: + print(' • Test 3 (max_alternatives=3) FAILED') +print('=' * 80) diff --git a/workflows/tests/test_selector_generator.py b/workflows/tests/test_selector_generator.py index 5ab88e1..95e858c 100644 --- a/workflows/tests/test_selector_generator.py +++ b/workflows/tests/test_selector_generator.py @@ -41,9 +41,9 @@ def test_button_with_text_generates_semantic_strategies(self): assert role_text.metadata.get('role') == 'button', f"Expected role 'button', got {role_text.metadata.get('role')}" assert role_text.priority == 2, 'role_text should have priority 2' - # Test 2: No CSS/xpath/id strategies - def test_no_css_xpath_id_strategies(self): - """Test that NO CSS, xpath, or id strategies are generated (semantic-only)""" + # Test 2: No CSS/id strategies (but xpath is allowed as fallback) + def test_no_css_id_strategies(self): + """Test that NO CSS or id strategies are generated (semantic-first with xpath fallback)""" element_data = { 'tag_name': 'button', 'text': 'Click Me', @@ -56,13 +56,15 @@ def test_no_css_xpath_id_strategies(self): strategies = self.generator.generate_strategies(element_data) - # Verify NO CSS/xpath/id strategies + # Verify NO CSS/id strategies (semantic-first approach) strategy_types = [s.type for s in strategies] - assert 'id' not in strategy_types, 'Should NOT generate id strategy (semantic-only)' - assert 'css' not in strategy_types, 'Should NOT generate CSS strategy (semantic-only)' - assert 'xpath' not in strategy_types, 'Should NOT generate xpath strategy (semantic-only)' - assert 'css_selector' not in strategy_types, 'Should NOT generate css_selector strategy (semantic-only)' - assert 'css_attr' not in strategy_types, 'Should NOT generate css_attr strategy (semantic-only)' + assert 'id' not in strategy_types, 'Should NOT generate id strategy (semantic-first)' + assert 'css' not in strategy_types, 'Should NOT generate CSS strategy (semantic-first)' + assert 'css_selector' not in strategy_types, 'Should NOT generate css_selector strategy (semantic-first)' + assert 'css_attr' not in strategy_types, 'Should NOT generate css_attr strategy (semantic-first)' + + # XPath is allowed as a fallback strategy (this is the improvement!) + # It should be low priority (tested in test_xpath_optimization.py) # Test 3: ARIA label strategy def test_aria_label_strategy(self): diff --git a/workflows/tests/test_xpath_optimization.py b/workflows/tests/test_xpath_optimization.py new file mode 100644 index 0000000..5aa693b --- /dev/null +++ b/workflows/tests/test_xpath_optimization.py @@ -0,0 +1,449 @@ +""" +Unit tests for XPath optimization in SelectorGenerator. + +Tests that SelectorGenerator with xpath optimization enabled generates +multiple robust XPath alternatives instead of a single brittle absolute XPath. +""" + +from workflow_use.healing.selector_generator import SelectorGenerator +from workflow_use.healing.xpath_optimizer import XPathOptimizer + + +class TestXPathOptimization: + """Test XPath optimization in SelectorGenerator""" + + def setup_method(self): + """Setup test fixture""" + # NOTE: max_total_strategies=None means no global limit (for testing XPath optimization specifically) + self.generator_with_optimization = SelectorGenerator(enable_xpath_optimization=True, max_total_strategies=None) + self.generator_without_optimization = SelectorGenerator(enable_xpath_optimization=False, max_total_strategies=None) + self.optimizer = XPathOptimizer() + + # ============================================================================ + # Test 1: XPath Optimization Enabled - Limited XPath Strategies + # ============================================================================ + def test_xpath_optimization_generates_limited_strategies(self): + """Test that XPath optimization generates limited XPath strategies (max 2)""" + # Absolute XPath for a table link (brittle) + absolute_xpath = '/html/body/form/div[3]/div/div/table/tbody/tr[2]/td[3]/a' + + element_data = { + 'tag_name': 'a', + 'text': 'License 12345', + 'attributes': {'class': 'license-link', 'href': '/license/12345'}, + 'xpath': absolute_xpath, + } + + strategies = self.generator_with_optimization.generate_strategies(element_data) + + # Filter to XPath strategies only + xpath_strategies = [s for s in strategies if s.type == 'xpath'] + + # Should generate exactly 2 XPath alternatives (1 optimized + 1 absolute) + assert len(xpath_strategies) == 2, f'Expected exactly 2 XPath strategies, got {len(xpath_strategies)}' + + # Should include at least one optimized strategy + optimized_strategies = [s for s in xpath_strategies if s.metadata.get('optimized')] + assert len(optimized_strategies) > 0, 'Should have at least one optimized XPath strategy' + + # Last one should be the absolute xpath + assert xpath_strategies[-1].value == absolute_xpath, 'Last XPath should be absolute fallback' + + # ============================================================================ + # Test 2: XPath Optimization Disabled - Single XPath Strategy + # ============================================================================ + def test_xpath_optimization_disabled_generates_single_strategy(self): + """Test that without optimization, only one XPath strategy is generated""" + absolute_xpath = '/html/body/form/div[3]/div/div/table/tbody/tr[2]/td[3]/a' + + element_data = { + 'tag_name': 'a', + 'text': 'License 12345', + 'attributes': {'class': 'license-link'}, + 'xpath': absolute_xpath, + } + + strategies = self.generator_without_optimization.generate_strategies(element_data) + + # Filter to XPath strategies only + xpath_strategies = [s for s in strategies if s.type == 'xpath'] + + # Should generate only one XPath strategy (fallback only) + assert len(xpath_strategies) == 1, f'Expected 1 XPath strategy, got {len(xpath_strategies)}' + + # ============================================================================ + # Test 3: Priority Calculation - ID-based XPath + # ============================================================================ + def test_priority_calculation_id_based(self): + """Test that ID-based XPath gets high priority (low number)""" + xpath = "//button[@id='submit-btn']" + priority = self.generator_with_optimization._calculate_xpath_priority(xpath) + + assert priority <= 2, f'ID-based XPath should have priority <= 2, got {priority}' + + # ============================================================================ + # Test 4: Priority Calculation - Table-anchored XPath + # ============================================================================ + def test_priority_calculation_table_anchored(self): + """Test that table-anchored XPath gets medium-high priority""" + xpath = '//table//tr[2]/td[3]/a' + priority = self.generator_with_optimization._calculate_xpath_priority(xpath) + + assert priority == 4, f'Table-anchored XPath should have priority 4, got {priority}' + + # ============================================================================ + # Test 5: Priority Calculation - Absolute XPath + # ============================================================================ + def test_priority_calculation_absolute_xpath(self): + """Test that absolute XPath gets lowest priority (highest number)""" + xpath = '/html/body/form/div[3]/div/div/table/tbody/tr[2]/td[3]/a' + priority = self.generator_with_optimization._calculate_xpath_priority(xpath, is_absolute=True) + + assert priority >= 8, f'Absolute XPath should have priority >= 8, got {priority}' + + # ============================================================================ + # Test 6: Strategy Determination - Table-anchored + # ============================================================================ + def test_strategy_determination_table_anchored(self): + """Test that table-anchored XPath is correctly identified""" + xpath = '//table//tr[2]/td[3]/a' + strategy = self.generator_with_optimization._determine_xpath_strategy(xpath) + + assert strategy == 'table-anchored', f"Expected 'table-anchored', got '{strategy}'" + + # ============================================================================ + # Test 7: Strategy Determination - ID-based + # ============================================================================ + def test_strategy_determination_id_based(self): + """Test that ID-based XPath is correctly identified""" + xpath = "//button[@id='submit']" + strategy = self.generator_with_optimization._determine_xpath_strategy(xpath) + + assert strategy == 'id-based', f"Expected 'id-based', got '{strategy}'" + + # ============================================================================ + # Test 8: Strategy Determination - Absolute Fallback + # ============================================================================ + def test_strategy_determination_absolute_fallback(self): + """Test that absolute XPath is correctly identified""" + xpath = '/html/body/div/form/button' + strategy = self.generator_with_optimization._determine_xpath_strategy(xpath) + + assert strategy == 'absolute-fallback', f"Expected 'absolute-fallback', got '{strategy}'" + + # ============================================================================ + # Test 9: Strategy Determination - Text-based + # ============================================================================ + def test_strategy_determination_text_contains(self): + """Test that text-based XPath is correctly identified""" + xpath = "//a[contains(text(), 'Click here')]" + strategy = self.generator_with_optimization._determine_xpath_strategy(xpath) + + assert strategy == 'text-contains', f"Expected 'text-contains', got '{strategy}'" + + # ============================================================================ + # Test 10: Integration - Full Element with Table Context + # ============================================================================ + def test_integration_table_element_optimization(self): + """Test full integration: table element with optimization (limited to 2 XPaths)""" + absolute_xpath = '/html/body/div[1]/div[2]/table/tbody/tr[2]/td[3]/a' + + element_data = { + 'tag_name': 'a', + 'text': 'License 12345', + 'attributes': {'class': 'license-link', 'href': '/license/12345'}, + 'xpath': absolute_xpath, + } + + strategies = self.generator_with_optimization.generate_strategies(element_data) + + # Should have strategies including: + # - Semantic strategies (text_exact, role_text, etc.) + # - Limited XPath strategies (2 max) + + strategy_types = [s.type for s in strategies] + + # Should have semantic strategies + assert 'text_exact' in strategy_types, 'Should have text_exact strategy' + + # Should have exactly 2 XPath strategies + xpath_strategies = [s for s in strategies if s.type == 'xpath'] + assert len(xpath_strategies) == 2, f'Should have exactly 2 XPath strategies, got {len(xpath_strategies)}' + + # First should be optimized (table-anchored or similar) + first_xpath = xpath_strategies[0] + assert first_xpath.metadata.get('optimized'), 'First XPath should be optimized' + + # Last should be absolute fallback + last_xpath = xpath_strategies[-1] + assert last_xpath.value == absolute_xpath, 'Last XPath should be absolute fallback' + assert last_xpath.priority >= first_xpath.priority, 'Absolute should have lower priority (higher number)' + + # ============================================================================ + # Test 11: Integration - Form Input Element + # ============================================================================ + def test_integration_form_input_optimization(self): + """Test full integration: form input element with optimization (limited to 2 XPaths)""" + absolute_xpath = '/html/body/div[5]/form/div[3]/div[1]/input' + + element_data = { + 'tag_name': 'input', + 'text': '', + 'attributes': {'name': 'email', 'type': 'email', 'placeholder': 'Enter your email'}, + 'xpath': absolute_xpath, + } + + strategies = self.generator_with_optimization.generate_strategies(element_data) + + # Should have placeholder and XPath strategies + strategy_types = [s.type for s in strategies] + assert 'placeholder' in strategy_types, 'Should have placeholder strategy' + + # Should have exactly 2 XPath strategies + xpath_strategies = [s for s in strategies if s.type == 'xpath'] + assert len(xpath_strategies) == 2, f'Should have exactly 2 XPath strategies, got {len(xpath_strategies)}' + + # At least one should be optimized + optimized = [s for s in xpath_strategies if s.metadata.get('optimized')] + assert len(optimized) > 0, 'Should have optimized XPath strategies' + + # ============================================================================ + # Test 12: XPath Optimization with Data Attributes + # ============================================================================ + def test_xpath_optimization_data_attributes(self): + """Test that data attributes are prioritized in optimized XPaths (limited to 2)""" + absolute_xpath = '/html/body/div[1]/div[2]/div[3]/button' + + element_data = { + 'tag_name': 'button', + 'text': 'Submit', + 'attributes': {'data-testid': 'submit-button', 'class': 'btn btn-primary'}, + 'xpath': absolute_xpath, + } + + strategies = self.generator_with_optimization.generate_strategies(element_data) + + # Should have exactly 2 XPath strategies + xpath_strategies = [s for s in strategies if s.type == 'xpath'] + assert len(xpath_strategies) == 2, f'Should have exactly 2 XPath strategies, got {len(xpath_strategies)}' + + # First should be optimized (likely data-attribute based) + first_xpath = xpath_strategies[0] + assert first_xpath.metadata.get('optimized'), 'First XPath should be optimized' + + # Data attribute XPath should have high priority + assert first_xpath.priority <= 3, f'First XPath should have high priority, got {first_xpath.priority}' + + # ============================================================================ + # Test 13: XPath Optimization without Absolute XPath + # ============================================================================ + def test_xpath_optimization_without_absolute_xpath(self): + """Test that optimization falls back gracefully when no absolute XPath is provided""" + element_data = { + 'tag_name': 'button', + 'text': 'Submit', + 'attributes': {'class': 'btn'}, + # No xpath provided + } + + strategies = self.generator_with_optimization.generate_strategies(element_data) + + # Should still generate strategies (semantic + fallback XPath) + assert len(strategies) > 0, 'Should generate strategies even without absolute XPath' + + # Should have at least text-based strategies + strategy_types = [s.type for s in strategies] + assert 'text_exact' in strategy_types, 'Should have text_exact strategy' + + # ============================================================================ + # Test 14: XPath Strategy Metadata + # ============================================================================ + def test_xpath_strategy_metadata(self): + """Test that XPath strategies include proper metadata""" + absolute_xpath = '/html/body/table/tbody/tr[2]/td[3]/a' + + element_data = { + 'tag_name': 'a', + 'text': 'Link', + 'attributes': {'href': '/page'}, + 'xpath': absolute_xpath, + } + + strategies = self.generator_with_optimization.generate_strategies(element_data) + + xpath_strategies = [s for s in strategies if s.type == 'xpath'] + + for xpath_strategy in xpath_strategies: + # Should have metadata + assert xpath_strategy.metadata is not None, 'XPath strategy should have metadata' + + # Should have tag + assert 'tag' in xpath_strategy.metadata, 'XPath metadata should include tag' + + # Optimized strategies should have strategy name + if xpath_strategy.metadata.get('optimized'): + assert 'strategy' in xpath_strategy.metadata, 'Optimized XPath should have strategy name' + + # ============================================================================ + # Test 15: Priority Calculation - ARIA Attributes + # ============================================================================ + def test_priority_calculation_aria_attributes(self): + """Test that ARIA attribute-based XPath gets high priority""" + xpath = "//button[@aria-label='Submit form']" + priority = self.generator_with_optimization._calculate_xpath_priority(xpath) + + assert priority <= 3, f'ARIA-based XPath should have priority <= 3, got {priority}' + + # ============================================================================ + # Test 16: XPath Optimizer Integration + # ============================================================================ + def test_xpath_optimizer_called_correctly(self): + """Test that XPathOptimizer is called with correct parameters""" + absolute_xpath = '/html/body/div[1]/form/button' + + element_data = { + 'tag_name': 'button', + 'text': 'Submit', + 'attributes': {'id': 'submit-btn'}, + 'xpath': absolute_xpath, + } + + # Generate strategies (this should call XPathOptimizer internally) + strategies = self.generator_with_optimization.generate_strategies(element_data) + + # Verify that we got optimized XPath strategies + xpath_strategies = [s for s in strategies if s.type == 'xpath'] + optimized_strategies = [s for s in xpath_strategies if s.metadata.get('optimized')] + + assert len(optimized_strategies) > 0, 'Should have optimized XPath strategies from optimizer' + + # ============================================================================ + # Test 17: Comparison - With vs Without Optimization + # ============================================================================ + def test_comparison_with_without_optimization(self): + """Compare strategy generation with and without optimization (limited to 2)""" + absolute_xpath = '/html/body/div[1]/table/tbody/tr[2]/td[3]/a' + + element_data = { + 'tag_name': 'a', + 'text': 'Link', + 'attributes': {'class': 'link'}, + 'xpath': absolute_xpath, + } + + strategies_with = self.generator_with_optimization.generate_strategies(element_data) + strategies_without = self.generator_without_optimization.generate_strategies(element_data) + + xpath_with = [s for s in strategies_with if s.type == 'xpath'] + xpath_without = [s for s in strategies_without if s.type == 'xpath'] + + # With optimization should have 2 XPath strategies + assert len(xpath_with) == 2, f'With optimization should have 2 XPaths, got {len(xpath_with)}' + + # Without optimization should have only 1 XPath (the fallback) + assert len(xpath_without) == 1, f'Without optimization should have 1 XPath, got {len(xpath_without)}' + + # ============================================================================ + # Test 18: No Duplicate XPath Strategies + # ============================================================================ + def test_no_duplicate_xpath_strategies(self): + """Test that no duplicate XPath strategies are generated""" + absolute_xpath = '/html/body/div/a' + + element_data = { + 'tag_name': 'a', + 'text': 'Link', + 'attributes': {}, + 'xpath': absolute_xpath, + } + + strategies = self.generator_with_optimization.generate_strategies(element_data) + + xpath_strategies = [s for s in strategies if s.type == 'xpath'] + xpath_values = [s.value for s in xpath_strategies] + + # Check for duplicates + unique_values = set(xpath_values) + assert len(xpath_values) == len(unique_values), f'Found duplicate XPath strategies: {xpath_values}' + + # ============================================================================ + # Test 19: Max Alternatives = 1 (Only Absolute XPath) + # ============================================================================ + def test_max_alternatives_one(self): + """Test that max_alternatives=1 returns only the absolute xpath (no optimized alternatives)""" + absolute_xpath = '/html/body/form/div[3]/table/tbody/tr[2]/td[3]/a' + + element_info = { + 'tag': 'a', + 'text': 'License 12345', + 'attributes': {'class': 'license-link', 'href': '/license/12345'}, + } + + # Request only 1 alternative + result = self.optimizer.optimize_xpath(absolute_xpath, element_info, max_alternatives=1) + + # Should return exactly 1 XPath (the absolute one only) + assert len(result) == 1, f'Expected exactly 1 XPath with max_alternatives=1, got {len(result)}' + + # Should be the absolute xpath + assert result[0] == absolute_xpath, f'Expected absolute xpath, got {result[0]}' + + # ============================================================================ + # Test 20: Total Strategy Limit + # ============================================================================ + def test_total_strategy_limit(self): + """Test that total strategies are limited to max_total_strategies""" + generator_limited = SelectorGenerator(enable_xpath_optimization=True, max_xpath_alternatives=2, max_total_strategies=2) + + absolute_xpath = '/html/body/form/div[3]/div/div/table/tbody/tr[2]/td[3]/a' + + element_data = { + 'tag_name': 'a', + 'text': 'License 12345', + 'attributes': {'class': 'license-link', 'href': '/license/12345'}, + 'xpath': absolute_xpath, + } + + strategies = generator_limited.generate_strategies(element_data) + + # Should have exactly 2 total strategies (not just 2 XPath) + assert len(strategies) == 2, f'Expected exactly 2 total strategies, got {len(strategies)}' + + # Should prioritize highest priority strategies + priorities = [s.priority for s in strategies] + assert priorities[0] <= priorities[1], 'Strategies should be sorted by priority' + + +if __name__ == '__main__': + # Run all tests + test = TestXPathOptimization() + test_methods = [m for m in dir(test) if m.startswith('test_')] + + print(f'\n{"=" * 80}') + print(f'Running {len(test_methods)} tests for XPath Optimization') + print(f'{"=" * 80}\n') + + passed = 0 + failed = 0 + + for method_name in test_methods: + test.setup_method() # Setup for each test + method = getattr(test, method_name) + try: + method() + print(f'✅ PASS: {method_name}') + passed += 1 + except AssertionError as e: + print(f'❌ FAIL: {method_name}') + print(f' {str(e)}') + failed += 1 + except Exception as e: + print(f'❌ ERROR: {method_name}') + print(f' {type(e).__name__}: {str(e)}') + failed += 1 + + print(f'\n{"=" * 80}') + print(f'Test Results: {passed} passed, {failed} failed') + print(f'{"=" * 80}\n') + + exit(0 if failed == 0 else 1) diff --git a/workflows/workflow_use/healing/selector_generator.py b/workflows/workflow_use/healing/selector_generator.py index aa2af6b..a12e787 100644 --- a/workflows/workflow_use/healing/selector_generator.py +++ b/workflows/workflow_use/healing/selector_generator.py @@ -9,6 +9,8 @@ from dataclasses import dataclass, field from typing import Any, Dict, List, Optional +from workflow_use.healing.xpath_optimizer import XPathOptimizer + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -62,6 +64,23 @@ class SelectorGenerator: 10. Direct CSS/xpath (fallback) """ + def __init__(self, enable_xpath_optimization: bool = True, max_xpath_alternatives: int = 2, max_total_strategies: int = 2): + """ + Initialize the SelectorGenerator. + + Args: + enable_xpath_optimization: If True, use XPathOptimizer to generate multiple + robust XPath alternatives instead of a single XPath fallback + max_xpath_alternatives: Maximum number of XPath alternatives to generate (default: 2) + Includes the absolute xpath fallback, so 2 means 1 optimized + 1 absolute + max_total_strategies: Maximum total number of strategies to return (default: 2) + Limits the total number of strategies across all types (semantic + xpath) + """ + self.enable_xpath_optimization = enable_xpath_optimization + self.max_xpath_alternatives = max_xpath_alternatives + self.max_total_strategies = max_total_strategies + self.xpath_optimizer = XPathOptimizer() if enable_xpath_optimization else None + def generate_strategies(self, element_data: Dict[str, Any], include_xpath_fallback: bool = True) -> List[SelectorStrategy]: """ Generate selector strategies from captured element data. @@ -176,22 +195,98 @@ def generate_strategies(self, element_data: Dict[str, Any], include_xpath_fallba # Strategy 8: XPath fallback (lowest priority but most powerful) if include_xpath_fallback: - # Try to use pre-computed XPath first, then generate - xpath = element_data.get('xpath') or self._generate_xpath(tag, text, attrs) - - if xpath: - strategies.append( - SelectorStrategy( - type='xpath', - value=xpath, - priority=8, - metadata={'tag': tag, 'fallback': True}, + if self.enable_xpath_optimization and self.xpath_optimizer: + # NEW: Use XPathOptimizer to generate multiple robust XPath alternatives + absolute_xpath = element_data.get('xpath', '') + + if absolute_xpath: + # Prepare element info for optimizer + element_info = { + 'tag': tag, + 'text': text, + 'attributes': attrs, + } + + # Generate optimized XPath alternatives (limited to max_xpath_alternatives) + try: + optimized_xpaths = self.xpath_optimizer.optimize_xpath( + absolute_xpath, element_info, max_alternatives=self.max_xpath_alternatives + ) + + # Add each optimized XPath with appropriate priority + # Priority starts at 3 for most robust, increases for less robust + base_priority = 3 + for i, opt_xpath in enumerate(optimized_xpaths): + # Skip if this is the absolute xpath (we'll add it last) + if opt_xpath == absolute_xpath and i < len(optimized_xpaths) - 1: + continue + + # Determine priority based on XPath characteristics + priority = self._calculate_xpath_priority(opt_xpath, is_absolute=(opt_xpath == absolute_xpath)) + + # Determine strategy type + strategy_name = self._determine_xpath_strategy(opt_xpath) + + strategies.append( + SelectorStrategy( + type='xpath', + value=opt_xpath, + priority=priority, + metadata={ + 'tag': tag, + 'strategy': strategy_name, + 'optimized': True, + 'index': i, + }, + ) + ) + except Exception as e: + logger.debug(f'XPath optimization failed, using fallback: {e}') + # Fall back to simple XPath generation + xpath = self._generate_xpath(tag, text, attrs) + if xpath: + strategies.append( + SelectorStrategy( + type='xpath', + value=xpath, + priority=8, + metadata={'tag': tag, 'fallback': True}, + ) + ) + else: + # No absolute xpath available, generate one + xpath = self._generate_xpath(tag, text, attrs) + if xpath: + strategies.append( + SelectorStrategy( + type='xpath', + value=xpath, + priority=8, + metadata={'tag': tag, 'fallback': True}, + ) + ) + else: + # XPath optimization disabled, use original behavior + xpath = element_data.get('xpath') or self._generate_xpath(tag, text, attrs) + + if xpath: + strategies.append( + SelectorStrategy( + type='xpath', + value=xpath, + priority=8, + metadata={'tag': tag, 'fallback': True}, + ) ) - ) # Sort by priority (lower number = higher priority) strategies.sort(key=lambda s: s.priority) + # Limit total number of strategies if configured + if self.max_total_strategies and len(strategies) > self.max_total_strategies: + logger.debug(f'Limiting strategies from {len(strategies)} to {self.max_total_strategies} (keeping highest priority)') + strategies = strategies[: self.max_total_strategies] + return strategies def _infer_role(self, tag: str, attrs: Dict[str, Any]) -> Optional[str]: @@ -383,6 +478,117 @@ def _escape_quotes(self, value: str) -> str: """Escape quotes in CSS selector values.""" return value.replace("'", "\\'").replace('"', '\\"') + def _calculate_xpath_priority(self, xpath: str, is_absolute: bool = False) -> int: + """ + Calculate priority score based on XPath characteristics. + + Lower priority number = higher priority (tried first) + + Args: + xpath: The XPath string to evaluate + is_absolute: Whether this is an absolute XPath + + Returns: + Priority score (1-10, lower is better) + """ + # Absolute XPath = lowest priority (fallback only) + if is_absolute or xpath.startswith('/html/body'): + return 10 + + # ID-based XPath = highest priority + if '@id=' in xpath or '@id =' in xpath: + return 2 + + # Name attribute-based = very high priority + if '@name=' in xpath or '@name =' in xpath: + return 2 + + # Data attributes = very high priority + if '@data-' in xpath: + return 2 + + # ARIA attributes = high priority + if '@aria-label=' in xpath or '@aria-labelledby=' in xpath or '@role=' in xpath: + return 3 + + # Table-anchored XPath = medium-high priority + if '//table//' in xpath and ('tr[' in xpath or 'td[' in xpath): + return 4 + + # Form-anchored XPath = medium-high priority + if '//form//' in xpath: + return 4 + + # Text-based in stable container = medium priority + if ('//table//' in xpath or '//form//' in xpath or '//nav//' in xpath) and 'text()' in xpath: + return 5 + + # Class-based = medium-low priority (classes can be dynamic) + if '@class=' in xpath or 'contains(@class' in xpath: + return 6 + + # Text-based without stable container = low priority + if 'text()' in xpath or 'contains(text()' in xpath: + return 7 + + # Positional selectors = very low priority + if '[' in xpath and ']' in xpath and xpath.count('[') > 1: + return 8 + + # Default: medium priority + return 5 + + def _determine_xpath_strategy(self, xpath: str) -> str: + """ + Determine the strategy type/name based on XPath characteristics. + + Args: + xpath: The XPath string to evaluate + + Returns: + Strategy name (e.g., 'id-based', 'table-anchored', 'text-based') + """ + if xpath.startswith('/html/body'): + return 'absolute-fallback' + + if '@id=' in xpath or '@id =' in xpath: + return 'id-based' + + if '@name=' in xpath or '@name =' in xpath: + return 'name-based' + + if '@data-' in xpath: + return 'data-attribute' + + if '@aria-label=' in xpath or '@aria-labelledby=' in xpath: + return 'aria-based' + + if '@role=' in xpath: + return 'role-based' + + if '//table//' in xpath and ('tr[' in xpath or 'td[' in xpath): + return 'table-anchored' + + if '//form//' in xpath: + return 'form-anchored' + + if '//nav//' in xpath: + return 'nav-anchored' + + if 'text()=' in xpath: + return 'text-exact' + + if 'contains(text()' in xpath: + return 'text-contains' + + if '@class=' in xpath or 'contains(@class' in xpath: + return 'class-based' + + if '[' in xpath and ']' in xpath: + return 'positional' + + return 'relative-xpath' + def generate_strategies_dict(self, element_data: Dict[str, Any]) -> List[Dict[str, Any]]: """ Generate strategies and return as list of dictionaries for JSON serialization. diff --git a/workflows/workflow_use/healing/service.py b/workflows/workflow_use/healing/service.py index 29120ab..6f876c9 100644 --- a/workflows/workflow_use/healing/service.py +++ b/workflows/workflow_use/healing/service.py @@ -36,6 +36,8 @@ def __init__( remove_descriptions: bool = True, remove_verification_checks: bool = True, remove_expected_outcomes: bool = True, + # NEW: XPath optimization options + enable_xpath_optimization: bool = True, ): self.llm = llm self.enable_variable_extraction = enable_variable_extraction @@ -52,9 +54,16 @@ def __init__( self.remove_verification_checks = remove_verification_checks self.remove_expected_outcomes = remove_expected_outcomes + # XPath optimization settings + self.enable_xpath_optimization = enable_xpath_optimization + self.variable_extractor = VariableExtractor(llm=llm) if enable_variable_extraction else None self.deterministic_converter = DeterministicWorkflowConverter(llm=llm) if use_deterministic_conversion else None - self.selector_generator = SelectorGenerator() # Initialize multi-strategy selector generator + self.selector_generator = SelectorGenerator( + enable_xpath_optimization=enable_xpath_optimization, + max_xpath_alternatives=2, # Limit to 2 XPath alternatives (1 optimized + 1 absolute fallback) + max_total_strategies=2, # Limit to 2 total strategies (semantic + xpath combined) + ) # Initialize multi-strategy selector generator # Note: validator will be initialized with extraction_llm in generate_workflow_from_prompt self.validator = None @@ -79,7 +88,6 @@ def _post_process_workflow(self, workflow_definition: WorkflowDefinitionSchema) print(f' Confidence threshold: {self.pattern_variable_confidence}') # Import the identifier directly to avoid package issues - import sys import importlib.util from pathlib import Path diff --git a/workflows/workflow_use/healing/xpath_optimizer.py b/workflows/workflow_use/healing/xpath_optimizer.py index d630b86..1b10282 100644 --- a/workflows/workflow_use/healing/xpath_optimizer.py +++ b/workflows/workflow_use/healing/xpath_optimizer.py @@ -70,29 +70,41 @@ class XPathOptimizer: 4. Include fallback strategies """ - def optimize_xpath(self, absolute_xpath: str, element_info: Optional[Dict] = None) -> List[str]: + def optimize_xpath(self, absolute_xpath: str, element_info: Optional[Dict] = None, max_alternatives: int = 2) -> List[str]: """ Generate optimized XPath alternatives from an absolute XPath. Args: absolute_xpath: Full XPath like /html/body/div[1]/div[2]/table/tbody/tr[3]/td[2]/a element_info: Optional dict with element details (tag, text, attributes, etc.) + max_alternatives: Maximum number of alternatives to return (default: 2) + - If max_alternatives <= 1: Returns ONLY the absolute xpath (no optimized alternatives) + - If max_alternatives >= 2: Returns best N-1 optimized alternatives + absolute xpath fallback Returns: - List of XPath alternatives, ordered from most to least robust + List of XPath alternatives (exactly max_alternatives), ordered from most to least robust - Example: + Examples: >>> optimizer = XPathOptimizer() + + >>> # max_alternatives=1 returns only absolute xpath + >>> xpaths = optimizer.optimize_xpath(xpath, element_info, max_alternatives=1) + >>> # Returns: [original_xpath] + + >>> # max_alternatives=2 returns 1 optimized + 1 absolute >>> xpaths = optimizer.optimize_xpath( ... '/html/body/form/div[3]/table/tbody/tr[2]/td[3]/a', ... {'tag': 'a', 'text': '12345', 'attributes': {'class': 'license-link'}}, + ... max_alternatives=2, ... ) >>> # Returns: [ - ... '//table//tr[2]/td[3]/a', # Table-anchored - ... '//a[contains(@class, "license-link")]', # Class-based - ... '//a[contains(text(), "12345")]', # Text-based - ... original_xpath # Absolute fallback + ... '//table//tr[2]/td[3]/a', # Best optimized + ... original_xpath # Absolute fallback ... ] + + >>> # max_alternatives=3 returns 2 optimized + 1 absolute + >>> xpaths = optimizer.optimize_xpath(xpath, element_info, max_alternatives=3) + >>> # Returns: [optimized_1, optimized_2, original_xpath] """ alternatives = [] @@ -117,17 +129,29 @@ def optimize_xpath(self, absolute_xpath: str, element_info: Optional[Dict] = Non if shortened_xpath and shortened_xpath != absolute_xpath: alternatives.append(shortened_xpath) - # Strategy 5: Original absolute path (last resort) - alternatives.append(absolute_xpath) - # Remove duplicates while preserving order seen = set() unique_alternatives = [] for xpath in alternatives: - if xpath not in seen: + if xpath not in seen and xpath != absolute_xpath: seen.add(xpath) unique_alternatives.append(xpath) + logger.debug( + f'XPath optimizer: Generated {len(unique_alternatives)} unique alternatives before limiting (max_alternatives={max_alternatives})' + ) + + # Limit alternatives based on max_alternatives setting + if max_alternatives <= 1: + # If max is 1 or less, return ONLY the absolute xpath (no optimized alternatives) + unique_alternatives = [absolute_xpath] + else: + # Otherwise, keep best N-1 optimized alternatives and add absolute xpath as fallback + unique_alternatives = unique_alternatives[: max_alternatives - 1] + unique_alternatives.append(absolute_xpath) + + logger.debug(f'XPath optimizer: Returning {len(unique_alternatives)} alternatives (limit={max_alternatives})') + return unique_alternatives def _parse_xpath(self, xpath: str) -> List[Dict]: