diff --git a/contrib/workflow_with_ai_parse_document/.gitignore b/contrib/job_with_ai_parse_document/.gitignore similarity index 100% rename from contrib/workflow_with_ai_parse_document/.gitignore rename to contrib/job_with_ai_parse_document/.gitignore diff --git a/contrib/workflow_with_ai_parse_document/README.md b/contrib/job_with_ai_parse_document/README.md similarity index 99% rename from contrib/workflow_with_ai_parse_document/README.md rename to contrib/job_with_ai_parse_document/README.md index 2e0e921e..f899e1d0 100644 --- a/contrib/workflow_with_ai_parse_document/README.md +++ b/contrib/job_with_ai_parse_document/README.md @@ -96,7 +96,7 @@ Uses `ai_parse_document` to extract text, tables, and metadata from PDFs/images: Extracts clean concatenated text using `transform()`: - Reads from previous task's table via streaming -- Handles both parser v1.0 and v2.0 formats +- Extracts text from parsed document elements - Uses `transform()` for efficient text extraction - Includes error handling for failed parses diff --git a/contrib/workflow_with_ai_parse_document/assets/document_summary.png b/contrib/job_with_ai_parse_document/assets/document_summary.png similarity index 100% rename from contrib/workflow_with_ai_parse_document/assets/document_summary.png rename to contrib/job_with_ai_parse_document/assets/document_summary.png diff --git a/contrib/workflow_with_ai_parse_document/assets/figure_description.png b/contrib/job_with_ai_parse_document/assets/figure_description.png similarity index 100% rename from contrib/workflow_with_ai_parse_document/assets/figure_description.png rename to contrib/job_with_ai_parse_document/assets/figure_description.png diff --git a/contrib/workflow_with_ai_parse_document/assets/page1_bounding_boxes.png b/contrib/job_with_ai_parse_document/assets/page1_bounding_boxes.png similarity index 100% rename from contrib/workflow_with_ai_parse_document/assets/page1_bounding_boxes.png rename to contrib/job_with_ai_parse_document/assets/page1_bounding_boxes.png diff --git a/contrib/workflow_with_ai_parse_document/assets/page1_elements_list.png b/contrib/job_with_ai_parse_document/assets/page1_elements_list.png similarity index 100% rename from contrib/workflow_with_ai_parse_document/assets/page1_elements_list.png rename to contrib/job_with_ai_parse_document/assets/page1_elements_list.png diff --git a/contrib/workflow_with_ai_parse_document/assets/page2_contents_table.png b/contrib/job_with_ai_parse_document/assets/page2_contents_table.png similarity index 100% rename from contrib/workflow_with_ai_parse_document/assets/page2_contents_table.png rename to contrib/job_with_ai_parse_document/assets/page2_contents_table.png diff --git a/contrib/workflow_with_ai_parse_document/assets/table_extraction.png b/contrib/job_with_ai_parse_document/assets/table_extraction.png similarity index 100% rename from contrib/workflow_with_ai_parse_document/assets/table_extraction.png rename to contrib/job_with_ai_parse_document/assets/table_extraction.png diff --git a/contrib/workflow_with_ai_parse_document/databricks.yml b/contrib/job_with_ai_parse_document/databricks.yml similarity index 100% rename from contrib/workflow_with_ai_parse_document/databricks.yml rename to contrib/job_with_ai_parse_document/databricks.yml diff --git a/contrib/workflow_with_ai_parse_document/resources/ai_parse_document_job.job.yml b/contrib/job_with_ai_parse_document/resources/ai_parse_document_job.job.yml similarity index 100% rename from contrib/workflow_with_ai_parse_document/resources/ai_parse_document_job.job.yml rename to contrib/job_with_ai_parse_document/resources/ai_parse_document_job.job.yml diff --git a/contrib/workflow_with_ai_parse_document/src/explorations/ai_parse_document -- debug output.py b/contrib/job_with_ai_parse_document/src/explorations/ai_parse_document -- debug output.py similarity index 81% rename from contrib/workflow_with_ai_parse_document/src/explorations/ai_parse_document -- debug output.py rename to contrib/job_with_ai_parse_document/src/explorations/ai_parse_document -- debug output.py index 9f3f507d..2f39afab 100644 --- a/contrib/workflow_with_ai_parse_document/src/explorations/ai_parse_document -- debug output.py +++ b/contrib/job_with_ai_parse_document/src/explorations/ai_parse_document -- debug output.py @@ -109,7 +109,7 @@ # DBTITLE 1,Run Document Parse Code (may take some time) # SQL statement with ai_parse_document() # Note: input_file can be a single file path or a directory path with wildcard -sql = f""" +sql = f''' with parsed_documents AS ( SELECT path, @@ -125,7 +125,7 @@ read_files('{source_files}', format => 'binaryFile') ) select * from parsed_documents -""" +''' parsed_results = [row.parsed for row in spark.sql(sql).collect()] @@ -139,62 +139,59 @@ from PIL import Image import io - class DocumentRenderer: def __init__(self): # Color mapping for different element types self.element_colors = { - "section_header": "#FF6B6B", - "text": "#4ECDC4", - "figure": "#45B7D1", - "caption": "#96CEB4", - "page_footer": "#FFEAA7", - "page_header": "#DDA0DD", - "table": "#98D8C8", - "list": "#F7DC6F", - "default": "#BDC3C7", + 'section_header': '#FF6B6B', + 'text': '#4ECDC4', + 'figure': '#45B7D1', + 'caption': '#96CEB4', + 'page_footer': '#FFEAA7', + 'page_header': '#DDA0DD', + 'table': '#98D8C8', + 'list': '#F7DC6F', + 'default': '#BDC3C7' } - - def _parse_page_selection( - self, page_selection: Union[str, None], total_pages: int - ) -> Set[int]: + + def _parse_page_selection(self, page_selection: Union[str, None], total_pages: int) -> Set[int]: """Parse page selection string and return set of page indices (0-based). - + Args: page_selection: Selection string or None total_pages: Total number of pages available - + Returns: Set of 0-based page indices to display """ # Handle None or "all" - return all pages if page_selection is None or page_selection.lower() == "all": return set(range(total_pages)) - + selected_pages = set() - + # Clean the input page_selection = page_selection.strip() - + # Split by commas for multiple selections - parts = page_selection.split(",") - + parts = page_selection.split(',') + for part in parts: part = part.strip() - + # Check if it's a range (contains hyphen) - if "-" in part: + if '-' in part: try: # Split range and convert to integers - range_parts = part.split("-") + range_parts = part.split('-') if len(range_parts) == 2: start = int(range_parts[0].strip()) end = int(range_parts[1].strip()) - + # Convert from 1-indexed to 0-indexed start_idx = start - 1 end_idx = end - 1 - + # Add all pages in range (inclusive) for i in range(start_idx, end_idx + 1): if 0 <= i < total_pages: @@ -210,27 +207,21 @@ def _parse_page_selection( if 0 <= page_idx < total_pages: selected_pages.add(page_idx) else: - print( - f"Warning: Page {page_num} is out of range (1-{total_pages})" - ) + print(f"Warning: Page {page_num} is out of range (1-{total_pages})") except ValueError: print(f"Warning: Invalid page number '{part}' in page selection") - + # If no valid pages were selected, default to all pages if not selected_pages: - print( - f"Warning: No valid pages in selection '{page_selection}'. Showing all pages." - ) + print(f"Warning: No valid pages in selection '{page_selection}'. Showing all pages.") return set(range(total_pages)) - + return selected_pages - + def _get_element_color(self, element_type: str) -> str: """Get color for element type.""" - return self.element_colors.get( - element_type.lower(), self.element_colors["default"] - ) - + return self.element_colors.get(element_type.lower(), self.element_colors['default']) + def _get_image_dimensions(self, image_path: str) -> Optional[Tuple[int, int]]: """Get dimensions of an image file.""" try: @@ -241,18 +232,18 @@ def _get_image_dimensions(self, image_path: str) -> Optional[Tuple[int, int]]: except Exception as e: print(f"Error getting image dimensions for {image_path}: {e}") return None - + def _load_image_as_base64(self, image_path: str) -> Optional[str]: """Load image from file path and convert to base64.""" try: if os.path.exists(image_path): - with open(image_path, "rb") as img_file: + with open(image_path, 'rb') as img_file: img_data = img_file.read() - img_base64 = base64.b64encode(img_data).decode("utf-8") + img_base64 = base64.b64encode(img_data).decode('utf-8') ext = os.path.splitext(image_path)[1].lower() - if ext in [".jpg", ".jpeg"]: + if ext in ['.jpg', '.jpeg']: return f"data:image/jpeg;base64,{img_base64}" - elif ext in [".png"]: + elif ext in ['.png']: return f"data:image/png;base64,{img_base64}" else: return f"data:image/jpeg;base64,{img_base64}" @@ -260,25 +251,25 @@ def _load_image_as_base64(self, image_path: str) -> Optional[str]: except Exception as e: print(f"Error loading image {image_path}: {e}") return None - + def _render_element_content(self, element: Dict, for_tooltip: bool = False) -> str: """Render element content with appropriate formatting for both tooltip and element list display. - + Args: element: The element dictionary containing content/description for_tooltip: Whether this is for tooltip display (affects styling and truncation) """ - element_type = element.get("type", "unknown") - content = element.get("content", "") - description = element.get("description", "") - + element_type = element.get('type', 'unknown') + content = element.get('content', '') + description = element.get('description', '') + display_content = "" - + if content: - if element_type == "table": + if element_type == 'table': # Render the HTML table with styling table_html = content - + # Apply different styling based on context if for_tooltip: # Compact styling for tooltips with light theme @@ -293,17 +284,17 @@ def _render_element_content(self, element: Dict, for_tooltip: bool = False) -> s th_style = 'style="border: 1px solid #ddd; padding: 8px; background: #f5f5f5; font-weight: bold; text-align: left;"' td_style = 'style="border: 1px solid #ddd; padding: 8px;"' thead_style = 'style="background: #f0f0f0;"' - + # Apply styling transformations - if "" in table_html: - table_html = table_html.replace("
", f"
") - if "" in table_html: - table_html = table_html.replace("", f"") - + if '
" in table_html: - table_html = table_html.replace("", f"") - if "" in table_html: - table_html = table_html.replace("", f"") - if "
' in table_html: + table_html = table_html.replace('
', f'
') + if '' in table_html: + table_html = table_html.replace('', f'') + if for_tooltip: display_content = table_html else: @@ -312,85 +303,71 @@ def _render_element_content(self, element: Dict, for_tooltip: bool = False) -> s # Regular content handling if for_tooltip and len(content) > 500: # Truncate for tooltip display and escape HTML for safety - display_content = self._escape_for_html_attribute( - content[:500] + "..." - ) + display_content = self._escape_for_html_attribute(content[:500] + "...") else: - display_content = ( - self._escape_for_html_attribute(content) - if for_tooltip - else content - ) + display_content = self._escape_for_html_attribute(content) if for_tooltip else content elif description: desc_content = description if for_tooltip and len(desc_content) > 500: desc_content = desc_content[:500] + "..." - + if for_tooltip: - display_content = self._escape_for_html_attribute( - f"Description: {desc_content}" - ) + display_content = self._escape_for_html_attribute(f"Description: {desc_content}") else: display_content = f"Description: {desc_content}" else: - display_content = ( - "No content available" if for_tooltip else "No content" - ) - + display_content = "No content available" if for_tooltip else "No content" + return display_content - + def _escape_for_html_attribute(self, text: str) -> str: """Escape text for safe use in HTML attributes.""" - return ( - text.replace("&", "&") - .replace("<", "<") - .replace(">", ">") - .replace('"', """) - .replace("'", "'") - .replace("\n", "
") - ) - + return (text.replace('&', '&') + .replace('<', '<') + .replace('>', '>') + .replace('"', '"') + .replace("'", ''') + .replace('\n', '
')) + def _calculate_tooltip_width(self, element: Dict, image_width: int) -> int: """Calculate dynamic tooltip width based on table content.""" - element_type = element.get("type", "unknown") - content = element.get("content", "") - - if element_type == "table" and content: + element_type = element.get('type', 'unknown') + content = element.get('content', '') + + if element_type == 'table' and content: # Count columns by looking for ", content, re.DOTALL | re.IGNORECASE - ) + first_row_match = re.search(r']*>(.*?)', content, re.DOTALL | re.IGNORECASE) if first_row_match: first_row = first_row_match.group(1) # Count th or td tags - th_count = len(re.findall(r"]*>", first_row, re.IGNORECASE)) - td_count = len(re.findall(r"]*>", first_row, re.IGNORECASE)) + th_count = len(re.findall(r']*>', first_row, re.IGNORECASE)) + td_count = len(re.findall(r']*>', first_row, re.IGNORECASE)) column_count = max(th_count, td_count) - + if column_count > 0: # Base width + additional width per column base_width = 300 width_per_column = 80 calculated_width = base_width + (column_count * width_per_column) - + # Cap at 4/5th of image width max_width = int(image_width * 0.8) return min(calculated_width, max_width) - + # Default width for non-tables or when calculation fails return 400 - + def _create_annotated_image(self, page: Dict, elements: List[Dict]) -> str: """Create annotated image with SCALING to fit within 1024px width.""" - image_uri = page.get("image_uri", "") - page_id = page.get("id", 0) - + image_uri = page.get('image_uri', '') + page_id = page.get('id', 0) + if not image_uri: return "

