Storm安装和使用Demo

一,Storm集群的安装

1.下载安装包:

2.解压移动到软件安装目录

tar -zxvf   xxxx

3.修改配置文件, conf目录下:
vim storm.yaml


storm.zookeeper.servers:
      - "hadoop6"
      - "hadoop7"
      - "hadoop8"
nimbus.host: "hadoop5"
nimbus.childopts: "-Xmx1024m"
#
supervisor.childopts: "-Xmx1024m"
#
worker.childopts: "-Xmx1024m"
#
ui.childopts: "-Xmx1024m"
#
supervisor.slots.ports:
      - 6700
      - 6701
      - 6702
      - 6703

4.分发到运行的节点上:

scp -r storm  hadoop@hadoop8:/home/hadoop/app/
......

5.在各个节点启动服务:

  • nimbus.host所属的机器上启动 nimbus服务

cd /export/servers/storm/bin/

nohup ./storm nimbus &

  • nimbus.host所属的机器上启动ui服务

cd /export/servers/storm/bin/

nohup ./storm ui &

  • 在其它个点击上启动supervisor服务

cd /export/servers/storm/bin/

nohup ./storm supervisor &


6.查看


访问nimbus.host:/8080,即可看到stormui界面。


二,Storm得使用Demo

1.官方统计字数例子

bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.7.jar storm.starter.WordCountTopology wordcount

2.运行结果


3.自己书写一个字符统计的Demo

package com.demo.storm.wordcount;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * @Description: Storm 单词统计主类
 * @author: songqinghu
 * @date: 2017年11月2日 下午2:18:19
 * Version:1.0
 */
public class WordCountStart {


    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        //建造者
        TopologyBuilder builder = new TopologyBuilder();
        //构建spout模块
        builder.setSpout("mySpout", new MyWordSpout(), Integer.valueOf(4));
        //构建bolt模块 --切割  定义 mySpout为随机发送规则
        builder.setBolt("mySplitBolt", new MySplitWordBolt(), Integer.valueOf(5)).shuffleGrouping("mySpout");
        //构建bolt模块 --统计  定义 mySplitBolt为Hash分发
        builder.setBolt("myCountBolt", new MyCountWordBolt(), Integer.valueOf(5)).fieldsGrouping("mySplitBolt", new Fields("word"));

        Config config = new Config();

        config.setNumWorkers(Integer.valueOf(3));
        //向集群提交任务
        if(args.length==1){
            StormSubmitter.submitTopology(args[0],config, builder.createTopology());
        }else{
            StormSubmitter.submitTopology("mywordcount",config, builder.createTopology());
        }
    }
}
/**
 * @Description: 对接受到的数据进行处理 --统计
 * @author: songqinghu
 * @date: 2017年11月2日 下午2:42:33
 * Version:1.0
 */
class MyCountWordBolt extends BaseRichBolt {


    private Map<String, Integer> wordCount = new HashMap<String,Integer>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);

        if(wordCount.containsKey(word)){
            wordCount.put(word, wordCount.get(word)+1);
        }else{
            wordCount.put(word, 1);
        }

        System.out.println(wordCount);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //不在向下发送
    }

}


/**
 * @Description: 对接受到的数据进行处理 --切割  分组发送
 * @author: songqinghu
 * @date: 2017年11月2日 下午2:42:33
 * Version:1.0
 */
class MySplitWordBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector =collector;
    }

    @Override
    public void execute(Tuple input) {
        String words = input.getString(0);
        String[] split = words.split(" ");
        for (String word : split) {
            collector.emit(new Values(word.trim()));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

}



/**
 * @Description: 数据源接入类,完成数据源的接入和传递工作
 * @author: songqinghu
 * @date: 2017年11月2日 下午2:41:23
 * Version:1.0
 */
class MyWordSpout extends BaseRichSpout{

    private SpoutOutputCollector collector;


    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        //初始执行 
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        //发送数据
        collector.emit(new Values("My life is in these books Read these and know my heart"));
    }


    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //定义发送的tuple对应字段
        declarer.declare(new Fields("words"));
    }

}



4.打包运行即可:
bin/storm jar wordcount.jar  com.demo.storm.wordcount.WordCountStart mywordcount
相关文章

相关标签/搜索