HA Websocket for service calls, catch zero rates #3168
Pull request overview
This PR refactors the HA websocket service call mechanism to use a queue-based architecture and fixes a bug where zero/missing rates caused crashes in cost calculations.
Key Changes:
- Refactored websocket service calls from direct async calls to a queue-based system where `async_call_service_websocket_command()` queues commands that are processed by the existing `socketLoop()`
- Added thread synchronization infrastructure (command queue, pending requests tracker, bridge thread) to coordinate between sync and async contexts
- Fixed KeyError crashes in `output.py` by using `.get(minute, 0)` instead of direct dictionary indexing when rates are missing
- Updated tests to use `receive()` method instead of `__aiter__` pattern for websocket mocking
- Removed duplicate `wait_api_started()` method from `GECloudData`
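The queue-based handoff described in the list above can be sketched roughly as follows. This is a minimal illustration using only the standard library, not the code from this PR: `command_queue`, `call_service_sync` and `worker_loop` are hypothetical names, and the worker marks every command as successful where a real loop would send it over the websocket.

```python
import queue
import threading

# Hypothetical stand-in for the command queue shared by callers and the socket loop
command_queue = queue.Queue()


def call_service_sync(domain, service, timeout=5.0):
    """Queue a command, then block until the worker signals or the wait times out."""
    event = threading.Event()
    result_holder = {"success": None, "error": None}
    command_queue.put((domain, service, event, result_holder))
    event.wait(timeout=timeout)
    return result_holder


def worker_loop(stop_event):
    """Stand-in for the socket loop: drain queued commands and wake each caller."""
    while not stop_event.is_set():
        try:
            domain, service, event, result_holder = command_queue.get(timeout=0.1)
        except queue.Empty:
            continue  # nothing queued: loop back and re-check the stop flag
        result_holder["success"] = True  # assumption: the call always succeeds here
        event.set()


stop_event = threading.Event()
worker = threading.Thread(target=worker_loop, args=(stop_event,), daemon=True)
worker.start()

result = call_service_sync("switch", "turn_on")

stop_event.set()
worker.join()
```

Each caller owns its own `Event` and result dictionary, so several threads can queue commands at once without sharing any state beyond the queue itself.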
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `apps/predbat/ha.py` | Major refactoring: Added queue infrastructure for websocket commands, bridge thread for sync/async coordination, and integrated command processing into `socketLoop` |
| `apps/predbat/output.py` | Bug fix: Changed rate dictionary access from `rate_import[minute]` to `rate_import.get(minute, 0)` to handle missing rate data |
| `apps/predbat/tests/test_hainterface_websocket.py` | Updated test mocking from `__aiter__` pattern to `receive()` method to match new websocket handling |
| `apps/predbat/tests/test_hainterface_service.py` | Refactored tests to use queue-based architecture with background threads simulating `socketLoop` processing |
| `apps/predbat/gecloud.py` | Cleanup: Removed duplicate `wait_api_started()` method (inherited from `ComponentBase`) |
| `apps/predbat/fox.py` | Changed entity refresh to run every 5 minutes (removed "every minute" comment) |
| `apps/predbat/component_base.py` | Increased default `wait_api_started` timeout from 5 to 10 minutes |
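To illustrate the `output.py` fix listed in the table, here is a small sketch of the difference between direct indexing and `.get(minute, 0)`. The rate values and the `cost_for_minute` helper are made up for the example and are not taken from the project.

```python
# Illustrative rates keyed by minute; minute 2 is deliberately missing
rate_import = {0: 24.5, 1: 24.5}


def cost_for_minute(rates, minute, energy_kwh):
    """Hypothetical helper: cost of energy in one minute, treating a missing rate as 0."""
    return rates.get(minute, 0) * energy_kwh


known = cost_for_minute(rate_import, 1, 2.0)
missing = cost_for_minute(rate_import, 2, 2.0)

# Direct indexing raises KeyError for the same missing minute
try:
    rate_import[2]
    raised = False
except KeyError:
    raised = True
```

One trade-off worth keeping in mind: defaulting to 0 avoids the crash, but it also makes a missing rate indistinguishable from a genuinely free one in the resulting cost.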
apps/predbat/ha.py
Outdated
| self.ws_sync_event.wait() | ||
| self.ws_sync_event.clear() | ||
| if self.ws_event_loop and self.ws_async_event: |
Copilot
AI
Jan 2, 2026
The bridge_event thread can hang indefinitely on shutdown. The self.ws_sync_event.wait() call at line 309 has no timeout, which means if api_stop is set but the sync event is never triggered, the thread will block forever. Consider adding a timeout to the wait call (e.g., self.ws_sync_event.wait(timeout=1.0)) to allow periodic checks of the api_stop flag.
Suggested change:

    - self.ws_sync_event.wait()
    - self.ws_sync_event.clear()
    - if self.ws_event_loop and self.ws_async_event:
    + # Use a timeout so we can periodically check api_stop and avoid hanging on shutdown
    + triggered = self.ws_sync_event.wait(timeout=1.0)
    + if not triggered:
    +     continue
    + self.ws_sync_event.clear()
    + if not self.api_stop and self.ws_event_loop and self.ws_async_event:
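The shutdown behaviour this comment asks for can be tried out in isolation. The sketch below is a simplified stand-in, not the project's bridge thread: the `Bridge` class, its attribute names and the short 0.05 second timeout are assumptions chosen to keep the example quick.

```python
import threading
import time


class Bridge:
    """Hypothetical bridge thread body that wakes periodically to check a stop flag."""

    def __init__(self):
        self.sync_event = threading.Event()
        self.api_stop = False
        self.handled = 0

    def run(self):
        while not self.api_stop:
            # wait() returns False on timeout, so the loop can re-check api_stop
            if not self.sync_event.wait(timeout=0.05):
                continue
            self.sync_event.clear()
            if not self.api_stop:
                self.handled += 1


bridge = Bridge()
thread = threading.Thread(target=bridge.run)
thread.start()

bridge.sync_event.set()  # one wake-up request
deadline = time.monotonic() + 2.0
while bridge.handled == 0 and time.monotonic() < deadline:
    time.sleep(0.01)

bridge.api_stop = True  # no further event is set, yet the thread still exits
thread.join(timeout=2.0)
```

With an untimed `wait()`, the final `join` would block until something set the event again, which is the hang described above.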
|
|
||
| # Refresh HA entities every minute | ||
| if first or (seconds % (60) == 0): | ||
| # Refresh HA entities |
Copilot
AI
Jan 2, 2026
The comment on line 305 states "Refresh HA entities every minute" but was changed to run every 5 minutes (based on the condition seconds % (5 * 60) == 0 at line 300). The comment should be updated to match the actual behavior, or clarify that this is intentional. The comment has been moved but not updated to reflect the timing change.
Suggested change:

    - # Refresh HA entities
    + # Refresh HA entities every 5 minutes
| # Extract result | ||
| if result_holder.get("error"): | ||
| self.log("Warn: Service call {}/{} failed: {}".format(domain, service, result_holder["error"])) | ||
| return None | ||
|
|
||
| success = result_holder.get("success", False) | ||
| if not success: | ||
| self.log("Warn: Service call {}/{} data {} failed".format(domain, service, service_data)) | ||
| return None |
Copilot
AI
Jan 2, 2026
When event.wait() times out at line 340, the result_holder will still have success=None and error=None. The check at line 347 treats None as False, logging "Service call failed" without indicating that it was a timeout. This makes it difficult to distinguish between an actual service call failure and a timeout. Consider checking explicitly for timeout: if result_holder.get("success") is None and not result_holder.get("error"), log a specific timeout message.
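An alternative to inspecting `result_holder` is to use the return value of `threading.Event.wait()`, which is `False` when the timeout expires. The sketch below shows that variant under assumed names: `wait_for_result` and its string outcomes are hypothetical and not part of this PR.

```python
import threading


def wait_for_result(event, result_holder, timeout):
    """Classify the outcome of a queued call as timeout, error, failed or success."""
    if not event.wait(timeout=timeout):
        return "timeout"  # the event was never set within the timeout
    if result_holder.get("error"):
        return "error"
    return "success" if result_holder.get("success") else "failed"


# Event never set: the wait expires
timed_out = wait_for_result(threading.Event(), {"success": None, "error": None}, 0.01)

# Event already set with different results
done = threading.Event()
done.set()
succeeded = wait_for_result(done, {"success": True, "error": None}, 0.01)
errored = wait_for_result(done, {"success": False, "error": "send_failed"}, 0.01)
```

Checking the return value keeps the timeout case separate even if a result dictionary is later initialised with different defaults.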
apps/predbat/ha.py
Outdated
| # Send command with current sid | ||
| await websocket.send_json({"id": sid, "type": "call_service", "domain": domain, "service": service, "service_data": service_data, "return_response": return_response}) | ||
|
|
||
| # Track pending request | ||
| with self.ws_pending_lock: | ||
| self.ws_pending_requests[sid] = {"event": event, "result_holder": result_holder, "timestamp": time.time()} | ||
|
|
||
| sid += 1 |
Copilot
AI
Jan 2, 2026
If websocket.send_json() at line 495 raises an exception, the command is removed from the queue (line 487) but never tracked in ws_pending_requests. The caller thread will wait for 30 seconds at line 340 before timing out, but the error won't be properly propagated. Consider wrapping the send_json call in a try-except block to set an error in result_holder if the send fails, then signal the event to unblock the caller immediately.
Suggested change:

    - # Send command with current sid
    - await websocket.send_json({"id": sid, "type": "call_service", "domain": domain, "service": service, "service_data": service_data, "return_response": return_response})
    - # Track pending request
    - with self.ws_pending_lock:
    -     self.ws_pending_requests[sid] = {"event": event, "result_holder": result_holder, "timestamp": time.time()}
    - sid += 1
    + try:
    +     # Send command with current sid
    +     await websocket.send_json({"id": sid, "type": "call_service", "domain": domain, "service": service, "service_data": service_data, "return_response": return_response})
    +     # Track pending request
    +     with self.ws_pending_lock:
    +         self.ws_pending_requests[sid] = {"event": event, "result_holder": result_holder, "timestamp": time.time()}
    +     sid += 1
    + except Exception as e:
    +     # Propagate send failure to waiting caller immediately
    +     try:
    +         result_holder["error"] = "send_failed: {}".format(e)
    +         result_holder["success"] = False
    +     except Exception:
    +         # Defensive: avoid crashing if result_holder has unexpected structure
    +         pass
    +     try:
    +         event.set()
    +     except Exception:
    +         pass
    +     self.log("Error: WebSocket send_json failed for service {}.{}: {}".format(domain, service, e))
|
|
||
| async def test_command(): | ||
| with patch.object(threading.Event, 'wait', mock_wait): | ||
| # This will timeout immediately |
Copilot
AI
Jan 2, 2026
Variable original_wait is not used.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
|
|
||
| # Check for timeout (neither success nor error was set) | ||
| if result_holder.get("success") is None and not result_holder.get("error"): | ||
| self.log("Warn: Service call {}/{} timed out....".format(domain, service)) |
Copilot
AI
Jan 2, 2026
The timeout message contains four dots ("timed out....") which appears to be a typo. This should be three dots ("timed out...") or remove the extra dots for consistency with other log messages in the codebase.
Suggested change:

    - self.log("Warn: Service call {}/{} timed out....".format(domain, service))
    + self.log("Warn: Service call {}/{} timed out...".format(domain, service))
| ha_interface.ws_pending_lock = threading.Lock() | ||
|
|
||
| # Patch threading.Event.wait to simulate instant timeout | ||
| original_wait = threading.Event.wait |
Copilot
AI
Jan 2, 2026
Variable original_wait is not used.
| original_wait = threading.Event.wait |
No description provided.