From d84f6bf90d0ee55a0afc44445b737698d8d664b1 Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Wed, 3 Sep 2025 17:30:53 +0200 Subject: [PATCH 1/5] Add Connected Components implementation and symmetrize graph function WIP --- src/connected_components.rs | 231 ++++++++++++++++++++++++++++++++++++ src/lib.rs | 106 ++++++++++++++++- 2 files changed, 336 insertions(+), 1 deletion(-) create mode 100644 src/connected_components.rs diff --git a/src/connected_components.rs b/src/connected_components.rs new file mode 100644 index 0000000..b7e5b5b --- /dev/null +++ b/src/connected_components.rs @@ -0,0 +1,231 @@ +//! Implementation inspired by GraphFrames Spark/Scala code +//! that is distributed under Apache License 2.0. +//! https://github.com/graphframes/graphframes/blob/master/core/src/main/scala/org/graphframes/lib/ConnectedComponents.scala + +use crate::{EDGE_DST, EDGE_SRC, GraphFrame, VERTEX_ID}; +use datafusion::arrow::array::{Array, Decimal128Array}; +use datafusion::arrow::datatypes::DataType; +use datafusion::error::Result; +use datafusion::functions_aggregate::expr_fn::{count, min, sum}; +use datafusion::prelude::*; + +pub const COMPONENT_COL: &str = "component"; +const MIN_NBR: &str = "min_nbr"; +const CNT_OF_NBR: &str = "cnt_of_nbr"; + +fn min_neighbours(edges: &DataFrame, including_self: bool) -> Result { + let res = edges + .clone() + .union(edges.clone().select(vec![ + col(EDGE_DST).alias(EDGE_SRC), + col(EDGE_SRC).alias(EDGE_DST), + ])?)? + .aggregate( + vec![col(EDGE_SRC).alias(VERTEX_ID)], + vec![min(col(EDGE_DST)).alias(MIN_NBR), count(col(EDGE_DST))], + ); + + if including_self { + res?.with_column( + MIN_NBR, + when(col(VERTEX_ID).lt(col(MIN_NBR)), col(VERTEX_ID)).otherwise(col(MIN_NBR))?, + ) + } else { + res + } +} + +/// Calculate the sum of all the minimum neighbor values in a DataFrame. +async fn min_nbr_sum(min_neighbours: &DataFrame) -> Result { + min_neighbours + .clone() + .aggregate( + vec![], + vec![sum(cast(col(MIN_NBR), DataType::Decimal128(38, 0))).alias(MIN_NBR)], + )? + .collect() + .await? + .first() + .ok_or(datafusion::error::DataFusionError::Internal( + "failed to calculate and collect min_nbr_sum: result is empty".to_string(), + ))? + .column(0) + .as_any() + .downcast_ref::() + .ok_or(datafusion::error::DataFusionError::Internal( + "failed to get min_nbr_sum as Decimal128Array".to_string(), + )) + .map(|a| a.value(0)) +} + +#[derive(Debug)] +pub struct ConnectedComponentsOutput { + pub data: DataFrame, + pub num_iterations: usize, + pub min_nbr_sum: Vec, +} + +#[derive(Debug)] +pub struct ConnectedComponentsBuilder<'a> { + graph_frame: &'a GraphFrame, + checkpoint_interval: i32, +} + +impl<'a> ConnectedComponentsBuilder<'a> { + pub fn new(graph_frame: &'a GraphFrame) -> Self { + Self { + graph_frame, + checkpoint_interval: 1, + } + } + + pub fn checkpoint_interval(mut self, checkpoint_interval: i32) -> Self { + self.checkpoint_interval = checkpoint_interval; + self + } + + pub async fn run(self) -> Result { + // Preparation of the graph: + // - removing self-loops + // - changing edge direction so SRC < DST + // - de-duplicate edges + let vertices = self.graph_frame.vertices.clone(); + let original_edges = self.graph_frame.edges.clone(); + + let no_loops_edges = original_edges.filter(col(EDGE_SRC).not_eq(col(EDGE_DST)))?; + let ordered_by_direction_edges = no_loops_edges.select(vec![ + when(col(EDGE_SRC).lt(col(EDGE_DST)), col(EDGE_SRC)) + .otherwise(col(EDGE_DST))? + .alias(EDGE_SRC), + when(col(EDGE_SRC).lt(col(EDGE_DST)), col(EDGE_DST)) + .otherwise(col(EDGE_SRC))? + .alias(EDGE_DST), + ])?; + let deduped_edges = ordered_by_direction_edges.distinct()?; + + let cc_graph = GraphFrame { + vertices: vertices.clone(), + edges: deduped_edges.clone(), + }; + + let mut iteration = 0usize; + let mut metrics = Vec::::new(); + let mut converged = false; + + let mut minimal_neighbours_1 = min_neighbours(&cc_graph.edges.clone(), true)?; + let mut last_iter_nbr_sum = min_nbr_sum(&minimal_neighbours_1.clone()).await?; + metrics.push(last_iter_nbr_sum); + let mut current_edges = deduped_edges.clone(); + + while !converged { + iteration += 1; + // large-star step: + // connects all strictly larger neighbors to the min neighbor (including self) + current_edges = current_edges + .join_on( + minimal_neighbours_1.clone(), + JoinType::Inner, + vec![col(VERTEX_ID).eq(col(EDGE_SRC))], + )? + .select(vec![ + col(EDGE_DST).alias(EDGE_SRC), + col(MIN_NBR).alias(EDGE_DST), + ])? + .distinct()? + .cache() + .await?; + + // small-star step: + // computes min neighbors (excluding self-min) + let minimal_neighbours_2 = min_neighbours(¤t_edges.clone(), false)? + .cache() + .await?; + + // connect all smaller neighbors to the min neighbor + current_edges = current_edges + .clone() + .join_on( + minimal_neighbours_2.clone(), + JoinType::Inner, + vec![col(VERTEX_ID).eq(col(EDGE_SRC))], + )? + .union(minimal_neighbours_2.select(vec![ + col(MIN_NBR).alias(EDGE_SRC), + col(VERTEX_ID).alias(EDGE_DST), + ])?)? + .distinct()? + .cache() + .await?; + minimal_neighbours_1 = min_neighbours(¤t_edges.clone(), true)? + .cache() + .await?; + let current_sum = min_nbr_sum(&minimal_neighbours_1.clone()).await?; + + if current_sum == last_iter_nbr_sum { + converged = true; + } else { + last_iter_nbr_sum = current_sum; + metrics.push(current_sum); + } + } + + Ok(ConnectedComponentsOutput { + data: vertices.join_on( + current_edges, + JoinType::Inner, + vec![col(VERTEX_ID).eq(col(EDGE_DST))], + )?, + num_iterations: iteration, + min_nbr_sum: metrics, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tests::create_test_graph; + use datafusion::arrow::array::Int64Array; + + #[tokio::test] + async fn test_min_nbr() -> Result<()> { + let graph = create_test_graph()?; + let min_nbrs = min_neighbours(&graph.edges.clone(), true)?; + assert_eq!(min_nbrs.schema().fields().len(), 3); + assert_eq!(min_nbrs.clone().count().await?, 10); + let collected = min_nbrs.collect().await?; + let min_neighbours = collected + .iter() + .flat_map(|batch| { + batch + .column_by_name(MIN_NBR) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect::>() + }) + .collect::>(); + min_neighbours + .iter() + .zip(vec![1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) + .for_each(|(nbr, exp)| assert_eq!(*nbr, exp)); + Ok(()) + } + + #[tokio::test] + async fn test_min_nbr_sum() -> Result<()> { + let graph = create_test_graph()?; + let min_nbrs = min_neighbours(&graph.edges.clone(), true)?; + println!("min_nbrs schema: {:?}", min_nbrs.clone().schema()); + let first = min_nbrs + .clone() + .select(vec![cast(col(MIN_NBR), DataType::Decimal128(38, 0))])?; + first.collect().await?; + let sum = min_nbr_sum(&min_nbrs).await?; + assert_eq!(sum, 10); + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 3f39f42..dcfff4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +mod connected_components; mod pagerank; mod pregel; mod shortest_paths; @@ -42,6 +43,50 @@ impl GraphFrame { )?; Ok(df.select(vec![col(EDGE_SRC).alias(VERTEX_ID), col("out_degree")])?) } + + /// Creates a symmetric graph by duplicating all edges in the reverse direction. + /// For each edge (a,b) in the graph, adds the edge (b,a) if it doesn't exist. + /// Any additional edge attributes are preserved in the reversed edges. + /// + /// # Returns + /// A new `GraphFrame` containing the original vertices and symmetrized edges. + /// + /// # Errors + /// Return a DataFusion error if the edge transformation operations fail. + pub fn symmetrize(&self) -> Result { + let vertices = self.vertices.clone(); + let edges_cols = self + .edges + .schema() + .fields() + .iter() + .map(|f| f.name().to_string()) + .collect::>(); + let new_edges_cols = edges_cols + .clone() + .iter() + .map(|c| { + if c == EDGE_SRC { + col(EDGE_SRC).alias(EDGE_DST) + } else if c == EDGE_DST { + col(EDGE_DST).alias(EDGE_SRC) + } else { + col(c) + } + }) + .collect::>(); + + // We need to: + // - swap dst and src + // - preserve the order of columns for union + let edges = self.edges.clone().union( + self.edges + .clone() + .select(new_edges_cols)? + .select_columns(&edges_cols.iter().map(|c| c.as_str()).collect::>())?, + )?; + Ok(GraphFrame { vertices, edges }) + } } #[cfg(test)] @@ -52,7 +97,7 @@ mod tests { use std::collections::HashMap; use std::sync::Arc; - fn create_test_graph() -> Result { + pub fn create_test_graph() -> Result { let ctx = SessionContext::new(); let vertices_data = RecordBatch::try_new( @@ -227,4 +272,63 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_symmetrize() -> Result<()> { + let graph = create_test_graph()?; + let sym_graph = graph.symmetrize()?; + + // Original vertices should be preserved + assert_eq!(graph.num_nodes().await?, sym_graph.num_nodes().await?); + + // Number of edges should double + let orig_edges = graph.num_edges().await?; + let sym_edges = sym_graph.num_edges().await?; + assert_eq!(sym_edges, orig_edges * 2); + + // In and out degrees should be equal for all vertices in symmetric graph + let in_degrees = sym_graph.in_degrees().await?.collect().await?; + let out_degrees = sym_graph.out_degrees().await?.collect().await?; + + let mut in_degree_map = HashMap::new(); + let mut out_degree_map = HashMap::new(); + + for batch in in_degrees.iter() { + let ids = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let degrees = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..ids.len() { + in_degree_map.insert(ids.value(i), degrees.value(i)); + } + } + + for batch in out_degrees.iter() { + let ids = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let degrees = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..ids.len() { + out_degree_map.insert(ids.value(i), degrees.value(i)); + } + } + + for id in 1..=10 { + assert_eq!(in_degree_map.get(&id), out_degree_map.get(&id)); + } + + Ok(()) + } } From ee977ec4b9a9a4752aef4a15a8d46f43a29f13c3 Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Thu, 4 Sep 2025 10:58:54 +0200 Subject: [PATCH 2/5] Add LDBC test data and enhance Connected Components implementation - Introduced test data for LDBC weakly connected components (WCC) including vertices, edges, and expected results. - Updated implementation of Connected Components to symmetrize graphs and improve edge deduplication. - Enhanced handling of special cases such as graphs with zero, single, or disconnected vertices. - Added extensive unit tests for correctness and consistency. - Updated README to reflect completed status of the Connected Components feature. --- README.md | 4 +- src/connected_components.rs | 267 +++++++++++++++--- .../test-wcc-directed-WCC.csv | 8 + .../test-wcc-directed/test-wcc-directed.e.csv | 10 + .../test-wcc-directed.properties | 13 + .../test-wcc-directed/test-wcc-directed.v.csv | 8 + 6 files changed, 265 insertions(+), 45 deletions(-) create mode 100644 testing/data/ldbc/test-wcc-directed/test-wcc-directed-WCC.csv create mode 100644 testing/data/ldbc/test-wcc-directed/test-wcc-directed.e.csv create mode 100644 testing/data/ldbc/test-wcc-directed/test-wcc-directed.properties create mode 100644 testing/data/ldbc/test-wcc-directed/test-wcc-directed.v.csv diff --git a/README.md b/README.md index 7a7d3a6..3397a92 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ It provides a similar API to Apache Spark's GraphFrames. ## Project Status -The project is in early development stage. Currently implemented features include basic graph operations, statistics, +The project is in the early development stage. Currently implemented features include basic graph operations, statistics, and Pregel API. ## Features @@ -24,7 +24,7 @@ and Pregel API. | Shortest Paths | ✓ | ✓ | | PageRank | ✓ | ✓ | | Parallel Personalized PageRank | ✓ | Planned | -| Connected Components | ✓ | In progress | +| Connected Components | ✓ | ✓ | | Strongly Connected Components | ✓ | Planned | | Triangle Count | ✓ | Planned | | Label Propagation | ✓ | Planned | diff --git a/src/connected_components.rs b/src/connected_components.rs index b7e5b5b..44e8cdf 100644 --- a/src/connected_components.rs +++ b/src/connected_components.rs @@ -6,33 +6,38 @@ use crate::{EDGE_DST, EDGE_SRC, GraphFrame, VERTEX_ID}; use datafusion::arrow::array::{Array, Decimal128Array}; use datafusion::arrow::datatypes::DataType; use datafusion::error::Result; -use datafusion::functions_aggregate::expr_fn::{count, min, sum}; +use datafusion::functions_aggregate::expr_fn::{min, sum}; use datafusion::prelude::*; pub const COMPONENT_COL: &str = "component"; const MIN_NBR: &str = "min_nbr"; -const CNT_OF_NBR: &str = "cnt_of_nbr"; -fn min_neighbours(edges: &DataFrame, including_self: bool) -> Result { - let res = edges - .clone() - .union(edges.clone().select(vec![ +/// Computes the minimum neighbor for each vertex in a graph edge list. +/// +/// This function takes a DataFrame representing the edges of a graph. It computes the minimum destination vertex +/// (neighbor) for each source vertex in the graph. Optionally, the graph can be symmetrized before +/// computing the minimum neighbors. Symmetrization ensures that for every directed edge `(u, v)`, +/// the reverse edge `(v, u)` is also included in the edge list. +fn min_neighbours(edges: &DataFrame, symmetrize: bool) -> Result { + // symmetrize edges if needed + let ee = if symmetrize { + edges.clone().union(edges.clone().select(vec![ col(EDGE_DST).alias(EDGE_SRC), col(EDGE_SRC).alias(EDGE_DST), ])?)? - .aggregate( - vec![col(EDGE_SRC).alias(VERTEX_ID)], - vec![min(col(EDGE_DST)).alias(MIN_NBR), count(col(EDGE_DST))], - ); - - if including_self { - res?.with_column( - MIN_NBR, - when(col(VERTEX_ID).lt(col(MIN_NBR)), col(VERTEX_ID)).otherwise(col(MIN_NBR))?, - ) } else { - res - } + edges.clone() + }; + ee.aggregate( + vec![col(EDGE_SRC).alias(VERTEX_ID)], + vec![min(col(EDGE_DST)).alias(MIN_NBR)], + )? + .select(vec![ + col(VERTEX_ID), + when(col(VERTEX_ID).lt(col(MIN_NBR)), col(VERTEX_ID)) + .otherwise(col(MIN_NBR))? + .alias(MIN_NBR), + ]) } /// Calculate the sum of all the minimum neighbor values in a DataFrame. @@ -68,20 +73,11 @@ pub struct ConnectedComponentsOutput { #[derive(Debug)] pub struct ConnectedComponentsBuilder<'a> { graph_frame: &'a GraphFrame, - checkpoint_interval: i32, } impl<'a> ConnectedComponentsBuilder<'a> { pub fn new(graph_frame: &'a GraphFrame) -> Self { - Self { - graph_frame, - checkpoint_interval: 1, - } - } - - pub fn checkpoint_interval(mut self, checkpoint_interval: i32) -> Self { - self.checkpoint_interval = checkpoint_interval; - self + Self { graph_frame } } pub async fn run(self) -> Result { @@ -103,19 +99,14 @@ impl<'a> ConnectedComponentsBuilder<'a> { ])?; let deduped_edges = ordered_by_direction_edges.distinct()?; - let cc_graph = GraphFrame { - vertices: vertices.clone(), - edges: deduped_edges.clone(), - }; - let mut iteration = 0usize; let mut metrics = Vec::::new(); let mut converged = false; - let mut minimal_neighbours_1 = min_neighbours(&cc_graph.edges.clone(), true)?; + let mut minimal_neighbours_1 = min_neighbours(&deduped_edges.clone(), true)?; let mut last_iter_nbr_sum = min_nbr_sum(&minimal_neighbours_1.clone()).await?; metrics.push(last_iter_nbr_sum); - let mut current_edges = deduped_edges.clone(); + let mut current_edges = deduped_edges.clone().cache().await?; while !converged { iteration += 1; @@ -125,7 +116,7 @@ impl<'a> ConnectedComponentsBuilder<'a> { .join_on( minimal_neighbours_1.clone(), JoinType::Inner, - vec![col(VERTEX_ID).eq(col(EDGE_SRC))], + vec![col(EDGE_SRC).eq(col(VERTEX_ID))], )? .select(vec![ col(EDGE_DST).alias(EDGE_SRC), @@ -147,8 +138,10 @@ impl<'a> ConnectedComponentsBuilder<'a> { .join_on( minimal_neighbours_2.clone(), JoinType::Inner, - vec![col(VERTEX_ID).eq(col(EDGE_SRC))], + vec![col(EDGE_SRC).eq(col(VERTEX_ID))], )? + .select(vec![col(MIN_NBR).alias(EDGE_SRC), col(EDGE_DST)])? + .filter(col(EDGE_SRC).not_eq(col(EDGE_DST)))? .union(minimal_neighbours_2.select(vec![ col(MIN_NBR).alias(EDGE_SRC), col(VERTEX_ID).alias(EDGE_DST), @@ -170,28 +163,65 @@ impl<'a> ConnectedComponentsBuilder<'a> { } Ok(ConnectedComponentsOutput { - data: vertices.join_on( - current_edges, - JoinType::Inner, - vec![col(VERTEX_ID).eq(col(EDGE_DST))], - )?, + data: vertices + .join_on( + current_edges, + JoinType::Left, + vec![col(VERTEX_ID).eq(col(EDGE_DST))], + )? + .select(vec![ + col(VERTEX_ID), + when(col(EDGE_SRC).is_null(), col(VERTEX_ID)) + .otherwise(col(EDGE_SRC))? + .alias(COMPONENT_COL), + ])?, num_iterations: iteration, min_nbr_sum: metrics, }) } } +impl GraphFrame { + /// Constructs a new `ConnectedComponentsBuilder` for the current graph. + /// + /// This method is used to initialize the process of finding weakly connected components + /// within the graph. It creates a `ConnectedComponentsBuilder` instance + /// associated with the current graph, allowing further configuration or direct + /// computation of connected components. + /// + /// An implementation is based on the "large star - small star" algorithm: + /// Kiveris, Raimondas, et al. "Connected components in mapreduce and beyond." + /// Proceedings of the ACM Symposium on Cloud Computing. 2014. + /// https://dl.acm.org/doi/10.1145/2670979.2670997 + /// + /// ### Returns + /// * `ConnectedComponentsBuilder`: A builder object for configuring or + /// computing connected components. + /// + /// ### Example + /// ``` + /// let graph = Graph::new(); + /// let components = graph.connected_components() + /// .run(); // Example of further usage with the builder. + /// ``` + pub fn connected_components(&self) -> ConnectedComponentsBuilder { + ConnectedComponentsBuilder::new(self) + } +} + #[cfg(test)] mod tests { use super::*; use crate::tests::create_test_graph; + use crate::util::create_ldbc_test_graph; use datafusion::arrow::array::Int64Array; + use datafusion::arrow::datatypes::{Field, Schema}; #[tokio::test] async fn test_min_nbr() -> Result<()> { let graph = create_test_graph()?; let min_nbrs = min_neighbours(&graph.edges.clone(), true)?; - assert_eq!(min_nbrs.schema().fields().len(), 3); + assert_eq!(min_nbrs.schema().fields().len(), 2); assert_eq!(min_nbrs.clone().count().await?, 10); let collected = min_nbrs.collect().await?; let min_neighbours = collected @@ -228,4 +258,155 @@ mod tests { assert_eq!(sum, 10); Ok(()) } + + #[tokio::test] + async fn test_zero_vertices() -> Result<()> { + let vertices = dataframe!(VERTEX_ID => Vec::::new())?; + let edges = dataframe!(EDGE_SRC => Vec::::new(), EDGE_DST => Vec::::new())?; + let graph = GraphFrame { vertices, edges }; + let cc = ConnectedComponentsBuilder::new(&graph).run().await?; + assert_eq!(cc.data.schema().fields().len(), 2); + assert_eq!(cc.data.count().await?, 0); + assert_eq!(cc.num_iterations, 1); + assert_eq!(cc.min_nbr_sum.len(), 1); + assert_eq!(cc.min_nbr_sum[0], 0); + Ok(()) + } + + #[tokio::test] + async fn test_single_vertex() -> Result<()> { + let vertices = dataframe!(VERTEX_ID => vec![1i64])?; + let edges = dataframe!(EDGE_SRC => Vec::::new(), EDGE_DST => Vec::::new())?; + let graph = GraphFrame { vertices, edges }; + let cc = ConnectedComponentsBuilder::new(&graph).run().await?; + assert_eq!(cc.data.schema().fields().len(), 2); + assert_eq!(cc.data.clone().count().await?, 1); + assert_eq!(cc.num_iterations, 1); + assert_eq!(cc.min_nbr_sum.len(), 1); + assert_eq!(cc.min_nbr_sum[0], 0); + assert_eq!( + cc.data + .collect() + .await? + .first() + .unwrap() + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + 1i64 + ); + Ok(()) + } + + #[tokio::test] + async fn test_two_vertices() -> Result<()> { + let vertices = dataframe!(VERTEX_ID => vec![1i64, 2i64])?; + let edges = dataframe!(EDGE_SRC => vec![1i64], EDGE_DST => vec![2i64])?; + let graph = GraphFrame { vertices, edges }; + let cc = ConnectedComponentsBuilder::new(&graph).run().await?; + assert_eq!(cc.data.schema().fields().len(), 2); + assert_eq!(cc.data.clone().count().await?, 2); + assert_eq!(cc.num_iterations, 1); + assert_eq!(cc.min_nbr_sum.len(), 1); + assert_eq!(cc.min_nbr_sum[0], 2); + let batches = cc.data.sort_by(vec![col(VERTEX_ID)])?.collect().await?; + let result = batches.iter().fold(Vec::::new(), |mut acc, batch| { + acc.append( + &mut batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec(), + ); + acc + }); + assert_eq!(result[0], 1i64); + assert_eq!(result[1], 1i64); + + Ok(()) + } + + #[tokio::test] + async fn test_disconnected_vertices() -> Result<()> { + let vertices = dataframe!(VERTEX_ID => vec![1i64, 2i64])?; + let edges = dataframe!(EDGE_SRC => Vec::::new(), EDGE_DST => Vec::::new())?; + let graph = GraphFrame { vertices, edges }; + let cc = ConnectedComponentsBuilder::new(&graph).run().await?; + assert_eq!(cc.data.schema().fields().len(), 2); + assert_eq!(cc.data.clone().count().await?, 2); + assert_eq!(cc.num_iterations, 1); + assert_eq!(cc.min_nbr_sum.len(), 1); + assert_eq!(cc.min_nbr_sum[0], 0); + let batches = cc.data.sort_by(vec![col(VERTEX_ID)])?.collect().await?; + let result = batches.iter().fold(Vec::::new(), |mut acc, batch| { + acc.append( + &mut batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec(), + ); + acc + }); + assert_eq!(result[0], 1i64); + assert_eq!(result[1], 2i64); + + Ok(()) + } + + async fn get_ldbc_wcc_results(dataset: &str) -> Result { + let ctx = SessionContext::new(); + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + let expected_wcc_schema = Schema::new(vec![ + Field::new("vertex_id", DataType::Int64, false), + Field::new("expected_component", DataType::Int64, false), + ]); + let expected_wcc_path = format!( + "{}/testing/data/ldbc/{}/{}-WCC.csv", + manifest_dir, dataset, dataset + ); + let expected_sp = ctx + .read_csv( + &expected_wcc_path, + CsvReadOptions::new() + .delimiter(b' ') + .has_header(false) + .schema(&expected_wcc_schema), + ) + .await?; + Ok(expected_sp) + } + + #[tokio::test] + async fn test_ldbc() -> Result<()> { + let expected_components = get_ldbc_wcc_results("test-wcc-directed").await?; + let graph = create_ldbc_test_graph("test-wcc-directed", false, false).await?; + + let results = graph.connected_components().run().await?.data; + let diff = results + .clone() + .join( + expected_components, + JoinType::Left, + &[VERTEX_ID], + &["vertex_id"], + None, + )? + .select(vec![ + col(VERTEX_ID), + col(COMPONENT_COL), + col("expected_component"), + ])? + .filter(col(COMPONENT_COL).not_eq(col("expected_component")))?; + + assert_eq!(diff.count().await?, 0); + + Ok(()) + } } diff --git a/testing/data/ldbc/test-wcc-directed/test-wcc-directed-WCC.csv b/testing/data/ldbc/test-wcc-directed/test-wcc-directed-WCC.csv new file mode 100644 index 0000000..b1ccd02 --- /dev/null +++ b/testing/data/ldbc/test-wcc-directed/test-wcc-directed-WCC.csv @@ -0,0 +1,8 @@ +1 1 +2 1 +3 1 +4 1 +6 6 +7 6 +8 6 +9 1 diff --git a/testing/data/ldbc/test-wcc-directed/test-wcc-directed.e.csv b/testing/data/ldbc/test-wcc-directed/test-wcc-directed.e.csv new file mode 100644 index 0000000..8d81527 --- /dev/null +++ b/testing/data/ldbc/test-wcc-directed/test-wcc-directed.e.csv @@ -0,0 +1,10 @@ +1 2 +1 3 +2 1 +2 3 +2 4 +4 2 +6 7 +6 8 +7 6 +9 3 diff --git a/testing/data/ldbc/test-wcc-directed/test-wcc-directed.properties b/testing/data/ldbc/test-wcc-directed/test-wcc-directed.properties new file mode 100644 index 0000000..1a7ae3b --- /dev/null +++ b/testing/data/ldbc/test-wcc-directed/test-wcc-directed.properties @@ -0,0 +1,13 @@ +# Filenames of graph on local filesystem +graph.test-wcc-directed.vertex-file = test-wcc-directed.v +graph.test-wcc-directed.edge-file = test-wcc-directed.v + +# Graph metadata for reporting purposes +graph.test-wcc-directed.meta.vertices = 8 +graph.test-wcc-directed.meta.edges = 10 + +# Properties describing the graph format +graph.test-wcc-directed.directed = true + +# List of supported algorithms on the graph +graph.test-wcc-directed.algorithms = wcc diff --git a/testing/data/ldbc/test-wcc-directed/test-wcc-directed.v.csv b/testing/data/ldbc/test-wcc-directed/test-wcc-directed.v.csv new file mode 100644 index 0000000..b072e64 --- /dev/null +++ b/testing/data/ldbc/test-wcc-directed/test-wcc-directed.v.csv @@ -0,0 +1,8 @@ +1 +2 +3 +4 +6 +7 +8 +9 From 4dfceccadc614dcfdea5834b8222e25d7ac643bd Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Thu, 4 Sep 2025 11:05:58 +0200 Subject: [PATCH 3/5] Add lifetime annotations to builder return types in graph algorithms - Updated return types for `shortest_paths`, `pagerank`, and `connected_components` to include explicit lifetimes for better clarity and adherence to Rust's borrowing rules. --- src/connected_components.rs | 2 +- src/pagerank.rs | 2 +- src/shortest_paths.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connected_components.rs b/src/connected_components.rs index 44e8cdf..135ceb4 100644 --- a/src/connected_components.rs +++ b/src/connected_components.rs @@ -204,7 +204,7 @@ impl GraphFrame { /// let components = graph.connected_components() /// .run(); // Example of further usage with the builder. /// ``` - pub fn connected_components(&self) -> ConnectedComponentsBuilder { + pub fn connected_components(&self) -> ConnectedComponentsBuilder<'_> { ConnectedComponentsBuilder::new(self) } } diff --git a/src/pagerank.rs b/src/pagerank.rs index 6a681ea..8dad65c 100644 --- a/src/pagerank.rs +++ b/src/pagerank.rs @@ -102,7 +102,7 @@ impl<'a> PageRankBuilder<'a> { impl GraphFrame { /// Create a new PageRank algorithm builder - pub fn pagerank(&self) -> PageRankBuilder { + pub fn pagerank(&self) -> PageRankBuilder<'_> { PageRankBuilder::new(self) } } diff --git a/src/shortest_paths.rs b/src/shortest_paths.rs index f5c0a0c..e390b45 100644 --- a/src/shortest_paths.rs +++ b/src/shortest_paths.rs @@ -316,7 +316,7 @@ impl GraphFrame { /// /// # Returns /// a Builder object to configure and execute the shortest paths computation - pub fn shortest_paths(&self, landmarks: Vec) -> ShortestPathsBuilder { + pub fn shortest_paths(&self, landmarks: Vec) -> ShortestPathsBuilder<'_> { ShortestPathsBuilder::new(self, landmarks) } } From a773118bf3618505549c3153e2dcf49c663bad0d Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Thu, 4 Sep 2025 11:11:23 +0200 Subject: [PATCH 4/5] Make `GraphFrame` fields public and improve `connected_components` usage example in documentation. --- src/connected_components.rs | 9 ++++++--- src/lib.rs | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/connected_components.rs b/src/connected_components.rs index 135ceb4..27077e6 100644 --- a/src/connected_components.rs +++ b/src/connected_components.rs @@ -200,9 +200,12 @@ impl GraphFrame { /// /// ### Example /// ``` - /// let graph = Graph::new(); - /// let components = graph.connected_components() - /// .run(); // Example of further usage with the builder. + /// use datafusion::dataframe; + /// use graphframes_rs::{GraphFrame, VERTEX_ID, EDGE_SRC, EDGE_DST}; + /// let vertices = dataframe!(VERTEX_ID => vec![1i64, 2i64, 3i64]).unwrap(); + /// let edges = dataframe!(EDGE_SRC => vec![1i64, 2i64, 3i64], EDGE_DST => vec![3i64, 1i64, 2i64]).unwrap(); + /// let graph = GraphFrame { vertices, edges }; + /// let components = graph.connected_components().run(); /// ``` pub fn connected_components(&self) -> ConnectedComponentsBuilder<'_> { ConnectedComponentsBuilder::new(self) diff --git a/src/lib.rs b/src/lib.rs index 1194ba9..f74ea3b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,8 +14,8 @@ pub const EDGE_DST: &str = "dst"; #[derive(Debug, Clone)] pub struct GraphFrame { - vertices: DataFrame, - edges: DataFrame, + pub vertices: DataFrame, + pub edges: DataFrame, } impl GraphFrame { From 9d16fedb16e11c448863d8002cbd8c23c207c534 Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Sat, 6 Sep 2025 19:40:27 +0200 Subject: [PATCH 5/5] Update imports in tests to include `Fields` from Arrow datatypes. --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index e3bbe68..2590090 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -190,7 +190,7 @@ impl GraphFrame { mod tests { use super::*; use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray}; - use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use datafusion::arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; use std::collections::HashMap; use std::sync::Arc;