diff --git a/workflows/cli.py b/workflows/cli.py index 47ffd26..2cb5419 100644 --- a/workflows/cli.py +++ b/workflows/cli.py @@ -1233,6 +1233,7 @@ async def _run_workflow(): var_type = input_def.type.lower() # type is a direct attribute is_required = input_def.required + default_value = getattr(input_def, 'default', None) type_info_str = f'type: {var_type}' if is_required: @@ -1245,21 +1246,32 @@ async def _run_workflow(): if hasattr(input_def, 'format') and input_def.format: format_info_str = f', format: {typer.style(input_def.format, fg=typer.colors.GREEN)}' - full_prompt_text = f'{prompt_question} ({status_str}, {type_info_str}{format_info_str})' + # Add default value information if available + default_info_str = '' + if default_value is not None: + default_info_str = f', default: {typer.style(str(default_value), fg=typer.colors.BLUE)}' + + full_prompt_text = f'{prompt_question} ({status_str}, {type_info_str}{format_info_str}{default_info_str})' input_val = None if var_type == 'bool': - input_val = typer.confirm(full_prompt_text) + input_val = typer.confirm(full_prompt_text, default=default_value if default_value is not None else None) elif var_type == 'number': - input_val = typer.prompt(full_prompt_text, type=float) + input_val = typer.prompt( + full_prompt_text, type=float, default=default_value if default_value is not None else ... + ) elif var_type == 'string': # Default to string for other unknown types as well - input_val = typer.prompt(full_prompt_text, type=str) + input_val = typer.prompt( + full_prompt_text, type=str, default=default_value if default_value is not None else ... + ) else: # Should ideally not happen if schema is validated, but good to have a fallback typer.secho( f"Warning: Unknown type '{var_type}' for variable '{input_def.name}'. 
Treating as string.", fg=typer.colors.YELLOW, ) - input_val = typer.prompt(full_prompt_text, type=str) + input_val = typer.prompt( + full_prompt_text, type=str, default=default_value if default_value is not None else ... + ) inputs[input_def.name] = input_val typer.echo() # Add space after each prompt @@ -1369,6 +1381,7 @@ async def _run_workflow_no_ai(): var_type = input_def.type.lower() # type is a direct attribute is_required = input_def.required + default_value = getattr(input_def, 'default', None) type_info_str = f'type: {var_type}' if is_required: @@ -1381,21 +1394,32 @@ async def _run_workflow_no_ai(): if hasattr(input_def, 'format') and input_def.format: format_info_str = f', format: {typer.style(input_def.format, fg=typer.colors.GREEN)}' - full_prompt_text = f'{prompt_question} ({status_str}, {type_info_str}{format_info_str})' + # Add default value information if available + default_info_str = '' + if default_value is not None: + default_info_str = f', default: {typer.style(str(default_value), fg=typer.colors.BLUE)}' + + full_prompt_text = f'{prompt_question} ({status_str}, {type_info_str}{format_info_str}{default_info_str})' input_val = None if var_type == 'bool': - input_val = typer.confirm(full_prompt_text) + input_val = typer.confirm(full_prompt_text, default=default_value if default_value is not None else None) elif var_type == 'number': - input_val = typer.prompt(full_prompt_text, type=float) + input_val = typer.prompt( + full_prompt_text, type=float, default=default_value if default_value is not None else ... + ) elif var_type == 'string': # Default to string for other unknown types as well - input_val = typer.prompt(full_prompt_text, type=str) + input_val = typer.prompt( + full_prompt_text, type=str, default=default_value if default_value is not None else ... + ) else: # Should ideally not happen if schema is validated, but good to have a fallback typer.secho( f"Warning: Unknown type '{var_type}' for variable '{input_def.name}'. 
"""
XPath optimizer for generating robust, maintainable XPath selectors.

This module takes absolute XPaths (captured during recording) and optimizes them
to be more resilient to page structure changes while maintaining accuracy.
"""

import logging
import re
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)

# One location step of a recorded absolute XPath: a tag name plus an optional
# positional predicate, e.g. ``div`` or ``tr[2]``.
_SEGMENT_RE = re.compile(r'^([a-zA-Z0-9_-]+)(?:\[(\d+)\])?$')


def escape_xpath_string(value: str) -> str:
    """
    Turn a Python string into an XPath 1.0 string literal.

    XPath 1.0 has no escape character inside string literals, so the quoting
    style is chosen from the content:

    * no single quote in the value  -> wrap in single quotes
    * no double quote in the value  -> wrap in double quotes
    * both kinds of quote           -> build a ``concat()`` expression

    In the ``concat()`` case the value is split on single quotes. Each chunk
    therefore contains no single quote and is wrapped in single quotes (a
    chunk may still contain double quotes, so double-quoting it would produce
    an invalid literal); each separator is emitted as the literal ``"'"``.

    Args:
        value: The string value to escape

    Returns:
        A quoted XPath literal, or a concat() expression if needed

    Examples:
        >>> print(escape_xpath_string('hello'))
        'hello'
        >>> print(escape_xpath_string("it's"))
        "it's"
        >>> print(escape_xpath_string('say "hello"'))
        'say "hello"'
        >>> print(escape_xpath_string("it's " + '"ok"'))
        concat('it', "'", 's "ok"')
    """
    if not value:
        return "''"

    # No single quotes: single-quoted literal is always safe
    if "'" not in value:
        return f"'{value}'"

    # Single quotes but no double quotes: double-quoted literal is safe
    if '"' not in value:
        return f'"{value}"'

    # Both quote kinds present: stitch the pieces together with concat()
    chunks = value.split("'")
    concat_parts = []
    for i, chunk in enumerate(chunks):
        if chunk:
            # Chunks never contain a single quote, so single-quote them
            concat_parts.append(f"'{chunk}'")
        if i < len(chunks) - 1:
            # Re-insert the single quote that split() consumed
            concat_parts.append('"\'"')

    return f'concat({", ".join(concat_parts)})'


