Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions cassandra_mirror/backup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from collections import namedtuple
from fcntl import flock
from fcntl import LOCK_EX
from fcntl import LOCK_NB
from io import BytesIO
from operator import attrgetter
import functools
Expand All @@ -17,8 +20,8 @@
from .backup_helpers import stat_helper
from .obsoletion import cleanup_obsoleted
from .obsoletion import mark_obsoleted
from .prune import prune
from .util import MovingTemporaryDirectory
from .util import S3Path
from .util import compute_top_prefix
from .util import continuity_code
from .util import gof3r
Expand All @@ -31,7 +34,6 @@
from plumbum.cmd import tar
logger = logging.getLogger(__name__)


# A recursive directory walker
def find(path, depth):
if depth == 0:
Expand Down Expand Up @@ -414,14 +416,15 @@ def transform_cf_for_manifest(cfs):
buf.seek(0)
return buf

def upload_global_manifest(columnfamilies, destination):
def upload_global_manifest(columnfamilies, destination, marker_dir):
# S3 can only scan ascending. Since we usually want to start with the
# newest backup and work backward, we'll format the string so that
# the newest backup sorts lexicographically lowest.
t = time.time()
t_string = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(t))
ns_since_epoch = int(time.time() * 1e9)
label = '{:016x} {}'.format(reverse_format_nanoseconds(ns_since_epoch), t_string)
ns_since_epoch = time.time() * 1e9
label = '{} {}'.format(reverse_format_nanoseconds(ns_since_epoch), t_string)
(marker_dir / label).touch()

destination = destination.with_components('manifests', label)
body = transform_cf_for_manifest(columnfamilies)
Expand Down Expand Up @@ -460,13 +463,20 @@ def do_backup():
)
state_dir.mkdir()

lock_fh = (state_dir / 'lock').open('w')
flock(lock_fh.fileno(), LOCK_EX | LOCK_NB)

locs = Locations(data_dir, data_dir / 'data', state_dir, state_dir / 'links')
fix_identity(config, locs)

destination = S3Path(config['s3']['bucket'], compute_top_prefix(config))
destination = compute_top_prefix(config)
cf_specs = backup_all_sstables(config, locs, destination)
label = upload_global_manifest(cf_specs, destination)

marker_dir = (state_dir / 'manifests')
marker_dir.mkdir()
label = upload_global_manifest(cf_specs, destination, marker_dir)
print(label)
prune(destination, config['ttl'], marker_dir, 8)

mark_obsoleted(locs)
cleanup_obsoleted(locs, 0)
90 changes: 90 additions & 0 deletions cassandra_mirror/prune.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import argparse
import logging
import os
import sys
from time import mktime
from time import time

import boto3

from .restore import get_generations_referenced_by_manifest
from .util import compute_top_prefix
from .util import load_config
from .util import reverse_format_nanoseconds

s3 = boto3.resource('s3')

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

def to_timestamp(t):
return mktime(t.timetuple())

def delete_manifests(source, labels_to_keep, prune_before):
source = source.with_components('manifests')

trim_length = len(source.key) + 1
start_label = reverse_format_nanoseconds(prune_before * 1e9)
for o in source.descendants(start=start_label):
if o.key[trim_length:] in labels_to_keep:
continue

logger.info('Deleting %s', o.key)
o.delete()

def delete_data(source, generations_to_keep):
source = source.with_components('data')

trim_length = len(source.key) + 1
for o in source.descendants():
# Trim the key to remove `source` as a prefix
trimmed_key = o.key[trim_length:]
ks, cf, gen, _ = trimmed_key.split('/', 3)
if (ks, cf, gen) in generations_to_keep:
continue

logger.info('Deleting %s', o.key)
o.delete()

def verify_labels_is_comprehensive(source, labels_to_keep, grace_after):
source = source.with_components('manifests')
stop_at = reverse_format_nanoseconds(grace_after * 1e9)

trim_length = len(source.key) + 1
for o in source.descendants():
label = o.key[trim_length:]
print((label, stop_at))
if label > stop_at:
break

if label not in labels_to_keep:
raise RuntimeError('List of labels to keep does not cover the grace period')

