diff --git a/avro2tf/src/demo/README.md b/avro2tf/src/demo/README.md new file mode 100644 index 0000000..9a7cea2 --- /dev/null +++ b/avro2tf/src/demo/README.md @@ -0,0 +1,25 @@ +# Avro2TF + +![](https://github.com/linkedin/Avro2TF/blob/master/avro2tf-logo.png) + +This demo has 2 parts: + +1. Spark: this can run locally or in a cluster. It can be directly loaded into intelliJ. The SingleRun object will be able to take the data directory and convert it to tensor format that can be consumed by TensorFlow. +2. TensorFlow: once SingleRun completed, the data will be stored in tensorflow/data directory. User can use the following command to setup virtual env. and Jupyter Notebook to run the test program + + +## Setting up Virtual Environment with TensorFlow and Jupyter Notebook + +1. Assume Python 3 is installed, and under avro2tf/src/demo/tensorflow: + + + pip install virtualenv + python3 ${PYTHON}/lib/python/site-packages/virtualenv.py venv + source venv/bin/activate + pip install tensorflow + pip install jupyter + pip install sklearn + jupyter notebook + +2. open notebook/keras_tf2_movielens.ipynb + diff --git a/avro2tf/src/demo/spark/config/avro2tf_config_movielens.json b/avro2tf/src/demo/spark/config/avro2tf_config_movielens.json new file mode 100644 index 0000000..1df2df9 --- /dev/null +++ b/avro2tf/src/demo/spark/config/avro2tf_config_movielens.json @@ -0,0 +1,119 @@ +{ + "features": [ + { + "inputFeatureInfo": { + "columnExpr": "user_id" + }, + "outputTensorInfo": { + "name": "userId", + "dtype": "long", + "shape": [ + -1 + ] + } + }, + { + "inputFeatureInfo": { + "columnExpr": "movie_id" + }, + "outputTensorInfo": { + "name": "movieId", + "dtype": "long", + "shape": [ + -1 + ] + } + }, + { + "inputFeatureInfo": { + "columnExpr": "user_id", + "transformConfig": { + "hashInfo": { + "hashBucketSize": 1000, + "numHashFunctions": 4 + } + } + }, + "outputTensorInfo": { + "name": "userId_hashed", + "dtype": "long", + "shape": [ + 4 + ] + } + }, + { + "inputFeatureInfo": { + "columnExpr": "movie_id", + "transformConfig": { + "hashInfo": { + "hashBucketSize": 1000, + "numHashFunctions": 4 + } + } + }, + "outputTensorInfo": { + "name": "movieId_hashed", + "dtype": "long", + "shape": [ + 4 + ] + } + }, + { + "inputFeatureInfo": { + "columnExpr": "user_features" + }, + "outputTensorInfo": { + "name": "user_features", + "dtype": "float", + "shape": [], + "isSparse" : true + } + }, + { + "inputFeatureInfo": { + "columnExpr": "movie_features" + }, + "outputTensorInfo": { + "name": "movie_features", + "dtype": "float", + "shape": [], + "isSparse" : true + } + }, + { + "inputFeatureInfo": { + "columnExpr": "user_emb_features.value" + }, + "outputTensorInfo": { + "name": "user_embb_features", + "dtype": "float", + "shape": [64] + } + }, + { + "inputFeatureInfo": { + "columnExpr": "movie_emb_features.value" + }, + "outputTensorInfo": { + "name": "movie_embb_features", + "dtype": "float", + "shape": [64] + } + } + + ], + "labels": [ + { + "inputFeatureInfo": { + "columnExpr": "response" + }, + "outputTensorInfo": { + "name": "response", + "dtype": "double", + "shape": [] + } + } + ] +} diff --git a/avro2tf/src/demo/spark/data/test/part-000.avro b/avro2tf/src/demo/spark/data/test/part-000.avro new file mode 100644 index 0000000..1daa27a Binary files /dev/null and b/avro2tf/src/demo/spark/data/test/part-000.avro differ diff --git a/avro2tf/src/demo/spark/data/test/part-001.avro b/avro2tf/src/demo/spark/data/test/part-001.avro new file mode 100644 index 0000000..9463643 Binary files /dev/null and b/avro2tf/src/demo/spark/data/test/part-001.avro differ diff --git a/avro2tf/src/demo/spark/data/test/part-002.avro b/avro2tf/src/demo/spark/data/test/part-002.avro new file mode 100644 index 0000000..7dd3975 Binary files /dev/null and b/avro2tf/src/demo/spark/data/test/part-002.avro differ diff --git a/avro2tf/src/demo/spark/data/test/part-003.avro b/avro2tf/src/demo/spark/data/test/part-003.avro new file mode 100644 index 0000000..87cc8bc Binary files /dev/null and b/avro2tf/src/demo/spark/data/test/part-003.avro differ diff --git a/avro2tf/src/demo/spark/data/test/part-004.avro b/avro2tf/src/demo/spark/data/test/part-004.avro new file mode 100644 index 0000000..2a4a849 Binary files /dev/null and b/avro2tf/src/demo/spark/data/test/part-004.avro differ diff --git a/avro2tf/src/demo/spark/data/train/part-000.avro b/avro2tf/src/demo/spark/data/train/part-000.avro new file mode 100644 index 0000000..c887e45 Binary files /dev/null and b/avro2tf/src/demo/spark/data/train/part-000.avro differ diff --git a/avro2tf/src/demo/spark/data/validate/part-000.avro b/avro2tf/src/demo/spark/data/validate/part-000.avro new file mode 100644 index 0000000..5bd1bbb Binary files /dev/null and b/avro2tf/src/demo/spark/data/validate/part-000.avro differ diff --git a/avro2tf/src/demo/spark/data/validate/part-001.avro b/avro2tf/src/demo/spark/data/validate/part-001.avro new file mode 100644 index 0000000..e5c1a02 Binary files /dev/null and b/avro2tf/src/demo/spark/data/validate/part-001.avro differ diff --git a/avro2tf/src/demo/spark/data/validate/part-002.avro b/avro2tf/src/demo/spark/data/validate/part-002.avro new file mode 100644 index 0000000..7843e10 Binary files /dev/null and b/avro2tf/src/demo/spark/data/validate/part-002.avro differ diff --git a/avro2tf/src/demo/spark/data/validate/part-003.avro b/avro2tf/src/demo/spark/data/validate/part-003.avro new file mode 100644 index 0000000..a8acb08 Binary files /dev/null and b/avro2tf/src/demo/spark/data/validate/part-003.avro differ diff --git a/avro2tf/src/demo/spark/data/validate/part-004.avro b/avro2tf/src/demo/spark/data/validate/part-004.avro new file mode 100644 index 0000000..b378716 Binary files /dev/null and b/avro2tf/src/demo/spark/data/validate/part-004.avro differ diff --git a/avro2tf/src/demo/spark/scala/com/linkedin/avro2tf/SingleRun.scala b/avro2tf/src/demo/spark/scala/com/linkedin/avro2tf/SingleRun.scala new file mode 100644 index 0000000..b524528 --- /dev/null +++ b/avro2tf/src/demo/spark/scala/com/linkedin/avro2tf/SingleRun.scala @@ -0,0 +1,86 @@ +package com.linkedin.avro2tf + +import com.linkedin.avro2tf.constants.Avro2TFJobParamNames +import com.linkedin.avro2tf.jobs.Avro2TF +import com.linkedin.avro2tf.parsers.{Avro2TFJobParamsParser, Avro2TFParams} +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession + +/** + * This is a simple job that can run locally to process small aount of data + * This job consists of 3 runs: + * 1. generate training data and meta data and feature mapping + * 2. generate validation data using meta data + * 3. generate test data using meta data + */ +object SingleRun { + def main(args: Array[String]) = { + val session = getSparkSession("single run") + + val trainParams = parseTrainParamsFromMap() + println(trainParams) + Avro2TF.run(session, trainParams) + + val validateParams = parseValidateParamsFromMap() + println(validateParams) + Avro2TF.run(session, validateParams) + + val testParams = parseTestParamsFromMap() + println(testParams) + Avro2TF.run(session, testParams) + + session.stop() + } + + def parseTrainParamsFromMap(): Avro2TFParams = { + val paramsMap = Map( + Avro2TFJobParamNames.INPUT_PATHS -> "avro2tf/src/demo/spark/data/train", + Avro2TFJobParamNames.WORKING_DIR -> "avro2tf/src/demo/tensorflow/data", + Avro2TFJobParamNames.AVRO2TF_CONFIG_PATH -> "avro2tf/src/demo/spark/config/avro2tf_config_movielens.json", + Avro2TFJobParamNames.OUTPUT_FORMAT -> "tfrecord" + ) + val prams = paramsMap.flatMap(x => Seq(s"--${x._1}", x._2.toString)).toSeq + println(prams) + Avro2TFJobParamsParser.parse(prams) + } + + def parseValidateParamsFromMap(): Avro2TFParams = { + val paramsMap = Map( + Avro2TFJobParamNames.INPUT_PATHS -> "avro2tf/src/demo/spark/data/validate", + Avro2TFJobParamNames.WORKING_DIR -> "avro2tf/src/demo/tensorflow/data", + Avro2TFJobParamNames.EXECUTION_MODE -> "validation", + Avro2TFJobParamNames.AVRO2TF_CONFIG_PATH -> "avro2tf/src/demo/spark/config/avro2tf_config_movielens.json", + Avro2TFJobParamNames.OUTPUT_FORMAT -> "tfrecord" + ) + val prams = paramsMap.flatMap(x => Seq(s"--${x._1}", x._2.toString)).toSeq + println(prams) + Avro2TFJobParamsParser.parse(prams) + } + + def parseTestParamsFromMap(): Avro2TFParams = { + val paramsMap = Map( + Avro2TFJobParamNames.INPUT_PATHS -> "avro2tf/src/demo/spark/data/test", + Avro2TFJobParamNames.WORKING_DIR -> "avro2tf/src/demo/tensorflow/data", + Avro2TFJobParamNames.EXECUTION_MODE -> "test", + Avro2TFJobParamNames.AVRO2TF_CONFIG_PATH -> "avro2tf/src/demo/spark/config/avro2tf_config_movielens.json", + Avro2TFJobParamNames.OUTPUT_FORMAT -> "tfrecord" + ) + val prams = paramsMap.flatMap(x => Seq(s"--${x._1}", x._2.toString)).toSeq + println(prams) + Avro2TFJobParamsParser.parse(prams) + } + + private def getSparkSession(name: String): SparkSession = { + val sparkConf = new SparkConf() + .registerKryoClasses(Array()) + .set("spark.driver.bindAddress", "127.0.0.1") + .set("spark.sql.avro.compression.codec", "deflate") + .set("spark.sql.avro.deflate.level", "5") + + SparkSession.builder + .master("local[*]") + .appName(name) + .config(sparkConf) + .getOrCreate() + } +} \ No newline at end of file diff --git a/avro2tf/src/demo/tensorflow/notebook/keras_tf2_movielens.ipynb b/avro2tf/src/demo/tensorflow/notebook/keras_tf2_movielens.ipynb new file mode 100644 index 0000000..d0ac4fb --- /dev/null +++ b/avro2tf/src/demo/tensorflow/notebook/keras_tf2_movielens.ipynb @@ -0,0 +1,698 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Avro2TF TensorFlow 2 Keras Tutorial (MovieLens) (TFRecord) - TensorFlow Training" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Assuming we have already generated the tensor data using **Avro2TF**. The data is stored in the ../data/train for training, ../validate ../test for validation and test.\n", + "\n", + "Based on the meta data stored in the data/train/metadata/tensor_metadata.json\n", + "\n", + "is designed to fill the gap of data processing before training to make your training data ready to be consumed by deep learning training frameworks. It reads raw user input data with any format supported by Spark to generate Avro or TFRecord tensorized training data.\n", + "\n", + "Below is an interactive tutorial for using Avro2TF. In this tutorial, we will go through some exercises to get comfortable with Avro2TF and understand how it works." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import tensorflow as tf\n", + "from tensorflow import feature_column\n", + "AUTO = tf.data.experimental.AUTOTUNE # used in tf.data.Dataset API" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Tensor Meta Data\n", + "The meta data is generated from Avro2TF to describe the dataset. For example, user_features is a 571 dimension sparsed vector. We could use this information to form our input tensor shape" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!cat ../data/metadata/tensor_metadata.json" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "_USER_FEATURE_DIM = 571\n", + "_MOVIE_FEATURE_DIM = 22\n", + "_NUM_USERS = 943\n", + "_NUM_MOVIES = 1682" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Training data in TFRecord format generated by Avro2TF. \n", + "\n", + "(Jupyter notebook: \"avro2tf_open_source_tutorial_text_tfrecord_data_processing\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "tf_record_train_path = \"../data/trainingData/part-r-*\"\n", + "tf_record_validate_path = \"../data/validationData/part-r-*\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Hash ID into multiple ids with a range\n", + "\n", + "This will compressed the IDs into a smaller set of dimensions and then be used for embedding learning." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_dynamic_hash_ids_from_id(input_feature, num_hash_functions, hash_bucket_size, use_dense=False):\n", + " if input_feature.dtype != tf.string:\n", + " input_feature = tf.as_string(input_feature)\n", + " hashed_ids = []\n", + " for i in range(num_hash_functions):\n", + " hashed_id = tf.strings.to_hash_bucket_strong(input_feature,\n", + " hash_bucket_size,\n", + " [i+7, i + 113],\n", + " name=\"hash_op_name\")\n", + " if use_dense:\n", + " hashed_ids.append(tf.one_hot(hashed_id, hash_bucket_size))\n", + " else:\n", + " hashed_ids.append(hashed_id + i * hash_bucket_size)\n", + " hashed_ids = tf.stack(hashed_ids, -1)\n", + " return hashed_ids\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### TF-Record parsing function" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def read_tfrecord(example):\n", + " \n", + " # Define context_features\n", + " features = {\n", + " 'user_features_indices': tf.io.VarLenFeature(dtype=tf.int64),\n", + " 'user_features_values': tf.io.VarLenFeature(dtype=tf.float32),\n", + " 'movie_features_indices': tf.io.VarLenFeature(dtype=tf.int64),\n", + " 'movie_features_values': tf.io.VarLenFeature(dtype=tf.float32),\n", + " 'userId_hashed': tf.io.FixedLenFeature([4], dtype=tf.int64),\n", + " 'movieId_hashed': tf.io.FixedLenFeature([4], dtype=tf.int64),\n", + " 'user_embb_features': tf.io.FixedLenFeature([64], dtype=tf.float32),\n", + " 'movie_embb_features': tf.io.FixedLenFeature([64], dtype=tf.float32),\n", + " 'userId': tf.io.FixedLenFeature([], dtype=tf.int64),\n", + " 'movieId': tf.io.FixedLenFeature([], dtype=tf.int64),\n", + " 'response': tf.io.FixedLenFeature([], dtype=tf.int64) # Need to use tf.FixedLenFeature() for our \"response\"\n", + " }\n", + " \n", + " # decode the TFRecord\n", + " tf_example = tf.io.parse_single_example(example, features)\n", + " \n", + " # we can get user feature dimension from meta data\n", + " tf_user_features = tf.compat.v1.sparse.merge(tf_example[\"user_features_indices\"], \n", + " tf_example[\"user_features_values\"],\n", + " _USER_FEATURE_DIM)\n", + "\n", + " tf_movie_features = tf.compat.v1.sparse.merge(tf_example[\"movie_features_indices\"], \n", + " tf_example[\"movie_features_values\"],\n", + " _MOVIE_FEATURE_DIM)\n", + "\n", + " \n", + " user_id_hashed2 = get_dynamic_hash_ids_from_id(tf_example[\"userId\"], 4, 1000)\n", + " movie_id_hashed2 = get_dynamic_hash_ids_from_id(tf_example[\"movieId\"], 4, 1000)\n", + " \n", + " \n", + " tf_labelTensor = tf_example[\"response\"]\n", + " tf_labelTensor = tf.cast(tf_labelTensor, tf.int64)\n", + "\n", + " return (\n", + " (user_id_hashed2, movie_id_hashed2, \n", + " tf.sparse.to_dense(tf_user_features), tf.sparse.to_dense(tf_movie_features),\n", + " tf_example[\"userId\"], tf_example[\"movieId\"],\n", + " tf_example[\"user_embb_features\"], tf_example[\"movie_embb_features\"]),\n", + " tf_labelTensor)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Test parser with dataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# read from TFRecords. For optimal performance, read from multiple\n", + "# TFRecord files at once and set the option experimental_deterministic = False\n", + "# to allow order-altering optimizations.\n", + "\n", + "option_no_order = tf.data.Options()\n", + "option_no_order.experimental_deterministic = False\n", + "filenames = tf.io.gfile.glob(tf_record_train_path)\n", + "# filenames = tf.io.gfile.glob(tf_record_validate_path)\n", + "\n", + "dataset = tf.data.TFRecordDataset(filenames, num_parallel_reads=AUTO)\n", + "dataset = dataset.with_options(option_no_order)\n", + "dataset = dataset.map(read_tfrecord, num_parallel_calls=AUTO)\n", + "dataset = dataset.shuffle(300)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "for tf_example, tf_labelTensor in dataset.take(2):\n", + " print(\"{} | {}\\n\".format(tf_example, tf_labelTensor))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Setup training strategy based on hardware" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Detect hardware\n", + "try:\n", + " tpu = tf.distribute.cluster_resolver.TPUClusterResolver() # TPU detection\n", + "except ValueError:\n", + " tpu = None\n", + " gpus = tf.config.experimental.list_logical_devices(\"GPU\")\n", + " \n", + "# Select appropriate distribution strategy for hardware\n", + "if tpu:\n", + " tf.config.experimental_connect_to_cluster(tpu)\n", + " tf.tpu.experimental.initialize_tpu_system(tpu)\n", + " strategy = tf.distribute.experimental.TPUStrategy(tpu)\n", + " print('Running on TPU ', tpu.master()) \n", + "elif len(gpus) > 0:\n", + " strategy = tf.distribute.MirroredStrategy(gpus) # this works for 1 to multiple GPUs\n", + " print('Running on ', len(gpus), ' GPU(s) ')\n", + "else:\n", + " strategy = tf.distribute.get_strategy() # default strategy that works on CPU and single GPU\n", + " print('Running on CPU')\n", + "\n", + "# How many accelerators do we have ?\n", + "print(\"Number of accelerators: \", strategy.num_replicas_in_sync)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Define metric hook up" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Hook up to AUC\n", + "from sklearn.metrics import roc_auc_score\n", + "\n", + "def auroc(y_true, y_pred):\n", + " return tf.py_function(roc_auc_score, (y_true, y_pred), tf.double)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get some basic prediction stats\n", + "def mean_pred(y_true, y_pred):\n", + " return tf.keras.backend.mean(y_pred)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Model loader and driver" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def load_dataset(filenames):\n", + " # read from TFRecords. For optimal performance, read from multiple\n", + " # TFRecord files at once and set the option experimental_deterministic = False\n", + " # to allow order-altering optimizations.\n", + "\n", + " option_no_order = tf.data.Options()\n", + " option_no_order.experimental_deterministic = False\n", + "\n", + " dataset = tf.data.TFRecordDataset(filenames, num_parallel_reads=AUTO)\n", + " dataset = dataset.with_options(option_no_order)\n", + " dataset = dataset.map(read_tfrecord, num_parallel_calls=AUTO)\n", + " return dataset\n", + "\n", + "\n", + "def get_batched_dataset(filenames, batch_size, train=False):\n", + " dataset = load_dataset(filenames)\n", + " dataset = dataset.cache() # This dataset fits in RAM\n", + " if train:\n", + " # Best practices for Keras:\n", + " # Training dataset: repeat then batch\n", + " # Evaluation dataset: do not repeat\n", + " dataset = dataset.repeat()\n", + " dataset = dataset.batch(batch_size)\n", + " dataset = dataset.prefetch(AUTO) # prefetch next batch while training (autotune prefetch buffer size)\n", + " # should shuffle too but this dataset was well shuffled on disk already\n", + " return dataset\n", + " # source: Dataset performance guide: https://www.tensorflow.org/guide/performance/datasets\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def run_model(model, tf_record_train_path, tf_record_validate_path, \n", + " batch_size, epoch, num_record_epoch):\n", + " print(model.summary())\n", + " # tf.keras.utils.plot_model(model, 'my_embedding.png')\n", + " \n", + " training_filenames = tf.io.gfile.glob(tf_record_train_path)\n", + " validation_filenames = tf.io.gfile.glob(tf_record_validate_path) # choose different file\n", + " \n", + " # instantiate the datasets\n", + " training_dataset = get_batched_dataset(training_filenames, batch_size, train=True)\n", + " validation_dataset = get_batched_dataset(validation_filenames, batch_size, train=False)\n", + "\n", + " history = model.fit(training_dataset, steps_per_epoch=num_record_epoch/batch_size, \n", + " epochs=epoch,\n", + " validation_data=validation_dataset)\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### First model just utilize user and movie features" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def m1_feature_cross():\n", + " # user embedding with 1D pool, follow by adjustment layer\n", + " user_hash = tf.keras.layers.Input(shape=(4), name='user_hash')\n", + "\n", + " # movie embedding with 1D pool, follow by adjustment layer\n", + " movie_hash = tf.keras.layers.Input(shape=(4), name='movie_hash')\n", + " \n", + " # Raw IDs\n", + " user_id = tf.keras.layers.Input(shape=(), name='user_id')\n", + " movie_id = tf.keras.layers.Input(shape=(), name='movie_id')\n", + " \n", + " # user features & adjustment layer\n", + " user_features = tf.keras.layers.Input(shape=(_USER_FEATURE_DIM), name='user_features')\n", + "\n", + " # movie features & adjustment layer\n", + " movie_features = tf.keras.layers.Input(shape=(_MOVIE_FEATURE_DIM), name='movie_features')\n", + "\n", + " # user & movie feature cross\n", + " umf = tf.keras.layers.concatenate([user_features, movie_features], axis=-1)\n", + " \n", + " umf2 = tf.keras.layers.Dense(32, activation='relu', name='umf2')(umf)\n", + "\n", + "\n", + " # activation layer\n", + " output = tf.keras.layers.Dense(1, activation='sigmoid', name='last')(umf2)\n", + "\n", + " model = tf.keras.models.Model(inputs=[user_hash,\n", + " movie_hash,\n", + " user_features, \n", + " movie_features,\n", + " user_id, movie_id], \n", + " outputs=output)\n", + " model.compile(optimizer='adam', loss='binary_crossentropy', \n", + " metrics=['accuracy', mean_pred, auroc])\n", + " return model \n", + " \n", + "\n", + "run_model(m1_feature_cross(), \n", + " tf_record_train_path, \n", + " tf_record_validate_path, \n", + " batch_size=100, \n", + " epoch=30, \n", + " num_record_epoch=9000) \n", + " \n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Model 2: Dynamic embedding and max pool model." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def m2_feature_emb_cross():\n", + " # user embedding with 1D pool, follow by adjustment layer\n", + " user_hash = tf.keras.layers.Input(shape=(4), name='user_hash')\n", + "\n", + " user_emb = tf.keras.layers.Embedding(10000, 64, input_length=4)(user_hash)\n", + "\n", + " user_emb_max = tf.keras.layers.GlobalMaxPool1D()(user_emb)\n", + "\n", + " user_emb_max2 = tf.keras.layers.Dense(64, activation='relu', name='user_emb_max2')(user_emb_max)\n", + "\n", + "\n", + " # movie embedding with 1D pool, follow by adjustment layer\n", + " movie_hash = tf.keras.layers.Input(shape=(4), name='movie_hash')\n", + "\n", + " movie_emb = tf.keras.layers.Embedding(10000, 64, input_length=4)(movie_hash)\n", + "\n", + " movie_emb_max = tf.keras.layers.GlobalMaxPool1D()(movie_emb)\n", + "\n", + " movie_emb_max2 = tf.keras.layers.Dense(64, activation='relu', name='movie_emb_max2')(movie_emb_max)\n", + " \n", + " # Cross user & movie at ID level\n", + " um_emb_interact = tf.math.multiply(user_emb_max2, movie_emb_max2)\n", + "# um_emb_interact = tf.keras.layers.concatenate([user_emb_max, movie_emb_max], axis=-1)\n", + "\n", + " \n", + " # Raw IDs\n", + " user_id = tf.keras.layers.Input(shape=(), name='user_id')\n", + " movie_id = tf.keras.layers.Input(shape=(), name='movie_id')\n", + " \n", + " # user features & adjustment layer\n", + " user_features = tf.keras.layers.Input(shape=(_USER_FEATURE_DIM), name='user_features')\n", + "\n", + " # movie features & adjustment layer\n", + " movie_features = tf.keras.layers.Input(shape=(_MOVIE_FEATURE_DIM), name='movie_features')\n", + "\n", + " # user & movie feature cross\n", + " umf = tf.keras.layers.concatenate([user_features, movie_features, um_emb_interact], axis=-1)\n", + " \n", + " umf2 = tf.keras.layers.Dense(32, activation='relu', name='umf2')(umf)\n", + "\n", + "\n", + " # activation layer\n", + " output = tf.keras.layers.Dense(1, activation='sigmoid', name='last')(umf2)\n", + "\n", + " model = tf.keras.models.Model(inputs=[user_hash,\n", + " movie_hash,\n", + " user_features, \n", + " movie_features,\n", + " user_id, movie_id], \n", + " outputs=output)\n", + " model.compile(optimizer='adam', loss='binary_crossentropy', \n", + " metrics=['accuracy', mean_pred, auroc])\n", + " return model \n", + " \n", + "\n", + "run_model(m2_feature_emb_cross(), \n", + " tf_record_train_path, \n", + " tf_record_validate_path, \n", + " batch_size=100, \n", + " epoch=30, \n", + " num_record_epoch=9000) \n", + " \n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Pre-trained embedding " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def m3_feature_pretrain_emb_cross():\n", + " # user embedding with 1D pool, follow by adjustment layer\n", + " user_hash = tf.keras.layers.Input(shape=(4), name='user_hash')\n", + "\n", + " user_emb = tf.keras.layers.Embedding(10000, 64, input_length=4)(user_hash)\n", + "\n", + " user_emb_max = tf.keras.layers.GlobalMaxPool1D()(user_emb)\n", + "\n", + " user_emb_max2 = tf.keras.layers.Dense(64, activation='relu', name='user_emb_max2')(user_emb_max)\n", + "\n", + "\n", + " # movie embedding with 1D pool, follow by adjustment layer\n", + " movie_hash = tf.keras.layers.Input(shape=(4), name='movie_hash')\n", + "\n", + " movie_emb = tf.keras.layers.Embedding(10000, 64, input_length=4)(movie_hash)\n", + "\n", + " movie_emb_max = tf.keras.layers.GlobalMaxPool1D()(movie_emb)\n", + "\n", + " movie_emb_max2 = tf.keras.layers.Dense(64, activation='relu', name='movie_emb_max2')(movie_emb_max)\n", + " \n", + " # Cross user & movie at ID level\n", + " um_emb_interact = tf.math.multiply(user_emb_max2, movie_emb_max2)\n", + "# um_emb_interact = tf.keras.layers.concatenate([user_emb_max, movie_emb_max], axis=-1)\n", + "\n", + "\n", + " # Pretrained embbedding\n", + " user_p_emb = tf.keras.layers.Input(shape=(64), name='user_embb_features')\n", + " movie_p_emb = tf.keras.layers.Input(shape=(64), name='movie_embb_features')\n", + " \n", + " uf_pe = tf.keras.layers.Dense(16, activation='relu', name='uf_pe')(user_p_emb)\n", + " mf_pe = tf.keras.layers.Dense(16, activation='relu', name='mf_pe')(movie_p_emb)\n", + " \n", + " um_emb_pe = tf.keras.layers.concatenate([uf_pe, mf_pe], axis=-1)\n", + " \n", + " # Raw IDs\n", + " user_id = tf.keras.layers.Input(shape=(), name='user_id')\n", + " movie_id = tf.keras.layers.Input(shape=(), name='movie_id')\n", + " \n", + " # user features & adjustment layer\n", + " user_features = tf.keras.layers.Input(shape=(_USER_FEATURE_DIM), name='user_features')\n", + "\n", + " # movie features & adjustment layer\n", + " movie_features = tf.keras.layers.Input(shape=(_MOVIE_FEATURE_DIM), name='movie_features')\n", + "\n", + " # user & movie feature cross\n", + " umf = tf.keras.layers.concatenate([user_features, movie_features, um_emb_interact, um_emb_pe], axis=-1)\n", + " \n", + " umf2 = tf.keras.layers.Dense(32, activation='relu', name='umf2')(umf)\n", + "\n", + "\n", + " # activation layer\n", + " output = tf.keras.layers.Dense(1, activation='sigmoid', name='last')(umf2)\n", + "\n", + " model = tf.keras.models.Model(inputs=[user_hash,\n", + " movie_hash,\n", + " user_features, \n", + " movie_features,\n", + " user_id, movie_id,\n", + " user_p_emb, \n", + " movie_p_emb], \n", + " outputs=output)\n", + " model.compile(optimizer='adam', loss='binary_crossentropy', \n", + " metrics=['accuracy', mean_pred, auroc])\n", + " return model \n", + " \n", + "\n", + "run_model(m3_feature_pretrain_emb_cross(), \n", + " tf_record_train_path, \n", + " tf_record_validate_path, \n", + " batch_size=100, \n", + " epoch=30, \n", + " num_record_epoch=9000) \n", + " \n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Prediction and scoring" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "model = m3_feature_pretrain_emb_cross()\n", + "model.predict(validation_dataset)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "model.inputs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "user_emb = tf.keras.Model(model.inputs, \n", + " (model.get_layer(\"user_emb_max\").output,\n", + " model.get_layer(\"movie_emb_max\").output,\n", + " model.get_layer(\"user_id2\").output))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "user_emb = tf.keras.Model((model.get_layer(\"user_hash\").input, model.get_layer(\"movie_hash\").input), \n", + " (model.get_layer(\"user_emb_max\").output,\n", + " model.get_layer(\"movie_emb_max\").output))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(user_emb.summary())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "output_data = user_emb.predict(validation_dataset)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "input_data = [np.asarray([1,2,3,4]), np.asarray([5,6,7,8])]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "user_emb.predict( input_data )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "len(output_data)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.7" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/avro2tf/src/main/scala/com/linkedin/avro2tf/jobs/FeatureListGeneration.scala b/avro2tf/src/main/scala/com/linkedin/avro2tf/jobs/FeatureListGeneration.scala index 1ad7f90..1b1a406 100644 --- a/avro2tf/src/main/scala/com/linkedin/avro2tf/jobs/FeatureListGeneration.scala +++ b/avro2tf/src/main/scala/com/linkedin/avro2tf/jobs/FeatureListGeneration.scala @@ -2,6 +2,7 @@ package com.linkedin.avro2tf.jobs import java.io.OutputStreamWriter import java.nio.charset.StandardCharsets.UTF_8 +import java.util.regex.Pattern import scala.collection.mutable @@ -277,6 +278,7 @@ object FeatureListGeneration { val tensorGroups = getTensorGroupsToWriteFeatureLists(params, fileSystem) val tmpFeatureListDir = s"${params.workingDir.rootPath}/$TMP_FEATURE_LIST" val vocabSizeCap = Avro2TFConfigHelper.getOutputTensorVocabSizeCap(params) + val pattern = Pattern.compile(SPLIT_REGEX) // merge and write feature lists for output tensors with shared feature list setting tensorGroups.foreach( // each element is an array containing the output tensor names sharing one feature list tensors => { @@ -301,7 +303,7 @@ object FeatureListGeneration { // the format of each line is feature_entry,count // first get feature_entry, if need process prefix (ntv), remove prefix from feature_entry, make // sure prefix is unique - val words = line.split(SPLIT_REGEX) + val words = pattern.split(line) val lineWithoutCount = if (words.isEmpty) "" else words.head val featureEntry = if (needProcessPrefix) { val prefixSplit = lineWithoutCount.split(SEPARATOR_NAME_TERM)