diff --git a/cassandra_mirror/backup.py b/cassandra_mirror/backup.py index a4fc1f3..98900fd 100644 --- a/cassandra_mirror/backup.py +++ b/cassandra_mirror/backup.py @@ -1,4 +1,5 @@ from collections import namedtuple +from io import BytesIO from operator import attrgetter import functools import logging @@ -6,25 +7,18 @@ import time import sys -# This import patches plumbum's BaseCommand. -from keypipe.plumbum_helpers import ThreadCommand +from plumbum import local +from plumbum import BG from plumbum import FG from plumbum import LocalPath +from plumbum.commands.processes import CommandNotFound import boto3 -import keypipe - -from plumbum.cmd import lz4 -from plumbum.cmd import tar -from plumbum.cmd import s3gof3r from .identity import get_identity - from .backup_helpers import is_sstable_toc from .backup_helpers import stat_helper - from .obsoletion import cleanup_obsoleted from .obsoletion import mark_obsoleted - from .util import MovingTemporaryDirectory from .util import compute_top_prefix from .util import continuity_code @@ -32,10 +26,30 @@ from .util import serialize_context from .util import timed_touch +from plumbum.cmd import lz4 +from plumbum.cmd import tar +gof3r = None +try: + # local.get raises exceptions, violating Python norms of .get() not + # raising exceptions + gof3r = local.get('gof3r') +except CommandNotFound: + pass + logger = logging.getLogger(__name__) s3 = boto3.resource('s3') +# despite being class-like, s3.Object is not a class +def S3Path(bucket_name, key): + self = s3.Object(bucket_name, key) + def _with_components(*components): + new_key = '/'.join((self.key,) + components) + return S3Path(self.bucket_name, new_key) + self.with_components = _with_components + + return self + # A recursive directory walker def find(path, depth): if depth == 0: @@ -143,9 +157,14 @@ def get_context(self): continuity=continuity_code, ) -def compute_destination_prefix(config, identity, cf_path): + def format_generation(self): + return 
'{:010}'.format(self.generation) + + + +def compute_destination_prefix(top_prefix, cf_path): return '/'.join(( - compute_top_prefix(config, identity), + top_prefix, cf_path.dirname.name, cf_path.name, )) @@ -161,30 +180,47 @@ def tar_stream_command(dirname, files): cmd = tar.__getitem__(tar_args) return cmd, size, max_mtime +def upload_s3(cmd, s3_object): + if gof3r: + gof3r_cmd = gof3r[ + 'put', + '--no-md5', + '-b', s3_object.bucket_name, + '-k', s3_object.key, + ] + + (cmd | gof3r_cmd) & FG # & binds tighter than |, so the pipe needs parentheses + else: + future = cmd & BG + s3_object.upload_fileobj(future.proc.stdout) + +def wrap_with_keypipe(cmd, context, config): + # This import patches plumbum's BaseCommand. + from keypipe.plumbum_helpers import ThreadCommand + import keypipe + + keypipe_partial = functools.partial( + keypipe.seal, + config['provider'], + config['args'], + context, + ) + return cmd / keypipe_partial # hand the wrapped command back to the caller + def upload_pipe(data_cmd, s3_object, encryption_context, encryption_config): """Pipes the output of data_cmd into S3. Dataflow: data_cmd | lz4 | keypipe | s3 """ - gof3r_cmd = s3gof3r[ - 'put', - '--no-md5', - '-b', s3_object.bucket_name, - '-k', s3_object.key, - ] - context = serialize_context(encryption_context) - logger.debug("Invoking keypipe with context %s", context) + logger.debug("Processing SSTable with context %s", context.decode('ascii')) + cmd = data_cmd | lz4 # build the pipeline before optionally wrapping it + if encryption_config: + cmd = wrap_with_keypipe(cmd, context, encryption_config) - keypipe_partial = functools.partial( - keypipe.seal, - encryption_config['provider'], - encryption_config['args'], - context, - ) - (data_cmd | lz4 / keypipe_partial | gof3r_cmd) & FG + upload_s3(cmd, s3_object) def upload_sstable_component( marker_info, @@ -218,12 +254,11 @@ def merge_dicts(*args): ret.update(d) return ret -def upload_sstable_immutable(config, identity, sstable, prefix, context, marker_dir): +def upload_sstable_immutable(sstable, destination, encryption_config, + context, marker_dir): + marker = marker_dir / 'immutable' if not
marker.exists(): - destination_key = '/'.join((prefix, 'immutable')) - destination = s3.Object(config['s3']['bucket'], destination_key) - files = sstable.get_immutable_files() cmd, size, mtime = tar_stream_command( sstable.path.dirname, @@ -234,13 +269,13 @@ def upload_sstable_immutable(config, identity, sstable, prefix, context, marker_ (marker, mtime), size, cmd, - destination, + destination.with_components('immutable'), merge_dicts(context, dict(component='immutable')), - config['encryption'] + encryption_config, ) -def upload_sstable_mutable(config, identity, sstable, prefix, context, marker_dir): - marker = marker_dir / 'mutable' +def upload_sstable_mutable(sstable, destination, encryption_config, + context, marker_dir): files = sstable.get_mutable_files() cmd, size, mtime = tar_stream_command( @@ -248,17 +283,19 @@ def upload_sstable_mutable(config, identity, sstable, prefix, context, marker_di files, ) + marker = marker_dir / 'mutable' if ( not marker.exists() or mtime > marker.stat().st_mtime_ns ): + # We namespace the mutable components by their mtime + # If we did not, a point-in-time recovery would get incorrect + # repairedAt times, and we would not repair these SSTables. 
reversed_mtime = (1 << 64) - mtime - destination_key = '/'.join(( - prefix, + destination = destination.with_components( 'mutable', - '{:020}'.format(reversed_mtime) - )) - destination = s3.Object(config['s3']['bucket'], destination_key) + '{:016x}'.format(reversed_mtime) + ) upload_sstable_component( (marker, mtime), @@ -266,29 +303,26 @@ def upload_sstable_mutable(config, identity, sstable, prefix, context, marker_di cmd, destination, merge_dicts(context, dict(component='mutable', timestamp=mtime)), - config['encryption'], + encryption_config, ) return mtime - -def upload_sstable(config, identity, upload_prefix, sstable): +def upload_sstable(config, destination, sstable): marker_dir = sstable.path.dirname.dirname / 'uploaded' marker_dir.mkdir() context = dict(config['context']) context.update(sstable.get_context()) - context['identity'] = identity # the S3 prefix to be used by all uploaded components - sstable_prefix = '/'.join(( - upload_prefix, - 'data', - '{:010}'.format(sstable.generation), - )) + destination = destination.with_components(sstable.format_generation()) - upload_sstable_immutable(config, identity, sstable, sstable_prefix, context, marker_dir) - mtime = upload_sstable_mutable(config, identity, sstable, sstable_prefix, context, marker_dir) + encryption_config = config.get('encryption') + upload_sstable_immutable(sstable, destination, encryption_config, + context, marker_dir) + mtime = upload_sstable_mutable(sstable, destination, encryption_config, + context, marker_dir) return sstable.generation, mtime @@ -309,7 +343,7 @@ def hardlink_sstable(state_dir, sstable): state_path = ( state_dir / sstable.keyspace / '{}-{}'.format(sstable.cf_name, sstable.cf_uuid) / - '{:010}'.format(sstable.generation) / + sstable.format_generation() / 'data' ) if not state_path.exists(): @@ -329,49 +363,39 @@ def hardlink_sstable(state_dir, sstable): return SSTable.from_copied_toc(state_path / sstable.path.name) -def actual_upload_manifest(bucket, prefix, sstable, 
tables): - # S3 can only scan ascending. Since we usually want to start with the - # highest generation and work downward, we'll format the string so that - # the highest generation sorts lexicographically lowest. - generation_string = '{:010}-{:010}'.format( - # the maximum generation is actually 2**31 - 1 - (1 << 32) - sstable.generation, - sstable.generation, - ) - - s3_key = '/'.join(( - prefix, +def actual_upload_cf_manifest(destination, sstable, tables): + destination = destination.with_components( + sstable.format_generation(), 'manifest', - generation_string, - )) + ) - body = ''.join([ + body = BytesIO(''.join([ '{:010} {}\n'.format(generation, mtime) for (generation, mtime) in tables - ]).encode('utf8') - - s3 = boto3.client('s3') - s3.put_object( - Bucket=bucket, - Key=s3_key, - Body=body, - ) + ]).encode('utf8')) + destination.upload_fileobj(body) -def upload_manifest(bucket, prefix, sstable, tables): +def upload_cf_manifest(destination, sstable, tables): marker = sstable.path.dirname.dirname / 'uploaded' / 'manifest' if not marker.exists(): - actual_upload_manifest(bucket, prefix, sstable, tables) + actual_upload_cf_manifest(destination, sstable, tables) marker.touch() -def backup_columnfamily(links_dir, config, identity, cf_path): +def backup_columnfamily(cf_path, links_dir, destination, config): """Performs backup of a single columnfamily.""" + ks_dir, cf = cf_path.dirname, cf_path.basename + ks = ks_dir.basename + descriptor = '{}/{}'.format(ks, cf) linked_sstables = [ hardlink_sstable(links_dir, t) for t in get_sstables_for_columnfamily(cf_path) ] + if len(linked_sstables) == 0: + return None + """Invariant: all SSTables in linked_sstables are now hardlinked. If we didn't hard link the SSTables, Cassandra could delete files we were @@ -380,12 +404,12 @@ def backup_columnfamily(links_dir, config, identity, cf_path): successive runs, and we would never upload a manifest file. 
""" - upload_prefix = compute_destination_prefix(config, identity, cf_path) + destination = destination.with_components('data', ks, cf) - # uploaded_sstables is a list of (generation, mtime) pairs that + # seen_sstables is a list of (generation, mtime) pairs that # upload_manifest can format into a manifest. - uploaded_sstables = [ - upload_sstable(config, identity, upload_prefix, t) + seen_sstables = [ + upload_sstable(config, destination, t) for t in linked_sstables ] @@ -396,28 +420,71 @@ def backup_columnfamily(links_dir, config, identity, cf_path): mention SSTable(s) that haven't been uploaded. """ - if len(linked_sstables) > 0: - upload_manifest( - config['s3']['bucket'], - upload_prefix, - linked_sstables[-1], - uploaded_sstables - ) + """ + get_sstables_for_columnfamily returned us SSTables from lowest to highest, + so to get the highest generation, we just take the last one. + """ + last_sstable = linked_sstables[-1] + + upload_cf_manifest( + destination, + last_sstable, + seen_sstables, + ) + + return descriptor, last_sstable.format_generation() + +def transform_cf_for_manifest(cfs): + buf = BytesIO() -def backup_all_sstables(config, locs): - identity = get_identity(locs.state_dir) - for cf_path in get_columnfamilies(locs.data_dir): - backup_columnfamily(locs.links_dir, config, identity, cf_path) + # filter out cfs with no SSTables + for generation, path in filter(None, cfs): + buf.write('{} {}\n'.format(path, generation).encode('utf-8')) + buf.seek(0) + return buf + +def upload_global_manifest(columnfamilies, destination): + # S3 can only scan ascending. Since we usually want to start with the + # newest backup and work backward, we'll format the string so that + # the newest backup sorts lexicographically lowest. 
+ t = time.time() + t_string = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(t)) + ns_since_epoch = int(t * 1e9) # reuse t so both halves of the label describe the same instant + label = '{:016x} {}'.format((1 << 64) - ns_since_epoch, t_string) + + destination = destination.with_components('manifests', label) + body = transform_cf_for_manifest(columnfamilies) + + destination.upload_fileobj(body) + return destination + +def fix_identity(config, locs): + identity = config['context'].get('identity') + if identity is None: + config['context']['identity'] = get_identity(locs.state_dir) + +def backup_all_sstables(config, locs, destination): + cf_paths = list(get_columnfamilies(locs.data_dir)) + return [ + backup_columnfamily(p, locs.links_dir, destination, config) + for p in cf_paths + ] class Locations(namedtuple('Locations', 'data_dir sstables_dir state_dir links_dir')): pass +def format_s3_url(o): + return 's3://{}/{}'.format(o.bucket_name, o.key) + def do_backup(): logging.basicConfig(stream=sys.stderr) logger.setLevel(logging.DEBUG) config = load_config() data_dir = LocalPath(config.get('data_dir', '/var/lib/cassandra')) + if not data_dir.exists(): + raise RuntimeError('data_dir does not exist') # RuntimeException is not a Python builtin + state_dir = config.get('state_dir') state_dir = ( LocalPath(state_dir) if state_dir is not None @@ -426,6 +493,12 @@ def do_backup(): state_dir.mkdir() locs = Locations(data_dir, data_dir / 'data', state_dir, state_dir / 'links') - backup_all_sstables(config, locs) + fix_identity(config, locs) + + destination = S3Path(config['s3']['bucket'], compute_top_prefix(config)) + cf_specs = backup_all_sstables(config, locs, destination) + manifest = upload_global_manifest(cf_specs, destination) + print(format_s3_url(manifest)) + mark_obsoleted(locs) cleanup_obsoleted(locs, 0) diff --git a/cassandra_mirror/util.py b/cassandra_mirror/util.py index e1f03ec..1cc4990 100644 --- a/cassandra_mirror/util.py +++ b/cassandra_mirror/util.py @@ -5,7 +5,6 @@ from tempfile import TemporaryDirectory import os -from cachetools.func import
ttl_cache import boto3 import json import yaml @@ -99,12 +98,12 @@ def finalize(self): pass -def compute_top_prefix(config, identity): +def compute_top_prefix(config): prefix = config['s3']['prefix_format'].format(**config['context']) return '/'.join(( prefix, - 'v1', - identity + 'v2', + config['context']['identity'] )) def serialize_context(o): diff --git a/config.yaml.example b/config.yaml.example new file mode 100644 index 0000000..525c8a3 --- /dev/null +++ b/config.yaml.example @@ -0,0 +1,13 @@ +--- + +context: + cluster: mycluster + # identity (optional, default: node uuid queried via JMX) + +s3: + bucket: mybucket + # prefix_format is used with Python string formatting as + # prefix_format.format(**context) + # Use it to namespace your backups however you desire. + prefix_format: 'backups/cassandra/{cluster}' + diff --git a/setup.py b/setup.py index 1b67854..8509486 100644 --- a/setup.py +++ b/setup.py @@ -3,14 +3,19 @@ setup( name='cassandra-mirror', - version='0.0.11', + version='0.1.1', author='Josh Snyder', - author_email='josh.snyder@fitbit.com', + author_email='josh@code406.com', packages=find_packages(), entry_points=dict( - console_scripts=[ + console_scripts=[ 'backup=cassandra_mirror.backup:do_backup', 'restore=cassandra_mirror.restore:do_restore', ] ), + install_requires=[ + 'boto3>=1.4.2', + 'PyYAML>=3.12', + 'plumbum>=1.6.3', + ], )