Storm之——编程案例

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/78383974

一、程序需求

今天,我们再次为大家带来一篇关于Storm的文章,以便为大家起到复习Storm的效果。这篇文章的编程案例基于Maven实现,主要的功能是:从文件读取内容——>切分单词,去掉首尾空格并将单词转化为小写——>统计单词数量并打印结果。

好了,明确了程序要实现的功能之后,我们就正式进入Storm的开发。

二、程序实现

1、创建工程

首先我们创建一个Maven工程,编译pom.xml文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
             http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.lyz</groupId>
	<artifactId>storm-test</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.3.2</version>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
					<compilerVersion>1.7</compilerVersion>
				</configuration>
			</plugin>
		</plugins>
	</build>
	<repositories>
		<!-- Repository where we can found the storm dependencies -->
		<repository>
			<id>clojars.org</id>
			<url>http://clojars.org/repo</url>
		</repository>
	</repositories>
	<dependencies>
		<!-- Storm Dependency -->
		<dependency>
			<groupId>storm</groupId>
			<artifactId>storm</artifactId>
			<version>0.6.0</version>
		</dependency>
	</dependencies>
</project>

工程的目录结构如下:


2、编写Spout类WordReader

这个类的主要作用是负责从文件按行读取文本,并把文本行提供给第一个bolt。

主要代码如下:

package com.lyz.storm.spouts;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 *WordReader负责从文件按行读取文本,并把文本行提供给第一个bolt。
 * @author liuyazhuang
 *
 */
public class WordReader implements IRichSpout {

	private static final long serialVersionUID = -850307559130820088L;

	private SpoutOutputCollector collector;
	private FileReader fileReader;
	private boolean completed = false;
	private TopologyContext context;
	@Override
	public boolean isDistributed() {
		return false;
	}
	@Override
	public void ack(Object msgId) {
		System.out.println("OK:" + msgId);
	}
	@Override
	public void close() {
	}
	@Override
	public void fail(Object msgId) {
		System.out.println("FAIL:" + msgId);
	}

	/**
	 * 这个方法做的惟一一件事情就是分发文件中的文本行
	 */
	@Override
	public void nextTuple() {
		/**
		 * 这个方法会不断的被调用,直到整个文件都读完了,我们将等待并返回。
		 */
		if (completed) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// 什么也不做
			}
			return;
		}
		String str;
		// 创建reader
		BufferedReader reader = new BufferedReader(fileReader);
		try {
			// 读所有文本行
			while ((str = reader.readLine()) != null) {
				/**
				 * 按行发布一个新值
				 */
				this.collector.emit(new Values(str), str);
			}
		} catch (Exception e) {
			throw new RuntimeException("Error reading tuple", e);
		} finally {
			completed = true;
		}
	}

	/**
	 * 我们将创建一个文件并维持一个collector对象
	 */
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		try {
			this.context = context;
			this.fileReader = new FileReader(conf.get("wordsFile").toString());
		} catch (FileNotFoundException e) {
			throw new RuntimeException("Error reading file [" + conf.get("wordFile") + "]");
		}
		this.collector = collector;
	}

	/**
	 * 声明输入域"word"
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("line"));
	}

}

3、编写第一个bolt类WordNormalizer

这个类的主要作用是:负责得到并标准化每行文本。它把文本行切分成单词,大写转化成小写,去掉头尾空白符,并将结果发送给第二个bolt类。

具体代码如下:

package com.lyz.storm.bolts;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * 负责得到并标准化每行文本。它把文本行切分成单词,大写转化成小写,去掉头尾空白符。
 * @author liuyazhuang
 *
 */
public class WordNormalizer implements IRichBolt {
	private static final long serialVersionUID = -2127001114476106896L;
	
	private OutputCollector collector;
	
	@Override
	public void cleanup() {
		
	}

