-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcsv_load.py
More file actions
73 lines (65 loc) · 2.86 KB
/
csv_load.py
File metadata and controls
73 lines (65 loc) · 2.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""Load categorized transaction CSV files."""
from __future__ import annotations
import csv
from pathlib import Path
from csv_columns import detect_columns
from parsing import parse_amount, parse_date
from records import normalize_analysis_record
from schemas import CategorizedRecord
def load_categorized_file(
    file_path: str | Path,
) -> tuple[list[CategorizedRecord], list[str], list[CategorizedRecord]]:
    """Read categorized transactions from a CSV file and flag duplicates.

    The first CSV record is treated as the header and handed to
    ``detect_columns`` to locate the date, merchant, amount, category and
    (optional) subcategory columns. Every following non-blank record is
    parsed with ``parse_date`` / ``parse_amount`` and passed through
    ``normalize_analysis_record``. A record whose
    ``(ISO date, lower-cased merchant, amount rounded to 2 places)`` key was
    already seen is reported as a duplicate instead of a regular record.

    Args:
        file_path: Location of the CSV file (read as UTF-8, BOM tolerated).

    Returns:
        A ``(records, warnings, duplicates)`` tuple. ``warnings`` holds one
        ``"Skipped line N: ..."`` message per record that could not be
        loaded, where ``N`` is the physical line on which the record starts.
        An empty file yields ``([], ["The file was empty."], [])``.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
    """
    input_path = Path(file_path)
    if not input_path.exists():
        raise FileNotFoundError(f"Could not find file: {input_path}")

    records: list[CategorizedRecord] = []
    warnings: list[str] = []
    duplicates: list[CategorizedRecord] = []
    seen: set[tuple[str, str, float]] = set()

    with input_path.open("r", newline="", encoding="utf-8-sig") as handle:
        reader = csv.reader(handle)
        try:
            headers = next(reader)
        except StopIteration:
            return [], ["The file was empty."], []

        # Column positions do not change between rows, so resolve and
        # validate them once. A key that is absent from the mapping is
        # treated the same as a column that was not detected (None).
        mapping = detect_columns(headers)
        date_idx = mapping.get("date")
        merchant_idx = mapping.get("merchant")
        amount_idx = mapping.get("amount")
        category_idx = mapping.get("category")
        subcategory_idx = mapping.get("subcategory")

        required = (date_idx, merchant_idx, amount_idx, category_idx)
        has_required = all(index is not None for index in required)
        # Shortest row that still contains every required column.
        min_width = max(required) + 1 if has_required else 0

        # ``reader.line_num`` counts physical lines consumed so far. A record
        # can span several lines (quoted newlines), so the line a record
        # starts on is "end of the previous record + 1", not a record count.
        previous_end = reader.line_num
        for row in reader:
            line_number = previous_end + 1
            previous_end = reader.line_num

            # Skip blank lines and rows made only of whitespace cells.
            if not any(cell.strip() for cell in row):
                continue

            if not has_required or len(row) < min_width:
                warnings.append(
                    f"Skipped line {line_number}: "
                    "Row did not contain all expected columns"
                )
                continue

            try:
                parsed_date = parse_date(row[date_idx])
                merchant = row[merchant_idx].strip()
                amount = parse_amount(row[amount_idx])
                category = row[category_idx].strip() or "Unknown"
                # Subcategory is optional: fall back to the category when the
                # column is missing, the row is too short, or the cell is empty.
                if subcategory_idx is not None and subcategory_idx < len(row):
                    subcategory = row[subcategory_idx].strip() or category
                else:
                    subcategory = category
                entry = normalize_analysis_record(
                    {
                        "date": parsed_date,
                        "merchant": merchant,
                        "amount": amount,
                        "category": category,
                        "subcategory": subcategory,
                    }
                )
                duplicate_key = (
                    parsed_date.isoformat(),
                    merchant.lower(),
                    round(amount, 2),
                )
            except (ValueError, TypeError, KeyError, IndexError) as error:
                warnings.append(f"Skipped line {line_number}: {error}")
            else:
                if duplicate_key in seen:
                    duplicates.append(entry)
                else:
                    seen.add(duplicate_key)
                    records.append(entry)

    return records, warnings, duplicates