class XPathOptimizer:
    """
    Optimize XPath selectors for robustness and maintainability.

    Converts brittle absolute XPaths into relative XPaths that:
    1. Use stable attributes (id, name, data-*, aria-*, role)
    2. Leverage semantic structure (tables, forms, lists)
    3. Minimize depth dependency
    4. Include fallback strategies
    """

    def optimize_xpath(self, absolute_xpath: str, element_info: Optional[Dict] = None) -> List[str]:
        """
        Generate optimized XPath alternatives from an absolute XPath.

        Args:
            absolute_xpath: Full XPath like /html/body/div[1]/div[2]/table/tbody/tr[3]/td[2]/a
            element_info: Optional dict with element details ('tag', 'text', 'attributes')

        Returns:
            De-duplicated list of XPath alternatives in generation order:
            attribute-based, anchored, positional, shortened, and finally the
            original absolute XPath. An empty input yields an empty list.

        Example:
            For '/html/body/form/div[3]/table/tbody/tr[2]/td[3]/a' with
            {'tag': 'a', 'text': '12345', 'attributes': {'class': 'license-link'}}
            the result contains, among others:
                //a[contains(@class, 'license-link')]   (class-based)
                //a[contains(text(), '12345')]          (text-based)
                //table//tr[2]/td[3]/a                  (table-anchored)
                /html/body/form/div[3]/table/tbody/tr[2]/td[3]/a  (fallback, last)
        """
        # An empty XPath is not a usable selector; do not hand it back as one
        if not absolute_xpath:
            return []

        parts = self._parse_xpath(absolute_xpath)
        alternatives: List[str] = []

        # Strategy 1: Use element attributes (highest priority)
        if element_info:
            alternatives.extend(self._generate_attribute_based_xpaths(element_info, parts))

        # Strategy 2: Anchor to stable parent structures
        alternatives.extend(self._generate_anchored_xpaths(parts, element_info))

        # Strategy 3: Use position within stable containers
        alternatives.extend(self._generate_positional_xpaths(parts, element_info))

        # Strategy 4: Shortened path (remove volatile parents)
        shortened_xpath = self._shorten_absolute_xpath(parts)
        if shortened_xpath and shortened_xpath != absolute_xpath:
            alternatives.append(shortened_xpath)

        # Strategy 5: Original absolute path (last resort)
        alternatives.append(absolute_xpath)

        # dict preserves insertion order, so this removes duplicates while
        # keeping the first (highest-priority) occurrence of each XPath
        return list(dict.fromkeys(alternatives))

    def _parse_xpath(self, xpath: str) -> List[Dict]:
        """
        Parse XPath into structured parts.

        Only plain ``tag`` / ``tag[N]`` steps are recognised. Any other step
        (empty steps from ``//``, wildcards, attribute predicates, ...) is
        left out of the result and reported at debug level.

        Args:
            xpath: XPath string like /html/body/div[3]/table/tbody/tr[2]/td[1]/a

        Returns:
            List of dicts, one per recognised step:
            [
                    {'tag': 'html', 'index': None, 'original': 'html'},
                    {'tag': 'div', 'index': 3, 'original': 'div[3]'},
                    ...
            ]
        """
        parts = []

        for segment in xpath.lstrip('/').split('/'):
            match = _SEGMENT_RE.match(segment)
            if not match:
                if segment:
                    logger.debug(f'Ignoring unsupported XPath segment: {segment}')
                continue
            tag, raw_index = match.groups()
            parts.append({'tag': tag, 'index': int(raw_index) if raw_index else None, 'original': segment})

        return parts

    def _generate_attribute_based_xpaths(self, element_info: Dict, parts: List[Dict]) -> List[str]:
        """
        Generate XPaths using element attributes.

        Emission order:
        1. id
        2. name
        3. data-* attributes
        4. aria-label / aria-labelledby
        5. role (combined with text when text is available)
        6. first class that does not start with 'css-'
        7. text content (exact, then contains() for text longer than 3 chars)

        Missing or None 'tag', 'attributes' and 'text' entries are tolerated,
        and attribute values are converted to str before escaping.

        Args:
            element_info: Dict with optional 'tag', 'attributes', 'text' keys
            parts: Parsed XPath parts (currently unused by this strategy)

        Returns:
            List of attribute-based XPath strings
        """
        xpaths = []
        tag = str(element_info.get('tag') or '*').lower()
        attrs = element_info.get('attributes') or {}
        text = str(element_info.get('text') or '').strip()

        def literal(raw) -> str:
            # Attribute values may arrive as non-str (e.g. numbers)
            return escape_xpath_string(str(raw))

        # 1. ID selector (highest priority)
        if attrs.get('id'):
            xpaths.append(f'//{tag}[@id={literal(attrs["id"])}]')

        # 2. Name attribute (good for forms)
        if attrs.get('name'):
            xpaths.append(f'//{tag}[@name={literal(attrs["name"])}]')

        # 3. Data attributes
        for attr_name, attr_value in attrs.items():
            if attr_name.startswith('data-') and attr_value:
                xpaths.append(f'//{tag}[@{attr_name}={literal(attr_value)}]')

        # 4. ARIA attributes
        for aria_attr in ('aria-label', 'aria-labelledby'):
            if attrs.get(aria_attr):
                xpaths.append(f'//{tag}[@{aria_attr}={literal(attrs[aria_attr])}]')

        # 5. Role attribute
        if attrs.get('role'):
            escaped_role = literal(attrs['role'])
            if text:
                xpaths.append(f'//{tag}[@role={escaped_role} and contains(text(), {literal(text)})]')
            else:
                xpaths.append(f'//{tag}[@role={escaped_role}]')

        # 6. First class that is not a generated 'css-' class
        if attrs.get('class'):
            stable_class = next((cls for cls in str(attrs['class']).split() if not cls.startswith('css-')), None)
            if stable_class:
                xpaths.append(f'//{tag}[contains(@class, {literal(stable_class)})]')

        # 7. Text content
        if text:
            escaped_text = literal(text)
            # Exact text
            xpaths.append(f'//{tag}[text()={escaped_text}]')
            # Contains text (more flexible); skipped for very short text
            if len(text) > 3:
                xpaths.append(f'//{tag}[contains(text(), {escaped_text})]')

        return xpaths

    def _generate_anchored_xpaths(self, parts: List[Dict], element_info: Optional[Dict]) -> List[str]:
        """
        Generate XPaths anchored to stable parent structures.

        For every step whose tag is one of table, form, nav, header, footer,
        section, article, aside or main, two candidates may be emitted:
        - the anchor followed by the remaining recorded steps
          (e.g. //table//tbody/tr[2]/td[3]/a)
        - the anchor followed by the target tag filtered by text, when
          element_info carries non-blank text

        In addition, when a table step is followed by indexed tr and td steps,
        //table//tr[N]/td[M]//target and //table//tr[N]/td[M]/target are
        emitted.

        Args:
            parts: Parsed XPath parts
            element_info: Optional dict with element details

        Returns:
            List of anchored XPath strings
        """
        xpaths: List[str] = []
        if not parts:
            return xpaths

        anchor_tags = {'table', 'form', 'nav', 'header', 'footer', 'section', 'article', 'aside', 'main'}
        target_tag = parts[-1]['tag']

        # Same normalisation as the attribute strategy: blank text is ignored
        text = str(element_info.get('text') or '').strip() if element_info else ''
        escaped_text = escape_xpath_string(text) if text else None

        for pos, part in enumerate(parts):
            if part['tag'] not in anchor_tags:
                continue

            # Simple anchor: anchor tag + the recorded steps below it
            xpaths.append(f'//{part["tag"]}{self._build_relative_path(parts[pos:])}')

            # Anchor + target tag narrowed by text
            if escaped_text:
                xpaths.append(f'//{part["tag"]}//{target_tag}[contains(text(), {escaped_text})]')

        # Special case: table cell targeting (table -> tr[N] -> td[M])
        if len(parts) >= 3:
            for pos in range(len(parts) - 2):
                if parts[pos]['tag'] != 'table':
                    continue

                below_table = parts[pos:]
                # First indexed tr / td at or below this table
                tr_idx = next((p['index'] for p in below_table if p['tag'] == 'tr' and p['index']), None)
                td_idx = next((p['index'] for p in below_table if p['tag'] == 'td' and p['index']), None)

                if tr_idx and td_idx:
                    xpaths.append(f'//table//tr[{tr_idx}]/td[{td_idx}]//{target_tag}')
                    xpaths.append(f'//table//tr[{tr_idx}]/td[{td_idx}]/{target_tag}')

        return xpaths

    def _generate_positional_xpaths(self, parts: List[Dict], element_info: Optional[Dict]) -> List[str]:
        """
        Generate XPaths using position within containers.

        Currently emits a single candidate, (//table//<target>)[1], when the
        recorded path passes through a table and element_info is provided.
        NOTE(review): the position is always 1 regardless of where the element
        actually sits in the table, so this is a coarse heuristic.

        Args:
            parts: Parsed XPath parts
            element_info: Optional dict with element details

        Returns:
            List of positional XPath strings (possibly empty)
        """
        xpaths: List[str] = []

        if not parts:
            return xpaths

        target_tag = parts[-1]['tag']

        has_table = any(p['tag'] == 'table' for p in parts)
        if has_table and element_info:
            xpaths.append(f'(//table//{target_tag})[1]')  # First occurrence

        return xpaths

    def _build_relative_path(self, parts: List[Dict]) -> str:
        """
        Build relative path from parsed parts.

        The first part is the anchor itself and is skipped; the next step is
        attached with '//' (any depth below the anchor) and every following
        step with '/' (direct child).

        Args:
            parts: List of path segments, anchor first

        Returns:
            Relative XPath string like //tr[2]/td[3]/a ('' if nothing follows the anchor)
        """
        path_segments = []

        for part in parts[1:]:  # Skip first part (already used as anchor)
            segment = f'//{part["tag"]}' if not path_segments else f'/{part["tag"]}'
            if part['index']:
                segment += f'[{part["index"]}]'
            path_segments.append(segment)

        return ''.join(path_segments)

    def _shorten_absolute_xpath(self, parts: List[Dict]) -> Optional[str]:
        """
        Shorten an absolute XPath to its last three steps.

        The result starts with '//' so it matches at any depth. A single
        leading '/' would root the three remaining steps at the document node,
        which can never match once the leading steps have been removed.

        Nothing is produced when the path has four steps or fewer, or when the
        last stable tag (html, body, table, form, nav, header, footer, main)
        is already within the last three steps.

        Args:
            parts: Parsed XPath parts

        Returns:
            Shortened XPath like //tr[2]/td[3]/a, or None
        """
        if len(parts) <= 4:
            return None  # Already short enough

        stable_tags = {'html', 'body', 'table', 'form', 'nav', 'header', 'footer', 'main'}
        last_stable_idx = max((i for i, part in enumerate(parts) if part['tag'] in stable_tags), default=0)

        if last_stable_idx >= len(parts) - 3:
            return None

        return '//' + '/'.join(p['original'] for p in parts[-3:])
