Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions cassandra_mirror/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,11 @@ def upload_global_manifest(columnfamilies, destination, marker_dir):
t_string = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(t))
ns_since_epoch = time.time() * 1e9
label = '{} {}'.format(reverse_format_nanoseconds(ns_since_epoch), t_string)

# Unlike every other marker file, we touch this one _ahead_ of actually
# completing the operation. That's because this marker file doesn't
# actually inhibit an upload. Instead it serves to tell the prune script
# that this backup deserves to be/remain in S3.
(marker_dir / label).touch()

destination = destination.with_components('manifests', label)
Expand All @@ -444,35 +449,27 @@ def backup_all_sstables(config, locs, destination):
for p in cf_paths
]

class Locations(namedtuple('Locations', 'data_dir sstables_dir state_dir links_dir')):
pass

def do_backup():
logging.basicConfig(stream=sys.stderr)
logger.setLevel(logging.DEBUG)

config = load_config()
data_dir = LocalPath(config.get('data_dir', '/var/lib/cassandra'))
if not data_dir.exists():
raise RuntimeException('data_dir does not exist')
config, locs = load_config()

state_dir = config.get('state_dir')
state_dir = (
LocalPath(state_dir) if state_dir is not None
else data_dir / 'mirroring'
)
state_dir.mkdir()
# fix_identity is only relevant for backup. For other code paths there is
# no state directory to rely upon.
fix_identity(config, locs)

lock_fh = (state_dir / 'lock').open('w')
flock(lock_fh.fileno(), LOCK_EX | LOCK_NB)
if not locs.data_dir.exists():
raise RuntimeException('data_dir does not exist')

locs = Locations(data_dir, data_dir / 'data', state_dir, state_dir / 'links')
fix_identity(config, locs)
locs.state_dir.mkdir()
lock_fh = (locs.state_dir / 'lock').open('w')
flock(lock_fh.fileno(), LOCK_EX | LOCK_NB)

destination = compute_top_prefix(config)
cf_specs = backup_all_sstables(config, locs, destination)

marker_dir = (state_dir / 'manifests')
marker_dir = (locs.state_dir / 'manifests')
marker_dir.mkdir()
label = upload_global_manifest(cf_specs, destination, marker_dir)
print(label)
Expand Down
79 changes: 79 additions & 0 deletions cassandra_mirror/check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import argparse
import logging
import sys

from botocore.exceptions import ClientError
import boto3

from .restore import get_sstable_objects
from .restore import get_sstables_for_labels

from .util import compute_top_prefix
from .util import load_config

logger = logging.getLogger(__name__)

def object_exists(path):
try:
path.load()
except ClientError as e:
if e.response['Error']['Code'] != "404":
raise

return False
return True

def check(labels):
config, locs = load_config()

if len(labels) == 0:
marker_dir = (locs.state_dir / 'manifests')
labels = [i.name for i in marker_dir.list()]

s3 = boto3.resource('s3')
source = compute_top_prefix(config)

sstables = get_sstables_for_labels(source, labels)

objects = set()
for s in sstables:
objects.update(get_sstable_objects(source, s))

corrupt_objects = set()
for type_, path in objects:
logger.info('Checking object: %s', path.key)
if not object_exists(path):
corrupt_objects.add(path)

for o in corrupt_objects:
print(o.key)

if len(corrupt_objects):
# An uncaught exception returns a 1, so we'll distinguish ourselves
# from that.
return 2

def do_check():
logging.basicConfig(stream=sys.stderr, level=logging.INFO)

parser = argparse.ArgumentParser(
description='Check the objects associated with one or more Cassandra backups.'
)

# Hmm, we could add an --only-latest flag.
# There are situations where a backup is irreparably corrupted, and the
# only recourse is to upload a new backup. So --only-latest would allow
# any monitoring based on this script to turn OK after a new backup is
# finished.
# Or we could add a --valid-within flag, which would check whether there
# is *any* backup valid within (say) the last 8 hours. As opposed to this
# script which checks all backups.

parser.add_argument('label', nargs='*',
help=('Backup labels to check. If unspecified, all labels known to '
'the local machine will be checked')
)

args = parser.parse_args()

return check(args.label)
15 changes: 9 additions & 6 deletions cassandra_mirror/prune.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import boto3

from .restore import get_generations_referenced_by_manifest
from .restore import get_sstables_for_labels
from .util import compute_top_prefix
from .util import load_config
from .util import reverse_format_nanoseconds
Expand Down Expand Up @@ -58,7 +58,12 @@ def verify_labels_is_comprehensive(source, labels_to_keep, grace_after):
break

if label not in labels_to_keep:
raise RuntimeError('List of labels to keep does not cover the grace period')
# If an instance is replaced more frequently than the backup ttl,
# it will never purge its backups. The way to fix this would be to,
# for each manifest, also upload a list of the other manifests that
# were present at the time of the backup.
raise RuntimeError(
'List of labels to keep does not cover the grace period')

def prune(source, ttl, marker_dir, label_threshold):
label_files = marker_dir.list()
Expand All @@ -78,10 +83,8 @@ def prune(source, ttl, marker_dir, label_threshold):

generations_to_keep = set()

for label in labels_to_keep:
for i in get_generations_referenced_by_manifest(source, label):
ks, cf, gen, _ = i
generations_to_keep.add((ks, cf, gen))
for ks, cf, gen, _ in get_sstables_for_labels(source, labels_to_keep):
generations_to_keep.add((ks, cf, gen))

delete_manifests(source, labels_to_keep, prune_before)
delete_data(source, generations_to_keep)
Expand Down
51 changes: 33 additions & 18 deletions cassandra_mirror/restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def pipe():


