-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpdf_parser.py
More file actions
451 lines (362 loc) · 15.7 KB
/
Copy pathpdf_parser.py
File metadata and controls
451 lines (362 loc) · 15.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
#!/usr/bin/env python3
"""
PDF Parser and JSON Extractor
This module provides functionality to parse PDF files and extract their content
into a well-structured JSON format while preserving the hierarchical organization
of the document.
Author: AI Assistant
Date: 2025
"""
import json
import argparse
import logging
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
import re
# PDF processing libraries
import fitz # PyMuPDF
import pdfplumber
import pandas as pd
try:
from tabula import read_pdf
except ImportError:
read_pdf = None
try:
import camelot
except ImportError:
camelot = None
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class PDFParser:
"""
A comprehensive PDF parser that extracts content into structured JSON format.
Supports extraction of:
- Paragraphs and text content
- Tables
- Charts and images
- Section hierarchies
"""
def __init__(self, pdf_path: str):
"""
Initialize the PDF parser.
Args:
pdf_path (str): Path to the PDF file to parse
"""
self.pdf_path = Path(pdf_path)
if not self.pdf_path.exists():
raise FileNotFoundError(f"PDF file not found: {pdf_path}")
self.doc = None
self.plumber_pdf = None
def __enter__(self):
"""Context manager entry."""
self.doc = fitz.open(self.pdf_path)
self.plumber_pdf = pdfplumber.open(self.pdf_path)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
if self.doc:
self.doc.close()
if self.plumber_pdf:
self.plumber_pdf.close()
def extract_text_blocks(self, page_num: int) -> List[Dict[str, Any]]:
"""
Extract text blocks from a page using PyMuPDF.
Args:
page_num (int): Page number (0-indexed)
Returns:
List[Dict]: List of text blocks with metadata
"""
page = self.doc[page_num]
blocks = page.get_text("dict")["blocks"]
text_blocks = []
for block in blocks:
if "lines" in block: # Text block
text_content = ""
for line in block["lines"]:
for span in line["spans"]:
text_content += span["text"] + " "
if text_content.strip():
text_blocks.append({
"type": "paragraph",
"text": text_content.strip(),
"bbox": block["bbox"],
"font_info": self._extract_font_info(block)
})
return text_blocks
def _extract_font_info(self, block: Dict) -> Dict[str, Any]:
"""Extract font information from a text block."""
font_info = {"sizes": [], "fonts": [], "flags": []}
for line in block.get("lines", []):
for span in line.get("spans", []):
font_info["sizes"].append(span.get("size", 0))
font_info["fonts"].append(span.get("font", ""))
font_info["flags"].append(span.get("flags", 0))
# Get most common font properties
if font_info["sizes"]:
font_info["primary_size"] = max(set(font_info["sizes"]), key=font_info["sizes"].count)
if font_info["fonts"]:
font_info["primary_font"] = max(set(font_info["fonts"]), key=font_info["fonts"].count)
return font_info
def extract_tables_pdfplumber(self, page_num: int) -> List[Dict[str, Any]]:
"""
Extract tables from a page using pdfplumber.
Args:
page_num (int): Page number (0-indexed)
Returns:
List[Dict]: List of extracted tables
"""
page = self.plumber_pdf.pages[page_num]
tables = page.extract_tables()
extracted_tables = []
for i, table in enumerate(tables):
if table and len(table) > 0:
# Clean and process table data
cleaned_table = []
for row in table:
cleaned_row = [cell.strip() if cell else "" for cell in row]
cleaned_table.append(cleaned_row)
extracted_tables.append({
"type": "table",
"table_id": f"table_{page_num + 1}_{i + 1}",
"table_data": cleaned_table,
"rows": len(cleaned_table),
"columns": len(cleaned_table[0]) if cleaned_table else 0
})
return extracted_tables
def extract_tables_camelot(self, page_num: int) -> List[Dict[str, Any]]:
"""
Extract tables from a page using Camelot (fallback method).
Args:
page_num (int): Page number (0-indexed)
Returns:
List[Dict]: List of extracted tables
"""
if camelot is None:
logger.debug("Camelot not available, skipping camelot table extraction")
return []
try:
# Camelot uses 1-indexed pages
tables = camelot.read_pdf(str(self.pdf_path), pages=str(page_num + 1))
extracted_tables = []
for i, table in enumerate(tables):
if not table.df.empty:
# Convert DataFrame to list of lists
table_data = [table.df.columns.tolist()] + table.df.values.tolist()
extracted_tables.append({
"type": "table",
"table_id": f"camelot_table_{page_num + 1}_{i + 1}",
"table_data": table_data,
"rows": len(table_data),
"columns": len(table_data[0]) if table_data else 0,
"accuracy": table.accuracy if hasattr(table, 'accuracy') else None
})
return extracted_tables
except Exception as e:
logger.warning(f"Camelot table extraction failed for page {page_num + 1}: {e}")
return []
def detect_charts_and_images(self, page_num: int) -> List[Dict[str, Any]]:
"""
Detect charts and images on a page.
Args:
page_num (int): Page number (0-indexed)
Returns:
List[Dict]: List of detected charts/images
"""
page = self.doc[page_num]
image_list = page.get_images(full=True) # Get full image list
charts_and_images = []
for i, img in enumerate(image_list):
try:
xref = img[0]
# Try to get bbox, but handle cases where it fails
try:
bbox = page.get_image_bbox(img)
except (ValueError, IndexError):
# Fallback to default bbox if get_image_bbox fails
bbox = [0, 0, 100, 100]
charts_and_images.append({
"type": "chart",
"chart_id": f"chart_{page_num + 1}_{i + 1}",
"bbox": bbox,
"xref": xref,
"description": f"Chart/Image found on page {page_num + 1}",
"table_data": None # Will be populated if chart data can be extracted
})
except Exception as e:
logger.warning(f"Failed to process image {i} on page {page_num + 1}: {e}")
continue
return charts_and_images
def identify_sections(self, text_blocks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Identify sections and sub-sections based on text formatting and content.
Args:
text_blocks (List[Dict]): List of text blocks
Returns:
List[Dict]: Text blocks with section information added
"""
current_section = None
current_sub_section = None
# Common section patterns
section_patterns = [
r'^[A-Z][A-Z\s&]+$', # ALL CAPS sections
r'^\d+\.\s*[A-Z]', # Numbered sections
r'^[A-Z][a-z]+\s+[A-Z]', # Title Case sections
]
for block in text_blocks:
text = block["text"]
font_info = block.get("font_info", {})
primary_size = font_info.get("primary_size", 0)
# Check if this looks like a section header
is_section = False
for pattern in section_patterns:
if re.match(pattern, text.strip()):
is_section = True
break
# Also check font size (larger fonts are likely headers)
if primary_size > 12 and len(text.split()) <= 10:
is_section = True
if is_section:
if primary_size > 14 or text.isupper():
current_section = text.strip()
current_sub_section = None
else:
current_sub_section = text.strip()
# Add section information to the block
block["section"] = current_section
block["sub_section"] = current_sub_section
return text_blocks
def merge_content_by_type(self, text_blocks: List[Dict], tables: List[Dict],
charts: List[Dict]) -> List[Dict[str, Any]]:
"""
Merge and sort content by position on the page.
Args:
text_blocks (List[Dict]): Text content blocks
tables (List[Dict]): Table content blocks
charts (List[Dict]): Chart/image content blocks
Returns:
List[Dict]: Merged and sorted content
"""
all_content = []
# Add text blocks
for block in text_blocks:
content_item = {
"type": block["type"],
"section": block.get("section"),
"sub_section": block.get("sub_section"),
"text": block["text"],
"position": block.get("bbox", [0, 0, 0, 0])[1] # Y-coordinate for sorting
}
all_content.append(content_item)
# Add tables
for table in tables:
content_item = {
"type": table["type"],
"section": None, # Will be inferred from nearby text
"sub_section": None,
"description": f"Table with {table['rows']} rows and {table['columns']} columns",
"table_data": table["table_data"],
"position": 0 # Tables will be positioned based on context
}
all_content.append(content_item)
# Add charts
for chart in charts:
content_item = {
"type": chart["type"],
"section": None, # Will be inferred from nearby text
"sub_section": None,
"description": chart["description"],
"table_data": chart.get("table_data"),
"position": chart.get("bbox", [0, 0, 0, 0])[1] if chart.get("bbox") else 0
}
all_content.append(content_item)
# Sort by position (top to bottom)
all_content.sort(key=lambda x: -x["position"]) # Negative for top-to-bottom
# Infer sections for tables and charts based on nearby text
self._infer_sections_for_non_text(all_content)
# Clean up position field
for item in all_content:
item.pop("position", None)
return all_content
def _infer_sections_for_non_text(self, content: List[Dict[str, Any]]):
"""Infer section information for tables and charts based on nearby text."""
current_section = None
current_sub_section = None
for item in content:
if item["type"] == "paragraph":
if item.get("section"):
current_section = item["section"]
if item.get("sub_section"):
current_sub_section = item["sub_section"]
else:
# Apply current section to non-text items
if not item.get("section"):
item["section"] = current_section
if not item.get("sub_section"):
item["sub_section"] = current_sub_section
def parse_pdf(self) -> Dict[str, Any]:
"""
Parse the entire PDF and extract structured content.
Returns:
Dict: Structured JSON representation of the PDF content
"""
logger.info(f"Starting to parse PDF: {self.pdf_path}")
result = {
"document_info": {
"filename": self.pdf_path.name,
"total_pages": len(self.doc)
},
"pages": []
}
for page_num in range(len(self.doc)):
logger.info(f"Processing page {page_num + 1}/{len(self.doc)}")
# Extract different types of content
text_blocks = self.extract_text_blocks(page_num)
tables_plumber = self.extract_tables_pdfplumber(page_num)
tables_camelot = self.extract_tables_camelot(page_num)
charts = self.detect_charts_and_images(page_num)
# Combine tables from different methods
all_tables = tables_plumber + tables_camelot
# Identify sections in text blocks
text_blocks = self.identify_sections(text_blocks)
# Merge all content
page_content = self.merge_content_by_type(text_blocks, all_tables, charts)
page_info = {
"page_number": page_num + 1,
"content": page_content
}
result["pages"].append(page_info)
logger.info("PDF parsing completed successfully")
return result
def save_json(self, data: Dict[str, Any], output_path: str):
"""
Save the extracted data to a JSON file.
Args:
data (Dict): The structured data to save
output_path (str): Path where to save the JSON file
"""
output_path = Path(output_path)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
logger.info(f"JSON output saved to: {output_path}")
def main():
"""Main function to run the PDF parser from command line."""
parser = argparse.ArgumentParser(description="Parse PDF and extract content to JSON")
parser.add_argument("pdf_file", help="Path to the PDF file to parse")
parser.add_argument("-o", "--output", help="Output JSON file path",
default="extracted_content.json")
parser.add_argument("-v", "--verbose", action="store_true",
help="Enable verbose logging")
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
try:
with PDFParser(args.pdf_file) as pdf_parser:
extracted_data = pdf_parser.parse_pdf()
pdf_parser.save_json(extracted_data, args.output)
print(f"Successfully parsed PDF and saved to {args.output}")
except Exception as e:
logger.error(f"Error parsing PDF: {e}")
raise
if __name__ == "__main__":
main()