Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class RandomEffectDataSetIntegTest extends SparkTestUtils {
Some(activeDataLowerBound))
val partitioner = new RandomEffectDataSetPartitioner(NUM_PARTITIONS, sc.broadcast(partitionMap))

val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, None)
val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, None, None)
val numUniqueRandomEffects = randomEffectDataSet.activeData.keys.count()

assertEquals(numUniqueRandomEffects, expectedUniqueRandomEffects)
Expand Down Expand Up @@ -155,7 +155,7 @@ class RandomEffectDataSetIntegTest extends SparkTestUtils {
Some(activeDataLowerBound))
val partitioner = new RandomEffectDataSetPartitioner(NUM_PARTITIONS, sc.broadcast(partitionMap))

val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, Some(existingIdsRDD))
val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, Some(existingIdsRDD), None)
val numUniqueRandomEffects = randomEffectDataSet.activeData.keys.count()

assertEquals(numUniqueRandomEffects, expectedUniqueRandomEffects)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
*/
package com.linkedin.photon.ml.algorithm

import scala.collection.Set

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,46 @@ protected[ml] case class LocalDataSet(dataPoints: Array[(UniqueSampleId, Labeled
LocalDataSet(projectedDataPoints)
}

/**
* Filter features by binomial ratio confidence intervals.
*
* @param globalFeatureInstances The global instances with the features present
* @param globalPositiveInstances The global positive instances with the features present
* @param binaryIndices The binary feature columns indices
* @param nonBinaryIndices The binary feature columns indices
* @param intervalBound The lower bound threshold of the confidence interval used to filter features
* @param zScore The Z-score for the chosen two-tailed confidence level
* @return The filtered dataset
*/
def filterFeaturesByRatioCIBound(
globalFeatureInstances: Array[Double],
globalPositiveInstances: Array[Double],
binaryIndices: Set[Int],
nonBinaryIndices: Set[Int],
intervalBound: Double = 1.0,
zScore: Double = 2.575): LocalDataSet = {

val labelAndFeatures = dataPoints.map { case (_, labeledPoint) => (labeledPoint.label, labeledPoint.features) }
val lowerBounds = LocalDataSet.computeRatioCILowerBound(
labelAndFeatures,
globalFeatureInstances,
globalPositiveInstances,
binaryIndices,
numFeatures,
zScore)
val filteredBinaryFeaturesIndexSet = lowerBounds.filter(_._2 > intervalBound).keySet
val filteredFeaturesIndexSet = filteredBinaryFeaturesIndexSet ++ nonBinaryIndices

val filteredActivities = dataPoints.map { case (id, LabeledPoint(label, features, offset, weight)) =>

val filteredFeatures = LocalDataSet.filterFeaturesWithFeatureIndexSet(features, filteredFeaturesIndexSet)

(id, LabeledPoint(label, filteredFeatures, offset, weight))
}

LocalDataSet(filteredActivities)
}

/**
* Filter features by Pearson correlation score.
*
Expand Down Expand Up @@ -143,6 +183,8 @@ object LocalDataSet {
* @param isSortedByFirstIndex Whether or not to sort the data by global ID
* @return A new LocalDataSet
*/
val EPSILON = 0.5

protected[ml] def apply(
dataPoints: Array[(UniqueSampleId, LabeledPoint)],
isSortedByFirstIndex: Boolean): LocalDataSet = {
Expand Down Expand Up @@ -176,6 +218,105 @@ object LocalDataSet {
result
}

/**
* Compute feature ratio confidence interval lower bounds.
*
* @param labelAndFeatures An [[Array]] of (label, feature vector) tuples
* @param zScore The Z-score for the chosen two-tailed confidence level
* @param globalFeatureInstances The global instances with the features present
* @param globalPositiveInstances The global positive instances with the features present
* @param binaryIndices The binary feature columns indices
* @param epsilon The constant used to compute for extreme case of ratio modeling
* @return the lowerBounds for feature columns
*/
protected[ml] def computeRatioCILowerBound(
labelAndFeatures: Array[(Double, Vector[Double])],
globalFeatureInstances: Array[Double],
globalPositiveInstances: Array[Double],
binaryIndices: Set[Int],
numFeatures: Int,
zScore: Double): Map[Int, Double] = {

val n = globalFeatureInstances
val y = globalPositiveInstances

val m = labelAndFeatures.map(_._2).reduce(_ + _)
val x = labelAndFeatures
.filter(_._1 > MathConst.POSITIVE_RESPONSE_THRESHOLD)
.map(_._2)
.foldLeft(Vector.zeros[Double](numFeatures))(_ + _)

binaryIndices
.map { key =>
val x_col = x(key)
val m_col = m(key)
val y_col = y(key)
val n_col = n(key)

val lowerBound = if (y_col == 0.0 || y_col == n_col) {
0D

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for m_col = 0, we may also want to return 0D here. Since if it enter next branch it will produce Infinity.

} else {
val (t, variance) = computeTAndVariance(math.max(x_col, EPSILON), m_col, y_col, n_col)

if (t < 1D) {
1D / computeUpperBound(t, variance, zScore)
} else {
computeLowerBound(t, variance, zScore)
}
}

(key, lowerBound)
}
.toMap
}

/**
* Compute t value and variance for ratio modelling.
*
* @param x The count for f_i == 1 and label == 1 in the local population
* @param m The count for f_i == 1 in the local population
* @param y The count for f_i == 1 and label == 1 in the global population
* @param n The count for f_i == 1 in the global population
* @return The mean and variance for ratio t
*/
protected[ml] def computeTAndVariance(x: Double, m: Double, y: Double, n: Double): (Double, Double) =
if (m == 0.0 || n == 0.0 || y == 0.0 || x == 0.0) {
(0.0, 0.0)
} else {
val t = (x / m) / (y / n)
val variance = 1.0 / x - 1.0 / m + 1.0 / y - 1.0 / n

(t, variance)
}

/**
* Compute the confidence interval lowerbound for ratio modelling.
*
* @param t The value of ratio
* @param variance The variance of the ratio
* @param zScore The Z-score for the chosen two-tailed confidence level
* @return The lowerbound for ratio t
*/
protected[ml] def computeLowerBound(
t: Double,
variance: Double,
zScore: Double): Double =
t * math.exp(-math.sqrt(variance) * zScore)

/**
* Compute the confidence interval upperbound for ratio modelling.
*
* @param t The value of ratio
* @param variance The variance of the ratio
* @param zScore The Z-score for the chosen two-tailed confidence level
* @return The upperbound for ratio t
*/
protected[ml] def computeUpperBound(
t: Double,
variance: Double,
zScore: Double): Double =
t * math.exp(math.sqrt(variance) * zScore)

/**
* Compute Pearson correlation scores.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
*/
package com.linkedin.photon.ml.data

import scala.collection.Set
import scala.util.hashing.byteswap64

import org.apache.spark.broadcast.Broadcast
Expand All @@ -23,8 +22,10 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.{Partitioner, SparkContext}

import com.linkedin.photon.ml.Types.{FeatureShardId, REId, REType, UniqueSampleId}
import com.linkedin.photon.ml.constants.MathConst
import com.linkedin.photon.ml.data.scoring.CoordinateDataScores
import com.linkedin.photon.ml.spark.{BroadcastLike, RDDLike}
import com.linkedin.photon.ml.stat.BasicStatisticalSummary

/**
* Data set implementation for random effect datasets:
Expand Down Expand Up @@ -239,19 +240,28 @@ object RandomEffectDataSet {
gameDataSet: RDD[(UniqueSampleId, GameDatum)],
randomEffectDataConfiguration: RandomEffectDataConfiguration,
randomEffectPartitioner: Partitioner,
existingModelKeysRddOpt: Option[RDD[REId]]): RandomEffectDataSet = {
existingModelKeysRddOpt: Option[RDD[REId]],
featureShardStatsMapOpt: Option[Map[FeatureShardId, BasicStatisticalSummary]]): RandomEffectDataSet = {

val randomEffectType = randomEffectDataConfiguration.randomEffectType
val featureShardId = randomEffectDataConfiguration.featureShardId

val gameDataPartitioner = gameDataSet.partitioner.get

val globalPositiveInstances = gameDataSet
.values
.filter( _.response > MathConst.POSITIVE_RESPONSE_THRESHOLD)
.map( _.featureShardContainer(randomEffectDataConfiguration.featureShardId))
.reduce(_ + _)
.toArray

val rawActiveData = generateActiveData(
gameDataSet,
randomEffectDataConfiguration,
randomEffectPartitioner,
existingModelKeysRddOpt)
val activeData = featureSelectionOnActiveData(rawActiveData, randomEffectDataConfiguration)

val activeData = featureSelectionOnActiveData(rawActiveData, randomEffectDataConfiguration, featureShardStatsMapOpt, globalPositiveInstances)
.setName("Active data")
.persist(StorageLevel.DISK_ONLY)

Expand Down Expand Up @@ -462,6 +472,7 @@ object RandomEffectDataSet {
val passiveDataRandomEffectIds = passiveDataRandomEffectIdCountsMap
.filter(_._2 > passiveDataLowerBound)
.keySet
.toSet
val sparkContext = gameDataSet.sparkContext
val passiveDataRandomEffectIdsBroadcast = sparkContext.broadcast(passiveDataRandomEffectIds)
val filteredPassiveData = passiveData
Expand All @@ -488,21 +499,66 @@ object RandomEffectDataSet {
*/
private def featureSelectionOnActiveData(
activeData: RDD[(REId, LocalDataSet)],
randomEffectDataConfiguration: RandomEffectDataConfiguration): RDD[(REId, LocalDataSet)] = {
randomEffectDataConfiguration: RandomEffectDataConfiguration,
featureShardStatsOpt: Option[Map[FeatureShardId, BasicStatisticalSummary]],
globalPositiveInstances: Array[Double]): RDD[(REId, LocalDataSet)] = {

featureShardStatsOpt match {
case Some(featureShardStats) =>
val globalFeatureShardStats = featureShardStats(randomEffectDataConfiguration.featureShardId)
val (binaryIndices, nonBinaryIndices) = segregateBinaryFeatures(globalFeatureShardStats)

val broadcastBinaryFeatureIndices = activeData.sparkContext.broadcast(binaryIndices)
val broadcastNonBinaryFeatureIndices = activeData.sparkContext.broadcast(nonBinaryIndices)
val broadcastGlobalPositiveInstances = activeData.sparkContext.broadcast(globalPositiveInstances)
val broadcastGlobalFeatureInstances = activeData.sparkContext.broadcast(globalFeatureShardStats.numNonzeros.toArray)
val filteredActiveData = activeData.mapValues{ localDataSet =>
localDataSet.filterFeaturesByRatioCIBound(
broadcastGlobalFeatureInstances.value,
broadcastGlobalPositiveInstances.value,
broadcastBinaryFeatureIndices.value,
broadcastNonBinaryFeatureIndices.value)
}

randomEffectDataConfiguration
.numFeaturesToSamplesRatioUpperBound
.map { numFeaturesToSamplesRatioUpperBound =>
activeData.mapValues { localDataSet =>
var numFeaturesToKeep = math.ceil(numFeaturesToSamplesRatioUpperBound * localDataSet.numDataPoints).toInt
broadcastGlobalFeatureInstances.unpersist
broadcastBinaryFeatureIndices.unpersist
broadcastNonBinaryFeatureIndices.unpersist
broadcastGlobalPositiveInstances.unpersist

// In case the above product overflows
if (numFeaturesToKeep < 0) numFeaturesToKeep = Int.MaxValue
val filteredLocalDataSet = localDataSet.filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep)
filteredActiveData

filteredLocalDataSet
}
}
.getOrElse(activeData)
case None =>
randomEffectDataConfiguration
.numFeaturesToSamplesRatioUpperBound
.map { numFeaturesToSamplesRatioUpperBound =>
activeData.mapValues { localDataSet =>
var numFeaturesToKeep = math.ceil(numFeaturesToSamplesRatioUpperBound * localDataSet.numDataPoints).toInt
// In case the above product overflows
if (numFeaturesToKeep < 0) numFeaturesToKeep = Int.MaxValue
Comment thread
YazhiGao marked this conversation as resolved.

localDataSet.filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep)
}
}
.getOrElse(activeData)
}
}

/**
* Split the feature indices into binary features and non-binary features
*
* @param featureStatsSummary The feature statistical summary for global population
* @return The indices for binary and non-binary feature columns
*/
private def segregateBinaryFeatures(featureStatsSummary: BasicStatisticalSummary): (Set[Int], Set[Int]) = {

val nonZerosDiff = (featureStatsSummary.mean * featureStatsSummary.count.toDouble) - featureStatsSummary.numNonzeros
val allFeatureIndices = nonZerosDiff.keySet
val candidates = nonZerosDiff.findAll(_ < MathConst.EPSILON).toSet
val maxOne = featureStatsSummary.max.findAll(_ == 1D).toSet

val binaryIndices = candidates.intersect(maxOne)
val nonBinaryIndices = allFeatureIndices.diff(binaryIndices) ++ featureStatsSummary.interceptIndex

(binaryIndices, nonBinaryIndices)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import com.linkedin.photon.ml.optimization.game._
import com.linkedin.photon.ml.projector.{IdentityProjection, IndexMapProjectorRDD, ProjectionMatrixBroadcast}
import com.linkedin.photon.ml.sampling.DownSamplerHelper
import com.linkedin.photon.ml.spark.{BroadcastLike, RDDLike}
import com.linkedin.photon.ml.stat.BasicStatisticalSummary
import com.linkedin.photon.ml.supervised.classification.{LogisticRegressionModel, SmoothedHingeLossLinearSVMModel}
import com.linkedin.photon.ml.supervised.regression.{LinearRegressionModel, PoissonRegressionModel}
import com.linkedin.photon.ml.util._
Expand Down Expand Up @@ -102,6 +103,13 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P
"coordinate, but the shifts and factors are different for each shard.",
PhotonParamValidators.nonEmpty[TraversableOnce, (CoordinateId, NormalizationContext)])

val featureShardStatistics: Param[Map[FeatureShardId, BasicStatisticalSummary]] =
ParamUtils.createParam[Map[FeatureShardId, BasicStatisticalSummary]](
"feature shard statistics",
"A map of shard name to feature space statistical summary. " +
"Used to filter random effect features by binomial proportions.",
PhotonParamValidators.nonEmpty[TraversableOnce, (FeatureShardId, BasicStatisticalSummary)])

val initialModel: Param[GameModel] = ParamUtils.createParam(
"initial model",
"Prior model to use as a starting point for training.")
Expand Down Expand Up @@ -154,6 +162,9 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P
def setCoordinateNormalizationContexts(value: Map[CoordinateId, NormalizationContext]): this.type =
set(coordinateNormalizationContexts, value)

def setFeatureShardStatistics(value: Map[FeatureShardId, BasicStatisticalSummary]) : this.type =
set(featureShardStatistics, value)

def setInitialModel(value: GameModel): this.type = set(initialModel, value)

def setPartialRetrainLockedCoordinates(value: Set[CoordinateId]): this.type =
Expand Down Expand Up @@ -526,7 +537,12 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P
None
}

val rawRandomEffectDataSet = RandomEffectDataSet(gameDataSet, reConfig, rePartitioner, existingModelKeysRddOpt)
val rawRandomEffectDataSet = RandomEffectDataSet(
gameDataSet,
reConfig,
rePartitioner,
existingModelKeysRddOpt,
get(featureShardStatistics))
.setName(s"Random Effect Data Set: $coordinateId")
.persistRDD(StorageLevel.DISK_ONLY)
.materialize()
Expand Down
Loading