def prune(source, ttl, marker_dir, label_threshold):
label_files = marker_dir.list()

labels_to_keep = set()
label_files_to_delete = []

prune_before = time() - ttl * 60 * 60
for label_file in label_files:
if label_file.stat().st_mtime < prune_before:
label_files_to_delete.append(label_file)
else:
labels_to_keep.add(label_file.name)

if len(label_files_to_delete) < label_threshold:
return

generations_to_keep = set()

for label in labels_to_keep:
for i in get_generations_referenced_by_manifest(source, label):
ks, cf, gen, _ = i
generations_to_keep.add((ks, cf, gen))

delete_manifests(source, labels_to_keep, prune_before)
delete_data(source, generations_to_keep)

for label_file in label_files_to_delete:
label_file.delete()
146 changes: 63 additions & 83 deletions cassandra_mirror/restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from plumbum.commands.modifiers import PIPE
import boto3

from .util import S3Path
from .util import MovingTemporaryDirectory
from .util import compute_top_prefix
from .util import continuity_code
Expand All @@ -47,14 +46,6 @@ def get_common_prefixes(bucket, prefix):
for i in result.search('CommonPrefixes'):
yield i['Prefix'][cut:-1]

def get_manifest_entries(source, label):
source = source.with_components('manifests', label)
manifest = source.read_utf8()
for i in manifest.splitlines(keepends=False):
max_generation, ks_cf = i.split()
ks, cf = ks_cf.split('/')
yield ks, cf, max_generation

