200 changes: 161 additions & 39 deletions eblob_kit.py
@@ -13,13 +13,17 @@
import struct
import sys

from contextlib import contextmanager
from datetime import datetime
from datetime import timedelta

import click
import pyhash


DEFAULT_TEMP_PREFIX = 'tmp_'


def dump_digest(verbosity, results_digest):
"""Dump report to console as JSON.

@@ -61,6 +65,36 @@ def dump_to_file(file_name, results):
        logging.error('Failed to dump json report file %s: %s', file_name, e)


def move_from_temp(temp_path, prefix=DEFAULT_TEMP_PREFIX):
    """Rename a working temporary file back to its original name.

    Rename file `temp_path`, restoring its original name (removing {prefix}):
        some/path/{prefix}file -> some/path/file

    Raises:
        ValueError - if the basename of temp_path doesn't start with prefix.
        OSError - in case of renaming failure.

    """
    abs_path = os.path.abspath(temp_path)

    directory_path = os.path.dirname(abs_path)
    basename = os.path.basename(abs_path)

    if not basename.startswith(prefix):
        raise ValueError("temporary file {filename} doesn't start with prefix '{prefix}', can't remove it".format(
            filename=basename,
            prefix=prefix))

    # NOTE: str.lstrip() strips a set of characters, not a literal prefix,
    # so slice the prefix off instead.
    stripped_basename = basename[len(prefix):]
    move_to_path = os.path.join(directory_path, stripped_basename)

    # NOTE: raises OSError if temp_path does not exist.
    os.rename(temp_path, move_to_path)

    logging.info('Temporary file was renamed from "%s" to "%s"', temp_path, move_to_path)
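
A minimal usage sketch (hypothetical file names), also showing why the slice is used instead of str.lstrip():

    move_from_temp('/srv/blobs/tmp_data-0.0')  # renames to /srv/blobs/data-0.0

    # str.lstrip treats its argument as a character set, not a literal prefix:
    'tmp_total.log'.lstrip('tmp_')   # -> 'otal.log' (wrong)
    'tmp_total.log'[len('tmp_'):]    # -> 'total.log' (right)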


def is_destination_writable(src_path, dst_path, overwrite=False):
"""Check if 'dst_path' file writable.

@@ -70,6 +104,36 @@ def is_destination_writable(src_path, dst_path, overwrite=False):
    return not os.path.exists(dst_path) or (not os.path.samefile(src_path, dst_path) and overwrite)
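
A reading of the predicate above, with hypothetical paths:

    is_destination_writable('data-0.0', 'new/data-0.0')                  # True: destination absent
    is_destination_writable('data-0.0', 'old/data-0.0', overwrite=True)  # True: existing, distinct file, overwrite allowed
    is_destination_writable('data-0.0', 'data-0.0', overwrite=True)      # False: source and destination are the same file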


@contextmanager
def temp_context(entity_factory, destination_path, *args, **kwargs):
    """Wrap a blob-like object in a temporary file path.

    Create entity_factory's object at a temporary path and move it to
    destination_path after the context block completes successfully.

    On completion of the context block (either successful or with an
    exception), close() releases the allocated resources.

    """
    dst_abspath = os.path.abspath(destination_path)
    dst_dirname = os.path.dirname(dst_abspath)
    dst_basename = os.path.basename(dst_abspath)

    tmp_basename = '{prefix}{name}'.format(prefix=DEFAULT_TEMP_PREFIX, name=dst_basename)
    tmp_path = os.path.join(dst_dirname, tmp_basename)

    # NOTE: entity_factory is a class (IndexFile or Blob), so its name is
    # entity_factory.__name__; type(entity_factory).__name__ would be 'type'.
    logging.info('Creating entity of type %s at path "%s"', entity_factory.__name__, tmp_path)

    wrapped_object = entity_factory.create(tmp_path, *args, **kwargs)
    try:
        yield wrapped_object
    finally:
        wrapped_object.close()

    # Reached only after the `with` block completes successfully: an exception
    # propagates out of the yield above and skips the rename.
    wrapped_object.move_from_temp()
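
A minimal usage sketch (hypothetical paths, assuming `data` iterates over valid headers as in recover_index below):

    # Writes go to 'out/tmp_data-0.0.index'; the file is renamed to
    # 'out/data-0.0.index' only if the block finishes without an exception.
    with temp_context(IndexFile, 'out/data-0.0.index') as index:
        for header in data:
            index.append(header)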


class ReportType(object):
"""Command result report types.

@@ -419,7 +483,7 @@ def __init__(self, path, mode='rb'):
    def create(path):
        """Create IndexFile for @path.

        NOTE: underlying file is truncated if it exists.
        """
        return IndexFile(path=path, mode='wb')

@@ -441,6 +505,21 @@ def size(self):
"""Size of the file."""
return os.fstat(self._file.fileno()).st_size

def close(self):
"""Close underlying file(s)."""
self._file.close()

def move_from_temp(self):
"""Remove temporary working prefixes from underlying file(s).

NOTE: file should be closed beforehand.
TODO: implement via mixin.
"""
if not self._file.closed:
raise RuntimeError("index file should be closed before move {}".format(self._file.name))

move_from_temp(self._file.name)
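
The close-before-move contract, as a sketch (hypothetical path):

    index = IndexFile.create('out/tmp_data-0.0.index')
    index.close()           # required: move_from_temp raises RuntimeError otherwise
    index.move_from_temp()  # out/tmp_data-0.0.index -> out/data-0.0.index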

    def __getitem__(self, idx):
        assert (idx + 1) * DiskControl.size <= self.size()
        self._file.seek(idx * DiskControl.size)
@@ -501,6 +580,21 @@ def read(self, offset, size):
        self._file.seek(offset)
        return self._file.read(size)

    def close(self):
        """Close underlying file(s)."""
        self._file.close()

    def move_from_temp(self):
        """Remove temporary working prefixes from underlying file(s).

        NOTE: file should be closed beforehand.
        TODO: implement via mixin.
        """
        if not self._file.closed:
            raise RuntimeError("data file should be closed before move {}".format(self._file.name))

        move_from_temp(self._file.name)

    def __iter__(self):
        """Iterate over headers in the blob."""
        self._file.seek(0)
@@ -538,12 +632,12 @@ def __init__(self, path, mode='rb'):
    def create(path, mark_index_sorted=False):
        """Create new Blob at @path.

        NOTE: underlying files are truncated if they exist.
        """
        index_suffix = '.index.sorted' if mark_index_sorted else '.index'

        create_mode = 'wb'
        # Index is checked for existence on Blob creation, so we should create it
        # beforehand, but the data file is created within the Blob constructor.
        open(path + index_suffix, create_mode).close()

@@ -559,6 +653,24 @@ def data(self):
"""Return data file."""
return self._data_file

def close(self):
"""Close underlying file(s)."""
self._data_file.close()
self._index_file.close()

def move_from_temp(self):
"""Remove temporary working prefixes from underlying file(s).

NOTE: file(s) should be closed beforehand.
"""
self._data_file.move_from_temp()
self._index_file.move_from_temp()

@staticmethod
def exists(path):
"""Check weather blob and index files exists at specified path."""
return os.path.exists(path) and (os.path.exists(path + '.index') or os.path.exists(path + '.index.sorted'))
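
For illustration, with a hypothetical layout: a blob counts as existing only when its data file plus either index flavor is present:

    Blob.exists('/srv/blobs/data-0.0')  # True with data-0.0.index or data-0.0.index.sorted alongside, False otherwise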

    def _murmur_chunk(self, chunk):
        """Apply murmurhash to chunk and return raw result."""
        chunk_size = 4096
@@ -1065,53 +1177,59 @@ def recover_index(data, destination, overwrite=False):
        basename = os.path.basename(data.path)
        index_path = os.path.join(destination, basename + '.index')

        if not is_destination_writable(data.path + '.index', index_path, overwrite):
            raise RuntimeError("can't recover to already existing index file: {}".format(index_path))

        with temp_context(IndexFile, index_path) as index:
            logging.info('Recovering index %s -> %s', data.path, index.file.name)

            for header in data:
                if header:
                    index.append(header)
                    continue

                offset = data.file.tell() - DiskControl.size
                logging.error('I have found a broken header at offset %s: %s', offset, header)
                logging.error('This record can not be skipped, so I stop the recovery. '
                              'You can use %s as an index for %s but it does not include '
                              'records after %s offset',
                              index.path,
                              data.path,
                              offset)
                break
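
Worth noting, as a sketch of the failure path (hypothetical names): a broken header only breaks the loop, so the partial index is still renamed into place, matching the log message above; only an exception raised inside the block leaves the output under the tmp_ prefix:

    try:
        with temp_context(IndexFile, 'out/data-0.0.index') as index:
            raise IOError('simulated failure')
    except IOError:
        assert not os.path.exists('out/data-0.0.index')  # only tmp_data-0.0.index was created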

    def recover_blob(self, destination, overwrite=False):
        """Recover blob from data."""
        basename = os.path.basename(self._blob.data.path)
        blob_path = os.path.join(destination, basename)

        if not is_destination_writable(self._blob.data.path, blob_path, overwrite):
            raise RuntimeError("can't recover to already existing blob file: {}".format(blob_path))

        copied_records = 0
        removed_records = 0
        skipped_records = 0

        with temp_context(Blob, blob_path) as blob:
            logging.info('Recovering blob %s -> %s', self._blob.data.path, blob.data.file.name)

            for header in self._blob.data:
                if not header:
                    skipped_records += 1
                    logging.error('I have come across a broken record which I have to skip.')
                elif header.flags.removed:
                    removed_records += 1
                else:
                    copy_record(self._blob, blob, header)
                    copied_records += 1

        logging.info('I have copied %s records, skipped %s and removed %s records',
                     copied_records,
@@ -1123,21 +1241,25 @@ def copy_valid_records(self, destination, overwrite=False):
        basename = os.path.basename(self._blob.data.path)
        blob_path = os.path.join(destination, basename)

        if not is_destination_writable(self._blob.data.path, blob_path, overwrite):
            raise RuntimeError("can't copy valid records to already existing blob file: {}".format(blob_path))

        copied_records = 0
        copied_size = 0

        with temp_context(Blob, blob_path) as blob:
            logging.info('Recovering blob %s -> %s', self._blob.data.path, blob_path)

            self._index_headers += self._stat.data_recoverable_headers

            for header in self._index_headers:
                copy_record(self._blob, blob, header)
                copied_records += 1
                copied_size += header.disk_size

        logging.info('I have copied %s (%s) records %s -> %s ',
                     copied_records,