-
Notifications
You must be signed in to change notification settings - Fork 329
refactor: extract StorageInterface protocol and add InProcessStorageControllerHandle #1126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: refactor/extract-command-context
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,163 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||
| """In-process StorageController for testing. | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| Runs the StorageController's asyncio event loop in a daemon thread instead of | ||||||||||||||||||||||||||||||||||||||||||||||||||
| a subprocess, eliminating subprocess spawn overhead. Uses the same TCP server | ||||||||||||||||||||||||||||||||||||||||||||||||||
| and protocol, so the WebExtension connects to it identically. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| import asyncio | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import random | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import threading | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import time | ||||||||||||||||||||||||||||||||||||||||||||||||||
| from typing import List, Optional, Tuple | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| from multiprocess import Queue | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| from ..types import BrowserId, VisitId | ||||||||||||||||||||||||||||||||||||||||||||||||||
| from .storage_controller import StorageController | ||||||||||||||||||||||||||||||||||||||||||||||||||
| from .storage_providers import StructuredStorageProvider, UnstructuredStorageProvider | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| class InProcessStorageControllerHandle: | ||||||||||||||||||||||||||||||||||||||||||||||||||
| """StorageControllerHandle replacement that runs in a thread, not a subprocess. | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| Implements the same interface as StorageControllerHandle (satisfies | ||||||||||||||||||||||||||||||||||||||||||||||||||
| StorageInterface protocol) but runs the asyncio event loop in a daemon | ||||||||||||||||||||||||||||||||||||||||||||||||||
| thread within the current process. This avoids subprocess spawn overhead | ||||||||||||||||||||||||||||||||||||||||||||||||||
| for testing. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| def __init__( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| structured_storage: StructuredStorageProvider, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| unstructured_storage: Optional[UnstructuredStorageProvider], | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||
| self.listener_address: Optional[Tuple[str, int]] = None | ||||||||||||||||||||||||||||||||||||||||||||||||||
| self.status_queue: Queue = Queue() | ||||||||||||||||||||||||||||||||||||||||||||||||||
| self.completion_queue: Queue = Queue() | ||||||||||||||||||||||||||||||||||||||||||||||||||
| self.shutdown_queue: Queue = Queue() | ||||||||||||||||||||||||||||||||||||||||||||||||||
| self._last_status: Optional[int] = None | ||||||||||||||||||||||||||||||||||||||||||||||||||
| self._last_status_received: Optional[float] = None | ||||||||||||||||||||||||||||||||||||||||||||||||||
| self.logger = logging.getLogger("openwpm") | ||||||||||||||||||||||||||||||||||||||||||||||||||
| self._storage_controller = StorageController( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| structured_storage, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| unstructured_storage, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| status_queue=self.status_queue, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| completion_queue=self.completion_queue, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| shutdown_queue=self.shutdown_queue, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| self._thread: Optional[threading.Thread] = None | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| def get_next_visit_id(self) -> VisitId: | ||||||||||||||||||||||||||||||||||||||||||||||||||
| """Generate visit id as randomly generated positive integer less than 2^53.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return VisitId(random.getrandbits(53)) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| def get_next_browser_id(self) -> BrowserId: | ||||||||||||||||||||||||||||||||||||||||||||||||||
| """Generate crawl id as randomly generated positive 32bit integer.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return BrowserId(random.getrandbits(32)) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+54
to
+58
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| return VisitId(random.getrandbits(53)) | |
| def get_next_browser_id(self) -> BrowserId: | |
| """Generate crawl id as randomly generated positive 32bit integer.""" | |
| return BrowserId(random.getrandbits(32)) | |
| visit_id = 0 | |
| while visit_id == 0: | |
| visit_id = random.getrandbits(53) | |
| return VisitId(visit_id) | |
| def get_next_browser_id(self) -> BrowserId: | |
| """Generate crawl id as randomly generated positive 32bit integer.""" | |
| browser_id = 0 | |
| while browser_id == 0: | |
| browser_id = random.getrandbits(32) | |
| return BrowserId(browser_id) |
Copilot
AI
Feb 26, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This introduces global side effects for the whole process by changing the asyncio logger level, and it always enables asyncio debug mode (debug=True) which can significantly slow down tests and change behavior. Consider avoiding global logger mutation (or scoping it) and making debug configurable/defaulting to False.
Copilot
AI
Feb 26, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.status_queue.get() blocks indefinitely if the controller fails to start or dies before publishing the listener address, which can hang tests/CI. Use a bounded timeout here and raise a clear error if the address doesn't arrive (and consider surfacing thread exceptions via a shared variable/queue).
Copilot
AI
Feb 26, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Draining a queue using while not queue.empty(): queue.get() is race-prone (items can arrive between empty() and get()) and .empty() is not reliable for multiprocessing-style queues. Use non-blocking gets in a try/except loop (e.g., get_nowait) until empty is raised, which is correct for both thread queues and process queues.
Copilot
AI
Feb 26, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After join(timeout=60), the code doesn’t check whether the thread actually stopped. This can silently leak a daemon thread and cause later tests to behave unpredictably. Check self._thread.is_alive() after join and raise/handle it (and consider a longer timeout or a deterministic shutdown acknowledgement from the controller).
| self.logger.debug( | |
| "%s took %s seconds to close." | |
| % (type(self).__name__, str(time.time() - start_time)) | |
| ) | |
| elapsed = time.time() - start_time | |
| self.logger.debug( | |
| "%s took %s seconds to close." | |
| % (type(self).__name__, str(elapsed)) | |
| ) | |
| if self._thread.is_alive(): | |
| msg = ( | |
| "InProcessStorageController thread failed to shut down " | |
| "within the 60-second timeout." | |
| ) | |
| self.logger.error(msg) | |
| raise RuntimeError(msg) |
Copilot
AI
Feb 26, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get_most_recent_status() is annotated to return int, but it can return a non-int if any non-status message ends up on status_queue (there is no type check here, unlike get_status()). Also, the queue-drain loop uses .empty() which is race-prone/unreliable. Mirror get_status()'s validation (assert/raise if non-int) and use a get_nowait drain pattern.
Copilot
AI
Feb 26, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using *args/**kwargs and positional indexing makes this API easy to misuse and harder to type-check (e.g., callers passing keywords or refactoring argument order). Prefer an explicit signature matching the production handle’s save_configuration(...) parameters (including typing), and avoid relying on args[n].
| def save_configuration(self, *args, **kwargs) -> None: | |
| """Save configuration - delegates to a DataSocket like StorageControllerHandle.""" | |
| from .storage_controller import DataSocket, INVALID_VISIT_ID | |
| from ..config import BrowserParamsInternal, ManagerParamsInternal | |
| from .storage_providers import TableName | |
| assert self.listener_address is not None | |
| manager_params: ManagerParamsInternal = args[0] | |
| browser_params: List[BrowserParamsInternal] = args[1] | |
| openwpm_version: str = args[2] | |
| browser_version: str = args[3] | |
| def save_configuration( | |
| self, | |
| manager_params: "ManagerParamsInternal", | |
| browser_params: List["BrowserParamsInternal"], | |
| openwpm_version: str, | |
| browser_version: str, | |
| ) -> None: | |
| """Save configuration - delegates to a DataSocket like StorageControllerHandle.""" | |
| from .storage_controller import DataSocket, INVALID_VISIT_ID | |
| from ..config import BrowserParamsInternal, ManagerParamsInternal | |
| from .storage_providers import TableName | |
| assert self.listener_address is not None |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| """Protocol defining the interface TaskManager uses to interact with storage. | ||
|
|
||
| This decouples TaskManager from the concrete StorageControllerHandle, | ||
| allowing tests to use lightweight in-process alternatives. | ||
| """ | ||
|
|
||
| from typing import List, Optional, Protocol, Tuple | ||
|
|
||
| from ..types import BrowserId, VisitId | ||
|
|
||
|
|
||
| class StorageInterface(Protocol): | ||
| """Interface for storage controller handles. | ||
|
|
||
| StorageControllerHandle implements this protocol for production use. | ||
| InProcessStorageControllerHandle implements it for testing. | ||
| """ | ||
|
|
||
| def get_next_visit_id(self) -> VisitId: ... | ||
|
|
||
| def get_next_browser_id(self) -> BrowserId: ... | ||
|
|
||
| def get_most_recent_status(self) -> int: ... | ||
|
|
||
| def get_new_completed_visits(self) -> List[Tuple[int, bool]]: ... | ||
|
|
||
| def launch(self) -> None: ... | ||
|
|
||
| listener_address: Optional[Tuple[str, int]] | ||
|
|
||
| def shutdown(self, relaxed: bool = True) -> None: ... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This handle runs entirely in-process (thread-based), but it uses
multiprocess.Queue.multiprocessing/multiprocessqueues have semantics optimized for IPC and their.empty()/.qsize()behavior can be unreliable; they can also add unnecessary overhead. Preferqueue.Queue/queue.SimpleQueuefor thread communication here (and update the drain logic accordingly).