diff --git a/ddtrace/internal/openfeature/_provider.py b/ddtrace/internal/openfeature/_provider.py index 8047134fb17..7800d2e5761 100644 --- a/ddtrace/internal/openfeature/_provider.py +++ b/ddtrace/internal/openfeature/_provider.py @@ -8,6 +8,7 @@ from collections import OrderedDict from collections.abc import MutableMapping from importlib.metadata import version +import threading import typing from openfeature.evaluation_context import EvaluationContext @@ -90,11 +91,20 @@ class DataDogProvider(AbstractProvider): Feature Flags and Experimentation (FFE) product. """ - def __init__(self, *args: typing.Any, **kwargs: typing.Any): + def __init__(self, *args: typing.Any, initialization_timeout: typing.Optional[float] = None, **kwargs: typing.Any): super().__init__(*args, **kwargs) self._metadata = Metadata(name="Datadog") self._status = ProviderStatus.NOT_READY - self._config_received = False + + # Initialization timeout: constructor arg takes priority, then env var (default 30s) + if initialization_timeout is not None: + self._initialization_timeout = initialization_timeout + else: + self._initialization_timeout = ffe_config.initialization_timeout_ms / 1000.0 + + # Event used to block initialize() until config arrives. + # Also serves as the "config received" flag via is_set(). + self._config_received = threading.Event() # Cache for reported exposures to prevent duplicates # Stores mapping of (flag_key, subject_id) -> (allocation_key, variant_key) @@ -119,8 +129,6 @@ def __init__(self, *args: typing.Any, **kwargs: typing.Any): self._flag_eval_metrics = FlagEvalMetrics() self._flag_eval_hook = FlagEvalHook(self._flag_eval_metrics) - # Register this provider instance for status updates - _register_provider(self) def get_metadata(self) -> Metadata: """Returns provider metadata.""" @@ -142,32 +150,52 @@ def initialize(self, evaluation_context: EvaluationContext) -> None: """ Initialize the provider. - Called by the OpenFeature SDK when the provider is set. 
- Provider Creation → NOT_READY - ↓ - First Remote Config Payload - ↓ - READY (emits PROVIDER_READY event) - ↓ - Shutdown - ↓ - NOT_READY + Blocks until Remote Config delivers the first FFE configuration or + the initialization timeout expires. + + The timeout is configurable via: + - Constructor: DataDogProvider(initialization_timeout=10.0) # seconds + - Env var: DD_EXPERIMENTAL_FLAGGING_PROVIDER_INITIALIZATION_TIMEOUT_MS=10000 + + Provider lifecycle: + NOT_READY -> initialize() blocks -> config arrives -> READY + NOT_READY -> initialize() blocks -> timeout -> raises ProviderNotReadyError """ if not self._enabled: return + # Register for RC config callbacks (in initialize, not __init__, so + # re-initialization after shutdown re-registers the provider) + _register_provider(self) + try: # Start the exposure writer for reporting start_exposure_writer() except ServiceStatusError: logger.debug("Exposure writer is already running", exc_info=True) - # If configuration was already received before initialization, emit ready now + # Fast path: config already available (RC delivered before set_provider) config = _get_ffe_config() - if config is not None and not self._config_received: - self._config_received = True + if config is not None: + logger.debug("FFE configuration already available, provider is READY") + self._config_received.set() self._status = ProviderStatus.READY - self._emit_ready_event() + return # SDK will dispatch PROVIDER_READY + + # Block until config arrives or timeout expires + logger.debug( + "Waiting up to %.1fs for initial FFE configuration from Remote Config", self._initialization_timeout + ) + if not self._config_received.wait(timeout=self._initialization_timeout): + # Timeout expired without receiving config + from openfeature.exception import ProviderNotReadyError + + raise ProviderNotReadyError( + f"Provider timed out after {self._initialization_timeout:.1f}s waiting for " + "initial configuration from Remote Config" + ) + + # Config received 
during wait -- on_configuration_received() already set status def shutdown(self) -> None: """ @@ -196,7 +224,7 @@ def shutdown(self) -> None: # Unregister provider _unregister_provider(self) self._status = ProviderStatus.NOT_READY - self._config_received = False + self._config_received.clear() def resolve_boolean_details( self, @@ -463,14 +491,18 @@ def on_configuration_received(self) -> None: """ Called when a Remote Configuration payload is received and processed. - Emits PROVIDER_READY event on first configuration. + Updates status first, then signals the event to unblock initialize(). + Emits PROVIDER_READY on first configuration, including one arriving after initialize() timed out. """ - if not self._config_received: - self._config_received = True + if not self._config_received.is_set(): self._status = ProviderStatus.READY logger.debug("First FFE configuration received, provider is now READY") + # Emit READY for late recovery: config arrived after init timed out self._emit_ready_event() + # Signal the event last to unblock initialize() after status is updated + self._config_received.set() + def _emit_ready_event(self) -> None: """ Safely emit PROVIDER_READY event. diff --git a/ddtrace/internal/settings/openfeature.py b/ddtrace/internal/settings/openfeature.py index 5149bcee322..ada8dd42fa1 100644 --- a/ddtrace/internal/settings/openfeature.py +++ b/ddtrace/internal/settings/openfeature.py @@ -30,10 +30,20 @@ class OpenFeatureConfig(DDConfig): default=1.0, ) + # Provider initialization timeout in milliseconds. + # Controls how long initialize() blocks waiting for the first Remote Config payload. + # Default is 30000ms (30 seconds), matching Java, Go, and Node.js SDKs. 
+ initialization_timeout_ms = DDConfig.var( + int, + "DD_EXPERIMENTAL_FLAGGING_PROVIDER_INITIALIZATION_TIMEOUT_MS", + default=30000, + ) + _openfeature_config_keys = [ "experimental_flagging_provider_enabled", "ffe_intake_enabled", "ffe_intake_heartbeat_interval", + "initialization_timeout_ms", ] diff --git a/releasenotes/notes/fix-openfeature-init-blocking-70c8d5a99287cc49.yaml b/releasenotes/notes/fix-openfeature-init-blocking-70c8d5a99287cc49.yaml new file mode 100644 index 00000000000..3622bad9ca7 --- /dev/null +++ b/releasenotes/notes/fix-openfeature-init-blocking-70c8d5a99287cc49.yaml @@ -0,0 +1,14 @@ +--- +fixes: + - | + openfeature: This fix resolves an issue where ``DataDogProvider.initialize()`` returned before + configuration was received, causing the OpenFeature SDK to mark the provider as ready to serve + evaluations too early and flag evaluations to silently return default values. The provider now + waits for configuration before returning. +features: + - | + openfeature: This introduces a configurable initialization timeout for ``DataDogProvider``. + The timeout controls how long ``initialize()`` waits for configuration before returning, + and defaults to 30 seconds. Set it via the + ``DD_EXPERIMENTAL_FLAGGING_PROVIDER_INITIALIZATION_TIMEOUT_MS`` environment variable or the + ``initialization_timeout`` constructor parameter. 
diff --git a/tests/openfeature/test_provider_status.py b/tests/openfeature/test_provider_status.py index 5cac8fb49de..3233d3b31c9 100644 --- a/tests/openfeature/test_provider_status.py +++ b/tests/openfeature/test_provider_status.py @@ -5,8 +5,12 @@ - NOT_READY by default - READY when first Remote Config payload is received - Event emission on status change +- Blocking initialization until config arrives or timeout """ +import threading +import time + from openfeature import api from openfeature.provider import ProviderStatus import pytest @@ -43,7 +47,7 @@ def test_provider_starts_not_ready(self): provider = DataDogProvider() assert provider._status == ProviderStatus.NOT_READY - assert provider._config_received is False + assert not provider._config_received.is_set() def test_provider_becomes_ready_after_first_config(self): """Test that provider becomes READY after receiving first configuration.""" @@ -61,7 +65,7 @@ def test_provider_becomes_ready_after_first_config(self): # Verify becomes READY assert provider._status == ProviderStatus.READY - assert provider._config_received is True + assert provider._config_received.is_set() finally: api.clear_providers() @@ -73,14 +77,14 @@ def test_provider_ready_event_emitted(self): try: # Provider should not have received config yet - assert not provider._config_received + assert not provider._config_received.is_set() # Process a configuration config = create_config(create_boolean_flag("test-flag", enabled=True)) process_ffe_configuration(config) # Provider should now have received config and be READY - assert provider._config_received + assert provider._config_received.is_set() assert provider._status == ProviderStatus.READY finally: api.clear_providers() @@ -140,7 +144,7 @@ def test_provider_status_after_shutdown(self): # Verify back to NOT_READY assert provider._status == ProviderStatus.NOT_READY - assert provider._config_received is False + assert not provider._config_received.is_set() finally: api.clear_providers() @@ 
-194,3 +198,100 @@ def on_provider_ready(event_details): finally: api.remove_handler(ProviderEvent.PROVIDER_READY, on_provider_ready) api.clear_providers() + + +class TestProviderInitializationBlocking: + """Test that initialize() blocks until config arrives or timeout expires.""" + + def test_initialize_blocks_until_config_arrives(self): + """initialize() should block and return once config is delivered mid-wait.""" + with override_global_config({"experimental_flagging_provider_enabled": True}): + provider = DataDogProvider(initialization_timeout=5.0) + + # Deliver config from a background thread after 0.5s + def deliver_config(): + time.sleep(0.5) + config = create_config(create_boolean_flag("test-flag", enabled=True)) + process_ffe_configuration(config) + + timer = threading.Thread(target=deliver_config, daemon=True) + timer.start() + + try: + start = time.monotonic() + api.set_provider(provider) + elapsed = time.monotonic() - start + + # Should have blocked for ~0.5s (not instant, not full timeout) + assert elapsed >= 0.3, f"initialize() returned too fast ({elapsed:.2f}s)" + assert elapsed < 4.0, f"initialize() took too long ({elapsed:.2f}s), should have unblocked at ~0.5s" + assert provider._status == ProviderStatus.READY + assert provider._config_received.is_set() + finally: + api.clear_providers() + + def test_initialize_fast_path_when_config_exists(self): + """initialize() should return immediately if config already exists.""" + with override_global_config({"experimental_flagging_provider_enabled": True}): + # Deliver config BEFORE creating provider + config = create_config(create_boolean_flag("test-flag", enabled=True)) + process_ffe_configuration(config) + + provider = DataDogProvider(initialization_timeout=5.0) + + try: + start = time.monotonic() + api.set_provider(provider) + elapsed = time.monotonic() - start + + # Should be near-instant (config already available) + assert elapsed < 1.0, f"initialize() took {elapsed:.2f}s, should be instant with 
pre-loaded config" + assert provider._status == ProviderStatus.READY + finally: + api.clear_providers() + + def test_initialize_timeout_raises(self): + """initialize() should raise ProviderNotReadyError after timeout expires.""" + from openfeature.exception import ProviderNotReadyError + + with override_global_config({"experimental_flagging_provider_enabled": True}): + provider = DataDogProvider(initialization_timeout=0.5) + + try: + start = time.monotonic() + # set_provider catches the exception and dispatches PROVIDER_ERROR + api.set_provider(provider) + elapsed = time.monotonic() - start + + # Should have blocked for ~0.5s (the timeout) + assert elapsed >= 0.3, f"initialize() returned too fast ({elapsed:.2f}s)" + assert elapsed < 2.0, f"initialize() took too long ({elapsed:.2f}s)" + + # Provider should be in ERROR state (SDK caught ProviderNotReadyError) + client = api.get_client() + assert client.get_provider_status() == ProviderStatus.ERROR + finally: + api.clear_providers() + + def test_late_recovery_after_timeout(self): + """Config arriving after timeout should transition provider to READY.""" + with override_global_config({"experimental_flagging_provider_enabled": True}): + provider = DataDogProvider(initialization_timeout=0.5) + + try: + # Let it timeout + api.set_provider(provider) + + # Provider should be in ERROR state + client = api.get_client() + assert client.get_provider_status() == ProviderStatus.ERROR + + # Now deliver config (late recovery) + config = create_config(create_boolean_flag("test-flag", enabled=True)) + process_ffe_configuration(config) + + # Provider should recover to READY + assert provider._status == ProviderStatus.READY + assert provider._config_received.is_set() + finally: + api.clear_providers()