Storm之——流组件简单串行编程实践

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/78447229

Storm是一个分布式是实时计算系统,它设计了一种对流和计算的抽象,概念比较简单,实际编程开发起来相对容易。下面,简单介绍编程实践过程中需要理解的Storm中的几个概念:

  • Topology

Storm中Topology的概念类似于Hadoop中的MapReduce Job,是一个用来编排、容纳一组计算逻辑组件(Spout、Bolt)的对象(Hadoop MapReduce中一个Job包含一组Map Task、Reduce Task),这一组计算组件可以按照DAG图的方式编排起来(通过选择Stream Groupings来控制数据流分发流向),从而组合成一个计算逻辑更加负责的对象,那就是Topology。一个Topology运行以后就不能停止,它会无限地运行下去,除非手动干预(显式执行bin/storm kill )或意外故障(如停机、整个Storm集群挂掉)让它终止。

  • Spout

Storm中Spout是一个Topology的消息生产的源头,Spout应该是一个持续不断生产消息的组件,例如,它可以是一个Socket Server在监听外部Client连接并发送消息,可以是一个消息队列(MQ)的消费者、可以是用来接收Flume Agent的Sink所发送消息的服务,等等。Spout生产的消息在Storm中被抽象为Tuple,在整个Topology的多个计算组件之间都是根据需要抽象构建的Tuple消息来进行连接,从而形成流。

  • Bolt

Storm中消息的处理逻辑被封装到Bolt组件中,任何处理逻辑都可以在Bolt里面执行,处理过程和普通计算应用程序没什么区别,只是需要根据Storm的计算语义来合理设置一下组件之间消息流的声明、分发、连接即可。Bolt可以接收来自一个或多个Spout的Tuple消息,也可以来自多个其它Bolt的Tuple消息,也可能是Spout和其它Bolt组合发送的Tuple消息。

  • Stream Grouping

Storm中用来定义各个计算组件(Spout、Bolt)之间流的连接、分组、分发关系。Storm定义了如下7种分发策略:Shuffle Grouping(随机分组)、Fields Grouping(按字段分组)、All Grouping(广播分组)、Global Grouping(全局分组)、Non Grouping(不分组)、Direct Grouping(直接分组)、Local or Shuffle Grouping(本地/随机分组),各种策略的具体含义可以参考Storm官方文档、比较容易理解。

下面,作为入门实践,我们简单介绍几种开发中常用的流操作处理方式的实现:

Storm组件简单串行

这种方式是最简单最直观的,只要我们将Storm的组件(Spout、Bolt)串行起来即可实现,只需要了解编写这些组件的基本方法即可。在实际应用中,如果我们需要从某一个数据源连续地接收消息,然后顺序地处理每一个请求,就可以使用这种串行方式来处理。如果说处理单元的逻辑非常复杂,那么就需要处理逻辑进行分离,属于同一类操作的逻辑封装到一个处理组件中,做到各个组件之间弱耦合(除了定义Field的schema外,只通过发送消息来连接各个组件)。
下面,我实现一个简单的WordCount的例子,各个组件之间的连接方式,如下图所示:


ProduceRecordSpout类是一个Spout组件,用来产生消息,我们这里模拟发送一些英文句子,实际应用中可以指定任何数据源,如数据库、消息中间件、Socket连接、RPC调用等等。ProduceRecordSpout类代码如下所示:

package com.lyz.storm.word.spout;

import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * 产生消息源,不断发送数据
 * @author liuyazhuang
 *
 */
public class ProduceRecordSpout extends BaseRichSpout {

     private static final long serialVersionUID = 1L;
     private static final Log LOG = LogFactory.getLog(ProduceRecordSpout.class);
     private SpoutOutputCollector collector;
     private Random random;
     private String[] records;
    
     public ProduceRecordSpout(String[] records) {
          this.records = records;
     }
    
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
          this.collector = collector;    
          random = new Random();
     }

     @Override
     public void nextTuple() {
          Utils.sleep(500);
          String record = records[random.nextInt(records.length)];
          List<Object> values = new Values(record);
          collector.emit(values, values);
          LOG.info("Record emitted: record=" + record);
     }

     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("record"));         
     }
}
构造一个ProduceRecordSpout对象时,传入一个字符串数组,然后随机地选择其中一个句子,emit到下游(Downstream)的WordSplitterBolt组件,只声明了一个Field,WordSplitterBolt组件可以根据声明的Field,接收到emit的消息,WordSplitterBolt类代码实现如下所示:
package com.lyz.storm.word.bolt;

import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * 切分接收到的文本,按照空格切分每隔单词,然后发送每个单词的数量
 * @author liuyazhuang
 *
 */
public class WordSplitterBolt extends BaseRichBolt {

     private static final long serialVersionUID = 1L;
     private static final Log LOG = LogFactory.getLog(WordSplitterBolt.class);
     private OutputCollector collector;
    