def keypipe_cmd(provider_args, context):
import keypipe
from keypipe.plumbum_helpers import ThreadCommand
Expand Down Expand Up @@ -95,7 +86,6 @@ def download_to_path(
context = serialize_context(encryption_context)
logger.debug("Invoking keypipe with context %s", context)


prefix = marker_path.name + '.'

with TemporaryDirectory(prefix=prefix, dir=destination.up()) as d:
Expand Down Expand Up @@ -134,15 +124,20 @@ def download_to_path(
i.link(destination / i.name)



def download_sstable_to_path(
config,
path,
objects,
sstable_context,
mutable_mtime
dest,
sstable,
):
if (path / 'data').exists():
ks, cf, gen, mtime = sstable
dest = dest / ks / cf / gen

sstable_context = dict(config['context'])
sstable_context['keyspace'] = ks
sstable_context['columnfamily'] = cf
sstable_context['generation'] = int(gen)

if (dest / 'data').exists():
"""Note that we use the existence of a directory named 'data' to
inhibit downloads. This stands in contrast to uploads, which we inhibit
with separate marker files.
Expand All @@ -158,17 +153,23 @@ def download_sstable_to_path(
config['encryption']['provider']: config['encryption']['args']
}

uploaded_dir = path / 'uploaded'
uploaded_dir = dest / 'uploaded'
uploaded_dir.mkdir()

with MovingTemporaryDirectory(path / 'data') as temp:
reversed_mtime = reverse_format_nanoseconds(mtime)

objects = (
('mutable', source.with_components('mutable', reversed_mtime)),
('immutable', source.with_components('immutable')),
)

with MovingTemporaryDirectory(dest / 'data') as temp:
for (marker_name, s3_object) in objects:
context = dict(sstable_context)
context.update(config['context'])
context['component'] = marker_name
context['continuity'] = continuity_code
if marker_name == 'mutable':
context['timestamp'] = mutable_mtime
context['timestamp'] = mtime

marker_path = uploaded_dir / marker_name
download_to_path(
Expand All @@ -181,68 +182,49 @@ def download_sstable_to_path(

temp.finalize()

ManifestEntry = namedtuple('ManifestEntry', 'generation, mtime')

def create_manifest_markers(sources, dest):
"""Creates the files indicating that a manifest was uploaded for this
generation. By doing so, we inhibit future manifests for being uploaded for
this generation."""

def get_sstables_to_download_for_cf(source, ks, cf, max_gen):
manifest = source.with_components('data', ks, cf, max_gen, 'manifest')
manifest_lines = manifest.read_utf8().splitlines(keepends=False)
return [ManifestEntry(*line.split()) for line in manifest_lines]

def create_metadata_directories(sstables):
for cf_dir, ks, cf, manifest_entries in sstables:
last_entry = manifest_entries[-1]
uploaded_dir = (cf_dir / last_entry.generation / 'uploaded')
for ks, cf, gen in sources:
uploaded_dir = dest / ks / cf / gen / 'uploaded'
uploaded_dir.mkdir()
(uploaded_dir / 'manifest').mkdir()

def _get_sstable_download_instructions(
cf_dir,
source,
context,
entry,
):
generation, mutable_mtime = entry
generation_dir = cf_dir / entry.generation
source = source.with_components(entry.generation)
context['generation'] = int(generation)

mutable_mtime = int(entry.mtime)
reversed_mtime = reverse_format_nanoseconds(mutable_mtime)
(uploaded_dir / 'manifest').touch()

objects = (
('mutable', source.with_components('mutable', reversed_mtime)),
('immutable', source.with_components('immutable')),
)
"""
Hierarchical Manifest Layout

return generation_dir, objects, context, mutable_mtime
A global manifest points to individual columnfamilies, each with a maximum
generation. Each generation points to its own data files. Too, its manifest
references zero or more other generations. The CF manifests are not transitive:
we do not consult the manifest of any generation other than the maximum one.

def _get_download_instructions_for_cf(
identity,
source,
sstable,
):
cf_dir, ks, cf, manifest_entries = sstable
source = source.with_components('data', ks, cf)
context = dict(
identity=identity,
keyspace=ks,
columnfamily=cf,
)
Manifest (manifests/<label>)
|_ CF Manifests (data/<ks>/<cf>/<gen>/manifest)
|_ Generations (data/<ks>/<cf>/<gen>)
|_ Immutable component (data/<ks>/<cf>/<gen>/immutable)
|_ Mutable component (data/<ks>/<cf>/<gen>/mutable/<mtime>)
"""

for entry in manifest_entries:
yield _get_sstable_download_instructions(
cf_dir, source, dict(context), entry
)
def _get_global_manifest_entries(source, label):
source = source.with_components('manifests', label)
for i in source.read_utf8().splitlines(keepends=False):
max_generation, ks_cf = i.split()
ks, cf = ks_cf.split('/')
yield ks, cf, max_generation

def get_download_instructions(identity, source, sstables):
for i in sstables:
yield from _get_download_instructions_for_cf(identity, source, i)
def _spider_global_manifest_entries(source, entries):
source = source.with_components('data')
for ks, cf, max_gen in entries:
manifest = source.with_components(ks, cf, max_gen, 'manifest')
for l in manifest.read_utf8().splitlines(keepends=False):
gen, mtime = l.split()
yield ks, cf, gen, mtime

def get_sstables_to_download(source, label):
for ks, cf, max_gen in get_manifest_entries(source, label):
tables = get_sstables_to_download_for_cf(source, ks, cf, max_gen)
yield ks, cf, tables
def get_generations_referenced_by_manifest(source, label):
entries = _get_global_manifest_entries(source, label)
return _spider_global_manifest_entries(source, entries)

def compute_cf_dirs(base, sstables):
for ks, cf, entries in sstables:
Expand Down Expand Up @@ -270,20 +252,18 @@ def restore(identity, manifest_label, workers):
config = load_config()

s3 = boto3.resource('s3')
source = S3Path(config['s3']['bucket'], compute_top_prefix(config))
source = compute_top_prefix(config)
manifest_entries = list(_get_global_manifest_entries(source, label))

i = get_sstables_to_download(source, manifest_label)
sstables = list(compute_cf_dirs(LocalPath('mirrored'), i))
dest = LocalPath('mirrored')
create_manifest_markers(manifest_entries, dest)

create_metadata_directories(sstables)
instructions = get_download_instructions(identity, source, sstables)
sstables = _spider_global_manifest_entries(source, manifest_entries)

# It is assumed we'll be disk-bound, so I've chosen a typical disk queue
# depth.
with futures.ThreadPoolExecutor(workers) as executor:
fs = [
executor.submit(download_sstable_to_path, config, *i)
for i in instructions
executor.submit(download_sstable_to_path, config, dest, sstable)
for sstable in sstables
]
for f in futures.as_completed(fs):
try:
Expand Down
Loading