From aac36d5ff017f4ea96096c5480975c93a65c6937 Mon Sep 17 00:00:00 2001 From: Alexandru MAXIMCIUC Date: Sat, 17 Jan 2026 22:29:54 +0200 Subject: [PATCH 01/13] add shared memory support to MemoryBucket so that it can be shared in multiprocess scenarios --- python/bucketbase/memory_bucket.py | 53 +++++++++- python/tests/test_memory_bucket.py | 161 +++++++++++++++++++++++++++++ 2 files changed, 211 insertions(+), 3 deletions(-) diff --git a/python/bucketbase/memory_bucket.py b/python/bucketbase/memory_bucket.py index f8e9d0e..34c5acc 100644 --- a/python/bucketbase/memory_bucket.py +++ b/python/bucketbase/memory_bucket.py @@ -1,8 +1,11 @@ import io +import multiprocessing +import weakref from contextlib import contextmanager +from multiprocessing.managers import SyncManager from pathlib import PurePosixPath from threading import RLock -from typing import BinaryIO, Generator, Iterable, Union +from typing import Any, BinaryIO, Generator, Iterable, MutableMapping, Optional, Union from streamerate import slist, sset from streamerate import stream as sstream @@ -26,8 +29,52 @@ class MemoryBucket(IBucket): """ def __init__(self) -> None: - self._objects: dict[str, bytes] = {} - self._lock = RLock() + self._objects: MutableMapping[str, bytes] = {} + self._lock: Any = RLock() + self._manager = None # Internal manager handle for shared buckets + + @classmethod + def create_shared(cls, manager: Optional[SyncManager] = None) -> "MemoryBucket": + """ + Creates a MemoryBucket instance that is automatically shared across processes. + + If a 'manager' is provided, it uses it to create shared structures and the caller + is responsible for the manager's lifecycle (shutdown). + If not, it creates a new manager internally and registers a weak-ref finalizer + to shut it down when the bucket is garbage collected. + + Note: The shared bucket's data lives only as long as the Manager process. + Pickling a shared bucket preserves the proxy references, but unpickling after + the Manager has been shut down will raise connection errors. + """ + + if manager is None: + try: + ctx = multiprocessing.get_context("spawn") + except AttributeError: + ctx = multiprocessing + manager = ctx.Manager() + should_shutdown = True + else: + should_shutdown = False + + bucket = cls() + # Override local state with shared state from the manager + bucket._objects = manager.dict() + bucket._lock = manager.RLock() + bucket._manager = manager if should_shutdown else None + + if should_shutdown: + # Register a finalizer to shut down the manager when 'bucket' is GC-ed + weakref.finalize(bucket, manager.shutdown) + + return bucket + + def __getstate__(self): + state = self.__dict__.copy() + # Ensure the manager handle itself is not picklable + state["_manager"] = None + return state def put_object(self, name: PurePosixPath | str, content: Union[str, bytes, bytearray]) -> None: _name = self._validate_name(name) diff --git a/python/tests/test_memory_bucket.py b/python/tests/test_memory_bucket.py index 457d815..5df80d8 100644 --- a/python/tests/test_memory_bucket.py +++ b/python/tests/test_memory_bucket.py @@ -1,3 +1,5 @@ +import multiprocessing +import pickle from unittest import TestCase from bucketbase import MemoryBucket @@ -71,3 +73,162 @@ def test_regression_exception_thrown_in_open_write_context_by_AMX(self): def test_regression_infinite_cycle_on_unentered_open_write_context(self): self.tester.test_regression_infinite_cycle_on_unentered_open_write_context() + + +class SharedManagerMixin: + """Provides a single shared manager for all shared memory tests to speed up execution.""" + + _shared_manager = None + + @classmethod + def get_shared_manager(cls): + if cls._shared_manager is None: + ctx = multiprocessing.get_context("spawn") + cls._shared_manager = ctx.Manager() + return cls._shared_manager + + @classmethod + def shutdown_shared_manager(cls): + if cls._shared_manager is not None: + cls._shared_manager.shutdown() + cls._shared_manager = None + + +class TestSharedMemoryBucket(TestCase, SharedManagerMixin): + """ + Runs ALL MemoryBucket tests using shared state to ensure full compatibility. + """ + + @classmethod + def setUpClass(cls): + cls.manager = cls.get_shared_manager() + + def setUp(self): + self.storage = MemoryBucket.create_shared(manager=self.manager) + self.tester = IBucketTester(self.storage, self) + + def test_put_and_get_object(self): + self.tester.test_put_and_get_object() + + def test_put_and_get_object_stream(self): + self.tester.test_put_and_get_object_stream() + + def test_list_objects(self): + self.tester.test_list_objects() + + def test_shallow_list_objects(self): + self.tester.test_shallow_list_objects() + + def test_exists(self): + self.tester.test_exists() + + def test_remove_objects(self): + self.tester.test_remove_objects() + + def test_get_size(self): + self.tester.test_get_size() + + def test_open_write(self): + self.tester.test_open_write() + + def test_open_write_timeout(self): + self.tester.test_open_write_timeout() + + def test_open_write_consumer_throws(self): + self.tester.test_open_write_consumer_throws() + + def test_open_write_feeder_throws(self): + self.tester.test_open_write_feeder_throws() + + def test_open_write_with_parquet(self): + self.tester.test_open_write_with_parquet() + + def test_streaming_failure_atomicity(self): + self.tester.test_streaming_failure_atomicity() + + def test_put_object_stream_exception_cleanup(self): + self.tester.test_put_object_stream_exception_cleanup() + + def test_open_write_partial_write_exception_cleanup(self): + self.tester.test_open_write_partial_write_exception_cleanup() + + def test_open_write_without_proper_close(self): + self.tester.test_open_write_without_proper_close() + + def test_open_write_sync_exception_cleanup(self): + self.tester.test_open_write_sync_exception_cleanup() + + def test_regression_parquet_exception_thrown_in_prq_writer_by_AMX(self): + self.tester.test_regression_exception_thrown_in_parquet_writer_context_doesnt_save_object() + + def test_regression_exception_thrown_in_arrow_sink_by_AMX(self): + self.tester.test_regression_exception_thrown_in_arrow_sink_context_doesnt_save_object() + + def test_regression_exception_thrown_in_open_write_context_by_AMX(self): + self.tester.test_regression_exception_thrown_in_open_write_context_doesnt_save_object() + + def test_regression_infinite_cycle_on_unentered_open_write_context(self): + self.tester.test_regression_infinite_cycle_on_unentered_open_write_context() + + +class TestMemoryBucketPickle(TestCase, SharedManagerMixin): + def test_normal_memory_bucket_is_not_picklable(self): + bucket = MemoryBucket() + with self.assertRaises(TypeError) as cm: + pickle.dumps(bucket) + self.assertIn("RLock", str(cm.exception)) + + def test_shared_memory_bucket_is_picklable(self): + bucket = MemoryBucket.create_shared(manager=self.get_shared_manager()) + + bucket.put_object("shared.txt", b"shared data") + pickled_data = pickle.dumps(bucket) + unpickled_bucket: MemoryBucket = pickle.loads(pickled_data) + self.assertEqual(unpickled_bucket.get_object("shared.txt"), b"shared data") + + +class TestSharedMemoryBucketMultiprocessing(TestCase, SharedManagerMixin): + """Stress tests for MemoryBucket with multiple processes.""" + + @classmethod + def tearDownClass(cls): + cls.shutdown_shared_manager() + + @staticmethod + def worker_put_object(bucket, index): + bucket.put_object(f"obj_{index}.txt", f"content_{index}".encode()) + + @staticmethod + def worker_open_write(bucket, index): + with bucket.open_write(f"stream_{index}.txt") as writer: + writer.write(f"streamed_content_{index}".encode()) + + def _helper_run_multiprocess_orchestration(self, worker_target, num_processes=3): + bucket = MemoryBucket.create_shared(manager=self.get_shared_manager()) + ctx = multiprocessing.get_context("spawn") + processes = [ctx.Process(target=worker_target, args=(bucket, i)) for i in range(num_processes)] + for p in processes: + p.start() + for p in processes: + p.join() + return bucket + + def test_shared_memory_bucket_factory_creates_isolated_manager(self): + """Verify that create_shared() without manager creates its own isolated manager.""" + bucket_factory = MemoryBucket.create_shared() + bucket_factory.put_object("factory.txt", b"ok") + self.assertEqual(bucket_factory.get_object("factory.txt"), b"ok") + + def test_multiprocess_put_object(self): + bucket = self._helper_run_multiprocess_orchestration(self.worker_put_object) + + self.assertEqual(len(bucket.list_objects()), 3) + for i in range(3): + self.assertEqual(bucket.get_object(f"obj_{i}.txt"), f"content_{i}".encode()) + + def test_multiprocess_open_write(self): + bucket = self._helper_run_multiprocess_orchestration(self.worker_open_write) + + self.assertEqual(len(bucket.list_objects()), 3) + for i in range(3): + self.assertEqual(bucket.get_object(f"stream_{i}.txt"), f"streamed_content_{i}".encode()) From 7207b2ea33f11bd3aa7f24903b677537b8f07eed Mon Sep 17 00:00:00 2001 From: Alexandru MAXIMCIUC Date: Tue, 20 Jan 2026 10:10:26 +0200 Subject: [PATCH 02/13] [copilot] Enhance MemoryBucket with shared manager support and improve multiprocessing tests --- python/bucketbase/memory_bucket.py | 42 +++++++++++++--------- python/tests/test_memory_bucket.py | 56 +++++++++++++++++++++++++----- 2 files changed, 73 insertions(+), 25 deletions(-) diff --git a/python/bucketbase/memory_bucket.py b/python/bucketbase/memory_bucket.py index 34c5acc..0ff1446 100644 --- a/python/bucketbase/memory_bucket.py +++ b/python/bucketbase/memory_bucket.py @@ -2,10 +2,10 @@ import multiprocessing import weakref from contextlib import contextmanager -from multiprocessing.managers import SyncManager +from multiprocessing.managers import DictProxy, SyncManager from pathlib import PurePosixPath from threading import RLock -from typing import Any, BinaryIO, Generator, Iterable, MutableMapping, Optional, Union +from typing import Any, BinaryIO, Generator, Iterable, Optional, Union from streamerate import slist, sset from streamerate import stream as sstream @@ -22,6 +22,16 @@ def really_close(self) -> None: super().close() +def _safe_manager_shutdown(manager: Any) -> None: + """Helper to safely shut down a SyncManager during GC.""" + try: + shutdown = getattr(manager, "shutdown", None) + if callable(shutdown): + shutdown() + except Exception: # pylint: disable=broad-exception-caught + pass + + class MemoryBucket(IBucket): """ Implements IObjectStorage interface, but stores all objects in memory. @@ -29,9 +39,10 @@ class MemoryBucket(IBucket): """ def __init__(self) -> None: - self._objects: MutableMapping[str, bytes] = {} + self._objects: dict[str, bytes] | DictProxy[str, bytes] = {} self._lock: Any = RLock() - self._manager = None # Internal manager handle for shared buckets + self._manager: SyncManager | None = None + self._owns_manager: bool = False @classmethod def create_shared(cls, manager: Optional[SyncManager] = None) -> "MemoryBucket": @@ -40,7 +51,7 @@ def create_shared(cls, manager: Optional[SyncManager] = None) -> "MemoryBucket": If a 'manager' is provided, it uses it to create shared structures and the caller is responsible for the manager's lifecycle (shutdown). - If not, it creates a new manager internally and registers a weak-ref finalizer + If not, it creates a new manager internally and registers a weakref finalizer to shut it down when the bucket is garbage collected. Note: The shared bucket's data lives only as long as the Manager process. @@ -49,31 +60,28 @@ def create_shared(cls, manager: Optional[SyncManager] = None) -> "MemoryBucket": """ if manager is None: - try: - ctx = multiprocessing.get_context("spawn") - except AttributeError: - ctx = multiprocessing + ctx = multiprocessing.get_context("spawn") manager = ctx.Manager() - should_shutdown = True + owns_manager = True else: - should_shutdown = False + owns_manager = False - bucket = cls() - # Override local state with shared state from the manager + bucket = cls.__new__(cls) bucket._objects = manager.dict() bucket._lock = manager.RLock() - bucket._manager = manager if should_shutdown else None + bucket._manager = manager + bucket._owns_manager = owns_manager - if should_shutdown: + if owns_manager: # Register a finalizer to shut down the manager when 'bucket' is GC-ed - weakref.finalize(bucket, manager.shutdown) + weakref.finalize(bucket, _safe_manager_shutdown, manager) return bucket def __getstate__(self): state = self.__dict__.copy() - # Ensure the manager handle itself is not picklable state["_manager"] = None + state["_owns_manager"] = False return state def put_object(self, name: PurePosixPath | str, content: Union[str, bytes, bytearray]) -> None: diff --git a/python/tests/test_memory_bucket.py b/python/tests/test_memory_bucket.py index 5df80d8..97cd6c6 100644 --- a/python/tests/test_memory_bucket.py +++ b/python/tests/test_memory_bucket.py @@ -80,10 +80,14 @@ class SharedManagerMixin: _shared_manager = None + @classmethod + def get_multiprocessing_context(cls): + return multiprocessing.get_context("spawn") + @classmethod def get_shared_manager(cls): if cls._shared_manager is None: - ctx = multiprocessing.get_context("spawn") + ctx = cls.get_multiprocessing_context() cls._shared_manager = ctx.Manager() return cls._shared_manager @@ -186,9 +190,20 @@ def test_shared_memory_bucket_is_picklable(self): unpickled_bucket: MemoryBucket = pickle.loads(pickled_data) self.assertEqual(unpickled_bucket.get_object("shared.txt"), b"shared data") + def test_unpickle_after_manager_shutdown_raises_error(self): + bucket = MemoryBucket.create_shared() + bucket.put_object("abc", b"123") + pickled_data = pickle.dumps(bucket) + + bucket._manager.shutdown() + + with self.assertRaises((EOFError, ConnectionError, BrokenPipeError, FileNotFoundError, Exception)): + unpickled_bucket: MemoryBucket = pickle.loads(pickled_data) + unpickled_bucket.get_object("abc") -class TestSharedMemoryBucketMultiprocessing(TestCase, SharedManagerMixin): - """Stress tests for MemoryBucket with multiple processes.""" + +class TestSharedMemoryBucketMultiprocessingCorrectness(TestCase, SharedManagerMixin): + """Functional correctness tests for MemoryBucket with multiple processes.""" @classmethod def tearDownClass(cls): @@ -203,18 +218,23 @@ def worker_open_write(bucket, index): with bucket.open_write(f"stream_{index}.txt") as writer: writer.write(f"streamed_content_{index}".encode()) - def _helper_run_multiprocess_orchestration(self, worker_target, num_processes=3): + def _helper_run_multiprocess_orchestration(self, worker_target, num_processes=3, args=None): bucket = MemoryBucket.create_shared(manager=self.get_shared_manager()) - ctx = multiprocessing.get_context("spawn") - processes = [ctx.Process(target=worker_target, args=(bucket, i)) for i in range(num_processes)] + ctx = self.get_multiprocessing_context() + processes = [] + for i in range(num_processes): + worker_args = (bucket, i) if args is None else (bucket,) + args + p = ctx.Process(target=worker_target, args=worker_args) + processes.append(p) + for p in processes: p.start() for p in processes: p.join() return bucket - def test_shared_memory_bucket_factory_creates_isolated_manager(self): - """Verify that create_shared() without manager creates its own isolated manager.""" + def test_create_shared_without_manager_parameter(self): + """Verify that create_shared() without manager parameter works.""" bucket_factory = MemoryBucket.create_shared() bucket_factory.put_object("factory.txt", b"ok") self.assertEqual(bucket_factory.get_object("factory.txt"), b"ok") @@ -232,3 +252,23 @@ def test_multiprocess_open_write(self): self.assertEqual(len(bucket.list_objects()), 3) for i in range(3): self.assertEqual(bucket.get_object(f"stream_{i}.txt"), f"streamed_content_{i}".encode()) + + @staticmethod + def worker_increment(bucket, key, iters=50): + for _ in range(iters): + with bucket._lock: # Using the shared lock from the manager + try: + current = int(bucket.get_object(key).decode()) + except FileNotFoundError: + current = 0 + bucket.put_object(key, str(current + 1).encode()) + + def test_concurrent_shared_lock_behavior(self): + """Verify that multiple processes can coordinate via the shared bucket lock.""" + key = "counter" + num_procs = 4 + iters = 50 + bucket = self._helper_run_multiprocess_orchestration(self.worker_increment, num_processes=num_procs, args=(key, iters)) + + final_val = int(bucket.get_object(key).decode()) + self.assertEqual(final_val, num_procs * iters) From 446505666504fbd69abeb8296ac59217ffdd899e Mon Sep 17 00:00:00 2001 From: Alexandru MAXIMCIUC Date: Tue, 20 Jan 2026 10:10:47 +0200 Subject: [PATCH 03/13] Increase MinIO timeout to 12 seconds in tests. --- python/tests/bucket_tester.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/bucket_tester.py b/python/tests/bucket_tester.py index 6cb6823..4f828fe 100644 --- a/python/tests/bucket_tester.py +++ b/python/tests/bucket_tester.py @@ -638,7 +638,7 @@ def test_open_write_with_parquet(self) -> None: # pylint: disable=too-many-loca # Write parquet file using open_write # Use higher timeout for MinIO due to network latency and data buffering - timeout_for_minio = 6 if "MinioBucket" in str(type(self.storage)) else 3 + timeout_for_minio = 12 if "MinioBucket" in str(type(self.storage)) else 3 tested_object: AsyncObjectWriter = self.storage.open_write(parquet_path, timeout_sec=timeout_for_minio) # type: ignore[assignment] with tested_object as sink: with pa.output_stream(sink) as arrow_sink: From d765c99a92ceb35139901ae166c7ad069e07c4d2 Mon Sep 17 00:00:00 2001 From: Alexandru MAXIMCIUC Date: Tue, 20 Jan 2026 10:44:19 +0200 Subject: [PATCH 04/13] Refactor locking mechanism in AppendOnlyFSBucket tests for improved thread synchronization and reliability --- python/bucketbase/ibucket.py | 1 - python/tests/test_append_only_fs_bucket.py | 34 ++++++++++++------- ...test_integrated_cached_immutable_bucket.py | 7 ++-- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/python/bucketbase/ibucket.py b/python/bucketbase/ibucket.py index 9d7d4ef..178fba3 100644 --- a/python/bucketbase/ibucket.py +++ b/python/bucketbase/ibucket.py @@ -477,7 +477,6 @@ def put_object_stream(self, name: PurePosixPath | str, stream: BinaryIO) -> None self._lock_object(name) try: if self._base_bucket.exists(name): - self._unlock_object(name) raise FileExistsError(f"Object {name} already exists in AppendOnlySynchronizedBucket") # we assume that the put_object_stream operation is atomic self._base_bucket.put_object_stream(name, stream) diff --git a/python/tests/test_append_only_fs_bucket.py b/python/tests/test_append_only_fs_bucket.py index fab1089..bda2d97 100644 --- a/python/tests/test_append_only_fs_bucket.py +++ b/python/tests/test_append_only_fs_bucket.py @@ -127,23 +127,33 @@ def test_get_after_put_object(self): def test_lock_object_with_threads(self): bucket_in_test = AppendOnlyFSBucket(self.base_bucket, self.locks_path) object_name = "shared_object" - lock_acquired = [False, False] # To track lock acquisition in threads + # Use Events for reliable thread synchronization(handshake), avoiding sleep-based timing issues + lock_held_event = threading.Event() + second_thread_ready_event = threading.Event() + second_thread_completed_event = threading.Event() def lock_and_release_first(): bucket_in_test._lock_object(object_name) - lock_acquired[0] = True - time.sleep(0.1) # Simulate work by sleeping + lock_held_event.set() + self.assertTrue(second_thread_ready_event.wait(timeout=5.0), "Second thread failed to signal readiness") + time.sleep(0.2) bucket_in_test._unlock_object(object_name) - lock_acquired[0] = False def wait_and_lock_second(): - time.sleep(0.001) # Ensure this runs after the first thread has acquired the lock + self.assertTrue(lock_held_event.wait(timeout=5.0), "First thread failed to acquire lock in time") + + second_thread_ready_event.set() + t1 = time.time() bucket_in_test._lock_object(object_name) t2 = time.time() - print(f"Time taken to acquire lock: {t2 - t1}") - self.assertTrue(t2 - t1 > 0.1, "The second thread should have waited for the first thread to release the lock") - lock_acquired[1] = True # Should only reach here after the first thread releases the lock + + # Since first thread sleeps for 0.2s *after* we are ready, we should be blocked for a significant time + duration = t2 - t1 + print(f"Time taken to acquire lock: {duration}") + self.assertGreater(duration, 0.1, f"Second thread acquired lock too fast ({duration}s), should have been blocked") + + second_thread_completed_event.set() bucket_in_test._unlock_object(object_name) # Create threads @@ -154,12 +164,10 @@ def wait_and_lock_second(): thread2.start() # Wait for both threads to complete - thread1.join() - thread2.join() + thread1.join(timeout=10.0) + thread2.join(timeout=10.0) - # Verify that both threads were able to acquire the lock - self.assertFalse(lock_acquired[0], "The first thread should have released the lock") - self.assertTrue(lock_acquired[1], "The second thread should have acquired the lock after the first thread released it") + self.assertTrue(second_thread_completed_event.is_set(), "The second thread did not complete correctly") def test_get_size(self): bucket = AppendOnlyFSBucket(self.base_bucket, self.locks_path) diff --git a/python/tests/test_integrated_cached_immutable_bucket.py b/python/tests/test_integrated_cached_immutable_bucket.py index 98a9633..1164913 100644 --- a/python/tests/test_integrated_cached_immutable_bucket.py +++ b/python/tests/test_integrated_cached_immutable_bucket.py @@ -261,11 +261,10 @@ def get_object_from_thread(): thread.start() for thread in threads: - thread.join(timeout=2.0) - self.assertFalse(thread.is_alive(), "Thread did not complete within timeout") + thread.join(timeout=10.0) + self.assertFalse(thread.is_alive(), f"Thread {thread.name} did not complete within timeout") - # Verify results + self.assertEqual(len(results), num_threads, f"Expected {num_threads} results, but got {len(results)}") self.assertEqual(len(get_object_calls), 1, "Main bucket's get_object should be called exactly once") - self.assertEqual(len(results), num_threads, "All threads should have retrieved the content") for result in results: self.assertEqual(result, content, "All threads should get the same content") From 36ba9cf2513ba4e939fb0b42860651b74c34c20c Mon Sep 17 00:00:00 2001 From: Alexandru MAXIMCIUC Date: Tue, 20 Jan 2026 11:52:56 +0200 Subject: [PATCH 05/13] Add max-parallel configuration to build job try to reduce flakiness in play.min.io tests ("max retries exceeded") --- .github/workflows/python-package.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index d1d78d2..97ba039 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -18,6 +18,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: fail-fast: false + max-parallel: 6 matrix: os: [ ubuntu-latest, windows-latest, macos-latest, ubuntu-24.04-arm ] python-version: [ "3.10", "3.11", "3.12", "3.13", "3.14" ] From 662511fab49f478fed2a630701b650311c216029 Mon Sep 17 00:00:00 2001 From: Alexandru MAXIMCIUC Date: Tue, 20 Jan 2026 11:53:27 +0200 Subject: [PATCH 06/13] Increase MinIO client timeout to 30 seconds in test setup try to reduce flakiness in play.min.io tests ("The write operation timed out") --- python/tests/test_minio_bucket.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/tests/test_minio_bucket.py b/python/tests/test_minio_bucket.py index 3a9634c..91a5ca3 100644 --- a/python/tests/test_minio_bucket.py +++ b/python/tests/test_minio_bucket.py @@ -14,7 +14,9 @@ def setUp(self) -> None: self.assertIsNotNone(CONFIG.MINIO_PUBLIC_SERVER, "MINIO_PUBLIC_SERVER not set") self.assertIsNotNone(CONFIG.MINIO_ACCESS_KEY, "MINIO_ACCESS_KEY not set") self.assertIsNotNone(CONFIG.MINIO_SECRET_KEY, "MINIO_SECRET_KEY not set") - self.minio_client = build_minio_client(endpoints=CONFIG.MINIO_PUBLIC_SERVER, access_key=CONFIG.MINIO_ACCESS_KEY, secret_key=CONFIG.MINIO_SECRET_KEY) + self.minio_client = build_minio_client( + endpoints=CONFIG.MINIO_PUBLIC_SERVER, access_key=CONFIG.MINIO_ACCESS_KEY, secret_key=CONFIG.MINIO_SECRET_KEY, timeout=30 + ) self.bucket = MinioBucket(bucket_name=CONFIG.MINIO_DEV_TESTS_BUCKET, minio_client=self.minio_client) if not self.minio_client.bucket_exists(CONFIG.MINIO_DEV_TESTS_BUCKET): self.minio_client.make_bucket(bucket_name=CONFIG.MINIO_DEV_TESTS_BUCKET) From b00245c8af934c25aa0d200f02da696cb7eda427 Mon Sep 17 00:00:00 2001 From: Alexandru MAXIMCIUC Date: Tue, 20 Jan 2026 12:02:35 +0200 Subject: [PATCH 07/13] Refactor Python package workflow to use 'uv' for dependency installation and streamline steps speedup --- .github/workflows/python-package.yml | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 97ba039..900f95e 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -25,16 +25,20 @@ jobs: steps: - uses: actions/checkout@v4 + - name: Install uv + uses: astral-sh/setup-uv@v5 + with: + enable-cache: true - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - name: Install dependencies + env: + POETRY_VIRTUALENVS_PREFER_UV: "true" run: | - python -m pip install --upgrade pip - python -m pip install poetry + uv pip install poetry cd python - poetry lock poetry install --with dev - name: Run tests env: @@ -52,12 +56,16 @@ jobs: image: python:3.12-bullseye steps: - uses: actions/checkout@v4 + - name: Install uv + uses: astral-sh/setup-uv@v5 + with: + enable-cache: true - name: Install dependencies in container + env: + POETRY_VIRTUALENVS_PREFER_UV: "true" run: | - python -m pip install --upgrade pip - python -m pip install poetry + uv pip install poetry cd python - poetry lock poetry install --with dev - name: Run tests in container env: From df12452f30b71ce78aa03e114bb8a2b477825afe Mon Sep 17 00:00:00 2001 From: Alexandru MAXIMCIUC Date: Tue, 20 Jan 2026 12:03:51 +0200 Subject: [PATCH 08/13] Update dependency installation command to include --system flag for poetry --- .github/workflows/python-package.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 900f95e..1d136f6 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -37,7 +37,7 @@ jobs: env: POETRY_VIRTUALENVS_PREFER_UV: "true" run: | - uv pip install poetry + uv pip install --system poetry cd python poetry install --with dev - name: Run tests @@ -64,7 +64,7 @@ jobs: env: POETRY_VIRTUALENVS_PREFER_UV: "true" run: | - uv pip install poetry + uv pip install --system poetry cd python poetry install --with dev - name: Run tests in container From da0674f64a3db829fc455cc2a004da405627d634 Mon Sep 17 00:00:00 2001 From: Alexandru MAXIMCIUC Date: Tue, 20 Jan 2026 12:09:55 +0200 Subject: [PATCH 09/13] Add poetry lock step in workflow and fix directory name in container setup --- .github/workflows/python-package.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 1d136f6..9a72aa5 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -39,6 +39,7 @@ jobs: run: | uv pip install --system poetry cd python + poetry lock poetry install --with dev - name: Run tests env: From 4873d847aab186c412c06201df841324ae6dbd30 Mon Sep 17 00:00:00 2001 From: Alexandru MAXIMCIUC Date: Tue, 20 Jan 2026 12:11:57 +0200 Subject: [PATCH 10/13] Add poetry lock step in workflow and fix directory name in container setup --- .github/workflows/python-package.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 9a72aa5..2dcf4bd 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -67,6 +67,7 @@ jobs: run: | uv pip install --system poetry cd python + poetry lock poetry install --with dev - name: Run tests in container env: From 8f4bdbb5b283460fd5f67b07a4549d3e7f60f900 Mon Sep 17 00:00:00 2001 From: Alexandru MAXIMCIUC Date: Tue, 20 Jan 2026 12:19:41 +0200 Subject: [PATCH 11/13] Increase MinIO client timeout to 30(from 12) seconds for improved stability during parquet file writes targeting CI & play.min.io --- python/tests/bucket_tester.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/bucket_tester.py b/python/tests/bucket_tester.py index 4f828fe..b49b5a9 100644 --- a/python/tests/bucket_tester.py +++ b/python/tests/bucket_tester.py @@ -638,7 +638,7 @@ def test_open_write_with_parquet(self) -> None: # pylint: disable=too-many-loca # Write parquet file using open_write # Use higher timeout for MinIO due to network latency and data buffering - timeout_for_minio = 12 if "MinioBucket" in str(type(self.storage)) else 3 + timeout_for_minio = 30 if "MinioBucket" in str(type(self.storage)) else 3 tested_object: AsyncObjectWriter = self.storage.open_write(parquet_path, timeout_sec=timeout_for_minio) # type: ignore[assignment] with tested_object as sink: with pa.output_stream(sink) as arrow_sink: From 0bb233686c35561dfaf43a682ac211dc583d95e5 Mon Sep 17 00:00:00 2001 From: Alexandru MAXIMCIUC Date: Tue, 20 Jan 2026 21:54:24 +0200 Subject: [PATCH 12/13] [LLM review] --- python/bucketbase/memory_bucket.py | 14 ++++++-------- python/tests/test_memory_bucket.py | 8 +++++++- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/python/bucketbase/memory_bucket.py b/python/bucketbase/memory_bucket.py index 0ff1446..848320d 100644 --- a/python/bucketbase/memory_bucket.py +++ b/python/bucketbase/memory_bucket.py @@ -47,16 +47,14 @@ def __init__(self) -> None: @classmethod def create_shared(cls, manager: Optional[SyncManager] = None) -> "MemoryBucket": """ - Creates a MemoryBucket instance that is automatically shared across processes. + Creates a MemoryBucket shared across processes via a multiprocessing Manager. - If a 'manager' is provided, it uses it to create shared structures and the caller - is responsible for the manager's lifecycle (shutdown). - If not, it creates a new manager internally and registers a weakref finalizer - to shut it down when the bucket is garbage collected. + CRITICAL: Data persists ONLY as long as the Manager process is running. + Accessing or unpickling the bucket after Manager shutdown will raise errors. - Note: The shared bucket's data lives only as long as the Manager process. - Pickling a shared bucket preserves the proxy references, but unpickling after - the Manager has been shut down will raise connection errors. + If 'manager' is None, a new one is created and shut down automatically via + weakref finalizer when the bucket is garbage collected. Otherwise, the caller + is responsible for the manager's lifecycle. """ if manager is None: diff --git a/python/tests/test_memory_bucket.py b/python/tests/test_memory_bucket.py index 97cd6c6..ee15fe4 100644 --- a/python/tests/test_memory_bucket.py +++ b/python/tests/test_memory_bucket.py @@ -1,3 +1,4 @@ +import atexit import multiprocessing import pickle from unittest import TestCase @@ -89,12 +90,17 @@ def get_shared_manager(cls): if cls._shared_manager is None: ctx = cls.get_multiprocessing_context() cls._shared_manager = ctx.Manager() + # Ensure the manager is shut down when the process exits + atexit.register(cls.shutdown_shared_manager) return cls._shared_manager @classmethod def shutdown_shared_manager(cls): if cls._shared_manager is not None: - cls._shared_manager.shutdown() + try: + cls._shared_manager.shutdown() + except (AttributeError, RuntimeError): + pass cls._shared_manager = None From 5b2b60358194fcb4ebcc99a2427696bc95bb1bbd Mon Sep 17 00:00:00 2001 From: ASU Date: Wed, 21 Jan 2026 09:48:55 +0200 Subject: [PATCH 13/13] v1.5.4: Added SharedMemoryBucket for multiprocessing support --- python/bucketbase/memory_bucket.py | 97 +++++++++-------- python/pyproject.toml | 2 +- python/tests/bucket_tester.py | 7 +- python/tests/test_append_only_fs_bucket.py | 10 +- python/tests/test_memory_bucket.py | 117 ++++++++++----------- 5 files changed, 115 insertions(+), 118 deletions(-) diff --git a/python/bucketbase/memory_bucket.py b/python/bucketbase/memory_bucket.py index 848320d..52dc574 100644 --- a/python/bucketbase/memory_bucket.py +++ b/python/bucketbase/memory_bucket.py @@ -22,16 +22,6 @@ def really_close(self) -> None: super().close() -def _safe_manager_shutdown(manager: Any) -> None: - """Helper to safely shut down a SyncManager during GC.""" - try: - shutdown = getattr(manager, "shutdown", None) - if callable(shutdown): - shutdown() - except Exception: # pylint: disable=broad-exception-caught - pass - - class MemoryBucket(IBucket): """ Implements IObjectStorage interface, but stores all objects in memory. @@ -41,46 +31,6 @@ class MemoryBucket(IBucket): def __init__(self) -> None: self._objects: dict[str, bytes] | DictProxy[str, bytes] = {} self._lock: Any = RLock() - self._manager: SyncManager | None = None - self._owns_manager: bool = False - - @classmethod - def create_shared(cls, manager: Optional[SyncManager] = None) -> "MemoryBucket": - """ - Creates a MemoryBucket shared across processes via a multiprocessing Manager. - - CRITICAL: Data persists ONLY as long as the Manager process is running. - Accessing or unpickling the bucket after Manager shutdown will raise errors. - - If 'manager' is None, a new one is created and shut down automatically via - weakref finalizer when the bucket is garbage collected. Otherwise, the caller - is responsible for the manager's lifecycle. - """ - - if manager is None: - ctx = multiprocessing.get_context("spawn") - manager = ctx.Manager() - owns_manager = True - else: - owns_manager = False - - bucket = cls.__new__(cls) - bucket._objects = manager.dict() - bucket._lock = manager.RLock() - bucket._manager = manager - bucket._owns_manager = owns_manager - - if owns_manager: - # Register a finalizer to shut down the manager when 'bucket' is GC-ed - weakref.finalize(bucket, _safe_manager_shutdown, manager) - - return bucket - - def __getstate__(self): - state = self.__dict__.copy() - state["_manager"] = None - state["_owns_manager"] = False - return state def put_object(self, name: PurePosixPath | str, content: Union[str, bytes, bytearray]) -> None: _name = self._validate_name(name) @@ -191,3 +141,50 @@ def open_write_sync(self, name: PurePosixPath | str) -> Generator[BinaryIO, None if not exception_occurred: with self._lock: self._objects[_name] = content + + +class SharedMemoryBucket(MemoryBucket): + """ + MemoryBucket shared across processes via a multiprocessing Manager. + + CRITICAL: Data persists ONLY as long as the Manager process is running. + Accessing or unpickling the bucket after Manager shutdown will raise errors. + + If 'manager' is None, a new one is created and shut down automatically via + weakref finalizer when the bucket is garbage collected. Otherwise, the caller + is responsible for the manager's lifecycle. + """ + + def __init__(self, manager: Optional[SyncManager] = None) -> None: + super().__init__() + if manager is None: + ctx = multiprocessing.get_context("spawn") + manager = ctx.Manager() + owns_manager = True + weakref.finalize(self, self._safe_manager_shutdown, manager) + else: + owns_manager = False + + self._manager: SyncManager | None = manager + self._owns_manager: bool = owns_manager + + # override parent's structures with managed ones. This is a small hack, but it's simple & clear enough + self._objects: dict[str, bytes] | DictProxy[str, bytes] = manager.dict() + self._lock: Any = manager.RLock() + + def __getstate__(self) -> dict[str, Any]: + """Customize pickling to avoid serializing the manager itself.""" + state = self.__dict__.copy() + state["_manager"] = None + state["_owns_manager"] = False + return state + + @staticmethod + def _safe_manager_shutdown(manager: Any) -> None: + """Helper to safely shut down a SyncManager during GC.""" + try: + shutdown = getattr(manager, "shutdown", None) + if callable(shutdown): + shutdown() + except Exception: # pylint: disable=broad-exception-caught + pass diff --git a/python/pyproject.toml b/python/pyproject.toml index d03dc22..d54a292 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "bucketbase" -version = "1.5.3" # do not edit manually. kept in sync with `tool.commitizen` config via automation +version = "1.5.4" # do not edit manually. kept in sync with `tool.commitizen` config via automation description = "bucketbase" authors = ["Andrei Suiu "] repository = "https://github.com/asuiu/bucketbase" diff --git a/python/tests/bucket_tester.py b/python/tests/bucket_tester.py index b49b5a9..5059086 100644 --- a/python/tests/bucket_tester.py +++ b/python/tests/bucket_tester.py @@ -5,18 +5,17 @@ import tempfile import threading import time +import uuid from io import BytesIO from pathlib import Path, PurePosixPath from queue import Queue from typing import BinaryIO from unittest import TestCase -import uuid import pyarrow as pa import pyarrow.parquet as pq from streamerate import slist from streamerate import stream as sstream -from tsx import iTSms from bucketbase.ibucket import AsyncObjectWriter, IBucket @@ -66,11 +65,11 @@ def write(self, data: bytes) -> int: self.bytes_processed += len(data) return len(data) - def close(self): + def close(self) -> None: self._is_closed = True @property - def closed(self): + def closed(self) -> bool: return self._is_closed def readable(self) -> bool: diff --git a/python/tests/test_append_only_fs_bucket.py b/python/tests/test_append_only_fs_bucket.py index bda2d97..c9e4486 100644 --- a/python/tests/test_append_only_fs_bucket.py +++ b/python/tests/test_append_only_fs_bucket.py @@ -141,18 +141,18 @@ def lock_and_release_first(): def wait_and_lock_second(): self.assertTrue(lock_held_event.wait(timeout=5.0), "First thread failed to acquire lock in time") - + second_thread_ready_event.set() - + t1 = time.time() bucket_in_test._lock_object(object_name) t2 = time.time() - + # Since first thread sleeps for 0.2s *after* we are ready, we should be blocked for a significant time duration = t2 - t1 print(f"Time taken to acquire lock: {duration}") self.assertGreater(duration, 0.1, f"Second thread acquired lock too fast ({duration}s), should have been blocked") - + second_thread_completed_event.set() bucket_in_test._unlock_object(object_name) @@ -166,6 +166,8 @@ def wait_and_lock_second(): # Wait for both threads to complete thread1.join(timeout=10.0) thread2.join(timeout=10.0) + self.assertFalse(thread1.is_alive(), "The first thread did not finish") + self.assertFalse(thread2.is_alive(), "The second thread did not finish") self.assertTrue(second_thread_completed_event.is_set(), "The second thread did not complete correctly") diff --git a/python/tests/test_memory_bucket.py b/python/tests/test_memory_bucket.py index ee15fe4..b7c28f1 100644 --- a/python/tests/test_memory_bucket.py +++ b/python/tests/test_memory_bucket.py @@ -4,6 +4,7 @@ from unittest import TestCase from bucketbase import MemoryBucket +from bucketbase.memory_bucket import SharedMemoryBucket from tests.bucket_tester import IBucketTester @@ -75,8 +76,14 @@ def test_regression_exception_thrown_in_open_write_context_by_AMX(self): def test_regression_infinite_cycle_on_unentered_open_write_context(self): self.tester.test_regression_infinite_cycle_on_unentered_open_write_context() + def test_throws_on_pickle(self): + bucket = MemoryBucket() + with self.assertRaises(TypeError) as cm: + pickle.dumps(bucket) + self.assertIn("RLock", str(cm.exception)) + -class SharedManagerMixin: +class _SharedManagerMixin: """Provides a single shared manager for all shared memory tests to speed up execution.""" _shared_manager = None @@ -104,7 +111,7 @@ def shutdown_shared_manager(cls): cls._shared_manager = None -class TestSharedMemoryBucket(TestCase, SharedManagerMixin): +class TestSharedMemoryBucket(TestCase, _SharedManagerMixin): """ Runs ALL MemoryBucket tests using shared state to ensure full compatibility. """ @@ -114,7 +121,7 @@ def setUpClass(cls): cls.manager = cls.get_shared_manager() def setUp(self): - self.storage = MemoryBucket.create_shared(manager=self.manager) + self.storage = SharedMemoryBucket(manager=self.manager) self.tester = IBucketTester(self.storage, self) def test_put_and_get_object(self): @@ -180,87 +187,65 @@ def test_regression_exception_thrown_in_open_write_context_by_AMX(self): def test_regression_infinite_cycle_on_unentered_open_write_context(self): self.tester.test_regression_infinite_cycle_on_unentered_open_write_context() + def test_is_picklable(self): + shared_bucket = SharedMemoryBucket(manager=self.get_shared_manager()) -class TestMemoryBucketPickle(TestCase, SharedManagerMixin): - def test_normal_memory_bucket_is_not_picklable(self): - bucket = MemoryBucket() - with self.assertRaises(TypeError) as cm: - pickle.dumps(bucket) - self.assertIn("RLock", str(cm.exception)) - - def test_shared_memory_bucket_is_picklable(self): - bucket = MemoryBucket.create_shared(manager=self.get_shared_manager()) - - bucket.put_object("shared.txt", b"shared data") - pickled_data = pickle.dumps(bucket) + shared_bucket.put_object("shared.txt", b"shared data") + pickled_data = pickle.dumps(shared_bucket) unpickled_bucket: MemoryBucket = pickle.loads(pickled_data) self.assertEqual(unpickled_bucket.get_object("shared.txt"), b"shared data") def test_unpickle_after_manager_shutdown_raises_error(self): - bucket = MemoryBucket.create_shared() - bucket.put_object("abc", b"123") - pickled_data = pickle.dumps(bucket) + shared_bucket = SharedMemoryBucket() + shared_bucket.put_object("abc", b"123") + pickled_data = pickle.dumps(shared_bucket) - bucket._manager.shutdown() + shared_bucket._manager.shutdown() with self.assertRaises((EOFError, ConnectionError, BrokenPipeError, FileNotFoundError, Exception)): - unpickled_bucket: MemoryBucket = pickle.loads(pickled_data) + unpickled_bucket: SharedMemoryBucket = pickle.loads(pickled_data) unpickled_bucket.get_object("abc") -class TestSharedMemoryBucketMultiprocessingCorrectness(TestCase, SharedManagerMixin): +class TestSharedMemoryBucketMultiprocessingCorrectness(TestCase, _SharedManagerMixin): """Functional correctness tests for MemoryBucket with multiple processes.""" @classmethod def tearDownClass(cls): cls.shutdown_shared_manager() - @staticmethod - def worker_put_object(bucket, index): - bucket.put_object(f"obj_{index}.txt", f"content_{index}".encode()) - - @staticmethod - def worker_open_write(bucket, index): - with bucket.open_write(f"stream_{index}.txt") as writer: - writer.write(f"streamed_content_{index}".encode()) - - def _helper_run_multiprocess_orchestration(self, worker_target, num_processes=3, args=None): - bucket = MemoryBucket.create_shared(manager=self.get_shared_manager()) - ctx = self.get_multiprocessing_context() - processes = [] - for i in range(num_processes): - worker_args = (bucket, i) if args is None else (bucket,) + args - p = ctx.Process(target=worker_target, args=worker_args) - processes.append(p) - - for p in processes: - p.start() - for p in processes: - p.join() - return bucket - - def test_create_shared_without_manager_parameter(self): - """Verify that create_shared() without manager parameter works.""" - bucket_factory = MemoryBucket.create_shared() + def test_create_without_manager_parameter(self): + """Verify that instantiation without manager parameter works.""" + bucket_factory = SharedMemoryBucket() bucket_factory.put_object("factory.txt", b"ok") self.assertEqual(bucket_factory.get_object("factory.txt"), b"ok") def test_multiprocess_put_object(self): - bucket = self._helper_run_multiprocess_orchestration(self.worker_put_object) + bucket = self._helper_run_multiprocess_orchestration(self._worker_put_object) self.assertEqual(len(bucket.list_objects()), 3) for i in range(3): self.assertEqual(bucket.get_object(f"obj_{i}.txt"), f"content_{i}".encode()) def test_multiprocess_open_write(self): - bucket = self._helper_run_multiprocess_orchestration(self.worker_open_write) + bucket = self._helper_run_multiprocess_orchestration(self._worker_open_write) self.assertEqual(len(bucket.list_objects()), 3) for i in range(3): self.assertEqual(bucket.get_object(f"stream_{i}.txt"), f"streamed_content_{i}".encode()) + def test_concurrent_shared_lock_behavior(self): + """Verify that multiple processes can coordinate via the shared bucket lock.""" + key = "counter" + num_procs = 4 + iters = 50 + bucket = self._helper_run_multiprocess_orchestration(self._worker_increment, num_processes=num_procs, args=(key, iters)) + + final_val = int(bucket.get_object(key).decode()) + self.assertEqual(final_val, num_procs * iters) + @staticmethod - def worker_increment(bucket, key, iters=50): + def _worker_increment(bucket, key, iters=50): for _ in range(iters): with bucket._lock: # Using the shared lock from the manager try: @@ -269,12 +254,26 @@ def worker_increment(bucket, key, iters=50): current = 0 bucket.put_object(key, str(current + 1).encode()) - def test_concurrent_shared_lock_behavior(self): - """Verify that multiple processes can coordinate via the shared bucket lock.""" - key = "counter" - num_procs = 4 - iters = 50 - bucket = self._helper_run_multiprocess_orchestration(self.worker_increment, num_processes=num_procs, args=(key, iters)) + @staticmethod + def _worker_put_object(bucket, index): + bucket.put_object(f"obj_{index}.txt", f"content_{index}".encode()) - final_val = int(bucket.get_object(key).decode()) - self.assertEqual(final_val, num_procs * iters) + @staticmethod + def _worker_open_write(bucket, index): + with bucket.open_write(f"stream_{index}.txt") as writer: + writer.write(f"streamed_content_{index}".encode()) + + def _helper_run_multiprocess_orchestration(self, worker_target, num_processes=3, args=None): + bucket = SharedMemoryBucket(manager=self.get_shared_manager()) + ctx = self.get_multiprocessing_context() + processes = [] + for i in range(num_processes): + worker_args = (bucket, i) if args is None else (bucket,) + args + p = ctx.Process(target=worker_target, args=worker_args) + processes.append(p) + + for p in processes: + p.start() + for p in processes: + p.join() + return bucket