@@ -38,6 +38,7 @@ async def find_element_with_strategies( Args: strategies: List of strategy dictionaries with 'type', 'value', 'priority', 'metadata' browser_session: Browser-use BrowserSession object + target_text: Optional target text to validate element existence Returns: Tuple of: @@ -53,16 +54,25 @@ async def find_element_with_strategies( if not strategies: return None, strategy_attempts - # Get current DOM state from browser-use + # Get current page from browser-use try: - state = await browser_session.get_state() - if not state or not state.selector_map: - logger.warning(' ⚠️ No DOM state available') + page = await browser_session.get_current_page() + if not page: + logger.warning(' ⚠️ No page available') return None, strategy_attempts except Exception as e: - logger.warning(f' ⚠️ Failed to get DOM state: {e}') + logger.warning(f' ⚠️ Failed to get current page: {e}') return None, strategy_attempts + # Get selector map for semantic strategies + selector_map = None + try: + selector_map = await browser_session.get_selector_map() + if selector_map: + logger.debug(f' 📋 Retrieved selector map with {len(selector_map)} elements') + except Exception as e: + logger.debug(f' ⚠️ Could not get selector map: {e}') + # Sort by priority (should already be sorted, but ensure it) sorted_strategies = sorted(strategies, key=lambda s: s.get('priority', 999)) @@ -76,12 +86,12 @@ async def find_element_with_strategies( try: logger.info(f' 🔍 Strategy {i}/{len(sorted_strategies)}: {strategy_type}') - # Handle XPath strategies separately (requires Playwright) + # Try XPath strategies via Playwright if strategy_type == 'xpath': - result = await self._find_with_xpath(strategy_value, state, browser_session) + result = await self._find_with_xpath(strategy_value, page, browser_session, target_text) if result: - index, _ = result - logger.info(f' ✅ Found with XPath at index {index}') + xpath_string, xpath_used = result + logger.info(' ✅ Found element with XPath') # Record successful 
attempt strategy_attempts.append( StrategyAttempt( @@ -92,29 +102,50 @@ async def find_element_with_strategies( metadata=metadata, ) ) - return (index, strategy), strategy_attempts + # Return XPath string for semantic_executor.py to use in JavaScript click + # Note: This differs from semantic strategies which return element_index for service.py + return (xpath_string, strategy), strategy_attempts else: error_msg = 'XPath query returned no results' logger.debug(f' ⏭️ {error_msg}') - else: - # Search through browser-use's selector_map using semantic matching - for index, node in state.selector_map.items(): - if await self._matches_strategy(node, strategy_type, strategy_value, metadata): - logger.info(f' ✅ Found with {strategy_type} at index {index}') - # Record successful attempt - strategy_attempts.append( - StrategyAttempt( - strategy_type=strategy_type, - strategy_value=strategy_value, - priority=priority, - success=True, - metadata=metadata, - ) + # Try semantic strategies using selector map + elif selector_map and strategy_type in [ + 'text_exact', + 'role_text', + 'aria_label', + 'placeholder', + 'title', + 'alt_text', + 'text_fuzzy', + ]: + result = await self._find_with_semantic_strategy( + strategy_type, strategy_value, metadata, selector_map, target_text + ) + if result: + element_index, matched_element = result + logger.info(f' ✅ Found element with {strategy_type}') + # Record successful attempt + strategy_attempts.append( + StrategyAttempt( + strategy_type=strategy_type, + strategy_value=strategy_value, + priority=priority, + success=True, + metadata=metadata, ) - return (index, strategy), strategy_attempts + ) + return (element_index, strategy), strategy_attempts + else: + error_msg = 'No matching element found in DOM' + logger.debug(f' ⏭️ {error_msg}') - error_msg = 'No matching element found in DOM' + else: + # Strategy type not supported or no selector map available + if not selector_map: + error_msg = 'Selector map not available for semantic 
strategy' + else: + error_msg = f'Strategy type "{strategy_type}" not supported' logger.debug(f' ⏭️ {error_msg}') except Exception as e: @@ -137,12 +168,133 @@ async def find_element_with_strategies( logger.warning(f' ❌ All {len(sorted_strategies)} strategies failed') return None, strategy_attempts + async def _find_with_semantic_strategy( + self, + strategy_type: str, + strategy_value: str, + metadata: Dict[str, Any], + selector_map: Dict[str, Any], + target_text: Optional[str] = None, + ) -> Optional[tuple[int, Dict[str, Any]]]: + """ + Find element using semantic strategy in browser-use's selector map. + + Args: + strategy_type: Type of semantic strategy (text_exact, role_text, etc.) + strategy_value: Value to match + metadata: Additional matching metadata + selector_map: Browser-use's selector map (dict of index -> element) + target_text: Optional target text for validation + + Returns: + Tuple of (element_index, element_data) if found, None otherwise + """ + try: + # Iterate through selector map to find matching element + for index, element in selector_map.items(): + # Handle both dict and object formats + if isinstance(element, dict): + node = element + else: + # Convert object to dict-like access + node = element + + # Check if element matches the strategy + if await self._matches_strategy(node, strategy_type, strategy_value, metadata): + # Validate element exists and is visible + if await self._validate_element_in_map(index, node, target_text): + return (int(index), node) + + return None + + except Exception as e: + logger.debug(f'Error finding element with semantic strategy: {e}') + return None + + async def _validate_element_in_map(self, index: int, node: Any, target_text: Optional[str] = None) -> bool: + """ + Validate that element in selector map is visible and optionally matches target_text. 
+ + Args: + index: Element index in browser-use's selector map + node: Browser-use DOM element (dict or object) + target_text: Optional text to validate + + Returns: + True if element is valid and visible + """ + try: + # Helper to get attribute from dict or object + def get_attr(obj, attr, default=''): + if isinstance(obj, dict): + return obj.get(attr, default) + return getattr(obj, attr, default) + + # Check if node is visible - this is a hard requirement + is_visible = get_attr(node, 'is_visible', True) + if not is_visible: + logger.debug(f'Element at index {index} is not visible') + return False + + # If target_text is provided, validate it (advisory only) + if target_text: + target_lower = target_text.lower().strip() + + # Collect all text sources from the element + text_sources = [] + + # Get element's visible text + node_text = get_attr(node, 'text', '') or '' + if node_text: + text_sources.append(node_text.lower().strip()) + + # Get aria-label + aria_label = get_attr(node, 'aria_label', '') or '' + if aria_label: + text_sources.append(aria_label.lower().strip()) + + # Get placeholder + placeholder = get_attr(node, 'placeholder', '') or '' + if placeholder: + text_sources.append(placeholder.lower().strip()) + + # Get title + title = get_attr(node, 'title', '') or '' + if title: + text_sources.append(title.lower().strip()) + + # Get alt text + alt = get_attr(node, 'alt', '') or '' + if alt: + text_sources.append(alt.lower().strip()) + + # Get name attribute + attrs = get_attr(node, 'attributes', {}) or {} + if isinstance(attrs, dict) and 'name' in attrs: + text_sources.append(attrs['name'].lower().strip()) + + # Check if target_text matches any text source + found_match = any(target_lower in source or source in target_lower for source in text_sources if source) + + if not found_match: + logger.debug( + f'⚠️ Target text "{target_text}" not found in element at index {index}, but proceeding with selector.' 
+ ) + else: + logger.debug(f'✓ Target text "{target_text}" validated in element at index {index}') + + return True + + except Exception as e: + logger.debug(f'Error validating element at index {index}: {e}') + return False + async def _matches_strategy(self, node: Any, strategy_type: str, value: str, metadata: Dict[str, Any]) -> bool: """ Check if a DOM node matches a semantic strategy. Args: - node: EnhancedDOMTreeNode from browser-use + node: EnhancedDOMTreeNode from browser-use (dict or object) strategy_type: Type of strategy (text_exact, role_text, etc.) value: Value to match metadata: Additional matching metadata @@ -151,44 +303,50 @@ async def _matches_strategy(self, node: Any, strategy_type: str, value: str, met True if node matches the strategy """ try: + # Helper to get attribute from dict or object + def get_attr(obj, attr, default=''): + if isinstance(obj, dict): + return obj.get(attr, default) + return getattr(obj, attr, default) + # Semantic Strategy 1: Exact text match if strategy_type == 'text_exact': - node_text = getattr(node, 'text', '') or '' + node_text = get_attr(node, 'text', '') or '' return node_text.strip() == value # Semantic Strategy 2: Role + text elif strategy_type == 'role_text': expected_role = metadata.get('role', '').lower() - node_role = getattr(node, 'role', '') or getattr(node, 'tag_name', '') - node_role = node_role.lower() - node_text = getattr(node, 'text', '') or '' + node_role = get_attr(node, 'role', '') or get_attr(node, 'tag_name', '') + node_role = node_role.lower() if node_role else '' + node_text = get_attr(node, 'text', '') or '' return node_role == expected_role and node_text.strip() == value # Semantic Strategy 3: ARIA label elif strategy_type == 'aria_label': - aria_label = getattr(node, 'aria_label', '') or '' + aria_label = get_attr(node, 'aria_label', '') or '' return aria_label.strip() == value # Semantic Strategy 4: Placeholder elif strategy_type == 'placeholder': - placeholder = getattr(node, 'placeholder', 
'') or '' + placeholder = get_attr(node, 'placeholder', '') or '' return placeholder.strip() == value # Semantic Strategy 5: Title attribute elif strategy_type == 'title': - title = getattr(node, 'title', '') or '' + title = get_attr(node, 'title', '') or '' return title.strip() == value # Semantic Strategy 6: Alt text (images) elif strategy_type == 'alt_text': - alt = getattr(node, 'alt', '') or '' + alt = get_attr(node, 'alt', '') or '' return alt.strip() == value # Semantic Strategy 7: Fuzzy text match elif strategy_type == 'text_fuzzy': threshold = metadata.get('threshold', 0.8) - node_text = getattr(node, 'text', '') or '' + node_text = get_attr(node, 'text', '') or '' return self._fuzzy_match(value, node_text.strip(), threshold) # Note: XPath and CSS strategies are handled separately in find_element_with_strategies @@ -200,70 +358,174 @@ async def _matches_strategy(self, node: Any, strategy_type: str, value: str, met return False - async def _find_with_xpath(self, xpath: str, state: Any, browser_session: Any) -> Optional[tuple[int, Any]]: + async def _validate_element_exists( + self, index: int, node: Any, browser_session: Any, target_text: Optional[str] = None + ) -> bool: """ - Find element using XPath and map it to browser-use's index. + Validate that element exists, is visible, and optionally matches target_text. 
+ + Args: + index: Element index in browser-use's selector map + node: Browser-use DOM node + browser_session: Browser-use session object + target_text: Optional text to validate against element's text/label/aria-label/placeholder + If provided, we log a warning if text doesn't match but still allow the element + + Returns: + True if element is valid and visible (text matching is advisory only) + """ + try: + # Check if node is visible - this is a hard requirement + is_visible = getattr(node, 'is_visible', True) + if not is_visible: + logger.debug(f'Element at index {index} is not visible') + return False + + # If target_text is provided, validate it exists in the element's text sources + # BUT: This is advisory only - we log a warning but don't fail validation + # The target_text might be a descriptive label, not actual visible text + if target_text: + target_lower = target_text.lower().strip() + + # Collect all text sources from the element + text_sources = [] + + # Get element's visible text + node_text = getattr(node, 'text', '') or '' + if node_text: + text_sources.append(node_text.lower().strip()) + + # Get aria-label + aria_label = getattr(node, 'aria_label', '') or '' + if aria_label: + text_sources.append(aria_label.lower().strip()) + + # Get placeholder + placeholder = getattr(node, 'placeholder', '') or '' + if placeholder: + text_sources.append(placeholder.lower().strip()) + + # Get title + title = getattr(node, 'title', '') or '' + if title: + text_sources.append(title.lower().strip()) + + # Get alt text + alt = getattr(node, 'alt', '') or '' + if alt: + text_sources.append(alt.lower().strip()) + + # Get name attribute + attrs = getattr(node, 'attributes', {}) or {} + if 'name' in attrs: + text_sources.append(attrs['name'].lower().strip()) + + # Check if target_text matches any text source + found_match = any(target_lower in source or source in target_lower for source in text_sources if source) + + if not found_match: + # Don't fail - just log a 
warning + # The XPath/CSS selector is more authoritative than target_text hint + logger.debug( + f'⚠️ Target text "{target_text}" not found in element at index {index}, but proceeding with selector. ' + f'Available text sources: {text_sources}' + ) + else: + logger.debug(f'✓ Target text "{target_text}" validated in element at index {index}') + + return True + + except Exception as e: + logger.debug(f'Error validating element at index {index}: {e}') + return False + + async def _find_with_xpath( + self, xpath: str, page: Any, browser_session: Any, target_text: Optional[str] = None + ) -> Optional[tuple[Any, str]]: + """ + Find element using XPath via JavaScript evaluation. Args: xpath: XPath selector - state: Current browser-use DOM state + page: Browser-use Page object browser_session: Browser-use session object + target_text: Optional target text for validation Returns: - Tuple of (element_index, node) if found, None otherwise + Tuple of (xpath, xpath_used) if found, None otherwise + Note: Returns xpath string, not element object, since we'll click via JS """ try: - # Get the Playwright page from browser_session - page = await browser_session.get_current_page() - if not page: - logger.debug('No Playwright page available for XPath execution') + # Normalize XPath: ensure it starts with / for absolute paths + normalized_xpath = xpath + if xpath and not xpath.startswith('/') and not xpath.startswith('('): + normalized_xpath = '/' + xpath + logger.info(f' 🔧 Normalized XPath to: {normalized_xpath}') + + logger.info(f' 🔎 Executing XPath: {normalized_xpath}') + + # Execute XPath query via JavaScript to find element + # Escape the XPath for safe JavaScript string usage + escaped_xpath = normalized_xpath.replace("'", "\\'") + js_code = f"""() => {{ + try {{ + const result = document.evaluate( + '{escaped_xpath}', + document, + null, + XPathResult.FIRST_ORDERED_NODE_TYPE, + null + ); + + const element = result.singleNodeValue; + if (!element) {{ + return null; + }} + + // 
Check if element is visible + const rect = element.getBoundingClientRect(); + const style = window.getComputedStyle(element); + const isVisible = rect.width > 0 && rect.height > 0 && + style.visibility !== 'hidden' && + style.display !== 'none'; + + return {{ + found: true, + visible: isVisible, + tag: element.tagName, + text: element.textContent?.trim() || '', + xpath: '{escaped_xpath}' + }}; + }} catch (error) {{ + return {{ error: error.message }}; + }} +}}""" + + result = await page.evaluate(js_code) + + if not result: + logger.info(' ⚠️ XPath evaluation returned null') return None - # Execute XPath query to find element - element = await page.query_selector(f'xpath={xpath}') - if not element: - logger.debug(f'XPath query returned no results: {xpath}') + if isinstance(result, dict) and result.get('error'): + logger.warning(f' ❌ XPath evaluation error: {result["error"]}') return None - # Get element properties to match against browser-use's nodes - try: - # Get text content, tag name, and attributes - element_data = await page.evaluate( - """(el) => { - return { - text: el.textContent?.trim() || '', - tagName: el.tagName?.toLowerCase() || '', - id: el.id || '', - className: el.className || '', - ariaLabel: el.getAttribute('aria-label') || '', - placeholder: el.getAttribute('placeholder') || '', - name: el.getAttribute('name') || '', - boundingBox: el.getBoundingClientRect ? 
{ - x: el.getBoundingClientRect().x, - y: el.getBoundingClientRect().y, - width: el.getBoundingClientRect().width, - height: el.getBoundingClientRect().height - } : null - }; - }""", - element, - ) - - # Try to find matching node in browser-use's selector_map - for index, node in state.selector_map.items(): - if self._xpath_node_matches(node, element_data): - logger.debug(f'Matched XPath element to index {index}') - return (index, node) + if not isinstance(result, dict) or not result.get('found'): + logger.info(' ⚠️ XPath returned no results') + return None - logger.debug('XPath found element but could not match to browser-use index') + if not result.get('visible'): + logger.info(' ⚠️ Element found but not visible') + return None - except Exception as e: - logger.debug(f'Error extracting element data: {e}') + logger.info(f' ✅ Found visible element: <{result["tag"]}>') + # Return the xpath itself since we'll execute click via JavaScript + return (normalized_xpath, normalized_xpath) except Exception as e: - logger.debug(f'Error executing XPath: {e}') - - return None + logger.warning(f' ❌ Error executing XPath: {e}') + return None def _xpath_node_matches(self, node: Any, element_data: Dict[str, Any]) -> bool: """ diff --git a/workflows/workflow_use/workflow/semantic_executor.py b/workflows/workflow_use/workflow/semantic_executor.py index 8e35faa..69ad9f5 100644 --- a/workflows/workflow_use/workflow/semantic_executor.py +++ b/workflows/workflow_use/workflow/semantic_executor.py @@ -744,7 +744,74 @@ async def execute_click_step(self, step: ClickStep) -> ActionResult: """Execute click step using semantic mapping with improved selector strategies.""" page = await self.browser.get_current_page() - # Try to find element using multiple strategies (prioritize target_text) + # DEBUG: Check what attributes the step has + logger.info(f'🔍 DEBUG: Step attributes: {[attr for attr in dir(step) if not attr.startswith("_")]}') + logger.info(f'🔍 DEBUG: hasattr selectorStrategies: 
{hasattr(step, "selectorStrategies")}') + if hasattr(step, 'selectorStrategies'): + logger.info(f'🔍 DEBUG: selectorStrategies value: {step.selectorStrategies}') + logger.info(f'🔍 DEBUG: selectorStrategies truthy: {bool(step.selectorStrategies)}') + + # PRIORITY 1: Check for explicit selectorStrategies first (most reliable) + # These are explicit selectors from the workflow definition and should take precedence + if hasattr(step, 'selectorStrategies') and step.selectorStrategies: + logger.info(f'🎯 Using explicit selectorStrategies from workflow ({len(step.selectorStrategies)} strategies)') + + # Import ElementFinder here to avoid circular imports + from workflow_use.workflow.element_finder import ElementFinder + + element_finder = ElementFinder() + + target_text = step.target_text if hasattr(step, 'target_text') else None + result, strategy_attempts = await element_finder.find_element_with_strategies( + step.selectorStrategies, self.browser, target_text + ) + + if result: + xpath_or_selector, strategy_used = result + logger.info(f'✅ Found element using strategy: {strategy_used.get("type")} = {strategy_used.get("value")}') + + # Click the element using JavaScript evaluation + try: + if strategy_used.get('type') == 'xpath': + # Click via JavaScript using XPath + escaped_xpath = xpath_or_selector.replace("'", "\\'") + click_js = f"""() => {{ + try {{ + const result = document.evaluate( + '{escaped_xpath}', + document, + null, + XPathResult.FIRST_ORDERED_NODE_TYPE, + null + ); + const element = result.singleNodeValue; + if (element) {{ + element.click(); + return {{ success: true, tag: element.tagName }}; + }} + return {{ success: false, error: 'Element not found' }}; + }} catch (error) {{ + return {{ success: false, error: error.message }}; + }} +}}""" + click_result = await page.evaluate(click_js) + + if click_result and click_result.get('success'): + msg = f'🖱️ Clicked element using XPath: {xpath_or_selector}' + logger.info(msg) + return 
ActionResult(extracted_content=msg, include_in_memory=True) + else: + raise Exception(f'Failed to click element: {click_result.get("error", "Unknown error")}') + else: + raise Exception(f'Unsupported strategy type: {strategy_used.get("type")}') + + except Exception as e: + logger.error(f'Failed to click element: {e}') + raise Exception(f'Failed to click element: {e}') + else: + logger.warning('⚠️ selectorStrategies failed to find element, falling back to semantic mapping') + + # PRIORITY 2: Try semantic mapping (find by target_text in current_mapping) element_info = None target_identifier = None selector_to_use = None @@ -794,7 +861,7 @@ async def execute_click_step(self, step: ClickStep) -> ActionResult: selector_to_use = element_info['selectors'] logger.info(f"Using semantic mapping: '{target_identifier}' -> {selector_to_use}") - # Final fallback to original CSS selector or XPath + # PRIORITY 3: Final fallback to legacy CSS selector or XPath fields if not selector_to_use: if step.cssSelector: selector_to_use = step.cssSelector @@ -981,11 +1048,21 @@ async def _click_element_intelligently(self, selector: str, target_text: str, el page = await self.browser.get_current_page() # STRATEGY 0: Try direct text-based clicking first (most semantic) + # BUT: Only fail if we don't have a selector to fall back to + # target_text might be just a descriptive label, not actual visible text if target_text and target_text.strip(): element_tag = element_info.get('tag', '').lower() if element_info else None if await self._click_element_by_text_direct(target_text, element_tag): return True - logger.info('Direct text click failed, falling back to selector-based approach') + # If text-based click failed but we have a selector, try it + # Only refuse to click if we have NO selector strategies at all + if not selector or selector == 'None': + logger.error(f'Element with target text "{target_text}" not found on the page') + logger.error('Refusing to click without validating target text 
exists or having a selector') + return False + else: + logger.warning(f"⚠️ Could not find element by text: '{target_text}'") + logger.info(f'🔄 Falling back to selector strategy: {selector}') try: # Strategy -1: Check if element_info indicates this is a radio/checkbox, even if selector doesn't show it diff --git a/workflows/workflow_use/workflow/service.py b/workflows/workflow_use/workflow/service.py index 0ae9a58..1a6c325 100644 --- a/workflows/workflow_use/workflow/service.py +++ b/workflows/workflow_use/workflow/service.py @@ -168,37 +168,63 @@ async def _run_deterministic_step(self, step: DeterministicWorkflowStep, step_in if action_name in ['click', 'input'] and all_params.get('selectorStrategies'): try: strategies = all_params['selectorStrategies'] + target_text = all_params.get('target_text') # Get target_text for validation logger.info(f' 🎯 Attempting semantic multi-strategy finding ({len(strategies)} strategies)') - result, strategy_attempts = await self.element_finder.find_element_with_strategies(strategies, self.browser) + if target_text: + logger.info(f' 🎯 Validating target text: "{target_text}"') + + result, strategy_attempts = await self.element_finder.find_element_with_strategies( + strategies, self.browser, target_text + ) # Store strategy attempts for error reporting self._current_strategy_attempts = strategy_attempts if result: - element_index, strategy_used = result - logger.info(f' ✅ Element found at index {element_index} using strategy: {strategy_used.get("type")}') - - # Use the found index to execute the action through browser-use's controller - # This ensures we use browser-use's native semantic action system - if action_name == 'click': - # Override the index param with our found index - params['index'] = element_index - logger.info(f' ✅ Will click element at index {element_index} (semantic-only)') - - elif action_name == 'input': - # Override the index param with our found index - params['index'] = element_index - logger.info(f' ✅ Will 
input to element at index {element_index} (semantic-only)') - - # Continue to controller execution below with updated index - # This way we leverage browser-use's robust action handling + element_index_or_xpath, strategy_used = result + strategy_type = strategy_used.get('type') + logger.info(f' ✅ Element found using strategy: {strategy_type}') + + # XPath strategies return a string, not an index + # They should be handled by semantic_executor, not here + if strategy_type == 'xpath': + logger.info(' ⚠️ XPath strategy found - this should be handled by semantic_executor, not service.py') + logger.info(' ⚠️ Falling back to full controller') + else: + # Semantic strategies return element index + element_index = element_index_or_xpath + logger.info(f' ✅ Element found at index {element_index}') + + # Use the found index to execute the action through browser-use's controller + # This ensures we use browser-use's native semantic action system + if action_name == 'click': + # Override the index param with our found index + params['index'] = element_index + logger.info(f' ✅ Will click element at index {element_index} (semantic-only)') + + elif action_name == 'input': + # Override the index param with our found index + params['index'] = element_index + logger.info(f' ✅ Will input to element at index {element_index} (semantic-only)') + + # Continue to controller execution below with updated index + # This way we leverage browser-use's robust action handling else: logger.warning(' ⚠️ Multi-strategy finding failed, falling back to full controller') + # If target_text was provided and we couldn't find it, raise an error + if target_text: + raise RuntimeError( + f'Element with target text "{target_text}" not found on the page. ' + f'Tried {len(strategies)} strategies but none matched a visible element with this text.' 
+ ) except Exception as e: logger.warning(f' ⚠️ Error in multi-strategy finding: {e}, falling back to full controller') + # Re-raise if it's our validation error + if 'target text' in str(e).lower(): + raise # Special handling for actions that don't accept any parameters # These actions use NoParamsAction, so we pass an empty instance instead of {} diff --git a/workflows/workflow_use/workflow/tests/test_variable_identifier.py b/workflows/workflow_use/workflow/tests/test_variable_identifier.py index 10a9253..e4e4998 100644 --- a/workflows/workflow_use/workflow/tests/test_variable_identifier.py +++ b/workflows/workflow_use/workflow/tests/test_variable_identifier.py @@ -5,12 +5,13 @@ """ import pytest + +from workflow_use.workflow.variable_config import VariableConfigPresets from workflow_use.workflow.variable_identifier import ( VariableIdentifier, VariableType, identify_variables_in_workflow, ) -from workflow_use.workflow.variable_config import VariableConfigPresets class TestVariableIdentifier: diff --git a/workflows/workflow_use/workflow/validation_utils.py b/workflows/workflow_use/workflow/validation_utils.py index fa0de3d..ee70214 100644 --- a/workflows/workflow_use/workflow/validation_utils.py +++ b/workflows/workflow_use/workflow/validation_utils.py @@ -7,7 +7,6 @@ from typing import List, Optional, Tuple - # Common CSS selectors for error messages across different frameworks and patterns VALIDATION_ERROR_SELECTORS = [ '.error', diff --git a/workflows/workflow_use/workflow/variable_config.py b/workflows/workflow_use/workflow/variable_config.py index 0cc9919..05a5f5b 100644 --- a/workflows/workflow_use/workflow/variable_config.py +++ b/workflows/workflow_use/workflow/variable_config.py @@ -5,8 +5,8 @@ are identified in workflow automation. 
""" -from typing import Dict, List, Set from dataclasses import dataclass, field +from typing import Dict, Set @dataclass diff --git a/workflows/workflow_use/workflow/variable_identifier.py b/workflows/workflow_use/workflow/variable_identifier.py index db7ffe3..9a84715 100644 --- a/workflows/workflow_use/workflow/variable_identifier.py +++ b/workflows/workflow_use/workflow/variable_identifier.py @@ -8,9 +8,9 @@ import logging import re -from typing import Any, Dict, List, Optional, Set, Tuple from dataclasses import dataclass from enum import Enum +from typing import Any, Dict, List, Optional, Set, Tuple logger = logging.getLogger(__name__)