	/**
	 * *bolt*从单词文件接收到文本行,并标准化它。 文本行会全部转化成小写,并切分它,从中得到所有单词。
	 */
	@Override
	public void execute(Tuple input) {
		String sentence = input.getString(0);
		String[] words = sentence.split(" ");
		for (String word : words) {
			word = word.trim();
			if (!word.isEmpty()) {
				word = word.toLowerCase();
				// 发布这个单词
				List<Tuple> a = new ArrayList<Tuple>();
				a.add(input);
				collector.emit(a, new Values(word));
			}
		}
		// 对元组做出应答
		collector.ack(input);
	}
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	/**
	 * 这个*bolt*只会发布“word”域
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}

4、编写第二个bolt类WordCounter

这个类的主要作用是:统计每个单词的数量并打印结果。

package com.lyz.storm.bolts;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * 负责为单词计数。这个拓扑结束时(cleanup()方法被调用时),我们将显示每个单词的数量。
 * 这个例子的bolt什么也没发布,它把数据保存在map里,但是在真实的场景中可以把数据保存到数据库。
 * @author liuyazhuang
 *
 */
public class WordCounter implements IRichBolt{
	
	private static final long serialVersionUID = 6323893801667766697L;
	Integer id;
    String name;
    Map<String,Integer> counters;
    private OutputCollector collector;

    /**
      * 这个spout结束时(集群关闭的时候),我们会显示单词数量
      */
    @Override
    public void cleanup(){
        System.out.println("-- 单词数 【"+name+"-"+id+"】 --");
        for(Map.Entry<String,Integer> entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }

    /**
     *  为每个单词计数
     */
    @Override
    public void execute(Tuple input) {
        String str=input.getString(0);
        /**
         * 如果单词尚不存在于map,我们就创建一个,如果已在,我们就为它加1
         */
        if(!counters.containsKey(str)){
        	counters.put(str,1);
        }else{
            Integer c = counters.get(str) + 1;
            counters.put(str,c);
        }
        //对元组作为应答
        collector.ack(input);
    }

    /**
     * 初始化
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector){
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

5、编写运行程序的入口类TopologyMain

这个类的主要作用是:作为程序的入口,以本地模式运行。
具体代码如下:

package com.lyz.storm;
import com.lyz.storm.bolts.WordCounter;
import com.lyz.storm.bolts.WordNormalizer;
import com.lyz.storm.spouts.WordReader;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * 运行程序的主类,本实例以本地模式运行
 * @author liuyazhuang
 *
 */
public class TopologyMain {
	
	public static void main(String[] args) throws InterruptedException {
		// 定义拓扑
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("word-reader", new WordReader());
		builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
		builder.setBolt("word-counter", new WordCounter(), 2).fieldsGrouping("word-normalizer", new Fields("word"));

		// 配置
		Config conf = new Config();
		conf.put("wordsFile", "D:/Workspaces/Hadoop/storm-test/src/main/resources/word.txt");
		conf.setDebug(false);

		// 运行拓扑
		conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
		Thread.sleep(1000);
		cluster.shutdown();
	}
}

6、创建word.txt文件

在工程的src/main/resources目录下创建word.txt文件如下:

Storm test are great is an Storm simple application but very powerful really Storm is great great great great great great great great great great great great great great great great great great great great great great great 
至此,整个工程创建完毕。

三、运行程序

我们运行程序的入口类TopologyMain

可以看到控制台输出如下日志:

1501 [Thread-24] INFO  backtype.storm.util  - Async loop interrupted!
-- 单词数 【word-counter-3】 --
really: 1
but: 1
great: 24
an: 1
storm: 3
1504 [main] INFO  backtype.storm.daemon.task  - Shut down task Getting-Started-Topologie-1-1509248619:3
1504 [main] INFO  backtype.storm.daemon.task  - Shutting down task Getting-Started-Topologie-1-1509248619:2
1504 [Thread-26] INFO  backtype.storm.util  - Async loop interrupted!
-- 单词数 【word-counter-2】 --
application: 1
is: 2
are: 1
test: 1
simple: 1
powerful: 1
very: 1
1507 [main] INFO  backtype.storm.daemon.task  - Shut down task Getting-Started-Topologie-1-1509248619:2
其中打印出了word.txt文件中的每个单词的统计数量。至此,整个应用程序编写测试完毕。

四、温馨提示

大家可以到链接http://download.csdn.net/download/l1028386804/10043475下载本博文的工程源码——storm编程实例。

相关文章

相关标签/搜索