No image URI found for this page

" - + # Load image img_data_uri = self._load_image_as_base64(image_uri) if not img_data_uri: @@ -400,7 +377,7 @@ def _create_annotated_image(self, page: Dict, elements: List[Dict]) -> str: Make sure the file exists and is accessible. """ - + # Get original image dimensions original_dimensions = self._get_image_dimensions(image_uri) if not original_dimensions: @@ -408,35 +385,38 @@ def _create_annotated_image(self, page: Dict, elements: List[Dict]) -> str: original_width, original_height = 1024, 768 # Default fallback else: original_width, original_height = original_dimensions - + # Calculate scaling factor to fit within 1024px width max_display_width = 1024 scale_factor = 1.0 display_width = original_width display_height = original_height - + if original_width > max_display_width: scale_factor = max_display_width / original_width display_width = max_display_width display_height = int(original_height * scale_factor) - + # Filter elements for this page and collect their bounding boxes page_elements = [] - + for elem in elements: elem_bboxes = [] - for bbox in elem.get("bbox", []): - if bbox.get("page_id", 0) == page_id: - coord = bbox.get("coord", []) + for bbox in elem.get('bbox', []): + if bbox.get('page_id', 0) == page_id: + coord = bbox.get('coord', []) if len(coord) >= 4: elem_bboxes.append(bbox) - + if elem_bboxes: - page_elements.append({"element": elem, "bboxes": elem_bboxes}) - + page_elements.append({ + 'element': elem, + 'bboxes': elem_bboxes + }) + if not page_elements: return f"

