")
- if "| " in table_html:
- table_html = table_html.replace(" | ", f" | ")
- if " | " in table_html:
- table_html = table_html.replace(" | ", f" | ")
- if "" in table_html:
- table_html = table_html.replace("", f"")
-
+ if '' in table_html:
+ table_html = table_html.replace('', f'')
+ if '| ' in table_html:
+ table_html = table_html.replace(' | ', f' | ')
+ if ' | ' in table_html:
+ table_html = table_html.replace(' | ', f' | ')
+ if '' in table_html:
+ table_html = table_html.replace('', f'')
+
if for_tooltip:
display_content = table_html
else:
@@ -312,85 +303,71 @@ def _render_element_content(self, element: Dict, for_tooltip: bool = False) -> s
# Regular content handling
if for_tooltip and len(content) > 500:
# Truncate for tooltip display and escape HTML for safety
- display_content = self._escape_for_html_attribute(
- content[:500] + "..."
- )
+ display_content = self._escape_for_html_attribute(content[:500] + "...")
else:
- display_content = (
- self._escape_for_html_attribute(content)
- if for_tooltip
- else content
- )
+ display_content = self._escape_for_html_attribute(content) if for_tooltip else content
elif description:
desc_content = description
if for_tooltip and len(desc_content) > 500:
desc_content = desc_content[:500] + "..."
-
+
if for_tooltip:
- display_content = self._escape_for_html_attribute(
- f"Description: {desc_content}"
- )
+ display_content = self._escape_for_html_attribute(f"Description: {desc_content}")
else:
display_content = f"Description: {desc_content}"
else:
- display_content = (
- "No content available" if for_tooltip else "No content"
- )
-
+ display_content = "No content available" if for_tooltip else "No content"
+
return display_content
-
+
def _escape_for_html_attribute(self, text: str) -> str:
"""Escape text for safe use in HTML attributes."""
- return (
- text.replace("&", "&")
- .replace("<", "<")
- .replace(">", ">")
- .replace('"', """)
- .replace("'", "'")
- .replace("\n", " ")
- )
-
+ return (text.replace('&', '&')
+ .replace('<', '<')
+ .replace('>', '>')
+ .replace('"', '"')
+ .replace("'", ''')
+ .replace('\n', ' '))
+
def _calculate_tooltip_width(self, element: Dict, image_width: int) -> int:
"""Calculate dynamic tooltip width based on table content."""
- element_type = element.get("type", "unknown")
- content = element.get("content", "")
-
- if element_type == "table" and content:
+ element_type = element.get('type', 'unknown')
+ content = element.get('content', '')
+
+ if element_type == 'table' and content:
# Count columns by looking for or | tags in first row
import re
-
+
# Find first row (either in thead or tbody)
- first_row_match = re.search(
- r" | ]*>(.*?) ", content, re.DOTALL | re.IGNORECASE
- )
+ first_row_match = re.search(r']*>(.*?) ', content, re.DOTALL | re.IGNORECASE)
if first_row_match:
first_row = first_row_match.group(1)
# Count th or td tags
- th_count = len(re.findall(r"]*>", first_row, re.IGNORECASE))
- td_count = len(re.findall(r" | ]*>", first_row, re.IGNORECASE))
+ th_count = len(re.findall(r' | ]*>', first_row, re.IGNORECASE))
+ td_count = len(re.findall(r' | ]*>', first_row, re.IGNORECASE))
column_count = max(th_count, td_count)
-
+
if column_count > 0:
# Base width + additional width per column
base_width = 300
width_per_column = 80
calculated_width = base_width + (column_count * width_per_column)
-
+
# Cap at 4/5th of image width
max_width = int(image_width * 0.8)
return min(calculated_width, max_width)
-
+
# Default width for non-tables or when calculation fails
return 400
-
+
def _create_annotated_image(self, page: Dict, elements: List[Dict]) -> str:
"""Create annotated image with SCALING to fit within 1024px width."""
- image_uri = page.get("image_uri", "")
- page_id = page.get("id", 0)
-
+ image_uri = page.get('image_uri', '')
+ page_id = page.get('id', 0)
+
if not image_uri:
return " No image URI found for this page "
-
+
# Load image
img_data_uri = self._load_image_as_base64(image_uri)
if not img_data_uri:
@@ -400,7 +377,7 @@ def _create_annotated_image(self, page: Dict, elements: List[Dict]) -> str:
Make sure the file exists and is accessible.
"""
-
+
# Get original image dimensions
original_dimensions = self._get_image_dimensions(image_uri)
if not original_dimensions:
@@ -408,35 +385,38 @@ def _create_annotated_image(self, page: Dict, elements: List[Dict]) -> str:
original_width, original_height = 1024, 768 # Default fallback
else:
original_width, original_height = original_dimensions
-
+
# Calculate scaling factor to fit within 1024px width
max_display_width = 1024
scale_factor = 1.0
display_width = original_width
display_height = original_height
-
+
if original_width > max_display_width:
scale_factor = max_display_width / original_width
display_width = max_display_width
display_height = int(original_height * scale_factor)
-
+
# Filter elements for this page and collect their bounding boxes
page_elements = []
-
+
for elem in elements:
elem_bboxes = []
- for bbox in elem.get("bbox", []):
- if bbox.get("page_id", 0) == page_id:
- coord = bbox.get("coord", [])
+ for bbox in elem.get('bbox', []):
+ if bbox.get('page_id', 0) == page_id:
+ coord = bbox.get('coord', [])
if len(coord) >= 4:
elem_bboxes.append(bbox)
-
+
if elem_bboxes:
- page_elements.append({"element": elem, "bboxes": elem_bboxes})
-
+ page_elements.append({
+ 'element': elem,
+ 'bboxes': elem_bboxes
+ })
+
if not page_elements:
return f"No elements found for page {page_id} "
-
+
header_info = f"""
Page {page_id + 1}: {len(page_elements)} elements
@@ -445,54 +425,54 @@ def _create_annotated_image(self, page: Dict, elements: List[Dict]) -> str:
Scale factor: {scale_factor:.3f}
"""
-
+
# Generate unique container ID for this page
container_id = f"page_container_{page_id}_{id(self)}"
-
+
# Create bounding box overlays using SCALED coordinates with hover functionality
overlays = []
-
+
for idx, item in enumerate(page_elements):
- element = item["element"]
- element_id = element.get("id", "N/A")
- element_type = element.get("type", "unknown")
+ element = item['element']
+ element_id = element.get('id', 'N/A')
+ element_type = element.get('type', 'unknown')
color = self._get_element_color(element_type)
-
+
# Use the shared content renderer for tooltip
tooltip_content = self._render_element_content(element, for_tooltip=True)
-
+
# Calculate dynamic tooltip width
tooltip_width = self._calculate_tooltip_width(element, display_width)
-
+
# Tables should render as HTML, other content should be escaped
-
- for bbox_idx, bbox in enumerate(item["bboxes"]):
- coord = bbox.get("coord", [])
+
+ for bbox_idx, bbox in enumerate(item['bboxes']):
+ coord = bbox.get('coord', [])
if len(coord) >= 4:
x1, y1, x2, y2 = coord
-
+
# Apply scaling to coordinates
scaled_x1 = x1 * scale_factor
scaled_y1 = y1 * scale_factor
scaled_x2 = x2 * scale_factor
scaled_y2 = y2 * scale_factor
-
+
width = scaled_x2 - scaled_x1
height = scaled_y2 - scaled_y1
-
+
# Skip invalid boxes
if width <= 0 or height <= 0:
continue
-
+
# Position label above box when possible
label_top = -18 if scaled_y1 >= 18 else 2
-
+
# Unique ID for this bounding box
box_id = f"bbox_{page_id}_{idx}_{bbox_idx}"
-
+
# Calculate tooltip position (prefer right side, but switch to left if needed)
tooltip_left = 10
-
+
overlay = f"""
str:
white-space: nowrap; border-radius: 2px;
box-shadow: 0 1px 2px rgba(0,0,0,0.3);
pointer-events: none;
- max-width: {max(50, width - 4):.0f}px;
+ max-width: {max(50, width-4):.0f}px;
overflow: hidden;
z-index: 1000;">
{element_type.upper()[:6]}#{element_id}
@@ -546,7 +526,7 @@ def _create_annotated_image(self, page: Dict, elements: List[Dict]) -> str:
"""
overlays.append(overlay)
-
+
# Pure CSS hover functionality (works in Databricks)
styles = f"""
"""
-
+
return f"""
{header_info}
{styles}
@@ -580,58 +560,56 @@ def _create_annotated_image(self, page: Dict, elements: List[Dict]) -> str:
- {"".join(overlays)}
+ {''.join(overlays)}
"""
-
+
def _create_page_elements_list(self, page_id: int, elements: List[Dict]) -> str:
"""Create a detailed list of elements for a specific page."""
# Filter elements for this page
page_elements = []
-
+
for elem in elements:
elem_bboxes = []
- for bbox in elem.get("bbox", []):
- if bbox.get("page_id", 0) == page_id:
+ for bbox in elem.get('bbox', []):
+ if bbox.get('page_id', 0) == page_id:
elem_bboxes.append(bbox)
-
+
if elem_bboxes:
page_elements.append(elem)
-
+
if not page_elements:
return f"No elements found for page {page_id + 1} "
-
+
html_parts = []
-
+
for element in page_elements:
- element_id = element.get("id", "N/A")
- element_type = element.get("type", "unknown")
+ element_id = element.get('id', 'N/A')
+ element_type = element.get('type', 'unknown')
color = self._get_element_color(element_type)
-
+
# Get bounding box info for this page only
bbox_info = "No bbox"
- bbox_list = element.get("bbox", [])
+ bbox_list = element.get('bbox', [])
if bbox_list:
bbox_details = []
for bbox in bbox_list:
- if bbox.get("page_id", 0) == page_id:
- coord = bbox.get("coord", [])
+ if bbox.get('page_id', 0) == page_id:
+ coord = bbox.get('coord', [])
if len(coord) >= 4:
- bbox_details.append(
- f"[{coord[0]:.0f}, {coord[1]:.0f}, {coord[2]:.0f}, {coord[3]:.0f}]"
- )
+ bbox_details.append(f"[{coord[0]:.0f}, {coord[1]:.0f}, {coord[2]:.0f}, {coord[3]:.0f}]")
bbox_info = "; ".join(bbox_details) if bbox_details else "Invalid bbox"
-
+
# Use the shared content renderer for element list display
display_content = self._render_element_content(element, for_tooltip=False)
-
+
element_html = f"""
- {element_type.upper().replace("_", " ")} (ID: {element_id})
+ {element_type.upper().replace('_', ' ')} (ID: {element_id})
{bbox_info}
@@ -643,36 +621,34 @@ def _create_page_elements_list(self, page_id: int, elements: List[Dict]) -> str:
"""
html_parts.append(element_html)
-
+
return f"""
📋 Page {page_id + 1} Elements ({len(page_elements)} items)
- {"".join(html_parts)}
+ {''.join(html_parts)}
"""
-
- def _create_summary(
- self, document: Dict, metadata: Dict, selected_pages: Set[int], total_pages: int
- ) -> str:
+
+ def _create_summary(self, document: Dict, metadata: Dict, selected_pages: Set[int], total_pages: int) -> str:
"""Create a summary with page selection info."""
- elements = document.get("elements", [])
-
+ elements = document.get('elements', [])
+
# Count elements only on selected pages
selected_elements = []
for elem in elements:
- for bbox in elem.get("bbox", []):
- if bbox.get("page_id", 0) in selected_pages:
+ for bbox in elem.get('bbox', []):
+ if bbox.get('page_id', 0) in selected_pages:
selected_elements.append(elem)
break
-
+
# Count by type (for selected pages)
type_counts = {}
for elem in selected_elements:
- elem_type = elem.get("type", "unknown")
+ elem_type = elem.get('type', 'unknown')
type_counts[elem_type] = type_counts.get(elem_type, 0) + 1
-
- type_list = ", ".join([f"{t}: {c}" for t, c in type_counts.items()])
-
+
+ type_list = ', '.join([f"{t}: {c}" for t, c in type_counts.items()])
+
# Create page selection info
if len(selected_pages) == total_pages:
page_info = f"All {total_pages} pages"
@@ -683,22 +659,20 @@ def _create_summary(
page_info = f"Pages {', '.join(map(str, page_nums))} ({len(selected_pages)} of {total_pages})"
else:
page_info = f"{len(selected_pages)} of {total_pages} pages selected"
-
+
return f"""
📄 Document Summary
Displaying: {page_info}
Elements on selected pages: {len(selected_elements)}
- Element Types: {type_list if type_list else "None"}
- Document ID: {str(metadata.get("id", "N/A"))[:12]}...
+ Element Types: {type_list if type_list else 'None'}
+ Document ID: {str(metadata.get('id', 'N/A'))[:12]}...
"""
-
- def render_document(
- self, parsed_result: Any, page_selection: Union[str, None] = None
- ) -> None:
+
+ def render_document(self, parsed_result: Any, page_selection: Union[str, None] = None) -> None:
"""Main render function with page selection support.
-
+
Args:
parsed_result: The parsed document result
page_selection: Page selection string. Supported formats:
@@ -710,100 +684,85 @@ def render_document(
"""
try:
# Convert to dict
- if hasattr(parsed_result, "toPython"):
+ if hasattr(parsed_result, 'toPython'):
parsed_dict = parsed_result.toPython()
- elif hasattr(parsed_result, "toJson"):
+ elif hasattr(parsed_result, 'toJson'):
parsed_dict = json.loads(parsed_result.toJson())
elif isinstance(parsed_result, dict):
parsed_dict = parsed_result
else:
- display(
- HTML(
- f" ❌ Could not convert result. Type: {type(parsed_result)} "
- )
- )
+ display(HTML(f" ❌ Could not convert result. Type: {type(parsed_result)} "))
return
-
+
# Extract components
- document = parsed_dict.get("document", {})
- pages = document.get("pages", [])
- elements = document.get("elements", [])
- metadata = parsed_dict.get("metadata", {})
-
+ document = parsed_dict.get('document', {})
+ pages = document.get('pages', [])
+ elements = document.get('elements', [])
+ metadata = parsed_dict.get('metadata', {})
+
if not elements:
- display(
- HTML(" ❌ No elements found in document ")
- )
+ display(HTML(" ❌ No elements found in document "))
return
-
+
# Parse page selection
selected_pages = self._parse_page_selection(page_selection, len(pages))
-
+
# Display title
display(HTML(" 🔍 AI Parse Document Results"))
-
+
# Display summary with page selection info
- summary_html = self._create_summary(
- document, metadata, selected_pages, len(pages)
- )
+ summary_html = self._create_summary(document, metadata, selected_pages, len(pages))
display(HTML(summary_html))
-
+
# Display color legend
legend_items = []
for elem_type, color in self.element_colors.items():
- if elem_type != "default":
+ if elem_type != 'default':
legend_items.append(f"""
- {elem_type.replace("_", " ").title()}
+ {elem_type.replace('_', ' ').title()}
""")
-
- display(
- HTML(f"""
+
+ display(HTML(f"""
🎨 Element Colors:
- {"".join(legend_items)}
+ {''.join(legend_items)}
- """)
- )
-
+ """))
+
# Display annotated images with their corresponding elements (filtered by selection)
if pages:
display(HTML(" 🖼️ Annotated Images & Elements"))
-
+
# Sort selected pages for display
sorted_selected = sorted(selected_pages)
-
+
for page_idx in sorted_selected:
if page_idx < len(pages):
page = pages[page_idx]
-
+
# Display the annotated image
annotated_html = self._create_annotated_image(page, elements)
- display(
- HTML(f" {annotated_html} ")
- )
-
+ display(HTML(f" {annotated_html} "))
+
# Display elements for this page immediately after the image
- page_id = page.get("id", page_idx)
- page_elements_html = self._create_page_elements_list(
- page_id, elements
- )
+ page_id = page.get('id', page_idx)
+ page_elements_html = self._create_page_elements_list(page_id, elements)
display(HTML(page_elements_html))
-
+
except Exception as e:
display(HTML(f" ❌ Error: {str(e)} "))
import traceback
-
display(HTML(f" {traceback.format_exc()}"))
# Simple usage functions
def render_ai_parse_output(parsed_result, page_selection=None):
"""Simple function to render ai_parse_document output with page selection.
-
+
Args:
parsed_result: The parsed document result
page_selection: Optional page selection string. Examples:
@@ -816,9 +775,8 @@ def render_ai_parse_output(parsed_result, page_selection=None):
renderer = DocumentRenderer()
renderer.render_document(parsed_result, page_selection)
-
# COMMAND ----------
# DBTITLE 1,Debug Visualization Results
for parsed_result in parsed_results:
- render_ai_parse_output(parsed_result, page_selection)
+ render_ai_parse_output(parsed_result, page_selection)
\ No newline at end of file
diff --git a/contrib/workflow_with_ai_parse_document/src/transformations/01_parse_documents.py b/contrib/job_with_ai_parse_document/src/transformations/01_parse_documents.py
similarity index 67%
rename from contrib/workflow_with_ai_parse_document/src/transformations/01_parse_documents.py
rename to contrib/job_with_ai_parse_document/src/transformations/01_parse_documents.py
index 297aad29..4c3e2bb6 100644
--- a/contrib/workflow_with_ai_parse_document/src/transformations/01_parse_documents.py
+++ b/contrib/job_with_ai_parse_document/src/transformations/01_parse_documents.py
@@ -9,17 +9,9 @@
# Get parameters
dbutils.widgets.text("catalog", "main", "Catalog name")
dbutils.widgets.text("schema", "default", "Schema name")
-dbutils.widgets.text(
- "source_volume_path", "/Volumes/main/default/source_documents", "Source volume path"
-)
-dbutils.widgets.text(
- "output_volume_path", "/Volumes/main/default/parsed_output", "Output volume path"
-)
-dbutils.widgets.text(
- "checkpoint_location",
- "/Volumes/main/default/checkpoints/parse_documents",
- "Checkpoint location",
-)
+dbutils.widgets.text("source_volume_path", "/Volumes/main/default/source_documents", "Source volume path")
+dbutils.widgets.text("output_volume_path", "/Volumes/main/default/parsed_output", "Output volume path")
+dbutils.widgets.text("checkpoint_location", "/Volumes/main/default/checkpoints/parse_documents", "Checkpoint location")
dbutils.widgets.text("table_name", "parsed_documents_raw", "Output table name")
catalog = dbutils.widgets.get("catalog")
@@ -38,28 +30,19 @@
# COMMAND ----------
from pyspark.sql.functions import col, current_timestamp, expr
-from pyspark.sql.types import (
- StructType,
- StructField,
- StringType,
- BinaryType,
- TimestampType,
- LongType,
-)
+from pyspark.sql.types import StructType, StructField, StringType, BinaryType, TimestampType, LongType
# Define schema for binary files (must match exact schema expected by binaryFile format)
-binary_file_schema = StructType(
- [
- StructField("path", StringType(), False),
- StructField("modificationTime", TimestampType(), False),
- StructField("length", LongType(), False),
- StructField("content", BinaryType(), True),
- ]
-)
+binary_file_schema = StructType([
+ StructField("path", StringType(), False),
+ StructField("modificationTime", TimestampType(), False),
+ StructField("length", LongType(), False),
+ StructField("content", BinaryType(), True)
+])
# Read files using Structured Streaming
-files_df = (
- spark.readStream.format("binaryFile")
+files_df = (spark.readStream
+ .format("binaryFile")
.schema(binary_file_schema)
.option("pathGlobFilter", "*.{pdf,jpg,jpeg,png}")
.option("recursiveFileLookup", "true")
@@ -67,10 +50,9 @@
)
# Parse documents with ai_parse_document
-parsed_df = (
- files_df.repartition(8, expr("crc32(path) % 8"))
- .withColumn(
- "parsed",
+parsed_df = (files_df
+ .repartition(8, expr("crc32(path) % 8"))
+ .withColumn("parsed",
expr(f"""
ai_parse_document(
content,
@@ -80,15 +62,15 @@
'descriptionElementTypes', '*'
)
)
- """),
+ """)
)
.withColumn("parsed_at", current_timestamp())
.select("path", "parsed", "parsed_at")
)
# Write to Delta table with streaming
-(
- parsed_df.writeStream.format("delta")
+(parsed_df.writeStream
+ .format("delta")
.outputMode("append")
.option("checkpointLocation", checkpoint_location)
.option("delta.feature.variantType-preview", "supported")
diff --git a/contrib/workflow_with_ai_parse_document/src/transformations/02_extract_text.py b/contrib/job_with_ai_parse_document/src/transformations/02_extract_text.py
similarity index 58%
rename from contrib/workflow_with_ai_parse_document/src/transformations/02_extract_text.py
rename to contrib/job_with_ai_parse_document/src/transformations/02_extract_text.py
index 29fa9097..e352e784 100644
--- a/contrib/workflow_with_ai_parse_document/src/transformations/02_extract_text.py
+++ b/contrib/job_with_ai_parse_document/src/transformations/02_extract_text.py
@@ -9,11 +9,7 @@
# Get parameters
dbutils.widgets.text("catalog", "main", "Catalog name")
dbutils.widgets.text("schema", "default", "Schema name")
-dbutils.widgets.text(
- "checkpoint_location",
- "/Volumes/main/default/checkpoints/extract_text",
- "Checkpoint location",
-)
+dbutils.widgets.text("checkpoint_location", "/Volumes/main/default/checkpoints/extract_text", "Checkpoint location")
dbutils.widgets.text("source_table_name", "parsed_documents_raw", "Source table name")
dbutils.widgets.text("table_name", "parsed_documents_text", "Output table name")
@@ -34,37 +30,36 @@
from pyspark.sql.functions import col, concat_ws, expr, lit, when
# Read from source table using Structured Streaming
-parsed_stream = spark.readStream.format("delta").table(source_table_name)
+parsed_stream = (spark.readStream
+ .format("delta")
+ .table(source_table_name)
+)
# Extract text from parsed documents
-text_df = (
- parsed_stream.withColumn(
- "text",
- when(
- expr("try_cast(parsed:error_status AS STRING)").isNotNull(), lit(None)
- ).otherwise(
- concat_ws(
- "\n\n",
- expr("""
+text_df = parsed_stream.withColumn(
+ "text",
+ when(
+ expr("try_cast(parsed:error_status AS STRING)").isNotNull(),
+ lit(None)
+ ).otherwise(
+ concat_ws(
+ "\n\n",
+ expr("""
transform(
- CASE
- WHEN try_cast(parsed:metadata:version AS STRING) = '1.0'
- THEN try_cast(parsed:document:pages AS ARRAY )
- ELSE try_cast(parsed:document:elements AS ARRAY)
- END,
+ try_cast(parsed:document:elements AS ARRAY),
element -> try_cast(element:content AS STRING)
)
- """),
- )
- ),
+ """)
+ )
)
- .withColumn("error_status", expr("try_cast(parsed:error_status AS STRING)"))
- .select("path", "text", "error_status", "parsed_at")
-)
+).withColumn(
+ "error_status",
+ expr("try_cast(parsed:error_status AS STRING)")
+).select("path", "text", "error_status", "parsed_at")
# Write to Delta table with streaming
-(
- text_df.writeStream.format("delta")
+(text_df.writeStream
+ .format("delta")
.outputMode("append")
.option("checkpointLocation", checkpoint_location)
.option("mergeSchema", "true")
diff --git a/contrib/workflow_with_ai_parse_document/src/transformations/03_extract_structured_data.py b/contrib/job_with_ai_parse_document/src/transformations/03_extract_structured_data.py
similarity index 76%
rename from contrib/workflow_with_ai_parse_document/src/transformations/03_extract_structured_data.py
rename to contrib/job_with_ai_parse_document/src/transformations/03_extract_structured_data.py
index 7320711c..a78a2ad0 100644
--- a/contrib/workflow_with_ai_parse_document/src/transformations/03_extract_structured_data.py
+++ b/contrib/job_with_ai_parse_document/src/transformations/03_extract_structured_data.py
@@ -9,11 +9,7 @@
# Get parameters
dbutils.widgets.text("catalog", "main", "Catalog name")
dbutils.widgets.text("schema", "default", "Schema name")
-dbutils.widgets.text(
- "checkpoint_location",
- "/Volumes/main/default/checkpoints/extract_structured",
- "Checkpoint location",
-)
+dbutils.widgets.text("checkpoint_location", "/Volumes/main/default/checkpoints/extract_structured", "Checkpoint location")
dbutils.widgets.text("source_table_name", "parsed_documents_text", "Source table name")
dbutils.widgets.text("table_name", "parsed_documents_structured", "Output table name")
@@ -34,21 +30,20 @@
from pyspark.sql.functions import col, concat, current_timestamp, expr, length, lit
# Read from source table using Structured Streaming
-text_stream = (
- spark.readStream.format("delta")
+text_stream = (spark.readStream
+ .format("delta")
.table(source_table_name)
.filter(
- (col("text").isNotNull())
- & (col("error_status").isNull())
- & (length(col("text")) > 100)
+ (col("text").isNotNull()) &
+ (col("error_status").isNull()) &
+ (length(col("text")) > 100)
)
)
# Extract structured data using ai_query
-structured_df = (
- text_stream.withColumn(
- "extracted_json",
- expr("""
+structured_df = text_stream.withColumn(
+ "extracted_json",
+ expr("""
ai_query(
'databricks-claude-sonnet-4',
concat(
@@ -64,15 +59,14 @@
'temperature', 0.1
)
)
- """),
- )
- .withColumn("extraction_timestamp", current_timestamp())
- .select("path", "extracted_json", "parsed_at", "extraction_timestamp")
-)
+ """)
+).withColumn(
+ "extraction_timestamp", current_timestamp()
+).select("path", "extracted_json", "parsed_at", "extraction_timestamp")
# Write to Delta table with streaming
-(
- structured_df.writeStream.format("delta")
+(structured_df.writeStream
+ .format("delta")
.outputMode("append")
.option("checkpointLocation", checkpoint_location)
.option("mergeSchema", "true")
| | |