编写Akka程序实现WordCount功能


MapReduceApplication.scala是程序的入口,执行后产生的结果如下(结果是一个HashMap,各单词的输出顺序不固定):

{hello=2, love=3, hadoop=2, hi=1, spark=2, and=1, i=3}

 

下面将逐个分析代码:

1. MapReduceApplication.scala

主要是向MasterActor发送待统计的消息,完整代码如下:

 

package myakka.messages
 
import akka.actor.{Props, ActorSystem}
import myakka.actors.MasterActor
 
/**
 * Entry point of the Akka word-count demo. Created by jsz on 2016/1/16.
 *
 * Sends three text lines to the [[myakka.actors.MasterActor]], then asks for the
 * aggregated word counts to be printed, and finally shuts the actor system down.
 */
object MapReduceApplication {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("MapReduceApplication")
    val master = system.actorOf(Props[MasterActor], name = "master")

    master ! "Hello,I love Spark. "
    master ! "Hello,I love Hadoop. "
    master ! "Hi, I love Spark and Hadoop. "

    // The master forwards ResultData directly to the aggregate actor, bypassing
    // the map -> reduce pipeline, so nothing orders it after the text lines.
    // Sleeping gives the pipeline time to drain first (best effort only).
    Thread.sleep(500)
    master ! ResultData()

    // Give the aggregate actor time to print before the system goes away.
    Thread.sleep(500)
    // NOTE(review): shutdown() is deprecated since Akka 2.4 in favour of terminate();
    // kept here because the Akka version this demo targets is not visible.
    system.shutdown()
  }
}

 

2. Messages.scala

定义一些数据格式,完整代码如下:

package myakka.messages
 
import java.util.{ArrayList,HashMap}
 
/**
 * Message types exchanged by the word-count actors. Created by jsz on 2016/1/16.
 */
 
// A single word occurrence together with its count.
class Word(val word: String, val count: Integer)
// Request (routed through MasterActor) asking AggregateActor to print its totals.
case class ResultData()
// Output of the map stage: the word occurrences of one text message, not yet combined.
class MapData(val dataList: ArrayList[Word])
// Output of the reduce stage: word -> count totals for one text message.
class ReduceData(val reduceDataMap: HashMap[String, Integer])

 

3. MasterActor.scala

创建Aggregate、Reduce、Map三个子Actor,并根据消息类型进行分发:文本消息(String)转发给MapActor,ResultData消息直接转发给AggregateActor。完整代码如下:

 

package myakka.actors
 
import akka.actor.{Props, ActorRef, Actor}
import myakka.messages.ResultData
 
/**
 * Front-end actor of the word-count pipeline. Created by jsz on 2016/1/16.
 *
 * Creates the aggregate, reduce and map child actors and routes messages:
 * text lines (`String`) go to the map actor, and a [[ResultData]] request goes
 * straight to the aggregate actor. Any other message is reported as unhandled.
 */
class MasterActor extends Actor {
  // Created in this order so each stage can be handed a reference to the next one.
  val aggregateActor: ActorRef = context.actorOf(Props[AggregateActor], name = "aggregate")
  val reduceActor: ActorRef = context.actorOf(Props(new ReduceActor(aggregateActor)), name = "reduce")
  val mapActor: ActorRef = context.actorOf(Props(new MapActor(reduceActor)), name = "map")

  override def receive: Receive = {
    case line: String =>
      mapActor ! line
    case request: ResultData =>
      aggregateActor ! request
    case other =>
      // Previously swallowed silently; unhandled() publishes an UnhandledMessage
      // to the event stream so unexpected messages are visible.
      unhandled(other)
  }
}


 

4. MapActor.scala

Map阶段将每行文本按标点(,!?.)和空白拆分为单词,统一转为小写并过滤停用词(a、is),为每个单词输出<word,1>形式的结果。此阶段不对相同的单词进行汇总,完整代码如下:

package myakka.actors
 
import java.util
import java.util.StringTokenizer
 
import akka.actor.{Actor, ActorRef}
import myakka.messages.{MapData, Word}
 
/**
 * Map stage of the word-count pipeline. Created by jsz on 2016/1/16.
 *
 * Splits each incoming text line into lower-cased words, drops stop words and
 * forwards one `Word(word, 1)` per occurrence to `reduceActor` as a [[MapData]].
 * Identical words are NOT combined here; that is the reduce stage's job.
 *
 * @param reduceActor actor that receives the produced [[MapData]]
 */
