From 1cd40dceacd12044e98393020cc9427e51b928be Mon Sep 17 00:00:00 2001 From: Luis Henrique Date: Wed, 17 Apr 2024 11:35:43 -0300 Subject: [PATCH 1/3] refactor: update eval function --- faster_sam/cloudformation.py | 53 ++++++++++-------------------------- 1 file changed, 15 insertions(+), 38 deletions(-) diff --git a/faster_sam/cloudformation.py b/faster_sam/cloudformation.py index fa8e7cef..edb9bb61 100644 --- a/faster_sam/cloudformation.py +++ b/faster_sam/cloudformation.py @@ -586,50 +586,27 @@ def eval(function: Dict[str, Any], template: Dict[str, Any]) -> Any: NotImplementedError If the intrinsic function is not implemented. """ - implemented = ( - "Fn::Base64", - "Fn::FindInMap", - "Fn::GetAtt", - "Fn::Join", - "Fn::Select", - "Fn::Split", - "Fn::Sub", - "Ref", - ) + functions = { + "Fn::Base64": IntrinsicFunctions.base64, + "Fn::FindInMap": IntrinsicFunctions.find_in_map, + "Fn::GetAtt": IntrinsicFunctions.get_att, + "Fn::Join": IntrinsicFunctions.join, + "Fn::Select": IntrinsicFunctions.select, + "Fn::Split": IntrinsicFunctions.split, + "Fn::Sub": IntrinsicFunctions.sub, + "Ref": IntrinsicFunctions.ref, + } fun, val = list(function.items())[0] - if fun not in implemented: + if fun in functions.keys(): + return functions[fun](val, template) + else: logging.warning(f"{fun} intrinsic function not implemented") - - if "Fn::Base64" == fun: - return IntrinsicFunctions.base64(val) - - if "Fn::FindInMap" == fun: - return IntrinsicFunctions.find_in_map(val, template) - - if "Fn::GetAtt" == fun: - return IntrinsicFunctions.get_att(val, template) - - if "Fn::Join" == fun: - return IntrinsicFunctions.join(val, template) - - if "Fn::Select" == fun: - return IntrinsicFunctions.select(val, template) - - if "Fn::Split" == fun: - return IntrinsicFunctions.split(val, template) - - if "Fn::Sub" == fun: - return IntrinsicFunctions.sub(val, template) - - if "Ref" == fun: - return IntrinsicFunctions.ref(val, template) - - return None + return None @staticmethod - def base64(value: str) -> str: + def base64(value: str, template: Dict[str, Any]) -> str: """ Encode a string to base64. From 6b696bccdf9b79be5a7cafca8220d8642098b751 Mon Sep 17 00:00:00 2001 From: Luis Henrique Date: Thu, 18 Apr 2024 13:17:22 -0300 Subject: [PATCH 2/3] wip --- faster_sam/cloudformation.py | 53 ++++++++++++++++++++++----- tests/fixtures/templates/example8.yml | 21 +++++++++++ tests/test_cloudformation.py | 5 +++ 3 files changed, 69 insertions(+), 10 deletions(-) create mode 100644 tests/fixtures/templates/example8.yml diff --git a/faster_sam/cloudformation.py b/faster_sam/cloudformation.py index 1e8997a6..25d01a43 100644 --- a/faster_sam/cloudformation.py +++ b/faster_sam/cloudformation.py @@ -344,6 +344,7 @@ def __init__( self.template = self.load(template_path) self.include_files() + self.template = self.evaluate_and_replace(self.template) self.set_parameters(parameters) @property @@ -545,17 +546,49 @@ def find_environment(self) -> Dict[str, Any]: for function in self.functions.values(): variables.update(function.environment) - environment = {} - - for key, val in variables.items(): - if isinstance(val, (str, int, float)): - environment[key] = str(val) - else: - value = IntrinsicFunctions.eval(val, self.template) - if value is not None: - environment[key] = str(value) + for key, value in variables.items(): + variables[key] = str(value) + + return variables + + def evaluate_and_replace(self, obj) -> Dict[str, Any]: + import ipdb + ipdb.set_trace() + functions = [ + "Fn::Base64", + "Fn::FindInMap", + "Fn::GetAtt", + "Fn::Join", + "Fn::Select", + "Fn::Split", + "Fn::Sub", + "Ref", + ] - return environment + if isinstance(obj, dict): + for key, value in obj.items(): + if key in functions: + obj[key] = IntrinsicFunctions.eval(obj, self.template) + + elif isinstance(value, dict): + obj[key] = self.evaluate_and_replace(value) + + return obj + + elif isinstance(obj, list): + for i, item in enumerate(obj): + if isinstance(item, dict): + for key, value in item.items(): + if key in functions: + obj[i] = IntrinsicFunctions.eval(obj, self.template) + + elif isinstance(value, dict): + obj[i] = self.evaluate_and_replace(item) + + elif isinstance(item, str) and item.startswith("!"): + obj[i] = IntrinsicFunctions.eval(item, self.template) + + return obj class IntrinsicFunctions: diff --git a/tests/fixtures/templates/example8.yml b/tests/fixtures/templates/example8.yml new file mode 100644 index 00000000..c1d9c8bf --- /dev/null +++ b/tests/fixtures/templates/example8.yml @@ -0,0 +1,21 @@ +Resources: + ApiGateway: + Type: AWS::Serverless::Api + Properties: + Name: sam-api + StageName: v1 + + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Events: + HelloWorld: + Properties: + Path: /hello + Method: get + RestApiId: + Fn::Select: + - 1 + - Fn::Split: + - "|" + - "dev@gmail|dotz@gmail.com" diff --git a/tests/test_cloudformation.py b/tests/test_cloudformation.py index f9ac0afa..40b5aa62 100644 --- a/tests/test_cloudformation.py +++ b/tests/test_cloudformation.py @@ -269,6 +269,11 @@ def test_lambda_handler(self): handler_path = cloudformation.functions[key].handler self.assertEqual(handler_path, "tests.fixtures.handlers.lambda_handler.handler") + def test(self): + cloudformation = CloudformationTemplate("tests/fixtures/templates/example8.yml") + + print(cloudformation.template) + class TestIntrinsicFunctions(unittest.TestCase): def test_getatt_function(self): From 8f0fac3f9c261795aa04a277dfc6958be3e2bdbc Mon Sep 17 00:00:00 2001 From: Luis Henrique Date: Thu, 18 Apr 2024 15:35:14 -0300 Subject: [PATCH 3/3] wip --- faster_sam/cloudformation.py | 31 +++++++++---------------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/faster_sam/cloudformation.py b/faster_sam/cloudformation.py index 25d01a43..f6efb3bf 100644 --- a/faster_sam/cloudformation.py +++ b/faster_sam/cloudformation.py @@ -551,9 +551,7 @@ def find_environment(self) -> Dict[str, Any]: return variables - def evaluate_and_replace(self, obj) -> Dict[str, Any]: - import ipdb - ipdb.set_trace() + def evaluate_and_replace(self, obj: Dict[str, Any], parent_key=None) -> Dict[str, Any]: functions = [ "Fn::Base64", "Fn::FindInMap", @@ -566,28 +564,17 @@ def evaluate_and_replace(self, obj) -> Dict[str, Any]: ] if isinstance(obj, dict): - for key, value in obj.items(): + for key in obj.keys(): + value = obj[key] + if key in functions: - obj[key] = IntrinsicFunctions.eval(obj, self.template) - - elif isinstance(value, dict): - obj[key] = self.evaluate_and_replace(value) + result = IntrinsicFunctions.eval(key, self.template) - return obj + obj[parent_key] = result + + elif isinstance(value, dict): + obj[key] = self.evaluate_and_replace(value, parent_key=key) - elif isinstance(obj, list): - for i, item in enumerate(obj): - if isinstance(item, dict): - for key, value in item.items(): - if key in functions: - obj[i] = IntrinsicFunctions.eval(obj, self.template) - - elif isinstance(value, dict): - obj[i] = self.evaluate_and_replace(item) - - elif isinstance(item, str) and item.startswith("!"): - obj[i] = IntrinsicFunctions.eval(item, self.template) - return obj