深入机器学习系列2-支持向量机

写在前面的:

SVM算法是在深度学习大火之前最受欢迎的机器学习算法,也是广大机器学习爱好者的入门算法。请大家系好安全带,当心老司机一言不合就甩你一脸代码!

1 SVM

1.1 介绍

Support Vector Machine 支持向量机是一种机器学习算法。

所以优化问题可以写成:

这等价于最小化

subject to 


Figure1: SVM

事实上,它可以被看作一个带有惩罚项的最小化损失问题。最终,我们希望找到以下问题的最小解

对于这一最优化问题,我们可以使用梯度下降算法来达到最小值。

目标函数为:

所以,迭代 t 时的梯度为:

1.2 SGD

从上一节我们可以看到每次迭代我们都需要所有的数据点来计算梯度。而当数据集变大后,无疑会耗费大量的计算时间。 这就是为什么在大规模梯度下降算法中,我们总会使用 SGD(随机梯度下降)。SDG 在每次迭代时只使用一部分数据而不是全部, 从而降低了计算量。

所以,现在目标函数变成了:

1.3 Pegasos and MLlib implementation

Pegasos 是 SVM 使用梯度下降算法的一种实现。Spark MLlib 也提供了 SVM 的梯度下降实现,于 Pegasos 稍有不同。 主要是梯度的更新速度不同。


2 SGD in Spark

2.1 treeAggregate

Spark 来计算 SGD 的主要优势使可以分布式地计算梯度,然后将它们累加起来。 在 Spark 中,这一任务是通过 RDD 的 treeAggregate 方法来完成的。 Aggregate 可被视为泛化的 Map 和 Reduce 的组合。 treeAggregate 的定义为


RDD.treeAggregate(zeroValue: U)(
     seqOp: (U, T) => U,
     combOp: (U, U) => U,
     depth: Int = 2): U


在此方法中有三个参数,其中前两个对我们更重要:

  • seqOp: 计算每隔 partition 中的子梯度

  • combOp: 将 seqOp 或上层 combOp 的值合并

  • depth: 控制 tree 的深度

Figure 2: tree aggregate

2.2 实现(提示:代码块部分可以左右滑动来完整查看哦~)

SGD 是一个求最优化的算法,许多机器学习算法都可以用 SGD 来求解。所以 Spark 对其做了抽象。


  
            
  
  

  
            
  
  
class SVMWithSGD private (
   private var stepSize: Double,
   private var numIterations: Int,
   private var regParam: Double,
   private var miniBatchFraction: Double)
 extends GeneralizedLinearAlgorithm[SVMModel] with Serializable {
 private val gradient = new HingeGradient()
 private val updater = new SquaredL2Updater()
 @Since("0.8.0")
 override val optimizer = new GradientDescent(gradient, updater)
   .setStepSize(stepSize)
   .setNumIterations(numIterations)
   .setRegParam(regParam)
   .setMiniBatchFraction(miniBatchFraction)


可以看到 SVMWithSGD 继承了 GeneralizedLinearAlgorithm ,并定义 optimizer 来确定如何获得优化解。 而 optimizer 即是 SGD 算法的实现。正如上节所述,线性 SVM 实际上是使用 hinge 损失函数和一个 L2 惩罚项的线性模型,因此这里使用了 HingeGradient 和 SquaredL2Updater 作为 GradientDescent 的参数。


class HingeGradient extends Gradient {
 override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
   val dotProduct = dot(data, weights)
   // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
   // Therefore the gradient is -(2y - 1)*x
   val labelScaled = 2 * label - 1.0
   if (1.0 > labelScaled * dotProduct) {
     val gradient = data.copy
     scal(-labelScaled, gradient)
     (gradient, 1.0 - labelScaled * dotProduct)
   } else {
     (Vectors.sparse(weights.size, Array.empty, Array.empty), 0.0)
   }
 }
 override def compute(
     data: Vector,
     label: Double,
     weights: Vector,
     cumGradient: Vector): Double = {
   val dotProduct = dot(data, weights)
   // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
   // Therefore the gradient is -(2y - 1)*x
   val labelScaled = 2 * label - 1.0
   if (1.0 > labelScaled * dotProduct) {
     axpy(-labelScaled, data, cumGradient)
     1.0 - labelScaled * dotProduct
   } else {
     0.0
   }
 }}




/** * :: DeveloperApi :: * Updater for L2 regularized problems. *          R(w) = 1/2 ||w||^2 * Uses a step-size decreasing with the square root of the number of iterations. */@DeveloperApiclass SquaredL2Updater extends Updater {
 override def compute(
     weightsOld: Vector,
     gradient: Vector,
     stepSize: Double,
     iter: Int,
     regParam: Double
): (Vector, Double)
= {
   // add up both updates from the gradient of the loss (= step) as well as
   // the gradient of the regularizer (= regParam * weightsOld)
   // w' = w - thisIterStepSize * (gradient + regParam * w)
   // w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient
   val thisIterStepSize = stepSize / math.sqrt(iter)
   val brzWeights: BV[Double] = weightsOld.asBreeze.toDenseVector
   brzWeights :*= (1.0 - thisIterStepSize * regParam)
   brzAxpy(-thisIterStepSize, gradient.asBreeze, brzWeights)
   val norm = brzNorm(brzWeights, 2.0)
   (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)
 }}



此节中, Code 1展示了 GradientDescent 的主要执行逻辑。 重复执行 numIterations 次以获得最终的 W。

首先, data.sample 通过 miniBatchFraction 取一部分样本. 然后使用 treeAggregate 。 在seqOp 中, gradientSum 会通过 axpy(y, b_x, c._1) 更新,如果