No elements found for page {page_id}

" - + header_info = f"""
Page {page_id + 1}: {len(page_elements)} elements
@@ -445,54 +425,54 @@ def _create_annotated_image(self, page: Dict, elements: List[Dict]) -> str: Scale factor: {scale_factor:.3f}
""" - + # Generate unique container ID for this page container_id = f"page_container_{page_id}_{id(self)}" - + # Create bounding box overlays using SCALED coordinates with hover functionality overlays = [] - + for idx, item in enumerate(page_elements): - element = item["element"] - element_id = element.get("id", "N/A") - element_type = element.get("type", "unknown") + element = item['element'] + element_id = element.get('id', 'N/A') + element_type = element.get('type', 'unknown') color = self._get_element_color(element_type) - + # Use the shared content renderer for tooltip tooltip_content = self._render_element_content(element, for_tooltip=True) - + # Calculate dynamic tooltip width tooltip_width = self._calculate_tooltip_width(element, display_width) - + # Tables should render as HTML, other content should be escaped - - for bbox_idx, bbox in enumerate(item["bboxes"]): - coord = bbox.get("coord", []) + + for bbox_idx, bbox in enumerate(item['bboxes']): + coord = bbox.get('coord', []) if len(coord) >= 4: x1, y1, x2, y2 = coord - + # Apply scaling to coordinates scaled_x1 = x1 * scale_factor scaled_y1 = y1 * scale_factor scaled_x2 = x2 * scale_factor scaled_y2 = y2 * scale_factor - + width = scaled_x2 - scaled_x1 height = scaled_y2 - scaled_y1 - + # Skip invalid boxes if width <= 0 or height <= 0: continue - + # Position label above box when possible label_top = -18 if scaled_y1 >= 18 else 2 - + # Unique ID for this bounding box box_id = f"bbox_{page_id}_{idx}_{bbox_idx}" - + # Calculate tooltip position (prefer right side, but switch to left if needed) tooltip_left = 10 - + overlay = f"""
str: white-space: nowrap; border-radius: 2px; box-shadow: 0 1px 2px rgba(0,0,0,0.3); pointer-events: none; - max-width: {max(50, width - 4):.0f}px; + max-width: {max(50, width-4):.0f}px; overflow: hidden; z-index: 1000;"> {element_type.upper()[:6]}#{element_id} @@ -546,7 +526,7 @@ def _create_annotated_image(self, page: Dict, elements: List[Dict]) -> str:
""" overlays.append(overlay) - + # Pure CSS hover functionality (works in Databricks) styles = f""" """ - + return f""" {header_info} {styles} @@ -580,58 +560,56 @@ def _create_annotated_image(self, page: Dict, elements: List[Dict]) -> str: Page {page_id + 1} - {"".join(overlays)} + {''.join(overlays)} """ - + def _create_page_elements_list(self, page_id: int, elements: List[Dict]) -> str: """Create a detailed list of elements for a specific page.""" # Filter elements for this page page_elements = [] - + for elem in elements: elem_bboxes = [] - for bbox in elem.get("bbox", []): - if bbox.get("page_id", 0) == page_id: + for bbox in elem.get('bbox', []): + if bbox.get('page_id', 0) == page_id: elem_bboxes.append(bbox) - + if elem_bboxes: page_elements.append(elem) - + if not page_elements: return f"

No elements found for page {page_id + 1}

" - + html_parts = [] - + for element in page_elements: - element_id = element.get("id", "N/A") - element_type = element.get("type", "unknown") + element_id = element.get('id', 'N/A') + element_type = element.get('type', 'unknown') color = self._get_element_color(element_type) - + # Get bounding box info for this page only bbox_info = "No bbox" - bbox_list = element.get("bbox", []) + bbox_list = element.get('bbox', []) if bbox_list: bbox_details = [] for bbox in bbox_list: - if bbox.get("page_id", 0) == page_id: - coord = bbox.get("coord", []) + if bbox.get('page_id', 0) == page_id: + coord = bbox.get('coord', []) if len(coord) >= 4: - bbox_details.append( - f"[{coord[0]:.0f}, {coord[1]:.0f}, {coord[2]:.0f}, {coord[3]:.0f}]" - ) + bbox_details.append(f"[{coord[0]:.0f}, {coord[1]:.0f}, {coord[2]:.0f}, {coord[3]:.0f}]") bbox_info = "; ".join(bbox_details) if bbox_details else "Invalid bbox" - + # Use the shared content renderer for element list display display_content = self._render_element_content(element, for_tooltip=False) - + element_html = f"""

