From 9059ce7116461c57389a7d59d82aeef0381700b5 Mon Sep 17 00:00:00 2001
From: Pavlos-Petros Tournaris
Date: Tue, 27 May 2025 16:21:02 +0300
Subject: [PATCH] Allow special handling of Geometry types
---
src/postgres/mod.rs | 2 +
src/postgres/postgres_geometry_type.rs | 52 ++++++++++++++++++++
src/postgres/postgres_geometry_type_tests.rs | 16 ++++++
src/postgres/postgres_operator_impl.rs | 33 ++++++++++++-
4 files changed, 102 insertions(+), 1 deletion(-)
create mode 100644 src/postgres/postgres_geometry_type.rs
create mode 100644 src/postgres/postgres_geometry_type_tests.rs
diff --git a/src/postgres/mod.rs b/src/postgres/mod.rs
index 2930a9a..e7a9628 100644
--- a/src/postgres/mod.rs
+++ b/src/postgres/mod.rs
@@ -1,4 +1,5 @@
pub mod postgres_config;
+pub mod postgres_geometry_type;
pub mod postgres_operator;
pub mod postgres_operator_impl;
pub mod postgres_row_struct;
@@ -6,4 +7,5 @@ pub mod table_mode;
pub mod table_query;
#[cfg(test)]
+mod postgres_geometry_type_tests;
mod postgres_operator_tests;
diff --git a/src/postgres/postgres_geometry_type.rs b/src/postgres/postgres_geometry_type.rs
new file mode 100644
index 0000000..4430e33
--- /dev/null
+++ b/src/postgres/postgres_geometry_type.rs
@@ -0,0 +1,52 @@
+pub struct PostgresGeometryType {
+ value_type: String,
+}
+
+impl PostgresGeometryType {
+ const MULTIPOLYGON: &str = "MULTIPOLYGON";
+ const GEOMETRY_TYPE_FROM_TEXT_PREFIX: &str = "ST_GeomFromText";
+ const ACCEPTED_GEOMETRY_KEYWORDS: &[&str] = &[Self::MULTIPOLYGON];
+ const NUM_OF_CHARS_FOR_GEOMETRY_TYPE_CHECK: usize = 30;
+
+ pub fn new(input: &str) -> Self {
+ let value_type = input
+ .chars()
+ .take(Self::NUM_OF_CHARS_FOR_GEOMETRY_TYPE_CHECK)
+ .collect::();
+
+ let sanitized_value_type = if value_type.starts_with('"') && value_type.ends_with('"') {
+ value_type
+ .strip_prefix('"')
+ .and_then(|s| s.strip_suffix('"'))
+ .unwrap_or(&value_type)
+ .to_string()
+ } else {
+ value_type.to_string()
+ };
+
+ PostgresGeometryType {
+ value_type: sanitized_value_type,
+ }
+ }
+
+ pub fn is_geometry_type(&self) -> bool {
+ self.value_type
+ .find('(')
+ .map(|pos| &self.value_type[..pos])
+ .is_some_and(|prefix| Self::ACCEPTED_GEOMETRY_KEYWORDS.contains(&prefix))
+ }
+
+ pub fn format_value(&self, value: &str) -> String {
+ let value_type =
+ &self.value_type[..self.value_type.find('(').unwrap_or(self.value_type.len())];
+
+ match value_type {
+ Self::MULTIPOLYGON => format!(
+ "{}('{}', 4326)",
+ Self::GEOMETRY_TYPE_FROM_TEXT_PREFIX,
+ value
+ ),
+ _ => self.value_type.clone(),
+ }
+ }
+}
diff --git a/src/postgres/postgres_geometry_type_tests.rs b/src/postgres/postgres_geometry_type_tests.rs
new file mode 100644
index 0000000..f75a979
--- /dev/null
+++ b/src/postgres/postgres_geometry_type_tests.rs
@@ -0,0 +1,16 @@
+#[cfg(test)]
+mod tests {
+ use crate::postgres::postgres_geometry_type::PostgresGeometryType;
+
+ #[test]
+ fn test_is_geometry_type() {
+ let geometry_type = PostgresGeometryType::new("MULTIPOLYGON(");
+ assert!(geometry_type.is_geometry_type());
+ }
+
+ #[test]
+ fn test_is_not_geometry_type() {
+ let geometry_type = PostgresGeometryType::new("POINT(");
+ assert!(!geometry_type.is_geometry_type());
+ }
+}
diff --git a/src/postgres/postgres_operator_impl.rs b/src/postgres/postgres_operator_impl.rs
index 82580c7..e3eb28e 100644
--- a/src/postgres/postgres_operator_impl.rs
+++ b/src/postgres/postgres_operator_impl.rs
@@ -13,6 +13,7 @@ use std::{fmt::Display, time::Instant};
use TableQuery::*;
use tracing::info;
+use super::postgres_geometry_type::PostgresGeometryType;
pub(crate) use super::postgres_operator::PostgresOperator;
use super::{
postgres_operator::{InsertDataframePayload, UpsertDataframePayload},
@@ -219,6 +220,8 @@ impl PostgresOperator for PostgresOperatorImpl {
let column_names = df.get_column_names_str();
let fields = column_names.join(", ");
+ debug!("Columns names: {fields}");
+
let df_height = df.height().to_f64().unwrap();
info!("Total DF height: {df_height}");
@@ -255,7 +258,7 @@ impl PostgresOperator for PostgresOperatorImpl {
.iter()
.map(|column| {
let v = column.get(row_idx).unwrap();
- RowStruct::new(&v).displayed()
+ preprocess_value(&v)
})
.collect::>()
.join(", ");
@@ -284,6 +287,10 @@ impl PostgresOperator for PostgresOperatorImpl {
"Failed to insert data into table -> {}: {e}",
payload.table_name
);
+ panic!(
+ "Failed query: {}",
+ query.chars().take(1000).collect::()
+ );
}
}
@@ -473,3 +480,27 @@ fn insert_delay() -> std::time::Duration {
.unwrap(),
)
}
+
+fn preprocess_value(value: &AnyValue) -> String {
+ match value {
+ AnyValue::String(_) | AnyValue::StringOwned(_) => {
+ let string_value = &value.str_value();
+ let potential_geometry_value = string_value.to_string();
+ let potential_geometry_value = potential_geometry_value.trim();
+ let postgres_geometry_type = PostgresGeometryType::new(potential_geometry_value);
+ if postgres_geometry_type.is_geometry_type() {
+ let formatted_geometry_value =
+ postgres_geometry_type.format_value(potential_geometry_value);
+ debug!("Formatted Geometry value: {formatted_geometry_value}");
+ formatted_geometry_value
+ } else {
+ RowStruct::FromString(string_value.to_string()).displayed()
+ }
+ }
+ _ => {
+ debug!("On other =>: {}", RowStruct::new(value).displayed());
+ debug!("Type of value: {}", value.dtype());
+ RowStruct::new(value).displayed()
+ }
+ }
+}