From b369b67b09940116e79eb1fee38fc422f3bb6dd3 Mon Sep 17 00:00:00 2001 From: Dave Marulli Date: Fri, 8 May 2026 12:51:26 -0400 Subject: [PATCH 1/3] Replace percentile heuristic with MIN(flowtime) for backfill chunk selection The heuristic-based chunk selector got stuck on orgs whose leading edge of backfilled data had below-threshold per-day read volume (legitimate ramp-up at the start of vendor data). cadc_crescent's backfill stopped advancing at 2024-11-29 even though Xylem has data back to 2023-01-01. Replace with raw MIN(flowtime) -- each successful chunk pushes MIN earlier, so the next chunk advances naturally. No state tracking, no per-org heuristics. The cadc_thousand_oaks branch is left untouched (separate PR). --- amiadapters/storage/snowflake.py | 59 ++++++++------------------------ 1 file changed, 14 insertions(+), 45 deletions(-) diff --git a/amiadapters/storage/snowflake.py b/amiadapters/storage/snowflake.py index 2002b4f..0360282 100644 --- a/amiadapters/storage/snowflake.py +++ b/amiadapters/storage/snowflake.py @@ -999,53 +999,22 @@ def calculate_end_of_backfill_range( logger.info(f"Set earliest to {next_end.isoformat()}") return earliest - # Calculate nth percentile of number of readings per day - # We will use that as a threshold for what we consider "already backfilled" - percentile_query = """ - SELECT PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY num_readings) AS nth_percentile - FROM ( - SELECT count(*) as num_readings - FROM readings WHERE org_id = ? AND flowtime > ? AND flowtime < ? - GROUP BY date(flowtime) - ) - """ - percentile_result = conn.cursor().execute( - percentile_query, (org_id, min_date, max_date) - ) - percentile_rows = [i for i in percentile_result] - if len(percentile_rows) != 1: - threshold = 0 - else: - threshold = float(percentile_rows[0][0]) - - # Lower threshold by x% to allow dates with legitimately lower volume to be considered backfilled - threshold = 0.5 * threshold - - # Find the oldest day in the range that we've already backfilled - query = """ - SELECT MIN(flow_date) from ( - SELECT DATE(flowtime) as flow_date - FROM readings - WHERE org_id = ? AND flowtime > ? AND flowtime < ? - GROUP BY DATE(flowtime) - HAVING COUNT(*) > ? - ) - """ + # Use MIN(flowtime) as the leading edge of already-backfilled data. + # Each successful chunk pushes MIN earlier, so the next chunk + # advances naturally. Replaces an earlier percentile-threshold + # heuristic that got stuck on orgs whose leading edge had + # below-threshold per-day read volume (legitimate ramp-up at the + # start of vendor data). result = conn.cursor().execute( - query, - ( - org_id, - min_date, - max_date, - threshold, - ), - ) - rows = [i for i in result] - if rows is None or len(rows) != 1 or rows[0][0] is None: + """ + SELECT MIN(flowtime) FROM readings + WHERE org_id = ? AND flowtime > ? AND flowtime < ? + """, + (org_id, min_date, max_date), + ).fetchall() + if not result or result[0][0] is None: return max_date - - result = rows[0][0] - return datetime.combine(result, time(0, 0)) + return result[0][0] class SnowflakeMetersUniqueByDeviceIdCheck(BaseAMIDataQualityCheck): From 33f03ac57d32e287c39ce39f2a5bfef446fec357 Mon Sep 17 00:00:00 2001 From: Dave Marulli Date: Fri, 8 May 2026 13:36:38 -0400 Subject: [PATCH 2/3] Drop unused time import, add tests for calculate_end_of_backfill_range --- amiadapters/storage/snowflake.py | 2 +- test/amiadapters/storage/test_snowflake.py | 24 ++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/amiadapters/storage/snowflake.py b/amiadapters/storage/snowflake.py index 0360282..c3f8776 100644 --- a/amiadapters/storage/snowflake.py +++ b/amiadapters/storage/snowflake.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from datetime import datetime, time +from datetime import datetime import logging from typing import List, Set diff --git a/test/amiadapters/storage/test_snowflake.py b/test/amiadapters/storage/test_snowflake.py index 4a22289..e8bc02e 100644 --- a/test/amiadapters/storage/test_snowflake.py +++ b/test/amiadapters/storage/test_snowflake.py @@ -472,3 +472,27 @@ def test_verify_no_duplicate_reads_and_return_oldest_flowtime_finds_oldest_flowt reads[1].flowtime, oldest_flowtime, ) + + def test_calculate_end_of_backfill_range__returns_min_flowtime(self): + expected_min = datetime.datetime(2024, 11, 29, 19, 0, tzinfo=pytz.UTC) + self.mock_cursor.execute.return_value.fetchall.return_value = [(expected_min,)] + + result = self.snowflake_sink.calculate_end_of_backfill_range( + "some_org", + datetime.datetime(2023, 1, 1), + datetime.datetime(2026, 4, 23), + ) + + self.assertEqual(expected_min, result) + + def test_calculate_end_of_backfill_range__returns_max_date_when_no_readings(self): + max_date = datetime.datetime(2026, 4, 23) + self.mock_cursor.execute.return_value.fetchall.return_value = [(None,)] + + result = self.snowflake_sink.calculate_end_of_backfill_range( + "some_org", + datetime.datetime(2023, 1, 1), + max_date, + ) + + self.assertEqual(max_date, result) From f88e9d25d26cf81f642478bd6e6586b4b5a05336 Mon Sep 17 00:00:00 2001 From: Dave Marulli Date: Fri, 8 May 2026 13:50:41 -0400 Subject: [PATCH 3/3] Restructure cursor call to satisfy Black 26 chained-call wrap rule --- amiadapters/storage/snowflake.py | 10 ++++++---- test/amiadapters/storage/test_snowflake.py | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/amiadapters/storage/snowflake.py b/amiadapters/storage/snowflake.py index c3f8776..a65ad57 100644 --- a/amiadapters/storage/snowflake.py +++ b/amiadapters/storage/snowflake.py @@ -1005,16 +1005,18 @@ def calculate_end_of_backfill_range( # heuristic that got stuck on orgs whose leading edge had # below-threshold per-day read volume (legitimate ramp-up at the # start of vendor data). - result = conn.cursor().execute( + cursor = conn.cursor() + cursor.execute( """ SELECT MIN(flowtime) FROM readings WHERE org_id = ? AND flowtime > ? AND flowtime < ? """, (org_id, min_date, max_date), - ).fetchall() - if not result or result[0][0] is None: + ) + row = cursor.fetchone() + if row is None or row[0] is None: return max_date - return result[0][0] + return row[0] class SnowflakeMetersUniqueByDeviceIdCheck(BaseAMIDataQualityCheck): diff --git a/test/amiadapters/storage/test_snowflake.py b/test/amiadapters/storage/test_snowflake.py index e8bc02e..03410ad 100644 --- a/test/amiadapters/storage/test_snowflake.py +++ b/test/amiadapters/storage/test_snowflake.py @@ -475,7 +475,7 @@ def test_verify_no_duplicate_reads_and_return_oldest_flowtime_finds_oldest_flowt def test_calculate_end_of_backfill_range__returns_min_flowtime(self): expected_min = datetime.datetime(2024, 11, 29, 19, 0, tzinfo=pytz.UTC) - self.mock_cursor.execute.return_value.fetchall.return_value = [(expected_min,)] + self.mock_cursor.fetchone.return_value = (expected_min,) result = self.snowflake_sink.calculate_end_of_backfill_range( "some_org", @@ -487,7 +487,7 @@ def test_calculate_end_of_backfill_range__returns_min_flowtime(self): def test_calculate_end_of_backfill_range__returns_max_date_when_no_readings(self): max_date = datetime.datetime(2026, 4, 23) - self.mock_cursor.execute.return_value.fetchall.return_value = [(None,)] + self.mock_cursor.fetchone.return_value = (None,) result = self.snowflake_sink.calculate_end_of_backfill_range( "some_org",