diff --git a/cassandra_mirror/backup.py b/cassandra_mirror/backup.py index e80156d..d1b2e6a 100644 --- a/cassandra_mirror/backup.py +++ b/cassandra_mirror/backup.py @@ -1,4 +1,7 @@ from collections import namedtuple +from fcntl import flock +from fcntl import LOCK_EX +from fcntl import LOCK_NB from io import BytesIO from operator import attrgetter import functools @@ -17,8 +20,8 @@ from .backup_helpers import stat_helper from .obsoletion import cleanup_obsoleted from .obsoletion import mark_obsoleted +from .prune import prune from .util import MovingTemporaryDirectory -from .util import S3Path from .util import compute_top_prefix from .util import continuity_code from .util import gof3r @@ -31,7 +34,6 @@ from plumbum.cmd import tar logger = logging.getLogger(__name__) - # A recursive directory walker def find(path, depth): if depth == 0: @@ -414,14 +416,15 @@ def transform_cf_for_manifest(cfs): buf.seek(0) return buf -def upload_global_manifest(columnfamilies, destination): +def upload_global_manifest(columnfamilies, destination, marker_dir): # S3 can only scan ascending. Since we usually want to start with the # newest backup and work backward, we'll format the string so that # the newest backup sorts lexicographically lowest. t = time.time() t_string = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(t)) - ns_since_epoch = int(time.time() * 1e9) - label = '{:016x} {}'.format(reverse_format_nanoseconds(ns_since_epoch), t_string) + ns_since_epoch = time.time() * 1e9 + label = '{} {}'.format(reverse_format_nanoseconds(ns_since_epoch), t_string) + (marker_dir / label).touch() destination = destination.with_components('manifests', label) body = transform_cf_for_manifest(columnfamilies) @@ -460,13 +463,20 @@ def do_backup(): ) state_dir.mkdir() + lock_fh = (state_dir / 'lock').open('w') + flock(lock_fh.fileno(), LOCK_EX | LOCK_NB) + locs = Locations(data_dir, data_dir / 'data', state_dir, state_dir / 'links') fix_identity(config, locs) - destination = S3Path(config['s3']['bucket'], compute_top_prefix(config)) + destination = compute_top_prefix(config) cf_specs = backup_all_sstables(config, locs, destination) - label = upload_global_manifest(cf_specs, destination) + + marker_dir = (state_dir / 'manifests') + marker_dir.mkdir() + label = upload_global_manifest(cf_specs, destination, marker_dir) print(label) + prune(destination, config['ttl'], marker_dir, 8) mark_obsoleted(locs) cleanup_obsoleted(locs, 0) diff --git a/cassandra_mirror/prune.py b/cassandra_mirror/prune.py new file mode 100644 index 0000000..93b1447 --- /dev/null +++ b/cassandra_mirror/prune.py @@ -0,0 +1,90 @@ +import argparse +import logging +import os +import sys +from time import mktime +from time import time + +import boto3 + +from .restore import get_generations_referenced_by_manifest +from .util import compute_top_prefix +from .util import load_config +from .util import reverse_format_nanoseconds + +s3 = boto3.resource('s3') + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +def to_timestamp(t): + return mktime(t.timetuple()) + +def delete_manifests(source, labels_to_keep, prune_before): + source = source.with_components('manifests') + + trim_length = len(source.key) + 1 + start_label = reverse_format_nanoseconds(prune_before * 1e9) + for o in source.descendants(start=start_label): + if o.key[trim_length:] in labels_to_keep: + continue + + logger.info('Deleting %s', o.key) + o.delete() + +def delete_data(source, generations_to_keep): + source = source.with_components('data') + + trim_length = len(source.key) + 1 + for o in source.descendants(): + # Trim the key to remove `source` as a prefix + trimmed_key = o.key[trim_length:] + ks, cf, gen, _ = trimmed_key.split('/', 3) + if (ks, cf, gen) in generations_to_keep: + continue + + logger.info('Deleting %s', o.key) + o.delete() + +def verify_labels_is_comprehensive(source, labels_to_keep, grace_after): + source = source.with_components('manifests') + stop_at = reverse_format_nanoseconds(grace_after * 1e9) + + trim_length = len(source.key) + 1 + for o in source.descendants(): + label = o.key[trim_length:] + print((label, stop_at)) + if label > stop_at: + break + + if label not in labels_to_keep: + raise RuntimeError('List of labels to keep does not cover the grace period') + +def prune(source, ttl, marker_dir, label_threshold): + label_files = marker_dir.list() + + labels_to_keep = set() + label_files_to_delete = [] + + prune_before = time() - ttl * 60 * 60 + for label_file in label_files: + if label_file.stat().st_mtime < prune_before: + label_files_to_delete.append(label_file) + else: + labels_to_keep.add(label_file.name) + + if len(label_files_to_delete) < label_threshold: + return + + generations_to_keep = set() + + for label in labels_to_keep: + for i in get_generations_referenced_by_manifest(source, label): + ks, cf, gen, _ = i + generations_to_keep.add((ks, cf, gen)) + + delete_manifests(source, labels_to_keep, prune_before) + delete_data(source, generations_to_keep) + + for label_file in label_files_to_delete: + label_file.delete() diff --git a/cassandra_mirror/restore.py b/cassandra_mirror/restore.py index e7d94a6..db9eadd 100644 --- a/cassandra_mirror/restore.py +++ b/cassandra_mirror/restore.py @@ -20,7 +20,6 @@ from plumbum.commands.modifiers import PIPE import boto3 -from .util import S3Path from .util import MovingTemporaryDirectory from .util import compute_top_prefix from .util import continuity_code @@ -47,14 +46,6 @@ def get_common_prefixes(bucket, prefix): for i in result.search('CommonPrefixes'): yield i['Prefix'][cut:-1] -def get_manifest_entries(source, label): - source = source.with_components('manifests', label) - manifest = source.read_utf8() - for i in manifest.splitlines(keepends=False): - max_generation, ks_cf = i.split() - ks, cf = ks_cf.split('/') - yield ks, cf, max_generation - def keypipe_cmd(provider_args, context): import keypipe from keypipe.plumbum_helpers import ThreadCommand @@ -95,7 +86,6 @@ def download_to_path( context = serialize_context(encryption_context) logger.debug("Invoking keypipe with context %s", context) - prefix = marker_path.name + '.' with TemporaryDirectory(prefix=prefix, dir=destination.up()) as d: @@ -134,15 +124,20 @@ def download_to_path( i.link(destination / i.name) - def download_sstable_to_path( config, - path, - objects, - sstable_context, - mutable_mtime + dest, + sstable, ): - if (path / 'data').exists(): + ks, cf, gen, mtime = sstable + dest = dest / ks / cf / gen + + sstable_context = dict(config['context']) + sstable_context['keyspace'] = ks + sstable_context['columnfamily'] = cf + sstable_context['generation'] = int(gen) + + if (dest / 'data').exists(): """Note that we use the existence of a directory named 'data' to inhibit downloads. This stands in contrast to uploads, which we inhibit with separate marker files. @@ -158,17 +153,23 @@ def download_sstable_to_path( config['encryption']['provider']: config['encryption']['args'] } - uploaded_dir = path / 'uploaded' + uploaded_dir = dest / 'uploaded' uploaded_dir.mkdir() - with MovingTemporaryDirectory(path / 'data') as temp: + reversed_mtime = reverse_format_nanoseconds(mtime) + + objects = ( + ('mutable', source.with_components('mutable', reversed_mtime)), + ('immutable', source.with_components('immutable')), + ) + + with MovingTemporaryDirectory(dest / 'data') as temp: for (marker_name, s3_object) in objects: context = dict(sstable_context) - context.update(config['context']) context['component'] = marker_name context['continuity'] = continuity_code if marker_name == 'mutable': - context['timestamp'] = mutable_mtime + context['timestamp'] = mtime marker_path = uploaded_dir / marker_name download_to_path( @@ -181,68 +182,49 @@ def download_sstable_to_path( temp.finalize() -ManifestEntry = namedtuple('ManifestEntry', 'generation, mtime') - +def create_manifest_markers(sources, dest): + """Creates the files indicating that a manifest was uploaded for this + generation. By doing so, we inhibit future manifests for being uploaded for + this generation.""" -def get_sstables_to_download_for_cf(source, ks, cf, max_gen): - manifest = source.with_components('data', ks, cf, max_gen, 'manifest') - manifest_lines = manifest.read_utf8().splitlines(keepends=False) - return [ManifestEntry(*line.split()) for line in manifest_lines] - -def create_metadata_directories(sstables): - for cf_dir, ks, cf, manifest_entries in sstables: - last_entry = manifest_entries[-1] - uploaded_dir = (cf_dir / last_entry.generation / 'uploaded') + for ks, cf, gen in sources: + uploaded_dir = dest / ks / cf / gen / 'uploaded' uploaded_dir.mkdir() - (uploaded_dir / 'manifest').mkdir() - -def _get_sstable_download_instructions( - cf_dir, - source, - context, - entry, -): - generation, mutable_mtime = entry - generation_dir = cf_dir / entry.generation - source = source.with_components(entry.generation) - context['generation'] = int(generation) - - mutable_mtime = int(entry.mtime) - reversed_mtime = reverse_format_nanoseconds(mutable_mtime) + (uploaded_dir / 'manifest').touch() - objects = ( - ('mutable', source.with_components('mutable', reversed_mtime)), - ('immutable', source.with_components('immutable')), - ) +""" +Hierarchical Manifest Layout - return generation_dir, objects, context, mutable_mtime +A global manifest points to individual columnfamilies, each with a maximum +generation. Each generation points to its own data files. Too, its manifest +references zero or more other generations. The CF manifests are not transitive: +we do not consult the manifest of any generation other than the maximum one. -def _get_download_instructions_for_cf( - identity, - source, - sstable, -): - cf_dir, ks, cf, manifest_entries = sstable - source = source.with_components('data', ks, cf) - context = dict( - identity=identity, - keyspace=ks, - columnfamily=cf, - ) +Manifest (manifests/