diff --git a/.codacy.yml b/.codacy.yml new file mode 100644 index 0000000..3315768 --- /dev/null +++ b/.codacy.yml @@ -0,0 +1,15 @@ +--- +engines: + bandit: + exclude_paths: + - "tests/**" + - "test/**" + - "**/*.test.js" + - "**/test_*.py" + - "**/tests/**/*.py" +exclude_paths: + - "tests/**" + - "test/**" + - "**/*.test.js" + - "**/test_*.py" + - "**/tests/**/*.py" diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index fc2dcdd..84b90fa 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -19,14 +19,6 @@ repos: # Run the formatter. - id: ruff-format -# Codespell: Find and fix common spelling errors in code/comments - - repo: https://github.com/codespell-project/codespell - rev: v2.2.6 # You can update this to the latest release if needed - hooks: - - id: codespell - args: ["--write-changes"] - additional_dependencies: [] # Optional if you want to pin spell files - # BasedPyright: Fast type checker (Pyright fork) - repo: local hooks: diff --git a/README.md b/README.md index fa0c91c..7d02792 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ [![Codacy Badge](https://app.codacy.com/project/badge/Grade/f3b16baf087c40208c084bdad90ea6ce)](https://app.codacy.com/gh/StevenHosper/brostar-api-requests/dashboard?utm_source=gh&utm_medium=referral&utm_content=&utm_campaign=Badge_grade) +[![Codacy Badge](https://app.codacy.com/project/badge/Coverage/f3b16baf087c40208c084bdad90ea6ce)](https://app.codacy.com/gh/StevenHosper/brostar-api-requests/dashboard?utm_source=gh&utm_medium=referral&utm_content=&utm_campaign=Badge_coverage) # brostar-api-requests diff --git a/main.py b/main.py new file mode 100644 index 0000000..342f26c --- /dev/null +++ b/main.py @@ -0,0 +1,46 @@ +import logging + +from src.brostar_api_requests.brostar_api_requests import ( + send_gldaddition_for_vitens_location, +) + + +def main(): + send_gldaddition_for_vitens_location("46B-0735001", "62254944", "1103") + # file_path = r"C:\Users\steven.hosper\Downloads\duplicates_ids.xlsx" + # correct_bulk_gld(file_path) + # retry_upload_task() + # print(total_events_delivered()) + + # Create GLD's Vitens + # file_path = r"C:\Users\steven.hosper\Downloads\overview_gmn_vitens_waterschap.xlsx" + # create_bulk_gld(excel_file=file_path) + + # Ingest GLD Vitens + # file_path = r"C:\Users\steven.hosper\Downloads\overview_gmn_vitens_v2.xlsx" + # df = pl.read_excel(file_path) + # df = df.filter(pl.col("bro_id").is_not_null()) + # for row in df.iter_rows(named=True): + # gld_to_lizard(row['objectIdAccountableParty'], row['bro_id']) + + # delete_invalid_upload_tasks() + + # Tholen Correction!!! + # bulk_gmw_correction_request(20166109) + + # Scheldestromen deliveries + # file_path = r"C:\Users\steven.hosper\Downloads\20250709_ScheldestromenImportExcel_test.xlsx" + # bulk_gmw_construction_request(file_path, "51640813") + + # BrabantWater corrections + # file_path = r"C:\Users\steven.hosper\Downloads\BROLab_ImportExcel.xlsx" + # bulk_gmw_construction_request(file_path, "17278718") + + # Gelderland Corrections + # file_path = r"C:\Users\steven.hosper\Downloads\freatische_filter_gmw_ids.xlsx" + # bulk_gmw_tubenumber_correction_request(file_path, "51468751") + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + main() diff --git a/pyproject.toml b/pyproject.toml index eb61f84..dde8c3c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ dependencies = [ "pytest>=8.3.5", "coverage>=7.8.0", "requests-mock>=1.12.1", + "pytz" ] diff --git a/src/brostar_api_requests/brostar_api_requests.py b/src/brostar_api_requests/brostar_api_requests.py index 8b25f7f..1dca8e4 100644 --- a/src/brostar_api_requests/brostar_api_requests.py +++ b/src/brostar_api_requests/brostar_api_requests.py @@ -1,18 +1,41 @@ +import ast +import csv +import datetime import logging import os +import time +from pathlib import Path from typing import Literal import polars as pl +import pytz import requests from dotenv import load_dotenv +from requests.adapters import HTTPAdapter, Retry from .connection import BROSTARConnection from .formatter import PayloadFormatter -from .upload_models import GMWConstruction, UploadTask, UploadTaskMetadata +from .upload_models import ( + GLDAddition, + GMWConstruction, + MonitoringTube, + UploadTask, + UploadTaskMetadata, +) logger = logging.getLogger(__name__) RequestTypeOptions = Literal["registration", "replace", "insert", "move", "delete"] RegistrationTypeOptions = Literal["GMW_Construction"] +AMS_TZ = pytz.timezone("Europe/Amsterdam") +CHUNK_SIZE = 7000 +VALIDATION_MAPPING = { + "goedgekeurd": 2, + "onbeslist": 5, + "afgekeurd": 8, + "nogNietBeoordeeld": 100, + "onbekend": 200, + # Any above 100 are corrected values +} load_dotenv() @@ -37,6 +60,37 @@ def _move_gmw( brostar.await_completed(uuid=uuid) +def _correct_gmw(brostar: BROSTARConnection, upload_task: UploadTask) -> None: + """Send a move request that corrects the dates.""" + payload = upload_task.model_dump(mode="json", by_alias=True) + r = brostar.post_upload(payload) + r.raise_for_status() + + uuid: str = r.json()["uuid"] + brostar.await_completed(uuid=uuid) + + +def delete_invalid_upload_tasks() -> None: + """Delete all upload tasks that are not valid.""" + brostar_api_key = os.getenv("BROSTAR_API_KEY") + brostar = BROSTARConnection(brostar_api_key) # BROSTAR API Key + brostar.set_website(production=True) + + next = "" + while next is not None: + r = brostar.get("uploadtasks", params={"status": "PROCESSING", "log": "XML is not valid"}) + r.raise_for_status() + tasks = r.json()["results"] + + for task in tasks: + uuid = task["uuid"] + logger.info(f"Deleting invalid upload task {uuid}") + delete_r = brostar.s.delete(url=f"{brostar.website}/uploadtasks/{uuid}") + delete_r.raise_for_status() + + next = r.json().get("next") + + def bulk_move_request(excel_file: str) -> None: """Use an excel to move multiple GMWs. @@ -63,16 +117,856 @@ def bulk_move_request(excel_file: str) -> None: actual_date = row.get("new_date") logger.info(f"Moving {bro_id} from {date_to_be_corrected} to {actual_date}") - construction, metadata = formatter.format_gmw_construction(bro_id) + construction = formatter.format_gmw_construction(bro_id) construction.object_id_accountable_party = intern_id construction.well_construction_date = actual_date construction.date_to_be_corrected = date_to_be_corrected + + metadata = UploadTaskMetadata( + request_reference="BROSTAR-API", + delivery_accountable_party=intern_id, + quality_regime="IMBRO", + bro_id=bro_id, + correction_reason="eigenCorrectie", + ) + + payload = UploadTask( + bro_domain="GMW", + project_number="5871", + registration_type="GMW_Construction", + request_type="registration", + sourcedocument_data=construction, + metadata=metadata, + ) + payload = payload.model_dump(mode="json", by_alias=True) _move_gmw(brostar, construction, metadata) -def main(): - file_path = r"C:\Users\steven.hosper\Desktop\PythonPackages\BrostarAPI\20250425_move_wells.xlsx" - bulk_move_request(file_path) +def setup_lizard_session() -> requests.Session: + lizard_api_key = os.getenv("LIZARD_API_KEY") + ls = requests.Session() + ls.headers = { + "username": "__key__", + "password": lizard_api_key, + "Content-Type": "application/json", + } + retry = Retry( + total=6, + backoff_factor=0.5, + ) + adapter = HTTPAdapter(pool_connections=5, pool_maxsize=5, max_retries=retry) + ls.mount("http://", adapter) + ls.mount("https://", adapter) + return ls + + +def post_timeseries_events( + timeseries_url: str, events_df: pl.DataFrame, session: requests.Session +) -> None: + """Post timeseries events to lizard with adjusted flag""" + logger.info(f"Posting timeseries to {f'{timeseries_url}events/'}.") + logger.info(events_df) + r = session.post( + url=f"{timeseries_url}events/", + json=events_df.to_dicts(), + timeout=30, + ) + r.raise_for_status() + + +def create_brostar_task(url: str, payload: dict, brostar_s: requests.Session) -> dict: + r = brostar_s.post(url, json=payload, timeout=60) + print(r.url) + print(r.json()) + if r.status_code < 250: + time.sleep(10) + res = brostar_s.get(r.json()["url"], timeout=30) + res.raise_for_status() + else: + res = r + return r.json() + + +def check_status(url: str, brostar_s: requests.Session) -> dict: + brostar_s.post(f"{url}check_status/") + r = brostar_s.get(url, timeout=15) + r.raise_for_status() + return r.json() + + +def determine_status_quality_control(value: int | None) -> str: + if value is None: + return "nogNietBeoordeeld" + + # Sort the items by their value + sorted_items = sorted(VALIDATION_MAPPING.items(), key=lambda item: item[1]) + + # Iterate over sorted items + for key, threshold in sorted_items: + if value < threshold: + return key + + # If no valid key is found, return a default value (optional) + return "Invalid value" + + +def determine_censor_reason(detection_limit: str): + if detection_limit == ">": + return "groterDanLimietwaarde" + elif detection_limit == "<": + return "kleinerDanLimietwaarde" + else: + return "onbekend" + + +def convert_timeaware_to_bro_str(datetime_val: datetime.datetime) -> str: + datetime_str = datetime_val.strftime("%Y-%m-%dT%H:%M:%S%z") + return datetime_str[:22] + ":" + datetime_str[22:] + + +def setup_time_value_pairs(events_df: pl.DataFrame, limits: dict[str, str]) -> list[dict[str, str]]: + """Transforms the event_df (lizard format) to BROSTAR (BRO) format.""" + brostar_data_list = [] + + events_df = events_df.with_columns( + pl.col("datetime") + .dt.replace_time_zone(time_zone="UTC", non_existent="null") + .dt.convert_time_zone(time_zone="Europe/Amsterdam") + .alias("datetime"), + ) + logger.info(events_df) + events_df = events_df.with_columns( + pl.col("datetime") + .map_elements(convert_timeaware_to_bro_str, return_dtype=pl.String) + .alias("datetime"), + ) + + logger.info(events_df) + for row in events_df.iter_rows(named=True): + if row["value"] in [None, "None"]: + value = None + else: + value = row["value"] + + brostar_data = { + "time": row["datetime"], + "value": value, + "statusQualityControl": determine_status_quality_control(row["flag"]), + } + + if brostar_data["statusQualityControl"] == "afgekeurd" and value in ["", None]: + brostar_data["censorReason"] = determine_censor_reason(row["detection_limit"]) + elif brostar_data["value"] is None: + brostar_data["censorReason"] = "onbekend" + else: + brostar_data["censorReason"] = None + + if brostar_data["censorReason"] in [ + "groterDanLimietwaarde", + "kleinerDanLimietwaarde", + ]: + brostar_data["censorLimit"] = ( + limits["referenceLevel"] + if brostar_data["censorReason"] == "groterDanLimietwaarde" + else limits["filterBottomLevel"] + ) + + brostar_data_list.append(brostar_data) + + return brostar_data_list + + +def send_gldaddition_for_vitens_location(business_id: str, kvk: str, projectnummer: str) -> None: + """The GLD-ID should be available within the location metadata of the Lizard API. Otherwise this function will fail. For now this only works with IMBRO, as that was the purpose for the function.""" + brostar_api_key = os.getenv("BROSTAR_API_KEY") + brostar = BROSTARConnection(brostar_api_key) # BROSTAR API Key + brostar.set_website(production=True) + + ls = setup_lizard_session() + + # Fetch the location metadata from the Lizard API + r = ls.get( + url="https://vitens.lizard.net/api/v4/locations/", params={"code__startswith": business_id} + ) + r.raise_for_status() + locations = r.json().get("results", []) + for location in locations: + logger.info(f"Processing location: {location}") + location_metadata = location.get("extra_metadata", {}).get("bro", {}) + limits = { + "referenceLevel": location_metadata.get("temporal_data", [{}])[0].get( + "referenceLevel", None + ), + "filterBottomLevel": location_metadata.get("filterBottomDepth", None), + } + gld_id_imbro = location_metadata.get("broid_gld_imbro", None) + quality_regime = "IMBRO" + if gld_id_imbro is None: + logger.info(f"No GLD ID found for location {location['code']}. Skipping.") + continue + + for observation_type in [28, 911]: + r = ls.get( + url="https://vitens.lizard.net/api/v4/timeseries/", + params={"location__code": location["code"], "observation_type": observation_type}, + ) + r.raise_for_status() + timeseries = r.json().get("results", []) + if len(timeseries) != 1: + logger.info( + f"No timeseries found for location {location['code']} and observation type {observation_type}. Skipping." + ) + continue + + timeserie_info = timeseries[0] + logger.info(f"Processing timeseries: {timeserie_info}") + procedures = timeserie_info["extra_metadata"].get("bro", {}).get("procedure", []) + if not procedures: + logger.info( + f"No procedures found for timeseries {timeserie_info['code']}. Skipping." + ) + continue + elif isinstance(procedures, dict): + procedures = [procedures] + + procedures_df = pl.DataFrame(procedures) + procedures_df = procedures_df.with_columns( + pl.col("start") + .str.to_datetime(format="%Y-%m-%dT%H:%M:%SZ") + .alias("start_datetime"), + pl.col("eind") + .str.replace("None", "5000-01-01T00:00:00Z") + .str.to_datetime(format="%Y-%m-%dT%H:%M:%SZ") + .alias("eind_datetime"), + ) + logger.info(procedures_df) + + r = ls.get( + f"{timeserie_info['url']}events/", params={"validation_code!": "V", "limit": 10000} + ) + r.raise_for_status() + events = r.json().get("results", []) + + while r.json().get("next") is not None: + r = ls.get(r.json().get("next")) + r.raise_for_status() + events += r.json().get("results", []) + + events_df = pl.DataFrame(events) + events_df = events_df.with_columns( + pl.col("time").str.to_datetime(format="%Y-%m-%dT%H:%M:%SZ").alias("datetime") + ) + + for procedure in procedures_df.iter_rows(named=True): + logger.info(f"Processing procedure: {procedure}") + procedure_events_df = events_df.filter( + pl.col("datetime").is_between( + procedure["start_datetime"], + procedure["eind_datetime"], + ), + pl.col("value").is_not_null(), + ) + logger.info(procedure_events_df) + + n_rows = procedure_events_df.height # or len(timeseries_df) + + logger.info(procedure) + for i in range(0, n_rows, CHUNK_SIZE): + chunk = procedure_events_df.slice(i, CHUNK_SIZE) + start_time = chunk["time"][0] + end_time = chunk["time"][-1] + result_time = chunk["time"][-1] # Only do voorlopig and controle + + observatie_type = procedure["observationtype"] + proces_referentie = procedure["processreference"] + evaluatie_procedure = procedure["evaluationprocedure"] + meetinstrument_type = procedure["measurementinstrumenttype"] + luchtdrukcompensatie = procedure["airpressurecompensationtype"] + logger.info(chunk) + + metadata = UploadTaskMetadata( + bro_id=gld_id_imbro, + request_reference=f"{gld_id_imbro}: {quality_regime} {observatie_type} {procedure['start']}-{procedure['eind']} ({datetime.datetime.now(tz=AMS_TZ).strftime('%Y-%m-%dT%H:%M:%SZ')})", + delivery_accountable_party=kvk, + quality_regime="IMBRO", + ) + + sourcedocument_data = GLDAddition( + date=result_time.split("T")[0], + investigator_kvk=kvk, + validation_status="voorlopig" + if observatie_type == "reguliereMeting" + else None, + observation_type=observatie_type, + evaluation_procedure=evaluatie_procedure, + process_reference=proces_referentie, + measurement_instrument_type=meetinstrument_type, + air_pressure_compensation_type=luchtdrukcompensatie, + begin_position=start_time.split("T")[0], + end_position=end_time.split("T")[0], + result_time=result_time, + time_value_pairs=setup_time_value_pairs(events_df=chunk, limits=limits), + ) + + payload = UploadTask( + bro_domain="GLD", + project_number=str(projectnummer), + registration_type="GLD_Addition", + request_type="registration", + sourcedocument_data=sourcedocument_data, + metadata=metadata, + ) + + # Create delivery + try: + result_dict: dict = create_brostar_task( + url=f"{brostar.website}/uploadtasks/", + payload=payload.model_dump(mode="json", by_alias=True), + brostar_s=brostar.s, + ) + except Exception as e: + logger.exception( + f"Failed to post addition: {e}. Payload was: {payload.model_dump(mode='json', by_alias=True)}" + ) + continue + + # Check delivery + retry = 0 + while ( + result_dict.get("status", "UNKNOWN") in ["PROCESSING", "PENDING"] + and retry < 10 + ): + try: + result_dict = check_status(result_dict["url"], brostar_s=brostar.s) + except Exception as e: + logger.exception(f"Failed to check the status at brostar: {e}.") + + retry += 1 + time.sleep(5) + + # Update last delivered date + if result_dict["status"] in ["COMPLETED", "UNFINISHED"]: + url = timeserie_info["url"] + chunk = chunk.with_columns(pl.lit("V").alias("validation_code")) + # Convert datetime to str (JSON-Serializeable) + chunk = chunk.select( + "time", + "value", + "validation_code", + "detection_limit", + "flag", + "comment", + "last_modified", + ) + post_timeseries_events(url, chunk, ls) + + +def map_polars_to_gmw_constructions(df: pl.DataFrame, kvk: str) -> GMWConstruction: + """ + Maps polars DataFrame to GMWConstruction objects. + Groups by 'Putnaam' and creates one GMWConstruction per well with associated MonitoringTubes. + """ + # Create monitoring tubes for this well + monitoring_tubes = [] + for index, row in enumerate(df.iter_rows(named=True)): + logger.info(f"Processing row {index + 1}: {row}") + tube = create_monitoring_tube( + row, tube_number=row["Filternummer"] + ) # tube_number starts at 1 + monitoring_tubes.append(tube) + + first_row = df.row(0, named=True) + putnaam = first_row.get("Putnaam", "") + + # Create delivered_location from coordinates + x_coord = first_row.get("X-coordinaat(RD)", "") + y_coord = first_row.get("Y-coordinaat(RD)", "") + delivered_location = f"{x_coord} {y_coord}" if x_coord and y_coord else "" + + # Create GMWConstruction + construction = GMWConstruction( + # Required fields + object_id_accountable_party=putnaam, # Using Putnaam as specified + nitg_code=str(putnaam)[:-1], + delivery_context=first_row.get("Kader aanlevering", ""), + construction_standard=first_row.get("Kwaliteitsnorminrichting", ""), + initial_function=first_row.get("Initiële functie", ""), + number_of_monitoring_tubes=len(monitoring_tubes), + ground_level_stable=first_row.get("Maaiveld stabiel", ""), + well_stability=first_row.get("Putstabiliteit"), + # Optional fields with defaults + owner=kvk, + well_head_protector=first_row.get("Beschermconstructie", ""), + well_construction_date=format_date(first_row.get("Inrichtingsdatum")), + delivered_location=delivered_location, + horizontal_positioning_method=first_row.get("Method Coordinatenbepaling", ""), + local_vertical_reference_point="NAP", # Always NAP as specified + offset=0.0, # No mapping available - needs default + vertical_datum="NAP", # Always NAP as specified + ground_level_position=first_row.get("Maaiveldpositie (m+NAP)"), + ground_level_positioning_method=first_row.get("Method Maaiveldpositiebepaling", ""), + monitoring_tubes=monitoring_tubes, + ) + + return construction + + +def create_monitoring_tube(row: dict, tube_number: int) -> MonitoringTube: + """Creates a MonitoringTube from a row of data.""" + + return MonitoringTube( + tube_number=tube_number, + tube_type=row.get("BuisType", ""), + artesian_well_cap_present=row.get("Drukdop", ""), # Assuming this maps to Drukdop + sediment_sump_present=row.get("Voorzien van zandvang", ""), + number_of_geo_ohm_cables=0, # No data available + tube_top_diameter=row.get("Diameter bovenkantbuis (mm)"), + variable_diameter=row.get("Variable diameter"), + tube_status=row.get("Buis status", ""), + tube_top_position=row.get("Positie bovenkantbuis (m+NAP)", 0.0), + tube_top_positioning_method=row.get("MethodePositiebepalingBovenkantbuis", ""), + tube_packing_material=row.get("Aanvulmaterial buis", ""), + tube_material=row.get("Materiaal peilbuis", ""), + glue=row.get("Lijm", ""), + screen_length=max(row.get("Filterlengte (meters)", 0.5), 0.5), + screen_protection=None, # No clear mapping + sock_material=row.get("Kousmateriaal", ""), + plain_tube_part_length=max(row.get("Lengte stijgbuisdeel (meters)", 0.5), 0.5), + sediment_sump_length=row.get("Zandvanglengte (meters)") + if row.get("Zandvanglengte (meters)") + else None, + geo_ohm_cables=None, # No data available + ) + + +def format_date(date_value) -> str: + """Format date value to string. Adjust based on your date format needs.""" + if date_value is None: + return "" + + # Handle different date formats as needed + if isinstance(date_value, str): + return date_value + elif hasattr(date_value, "strftime"): + return date_value.strftime("%Y-%m-%d") + else: + return str(date_value) + + +def format_incomplete_date(incomplete_date) -> str | None: + """Format incomplete date field.""" + if incomplete_date is None or incomplete_date == "": + return None + return str(incomplete_date) + + +def bulk_gmw_correction_request(kvk: str) -> None: + """Use an excel to move multiple GMWs. + + Columns: gmw_id""" + # Access your API key + brostar = BROSTARConnection("HUhO9Jl2.rLXSyJq83wA9kQLT7wACNZbkZpK3eUug") # BROSTAR API Key + brostar.set_website(production=True) + results = [] + r = brostar.get("gmw/gmws") + r.raise_for_status() + results += r.json().get("results", []) + next = r.json().get("next") + while next is not None: + r = brostar.s.get(next) + r.raise_for_status() + results += r.json().get("results", []) + next = r.json().get("next") + + print(results) + df = pl.DataFrame(results, schema_overrides={"nitg_code": pl.String}) + df = df.filter(pl.col("nitg_code").is_not_null()) + df = df.select("uuid", "bro_id", "nitg_code") + formatter = PayloadFormatter(brostar) + + for row in df.iter_rows(named=True): + logger.info(row) + bro_id = row.get("bro_id") + + construction = formatter.format_gmw_construction(bro_id) + construction.object_id_accountable_party = ( + f"Correctie_{construction.nitg_code if construction.nitg_code else bro_id}" + ) + construction.nitg_code = None + + metadata = UploadTaskMetadata( + request_reference="20250718_Correctie_Tholen", + delivery_accountable_party=str(kvk), + quality_regime="IMBRO/A", + bro_id=bro_id, + correction_reason="inOnderzoek", + ) + upload_task = UploadTask( + bro_domain="GMW", + project_number="981", + registration_type="GMW_Construction", + request_type="replace", + sourcedocument_data=construction, + metadata=metadata, + ) + logger.info(upload_task.model_dump(mode="json", by_alias=True)) + _correct_gmw(brostar, upload_task) + + +def retry_upload_task() -> None: + """Retry all upload tasks that are in PROCESSING state.""" + import re + + brostar_api_key = os.getenv("BROSTAR_API_KEY") + brostar = BROSTARConnection(brostar_api_key) # BROSTAR API Key + brostar.set_website(production=True) + + r = brostar.get("uploadtasks", params={"status": "FAILED"}) + for task in r.json().get("results", []): + uuid = task["uuid"] + logger.info(f"Retrying upload task {uuid}") + + if "mag niet voor de laatst geregistreerde gebeurtenis" in task["bro_errors"]: + metadata = task["metadata"] + metadata["correctionReason"] = "eigenCorrectie" + retry_r = brostar.s.patch( + url=f"{brostar.website}/uploadtasks/{uuid}/", json={"metadata": metadata} + ) + retry_r.raise_for_status() + + metadata["request_type"] = "insert" + retry_r = brostar.s.patch( + url=f"{brostar.website}/uploadtasks/{uuid}/", json={"metadata": metadata} + ) + retry_r.raise_for_status() + + if "moet liggen na of op de inrichtingsdatum" in task["bro_errors"]: + # Extract all dates in YYYY-MM-DD format + dates = re.findall(r"\d{4}-\d{2}-\d{2}", task["bro_errors"]) + + if len(dates) >= 2: + second_date = dates[1] # The inrichtingsdatum + sourcedocument_data = task["sourcedocument_data"] + sourcedocument_data["eventDate"] = second_date + + retry_r = brostar.s.patch( + url=f"{brostar.website}/uploadtasks/{uuid}/", + json={"sourcedocument_data": sourcedocument_data}, + ) + retry_r.raise_for_status() + + if ( + "Dit brondocument is al eerder via het bronhouderportaal aangeleverd aan de BRO" + in task["bro_errors"] + ): + retry_r = brostar.s.patch( + url=f"{brostar.website}/uploadtasks/{uuid}/", json={"status": "COMPLETED"} + ) + retry_r.raise_for_status() + + retry_r = brostar.s.patch( + url=f"{brostar.website}/uploadtasks/{uuid}/", json={"progress": 100.0} + ) + retry_r.raise_for_status() + + retry_r = brostar.s.patch( + url=f"{brostar.website}/uploadtasks/{uuid}/", json={"log": ""} + ) + retry_r.raise_for_status() + continue + + # retry_r = brostar.s.patch(url=f"{brostar.website}/uploadtasks/{uuid}/", json={"status": "PENDING"}) + # retry_r.raise_for_status() + + +def bulk_gmw_construction_request(excel_file: str | Path, kvk: str) -> None: + """Use an excel to create multiple GMWs.""" + # Access your API key + brostar_api_key = os.getenv("BROSTAR_API_KEY") + brostar = BROSTARConnection(brostar_api_key) # BROSTAR API Key + brostar.set_website(production=True) + + df = pl.read_excel(excel_file, has_header=True) + putten = df.unique("Putnaam").to_series(0).to_list() + + for put in putten: + construction = map_polars_to_gmw_constructions(df.filter(pl.col("Putnaam").eq(put)), kvk) + ### Setup the payload + metadata = UploadTaskMetadata( + request_reference=f"{put}", + delivery_accountable_party=kvk, + quality_regime="IMBRO", # Add to row? + ) + + ## Extract excel into GMW Construction + sourcedocument_data = construction + + payload = UploadTask( + bro_domain="GMW", + project_number="1", + registration_type="GMW_Construction", + request_type="registration", + sourcedocument_data=sourcedocument_data, + metadata=metadata, + ) + payload = payload.model_dump(mode="json", by_alias=True) + print(payload) + r = brostar.post_upload(payload=payload, is_json=True) + r.raise_for_status() + + uuid: str = r.json()["uuid"] + brostar.await_completed(uuid=uuid) + return + + +def pop_upload_task_fields(upload_task: dict) -> dict: + """Remove unnecessary fields from the upload task.""" + upload_task.pop("uuid", None) + upload_task.pop("created_at", None) + upload_task.pop("updated_at", None) + upload_task.pop("data_owner", None) + return upload_task + + +def total_events_delivered() -> int: + """Retrieve the total number of events delivered.""" + brostar_api_key = os.getenv("BROSTAR_API_KEY") + brostar = BROSTARConnection(brostar_api_key) # BROSTAR API Key + brostar.set_website(production=True) + + r = brostar.get( + "uploadtasks", params={"status": "COMPLETED", "registration_type": "GLD_Addition"} + ) + r.raise_for_status() + + total_count = 0 + next = r.json().get("next") + bro_ids = [] + for result in r.json().get("results", []): + bro_id = result.get("bro_id", None) + events_count = result.get("sourcedocument_data", {}).get("timeValuePairsCount", 0) + total_count += events_count + bro_ids.append(bro_id) + + while next is not None: + r = brostar.s.get(next) + r.raise_for_status() + for result in r.json().get("results", []): + bro_id = result.get("bro_id", None) + events_count = result.get("sourcedocument_data", {}).get("timeValuePairsCount", 0) + total_count += events_count + bro_ids.append(bro_id) + + next = r.json().get("next") + + bro_ids = list(set(bro_ids)) # Remove duplicates + logger.info(f"Total unique GLD IDs: {len(bro_ids)}") + + return total_count + + +def deliver_gld_start_registration( + internal_id: str, + bro_id: str, + tube_number: int, + delivery_accountable_party: str, + monitoring_nets: list[str], + project_number: str, +) -> str | None: + """Send a gld start registration request that corrects the dates.""" + + brostar_api_key = os.getenv("BROSTAR_API_KEY") + brostar = BROSTARConnection(brostar_api_key) + brostar.set_website(production=True) + sourcedocument_data = { + "gmwBroId": bro_id, + "tubeNumber": tube_number, + "groundwaterMonitoringNets": eval(monitoring_nets), + "objectIdAccountableParty": internal_id, + } + metadata = UploadTaskMetadata( + request_reference="MeetnettenVitens-BROSTAR", + delivery_accountable_party=delivery_accountable_party, + quality_regime="IMBRO", + ) + + payload = UploadTask( + bro_domain="GLD", + project_number=str(project_number), + registration_type="GLD_StartRegistration", + request_type="registration", + sourcedocument_data=sourcedocument_data, + metadata=metadata, + ) + payload = payload.model_dump(mode="json", by_alias=True) + r = brostar.post_upload(payload) + logger.info(r.json()) + r.raise_for_status() + + uuid: str = r.json()["uuid"] + r = brostar.await_completed(uuid=uuid) + + return r.json().get("broId") + + +def clear_fields_for_upload(upload_task: dict) -> dict: + """Clear fields that should not be set for a new upload task.""" + upload_task["status"] = "PENDING" + upload_task["log"] = "" + upload_task["progress"] = 0 + upload_task["bro_id"] = "" + upload_task["bro_delivery_url"] = "" + return upload_task + + +def correct_gld_dossier_for_observation_request( + current_id: str, + target_id: str, +): + brostar_api_key = os.getenv("BROSTAR_API_KEY") + brostar = BROSTARConnection(brostar_api_key) # BROSTAR API Key + brostar.set_website(production=True) + + r = brostar.get( + "uploadtasks", + params={ + "registration_type": "GLD_Addition", + "bro_id": current_id, + }, + ) + for result in r.json().get("results", []): + result = brostar.get_detail(endpoint="uploadtasks", uuid=result["uuid"]).json() + result = pop_upload_task_fields(result) + result = clear_fields_for_upload(result) + result["request_type"] = "delete" + result["metadata"].update({"correctionReason": "eigenCorrectie"}) + r = brostar.post_upload(payload=result, is_json=True) + r.raise_for_status() + + uuid: str = r.json()["uuid"] + brostar.await_completed(uuid=uuid) + + result["request_type"] = "registration" + result["metadata"]["requestReference"].replace(current_id, target_id) + result["metadata"]["broId"] = target_id + result["metadata"].pop("correctionReason") + result["status"] = "PENDING" + r = brostar.post_upload(payload=result, is_json=True) + r.raise_for_status() + + uuid: str = r.json()["uuid"] + brostar.await_completed(uuid=uuid) + + +def convert_to_list(s): + return ast.literal_eval(s) + + +def correct_bulk_gld(excel_file: str | Path) -> None: + df = pl.read_excel(excel_file, has_header=True) + df_converted = df.with_columns( + pl.col("broId").map_elements(convert_to_list, return_dtype=pl.List(pl.String)) + ) + # Extract first value as 'correct_id' and explode the rest as 'target_id' + result = ( + df_converted.with_columns( + [ + pl.col("broId").list.first().alias("target_id"), + pl.col("broId").list.slice(1).alias("current_ids"), + ] + ) + .drop("broId") + .explode("current_ids") + .rename({"current_ids": "current_id"}) + ) + total = result.height + logger.info(f"Total rows to process: {total}") + skip_count = 0 + delete_ids = [] + for i, row in enumerate(result.iter_rows(named=True)): + logger.info(f"Processing row {i + 1}/{total}: {row}") + r = requests.get( + f"https://publiek.broservices.nl/gm/gld/v1/objects/{row['current_id']}/observationsSummary" + ) + if len(r.json()) == 0: + skip_count += 1 + logger.info(f"No observations found for {row['current_id']}. Skipping.") + delete_ids += [row["current_id"]] + continue + + correct_gld_dossier_for_observation_request( + current_id=row["current_id"], + target_id=row["target_id"], + ) + logger.info(f"Completed processing row {i + 1}/{total}") + delete_ids += [row["current_id"]] + + print(f"Skipped {skip_count} rows due to no observations found.") + + # Write to a CSV file + with open( + r"C:\Users\steven.hosper\Downloads\delete_ids.csv", mode="w", newline="", encoding="utf-8" + ) as file: + writer = csv.writer(file) + writer.writerow(["broId"]) # Header + for bro_id in delete_ids: + writer.writerow([bro_id]) + + +def create_bulk_gld(excel_file: str | Path) -> None: + df = pl.read_excel(excel_file, has_header=True) + brostar_api_key = os.getenv("BROSTAR_API_KEY") + brostar = BROSTARConnection(brostar_api_key) + brostar.set_website(production=True) + + r = brostar.get("uploadtasks", params={"registration_type": "GLD_StartRegistration"}) + r.raise_for_status() + + results = [] + next = r.json().get("next") + results += r.json().get("results", []) + while next is not None: + r = brostar.s.get(next) + r.raise_for_status() + results += r.json().get("results", []) + next = r.json().get("next") + + df2 = pl.DataFrame(results) + df2 = df2.with_columns( + pl.col("sourcedocument_data").struct.field("objectIdAccountableParty").alias("business_id"), + ) + df2 = df2.select( + "bro_id", + "business_id", + ) + print(df2) + + df = df.join(df2, left_on="objectIdAccountableParty", right_on="business_id", how="left") + print(df) + + bro_ids = [] + for _i, row in enumerate(df.iter_rows(named=True)): + if row["groundwaterMonitoringNets"] is None: + bro_ids.append(None) + continue + + bro_id = deliver_gld_start_registration( + internal_id=row["objectIdAccountableParty"], + bro_id=row["gmwBroId"], + tube_number=row["tubeNumber"], + delivery_accountable_party=str(row["deliveryAccountableParty"]), + monitoring_nets=row["groundwaterMonitoringNets"], + project_number=row["projectNumber"], + ) + bro_ids.append(bro_id) + logger.info(bro_id) + + # Save to new Excel file with "v2" suffix + new_filename = excel_file.replace(".xlsx", "_v2.xlsx") + df = df.with_columns(pl.Series("broId", bro_ids)) + df.write_excel(new_filename) + + logger.info(f"Saved updated DataFrame to {new_filename}") def process_result(result: dict) -> None: @@ -115,6 +1009,39 @@ def process_result(result: dict) -> None: print("\n\n") +def gld_to_lizard(location_code: str, gld_id: str) -> None: + lizard_api_key = os.getenv("LIZARD_API_KEY") + lizard_s = requests.Session() + lizard_s.headers = { + "username": "__key__", + "password": lizard_api_key, # Lizard API Key + "Content-Type": "application/json", + } + + r = lizard_s.get( + url="https://vitens.lizard.net/api/v4/locations/", + params={"code": f"{location_code}"}, + timeout=15, + ) + r.raise_for_status() + if len(r.json()["results"]) == 0: + logger.info(r.url) + logger.info("No locations found.") + return + + extra_metadata = r.json()["results"][0]["extra_metadata"] + + extra_metadata["bro"]["broid_gld_imbro"] = gld_id + logger.info(extra_metadata["bro"]) + + r = lizard_s.patch( + url=r.json()["results"][0]["url"], json={"extra_metadata": extra_metadata}, timeout=15 + ) + r.raise_for_status() + print(r.json()) + print("\n\n") + + def ingest_gld_ids_into_lizard(): """Retrieve all uploadtasks / registrations and ingest the information into Lizard.""" brostar_api_key = os.getenv("BROSTAR_API_KEY") @@ -127,12 +1054,8 @@ def ingest_gld_ids_into_lizard(): while r.json()["next"] is not None: for result in r.json()["results"]: logger.info(f"Processing {result}") + # Get the bro_id from the registration and update Lizard process_result(result) r = brostar.s.get(url=r.json()["next"], timeout=15) - - -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - main() diff --git a/src/brostar_api_requests/connection.py b/src/brostar_api_requests/connection.py index d6c319f..11e31b7 100644 --- a/src/brostar_api_requests/connection.py +++ b/src/brostar_api_requests/connection.py @@ -76,14 +76,12 @@ def authenticate(self, token: str) -> None: logger.info("Authentication set.") def get(self, endpoint: BrostarEndpoint, params: dict | None = None) -> requests.Response: - print(self.s.auth.username, self.s.auth.password) return self.s.get(url=f"{self.website}/{endpoint}/", params=params, timeout=15) def get_detail(self, endpoint: BrostarEndpoint, uuid: str) -> requests.Response: return self.s.get(url=f"{self.website}/{endpoint}/{uuid}", timeout=15) def post_upload(self, payload: dict[str, str], is_json: bool = True) -> requests.Response: - print(payload) if is_json: return self.s.post(url=f"{self.website}/uploadtasks/", json=payload, timeout=15) return self.s.post(url=f"{self.website}/uploadtasks/", data=payload, timeout=15) diff --git a/src/brostar_api_requests/formatter.py b/src/brostar_api_requests/formatter.py index ebdff5f..8a57631 100644 --- a/src/brostar_api_requests/formatter.py +++ b/src/brostar_api_requests/formatter.py @@ -1,40 +1,58 @@ -from typing import Literal - from .connection import BROSTARConnection from .upload_models import ( Electrode, GeoOhmCable, GMWConstruction, MonitoringTube, - UploadTaskMetadata, ) -RequestTypeOptions = Literal["registration", "replace", "insert", "move", "delete"] -RegistrationTypeOptions = Literal["GMW_Construction"] + +def format_electrodes(electrodes_data: list[dict[str, str]]) -> list[Electrode]: + return [Electrode(**e) for e in electrodes_data] + + +def format_geo_ohm_cables(cables_data: list[dict[str, str]]) -> list[GeoOhmCable] | None: + geo_ohm_cables = [] + for cable in cables_data: + electrodes_list = cable.pop("electrodes", []) + geo_ohm_cables.append( + GeoOhmCable( + **cable, + electrodes=format_electrodes(electrodes_list), + ) + ) + return geo_ohm_cables if geo_ohm_cables else None + + +def format_monitoring_tubes(tubes_data: list[dict[str, str]]) -> list[MonitoringTube]: + monitoring_tubes = [] + for tube in tubes_data: + geo_ohm_cable_list = tube.pop("geo_ohm_cables", []) + monitoring_tubes.append( + MonitoringTube( + **tube, + geo_ohm_cables=format_geo_ohm_cables(geo_ohm_cable_list), + ) + ) + return monitoring_tubes + + +def build_gmw_construction( + gmw_data: dict[str, str], monitoring_tubes_data: dict[str, str] +) -> GMWConstruction: + return GMWConstruction( + **gmw_data, + object_id_accountable_party=gmw_data.get("bro_id"), + number_of_monitoring_tubes=len(monitoring_tubes_data), + monitoringTubes=format_monitoring_tubes(monitoring_tubes_data), + ) class PayloadFormatter: def __init__(self, brostar: BROSTARConnection) -> None: self.brostar = brostar - def format_metadata( - self, - request_reference: str, - delivery_accountable_party: str, - bro_id: str, - quality_regime: Literal["IMBRO", "IMBRO/A"], - ) -> UploadTaskMetadata: - return UploadTaskMetadata( - requestReference=request_reference, - deliveryAccountableParty=delivery_accountable_party, - qualityRegime=quality_regime, - broId=bro_id, - correctionReason="eigenCorrectie", - ) - - def format_gmw_construction( - self, gmw_bro_id: str - ) -> tuple[GMWConstruction, UploadTaskMetadata]: + def format_gmw_construction(self, gmw_bro_id: str) -> GMWConstruction: """Based on a BRO-ID retrieve all information for a construction""" # Get the main GMW data r = self.brostar.get("gmw/gmws", params={"bro_id": gmw_bro_id}) @@ -51,81 +69,8 @@ def format_gmw_construction( r = self.brostar.get("gmw/monitoringtubes", params={"gmw_bro_id": gmw_bro_id}) monitoring_tubes_data = r.json()["results"] - # Format monitoring tubes - monitoring_tubes = [] - for tube_data in monitoring_tubes_data: - # Format GeoOhmCables for this tube if they exist - geo_ohm_cables = [] - if tube_data.get("geo_ohm_cables"): - for cable_data in tube_data["geo_ohm_cables"]: - # Format electrodes for this cable - electrodes = [] - for electrode_data in cable_data.get("electrodes", []): - electrode = Electrode( - electrodeNumber=electrode_data["electrode_number"], - electrodePackingMaterial=electrode_data["electrode_packing_material"], - electrodeStatus=electrode_data["electrode_status"], - electrodePosition=electrode_data["electrode_position"], - ) - electrodes.append(electrode) - - geo_ohm_cable = GeoOhmCable( - cableNumber=cable_data["cable_number"], electrodes=electrodes - ) - geo_ohm_cables.append(geo_ohm_cable) - - # Create the monitoring tube object - monitoring_tube = MonitoringTube( - tubeNumber=tube_data["tube_number"], - tubeType=tube_data["tube_type"], - artesianWellCapPresent=tube_data["artesian_well_cap_present"], - sedimentSumpPresent=tube_data["sediment_sump_present"], - numberOfGeoOhmCables=tube_data["number_of_geo_ohm_cables"], - tubeTopDiameter=tube_data.get("tube_top_diameter"), - variableDiameter=tube_data["variable_diameter"], - tubeStatus=tube_data["tube_status"], - tubeTopPosition=tube_data["tube_top_position"], - tubeTopPositioningMethod=tube_data["tube_top_positioning_method"], - tubePackingMaterial=tube_data["tube_packing_material"], - tubeMaterial=tube_data["tube_material"], - glue=tube_data["glue"], - screenLength=tube_data["screen_length"], - screenProtection=tube_data.get("screen_protection"), - sockMaterial=tube_data["sock_material"], - plainTubePartLength=tube_data["plain_tube_part_length"], - sedimentSumpLength=tube_data.get("sediment_sump_length"), - geoOhmCables=geo_ohm_cables if geo_ohm_cables else None, - ) - monitoring_tubes.append(monitoring_tube) - - # Create the final GMWConstruction object - gmw_construction = GMWConstruction( - objectIdAccountableParty=gmw_data["delivery_accountable_party"], - deliveryContext=gmw_data["delivery_context"], - constructionStandard=gmw_data["construction_standard"], - initialFunction=gmw_data["initial_function"], - numberOfMonitoringTubes=gmw_data["nr_of_monitoring_tubes"], - groundLevelStable=gmw_data["ground_level_stable"], - wellStability=gmw_data.get("well_stability"), - owner=gmw_data.get("owner"), - maintenanceResponsibleParty=None, # This field doesn't appear in the API response - wellHeadProtector=gmw_data["well_head_protector"], - wellConstructionDate=gmw_data["well_construction_date"], - deliveredLocation=gmw_data["delivered_location"], - horizontalPositioningMethod=gmw_data["horizontal_positioning_method"], - localVerticalReferencePoint=gmw_data["local_vertical_reference_point"], - offset=gmw_data["offset"], - verticalDatum=gmw_data["vertical_datum"], - groundLevelPosition=gmw_data.get("ground_level_position"), - groundLevelPositioningMethod=gmw_data["ground_level_positioning_method"], - monitoringTubes=monitoring_tubes, - ) - - metadata = self.format_metadata( - bro_id=gmw_bro_id, - quality_regime=gmw_data["quality_regime"], - request_reference=gmw_data.get("intern_id", f"{gmw_bro_id}"), - delivery_accountable_party=gmw_data["delivery_accountable_party"], + gmw_construction = build_gmw_construction( + gmw_data=gmw_data, monitoring_tubes_data=monitoring_tubes_data ) - return gmw_construction, metadata + return gmw_construction diff --git a/src/brostar_api_requests/type_helpers.py b/src/brostar_api_requests/type_helpers.py new file mode 100644 index 0000000..e2970e2 --- /dev/null +++ b/src/brostar_api_requests/type_helpers.py @@ -0,0 +1,32 @@ +from typing import Literal + +QualityRegimeOptions = Literal["IMBRO", "IMBRO/A"] +CorrectionReasonOptions = Literal["eigenCorrectie", "bronhouder", "kwaliteitsregime", "inOnderzoek"] +BroDomainOptions = Literal["GMW", "GMN", "GAR", "GLD", "FRD"] +RequestTypeOptions = Literal["registration", "replace", "insert", "move", "delete"] +RegistrationTypeOptions = Literal[ + "GMW_Construction", + "GMW_ElectrodeStatus", + "GMW_GroundLevel", + "GMW_GroundLevelMeasuring", + "GMW_Insertion", + "GMW_Lengthening", + "GMW_Maintainer", + "GMW_Owner", + "GMW_Positions", + "GMW_PositionsMeasuring", + "GMW_Removal", + "GMW_Shift", + "GMW_Shortening", + "GMW_TubeStatus", + "GMW_WellHeadProtector", + "GMN_Startregistration", + "GMN_MeasuringPoint", + "GMN_MeasuringPointEndDate", + "GMN_TubeReference", + "GMN_Closure", + "GAR", + "GLD_StartRegistration", + "GLD_Addition", + "GLD_Closure", +] diff --git a/src/brostar_api_requests/upload_models.py b/src/brostar_api_requests/upload_models.py index d38e86f..5a6c314 100644 --- a/src/brostar_api_requests/upload_models.py +++ b/src/brostar_api_requests/upload_models.py @@ -1,10 +1,18 @@ import logging import uuid from datetime import date, datetime -from typing import Any, Literal +from typing import Any from pydantic import BaseModel, field_validator, model_validator +from .type_helpers import ( + BroDomainOptions, + CorrectionReasonOptions, + QualityRegimeOptions, + RegistrationTypeOptions, + RequestTypeOptions, +) + logger = logging.getLogger(__name__) @@ -16,6 +24,7 @@ def to_camel(string: str) -> str: class CamelModel(BaseModel): class Config: validate_by_name = True + extra = "ignore" # Ensure aliasing works for all fields with underscores @staticmethod @@ -26,10 +35,10 @@ def alias_generator(field_name: str) -> str: ## Uploadtask models class UploadTaskMetadata(CamelModel): request_reference: str - delivery_accountable_party: str | None - quality_regime: str - bro_id: str | None - correction_reason: str | None + delivery_accountable_party: str | None = None + bro_id: str | None = None + quality_regime: QualityRegimeOptions + correction_reason: CorrectionReasonOptions | None = None class GARBulkUploadMetadata(CamelModel): @@ -153,6 +162,7 @@ class GMWConstruction(CamelModel): delivery_context: str construction_standard: str initial_function: str + nitg_code: str number_of_monitoring_tubes: int ground_level_stable: str well_stability: str | None = None @@ -520,35 +530,6 @@ class FRDGemMeasurement(CamelModel): | FRDEmmMeasurement ) -BroDomainOptions = Literal["GMW", "GMN", "GAR", "GLD", "FRD"] -RequestTypeOptions = Literal["registration", "replace", "insert", "move", "delete"] -RegistrationTypeOptions = Literal[ - "GMW_Construction", - "GMW_ElectrodeStatus", - "GMW_GroundLevel", - "GMW_GroundLevelMeasuring", - "GMW_Insertion", - "GMW_Lengthening", - "GMW_Maintainer", - "GMW_Owner", - "GMW_Positions", - "GMW_PositionsMeasuring", - "GMW_Removal", - "GMW_Shift", - "GMW_Shortening", - "GMW_TubeStatus", - "GMW_WellHeadProtector", - "GMN_Startregistration", - "GMN_MeasuringPoint", - "GMN_MeasuringPointEndDate", - "GMN_TubeReference", - "GMN_Closure", - "GAR", - "GLD_Startregistration", - "GLD_Addition", - "GLD_Closure", -] - class UploadTask(BaseModel): bro_domain: BroDomainOptions diff --git a/src/tests/test_connection.py b/src/tests/test_connection.py index 188bea6..957046c 100644 --- a/src/tests/test_connection.py +++ b/src/tests/test_connection.py @@ -1,4 +1,5 @@ import io +import os import pytest import requests_mock @@ -10,13 +11,13 @@ @pytest.fixture def brostar(): - return BROSTARConnection(token="test-token") + token = os.getenv("BROSTAR_TEST_TOKEN", "dummy-token") + return BROSTARConnection(token=token) -def test_authentication_headers(): - brostar = BROSTARConnection(token="test-token") +def test_authentication_headers(brostar): assert brostar.s.auth.username == "__key__" - assert brostar.s.auth.password == "test-token" + assert brostar.s.auth.password == os.getenv("BROSTAR_TEST_TOKEN", "dummy-token") def test_set_website_staging(brostar: BROSTARConnection): diff --git a/src/tests/test_formatter.py b/src/tests/test_formatter.py new file mode 100644 index 0000000..17731ca --- /dev/null +++ b/src/tests/test_formatter.py @@ -0,0 +1,467 @@ +from unittest.mock import MagicMock + +import pytest +from pydantic import ValidationError + +from ..brostar_api_requests.formatter import ( + PayloadFormatter, + build_gmw_construction, + format_electrodes, + format_geo_ohm_cables, + format_monitoring_tubes, +) +from ..brostar_api_requests.upload_models import ( + Electrode, + GeoOhmCable, + GMWConstruction, + MonitoringTube, +) + + +# 1. Snake case input +def test_format_electrodes_snake_case(): + data = [ + { + "electrode_number": 1, + "electrode_packing_material": "sand", + "electrode_status": "active", + "electrode_position": 12.5, + } + ] + result = format_electrodes(data) + assert isinstance(result[0], Electrode) + assert result[0].electrode_number == 1 + + +# 2. Camel case input +def test_format_electrodes_camel_case(): + data = [ + { + "electrodeNumber": 2, + "electrodePackingMaterial": "gravel", + "electrodeStatus": "inactive", + "electrodePosition": 8.3, + } + ] + result = format_electrodes(data) + assert result[0].electrode_status == "inactive" + assert result[0].electrode_number == 2 + + +# 3. Input with extra fields (should be ignored) +def test_format_electrodes_with_extra_field(): + data = [ + { + "electrode_number": 3, + "electrode_packing_material": "clay", + "electrode_status": "active", + "electrode_position": 5.0, + "extra_field": "should be ignored", + } + ] + result = format_electrodes(data) + assert hasattr(result[0], "electrode_number") + assert not hasattr(result[0], "extra_field") + + +# 4. Faulty input: missing required field +def test_format_electrodes_missing_required_field(): + data = [ + { + "electrode_number": 4, + "electrode_packing_material": "sand", + # Missing 'electrode_status' + } + ] + with pytest.raises(ValidationError): + format_electrodes(data) + + +### GEO OHM CABLES TESTS ### +# 1. Valid cable with electrodes (snake_case) +def test_format_geo_ohm_cables_snake_case(): + data = [ + { + "cable_number": 1, + "electrodes": [ + { + "electrode_number": 1, + "electrode_packing_material": "sand", + "electrode_status": "active", + "electrode_position": 10.0, + } + ], + } + ] + result = format_geo_ohm_cables(data) + assert isinstance(result[0], GeoOhmCable) + assert result[0].electrodes[0].electrode_status == "active" + + +# 2. Valid cable with electrodes (camelCase) +def test_format_geo_ohm_cables_camel_case(): + data = [ + { + "cableNumber": 2, + "electrodes": [ + { + "electrodeNumber": 2, + "electrodePackingMaterial": "gravel", + "electrodeStatus": "inactive", + "electrodePosition": 8.2, + } + ], + } + ] + result = format_geo_ohm_cables(data) + assert result[0].cable_number == 2 + assert result[0].electrodes[0].electrode_packing_material == "gravel" + + +# 3. Empty cable list → None +def test_format_geo_ohm_cables_empty_list(): + assert format_geo_ohm_cables([]) is None + + +# 4. Missing electrodes key → default to empty list +def test_format_geo_ohm_cables_missing_electrodes(): + data = [{"cable_number": 3}] + result = format_geo_ohm_cables(data) + assert result[0].electrodes == [] + + +# 5. Extra field in cable and electrode (should be ignored) +def test_format_geo_ohm_cables_with_extra_fields(): + data = [ + { + "cable_number": 4, + "extra_cable_field": "ignore me", + "electrodes": [ + { + "electrode_number": 3, + "electrode_packing_material": "clay", + "electrode_status": "active", + "electrode_position": 6.0, + "extra_electrode_field": "ignore me too", + } + ], + } + ] + result = format_geo_ohm_cables(data) + cable = result[0] + assert not hasattr(cable, "extra_cable_field") + assert not hasattr(cable.electrodes[0], "extra_electrode_field") + + +# 6. Missing required cable_number field +def test_format_geo_ohm_cables_missing_required_field(): + data = [{"electrodes": []}] + with pytest.raises(ValidationError): + format_geo_ohm_cables(data) + + +# 7. Electrode with invalid data (e.g., missing electrode_status) +def test_format_geo_ohm_cables_invalid_electrode_data(): + data = [ + { + "cable_number": 5, + "electrodes": [ + { + "electrode_number": 4, + "electrode_packing_material": "sand", + # Missing 'electrode_status' + } + ], + } + ] + with pytest.raises(ValidationError): + format_geo_ohm_cables(data) + + +# Sample base GMW data +BASE_GMW_DATA = { + "object_id_accountable_party": "org-123", + "delivery_context": "some-context", + "construction_standard": "NEN", + "initial_function": "monitoring", + "number_of_monitoring_tubes": 1, + "ground_level_stable": "yes", + "well_head_protector": "standard", + "well_construction_date": "2023-06-01", + "delivered_location": "location-abc", + "horizontal_positioning_method": "GPS", + "local_vertical_reference_point": "point-xyz", + "offset": 0.0, + "vertical_datum": "NAP", + "ground_level_positioning_method": "manual", +} + +BASE_TUBE_DATA = [ + { + "tube_number": 1, + "tube_type": "monitor", + "artesian_well_cap_present": "yes", + "sediment_sump_present": "no", + "number_of_geo_ohm_cables": 0, + "tube_status": "active", + "tube_top_position": 10.5, + "tube_top_positioning_method": "manual", + "tube_packing_material": "gravel", + "tube_material": "PVC", + "glue": "epoxy", + "screen_length": 10.0, + "sock_material": "nylon", + "plain_tube_part_length": 5.0, + } +] + + +### MONITORING TUBES TESTS ### +# 1. Valid input with nested cables and electrodes (snake_case) +def test_format_monitoring_tubes_snake_case(): + data = BASE_TUBE_DATA + data[0]["geo_ohm_cables"] = [ + { + "cable_number": 1, + "electrodes": [ + { + "electrode_number": 1, + "electrode_packing_material": "gravel", + "electrode_status": "active", + "electrode_position": 10.0, + } + ], + } + ] + result = format_monitoring_tubes(data) + assert isinstance(result[0], MonitoringTube) + assert result[0].geo_ohm_cables[0].cable_number == 1 + + +# 2. Valid input with camelCase keys +def test_format_monitoring_tubes_camel_case(): + data = [ + { + "tubeNumber": 2, + "tubeType": "monitor", + "artesianWellCapPresent": "yes", + "sedimentSumpPresent": "yes", + "numberOfGeoOhmCables": 0, + "tubeTopDiameter": 110, + "variableDiameter": "yes", + "tubeStatus": "inactive", + "tubeTopPosition": 15.0, + "tubeTopPositioningMethod": "manual", + "tubePackingMaterial": "clay", + "tubeMaterial": "HDPE", + "glue": "none", + "screenLength": 12.0, + "sockMaterial": "polyester", + "plainTubePartLength": 6.0, + "geoOhmCables": [ + { + "cableNumber": 1, + "electrodes": [ + { + "electrodeNumber": 1, + "electrodePackingMaterial": "gravel", + "electrodeStatus": "active", + "electrodePosition": 10.0, + } + ], + } + ], + } + ] + result = format_monitoring_tubes(data) + assert result[0].tube_number == 2 + assert isinstance(result[0], MonitoringTube) + assert result[0].geo_ohm_cables[0].cable_number == 1 + assert isinstance(result[0].geo_ohm_cables[0], GeoOhmCable) + + +# 3. Missing optional fields +def test_format_monitoring_tubes_missing_optional_fields(): + data = [ + { + "tube_number": 3, + "tube_type": "monitor", + "artesian_well_cap_present": "no", + "sediment_sump_present": "yes", + "number_of_geo_ohm_cables": 0, + "tube_status": "active", + "tube_top_position": 10.0, + "tube_top_positioning_method": "survey", + "tube_packing_material": "sand", + "tube_material": "PVC", + "glue": "none", + "screen_length": 8.0, + "sock_material": "nylon", + "plain_tube_part_length": 3.0, + } + ] + result = format_monitoring_tubes(data) + assert result[0].tube_type == "monitor" + assert result[0].geo_ohm_cables is None + + +# 4. Extra fields (ignored) +def test_format_monitoring_tubes_with_extra_fields(): + data = [ + { + "tube_number": 4, + "tube_type": "test", + "artesian_well_cap_present": "yes", + "sediment_sump_present": "yes", + "number_of_geo_ohm_cables": 0, + "tube_status": "active", + "tube_top_position": 13.0, + "tube_top_positioning_method": "estimate", + "tube_packing_material": "gravel", + "tube_material": "PVC", + "glue": "adhesive", + "screen_length": 9.0, + "sock_material": "cotton", + "plain_tube_part_length": 4.0, + "extra_field": "ignore me", + } + ] + result = format_monitoring_tubes(data) + assert hasattr(result[0], "tube_number") + assert not hasattr(result[0], "extra_field") + + +# 5. Missing required field → should raise ValidationError +def test_format_monitoring_tubes_missing_required_field(): + data = [ + { + "tube_type": "test", + "artesian_well_cap_present": "yes", + "sediment_sump_present": "yes", + "number_of_geo_ohm_cables": 0, + "tube_status": "active", + "tube_top_position": 10.0, + "tube_top_positioning_method": "survey", + "tube_packing_material": "sand", + "tube_material": "PVC", + "glue": "adhesive", + "screen_length": 9.0, + "sock_material": "cotton", + "plain_tube_part_length": 4.0, + } + ] + with pytest.raises(ValidationError): + format_monitoring_tubes(data) + + +# 6. Invalid nested cable → should raise ValidationError +def test_format_monitoring_tubes_invalid_nested_cable(): + data = [ + { + "tube_number": 5, + "tube_type": "invalid", + "artesian_well_cap_present": "no", + "sediment_sump_present": "no", + "number_of_geo_ohm_cables": 1, + "tube_status": "inactive", + "tube_top_position": 11.0, + "tube_top_positioning_method": "gps", + "tube_packing_material": "sand", + "tube_material": "PVC", + "glue": "adhesive", + "screen_length": 7.0, + "sock_material": "cotton", + "plain_tube_part_length": 2.0, + "geo_ohm_cables": [ + { + # Missing cable_number + "electrodes": [] + } + ], + } + ] + with pytest.raises(ValidationError): + format_monitoring_tubes(data) + + +# 1. Valid GMW + monitoring tube +def test_build_gmw_construction_valid(): + result = build_gmw_construction(BASE_GMW_DATA, BASE_TUBE_DATA) + assert isinstance(result, GMWConstruction) + assert result.monitoring_tubes[0].tube_number == 1 + + +# 2. Valid with camelCase keys +def test_build_gmw_construction_camel_case_keys(): + gmw_data = {**BASE_GMW_DATA} + # Simulate camelCase for all fields (the model supports aliasing) + gmw_data["objectIdAccountableParty"] = gmw_data.pop("object_id_accountable_party") + monitoring_tubes = BASE_TUBE_DATA + monitoring_tubes[0]["tube_number"] = 2 + result = build_gmw_construction(gmw_data, monitoring_tubes) + assert result.monitoring_tubes[0].tube_number == 2 + + +# 3. Missing optional fields +def test_build_gmw_construction_missing_optional(): + gmw_data = BASE_GMW_DATA.copy() + del gmw_data["ground_level_stable"] + gmw_data["ground_level_stable"] = "no" + gmw_data.pop("ground_level_position", None) # Optional + gmw_data.pop("well_stability", None) # Optional + gmw_data.pop("owner", None) # Optional + gmw_data.pop("maintenance_responsible_party", None) + result = build_gmw_construction(gmw_data, BASE_TUBE_DATA) + assert result.ground_level_stable == "no" + assert result.owner is None + + +# 4. Empty monitoring tube list +def test_build_gmw_construction_empty_tubes(): + gmw_data = BASE_GMW_DATA.copy() + gmw_data["number_of_monitoring_tubes"] = 0 + result = build_gmw_construction(gmw_data, []) + assert result.monitoring_tubes == [] + + +# 5. Missing required GMW field +def test_build_gmw_construction_missing_required_field(): + gmw_data = BASE_GMW_DATA.copy() + del gmw_data["construction_standard"] # Required + with pytest.raises(ValidationError): + build_gmw_construction(gmw_data, BASE_TUBE_DATA) + + +# 6. Invalid nested monitoring tube (e.g., missing required field) +def test_build_gmw_construction_invalid_nested_tube(): + tube_data = [ + { + "tube_number": 1, + "tube_type": "monitor", + # Missing many required fields + } + ] + with pytest.raises(ValidationError): + build_gmw_construction(BASE_GMW_DATA, tube_data) + + +def test_format_gmw_construction_valid(monkeypatch): + # Arrange + mock_brostar = MagicMock() + + # Mocking API responses + mock_brostar.get.side_effect = [ + MagicMock(json=lambda: {"results": [BASE_GMW_DATA]}), + MagicMock(json=lambda: {"results": BASE_TUBE_DATA}), + ] + + formatter = PayloadFormatter(brostar=mock_brostar) + + # Act + result = formatter.format_gmw_construction(gmw_bro_id="12345") + + # Assert + assert isinstance(result, GMWConstruction) + # Optionally assert UploadTaskMetadata if it’s returned + # assert isinstance(result[1], UploadTaskMetadata) + assert result.monitoring_tubes[0].tube_number == 2 diff --git a/src/tests/test_upload_models.py b/src/tests/test_upload_models.py index 28f2688..2112f27 100644 --- a/src/tests/test_upload_models.py +++ b/src/tests/test_upload_models.py @@ -1,4 +1,5 @@ from datetime import datetime +from typing import Literal from uuid import UUID import pytest @@ -18,13 +19,30 @@ ) +@pytest.fixture +def gld_addition_factory() -> GLDAddition: + def _create(observation_type: Literal["controlemeting", "reguliereMeting"] = "reguliereMeting"): + return GLDAddition( + investigator_kvk="12345678", + observation_type=observation_type, + evaluation_procedure="ProcedureA", + measurement_instrument_type="InstrumentX", + process_reference="PR123", + begin_position="2024-01-01T00:00:00", + end_position="2024-01-02T00:00:00", + time_value_pairs=[TimeValuePair(time="2024-01-01T12:00:00", value=10.0)], + ) + + return _create + + def test_upload_task_metadata_valid(): metadata = UploadTaskMetadata( request_reference="REQ123", delivery_accountable_party="Some Party", - quality_regime="High", + quality_regime="IMBRO", bro_id="BRO123", - correction_reason="Correction needed", + correction_reason="eigenCorrectie", ) assert metadata.request_reference == "REQ123" assert metadata.delivery_accountable_party == "Some Party" @@ -115,7 +133,7 @@ def test_upload_task_valid(): metadata = UploadTaskMetadata( request_reference="REQ123", delivery_accountable_party=None, - quality_regime="High", + quality_regime="IMBRO", bro_id=None, correction_reason=None, ) @@ -135,7 +153,7 @@ def test_upload_task_invalid_bro_domain(): metadata = UploadTaskMetadata( request_reference="REQ123", delivery_accountable_party=None, - quality_regime="High", + quality_regime="IMBRO", bro_id=None, correction_reason=None, ) @@ -165,17 +183,8 @@ def test_time_value_pair_valid_string(): assert pair.time == "2024-01-01T12:30:00" -def test_gld_addition_auto_generate_ids(): - gld = GLDAddition( - investigator_kvk="12345678", - observation_type="reguliereMeting", - evaluation_procedure="ProcedureA", - measurement_instrument_type="InstrumentX", - process_reference="PR123", - begin_position="2024-01-01T00:00:00", - end_position="2024-01-02T00:00:00", - time_value_pairs=[TimeValuePair(time="2024-01-01T12:00:00", value=10.0)], - ) +def test_gld_addition_auto_generate_ids(gld_addition_factory): + gld = gld_addition_factory(observation_type="controlemeting") # The fields should have been auto-populated with a UUID string assert gld.observation_id.startswith("_") assert gld.observation_process_id.startswith("_") @@ -187,31 +196,13 @@ def test_gld_addition_auto_generate_ids(): UUID(gld.measurement_timeseries_id[1:]) -def test_gld_addition_validation_status_reguliere_meting(): - gld = GLDAddition( - investigator_kvk="12345678", - observation_type="reguliereMeting", - evaluation_procedure="ProcedureA", - measurement_instrument_type="InstrumentX", - process_reference="PR123", - begin_position="2024-01-01T00:00:00", - end_position="2024-01-02T00:00:00", - time_value_pairs=[TimeValuePair(time="2024-01-01T12:00:00", value=10.0)], - ) +def test_gld_addition_validation_status_reguliere_meting(gld_addition_factory): + gld = gld_addition_factory(observation_type="reguliereMeting") assert gld.validation_status == "onbekend" -def test_gld_addition_validation_status_controlemeting(): - gld = GLDAddition( - investigator_kvk="12345678", - observation_type="controlemeting", - evaluation_procedure="ProcedureB", - measurement_instrument_type="InstrumentY", - process_reference="PR456", - begin_position="2024-02-01T00:00:00", - end_position="2024-02-02T00:00:00", - time_value_pairs=[TimeValuePair(time="2024-02-01T12:00:00", value=12.0)], - ) +def test_gld_addition_validation_status_controlemeting(gld_addition_factory): + gld = gld_addition_factory(observation_type="controlemeting") assert gld.validation_status is None