- {element_type.upper().replace("_", " ")} (ID: {element_id}) + {element_type.upper().replace('_', ' ')} (ID: {element_id})

{bbox_info} @@ -643,36 +621,34 @@ def _create_page_elements_list(self, page_id: int, elements: List[Dict]) -> str:
""" html_parts.append(element_html) - + return f"""

📋 Page {page_id + 1} Elements ({len(page_elements)} items)

- {"".join(html_parts)} + {''.join(html_parts)}
""" - - def _create_summary( - self, document: Dict, metadata: Dict, selected_pages: Set[int], total_pages: int - ) -> str: + + def _create_summary(self, document: Dict, metadata: Dict, selected_pages: Set[int], total_pages: int) -> str: """Create a summary with page selection info.""" - elements = document.get("elements", []) - + elements = document.get('elements', []) + # Count elements only on selected pages selected_elements = [] for elem in elements: - for bbox in elem.get("bbox", []): - if bbox.get("page_id", 0) in selected_pages: + for bbox in elem.get('bbox', []): + if bbox.get('page_id', 0) in selected_pages: selected_elements.append(elem) break - + # Count by type (for selected pages) type_counts = {} for elem in selected_elements: - elem_type = elem.get("type", "unknown") + elem_type = elem.get('type', 'unknown') type_counts[elem_type] = type_counts.get(elem_type, 0) + 1 - - type_list = ", ".join([f"{t}: {c}" for t, c in type_counts.items()]) - + + type_list = ', '.join([f"{t}: {c}" for t, c in type_counts.items()]) + # Create page selection info if len(selected_pages) == total_pages: page_info = f"All {total_pages} pages" @@ -683,22 +659,20 @@ def _create_summary( page_info = f"Pages {', '.join(map(str, page_nums))} ({len(selected_pages)} of {total_pages})" else: page_info = f"{len(selected_pages)} of {total_pages} pages selected" - + return f"""

