Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 10 additions & 17 deletions runtimes/v2/azure_functions_runtime/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,23 +91,16 @@ def build_fixed_delay_retry(protos, retry, max_retry_count, retry_strategy):


def build_variable_interval_retry(protos, retry, max_retry_count, retry_strategy):
try:
from google.protobuf.duration_pb2 import Duration
except ImportError:
raise ImportError(
"protobuf not found when trying to "
"import Duration."
"Sys Path: %s. "
"Sys Modules: %s. ",
sys.path, sys.modules)
minimum_interval = Duration(
seconds=convert_to_seconds(
retry.get(RetryPolicy.MINIMUM_INTERVAL.value))
)
maximum_interval = Duration(
seconds=convert_to_seconds(
retry.get(RetryPolicy.MAXIMUM_INTERVAL.value))
)
# Get minimum_interval with default of 00:00:00 (0 seconds)
min_interval_str = retry.get(RetryPolicy.MINIMUM_INTERVAL.value)
min_seconds = convert_to_seconds(min_interval_str) if min_interval_str else 0
minimum_interval = timedelta(seconds=min_seconds)

# Get maximum_interval with default of TimeSpan.MaxValue equivalent (max int32 seconds)
max_interval_str = retry.get(RetryPolicy.MAXIMUM_INTERVAL.value)
max_seconds = convert_to_seconds(max_interval_str) if max_interval_str else 2147483647
maximum_interval = timedelta(seconds=max_seconds)

