diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index d1d78d2..2dcf4bd 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -18,20 +18,26 @@ jobs: runs-on: ${{ matrix.os }} strategy: fail-fast: false + max-parallel: 6 matrix: os: [ ubuntu-latest, windows-latest, macos-latest, ubuntu-24.04-arm ] python-version: [ "3.10", "3.11", "3.12", "3.13", "3.14" ] steps: - uses: actions/checkout@v4 + - name: Install uv + uses: astral-sh/setup-uv@v5 + with: + enable-cache: true - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - name: Install dependencies + env: + POETRY_VIRTUALENVS_PREFER_UV: "true" run: | - python -m pip install --upgrade pip - python -m pip install poetry + uv pip install --system poetry cd python poetry lock poetry install --with dev @@ -51,10 +57,15 @@ jobs: image: python:3.12-bullseye steps: - uses: actions/checkout@v4 + - name: Install uv + uses: astral-sh/setup-uv@v5 + with: + enable-cache: true - name: Install dependencies in container + env: + POETRY_VIRTUALENVS_PREFER_UV: "true" run: | - python -m pip install --upgrade pip - python -m pip install poetry + uv pip install --system poetry cd python poetry lock poetry install --with dev diff --git a/python/bucketbase/ibucket.py b/python/bucketbase/ibucket.py index 9d7d4ef..178fba3 100644 --- a/python/bucketbase/ibucket.py +++ b/python/bucketbase/ibucket.py @@ -477,7 +477,6 @@ def put_object_stream(self, name: PurePosixPath | str, stream: BinaryIO) -> None self._lock_object(name) try: if self._base_bucket.exists(name): - self._unlock_object(name) raise FileExistsError(f"Object {name} already exists in AppendOnlySynchronizedBucket") # we assume that the put_object_stream operation is atomic self._base_bucket.put_object_stream(name, stream) diff --git a/python/bucketbase/memory_bucket.py b/python/bucketbase/memory_bucket.py index f8e9d0e..52dc574 100644 --- a/python/bucketbase/memory_bucket.py +++ b/python/bucketbase/memory_bucket.py @@ -1,8 +1,11 @@ import io +import multiprocessing +import weakref from contextlib import contextmanager +from multiprocessing.managers import DictProxy, SyncManager from pathlib import PurePosixPath from threading import RLock -from typing import BinaryIO, Generator, Iterable, Union +from typing import Any, BinaryIO, Generator, Iterable, Optional, Union from streamerate import slist, sset from streamerate import stream as sstream @@ -26,8 +29,8 @@ class MemoryBucket(IBucket): """ def __init__(self) -> None: - self._objects: dict[str, bytes] = {} - self._lock = RLock() + self._objects: dict[str, bytes] | DictProxy[str, bytes] = {} + self._lock: Any = RLock() def put_object(self, name: PurePosixPath | str, content: Union[str, bytes, bytearray]) -> None: _name = self._validate_name(name) @@ -138,3 +141,50 @@ def open_write_sync(self, name: PurePosixPath | str) -> Generator[BinaryIO, None if not exception_occurred: with self._lock: self._objects[_name] = content + + +class SharedMemoryBucket(MemoryBucket): + """ + MemoryBucket shared across processes via a multiprocessing Manager. + + CRITICAL: Data persists ONLY as long as the Manager process is running. + Accessing or unpickling the bucket after Manager shutdown will raise errors. + + If 'manager' is None, a new one is created and shut down automatically via + weakref finalizer when the bucket is garbage collected. Otherwise, the caller + is responsible for the manager's lifecycle. + """ + + def __init__(self, manager: Optional[SyncManager] = None) -> None: + super().__init__() + if manager is None: + ctx = multiprocessing.get_context("spawn") + manager = ctx.Manager() + owns_manager = True + weakref.finalize(self, self._safe_manager_shutdown, manager) + else: + owns_manager = False + + self._manager: SyncManager | None = manager + self._owns_manager: bool = owns_manager + + # override parent's structures with managed ones. This is a small hack, but it's simple & clear enough + self._objects: dict[str, bytes] | DictProxy[str, bytes] = manager.dict() + self._lock: Any = manager.RLock() + + def __getstate__(self) -> dict[str, Any]: + """Customize pickling to avoid serializing the manager itself.""" + state = self.__dict__.copy() + state["_manager"] = None + state["_owns_manager"] = False + return state + + @staticmethod + def _safe_manager_shutdown(manager: Any) -> None: + """Helper to safely shut down a SyncManager during GC.""" + try: + shutdown = getattr(manager, "shutdown", None) + if callable(shutdown): + shutdown() + except Exception: # pylint: disable=broad-exception-caught + pass diff --git a/python/pyproject.toml b/python/pyproject.toml index d03dc22..d54a292 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "bucketbase" -version = "1.5.3" # do not edit manually. kept in sync with `tool.commitizen` config via automation +version = "1.5.4" # do not edit manually. kept in sync with `tool.commitizen` config via automation description = "bucketbase" authors = ["Andrei Suiu "] repository = "https://github.com/asuiu/bucketbase" diff --git a/python/tests/bucket_tester.py b/python/tests/bucket_tester.py index 6cb6823..5059086 100644 --- a/python/tests/bucket_tester.py +++ b/python/tests/bucket_tester.py @@ -5,18 +5,17 @@ import tempfile import threading import time +import uuid from io import BytesIO from pathlib import Path, PurePosixPath from queue import Queue from typing import BinaryIO from unittest import TestCase -import uuid import pyarrow as pa import pyarrow.parquet as pq from streamerate import slist from streamerate import stream as sstream -from tsx import iTSms from bucketbase.ibucket import AsyncObjectWriter, IBucket @@ -66,11 +65,11 @@ def write(self, data: bytes) -> int: self.bytes_processed += len(data) return len(data) - def close(self): + def close(self) -> None: self._is_closed = True @property - def closed(self): + def closed(self) -> bool: return self._is_closed def readable(self) -> bool: @@ -638,7 +637,7 @@ def test_open_write_with_parquet(self) -> None: # pylint: disable=too-many-loca # Write parquet file using open_write # Use higher timeout for MinIO due to network latency and data buffering - timeout_for_minio = 6 if "MinioBucket" in str(type(self.storage)) else 3 + timeout_for_minio = 30 if "MinioBucket" in str(type(self.storage)) else 3 tested_object: AsyncObjectWriter = self.storage.open_write(parquet_path, timeout_sec=timeout_for_minio) # type: ignore[assignment] with tested_object as sink: with pa.output_stream(sink) as arrow_sink: diff --git a/python/tests/test_append_only_fs_bucket.py b/python/tests/test_append_only_fs_bucket.py index fab1089..c9e4486 100644 --- a/python/tests/test_append_only_fs_bucket.py +++ b/python/tests/test_append_only_fs_bucket.py @@ -127,23 +127,33 @@ def test_get_after_put_object(self): def test_lock_object_with_threads(self): bucket_in_test = AppendOnlyFSBucket(self.base_bucket, self.locks_path) object_name = "shared_object" - lock_acquired = [False, False] # To track lock acquisition in threads + # Use Events for reliable thread synchronization(handshake), avoiding sleep-based timing issues + lock_held_event = threading.Event() + second_thread_ready_event = threading.Event() + second_thread_completed_event = threading.Event() def lock_and_release_first(): bucket_in_test._lock_object(object_name) - lock_acquired[0] = True - time.sleep(0.1) # Simulate work by sleeping + lock_held_event.set() + self.assertTrue(second_thread_ready_event.wait(timeout=5.0), "Second thread failed to signal readiness") + time.sleep(0.2) bucket_in_test._unlock_object(object_name) - lock_acquired[0] = False def wait_and_lock_second(): - time.sleep(0.001) # Ensure this runs after the first thread has acquired the lock + self.assertTrue(lock_held_event.wait(timeout=5.0), "First thread failed to acquire lock in time") + + second_thread_ready_event.set() + t1 = time.time() bucket_in_test._lock_object(object_name) t2 = time.time() - print(f"Time taken to acquire lock: {t2 - t1}") - self.assertTrue(t2 - t1 > 0.1, "The second thread should have waited for the first thread to release the lock") - lock_acquired[1] = True # Should only reach here after the first thread releases the lock + + # Since first thread sleeps for 0.2s *after* we are ready, we should be blocked for a significant time + duration = t2 - t1 + print(f"Time taken to acquire lock: {duration}") + self.assertGreater(duration, 0.1, f"Second thread acquired lock too fast ({duration}s), should have been blocked") + + second_thread_completed_event.set() bucket_in_test._unlock_object(object_name) # Create threads @@ -154,12 +164,12 @@ def wait_and_lock_second(): thread2.start() # Wait for both threads to complete - thread1.join() - thread2.join() + thread1.join(timeout=10.0) + thread2.join(timeout=10.0) + self.assertFalse(thread1.is_alive(), "The first thread did not finish") + self.assertFalse(thread2.is_alive(), "The second thread did not finish") - # Verify that both threads were able to acquire the lock - self.assertFalse(lock_acquired[0], "The first thread should have released the lock") - self.assertTrue(lock_acquired[1], "The second thread should have acquired the lock after the first thread released it") + self.assertTrue(second_thread_completed_event.is_set(), "The second thread did not complete correctly") def test_get_size(self): bucket = AppendOnlyFSBucket(self.base_bucket, self.locks_path) diff --git a/python/tests/test_integrated_cached_immutable_bucket.py b/python/tests/test_integrated_cached_immutable_bucket.py index 98a9633..1164913 100644 --- a/python/tests/test_integrated_cached_immutable_bucket.py +++ b/python/tests/test_integrated_cached_immutable_bucket.py @@ -261,11 +261,10 @@ def get_object_from_thread(): thread.start() for thread in threads: - thread.join(timeout=2.0) - self.assertFalse(thread.is_alive(), "Thread did not complete within timeout") + thread.join(timeout=10.0) + self.assertFalse(thread.is_alive(), f"Thread {thread.name} did not complete within timeout") - # Verify results + self.assertEqual(len(results), num_threads, f"Expected {num_threads} results, but got {len(results)}") self.assertEqual(len(get_object_calls), 1, "Main bucket's get_object should be called exactly once") - self.assertEqual(len(results), num_threads, "All threads should have retrieved the content") for result in results: self.assertEqual(result, content, "All threads should get the same content") diff --git a/python/tests/test_memory_bucket.py b/python/tests/test_memory_bucket.py index 457d815..b7c28f1 100644 --- a/python/tests/test_memory_bucket.py +++ b/python/tests/test_memory_bucket.py @@ -1,6 +1,10 @@ +import atexit +import multiprocessing +import pickle from unittest import TestCase from bucketbase import MemoryBucket +from bucketbase.memory_bucket import SharedMemoryBucket from tests.bucket_tester import IBucketTester @@ -71,3 +75,205 @@ def test_regression_exception_thrown_in_open_write_context_by_AMX(self): def test_regression_infinite_cycle_on_unentered_open_write_context(self): self.tester.test_regression_infinite_cycle_on_unentered_open_write_context() + + def test_throws_on_pickle(self): + bucket = MemoryBucket() + with self.assertRaises(TypeError) as cm: + pickle.dumps(bucket) + self.assertIn("RLock", str(cm.exception)) + + +class _SharedManagerMixin: + """Provides a single shared manager for all shared memory tests to speed up execution.""" + + _shared_manager = None + + @classmethod + def get_multiprocessing_context(cls): + return multiprocessing.get_context("spawn") + + @classmethod + def get_shared_manager(cls): + if cls._shared_manager is None: + ctx = cls.get_multiprocessing_context() + cls._shared_manager = ctx.Manager() + # Ensure the manager is shut down when the process exits + atexit.register(cls.shutdown_shared_manager) + return cls._shared_manager + + @classmethod + def shutdown_shared_manager(cls): + if cls._shared_manager is not None: + try: + cls._shared_manager.shutdown() + except (AttributeError, RuntimeError): + pass + cls._shared_manager = None + + +class TestSharedMemoryBucket(TestCase, _SharedManagerMixin): + """ + Runs ALL MemoryBucket tests using shared state to ensure full compatibility. + """ + + @classmethod + def setUpClass(cls): + cls.manager = cls.get_shared_manager() + + def setUp(self): + self.storage = SharedMemoryBucket(manager=self.manager) + self.tester = IBucketTester(self.storage, self) + + def test_put_and_get_object(self): + self.tester.test_put_and_get_object() + + def test_put_and_get_object_stream(self): + self.tester.test_put_and_get_object_stream() + + def test_list_objects(self): + self.tester.test_list_objects() + + def test_shallow_list_objects(self): + self.tester.test_shallow_list_objects() + + def test_exists(self): + self.tester.test_exists() + + def test_remove_objects(self): + self.tester.test_remove_objects() + + def test_get_size(self): + self.tester.test_get_size() + + def test_open_write(self): + self.tester.test_open_write() + + def test_open_write_timeout(self): + self.tester.test_open_write_timeout() + + def test_open_write_consumer_throws(self): + self.tester.test_open_write_consumer_throws() + + def test_open_write_feeder_throws(self): + self.tester.test_open_write_feeder_throws() + + def test_open_write_with_parquet(self): + self.tester.test_open_write_with_parquet() + + def test_streaming_failure_atomicity(self): + self.tester.test_streaming_failure_atomicity() + + def test_put_object_stream_exception_cleanup(self): + self.tester.test_put_object_stream_exception_cleanup() + + def test_open_write_partial_write_exception_cleanup(self): + self.tester.test_open_write_partial_write_exception_cleanup() + + def test_open_write_without_proper_close(self): + self.tester.test_open_write_without_proper_close() + + def test_open_write_sync_exception_cleanup(self): + self.tester.test_open_write_sync_exception_cleanup() + + def test_regression_parquet_exception_thrown_in_prq_writer_by_AMX(self): + self.tester.test_regression_exception_thrown_in_parquet_writer_context_doesnt_save_object() + + def test_regression_exception_thrown_in_arrow_sink_by_AMX(self): + self.tester.test_regression_exception_thrown_in_arrow_sink_context_doesnt_save_object() + + def test_regression_exception_thrown_in_open_write_context_by_AMX(self): + self.tester.test_regression_exception_thrown_in_open_write_context_doesnt_save_object() + + def test_regression_infinite_cycle_on_unentered_open_write_context(self): + self.tester.test_regression_infinite_cycle_on_unentered_open_write_context() + + def test_is_picklable(self): + shared_bucket = SharedMemoryBucket(manager=self.get_shared_manager()) + + shared_bucket.put_object("shared.txt", b"shared data") + pickled_data = pickle.dumps(shared_bucket) + unpickled_bucket: MemoryBucket = pickle.loads(pickled_data) + self.assertEqual(unpickled_bucket.get_object("shared.txt"), b"shared data") + + def test_unpickle_after_manager_shutdown_raises_error(self): + shared_bucket = SharedMemoryBucket() + shared_bucket.put_object("abc", b"123") + pickled_data = pickle.dumps(shared_bucket) + + shared_bucket._manager.shutdown() + + with self.assertRaises((EOFError, ConnectionError, BrokenPipeError, FileNotFoundError, Exception)): + unpickled_bucket: SharedMemoryBucket = pickle.loads(pickled_data) + unpickled_bucket.get_object("abc") + + +class TestSharedMemoryBucketMultiprocessingCorrectness(TestCase, _SharedManagerMixin): + """Functional correctness tests for MemoryBucket with multiple processes.""" + + @classmethod + def tearDownClass(cls): + cls.shutdown_shared_manager() + + def test_create_without_manager_parameter(self): + """Verify that instantiation without manager parameter works.""" + bucket_factory = SharedMemoryBucket() + bucket_factory.put_object("factory.txt", b"ok") + self.assertEqual(bucket_factory.get_object("factory.txt"), b"ok") + + def test_multiprocess_put_object(self): + bucket = self._helper_run_multiprocess_orchestration(self._worker_put_object) + + self.assertEqual(len(bucket.list_objects()), 3) + for i in range(3): + self.assertEqual(bucket.get_object(f"obj_{i}.txt"), f"content_{i}".encode()) + + def test_multiprocess_open_write(self): + bucket = self._helper_run_multiprocess_orchestration(self._worker_open_write) + + self.assertEqual(len(bucket.list_objects()), 3) + for i in range(3): + self.assertEqual(bucket.get_object(f"stream_{i}.txt"), f"streamed_content_{i}".encode()) + + def test_concurrent_shared_lock_behavior(self): + """Verify that multiple processes can coordinate via the shared bucket lock.""" + key = "counter" + num_procs = 4 + iters = 50 + bucket = self._helper_run_multiprocess_orchestration(self._worker_increment, num_processes=num_procs, args=(key, iters)) + + final_val = int(bucket.get_object(key).decode()) + self.assertEqual(final_val, num_procs * iters) + + @staticmethod + def _worker_increment(bucket, key, iters=50): + for _ in range(iters): + with bucket._lock: # Using the shared lock from the manager + try: + current = int(bucket.get_object(key).decode()) + except FileNotFoundError: + current = 0 + bucket.put_object(key, str(current + 1).encode()) + + @staticmethod + def _worker_put_object(bucket, index): + bucket.put_object(f"obj_{index}.txt", f"content_{index}".encode()) + + @staticmethod + def _worker_open_write(bucket, index): + with bucket.open_write(f"stream_{index}.txt") as writer: + writer.write(f"streamed_content_{index}".encode()) + + def _helper_run_multiprocess_orchestration(self, worker_target, num_processes=3, args=None): + bucket = SharedMemoryBucket(manager=self.get_shared_manager()) + ctx = self.get_multiprocessing_context() + processes = [] + for i in range(num_processes): + worker_args = (bucket, i) if args is None else (bucket,) + args + p = ctx.Process(target=worker_target, args=worker_args) + processes.append(p) + + for p in processes: + p.start() + for p in processes: + p.join() + return bucket diff --git a/python/tests/test_minio_bucket.py b/python/tests/test_minio_bucket.py index 3a9634c..91a5ca3 100644 --- a/python/tests/test_minio_bucket.py +++ b/python/tests/test_minio_bucket.py @@ -14,7 +14,9 @@ def setUp(self) -> None: self.assertIsNotNone(CONFIG.MINIO_PUBLIC_SERVER, "MINIO_PUBLIC_SERVER not set") self.assertIsNotNone(CONFIG.MINIO_ACCESS_KEY, "MINIO_ACCESS_KEY not set") self.assertIsNotNone(CONFIG.MINIO_SECRET_KEY, "MINIO_SECRET_KEY not set") - self.minio_client = build_minio_client(endpoints=CONFIG.MINIO_PUBLIC_SERVER, access_key=CONFIG.MINIO_ACCESS_KEY, secret_key=CONFIG.MINIO_SECRET_KEY) + self.minio_client = build_minio_client( + endpoints=CONFIG.MINIO_PUBLIC_SERVER, access_key=CONFIG.MINIO_ACCESS_KEY, secret_key=CONFIG.MINIO_SECRET_KEY, timeout=30 + ) self.bucket = MinioBucket(bucket_name=CONFIG.MINIO_DEV_TESTS_BUCKET, minio_client=self.minio_client) if not self.minio_client.bucket_exists(CONFIG.MINIO_DEV_TESTS_BUCKET): self.minio_client.make_bucket(bucket_name=CONFIG.MINIO_DEV_TESTS_BUCKET)