Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions asap-sketch-ingest/run_arroyosketch.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,12 @@ def create_pipeline(
"""Create a pipeline JSON based on template"""

# Escape newlines in SQL query for JSON compatibility
sql_queries = [sql_query.replace("\n", "\\n") for sql_query in sql_queries]
# Escape characters that would break the JSON string in pipeline.j2.
# Double quotes appear now that column names are quoted (issue #116).
sql_queries = [
sql_query.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
for sql_query in sql_queries
]
sql_query = "\\n\\n".join(sql_queries)

# UDFs handling
Expand Down Expand Up @@ -575,6 +580,21 @@ def delete_pipelines(args):
# )


def _quote_col(label_prefix: str, label: str) -> str:
"""Return a SQL column reference with double-quotes for case-sensitivity.

DataFusion (Arroyo's SQL engine) normalises unquoted identifiers to
lowercase, which silently corrupts mixed-case column names. Wrapping
every user-supplied identifier in double-quotes preserves the original
casing.

Examples:
_quote_col("", "hostName") -> '"hostName"'
_quote_col("labels.", "hostName") -> 'labels."hostName"'
"""
return f'{label_prefix}"{label}"'


def get_sql_query(
streaming_aggregation_config: StreamingAggregationConfig,
schema_config, # MetricConfig or SQLTableConfig
Expand Down Expand Up @@ -617,12 +637,14 @@ def get_sql_query(
value_column = "value"
label_prefix = "labels." if use_nested_labels else ""

# Double-quote all user-supplied column names so DataFusion preserves
# their original casing (issue #116).
fully_qualified_group_by_columns = [
"{}{}".format(label_prefix, label)
_quote_col(label_prefix, label)
for label in streaming_aggregation_config.labels["grouping"].keys
]
fully_qualified_agg_columns = [
"{}{}".format(label_prefix, label)
_quote_col(label_prefix, label)
for label in streaming_aggregation_config.labels["aggregated"].keys
]

Expand All @@ -634,9 +656,11 @@ def get_sql_query(
source_identifier = streaming_aggregation_config.metric
all_labels = schema_config.config[source_identifier].keys

all_labels_agg_columns = [
"{}{}".format(label_prefix, label) for label in all_labels
]
all_labels_agg_columns = [_quote_col(label_prefix, label) for label in all_labels]

# Quote the scalar column references for the same reason.
time_column = f'"{time_column}"'
value_column = f'"{value_column}"'

# Determine if timestamps should be included as argument
include_timestamps_as_argument = (
Expand Down
21 changes: 10 additions & 11 deletions asap-sketch-ingest/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ def test_sql_query_uses_value_column(
use_nested_labels=False,
)

assert "cpu_usage" in sql_query
assert "value" not in sql_query or "cpu_usage" in sql_query
assert '"cpu_usage"' in sql_query
assert '"value"' not in sql_query or '"cpu_usage"' in sql_query

def test_sql_query_no_label_prefix(
self, sql_schema_config, sql_agg_config, sql_template
Expand All @@ -246,12 +246,11 @@ def test_sql_query_no_label_prefix(
use_nested_labels=False,
)

# Should have flat column names, not labels.host
assert "labels.host" not in sql_query
assert "labels.region" not in sql_query
# Should have host and region directly
assert "host" in sql_query
assert "region" in sql_query
# Should have flat quoted column names, not labels.host
assert "labels." not in sql_query
# Should have double-quoted host and region directly
assert '"host"' in sql_query
assert '"region"' in sql_query


class TestGetSqlQueryPromQL:
Expand Down Expand Up @@ -310,7 +309,7 @@ def test_promql_query_uses_value(
use_nested_labels=True,
)

assert "value" in sql_query
assert '"value"' in sql_query

def test_promql_query_uses_label_prefix(
self, promql_metric_config, promql_agg_config, sql_template
Expand All @@ -330,8 +329,8 @@ def test_promql_query_uses_label_prefix(
use_nested_labels=True,
)

assert "labels.instance" in sql_query
assert "labels.job" in sql_query
assert 'labels."instance"' in sql_query
assert 'labels."job"' in sql_query


class TestEndToEndConfigParsing:
Expand Down