return protos.RpcRetryOptions(
max_retry_count=max_retry_count,
retry_strategy=retry_strategy,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import json
import logging

import azure.functions as func

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# Global counter to track retry attempts
retry_attempts = {}


# An HttpTrigger to generate EventHub event from EventHub Output Binding
@app.function_name(name="eventhub_retry_output")
@app.route(route="eventhub_retry_output")
@app.event_hub_output(arg_name="event",
event_hub_name="python-worker-ci-eventhub-retry",
connection="AzureWebJobsEventHubConnectionString")
def eventhub_retry_output(req: func.HttpRequest, event: func.Out[str]):
event.set(req.get_body().decode('utf-8'))
return 'OK'


# EventHub trigger with exponential backoff retry policy (no explicit intervals)
@app.function_name(name="eventhub_retry_trigger")
@app.retry(
strategy="exponential_backoff",
max_retry_count="3"
# Note: minimum_interval and maximum_interval are optional
# and will use default values if not specified
)
@app.event_hub_message_trigger(
arg_name="event",
event_hub_name="python-worker-ci-eventhub-retry",
connection="AzureWebJobsEventHubConnectionString"
)
@app.blob_output(arg_name="$return",
path="python-worker-tests/test-eventhub-retry-triggered.txt",
connection="AzureWebJobsStorage")
def eventhub_retry_trigger(event: func.EventHubEvent, context: func.Context) -> bytes:
event_id = event.get_body().decode('utf-8')
retry_count = context.retry_context.retry_count if context.retry_context else 0
max_retry = context.retry_context.max_retry_count if context.retry_context else 0

logging.info(f'EventHub retry trigger processed event: {event_id}, '
f'retry count: {retry_count}, max retry: {max_retry}')

# Track retry attempts
if event_id not in retry_attempts:
retry_attempts[event_id] = []
retry_attempts[event_id].append(retry_count)

# Create result dictionary
result = {
'event_id': event_id,
'retry_count': retry_count,
'max_retry_count': max_retry,
'all_attempts': retry_attempts[event_id]
}

# Fail on first two attempts to test retry
if retry_count < 2:
logging.warning(f'Simulating failure for retry test (attempt {retry_count})')
raise Exception(f"Simulated failure for retry testing (attempt {retry_count})")

# Success on third attempt
logging.info(f'Success on attempt {retry_count}')
return json.dumps(result).encode('utf-8')


# Retrieve the event data from storage blob and return it as Http response
@app.function_name(name="get_eventhub_retry_triggered")
@app.route(route="get_eventhub_retry_triggered")
@app.blob_input(arg_name="file",
path="python-worker-tests/test-eventhub-retry-triggered.txt",
connection="AzureWebJobsStorage")
def get_eventhub_retry_triggered(req: func.HttpRequest,
file: func.InputStream) -> str:
return file.read().decode('utf-8')


# HTTP endpoint to check retry state (for testing)
@app.function_name(name="get_retry_state")
@app.route(route="get_retry_state")
def get_retry_state(req: func.HttpRequest) -> str:
return json.dumps(retry_attempts)


# HTTP endpoint to reset retry state
@app.function_name(name="reset_retry_state")
@app.route(route="reset_retry_state")
def reset_retry_state(req: func.HttpRequest) -> str:
global retry_attempts
retry_attempts = {}
return 'Reset complete'
51 changes: 51 additions & 0 deletions workers/tests/emulator_tests/test_eventhub_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,57 @@ def get_script_dir(cls):
'eventhub_functions_stein' / 'generic'


class TestEventHubRetryStein(testutils.WebHostTestCase):
"""Test EventHub Trigger with Retry Policy (exponential backoff without explicit intervals)."""

@classmethod
def get_script_dir(cls):
return testutils.EMULATOR_TESTS_FOLDER / 'eventhub_functions' / \
'eventhub_retry_stein'

@classmethod
def get_libraries_to_install(cls):
return ['azure-eventhub']

@testutils.retryable_test(3, 5)
def test_eventhub_retry_trigger_with_default_intervals(self):
"""Test that exponential backoff retry works without explicit min/max intervals."""
# Generate a unique event ID
event_id = f"retry-test-{round(time.time())}"
doc = {'id': event_id}

# Reset retry state
r = self.webhost.request('POST', 'reset_retry_state')
self.assertEqual(r.status_code, 200)

# Send event to EventHub
r = self.webhost.request('POST', 'eventhub_retry_output',
data=json.dumps(doc))
self.assertEqual(r.status_code, 200)
self.assertEqual(r.text, 'OK')

# Wait for retries to complete (with exponential backoff)
# First attempt: immediate
# Second attempt: after backoff
# Third attempt: after longer backoff
time.sleep(15)

# Retrieve the result from blob storage
r = self.webhost.request('GET', 'get_eventhub_retry_triggered')
self.assertEqual(r.status_code, 200)

result = json.loads(r.text)

# Verify the event was processed after retries
self.assertEqual(result['event_id'], event_id)
self.assertEqual(result['retry_count'], 2) # Should succeed on third attempt (count 2)
self.assertEqual(result['max_retry_count'], 3)

# Verify all retry attempts were tracked
self.assertEqual(len(result['all_attempts']), 3) # 0, 1, 2
self.assertEqual(result['all_attempts'], [0, 1, 2])


@skipIf(sys.version_info.minor >= 14, "Skip to figure out uamqp.")
class TestEventHubFunctionsSDK(TestEventHubFunctions):

Expand Down
9 changes: 9 additions & 0 deletions workers/tests/emulator_tests/utils/eventhub/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@
"Name": "cg1"
}
]
},
{
"Name": "python-worker-ci-eventhub-retry",
"PartitionCount": 2,
"ConsumerGroups": [
{
"Name": "cg1"
}
]
}
]
}
Expand Down
16 changes: 16 additions & 0 deletions workers/tests/unittests/test_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,22 @@ def test_loader_building_exponential_retry_protos(self):
self.assertEqual(protos.minimum_interval.seconds, 60)
self.assertEqual(protos.maximum_interval.seconds, 120)

def test_loader_building_exponential_retry_protos_with_defaults(self):
"""Test exponential backoff without explicit min/max intervals (uses defaults)"""
trigger = TimerTrigger(schedule="*/1 * * * * *", arg_name="mytimer",
name="mytimer")
self.func.add_trigger(trigger=trigger)
setting = RetryPolicy(strategy="exponential_backoff",
max_retry_count="3")
self.func.add_setting(setting=setting)

protos = build_retry_protos(self.func)
self.assertEqual(protos.max_retry_count, 3)
self.assertEqual(protos.retry_strategy,
0) # 0 enum for exponential backoff
self.assertEqual(protos.minimum_interval.seconds, 0) # Default: 0 seconds
self.assertEqual(protos.maximum_interval.seconds, 2147483647) # Default: max int32

@patch('azure_functions_worker.logging.logger.warning')
def test_loader_retry_policy_attribute_error(self, mock_logger):
self.func = Mock()
Expand Down
Loading