From 8f65693588e660ee56cb63fbd5028a7c5c8917b0 Mon Sep 17 00:00:00 2001 From: Josh Snyder Date: Tue, 28 Nov 2017 15:33:07 -0800 Subject: [PATCH 1/2] Add pruning support. This entails pretty involved changes to the restoration code paths --- cassandra_mirror/backup.py | 8 +- cassandra_mirror/prune.py | 101 +++++++++++++++++++++++++ cassandra_mirror/restore.py | 146 ++++++++++++++++-------------------- cassandra_mirror/util.py | 15 +++- setup.py | 1 + 5 files changed, 180 insertions(+), 91 deletions(-) create mode 100644 cassandra_mirror/prune.py diff --git a/cassandra_mirror/backup.py b/cassandra_mirror/backup.py index e80156d..56b1eb3 100644 --- a/cassandra_mirror/backup.py +++ b/cassandra_mirror/backup.py @@ -18,7 +18,6 @@ from .obsoletion import cleanup_obsoleted from .obsoletion import mark_obsoleted from .util import MovingTemporaryDirectory -from .util import S3Path from .util import compute_top_prefix from .util import continuity_code from .util import gof3r @@ -31,7 +30,6 @@ from plumbum.cmd import tar logger = logging.getLogger(__name__) - # A recursive directory walker def find(path, depth): if depth == 0: @@ -420,8 +418,8 @@ def upload_global_manifest(columnfamilies, destination): # the newest backup sorts lexicographically lowest. t = time.time() t_string = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(t)) - ns_since_epoch = int(time.time() * 1e9) - label = '{:016x} {}'.format(reverse_format_nanoseconds(ns_since_epoch), t_string) + ns_since_epoch = time.time() * 1e9 + label = '{} {}'.format(reverse_format_nanoseconds(ns_since_epoch), t_string) destination = destination.with_components('manifests', label) body = transform_cf_for_manifest(columnfamilies) @@ -463,7 +461,7 @@ def do_backup(): locs = Locations(data_dir, data_dir / 'data', state_dir, state_dir / 'links') fix_identity(config, locs) - destination = S3Path(config['s3']['bucket'], compute_top_prefix(config)) + destination = compute_top_prefix(config) cf_specs = backup_all_sstables(config, locs, destination) label = upload_global_manifest(cf_specs, destination) print(label) diff --git a/cassandra_mirror/prune.py b/cassandra_mirror/prune.py new file mode 100644 index 0000000..4bbf9f9 --- /dev/null +++ b/cassandra_mirror/prune.py @@ -0,0 +1,101 @@ +import argparse +import logging +import sys +from time import mktime +from time import time + +import boto3 + +from .restore import get_generations_referenced_by_manifest +from .util import compute_top_prefix +from .util import load_config +from .util import reverse_format_nanoseconds + +s3 = boto3.resource('s3') + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +def to_timestamp(t): + return mktime(t.timetuple()) + +def delete_manifests(source, labels_to_keep, grace_after): + source = source.with_components('manifests') + + trim_length = len(source.key) + 1 + start_label = reverse_format_nanoseconds(grace_after * 1e9) + for o in source.descendants(start=start_label): + if o.key[trim_length:] in labels_to_keep: + continue + + logger.info('Deleting %s', o.key) + o.delete() + +def delete_data(source, generations_to_keep, grace_after): + source = source.with_components('data') + + trim_length = len(source.key) + 1 + for o in source.descendants(): + if to_timestamp(o.last_modified) > grace_after: + continue + + # Trim the key to remove `source` as a prefix + trimmed_key = o.key[trim_length:] + ks, cf, gen, _ = trimmed_key.split('/', 3) + if (ks, cf, gen) in generations_to_keep: + continue + + logger.info('Deleting %s', o.key) + o.delete() + +def verify_labels_is_comprehensive(source, labels_to_keep, grace_after): + source = source.with_components('manifests') + stop_at = reverse_format_nanoseconds(grace_after * 1e9) + + trim_length = len(source.key) + 1 + for o in source.descendants(): + label = o.key[trim_length:] + print((label, stop_at)) + if label > stop_at: + break + + if label not in labels_to_keep: + raise RuntimeError('List of labels to keep does not cover the grace period') + + +def prune(ttl, labels_to_keep): + config = load_config() + + grace_after = time() - ttl * 60 * 60 + + source = compute_top_prefix(config) + + labels_to_keep = set(labels_to_keep) + generations_to_keep = set() + + + verify_labels_is_comprehensive(source, labels_to_keep, grace_after) + + for label in labels_to_keep: + for i in get_generations_referenced_by_manifest(source, label): + ks, cf, gen, _ = i + generations_to_keep.add((ks, cf, gen)) + + delete_manifests(source, labels_to_keep, grace_after) + delete_data(source, generations_to_keep, grace_after) + +def do_prune(): + parser = argparse.ArgumentParser( + description='Prune a cassandra-mirror backup directory of old backups', + ) + + parser.add_argument('ttl', type=int, help='TTL, in hours') + parser.add_argument('label', nargs='+', + help='Backup labels to retain' + ) + + args = parser.parse_args() + + logging.basicConfig(stream=sys.stderr) + + prune(args.ttl, args.label) diff --git a/cassandra_mirror/restore.py b/cassandra_mirror/restore.py index e7d94a6..db9eadd 100644 --- a/cassandra_mirror/restore.py +++ b/cassandra_mirror/restore.py @@ -20,7 +20,6 @@ from plumbum.commands.modifiers import PIPE import boto3 -from .util import S3Path from .util import MovingTemporaryDirectory from .util import compute_top_prefix from .util import continuity_code @@ -47,14 +46,6 @@ def get_common_prefixes(bucket, prefix): for i in result.search('CommonPrefixes'): yield i['Prefix'][cut:-1] -def get_manifest_entries(source, label): - source = source.with_components('manifests', label) - manifest = source.read_utf8() - for i in manifest.splitlines(keepends=False): - max_generation, ks_cf = i.split() - ks, cf = ks_cf.split('/') - yield ks, cf, max_generation - def keypipe_cmd(provider_args, context): import keypipe from keypipe.plumbum_helpers import ThreadCommand @@ -95,7 +86,6 @@ def download_to_path( context = serialize_context(encryption_context) logger.debug("Invoking keypipe with context %s", context) - prefix = marker_path.name + '.' with TemporaryDirectory(prefix=prefix, dir=destination.up()) as d: @@ -134,15 +124,20 @@ def download_to_path( i.link(destination / i.name) - def download_sstable_to_path( config, - path, - objects, - sstable_context, - mutable_mtime + dest, + sstable, ): - if (path / 'data').exists(): + ks, cf, gen, mtime = sstable + dest = dest / ks / cf / gen + + sstable_context = dict(config['context']) + sstable_context['keyspace'] = ks + sstable_context['columnfamily'] = cf + sstable_context['generation'] = int(gen) + + if (dest / 'data').exists(): """Note that we use the existence of a directory named 'data' to inhibit downloads. This stands in contrast to uploads, which we inhibit with separate marker files. @@ -158,17 +153,23 @@ def download_sstable_to_path( config['encryption']['provider']: config['encryption']['args'] } - uploaded_dir = path / 'uploaded' + uploaded_dir = dest / 'uploaded' uploaded_dir.mkdir() - with MovingTemporaryDirectory(path / 'data') as temp: + reversed_mtime = reverse_format_nanoseconds(mtime) + + objects = ( + ('mutable', source.with_components('mutable', reversed_mtime)), + ('immutable', source.with_components('immutable')), + ) + + with MovingTemporaryDirectory(dest / 'data') as temp: for (marker_name, s3_object) in objects: context = dict(sstable_context) - context.update(config['context']) context['component'] = marker_name context['continuity'] = continuity_code if marker_name == 'mutable': - context['timestamp'] = mutable_mtime + context['timestamp'] = mtime marker_path = uploaded_dir / marker_name download_to_path( @@ -181,68 +182,49 @@ def download_sstable_to_path( temp.finalize() -ManifestEntry = namedtuple('ManifestEntry', 'generation, mtime') - +def create_manifest_markers(sources, dest): + """Creates the files indicating that a manifest was uploaded for this + generation. By doing so, we inhibit future manifests for being uploaded for + this generation.""" -def get_sstables_to_download_for_cf(source, ks, cf, max_gen): - manifest = source.with_components('data', ks, cf, max_gen, 'manifest') - manifest_lines = manifest.read_utf8().splitlines(keepends=False) - return [ManifestEntry(*line.split()) for line in manifest_lines] - -def create_metadata_directories(sstables): - for cf_dir, ks, cf, manifest_entries in sstables: - last_entry = manifest_entries[-1] - uploaded_dir = (cf_dir / last_entry.generation / 'uploaded') + for ks, cf, gen in sources: + uploaded_dir = dest / ks / cf / gen / 'uploaded' uploaded_dir.mkdir() - (uploaded_dir / 'manifest').mkdir() - -def _get_sstable_download_instructions( - cf_dir, - source, - context, - entry, -): - generation, mutable_mtime = entry - generation_dir = cf_dir / entry.generation - source = source.with_components(entry.generation) - context['generation'] = int(generation) - - mutable_mtime = int(entry.mtime) - reversed_mtime = reverse_format_nanoseconds(mutable_mtime) + (uploaded_dir / 'manifest').touch() - objects = ( - ('mutable', source.with_components('mutable', reversed_mtime)), - ('immutable', source.with_components('immutable')), - ) +""" +Hierarchical Manifest Layout - return generation_dir, objects, context, mutable_mtime +A global manifest points to individual columnfamilies, each with a maximum +generation. Each generation points to its own data files. Too, its manifest +references zero or more other generations. The CF manifests are not transitive: +we do not consult the manifest of any generation other than the maximum one. -def _get_download_instructions_for_cf( - identity, - source, - sstable, -): - cf_dir, ks, cf, manifest_entries = sstable - source = source.with_components('data', ks, cf) - context = dict( - identity=identity, - keyspace=ks, - columnfamily=cf, - ) +Manifest (manifests/