Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/postgres/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
pub mod postgres_config;
pub mod postgres_geometry_type;
pub mod postgres_operator;
pub mod postgres_operator_impl;
pub mod postgres_row_struct;
pub mod table_mode;
pub mod table_query;

#[cfg(test)]
mod postgres_geometry_type_tests;
mod postgres_operator_tests;
52 changes: 52 additions & 0 deletions src/postgres/postgres_geometry_type.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
pub struct PostgresGeometryType {
value_type: String,
}

impl PostgresGeometryType {
const MULTIPOLYGON: &str = "MULTIPOLYGON";
const GEOMETRY_TYPE_FROM_TEXT_PREFIX: &str = "ST_GeomFromText";
const ACCEPTED_GEOMETRY_KEYWORDS: &[&str] = &[Self::MULTIPOLYGON];
const NUM_OF_CHARS_FOR_GEOMETRY_TYPE_CHECK: usize = 30;

pub fn new(input: &str) -> Self {
let value_type = input
.chars()
.take(Self::NUM_OF_CHARS_FOR_GEOMETRY_TYPE_CHECK)
.collect::<String>();

let sanitized_value_type = if value_type.starts_with('"') && value_type.ends_with('"') {
value_type
.strip_prefix('"')
.and_then(|s| s.strip_suffix('"'))
.unwrap_or(&value_type)
.to_string()
} else {
value_type.to_string()
};

PostgresGeometryType {
value_type: sanitized_value_type,
}
}

pub fn is_geometry_type(&self) -> bool {
self.value_type
.find('(')
.map(|pos| &self.value_type[..pos])
.is_some_and(|prefix| Self::ACCEPTED_GEOMETRY_KEYWORDS.contains(&prefix))
}

pub fn format_value(&self, value: &str) -> String {
let value_type =
&self.value_type[..self.value_type.find('(').unwrap_or(self.value_type.len())];

match value_type {
Self::MULTIPOLYGON => format!(
"{}('{}', 4326)",
Self::GEOMETRY_TYPE_FROM_TEXT_PREFIX,
value
),
_ => self.value_type.clone(),
}
}
}
16 changes: 16 additions & 0 deletions src/postgres/postgres_geometry_type_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#[cfg(test)]
mod tests {
use crate::postgres::postgres_geometry_type::PostgresGeometryType;

#[test]
fn test_is_geometry_type() {
let geometry_type = PostgresGeometryType::new("MULTIPOLYGON(");
assert!(geometry_type.is_geometry_type());
}

#[test]
fn test_is_not_geometry_type() {
let geometry_type = PostgresGeometryType::new("POINT(");
assert!(!geometry_type.is_geometry_type());
}
}
33 changes: 32 additions & 1 deletion src/postgres/postgres_operator_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{fmt::Display, time::Instant};
use TableQuery::*;
use tracing::info;

use super::postgres_geometry_type::PostgresGeometryType;
pub(crate) use super::postgres_operator::PostgresOperator;
use super::{
postgres_operator::{InsertDataframePayload, UpsertDataframePayload},
Expand Down Expand Up @@ -219,6 +220,8 @@ impl PostgresOperator for PostgresOperatorImpl {
let column_names = df.get_column_names_str();
let fields = column_names.join(", ");

debug!("Columns names: {fields}");

let df_height = df.height().to_f64().unwrap();

info!("Total DF height: {df_height}");
Expand Down Expand Up @@ -255,7 +258,7 @@ impl PostgresOperator for PostgresOperatorImpl {
.iter()
.map(|column| {
let v = column.get(row_idx).unwrap();
RowStruct::new(&v).displayed()
preprocess_value(&v)
})
.collect::<Vec<_>>()
.join(", ");
Expand Down Expand Up @@ -284,6 +287,10 @@ impl PostgresOperator for PostgresOperatorImpl {
"Failed to insert data into table -> {}: {e}",
payload.table_name
);
panic!(
"Failed query: {}",
query.chars().take(1000).collect::<String>()
);
}
}

Expand Down Expand Up @@ -473,3 +480,27 @@ fn insert_delay() -> std::time::Duration {
.unwrap(),
)
}

fn preprocess_value(value: &AnyValue) -> String {
match value {
AnyValue::String(_) | AnyValue::StringOwned(_) => {
let string_value = &value.str_value();
let potential_geometry_value = string_value.to_string();
let potential_geometry_value = potential_geometry_value.trim();
let postgres_geometry_type = PostgresGeometryType::new(potential_geometry_value);
if postgres_geometry_type.is_geometry_type() {
let formatted_geometry_value =
postgres_geometry_type.format_value(potential_geometry_value);
debug!("Formatted Geometry value: {formatted_geometry_value}");
formatted_geometry_value
} else {
RowStruct::FromString(string_value.to_string()).displayed()
}
}
_ => {
debug!("On other =>: {}", RowStruct::new(value).displayed());
debug!("Type of value: {}", value.dtype());
RowStruct::new(value).displayed()
}
}
}