Skip to content
Open

Demo #69

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions avro2tf/src/demo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Avro2TF

![](https://github.com/linkedin/Avro2TF/blob/master/avro2tf-logo.png)

This demo has 2 parts:

1. Spark: this part can run locally or on a cluster, and the project can be loaded directly into IntelliJ. The SingleRun object takes the data directory and converts it to a tensor format that can be consumed by TensorFlow.
2. TensorFlow: once SingleRun has completed, the data will be stored in the tensorflow/data directory. Use the following commands to set up a virtual environment and Jupyter Notebook to run the test program.


## Setting up Virtual Environment with TensorFlow and Jupyter Notebook

1. Assuming Python 3 is installed, run the following from avro2tf/src/demo/tensorflow:


pip install virtualenv
python3 ${PYTHON}/lib/python/site-packages/virtualenv.py venv
source venv/bin/activate
pip install tensorflow
pip install jupyter
pip install scikit-learn
jupyter notebook

2. Open notebook/keras_tf2_movielens.ipynb in Jupyter Notebook.

119 changes: 119 additions & 0 deletions avro2tf/src/demo/spark/config/avro2tf_config_movielens.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
{
"features": [
{
"inputFeatureInfo": {
"columnExpr": "user_id"
},
"outputTensorInfo": {
"name": "userId",
"dtype": "long",
"shape": [
-1
]
}
},
{
"inputFeatureInfo": {
"columnExpr": "movie_id"
},
"outputTensorInfo": {
"name": "movieId",
"dtype": "long",
"shape": [
-1
]
}
},
{
"inputFeatureInfo": {
"columnExpr": "user_id",
"transformConfig": {
"hashInfo": {
"hashBucketSize": 1000,
"numHashFunctions": 4
}
}
},
"outputTensorInfo": {
"name": "userId_hashed",
"dtype": "long",
"shape": [
4
]
}
},
{
"inputFeatureInfo": {
"columnExpr": "movie_id",
"transformConfig": {
"hashInfo": {
"hashBucketSize": 1000,
"numHashFunctions": 4
}
}
},
"outputTensorInfo": {
"name": "movieId_hashed",
"dtype": "long",
"shape": [
4
]
}
},
{
"inputFeatureInfo": {
"columnExpr": "user_features"
},
"outputTensorInfo": {
"name": "user_features",
"dtype": "float",
"shape": [],
"isSparse" : true
}
},
{
"inputFeatureInfo": {
"columnExpr": "movie_features"
},
"outputTensorInfo": {
"name": "movie_features",
"dtype": "float",
"shape": [],
"isSparse" : true
}
},
{
"inputFeatureInfo": {
"columnExpr": "user_emb_features.value"
},
"outputTensorInfo": {
"name": "user_embb_features",
"dtype": "float",
"shape": [64]
}
},
{
"inputFeatureInfo": {
"columnExpr": "movie_emb_features.value"
},
"outputTensorInfo": {
"name": "movie_embb_features",
"dtype": "float",
"shape": [64]
}
}

],
"labels": [
{
"inputFeatureInfo": {
"columnExpr": "response"
},
"outputTensorInfo": {
"name": "response",
"dtype": "double",
"shape": []
}
}
]
}
Binary file added avro2tf/src/demo/spark/data/test/part-000.avro
Binary file not shown.
Binary file added avro2tf/src/demo/spark/data/test/part-001.avro
Binary file not shown.
Binary file added avro2tf/src/demo/spark/data/test/part-002.avro
Binary file not shown.
Binary file added avro2tf/src/demo/spark/data/test/part-003.avro
Binary file not shown.
Binary file added avro2tf/src/demo/spark/data/test/part-004.avro
Binary file not shown.
Binary file added avro2tf/src/demo/spark/data/train/part-000.avro
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
86 changes: 86 additions & 0 deletions avro2tf/src/demo/spark/scala/com/linkedin/avro2tf/SingleRun.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.linkedin.avro2tf

import com.linkedin.avro2tf.constants.Avro2TFJobParamNames
import com.linkedin.avro2tf.jobs.Avro2TF
import com.linkedin.avro2tf.parsers.{Avro2TFJobParamsParser, Avro2TFParams}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
* This is a simple job that can run locally to process a small amount of data.
* This job consists of 3 runs:
* 1. generate training data and meta data and feature mapping
* 2. generate validation data using meta data
* 3. generate test data using meta data
*/
object SingleRun {

  // Settings shared by all three runs. The paths are relative, so they are
  // resolved against the working directory the job is launched from.
  private val WorkingDir = "avro2tf/src/demo/tensorflow/data"
  private val ConfigPath = "avro2tf/src/demo/spark/config/avro2tf_config_movielens.json"
  private val OutputFormat = "tfrecord"

  /**
   * Entry point: runs Avro2TF three times in sequence (train, validate, test)
   * on a single local Spark session.
   *
   * The session is stopped in a `finally` block so that it is released even
   * when one of the runs throws.
   *
   * @param args command-line arguments; not used by this demo
   */
  def main(args: Array[String]): Unit = {
    val session = getSparkSession("single run")

    try {
      // Each params object is built right before its run, so the printed
      // output stays grouped per stage.
      runStage(session, parseTrainParamsFromMap())
      runStage(session, parseValidateParamsFromMap())
      runStage(session, parseTestParamsFromMap())
    } finally {
      session.stop()
    }
  }

  /**
   * Builds the parameters for the training run. No execution mode is passed,
   * so the parser's default applies.
   *
   * @return parsed parameters reading from the `train` data directory
   */
  def parseTrainParamsFromMap(): Avro2TFParams =
    buildParams("avro2tf/src/demo/spark/data/train", executionMode = None)

  /**
   * Builds the parameters for the validation run.
   *
   * @return parsed parameters reading from the `validate` data directory with
   *         execution mode `validation`
   */
  def parseValidateParamsFromMap(): Avro2TFParams =
    buildParams("avro2tf/src/demo/spark/data/validate", executionMode = Some("validation"))

  /**
   * Builds the parameters for the test run.
   *
   * @return parsed parameters reading from the `test` data directory with
   *         execution mode `test`
   */
  def parseTestParamsFromMap(): Avro2TFParams =
    buildParams("avro2tf/src/demo/spark/data/test", executionMode = Some("test"))

  /** Prints the parameters of one stage and then runs Avro2TF with them. */
  private def runStage(session: SparkSession, params: Avro2TFParams): Unit = {
    println(params)
    Avro2TF.run(session, params)
  }

  /**
   * Turns the demo settings into `--name value` command-line style arguments,
   * prints them, and hands them to [[Avro2TFJobParamsParser]].
   *
   * @param inputPath     directory containing the Avro input for this run
   * @param executionMode value for the execution-mode parameter; `None` omits
   *                      the parameter entirely
   * @return the parameters produced by the parser
   */
  private def buildParams(inputPath: String, executionMode: Option[String]): Avro2TFParams = {
    val commonParams = Map(
      Avro2TFJobParamNames.INPUT_PATHS -> inputPath,
      Avro2TFJobParamNames.WORKING_DIR -> WorkingDir,
      Avro2TFJobParamNames.AVRO2TF_CONFIG_PATH -> ConfigPath,
      Avro2TFJobParamNames.OUTPUT_FORMAT -> OutputFormat
    )
    val paramsMap = executionMode.fold(commonParams) { mode =>
      commonParams + (Avro2TFJobParamNames.EXECUTION_MODE -> mode)
    }

    val cliArgs = paramsMap.flatMap { case (name, value) => Seq(s"--$name", value) }.toSeq
    println(cliArgs)
    Avro2TFJobParamsParser.parse(cliArgs)
  }

  /**
   * Creates (or reuses, via `getOrCreate`) a Spark session that runs locally
   * on all available cores.
   *
   * @param name application name shown by Spark
   * @return the local Spark session
   */
  private def getSparkSession(name: String): SparkSession = {
    val sparkConf = new SparkConf()
      // NOTE(review): no classes are registered here. Per the SparkConf docs,
      // registerKryoClasses also switches the serializer to Kryo, so this call
      // is kept; confirm that is the intent.
      .registerKryoClasses(Array.empty[Class[_]])
      .set("spark.driver.bindAddress", "127.0.0.1")
      .set("spark.sql.avro.compression.codec", "deflate")
      .set("spark.sql.avro.deflate.level", "5")

    SparkSession.builder
      .master("local[*]")
      .appName(name)
      .config(sparkConf)
      .getOrCreate()
  }
}
Loading