# Spark GraphX Study Notes 5: Random Graph Generation, Message Passing with aggregateMessages, and Map-Reduce Operations (with Source Code Analysis)

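0. Main topics:

1. Explanation

(1) Random graph generation

Source of the random graph generator `GraphGenerators.logNormalGraph`: the defaults are `mu = 4.0` and `sigma = 1.3`, which are the mean and standard deviation of the underlying normal distribution (so 4 is not the out-degree itself). The `numVertices` vertices are generated in parallel, and the number of partitions defaults to `sc.defaultParallelism`. Each vertex attribute is an out-degree sampled by `sampleLogNormal` as `floor(exp(mu + sigma * Z))`, resampled until it is below `numVertices`. The edges are produced by `generateRandomEdges`, so the total number of edges is the sum of the out-degrees of all vertices. Each generated edge is `Edge[Int](src, rand.nextInt(maxVertexId), 1)`: the destination vertex is random, so duplicate edges and self-loops are both possible.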

```scala
/**
 * Generate a graph whose vertex out degree distribution is log normal.
 *
 * The default values for mu and sigma are taken from the Pregel paper:
 *
 * Grzegorz Malewicz, Matthew H. Austern, Aart J.C Bik, James C. Dehnert,
 * Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. 2010.
 * Pregel: a system for large-scale graph processing. SIGMOD '10.
 *
 * If the seed is -1 (default), a random seed is chosen. Otherwise, use
 * the user-specified seed.
 *
 * @param sc Spark Context
 * @param numVertices number of vertices in generated graph
 * @param numEParts (optional) number of partitions
 * @param mu (optional, default: 4.0) mean of out-degree distribution
 * @param sigma (optional, default: 1.3) standard deviation of out-degree distribution
 * @param seed (optional, default: -1) seed for RNGs, -1 causes a random seed to be chosen
 * @return Graph object
 */
def logNormalGraph(
    sc: SparkContext, numVertices: Int, numEParts: Int = 0, mu: Double = 4.0,
    sigma: Double = 1.3, seed: Long = -1): Graph[Long, Int] = {

  val evalNumEParts = if (numEParts == 0) sc.defaultParallelism else numEParts

  // Enable deterministic seeding
  val seedRand = if (seed == -1) new Random() else new Random(seed)
  val seed1 = seedRand.nextInt()
  val seed2 = seedRand.nextInt()

  val vertices: RDD[(VertexId, Long)] = sc.parallelize(0 until numVertices, evalNumEParts).map {
    src => (src, sampleLogNormal(mu, sigma, numVertices, seed = (seed1 ^ src)))
  }

  val edges = vertices.flatMap { case (src, degree) =>
    generateRandomEdges(src.toInt, degree.toInt, numVertices, seed = (seed2 ^ src))
  }

  Graph(vertices, edges, 0)
}
```

```scala
/**
 * Randomly samples from a log normal distribution whose corresponding normal distribution has
 * the given mean and standard deviation. It uses the formula `X = exp(m+s*Z)` where `m`,
 * `s` are the mean, standard deviation of the lognormal distribution and
 * `Z ~ N(0, 1)`. In this function,
 * `m = e^(mu+sigma^2/2)` and `s = sqrt[(e^(sigma^2) - 1)(e^(2*mu+sigma^2))]`.
 *
 * @param mu the mean of the normal distribution
 * @param sigma the standard deviation of the normal distribution
 * @param maxVal exclusive upper bound on the value of the sample
 * @param seed optional seed
 */
private[spark] def sampleLogNormal(
    mu: Double, sigma: Double, maxVal: Int, seed: Long = -1): Int = {
  val rand = if (seed == -1) new Random() else new Random(seed)

  val sigmaSq = sigma * sigma
  val m = math.exp(mu + sigmaSq / 2.0)
  // expm1 is exp(m)-1 with better accuracy for tiny m
  val s = math.sqrt(math.expm1(sigmaSq) * math.exp(2*mu + sigmaSq))
  // Z ~ N(0, 1)
  var X: Double = maxVal

  while (X >= maxVal) {
    val Z = rand.nextGaussian()
    X = math.exp(mu + sigma*Z)
  }
  math.floor(X).toInt
}
```

```scala
// Right now it just generates a bunch of edges where
// the edge data is the weight (default 1)
val RMATc = 0.15

def generateRandomEdges(
    src: Int, numEdges: Int, maxVertexId: Int, seed: Long = -1): Array[Edge[Int]] = {
  val rand = if (seed == -1) new Random() else new Random(seed)
  Array.fill(numEdges) { Edge[Int](src, rand.nextInt(maxVertexId), 1) }
}
```
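To make the generation steps concrete, here is a minimal local sketch in plain Scala that mirrors the logic above without Spark. It is an illustration, not the GraphX implementation: `LogNormalGraphSketch`, `SimpleEdge`, and `generate` are hypothetical names, the RDDs are replaced by ordinary collections, and the `seed == -1` special case is left out.

```scala
import scala.util.Random

object LogNormalGraphSketch {
  // Hypothetical local stand-in for org.apache.spark.graphx.Edge[Int].
  final case class SimpleEdge(src: Int, dst: Int, attr: Int)

  // Same loop as sampleLogNormal: draw exp(mu + sigma * Z) until the value is below maxVal.
  def sampleLogNormal(mu: Double, sigma: Double, maxVal: Int, seed: Long): Int = {
    val rand = new Random(seed)
    var x: Double = maxVal
    while (x >= maxVal) {
      x = math.exp(mu + sigma * rand.nextGaussian())
    }
    math.floor(x).toInt
  }

  // Same idea as generateRandomEdges: numEdges edges from src to random targets, weight 1.
  def generateRandomEdges(src: Int, numEdges: Int, maxVertexId: Int, seed: Long): Seq[SimpleEdge] = {
    val rand = new Random(seed)
    Vector.fill(numEdges)(SimpleEdge(src, rand.nextInt(maxVertexId), 1))
  }

  // Returns (vertex id, sampled out-degree) pairs and the generated edges.
  def generate(
      numVertices: Int,
      mu: Double = 4.0,
      sigma: Double = 1.3,
      seed: Long = 42L): (Seq[(Int, Int)], Seq[SimpleEdge]) = {
    val seedRand = new Random(seed)
    val seed1 = seedRand.nextInt()
    val seed2 = seedRand.nextInt()
    // Per-vertex seeds are derived by XOR-ing with the vertex id, as in logNormalGraph.
    val degrees = (0 until numVertices).map { src =>
      (src, sampleLogNormal(mu, sigma, numVertices, seed1 ^ src))
    }
    val edges = degrees.flatMap { case (src, degree) =>
      generateRandomEdges(src, degree, numVertices, seed2 ^ src)
    }
    (degrees, edges)
  }

  def main(args: Array[String]): Unit = {
    val (degrees, edges) = generate(numVertices = 10)
    println(s"sum of out-degrees = ${degrees.map(_._2).sum}, edges = ${edges.size}")
  }
}
```

The two printed numbers are always equal, which is the point made above: the edge count is the sum of the sampled out-degrees, and every out-degree stays below `numVertices`.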

(2) Sending messages

From the GraphX programming guide [1]: in addition, `aggregateMessages` takes an optional `tripletFields` argument that indicates which data is accessed in the `EdgeContext` (for example, the source vertex attribute but not the destination vertex attribute). The possible options are defined in `TripletFields`, and the default value is `TripletFields.All`, which indicates that the user-defined `sendMsg` function may access any of the fields in the `EdgeContext`. The `tripletFields` argument can be used to notify GraphX that only part of the `EdgeContext` will be needed, allowing GraphX to select an optimized join strategy. For example, if we are computing the average age of the followers of each user, we only require the source field, so we would use `TripletFields.Src`.

```scala
/**
 * Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
 * `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
 * sent to either vertex in the edge. The `mergeMsg` function is then used to combine all messages
 * destined to the same vertex.
 *
 * @tparam A the type of message to be sent to each vertex
 *
 * @param sendMsg runs on each edge, sending messages to neighboring vertices using the
 *   [[EdgeContext]].
 * @param mergeMsg used to combine messages from `sendMsg` destined to the same vertex. This
 *   combiner should be commutative and associative.
 * @param tripletFields which fields should be included in the [[EdgeContext]] passed to the
 *   `sendMsg` function. If not all fields are needed, specifying this can improve performance.
 *
 * @example We can use this function to compute the in-degree of each
 * vertex
 * {{{
 * val rawGraph: Graph[_, _] = Graph.textFile("twittergraph")
 * val inDeg: RDD[(VertexId, Int)] =
 *   rawGraph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
 * }}}
 *
 * @note By expressing computation at the edge level we achieve
 * maximum parallelism.  This is one of the core functions in the
 * Graph API in that enables neighborhood level computation. For
 * example this function can be used to count neighbors satisfying a
 * predicate or implement PageRank.
 *
 */
def aggregateMessages[A: ClassTag](
    sendMsg: EdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A,
    tripletFields: TripletFields = TripletFields.All)
  : VertexRDD[A] = {
  aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
}
```

```scala
/**
 * Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
 * `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
 * sent to either vertex in the edge. The `mergeMsg` function is then used to combine all messages
 * destined to the same vertex.
 *
 * This variant can take an active set to restrict the computation and is intended for internal
 * use only.
 *
 * @tparam A the type of message to be sent to each vertex
 *
 * @param sendMsg runs on each edge, sending messages to neighboring vertices using the
 *   [[EdgeContext]].
 * @param mergeMsg used to combine messages from `sendMsg` destined to the same vertex. This
 *   combiner should be commutative and associative.
 * @param tripletFields which fields should be included in the [[EdgeContext]] passed to the
 *   `sendMsg` function. If not all fields are needed, specifying this can improve performance.
 * @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
 *   desired. This is done by specifying a set of "active" vertices and an edge direction. The
 *   `sendMsg` function will then run on only edges connected to active vertices by edges in the
 *   specified direction. If the direction is `In`, `sendMsg` will only be run on edges with
 *   destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges
 *   originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be
 *   run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg`
 *   will be run on edges with *both* vertices in the active set. The active set must have the
 *   same index as the graph's vertices.
 */
private[graphx] def aggregateMessagesWithActiveSet[A: ClassTag](
    sendMsg: EdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A,
    tripletFields: TripletFields,
    activeSetOpt: Option[(VertexRDD[_], EdgeDirection)])
  : VertexRDD[A]
```
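The semantics described in the doc comments (run `sendMsg` once per edge, then combine all messages for the same vertex with `mergeMsg`) can be modelled on local collections. The sketch below is a simplified single-machine model, not how GraphX implements it: `AggregateMessagesSketch`, `SimpleEdge`, and `SimpleContext` are hypothetical names, and partitioning, `tripletFields`, and the active set are ignored.

```scala
import scala.collection.mutable

object AggregateMessagesSketch {
  // Hypothetical local stand-in for a GraphX edge.
  final case class SimpleEdge(src: Long, dst: Long, attr: Int)

  // Hypothetical stand-in for EdgeContext: exposes both endpoints and lets sendMsg emit messages.
  final class SimpleContext[VD, A](
      val srcId: Long,
      val dstId: Long,
      val srcAttr: VD,
      val dstAttr: VD,
      emit: (Long, A) => Unit) {
    def sendToSrc(msg: A): Unit = emit(srcId, msg)
    def sendToDst(msg: A): Unit = emit(dstId, msg)
  }

  def aggregateMessages[VD, A](
      vertices: Map[Long, VD],
      edges: Seq[SimpleEdge])(
      sendMsg: SimpleContext[VD, A] => Unit,
      mergeMsg: (A, A) => A): Map[Long, A] = {
    val acc = mutable.Map.empty[Long, A]
    // Merge a new message into whatever the target vertex has received so far.
    def emit(id: Long, msg: A): Unit =
      acc.update(id, acc.get(id).map(prev => mergeMsg(prev, msg)).getOrElse(msg))
    // sendMsg runs once per edge and may send zero or more messages.
    edges.foreach { e =>
      sendMsg(new SimpleContext[VD, A](e.src, e.dst, vertices(e.src), vertices(e.dst), emit))
    }
    // Vertices that received no message are absent from the result.
    acc.toMap
  }

  def main(args: Array[String]): Unit = {
    val vertices = Map(0L -> 0.0, 1L -> 1.0, 2L -> 2.0)
    val edges = Seq(SimpleEdge(0, 1, 1), SimpleEdge(2, 1, 1), SimpleEdge(1, 2, 1))
    // In-degree, as in the @example of the doc comment: send 1 to every destination and sum.
    val inDeg = aggregateMessages[Double, Int](vertices, edges)(ctx => ctx.sendToDst(1), _ + _)
    println(inDeg)
  }
}
```

As with the real API, a vertex that receives no messages (vertex 0 here) does not appear in the result, which is why the `olderFollowers` output further down lists only some of the ten vertices.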

2. Code:

```scala
/**
 * @author xubo
 * ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
 * time 20160503
 */

package org.apache.spark.graphx.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.util.GraphGenerators

object GraphOperatorsJoin {

  def main(args: Array[String]): Unit = {
    // The original listing referenced an undefined `conf`. The app name and the local[4]
    // master are assumptions; local[4] matches defaultParallelism = 4 in the results below.
    val conf = new SparkConf().setAppName("GraphOperatorsJoin").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
    val graph: Graph[Double, Int] =
      GraphGenerators.logNormalGraph(sc, numVertices = 10).mapVertices((id, _) => id.toDouble)

    // Compute the number of older followers and their total age
    val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
      triplet => { // Map Function
        if (triplet.srcAttr > triplet.dstAttr) {
          // Send message to destination vertex containing counter and age
          triplet.sendToDst((1, triplet.srcAttr))
        }
      },
      (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
    )

    // Divide total age by number of older followers to get average age of older followers
    val avgAgeOfOlderFollowers: VertexRDD[Double] =
      olderFollowers.mapValues((id, value) => value match { case (count, totalAge) => totalAge / count })

    // Display the results
    println("Graph:")
    println("sc.defaultParallelism:" + sc.defaultParallelism)
    println("vertices:")
    graph.vertices.collect.foreach(println(_))
    println("edges:")
    graph.edges.collect.foreach(println(_))
    println("count:" + graph.edges.count)
    println("\nolderFollowers:")
    olderFollowers.collect.foreach(println)
    println("\navgAgeOfOlderFollowers:")
    avgAgeOfOlderFollowers.collect.foreach(println(_))

    sc.stop()
  }

}
```

(1) First step: aggregateMessages

```scala
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      triplet.sendToDst((1, triplet.srcAttr))
    }
  },
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
```

(2) mapValues

```scala
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues((id, value) => value match { case (count, totalAge) => totalAge / count })
```

3. Results:

```
Graph:
sc.defaultParallelism:4
vertices:
(4,4.0)
(0,0.0)
(8,8.0)
(1,1.0)
(9,9.0)
(5,5.0)
(6,6.0)
(2,2.0)
(3,3.0)
(7,7.0)
edges:
Edge(0,1,1)
Edge(0,1,1)
Edge(0,1,1)
Edge(0,4,1)
Edge(0,5,1)
Edge(0,7,1)
Edge(1,2,1)
Edge(1,4,1)
Edge(1,7,1)
Edge(2,2,1)
Edge(2,2,1)
Edge(3,0,1)
Edge(3,0,1)
Edge(3,1,1)
Edge(3,8,1)
Edge(3,9,1)
Edge(4,0,1)
Edge(4,0,1)
Edge(4,1,1)
Edge(4,2,1)
Edge(4,8,1)
Edge(4,9,1)
Edge(4,9,1)
Edge(4,9,1)
Edge(5,4,1)
Edge(5,6,1)
Edge(5,7,1)
Edge(5,8,1)
Edge(6,0,1)
Edge(6,1,1)
Edge(6,1,1)
Edge(6,1,1)
Edge(6,3,1)
Edge(6,4,1)
Edge(6,8,1)
Edge(6,8,1)
Edge(6,9,1)
Edge(7,6,1)
Edge(7,6,1)
Edge(7,7,1)
Edge(8,0,1)
Edge(8,4,1)
Edge(8,6,1)
Edge(8,6,1)
Edge(8,7,1)
Edge(8,7,1)
Edge(8,8,1)
Edge(8,9,1)
Edge(9,1,1)
Edge(9,2,1)
Edge(9,3,1)
Edge(9,4,1)
Edge(9,4,1)
Edge(9,7,1)
Edge(9,9,1)
count:55

olderFollowers:
(4,(5,37.0))
(0,(6,28.0))
(1,(6,34.0))
(6,(4,30.0))
(2,(2,13.0))
(3,(2,15.0))
(7,(3,25.0))

avgAgeOfOlderFollowers:
(4,7.4)
(0,4.666666666666667)
(1,5.666666666666667)
(6,7.5)
(2,6.5)
(3,7.5)
(7,8.333333333333334)
```
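As a sanity check, the `olderFollowers` and `avgAgeOfOlderFollowers` values can be recomputed by hand from the edge list printed above. For vertex 4, the incoming edges whose source is older are `Edge(5,4,1)`, `Edge(6,4,1)`, `Edge(8,4,1)` and two copies of `Edge(9,4,1)`, giving a count of 5, a total age of 5 + 6 + 8 + 9 + 9 = 37, and an average of 7.4. The plain Scala sketch below does the same for every vertex; `OlderFollowersCheck` is a hypothetical name, the edges are copied from the output above, and the vertex id doubles as the age, as in the program.

```scala
object OlderFollowersCheck {
  // Edge list from the printed results, grouped by source vertex (duplicates kept).
  val outNeighbors: Seq[(Int, Seq[Int])] = Seq(
    (0, Seq(1, 1, 1, 4, 5, 7)),
    (1, Seq(2, 4, 7)),
    (2, Seq(2, 2)),
    (3, Seq(0, 0, 1, 8, 9)),
    (4, Seq(0, 0, 1, 2, 8, 9, 9, 9)),
    (5, Seq(4, 6, 7, 8)),
    (6, Seq(0, 1, 1, 1, 3, 4, 8, 8, 9)),
    (7, Seq(6, 6, 7)),
    (8, Seq(0, 4, 6, 6, 7, 7, 8, 9)),
    (9, Seq(1, 2, 3, 4, 4, 7, 9))
  )

  // Flatten to (src, dst) pairs.
  val edges: Seq[(Int, Int)] =
    for ((src, dsts) <- outNeighbors; dst <- dsts) yield (src, dst)

  // "Map" step: keep edges whose source is older than the destination.
  // "Reduce" step: per destination, count the messages and sum the source ages.
  val olderFollowers: Map[Int, (Int, Double)] =
    edges
      .filter { case (src, dst) => src > dst }
      .groupBy { case (_, dst) => dst }
      .map { case (dst, es) => (dst, (es.size, es.map(_._1.toDouble).sum)) }

  // mapValues step: total age divided by the number of older followers.
  val avgAgeOfOlderFollowers: Map[Int, Double] =
    olderFollowers.map { case (id, (count, totalAge)) => (id, totalAge / count) }

  def main(args: Array[String]): Unit = {
    println(s"count:${edges.size}")
    olderFollowers.toSeq.sortBy(_._1).foreach(println)
    avgAgeOfOlderFollowers.toSeq.sortBy(_._1).foreach(println)
  }
}
```

Vertices 5, 8, and 9 have no incoming edge from an older vertex, so they receive no message and are missing from both result sets, consistent with the output above.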

[1] http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
