Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions integration/snowflake/data/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,15 +380,19 @@ def _load_all_tables(session: Session, schema_name: str) -> dict[str, int]:
logger.info("Loading tables from stage...")
results: dict[str, int] = {}

for table_name in JAFFLE_SHOP_TABLE_NAMES:
row_count: int = _load_single_table(session, table_name, schema_name)
for source_name in JAFFLE_SHOP_TABLE_NAMES:
table_name = source_name.removeprefix("raw_")
row_count: int = _load_single_table(
session, source_name, table_name, schema_name
)
results[table_name] = row_count

return results


def _load_single_table(
session: Session,
source_name: str,
table_name: str,
schema_name: str,
) -> int:
Expand All @@ -398,10 +402,11 @@ def _load_single_table(
1. Creates empty table with schema inferred from Parquet metadata (INFER_SCHEMA)
2. Copies data from stage using MATCH_BY_COLUMN_NAME for proper column mapping
"""
logger.info(f"Loading {table_name}...")
logger.info(f"Loading {source_name} -> {table_name}...")

create_sql: str = load_sql(
path="ingestion/create_table_from_parquet.sql",
source_name=source_name,
table_name=table_name,
stage_name=DEFAULT_STAGE_NAME,
schema_name=schema_name,
Expand All @@ -410,6 +415,7 @@ def _load_single_table(

copy_sql: str = load_sql(
path="ingestion/copy_into_table.sql",
source_name=source_name,
table_name=table_name,
stage_name=DEFAULT_STAGE_NAME,
schema_name=schema_name,
Expand Down
39 changes: 22 additions & 17 deletions integration/snowflake/data/preparation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

This module creates:
- weekly_stores table: Store-week combinations with reference_date (Monday week start)
- Population view with target (next week's sales)
- Population dynamic table with target (next week's sales)

reference_date is the Monday (week start) derived from DATE_TRUNC('week', ordered_at).

Expand Down Expand Up @@ -67,7 +67,7 @@ def create_weekly_sales_by_store_with_target(

Creates:
- weekly_stores: Table with store-week combinations (reference_date = Monday)
- Population view with target column (configurable name via table_name)
- Population dynamic table with target column (configurable name via table_name)
- Target: Sum of order_total for the 7-day window starting at reference_date

When settings are provided, auto-bootstraps the warehouse and database
Expand All @@ -77,7 +77,7 @@ def create_weekly_sales_by_store_with_target(
session: Active Snowflake Snowpark session.
settings: Optional settings for auto-bootstrapping warehouse and database.
When provided, ensures infrastructure exists before preparing data.
source_schema: Schema containing raw_stores and raw_orders tables.
source_schema: Schema containing stores and orders tables.
target_schema: Schema where the prepared dynamic tables will be created.
table_name: Name of the population dynamic table to create.

Expand All @@ -104,9 +104,13 @@ def create_weekly_sales_by_store_with_target(
================================================================================
""")

warehouse = session.get_current_warehouse()
if not warehouse:
raise DataPreparationError("No warehouse set. Required for dynamic table creation.")

_analyze_and_display_stores(session, source_schema)
per_store = _create_weekly_stores_table(session, source_schema, target_schema)
_create_target_view(session, source_schema, target_schema, table_name)
per_store = _create_weekly_stores_dynamic_table(session, source_schema, target_schema, warehouse)
_create_target_dynamic_table(session, source_schema, target_schema, table_name, warehouse)

