Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions schemas/regulatory-event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://ona-protocol.org/schemas/v1/regulatory-event.json",
"title": "ODS-E Regulatory Event",
"type": "object",
"additionalProperties": false,
"required": [
"jurisdiction",
"regulator",
"event_type",
"title",
"published_date",
"source_url",
"source_system",
"source_record_id",
"schema_version",
"transform_version"
],
"properties": {
"jurisdiction": {
"type": "string",
"enum": ["US", "ZA", "ZW"]
},
"regulator": {
"type": "string"
},
"event_type": {
"type": "string"
},
"title": {
"type": "string"
},
"summary": {
"type": ["string", "null"]
},
"effective_date": {
"type": ["string", "null"],
"format": "date"
},
"deadline_date": {
"type": ["string", "null"],
"format": "date"
},
"published_date": {
"type": "string",
"format": "date"
},
"source_url": {
"type": "string",
"format": "uri"
},
"source_system": {
"type": "string"
},
"source_record_id": {
"type": "string"
},
"schema_version": {
"type": "string",
"const": "regulatory-event.v1"
},
"transform_version": {
"type": "string"
}
}
}
51 changes: 51 additions & 0 deletions spec/regulatory-event-normalization.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Regulatory Event Normalization

This document defines an additive normalization contract for regulatory notices, decisions, and procurement updates that sit outside the core ODS-E `energy-timeseries` and `asset-metadata` schemas.

## Scope

The contract is intended for application-layer ingestion of regulatory records across:

- United States (`US`)
- South Africa (`ZA`)
- Zimbabwe (`ZW`)

The normalized output schema is [`schemas/regulatory-event.json`](../schemas/regulatory-event.json). The reference transform mapping is [`transforms/regulatory-events-unified.yaml`](../transforms/regulatory-events-unified.yaml).

## Canonical Fields

Every normalized record must provide:

- `jurisdiction`
- `regulator`
- `event_type`
- `title`
- `published_date`
- `source_url`
- `source_system`
- `source_record_id`
- `schema_version`
- `transform_version`

Optional fields (each may be omitted or set to `null`):

- `summary`
- `effective_date`
- `deadline_date`

## Source Adapters

The initial adapters are:

- `us_manual`: manually curated or already-structured US regulatory records
- `nersa`: NERSA homepage decision/news records
- `ippo`: South African IPP Office press-release feed
- `zera_seed`: seeded ZERA publication catalog entries, used when the official site blocks direct machine access

The `zera_seed` path is intentionally additive and explicit. It keeps Zimbabwe records in the shared contract while preserving the distinction between the canonical normalized schema and the source acquisition method.

## Conformance Notes

- This normalization is application-layer and does not modify the existing ODS-E `energy-timeseries` or `asset-metadata` contracts.
- `schema_version` is fixed at `regulatory-event.v1`.
- `transform_version` is source-specific and must change when source-field semantics change.
3 changes: 3 additions & 0 deletions src/python/odse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .transformer import transform, transform_stream
from .enrichment import enrich
from .io import to_csv, to_dataframe, to_json, to_parquet
from .regulatory import REGULATORY_EVENT_SCHEMA_VERSION, normalize_regulatory_events

