From 2209200fcf2e993fa50835c537d3eeca72f1dffa Mon Sep 17 00:00:00 2001 From: Saimon Michelson Date: Sun, 17 May 2026 18:58:19 -0400 Subject: [PATCH 1/2] support backup of portal to s3 --- cterasdk/core/query.py | 4 + cterasdk/core/servers.py | 49 +++++- cterasdk/core/types.py | 13 ++ cterasdk/direct/lib.py | 23 +-- cterasdk/direct/types.py | 166 +++++++++++++----- .../UserGuides/Portal/Administration.rst | 33 ++++ tests/ut/core/admin/test_servers.py | 63 +++++++ 7 files changed, 294 insertions(+), 57 deletions(-) diff --git a/cterasdk/core/query.py b/cterasdk/core/query.py index 7bbed9bf..2b49777c 100644 --- a/cterasdk/core/query.py +++ b/cterasdk/core/query.py @@ -4,6 +4,10 @@ from ..common import Object +def database(core, path, param): + return create_callback_function(core, path, callback_response=DefaultResponse)(param) + + def create_callback_function(core, path, name=None, *, callback_response=None): """ Create a query callback function diff --git a/cterasdk/core/servers.py b/cterasdk/core/servers.py index 0a4f19e3..6c2886ee 100644 --- a/cterasdk/core/servers.py +++ b/cterasdk/core/servers.py @@ -20,6 +20,7 @@ class Servers(BaseCommand): def __init__(self, portal): super().__init__(portal) self.tasks = Tasks(self._core) + self.backup = Backup(self._core) def _get_entire_object(self, server): ref = f'/servers/{server}' @@ -43,6 +44,19 @@ def get(self, name, include=None): raise ObjectNotFoundException(f'/servers/{name}') return server + @property + def system_database(self): + """ + Retrieve the main database object + + :returns: Main database object. + :rtype: cterasdk.common.object.Object + """ + response = query.database(self._core, '/servers', query.QueryParamBuilder().addFilter( + query.FilterBuilder('mainDB').eq(True) + ).build()) + return response.objects[0] + def list_servers(self, include=None): """ Retrieve the servers that comprise CTERA Portal. @@ -70,7 +84,6 @@ def modify(self, name, server_name=None, app=None, preview=None, enable_public_i :param bool,optional enable_replication: Enable or disable database replication :param str,optional replica_of: Configure as a replicate of another Portal server. `enable_replication` must be set to `True` """ - server = self._get_entire_object(name) if enable_replication is True and replica_of is not None: server.replicationSettings = Object() @@ -101,6 +114,40 @@ def modify(self, name, server_name=None, app=None, preview=None, enable_public_i raise CTERAException(f'Server modification failed: {ref}') from error +class Backup(BaseCommand): + + def connected(self): + """ + Verify connectivity to the backup S3 bucket. + """ + return self._core.servers.system_database.backupToBucket.status == 'Connected' + + def enable(self, bucket, interval): + """ + Enable Main Database Backup to an S3 Bucket + + :param cterasdk.core.types.Bucket bucket: Storage bucket + :param int interval: Backup interval in minutes + """ + database = self._core.servers.system_database + database.backupToBucket = Object(enabled=True, exportSchedulePeriod=interval, details=bucket.database_backup_server_object()) + logger.info("Enabling database backup. %s", {'server': database.name}) + response = self._core.api.put(f'/servers/{database.name}', database) + logger.info("Database backup enabled. %s", {'server': database.name}) + return response + + def disable(self): + """ + Disable Main Database Backup + """ + database = self._core.servers.system_database + database.backupToBucket.enabled = False + logger.info("Disabling database backup. %s", {'server': database.name}) + response = self._core.api.put(f'/servers/{database.name}', database) + logger.info("Database backup disabled. %s", {'server': database.name}) + return response + + class Tasks(BaseCommand): def background(self, name): diff --git a/cterasdk/core/types.py b/cterasdk/core/types.py index a9827a66..3436566e 100644 --- a/cterasdk/core/types.py +++ b/cterasdk/core/types.py @@ -331,6 +331,19 @@ def __init__(self, bucket, driver, access_key, secret_key, endpoint, https, dire def trust_all_certificates(self): return not self.verify_ssl + def database_backup_server_object(self): + return Object( + storage=self.driver, + bucket=self.bucket, + accessKey=self.access_key, + secretKey=self.secret_key, + endPoint=self.endpoint, + useHttps=self.https, + trustAllCertificates=self.trust_all_certificates, + masterHost=None, + usePathStyleAddressing=False + ) + class AzureBlob(HTTPBucket): diff --git a/cterasdk/direct/lib.py b/cterasdk/direct/lib.py index 7a9a55b9..5c1c705f 100644 --- a/cterasdk/direct/lib.py +++ b/cterasdk/direct/lib.py @@ -27,10 +27,7 @@ async def get_object(client, file_id, chunk): :returns: Object :rtype: bytes """ - message = ( - f"Downloading block #{chunk.number} " - f"(offset={chunk.offset}, length={chunk.length})" - ) + message = f"Downloading block (offset={chunk.offset}, length={chunk.length})" if file_id: message += f" for file ID {file_id}" @@ -61,10 +58,8 @@ async def get_object(client, file_id, chunk): "unknown": "Unknown error" } - message = ( - f"Failed to download block #{chunk.number} " - f"(offset={chunk.offset}, length={chunk.length})" - ) + message = f"Failed to download block (offset={chunk.offset}, length={chunk.length})" + if file_id: message = message + f" for file ID {file_id}" @@ -128,17 +123,14 @@ async def process_chunk(client, file_id, chunk, encryption_key, semaphore): :rtype: cterasdk.direct.types.Block """ async def process(client, chunk, encryption_key): - message = ( - f"Processing block #{chunk.number} " - f"(offset={chunk.offset}, length={chunk.length})" - ) + message = f"Processing block (offset={chunk.offset}, length={chunk.length}) " if file_id: message = message + f" for file ID {file_id}" logger.debug(message) encrypted_object = await get_object(client, file_id, chunk) decrypted_object = await decrypt_object(file_id, encrypted_object, encryption_key, chunk) decompressed_object = await decompress_object(file_id, decrypted_object, chunk) - return Block(file_id, chunk.number, chunk.offset, decompressed_object, chunk.length) + return Block(file_id, chunk.offset, decompressed_object, chunk.length) if semaphore is not None: async with semaphore: @@ -188,7 +180,7 @@ def decrypt_encryption_key(file_id, wrapped_key, secret_access_key): @execute_with_retries(retries=3, backoff=1, max_backoff=10) -async def get_chunks(api, bearer, file_id): +async def get_chunks(api, bearer, file_id, start=None, end=None): """ Get Chunks. @@ -200,7 +192,8 @@ async def get_chunks(api, bearer, file_id): """ logger.debug('Listing blocks for file ID: %s', file_id) try: - response = await api.get(f'{file_id}', headers={'Authorization': bearer}) + params = {k: v for k, v in [('rangeStart', start), ('rangeEnd', end)] if v is not None} + response = await api.get(f'{file_id}', params=params, headers={'Authorization': bearer}) if not response.chunks: logger.error('Could not find blocks for file ID: %s.', file_id) raise BlocksNotFoundError(file_id) diff --git a/cterasdk/direct/types.py b/cterasdk/direct/types.py index 5efa2a7d..bcb79f2a 100644 --- a/cterasdk/direct/types.py +++ b/cterasdk/direct/types.py @@ -56,89 +56,177 @@ class CompressionLib: class Chunk(Object): - def __init__(self, number, offset, url, length): + def __init__(self, offset, url, length): """ Initialize a Chunk. - :param int number: Chunk number. :param int offset: Chunk offset. :param str url: Signed URL. :param int length: Object length. """ super().__init__( - number=number, offset=offset, url=url, length=length ) -class Metadata(Object): +class MetadataPart(Object): """ - CTERA Direct IO File Metadata + CTERA Direct I/O File Metadata Part + + :ivar int start: Starting offset. + :ivar int end: Ending offset. + :ivar int length: Length of the range in bytes. + :ivar bool encrypted: Indicates whether the range data is encrypted. + :ivar str encryption_key: Encryption key used for the range data. + :ivar bool compressed: Indicates whether the range data is compressed. + :ivar str compression_alg: Compression algorithm used for the range data. + :ivar list[cterasdk.common.object.Object] chunks: List of Chunk Objects. """ - - def __init__(self, file_id, server_object): + def __init__(self, offset, end, encrypted, encryption_key, compressed, compression_alg, chunks): """ Initialize a Direct IO metadata response object. - :param int file_id: File ID. - :param cterasdk.common.object.Object server_object: Response Object. + :param int offset: Starting offset. + :param int end: Ending offset. + :param bool encrypted: Indicates whether the file is encrypted. + :param str encryption_key: Encryption key used for the file. + :param bool compressed: Indicates whether the file is compressed. + :param str compression_alg: Compression algorithm used for the file. + :param list[cterasdk.common.object.Object] chunks: List of Chunk Objects. """ super().__init__( - file_id=file_id, - encrypted=server_object.encrypt_info.data_encrypted, - compressed=server_object.compression_type != CompressionLib.Off, - chunks=Metadata._format_chunks(server_object.chunks) + start=offset, + end=end, + length=(end - offset) + 1, + encrypted=encrypted, + encryption_key=encryption_key, + compressed=compressed, + compression_alg=compression_alg ) - self.encryption_key = server_object.encrypt_info.wrapped_key if self.encrypted else None - self.compression_library = server_object.compression_type if self.compressed else None - last_chunk = self.chunks[-1] - self.size = last_chunk.offset + last_chunk.length + for chunk in chunks: + self.chunks.append(Chunk(offset, chunk.url, chunk.len)) + offset = offset + chunk.len @staticmethod - def _format_chunks(server_object): + def from_server_object(server_object): + compressed = server_object.compression_type != CompressionLib.Off + start, end = map(int, server_object.actual_blocks_range.range.split('-')) + return MetadataPart( + start, + end, + server_object.encrypt_info.data_encrypted, + server_object.encrypt_info.wrapped_key if server_object.encrypt_info.data_encrypted else None, + compressed, + server_object.compression_type if compressed else None + ) + + def __repr__(self): + return str(self) + + def __str__(self): + return ( + f"{self.__class__.__name__}(" + f"{{'start': {self.start}, " + f"'end': {self.end}, " + f"'encrypted': {self.encrypted}, " + f"'compressed': {self.compressed}, " + f"'length': {self.length}, " + f"'chunks': {len(self.chunks)}}})" + ) + + +class Metadata(Object): + """ + CTERA Direct I/O File Metadata + + :ivar int file_id: File ID. + :ivar int file_size: File Size. + :ivar list[cterasdk.direct.types.MetadataPart] parts: List of Metadata Parts. + :ivar bool encrypted: Indicates whether the range data is encrypted. + :ivar str encryption_key: Encryption key used for the range data. + :ivar bool compressed: Indicates whether the range data is compressed. + :ivar str compression_alg: Compression algorithm used for the range data. + """ + def __init__(self, file_id, *parts): """ - Create Chunks. + Initialize a Direct IO metadata response object. :param int file_id: File ID. - :param cterasdk.common.object.Object server_object: Server response. - :param list[int] blocks: List of block numbers to retrieve. - :returns: Chunk objects - :rtype: list[cterasdk.direct.types.Chunk] + :param list[cterasdk.direct.types.MetadataPart] parts: List of Metadata Parts. """ - offset = 0 - chunks = [] - for number, chunk in enumerate(server_object, 1): - chunks.append(Chunk(number, offset, chunk.url, chunk.len)) - offset = offset + chunk.len - return chunks + super().__init__(file_id=file_id, parts=parts) + + @property + def size(self): + return self.parts[0].actual_blocks_range.file_size + + @property + def encrypted(self): + return self.parts[0].encrypted + + @property + def encryption_key(self): + return self.parts[0].encryption_key + + @property + def compressed(self): + return self.parts[0].compressed + + @property + def compression_alg(self): + return self.parts[0].compression_alg + + @property + def start(self): + return self.parts[0].start + + @property + def end(self): + return self.parts[-1].end def serialize(self): """ - Serialize Direct IO metadata to a dictionary. + Serialize Direct I/O metadata to a dictionary. """ - x = copy.deepcopy(self) - if self.encrypted: - x.encryption_key = utils.utf8_decode(base64.b64encode(self.encryption_key)) - return x + chunks = [] + for part in self.parts: + chunks.extend(part.chunks) + + return { + 'file_id': self.file_id, + 'size': self.size, + 'encrypted': self.encrypted, + 'encryption_key': utils.utf8_decode(base64.b64encode(self.encryption_key)) if self.encrypted else None, + 'compressed': self.compressed, + 'compression_alg': self.compression_alg, + 'chunks': chunks + } + + def __repr__(self): + return str(self) + + def __str__(self): + d = self.serialize() + d.pop('encryption_key') + d['chunks'] = len(d['chunks']) + return d class Block: """Block""" - def __init__(self, file_id, number, offset, data, length): + def __init__(self, file_id, offset, data, length): """ Initialize a Block. :param int file_id: File ID. - :param int number: Block number. :param int offset: Block offset. :param bytes data: Bytes :param int length: Block length. """ self._file_id = file_id - self._number = number self._offset = offset self._data = data self._length = length @@ -147,10 +235,6 @@ def __init__(self, file_id, number, offset, data, length): def file_id(self): return self._file_id - @property - def number(self): - return self._number - @property def offset(self): return self._offset diff --git a/docs/source/UserGuides/Portal/Administration.rst b/docs/source/UserGuides/Portal/Administration.rst index df365eba..23d4023b 100644 --- a/docs/source/UserGuides/Portal/Administration.rst +++ b/docs/source/UserGuides/Portal/Administration.rst @@ -605,6 +605,39 @@ Server Tasks for task in admin.servers.tasks.scheduled('database'): print(task.name) + +Main Database Backup to S3 +-------------------------- + +.. automethod:: cterasdk.core.servers.Backup.enable + :noindex: + +.. code-block:: python + + name = 'target-s3-bucket-name' + access, secret = 'access-key', 'secret-access-key' + endpoint = 's3.eu-west-1.amazonaws.com' + https = True + + bucket = core_types.AmazonS3(name, access, secret, endpoint, https) # use verify_ssl=False to trust all certificates + user.servers.backup.enable(bucket, 60) # backup every 60 minutes + + +.. automethod:: cterasdk.core.servers.Backup.disable + :noindex: + +.. code-block:: python + + user.servers.backup.disable() + +.. automethod:: cterasdk.core.servers.Backup.status + :noindex: + +.. code-block:: python + + is_connected = user.servers.backup.connected() + + Messaging Service ================= diff --git a/tests/ut/core/admin/test_servers.py b/tests/ut/core/admin/test_servers.py index fac1b38b..efe793aa 100644 --- a/tests/ut/core/admin/test_servers.py +++ b/tests/ut/core/admin/test_servers.py @@ -4,6 +4,7 @@ import munch from cterasdk.common import Object from cterasdk.core import servers +from cterasdk.core.types import AmazonS3 from cterasdk import exceptions from tests.ut.core.admin import base_admin @@ -110,6 +111,68 @@ def test_modify_success(self): self._assert_equal_objects(actual_param, expected_param) self.assertEqual(ret, put_response) + def test_system_database(self): + with mock.patch("cterasdk.core.servers.query.database") as query_mock: + query_mock.return_value = munch.Munch({ + 'objects': [munch.Munch({'mainDB': True})] + }) + server = servers.Servers(self._global_admin).system_database + expected_query_params = base_admin.BaseCoreTest._create_query_params(start_from=0, count_limit=50, filters=[ + munch.Munch({ + 'field': 'mainDB', + 'restriction': 'eq', + '_classname': 'BooleanFilter', + 'value': True + }) + ]) + actual_query_params = query_mock.call_args[0][2] + self._assert_equal_objects(actual_query_params, expected_query_params) + self.assertEqual(server.mainDB, True) + + def test_enable_server_backup(self): + server_name = 'server' + with mock.patch("cterasdk.core.servers.query.database") as query_mock: + query_mock.return_value = munch.Munch({'name': server_name, 'backupToBucket': None}) + bucket, access, secret, endpoint = 'bucket-name', 'access', 'secret', 'www.endpoint.com' + bucket = AmazonS3(bucket, access, secret, endpoint, True, verify_ssl=False) + servers.Servers(self._global_admin).backup.enable(60) + self._global_admin.api.put.assert_called_once_with(f'/servers/{server_name}', mock.ANY) + actual_param = self._global_admin.api.put.call_args[0][1] + expected_param = munch.Munch({ + 'enabled': True, + 'exportSchedulePeriod': 60, + 'details': self._create_database_backup_server_object(bucket) + }) + self._assert_equal_objects(actual_param, expected_param) + + def _create_database_backup_server_object(self, bucket): + return munch.Munch({ + 'storage': bucket.driver, + 'bucket': bucket.bucket, + 'accessKey': bucket.access_key, + 'secretKey': bucket.secret_key, + 'endPoint': bucket.endpoint, + 'useHttps': bucket.https, + 'trustAllCertificates': bucket.trust_all_certificates, + 'masterHost': None, + 'usePathStyleAddressing': False + }) + + def test_disable_server_backup(self): + server_name = 'server' + with mock.patch("cterasdk.core.servers.query.database") as query_mock: + query_mock.return_value = munch.Munch({'name': server_name, 'backupToBucket': munch.Munch({'enabled': True})}) + servers.Servers(self._global_admin).backup.disable() + self._global_admin.api.put.assert_called_once_with(f'/servers/{server_name}', mock.ANY) + actual_param = self._global_admin.api.put.call_args[0][1] + self._assert_equal_objects(actual_param.backupToBucket.enabled, False) + + def test_server_backup_status(self): + with mock.patch("cterasdk.core.servers.query.database") as query_mock: + query_mock.return_value = munch.Munch({'backupToBucket': munch.Munch({'status', 'Connected'})}) + ret = servers.Servers(self._global_admin).backup.connected() + self.assertEqual(ret, True) + @staticmethod def _create_server_object(name=None, app=None, preview=None, enable_public_ip=None, public_ip=None, allow_user_login=None, enable_replication=None, replica_of=None): From 2c51ed27c8a5f34806b40d27b5806d98e223312e Mon Sep 17 00:00:00 2001 From: Saimon Michelson Date: Sun, 17 May 2026 19:36:42 -0400 Subject: [PATCH 2/2] update to pass flake, pylint and ut --- cterasdk/core/query.py | 2 +- cterasdk/core/servers.py | 6 +- cterasdk/direct/lib.py | 34 +++--- cterasdk/direct/types.py | 166 +++++++--------------------- tests/ut/core/admin/test_servers.py | 22 ++-- 5 files changed, 76 insertions(+), 154 deletions(-) diff --git a/cterasdk/core/query.py b/cterasdk/core/query.py index 2b49777c..13e86b5d 100644 --- a/cterasdk/core/query.py +++ b/cterasdk/core/query.py @@ -4,7 +4,7 @@ from ..common import Object -def database(core, path, param): +def run(core, path, param): return create_callback_function(core, path, callback_response=DefaultResponse)(param) diff --git a/cterasdk/core/servers.py b/cterasdk/core/servers.py index 6c2886ee..52b21aaa 100644 --- a/cterasdk/core/servers.py +++ b/cterasdk/core/servers.py @@ -52,7 +52,7 @@ def system_database(self): :returns: Main database object. :rtype: cterasdk.common.object.Object """ - response = query.database(self._core, '/servers', query.QueryParamBuilder().addFilter( + response = query.run(self._core, '/servers', query.QueryParamBuilder().addFilter( query.FilterBuilder('mainDB').eq(True) ).build()) return response.objects[0] @@ -118,7 +118,7 @@ class Backup(BaseCommand): def connected(self): """ - Verify connectivity to the backup S3 bucket. + Verify connectivity to the backup S3 bucket """ return self._core.servers.system_database.backupToBucket.status == 'Connected' @@ -127,7 +127,7 @@ def enable(self, bucket, interval): Enable Main Database Backup to an S3 Bucket :param cterasdk.core.types.Bucket bucket: Storage bucket - :param int interval: Backup interval in minutes + :param int interval: Backup interval in minutes """ database = self._core.servers.system_database database.backupToBucket = Object(enabled=True, exportSchedulePeriod=interval, details=bucket.database_backup_server_object()) diff --git a/cterasdk/direct/lib.py b/cterasdk/direct/lib.py index 5c1c705f..3343f82c 100644 --- a/cterasdk/direct/lib.py +++ b/cterasdk/direct/lib.py @@ -1,12 +1,11 @@ import logging import asyncio -import urllib.parse from ..lib.retries import execute_with_retries from .types import Metadata, Block from .crypto import decrypt_key, decrypt_block from .decompressor import decompress -from ..exceptions.transport import BadRequest, Unauthorized, Forbidden, Unprocessable, InternalServerError, HTTPError +from ..exceptions.transport import BadRequest, Unauthorized, Unprocessable, InternalServerError, HTTPError from ..exceptions.direct import ( AuthorizationError, BlockListConnectionError, BlockListTimeout, BlockValidationException, BlocksNotFoundError, DecompressBlockError, DecryptBlockError, DecryptKeyError, DirectIOError, DownloadConnectionError, @@ -27,7 +26,10 @@ async def get_object(client, file_id, chunk): :returns: Object :rtype: bytes """ - message = f"Downloading block (offset={chunk.offset}, length={chunk.length})" + message = ( + f"Downloading block #{chunk.number} " + f"(offset={chunk.offset}, length={chunk.length})" + ) if file_id: message += f" for file ID {file_id}" @@ -58,8 +60,10 @@ async def get_object(client, file_id, chunk): "unknown": "Unknown error" } - message = f"Failed to download block (offset={chunk.offset}, length={chunk.length})" - + message = ( + f"Failed to download block #{chunk.number} " + f"(offset={chunk.offset}, length={chunk.length})" + ) if file_id: message = message + f" for file ID {file_id}" @@ -68,10 +72,6 @@ async def get_object(client, file_id, chunk): raise exception -def is_azure_object_storage(chunk): - return urllib.parse.urlparse(chunk.url).netloc.endswith('core.windows.net') - - async def decrypt_object(file_id, encrypted_object, encryption_key, chunk): """ Decrypt Encrypted Object. @@ -83,7 +83,7 @@ async def decrypt_object(file_id, encrypted_object, encryption_key, chunk): :rtype: bytes """ try: - return decrypt_block(encrypted_object[16:] if is_azure_object_storage(chunk) else encrypted_object, encryption_key) + return decrypt_block(encrypted_object, encryption_key) except DirectIOError: logger.error('Failed to decrypt block.') raise DecryptBlockError(file_id, chunk) @@ -123,14 +123,17 @@ async def process_chunk(client, file_id, chunk, encryption_key, semaphore): :rtype: cterasdk.direct.types.Block """ async def process(client, chunk, encryption_key): - message = f"Processing block (offset={chunk.offset}, length={chunk.length}) " + message = ( + f"Processing block #{chunk.number} " + f"(offset={chunk.offset}, length={chunk.length})" + ) if file_id: message = message + f" for file ID {file_id}" logger.debug(message) encrypted_object = await get_object(client, file_id, chunk) decrypted_object = await decrypt_object(file_id, encrypted_object, encryption_key, chunk) decompressed_object = await decompress_object(file_id, decrypted_object, chunk) - return Block(file_id, chunk.offset, decompressed_object, chunk.length) + return Block(file_id, chunk.number, chunk.offset, decompressed_object, chunk.length) if semaphore is not None: async with semaphore: @@ -180,7 +183,7 @@ def decrypt_encryption_key(file_id, wrapped_key, secret_access_key): @execute_with_retries(retries=3, backoff=1, max_backoff=10) -async def get_chunks(api, bearer, file_id, start=None, end=None): +async def get_chunks(api, bearer, file_id): """ Get Chunks. @@ -192,15 +195,14 @@ async def get_chunks(api, bearer, file_id, start=None, end=None): """ logger.debug('Listing blocks for file ID: %s', file_id) try: - params = {k: v for k, v in [('rangeStart', start), ('rangeEnd', end)] if v is not None} - response = await api.get(f'{file_id}', params=params, headers={'Authorization': bearer}) + response = await api.get(f'{file_id}', headers={'Authorization': bearer}) if not response.chunks: logger.error('Could not find blocks for file ID: %s.', file_id) raise BlocksNotFoundError(file_id) return Metadata(file_id, response) except BadRequest as error: raise ObjectNotFoundError(file_id) from error - except (Unauthorized, Forbidden) as error: + except Unauthorized as error: raise AuthorizationError(file_id) from error except Unprocessable as error: raise UnsupportedStorageError(file_id) from error diff --git a/cterasdk/direct/types.py b/cterasdk/direct/types.py index bcb79f2a..5efa2a7d 100644 --- a/cterasdk/direct/types.py +++ b/cterasdk/direct/types.py @@ -56,177 +56,89 @@ class CompressionLib: class Chunk(Object): - def __init__(self, offset, url, length): + def __init__(self, number, offset, url, length): """ Initialize a Chunk. + :param int number: Chunk number. :param int offset: Chunk offset. :param str url: Signed URL. :param int length: Object length. """ super().__init__( + number=number, offset=offset, url=url, length=length ) -class MetadataPart(Object): +class Metadata(Object): """ - CTERA Direct I/O File Metadata Part - - :ivar int start: Starting offset. - :ivar int end: Ending offset. - :ivar int length: Length of the range in bytes. - :ivar bool encrypted: Indicates whether the range data is encrypted. - :ivar str encryption_key: Encryption key used for the range data. - :ivar bool compressed: Indicates whether the range data is compressed. - :ivar str compression_alg: Compression algorithm used for the range data. - :ivar list[cterasdk.common.object.Object] chunks: List of Chunk Objects. + CTERA Direct IO File Metadata """ - def __init__(self, offset, end, encrypted, encryption_key, compressed, compression_alg, chunks): + + def __init__(self, file_id, server_object): """ Initialize a Direct IO metadata response object. - :param int offset: Starting offset. - :param int end: Ending offset. - :param bool encrypted: Indicates whether the file is encrypted. - :param str encryption_key: Encryption key used for the file. - :param bool compressed: Indicates whether the file is compressed. - :param str compression_alg: Compression algorithm used for the file. - :param list[cterasdk.common.object.Object] chunks: List of Chunk Objects. + :param int file_id: File ID. + :param cterasdk.common.object.Object server_object: Response Object. """ super().__init__( - start=offset, - end=end, - length=(end - offset) + 1, - encrypted=encrypted, - encryption_key=encryption_key, - compressed=compressed, - compression_alg=compression_alg + file_id=file_id, + encrypted=server_object.encrypt_info.data_encrypted, + compressed=server_object.compression_type != CompressionLib.Off, + chunks=Metadata._format_chunks(server_object.chunks) ) - for chunk in chunks: - self.chunks.append(Chunk(offset, chunk.url, chunk.len)) - offset = offset + chunk.len + self.encryption_key = server_object.encrypt_info.wrapped_key if self.encrypted else None + self.compression_library = server_object.compression_type if self.compressed else None + last_chunk = self.chunks[-1] + self.size = last_chunk.offset + last_chunk.length @staticmethod - def from_server_object(server_object): - compressed = server_object.compression_type != CompressionLib.Off - start, end = map(int, server_object.actual_blocks_range.range.split('-')) - return MetadataPart( - start, - end, - server_object.encrypt_info.data_encrypted, - server_object.encrypt_info.wrapped_key if server_object.encrypt_info.data_encrypted else None, - compressed, - server_object.compression_type if compressed else None - ) - - def __repr__(self): - return str(self) - - def __str__(self): - return ( - f"{self.__class__.__name__}(" - f"{{'start': {self.start}, " - f"'end': {self.end}, " - f"'encrypted': {self.encrypted}, " - f"'compressed': {self.compressed}, " - f"'length': {self.length}, " - f"'chunks': {len(self.chunks)}}})" - ) - - -class Metadata(Object): - """ - CTERA Direct I/O File Metadata - - :ivar int file_id: File ID. - :ivar int file_size: File Size. - :ivar list[cterasdk.direct.types.MetadataPart] parts: List of Metadata Parts. - :ivar bool encrypted: Indicates whether the range data is encrypted. - :ivar str encryption_key: Encryption key used for the range data. - :ivar bool compressed: Indicates whether the range data is compressed. - :ivar str compression_alg: Compression algorithm used for the range data. - """ - def __init__(self, file_id, *parts): + def _format_chunks(server_object): """ - Initialize a Direct IO metadata response object. + Create Chunks. :param int file_id: File ID. - :param list[cterasdk.direct.types.MetadataPart] parts: List of Metadata Parts. + :param cterasdk.common.object.Object server_object: Server response. + :param list[int] blocks: List of block numbers to retrieve. + :returns: Chunk objects + :rtype: list[cterasdk.direct.types.Chunk] """ - super().__init__(file_id=file_id, parts=parts) - - @property - def size(self): - return self.parts[0].actual_blocks_range.file_size - - @property - def encrypted(self): - return self.parts[0].encrypted - - @property - def encryption_key(self): - return self.parts[0].encryption_key - - @property - def compressed(self): - return self.parts[0].compressed - - @property - def compression_alg(self): - return self.parts[0].compression_alg - - @property - def start(self): - return self.parts[0].start - - @property - def end(self): - return self.parts[-1].end + offset = 0 + chunks = [] + for number, chunk in enumerate(server_object, 1): + chunks.append(Chunk(number, offset, chunk.url, chunk.len)) + offset = offset + chunk.len + return chunks def serialize(self): """ - Serialize Direct I/O metadata to a dictionary. + Serialize Direct IO metadata to a dictionary. """ - chunks = [] - for part in self.parts: - chunks.extend(part.chunks) - - return { - 'file_id': self.file_id, - 'size': self.size, - 'encrypted': self.encrypted, - 'encryption_key': utils.utf8_decode(base64.b64encode(self.encryption_key)) if self.encrypted else None, - 'compressed': self.compressed, - 'compression_alg': self.compression_alg, - 'chunks': chunks - } - - def __repr__(self): - return str(self) - - def __str__(self): - d = self.serialize() - d.pop('encryption_key') - d['chunks'] = len(d['chunks']) - return d + x = copy.deepcopy(self) + if self.encrypted: + x.encryption_key = utils.utf8_decode(base64.b64encode(self.encryption_key)) + return x class Block: """Block""" - def __init__(self, file_id, offset, data, length): + def __init__(self, file_id, number, offset, data, length): """ Initialize a Block. :param int file_id: File ID. + :param int number: Block number. :param int offset: Block offset. :param bytes data: Bytes :param int length: Block length. """ self._file_id = file_id + self._number = number self._offset = offset self._data = data self._length = length @@ -235,6 +147,10 @@ def __init__(self, file_id, offset, data, length): def file_id(self): return self._file_id + @property + def number(self): + return self._number + @property def offset(self): return self._offset diff --git a/tests/ut/core/admin/test_servers.py b/tests/ut/core/admin/test_servers.py index efe793aa..65f7df3e 100644 --- a/tests/ut/core/admin/test_servers.py +++ b/tests/ut/core/admin/test_servers.py @@ -112,7 +112,7 @@ def test_modify_success(self): self.assertEqual(ret, put_response) def test_system_database(self): - with mock.patch("cterasdk.core.servers.query.database") as query_mock: + with mock.patch("cterasdk.core.servers.query.run") as query_mock: query_mock.return_value = munch.Munch({ 'objects': [munch.Munch({'mainDB': True})] }) @@ -130,22 +130,24 @@ def test_system_database(self): self.assertEqual(server.mainDB, True) def test_enable_server_backup(self): + self._init_global_admin() server_name = 'server' - with mock.patch("cterasdk.core.servers.query.database") as query_mock: + with mock.patch("cterasdk.core.servers.Servers.system_database", new_callable=mock.PropertyMock) as query_mock: query_mock.return_value = munch.Munch({'name': server_name, 'backupToBucket': None}) bucket, access, secret, endpoint = 'bucket-name', 'access', 'secret', 'www.endpoint.com' bucket = AmazonS3(bucket, access, secret, endpoint, True, verify_ssl=False) - servers.Servers(self._global_admin).backup.enable(60) + servers.Servers(self._global_admin).backup.enable(bucket, 60) self._global_admin.api.put.assert_called_once_with(f'/servers/{server_name}', mock.ANY) actual_param = self._global_admin.api.put.call_args[0][1] expected_param = munch.Munch({ 'enabled': True, 'exportSchedulePeriod': 60, - 'details': self._create_database_backup_server_object(bucket) + 'details': TestCoreServers._create_database_backup_server_object(bucket) }) - self._assert_equal_objects(actual_param, expected_param) + self._assert_equal_objects(actual_param.backupToBucket, expected_param) - def _create_database_backup_server_object(self, bucket): + @staticmethod + def _create_database_backup_server_object(bucket): return munch.Munch({ 'storage': bucket.driver, 'bucket': bucket.bucket, @@ -159,8 +161,9 @@ def _create_database_backup_server_object(self, bucket): }) def test_disable_server_backup(self): + self._init_global_admin() server_name = 'server' - with mock.patch("cterasdk.core.servers.query.database") as query_mock: + with mock.patch("cterasdk.core.servers.Servers.system_database", new_callable=mock.PropertyMock) as query_mock: query_mock.return_value = munch.Munch({'name': server_name, 'backupToBucket': munch.Munch({'enabled': True})}) servers.Servers(self._global_admin).backup.disable() self._global_admin.api.put.assert_called_once_with(f'/servers/{server_name}', mock.ANY) @@ -168,8 +171,9 @@ def test_disable_server_backup(self): self._assert_equal_objects(actual_param.backupToBucket.enabled, False) def test_server_backup_status(self): - with mock.patch("cterasdk.core.servers.query.database") as query_mock: - query_mock.return_value = munch.Munch({'backupToBucket': munch.Munch({'status', 'Connected'})}) + self._init_global_admin() + with mock.patch("cterasdk.core.servers.Servers.system_database", new_callable=mock.PropertyMock) as query_mock: + query_mock.return_value = munch.Munch({'backupToBucket': munch.Munch({'status': 'Connected'})}) ret = servers.Servers(self._global_admin).backup.connected() self.assertEqual(ret, True)