Conversation
elijahbenizzy
left a comment
There was a problem hiding this comment.
Some thoughts -- don't fully understand it -- I need to look into the flytekit API. I think we might want a better toolset for compiling to other frameworks...
| class PandasSeriesTransformer(TypeTransformer[pd.Series]): | ||
| """ | ||
| Creates a transformer to handle PandasSeries, Similar to PandasTransformer | ||
| in flight repo https://github.com/flyteorg/flytekit/blob/master/flytekit/types/schema/types_pandas.py |
There was a problem hiding this comment.
What's different between the one here and that one?
hamilton/experimental/h_flytekit.py
Outdated
| # map inputs | ||
| for input_node in node.dependencies: | ||
| if input_node.name not in self.workflow.inputs: | ||
| self.workflow.add_workflow_input(input_node.name, input_node.type) |
There was a problem hiding this comment.
Should we check if they're python approved types?
There was a problem hiding this comment.
There is fairly limited support for custom types, but yeah a check can be added.
| self.workflow.add_workflow_input(input_node.name, input_node.type) | ||
| input_kwargs[input_node.name] = self.workflow.inputs[input_node.name] | ||
| # add the node to workflow | ||
| wf_node = self.workflow.add_entity(task, **input_kwargs) |
There was a problem hiding this comment.
I might be misreading, but why would it already have outputs/where does it get it from?
There was a problem hiding this comment.
Yep, See Imperative Workflow . It happens when you add the entity i.e. task with input_kwargs
hamilton/experimental/h_flytekit.py
Outdated
| if len(outputs) == 1: | ||
| self.workflow.add_workflow_output(node.name, list(outputs.values())[0], node.type) | ||
| else: | ||
| self.workflow.add_workflow_output(node.name, outputs, node.type) |
There was a problem hiding this comment.
And why might it have multiple outputs?
| Flytekit Python is the Python Library for easily authoring, testing, deploying, | ||
| and interacting with Flyte tasks, workflows, and launch plans | ||
|
|
||
| """ |
There was a problem hiding this comment.
On the right track @ramannanda9 !
Here we'd probably need an explanation here of how we're actually using Flyte. E.g. the code that we map things to, how does it behave/work and are there any caveats, decisions made, etc.
|
|
||
|
|
||
| # Register with FlyteKit type engine | ||
| TypeEngine.register(PandasSeriesTransformer()) |
There was a problem hiding this comment.
@ramannanda9 since Hamilton supports any object type, we need to think how to handle arbitrary python object types here too.
There was a problem hiding this comment.
Yeah not the best idea, but they ideally could default to just pickle dumping rather than raising unsupported type.
f428ae8 to
25498cc
Compare
See #139. The GraphAdapter treats a node as a PythonFunctionTask, it adds the node to an ImperativeWorkflow in flyte. The execution of workflow is performed during build_output. This way hamilton functions can be executed in Flyte runtime. We end up adding a PandasSeriesTransformer as that is required for using PandasSeries as function outputs and inputs of flight task nodes. Any customtype that is not a dataclass, native type or has support already in flyte will raise a ValueError Adds FlyteKitGraphAdapter tests Adds tests to show unsupported types
25498cc to
93f150c
Compare
[Short description explaining the high-level reason for the pull request]
Changes
How I tested this
Notes
Checklist