From b01e84888ac138ec8d66554c03a463705a7db0a1 Mon Sep 17 00:00:00 2001 From: devt Date: Mon, 9 Sep 2019 11:26:52 -0500 Subject: [PATCH 1/2] Fix NLB, cover CloudFront 3 changes: * DescribeListenerCertificates is not a valid operation for an NLB * check if CloudFront is using a cert when determining if it is in use * Change logic of what is considered ''expired" when you specify a negative '--days' --- cert_pruner/pruner.py | 59 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/cert_pruner/pruner.py b/cert_pruner/pruner.py index d477bde..7adf547 100644 --- a/cert_pruner/pruner.py +++ b/cert_pruner/pruner.py @@ -12,6 +12,7 @@ def __init__(self, days_old, profile=None): self._iam_client = self.session.client('iam') self._elb_client = self.session.client('elb') self._elbv2_client = self.session.client('elbv2') + self._cloudfront_client = self.session.client('cloudfront') self.days_old = days_old def _list_elb_certificates(self): @@ -66,8 +67,10 @@ def _list_elbv2_certificates(self): # Loop through results and save all certificates attached to a listener for lb in lbs['LoadBalancers']: - for cert in self._get_elbv2_listener_certificates(lb['LoadBalancerArn']): - certificates.add(cert) + # An error occurred (ValidationError) when calling the DescribeListenerCertificates operation: This operation does not support Network Load Balancer Listeners. + if 'network' != lb['Type']: + for cert in self._get_elbv2_listener_certificates(lb['LoadBalancerArn']): + certificates.add(cert) # Check for the next ALB marker if lbs.get('NextMarker'): @@ -77,6 +80,38 @@ def _list_elbv2_certificates(self): return certificates + def _list_cloudfront_certIDs(self): + """ + List all certificate IDs for IAM certs from all CloudFront distributions + + :return: Set of IAM certificate IDs + :rtype: set of str + """ + marker = None + certIDs = set() + + while True: + # Fetch the data + if marker: + distributions = self._cloudfront_client.list_distributions(Marker=marker) + else: + distributions = self._cloudfront_client.list_distributions() + + # Loop through results and save all certificates IDs listed for IAM + for dist in distributions['DistributionList']['Items']: + if 'ViewerCertificate' in dist: + cert = dist['ViewerCertificate'] + if 'iam' == cert['CertificateSource']: + certIDs.add( cert['IAMCertificateId'] ) + + # Check for the next ALB marker + if distributions.get('NextMarker'): + marker = distributions['NextMarker'] + else: + break + + return certIDs + def _get_elbv2_listeners(self, arn): """ Get all listener ARNs for a particular ELBv2 load balancer @@ -185,6 +220,7 @@ def prune(self, delete=False): :param bool delete: Whether to perform a dry run or actually start the deletion """ + deltaDaysOld = timedelta(days=self.days_old) attached_certs = set() prune_certs = [] @@ -196,10 +232,13 @@ def prune(self, delete=False): for cert in self._list_elbv2_certificates(): attached_certs.add(cert) + # Find all attached certificates on CloudFront + distr_certs = self._list_cloudfront_certIDs() + # Check each IAM certificate to see if it is in the attached certificates set print('Unattached Certificates:') for cert in self._list_iam_certificates(): - if cert['Arn'] not in attached_certs: + if cert['Arn'] not in attached_certs and cert['ServerCertificateId'] not in distr_certs: prune_certs.append(cert) print('\tUNATTACHED: Expiring %s - %s' % (cert['Expiration'], cert['ServerCertificateName'])) @@ -212,13 +251,19 @@ def prune(self, delete=False): # Check if the certificate has already expired or if it was uploaded more than days_old days ago now = datetime.now(pytz.utc) - expired = cert['Expiration'] < now - if expired: + # I want to change the logic and not delete certs which might have expired, if we specify --days -90 + #expired = cert['Expiration'] < now + if self.days_old < 0: + expired = now - cert['Expiration'] > abs(deltaDaysOld) + #print( 'DEBUG: deltaDaysOld =', deltaDaysOld, 'expired =', expired, "cert['Expiration'] =", cert['Expiration'], "now - cert['Expiration'] =", now - cert['Expiration'] ) + else: + expired = cert['Expiration'] < now + if cert['Expiration'] < now: expire_word = 'Expired ' else: expire_word = 'Expiring' - if expired or (self.days_old >= 0 and now - cert['UploadDate'] > timedelta(days=self.days_old)): + if expired or (self.days_old >= 0 and now - cert['UploadDate'] > deltaDaysOld ): delete_certs.append({'cert': cert, 'expire_word': expire_word}) else: keep_certs.append({'cert': cert, 'expire_word': expire_word}) @@ -244,7 +289,7 @@ def prune(self, delete=False): if delete: # Perform the deletion self._iam_client.delete_server_certificate( - ServerCertificateName=cert['ServerCertificateName'] + ServerCertificateName=cert['cert']['ServerCertificateName'] ) print('\tDELETED: Uploaded %s, %s %s - %s' % ( From e87c0d65b5c19eb6930eedf951a49fd275113454 Mon Sep 17 00:00:00 2001 From: devt Date: Mon, 9 Sep 2019 12:04:47 -0500 Subject: [PATCH 2/2] Fix bug when account does not have any CloudFront distributions --- cert_pruner/pruner.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cert_pruner/pruner.py b/cert_pruner/pruner.py index 7adf547..364e627 100644 --- a/cert_pruner/pruner.py +++ b/cert_pruner/pruner.py @@ -98,7 +98,10 @@ def _list_cloudfront_certIDs(self): distributions = self._cloudfront_client.list_distributions() # Loop through results and save all certificates IDs listed for IAM - for dist in distributions['DistributionList']['Items']: + items = [] + if 'DistributionList' in distributions and 'Items' in distributions['DistributionList']: + items = distributions['DistributionList']['Items'] + for dist in items: if 'ViewerCertificate' in dist: cert = dist['ViewerCertificate'] if 'iam' == cert['CertificateSource']: