Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 173 additions & 0 deletions src/python/odse/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ def _get_transformer(source: str):
"sma": SMATransformer(),
"solis": SolisTransformer(),
"soliscloud": SolisTransformer(),
"sungrow": SungrowTransformer(),
"isolarcloud": SungrowTransformer(),
"higeco": HigecoTransformer(),
"csv": GenericCSVTransformer(),
"generic_csv": GenericCSVTransformer(),
Expand Down Expand Up @@ -967,6 +969,177 @@ def transform(self, data: Union[str, Path], **kwargs) -> List[Dict[str, Any]]:
return out


class SungrowTransformer(BaseTransformer):
"""Transform Sungrow iSolarCloud API JSON payloads to ODS-E."""

DEVICE_STATUS_MAPPING = {
0: "offline", # Disconnected
1: "standby", # Standby
2: "standby", # Starting
3: "normal", # Running
4: "normal", # Generating
5: "warning", # Derating
6: "fault", # Fault
7: "fault", # Alarm
8: "standby", # Shutdown
9: "warning", # Communication fault
10: "offline", # Not communicating
11: "standby", # Sleeping
12: "warning", # Maintenance
13: "critical", # Emergency stop
14: "warning", # Grid abnormal
15: "fault", # Inverter fault
}

PLANT_STATUS_MAPPING = {
0: "offline", # All devices offline
1: "normal", # All devices normal
2: "warning", # Some devices warning
3: "fault", # Some devices fault
4: "critical", # Critical fault
5: "standby", # All devices standby
}

def transform(self, data: Union[str, Path], **kwargs) -> List[Dict[str, Any]]:
payload = self._parse_json(data)
timezone = kwargs.get("timezone")
interval_hours = (kwargs.get("interval_minutes", 5) or 5) / 60.0
asset_id = kwargs.get("asset_id")

out: List[Dict[str, Any]] = []

# Handle plant_realtime endpoint
if isinstance(payload, dict) and "plant_id" in payload and "total_power" in payload:
ts = _to_iso8601(payload.get("timestamp"), timezone=timezone)
if ts:
power_w = _to_float(payload.get("total_power"))
energy_wh = _to_float(payload.get("daily_energy"))
status = _to_int(payload.get("status"))

rec = _base_record(
timestamp=ts,
kwh=max((energy_wh or 0.0) / 1000.0, 0.0),
error_type=self.PLANT_STATUS_MAPPING.get(status or -1, "unknown"),
error_code=status,
asset_id=asset_id,
)
if power_w is not None:
rec["kW"] = power_w / 1000.0
out.append(rec)
return out

# Handle device_telemetry endpoint
if isinstance(payload, dict) and "device_id" in payload and "data_points" in payload:
data_points = payload.get("data_points", [])
if isinstance(data_points, list):
for point in data_points:
if not isinstance(point, dict):
continue
ts = _to_iso8601(point.get("timestamp"), timezone=timezone)
if not ts:
continue

p_w = _to_float(point.get("active_power"))
q_var = _to_float(point.get("reactive_power"))
s_va = _to_float(point.get("apparent_power"))
pf = _to_float(point.get("power_factor"))
e_wh = _to_float(point.get("daily_energy"))
status_code = _to_int(point.get("status_code"))
fault_code = point.get("fault_code")

kwh = (e_wh / 1000.0) if e_wh is not None else max(((p_w or 0.0) / 1000.0) * interval_hours, 0.0)

rec = _base_record(
timestamp=ts,
kwh=kwh,
error_type=self.DEVICE_STATUS_MAPPING.get(status_code or -1, "unknown"),
error_code=fault_code,
asset_id=asset_id,
)

if p_w is not None:
rec["kW"] = p_w / 1000.0
if q_var is not None:
rec["kVAr"] = q_var / 1000.0
if s_va is not None:
rec["kVA"] = s_va / 1000.0
if pf is not None:
rec["PF"] = max(0.0, min(1.0, pf))
elif (p_w is not None) and (s_va is not None) and s_va > 0:
rec["PF"] = max(0.0, min(1.0, p_w / s_va))

# AC electrical parameters (3-phase averaging/summing)
v_a = _to_float(point.get("voltage_a"))
v_b = _to_float(point.get("voltage_b"))
v_c = _to_float(point.get("voltage_c"))
voltages = [v for v in [v_a, v_b, v_c] if v is not None and v > 0]
if voltages:
rec["voltage_ac"] = sum(voltages) / len(voltages)

i_a = _to_float(point.get("current_a"))
i_b = _to_float(point.get("current_b"))
i_c = _to_float(point.get("current_c"))
currents = [i for i in [i_a, i_b, i_c] if i is not None]
if currents:
rec["current_ac"] = sum(currents)

freq = _to_float(point.get("frequency"))
if freq is not None:
rec["frequency"] = freq

# DC electrical parameters
dc_v1 = _to_float(point.get("dc_voltage_1"))
dc_v2 = _to_float(point.get("dc_voltage_2"))
dc_voltages = [v for v in [dc_v1, dc_v2] if v is not None]
if dc_voltages:
rec["voltage_dc"] = max(dc_voltages)

dc_i1 = _to_float(point.get("dc_current_1"))
dc_i2 = _to_float(point.get("dc_current_2"))
dc_currents = [i for i in [dc_i1, dc_i2] if i is not None]
if dc_currents:
rec["current_dc"] = sum(dc_currents)

temp = _to_float(point.get("temperature"))
if temp is not None:
rec["temperature"] = temp

if status_code is not None:
rec["oem_error_code"] = str(status_code)

out.append(rec)
return out

# Handle historical_data endpoint
if isinstance(payload, dict) and "data_points" in payload:
data_points = payload.get("data_points", [])
if isinstance(data_points, list):
for point in data_points:
if not isinstance(point, dict):
continue
ts = _to_iso8601(point.get("timestamp"), timezone=timezone)
if not ts:
continue

power_w = _to_float(point.get("power"))
energy_wh = _to_float(point.get("energy"))
status = _to_int(point.get("status"))

rec = _base_record(
timestamp=ts,
kwh=max((energy_wh or 0.0) / 1000.0, 0.0),
error_type=self.PLANT_STATUS_MAPPING.get(status or -1, "unknown"),
error_code=status,
asset_id=asset_id,
)
if power_w is not None:
rec["kW"] = power_w / 1000.0
out.append(rec)
return out

return out


class GenericCSVTransformer(BaseTransformer):
"""Transform arbitrary CSV data to ODS-E using a column mapping."""

Expand Down
Loading
Loading