_display_sample_data(session, target_schema, per_store, table_name)
_display_store_statistics(session, target_schema, table_name)
Expand All @@ -122,8 +126,8 @@ def create_weekly_sales_by_store_with_target(
================================================================================

Objects created in {target_schema} schema:
- weekly_stores (TABLE)
- {table_name} (VIEW - USE THIS FOR getML)
- weekly_stores (DYNAMIC TABLE)
- {table_name} (DYNAMIC TABLE - USE THIS FOR getML)

Population table: {qualified_table_name}
""")
Expand All @@ -138,7 +142,7 @@ def create_weekly_sales_by_store_with_target(

def _validate_source_tables(session: Session, source_schema: str) -> None:
"""Validate that source schema exists and contains required tables."""
required_tables: list[str] = ["raw_stores", "raw_orders"]
required_tables: list[str] = ["stores", "orders"]
logger.info(f"Validating {source_schema} schema and required tables...")

try:
Expand Down Expand Up @@ -192,29 +196,28 @@ def _analyze_and_display_stores(session: Session, source_schema: str) -> None:
)


def _create_weekly_stores_table(
def _create_weekly_stores_dynamic_table(
session: Session,
source_schema: str,
target_schema: str,
warehouse: str,
) -> list[Row]:
"""Create weekly_stores table with store-week combinations.
"""Create weekly_stores dynamic table with store-week combinations.

Creates one row per store per week using reference_date (Monday week start).

Returns:
List of store information rows.
"""
logger.info("\n2. Creating weekly_stores table (store-week combinations)...")
logger.info("\n2. Creating weekly_stores dynamic table (store-week combinations)...")
logger.info(" reference_date is Monday (week start) from DATE_TRUNC('week', ...)")

_ = session.sql(
load_sql(path="preparation/drop_weekly_stores.sql", target_schema=target_schema)
).collect()
_ = session.sql(
query=load_sql(
path="preparation/create_weekly_stores.sql",
source_schema=source_schema,
target_schema=target_schema,
warehouse=warehouse,
)
).collect()

Expand Down Expand Up @@ -244,20 +247,22 @@ def _create_weekly_stores_table(
return per_store


def _create_target_view(
def _create_target_dynamic_table(
session: Session,
source_schema: str,
target_schema: str,
table_name: str,
warehouse: str,
) -> None:
"""Create view with target - total sales for the following week per store."""
logger.info("\n3. Creating target view: next week's total sales per store...")
"""Create dynamic table with target - total sales for the following week per store."""
logger.info("\n3. Creating target dynamic table: next week's total sales per store...")
_ = session.sql(
query=load_sql(
path="preparation/calculate_target.sql",
source_schema=source_schema,
target_schema=target_schema,
table_name=table_name,
warehouse=warehouse,
)
).collect()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- Copy data from staged Parquet file into table
-- Uses MATCH_BY_COLUMN_NAME to map Parquet columns to table columns
COPY INTO {schema_name}.{table_name}
FROM @{schema_name}.{stage_name}/{table_name}.parquet
FROM @{schema_name}.{stage_name}/{source_name}.parquet
FILE_FORMAT = (FORMAT_NAME = '{schema_name}.PARQUET_FORMAT')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ CREATE OR REPLACE TABLE {schema_name}.{table_name}
))
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@{schema_name}.{stage_name}/{table_name}.parquet',
LOCATION=>'@{schema_name}.{stage_name}/{source_name}.parquet',
FILE_FORMAT=>'{schema_name}.PARQUET_FORMAT'
)
));
4 changes: 2 additions & 2 deletions integration/snowflake/data/sql/preparation/analyze_stores.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ SELECT
MAX(TRY_TO_TIMESTAMP(o.ordered_at)) as last_order,
COUNT(o.id) as total_orders,
SUM(COALESCE(o.order_total, 0)) / 100.0 as total_sales
FROM {source_schema}.raw_stores s
LEFT JOIN {source_schema}.raw_orders o ON o.store_id = s.id
FROM {source_schema}.stores s
LEFT JOIN {source_schema}.orders o ON o.store_id = s.id
GROUP BY s.id, s.name, s.opened_at
ORDER BY TRY_TO_TIMESTAMP(s.opened_at);
11 changes: 7 additions & 4 deletions integration/snowflake/data/sql/preparation/calculate_target.sql
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
-- Create view with target: next week's sales per store
-- Create dynamic table with target: next week's sales per store
--
-- This view joins weekly_stores with pre-aggregated order totals.
-- Dynamic table joins weekly_stores with pre-aggregated order totals.
-- Target is the sum of order_total for the 7-day window starting at reference_date.
--
-- Window: [reference_date, reference_date + 7 days)
-- - reference_date is Monday 00:00:00 (week start)
-- - Target covers Monday through Sunday of that week
--
-- Optimized: Single aggregation pass instead of correlated subqueries
CREATE OR REPLACE VIEW {target_schema}.{table_name} AS
CREATE OR REPLACE DYNAMIC TABLE {target_schema}.{table_name}
TARGET_LAG = '1 day'
WAREHOUSE = {warehouse}
AS
WITH weekly_order_totals AS (
SELECT
store_id,
DATE_TRUNC('week', TRY_TO_TIMESTAMP(ordered_at)) as week_start,
SUM(order_total) / 100.0 as week_sales,
COUNT(*) as week_orders
FROM {source_schema}.raw_orders
FROM {source_schema}.orders
WHERE ordered_at IS NOT NULL
GROUP BY store_id, DATE_TRUNC('week', TRY_TO_TIMESTAMP(ordered_at))
)
Expand Down
24 changes: 13 additions & 11 deletions integration/snowflake/data/sql/preparation/create_weekly_stores.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- Create store-week combinations for weekly sales forecasting
--
-- This table creates the base data for getML: one row per store per week.
-- This dynamic table creates the base data for getML: one row per store per week.
-- reference_date is the Monday (week start) derived from DATE_TRUNC('week', ordered_at).
--
-- Filtering logic:
Expand All @@ -11,7 +11,10 @@
-- - is_full_week_after_opening: Store had a full week of operation before this week
-- - has_order_activity: Store has order data spanning this week
-- - has_min_history: At least 7 days since store opened
CREATE TABLE {target_schema}.weekly_stores AS
CREATE OR REPLACE DYNAMIC TABLE {target_schema}.weekly_stores
TARGET_LAG = '1 day'
WAREHOUSE = {warehouse}
AS
WITH store_activity AS (
SELECT
s.id as store_id,
Expand All @@ -22,20 +25,20 @@ WITH store_activity AS (
MAX(TRY_TO_TIMESTAMP(o.ordered_at)) as last_order_date,
DATE_TRUNC('week', MIN(TRY_TO_TIMESTAMP(o.ordered_at))) as first_order_week,
DATE_TRUNC('week', MAX(TRY_TO_TIMESTAMP(o.ordered_at))) as last_order_week
FROM {source_schema}.raw_stores s
LEFT JOIN {source_schema}.raw_orders o ON o.store_id = s.id
FROM {source_schema}.stores s
LEFT JOIN {source_schema}.orders o ON o.store_id = s.id
GROUP BY s.id, s.name, TRY_TO_TIMESTAMP(s.opened_at)
),

all_weeks AS (
SELECT DISTINCT
DATE_TRUNC('week', TRY_TO_TIMESTAMP(ordered_at)) as reference_date
FROM {source_schema}.raw_orders
FROM {source_schema}.orders
WHERE ordered_at IS NOT NULL
),

store_weeks AS (
SELECT
SELECT
sa.store_id,
sa.store_name,
w.reference_date,
Expand All @@ -49,8 +52,8 @@ store_weeks AS (
AND w.reference_date < sa.last_order_week
)

SELECT
ROW_NUMBER() OVER (ORDER BY reference_date, store_id) as snapshot_id,
SELECT
HASH(store_id, reference_date) as snapshot_id,
store_id,
store_name,
reference_date,
Expand All @@ -59,9 +62,8 @@ SELECT
EXTRACT(week FROM reference_date) as week_number,
DATEDIFF('day', opened_at, reference_date) as days_since_open,
reference_date >= first_full_week as is_full_week_after_opening,
first_order_week IS NOT NULL
first_order_week IS NOT NULL
AND reference_date >= first_order_week
AND reference_date < last_order_week as has_order_activity,
DATEDIFF('day', opened_at, reference_date) >= 7 as has_min_history
FROM store_weeks
ORDER BY reference_date, store_id;
FROM store_weeks;
4 changes: 3 additions & 1 deletion integration/snowflake/mise.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
[env]
SNOWFLAKE_ACCOUNT = "{{ exec(command='op read --account getml.1password.eu \"op://svibqfali56qralxbzg6wanc5i/Snowflake/account\"', cache_key='snowflake_account', duration='1w') }}"
SNOWFLAKE_USER = "{{ exec(command='op read --account getml.1password.eu \"op://svibqfali56qralxbzg6wanc5i/Snowflake/username\"', cache_key='snowflake_user', duration='1w') }}"
SNOWFLAKE_PASSWORD = "{{ exec(command='op read --account getml.1password.eu \"op://svibqfali56qralxbzg6wanc5i/Snowflake/password\"', cache_key='snowflake_password', duration='1w') }}"
SNOWFLAKE_PASSWORD = "{{ exec(command='op read --account getml.1password.eu \"op://svibqfali56qralxbzg6wanc5i/Snowflake/access_token\"', cache_key='snowflake_access_token__', duration='1w') }}"

SNOWFLAKE_ROLE = "ACCOUNTADMIN"
SNOWFLAKE_WAREHOUSE = "COMPUTE_WH"
SNOWFLAKE_DATABASE = "JAFFLE_SHOP"
SNOWFLAKE_SCHEMA = "RAW"

FDS_API_KEY = "{{ exec(command='op read --account getml.1password.eu \"op://svibqfali56qralxbzg6wanc5i/SupabaseProject/TEST_API_KEY\"', cache_key='fds_api_key', duration='1w') }}"
Loading