I have an application that sources data from upstream, processes it, and then passes it downstream.
The application does this using an in-house concept called "Workflows".
Workflows are simply a set of activities: a start activity, then a set of transformation activities (which can run sequentially or in parallel), and finally an end activity.
Behind the scenes, each activity translates to a spark-submit job.
As an example, one of our simple workflows looks like:
Source a file --> Select a few columns from the file --> Apply a filter condition --> Modify the value of a column based on a condition --> Save the modified file
Each of the above activities is submitted as a PySpark job. At each step we create a DataFrame and then persist it to HDFS.
Each activity (= one PySpark job) with its source and target actions:

Activity 1 - Source a file:
    df = spark.read.csv('/path/to/source/file/on/hdfs/sourceA.csv')
    df.write.csv('/path/to/workflow-run-folder/step1.csv')

Activity 2 - Select a few columns from the file:
    df = df.select("col1", "col2")
    df.write.csv('/path/to/workflow-run-folder/step2.csv')

Activity 3 - Apply a filter condition:
    df = df.filter(df.col1 == 'xyz')
    df.write.csv('/path/to/workflow-run-folder/step3.csv')

Activity 4 - Modify the value of a column based on a condition:
    df = df.withColumn("col2", df.col2 * 3)
    df.write.csv('/path/to/workflow-run-folder/step4.csv')

Activity 5 - Save the modified file:
    df.write.csv('/path/to/workflow-run-folder/step5.csv')
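To make the persist-per-step pattern concrete, here is a plain-Python stand-in for the five jobs (toy data and invented local paths; the real activities are PySpark, this just mimics the read-transform-write chain where each step reads the previous step's output):

```python
import csv
import os
import tempfile

def run_activity(src, dst, transform):
    """Read the previous step's CSV, apply a transform, persist the result."""
    with open(src, newline="") as f:
        rows = list(csv.DictReader(f))
    rows = transform(rows)
    with open(dst, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=rows[0].keys())
        writer.writeheader()
        writer.writerows(rows)
    return dst

workdir = tempfile.mkdtemp()
source = os.path.join(workdir, "sourceA.csv")
with open(source, "w", newline="") as f:
    f.write("col1,col2,col3\nxyz,1,a\nabc,2,b\n")

# Activity 1: source the file (pass-through copy into the run folder)
step1 = run_activity(source, os.path.join(workdir, "step1.csv"), lambda rows: rows)
# Activity 2: select col1, col2
step2 = run_activity(step1, os.path.join(workdir, "step2.csv"),
                     lambda rows: [{"col1": r["col1"], "col2": r["col2"]} for r in rows])
# Activity 3: filter col1 == 'xyz'
step3 = run_activity(step2, os.path.join(workdir, "step3.csv"),
                     lambda rows: [r for r in rows if r["col1"] == "xyz"])
# Activity 4: col2 = col2 * 3
step4 = run_activity(step3, os.path.join(workdir, "step4.csv"),
                     lambda rows: [dict(r, col2=str(int(r["col2"]) * 3)) for r in rows])
# Activity 5: save the modified file (another pass-through copy)
step5 = run_activity(step4, os.path.join(workdir, "step5.csv"), lambda rows: rows)

with open(step5, newline="") as f:
    print(list(csv.DictReader(f)))  # one remaining row: col1=xyz, col2=3
```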
We then had a requirement to capture data lineage.
To achieve this, I explored the Spline framework and incorporated it into my application.
We placed the spark-spline-agent bundle jar on the Spark driver (my --deploy-mode is client and --master is yarn), under the jars folder of SPARK_HOME.
We also brought up the spline-rest-gateway and spline-ui along with ArangoDB (using Spline 0.7).
On submitting our Spark jobs, we initialized the agent via code by calling enableLineageTracking on SparkLineageInitializer (for some reason, codeless initialization via --packages didn't work).
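For completeness, the codeless route we attempted looked roughly like this (the producer URL is a placeholder for your own spline-rest-gateway endpoint; since the agent jar already sits under SPARK_HOME/jars here, only the confs should be needed rather than --packages):

```shell
spark-submit \
  --master yarn \
  --deploy-mode client \
  --conf "spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener" \
  --conf "spark.spline.producer.url=http://localhost:8080/producer" \
  my_activity_job.py
```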
I can see the Spark jobs listed in the Spline UI under the Execution Events screen (one record per Spark job; so for the above example, we have 5 Spark jobs listed in the Spline UI).
On clicking the Execution Plan hyperlink against each job, I can see the lineage that the Spline UI fetches from ArangoDB.
Also, in ArangoDB I can see the vertices and edges in graph view.
However, here is what I'm looking for: we now have a requirement to figure out data lineage "across" the Spark jobs we submit.
For the above example, I need some way to provide "col2" as input and get back a lineage that logically translates to:
col2 (Activity-1) -> col2 (Activity-2) -> col2 (Activity-3) -> col2*3 (Activity-4) -> col2*3 (Activity-5)
So if my col2 value at the source is, say, 1, then the lineage should be:
1 -> 1 -> 1 -> 3 -> 3
Also, against each value, I need to know what the activity did to it: whether it was a pass-through (i.e., no transformation applied) or whether a transformation was applied (in the above example, at Activity 4 the column value was multiplied by 3).
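To make the expected answer concrete, here is a toy sketch of the per-activity trace I'm after. All names and structures here are invented for illustration; this is not the Spline API, just a model of the desired output:

```python
# Each activity records, per column, either a pass-through (None) or an
# expression. Only Activity 4 transforms col2 (multiplies it by 3).
activities = [
    ("Activity-1", {"col2": None}),             # source file: pass-through
    ("Activity-2", {"col2": None}),             # select: pass-through
    ("Activity-3", {"col2": None}),             # filter: pass-through
    ("Activity-4", {"col2": lambda v: v * 3}),  # withColumn: col2 * 3
    ("Activity-5", {"col2": None}),             # save: pass-through
]

def trace(column, value):
    """Walk the activities, applying each transformation (if any) to the value."""
    steps = []
    for name, exprs in activities:
        fn = exprs.get(column)
        if fn is not None:
            value = fn(value)
            steps.append((name, "transformed", value))
        else:
            steps.append((name, "pass-through", value))
    return steps

for name, kind, v in trace("col2", 1):
    print(name, kind, v)
# Activity-1..3 pass-through with value 1, Activity-4 transformed to 3,
# Activity-5 pass-through with value 3
```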
Is there a way I can achieve this using AQL?
I understand that every Spark job is created as a document in the executionPlan document collection, but if I have to get the lineage of a specific attribute across these documents, how can I do it? In other words, how can I get the lineage of an attribute across different Spark jobs (i.e., across different execution IDs)?
Also, can I capture the runtime value of the attribute as it goes through the different transformations?
I read through ArangoDB's graph traversal documentation but couldn't work out how to achieve this via AQL (or through any other approach in Spline).
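One idea I'm considering is to stitch the per-job lineage into a cross-job chain by joining each plan's write URI to the next plan's read URI, since consecutive activities share HDFS paths. Sketched below in plain Python with invented, simplified plan dicts standing in for the real executionPlan documents (not the actual Spline schema):

```python
# Simplified stand-ins for execution plan documents: each records the data
# source URIs it reads from and the URI it writes to. Paths are invented.
plans = [
    {"id": "plan-1", "reads": ["hdfs:///source/sourceA.csv"], "writes": "hdfs:///run/step1.csv"},
    {"id": "plan-2", "reads": ["hdfs:///run/step1.csv"], "writes": "hdfs:///run/step2.csv"},
    {"id": "plan-3", "reads": ["hdfs:///run/step2.csv"], "writes": "hdfs:///run/step3.csv"},
    {"id": "plan-4", "reads": ["hdfs:///run/step3.csv"], "writes": "hdfs:///run/step4.csv"},
    {"id": "plan-5", "reads": ["hdfs:///run/step4.csv"], "writes": "hdfs:///run/step5.csv"},
]

def chain_from(uri, plans):
    """Follow write-URI -> read-URI links to order plans into a workflow chain."""
    by_read = {r: p for p in plans for r in p["reads"]}
    chain = []
    while uri in by_read:
        p = by_read[uri]
        chain.append(p["id"])
        uri = p["writes"]  # the next plan reads what this one wrote
    return chain

print(chain_from("hdfs:///source/sourceA.csv", plans))
# ['plan-1', 'plan-2', 'plan-3', 'plan-4', 'plan-5']
```

If this linking holds, the attribute-level question reduces to traversing each plan's internal attribute graph and then hopping to the next plan over the shared data source URI.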
Any pointers on this will be highly appreciated.
Thanks !