     @Override
     public void prepare(Map stormConf, TopologyContext context,
               OutputCollector collector) {
          this.collector = collector;              
     }

     @Override
     public void execute(Tuple input) {
          String record = input.getString(0);
          if(record != null && !record.trim().isEmpty()) {
               for(String word : record.split("\\s+")) {
                    collector.emit(input, new Values(word, 1));
                    LOG.info("Emitted: word=" + word);
                    collector.ack(input);
               }
          }
     }

     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("word", "count"));         
     }
    
}
在execute方法中,传入的参数是一个Tuple,该Tuple就包含了上游(Upstream)组件ProduceRecordSpout所emit的数据,直接取出数据进行处理。上面代码中,我们将取出的数据,按照空格进行的split,得到一个一个的单词,然后在emit到下一个组件,声明的输出schema为2个Field:word和count,当然这里面count的值都为1。
进行统计词频的组件为WordCounterBolt,实现代码如下所示:
package com.lyz.storm.word.bolt;

import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.google.common.collect.Maps;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/**
 * 统计每个单词出现的总次数,然后打印结果
 * @author liuyazhuang
 *
 */
public class WordCounterBolt extends BaseRichBolt {

     private static final long serialVersionUID = 1L;
     private static final Log LOG = LogFactory.getLog(WordCounterBolt.class);
     private OutputCollector collector;
     private final Map<String, AtomicInteger> counterMap = Maps.newHashMap();
    
     @Override
     public void prepare(Map stormConf, TopologyContext context,
               OutputCollector collector) {
          this.collector = collector;              
     }

     @Override
     public void execute(Tuple input) {
          String word = input.getString(0);
          int count = input.getIntegerByField("count"); // 通过Field名称取出对应字段的数据
          AtomicInteger ai = counterMap.get(word);
          if(ai == null) {
               ai = new AtomicInteger(0);
               counterMap.put(word, ai);
          }
          ai.addAndGet(count);
          LOG.info("DEBUG: word=" + word + ", count=" + ai.get());
          collector.ack(input);
     }

     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {           
     }
    
     @Override
     public void cleanup() {
          // print count results
          LOG.info("Word count results:");
          for(Entry<String, AtomicInteger> entry : counterMap.entrySet()) {
               LOG.info("\tword=" + entry.getKey() + ", count=" + entry.getValue().get());
          }
     }

}
上面代码通过一个Map来对每个单词出现的频率进行累加计数,比较简单。因为该组件是Topology的最后一个组件,所以不需要在declareOutputFields方法中声明Field的Schema,而是在cleanup方法中输出最终的结果,只有在该组件结束任务退出时才会调用cleanup方法输出。
最后,需要基于上面的3个组件来创建一个Topology实例,提交到Storm集群去运行,配置代码如下所示:
package com.lyz.storm.word.main;

import com.lyz.storm.word.bolt.WordCounterBolt;
import com.lyz.storm.word.bolt.WordSplitterBolt;
import com.lyz.storm.word.spout.ProduceRecordSpout;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * 程序的入口,运行程序
 * @author liuyazhuang
 *
 */
public class WordCountTopology {
	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
	     // configure & build topology
	     TopologyBuilder builder = new TopologyBuilder();
	     String[] records = new String[] {
	               "A Storm cluster is superficially similar to a Hadoop cluster",
	               "All coordination between Nimbus and the Supervisors is done through a Zookeeper cluster",
	               "The core abstraction in Storm is the stream"
	     };
	     builder
	          .setSpout("spout-producer", new ProduceRecordSpout(records), 1)
	          .setNumTasks(3);
	     builder
	          .setBolt("bolt-splitter", new WordSplitterBolt(), 2)
	          .shuffleGrouping("spout-producer")
	          .setNumTasks(2);
	     builder.setBolt("bolt-counter", new WordCounterBolt(), 1)
	          .fieldsGrouping("bolt-splitter", new Fields("word"))
	          .setNumTasks(2);
	    
	     // submit topology
	     Config conf = new Config();
	     String name = WordCountTopology.class.getSimpleName();
	     if (args != null && args.length > 0) {
	          String nimbus = args[0];
	          conf.put(Config.NIMBUS_HOST, nimbus);
	          conf.setNumWorkers(2);
	          StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
	     } else {
	          LocalCluster cluster = new LocalCluster();
	          cluster.submitTopology(name, conf, builder.createTopology());
	          Thread.sleep(60000);
	          cluster.shutdown();
	     }
	}

}
上面通过TopologyBuilder来配置组成一个Topology的多个组件(Spout或Bolt),然后通过调用createTopology()方法创建一个Topology实例。上面方法中,对应着2种运行模式:如果没有传递任何参数,则是使用LocalCluster来运行,适合本地调试代码;如果传递一个Topology名称作为参数,则是在真实的Storm集群上运行,需要对实现的Topology代码进行编译打包,通过StormSubmitter提交到集群上作为服务运行。

至此,流组件简单串行编程实例完毕。

相关文章

相关标签/搜索