From 0a38e846b11af0c9ae438c469b732b0e2bd667e2 Mon Sep 17 00:00:00 2001 From: Alexandru MAXIMCIUC Date: Wed, 23 Apr 2025 10:33:27 +0300 Subject: [PATCH] add MirroredBucket - objects are added to multiple IBuckets. code and tests written entirely by LLMs --- python/bucketbase/mirrored_bucket.py | 328 +++++++++++++++++++++++ python/tests/test_mirrored_bucket.py | 385 +++++++++++++++++++++++++++ 2 files changed, 713 insertions(+) create mode 100644 python/bucketbase/mirrored_bucket.py create mode 100644 python/tests/test_mirrored_bucket.py diff --git a/python/bucketbase/mirrored_bucket.py b/python/bucketbase/mirrored_bucket.py new file mode 100644 index 0000000..078edf1 --- /dev/null +++ b/python/bucketbase/mirrored_bucket.py @@ -0,0 +1,328 @@ +import shutil +import tempfile +import threading +from queue import Queue +from pathlib import PurePosixPath +from typing import BinaryIO, List, Tuple, Any, Callable +from abc import ABC, abstractmethod +from bucketbase import IBucket +from enum import Enum, auto + + +class SuccessCriteria(Enum): + ANY = auto() # At least one bucket write succeeds + MAJORITY = auto() # At least n/2+1 bucket writes succeed + ALL = auto() # All bucket writes succeed + + +class MirrorOperationError(RuntimeError): + """Exception raised when a mirroring operation fails to meet success criteria""" + + def __init__(self, message, errors=None): + super().__init__(message) + self.errors = errors or [] + + +class ReadQueue(BinaryIO): + """File-like object that reads from a queue""" + + def __init__(self, queue): + self.queue = queue + self.buffer = b"" + self._closed = False + self._eof_reached = False + + @property + def closed(self): + return self._closed + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def close(self): + self._closed = True + + def read(self, size=-1): + if self.closed: + return b"" + + if self._eof_reached and not self.buffer: + return b"" + + if size < 0: # Read everything + result = self.buffer + while not self._eof_reached: + chunk = self.queue.get() + self.queue.task_done() + if chunk is None: # EOF marker + self._eof_reached = True + break + result += chunk + self.buffer = b"" + return result + else: + while len(self.buffer) < size and not self._eof_reached: + chunk = self.queue.get() + self.queue.task_done() + if chunk is None: # EOF marker + self._eof_reached = True + break + self.buffer += chunk + + result = self.buffer[:size] + self.buffer = self.buffer[size:] + return result + + +class SuccessEvaluator: + @staticmethod + def evaluate(success_count: int, total_buckets: int, success_criteria: SuccessCriteria, errors: list = None) -> None: + """Evaluate if the operation met the success criteria""" + if success_criteria == SuccessCriteria.ALL and success_count < total_buckets: + failed_indices = [i for i, _ in errors] if errors else [] + raise MirrorOperationError(f"Failed to write to all buckets. Failed bucket indices: {failed_indices}", errors) + + elif success_criteria == SuccessCriteria.MAJORITY and success_count <= total_buckets // 2: + raise MirrorOperationError(f"Failed to write to a majority of buckets. 
" f"Success: {success_count}, Required: {total_buckets // 2 + 1}", errors) + + elif success_criteria == SuccessCriteria.ANY and success_count == 0: + raise MirrorOperationError("Failed to write to any bucket", errors) + + +class MirrorStrategy(ABC): + @abstractmethod + def mirror_stream(self, name: str, stream: BinaryIO, buckets: List[IBucket], success_criteria: SuccessCriteria) -> None: + pass + + +class BaseMirrorStrategy(MirrorStrategy, ABC): + def execute_on_buckets( + self, buckets: List[IBucket], operation: Callable[[IBucket], None], success_criteria: SuccessCriteria + ) -> Tuple[int, List[Tuple[int, Exception]]]: + """Execute operation on each bucket and evaluate success criteria""" + errors = [] + success_count = 0 + + for i, bucket in enumerate(buckets): + try: + operation(bucket) + success_count += 1 + except Exception as e: + errors.append((i, e)) + + SuccessEvaluator.evaluate(success_count, len(buckets), success_criteria, errors) + return success_count, errors + + +class ParallelStrategy(BaseMirrorStrategy): + def mirror_stream(self, name: str, stream: BinaryIO, buckets: List[IBucket], success_criteria: SuccessCriteria) -> None: + content = stream.read() + errors = [] + success_count = 0 + completion_lock = threading.Lock() + + def write_to_bucket(bucket, content, bucket_index): + nonlocal success_count + try: + bucket.put_object(name, content) + with completion_lock: + success_count += 1 + except Exception as e: + errors.append((bucket_index, e)) + + threads = [] + for i, bucket in enumerate(buckets): + thread = threading.Thread(target=write_to_bucket, args=(bucket, content, i)) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + SuccessEvaluator.evaluate(success_count, len(buckets), success_criteria, errors) + + +class InMemoryStrategy(BaseMirrorStrategy): + def mirror_stream(self, name: str, stream: BinaryIO, buckets: List[IBucket], success_criteria: SuccessCriteria) -> None: + content = stream.read() + + def write_operation(bucket): + bucket.put_object(name, content) + + self.execute_on_buckets(buckets, lambda bucket: bucket.put_object(name, content), success_criteria) + + +class TempFileStrategy(BaseMirrorStrategy): + def mirror_stream(self, name: str, stream: BinaryIO, buckets: List[IBucket], success_criteria: SuccessCriteria) -> None: + with tempfile.NamedTemporaryFile() as tmp: + shutil.copyfileobj(stream, tmp) + tmp.flush() + + def write_operation(bucket): + tmp.seek(0) + bucket.put_object_stream(name, tmp) + + self.execute_on_buckets(buckets, write_operation, success_criteria) + + +class StreamingStrategy(BaseMirrorStrategy): + """Strategy that streams to multiple buckets using queues without buffering everything in memory or on disk""" + + def __init__(self, buffer_size=8192, queue_size=100, timeout=30): + self.buffer_size = buffer_size + self.queue_size = queue_size + self.timeout = timeout + + def mirror_stream(self, name: str, stream: BinaryIO, buckets: List[IBucket], success_criteria: SuccessCriteria) -> None: + # Fast path for single bucket + if len(buckets) == 1: + buckets[0].put_object_stream(name, stream) + return + + data_queue = Queue(maxsize=self.queue_size) + end_marker = object() # Unique end marker + errors = [] + success_counter = [0] # Use a mutable object to track success count + completion_lock = threading.Lock() + + reader_thread = self._create_reader_thread(stream, data_queue, end_marker, errors) + bucket_queues = [Queue(maxsize=10) for _ in range(len(buckets))] + distributor_thread = 
self._create_distributor_thread(data_queue, bucket_queues, end_marker, errors) + writer_threads = self._create_writer_threads(buckets, name, bucket_queues, completion_lock, success_counter, errors) + + # Wait for completion + reader_thread.join(timeout=self.timeout) + distributor_thread.join(timeout=self.timeout) + for thread in writer_threads: + thread.join(timeout=self.timeout) + + SuccessEvaluator.evaluate(success_counter[0], len(buckets), success_criteria, errors) + + def _create_reader_thread(self, stream, data_queue, end_marker, errors): + def reader_task(): + try: + while True: + chunk = stream.read(self.buffer_size) + if not chunk: # End of stream + data_queue.put(end_marker) + break + data_queue.put(chunk) + except Exception as e: + data_queue.put(end_marker) # Signal end on error + errors.append(("reader", e)) + + thread = threading.Thread(target=reader_task) + thread.daemon = True + thread.start() + return thread + + def _create_distributor_thread(self, data_queue, bucket_queues, end_marker, errors): + def distributor_task(): + try: + while True: + item = data_queue.get() + if item is end_marker: + # Pass end marker to all bucket queues + for q in bucket_queues: + q.put(None) + break + + # Distribute to all bucket queues + for q in bucket_queues: + q.put(item) + except Exception as e: + # Signal end to all queues on error + for q in bucket_queues: + q.put(None) + errors.append(("distributor", e)) + + thread = threading.Thread(target=distributor_task) + thread.daemon = True + thread.start() + return thread + + def _create_writer_threads(self, buckets, name, bucket_queues, completion_lock, success_counter, errors): + writer_threads = [] + + def writer_task(_bucket, q, bucket_index): + try: + with ReadQueue(q) as reader: + _bucket.put_object_stream(name, reader) + with completion_lock: + success_counter[0] += 1 + except Exception as e: + errors.append((bucket_index, e)) + + for i, bucket in enumerate(buckets): + thread = threading.Thread(target=writer_task, args=(bucket, bucket_queues[i], i)) + thread.daemon = True + writer_threads.append(thread) + thread.start() + + return writer_threads + + +class MirrorBucket(IBucket): + def __init__(self, buckets: List[IBucket], strategy: MirrorStrategy = None, success_criteria: SuccessCriteria = SuccessCriteria.ALL) -> None: + if not buckets: + raise ValueError("At least one bucket is required") + self._buckets = buckets + self._read_bucket = buckets[0] + self._strategy = strategy or InMemoryStrategy() + self._success_criteria = success_criteria + + def put_object(self, name: PurePosixPath | str, content: bytes) -> None: + _name = self._validate_name(name) + import io + + stream = io.BytesIO(content) + self._strategy.mirror_stream(_name, stream, self._buckets, self._success_criteria) + + def put_object_stream(self, name: PurePosixPath | str, stream: BinaryIO) -> None: + _name = self._validate_name(name) + self._strategy.mirror_stream(_name, stream, self._buckets, self._success_criteria) + + def get_object(self, name: PurePosixPath | str) -> bytes: + _name = self._validate_name(name) + return self._read_bucket.get_object(_name) + + def get_object_stream(self, name: PurePosixPath | str) -> BinaryIO: + _name = self._validate_name(name) + return self._read_bucket.get_object_stream(_name) + + def exists(self, name: PurePosixPath | str) -> bool: + _name = self._validate_name(name) + return self._read_bucket.exists(_name) + + def get_size(self, name: PurePosixPath | str) -> int: + _name = self._validate_name(name) + return 
self._read_bucket.get_size(_name) + + def list_objects(self, prefix: PurePosixPath | str = "") -> List[str]: + _prefix = self._validate_name(prefix) if prefix else "" + return self._read_bucket.list_objects(_prefix) + + def shallow_list_objects(self, prefix: PurePosixPath | str = "", delimiter: str = "/") -> List[str]: + _prefix = self._validate_name(prefix) if prefix else "" + return self._read_bucket.shallow_list_objects(_prefix, delimiter) + + def remove_objects(self, names: List[PurePosixPath | str]) -> None: + _names = [self._validate_name(name) for name in names] + + def remove_operation(bucket): + bucket.remove_objects(_names) + + errors = [] + success_count = 0 + + for i, bucket in enumerate(self._buckets): + try: + bucket.remove_objects(_names) + success_count += 1 + except Exception as e: + errors.append((i, e)) + + SuccessEvaluator.evaluate(success_count, len(self._buckets), self._success_criteria, errors) diff --git a/python/tests/test_mirrored_bucket.py b/python/tests/test_mirrored_bucket.py new file mode 100644 index 0000000..7b30a29 --- /dev/null +++ b/python/tests/test_mirrored_bucket.py @@ -0,0 +1,385 @@ +import unittest +import tempfile +from bucketbase.fs_bucket import FSBucket +from bucketbase.memory_bucket import MemoryBucket +import threading +import queue +from bucketbase.mirrored_bucket import ReadQueue +from bucketbase.mirrored_bucket import ParallelStrategy, MirrorOperationError, TempFileStrategy, SuccessCriteria, \ + MirrorBucket, InMemoryStrategy, StreamingStrategy +from queue import Queue +import io +from pathlib import Path + + +class TestMirrorBucketInitialization(unittest.TestCase): + def setUp(self): + self.temp_dirs = [tempfile.TemporaryDirectory() for _ in range(3)] + self.fs_buckets = [FSBucket(Path(temp_dir.name)) for temp_dir in self.temp_dirs] + + def tearDown(self): + for temp_dir in self.temp_dirs: + temp_dir.cleanup() + + def test_default_initialization(self): + mirror = MirrorBucket(self.fs_buckets) + self.assertEqual(mirror._read_bucket, self.fs_buckets[0]) + self.assertIsInstance(mirror._strategy, InMemoryStrategy) + self.assertEqual(mirror._success_criteria, SuccessCriteria.ALL) + + def test_custom_initialization(self): + mirror = MirrorBucket(self.fs_buckets, StreamingStrategy(), SuccessCriteria.ANY) + self.assertIsInstance(mirror._strategy, StreamingStrategy) + self.assertEqual(mirror._success_criteria, SuccessCriteria.ANY) + + def test_empty_buckets_list(self): + with self.assertRaises(ValueError): + MirrorBucket([]) + + +class TestMirrorBucketStrategies(unittest.TestCase): + def setUp(self): + self.temp_dirs = [tempfile.TemporaryDirectory() for _ in range(3)] + self.fs_buckets = [FSBucket(Path(temp_dir.name)) for temp_dir in self.temp_dirs] + self.memory_buckets = [MemoryBucket() for _ in range(3)] + self.test_content = b"sample content" + + def tearDown(self): + for temp_dir in self.temp_dirs: + temp_dir.cleanup() + + def _verify_buckets_content(self, buckets, name, content): + for bucket in buckets: + self.assertEqual(bucket.get_object(name), content) + + def test_inmemory_strategy_stream(self): + name = "inmemory_test.txt" + mirror = MirrorBucket(self.fs_buckets, InMemoryStrategy()) + stream = io.BytesIO(self.test_content) + mirror.put_object_stream(name, stream) + self._verify_buckets_content(self.fs_buckets, name, self.test_content) + + def test_tempfile_strategy_large_file(self): + name = "tempfile_large.bin" + mirror = MirrorBucket(self.fs_buckets, TempFileStrategy()) + large_content = b"A" * (1024 * 1024) # 1MB + stream = 
io.BytesIO(large_content) + mirror.put_object_stream(name, stream) + self._verify_buckets_content(self.fs_buckets, name, large_content) + + def test_streaming_strategy(self): + name = "streaming_test.txt" + all_buckets = self.fs_buckets + self.memory_buckets + mirror = MirrorBucket(all_buckets, StreamingStrategy()) + large_content = b"B" * 500000 + stream = io.BytesIO(large_content) + mirror.put_object_stream(name, stream) + self._verify_buckets_content(self.fs_buckets, name, large_content) + self._verify_buckets_content(self.memory_buckets, name, large_content) + + def test_parallel_strategy(self): + name = "parallel_test.txt" + mirror = MirrorBucket(self.fs_buckets, ParallelStrategy()) + stream = io.BytesIO(self.test_content) + mirror.put_object_stream(name, stream) + self._verify_buckets_content(self.fs_buckets, name, self.test_content) + + def test_put_object_method(self): + name = "object_test.txt" + mirror = MirrorBucket(self.fs_buckets, InMemoryStrategy()) + mirror.put_object(name, self.test_content) + self._verify_buckets_content(self.fs_buckets, name, self.test_content) + + +class MockFailingBucket(MemoryBucket): + def put_object(self, name, content): + raise RuntimeError("Simulated failure") + + def put_object_stream(self, name, stream): + raise RuntimeError("Simulated failure") + + +class TestSuccessCriteria(unittest.TestCase): + def setUp(self): + self.temp_dirs = [tempfile.TemporaryDirectory() for _ in range(2)] + self.fs_buckets = [FSBucket(Path(temp_dir.name)) for temp_dir in self.temp_dirs] + self.test_content = b"criteria test" + + def tearDown(self): + for temp_dir in self.temp_dirs: + temp_dir.cleanup() + + def test_all_success(self): + name = "all_success.txt" + mirror = MirrorBucket(self.fs_buckets, StreamingStrategy(), SuccessCriteria.ALL) + stream = io.BytesIO(self.test_content) + mirror.put_object_stream(name, stream) + for bucket in self.fs_buckets: + self.assertEqual(bucket.get_object(name), self.test_content) + + def test_all_failure(self): + name = "all_failure.txt" + failing_bucket = MockFailingBucket() + buckets = [self.fs_buckets[0], failing_bucket, self.fs_buckets[1]] + mirror = MirrorBucket(buckets, StreamingStrategy(), SuccessCriteria.ALL) + stream = io.BytesIO(self.test_content) + with self.assertRaises(RuntimeError): + mirror.put_object_stream(name, stream) + self.assertEqual(self.fs_buckets[0].get_object(name), self.test_content) + self.assertEqual(self.fs_buckets[1].get_object(name), self.test_content) + + def test_any_success(self): + name = "any_success.txt" + failing_buckets = [MockFailingBucket() for _ in range(2)] + buckets = [self.fs_buckets[0]] + failing_buckets + mirror = MirrorBucket(buckets, StreamingStrategy(), SuccessCriteria.ANY) + stream = io.BytesIO(self.test_content) + mirror.put_object_stream(name, stream) + self.assertEqual(self.fs_buckets[0].get_object(name), self.test_content) + + def test_any_failure(self): + name = "any_failure.txt" + failing_buckets = [MockFailingBucket() for _ in range(2)] + buckets = failing_buckets + mirror = MirrorBucket(buckets, StreamingStrategy(), SuccessCriteria.ANY) + stream = io.BytesIO(self.test_content) + with self.assertRaises(RuntimeError): + mirror.put_object_stream(name, stream) + + def test_majority_success(self): + name = "majority_success.txt" + failing_bucket = MockFailingBucket() + buckets = [self.fs_buckets[0], self.fs_buckets[1], failing_bucket] + mirror = MirrorBucket(buckets, StreamingStrategy(), SuccessCriteria.MAJORITY) + stream = io.BytesIO(self.test_content) + 
mirror.put_object_stream(name, stream)
+        self.assertEqual(self.fs_buckets[0].get_object(name), self.test_content)
+        self.assertEqual(self.fs_buckets[1].get_object(name), self.test_content)
+
+    def test_majority_failure(self):
+        name = "majority_failure.txt"
+        failing_buckets = [MockFailingBucket() for _ in range(2)]
+        buckets = [self.fs_buckets[0]] + failing_buckets
+        mirror = MirrorBucket(buckets, StreamingStrategy(), SuccessCriteria.MAJORITY)
+        stream = io.BytesIO(self.test_content)
+        with self.assertRaises(RuntimeError):
+            mirror.put_object_stream(name, stream)
+        self.assertEqual(self.fs_buckets[0].get_object(name), self.test_content)
+
+
+class TestReadQueue(unittest.TestCase):
+    def setUp(self):
+        self.data_queue = queue.Queue()
+        self.read_queue = ReadQueue(self.data_queue)
+
+    def test_basic_read(self):
+        self.data_queue.put(b"hello")
+        self.data_queue.put(None)
+        result = self.read_queue.read()
+        self.assertEqual(result, b"hello")
+
+    def test_sized_read(self):
+        self.data_queue.put(b"hello world")
+        self.data_queue.put(None)
+        result = self.read_queue.read(5)
+        self.assertEqual(result, b"hello")
+        result = self.read_queue.read(6)
+        self.assertEqual(result, b" world")
+
+    def test_multiple_chunks(self):
+        self.data_queue.put(b"chunk1")
+        self.data_queue.put(b"chunk2")
+        self.data_queue.put(b"chunk3")
+        self.data_queue.put(None)
+        result = self.read_queue.read()
+        self.assertEqual(result, b"chunk1chunk2chunk3")
+
+    def test_read_after_close(self):
+        self.data_queue.put(b"test")
+        self.read_queue.close()
+        result = self.read_queue.read()
+        self.assertEqual(result, b"")
+
+    def test_context_manager(self):
+        self.data_queue.put(b"context")
+        self.data_queue.put(None)
+        with self.read_queue as rq:
+            result = rq.read()
+            self.assertEqual(result, b"context")
+        self.assertTrue(self.read_queue.closed)
+
+    def test_threaded_reading(self):
+        results = []
+
+        def producer():
+            for i in range(10):
+                self.data_queue.put(f"chunk{i}".encode())
+            self.data_queue.put(None)
+
+        def consumer():
+            while True:
+                chunk = self.read_queue.read(5)
+                if not chunk:
+                    break
+                results.append(chunk)
+
+        producer_thread = threading.Thread(target=producer)
+        consumer_thread = threading.Thread(target=consumer)
+        producer_thread.start()
+        consumer_thread.start()
+        producer_thread.join(timeout=2)
+        consumer_thread.join(timeout=2)
+        full_content = b"".join(results)
+        expected = b"".join(f"chunk{i}".encode() for i in range(10))
+        self.assertEqual(full_content, expected)
+
+
+class FailingBucket:
+    # Failing IBucket stand-in shared by the strategy failure tests below
+    def put_object(self, name, content):
+        raise Exception("operation failed")
+
+    def put_object_stream(self, name, stream):
+        raise Exception("operation failed")
+
+
+class TestBaseMirrorStrategyExecuteOnBucketsException(unittest.TestCase):
+    def test_execute_on_buckets_all_fail(self):
+        strategy = InMemoryStrategy()
+        buckets = [FailingBucket(), FailingBucket()]
+
+        def operation(bucket):
+            bucket.put_object("test.txt", b"content")
+
+        with self.assertRaises(MirrorOperationError) as cm:
+            strategy.execute_on_buckets(buckets, operation, SuccessCriteria.ANY)
+        self.assertIn("Failed to write to any bucket", str(cm.exception))
+
+
+class TestParallelStrategyMirrorStreamException(unittest.TestCase):
+    def test_parallel_strategy_failure(self):
+        strategy = ParallelStrategy()
+        stream = io.BytesIO(b"data")
+        buckets = [FailingBucket(), FailingBucket()]
+        with self.assertRaises(MirrorOperationError) as cm:
+            strategy.mirror_stream("test.txt", stream, buckets, SuccessCriteria.ALL)
+        self.assertIn("Failed to write", str(cm.exception))
+
+
+class FaultyStream:
+    def read(self, size=-1):
+        raise Exception("Reader error")
+
+
+class TestReaderThreadException(unittest.TestCase):
+    def test_reader_thread_exception(self):
+        strategy = StreamingStrategy()
+        errors = []
+        data_queue = Queue()
+        end_marker = object()
+        faulty_stream = FaultyStream()
+        reader_thread = strategy._create_reader_thread(faulty_stream, data_queue, end_marker, errors)
+        reader_thread.join(timeout=2)
+        self.assertTrue(any(tag == "reader" and "Reader error" in str(err) for tag, err in errors))
+
+
+class FaultyQueue(Queue):
+    def get(self, timeout=None):
+        raise Exception("Distributor error")
+
+
+class TestDistributorThreadException(unittest.TestCase):
+    def test_distributor_thread_exception(self):
+        strategy = StreamingStrategy()
+        errors = []
+        faulty_queue = FaultyQueue()
+        bucket_queues = [Queue(), Queue()]
+        end_marker = object()
+        distributor_thread = strategy._create_distributor_thread(faulty_queue, bucket_queues, end_marker, errors)
+        distributor_thread.join(timeout=2)
+        self.assertTrue(any(tag == "distributor" and "Distributor error" in str(err) for tag, err in errors))
+
+
+# Minimal fake implementation of IBucket for testing purposes
+class FakeBucket:
+    def __init__(self):
+        self.objects = {}
+
+    def put_object(self, name, content):
+        self.objects[name] = content
+
+    def put_object_stream(self, name, stream):
+        self.objects[name] = stream.read()
+
+    def get_object(self, name):
+        return self.objects.get(name, b"")
+
+    def get_object_stream(self, name):
+        return io.BytesIO(self.objects.get(name, b""))
+
+    def exists(self, name):
+        return name in self.objects
+
+    def get_size(self, name):
+        return len(self.objects.get(name, b""))
+
+    def list_objects(self, prefix=""):
+        return [name for name in self.objects if name.startswith(prefix)]
+
+    def shallow_list_objects(self, prefix="", delimiter="/"):
+        return [name for name in self.objects if name.startswith(prefix)]
+
+    def remove_objects(self, names):
+        for name in names:
+            if name in self.objects:
+                del self.objects[name]
+
+
+# Monkey-patch _validate_name for these tests so names are stripped of whitespace and a leading "/"
+def dummy_validate(name):
+    return name.strip().lstrip("/")
+
+
+MirrorBucket._validate_name = lambda self, n: dummy_validate(n)
+
+
+class TestMirrorBucketCoverage(unittest.TestCase):
+    def setUp(self):
+        self.bucket = FakeBucket()
+        self.mirror = MirrorBucket([self.bucket], InMemoryStrategy(), SuccessCriteria.ALL)
+
+    def test_put_and_get_object(self):
+        self.mirror.put_object(" /test.txt ", b"hello")
+        self.assertEqual(self.mirror.get_object("test.txt"), b"hello")
+
+    def test_put_object_stream(self):
+        stream = io.BytesIO(b"stream data")
+        self.mirror.put_object_stream(" /stream.txt ", stream)
+        self.assertEqual(self.mirror.get_object("stream.txt"), b"stream data")
+
+    def test_exists_and_get_size(self):
+        self.mirror.put_object(" /exists.txt ", b"data")
+        self.assertTrue(self.mirror.exists("exists.txt"))
+        self.assertEqual(self.mirror.get_size("exists.txt"), 4)
+
+    def test_list_objects(self):
+        self.mirror.put_object(" /folder/file1.txt ", b"1")
+        self.mirror.put_object(" /folder/file2.txt ", b"2")
+        lst = self.mirror.list_objects("folder")
+        self.assertTrue(all(name.startswith("folder") for name in lst))
+
+
+if __name__ == "__main__":
+    unittest.main()
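
A minimal usage sketch of the mirroring API added above. It assumes only the constructors and methods exercised by the tests in this patch (FSBucket, MemoryBucket, MirrorBucket, StreamingStrategy, SuccessCriteria); the temporary directory and object names are illustrative.

import io
import tempfile
from pathlib import Path

from bucketbase.fs_bucket import FSBucket
from bucketbase.memory_bucket import MemoryBucket
from bucketbase.mirrored_bucket import MirrorBucket, StreamingStrategy, SuccessCriteria

# Illustrative destinations: one filesystem bucket and one in-memory bucket.
tmp_dir = tempfile.TemporaryDirectory()
buckets = [FSBucket(Path(tmp_dir.name)), MemoryBucket()]

# Every write is mirrored to all buckets; MirrorOperationError is raised
# unless a majority of the per-bucket writes succeed.
mirror = MirrorBucket(buckets, StreamingStrategy(), SuccessCriteria.MAJORITY)

mirror.put_object("reports/summary.txt", b"mirrored payload")
mirror.put_object_stream("reports/raw.bin", io.BytesIO(b"\x00" * 1024))

# Reads, listings and existence checks are served from the first bucket only.
assert mirror.get_object("reports/summary.txt") == b"mirrored payload"
assert mirror.exists("reports/raw.bin")

tmp_dir.cleanup()

Because reads always go to buckets[0], the first entry in the list effectively acts as the primary replica while the remaining buckets serve as write-through mirrors.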