def download_s3(cmd, s3_object):
logger.info('Downloading %s', s3_object.key)
if gof3r:
gof3r_cmd = s3gof3r[
'get',
Expand All @@ -74,7 +75,7 @@ def download_s3(cmd, s3_object):
else:
with cmd.bgrun(stdin=PIPE) as future:
s3_object.download_fileobj(future.stdin)

future.stdin.close()

def download_to_path(
marker_path,
Expand Down Expand Up @@ -123,9 +124,21 @@ def download_to_path(
for i in temp_destination:
i.link(destination / i.name)

def get_sstable_objects(source, sstable):
ks, cf, gen, mtime = sstable
source = source.with_components('data', ks, cf, gen)
reversed_mtime = reverse_format_nanoseconds(mtime)

objects = (
('mutable', source.with_components('mutable', reversed_mtime)),
('immutable', source.with_components('immutable')),
)

return objects

def download_sstable_to_path(
config,
source,
dest,
sstable,
):
Expand Down Expand Up @@ -156,15 +169,8 @@ def download_sstable_to_path(
uploaded_dir = dest / 'uploaded'
uploaded_dir.mkdir()

reversed_mtime = reverse_format_nanoseconds(mtime)

objects = (
('mutable', source.with_components('mutable', reversed_mtime)),
('immutable', source.with_components('immutable')),
)

with MovingTemporaryDirectory(dest / 'data') as temp:
for (marker_name, s3_object) in objects:
for (marker_name, s3_object) in get_sstable_objects(source, sstable):
context = dict(sstable_context)
context['component'] = marker_name
context['continuity'] = continuity_code
Expand Down Expand Up @@ -218,13 +224,19 @@ def _spider_global_manifest_entries(source, entries):
source = source.with_components('data')
for ks, cf, max_gen in entries:
manifest = source.with_components(ks, cf, max_gen, 'manifest')
logger.info("Spidering manifest: %s", manifest.key)
for l in manifest.read_utf8().splitlines(keepends=False):
gen, mtime = l.split()
yield ks, cf, gen, mtime

def get_generations_referenced_by_manifest(source, label):
entries = _get_global_manifest_entries(source, label)
return _spider_global_manifest_entries(source, entries)
def get_sstables_for_labels(source, labels):
entries = set()
for label in labels:
logger.info('Spidering label: %s', label)
entries.update(_get_global_manifest_entries(source, label))

sstables = set(_spider_global_manifest_entries(source, entries))
return sstables

def compute_cf_dirs(base, sstables):
for ks, cf, entries in sstables:
Expand All @@ -248,21 +260,22 @@ def copy_back(src, dst):
dst
)

def restore(identity, manifest_label, workers):
config = load_config()
def restore(identity, label, workers):
config, locs = load_config()

s3 = boto3.resource('s3')
source = compute_top_prefix(config)
manifest_entries = list(_get_global_manifest_entries(source, label))

dest = LocalPath('mirrored')
dest = LocalPath('mirroring/links')
create_manifest_markers(manifest_entries, dest)

sstables = _spider_global_manifest_entries(source, manifest_entries)

with futures.ThreadPoolExecutor(workers) as executor:
fs = [
executor.submit(download_sstable_to_path, config, dest, sstable)
executor.submit(download_sstable_to_path, config, source, dest,
sstable)
for sstable in sstables
]
for f in futures.as_completed(fs):
Expand All @@ -277,7 +290,7 @@ def restore(identity, manifest_label, workers):
raise

with MovingTemporaryDirectory(LocalPath('data')) as d:
copy_back(LocalPath('mirrored'), d.path)
copy_back(dest, d.path)
d.finalize()

def do_restore():
Expand All @@ -288,7 +301,9 @@ def do_restore():
parser.add_argument('source_identity',
help='The identity (typically UUID) of the node whose backup should be restored'
)
parser.add_argument('manifest_label', nargs='?')
parser.add_argument('manifest_label',
help='The label of the backup to restore',
)

args = parser.parse_args()

Expand Down
18 changes: 17 additions & 1 deletion cassandra_mirror/util.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections import namedtuple
from contextlib import contextmanager

from plumbum.path import LocalPath
Expand Down Expand Up @@ -62,9 +63,24 @@ def _load_config_from_path(path):
config_f = path.open()
return yaml.safe_load(config_f)

class Locations(namedtuple('Locations', 'data_dir sstables_dir state_dir links_dir')):
pass

def _make_locations(config):
data_dir = LocalPath(config.get('data_dir', '/var/lib/cassandra'))
state_dir = config.get('state_dir')
state_dir = (
LocalPath(state_dir) if state_dir is not None
else data_dir / 'mirroring'
)
return Locations(data_dir, data_dir / 'data', state_dir,
state_dir / 'links')

def load_config():
path = _get_config_path()
return _load_config_from_path(path)
config = _load_config_from_path(path)
locs = _make_locations(config)
return config, locs

_multipart_chunksize = 20 * 1024 * 1024

Expand Down
6 changes: 2 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
console_scripts=[
'backup=cassandra_mirror.backup:do_backup',
'restore=cassandra_mirror.restore:do_restore',
'check=cassandra_mirror.check:do_check',
]
),
install_requires=[
'boto3>=1.4.2',
'PyYAML>=3.12',
'plumbum>=1.6.3',
'plumbum>=1.6.4',
],
dependency_links = [
'git+https://github.com/hashbrowncipher/plumbum.git@d57e53955536423857be87ec394e6eb376acaddf#egg=plumbum',
]
)