Skip to content
Open
76 changes: 54 additions & 22 deletions ddtrace/internal/openfeature/_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from collections import OrderedDict
from collections.abc import MutableMapping
from importlib.metadata import version
import threading
import typing

from openfeature.evaluation_context import EvaluationContext
Expand Down Expand Up @@ -90,11 +91,20 @@ class DataDogProvider(AbstractProvider):
Feature Flags and Experimentation (FFE) product.
"""

def __init__(self, *args: typing.Any, **kwargs: typing.Any):
def __init__(self, *args: typing.Any, initialization_timeout: typing.Optional[float] = None, **kwargs: typing.Any):
super().__init__(*args, **kwargs)
self._metadata = Metadata(name="Datadog")
self._status = ProviderStatus.NOT_READY
self._config_received = False

# Initialization timeout: constructor arg takes priority, then env var (default 30s)
if initialization_timeout is not None:
self._initialization_timeout = initialization_timeout
else:
self._initialization_timeout = ffe_config.initialization_timeout_ms / 1000.0

# Event used to block initialize() until config arrives.
# Also serves as the "config received" flag via is_set().
self._config_received = threading.Event()

# Cache for reported exposures to prevent duplicates
# Stores mapping of (flag_key, subject_id) -> (allocation_key, variant_key)
Expand All @@ -119,8 +129,6 @@ def __init__(self, *args: typing.Any, **kwargs: typing.Any):
self._flag_eval_metrics = FlagEvalMetrics()
self._flag_eval_hook = FlagEvalHook(self._flag_eval_metrics)

# Register this provider instance for status updates
_register_provider(self)

def get_metadata(self) -> Metadata:
"""Returns provider metadata."""
Expand All @@ -142,32 +150,52 @@ def initialize(self, evaluation_context: EvaluationContext) -> None:
"""
Initialize the provider.

Called by the OpenFeature SDK when the provider is set.
Provider Creation → NOT_READY
First Remote Config Payload
READY (emits PROVIDER_READY event)
Shutdown
NOT_READY
Blocks until Remote Config delivers the first FFE configuration or
the initialization timeout expires.

The timeout is configurable via:
- Constructor: DataDogProvider(initialization_timeout=10.0) # seconds
- Env var: DD_EXPERIMENTAL_FLAGGING_PROVIDER_INITIALIZATION_TIMEOUT_MS=10000

Provider lifecycle:
NOT_READY -> initialize() blocks -> config arrives -> READY
NOT_READY -> initialize() blocks -> timeout -> raises ProviderNotReadyError
"""
if not self._enabled:
return

# Register for RC config callbacks (in initialize, not __init__, so
# re-initialization after shutdown re-registers the provider)
_register_provider(self)

try:
# Start the exposure writer for reporting
start_exposure_writer()
except ServiceStatusError:
logger.debug("Exposure writer is already running", exc_info=True)

# If configuration was already received before initialization, emit ready now
# Fast path: config already available (RC delivered before set_provider)
config = _get_ffe_config()
if config is not None and not self._config_received:
self._config_received = True
if config is not None:
logger.debug("FFE configuration already available, provider is READY")
self._config_received.set()
self._status = ProviderStatus.READY
self._emit_ready_event()
return # SDK will dispatch PROVIDER_READY

# Block until config arrives or timeout expires
logger.debug(
"Waiting up to %.1fs for initial FFE configuration from Remote Config", self._initialization_timeout
)
if not self._config_received.wait(timeout=self._initialization_timeout):
# Timeout expired without receiving config
from openfeature.exception import ProviderNotReadyError

raise ProviderNotReadyError(
f"Provider timed out after {self._initialization_timeout:.1f}s waiting for "
"initial configuration from Remote Config"
)

# Config received during wait -- on_configuration_received() already set status

def shutdown(self) -> None:
"""
Expand Down Expand Up @@ -196,7 +224,7 @@ def shutdown(self) -> None:
# Unregister provider
_unregister_provider(self)
self._status = ProviderStatus.NOT_READY
self._config_received = False
self._config_received.clear()