__all__ = [
"validate",
Expand All @@ -27,6 +28,8 @@
"transform",
"transform_stream",
"enrich",
"REGULATORY_EVENT_SCHEMA_VERSION",
"normalize_regulatory_events",
"to_json",
"to_csv",
"to_parquet",
Expand Down
188 changes: 188 additions & 0 deletions src/python/odse/regulatory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
"""Regulatory event normalization helpers."""

from __future__ import annotations

import hashlib
from datetime import datetime
from typing import Any, Iterable, List
from urllib.parse import urljoin


REGULATORY_EVENT_SCHEMA_VERSION = "regulatory-event.v1"


def _stable_id(*parts: Any) -> str:
    """Return a deterministic hex digest derived from *parts*.

    Each part is rendered with ``str()`` (``None`` becomes an empty string),
    the pieces are joined with ``"|"``, and the UTF-8 bytes are hashed with
    MD5. The digest serves as an identifier, not as a security primitive.
    """
    tokens = [str(piece) if piece is not None else "" for piece in parts]
    digest = hashlib.md5("|".join(tokens).encode("utf-8"))
    return digest.hexdigest()


def _parse_date(value: Any) -> str | None:
    """Coerce a raw source date into an ISO ``YYYY-MM-DD`` string.

    Args:
        value: Raw date taken from a source record. Values that are not
            strings are converted with ``str()`` before parsing.

    Returns:
        ``None`` when *value* is ``None``, empty, or whitespace-only.
        Otherwise the ISO date when the text matches one of the known
        formats or begins with an ISO date (for example an ISO timestamp).
        Text that cannot be interpreted is returned stripped but otherwise
        unchanged, so the original value stays visible to the caller
        instead of being cut down to a misleading fragment.
    """
    if value in (None, ""):
        return None
    text = str(value).strip()
    if not text:
        # Whitespace-only input carries no date, same as an empty string.
        return None

    known_formats = (
        "%Y-%m-%d",
        "%d %B %Y",
        "%d %b %Y",
        "%m/%d/%Y %I:%M:%S %p",
        "%m/%d/%Y",
    )
    for fmt in known_formats:
        try:
            return datetime.strptime(text, fmt).date().isoformat()
        except ValueError:
            continue

    # Timestamps such as "2024-01-15T10:00:00Z" start with a usable date.
    # Only keep the 10-character prefix when it really is one; blindly
    # slicing would turn "January 15, 2024" into "January 15".
    try:
        return datetime.strptime(text[:10], "%Y-%m-%d").date().isoformat()
    except ValueError:
        return text


def _slugify(value: str) -> str:
    """Turn *value* into a lowercase, hyphen-separated slug.

    Runs of alphanumeric characters are lowercased and kept; every other
    character acts as a separator. Leading, trailing, and repeated
    separators collapse, so the result never starts or ends with ``"-"``.
    """
    words = []
    current = []
    for symbol in value.strip():
        if symbol.isalnum():
            current.append(symbol.lower())
        elif current:
            # A separator closes the word collected so far.
            words.append("".join(current))
            current = []
    if current:
        words.append("".join(current))
    return "-".join(words)


def _classify_event_type(title: str, default: str) -> str:
    """Pick an event type from keywords found in *title*.

    Matching is case-insensitive and substring-based. Rules are checked in
    order and the first hit wins; *default* is returned when none match.
    """
    haystack = title.lower()
    # Order matters: earlier rules take precedence over later ones.
    rules = (
        ("decision", ("decision", "approves", "approval")),
        ("consultation", ("comment", "hearing", "invitation")),
        ("procurement_update", ("bidder", "procurement", "rfq")),
        ("public_notice", ("tariff", "price", "fuel notice")),
    )
    for label, keywords in rules:
        if any(keyword in haystack for keyword in keywords):
            return label
    return default


def _normalize_nersa(records: Iterable[dict[str, Any]]) -> List[dict[str, Any]]:
    """Map NERSA source records onto the shared regulatory-event envelope.

    Records with no usable ``title`` are skipped. The record id is the last
    path segment of ``href`` when one is present, otherwise a slug of the
    title. ``href`` is resolved against the NERSA site root.
    """
    normalized: List[dict[str, Any]] = []
    for item in records:
        heading = str(item.get("title") or "").strip()
        if heading:
            link = item.get("href") or ""
            if link:
                record_id = str(link).strip("/").split("/")[-1]
            else:
                record_id = _slugify(heading)
            kind = _classify_event_type(heading, "announcement")
            blurb = item.get("section") or "NERSA event"
            published = _parse_date(item.get("published_date"))
            event = {
                "jurisdiction": "ZA",
                "regulator": "NERSA",
                "event_type": kind,
                "title": heading,
                "summary": blurb,
                "effective_date": None,
                "deadline_date": None,
                "published_date": published,
                "source_url": urljoin("https://www.nersa.org.za/", link),
                "source_system": "nersa",
                "source_record_id": record_id,
                "schema_version": REGULATORY_EVENT_SCHEMA_VERSION,
                "transform_version": "nersa.v1",
            }
            normalized.append(event)
    return normalized


def _normalize_ippo(records: Iterable[dict[str, Any]]) -> List[dict[str, Any]]:
    """Map IPP Office press-release records onto the shared envelope.

    Records with no usable ``headline`` are skipped. When ``noteid`` is
    present the source URL points at that annotation; otherwise it falls
    back to the latest-news page. The record id is ``id`` when set, else a
    slug of the headline.
    """
    normalized: List[dict[str, Any]] = []
    for item in records:
        headline = str(item.get("headline") or "").strip()
        if headline:
            note_id = item.get("noteid")
            kind = _classify_event_type(headline, "announcement")
            blurb = item.get("detail") or "IPP Office press release"
            published = _parse_date(item.get("date"))
            if note_id:
                url = urljoin("https://www.ipp-projects.co.za", f"/_entity/annotation/{note_id}")
            else:
                url = "https://www.ipp-projects.co.za/latestnews/"
            record_id = str(item.get("id") or _slugify(headline))
            event = {
                "jurisdiction": "ZA",
                "regulator": "IPP Office",
                "event_type": kind,
                "title": headline,
                "summary": blurb,
                "effective_date": None,
                "deadline_date": None,
                "published_date": published,
                "source_url": url,
                "source_system": "ippo",
                "source_record_id": record_id,
                "schema_version": REGULATORY_EVENT_SCHEMA_VERSION,
                "transform_version": "ippo.v1",
            }
            normalized.append(event)
    return normalized


def _normalize_zera_seed(records: Iterable[dict[str, Any]]) -> List[dict[str, Any]]:
    """Map seeded ZERA catalog entries onto the shared envelope.

    Records with no usable ``title`` are skipped. A missing ``source_url``
    falls back to the ZERA press-releases page, and a missing
    ``source_record_id`` falls back to a slug of the title.
    """
    normalized: List[dict[str, Any]] = []
    for item in records:
        heading = str(item.get("title") or "").strip()
        if heading:
            kind = _classify_event_type(heading, "public_notice")
            blurb = item.get("summary")
            published = _parse_date(item.get("published_date"))
            url = item.get("source_url") or "https://www.zera.co.zw/press-releases-public-notices/"
            record_id = str(item.get("source_record_id") or _slugify(heading))
            event = {
                "jurisdiction": "ZW",
                "regulator": "ZERA",
                "event_type": kind,
                "title": heading,
                "summary": blurb,
                "effective_date": None,
                "deadline_date": None,
                "published_date": published,
                "source_url": url,
                "source_system": "zera_seed",
                "source_record_id": record_id,
                "schema_version": REGULATORY_EVENT_SCHEMA_VERSION,
                "transform_version": "zera_seed.v1",
            }
            normalized.append(event)
    return normalized


def _normalize_us_manual(records: Iterable[dict[str, Any]]) -> List[dict[str, Any]]:
    """Map manually curated US records onto the shared envelope.

    Records with no usable ``title`` are skipped. Title keywords take
    precedence when choosing the event type; the record's own
    ``event_type`` (or ``"rulemaking"``) is only the fallback. The record
    id is ``source_record_id`` when set, else a slug of the title.
    """
    normalized: List[dict[str, Any]] = []
    for item in records:
        heading = str(item.get("title") or "").strip()
        if heading:
            agency = str(item.get("regulator") or "US Regulator")
            fallback_kind = str(item.get("event_type") or "rulemaking")
            kind = _classify_event_type(heading, fallback_kind)
            blurb = item.get("summary")
            effective = _parse_date(item.get("effective_date"))
            deadline = _parse_date(item.get("deadline_date"))
            published = _parse_date(item.get("published_date"))
            url = item.get("source_url")
            record_id = str(item.get("source_record_id") or _slugify(heading))
            event = {
                "jurisdiction": "US",
                "regulator": agency,
                "event_type": kind,
                "title": heading,
                "summary": blurb,
                "effective_date": effective,
                "deadline_date": deadline,
                "published_date": published,
                "source_url": url,
                "source_system": "us_manual",
                "source_record_id": record_id,
                "schema_version": REGULATORY_EVENT_SCHEMA_VERSION,
                "transform_version": "us_manual.v1",
            }
            normalized.append(event)
    return normalized


def normalize_regulatory_events(
    records: Iterable[dict[str, Any]],
    *,
    source: str,
) -> List[dict[str, Any]]:
    """Normalize source-specific regulatory records into the shared envelope.

    Args:
        records: Raw records as produced by the chosen source.
        source: Adapter name, matched case-insensitively. Accepted values
            are ``nersa``, ``ippo``, ``us_manual`` (alias ``us``), and
            ``zera_seed`` (alias ``zera``).

    Returns:
        The normalized events. Any event left without a
        ``source_record_id`` receives a stable hash of its source system,
        title, and published date.

    Raises:
        ValueError: If *source* does not name a known adapter.
    """
    aliases = {
        "nersa": "nersa",
        "ippo": "ippo",
        "us_manual": "us_manual",
        "us": "us_manual",
        "zera_seed": "zera_seed",
        "zera": "zera_seed",
    }
    canonical = aliases.get(source.lower())
    if canonical is None:
        raise ValueError(f"Unknown regulatory source '{source}'")

    adapters = {
        "nersa": _normalize_nersa,
        "ippo": _normalize_ippo,
        "us_manual": _normalize_us_manual,
        "zera_seed": _normalize_zera_seed,
    }
    events = adapters[canonical](records)

    for entry in events:
        if entry.get("source_record_id"):
            continue
        # Adapters can yield an empty id (e.g. a title with no slug-able
        # characters); backfill one so every event stays addressable.
        entry["source_record_id"] = _stable_id(
            entry.get("source_system"),
            entry.get("title"),
            entry.get("published_date"),
        )
    return events
Loading
Loading