diff --git a/eblob_kit.py b/eblob_kit.py
index 2a5603c..1cbc252 100755
--- a/eblob_kit.py
+++ b/eblob_kit.py
@@ -13,6 +13,7 @@
 import struct
 import sys
+from contextlib import contextmanager
 from datetime import datetime
 from datetime import timedelta
@@ -20,6 +21,9 @@
 import pyhash
 
+DEFAULT_TEMP_PREFIX = 'tmp_'
+
+
 def dump_digest(verbosity, results_digest):
     """Dump report to console as JSON.
@@ -61,6 +65,36 @@ def dump_to_file(file_name, results):
         logging.error('Failed to dump json report file %s: %s', file_name, e)
 
 
+def move_from_temp(temp_path, prefix=DEFAULT_TEMP_PREFIX):
+    """Rename a working temporary file back to its original name.
+
+    Rename the file at `temp_path`, restoring its original name (removing {prefix}):
+    some/path/{prefix}file -> some/path/file
+
+    Raises:
+        ValueError - if the basename of temp_path doesn't start with prefix.
+        OSError - in case of renaming failure.
+
+    """
+    abs_path = os.path.abspath(temp_path)
+
+    directory_path = os.path.dirname(abs_path)
+    basename = os.path.basename(abs_path)
+
+    if not basename.startswith(prefix):
+        raise ValueError("temporary file {filename} doesn't start with prefix '{prefix}', can't remove it".format(
+            filename=basename,
+            prefix=prefix))
+
+    # NOTE: slice instead of str.lstrip(): lstrip strips a set of characters, not a prefix.
+    stripped_basename = basename[len(prefix):]
+    move_to_path = os.path.join(directory_path, stripped_basename)
+
+    # NOTE: os.rename raises OSError if temp_path doesn't exist.
+    os.rename(temp_path, move_to_path)
+
+    logging.info('Temporary file was renamed from "%s" to "%s"', temp_path, move_to_path)
+
+
 def is_destination_writable(src_path, dst_path, overwrite=False):
     """Check if 'dst_path' file writable.
@@ -70,6 +104,36 @@
     return not os.path.exists(dst_path) or (not os.path.samefile(src_path, dst_path) and overwrite)
 
 
+@contextmanager
+def temp_context(entity_factory, destination_path, *args, **kwargs):
+    """Wrap an entity_factory object in a temporary file path.
+
+    Create entity_factory's object under a temporary path and move it to
+    destination_path after successful completion of the context block.
+
+    On context block completion (either successful or with an exception),
+    `close` releases the allocated resources.
+
+    """
+    dst_abspath = os.path.abspath(destination_path)
+    dst_dirname = os.path.dirname(dst_abspath)
+    dst_basename = os.path.basename(dst_abspath)
+
+    tmp_basename = '{prefix}{name}'.format(prefix=DEFAULT_TEMP_PREFIX, name=dst_basename)
+    tmp_path = os.path.join(dst_dirname, tmp_basename)
+
+    logging.info('Creating entity of type %s at path "%s"', entity_factory.__name__, tmp_path)
+
+    wrapped_object = entity_factory.create(tmp_path, *args, **kwargs)
+    try:
+        yield wrapped_object
+    finally:
+        wrapped_object.close()
+
+    # Reached only after successful `with` block completion: an exception
+    # propagating through `yield` skips the rename below.
+    wrapped_object.move_from_temp()
+
+
 class ReportType(object):
     """Command result report types.
@@ -419,7 +483,7 @@ def __init__(self, path, mode='rb'):
     def create(path):
         """Create IndexFile for @path.
 
-        NOTE: underlying file is truncuated if exists.
+        NOTE: underlying file is truncated if it exists.
         """
         return IndexFile(path=path, mode='wb')
@@ -441,6 +505,21 @@ def size(self):
         """Size of the file."""
         return os.fstat(self._file.fileno()).st_size
 
+    def close(self):
+        """Close underlying file(s)."""
+        self._file.close()
+
+    def move_from_temp(self):
+        """Remove the temporary working prefix from the underlying file.
+
+        NOTE: the file should be closed beforehand.
+        TODO: implement via mixin.
+        """
+        if not self._file.closed:
+            raise RuntimeError("index file should be closed before move: {}".format(self._file.name))
+
+        move_from_temp(self._file.name)
+
     def __getitem__(self, idx):
         assert (idx + 1) * DiskControl.size <= self.size()
         self._file.seek(idx * DiskControl.size)
@@ -501,6 +580,21 @@ def read(self, offset, size):
         self._file.seek(offset)
         return self._file.read(size)
 
+    def close(self):
+        """Close underlying file(s)."""
+        self._file.close()
+
+    def move_from_temp(self):
+        """Remove the temporary working prefix from the underlying file.
+
+        NOTE: the file should be closed beforehand.
+        TODO: implement via mixin.
+        """
+        if not self._file.closed:
+            raise RuntimeError("data file should be closed before move: {}".format(self._file.name))
+
+        move_from_temp(self._file.name)
+
     def __iter__(self):
         """Iterate over headers in the blob."""
         self._file.seek(0)
@@ -538,12 +632,12 @@ def __init__(self, path, mode='rb'):
     def create(path, mark_index_sorted=False):
         """Create new Blob at @path.
 
-        NOTE: underlying files are truncuated if they are exist.
+        NOTE: underlying files are truncated if they exist.
         """
         index_suffix = '.index.sorted' if mark_index_sorted else '.index'
         create_mode = 'wb'
 
-        # Index is checked for existance on Blob creation, so we should create it
+        # Index is checked for existence on Blob creation, so we should create it
         # beforehand, but data file would be created within Blob constructor.
         open(path + index_suffix, create_mode).close()
@@ -559,6 +653,24 @@ def data(self):
         """Return data file."""
         return self._data_file
 
+    def close(self):
+        """Close underlying file(s)."""
+        self._data_file.close()
+        self._index_file.close()
+
+    def move_from_temp(self):
+        """Remove temporary working prefixes from the underlying files.
+
+        NOTE: the files should be closed beforehand.
+        """
+        self._data_file.move_from_temp()
+        self._index_file.move_from_temp()
+
+    @staticmethod
+    def exists(path):
+        """Check whether blob and index files exist at the specified path."""
+        return os.path.exists(path) and (os.path.exists(path + '.index') or os.path.exists(path + '.index.sorted'))
+
     def _murmur_chunk(self, chunk):
         """Apply murmurhash to chunk and return raw result."""
         chunk_size = 4096
@@ -1065,53 +1177,59 @@ def recover_index(data, destination, overwrite=False):
         basename = os.path.basename(data.path)
         index_path = os.path.join(destination, basename + '.index')
 
+        if os.path.exists(index_path) and not overwrite:
+            logging.info('Destination file "%s" already exists, skipping it', index_path)
+            return
+
         if not is_destination_writable(data.path + '.index', index_path, overwrite):
             raise RuntimeError("can't recover to already existing index file: {}".format(index_path))
 
-        index = IndexFile.create(index_path)
-
-        logging.info('Recovering index %s -> %s', data.path, index_path)
+        with temp_context(IndexFile, index_path) as index:
+            logging.info('Recovering index %s -> %s', data.path, index.file.name)
 
-        for header in data:
-            if header:
-                index.append(header)
-                continue
+            for header in data:
+                if header:
+                    index.append(header)
+                    continue
 
-            offset = data.file.tell() - DiskControl.size
-            logging.error('I have found broken header at offset %s: %s', offset, header)
-            logging.error('This record can not be skipped, so I break the recovering. '
-                          'You can use %s as an index for %s but it does not include '
-                          'records after %s offset',
-                          index.path,
-                          data.path,
-                          offset)
-            break
+                offset = data.file.tell() - DiskControl.size
+                logging.error('I have found a broken header at offset %s: %s', offset, header)
+                logging.error('This record cannot be skipped, so I stop the recovery. '
+                              'You can use %s as an index for %s, but it does not include '
+                              'records after offset %s',
+                              index.path,
+                              data.path,
+                              offset)
+                break
 
 
     def recover_blob(self, destination, overwrite=False):
         """Recover blob from data."""
         basename = os.path.basename(self._blob.data.path)
         blob_path = os.path.join(destination, basename)
 
+        if Blob.exists(blob_path) and not overwrite:
+            logging.info('Destination blob file "%s" already exists, skipping it', blob_path)
+            return
+
         if not is_destination_writable(self._blob.data.path, blob_path, overwrite):
             raise RuntimeError("can't recover to already existing blob file: {}".format(blob_path))
 
-        blob = Blob.create(path=blob_path)
-
         copied_records = 0
         removed_records = 0
         skipped_records = 0
 
-        logging.info('Recovering blob %s -> %s', self._blob.data.path, blob_path)
+        with temp_context(Blob, blob_path) as blob:
+            logging.info('Recovering blob %s -> %s', self._blob.data.path, blob.data.file.name)
 
-        for header in self._blob.data:
-            if not header:
-                skipped_records += 1
-                logging.error('I have faced with broken record which I have to skip.')
-            elif header.flags.removed:
-                removed_records += 1
-            else:
-                copy_record(self._blob, blob, header)
-                copied_records += 1
+            for header in self._blob.data:
+                if not header:
+                    skipped_records += 1
+                    logging.error('I have encountered a broken record, which I have to skip.')
+                elif header.flags.removed:
+                    removed_records += 1
+                else:
+                    copy_record(self._blob, blob, header)
+                    copied_records += 1
 
         logging.info('I have copied %s records, skipped %s and removed %s records',
                      copied_records,
@@ -1123,21 +1241,25 @@ def copy_valid_records(self, destination, overwrite=False):
         basename = os.path.basename(self._blob.data.path)
         blob_path = os.path.join(destination, basename)
 
+        if Blob.exists(blob_path) and not overwrite:
+            logging.info('Destination blob file "%s" already exists, skipping it', blob_path)
+            return
+
         if not is_destination_writable(self._blob.data.path, blob_path, overwrite):
             raise RuntimeError("can't copy valid records to already existing blob file: {}".format(blob_path))
 
-        blob = Blob.create(blob_path)
-
        copied_records = 0
         copied_size = 0
 
-        self._index_headers += self._stat.data_recoverable_headers
-        logging.info('Recovering blob %s -> %s', self._blob.data.path, blob_path)
+        with temp_context(Blob, blob_path) as blob:
+            logging.info('Recovering blob %s -> %s', self._blob.data.path, blob_path)
 
-        for header in self._index_headers:
-            copy_record(self._blob, blob, header)
-            copied_records += 1
-            copied_size += header.disk_size
+            self._index_headers += self._stat.data_recoverable_headers
+
+            for header in self._index_headers:
+                copy_record(self._blob, blob, header)
+                copied_records += 1
+                copied_size += header.disk_size
 
         logging.info('I have copied %s (%s) records %s -> %s ',
                      copied_records,
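
Taken together, the pattern this patch introduces is: create the entity under a `tmp_`-prefixed sibling path, write, close, and only then claim the destination name. A minimal usage sketch, assuming `eblob_kit` is importable as a module; the destination path and the `headers` iterable are hypothetical, while `temp_context`, `IndexFile` and `move_from_temp` are the names added above:

```python
from eblob_kit import IndexFile, temp_context

index_path = '/var/blobs/data-0.0.index'  # hypothetical destination

with temp_context(IndexFile, index_path) as index:
    # While the block runs, all writes land in /var/blobs/tmp_data-0.0.index.
    for header in headers:  # `headers` is a placeholder iterable of DiskControl
        index.append(header)
# Clean exit: index.close() runs (via finally), then move_from_temp() renames
# the file to /var/blobs/data-0.0.index.
# On an exception: index.close() still runs, but the rename is skipped, so a
# partial index never shadows the destination file.
```

Keeping the rename outside the `finally` block is the point of the design: the destination name is claimed only on success, and an interrupted run leaves behind at most a `tmp_`-prefixed file.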
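On the `TODO: implement via mixin` notes: `close()` and `move_from_temp()` are duplicated verbatim in `IndexFile` and `DataFile`. A possible shape for that mixin, sketched under the assumption that the host class keeps its open file object in `self._file` (the name `TempFileMixin` is hypothetical, not part of this patch):

```python
class TempFileMixin(object):
    """close()/move_from_temp() shared by classes that own a single self._file."""

    def close(self):
        """Close the underlying file."""
        self._file.close()

    def move_from_temp(self):
        """Remove the temporary working prefix from the underlying file.

        NOTE: the file should be closed beforehand.
        """
        if not self._file.closed:
            raise RuntimeError("file should be closed before move: {}".format(self._file.name))

        move_from_temp(self._file.name)


# IndexFile and DataFile would then inherit both methods instead of duplicating
# them; Blob keeps its own pair, since it owns two underlying files.
```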