Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,16 @@ impl SQLPatternParser {
Expr::Function(func) if func.name.to_string().to_uppercase() == "DATEADD" => {
self.parse_dateadd(func)
}

// Elastic semantics requires CAST() for datetime strings.
Expr::Cast { expr, .. } => match expr.as_ref() {
Expr::Value(ValueWithSpan {
value: SingleQuotedString(datetime_str),
..
}) => Self::get_timestamp_from_datetime_str(datetime_str),
_ => None,
},

_ => {
panic!("invalid time syntax {:?}", highlow);
}
Expand All @@ -362,8 +372,8 @@ impl SQLPatternParser {
_ => return None,
};

let start = self.get_timestamp_from_between_highlow(low.as_ref())?;
let end = self.get_timestamp_from_between_highlow(high.as_ref())?;
let start = self.get_timestamp_from_between_highlow(low)?;
let end = self.get_timestamp_from_between_highlow(high)?;

let duration = end - start;

Expand All @@ -388,6 +398,10 @@ impl SQLPatternParser {
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Identifier(ident))) => {
ident.value.to_lowercase()
}
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(ValueWithSpan {
value: SingleQuotedString(s),
..
}))) => s.to_lowercase(),
_ => return None,
};

Expand Down Expand Up @@ -429,6 +443,20 @@ impl SQLPatternParser {
value: SingleQuotedString(datetime_str),
span: _,
}))) => parse_datetime(datetime_str).ok()?.timestamp().as_second() as f64,

FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Cast { expr, .. })) => {
match expr.as_ref() {
Expr::Value(ValueWithSpan {
value: SingleQuotedString(datetime_str),
..
}) => parse_datetime(datetime_str).ok()?.timestamp().as_second() as f64,
_ => {
println!("Unsupported CAST expression in DATEADD");
return None;
}
}
}

_ => {
println!("time upper bound not calculating from present");
return None;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ ES_PORT=9200
ES_INDEX_NAME=h2o_benchmark
ES_BULK_SIZE=10000
ES_API_KEY=your-api-key
TOTAL_RECORDS=5000000
Original file line number Diff line number Diff line change
Expand Up @@ -888,49 +888,60 @@ async fn run_h2o_elasticsearch_mode(args: &Args) -> Result<(), Box<dyn std::erro
.status_code()
.is_success();

if !index_exists {
println!("Creating index: {}", args.elastic_index);
let create_response = client
if index_exists {
println!("Deleting existing index: {}", args.elastic_index);
let delete_response = client
.indices()
.create(elasticsearch::indices::IndicesCreateParts::Index(
&args.elastic_index,
))
.body(json!({
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"refresh_interval": "30s"
},
"mappings": {
"properties": {
"timestamp": {"type": "date", "format": "epoch_millis"},
"id1": {"type": "keyword"},
"id2": {"type": "keyword"},
"id3": {"type": "keyword"},
"id4": {"type": "long"},
"id5": {"type": "long"},
"id6": {"type": "long"},
"v1": {"type": "long"},
"v2": {"type": "long"},
"v3": {"type": "double"}
}
}
}))
.delete(elasticsearch::indices::IndicesDeleteParts::Index(&[
&args.elastic_index
]))
.send()
.await?;

if !create_response.status_code().is_success() {
let error_text = create_response.text().await?;
eprintln!("Failed to create index. Error response: {}", error_text);
return Err("Failed to create index".into());
if !delete_response.status_code().is_success() {
let error_text = delete_response.text().await?;
eprintln!("Failed to delete index. Error response: {}", error_text);
return Err("Failed to delete index".into());
}
println!("Index created successfully");
} else {
println!(
"Index {} already exists, skipping creation",
args.elastic_index
);
println!("Index deleted successfully");
}

println!("Creating index: {}", args.elastic_index);
let create_response = client
.indices()
.create(elasticsearch::indices::IndicesCreateParts::Index(
&args.elastic_index,
))
.body(json!({
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"refresh_interval": "30s"
},
"mappings": {
"properties": {
"timestamp": {"type": "date", "format": "epoch_millis"},
"id1": {"type": "keyword"},
"id2": {"type": "keyword"},
"id3": {"type": "keyword"},
"id4": {"type": "long"},
"id5": {"type": "long"},
"id6": {"type": "long"},
"v1": {"type": "long"},
"v2": {"type": "long"},
"v3": {"type": "double"}
}
}
}))
.send()
.await?;

if !create_response.status_code().is_success() {
let error_text = create_response.text().await?;
eprintln!("Failed to create index. Error response: {}", error_text);
return Err("Failed to create index".into());
}
println!("Index created successfully");

let file = File::open(file_path)?;
let reader = BufReader::new(file);
Expand Down Expand Up @@ -960,7 +971,7 @@ async fn run_h2o_elasticsearch_mode(args: &Args) -> Result<(), Box<dyn std::erro

// Create document with timestamp
let doc = H2oEsDoc {
timestamp: base_timestamp + (row_num * 1000), // Increment by 1 second per row
timestamp: base_timestamp + (row_num * 10), // Increment by 10 ms per row
id1: cols[0].to_string(),
id2: cols[1].to_string(),
id3: cols[2].to_string(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Convert the first `row_limit` rows of the H2O benchmark CSV into a Parquet
# file, adding a synthetic millisecond-resolution timestamp column so
# downstream time-range queries have something to filter on.

row_limit = 5000000
# nrows= stops the CSV parse at the limit instead of reading the entire file
# and discarding the tail with .head() — same resulting rows, much lower
# peak memory for large inputs.
df = pd.read_csv(
    "../clickhouse-benchmark-pipeline/G1_1e7_1e2_0_0.csv", nrows=row_limit
)

# 2024-01-01T00:00:00Z expressed in epoch milliseconds; each row advances
# the timestamp by 10 ms.
base_ts = int(pd.Timestamp("2024-01-01", tz="UTC").timestamp() * 1000)
timestamps = [base_ts + i * 10 for i in range(len(df))]

# Explicit Arrow schema: v* columns are float measures, id* columns are
# string dimensions. NOTE(review): id4-id6 look integer-like in the CSV but
# are stored as strings here — confirm consumers expect strings.
schema = pa.schema(
    [
        ("timestamp", pa.timestamp("ms")),
        ("v1", pa.float64()),
        ("v2", pa.float64()),
        ("v3", pa.float64()),
        ("id1", pa.string()),
        ("id2", pa.string()),
        ("id3", pa.string()),
        ("id4", pa.string()),
        ("id5", pa.string()),
        ("id6", pa.string()),
    ]
)

# Cast every column explicitly so the Arrow arrays match the declared schema
# regardless of the dtypes pandas inferred from the CSV.
table = pa.table(
    {
        "timestamp": pa.array(timestamps, type=pa.timestamp("ms")),
        "v1": pa.array(df["v1"].astype(float), type=pa.float64()),
        "v2": pa.array(df["v2"].astype(float), type=pa.float64()),
        "v3": pa.array(df["v3"].astype(float), type=pa.float64()),
        "id1": pa.array(df["id1"].astype(str), type=pa.string()),
        "id2": pa.array(df["id2"].astype(str), type=pa.string()),
        "id3": pa.array(df["id3"].astype(str), type=pa.string()),
        "id4": pa.array(df["id4"].astype(str), type=pa.string()),
        "id5": pa.array(df["id5"].astype(str), type=pa.string()),
        "id6": pa.array(df["id6"].astype(str), type=pa.string()),
    },
    schema=schema,
)

pq.write_table(table, "h2o_file.parquet")
print("Done:", len(df), "rows")
Loading
Loading