Storm入门示例

TopologyBuilder

创建一个拓扑（Topology），并将spout和bolt组装到其中

package ***;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * Local-mode entry point for the word-count example topology.
 *
 * <p>Wiring: {@code spout} (RandomSentenceSpout) -> {@code split} (SplitSentence,
 * shuffle-grouped) -> {@code count} (WordCount, fields-grouped on {@code "word"} so
 * every occurrence of a given word is routed to the same WordCount task).
 *
 * @author  Michael
 * 2017-12-15
 */
public class WordCountTopology {

	/**
	 * Builds the topology, runs it in an in-process {@link LocalCluster} for about
	 * ten seconds, then shuts the cluster down.
	 *
	 * @param args unused
	 */
	public static void main(String[] args){
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("spout", new RandomSentenceSpout(), 5);
		builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
		builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

		Config conf = new Config();
		conf.setDebug(false);
		// Caps the parallelism of every component in this run at 3, so the
		// hints above (5 / 8 / 12) are effectively reduced for local testing.
		conf.setMaxTaskParallelism(3);

		LocalCluster cluster = new LocalCluster();
		try {
			cluster.submitTopology("word-count", conf, builder.createTopology());
			// Let the topology run for a while before tearing it down.
			Thread.sleep(10000);
		} catch (InterruptedException e) {
			// Restore the interrupt flag rather than swallowing the interruption.
			Thread.currentThread().interrupt();
		} finally {
			// Always stop the in-process cluster; otherwise its threads can keep
			// the JVM running after main() returns.
			cluster.shutdown();
		}
	}
}

Spout

数据流的入口，将初步处理后的数据转交给下一步的bolt（SplitSentence）

package ***;

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Spout that emits one randomly chosen sentence roughly every 100 ms on the
 * single output field {@code "sentence"}.
 *
 * @author  Michael
 * 2017-12-22
 */
public class RandomSentenceSpout extends BaseRichSpout {

	private static final long serialVersionUID = 1L;

	/** Fixed pool of sample sentences, shared by nextTuple() and main(). */
	private static final String[] SENTENCES = {
		"the cow jumped over the moon",
		"an apple a day keeps the doctor away",
		"four score and seven years ago",
		"snow white and the seven dwarfs",
		"i am at two with nature"
	};

	private SpoutOutputCollector collector;

	private Random rand;

	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
		rand = new Random();
	}

	/**
	 * Sleeps 100 ms (throttling the demo), then prints and emits one random sentence.
	 *
	 * <p>The tuple is emitted without a message ID, so Storm does not track it and
	 * {@link #ack(Object)} / {@link #fail(Object)} are not called for it. Emit with
	 * {@code collector.emit(values, msgId)} if reliability tracking is wanted.
	 */
	@Override
	public void nextTuple() {
		Utils.sleep(100);
		String sentence = SENTENCES[rand.nextInt(SENTENCES.length)];
		System.out.println(sentence);
		collector.emit(new Values(sentence));
	}

	/**
	 * Declares the field name for the single value emitted in nextTuple(), so
	 * downstream bolts can read it with {@code tuple.getStringByField("sentence")}.
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}

	/** Ack callback: invoked by Storm only for tuples emitted with a message ID. */
	@Override
	public void ack(Object id) {
		System.out.println("ack");
	}

	/** Failure callback: invoked by Storm only for tuples emitted with a message ID. */
	@Override
	public void fail(Object id) {
		System.out.println("fail");
	}

	/** Manual smoke check: prints one random sentence from the pool. */
	public static void main(String[] args) {
		Random rand = new Random();
		System.out.println(SENTENCES[rand.nextInt(SENTENCES.length)]);
	}
}

Bolt

SplitSentence（拆分句子的bolt）

package ***;

import java.util.StringTokenizer;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that breaks each incoming sentence into tokens and emits one tuple per
 * token on the output field {@code "word"}.
 *
 * @author  Michael
 * 2017-12-22
 */
public class SplitSentence extends BaseBasicBolt {

	private static final long serialVersionUID = 3307820940609824864L;

	/**
	 * Reads the {@code "sentence"} field, logs it to stdout, and emits every token
	 * produced by a default-delimiter {@link StringTokenizer} as its own tuple.
	 */
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		final String text = tuple.getStringByField("sentence");
		System.out.println("从spout中取出的sentence:" + text);
		for (StringTokenizer tokens = new StringTokenizer(text); tokens.hasMoreTokens(); ) {
			final String word = tokens.nextToken();
			collector.emit(new Values(word));
		}
	}

	/** Declares the single output field {@code "word"}. */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}

WordCount（统计词频的bolt）

统计词频的bolt

package ***;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that keeps a running count per word in this bolt instance's memory and
 * emits {@code (word, count)} after every update.
 *
 * @author  Michael
 * 2017-12-22
 */
public class WordCount extends BaseBasicBolt {

	private static final long serialVersionUID = 1L;

	/** Running totals held by this bolt instance, keyed by word. */
	Map<String,Integer> counts = new HashMap<String,Integer>();

	/**
	 * Increments the count for the word in field 0, logs the new total, and emits
	 * the word together with its updated count.
	 */
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String word = tuple.getString(0);
		Integer previous = counts.get(word);
		int updated = (previous == null) ? 1 : previous + 1;
		counts.put(word, updated);
		System.out.println("word:" + word + "  count:" + updated);
		collector.emit(new Values(word, updated));
	}

	/** Declares the output fields {@code "word"} and {@code "count"}. */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "count"));
	}
}
相关文章
相关标签/搜索