📄 Document Summary

Displaying: {page_info}

Elements on selected pages: {len(selected_elements)}

-

Element Types: {type_list if type_list else "None"}

-

Document ID: {str(metadata.get("id", "N/A"))[:12]}...

+

Element Types: {type_list if type_list else 'None'}

+

Document ID: {str(metadata.get('id', 'N/A'))[:12]}...

""" - - def render_document( - self, parsed_result: Any, page_selection: Union[str, None] = None - ) -> None: + + def render_document(self, parsed_result: Any, page_selection: Union[str, None] = None) -> None: """Main render function with page selection support. - + Args: parsed_result: The parsed document result page_selection: Page selection string. Supported formats: @@ -710,100 +684,85 @@ def render_document( """ try: # Convert to dict - if hasattr(parsed_result, "toPython"): + if hasattr(parsed_result, 'toPython'): parsed_dict = parsed_result.toPython() - elif hasattr(parsed_result, "toJson"): + elif hasattr(parsed_result, 'toJson'): parsed_dict = json.loads(parsed_result.toJson()) elif isinstance(parsed_result, dict): parsed_dict = parsed_result else: - display( - HTML( - f"

❌ Could not convert result. Type: {type(parsed_result)}

" - ) - ) + display(HTML(f"

❌ Could not convert result. Type: {type(parsed_result)}

")) return - + # Extract components - document = parsed_dict.get("document", {}) - pages = document.get("pages", []) - elements = document.get("elements", []) - metadata = parsed_dict.get("metadata", {}) - + document = parsed_dict.get('document', {}) + pages = document.get('pages', []) + elements = document.get('elements', []) + metadata = parsed_dict.get('metadata', {}) + if not elements: - display( - HTML("

❌ No elements found in document

") - ) + display(HTML("

❌ No elements found in document

")) return - + # Parse page selection selected_pages = self._parse_page_selection(page_selection, len(pages)) - + # Display title display(HTML("

🔍 AI Parse Document Results

")) - + # Display summary with page selection info - summary_html = self._create_summary( - document, metadata, selected_pages, len(pages) - ) + summary_html = self._create_summary(document, metadata, selected_pages, len(pages)) display(HTML(summary_html)) - + # Display color legend legend_items = [] for elem_type, color in self.element_colors.items(): - if elem_type != "default": + if elem_type != 'default': legend_items.append(f""" - {elem_type.replace("_", " ").title()} + {elem_type.replace('_', ' ').title()} """) - - display( - HTML(f""" + + display(HTML(f"""
🎨 Element Colors:
- {"".join(legend_items)} + {''.join(legend_items)}
- """) - ) - + """)) + # Display annotated images with their corresponding elements (filtered by selection) if pages: display(HTML("

🖼️ Annotated Images & Elements

")) - + # Sort selected pages for display sorted_selected = sorted(selected_pages) - + for page_idx in sorted_selected: if page_idx < len(pages): page = pages[page_idx] - + # Display the annotated image annotated_html = self._create_annotated_image(page, elements) - display( - HTML(f"
{annotated_html}
") - ) - + display(HTML(f"
{annotated_html}
")) + # Display elements for this page immediately after the image - page_id = page.get("id", page_idx) - page_elements_html = self._create_page_elements_list( - page_id, elements - ) + page_id = page.get('id', page_idx) + page_elements_html = self._create_page_elements_list(page_id, elements) display(HTML(page_elements_html)) - + except Exception as e: display(HTML(f"

❌ Error: {str(e)}

")) import traceback - display(HTML(f"
{traceback.format_exc()}
")) # Simple usage functions def render_ai_parse_output(parsed_result, page_selection=None): """Simple function to render ai_parse_document output with page selection. - + Args: parsed_result: The parsed document result page_selection: Optional page selection string. Examples: @@ -816,9 +775,8 @@ def render_ai_parse_output(parsed_result, page_selection=None): renderer = DocumentRenderer() renderer.render_document(parsed_result, page_selection) - # COMMAND ---------- # DBTITLE 1,Debug Visualization Results for parsed_result in parsed_results: - render_ai_parse_output(parsed_result, page_selection) + render_ai_parse_output(parsed_result, page_selection) \ No newline at end of file diff --git a/contrib/workflow_with_ai_parse_document/src/transformations/01_parse_documents.py b/contrib/job_with_ai_parse_document/src/transformations/01_parse_documents.py similarity index 67% rename from contrib/workflow_with_ai_parse_document/src/transformations/01_parse_documents.py rename to contrib/job_with_ai_parse_document/src/transformations/01_parse_documents.py index 297aad29..4c3e2bb6 100644 --- a/contrib/workflow_with_ai_parse_document/src/transformations/01_parse_documents.py +++ b/contrib/job_with_ai_parse_document/src/transformations/01_parse_documents.py @@ -9,17 +9,9 @@ # Get parameters dbutils.widgets.text("catalog", "main", "Catalog name") dbutils.widgets.text("schema", "default", "Schema name") -dbutils.widgets.text( - "source_volume_path", "/Volumes/main/default/source_documents", "Source volume path" -) -dbutils.widgets.text( - "output_volume_path", "/Volumes/main/default/parsed_output", "Output volume path" -) -dbutils.widgets.text( - "checkpoint_location", - "/Volumes/main/default/checkpoints/parse_documents", - "Checkpoint location", -) +dbutils.widgets.text("source_volume_path", "/Volumes/main/default/source_documents", "Source volume path") +dbutils.widgets.text("output_volume_path", "/Volumes/main/default/parsed_output", "Output volume path") +dbutils.widgets.text("checkpoint_location", "/Volumes/main/default/checkpoints/parse_documents", "Checkpoint location") dbutils.widgets.text("table_name", "parsed_documents_raw", "Output table name") catalog = dbutils.widgets.get("catalog") @@ -38,28 +30,19 @@ # COMMAND ---------- from pyspark.sql.functions import col, current_timestamp, expr -from pyspark.sql.types import ( - StructType, - StructField, - StringType, - BinaryType, - TimestampType, - LongType, -) +from pyspark.sql.types import StructType, StructField, StringType, BinaryType, TimestampType, LongType # Define schema for binary files (must match exact schema expected by binaryFile format) -binary_file_schema = StructType( - [ - StructField("path", StringType(), False), - StructField("modificationTime", TimestampType(), False), - StructField("length", LongType(), False), - StructField("content", BinaryType(), True), - ] -) +binary_file_schema = StructType([ + StructField("path", StringType(), False), + StructField("modificationTime", TimestampType(), False), + StructField("length", LongType(), False), + StructField("content", BinaryType(), True) +]) # Read files using Structured Streaming -files_df = ( - spark.readStream.format("binaryFile") +files_df = (spark.readStream + .format("binaryFile") .schema(binary_file_schema) .option("pathGlobFilter", "*.{pdf,jpg,jpeg,png}") .option("recursiveFileLookup", "true") @@ -67,10 +50,9 @@ ) # Parse documents with ai_parse_document -parsed_df = ( - files_df.repartition(8, expr("crc32(path) % 8")) - .withColumn( - "parsed", +parsed_df = (files_df + .repartition(8, expr("crc32(path) % 8")) + .withColumn("parsed", expr(f""" ai_parse_document( content, @@ -80,15 +62,15 @@ 'descriptionElementTypes', '*' ) ) - """), + """) ) .withColumn("parsed_at", current_timestamp()) .select("path", "parsed", "parsed_at") ) # Write to Delta table with streaming -( - parsed_df.writeStream.format("delta") +(parsed_df.writeStream + .format("delta") .outputMode("append") .option("checkpointLocation", checkpoint_location) .option("delta.feature.variantType-preview", "supported") diff --git a/contrib/workflow_with_ai_parse_document/src/transformations/02_extract_text.py b/contrib/job_with_ai_parse_document/src/transformations/02_extract_text.py similarity index 58% rename from contrib/workflow_with_ai_parse_document/src/transformations/02_extract_text.py rename to contrib/job_with_ai_parse_document/src/transformations/02_extract_text.py index 29fa9097..e352e784 100644 --- a/contrib/workflow_with_ai_parse_document/src/transformations/02_extract_text.py +++ b/contrib/job_with_ai_parse_document/src/transformations/02_extract_text.py @@ -9,11 +9,7 @@ # Get parameters dbutils.widgets.text("catalog", "main", "Catalog name") dbutils.widgets.text("schema", "default", "Schema name") -dbutils.widgets.text( - "checkpoint_location", - "/Volumes/main/default/checkpoints/extract_text", - "Checkpoint location", -) +dbutils.widgets.text("checkpoint_location", "/Volumes/main/default/checkpoints/extract_text", "Checkpoint location") dbutils.widgets.text("source_table_name", "parsed_documents_raw", "Source table name") dbutils.widgets.text("table_name", "parsed_documents_text", "Output table name") @@ -34,37 +30,36 @@ from pyspark.sql.functions import col, concat_ws, expr, lit, when # Read from source table using Structured Streaming -parsed_stream = spark.readStream.format("delta").table(source_table_name) +parsed_stream = (spark.readStream + .format("delta") + .table(source_table_name) +) # Extract text from parsed documents -text_df = ( - parsed_stream.withColumn( - "text", - when( - expr("try_cast(parsed:error_status AS STRING)").isNotNull(), lit(None) - ).otherwise( - concat_ws( - "\n\n", - expr(""" +text_df = parsed_stream.withColumn( + "text", + when( + expr("try_cast(parsed:error_status AS STRING)").isNotNull(), + lit(None) + ).otherwise( + concat_ws( + "\n\n", + expr(""" transform( - CASE - WHEN try_cast(parsed:metadata:version AS STRING) = '1.0' - THEN try_cast(parsed:document:pages AS ARRAY) - ELSE try_cast(parsed:document:elements AS ARRAY) - END, + try_cast(parsed:document:elements AS ARRAY), element -> try_cast(element:content AS STRING) ) - """), - ) - ), + """) + ) ) - .withColumn("error_status", expr("try_cast(parsed:error_status AS STRING)")) - .select("path", "text", "error_status", "parsed_at") -) +).withColumn( + "error_status", + expr("try_cast(parsed:error_status AS STRING)") +).select("path", "text", "error_status", "parsed_at") # Write to Delta table with streaming -( - text_df.writeStream.format("delta") +(text_df.writeStream + .format("delta") .outputMode("append") .option("checkpointLocation", checkpoint_location) .option("mergeSchema", "true") diff --git a/contrib/workflow_with_ai_parse_document/src/transformations/03_extract_structured_data.py b/contrib/job_with_ai_parse_document/src/transformations/03_extract_structured_data.py similarity index 76% rename from contrib/workflow_with_ai_parse_document/src/transformations/03_extract_structured_data.py rename to contrib/job_with_ai_parse_document/src/transformations/03_extract_structured_data.py index 7320711c..a78a2ad0 100644 --- a/contrib/workflow_with_ai_parse_document/src/transformations/03_extract_structured_data.py +++ b/contrib/job_with_ai_parse_document/src/transformations/03_extract_structured_data.py @@ -9,11 +9,7 @@ # Get parameters dbutils.widgets.text("catalog", "main", "Catalog name") dbutils.widgets.text("schema", "default", "Schema name") -dbutils.widgets.text( - "checkpoint_location", - "/Volumes/main/default/checkpoints/extract_structured", - "Checkpoint location", -) +dbutils.widgets.text("checkpoint_location", "/Volumes/main/default/checkpoints/extract_structured", "Checkpoint location") dbutils.widgets.text("source_table_name", "parsed_documents_text", "Source table name") dbutils.widgets.text("table_name", "parsed_documents_structured", "Output table name") @@ -34,21 +30,20 @@ from pyspark.sql.functions import col, concat, current_timestamp, expr, length, lit # Read from source table using Structured Streaming -text_stream = ( - spark.readStream.format("delta") +text_stream = (spark.readStream + .format("delta") .table(source_table_name) .filter( - (col("text").isNotNull()) - & (col("error_status").isNull()) - & (length(col("text")) > 100) + (col("text").isNotNull()) & + (col("error_status").isNull()) & + (length(col("text")) > 100) ) ) # Extract structured data using ai_query -structured_df = ( - text_stream.withColumn( - "extracted_json", - expr(""" +structured_df = text_stream.withColumn( + "extracted_json", + expr(""" ai_query( 'databricks-claude-sonnet-4', concat( @@ -64,15 +59,14 @@ 'temperature', 0.1 ) ) - """), - ) - .withColumn("extraction_timestamp", current_timestamp()) - .select("path", "extracted_json", "parsed_at", "extraction_timestamp") -) + """) +).withColumn( + "extraction_timestamp", current_timestamp() +).select("path", "extracted_json", "parsed_at", "extraction_timestamp") # Write to Delta table with streaming -( - structured_df.writeStream.format("delta") +(structured_df.writeStream + .format("delta") .outputMode("append") .option("checkpointLocation", checkpoint_location) .option("mergeSchema", "true")
' in table_html: + table_html = table_html.replace('', f'') + if '' in table_html: + table_html = table_html.replace('', f'') + if '
or tags in first row import re - + # Find first row (either in thead or tbody) - first_row_match = re.search( - r"]*>(.*?)