From 9059ce7116461c57389a7d59d82aeef0381700b5 Mon Sep 17 00:00:00 2001 From: Pavlos-Petros Tournaris Date: Tue, 27 May 2025 16:21:02 +0300 Subject: [PATCH] Allow special handling of Geometry types --- src/postgres/mod.rs | 2 + src/postgres/postgres_geometry_type.rs | 52 ++++++++++++++++++++ src/postgres/postgres_geometry_type_tests.rs | 16 ++++++ src/postgres/postgres_operator_impl.rs | 33 ++++++++++++- 4 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 src/postgres/postgres_geometry_type.rs create mode 100644 src/postgres/postgres_geometry_type_tests.rs diff --git a/src/postgres/mod.rs b/src/postgres/mod.rs index 2930a9a..e7a9628 100644 --- a/src/postgres/mod.rs +++ b/src/postgres/mod.rs @@ -1,4 +1,5 @@ pub mod postgres_config; +pub mod postgres_geometry_type; pub mod postgres_operator; pub mod postgres_operator_impl; pub mod postgres_row_struct; @@ -6,4 +7,5 @@ pub mod table_mode; pub mod table_query; #[cfg(test)] +mod postgres_geometry_type_tests; mod postgres_operator_tests; diff --git a/src/postgres/postgres_geometry_type.rs b/src/postgres/postgres_geometry_type.rs new file mode 100644 index 0000000..4430e33 --- /dev/null +++ b/src/postgres/postgres_geometry_type.rs @@ -0,0 +1,52 @@ +pub struct PostgresGeometryType { + value_type: String, +} + +impl PostgresGeometryType { + const MULTIPOLYGON: &str = "MULTIPOLYGON"; + const GEOMETRY_TYPE_FROM_TEXT_PREFIX: &str = "ST_GeomFromText"; + const ACCEPTED_GEOMETRY_KEYWORDS: &[&str] = &[Self::MULTIPOLYGON]; + const NUM_OF_CHARS_FOR_GEOMETRY_TYPE_CHECK: usize = 30; + + pub fn new(input: &str) -> Self { + let value_type = input + .chars() + .take(Self::NUM_OF_CHARS_FOR_GEOMETRY_TYPE_CHECK) + .collect::(); + + let sanitized_value_type = if value_type.starts_with('"') && value_type.ends_with('"') { + value_type + .strip_prefix('"') + .and_then(|s| s.strip_suffix('"')) + .unwrap_or(&value_type) + .to_string() + } else { + value_type.to_string() + }; + + PostgresGeometryType { + value_type: sanitized_value_type, + } + } + + pub fn is_geometry_type(&self) -> bool { + self.value_type + .find('(') + .map(|pos| &self.value_type[..pos]) + .is_some_and(|prefix| Self::ACCEPTED_GEOMETRY_KEYWORDS.contains(&prefix)) + } + + pub fn format_value(&self, value: &str) -> String { + let value_type = + &self.value_type[..self.value_type.find('(').unwrap_or(self.value_type.len())]; + + match value_type { + Self::MULTIPOLYGON => format!( + "{}('{}', 4326)", + Self::GEOMETRY_TYPE_FROM_TEXT_PREFIX, + value + ), + _ => self.value_type.clone(), + } + } +} diff --git a/src/postgres/postgres_geometry_type_tests.rs b/src/postgres/postgres_geometry_type_tests.rs new file mode 100644 index 0000000..f75a979 --- /dev/null +++ b/src/postgres/postgres_geometry_type_tests.rs @@ -0,0 +1,16 @@ +#[cfg(test)] +mod tests { + use crate::postgres::postgres_geometry_type::PostgresGeometryType; + + #[test] + fn test_is_geometry_type() { + let geometry_type = PostgresGeometryType::new("MULTIPOLYGON("); + assert!(geometry_type.is_geometry_type()); + } + + #[test] + fn test_is_not_geometry_type() { + let geometry_type = PostgresGeometryType::new("POINT("); + assert!(!geometry_type.is_geometry_type()); + } +} diff --git a/src/postgres/postgres_operator_impl.rs b/src/postgres/postgres_operator_impl.rs index 82580c7..e3eb28e 100644 --- a/src/postgres/postgres_operator_impl.rs +++ b/src/postgres/postgres_operator_impl.rs @@ -13,6 +13,7 @@ use std::{fmt::Display, time::Instant}; use TableQuery::*; use tracing::info; +use super::postgres_geometry_type::PostgresGeometryType; pub(crate) use super::postgres_operator::PostgresOperator; use super::{ postgres_operator::{InsertDataframePayload, UpsertDataframePayload}, @@ -219,6 +220,8 @@ impl PostgresOperator for PostgresOperatorImpl { let column_names = df.get_column_names_str(); let fields = column_names.join(", "); + debug!("Columns names: {fields}"); + let df_height = df.height().to_f64().unwrap(); info!("Total DF height: {df_height}"); @@ -255,7 +258,7 @@ impl PostgresOperator for PostgresOperatorImpl { .iter() .map(|column| { let v = column.get(row_idx).unwrap(); - RowStruct::new(&v).displayed() + preprocess_value(&v) }) .collect::>() .join(", "); @@ -284,6 +287,10 @@ impl PostgresOperator for PostgresOperatorImpl { "Failed to insert data into table -> {}: {e}", payload.table_name ); + panic!( + "Failed query: {}", + query.chars().take(1000).collect::() + ); } } @@ -473,3 +480,27 @@ fn insert_delay() -> std::time::Duration { .unwrap(), ) } + +fn preprocess_value(value: &AnyValue) -> String { + match value { + AnyValue::String(_) | AnyValue::StringOwned(_) => { + let string_value = &value.str_value(); + let potential_geometry_value = string_value.to_string(); + let potential_geometry_value = potential_geometry_value.trim(); + let postgres_geometry_type = PostgresGeometryType::new(potential_geometry_value); + if postgres_geometry_type.is_geometry_type() { + let formatted_geometry_value = + postgres_geometry_type.format_value(potential_geometry_value); + debug!("Formatted Geometry value: {formatted_geometry_value}"); + formatted_geometry_value + } else { + RowStruct::FromString(string_value.to_string()).displayed() + } + } + _ => { + debug!("On other =>: {}", RowStruct::new(value).displayed()); + debug!("Type of value: {}", value.dtype()); + RowStruct::new(value).displayed() + } + } +}