-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
152 lines (131 loc) · 5.58 KB
/
main.py
File metadata and controls
152 lines (131 loc) · 5.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# In main.py
import io
import os
import uuid
from typing import Optional, List

from fastapi import FastAPI, HTTPException, UploadFile, File, Query
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from processing import process_csv_to_parquet
from logging_system import get_logs, add_log, LogLevel
# --- Create the FastAPI app ---
app = FastAPI(
    title="SAGAR Data Processing Engine",
    description="API to process CSV files to Parquet format and store in processed-data bucket",
    version="1.0.0",
)

# --- CORS Middleware ---
# Allowed origins are read from the comma-separated CORS_ALLOW_ORIGINS
# environment variable, so a production deployment can restrict them to its
# frontend URL(s) without a code change. When the variable is unset or blank,
# every origin is allowed, matching the previous hard-coded ["*"].
# NOTE(review): the CORS spec does not allow a literal "*" origin on
# credentialed requests; with allow_credentials=True, prefer setting explicit
# origins in production.
_cors_origins = [
    origin.strip()
    for origin in os.environ.get("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Define the response data model ---
class ProcessResponse(BaseModel):
    """Response body returned by the ``POST /process-csv`` endpoint.

    Only ``status`` is required; every other field defaults to ``None`` and is
    filled in from whatever the processing pipeline reports.
    """

    status: str  # overall outcome, e.g. "success"
    processed_file: Optional[str] = None
    metadata: Optional[dict] = None
    quality_report: Optional[dict] = None  # SAGAR-QC quality report (for non-netCDF files)
    compliance_report: Optional[dict] = None  # Compliance report (for netCDF files)
    file_type: Optional[str] = None  # 'netcdf' or other
    error: Optional[str] = None
    # Short per-request job identifier, matching the job_id filter of the /logs
    # endpoints. It must be declared here: FastAPI serialises the endpoint's
    # return value through this model and silently drops undeclared keys.
    job_id: Optional[str] = None
# --- Create the API endpoint ---
@app.post("/process-csv", response_model=ProcessResponse)
async def process_csv_file(
    file: UploadFile = File(...)
):
    """
    Receive an uploaded data file, clean it, convert it to Parquet and store it.

    Accepts CSV, TSV, TXT, Excel, JSON and similar inputs. The result is uploaded
    to the processed-data bucket and its metadata is stored in the
    metadata_sagar table.

    The backend handles:
    1. CSV cleaning (removes lines above header, ensures proper format)
    2. CSV to Parquet conversion
    3. Storage upload
    4. Metadata storage

    Returns:
        ProcessResponse with ``status="success"``, the pipeline's outputs, and
        the short ``job_id`` under which this request's log entries were
        recorded.

    Raises:
        HTTPException(400): the uploaded file is empty.
        HTTPException(500): the pipeline reported ``status == "failed"`` or
            raised an unexpected exception.
    """
    job_id = str(uuid.uuid4())[:8]  # short ID used to correlate log entries
    filename = file.filename
    add_log(LogLevel.INFO, f"Received request to process file: {filename}", filename, job_id)
    try:
        file_bytes = await file.read()
        if not file_bytes:
            add_log(LogLevel.ERROR, "File is empty", filename, job_id)
            raise HTTPException(status_code=400, detail="File is empty")
        add_log(LogLevel.INFO, f"File read successfully: {len(file_bytes)} bytes", filename, job_id)

        # process_csv_to_parquet is synchronous (its result is used directly as
        # a dict). Run it in the worker thread pool so a long conversion does
        # not block the event loop and stall every other request.
        result = await run_in_threadpool(
            process_csv_to_parquet, file_bytes, filename, job_id=job_id
        )

        # If the pipeline failed, return an HTTP error
        if result.get("status") == "failed":
            error_msg = result.get('error', 'Unknown error')
            add_log(LogLevel.ERROR, f"Processing failed: {error_msg}", filename, job_id)
            raise HTTPException(
                status_code=500,
                detail=f"Processing failed. Error: {error_msg}"
            )

        add_log(LogLevel.SUCCESS, f"Processing completed successfully: {result.get('processed_file')}", filename, job_id)
        # Pass job_id through the model itself: extra keys added to a plain dict
        # are stripped by FastAPI's response_model filtering.
        return ProcessResponse(
            status="success",
            processed_file=result.get("processed_file"),
            metadata=result.get("metadata"),
            quality_report=result.get("quality_report"),
            compliance_report=result.get("compliance_report"),
            file_type=result.get("file_type"),
            job_id=job_id,
        )
    except HTTPException:
        # Already carries the intended status code and detail.
        raise
    except Exception as e:
        error_msg = f"Internal server error: {str(e)}"
        add_log(LogLevel.ERROR, error_msg, filename, job_id)
        raise HTTPException(
            status_code=500,
            detail=error_msg
        ) from e
# --- Root endpoint: simple "service is up" message ---
@app.get("/")
def read_root():
    """Return a fixed status payload confirming the engine is running."""
    payload = {
        "status": "ok",
        "message": "SAGAR Data Processing Engine is running.",
    }
    return payload
# --- Health check endpoint ---
@app.get("/health")
def health_check():
    """Return a static payload reporting the service as healthy."""
    return dict(status="healthy", service="Data Processing Engine")
# --- Logs endpoint ---
@app.get("/logs")
def get_processing_logs(
    filename: Optional[str] = Query(None, description="Filter logs by filename"),
    job_id: Optional[str] = Query(None, description="Filter logs by job ID"),
    level: Optional[str] = Query(None, description="Filter logs by level (info, success, warning, error, debug)"),
    limit: int = Query(100, description="Maximum number of logs to return", ge=1, le=1000),
):
    """
    Return processing log entries, optionally filtered.

    Query Parameters:
    - filename: only entries for this filename (optional)
    - job_id: only entries for this job ID (optional)
    - level: only entries at this level (optional)
    - limit: maximum number of entries, 1-1000 (default: 100)

    Returns:
    - The value of ``logging_system.get_logs`` unchanged: an array of log entries
      with timestamp, level, message, filename, and job_id.
    """
    return get_logs(filename=filename, job_id=job_id, level=level, limit=limit)
# --- Logs endpoint (alternative format) ---
@app.get("/processing-logs")
def get_processing_logs_alt(
    filename: Optional[str] = Query(None, description="Filter logs by filename"),
    job_id: Optional[str] = Query(None, description="Filter logs by job ID"),
    level: Optional[str] = Query(None, description="Filter logs by level"),
    limit: int = Query(100, description="Maximum number of logs to return", ge=1, le=1000),
):
    """
    Return the same filtered log entries as ``/logs``, wrapped in an object.

    The response has the form ``{"logs": [...], "count": <number of entries>}``.
    """
    entries = get_logs(filename=filename, job_id=job_id, level=level, limit=limit)
    return dict(logs=entries, count=len(entries))