From d9fe510b8d7f470d21b7c1e5b12f364aa1abe771 Mon Sep 17 00:00:00 2001 From: kyounghoonJang Date: Wed, 21 May 2025 22:51:10 +0900 Subject: [PATCH] feat: Add SQLite support and simplify SQL output configuration --- Cargo.lock | 1 + Cargo.toml | 2 +- crates/arkflow-plugin/src/output/sql.rs | 52 +++++++++++++++++++++---- examples/sql_output_example.yaml | 7 ++-- 4 files changed, 50 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8b0d83b0..f82350a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4233,6 +4233,7 @@ version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" dependencies = [ + "cc", "pkg-config", "vcpkg", ] diff --git a/Cargo.toml b/Cargo.toml index eee9cbe1..a9a56e0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ flume = "=0.11" rumqttc = "0.24.0" # Sql -sqlx = { version = "0.8", features = [ "mysql","postgres","runtime-tokio", "tls-native-tls" ] } +sqlx = { version = "0.8", features = [ "mysql","postgres","sqlite","runtime-tokio", "tls-native-tls" ] } # Kafka aws-msk-iam-sasl-signer = "1.0.0" diff --git a/crates/arkflow-plugin/src/output/sql.rs b/crates/arkflow-plugin/src/output/sql.rs index d34b4d4c..c99cc40f 100644 --- a/crates/arkflow-plugin/src/output/sql.rs +++ b/crates/arkflow-plugin/src/output/sql.rs @@ -29,7 +29,7 @@ use tracing::warn; use sqlx::mysql::{MySqlConnectOptions, MySqlSslMode}; use sqlx::postgres::{PgConnectOptions, PgSslMode}; -use sqlx::{Connection, MySqlConnection, PgConnection, QueryBuilder}; +use sqlx::{Connection, MySqlConnection, PgConnection, SqliteConnection, QueryBuilder}; #[derive(Debug, Clone)] pub enum SqlValue { @@ -42,16 +42,16 @@ pub enum SqlValue { } #[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type", rename_all = "snake_case")] +#[serde(rename_all = "snake_case")] pub enum DatabaseType { Mysql, Postgres, - // Sqlite, + Sqlite, } pub enum DatabaseConnection { Mysql(MySqlConnection), Postgres(PgConnection), - // Sqlite(SqliteConnection), + Sqlite(SqliteConnection), } impl DatabaseConnection { /// Executes an INSERT query with the given columns and rows @@ -124,6 +124,36 @@ impl DatabaseConnection { Error::Process(format!("Failed to execute PostgresSQL query: {}", e)) })?; + Ok(()) + } + DatabaseConnection::Sqlite(conn) => { + let mut query_builder = QueryBuilder::::new(format!( + "INSERT INTO {} ({})", + output_config.table_name, + columns + .iter() + .map(|c| format!("\"{}\"", c)) + .collect::>() + .join(", "), + )); + query_builder.push_values(rows, |mut b, row| { + for value in row { + match value { + SqlValue::String(s) => b.push_bind(s), + SqlValue::Int64(i) => b.push_bind(i), + SqlValue::UInt64(u) => b.push_bind(u as i64), + SqlValue::Float64(f) => b.push_bind(f), + SqlValue::Boolean(bool) => b.push_bind(bool), + SqlValue::Null => b.push_bind(None::), + }; + } + }); + + let query = query_builder.build(); + query.execute(conn).await.map_err(|e| { + Error::Process(format!("Failed to execute SQLite query: {}", e)) + })?; + Ok(()) } } @@ -135,7 +165,7 @@ impl DatabaseConnection { #[serde(rename_all = "snake_case")] pub struct SqlOutputConfig { /// SQL query statement - output_type: DatabaseType, + database_type: DatabaseType, table_name: String, uri: String, ssl: Option, @@ -268,9 +298,10 @@ impl SqlOutput { /// Initialize a new DB connection. /// If `ssl` is configured, apply root certificates to the SSL options. async fn init_connect(&self) -> Result { - let conn = match self.sql_config.output_type { + let conn = match self.sql_config.database_type { DatabaseType::Mysql => self.generate_mysql_conn().await?, DatabaseType::Postgres => self.generate_postgres_conn().await?, + DatabaseType::Sqlite => self.generate_sqlite_conn().await?, }; Ok(conn) } @@ -374,7 +405,7 @@ impl SqlOutput { MySqlConnection::connect(&self.sql_config.uri) .await .map_err(|e| Error::Config(format!("Failed to connect to MySQL: {}", e)))? - }; + }; Ok(DatabaseConnection::Mysql(mysql_conn)) } async fn generate_postgres_conn(&self) -> Result { @@ -390,6 +421,13 @@ impl SqlOutput { }; Ok(DatabaseConnection::Postgres(postgres_conn)) } + async fn generate_sqlite_conn(&self) -> Result { + let sqlite_conn = SqliteConnection::connect(&self.sql_config.uri) + .await + .map_err(|e| Error::Config(format!("Failed to connect to SQLite: {}", e)))?; + Ok(DatabaseConnection::Sqlite(sqlite_conn)) + } + } pub(crate) struct SqlOutputBuilder; diff --git a/examples/sql_output_example.yaml b/examples/sql_output_example.yaml index 18955ff1..6674199e 100644 --- a/examples/sql_output_example.yaml +++ b/examples/sql_output_example.yaml @@ -16,10 +16,9 @@ streams: output: type: "sql" - output_type: - type: "mysql" - uri: "mysql://root:1234@localhost:3306/arkflow" - table_name: "arkflow_test" + database_type: "mysql" + uri: "mysql://root:1234@localhost:3306/arkflow" + table_name: "arkflow_test" error_output: type: "stdout" \ No newline at end of file