diff --git a/photon-api/src/integTest/scala/com/linkedin/photon/ml/algorithm/CoordinateFactoryIntegTest.scala b/photon-api/src/integTest/scala/com/linkedin/photon/ml/algorithm/CoordinateFactoryIntegTest.scala index 0015ecdb..6f47fd50 100644 --- a/photon-api/src/integTest/scala/com/linkedin/photon/ml/algorithm/CoordinateFactoryIntegTest.scala +++ b/photon-api/src/integTest/scala/com/linkedin/photon/ml/algorithm/CoordinateFactoryIntegTest.scala @@ -23,6 +23,7 @@ import com.linkedin.photon.ml.TaskType import com.linkedin.photon.ml.Types.REId import com.linkedin.photon.ml.data.{FixedEffectDataset, LocalDataset, RandomEffectDataset} import com.linkedin.photon.ml.function.{DistributedObjectiveFunction, ObjectiveFunctionHelper, SingleNodeObjectiveFunction} +import com.linkedin.photon.ml.model.{FixedEffectModel, RandomEffectModel} import com.linkedin.photon.ml.normalization.NormalizationContext import com.linkedin.photon.ml.optimization.game.{FixedEffectOptimizationConfiguration, RandomEffectOptimizationConfiguration} import com.linkedin.photon.ml.optimization.{OptimizerConfig, OptimizerType, SingleNodeOptimizationProblem, VarianceComputationType} @@ -46,6 +47,7 @@ class CoordinateFactoryIntegTest extends SparkTestUtils { val mockDataset = mock(classOf[FixedEffectDataset]) val optimizationConfiguration = FixedEffectOptimizationConfiguration(OPTIMIZER_CONFIG) + val priorModelOpt: Option[FixedEffectModel] = None doReturn(sc).when(mockDataset).sparkContext @@ -57,6 +59,7 @@ class CoordinateFactoryIntegTest extends SparkTestUtils { DOWN_SAMPLER_FACTORY, MOCK_NORMALIZATION, VARIANCE_COMPUTATION_TYPE, + priorModelOpt, INTERCEPT_INDEX) coordinate match { @@ -78,8 +81,10 @@ class CoordinateFactoryIntegTest extends SparkTestUtils { val mockProjectorsRDD = mock(classOf[RDD[(REId, LinearSubspaceProjector)]]) val mockProblemsRDD = mock(classOf[RDD[(REId, SingleNodeOptimizationProblem[SingleNodeObjectiveFunction])]]) val optimizationConfiguration = 
RandomEffectOptimizationConfiguration(OPTIMIZER_CONFIG) + val priorModelOpt: Option[RandomEffectModel] = None doReturn(sc).when(mockDataset).sparkContext + doReturn(sc).when(mockProjectorsRDD).sparkContext doReturn(mockDataRDD).when(mockDataset).activeData doReturn(mockDataRDD) .when(mockDataRDD) @@ -97,6 +102,7 @@ class CoordinateFactoryIntegTest extends SparkTestUtils { DOWN_SAMPLER_FACTORY, MOCK_NORMALIZATION, VARIANCE_COMPUTATION_TYPE, + priorModelOpt, INTERCEPT_INDEX) coordinate match { @@ -124,6 +130,7 @@ class CoordinateFactoryIntegTest extends SparkTestUtils { DOWN_SAMPLER_FACTORY, MOCK_NORMALIZATION, VARIANCE_COMPUTATION_TYPE, + None, INTERCEPT_INDEX) } } @@ -139,7 +146,7 @@ object CoordinateFactoryIntegTest { private val INTERCEPT_INDEX = None private val OPTIMIZER_CONFIG = OptimizerConfig(OPTIMIZER_TYPE, MAX_ITER, TOLERANCE) - private val MOCK_NORMALIZATION = mock(classOf[NormalizationContext]) + private val MOCK_NORMALIZATION = mock(classOf[NormalizationContext], withSettings().serializable()) private val GLM_CONSTRUCTOR = LogisticRegressionModel.apply _ private val LOSS_FUNCTION_FACTORY = ObjectiveFunctionHelper.buildFactory(TRAINING_TASK, TREE_AGGREGATE_DEPTH) private val DOWN_SAMPLER_FACTORY = DownSamplerHelper.buildFactory(TRAINING_TASK) diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/algorithm/CoordinateFactory.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/algorithm/CoordinateFactory.scala index 95d8bb28..dec1ca2c 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/algorithm/CoordinateFactory.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/algorithm/CoordinateFactory.scala @@ -17,7 +17,7 @@ package com.linkedin.photon.ml.algorithm import com.linkedin.photon.ml.data.{Dataset, FixedEffectDataset, RandomEffectDataset} import com.linkedin.photon.ml.function.ObjectiveFunctionHelper.{DistributedObjectiveFunctionFactory, ObjectiveFunctionFactoryFactory, SingleNodeObjectiveFunctionFactory} import 
com.linkedin.photon.ml.function.ObjectiveFunction -import com.linkedin.photon.ml.model.Coefficients +import com.linkedin.photon.ml.model.{Coefficients, DatumScoringModel, FixedEffectModel, RandomEffectModel} import com.linkedin.photon.ml.normalization.NormalizationContext import com.linkedin.photon.ml.optimization.DistributedOptimizationProblem import com.linkedin.photon.ml.optimization.VarianceComputationType.VarianceComputationType @@ -45,6 +45,7 @@ object CoordinateFactory { * @param downSamplerFactory A factory function for the [[DownSampler]] (if down-sampling is enabled) * @param normalizationContext The [[NormalizationContext]] * @param varianceComputationType Should the trained coefficient variances be computed in addition to the means? + * @param priorModelOpt The prior model for warm-start and incremental training * @param interceptIndexOpt The index of the intercept, if one is present * @return A [[Coordinate]] for the [[Dataset]] of type [[D]] */ @@ -56,15 +57,17 @@ object CoordinateFactory { downSamplerFactory: DownSamplerFactory, normalizationContext: NormalizationContext, varianceComputationType: VarianceComputationType, + priorModelOpt: Option[DatumScoringModel], interceptIndexOpt: Option[Int]): Coordinate[D] = { val lossFunctionFactory = lossFunctionFactoryConstructor(coordinateOptConfig) - (dataset, coordinateOptConfig, lossFunctionFactory) match { + (dataset, coordinateOptConfig, lossFunctionFactory, priorModelOpt) match { case ( - fEDataset: FixedEffectDataset, - fEOptConfig: FixedEffectOptimizationConfiguration, - distributedLossFunctionFactory: DistributedObjectiveFunctionFactory) => + fEDataset: FixedEffectDataset, + fEOptConfig: FixedEffectOptimizationConfiguration, + distributedLossFunctionFactory: DistributedObjectiveFunctionFactory, + fixedEffectModelOpt: Option[FixedEffectModel]) => val downSamplerOpt = if (DownSampler.isValidDownSamplingRate(fEOptConfig.downSamplingRate)) { Some(downSamplerFactory(fEOptConfig.downSamplingRate)) @@ 
-77,21 +80,23 @@ object CoordinateFactory { fEDataset, DistributedOptimizationProblem( fEOptConfig, - distributedLossFunctionFactory(interceptIndexOpt), + distributedLossFunctionFactory(fixedEffectModelOpt.map(_.model), interceptIndexOpt), downSamplerOpt, glmConstructor, normalizationPhotonBroadcast, varianceComputationType)).asInstanceOf[Coordinate[D]] case ( - rEDataset: RandomEffectDataset, - rEOptConfig: RandomEffectOptimizationConfiguration, - singleNodeLossFunctionFactory: SingleNodeObjectiveFunctionFactory) => + rEDataset: RandomEffectDataset, + rEOptConfig: RandomEffectOptimizationConfiguration, + singleNodeLossFunctionFactory: SingleNodeObjectiveFunctionFactory, + randomEffectModelOpt: Option[RandomEffectModel]) => RandomEffectCoordinate( rEDataset, rEOptConfig, singleNodeLossFunctionFactory, + randomEffectModelOpt, glmConstructor, normalizationContext, varianceComputationType, @@ -100,9 +105,10 @@ object CoordinateFactory { case _ => throw new UnsupportedOperationException( s"""Cannot build coordinate for the following input class combination: - | ${dataset.getClass.getName} - | ${coordinateOptConfig.getClass.getName} - | ${lossFunctionFactory.getClass.getName}""".stripMargin) + | ${dataset.getClass.getName} + | ${coordinateOptConfig.getClass.getName} + | ${lossFunctionFactory.getClass.getName} + | ${priorModelOpt.getClass.getName}""".stripMargin) } } } diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/algorithm/RandomEffectCoordinate.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/algorithm/RandomEffectCoordinate.scala index 90219e63..2e282fcd 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/algorithm/RandomEffectCoordinate.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/algorithm/RandomEffectCoordinate.scala @@ -77,8 +77,7 @@ protected[ml] class RandomEffectCoordinate[Objective <: SingleNodeObjectiveFunct * @param model The model to use as a starting point * @return A (updated model, optional optimization 
tracking information) tuple */ - override protected[algorithm] def trainModel( - model: DatumScoringModel): (DatumScoringModel, OptimizationTracker) = + override protected[algorithm] def trainModel(model: DatumScoringModel): (DatumScoringModel, OptimizationTracker) = model match { case randomEffectModel: RandomEffectModel => @@ -183,17 +182,19 @@ object RandomEffectCoordinate { * problems * @param randomEffectDataset The data on which to run the optimization algorithm * @param configuration The optimization problem configuration - * @param objectiveFunctionFactory The objective function to optimize + * @param objectiveFunctionFactory The objective function factory option + * @param priorRandomEffectModelOpt The prior randomEffectModel option * @param glmConstructor The function to use for producing GLMs from trained coefficients * @param normalizationContext The normalization context * @param varianceComputationType If and how coefficient variances should be computed * @param interceptIndexOpt The index of the intercept, if there is one - * @return A new [[RandomEffectCoordinate]] object + * @return A new [[RandomEffectCoordinate]] */ protected[ml] def apply[RandomEffectObjective <: SingleNodeObjectiveFunction]( randomEffectDataset: RandomEffectDataset, configuration: RandomEffectOptimizationConfiguration, - objectiveFunctionFactory: Option[Int] => RandomEffectObjective, + objectiveFunctionFactory: (Option[GeneralizedLinearModel], Option[Int]) => RandomEffectObjective, + priorRandomEffectModelOpt: Option[RandomEffectModel], glmConstructor: Coefficients => GeneralizedLinearModel, normalizationContext: NormalizationContext, varianceComputationType: VarianceComputationType = VarianceComputationType.NONE, @@ -204,6 +205,7 @@ object RandomEffectCoordinate { randomEffectDataset.projectors, configuration, objectiveFunctionFactory, + priorRandomEffectModelOpt, glmConstructor, normalizationContext, varianceComputationType, diff --git 
a/photon-api/src/main/scala/com/linkedin/photon/ml/estimators/GameEstimator.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/estimators/GameEstimator.scala index c3c9068f..02630a58 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/estimators/GameEstimator.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/estimators/GameEstimator.scala @@ -14,6 +14,8 @@ */ package com.linkedin.photon.ml.estimators +import java.security.InvalidParameterException + import scala.language.existentials import org.apache.commons.cli.MissingArgumentException @@ -33,7 +35,7 @@ import com.linkedin.photon.ml.data._ import com.linkedin.photon.ml.evaluation._ import com.linkedin.photon.ml.function.ObjectiveFunctionHelper import com.linkedin.photon.ml.function.glm._ -import com.linkedin.photon.ml.model.{GameModel, RandomEffectModel} +import com.linkedin.photon.ml.model.{FixedEffectModel, GameModel, RandomEffectModel} import com.linkedin.photon.ml.normalization._ import com.linkedin.photon.ml.optimization.VarianceComputationType import com.linkedin.photon.ml.optimization.VarianceComputationType.VarianceComputationType @@ -122,14 +124,18 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P val validationEvaluators: Param[Seq[EvaluatorType]] = ParamUtils.createParam( "validation evaluators", - "A list of evaluators used to validate computed scores (Note: the first evaluator in the list is the one used " + - "for model selection)", + "A list of evaluators used to validate computed scores (Note: the first evaluator in the list is the one " + + "used for model selection)", PhotonParamValidators.nonEmpty[Seq, EvaluatorType]) val ignoreThresholdForNewModels: Param[Boolean] = ParamUtils.createParam[Boolean]( "ignore threshold for new models", - "Flag to ignore the random effect samples lower bound when encountering a random effect ID without an existing " + - "model during warm-start training.") + "Flag to ignore the random effect samples 
lower bound when encountering a random effect ID without an " + + "existing model during warm-start training.") + + val incrementalTraining: Param[Boolean] = ParamUtils.createParam[Boolean]( + "incremental training", + "Flag to enable incremental training.") val useWarmStart: Param[Boolean] = ParamUtils.createParam[Boolean]( "use warm start", @@ -177,6 +183,8 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P def setUseWarmStart(value: Boolean): this.type = set(useWarmStart, value) + def setIncrementalTraining(value: Boolean): this.type = set(incrementalTraining, value) + // // Params trait extensions // @@ -209,6 +217,7 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P setDefault(treeAggregateDepth, DEFAULT_TREE_AGGREGATE_DEPTH) setDefault(ignoreThresholdForNewModels, false) setDefault(useWarmStart, true) + setDefault(incrementalTraining, false) } /** @@ -229,10 +238,11 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P val updateSequence = getRequiredParam(coordinateUpdateSequence) val dataConfigs = getRequiredParam(coordinateDataConfigurations) val initialModelOpt = get(initialModel) - val retrainModelCoordsOpt = get(partialRetrainLockedCoordinates) + val lockedModelCoordsOpt = get(partialRetrainLockedCoordinates) val normalizationContextsOpt = get(coordinateNormalizationContexts) val ignoreThreshold = getOrDefault(ignoreThresholdForNewModels) val numUniqueCoordinates = updateSequence.toSet.size + val isIncrementalTraining = getOrDefault(incrementalTraining) // Cannot have coordinates repeat in the update sequence require( @@ -244,39 +254,108 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P !ignoreThreshold || initialModelOpt.isDefined, "'Ignore threshold for new models' flag set but no initial model provided for warm-start") - // Partial retraining and warm-start training require an initial GAME model to be provided as input - 
val coordinatesToTrain = (initialModelOpt, retrainModelCoordsOpt) match { - case (Some(initModel), Some(retrainModelCoords)) => + // Warm-start, partial re-training, and incremental training require the same initial GAME model to be provided as + // input. Partial re-training requires some coordinates to be locked. These locked coordinates and the coordinates + // to be trained are mutually exclusive. For those coordinates to be trained, warm start will be enabled if any + // initial model is present. Moreover, if incremental training is enabled, this initial model will be used to + // construct a prior distribution. + val coordinatesToTrain = (isIncrementalTraining, lockedModelCoordsOpt, initialModelOpt) match { + case (true, None, None) => + throw new InvalidParameterException(s"'${incrementalTraining.name}' is enabled but no initial model provided.") + + case (true, None, Some(initModel)) => + // The set of coordinates being trained and the set of coordinates trained previously must be identical + require( + updateSequence.toSet == initModel.toMap.keySet, + s"Coordinate sets don't match for incremental training; missing coordinates: " + + s"${MathUtils.symmetricDifference(updateSequence.toSet, initModel.toMap.keySet).mkString(", ")}") + + updateSequence.foreach { coordinateId => + val coordinateConfig = dataConfigs(coordinateId) + val coordinateModel = initModel(coordinateId) + + // TODO: Do the feature shards and random effect types need to match? It's possible for them to match + // TODO: perfectly with different names (if the initial model is sufficiently old). + (coordinateConfig, coordinateModel) match { + case (fEC: FixedEffectDataConfiguration, fEM: FixedEffectModel) => + + // Model and coordinate must be trained on the same feature shard + require( + fEC.featureShardId == fEM.featureShardId, + s"Incremental training error: feature shard ID mismatch for coordinate '$coordinateId' " + + s"('${fEC.featureShardId}' vs. 
'${fEM.featureShardId}').") + + // Model must contain variance info + require( + fEM.model.coefficients.variancesOption.isDefined, + s"Incremental training error: coordinate '$coordinateId' missing variance information.") + + case (rEC: RandomEffectDataConfiguration, rEM: RandomEffectModel) => + + // Model and coordinate must be trained on the same feature shard + require( + rEC.featureShardId == rEM.featureShardId, + s"Incremental training error: feature shard ID mismatch for coordinate '$coordinateId' " + + s"('${rEC.featureShardId}' vs. '${rEM.featureShardId}').") + + // Random effect types must match between coordinate and model + require( + rEC.randomEffectType == rEM.randomEffectType, + s"Incremental training error: random effect type mismatch for coordinate '$coordinateId' " + + s"('${rEC.randomEffectType}' vs. '${rEM.randomEffectType}').") + + // Model must contain variance info + require( + rEM + .modelsRDD + .mapPartitions( + iter => Seq(iter.forall(_._2.coefficients.variancesOption.isDefined)).iterator, + preservesPartitioning = true) + .fold(true)(_ && _), + s"Incremental training error: one or more models in coordinate '$coordinateId' missing variance information.") + + case (_, _) => + throw new IllegalArgumentException( + "Incremental training error: mismatch between coordinate and model types.") + } + } + + updateSequence + + case (true, Some(_), None) => + throw new InvalidParameterException("No initial model is provided when partial retraining is turned on.") - val newCoordinates = updateSequence.filterNot(retrainModelCoords.contains) + case (false, None, _) => + updateSequence + + case (false, Some(_), None) => + throw new InvalidParameterException("Partial model re-training is enabled but no initial model provided.") + + case (_, Some(lockedModelCoords), Some(initModel)) => + + val newCoordinates = updateSequence.filterNot(lockedModelCoords.contains) // Locked coordinates cannot be empty require( - retrainModelCoords.nonEmpty, - "Set of locked 
coordinates is empty.") + lockedModelCoords.nonEmpty, + "Empty set of locked coordinates is invalid.") // No point in training if every coordinate is being reused require( newCoordinates.nonEmpty, - "All coordinates in the update sequence are re-used from the initial model: no new coordinates to train.") + "All coordinates in the update sequence are re-used from the initial model; no new coordinates to train.") // All locked coordinates must be used by the update sequence require( - retrainModelCoords.forall(updateSequence.contains), + lockedModelCoords.forall(updateSequence.contains), "One or more locked coordinates for partial retraining are missing from the update sequence.") // All locked coordinates must be present in the initial model require( - retrainModelCoords.forall(initModel.toMap.contains), + lockedModelCoords.forall(initModel.toMap.contains), "One or more locked coordinates for partial retraining are missing from the initial model.") newCoordinates - - case (Some(_), None) | (None, None) => - updateSequence - - case (None, Some(_)) => - throw new IllegalArgumentException("Partial retraining enabled, but no base model provided.") } // All coordinates (including locked coordinates) should have a data configuration @@ -362,7 +441,7 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P // Train GAME models on training data val results = Timed("Training models:") { - var prevGameModel: Option[GameModel] = if (getOrDefault(useWarmStart)) { + var prevGameModel: Option[GameModel] = if (getOrDefault(useWarmStart) || getOrDefault(incrementalTraining)) { get(initialModel) } else { None @@ -468,7 +547,7 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P * @return A map of coordinate ID to training [[Dataset]] */ protected def prepareTrainingDatasets( - gameDataset: RDD[(UniqueSampleId, GameDatum)]): Map[CoordinateId, D forSome { type D <: Dataset[D] }] = { + gameDataset: RDD[(UniqueSampleId, 
GameDatum)]): Map[CoordinateId, D forSome {type D <: Dataset[D]}] = { val coordinateDataConfigs = getRequiredParam(coordinateDataConfigurations) val initialModelOpt = get(initialModel) @@ -546,7 +625,7 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P (coordinateId, randomEffectDataset) } - result.asInstanceOf[(CoordinateId, D forSome { type D <: Dataset[D] })] + result.asInstanceOf[(CoordinateId, D forSome {type D <: Dataset[D]})] } } @@ -648,7 +727,7 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P */ protected def train( configuration: GameOptimizationConfiguration, - trainingDatasets: Map[CoordinateId, D forSome { type D <: Dataset[D] }], + trainingDatasets: Map[CoordinateId, D forSome {type D <: Dataset[D]}], coordinateDescent: CoordinateDescent, initialModelOpt: Option[GameModel] = None): (GameModel, Option[EvaluationResults]) = Timed(s"Train model:") { @@ -672,18 +751,25 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P val downSamplerFactory = DownSamplerHelper.buildFactory(task) val lockedCoordinates = get(partialRetrainLockedCoordinates).getOrElse(Set()) val interceptIndices = getOrDefault(coordinateInterceptIndices) + val isIncrementalTraining = getOrDefault(incrementalTraining) // Create the optimization coordinates for each component model - val coordinates: Map[CoordinateId, C forSome { type C <: Coordinate[_] }] = + val coordinates: Map[CoordinateId, C forSome {type C <: Coordinate[_]}] = updateSequence .map { coordinateId => - val coordinate: C forSome { type C <: Coordinate[_] } = if (lockedCoordinates.contains(coordinateId)) { + val coordinate: C forSome {type C <: Coordinate[_]} = if (lockedCoordinates.contains(coordinateId)) { trainingDatasets(coordinateId) match { case feDataset: FixedEffectDataset => new FixedEffectModelCoordinate(feDataset) case reDataset: RandomEffectDataset => new RandomEffectModelCoordinate(reDataset) case dataset => 
throw new UnsupportedOperationException(s"Unsupported dataset type: ${dataset.getClass}") } } else { + val priorModelOpt = if (getOrDefault(incrementalTraining)) { + Some(initialModelOpt.get(coordinateId)) + } else { + None + } + CoordinateFactory.build( trainingDatasets(coordinateId), configuration(coordinateId), @@ -692,6 +778,7 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P downSamplerFactory, normalizationContexts.getOrElse(coordinateId, NoNormalization()), variance, + priorModelOpt, interceptIndices.get(coordinateId)) } @@ -699,7 +786,8 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P } .toMap - val result = coordinateDescent.run(coordinates, initialModelOpt.map(_.toMap)) + val warmStartModelOpt = if (getOrDefault(useWarmStart)) initialModelOpt else None + val result = coordinateDescent.run(coordinates, warmStartModelOpt.map(_.toMap)) coordinates.foreach { case (_, coordinate) => coordinate match { diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/function/ObjectiveFunctionHelper.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/function/ObjectiveFunctionHelper.scala index cbac3167..08537d6c 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/function/ObjectiveFunctionHelper.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/function/ObjectiveFunctionHelper.scala @@ -20,15 +20,16 @@ import com.linkedin.photon.ml.algorithm.Coordinate import com.linkedin.photon.ml.function.glm.{GLMLossFunction, LogisticLossFunction, PoissonLossFunction, SquaredLossFunction} import com.linkedin.photon.ml.function.svm.SmoothedHingeLossFunction import com.linkedin.photon.ml.optimization.game.CoordinateOptimizationConfiguration +import com.linkedin.photon.ml.supervised.model.GeneralizedLinearModel /** * Helper for [[ObjectiveFunction]] related tasks. 
*/ object ObjectiveFunctionHelper { - type ObjectiveFunctionFactoryFactory = CoordinateOptimizationConfiguration => Option[Int] => ObjectiveFunction - type DistributedObjectiveFunctionFactory = Option[Int] => DistributedObjectiveFunction - type SingleNodeObjectiveFunctionFactory = Option[Int] => SingleNodeObjectiveFunction + type ObjectiveFunctionFactoryFactory = CoordinateOptimizationConfiguration => (Option[GeneralizedLinearModel], Option[Int]) => ObjectiveFunction + type DistributedObjectiveFunctionFactory = (Option[GeneralizedLinearModel], Option[Int]) => DistributedObjectiveFunction + type SingleNodeObjectiveFunctionFactory = (Option[GeneralizedLinearModel], Option[Int]) => SingleNodeObjectiveFunction /** * Construct a factory function for building [[ObjectiveFunction]] objects. diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/function/glm/DistributedGLMLossFunction.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/function/glm/DistributedGLMLossFunction.scala index ab94c7eb..9a8f2a1b 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/function/glm/DistributedGLMLossFunction.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/function/glm/DistributedGLMLossFunction.scala @@ -19,9 +19,11 @@ import org.apache.spark.rdd.RDD import com.linkedin.photon.ml.data.LabeledPoint import com.linkedin.photon.ml.function._ +import com.linkedin.photon.ml.model.{Coefficients => ModelCoefficients} import com.linkedin.photon.ml.normalization.NormalizationContext import com.linkedin.photon.ml.optimization.RegularizationType import com.linkedin.photon.ml.optimization.game.GLMOptimizationConfiguration +import com.linkedin.photon.ml.supervised.model.GeneralizedLinearModel import com.linkedin.photon.ml.util.BroadcastWrapper /** @@ -150,6 +152,7 @@ object DistributedGLMLossFunction { * @param configuration The optimization problem configuration * @param singleLossFunction The PointwiseLossFunction providing functionality for l(z, y) * @param 
treeAggregateDepth The tree aggregation depth + * @param priorModelOpt Optional prior model, required if this is an objective function for incremental training * @param interceptIndexOpt The index of the intercept, if there is one * @return A new DistributedGLMLossFunction */ @@ -157,20 +160,36 @@ object DistributedGLMLossFunction { configuration: GLMOptimizationConfiguration, singleLossFunction: PointwiseLossFunction, treeAggregateDepth: Int, + priorModelOpt: Option[GeneralizedLinearModel] = None, interceptIndexOpt: Option[Int] = None): DistributedGLMLossFunction = { val regularizationContext = configuration.regularizationContext val regularizationWeight = configuration.regularizationWeight - regularizationContext.regularizationType match { - case RegularizationType.L2 | RegularizationType.ELASTIC_NET => - new DistributedGLMLossFunction(singleLossFunction, treeAggregateDepth) with L2RegularizationTwiceDiff { - l2RegWeight = regularizationContext.getL2RegularizationWeight(regularizationWeight) + priorModelOpt match { + case None => + regularizationContext.regularizationType match { + case RegularizationType.L2 | RegularizationType.ELASTIC_NET => + new DistributedGLMLossFunction(singleLossFunction, treeAggregateDepth) + with L2RegularizationTwiceDiff { - override def interceptOpt: Option[Int] = interceptIndexOpt + l2RegWeight = regularizationContext.getL2RegularizationWeight(regularizationWeight) + + override def interceptOpt: Option[Int] = interceptIndexOpt + } + + case _ => new DistributedGLMLossFunction(singleLossFunction, treeAggregateDepth) } - case _ => new DistributedGLMLossFunction(singleLossFunction, treeAggregateDepth) + case Some(priorModel) => + val l2Weight = regularizationContext.getL2RegularizationWeight(regularizationWeight) + val priorModelCoefficients = priorModel.coefficients + + new DistributedGLMLossFunction(singleLossFunction, treeAggregateDepth) with PriorDistributionTwiceDiff { + override val priorCoefficients: ModelCoefficients = 
priorModelCoefficients + l2RegWeight = l2Weight + incrementalWeight = configuration.incrementalWeight.getOrElse(1.0D) + } } } } diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/function/glm/GLMLossFunction.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/function/glm/GLMLossFunction.scala index 5b4a918c..7aad6179 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/function/glm/GLMLossFunction.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/function/glm/GLMLossFunction.scala @@ -17,6 +17,7 @@ package com.linkedin.photon.ml.function.glm import com.linkedin.photon.ml.algorithm.Coordinate import com.linkedin.photon.ml.function.ObjectiveFunction import com.linkedin.photon.ml.optimization.game.{CoordinateOptimizationConfiguration, FixedEffectOptimizationConfiguration, RandomEffectOptimizationConfiguration} +import com.linkedin.photon.ml.supervised.model.GeneralizedLinearModel /** * Helper for generalized linear model loss function related tasks. @@ -28,21 +29,32 @@ object GLMLossFunction { * * @param lossFunction A [[PointwiseLossFunction]] for training a generalized linear model * @param treeAggregateDepth The tree-aggregate depth to use during aggregation + * @param config Optimization problem configuration + * @param isIncrementalTraining Is this an objective function for incremental training? * @return A function which builds the appropriate type of [[ObjectiveFunction]] for a given [[Coordinate]] type and * optimization settings. 
*/ - def buildFactory - (lossFunction: PointwiseLossFunction, treeAggregateDepth: Int) - (config: CoordinateOptimizationConfiguration): Option[Int] => ObjectiveFunction = - + def buildFactory( + lossFunction: PointwiseLossFunction, + treeAggregateDepth: Int)( + config: CoordinateOptimizationConfiguration): (Option[GeneralizedLinearModel], Option[Int]) => ObjectiveFunction = config match { case fEOptConfig: FixedEffectOptimizationConfiguration => - (interceptIndexOpt: Option[Int]) => - DistributedGLMLossFunction(fEOptConfig, lossFunction, treeAggregateDepth, interceptIndexOpt) + (generalizedLinearModelOpt: Option[GeneralizedLinearModel], interceptIndexOpt: Option[Int]) => + DistributedGLMLossFunction( + fEOptConfig, + lossFunction, + treeAggregateDepth, + generalizedLinearModelOpt, + interceptIndexOpt) case rEOptConfig: RandomEffectOptimizationConfiguration => - (interceptIndexOpt: Option[Int]) => - SingleNodeGLMLossFunction(rEOptConfig, lossFunction, interceptIndexOpt) + (generalizedLinearModelOpt: Option[GeneralizedLinearModel], interceptIndexOpt: Option[Int]) => + SingleNodeGLMLossFunction( + rEOptConfig, + lossFunction, + generalizedLinearModelOpt, + interceptIndexOpt) case _ => throw new UnsupportedOperationException( diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/function/glm/SingleNodeGLMLossFunction.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/function/glm/SingleNodeGLMLossFunction.scala index 32d1952f..23966c53 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/function/glm/SingleNodeGLMLossFunction.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/function/glm/SingleNodeGLMLossFunction.scala @@ -18,9 +18,11 @@ import breeze.linalg._ import com.linkedin.photon.ml.data.LabeledPoint import com.linkedin.photon.ml.function._ +import com.linkedin.photon.ml.model.{Coefficients => ModelCoefficients} import com.linkedin.photon.ml.normalization.NormalizationContext import 
com.linkedin.photon.ml.optimization.RegularizationType import com.linkedin.photon.ml.optimization.game.GLMOptimizationConfiguration +import com.linkedin.photon.ml.supervised.model.GeneralizedLinearModel import com.linkedin.photon.ml.util.BroadcastWrapper /** @@ -55,7 +57,7 @@ protected[ml] class SingleNodeGLMLossFunction private (singlePointLossFunction: input: Iterable[LabeledPoint], coefficients: Vector[Double], normalizationContext: BroadcastWrapper[NormalizationContext]): Double = - calculate(input, coefficients, normalizationContext)._1 + calculate(input, coefficients, normalizationContext)._1 /** * Compute the gradient of the function over the given data for the given model coefficients. @@ -144,26 +146,42 @@ object SingleNodeGLMLossFunction { * * @param configuration The optimization problem configuration * @param singleLossFunction The PointwiseLossFunction providing functionality for l(z, y) + * @param priorModelOpt Optional prior model, required if this is an objective function for incremental training * @param interceptIndexOpt The index of the intercept, if there is one * @return A new SingleNodeGLMLossFunction */ def apply( configuration: GLMOptimizationConfiguration, singleLossFunction: PointwiseLossFunction, + priorModelOpt: Option[GeneralizedLinearModel] = None, interceptIndexOpt: Option[Int] = None): SingleNodeGLMLossFunction = { val regularizationContext = configuration.regularizationContext val regularizationWeight = configuration.regularizationWeight - regularizationContext.regularizationType match { - case RegularizationType.L2 | RegularizationType.ELASTIC_NET => - new SingleNodeGLMLossFunction(singleLossFunction) with L2RegularizationTwiceDiff { - l2RegWeight = regularizationContext.getL2RegularizationWeight(regularizationWeight) + priorModelOpt match { + case Some(priorModel) => + val l2Weight = regularizationContext.getL2RegularizationWeight(regularizationWeight) + val priorModelCoefficients = priorModel.coefficients - override def 
interceptOpt: Option[Int] = interceptIndexOpt + new SingleNodeGLMLossFunction(singleLossFunction) with PriorDistributionTwiceDiff { + override val priorCoefficients: ModelCoefficients = priorModelCoefficients + l2RegWeight = l2Weight + incrementalWeight = configuration.incrementalWeight.getOrElse(1.0D) } - case _ => new SingleNodeGLMLossFunction(singleLossFunction) + case None => + regularizationContext.regularizationType match { + case RegularizationType.L2 | RegularizationType.ELASTIC_NET => + new SingleNodeGLMLossFunction(singleLossFunction) with L2RegularizationTwiceDiff { + + l2RegWeight = regularizationContext.getL2RegularizationWeight(regularizationWeight) + + override def interceptOpt: Option[Int] = interceptIndexOpt + } + + case _ => new SingleNodeGLMLossFunction(singleLossFunction) + } } } } diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/function/svm/SmoothedHingeLossFunction.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/function/svm/SmoothedHingeLossFunction.scala index a67667d9..ea22663a 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/function/svm/SmoothedHingeLossFunction.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/function/svm/SmoothedHingeLossFunction.scala @@ -21,6 +21,7 @@ import com.linkedin.photon.ml.constants.MathConst import com.linkedin.photon.ml.data.LabeledPoint import com.linkedin.photon.ml.function.ObjectiveFunction import com.linkedin.photon.ml.optimization.game.{CoordinateOptimizationConfiguration, FixedEffectOptimizationConfiguration, RandomEffectOptimizationConfiguration} +import com.linkedin.photon.ml.supervised.model.GeneralizedLinearModel /** * Implement Rennie's smoothed hinge loss function (http://qwone.com/~jason/writing/smoothHinge.pdf) as an @@ -91,20 +92,20 @@ object SmoothedHingeLossFunction { * Construct a factory function for building distributed and non-distributed smoothed hinge loss functions. 
* * @param treeAggregateDepth The tree-aggregate depth to use during aggregation + * @param config Optimization problem configuration * @return A function which builds the appropriate type of [[ObjectiveFunction]] for a given [[Coordinate]] type and * optimization settings. */ - def buildFactory - (treeAggregateDepth: Int) - (config: CoordinateOptimizationConfiguration): Option[Int] => ObjectiveFunction = - + def buildFactory( + treeAggregateDepth: Int)( + config: CoordinateOptimizationConfiguration): (Option[GeneralizedLinearModel], Option[Int]) => ObjectiveFunction = config match { case fEOptConfig: FixedEffectOptimizationConfiguration => - (interceptIndexOpt: Option[Int]) => + (_: Option[GeneralizedLinearModel], _: Option[Int]) => DistributedSmoothedHingeLossFunction(fEOptConfig, treeAggregateDepth) case rEOptConfig: RandomEffectOptimizationConfiguration => - (interceptIndexOpt: Option[Int]) => + (_: Option[GeneralizedLinearModel], _: Option[Int]) => SingleNodeSmoothedHingeLossFunction(rEOptConfig) case _ => diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/DistributedOptimizationProblem.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/DistributedOptimizationProblem.scala index 2573bf3b..a02ec7f9 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/DistributedOptimizationProblem.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/DistributedOptimizationProblem.scala @@ -28,7 +28,7 @@ import com.linkedin.photon.ml.optimization.VarianceComputationType.VarianceCompu import com.linkedin.photon.ml.optimization.game.GLMOptimizationConfiguration import com.linkedin.photon.ml.sampling.DownSampler import com.linkedin.photon.ml.supervised.model.GeneralizedLinearModel -import com.linkedin.photon.ml.util.BroadcastWrapper +import com.linkedin.photon.ml.util.{BroadcastWrapper, VectorUtils} import com.linkedin.photon.ml.util.Linalg.choleskyInverse /** @@ -43,7 +43,7 @@ import 
com.linkedin.photon.ml.util.Linalg.choleskyInverse * @param regularizationContext The regularization context * @param varianceComputation If and how to compute coefficient variances */ -protected[ml] class DistributedOptimizationProblem[Objective <: DistributedObjectiveFunction] protected[optimization] ( +protected[ml] class DistributedOptimizationProblem[Objective <: DistributedObjectiveFunction] protected[optimization]( optimizer: Optimizer[Objective], objectiveFunction: Objective, samplerOption: Option[DownSampler], @@ -62,11 +62,13 @@ protected[ml] class DistributedOptimizationProblem[Objective <: DistributedObjec * @param regularizationWeight The new regularization weight */ def updateRegularizationWeight(regularizationWeight: Double): Unit = { + optimizer match { case owlqn: OWLQN => owlqn.l1RegularizationWeight = regularizationContext.getL1RegularizationWeight(regularizationWeight) case _ => } + objectiveFunction match { case l2RegFunc: DistributedObjectiveFunction with L2Regularization => l2RegFunc.l2RegularizationWeight = regularizationContext.getL2RegularizationWeight(regularizationWeight) @@ -85,10 +87,7 @@ protected[ml] class DistributedOptimizationProblem[Objective <: DistributedObjec val result = (objectiveFunction, varianceComputation) match { case (twiceDiffFunc: TwiceDiffFunction, VarianceComputationType.SIMPLE) => - Some( - twiceDiffFunc - .hessianDiagonal(input, coefficients) - .map(v => 1.0 / math.max(v, MathConst.EPSILON))) + Some(VectorUtils.invertVectorWithZeroHandler(twiceDiffFunc.hessianDiagonal(input, coefficients), 1.0 / MathConst.EPSILON)) case (twiceDiffFunc: TwiceDiffFunction, VarianceComputationType.FULL) => val hessianMatrix = twiceDiffFunc.hessianMatrix(input, coefficients) diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/SingleNodeOptimizationProblem.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/SingleNodeOptimizationProblem.scala index c5875a8b..af99f3aa 100644 --- 
a/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/SingleNodeOptimizationProblem.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/SingleNodeOptimizationProblem.scala @@ -24,7 +24,7 @@ import com.linkedin.photon.ml.normalization.NormalizationContext import com.linkedin.photon.ml.optimization.VarianceComputationType.VarianceComputationType import com.linkedin.photon.ml.optimization.game.GLMOptimizationConfiguration import com.linkedin.photon.ml.supervised.model.GeneralizedLinearModel -import com.linkedin.photon.ml.util.BroadcastWrapper +import com.linkedin.photon.ml.util.{BroadcastWrapper, VectorUtils} import com.linkedin.photon.ml.util.Linalg.choleskyInverse /** @@ -37,7 +37,7 @@ import com.linkedin.photon.ml.util.Linalg.choleskyInverse * @param glmConstructor The function to use for producing GLMs from trained coefficients * @param varianceComputationType If and how to compute coefficient variances */ -protected[ml] class SingleNodeOptimizationProblem[Objective <: SingleNodeObjectiveFunction] protected[optimization] ( +protected[ml] class SingleNodeOptimizationProblem[Objective <: SingleNodeObjectiveFunction] protected[optimization]( optimizer: Optimizer[Objective], objectiveFunction: Objective, glmConstructor: Coefficients => GeneralizedLinearModel, @@ -59,9 +59,7 @@ protected[ml] class SingleNodeOptimizationProblem[Objective <: SingleNodeObjecti override def computeVariances(input: Iterable[LabeledPoint], coefficients: Vector[Double]): Option[Vector[Double]] = (objectiveFunction, varianceComputationType) match { case (twiceDiffFunc: TwiceDiffFunction, VarianceComputationType.SIMPLE) => - Some(twiceDiffFunc - .hessianDiagonal(input, coefficients) - .map(v => 1.0 / math.max(v, MathConst.EPSILON))) + Some(VectorUtils.invertVectorWithZeroHandler(twiceDiffFunc.hessianDiagonal(input, coefficients), 1.0 / MathConst.EPSILON)) case (twiceDiffFunc: TwiceDiffFunction, VarianceComputationType.FULL) => val hessianMatrix = 
twiceDiffFunc.hessianMatrix(input, coefficients) diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/game/CoordinateOptimizationConfiguration.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/game/CoordinateOptimizationConfiguration.scala index 6c9a4c69..3b2fda58 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/game/CoordinateOptimizationConfiguration.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/game/CoordinateOptimizationConfiguration.scala @@ -30,13 +30,16 @@ sealed trait CoordinateOptimizationConfiguration * @param regularizationWeight Regularization weight * @param regularizationWeightRange Regularization weight range * @param elasticNetParamRange Elastic net alpha range + * @param incrementalWeight Optional weight to balance the prior model and the current data in the incremental learning, + * the larger the weight is, the more important the prior model is. */ protected[ml] abstract class GLMOptimizationConfiguration( val optimizerConfig: OptimizerConfig, val regularizationContext: RegularizationContext, val regularizationWeight: Double, val regularizationWeightRange: Option[DoubleRange] = None, - val elasticNetParamRange: Option[DoubleRange] = None) + val elasticNetParamRange: Option[DoubleRange] = None, + val incrementalWeight: Option[Double] = None) extends CoordinateOptimizationConfiguration with Serializable { @@ -47,6 +50,9 @@ protected[ml] abstract class GLMOptimizationConfiguration( elasticNetParamRange.foreach { case DoubleRange(start, end) => require(start >= 0.0 && end <= 1.0, "Elastic net alpha ranges must lie within [0, 1]") } + incrementalWeight.foreach { weight => + require(0 <= weight, s"Negative incremental weight: $weight") + } } /** @@ -57,6 +63,8 @@ protected[ml] abstract class GLMOptimizationConfiguration( * @param regularizationWeight Regularization weight * @param regularizationWeightRange Regularization weight range * @param 
elasticNetParamRange Elastic net alpha range + * @param incrementalWeight Optional weight to balance the prior model and the current data in the incremental learning, + * the larger the weight is, the more important the prior model is. * @param downSamplingRate Down-sampling rate */ case class FixedEffectOptimizationConfiguration( @@ -65,13 +73,15 @@ case class FixedEffectOptimizationConfiguration( override val regularizationWeight: Double = 0D, override val regularizationWeightRange: Option[DoubleRange] = None, override val elasticNetParamRange: Option[DoubleRange] = None, + override val incrementalWeight: Option[Double] = None, downSamplingRate: Double = 1D) extends GLMOptimizationConfiguration( optimizerConfig, regularizationContext, regularizationWeight, regularizationWeightRange, - elasticNetParamRange) { + elasticNetParamRange, + incrementalWeight) { require(downSamplingRate > 0.0 && downSamplingRate <= 1.0, s"Unexpected downSamplingRate: $downSamplingRate") } @@ -84,16 +94,20 @@ case class FixedEffectOptimizationConfiguration( * @param regularizationWeight Regularization weight * @param regularizationWeightRange Regularization weight range * @param elasticNetParamRange Elastic net alpha range + * @param incrementalWeight The weight to balance the prior model and the current data in the incremental learning, + * the larger the weight is, the more important the prior model is. The default value is 1. 
*/ case class RandomEffectOptimizationConfiguration( override val optimizerConfig: OptimizerConfig, override val regularizationContext: RegularizationContext = NoRegularizationContext, override val regularizationWeight: Double = 0D, override val regularizationWeightRange: Option[DoubleRange] = None, - override val elasticNetParamRange: Option[DoubleRange] = None) + override val elasticNetParamRange: Option[DoubleRange] = None, + override val incrementalWeight: Option[Double] = None) extends GLMOptimizationConfiguration( optimizerConfig, regularizationContext, regularizationWeight, regularizationWeightRange, - elasticNetParamRange) + elasticNetParamRange, + incrementalWeight) diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/game/RandomEffectOptimizationProblem.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/game/RandomEffectOptimizationProblem.scala index c0a0201e..84b63171 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/game/RandomEffectOptimizationProblem.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/optimization/game/RandomEffectOptimizationProblem.scala @@ -20,10 +20,10 @@ import org.apache.spark.storage.StorageLevel import com.linkedin.photon.ml.Types.REId import com.linkedin.photon.ml.function.SingleNodeObjectiveFunction -import com.linkedin.photon.ml.model.Coefficients +import com.linkedin.photon.ml.model.{Coefficients, RandomEffectModel} import com.linkedin.photon.ml.normalization.NormalizationContext -import com.linkedin.photon.ml.optimization.{SingleNodeOptimizationProblem, VarianceComputationType} import com.linkedin.photon.ml.optimization.VarianceComputationType.VarianceComputationType +import com.linkedin.photon.ml.optimization.{SingleNodeOptimizationProblem, VarianceComputationType} import com.linkedin.photon.ml.projector.LinearSubspaceProjector import com.linkedin.photon.ml.spark.RDDLike import com.linkedin.photon.ml.supervised.model.GeneralizedLinearModel @@ 
-120,34 +120,43 @@ protected[ml] class RandomEffectOptimizationProblem[Objective <: SingleNodeObjec object RandomEffectOptimizationProblem { /** - * Build a new [[RandomEffectOptimizationProblem]]. + * Build a new [[RandomEffectOptimizationProblem]] to optimize. * * @tparam RandomEffectObjective The type of objective function used to solve individual random effect optimization * problems * @param linearSubspaceProjectorsRDD The per-entity [[LinearSubspaceProjector]] objects used to compress the * per-entity feature spaces * @param configuration The optimization problem configuration - * @param objectiveFunctionFactory The objective function to optimize + * @param objectiveFunctionFactory Factory for the objective function * @param glmConstructor The function to use for producing GLMs from trained coefficients * @param normalizationContext The normalization context * @param varianceComputationType If and how coefficient variances should be computed * @param interceptIndexOpt The option of intercept index - * @return A new [[RandomEffectOptimizationProblem]] object + * @return A new [[RandomEffectOptimizationProblem]] */ - def apply[RandomEffectObjective <: SingleNodeObjectiveFunction]( + protected[ml] def apply[RandomEffectObjective <: SingleNodeObjectiveFunction]( linearSubspaceProjectorsRDD: RDD[(REId, LinearSubspaceProjector)], configuration: RandomEffectOptimizationConfiguration, - objectiveFunctionFactory: Option[Int] => RandomEffectObjective, + objectiveFunctionFactory: (Option[GeneralizedLinearModel], Option[Int]) => RandomEffectObjective, + priorRandomEffectModelOpt: Option[RandomEffectModel], glmConstructor: Coefficients => GeneralizedLinearModel, normalizationContext: NormalizationContext, varianceComputationType: VarianceComputationType = VarianceComputationType.NONE, interceptIndexOpt: Option[Int]): RandomEffectOptimizationProblem[RandomEffectObjective] = { + val sc = linearSubspaceProjectorsRDD.sparkContext + val configurationBroadcast = 
sc.broadcast(configuration) + val objectiveFunctionBuilderBroadcast = sc.broadcast(objectiveFunctionFactory) + val glmConstructorBroadcast = sc.broadcast(glmConstructor) + val normalizationContextBroadcast = sc.broadcast(normalizationContext) + // Generate new NormalizationContext and SingleNodeOptimizationProblem objects val optimizationProblems = linearSubspaceProjectorsRDD - .mapValues { projector => - val factors = normalizationContext.factorsOpt.map(factors => projector.projectForward(factors)) - val shiftsAndIntercept = normalizationContext + .leftOuterJoin(priorRandomEffectModelOpt.map(_.modelsRDD).getOrElse(sc.emptyRDD[(REId, GeneralizedLinearModel)])) + .mapValues { case (projector: LinearSubspaceProjector, priorModelOpt: Option[GeneralizedLinearModel]) => + val normContext = normalizationContextBroadcast.value + val factors = normContext.factorsOpt.map(factors => projector.projectForward(factors)) + val shiftsAndIntercept = normContext .shiftsAndInterceptOpt .map { case (shifts, intercept) => val newShifts = projector.projectForward(shifts) @@ -156,19 +165,35 @@ object RandomEffectOptimizationProblem { (newShifts, newIntercept) } val projectedNormalizationContext = new NormalizationContext(factors, shiftsAndIntercept) + val objectiveFunctionBuilder = objectiveFunctionBuilderBroadcast.value val projectedInterceptOpt = interceptIndexOpt.map { interceptIndex => projector.originalToProjectedSpaceMap(interceptIndex) } - // TODO: Broadcast arguments to SingleNodeOptimizationProblem? 
+ // Project prior model coefficients + val projectedPriorModelOpt = priorModelOpt.map{ + model => + val oldCoefficients = model.coefficients + val newCoefficients = Coefficients( + projector.projectForward(oldCoefficients.means), + oldCoefficients.variancesOption.map(projector.projectForward)) + + model.updateCoefficients(newCoefficients) + } + SingleNodeOptimizationProblem( - configuration, - objectiveFunctionFactory(projectedInterceptOpt), - glmConstructor, + configurationBroadcast.value, + objectiveFunctionBuilder(projectedPriorModelOpt, projectedInterceptOpt), + glmConstructorBroadcast.value, PhotonNonBroadcast(projectedNormalizationContext), varianceComputationType) } + configurationBroadcast.unpersist() + objectiveFunctionBuilderBroadcast.unpersist() + glmConstructorBroadcast.unpersist() + normalizationContextBroadcast.unpersist() + new RandomEffectOptimizationProblem(optimizationProblems, glmConstructor) } } diff --git a/photon-api/src/test/scala/com/linkedin/photon/ml/function/ObjectiveFunctionHelperTest.scala b/photon-api/src/test/scala/com/linkedin/photon/ml/function/ObjectiveFunctionHelperTest.scala index c6447291..2376d2bc 100644 --- a/photon-api/src/test/scala/com/linkedin/photon/ml/function/ObjectiveFunctionHelperTest.scala +++ b/photon-api/src/test/scala/com/linkedin/photon/ml/function/ObjectiveFunctionHelperTest.scala @@ -23,6 +23,7 @@ import com.linkedin.photon.ml.function.glm.DistributedGLMLossFunction import com.linkedin.photon.ml.function.svm.DistributedSmoothedHingeLossFunction import com.linkedin.photon.ml.optimization.game.FixedEffectOptimizationConfiguration import com.linkedin.photon.ml.optimization.{OptimizerConfig, OptimizerType} +import com.linkedin.photon.ml.supervised.model.GeneralizedLinearModel /** * Unit tests for [[ObjectiveFunctionHelper]]. 
@@ -48,15 +49,19 @@ class ObjectiveFunctionHelperTest { @Test(dataProvider = "trainingTaskProvider") def testBuildFactory(trainingTask: TaskType): Unit = { - val objectiveFunction = - ObjectiveFunctionHelper.buildFactory(trainingTask, TREE_AGGREGATE_DEPTH)(COORDINATE_OPT_CONFIG) + val objectiveFunction = ObjectiveFunctionHelper.buildFactory( + trainingTask, + TREE_AGGREGATE_DEPTH)(COORDINATE_OPT_CONFIG) trainingTask match { case TaskType.LOGISTIC_REGRESSION | TaskType.LINEAR_REGRESSION | TaskType.POISSON_REGRESSION => - assertTrue(objectiveFunction.isInstanceOf[Option[Int] => DistributedGLMLossFunction]) + assertTrue( + objectiveFunction.isInstanceOf[(Option[GeneralizedLinearModel], Option[Int]) => DistributedGLMLossFunction]) case TaskType.SMOOTHED_HINGE_LOSS_LINEAR_SVM => - assertTrue(objectiveFunction.isInstanceOf[Option[Int] => DistributedSmoothedHingeLossFunction]) + assertTrue( + objectiveFunction + .isInstanceOf[(Option[GeneralizedLinearModel], Option[Int]) => DistributedSmoothedHingeLossFunction]) } } } @@ -64,6 +69,7 @@ class ObjectiveFunctionHelperTest { object ObjectiveFunctionHelperTest { val COORDINATE_OPT_CONFIG = FixedEffectOptimizationConfiguration(OptimizerConfig(OptimizerType.LBFGS, 1, 2e-2)) + val ENABLE_INCREMENTAL_TRAINING = false val MAXIMUM_ITERATIONS = 1 val TOLERANCE = 2e-2 val TREE_AGGREGATE_DEPTH = 3 diff --git a/photon-api/src/test/scala/com/linkedin/photon/ml/function/glm/GLMLossFunctionTest.scala b/photon-api/src/test/scala/com/linkedin/photon/ml/function/glm/GLMLossFunctionTest.scala index 648ef3be..4905b355 100644 --- a/photon-api/src/test/scala/com/linkedin/photon/ml/function/glm/GLMLossFunctionTest.scala +++ b/photon-api/src/test/scala/com/linkedin/photon/ml/function/glm/GLMLossFunctionTest.scala @@ -20,6 +20,7 @@ import org.testng.annotations.{DataProvider, Test} import com.linkedin.photon.ml.function.ObjectiveFunction import com.linkedin.photon.ml.optimization.{OptimizerConfig, OptimizerType} import 
com.linkedin.photon.ml.optimization.game.{CoordinateOptimizationConfiguration, FixedEffectOptimizationConfiguration, RandomEffectOptimizationConfiguration} +import com.linkedin.photon.ml.supervised.model.GeneralizedLinearModel /** * Unit tests for [[GLMLossFunction]]. @@ -47,15 +48,16 @@ class GLMLossFunctionTest { @Test(dataProvider = "coordinateOptimizationProblemProvider") def testBuildFactory(coordinateOptConfig: CoordinateOptimizationConfiguration): Unit = { - val objectiveFunction = - GLMLossFunction.buildFactory(LOSS_FUNCTION, TREE_AGGREGATE_DEPTH)(coordinateOptConfig) + val objectiveFunction = GLMLossFunction.buildFactory(LOSS_FUNCTION, TREE_AGGREGATE_DEPTH)(coordinateOptConfig) coordinateOptConfig match { case _: FixedEffectOptimizationConfiguration => - assertTrue(objectiveFunction.isInstanceOf[Option[Int] => DistributedGLMLossFunction]) + assertTrue( + objectiveFunction.isInstanceOf[(Option[GeneralizedLinearModel], Option[Int]) => DistributedGLMLossFunction]) case _: RandomEffectOptimizationConfiguration => - assertTrue(objectiveFunction.isInstanceOf[Option[Int] => SingleNodeGLMLossFunction]) + assertTrue( + objectiveFunction.isInstanceOf[(Option[GeneralizedLinearModel], Option[Int]) => SingleNodeGLMLossFunction]) case _ => assertTrue(false) diff --git a/photon-api/src/test/scala/com/linkedin/photon/ml/function/svm/SmoothedHingeLossFunctionTest.scala b/photon-api/src/test/scala/com/linkedin/photon/ml/function/svm/SmoothedHingeLossFunctionTest.scala index 1bb15fff..9d4eda16 100644 --- a/photon-api/src/test/scala/com/linkedin/photon/ml/function/svm/SmoothedHingeLossFunctionTest.scala +++ b/photon-api/src/test/scala/com/linkedin/photon/ml/function/svm/SmoothedHingeLossFunctionTest.scala @@ -20,6 +20,7 @@ import org.testng.annotations.{DataProvider, Test} import com.linkedin.photon.ml.function.ObjectiveFunction import com.linkedin.photon.ml.optimization.{OptimizerConfig, OptimizerType} import 
com.linkedin.photon.ml.optimization.game.{CoordinateOptimizationConfiguration, FixedEffectOptimizationConfiguration, RandomEffectOptimizationConfiguration} +import com.linkedin.photon.ml.supervised.model.GeneralizedLinearModel /** * Unit tests for [[SmoothedHingeLossFunction]]. @@ -51,10 +52,14 @@ class SmoothedHingeLossFunctionTest { coordinateOptConfig match { case _: FixedEffectOptimizationConfiguration => - assertTrue(objectiveFunctionFactory.isInstanceOf[Option[Int] => DistributedSmoothedHingeLossFunction]) + assertTrue( + objectiveFunctionFactory + .isInstanceOf[(Option[GeneralizedLinearModel], Option[Int]) => DistributedSmoothedHingeLossFunction]) case _: RandomEffectOptimizationConfiguration => - assertTrue(objectiveFunctionFactory.isInstanceOf[Option[Int] => SingleNodeSmoothedHingeLossFunction]) + assertTrue( + objectiveFunctionFactory + .isInstanceOf[(Option[GeneralizedLinearModel], Option[Int]) => SingleNodeSmoothedHingeLossFunction]) case _ => assertTrue(false) diff --git a/photon-api/src/test/scala/com/linkedin/photon/ml/optimization/game/CoordinateOptimizationConfigurationTest.scala b/photon-api/src/test/scala/com/linkedin/photon/ml/optimization/game/CoordinateOptimizationConfigurationTest.scala index 9153da77..fda8ea5e 100644 --- a/photon-api/src/test/scala/com/linkedin/photon/ml/optimization/game/CoordinateOptimizationConfigurationTest.scala +++ b/photon-api/src/test/scala/com/linkedin/photon/ml/optimization/game/CoordinateOptimizationConfigurationTest.scala @@ -24,12 +24,13 @@ class CoordinateOptimizationConfigurationTest { @DataProvider def invalidInput(): Array[Array[Any]] = Array( - Array(-1D, 1D, None, None), - Array(1D, -1D, None, None), - Array(1D, 0D, None, None), - Array(1D, 2D, None, None), - Array(1D, 1D, Some(DoubleRange(-1D, 10D)), None), - Array(1D, 1D, None, Some(DoubleRange(0D, 1.1)))) + Array(-1D, 1D, None, None, None), + Array(1D, -1D, None, None, None), + Array(1D, 0D, None, None, None), + Array(1D, 2D, None, None, None), + 
Array(1D, 1D, Some(DoubleRange(-1D, 10D)), None, None), + Array(1D, 1D, None, Some(DoubleRange(0D, 1.1)), None), + Array(-1D, 1D, None, None, Some(-1D))) /** * Test that [[FixedEffectOptimizationConfiguration]] will reject invalid input. @@ -42,7 +43,8 @@ class CoordinateOptimizationConfigurationTest { regularizationWeight: Double, downSamplingRate: Double, regularizationWeightRange: Option[DoubleRange], - elasticNetParamRange: Option[DoubleRange]): Unit = { + elasticNetParamRange: Option[DoubleRange], + incrementalWeight: Option[Double]): Unit = { val mockOptimizerConfig = mock(classOf[OptimizerConfig]) val mockRegularizationContext = mock(classOf[RegularizationContext]) @@ -53,6 +55,7 @@ class CoordinateOptimizationConfigurationTest { regularizationWeight, regularizationWeightRange, elasticNetParamRange, + incrementalWeight, downSamplingRate) } } diff --git a/photon-api/src/test/scala/com/linkedin/photon/ml/util/GameTestUtils.scala b/photon-api/src/test/scala/com/linkedin/photon/ml/util/GameTestUtils.scala index 2deb5862..2317b86f 100644 --- a/photon-api/src/test/scala/com/linkedin/photon/ml/util/GameTestUtils.scala +++ b/photon-api/src/test/scala/com/linkedin/photon/ml/util/GameTestUtils.scala @@ -314,7 +314,7 @@ trait GameTestUtils extends SparkTestUtils { seed) val optimizationProblem = generateRandomEffectOptimizationProblem(randomEffectDataset) - val coordinate = new RandomEffectCoordinate[SingleNodeGLMLossFunction](randomEffectDataset, optimizationProblem) + val coordinate = new RandomEffectCoordinate(randomEffectDataset, optimizationProblem) val models = sc.parallelize(generateLinearModelsForRandomEffects(randomEffectIds, dimensions)) val model = new RandomEffectModel( models, diff --git a/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriver.scala b/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriver.scala index ed6ecd8d..6751a2f2 100644 --- 
a/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriver.scala +++ b/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriver.scala @@ -168,8 +168,12 @@ object GameTrainingDriver extends GameDriver { val ignoreThresholdForNewModels: Param[Boolean] = ParamUtils.createParam[Boolean]( "ignore threshold for new models", - "Flag to ignore the random effect samples lower bound when encountering a random effect ID without an existing " + - "model during warm-start training.") + "Flag to ignore the random effect samples lower bound when encountering a random effect ID without an " + + "existing model during warm-start training.") + + val incrementalTraining: Param[Boolean] = ParamUtils.createParam[Boolean]( + "incremental training", + "Flag to enable incremental training.") // // Initialize object @@ -216,6 +220,7 @@ object GameTrainingDriver extends GameDriver { setDefault(modelSparsityThreshold, VectorUtils.DEFAULT_SPARSITY_THRESHOLD) setDefault(timeZone, Constants.DEFAULT_TIME_ZONE) setDefault(ignoreThresholdForNewModels, false) + setDefault(incrementalTraining, false) } /** @@ -245,11 +250,7 @@ object GameTrainingDriver extends GameDriver { val normalizationType = paramMap.getOrElse(normalization, getDefault(normalization).get) val hyperParameterTuningMode = paramMap.getOrElse(hyperParameterTuning, getDefault(hyperParameterTuning).get) val ignoreThreshold = paramMap.getOrElse(ignoreThresholdForNewModels, getDefault(ignoreThresholdForNewModels).get) - - // Warm-start must be enabled to ignore threshold - require( - !ignoreThreshold || baseModelDirOpt.isDefined, - "'Ignore threshold for new models' flag set but no initial model provided for warm-start") + val isIncrementalTraining = paramMap.getOrElse(incrementalTraining, getDefault(incrementalTraining).get) // Partial retraining and warm-start training require an initial GAME model to be provided as input val coordinatesToTrain = (baseModelDirOpt, 
retrainModelCoordsOpt) match { @@ -330,6 +331,16 @@ object GameTrainingDriver extends GameDriver { case _ => } + + // Warm-start must be enabled to ignore threshold + require( + !ignoreThreshold || baseModelDirOpt.isDefined, + s"'${ignoreThresholdForNewModels.name}' set but no initial model provided (warm-start not enabled).") + + // If incremental training is enabled, prior model must be defined. + require( + !isIncrementalTraining || baseModelDirOpt.isDefined, + s"'${incrementalTraining.name}' set but no initial model provided.") } // @@ -375,7 +386,7 @@ object GameTrainingDriver extends GameDriver { validationData.map(_.persist(StorageLevel.DISK_ONLY)) val modelOpt = get(modelInputDirectory).map { modelDir => - Timed("Load model for warm-start training") { + Timed("Load model for warm-start training / incremental learning") { ModelProcessingUtils.loadGameModelFromHDFS( sc, modelDir, @@ -458,6 +469,7 @@ object GameTrainingDriver extends GameDriver { .setVarianceComputation(getOrDefault(varianceComputationType)) .setIgnoreThresholdForNewModels(getOrDefault(ignoreThresholdForNewModels)) .setUseWarmStart(true) + .setIncrementalTraining(getOrDefault(incrementalTraining)) get(inputColumnNames).foreach(estimator.setInputColumnNames) modelOpt.foreach(estimator.setInitialModel) diff --git a/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/AvroUtils.scala b/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/AvroUtils.scala index a900ae23..82e5ac12 100644 --- a/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/AvroUtils.scala +++ b/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/AvroUtils.scala @@ -68,7 +68,7 @@ object AvroUtils { val minPartitionsPerPath = math.ceil(1.0 * minPartitions / inputPaths.length).toInt - sc.union(inputPaths.map { path => readAvroFilesInDir[GenericRecord](sc, path, minPartitionsPerPath) } ) + sc.union(inputPaths.map { path => readAvroFilesInDir[GenericRecord](sc, path, minPartitionsPerPath) }) } 
/** @@ -251,8 +251,10 @@ object AvroUtils { * @return The nameAndTerm parsed from the Avro record */ protected[avro] def readNameAndTermFromGenericRecord(record: GenericRecord): NameAndTerm = { + val name = Utils.getStringAvro(record, AvroFieldNames.NAME) val term = Utils.getStringAvro(record, AvroFieldNames.TERM, isNullOK = true) + NameAndTerm(name, term) } @@ -269,6 +271,7 @@ object AvroUtils { genericRecords .flatMap { _.get(featureSectionKey) match { + case recordList: JList[_] => recordList.asScala.map { case record: GenericRecord => @@ -278,8 +281,8 @@ object AvroUtils { throw new IllegalArgumentException( s"$any in features list is not a record. It needs to be an Avro record containingg a name and term for " + s"each feature.") - } + case _ => throw new IllegalArgumentException( s"$featureSectionKey is not a list (and might be null). It needs to be a list of Avro records containing a " + @@ -364,27 +367,17 @@ object AvroUtils { featureMap: IndexMap): GeneralizedLinearModel = { val meansAvros = bayesianLinearModelAvro.getMeans + val variancesAvros = bayesianLinearModelAvro.getVariances val modelClass = bayesianLinearModelAvro.getModelClass.toString - val indexAndValueArrayBuffer = new mutable.ArrayBuffer[(Int, Double)] - val iterator = meansAvros.iterator() - while (iterator.hasNext) { - val feature = iterator.next() - val name = feature.getName.toString - val term = feature.getTerm.toString - val featureKey = Utils.getFeatureKey(name, term) - if (featureMap.contains(featureKey)) { - val value = feature.getValue - val index = featureMap.getOrElse(featureKey, - throw new NoSuchElementException(s"nameAndTerm $featureKey not found in the feature map")) - indexAndValueArrayBuffer += ((index, value)) - } + val means = convertNameTermValueAvroList(meansAvros, featureMap) + val coefficients = if (variancesAvros == null) { + Coefficients(means) + } else { + val variances = convertNameTermValueAvroList(variancesAvros, featureMap) + Coefficients(means, Some(variances)) 
} - val length = featureMap.featureDimension - val coefficients = Coefficients( - VectorUtils.toVector(indexAndValueArrayBuffer.toArray, length)) - // Load and instantiate the model try { Class.forName(modelClass) @@ -399,6 +392,36 @@ object AvroUtils { } } + /** + * Convert the NameTermValueAvro List of the type [[JList[NameTermValue]]] to Breeze vector of type [[Vector[Double]]]. + * + * @param nameTermValueAvroList List of the type [[JList[NameTermValue]]] + * @param featureMap The map from feature name of type [[NameAndTerm]] to feature index of type [[Int]] + * @return Breeze vector of type [[Vector[Double]]] + */ + protected[avro] def convertNameTermValueAvroList( + nameTermValueAvroList: JList[NameTermValueAvro], + featureMap: IndexMap): Vector[Double] = { + + val iterator = nameTermValueAvroList.iterator() + val indexAndValueArrayBuffer = new mutable.ArrayBuffer[(Int, Double)] + val length = featureMap.featureDimension + + while (iterator.hasNext) { + val feature = iterator.next() + val name = feature.getName.toString + val term = feature.getTerm.toString + val featureKey = Utils.getFeatureKey(name, term) + if (featureMap.contains(featureKey)) { + val value = feature.getValue + val index = featureMap.getOrElse(featureKey, + throw new NoSuchElementException(s"nameAndTerm $featureKey not found in the feature map")) + indexAndValueArrayBuffer += ((index, value)) + } + } + VectorUtils.toVector(indexAndValueArrayBuffer.toArray, length) + } + /** * Convert the latent factor of type [[Vector[Double]]] to Avro record of type [[LatentFactorAvro]]. 
* diff --git a/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/ModelProcessingUtils.scala b/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/ModelProcessingUtils.scala index 7c03c199..cc1b3aad 100644 --- a/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/ModelProcessingUtils.scala +++ b/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/ModelProcessingUtils.scala @@ -323,8 +323,6 @@ object ModelProcessingUtils { /** * Load a single GLM from HDFS. * - * TODO: Currently only the means of the coefficients are loaded, the variances are discarded - * * @param inputDir The directory from which to load the model * @param indexMap A feature to index map * @param sc The Spark Context diff --git a/photon-client/src/main/scala/com/linkedin/photon/ml/io/scopt/ScoptParserHelpers.scala b/photon-client/src/main/scala/com/linkedin/photon/ml/io/scopt/ScoptParserHelpers.scala index 09e44ba8..522a1e35 100644 --- a/photon-client/src/main/scala/com/linkedin/photon/ml/io/scopt/ScoptParserHelpers.scala +++ b/photon-client/src/main/scala/com/linkedin/photon/ml/io/scopt/ScoptParserHelpers.scala @@ -71,6 +71,7 @@ object ScoptParserHelpers extends Logging { val COORDINATE_OPT_CONFIG_REG_ALPHA_RANGE = "reg.alpha.range" val COORDINATE_OPT_CONFIG_REG_WEIGHTS = "reg.weights" val COORDINATE_OPT_CONFIG_REG_WEIGHT_RANGE = "reg.weight.range" + val COORDINATE_OPT_CONFIG_INCREMENTAL_WEIGHT = "incremental.weight" val COORDINATE_OPT_CONFIG_DOWN_SAMPLING_RATE = "down.sampling.rate" val COORDINATE_CONFIG_REQUIRED_ARGS = Map( @@ -87,7 +88,8 @@ object ScoptParserHelpers extends Logging { (COORDINATE_OPT_CONFIG_REG_ALPHA, ""), (COORDINATE_OPT_CONFIG_REG_ALPHA_RANGE, "-"), (COORDINATE_OPT_CONFIG_REG_WEIGHTS, ""), - (COORDINATE_OPT_CONFIG_REG_WEIGHT_RANGE, "-")) + (COORDINATE_OPT_CONFIG_REG_WEIGHT_RANGE, "-"), + (COORDINATE_OPT_CONFIG_INCREMENTAL_WEIGHT, "")) val COORDINATE_CONFIG_FIXED_EFFECT_OPTIONAL_ARGS = Map( (COORDINATE_OPT_CONFIG_DOWN_SAMPLING_RATE, 
"")) val COORDINATE_CONFIG_RANDOM_EFFECT_OPTIONAL_ARGS = Map( @@ -226,6 +228,8 @@ object ScoptParserHelpers extends Logging { .get(COORDINATE_OPT_CONFIG_REG_ALPHA_RANGE) .map(parseDoubleRange) + val incrementalWeight = input.get(COORDINATE_OPT_CONFIG_INCREMENTAL_WEIGHT).map(_.toDouble) + val reTypeOpt = input.get(COORDINATE_DATA_CONFIG_RANDOM_EFFECT_TYPE) val config = reTypeOpt match { // Random effect coordinate @@ -241,7 +245,8 @@ object ScoptParserHelpers extends Logging { optimizerConfig, regularizationContext, regularizationWeightRange = regularizationWeightRange, - elasticNetParamRange = elasticNetParamRange) + elasticNetParamRange = elasticNetParamRange, + incrementalWeight = incrementalWeight) // Log warnings for fixed effect coordinate settings found in random effect coordinate COORDINATE_CONFIG_FIXED_ONLY_ARGS.foreach { config => @@ -258,7 +263,8 @@ object ScoptParserHelpers extends Logging { optimizerConfig, regularizationContext, regularizationWeightRange = regularizationWeightRange, - elasticNetParamRange = elasticNetParamRange) + elasticNetParamRange = elasticNetParamRange, + incrementalWeight = incrementalWeight) // Log warnings for random effect coordinate settings found in fixed effect coordinate COORDINATE_CONFIG_RANDOM_ONLY_ARGS.foreach { config => @@ -476,6 +482,10 @@ object ScoptParserHelpers extends Logging { } } + optConfig.incrementalWeight.foreach { weight => + argsMap += (COORDINATE_OPT_CONFIG_INCREMENTAL_WEIGHT -> weight.toString) + } + // // Build feature shard config args string // diff --git a/photon-client/src/main/scala/com/linkedin/photon/ml/io/scopt/game/ScoptGameTrainingParametersParser.scala b/photon-client/src/main/scala/com/linkedin/photon/ml/io/scopt/game/ScoptGameTrainingParametersParser.scala index 35599903..dc3e5069 100644 --- a/photon-client/src/main/scala/com/linkedin/photon/ml/io/scopt/game/ScoptGameTrainingParametersParser.scala +++ 
b/photon-client/src/main/scala/com/linkedin/photon/ml/io/scopt/game/ScoptGameTrainingParametersParser.scala @@ -164,7 +164,11 @@ object ScoptGameTrainingParametersParser extends ScoptGameParametersParser { // Ignore Threshold for New Models ScoptParameter[Boolean, Boolean]( - GameTrainingDriver.ignoreThresholdForNewModels)) + GameTrainingDriver.ignoreThresholdForNewModels), + + // Incremental training + ScoptParameter[Boolean, Boolean]( + GameTrainingDriver.incrementalTraining)) override protected val parser: OptionParser[ParamMap] = new OptionParser[ParamMap]("GAME-Training") { diff --git a/photon-client/src/test/scala/com/linkedin/photon/ml/io/scopt/ScoptParserHelpersTest.scala b/photon-client/src/test/scala/com/linkedin/photon/ml/io/scopt/ScoptParserHelpersTest.scala index 809c0b6f..e1e23038 100644 --- a/photon-client/src/test/scala/com/linkedin/photon/ml/io/scopt/ScoptParserHelpersTest.scala +++ b/photon-client/src/test/scala/com/linkedin/photon/ml/io/scopt/ScoptParserHelpersTest.scala @@ -116,6 +116,7 @@ class ScoptParserHelpersTest { val regWeights2Str = Seq("1", "10", "100", "100", "10").mkString(ScoptParserHelpers.SECONDARY_LIST_DELIMITER.toString) val regWeights2 = Set(1, 10, 100) + val incrementalWeight = Some(1.1D) val activeDataLowerBound1 = None val activeDataLowerBound2 = Some(5) val activeDataUpperBound1 = None @@ -146,7 +147,8 @@ class ScoptParserHelpersTest { ScoptParserHelpers.COORDINATE_DATA_CONFIG_FEATURES_TO_SAMPLES_RATIO -> featuresSamplesRatio2.get.toString, ScoptParserHelpers.COORDINATE_OPT_CONFIG_REGULARIZATION -> regularizationType.toString, ScoptParserHelpers.COORDINATE_OPT_CONFIG_REG_WEIGHTS -> regWeights2Str, - ScoptParserHelpers.COORDINATE_OPT_CONFIG_REG_ALPHA -> alpha.toString) + ScoptParserHelpers.COORDINATE_OPT_CONFIG_REG_ALPHA -> alpha.toString, + ScoptParserHelpers.COORDINATE_OPT_CONFIG_INCREMENTAL_WEIGHT -> incrementalWeight.get.toString) val coordinateConfig1 = 
ScoptParserHelpers.parseCoordinateConfiguration(inputMap1)(coordinateId) coordinateConfig1 match { @@ -202,6 +204,7 @@ class ScoptParserHelpersTest { assertEquals(reConfig.dataConfiguration.numFeaturesToSamplesRatioUpperBound, featuresSamplesRatio2) assertEquals(reConfig.optimizationConfiguration.regularizationContext, regularization2) assertEquals(reConfig.regularizationWeights, regWeights2) + assertEquals(reConfig.optimizationConfiguration.incrementalWeight, incrementalWeight) case _ => throw new IllegalArgumentException("Expected random effect coordinate configuration.") @@ -434,6 +437,7 @@ class ScoptParserHelpersTest { val featuresSamplesRatio = 6.0 val regularizationType = RegularizationType.ELASTIC_NET val regularizationAlpha = 0.7 + val incrementalWeight = 1.1 val regularizationWeights = SortedSet[Double](8.8, 9.9).asInstanceOf[Set[Double]] val feDataConfig = FixedEffectDataConfiguration(featureShardId, minPartitions) @@ -469,7 +473,8 @@ class ScoptParserHelpersTest { Some(featuresSamplesRatio)) val optConfig4 = RandomEffectOptimizationConfiguration( optimizerConfig, - ElasticNetRegularizationContext(regularizationAlpha)) + ElasticNetRegularizationContext(regularizationAlpha), + incrementalWeight = Some(incrementalWeight)) val coordinateConfig4 = RandomEffectCoordinateConfiguration(dataConfig4, optConfig4, regularizationWeights) val coordinateConfigurations = ListMap[CoordinateId, CoordinateConfiguration]( @@ -512,7 +517,8 @@ class ScoptParserHelpersTest { ScoptParserHelpers.COORDINATE_OPT_CONFIG_REGULARIZATION -> regularizationType.toString, ScoptParserHelpers.COORDINATE_OPT_CONFIG_REG_ALPHA -> regularizationAlpha.toString, ScoptParserHelpers.COORDINATE_OPT_CONFIG_REG_WEIGHTS -> - regularizationWeights.mkString(ScoptParserHelpers.SECONDARY_LIST_DELIMITER.toString))) + regularizationWeights.mkString(ScoptParserHelpers.SECONDARY_LIST_DELIMITER.toString), + ScoptParserHelpers.COORDINATE_OPT_CONFIG_INCREMENTAL_WEIGHT -> incrementalWeight.toString)) .map { case 
(arg, value) => s"$arg${ScoptParserHelpers.KV_DELIMITER}$value" } diff --git a/photon-lib/src/main/scala/com/linkedin/photon/ml/function/PriorDistribution.scala b/photon-lib/src/main/scala/com/linkedin/photon/ml/function/PriorDistribution.scala new file mode 100644 index 00000000..b70afa56 --- /dev/null +++ b/photon-lib/src/main/scala/com/linkedin/photon/ml/function/PriorDistribution.scala @@ -0,0 +1,210 @@ +/* + * Copyright 2020 LinkedIn Corp. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linkedin.photon.ml.function + +import breeze.linalg.{DenseMatrix, DenseVector, Vector, diag, sum} +import com.linkedin.photon.ml.normalization.NormalizationContext +import com.linkedin.photon.ml.model.{Coefficients => ModelCoefficients} +import com.linkedin.photon.ml.util.{BroadcastWrapper, VectorUtils} + +/** + * Trait for an incremental training objective function. It is assumed that the prior is Gaussian Distribution with mean + * as prior model's means and variance as prior model's variances. incrementalWeight controls importance of the prior + * distribution. The larger incrementalWeight is, the more important the prior distribution is. incrementalWeight has a + * default value of 1. l2RegWeight sets the L2 regularization term for any feature which is not included in the prior + * model. For any feature which is included in the prior model, the equivalent L2 regularization term is + * incrementalWeight / prior model's variance. 
+ */ +trait PriorDistribution extends ObjectiveFunction { + + protected var l2RegWeight: Double = 0D + protected var incrementalWeight: Double = 1D + + val priorCoefficients: ModelCoefficients = ModelCoefficients(DenseVector.zeros(1)) + + lazy protected val priorMeans: Vector[Double] = priorCoefficients.means + lazy protected val priorVariances: Vector[Double] = priorCoefficients.variancesOption.get + lazy protected val inversePriorVariances: DenseVector[Double] = VectorUtils.invertVectorWithZeroHandler(priorVariances, l2RegWeight).toDenseVector + + require(l2RegWeight >= 0D, s"Invalid regularization weight '$l2RegWeight") + + /** + * Compute the value of the function over the given data for the given model coefficients, with regularization towards + * the prior coefficients. + * + * @param input The data over which to compute the objective function value + * @param coefficients The model coefficients for which to compute the objective function's value + * @param normalizationContext The normalization context + * @return The value of the objective function and regularization terms + */ + abstract override protected[ml] def value( + input: Data, + coefficients: Vector[Double], + normalizationContext: BroadcastWrapper[NormalizationContext]): Double = + super.value(input, coefficients, normalizationContext) + l2RegValue(coefficients) + + /** + * Compute the Gaussian regularization term for the given model coefficients. L2 regularization term is + * incrementalWeight * sum(pow(coefficients - priorMeans, 2) :/ priorVariance) / 2. 
+ * + * @param coefficients The model coefficients + * @return The Gaussian regularization term value + */ + protected def l2RegValue(coefficients: Vector[Double]): Double = { + + val normalizedSquaredCoefficients = (coefficients - priorMeans) *:* inversePriorVariances *:* (coefficients - priorMeans) + + incrementalWeight * sum(normalizedSquaredCoefficients) / 2 + } +} + +trait PriorDistributionDiff extends DiffFunction with PriorDistribution { + + /** + * Compute the value of the function over the given data for the given model coefficients, with regularization towards + * the prior coefficients. + * + * @param input The data over which to compute the objective function value + * @param coefficients The model coefficients for which to compute the objective function's value + * @param normalizationContext The normalization context + * @return The value of the objective function and regularization terms + */ + abstract override protected[ml] def value( + input: Data, + coefficients: Vector[Double], + normalizationContext: BroadcastWrapper[NormalizationContext]): Double = + calculate(input, coefficients, normalizationContext)._1 + + /** + * Compute the gradient of the function over the given data for the given model coefficients, with regularization + * towards the prior coefficients. 
+ * + * @param input The data over which to compute the objective function gradient + * @param coefficients The model coefficients for which to compute the objective function's gradient + * @param normalizationContext The normalization context + * @return The gradient of the objective function and regularization terms + */ + abstract override protected[ml] def gradient( + input: Data, + coefficients: Vector[Double], + normalizationContext: BroadcastWrapper[NormalizationContext]): Vector[Double] = + calculate(input, coefficients, normalizationContext)._2 + + /** + * Compute both the value and the gradient of the function over the given data for the given model coefficients, with + * regularization towards the prior coefficients (computing value and gradient at once is more efficient than + * computing them sequentially). + * + * @param input The data over which to compute the objective function value and gradient + * @param coefficients The model coefficients for which to compute the objective function's value and gradient + * @param normalizationContext The normalization context + * @return The value and gradient of the objective function and regularization terms + */ + abstract override protected[ml] def calculate( + input: Data, + coefficients: Vector[Double], + normalizationContext: BroadcastWrapper[NormalizationContext]): (Double, Vector[Double]) = { + + val (baseValue, baseGradient) = super.calculate(input, coefficients, normalizationContext) + val valueWithRegularization = baseValue + l2RegValue(coefficients) + val gradientWithRegularization = baseGradient + l2RegGradient(coefficients) + + (valueWithRegularization, gradientWithRegularization) + } + + /** + * Compute the gradient of the Gaussian regularization term for the given model coefficients. Gradient is + * incrementalWeight * (coefficients - priorMeans) :/ priorVariance. 
+ * + * @param coefficients The model coefficients + * @return The gradient of the Gaussian regularization term + */ + protected def l2RegGradient(coefficients: Vector[Double]): Vector[Double] = { + + val normalizedCoefficients = (coefficients - priorMeans) *:* inversePriorVariances + + incrementalWeight * normalizedCoefficients + } +} + +trait PriorDistributionTwiceDiff extends TwiceDiffFunction with PriorDistributionDiff { + + /** + * Compute the Hessian diagonal of the objective function over the given data for the given model coefficients, * the + * gradient direction, with regularization towards the prior coefficients. + * + * @param input The data over which to compute the Hessian diagonal * gradient direction + * @param coefficients The model coefficients for which to compute the objective function's Hessian diagonal + * * gradient direction + * @param multiplyVector The gradient direction vector + * @param normalizationContext The normalization context + * @return The Hessian diagonal (multiplied by the gradient direction) of the objective function and regularization + * terms + */ + abstract override protected[ml] def hessianVector( + input: Data, + coefficients: Vector[Double], + multiplyVector: Vector[Double], + normalizationContext: BroadcastWrapper[NormalizationContext]): Vector[Double] = + super.hessianVector(input, coefficients, multiplyVector, normalizationContext) + + l2RegHessianVector(multiplyVector) + + /** + * Compute the Hessian diagonal of the objective function over the given data for the given model coefficients, with + * regularization towards the prior coefficients. 
+ * + * @param input The data over which to compute the Hessian diagonal + * @param coefficients The model coefficients for which to compute the objective function's Hessian diagonal + * @return The Hessian diagonal of the objective function and regularization terms + */ + abstract override protected[ml] def hessianDiagonal(input: Data, coefficients: Vector[Double]): Vector[Double] = + super.hessianDiagonal(input, coefficients) :+ l2RegHessianDiagonal + + /** + * Compute the Hessian matrix of the objective function over the given data for the given model coefficients, with + * regularization towards the prior coefficients. + * + * @param input The data over which to compute the Hessian matrix + * @param coefficients The model coefficients for which to compute the objective function's Hessian matrix + * @return The Hessian matrix of the objective function and regularization terms + */ + abstract override protected[ml] def hessianMatrix(input: Data, coefficients: Vector[Double]): DenseMatrix[Double] = + super.hessianMatrix(input, coefficients) + l2RegHessianMatrix + + /** + * Compute the Hessian diagonal * gradient direction of the Gaussian regularization term for the given model + * coefficients. + * + * @param multiplyVector The gradient direction vector + * @return The Hessian diagonal of the Gaussian regularization term, with gradient direction vector + */ + protected def l2RegHessianVector(multiplyVector: Vector[Double]): Vector[Double] = + incrementalWeight * (multiplyVector *:* inversePriorVariances) + + /** + * Compute the Hessian diagonal of the Gaussian regularization term for the given model coefficients. Hessian + * diagonal is incrementalWeight :/ priorVariance. + * + * @return The Hessian diagonal of the Gaussian regularization term + */ + protected def l2RegHessianDiagonal: Vector[Double] = incrementalWeight * inversePriorVariances + + /** + * Compute the Hessian matrix of the Gaussian regularization term for the given model coefficients. 
+ * + * @return The Hessian matrix of the Gaussian regularization term + */ + protected def l2RegHessianMatrix: DenseMatrix[Double] = incrementalWeight * diag(inversePriorVariances) +} diff --git a/photon-lib/src/main/scala/com/linkedin/photon/ml/model/Coefficients.scala b/photon-lib/src/main/scala/com/linkedin/photon/ml/model/Coefficients.scala index 4894ce8f..84219d7b 100644 --- a/photon-lib/src/main/scala/com/linkedin/photon/ml/model/Coefficients.scala +++ b/photon-lib/src/main/scala/com/linkedin/photon/ml/model/Coefficients.scala @@ -14,7 +14,7 @@ */ package com.linkedin.photon.ml.model -import breeze.linalg.{DenseVector, SparseVector, Vector, norm} +import breeze.linalg.{Vector, norm} import breeze.stats.meanAndVariance import com.linkedin.photon.ml.constants.MathConst @@ -32,10 +32,12 @@ case class Coefficients(means: Vector[Double], variancesOption: Option[Vector[Do extends Summarizable { // GAME over if variances are given but don't have the same length as the vector of means - require(variancesOption.isEmpty || variancesOption.get.length == means.length, + require( + variancesOption.isEmpty || variancesOption.get.length == means.length, "Coefficients: Means and variances have different lengths") def length: Int = means.length + lazy val meansL2Norm: Double = norm(means, 2) lazy val variancesL2NormOption: Option[Double] = variancesOption.map(variances => norm(variances, 2)) @@ -47,6 +49,7 @@ case class Coefficients(means: Vector[Double], variancesOption: Option[Vector[Do * @return The score */ def computeScore(features: Vector[Double]): Double = { + require( means.length == features.length, s"Coefficients length (${means.length}) != features length (${features.length})") @@ -60,6 +63,7 @@ case class Coefficients(means: Vector[Double], variancesOption: Option[Vector[Do * @return A summary of the object in string representation */ override def toSummaryString: String = { + val sb = new StringBuilder() val isDense = means.getClass.getName.contains("Dense") val 
meanAndVar = meanAndVariance(means) @@ -96,22 +100,22 @@ case class Coefficients(means: Vector[Double], variancesOption: Option[Vector[Do * @param that The other Coefficients to compare to * @return True if the Coefficients are equal, false otherwise */ - override def equals(that: Any): Boolean = - that match { - case other: Coefficients => - val (m1, v1, m2, v2) = (this.means, this.variancesOption, other.means, other.variancesOption) - val sameType = m1.getClass == m2.getClass && v1.map(_.getClass) == v2.map(_.getClass) - lazy val sameMeans = VectorUtils.areAlmostEqual(m1, m2) - lazy val sameVariance = (v1, v2) match { - case (None, None) => true - case (Some(val1), Some(val2)) => VectorUtils.areAlmostEqual(val1, val2) - case (_, _) => false - } - - sameType && sameMeans && sameVariance - - case _ => false - } + override def equals(that: Any): Boolean = that match { + case other: Coefficients => + val (m1, v1, m2, v2) = (this.means, this.variancesOption, other.means, other.variancesOption) + val sameType = (m1.getClass == m2.getClass) && (v1.map(_.getClass) == v2.map(_.getClass)) + lazy val sameMeans = VectorUtils.areAlmostEqual(m1, m2) + lazy val sameVariance = (v1, v2) match { + case (None, None) => true + + case (Some(val1), Some(val2)) => VectorUtils.areAlmostEqual(val1, val2) + case (_, _) => false + } + + sameType && sameMeans && sameVariance + + case _ => false + } /** * Returns a hash code value for the object. 
@@ -131,7 +135,6 @@ protected[ml] object Coefficients { * @param dimension Dimensionality of the coefficient vector * @return Zero coefficient vector */ - def initializeZeroCoefficients(dimension: Int): Coefficients = { + def initializeZeroCoefficients(dimension: Int): Coefficients = Coefficients(Vector.zeros[Double](dimension), variancesOption = None) - } } diff --git a/photon-lib/src/main/scala/com/linkedin/photon/ml/model/GameModel.scala b/photon-lib/src/main/scala/com/linkedin/photon/ml/model/GameModel.scala index 417ba0e2..74fb89ba 100644 --- a/photon-lib/src/main/scala/com/linkedin/photon/ml/model/GameModel.scala +++ b/photon-lib/src/main/scala/com/linkedin/photon/ml/model/GameModel.scala @@ -40,8 +40,17 @@ class GameModel (private val gameModels: Map[CoordinateId, DatumScoringModel]) e /** * Get a sub-model by name. * - * @param name The model name - * @return An [[Option]] containing the sub-model associated with `name` in the GAME model, or `None` if none exists. + * @throws NoSuchElementException if no sub-model with key [[name]] exists + * @param name The sub-model name + * @return The sub-model associated with [[name]] in the GAME model + */ + def apply(name: CoordinateId): DatumScoringModel = gameModels(name) + + /** + * Get a sub-model by name. + * + * @param name The sub-model name + * @return [[Some]] sub-model associated with [[name]] in the GAME model, or [[None]] if none exists. 
*/ def getModel(name: CoordinateId): Option[DatumScoringModel] = gameModels.get(name) diff --git a/photon-lib/src/main/scala/com/linkedin/photon/ml/util/MathUtils.scala b/photon-lib/src/main/scala/com/linkedin/photon/ml/util/MathUtils.scala index 313b561f..eae9ad8b 100644 --- a/photon-lib/src/main/scala/com/linkedin/photon/ml/util/MathUtils.scala +++ b/photon-lib/src/main/scala/com/linkedin/photon/ml/util/MathUtils.scala @@ -63,4 +63,14 @@ object MathUtils { * @return True if x1 is greater than x2, false otherwise */ def greaterThan(x1: Double, x2: Double): Boolean = x1 > x2 + + /** + * Compute the symmetrical difference of two sets (i.e. A ∆ B = (A ⋃ B) - (A ⋂ B)) + * + * @tparam T Some type + * @param a The first set + * @param b The second set + * @return A set containing of elements that are in the first set or the second set but not both sets + */ + def symmetricDifference[T](a: Set[T], b: Set[T]): Set[T] = a.diff(b).union(b.diff(a)) } diff --git a/photon-lib/src/main/scala/com/linkedin/photon/ml/util/VectorUtils.scala b/photon-lib/src/main/scala/com/linkedin/photon/ml/util/VectorUtils.scala index 954fc219..b7984889 100644 --- a/photon-lib/src/main/scala/com/linkedin/photon/ml/util/VectorUtils.scala +++ b/photon-lib/src/main/scala/com/linkedin/photon/ml/util/VectorUtils.scala @@ -20,6 +20,8 @@ import breeze.linalg.{DenseVector, SparseVector, Vector} import org.apache.spark.ml.linalg.{DenseVector => SparkMLDenseVector, SparseVector => SparkMLSparseVector, Vector => SparkMLVector} import org.apache.spark.mllib.linalg.{DenseVector => SparkDenseVector, SparseVector => SparkSparseVector, Vector => SparkVector} +import com.linkedin.photon.ml.constants.MathConst + /** * A utility object that contains operations to create, copy, compare, and convert [[Vector]] objects. 
*/ @@ -284,4 +286,15 @@ object VectorUtils { set } + + /** + * Element-wise inversion of a [[Vector]], if an element is smaller than MathConst.EPSILON, the inversion will be + * replaced by zeroReplacedVal. + * + * @param vector The [[Vector]] to invert + * @param zeroReplacedVal The replaced value when inverting an element close to 0 + * @return The inverted [[Vector]] + */ + def invertVectorWithZeroHandler(vector: Vector[Double], zeroReplacedVal: Double): Vector[Double] = + vector.map(v => if (math.abs(v) > MathConst.EPSILON) 1.0 / v else zeroReplacedVal) } diff --git a/photon-lib/src/test/scala/com/linkedin/photon/ml/function/PriorDistributionTest.scala b/photon-lib/src/test/scala/com/linkedin/photon/ml/function/PriorDistributionTest.scala new file mode 100644 index 00000000..e8937cd4 --- /dev/null +++ b/photon-lib/src/test/scala/com/linkedin/photon/ml/function/PriorDistributionTest.scala @@ -0,0 +1,77 @@ +/* + * Copyright 2020 LinkedIn Corp. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package com.linkedin.photon.ml.function + +import breeze.linalg.{DenseVector, diag} +import org.testng.annotations.Test +import org.testng.Assert.assertEquals +import org.mockito.Mockito.mock + +import com.linkedin.photon.ml.model.{Coefficients => ModelCoefficients} +import com.linkedin.photon.ml.normalization.NormalizationContext +import com.linkedin.photon.ml.util.BroadcastWrapper + +/** + * Unit tests for [[PriorDistribution]], [[PriorDistributionDiff]], and [[PriorDistributionTwiceDiff]]. + */ +class PriorDistributionTest { + + import L2RegularizationTest._ + + private val DIMENSION = 4 + + /** + * Test that the prior distribution mixin traits can correctly modify the existing behaviour of an objective function. + */ + @Test + def testAll(): Unit = { + + val mockNormalization = mock(classOf[BroadcastWrapper[NormalizationContext]]) + + val coefficients = DenseVector.ones[Double](DIMENSION) + val priorMean = coefficients *:* 2D + val multiplyVector = coefficients *:* 3D + val priorVar = coefficients *:* 4D + + val increWeight = 10D + + val mockObjectiveFunction = new MockObjectiveFunction with PriorDistributionTwiceDiff { + override val priorCoefficients = ModelCoefficients(priorMean, Option(priorVar)) + incrementalWeight = increWeight + } + + /** + * Assume that coefficients = 1-vector, prior mean = 2-vector, multiply = 3-vector, prior variance = 4-vector for all expected values below + * + * l2RegValue = sum(DenseVector.fill(DIMENSION){pow(1 - 2, 2) / 4)}) * increWeight / 2 = 0.25 * increWeight * DIMENSION / 2; + * l2RegGradient = (1 - 2) / 4 * increWeight = (-0.25) * increWeight; + * l2RegHessianDiagonal = 1 / 4 * increWeight = 0.25 * increWeight; + * l2RegHessianVector = 3 / 4 * increWeight = 0.75 * increWeight. 
+ */ + val expectedValue = MockObjectiveFunction.VALUE + 0.25 * increWeight * DIMENSION / 2 + val expectedGradient = DenseVector(Array.fill(DIMENSION)(MockObjectiveFunction.GRADIENT + (-0.25) * increWeight)) + val expectedVector = DenseVector(Array.fill(DIMENSION)(MockObjectiveFunction.HESSIAN_VECTOR + 0.75 * increWeight)) + val expectedDiagonal = DenseVector(Array.fill(DIMENSION)(MockObjectiveFunction.HESSIAN_DIAGONAL + 0.25 * increWeight)) + val expectedMatrix = diag(DenseVector(Array.fill(DIMENSION)(MockObjectiveFunction.HESSIAN_MATRIX + 0.25 * increWeight))) + + assertEquals(mockObjectiveFunction.value(Unit, coefficients, mockNormalization), expectedValue) + assertEquals(mockObjectiveFunction.gradient(Unit, coefficients, mockNormalization), expectedGradient) + assertEquals( + mockObjectiveFunction.hessianVector(Unit, coefficients, multiplyVector, mockNormalization), + expectedVector) + assertEquals(mockObjectiveFunction.hessianDiagonal(Unit, coefficients), expectedDiagonal) + assertEquals(mockObjectiveFunction.hessianMatrix(Unit, coefficients), expectedMatrix) + } +} diff --git a/photon-lib/src/test/scala/com/linkedin/photon/ml/util/VectorUtilsTest.scala b/photon-lib/src/test/scala/com/linkedin/photon/ml/util/VectorUtilsTest.scala index 53bc33b0..44515f08 100644 --- a/photon-lib/src/test/scala/com/linkedin/photon/ml/util/VectorUtilsTest.scala +++ b/photon-lib/src/test/scala/com/linkedin/photon/ml/util/VectorUtilsTest.scala @@ -241,6 +241,19 @@ class VectorUtilsTest { @Test(expectedExceptions = Array(classOf[IllegalArgumentException])) def testInitializeZerosVectorOfSameTypeOfUnsupportedVectorType(): Unit = VectorUtils.zeroOfSameType(new MockVector[Double]()) + + @DataProvider + def invertVectorWithZeroHandlerProvider() = Array( + Array(DenseVector(1.0, 0.0, 2.0), 3.0, DenseVector(1.0, 3.0, 0.5)), + Array(new SparseVector(Array(1, 2), Array(1.0, 2.0), 3), 4.0, DenseVector(4.0, 1.0, 0.5)) + ) + + /** + * Test inverting vectors with zero handler. 
+ */ + @Test(dataProvider = "invertVectorWithZeroHandlerProvider") + def testInvertVectorWithZeroHandler(vector: Vector[Double], replacedVal: Double, expectedVector: Vector[Double]): Unit = + assertEquals(VectorUtils.invertVectorWithZeroHandler(vector, replacedVal), expectedVector) } object VectorUtilsTest {