diff --git a/amiadapters/storage/snowflake.py b/amiadapters/storage/snowflake.py
index 2002b4f..a65ad57 100644
--- a/amiadapters/storage/snowflake.py
+++ b/amiadapters/storage/snowflake.py
@@ -1,5 +1,5 @@
 from abc import ABC, abstractmethod
-from datetime import datetime, time
+from datetime import datetime
 import logging
 from typing import List, Set
 
@@ -999,53 +999,24 @@ def calculate_end_of_backfill_range(
         logger.info(f"Set earliest to {next_end.isoformat()}")
         return earliest
 
-    # Calculate nth percentile of number of readings per day
-    # We will use that as a threshold for what we consider "already backfilled"
-    percentile_query = """
-        SELECT PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY num_readings) AS nth_percentile
-        FROM (
-            SELECT count(*) as num_readings
-            FROM readings WHERE org_id = ? AND flowtime > ? AND flowtime < ?
-            GROUP BY date(flowtime)
-        )
-    """
-    percentile_result = conn.cursor().execute(
-        percentile_query, (org_id, min_date, max_date)
-    )
-    percentile_rows = [i for i in percentile_result]
-    if len(percentile_rows) != 1:
-        threshold = 0
-    else:
-        threshold = float(percentile_rows[0][0])
-
-    # Lower threshold by x% to allow dates with legitimately lower volume to be considered backfilled
-    threshold = 0.5 * threshold
-
-    # Find the oldest day in the range that we've already backfilled
-    query = """
-        SELECT MIN(flow_date) from (
-            SELECT DATE(flowtime) as flow_date
-            FROM readings
+    # Use MIN(flowtime) as the leading edge of already-backfilled data.
+    # Each successful chunk pushes MIN earlier, so the next chunk
+    # advances naturally. Replaces an earlier percentile-threshold
+    # heuristic that got stuck on orgs whose leading edge had
+    # below-threshold per-day read volume (legitimate ramp-up at the
+    # start of vendor data).
+    cursor = conn.cursor()
+    cursor.execute(
+        """
+        SELECT MIN(flowtime) FROM readings
            WHERE org_id = ? AND flowtime > ? AND flowtime < ?
-            GROUP BY DATE(flowtime)
-            HAVING COUNT(*) > ?
-        )
-    """
-    result = conn.cursor().execute(
-        query,
-        (
-            org_id,
-            min_date,
-            max_date,
-            threshold,
-        ),
+        """,
+        (org_id, min_date, max_date),
     )
-    rows = [i for i in result]
-    if rows is None or len(rows) != 1 or rows[0][0] is None:
+    row = cursor.fetchone()
+    if row is None or row[0] is None:
         return max_date
-
-    result = rows[0][0]
-    return datetime.combine(result, time(0, 0))
+    return row[0]
 
 
 class SnowflakeMetersUniqueByDeviceIdCheck(BaseAMIDataQualityCheck):
diff --git a/test/amiadapters/storage/test_snowflake.py b/test/amiadapters/storage/test_snowflake.py
index 4a22289..03410ad 100644
--- a/test/amiadapters/storage/test_snowflake.py
+++ b/test/amiadapters/storage/test_snowflake.py
@@ -472,3 +472,27 @@ def test_verify_no_duplicate_reads_and_return_oldest_flowtime_finds_oldest_flowt
             reads[1].flowtime,
             oldest_flowtime,
         )
+
+    def test_calculate_end_of_backfill_range__returns_min_flowtime(self):
+        expected_min = datetime.datetime(2024, 11, 29, 19, 0, tzinfo=pytz.UTC)
+        self.mock_cursor.fetchone.return_value = (expected_min,)
+
+        result = self.snowflake_sink.calculate_end_of_backfill_range(
+            "some_org",
+            datetime.datetime(2023, 1, 1),
+            datetime.datetime(2026, 4, 23),
+        )
+
+        self.assertEqual(expected_min, result)
+
+    def test_calculate_end_of_backfill_range__returns_max_date_when_no_readings(self):
+        max_date = datetime.datetime(2026, 4, 23)
+        self.mock_cursor.fetchone.return_value = (None,)
+
+        result = self.snowflake_sink.calculate_end_of_backfill_range(
+            "some_org",
+            datetime.datetime(2023, 1, 1),
+            max_date,
+        )
+
+        self.assertEqual(max_date, result)
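
For context on how the MIN(flowtime) strategy drives a backfill, here is a minimal sketch of the calling loop. Everything in it except calculate_end_of_backfill_range (whose call shape is taken from the tests above) is hypothetical: backfill, run_backfill_chunk, and BACKFILL_CHUNK are illustrative names, not code from this repository.

from datetime import datetime, timedelta

# Hypothetical chunk size; the real scheduler's value is not shown in the diff.
BACKFILL_CHUNK = timedelta(days=30)


def run_backfill_chunk(sink, org_id: str, start: datetime, end: datetime) -> None:
    # Placeholder for the extract-and-load step: fetch vendor reads for
    # [start, end) and write them through the sink. Not part of the diff.
    raise NotImplementedError


def backfill(sink, org_id: str, min_date: datetime, max_date: datetime) -> None:
    # Walk backward from max_date toward min_date, one chunk at a time.
    # calculate_end_of_backfill_range (per the diff above) returns
    # MIN(flowtime) in the range, or max_date when nothing is loaded yet.
    prev_end = None
    while True:
        end = sink.calculate_end_of_backfill_range(org_id, min_date, max_date)
        if end <= min_date or end == prev_end:
            return  # range covered, or no further progress is possible
        start = max(min_date, end - BACKFILL_CHUNK)
        # Once this chunk loads, MIN(flowtime) moves back to roughly
        # `start`, so the next iteration advances without any per-day
        # volume threshold.
        run_backfill_chunk(sink, org_id, start, end)
        prev_end = end

The prev_end guard is the loop-level counterpart of the fix described in the diff comment: with the old percentile heuristic, the computed end could stop moving even though data remained, and a caller shaped like this would have spun in place.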