From 8da47b0fe14d54e0e45f349307c5a2dac02a7f06 Mon Sep 17 00:00:00 2001 From: semyonsinchenko Date: Sat, 6 Sep 2025 20:17:48 +0200 Subject: [PATCH] Using of dataframe! macro in tests --- src/lib.rs | 212 ++++++++++++++++++++++++++++++++++++++++---------- src/pregel.rs | 33 ++------ 2 files changed, 179 insertions(+), 66 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2590090..3b8d9ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +mod connected_components; mod pagerank; mod pregel; mod shortest_paths; @@ -7,13 +8,54 @@ use datafusion::error::Result; use datafusion::functions_aggregate::count::count; use datafusion::prelude::*; +/// Column names for the vertex id column. pub const VERTEX_ID: &str = "id"; +/// Column names for the edge source column. pub const EDGE_SRC: &str = "src"; +/// Column names for the edge destination column. pub const EDGE_DST: &str = "dst"; +/// Column names for the edge column in triplet representation. pub const EDGE_COL: &str = "edge"; +/// Column names for the source vertex in triplet representation. pub const SRC_VERTEX: &str = "src_vertex"; +/// Column names for the destination vertex in triplet representation. pub const DST_VERTEX: &str = "dst_vertex"; +/// A data structure representing a graph in the form of vertices and edges. +/// +/// The `GraphFrame` struct is designed to hold a graph's data where vertices +/// (nodes) and edges (connections) are represented as `DataFrame` structures. +/// +/// # Fields +/// +/// * `vertices` - A `DataFrame` that contains information about the graph's vertices. +/// Each row in the `DataFrame` represents a vertex (`VERTEX_ID`), and additional +/// columns can store attributes (e.g., labels or properties) for +/// each vertex. +/// +/// * `edges` - A `DataFrame` that contains information about the graph's edges. +/// Each row in the `DataFrame` represents an edge, with columns +/// typically storing the source vertex (`EDGE_SRC`), destination vertex (`EDGE_DST`), and +/// any additional attributes (e.g., weights or labels) associated +/// with the edge. +/// +/// # Example +/// +/// ``` +/// use datafusion::dataframe; +/// use graphframes_rs::{GraphFrame, VERTEX_ID, EDGE_SRC, EDGE_DST}; +/// let vertices = dataframe!( +/// VERTEX_ID => vec![1i64, 2i64, 3i64], +/// "attr" => vec!["a", "b", "c"] +/// ).unwrap(); +/// let edges = dataframe!( +/// EDGE_SRC => vec![1i64, 2i64, 3i64], +/// EDGE_DST => vec![3i64, 1i64, 2i64], +/// "attr" => vec!["d", "j", "h"] +/// ).unwrap(); +/// +/// let graph = GraphFrame { vertices, edges }; +/// ``` #[derive(Debug, Clone)] pub struct GraphFrame { pub vertices: DataFrame, @@ -21,16 +63,98 @@ pub struct GraphFrame { } impl GraphFrame { + /// Returns the total number of nodes in the graph. + /// + /// # Returns + /// + /// This function returns a `Result`: + /// - `Ok(i64)`: The total number of nodes (vertices) in the graph, represented as a 64-bit signed integer. + /// - `Err`: If an error occurs during the computation or retrieval of the node count. + /// + /// # Example + /// + /// ``` + /// use datafusion::dataframe; + /// use graphframes_rs::{GraphFrame, VERTEX_ID, EDGE_SRC, EDGE_DST}; + /// let vertices = dataframe!( + /// VERTEX_ID => vec![1i64, 2i64, 3i64], + /// "attr" => vec!["a", "b", "c"] + /// ).unwrap(); + /// let edges = dataframe!( + /// EDGE_SRC => vec![1i64, 2i64, 3i64], + /// EDGE_DST => vec![3i64, 1i64, 2i64], + /// "attr" => vec!["d", "j", "h"] + /// ).unwrap(); + /// + /// let graph = GraphFrame { vertices, edges }; + /// let node_count = graph.num_nodes(); + /// ``` pub async fn num_nodes(&self) -> Result { let count = self.vertices.clone().count().await?; Ok(count as i64) } + /// Returns the total number of edges in the graph. + /// + /// # Returns + /// + /// This function returns a `Result`: + /// - `Ok(i64)` - The total number of edges, represented as a 64-bit integer. + /// - `Err(E)` - If an error occurs during the computation, the error is propagated. + /// + /// # Examples + /// + /// ``` + /// use datafusion::dataframe; + /// use graphframes_rs::{GraphFrame, VERTEX_ID, EDGE_SRC, EDGE_DST}; + /// let vertices = dataframe!( + /// VERTEX_ID => vec![1i64, 2i64, 3i64], + /// "attr" => vec!["a", "b", "c"] + /// ).unwrap(); + /// let edges = dataframe!( + /// EDGE_SRC => vec![1i64, 2i64, 3i64], + /// EDGE_DST => vec![3i64, 1i64, 2i64], + /// "attr" => vec!["d", "j", "h"] + /// ).unwrap(); + /// + /// let graph = GraphFrame { vertices, edges }; + /// let edge_count = graph.num_edges(); + /// ``` pub async fn num_edges(&self) -> Result { let count = self.edges.clone().count().await?; Ok(count as i64) } + /// Computes the in-degrees for each vertex in the graph. + /// + /// This function calculates the in-degree of each vertex by counting the number of + /// incoming edges. It returns a `DataFrame` + /// containing two columns: + /// - `VERTEX_ID`: The unique identifier of the vertex (derived from the destination of the edges). + /// - `in_degree`: The count of incoming edges (in-degrees) for each vertex. + /// + /// # Returns + /// An asynchronous function that returns: + /// - `Ok(DataFrame)` containing the vertex IDs and their corresponding in-degrees. + /// - `Err` if the aggregation or selection operation fails. + /// + /// # Example + /// ```rust + /// use datafusion::dataframe; + /// use graphframes_rs::{GraphFrame, VERTEX_ID, EDGE_SRC, EDGE_DST}; + /// let vertices = dataframe!( + /// VERTEX_ID => vec![1i64, 2i64, 3i64], + /// "attr" => vec!["a", "b", "c"] + /// ).unwrap(); + /// let edges = dataframe!( + /// EDGE_SRC => vec![1i64, 2i64, 3i64], + /// EDGE_DST => vec![3i64, 1i64, 2i64], + /// "attr" => vec!["d", "j", "h"] + /// ).unwrap(); + /// + /// let graph = GraphFrame { vertices, edges }; + /// let edge_count = graph.in_degrees(); + /// ``` pub async fn in_degrees(&self) -> Result { let df = self.edges.clone().aggregate( vec![col(EDGE_DST)], @@ -38,7 +162,36 @@ impl GraphFrame { )?; Ok(df.select(vec![col(EDGE_DST).alias(VERTEX_ID), col("in_degree")])?) } - + /// Computes the out-degrees for each vertex in the graph. + /// + /// This function calculates the out-degree of each vertex by counting the number of + /// outcoming edges. It returns a `DataFrame` + /// containing two columns: + /// - `VERTEX_ID`: The unique identifier of the vertex (derived from the destination of the edges). + /// - `in_degree`: The count of incoming edges (in-degrees) for each vertex. + /// + /// # Returns + /// An asynchronous function that returns: + /// - `Ok(DataFrame)` containing the vertex IDs and their corresponding in-degrees. + /// - `Err` if the aggregation or selection operation fails. + /// + /// # Example + /// ```rust + /// use datafusion::dataframe; + /// use graphframes_rs::{GraphFrame, VERTEX_ID, EDGE_SRC, EDGE_DST}; + /// let vertices = dataframe!( + /// VERTEX_ID => vec![1i64, 2i64, 3i64], + /// "attr" => vec!["a", "b", "c"] + /// ).unwrap(); + /// let edges = dataframe!( + /// EDGE_SRC => vec![1i64, 2i64, 3i64], + /// EDGE_DST => vec![3i64, 1i64, 2i64], + /// "attr" => vec!["d", "j", "h"] + /// ).unwrap(); + /// + /// let graph = GraphFrame { vertices, edges }; + /// let edge_count = graph.in_degrees(); + /// ``` pub async fn out_degrees(&self) -> Result { let df = self.edges.clone().aggregate( vec![col(EDGE_SRC)], @@ -133,7 +286,6 @@ impl GraphFrame { /// let graph = GraphFrame { vertices, edges }; /// let triplets = graph.triplets(); /// ``` - /// // Assuming `edges_df` and `vertices_df` are initialized DataFrames for pub async fn triplets(&self) -> Result { let edges_struct = self.edges.clone().select(vec![ col(EDGE_SRC), @@ -189,46 +341,24 @@ impl GraphFrame { #[cfg(test)] mod tests { use super::*; - use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray}; - use datafusion::arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; + use datafusion::arrow::array::Int64Array; + use datafusion::arrow::datatypes::{DataType, Field, Fields}; use std::collections::HashMap; - use std::sync::Arc; - - fn create_test_graph() -> Result { - let ctx = SessionContext::new(); - - let vertices_data = RecordBatch::try_new( - SchemaRef::from(Schema::new(vec![ - Field::new("id", DataType::Int64, false), - Field::new("name", DataType::Utf8, false), - ])), - vec![ - Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])), - Arc::new(StringArray::from(vec![ - "Hub", "Alice", "Bob", "Carol", "David", "Eve", "Frank", "Grace", "Henry", - "Ivy", - ])), - ], - ); - let vertices = ctx.read_batch(vertices_data?)?; - - let edges_data = RecordBatch::try_new( - SchemaRef::from(Schema::new(vec![ - Field::new("src", DataType::Int64, false), - Field::new("dst", DataType::Int64, false), - ])), - vec![ - Arc::new(Int64Array::from(vec![ - 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 7, 7, - 8, 8, 9, 10, - ])), - Arc::new(Int64Array::from(vec![ - 2, 3, 4, 5, 6, 7, 8, 9, 10, 3, 4, 5, 6, 4, 5, 6, 5, 6, 7, 6, 7, 8, 7, 8, 8, 9, - 9, 10, 10, 1, - ])), - ], - ); - let edges = ctx.read_batch(edges_data?)?; + + pub(crate) fn create_test_graph() -> Result { + let vertices = dataframe!( + VERTEX_ID => vec![1i64, 2i64, 3i64, 4i64, 5i64, 6i64, 7i64, 8i64, 9i64, 10i64], + "name" => vec!["Hub", "Alice", "Bob", "Carol", "David", "Eve", "Frank", "Grace", "Henry", "Ivy"] + )?; + + let edges = dataframe!( + EDGE_SRC => Vec::::from( + vec![1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 7, 7, 8, 8, 9, 10,] + ), + EDGE_DST => Vec::::from( + vec![2, 3, 4, 5, 6, 7, 8, 9, 10, 3, 4, 5, 6, 4, 5, 6, 5, 6, 7, 6, 7, 8, 7, 8, 8, 9, 9, 10, 10, 1,] + ), + )?; Ok(GraphFrame { vertices, edges }) } diff --git a/src/pregel.rs b/src/pregel.rs index efbedf8..ffca023 100644 --- a/src/pregel.rs +++ b/src/pregel.rs @@ -413,36 +413,19 @@ impl GraphFrame { #[cfg(test)] mod tests { use super::*; - use datafusion::arrow::array::{Array, Int32Array, Int64Array, RecordBatch}; - use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use datafusion::arrow::array::{Array, Int32Array, Int64Array}; use datafusion::functions_aggregate::min_max::max; use datafusion::functions_aggregate::sum::sum; - use std::sync::Arc; fn create_graph(vertices: Vec, edges: Vec>) -> Result { - let ctx = SessionContext::new(); - - let vertices_data = RecordBatch::try_new( - SchemaRef::from(Schema::new(vec![Field::new("id", DataType::Int64, false)])), - vec![Arc::new(Int64Array::from(vertices))], - )?; - let vertices_df = ctx.read_batch(vertices_data)?; - - let edges_data = RecordBatch::try_new( - SchemaRef::from(Schema::new(vec![ - Field::new("src", DataType::Int64, false), - Field::new("dst", DataType::Int64, false), - ])), - vec![ - Arc::new(Int64Array::from( - edges.iter().map(|e| e[0]).collect::>(), - )), - Arc::new(Int64Array::from( - edges.iter().map(|e| e[1]).collect::>(), - )), - ], + let vertices_df = dataframe!( + VERTEX_ID => Vec::::from(vertices), )?; - let edges_df = ctx.read_batch(edges_data)?; + let edges_df = dataframe!(EDGE_SRC => Vec::::from( + edges.iter().map(|e| e[0]).collect::>() + ), EDGE_DST => Vec::::from( + edges.iter().map(|e| e[1]).collect::>() + ))?; Ok(GraphFrame { vertices: vertices_df,