61 changes: 16 additions & 45 deletions amiadapters/storage/snowflake.py
@@ -1,5 +1,5 @@
 from abc import ABC, abstractmethod
-from datetime import datetime, time
+from datetime import datetime
 import logging
 from typing import List, Set

@@ -999,53 +999,24 @@ def calculate_end_of_backfill_range(
             logger.info(f"Set earliest to {next_end.isoformat()}")
             return earliest
 
-        # Calculate nth percentile of number of readings per day
-        # We will use that as a threshold for what we consider "already backfilled"
-        percentile_query = """
-            SELECT PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY num_readings) AS nth_percentile
-            FROM (
-                SELECT count(*) as num_readings
-                FROM readings WHERE org_id = ? AND flowtime > ? AND flowtime < ?
-                GROUP BY date(flowtime)
-            )
-        """
-        percentile_result = conn.cursor().execute(
-            percentile_query, (org_id, min_date, max_date)
-        )
-        percentile_rows = [i for i in percentile_result]
-        if len(percentile_rows) != 1:
-            threshold = 0
-        else:
-            threshold = float(percentile_rows[0][0])
-
-        # Lower threshold by x% to allow dates with legitimately lower volume to be considered backfilled
-        threshold = 0.5 * threshold
-
-        # Find the oldest day in the range that we've already backfilled
-        query = """
-            SELECT MIN(flow_date) from (
-                SELECT DATE(flowtime) as flow_date
-                FROM readings
+        # Use MIN(flowtime) as the leading edge of already-backfilled data.
+        # Each successful chunk pushes MIN earlier, so the next chunk
+        # advances naturally. Replaces an earlier percentile-threshold
+        # heuristic that got stuck on orgs whose leading edge had
+        # below-threshold per-day read volume (legitimate ramp-up at the
+        # start of vendor data).
+        cursor = conn.cursor()
+        cursor.execute(
+            """
+            SELECT MIN(flowtime) FROM readings
             WHERE org_id = ? AND flowtime > ? AND flowtime < ?
-                GROUP BY DATE(flowtime)
-                HAVING COUNT(*) > ?
-            )
-        """
-        result = conn.cursor().execute(
-            query,
-            (
-                org_id,
-                min_date,
-                max_date,
-                threshold,
-            ),
+            """,
+            (org_id, min_date, max_date),
         )
-        rows = [i for i in result]
-        if rows is None or len(rows) != 1 or rows[0][0] is None:
+        row = cursor.fetchone()
+        if row is None or row[0] is None:
             return max_date
 
-        result = rows[0][0]
-        return datetime.combine(result, time(0, 0))
+        return row[0]
 
 
 class SnowflakeMetersUniqueByDeviceIdCheck(BaseAMIDataQualityCheck):
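For context, here is a minimal sketch of how a chunked backfill driver might consume the new MIN(flowtime)-based `calculate_end_of_backfill_range`. The driver loop, the `CHUNK` size, and the `fetch_and_store` step are illustrative assumptions, not part of this diff:

```python
from datetime import datetime, timedelta

CHUNK = timedelta(days=30)  # illustrative chunk size, not from this diff


def fetch_and_store(sink, org_id: str, start: datetime, end: datetime) -> None:
    # Placeholder for the real extract/load step: pull vendor reads for
    # [start, end) and write them through the sink, which moves the stored
    # MIN(flowtime) back to roughly `start`.
    print(f"backfilling {org_id}: {start.isoformat()} -> {end.isoformat()}")


def run_backfill(sink, org_id: str, min_date: datetime, max_date: datetime) -> None:
    # Each pass asks the sink for the leading edge of already-backfilled
    # data (MIN(flowtime), or max_date when no readings exist yet) and
    # backfills one chunk ending there.
    end = sink.calculate_end_of_backfill_range(org_id, min_date, max_date)
    while end > min_date:
        start = max(min_date, end - CHUNK)
        fetch_and_store(sink, org_id, start, end)
        if start <= min_date:
            break  # reached the beginning of the requested window
        end = sink.calculate_end_of_backfill_range(org_id, min_date, max_date)
```

Because each stored chunk moves MIN(flowtime) earlier, a loop like this advances without the threshold bookkeeping the old percentile heuristic needed.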
24 changes: 24 additions & 0 deletions test/amiadapters/storage/test_snowflake.py
@@ -472,3 +472,27 @@ def test_verify_no_duplicate_reads_and_return_oldest_flowtime_finds_oldest_flowt
             reads[1].flowtime,
             oldest_flowtime,
         )
+
+    def test_calculate_end_of_backfill_range__returns_min_flowtime(self):
+        expected_min = datetime.datetime(2024, 11, 29, 19, 0, tzinfo=pytz.UTC)
+        self.mock_cursor.fetchone.return_value = (expected_min,)
+
+        result = self.snowflake_sink.calculate_end_of_backfill_range(
+            "some_org",
+            datetime.datetime(2023, 1, 1),
+            datetime.datetime(2026, 4, 23),
+        )
+
+        self.assertEqual(expected_min, result)
+
+    def test_calculate_end_of_backfill_range__returns_max_date_when_no_readings(self):
+        max_date = datetime.datetime(2026, 4, 23)
+        self.mock_cursor.fetchone.return_value = (None,)
+
+        result = self.snowflake_sink.calculate_end_of_backfill_range(
+            "some_org",
+            datetime.datetime(2023, 1, 1),
+            max_date,
+        )
+
+        self.assertEqual(max_date, result)
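For readers without the full test module: a minimal sketch of the fixture these tests appear to rely on, assuming the sink calls `cursor()` on a mocked connection. The class name, constructor, and import path are guesses; none of them are shown in this diff:

```python
import unittest
from unittest import mock

# Hypothetical import path and class name; the actual sink class is not
# visible in this diff.
from amiadapters.storage.snowflake import SnowflakeStorageSink


class TestSnowflakeStorageSink(unittest.TestCase):
    def setUp(self):
        # conn.cursor() returns a shared mock so each test can stub
        # fetchone() directly on self.mock_cursor.
        self.mock_cursor = mock.MagicMock()
        mock_conn = mock.MagicMock()
        mock_conn.cursor.return_value = self.mock_cursor
        self.snowflake_sink = SnowflakeStorageSink(mock_conn)  # hypothetical ctor
```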