[fix] Add polling mechanism for kv_batch_get #32
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
CLA Signature Pass: 0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA.
Pull request overview
Adds a polling mechanism to the high-level KV batch retrieval APIs to mitigate “partial readiness” cases where requested fields are not yet fully available for the requested key(s).
Changes:
- Introduce polling in kv_batch_get when fields is specified, retrying metadata retrieval until the requested fields appear or a timeout is reached.
- Introduce equivalent async polling in async_kv_batch_get.
- Add env-configurable timeout/interval knobs for the polling behavior.
    target_fields = set(fields)
    current_fields = set(batch_meta.field_names)

    not_ready_fields = target_fields - current_fields
    begin_polling_time = time.time()
    while not_ready_fields:
        if time.time() - begin_polling_time > TQ_KV_POLLING_METADATA_TIMEOUT:
            raise RuntimeError(
                f"Timeout for kv_batch_get. Missing fields: {not_ready_fields}"
                f" after {TQ_KV_POLLING_METADATA_TIMEOUT} seconds."
            )

        logger.warning(
            f"Fields {list(not_ready_fields)} are not ready yet! "
            f"Retry in {TQ_KV_POLLING_METADATA_CHECK_INTERVAL} seconds."
        )

        time.sleep(TQ_KV_POLLING_METADATA_CHECK_INTERVAL)
        batch_meta = tq_client.kv_retrieve_keys(keys=keys, partition_id=partition_id, create=False)
        current_fields = set(batch_meta.field_names)
        not_ready_fields = target_fields - current_fields
The polling loop treats any missing requested field as "not ready" and will wait until timeout. If a caller passes a field name that is not registered for the partition, this now blocks and raises a timeout, whereas BatchMeta.select_fields() intentionally ignores unknown field names. Consider adding an explicit validation step to distinguish "not yet produced" vs "unknown/unregistered" fields (e.g., via the controller production-status API) and fail fast (or preserve the previous ignore semantics) instead of polling until timeout.
Suggested change: replace the polling block quoted above (from target_fields = set(fields) through the final not_ready_fields assignment) with the following comment:
    # NOTE: BatchMeta.select_fields() is expected to ignore unknown field names.
    # Avoid polling on missing fields here to prevent timeouts when callers
    # request fields that are not registered for the partition.
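The distinction the review comment asks for, "not yet produced" versus "unknown/unregistered", can be sketched with plain set arithmetic. This is only an illustration: classify_missing_fields and its registered argument are hypothetical names, and how the set of registered fields would be obtained (for example from the controller production-status API mentioned above) is an assumption, not something this PR shows.

```python
from typing import Iterable, Set, Tuple


def classify_missing_fields(
    requested: Iterable[str],
    available: Iterable[str],
    registered: Iterable[str],
) -> Tuple[Set[str], Set[str]]:
    """Split the missing fields into (pending, unknown).

    pending: registered for the partition but not yet present in the metadata,
             so polling for them is reasonable.
    unknown: not registered at all, so polling can never succeed.
    All three inputs are hypothetical stand-ins for real TransferQueue state.
    """
    missing = set(requested) - set(available)
    registered_set = set(registered)
    pending = missing & registered_set
    unknown = missing - registered_set
    return pending, unknown


# Example: "response" is registered but not produced yet, "typo_field" is unknown.
pending, unknown = classify_missing_fields(
    requested=["prompt", "response", "typo_field"],
    available=["prompt"],
    registered=["prompt", "response"],
)
```

With such a split, a caller could fail fast (or silently ignore, matching BatchMeta.select_fields()) on the unknown set and poll only while the pending set is non-empty.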
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 14 comments.
    TQ_KV_POLLING_METADATA_TIMEOUT = int(os.environ.get("TQ_KV_POLLING_METADATA_TIMEOUT", 10))
    TQ_KV_POLLING_METADATA_CHECK_INTERVAL = float(os.environ.get("TQ_KV_POLLING_METADATA_CHECK_INTERVAL", 0.5))
The polling timeout/interval values come directly from env vars but aren’t validated. If TQ_KV_POLLING_METADATA_CHECK_INTERVAL is <= 0, time.sleep() will raise (negative) or the loop can become a busy-wait (0), and a negative timeout makes the behavior confusing. Consider validating/clamping these at import time (e.g., require timeout >= 0 and interval > 0, otherwise raise ValueError).
Suggested change: replace the two assignments quoted above with:
    # Validate polling configuration derived from environment variables at import time
    _raw_timeout = os.environ.get("TQ_KV_POLLING_METADATA_TIMEOUT", "10")
    try:
        TQ_KV_POLLING_METADATA_TIMEOUT = int(_raw_timeout)
    except ValueError as exc:
        raise ValueError(
            f"Invalid value for TQ_KV_POLLING_METADATA_TIMEOUT: {_raw_timeout!r}. "
            "Expected a non-negative integer."
        ) from exc
    if TQ_KV_POLLING_METADATA_TIMEOUT < 0:
        raise ValueError(
            f"TQ_KV_POLLING_METADATA_TIMEOUT must be >= 0, got {TQ_KV_POLLING_METADATA_TIMEOUT}."
        )
    _raw_interval = os.environ.get("TQ_KV_POLLING_METADATA_CHECK_INTERVAL", "0.5")
    try:
        TQ_KV_POLLING_METADATA_CHECK_INTERVAL = float(_raw_interval)
    except ValueError as exc:
        raise ValueError(
            f"Invalid value for TQ_KV_POLLING_METADATA_CHECK_INTERVAL: {_raw_interval!r}. "
            "Expected a positive float."
        ) from exc
    if TQ_KV_POLLING_METADATA_CHECK_INTERVAL <= 0.0:
        raise ValueError(
            "TQ_KV_POLLING_METADATA_CHECK_INTERVAL must be > 0, "
            f"got {TQ_KV_POLLING_METADATA_CHECK_INTERVAL}."
        )
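The two validation stanzas in the suggestion follow the same pattern, so they could be folded into one helper. The sketch below is one possible shape, not part of the PR: read_env_number and its allow_zero flag are hypothetical names, and the extra finiteness check (rejecting "nan" and "inf", which float() accepts) goes beyond what the suggestion itself validates.

```python
import math
import os
from typing import Callable, Union

Number = Union[int, float]


def read_env_number(
    name: str,
    default: Number,
    cast: Callable[[str], Number],
    *,
    allow_zero: bool,
) -> Number:
    """Read a numeric env var, returning `default` when it is unset.

    Raises ValueError if the value cannot be parsed, is not finite,
    is negative, or is zero while allow_zero is False.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        value = cast(raw)
    except ValueError as exc:
        raise ValueError(f"Invalid value for {name}: {raw!r}.") from exc
    if not math.isfinite(value):
        raise ValueError(f"{name} must be finite, got {raw!r}.")
    if value < 0 or (value == 0 and not allow_zero):
        bound = ">= 0" if allow_zero else "> 0"
        raise ValueError(f"{name} must be {bound}, got {value}.")
    return value


# Same knobs and defaults as in the diff above: timeout may be 0, interval may not.
TQ_KV_POLLING_METADATA_TIMEOUT = read_env_number(
    "TQ_KV_POLLING_METADATA_TIMEOUT", 10, int, allow_zero=True
)
TQ_KV_POLLING_METADATA_CHECK_INTERVAL = read_env_number(
    "TQ_KV_POLLING_METADATA_CHECK_INTERVAL", 0.5, float, allow_zero=False
)
```

Failing at import time keeps a misconfigured interval from turning the polling loop into a busy-wait later on.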
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.
Background
Since TransferQueue does not treat each key as an atomic object, some fields of a key may be ready while others are not. This partial readiness can also occur when multiple keys are requested.
Why this does not happen in the low-level APIs
In the low-level API workflow, consumers call tq_client.get_meta() before tq_client.get_data(). get_meta() is designed to block until it identifies enough samples where all required data_fields are ready, acting as a dynamic sample router. This mechanism inherently guarantees that data is fully produced and written to the TransferQueue before retrieval.
However, in the high-level KV API, users provide specific keys directly. While these keys serve as the identifiers (similar to metadata), the new logic for resolving keys into BatchMeta lacked a validation step to ensure all requested fields were ready. This PR bridges that gap.
Changes
This PR introduces a polling mechanism for (async_)kv_batch_get to ensure data completeness when specific fields are requested.
Polling Mechanism: When user-specified fields are provided, the API checks whether the retrieved metadata contains all requested fields. If not, it retries every TQ_KV_POLLING_METADATA_CHECK_INTERVAL seconds until the TQ_KV_POLLING_METADATA_TIMEOUT limit is reached.
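The retry-until-timeout behavior described above can be sketched independently of TransferQueue. In this minimal example the function names and the fetch_field_names callable are hypothetical stand-ins for re-fetching metadata (the PR itself re-calls tq_client.kv_retrieve_keys). Two details differ from the PR code and are choices of this sketch only: it measures elapsed time with time.monotonic() rather than time.time(), and it raises TimeoutError rather than RuntimeError.

```python
import asyncio
import time
from typing import Awaitable, Callable, Iterable


def poll_until_fields_ready(
    fetch_field_names: Callable[[], Iterable[str]],
    fields: Iterable[str],
    timeout: float,
    interval: float,
) -> int:
    """Re-fetch field names until all requested fields appear.

    Returns the number of fetch attempts; raises TimeoutError on timeout.
    """
    target = set(fields)
    deadline = time.monotonic() + timeout
    attempts = 0
    while True:
        attempts += 1
        missing = target - set(fetch_field_names())
        if not missing:
            return attempts
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Missing fields {sorted(missing)} after {timeout} seconds.")
        time.sleep(interval)


async def async_poll_until_fields_ready(
    fetch_field_names: Callable[[], Awaitable[Iterable[str]]],
    fields: Iterable[str],
    timeout: float,
    interval: float,
) -> int:
    """Async variant: yields to the event loop between attempts."""
    target = set(fields)
    deadline = time.monotonic() + timeout
    attempts = 0
    while True:
        attempts += 1
        missing = target - set(await fetch_field_names())
        if not missing:
            return attempts
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Missing fields {sorted(missing)} after {timeout} seconds.")
        await asyncio.sleep(interval)
```

A monotonic clock is unaffected by wall-clock adjustments, which is why it is used here for the deadline. The async variant uses asyncio.sleep so that other coroutines keep running while the fields are not ready.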