class MapActor(reduceActor: ActorRef) extends Actor {
  // Words that are never counted.
  val STOP_WORDS_LIST = List("a", "is")

  override def receive: Receive = {
    case message: String =>
      reduceActor ! evaluateExpression(message)
    case other =>
      // Report instead of silently dropping unexpected messages.
      unhandled(other)
  }

  /**
   * Tokenizes one line of text.
   *
   * Characters in `,!?.` are replaced by spaces, the result is split on
   * whitespace, and each token is lower-cased before the stop-word check.
   *
   * @param line raw text line
   * @return a [[MapData]] with one `Word(token, 1)` per non-stop-word token, in input order
   */
  def evaluateExpression(line: String): MapData = {
    val dataList = new util.ArrayList[Word]
    val defaultCount: Integer = 1
    val parser = new StringTokenizer(line.replaceAll("[,!?.]", " "))
    while (parser.hasMoreTokens()) {
      // Locale.ROOT avoids locale-specific case mapping (e.g. Turkish dotless i).
      val word = parser.nextToken().toLowerCase(util.Locale.ROOT)
      if (!STOP_WORDS_LIST.contains(word)) {
        dataList.add(new Word(word, defaultCount))
      }
    }
    new MapData(dataList)
  }
}


5. ReduceActor.scala

对同一条消息(即一行文本)中的相同单词进行累加汇总,得到该行的<word,count>结果,完整代码如下:

package myakka.actors
 
import scala.collection.JavaConversions._
import java.util
 
import akka.actor.{Actor, ActorRef}
import myakka.messages.{MapData, ReduceData, Word}
 
/**
 * Reduce stage of the word-count pipeline. Created by jsz on 2016/1/16.
 *
 * Collapses the word occurrences of a single [[MapData]] message into a
 * word -> count map and forwards it to `aggregateActor` as a [[ReduceData]].
 *
 * @param aggregateActor actor that merges the per-message results globally
 */
class ReduceActor(aggregateActor: ActorRef) extends Actor {
  override def receive: Receive = {
    case message: MapData =>
      aggregateActor ! reduce(message.dataList)
    case other =>
      // Report instead of silently dropping unexpected messages.
      unhandled(other)
  }

  /**
   * Sums the counts of identical words.
   *
   * @param dataList word occurrences produced by the map stage
   * @return a [[ReduceData]] mapping each distinct word to the sum of its counts
   */
  def reduce(dataList: util.ArrayList[Word]): ReduceData = {
    import scala.collection.JavaConverters._

    val reducedMap = new util.HashMap[String, Integer]
    for (wc <- dataList.asScala) {
      // Use the occurrence's own count instead of a hard-coded 1 so the reducer
      // stays correct if the map stage ever emits pre-aggregated counts.
      val increment: Int = wc.count
      val previous: Int = if (reducedMap.containsKey(wc.word)) reducedMap.get(wc.word) else 0
      reducedMap.put(wc.word, previous + increment)
    }
    new ReduceData(reducedMap)
  }
}


6. AggregateActor.scala

将各行的Reduce结果合并为全局统计结果,并在收到ResultData消息时输出结果,完整代码如下:

package myakka.actors
 
import scala.collection.JavaConversions._
import java.util
 
import akka.actor.Actor
import myakka.messages.{ResultData, ReduceData}
 
/**
 * Global aggregation stage of the word-count pipeline. Created by jsz on 2016/1/16.
 *
 * Merges every [[ReduceData]] it receives into `finalReducedMap` and prints the
 * accumulated map when it receives a [[ResultData]] request.
 */
class AggregateActor extends Actor {

  // Running word -> count totals across all messages.
  // NOTE(review): never reassigned in this class; kept as a var only for interface compatibility.
  var finalReducedMap = new util.HashMap[String, Integer]

  override def receive: Receive = {
    case message: ReduceData =>
      aggregateInMemoryReduce(message.reduceDataMap)
    case _: ResultData =>
      println(finalReducedMap.toString)
  }

  /**
   * Adds each per-message count to the running total for that word.
   *
   * @param reducedList word -> count map produced by the reduce stage for one message
   */
  def aggregateInMemoryReduce(reducedList: util.HashMap[String, Integer]): Unit = {
    import scala.collection.JavaConverters._

    for ((word, count) <- reducedList.asScala) {
      val increment: Int = count
      val previous: Int = if (finalReducedMap.containsKey(word)) finalReducedMap.get(word) else 0
      finalReducedMap.put(word, previous + increment)
    }
  }
}
相关文章
相关标签/搜索