Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::util::display::{ArrayFormatter, FormatOptions};
use arrow_schema::FieldRef;
use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
Expand Down Expand Up @@ -1074,9 +1075,7 @@ impl DataFrame {
vec![],
original_schema_fields
.clone()
.filter(|f| {
!matches!(f.data_type(), DataType::Binary | DataType::Boolean)
})
.filter(|f| !matches!(f.data_type(), DataType::Boolean))
.map(|f| min(ident(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
),
Expand All @@ -1085,9 +1084,7 @@ impl DataFrame {
vec![],
original_schema_fields
.clone()
.filter(|f| {
!matches!(f.data_type(), DataType::Binary | DataType::Boolean)
})
.filter(|f| !matches!(f.data_type(), DataType::Boolean))
.map(|f| max(ident(f.name())).alias(f.name()))
.collect::<Vec<_>>(),
),
Expand Down Expand Up @@ -1126,14 +1123,31 @@ impl DataFrame {
Arc::new(StringArray::from(vec!["null"]))
} else if field.data_type().is_numeric() {
cast(column, &DataType::Float64)?
} else if field.data_type().is_binary() {
let formatter = ArrayFormatter::try_new(
column.as_ref(),
&FormatOptions::default(),
)?;
let values: Vec<Option<String>> = (0..column.len())
.map(|i| {
if column.is_null(i) {
None
} else {
let value = formatter.value(i);
Some(value.to_string())
}
})
.collect();
Arc::new(StringArray::from(values))
} else {
cast(column, &DataType::Utf8)?
}
}
_ => Arc::new(StringArray::from(vec!["null"])),
}
}
//Handling error when only boolean/binary column, and in other cases
// Handles the case where all columns were filtered out
// (e.g. only boolean columns for mean/std/min/max/median)
Err(err)
if err.to_string().contains(
"Error during planning: \
Expand Down Expand Up @@ -1517,7 +1531,7 @@ impl DataFrame {
/// # }
pub async fn to_string(self) -> Result<String> {
let options = self.session_state.config().options().format.clone();
let arrow_options: arrow::util::display::FormatOptions = (&options).try_into()?;
let arrow_options: FormatOptions = (&options).try_into()?;

let registry = self.session_state.extension_type_registry();
let formatter_factory = DFArrayFormatterFactory::new(Arc::clone(registry));
Expand Down
59 changes: 59 additions & 0 deletions datafusion/core/tests/dataframe/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow::array::{
BinaryArray, BinaryViewArray, FixedSizeBinaryArray, LargeBinaryArray, RecordBatch,
};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_common::test_util::batches_to_string;
use datafusion_common::{Result, test_util::parquet_test_data};
Expand Down Expand Up @@ -112,6 +118,59 @@ async fn describe_null() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn describe_binary_columns() -> Result<()> {
    // Build a batch with one column per binary flavour, all nullable.
    let schema = Arc::new(Schema::new(vec![
        Field::new("bin", DataType::Binary, true),
        Field::new("lbin", DataType::LargeBinary, true),
        Field::new("vbin", DataType::BinaryView, true),
        Field::new("fbin", DataType::FixedSizeBinary(2), true),
    ]));

    // Every column holds the same logical rows: 0x0001, 0xffee, NULL.
    // `[Option<[u8; 2]>; 3]` is Copy, so it can feed each collector below.
    let rows = [Some([0x00u8, 0x01]), Some([0xff, 0xee]), None];

    let variable_width: BinaryArray = rows.into_iter().collect();
    let large: LargeBinaryArray = rows.into_iter().collect();
    let view: BinaryViewArray = rows.into_iter().collect();
    let fixed =
        FixedSizeBinaryArray::try_from_sparse_iter_with_size(rows.into_iter(), 2)?;

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(variable_width),
            Arc::new(large),
            Arc::new(view),
            Arc::new(fixed),
        ],
    )?;

    let ctx = SessionContext::new();
    ctx.register_batch("t", batch)?;
    let result = ctx.table("t").await?.describe().await?.collect().await?;

    // Binary columns are expected to contribute count/null_count and
    // hex-formatted min/max; mean, std, and median stay null.
    assert_snapshot!(batches_to_string(&result),
    @r"
    +------------+------+------+------+------+
    | describe   | bin  | lbin | vbin | fbin |
    +------------+------+------+------+------+
    | count      | 2    | 2    | 2    | 2    |
    | null_count | 1    | 1    | 1    | 1    |
    | mean       | null | null | null | null |
    | std        | null | null | null | null |
    | min        | 0001 | 0001 | 0001 | 0001 |
    | max        | ffee | ffee | ffee | ffee |
    | median     | null | null | null | null |
    +------------+------+------+------+------+
    ");

    Ok(())
}

/// Return a SessionContext with parquet file registered
async fn parquet_context() -> SessionContext {
let ctx = SessionContext::new();
Expand Down