def resolve_boolean_details(
self,
Expand Down Expand Up @@ -463,14 +491,18 @@ def on_configuration_received(self) -> None:
"""
Called when a Remote Configuration payload is received and processed.

Emits PROVIDER_READY event on first configuration.
Updates status first, then signals the event to unblock initialize().
Emits PROVIDER_READY for late arrivals (config received after initialize() timed out).
"""
if not self._config_received:
self._config_received = True
if not self._config_received.is_set():
self._status = ProviderStatus.READY
logger.debug("First FFE configuration received, provider is now READY")
# Emit READY for late recovery: config arrived after init timed out
self._emit_ready_event()

# Signal the event last to unblock initialize() after status is updated
self._config_received.set()

def _emit_ready_event(self) -> None:
"""
Safely emit PROVIDER_READY event.
Expand Down
10 changes: 10 additions & 0 deletions ddtrace/internal/settings/openfeature.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,20 @@ class OpenFeatureConfig(DDConfig):
default=1.0,
)

# Provider initialization timeout in milliseconds.
# Controls how long initialize() blocks waiting for the first Remote Config payload.
# Default is 30000ms (30 seconds), matching Java, Go, and Node.js SDKs.
initialization_timeout_ms = DDConfig.var(
int,
"DD_EXPERIMENTAL_FLAGGING_PROVIDER_INITIALIZATION_TIMEOUT_MS",
default=30000,
)

_openfeature_config_keys = [
"experimental_flagging_provider_enabled",
"ffe_intake_enabled",
"ffe_intake_heartbeat_interval",
"initialization_timeout_ms",
]


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
fixes:
- |
openfeature: This fix resolves an issue where ``DataDogProvider.initialize()`` returned before
configuration was received, causing the OpenFeature SDK to mark the provider as ready to serve
evaluations too early and flag evaluations to silently return default values. The provider now
waits for configuration before returning.
features:
- |
openfeature: This introduces a configurable initialization timeout for ``DataDogProvider``.
The timeout controls how long ``initialize()`` waits for configuration before returning,
and defaults to 30 seconds. Set it via the
``DD_EXPERIMENTAL_FLAGGING_PROVIDER_INITIALIZATION_TIMEOUT_MS`` environment variable or the
``initialization_timeout`` constructor parameter (in seconds).
111 changes: 106 additions & 5 deletions tests/openfeature/test_provider_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
- NOT_READY by default
- READY when first Remote Config payload is received
- Event emission on status change
- Blocking initialization until config arrives or timeout
"""

import threading
import time

from openfeature import api
from openfeature.provider import ProviderStatus
import pytest
Expand Down Expand Up @@ -43,7 +47,7 @@ def test_provider_starts_not_ready(self):
provider = DataDogProvider()

assert provider._status == ProviderStatus.NOT_READY
assert provider._config_received is False
assert not provider._config_received.is_set()

def test_provider_becomes_ready_after_first_config(self):
"""Test that provider becomes READY after receiving first configuration."""
Expand All @@ -61,7 +65,7 @@ def test_provider_becomes_ready_after_first_config(self):

# Verify becomes READY
assert provider._status == ProviderStatus.READY
assert provider._config_received is True
assert provider._config_received.is_set()
finally:
api.clear_providers()

Expand All @@ -73,14 +77,14 @@ def test_provider_ready_event_emitted(self):

try:
# Provider should not have received config yet
assert not provider._config_received
assert not provider._config_received.is_set()

# Process a configuration
config = create_config(create_boolean_flag("test-flag", enabled=True))
process_ffe_configuration(config)

# Provider should now have received config and be READY
assert provider._config_received
assert provider._config_received.is_set()
assert provider._status == ProviderStatus.READY
finally:
api.clear_providers()
Expand Down Expand Up @@ -140,7 +144,7 @@ def test_provider_status_after_shutdown(self):

# Verify back to NOT_READY
assert provider._status == ProviderStatus.NOT_READY
assert provider._config_received is False
assert not provider._config_received.is_set()
finally:
api.clear_providers()

Expand Down Expand Up @@ -194,3 +198,100 @@ def on_provider_ready(event_details):
finally:
api.remove_handler(ProviderEvent.PROVIDER_READY, on_provider_ready)
api.clear_providers()


class TestProviderInitializationBlocking:
    """Test that initialize() blocks until config arrives or timeout expires.

    These tests drive the provider through ``api.set_provider()``, which calls
    ``initialize()``, and use short wall-clock timeouts with generous bounds.
    """

    def test_initialize_blocks_until_config_arrives(self):
        """initialize() should block and return once config is delivered mid-wait."""
        with override_global_config({"experimental_flagging_provider_enabled": True}):
            provider = DataDogProvider(initialization_timeout=5.0)

            # Deliver config from a background thread after 0.5s
            def deliver_config():
                time.sleep(0.5)
                config = create_config(create_boolean_flag("test-flag", enabled=True))
                process_ffe_configuration(config)

            timer = threading.Thread(target=deliver_config, daemon=True)
            timer.start()

            try:
                start = time.monotonic()
                api.set_provider(provider)
                elapsed = time.monotonic() - start

                # Should have blocked for ~0.5s (not instant, not full timeout)
                assert elapsed >= 0.3, f"initialize() returned too fast ({elapsed:.2f}s)"
                assert elapsed < 4.0, f"initialize() took too long ({elapsed:.2f}s), should have unblocked at ~0.5s"
                assert provider._status == ProviderStatus.READY
                assert provider._config_received.is_set()
            finally:
                # Wait for the delivery thread before cleaning up so that a
                # late process_ffe_configuration() call cannot leak config
                # into whichever test runs next.
                timer.join(timeout=5.0)
                api.clear_providers()

    def test_initialize_fast_path_when_config_exists(self):
        """initialize() should return immediately if config already exists."""
        with override_global_config({"experimental_flagging_provider_enabled": True}):
            # Deliver config BEFORE creating provider
            config = create_config(create_boolean_flag("test-flag", enabled=True))
            process_ffe_configuration(config)

            provider = DataDogProvider(initialization_timeout=5.0)

            try:
                start = time.monotonic()
                api.set_provider(provider)
                elapsed = time.monotonic() - start

                # Should be near-instant (config already available)
                assert elapsed < 1.0, f"initialize() took {elapsed:.2f}s, should be instant with pre-loaded config"
                assert provider._status == ProviderStatus.READY
            finally:
                api.clear_providers()

    def test_initialize_timeout_raises(self):
        """initialize() should time out, leaving the SDK-reported status as ERROR.

        The exception raised by initialize() is not observed directly here:
        set_provider() catches it, so the test checks the resulting client
        provider status instead.
        """
        with override_global_config({"experimental_flagging_provider_enabled": True}):
            provider = DataDogProvider(initialization_timeout=0.5)

            try:
                start = time.monotonic()
                # set_provider catches the exception and dispatches PROVIDER_ERROR
                api.set_provider(provider)
                elapsed = time.monotonic() - start

                # Should have blocked for ~0.5s (the timeout)
                assert elapsed >= 0.3, f"initialize() returned too fast ({elapsed:.2f}s)"
                assert elapsed < 2.0, f"initialize() took too long ({elapsed:.2f}s)"

                # Provider should be in ERROR state (SDK caught ProviderNotReadyError)
                client = api.get_client()
                assert client.get_provider_status() == ProviderStatus.ERROR
            finally:
                api.clear_providers()

    def test_late_recovery_after_timeout(self):
        """Config arriving after timeout should transition provider to READY."""
        with override_global_config({"experimental_flagging_provider_enabled": True}):
            provider = DataDogProvider(initialization_timeout=0.5)

            try:
                # Let it timeout
                api.set_provider(provider)

                # Provider should be in ERROR state
                client = api.get_client()
                assert client.get_provider_status() == ProviderStatus.ERROR

                # Now deliver config (late recovery)
                config = create_config(create_boolean_flag("test-flag", enabled=True))
                process_ffe_configuration(config)

                # Provider should recover to READY
                assert provider._status == ProviderStatus.READY
                assert provider._config_received.is_set()
            finally:
                api.clear_providers()
Loading