diff --git a/cert_pruner/pruner.py b/cert_pruner/pruner.py index d477bde..364e627 100644 --- a/cert_pruner/pruner.py +++ b/cert_pruner/pruner.py @@ -12,6 +12,7 @@ def __init__(self, days_old, profile=None): self._iam_client = self.session.client('iam') self._elb_client = self.session.client('elb') self._elbv2_client = self.session.client('elbv2') + self._cloudfront_client = self.session.client('cloudfront') self.days_old = days_old def _list_elb_certificates(self): @@ -66,8 +67,10 @@ def _list_elbv2_certificates(self): # Loop through results and save all certificates attached to a listener for lb in lbs['LoadBalancers']: - for cert in self._get_elbv2_listener_certificates(lb['LoadBalancerArn']): - certificates.add(cert) + # An error occurred (ValidationError) when calling the DescribeListenerCertificates operation: This operation does not support Network Load Balancer Listeners. + if 'network' != lb['Type']: + for cert in self._get_elbv2_listener_certificates(lb['LoadBalancerArn']): + certificates.add(cert) # Check for the next ALB marker if lbs.get('NextMarker'): @@ -77,6 +80,41 @@ def _list_elbv2_certificates(self): return certificates + def _list_cloudfront_certIDs(self): + """ + List all certificate IDs for IAM certs from all CloudFront distributions + + :return: Set of IAM certificate IDs + :rtype: set of str + """ + marker = None + certIDs = set() + + while True: + # Fetch the data + if marker: + distributions = self._cloudfront_client.list_distributions(Marker=marker) + else: + distributions = self._cloudfront_client.list_distributions() + + # Loop through results and save all certificates IDs listed for IAM + items = [] + if 'DistributionList' in distributions and 'Items' in distributions['DistributionList']: + items = distributions['DistributionList']['Items'] + for dist in items: + if 'ViewerCertificate' in dist: + cert = dist['ViewerCertificate'] + if 'iam' == cert['CertificateSource']: + certIDs.add( cert['IAMCertificateId'] ) + + # Check for the next ALB marker + if distributions.get('NextMarker'): + marker = distributions['NextMarker'] + else: + break + + return certIDs + def _get_elbv2_listeners(self, arn): """ Get all listener ARNs for a particular ELBv2 load balancer @@ -185,6 +223,7 @@ def prune(self, delete=False): :param bool delete: Whether to perform a dry run or actually start the deletion """ + deltaDaysOld = timedelta(days=self.days_old) attached_certs = set() prune_certs = [] @@ -196,10 +235,13 @@ def prune(self, delete=False): for cert in self._list_elbv2_certificates(): attached_certs.add(cert) + # Find all attached certificates on CloudFront + distr_certs = self._list_cloudfront_certIDs() + # Check each IAM certificate to see if it is in the attached certificates set print('Unattached Certificates:') for cert in self._list_iam_certificates(): - if cert['Arn'] not in attached_certs: + if cert['Arn'] not in attached_certs and cert['ServerCertificateId'] not in distr_certs: prune_certs.append(cert) print('\tUNATTACHED: Expiring %s - %s' % (cert['Expiration'], cert['ServerCertificateName'])) @@ -212,13 +254,19 @@ def prune(self, delete=False): # Check if the certificate has already expired or if it was uploaded more than days_old days ago now = datetime.now(pytz.utc) - expired = cert['Expiration'] < now - if expired: + # I want to change the logic and not delete certs which might have expired, if we specify --days -90 + #expired = cert['Expiration'] < now + if self.days_old < 0: + expired = now - cert['Expiration'] > abs(deltaDaysOld) + #print( 'DEBUG: deltaDaysOld =', deltaDaysOld, 'expired =', expired, "cert['Expiration'] =", cert['Expiration'], "now - cert['Expiration'] =", now - cert['Expiration'] ) + else: + expired = cert['Expiration'] < now + if cert['Expiration'] < now: expire_word = 'Expired ' else: expire_word = 'Expiring' - if expired or (self.days_old >= 0 and now - cert['UploadDate'] > timedelta(days=self.days_old)): + if expired or (self.days_old >= 0 and now - cert['UploadDate'] > deltaDaysOld ): delete_certs.append({'cert': cert, 'expire_word': expire_word}) else: keep_certs.append({'cert': cert, 'expire_word': expire_word}) @@ -244,7 +292,7 @@ def prune(self, delete=False): if delete: # Perform the deletion self._iam_client.delete_server_certificate( - ServerCertificateName=cert['ServerCertificateName'] + ServerCertificateName=cert['cert']['ServerCertificateName'] ) print('\tDELETED: Uploaded %s, %s %s - %s' % (