Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions src/moonlink_backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ pub use error::{Error, Result};
use mooncake_table_id::MooncakeTableId;
pub use moonlink::ReadState;
use moonlink::{ReadStateFilepathRemap, TableEventManager};
use moonlink_connectors::ReplicationManager;
pub use moonlink_connectors::{
rest_ingest::rest_source::{EventOperation, EventRequest},
REST_API_URI,
pub use moonlink_connectors::rest_ingest::rest_source::{
EventRequest, FileEventOperation, RowEventOperation, RowEventRequest,
};
use moonlink_connectors::ReplicationManager;
pub use moonlink_connectors::REST_API_URI;
use moonlink_metadata_store::base_metadata_store::MetadataStoreTrait;
use std::sync::Arc;
use tokio::sync::RwLock;
Expand Down
2 changes: 2 additions & 0 deletions src/moonlink_connectors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ connector-pg = []

[dependencies]
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
bigdecimal = { version = "0.4", default-features = false, features = ["std"] }
Expand All @@ -23,6 +24,7 @@ futures = { workspace = true }
moonlink = { path = "../moonlink" }
moonlink_error = { path = "../moonlink_error" }
num-traits = { workspace = true }
parquet = { workspace = true }
pg_escape = "0.1"
pin-project-lite = "0.2"
postgres-replication = { workspace = true }
Expand Down
53 changes: 44 additions & 9 deletions src/moonlink_connectors/src/rest_ingest/moonlink_rest_sink.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::rest_ingest::rest_source::SrcTableId;
use crate::rest_ingest::rest_source::{EventOperation, RestEvent};
use crate::rest_ingest::rest_source::{FileEventOperation, RestEvent, RowEventOperation};
use crate::{Error, Result};
use moonlink::TableEvent;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, watch};
use tracing::{debug, warn};

Expand Down Expand Up @@ -64,6 +66,10 @@ impl RestSink {
/// This is the main entry point for REST event processing, similar to moonlink_sink's process_cdc_event
pub async fn process_rest_event(&mut self, rest_event: RestEvent) -> Result<()> {
match rest_event {
// ==================
// Row events
// ==================
//
RestEvent::RowEvent {
src_table_id,
operation,
Expand All @@ -83,19 +89,48 @@ impl RestSink {
self.process_commit_event(lsn, src_table_id, timestamp)
.await
}
// ==================
// Table events
// ==================
//
RestEvent::FileEvent {
operation,
table_events,
} => self.process_file_event_boxed(operation, table_events).await,
}
}

/// Process a file event (upload files).
///
/// Thin wrapper that returns the future of `process_file_event_impl` as a pinned,
/// heap-allocated, `Send` trait object borrowing `self` for `'a`.
///
/// `process_rest_event` dispatches file events here, and the file event implementation
/// calls `process_rest_event` again for every nested event. Async functions cannot
/// recurse without indirection, so the future is boxed at this point.
fn process_file_event_boxed<'a>(
    &'a mut self,
    operation: FileEventOperation,
    table_events: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<RestEvent>>>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
    // Creating the future runs none of its body; work only starts once the caller polls it.
    let file_event_future = self.process_file_event_impl(operation, table_events);
    Box::pin(file_event_future)
}
/// Drain the nested events of a file upload and process them one by one, in receive order.
///
/// The receiver lock is taken once and held for the whole drain, until `recv` yields
/// `None`.
///
/// # Panics
///
/// Panics if `operation` is not `FileEventOperation::Upload`.
///
/// # Errors
///
/// Returns the first error produced by `process_rest_event`; events still queued in the
/// channel at that point are left unread.
async fn process_file_event_impl(
    &mut self,
    operation: FileEventOperation,
    table_events: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<RestEvent>>>,
) -> Result<()> {
    // NOTE(review): this asserts rather than returning an error, so any other operation
    // aborts the task — confirm that is intended.
    assert_eq!(operation, FileEventOperation::Upload);

    let mut receiver = table_events.lock().await;
    loop {
        match receiver.recv().await {
            // Nested events are routed back through the common entry point.
            Some(nested_event) => self.process_rest_event(nested_event).await?,
            None => break Ok(()),
        }
    }
}

/// Process a row event (Insert, Update, Delete)
/// Process a row event (Insert, Update, Delete).
async fn process_row_event(
&self,
src_table_id: SrcTableId,
operation: EventOperation,
operation: RowEventOperation,
row: moonlink::row::MoonlinkRow,
lsn: u64,
) -> Result<()> {
match operation {
EventOperation::Insert => {
RowEventOperation::Insert => {
let table_event = TableEvent::Append {
row,
lsn,
Expand All @@ -107,7 +142,7 @@ impl RestSink {
self.send_table_event(src_table_id, table_event).await?;
debug!(src_table_id, lsn, "processed REST insert event");
}
EventOperation::Update => {
RowEventOperation::Update => {
// For updates, we send both delete and append events
// First send delete for the old row (using the same row for simplicity)
let delete_event = TableEvent::Delete {
Expand All @@ -132,7 +167,7 @@ impl RestSink {
self.send_table_event(src_table_id, append_event).await?;
debug!(src_table_id, lsn, "processed REST update append event");
}
EventOperation::Delete => {
RowEventOperation::Delete => {
let table_event = TableEvent::Delete {
row,
lsn,
Expand Down Expand Up @@ -195,7 +230,7 @@ impl RestSink {
#[cfg(test)]
mod tests {
use super::*;
use crate::rest_ingest::rest_source::{EventOperation, RestEvent};
use crate::rest_ingest::rest_source::{RestEvent, RowEventOperation};
use moonlink::row::{MoonlinkRow, RowValue};
use std::time::SystemTime;
use tokio::sync::{mpsc, watch};
Expand Down Expand Up @@ -320,7 +355,7 @@ mod tests {
// Test Insert event
let insert_event = RestEvent::RowEvent {
src_table_id,
operation: EventOperation::Insert,
operation: RowEventOperation::Insert,
row: test_row.clone(),
lsn: 10,
timestamp: SystemTime::now(),
Expand All @@ -338,7 +373,7 @@ mod tests {
// Test Update event (should produce both Delete and Append)
let update_event = RestEvent::RowEvent {
src_table_id,
operation: EventOperation::Update,
operation: RowEventOperation::Update,
row: test_row.clone(),
lsn: 20,
timestamp: SystemTime::now(),
Expand Down
Loading