diff --git a/runtimes/v2/azure_functions_runtime/loader.py b/runtimes/v2/azure_functions_runtime/loader.py index d2edc046..3f5b54e6 100644 --- a/runtimes/v2/azure_functions_runtime/loader.py +++ b/runtimes/v2/azure_functions_runtime/loader.py @@ -91,23 +91,16 @@ def build_fixed_delay_retry(protos, retry, max_retry_count, retry_strategy): def build_variable_interval_retry(protos, retry, max_retry_count, retry_strategy): - try: - from google.protobuf.duration_pb2 import Duration - except ImportError: - raise ImportError( - "protobuf not found when trying to " - "import Duration." - "Sys Path: %s. " - "Sys Modules: %s. ", - sys.path, sys.modules) - minimum_interval = Duration( - seconds=convert_to_seconds( - retry.get(RetryPolicy.MINIMUM_INTERVAL.value)) - ) - maximum_interval = Duration( - seconds=convert_to_seconds( - retry.get(RetryPolicy.MAXIMUM_INTERVAL.value)) - ) + # Get minimum_interval with default of 00:00:00 (0 seconds) + min_interval_str = retry.get(RetryPolicy.MINIMUM_INTERVAL.value) + min_seconds = convert_to_seconds(min_interval_str) if min_interval_str else 0 + minimum_interval = timedelta(seconds=min_seconds) + + # Get maximum_interval with default of TimeSpan.MaxValue equivalent (max int32 seconds) + max_interval_str = retry.get(RetryPolicy.MAXIMUM_INTERVAL.value) + max_seconds = convert_to_seconds(max_interval_str) if max_interval_str else 2147483647 + maximum_interval = timedelta(seconds=max_seconds) + return protos.RpcRetryOptions( max_retry_count=max_retry_count, retry_strategy=retry_strategy, diff --git a/workers/tests/emulator_tests/eventhub_functions/eventhub_retry_stein/function_app.py b/workers/tests/emulator_tests/eventhub_functions/eventhub_retry_stein/function_app.py new file mode 100644 index 00000000..f49b2e8d --- /dev/null +++ b/workers/tests/emulator_tests/eventhub_functions/eventhub_retry_stein/function_app.py @@ -0,0 +1,94 @@ +import json +import logging + +import azure.functions as func + +app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +# Global counter to track retry attempts +retry_attempts = {} + + +# An HttpTrigger to generate EventHub event from EventHub Output Binding +@app.function_name(name="eventhub_retry_output") +@app.route(route="eventhub_retry_output") +@app.event_hub_output(arg_name="event", + event_hub_name="python-worker-ci-eventhub-retry", + connection="AzureWebJobsEventHubConnectionString") +def eventhub_retry_output(req: func.HttpRequest, event: func.Out[str]): + event.set(req.get_body().decode('utf-8')) + return 'OK' + + +# EventHub trigger with exponential backoff retry policy (no explicit intervals) +@app.function_name(name="eventhub_retry_trigger") +@app.retry( + strategy="exponential_backoff", + max_retry_count="3" + # Note: minimum_interval and maximum_interval are optional + # and will use default values if not specified +) +@app.event_hub_message_trigger( + arg_name="event", + event_hub_name="python-worker-ci-eventhub-retry", + connection="AzureWebJobsEventHubConnectionString" +) +@app.blob_output(arg_name="$return", + path="python-worker-tests/test-eventhub-retry-triggered.txt", + connection="AzureWebJobsStorage") +def eventhub_retry_trigger(event: func.EventHubEvent, context: func.Context) -> bytes: + event_id = event.get_body().decode('utf-8') + retry_count = context.retry_context.retry_count if context.retry_context else 0 + max_retry = context.retry_context.max_retry_count if context.retry_context else 0 + + logging.info(f'EventHub retry trigger processed event: {event_id}, ' + f'retry count: {retry_count}, max retry: {max_retry}') + + # Track retry attempts + if event_id not in retry_attempts: + retry_attempts[event_id] = [] + retry_attempts[event_id].append(retry_count) + + # Create result dictionary + result = { + 'event_id': event_id, + 'retry_count': retry_count, + 'max_retry_count': max_retry, + 'all_attempts': retry_attempts[event_id] + } + + # Fail on first two attempts to test retry + if retry_count < 2: + logging.warning(f'Simulating failure for retry test (attempt {retry_count})') + raise Exception(f"Simulated failure for retry testing (attempt {retry_count})") + + # Success on third attempt + logging.info(f'Success on attempt {retry_count}') + return json.dumps(result).encode('utf-8') + + +# Retrieve the event data from storage blob and return it as Http response +@app.function_name(name="get_eventhub_retry_triggered") +@app.route(route="get_eventhub_retry_triggered") +@app.blob_input(arg_name="file", + path="python-worker-tests/test-eventhub-retry-triggered.txt", + connection="AzureWebJobsStorage") +def get_eventhub_retry_triggered(req: func.HttpRequest, + file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +# HTTP endpoint to check retry state (for testing) +@app.function_name(name="get_retry_state") +@app.route(route="get_retry_state") +def get_retry_state(req: func.HttpRequest) -> str: + return json.dumps(retry_attempts) + + +# HTTP endpoint to reset retry state +@app.function_name(name="reset_retry_state") +@app.route(route="reset_retry_state") +def reset_retry_state(req: func.HttpRequest) -> str: + global retry_attempts + retry_attempts = {} + return 'Reset complete' diff --git a/workers/tests/emulator_tests/test_eventhub_functions.py b/workers/tests/emulator_tests/test_eventhub_functions.py index d6559e67..d82e88f0 100644 --- a/workers/tests/emulator_tests/test_eventhub_functions.py +++ b/workers/tests/emulator_tests/test_eventhub_functions.py @@ -120,6 +120,57 @@ def get_script_dir(cls): 'eventhub_functions_stein' / 'generic' +class TestEventHubRetryStein(testutils.WebHostTestCase): + """Test EventHub Trigger with Retry Policy (exponential backoff without explicit intervals).""" + + @classmethod + def get_script_dir(cls): + return testutils.EMULATOR_TESTS_FOLDER / 'eventhub_functions' / \ + 'eventhub_retry_stein' + + @classmethod + def get_libraries_to_install(cls): + return ['azure-eventhub'] + + @testutils.retryable_test(3, 5) + def test_eventhub_retry_trigger_with_default_intervals(self): + """Test that exponential backoff retry works without explicit min/max intervals.""" + # Generate a unique event ID + event_id = f"retry-test-{round(time.time())}" + doc = {'id': event_id} + + # Reset retry state + r = self.webhost.request('POST', 'reset_retry_state') + self.assertEqual(r.status_code, 200) + + # Send event to EventHub + r = self.webhost.request('POST', 'eventhub_retry_output', + data=json.dumps(doc)) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.text, 'OK') + + # Wait for retries to complete (with exponential backoff) + # First attempt: immediate + # Second attempt: after backoff + # Third attempt: after longer backoff + time.sleep(15) + + # Retrieve the result from blob storage + r = self.webhost.request('GET', 'get_eventhub_retry_triggered') + self.assertEqual(r.status_code, 200) + + result = json.loads(r.text) + + # Verify the event was processed after retries + self.assertEqual(result['event_id'], event_id) + self.assertEqual(result['retry_count'], 2) # Should succeed on third attempt (count 2) + self.assertEqual(result['max_retry_count'], 3) + + # Verify all retry attempts were tracked + self.assertEqual(len(result['all_attempts']), 3) # 0, 1, 2 + self.assertEqual(result['all_attempts'], [0, 1, 2]) + + @skipIf(sys.version_info.minor >= 14, "Skip to figure out uamqp.") class TestEventHubFunctionsSDK(TestEventHubFunctions): diff --git a/workers/tests/emulator_tests/utils/eventhub/config.json b/workers/tests/emulator_tests/utils/eventhub/config.json index ba14a13b..2cf43126 100644 --- a/workers/tests/emulator_tests/utils/eventhub/config.json +++ b/workers/tests/emulator_tests/utils/eventhub/config.json @@ -58,6 +58,15 @@ "Name": "cg1" } ] + }, + { + "Name": "python-worker-ci-eventhub-retry", + "PartitionCount": 2, + "ConsumerGroups": [ + { + "Name": "cg1" + } + ] } ] } diff --git a/workers/tests/unittests/test_loader.py b/workers/tests/unittests/test_loader.py index a6af8faa..ef8ac15c 100644 --- a/workers/tests/unittests/test_loader.py +++ b/workers/tests/unittests/test_loader.py @@ -66,6 +66,22 @@ def test_loader_building_exponential_retry_protos(self): self.assertEqual(protos.minimum_interval.seconds, 60) self.assertEqual(protos.maximum_interval.seconds, 120) + def test_loader_building_exponential_retry_protos_with_defaults(self): + """Test exponential backoff without explicit min/max intervals (uses defaults)""" + trigger = TimerTrigger(schedule="*/1 * * * * *", arg_name="mytimer", + name="mytimer") + self.func.add_trigger(trigger=trigger) + setting = RetryPolicy(strategy="exponential_backoff", + max_retry_count="3") + self.func.add_setting(setting=setting) + + protos = build_retry_protos(self.func) + self.assertEqual(protos.max_retry_count, 3) + self.assertEqual(protos.retry_strategy, + 0) # 0 enum for exponential backoff + self.assertEqual(protos.minimum_interval.seconds, 0) # Default: 0 seconds + self.assertEqual(protos.maximum_interval.seconds, 2147483647) # Default: max int32 + @patch('azure_functions_worker.logging.logger.warning') def test_loader_retry_policy_attribute_error(self, mock_logger): self.func = Mock()