diff --git a/photon-api/src/integTest/scala/com/linkedin/photon/ml/data/RandomEffectDataSetIntegTest.scala b/photon-api/src/integTest/scala/com/linkedin/photon/ml/data/RandomEffectDataSetIntegTest.scala index bf265c15..a431a333 100644 --- a/photon-api/src/integTest/scala/com/linkedin/photon/ml/data/RandomEffectDataSetIntegTest.scala +++ b/photon-api/src/integTest/scala/com/linkedin/photon/ml/data/RandomEffectDataSetIntegTest.scala @@ -123,7 +123,7 @@ class RandomEffectDataSetIntegTest extends SparkTestUtils { Some(activeDataLowerBound)) val partitioner = new RandomEffectDataSetPartitioner(NUM_PARTITIONS, sc.broadcast(partitionMap)) - val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, None) + val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, None, None) val numUniqueRandomEffects = randomEffectDataSet.activeData.keys.count() assertEquals(numUniqueRandomEffects, expectedUniqueRandomEffects) @@ -155,7 +155,7 @@ class RandomEffectDataSetIntegTest extends SparkTestUtils { Some(activeDataLowerBound)) val partitioner = new RandomEffectDataSetPartitioner(NUM_PARTITIONS, sc.broadcast(partitionMap)) - val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, Some(existingIdsRDD)) + val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, Some(existingIdsRDD), None) val numUniqueRandomEffects = randomEffectDataSet.activeData.keys.count() assertEquals(numUniqueRandomEffects, expectedUniqueRandomEffects) diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/algorithm/RandomEffectCoordinate.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/algorithm/RandomEffectCoordinate.scala index b28eb6c3..dec080b9 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/algorithm/RandomEffectCoordinate.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/algorithm/RandomEffectCoordinate.scala @@ -14,8 +14,6 @@ */ package 
com.linkedin.photon.ml.algorithm -import scala.collection.Set - import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/data/LocalDataSet.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/data/LocalDataSet.scala index f836c763..7c2529c8 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/data/LocalDataSet.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/data/LocalDataSet.scala @@ -102,6 +102,46 @@ protected[ml] case class LocalDataSet(dataPoints: Array[(UniqueSampleId, Labeled LocalDataSet(projectedDataPoints) } + /** + * Filter features by binomial ratio confidence intervals. + * + * @param globalFeatureInstances The global instances with the features present + * @param globalPositiveInstances The global positive instances with the features present + * @param binaryIndices The binary feature columns indices + * @param nonBinaryIndices The non-binary feature columns indices (always kept by the filter) + * @param intervalBound The lower bound threshold of the confidence interval used to filter features + * @param zScore The Z-score for the chosen two-tailed confidence level + * @return The filtered dataset + */ + def filterFeaturesByRatioCIBound( + globalFeatureInstances: Array[Double], + globalPositiveInstances: Array[Double], + binaryIndices: Set[Int], + nonBinaryIndices: Set[Int], + intervalBound: Double = 1.0, + zScore: Double = 2.575): LocalDataSet = { + + val labelAndFeatures = dataPoints.map { case (_, labeledPoint) => (labeledPoint.label, labeledPoint.features) } + val lowerBounds = LocalDataSet.computeRatioCILowerBound( + labelAndFeatures, + globalFeatureInstances, + globalPositiveInstances, + binaryIndices, + numFeatures, + zScore) + val filteredBinaryFeaturesIndexSet = lowerBounds.filter(_._2 > intervalBound).keySet + val filteredFeaturesIndexSet = filteredBinaryFeaturesIndexSet ++ nonBinaryIndices + + val filteredActivities = 
dataPoints.map { case (id, LabeledPoint(label, features, offset, weight)) => + + val filteredFeatures = LocalDataSet.filterFeaturesWithFeatureIndexSet(features, filteredFeaturesIndexSet) + + (id, LabeledPoint(label, filteredFeatures, offset, weight)) + } + + LocalDataSet(filteredActivities) + } + /** * Filter features by Pearson correlation score. * @@ -143,6 +183,8 @@ object LocalDataSet { * @param isSortedByFirstIndex Whether or not to sort the data by global ID * @return A new LocalDataSet */ + val EPSILON = 0.5 + protected[ml] def apply( dataPoints: Array[(UniqueSampleId, LabeledPoint)], isSortedByFirstIndex: Boolean): LocalDataSet = { @@ -176,6 +218,105 @@ object LocalDataSet { result } + /** + * Compute feature ratio confidence interval lower bounds (the local positive count is floored at EPSILON for the extreme case). + * + * @param labelAndFeatures An [[Array]] of (label, feature vector) tuples + * @param zScore The Z-score for the chosen two-tailed confidence level + * @param globalFeatureInstances The global instances with the features present + * @param globalPositiveInstances The global positive instances with the features present + * @param binaryIndices The binary feature columns indices + * @param numFeatures The dimension of the feature vectors + * @return the lowerBounds for feature columns + */ + protected[ml] def computeRatioCILowerBound( + labelAndFeatures: Array[(Double, Vector[Double])], + globalFeatureInstances: Array[Double], + globalPositiveInstances: Array[Double], + binaryIndices: Set[Int], + numFeatures: Int, + zScore: Double): Map[Int, Double] = { + + val n = globalFeatureInstances + val y = globalPositiveInstances + + val m = labelAndFeatures.map(_._2).reduce(_ + _) + val x = labelAndFeatures + .filter(_._1 > MathConst.POSITIVE_RESPONSE_THRESHOLD) + .map(_._2) + .foldLeft(Vector.zeros[Double](numFeatures))(_ + _) + + binaryIndices + .map { key => + val x_col = x(key) + val m_col = m(key) + val y_col = y(key) + val n_col = n(key) + + val lowerBound = if (y_col == 0.0 || y_col == 
n_col) { + 0D + } else { + val (t, variance) = computeTAndVariance(math.max(x_col, EPSILON), m_col, y_col, n_col) + + if (t < 1D) { + 1D / computeUpperBound(t, variance, zScore) + } else { + computeLowerBound(t, variance, zScore) + } + } + + (key, lowerBound) + } + .toMap + } + + /** + * Compute t value and variance for ratio modelling. + * + * @param x The count for f_i == 1 and label == 1 in the local population + * @param m The count for f_i == 1 in the local population + * @param y The count for f_i == 1 and label == 1 in the global population + * @param n The count for f_i == 1 in the global population + * @return The mean and variance for ratio t + */ + protected[ml] def computeTAndVariance(x: Double, m: Double, y: Double, n: Double): (Double, Double) = + if (m == 0.0 || n == 0.0 || y == 0.0 || x == 0.0) { + (0.0, 0.0) + } else { + val t = (x / m) / (y / n) + val variance = 1.0 / x - 1.0 / m + 1.0 / y - 1.0 / n + + (t, variance) + } + + /** + * Compute the confidence interval lowerbound for ratio modelling. + * + * @param t The value of ratio + * @param variance The variance of the ratio + * @param zScore The Z-score for the chosen two-tailed confidence level + * @return The lowerbound for ratio t + */ + protected[ml] def computeLowerBound( + t: Double, + variance: Double, + zScore: Double): Double = + t * math.exp(-math.sqrt(variance) * zScore) + + /** + * Compute the confidence interval upperbound for ratio modelling. + * + * @param t The value of ratio + * @param variance The variance of the ratio + * @param zScore The Z-score for the chosen two-tailed confidence level + * @return The upperbound for ratio t + */ + protected[ml] def computeUpperBound( + t: Double, + variance: Double, + zScore: Double): Double = + t * math.exp(math.sqrt(variance) * zScore) + /** * Compute Pearson correlation scores. 
* diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/data/RandomEffectDataSet.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/data/RandomEffectDataSet.scala index 22e6f8cc..82bf2226 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/data/RandomEffectDataSet.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/data/RandomEffectDataSet.scala @@ -14,7 +14,6 @@ */ package com.linkedin.photon.ml.data -import scala.collection.Set import scala.util.hashing.byteswap64 import org.apache.spark.broadcast.Broadcast @@ -23,8 +22,10 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.{Partitioner, SparkContext} import com.linkedin.photon.ml.Types.{FeatureShardId, REId, REType, UniqueSampleId} +import com.linkedin.photon.ml.constants.MathConst import com.linkedin.photon.ml.data.scoring.CoordinateDataScores import com.linkedin.photon.ml.spark.{BroadcastLike, RDDLike} +import com.linkedin.photon.ml.stat.BasicStatisticalSummary /** * Data set implementation for random effect datasets: @@ -239,19 +240,28 @@ object RandomEffectDataSet { gameDataSet: RDD[(UniqueSampleId, GameDatum)], randomEffectDataConfiguration: RandomEffectDataConfiguration, randomEffectPartitioner: Partitioner, - existingModelKeysRddOpt: Option[RDD[REId]]): RandomEffectDataSet = { + existingModelKeysRddOpt: Option[RDD[REId]], + featureShardStatsMapOpt: Option[Map[FeatureShardId, BasicStatisticalSummary]]): RandomEffectDataSet = { val randomEffectType = randomEffectDataConfiguration.randomEffectType val featureShardId = randomEffectDataConfiguration.featureShardId val gameDataPartitioner = gameDataSet.partitioner.get + val globalPositiveInstances = gameDataSet + .values + .filter( _.response > MathConst.POSITIVE_RESPONSE_THRESHOLD) + .map( _.featureShardContainer(randomEffectDataConfiguration.featureShardId)) + .reduce(_ + _) + .toArray + val rawActiveData = generateActiveData( gameDataSet, randomEffectDataConfiguration, randomEffectPartitioner, 
existingModelKeysRddOpt) - val activeData = featureSelectionOnActiveData(rawActiveData, randomEffectDataConfiguration) + + val activeData = featureSelectionOnActiveData(rawActiveData, randomEffectDataConfiguration, featureShardStatsMapOpt, globalPositiveInstances) .setName("Active data") .persist(StorageLevel.DISK_ONLY) @@ -462,6 +472,7 @@ object RandomEffectDataSet { val passiveDataRandomEffectIds = passiveDataRandomEffectIdCountsMap .filter(_._2 > passiveDataLowerBound) .keySet + .toSet val sparkContext = gameDataSet.sparkContext val passiveDataRandomEffectIdsBroadcast = sparkContext.broadcast(passiveDataRandomEffectIds) val filteredPassiveData = passiveData @@ -488,21 +499,66 @@ object RandomEffectDataSet { */ private def featureSelectionOnActiveData( activeData: RDD[(REId, LocalDataSet)], - randomEffectDataConfiguration: RandomEffectDataConfiguration): RDD[(REId, LocalDataSet)] = { + randomEffectDataConfiguration: RandomEffectDataConfiguration, + featureShardStatsOpt: Option[Map[FeatureShardId, BasicStatisticalSummary]], + globalPositiveInstances: Array[Double]): RDD[(REId, LocalDataSet)] = { + + featureShardStatsOpt match { + case Some(featureShardStats) => + val globalFeatureShardStats = featureShardStats(randomEffectDataConfiguration.featureShardId) + val (binaryIndices, nonBinaryIndices) = segregateBinaryFeatures(globalFeatureShardStats) + + val broadcastBinaryFeatureIndices = activeData.sparkContext.broadcast(binaryIndices) + val broadcastNonBinaryFeatureIndices = activeData.sparkContext.broadcast(nonBinaryIndices) + val broadcastGlobalPositiveInstances = activeData.sparkContext.broadcast(globalPositiveInstances) + val broadcastGlobalFeatureInstances = activeData.sparkContext.broadcast(globalFeatureShardStats.numNonzeros.toArray) + val filteredActiveData = activeData.mapValues{ localDataSet => + localDataSet.filterFeaturesByRatioCIBound( + broadcastGlobalFeatureInstances.value, + broadcastGlobalPositiveInstances.value, + 
broadcastBinaryFeatureIndices.value, + broadcastNonBinaryFeatureIndices.value) + } - randomEffectDataConfiguration - .numFeaturesToSamplesRatioUpperBound - .map { numFeaturesToSamplesRatioUpperBound => - activeData.mapValues { localDataSet => - var numFeaturesToKeep = math.ceil(numFeaturesToSamplesRatioUpperBound * localDataSet.numDataPoints).toInt + broadcastGlobalFeatureInstances.unpersist + broadcastBinaryFeatureIndices.unpersist + broadcastNonBinaryFeatureIndices.unpersist + broadcastGlobalPositiveInstances.unpersist - // In case the above product overflows - if (numFeaturesToKeep < 0) numFeaturesToKeep = Int.MaxValue - val filteredLocalDataSet = localDataSet.filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep) + filteredActiveData - filteredLocalDataSet - } - } - .getOrElse(activeData) + case None => + randomEffectDataConfiguration + .numFeaturesToSamplesRatioUpperBound + .map { numFeaturesToSamplesRatioUpperBound => + activeData.mapValues { localDataSet => + var numFeaturesToKeep = math.ceil(numFeaturesToSamplesRatioUpperBound * localDataSet.numDataPoints).toInt + // In case the above product overflows + if (numFeaturesToKeep < 0) numFeaturesToKeep = Int.MaxValue + + localDataSet.filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep) + } + } + .getOrElse(activeData) + } + } + + /** + * Split the feature indices into binary features and non-binary features + * + * @param featureStatsSummary The feature statistical summary for global population + * @return The indices for binary and non-binary feature columns + */ + private def segregateBinaryFeatures(featureStatsSummary: BasicStatisticalSummary): (Set[Int], Set[Int]) = { + + val nonZerosDiff = (featureStatsSummary.mean * featureStatsSummary.count.toDouble) - featureStatsSummary.numNonzeros + val allFeatureIndices = nonZerosDiff.keySet + val candidates = nonZerosDiff.findAll(_ < MathConst.EPSILON).toSet + val maxOne = featureStatsSummary.max.findAll(_ == 1D).toSet + + val binaryIndices = 
candidates.intersect(maxOne) + val nonBinaryIndices = allFeatureIndices.diff(binaryIndices) ++ featureStatsSummary.interceptIndex + + (binaryIndices, nonBinaryIndices) } } diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/estimators/GameEstimator.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/estimators/GameEstimator.scala index 2a82149a..81b04a11 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/estimators/GameEstimator.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/estimators/GameEstimator.scala @@ -41,6 +41,7 @@ import com.linkedin.photon.ml.optimization.game._ import com.linkedin.photon.ml.projector.{IdentityProjection, IndexMapProjectorRDD, ProjectionMatrixBroadcast} import com.linkedin.photon.ml.sampling.DownSamplerHelper import com.linkedin.photon.ml.spark.{BroadcastLike, RDDLike} +import com.linkedin.photon.ml.stat.BasicStatisticalSummary import com.linkedin.photon.ml.supervised.classification.{LogisticRegressionModel, SmoothedHingeLossLinearSVMModel} import com.linkedin.photon.ml.supervised.regression.{LinearRegressionModel, PoissonRegressionModel} import com.linkedin.photon.ml.util._ @@ -102,6 +103,13 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P "coordinate, but the shifts and factors are different for each shard.", PhotonParamValidators.nonEmpty[TraversableOnce, (CoordinateId, NormalizationContext)]) + val featureShardStatistics: Param[Map[FeatureShardId, BasicStatisticalSummary]] = + ParamUtils.createParam[Map[FeatureShardId, BasicStatisticalSummary]]( + "feature shard statistics", + "A map of shard name to feature space statistical summary. 
" + + "Used to filter random effect features by binomial proportions.", + PhotonParamValidators.nonEmpty[TraversableOnce, (FeatureShardId, BasicStatisticalSummary)]) + val initialModel: Param[GameModel] = ParamUtils.createParam( "initial model", "Prior model to use as a starting point for training.") @@ -154,6 +162,9 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P def setCoordinateNormalizationContexts(value: Map[CoordinateId, NormalizationContext]): this.type = set(coordinateNormalizationContexts, value) + def setFeatureShardStatistics(value: Map[FeatureShardId, BasicStatisticalSummary]) : this.type = + set(featureShardStatistics, value) + def setInitialModel(value: GameModel): this.type = set(initialModel, value) def setPartialRetrainLockedCoordinates(value: Set[CoordinateId]): this.type = @@ -526,7 +537,12 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P None } - val rawRandomEffectDataSet = RandomEffectDataSet(gameDataSet, reConfig, rePartitioner, existingModelKeysRddOpt) + val rawRandomEffectDataSet = RandomEffectDataSet( + gameDataSet, + reConfig, + rePartitioner, + existingModelKeysRddOpt, + get(featureShardStatistics)) .setName(s"Random Effect Data Set: $coordinateId") .persistRDD(StorageLevel.DISK_ONLY) .materialize() diff --git a/photon-api/src/test/scala/com/linkedin/photon/ml/data/LocalDataSetTest.scala b/photon-api/src/test/scala/com/linkedin/photon/ml/data/LocalDataSetTest.scala index 6532537a..4a96fffc 100644 --- a/photon-api/src/test/scala/com/linkedin/photon/ml/data/LocalDataSetTest.scala +++ b/photon-api/src/test/scala/com/linkedin/photon/ml/data/LocalDataSetTest.scala @@ -18,7 +18,7 @@ import java.util.Random import breeze.linalg.{SparseVector, Vector} import org.testng.Assert._ -import org.testng.annotations.Test +import org.testng.annotations.{DataProvider, Test} import com.linkedin.photon.ml.constants.MathConst import com.linkedin.photon.ml.test.CommonTestUtils @@ -51,6 
+51,187 @@ class LocalDataSetTest { } } + @DataProvider + def dataForRandomEffectFeatureSelection(): Array[Array[Any]] ={ + val binaryIndices = Set[Int](0) + val binaryIndicesWithNonBinary = Set[Int](1) + val nonBinaryIndices = Set[Int]() + val nonBinaryIndicesWithNonBinary = Set[Int](0) + + // 8 cases of features + // case 1 for x = 0, y = 0 + // case 2 for x = 0, y != 0 + // case 3 for nonBinary + // case 4 for x = m, y = n + // case 5 for t > 1 not selected + // case 6 for t > 1 selected + // case 7 for t < 1 not selected + // case 8 for t < 1 selected + + // First case x == 0 and y == 0 + val x_0_y_0 = Array( + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0) + ) + + val x_0_y_0_l = Array(0.0, 0.0, 0.0, 0.0, 0.0) + + val x_0_y_0_expected = Map(0 -> 0.0) + + val x_0_y_0_stats = Array(25.0) + + val x_0_y_0_g_positive = Array(0.0) + + // Second case x == 0 and y != 0 + val x_0_y_not_0 = Array( + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0) + ) + val x_0_y_not_0_l = Array(0.0, 0.0, 0.0, 0.0, 0.0) + + val x_0_y_not_0_expected = Map(0 -> 0.122811936) + + val x_0_y_not_0_stats = Array(50.0) + + val x_0_y_not_0_g_positive = Array(20.0) + + // Third case two columns, one is non-binary and the other is binary + val nonBinary = Array( + SparseVector(3.0, 0.0), + SparseVector(3.0, 0.0), + SparseVector(3.0, 0.0), + SparseVector(3.0, 0.0), + SparseVector(3.0, 0.0) + ) + val nonBinary_l = Array(0.0, 0.0, 0.0, 0.0, 0.0) + + val nonBinary_expected = Map(1 -> 0.0) + + val nonBinary_stats = Array(25.0, 0.0) + + val nonBinary_g_positive = Array(0.0, 0.0) + + // Fourth case x == m and y == n + + val x_m_y_n = Array( + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0) + ) + val x_m_y_n_l = Array(1.0, 1.0, 1.0, 1.0, 1.0) + + val x_m_y_n_expected = Map(0 -> 0.0) + + val x_m_y_n_stats = Array(50.0) + + val x_m_y_n_g_positive = 
Array(50.0) + + // Fifth case t > 1 but not selected + + val t_bt_1_not_selected = Array( + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0) + ) + val t_bt_1_not_selected_l = Array(0.0, 1.0, 1.0, 1.0, 1.0) + + val t_bt_1_not_selected_expected = Map(0 -> 0.96543857) + + val t_bt_1_not_selected_stats = Array(50.0) + + val t_bt_1_not_selected_g_positive = Array(20.0) + + // Sixth case t > 1 selected + + val t_bt_1_selected = Array( + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0) + ) + val t_bt_1_selected_l = Array(1.0, 1.0, 0.0, 0.0, 1.0) + + val t_bt_1_selected_expected = Map(0 -> 1.419596526) + + val t_bt_1_selected_stats = Array(50.0) + + val t_bt_1_selected_g_positive = Array(5.0) + + // Seventh case t < 1 but not selected + val t_lt_1_not_selected = Array( + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0), + SparseVector(1.0) + ) + val t_lt_1_not_selected_l = Array(0.0, 0.0, 0.0, 0.0, 1.0) + + val t_lt_1_not_selected_expected = Map(0 -> 0.140309013) + + val t_lt_1_not_selected_stats = Array(50.0) + + val t_lt_1_not_selected_g_positive = Array(15.0) + + // Eighth case t < 1 selected + val t_lt_1_selected = Array.fill[Vector[Double]](30)(Vector(1.0)) + + val t_lt_1_selected_l = Array.fill[Double](30)(0.0) + Seq(0,1,2).foreach(t_lt_1_selected_l(_) = 1) + + val t_lt_1_selected_expected = Map(0 -> 1.816122265) + + val t_lt_1_selected_stats = Array(100.0) + + val t_lt_1_selected_g_positive = Array(75.0) + + Array( + Array[Any](x_0_y_0, x_0_y_0_l, x_0_y_0_expected, x_0_y_0_stats, binaryIndices , nonBinaryIndices, x_0_y_0_g_positive, 0), + Array[Any](x_0_y_not_0, x_0_y_not_0_l, x_0_y_not_0_expected, x_0_y_not_0_stats, binaryIndices, nonBinaryIndices, x_0_y_not_0_g_positive, 0), + Array[Any](nonBinary, nonBinary_l, nonBinary_expected, nonBinary_stats, binaryIndicesWithNonBinary, nonBinaryIndicesWithNonBinary, nonBinary_g_positive, 
1), + Array[Any](x_m_y_n, x_m_y_n_l, x_m_y_n_expected, x_m_y_n_stats, binaryIndices, nonBinaryIndices, x_m_y_n_g_positive, 0), + Array[Any](t_bt_1_not_selected, t_bt_1_not_selected_l, t_bt_1_not_selected_expected, t_bt_1_not_selected_stats, binaryIndices, nonBinaryIndices, t_bt_1_not_selected_g_positive, 0), + Array[Any](t_bt_1_selected, t_bt_1_selected_l, t_bt_1_selected_expected, t_bt_1_selected_stats, binaryIndices, nonBinaryIndices, t_bt_1_selected_g_positive, 1), + Array[Any](t_lt_1_not_selected, t_lt_1_not_selected_l, t_lt_1_not_selected_expected, t_lt_1_not_selected_stats, binaryIndices, nonBinaryIndices, t_lt_1_not_selected_g_positive, 0), + Array[Any](t_lt_1_selected, t_lt_1_selected_l, t_lt_1_selected_expected, t_lt_1_selected_stats, binaryIndices, nonBinaryIndices, t_lt_1_selected_g_positive, 1) + ) + } + + @Test(dataProvider = "dataForRandomEffectFeatureSelection") + def testFilterFeaturesByRatioCIBound( + features: Array[Vector[Double]], + labels: Array[Double], + expected: Map[Int, Double], + globalFeatureInstances: Array[Double], + binaryIndices: Set[Int], + nonbinaryIndices: Set[Int], + globalPositiveInstances: Array[Double], + selectedFeatureNum: Int): Unit = { + + val numSamples = labels.length + + val localDataSet = + LocalDataSet( + Array.tabulate(numSamples)(i => (i.toLong, LabeledPoint(labels(i), features(i), offset = 0.0, weight = 1.0)))) + + val filteredDataPoints = localDataSet + .filterFeaturesByRatioCIBound(globalFeatureInstances, globalPositiveInstances, binaryIndices, nonbinaryIndices) + .dataPoints + val filteredFeatureNum = filteredDataPoints.map(_._2.features.activeSize) + assertTrue(filteredFeatureNum.forall( _ == selectedFeatureNum)) + } + /** * Test feature filtering using Pearson correlation score. 
*/ @@ -132,4 +313,69 @@ class LocalDataSetTest { filteredDataPointsAll .forall(dataPoint => dataPoint._2.features.activeKeysIterator.toSet == Set(0, 1, 2, 3, 4, 5))) } + + /** + * Test the Ratio Lower Bound computation + */ + @Test(dataProvider = "dataForRandomEffectFeatureSelection") + def testComputeRatioCILowerBound( + features: Array[Vector[Double]], + labels: Array[Double], + expected: Map[Int, Double], + globalFeatureInstances: Array[Double], + binaryIndices: Set[Int], + nonBinaryIndices: Set[Int], + globalPositiveInstances: Array[Double], + selectedFeatureNum: Int): Unit = { + + val labelAndFeatures = labels.zip(features) + val computed = LocalDataSet.computeRatioCILowerBound( + labelAndFeatures, + globalFeatureInstances, + globalPositiveInstances, + binaryIndices, + features(0).size, + 2.575) + + computed.foreach { case (key, value) => + assertEquals( + value, + expected(key), + CommonTestUtils.LOW_PRECISION_TOLERANCE, + s"Computed Ratio Confidence Interval LowerBound score is $value, while the expected value is ${expected(key)}") + } + } + + /** + * Test the t and variance computation + */ + @Test + def testComputeTAndVariance(): Unit = { + + val x = 5.0 + val y = 4.0 + val m = 10.0 + val n = 20.0 + val (t, variance) = LocalDataSet.computeTAndVariance(x, m, y, n) + + assertEquals(t, 2.5, CommonTestUtils.HIGH_PRECISION_TOLERANCE) + assertEquals(variance, 0.3, CommonTestUtils.HIGH_PRECISION_TOLERANCE) + } + + /** + * Test the lower bound and upper bound computation + */ + @Test + def testUpperBoundAndLowerBound(): Unit = { + + val t = 2.0 + val variance = 4.0 + val zScore = 2.575 + + val lowerBound = LocalDataSet.computeLowerBound(t, variance, zScore) + val upperBound = LocalDataSet.computeUpperBound(t, variance, zScore) + + assertEquals(lowerBound, 0.011598809453684283, CommonTestUtils.HIGH_PRECISION_TOLERANCE) + assertEquals(upperBound, 344.862980633708, CommonTestUtils.HIGH_PRECISION_TOLERANCE) + } } diff --git 
a/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriver.scala b/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriver.scala index 4bf6f07f..101d12ef 100644 --- a/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriver.scala +++ b/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriver.scala @@ -50,16 +50,9 @@ import com.linkedin.photon.ml.util._ /** * This object is the entry point and driver for GAME training. There is a separate driver object for scoring. */ +//TODO: Will need to add CLI parameters to enable feature filtering and control filtering thresholds object GameTrainingDriver extends GameDriver { - // - // These types make the code easier to read, and are somewhat specific to the GAME Driver - // - - type FeatureShardStatistics = Iterable[(FeatureShardId, BasicStatisticalSummary)] - type FeatureShardStatisticsOpt = Option[FeatureShardStatistics] - type IndexMapLoaders = Map[FeatureShardId, IndexMapLoader] - // // Members // @@ -425,12 +418,12 @@ object GameTrainingDriver extends GameDriver { } } - val featureShardStats = Timed("Calculate statistics for each feature shard") { + val featureShardStatsOpt = Timed("Calculate statistics for each feature shard") { calculateAndSaveFeatureShardStats(trainingData, featureIndexMapLoaders) } val normalizationContexts = Timed("Prepare normalization contexts") { - prepareNormalizationContexts(trainingData, featureIndexMapLoaders, featureShardStats) + prepareNormalizationContexts(trainingData, featureIndexMapLoaders, featureShardStatsOpt) } val gameOptimizationConfigs = Timed("Prepare optimization configuration(s)") { @@ -451,6 +444,14 @@ object GameTrainingDriver extends GameDriver { .setComputeVariance(getOrDefault(computeVariance)) .setIgnoreThresholdForNewModels(getOrDefault(ignoreThresholdForNewModels)) + //TODO: the tasks using random effect feature selection will need to be 
controlled by parameter + if (getRequiredParam(trainingTask) == TaskType.LOGISTIC_REGRESSION) { + featureShardStatsOpt match { + case Some(featureShardStats) => estimator.setFeatureShardStatistics(featureShardStats) + case _ => + } + } + get(inputColumnNames).foreach(estimator.setInputColumnNames) modelOpt.foreach(estimator.setInitialModel) get(partialRetrainLockedCoordinates).foreach(estimator.setPartialRetrainLockedCoordinates) @@ -553,8 +554,8 @@ object GameTrainingDriver extends GameDriver { */ private def prepareNormalizationContexts( trainingData: DataFrame, - featureIndexMapLoaders: IndexMapLoaders, - statistics: FeatureShardStatisticsOpt): Option[Map[CoordinateId, NormalizationContext]] = + featureIndexMapLoaders: Map[FeatureShardId, IndexMapLoader], + statistics: Option[Map[FeatureShardId, BasicStatisticalSummary]]): Option[Map[CoordinateId, NormalizationContext]] = Utils.filter(getOrDefault(normalization) != NormalizationType.NONE) { val featureShardToNormalizationContextMap = statistics @@ -562,7 +563,6 @@ object GameTrainingDriver extends GameDriver { .map { case (featureShardId, featureShardStats) => (featureShardId, NormalizationContext(getOrDefault(normalization), featureShardStats)) } - .toMap getRequiredParam(coordinateConfigurations).mapValues { coordinateConfig => featureShardToNormalizationContextMap(coordinateConfig.dataConfiguration.featureShardId) @@ -575,11 +575,11 @@ object GameTrainingDriver extends GameDriver { * * @param trainingData The training data * @param featureIndexMapLoaders The index map loaders - * @return Basic for each feature shard + * @return BasicStatisticalSummary for each feature shard */ private def calculateAndSaveFeatureShardStats( trainingData: DataFrame, - featureIndexMapLoaders: IndexMapLoaders): FeatureShardStatisticsOpt = + featureIndexMapLoaders: Map[FeatureShardId, IndexMapLoader]): Option[Map[FeatureShardId, BasicStatisticalSummary]] = get(dataSummaryDirectory).map { summarizationOutputDir: Path => 
calculateStatistics(trainingData, featureIndexMapLoaders) .tap { case (featureShardId, featureShardStats) => @@ -588,7 +588,7 @@ object GameTrainingDriver extends GameDriver { ModelProcessingUtils.writeBasicStatistics(sc, featureShardStats, outputPath, indexMap) } - } + } /** * Calculate basic statistics (same as spark-ml) on a DataFrame. @@ -599,7 +599,7 @@ object GameTrainingDriver extends GameDriver { */ private def calculateStatistics( data: DataFrame, - featureIndexMapLoaders: IndexMapLoaders): FeatureShardStatistics = + featureIndexMapLoaders: Map[FeatureShardId, IndexMapLoader]): Map[FeatureShardId, BasicStatisticalSummary] = featureIndexMapLoaders.map { case (featureShardId, indexMapLoader) => val summary = BasicStatisticalSummary( @@ -823,6 +823,7 @@ object GameTrainingDriver extends GameDriver { } } + /** * Entry point to the driver. * diff --git a/photon-lib/src/main/scala/com/linkedin/photon/ml/stat/BasicStatisticalSummary.scala b/photon-lib/src/main/scala/com/linkedin/photon/ml/stat/BasicStatisticalSummary.scala index e42465b5..7560f59e 100644 --- a/photon-lib/src/main/scala/com/linkedin/photon/ml/stat/BasicStatisticalSummary.scala +++ b/photon-lib/src/main/scala/com/linkedin/photon/ml/stat/BasicStatisticalSummary.scala @@ -33,6 +33,8 @@ import com.linkedin.photon.ml.util.{Logging, VectorUtils} * standard statistical practice. A degree of freedom is lost when using an estimated mean to compute the variance. 
* * TODO: rename just "BasicStatistics": descriptive statistics are summaries of the data by definition + * TODO: need to replace the MultivariateStatisticalSummary with the spark.ml.stat.Summarizer + * (as part of the changes from spark.mllib to spark.ml) */ case class BasicStatisticalSummary( mean: BreezeVector[Double], diff --git a/photon-lib/src/main/scala/com/linkedin/photon/ml/util/VectorUtils.scala b/photon-lib/src/main/scala/com/linkedin/photon/ml/util/VectorUtils.scala index 5f1afc87..36560480 100644 --- a/photon-lib/src/main/scala/com/linkedin/photon/ml/util/VectorUtils.scala +++ b/photon-lib/src/main/scala/com/linkedin/photon/ml/util/VectorUtils.scala @@ -178,6 +178,7 @@ object VectorUtils { * @param mllibVector The mllib vector * @return The Breeze vector */ + @deprecated def mllibToBreeze(mllibVector: SparkVector): Vector[Double] = mllibVector match {