Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 55 additions & 7 deletions cert_pruner/pruner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def __init__(self, days_old, profile=None):
self._iam_client = self.session.client('iam')
self._elb_client = self.session.client('elb')
self._elbv2_client = self.session.client('elbv2')
self._cloudfront_client = self.session.client('cloudfront')
self.days_old = days_old

def _list_elb_certificates(self):
Expand Down Expand Up @@ -66,8 +67,10 @@ def _list_elbv2_certificates(self):

# Loop through results and save all certificates attached to a listener
for lb in lbs['LoadBalancers']:
for cert in self._get_elbv2_listener_certificates(lb['LoadBalancerArn']):
certificates.add(cert)
# An error occurred (ValidationError) when calling the DescribeListenerCertificates operation: This operation does not support Network Load Balancer Listeners.
if 'network' != lb['Type']:
for cert in self._get_elbv2_listener_certificates(lb['LoadBalancerArn']):
certificates.add(cert)

# Check for the next ALB marker
if lbs.get('NextMarker'):
Expand All @@ -77,6 +80,41 @@ def _list_elbv2_certificates(self):

return certificates

def _list_cloudfront_certIDs(self):
"""
List all certificate IDs for IAM certs from all CloudFront distributions

:return: Set of IAM certificate IDs
:rtype: set of str
"""
marker = None
certIDs = set()

while True:
# Fetch the data
if marker:
distributions = self._cloudfront_client.list_distributions(Marker=marker)
else:
distributions = self._cloudfront_client.list_distributions()

# Loop through results and save all certificates IDs listed for IAM
items = []
if 'DistributionList' in distributions and 'Items' in distributions['DistributionList']:
items = distributions['DistributionList']['Items']
for dist in items:
if 'ViewerCertificate' in dist:
cert = dist['ViewerCertificate']
if 'iam' == cert['CertificateSource']:
certIDs.add( cert['IAMCertificateId'] )

# Check for the next ALB marker
if distributions.get('NextMarker'):
marker = distributions['NextMarker']
else:
break

return certIDs

def _get_elbv2_listeners(self, arn):
"""
Get all listener ARNs for a particular ELBv2 load balancer
Expand Down Expand Up @@ -185,6 +223,7 @@ def prune(self, delete=False):

:param bool delete: Whether to perform a dry run or actually start the deletion
"""
deltaDaysOld = timedelta(days=self.days_old)
attached_certs = set()
prune_certs = []

Expand All @@ -196,10 +235,13 @@ def prune(self, delete=False):
for cert in self._list_elbv2_certificates():
attached_certs.add(cert)

# Find all attached certificates on CloudFront
distr_certs = self._list_cloudfront_certIDs()

# Check each IAM certificate to see if it is in the attached certificates set
print('Unattached Certificates:')
for cert in self._list_iam_certificates():
if cert['Arn'] not in attached_certs:
if cert['Arn'] not in attached_certs and cert['ServerCertificateId'] not in distr_certs:
prune_certs.append(cert)
print('\tUNATTACHED: Expiring %s - %s' % (cert['Expiration'], cert['ServerCertificateName']))

Expand All @@ -212,13 +254,19 @@ def prune(self, delete=False):

# Check if the certificate has already expired or if it was uploaded more than days_old days ago
now = datetime.now(pytz.utc)
expired = cert['Expiration'] < now
if expired:
# I want to change the logic and not delete certs which might have expired, if we specify --days -90
#expired = cert['Expiration'] < now
if self.days_old < 0:
expired = now - cert['Expiration'] > abs(deltaDaysOld)
#print( 'DEBUG: deltaDaysOld =', deltaDaysOld, 'expired =', expired, "cert['Expiration'] =", cert['Expiration'], "now - cert['Expiration'] =", now - cert['Expiration'] )
else:
expired = cert['Expiration'] < now
if cert['Expiration'] < now:
expire_word = 'Expired '
else:
expire_word = 'Expiring'

if expired or (self.days_old >= 0 and now - cert['UploadDate'] > timedelta(days=self.days_old)):
if expired or (self.days_old >= 0 and now - cert['UploadDate'] > deltaDaysOld ):
delete_certs.append({'cert': cert, 'expire_word': expire_word})
else:
keep_certs.append({'cert': cert, 'expire_word': expire_word})
Expand All @@ -244,7 +292,7 @@ def prune(self, delete=False):
if delete:
# Perform the deletion
self._iam_client.delete_server_certificate(
ServerCertificateName=cert['ServerCertificateName']
ServerCertificateName=cert['cert']['ServerCertificateName']
)

print('\tDELETED: Uploaded %s, %s %s - %s' % (
Expand Down