storm开发手册

1、api:

1.1 数据模型(Data Model)

storm使用tuple来作为它的数据模型。每个 tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型, 在我的理解里面一个tuple可以看作一个没有方法的java对象。总体来看,storm支持所有的基本类型、字符串以及字节数组作为tuple的值类 型。你也可以使用你自己定义的类型来作为值类型, 只要你实现对应的序列化器(serializer)。

topology里面的每个节点必须定义它要发射的tuple的每个字段。 比如下面这个bolt定义它所发射的tuple包含两个字段,字段名称分别是: double和triple。

public class DoubleAndTripleBolt implements IRichBolt {
    private OutputCollectorBase _collector;
 
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector;
    }
 
    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }
 
    @Override
    public void cleanup() {
    }
 
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }
}
declareOutputFields方法定义要输出的字段 : ["double", "triple"]。

1.2 流分组策略(Stream grouping)

流分组策略告诉topology如何在两个组件之间发送tuple。 要记住, spouts和bolts以很多task的形式在topology里面同步执行。

当Bolt A的一个task要发送一个tuple给Bolt B, 它应该发送给Bolt B的哪个task呢?
stream grouping专门回答这种问题的。在我们深入研究不同的stream grouping之前, 让我们看一下storm-starter里面的另外一个topology。WordCountTopology读取一些句子, 输出句子里面每个单词出现的次数.

TopologyBuilder builder = new TopologyBuilder();
 
builder.setSpout(1, new RandomSentenceSpout(), 5 );
builder.setBolt(2, new SplitSentence(), 8 )
        .shuffleGrouping(1);
builder.setBolt(3, new WordCount(), 12)
        .fieldsGrouping(2, new Fields("word"));

SplitSentence对于句子里面的每个单词发射一个新的tuple, WordCount在内存里面维护一个单词->次数的mapping, WordCount每收到一个单词, 它就更新内存里面的统计状态。

有好几种不同的stream grouping:

  1. 最 简单的grouping是shuffle grouping, 它随机发给任何一个task。上面例子里面RandomSentenceSpout和SplitSentence之间用的就是shuffle grouping, shuffle grouping对各个task的tuple分配的比较均匀。
  2. 一种更有趣的grouping是 fields grouping, SplitSentence和WordCount之间使用的就是fields grouping, 这种grouping机制保证相同field值的tuple会去同一个task, 这对于WordCount来说非常关键,如果同一个单词不去同一个task, 那么统计出来的单词次数就不对了。

fields grouping是stream合并,stream聚合以及很多其它场景的基础。在背后, fields grouping使用的一致性哈希来分配tuple的。

还有一些其它类型的stream grouping.

1.3 使用别的语言来定义Bolt

Bolt 可以使用任何语言来定义。用其它语言定义的bolt会被当作子进程(subprocess)来执行, storm使用JSON消息通过stdin/stdout来和这些subprocess通信。这个通信协议是一个只有100行的库, storm团队给这些库开发了对应的Ruby, Python和Fancy版本。

下面是WordCountTopology里面的SplitSentence的定义:

public static class SplitSentence extends ShellBolt implements IRichBolt {
    public SplitSentence() {
        super("python", "splitsentence.py");
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
SplitSentence继承自ShellBolt并且声明这个Bolt用python来运行,并且参数是: splitsentence.py。下面是splitsentence.py的定义:

import storm
 
class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])
 
SplitSentenceBolt().run()
更多有关用其它语言定义Spout和Bolt的信息, 以及用其它语言来创建topology的 信息可以参见:  Using non-JVM languages with Storm .

1.4 Topology编程之Spout&Bolt

往Storm集群上提交的Job称为Topology。Topology可以理解为由Spout和Bolt组成的对实时数据处理的逻辑图,其中Spout负责从外界接收数据,Bolt负责对接收的数据进行处理。要使Storm意识到用户写的处理逻辑,需要实现Storm开放的相关接口。为了方便用户实现自己的逻辑Storm也提供了一些已实现部分功能的超类,用户可以直接继承这些超类,而不用从头开始实现ISpout或IBolt。在本文中,将介绍实现Spout和Bolt的不同方式。
1. IComponent接口
在Storm中Spout与Bolt都称为Component,每个用户定义的Spout或Bolt都要直接或间接的实现IComponent接口。IComponent接口如下。
public interface IComponent extends Serializable {
    void declareOutputFields(OutputFieldsDeclarer var1);
    Map<String, Object> getComponentConfiguration();
}

方法:
1. declareOutputFields。用于声明当前Component的每个输出流的Schema。
2. getComponentConfiguration。设置当前Component的配置。只有Storm的配置文件中,“topology.*"的配置项对当前的Component有效。
2. Spout
2.1 ISpout接口
每个用户定义的Spout都需要直接或间接的实现ISpout接口。ISpout接口如下:
public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void activate();
    void deactivate();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
    void close();
}
方法:
1. open。该方法会在创建Spout时被调用,在该方法中通常做一些初始化工作。其中的参数conf包含了当前Component的配置信息,context指出了当前component的上下文环境,从context中可以获得当前component对应的TaskId,ComponentId,所在Worker的进程Id,代码所在目录等信息。collector用于往下层发送数据,通常在nextTuple方法中调用。
2. activate。调用该方法,将会使Spout从deactive 模式中恢复过来,nextTuple方法会变成可被调用状态。
3. deactivate。 调用该方法,将会使Spout变为deactivate 模式。nextTuple方法会变成不可被调用状态。activate与deactivate方法通过Storm 客户端或Storm UI调用。
4. nextTuple。在active模式下,nextTuple将会不断被调用,用于往下层的Bolt发送数据。当没有数据需要发送时,最好让nextTuple方法sleep一小段时间,以免浪费CPU资源。
5. ack。当Storm确定由msgId标识的由Spout发出的Tuple被成功处理后,将会调用ack方法。在该方法中可以做消息被成功处理的后续处理工作,如从缓存队列中将该msgId对应的消息删除。
6. fail。当由Spout发出被标识为msgId的Tuple在后续处理中失败时,将会调用fail方法。在该方法中可以做消息处理失败的后续处理工作,如将该msgId对应的消息重新发送。
7. close。当Spout被shutdown时,该方法被调用。但并不能保证该方法一定会被调用,因为一个Spout所在的worker 可能会被以kill -9这种杀死进程的方式杀掉。
Storm在同一个线程中执行ack、fail、nextTuple方法。这意味着ISpout的实现不需要担心这些方法之间的并发问题。但是这也意味着nextTuple方法是非阻塞式的,要不然ack和fail方法会得不到执行。

ISpout接口中的nextTuple、ack、fail、activate,deactivate方法都在同一个线程中被调用,所以ISpout接口中的方法都不能是阻塞的实现。
2.2 用户实现Spout
用户定义的Spout需要实现IComponent与ISpout接口中的所有方法。但为了更方便的实现Spout,用户大多数情况下不用直接实现IComponent与ISpout接口,取而代之的是,实现或继承以下接口或类。
1.  IRichSpout。该接口继承自IComponent与ISpout接口,所以当用户需要实现Spout必需的所有方法时,可以继承自这个接口。
2.  BaseRichSpout。该超类用空方法实现了IRichSpout中的除了declareOutputFields、open、nextTuple之外的所有的方法,当用户不需要实现Spout必需的所有方法时,可以继承自这个类。

3. Bolt
3.1 IBolt接口
每个用户定义的Bolt都需要直接或间接的实现IBolt接口。Bolt以元组作为输入,并产生元组作为输出。IBolt接口如下。
public interface IBolt extends Serializable {
    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
    void execute(Tuple input);
    void cleanup();
}
方法:
1. prepare。该方法在创建Bolt时被调用,类似于Spout的open方法,在该方法中通常做一些初始工作。其中的参数conf包含了当前Component的配置信息,context指出了当前component的上下文环境,从context中可以获得当前component对应的TaskId,ComponentId,所在Worker的进程Id,代码所在目录等信息。collector用于往下层发送数据,通常在execute方法中调用。
2. execute。execute方法会被不断调用,对于每一个tuple都会调用一次。
3. cleanup。当Bolt被shutdown的时候,该方法被调用。但并不能保证该方法一定会被调用,因为一个Bolt所在的worker可能会被以kill -9这种杀死进程的方式杀掉。
3.2 用户实现Bolt
用户定义的Bolt需要实现IComponent与IBolt接口中的所有方法。但为了更方便的实现Bolt,用户大多数情况下不用直接实现IComponent与IBolt接口,取而代之的是,实现或继承以下接口或类。
1.  IRichBolt。该接口继承自IComponent与ISpout接口,所以当用户需要实现Bolt必需的所有方法时,可以继承自这个接口。
2.  BaseRichBolt。该超类用空方法实现了IRichBolt中的除了declareOutputFields、prepare与execute之外的方法,当用户不需要实现Spout必需的所有方法时,可以继承自这个类。
在实现Bolt时也可以通过继承或实现以下的接口或类,这些接口或类虽然没有直接或间接实现IBolt,但当设置Topology时,这些接口的对象会被Storm转化成ISpout类型的对象。
1.  IBasicBolt。该接口也继承了IComponent,包含方法的方法名与IBolt中的一样。
2.  BaseBasicBolt。这个抽象类使用空方法实现了IBasicBolt中除了execut与declareOutputFields方法之外的其他方法。当不需要实现IBasicBolt包含的所有方法时可以继承自这个类。
Storm会在创建Topology时,使用BasicBoltExecutor对实现了IBasicBolt的类进行包装。当用户继承自IBasicBolt来实现自定义的Bolt时,不需要对输入的tuple进行ack、fail。BasicBoltExecutor会自动在调用IBasicBolt的execute方法后调用ack方法。如果在IBasicBolt的execute方法中抛出FailedException,BasicBoltExecutor则会调用fail方法。

4. 总结
通过实现Storm暴露的Spout、Bolt的接口,可以方便的实现用户定义的Topology组件。Storm的ack机制也可以对Spout发出的tuple进行跟踪,并在被成功处理,或处理失败时,进行反馈。


2、Storm开发环境

如何配置一个storm的开发环境, 总的来看有下面几个步骤:

  1. 下载storm的release版本, 解压,并且把bin/目录加到环境变量PATH里面去。
  2. 为了让我们可以启动/停止远端storm集群上的topology, 把集群的信息配置在 ~/.storm/storm.yaml里面。

下面具体介绍。

2.1 storm开发环境到底是个什么样子

storm有两种操作模式: 本地模式和远程模式。使用本地模式的时候,你可以在你的本地机器上开发测试你的topology, 一切都在你的本地机器上模拟出来; 用远端模式的时候你提交的topology会在一个集群的机器上执行。

一个storm开发环境安装了你使用本地模式开发测试topology; 把topology打包以部署到远端的集群; 提交,终止远端集群上的topology所需要的一切东西。

让我们快速看一下你的机器和远端storm集群之间的关系。storm集群是被一个称作Nimbus的控制节点所管理的。你的机器 与nimbus通信以提交topology的代码,运行这个topology,而nimbus会自动在集群内部分发你的topology代码, 分配任务给各个机器。你的机器使用一个称为storm的客户端去和nimbus通信。storm只有在远程模式的时候才有用; 对于用本地模式开发、测试topology来说是没什么用的。


2.2 在本地机器安装一个storm

如果你想从你的机器提交topology给远端的storm集群, 你应该在你的本地安装一个storm发行版。安装了storm发行版之后你会得到你和远端集群通信的工具: storm。为了在本地安装storm, 从这里下载代码,并且把它解压到你机器上的一个目录。然后把bin/目录添加到环境变量PATH里面去并且使bin/storm有可执行权限。

安装一个storm发行版只是用来和远端storm集群通信用的。而为了用本地模式开发测试topology, 我们推荐你使用maven来把storm作为你的项目的一个dev依赖。关于使用Maven开发storm项目可以看这篇文章:Maven

2.3 在一个远端集群里面启动/终止topology

前面一个步骤在你的机器上安装了可以和远端集群通信用的storm客户端。现在你只需要告诉你的storm客户端和哪个集群进行通信。把你集群的控制节点的地址指定在~/.storm/storm.yaml里面就可以了:

 

1
nimbus.host: "123.45.678.890"

 

更多关于storm-deploy的信息看这里


3、创建一个新的storm项目

如何新建一个storm项目, 主要步骤:

  1. 把storm的jar包加到classpath里面去。
  2. 如果使用storm的多语言特性的话(非JVM语言), 把multilang目录也加到classpath里面去。

3.1 把storm jar包添加到classapth里面

为了开发storm项目你的classpath里面需要有storm的jar包。最推荐的方式是使用Maven, 不使用maven的话你可以手动把storm发行版里面的所有的jar包添加到classpath。

要在eclipse里面设置storm的classpath的话, 以src/jvm作为源代码路径建立一个新的项目并且把liblib/jvm里面所有的jar包都添加到这个项目的Referenced Libraries里面去。

3.2如果要使用多语言特性, 把mutltilang添加到classpath里面去

如果你用非java的其它语言来实现你的spout和bolt(storm里面的两个关键对象), 那么你的实现要放在这个项目的multilang/resources目录下面。为了让storm在本地模式下能找到这些文件, multilang目录要放到classpath里面去。在eclipse里面你可以通过把multilang目录设置成源代码目录来达到这个目的。

更多关于用非java语言来编写spout和bolt的信息可以看一下: Using non-JVM languages with Storm

为了在eclipse测试所有的东西是不是都配置好了, 你可以运行一下WordCountTopology这个类, 如果正常的话, 你可以看到它发射十秒钟消息。


4、Storm本地模式和集群模式

4.1 本地模式

4.1.1 API

本地模式在一个进程里面模拟一个storm集群的所有功能, 这对开发和测试来说非常方便。以本地模式运行topology与集群模式类似。

要创建一个进程内“集群”,使用LocalCluster对象就可以了:

 

1
2
3
import backtype.storm.LocalCluster;
 
LocalCluster cluster = new LocalCluster();

 

然后可以通过LocalCluster对象的submitTopology方法来提交topology, 效果和StormSubmitter对应的方法是一样的。submitTopology方法需要三个参数: topology的名字, topology的配置以及topology对象本身。你可以通过killTopology方法来终止一个topology, 它需要一个topology名字作为参数。

要关闭一个本地集群,简单调用:

 

1
cluster.shutdown();

 

就可以了。

4.1.2 本地模式下的一些常用配置

你可以在这里看到完整的配置列表:

1. Config.TOPOLOGY_MAX_TASK_PARALLELISM 这个配置给topology里面各个组件(spout, bolt)设定一个线程数量上限。一般来说生成环境的这个配置很大(100左右), 而这对于本地测试来说太大了, 这个配置可以让你把它调小。

2. Config.TOPOLOGY_DEBUG 这个配置如果设置成true, storm会log下spout和bolt发射出来的所有消息, 对于调试来说非常有用。

4.2集群模式

4.2.1 API

在生产集群上运行topology跟本地模式差不多。下面是步骤:

1)定义topology(如果是java的话, 用TopologyBuilder)

2) 使用StormSubmitter来把topology提交到集群。StormSubmitter的参数有:topology的名字,topology的配置对象,以及topology本身。比如:

Config conf = new Config();
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("name",
                   conf, topology);
3) 创建一个包含你的程序代码以及你代码所依赖的依赖包的jar包(有关storm的jar包不用包括, 这些jar包会在工作节点上自动被添加到classpath里面去)。如果你使用maven, 那么插件: Maven Assembly Plugin 可以帮你打包,只要把下面的配置加入你的pom.xml。
<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <mainClass>com.path.to.main.Class</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>

然后运行mvn assembly:assembly就可以打包了. 再说一下,不用包括storm相关的jar包,它们会自动加到classpath里面。

4)用storm客户端去提交jar包:

 

1
storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3

 

storm jar 会把代码提交到集群并且配置StormSubmitter类以让它和正确的集群进行通信。在这个例子里面,上传jar包之后storm jar命令会调用org.me.MyTopology的main函数,参数是 arg1, arg2, arg3。关于如何配置你的storm客户端去和storm集群进行通信可以看下配置storm开发环境

4.2.2 常见配置

有很多topology级的配置可以设。这里有关于所有配置的清单, 以”TOPOLOGY”打头的配置是topology级别的配置,可以覆盖全局级别的配置。下面是一些比较常见的:

1)Config.TOPOLOGY_WORKERS:  这个设置用多少个工作进程来执行这个topology。比如,如果你把它设置成25, 那么集群里面一共会有25个java进程来执行这个topology的所有task。如果你的这个topology里面所有组件加起来一共有150的并行 度,那么每个进程里面会有6个线程(150 / 25 = 6)。

2)Config.TOPOLOGY_ACKERS: 这个配置设置acker线程的数目。Ackers是Storm的可靠性API的一部分,关于storm的可靠性API可以看下:Twitter Storm如何保证消息不丢失

3)Config.TOPOLOGY_MAX_SPOUT_PENDING:  这个设置一个spout task上面最多有多少个没有处理的tuple(没有ack/failed)回复, 我们推荐你设置这个配置,以防止tuple队列爆掉。

4)Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS: 这个配置storm的tuple的超时时间  – 超过这个时间的tuple被认为处理失败了。这个设置的默认设置是30秒,对于大多数的topology都已经足够了。关于storm的可靠性API可以看看Twitter Storm如何保证消息不丢失

5)Config.TOPOLOGY_SERIALIZATIONS: 为了在你的tuple里面使用自定义类型,你可以用这个配置注册自定义serializer。

终止一个topology

要终止一个topology, 执行:

storm kill {stormname}

其中{stormname}是提交topology给storm集群的时候指定的名字。

storm不会马上终止topology。相反,它会先终止所有的spout,让它们不再发射任何新的tuple, storm会等Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS秒之后才杀掉所有的工作进程。这会给topology足够的时 间来完成所有我们执行storm kill命令的时候还没完成的tuple。

更新一个运行中的topology

为了更新一个正在运行的topology, 唯一的选择是杀掉正在运行的topology然后重新提交一个新的。一个计划中的命令是实现一个storm swap命令来运行时更新topology, 并且保证前后两个topology不会同时在运行,同时保证替换所造成的“停机”时间最少。

4.2.3 监控topology

监控topology的最好的方法是使用Storm UI。Storm UI提供有关task里面发生的错误以及topology里面每个组件的吞吐量和性能方面的统计信息。同时你可以看看集群里面工作机器上面的日志。


5、一个简单的Topology

让我们来看一个简单的topology的例子, 我们看一下storm-starter里面的ExclamationTopology:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1, new TestWordSpout(), 10);
builder.setBolt(2, new ExclamationBolt(), 3)
        .shuffleGrouping(1);
builder.setBolt(3, new ExclamationBolt(), 2)
        .shuffleGrouping(2);

这个Topology包含一个Spout和两个Bolt。Spout发射单词, 每个bolt在每个单词后面加个”!!!”。这三个节点被排成一条线: spout发射单词给第一个bolt, 第一个bolt然后把处理好的单词发射给第二个bolt。如果spout发射的单词是["bob"]和["john"], 那么第二个bolt会发射["bolt!!!!!!"]和["john!!!!!!"]出来。

我们使用setSpout和setBolt来定义Topology里面的节点。这些方法接收我们指定的一个id, 一个包含处理逻辑的对象(spout或者bolt), 以及你所需要的并行度。

这个包含处理的对象如果是spout那么要实现IRichSpout的接口, 如果是bolt,那么就要实现IRichBolt接口.
最后一个指定并行度的参数是可选的。它表示集群里面需要多少个thread来一起执行这个节点。如果你忽略它那么storm会分配一个线程来执行这个节点。

setBolt方法返回一个InputDeclarer对象, 这个对象是用来定义Bolt的输入。 这里第一个Bolt声明它要读取spout所发射的所有的tuple — 使用shuffle grouping。而第二个bolt声明它读取第一个bolt所发射的tuple。shuffle grouping表示所有的tuple会被随机的分发给bolt的所有task。给task分发tuple的策略有很多种,后面会介绍。

如果你想第二个bolt读取spout和第一个bolt所发射的所有的tuple, 那么你应该这样定义第二个bolt:

builder.setBolt(3, new ExclamationBolt(), 5)
            .shuffleGrouping(1)
            .shuffleGrouping(2);
让我们深入地看一下这个topology里面的spout和bolt是怎么实现的。Spout负责发射新的tuple到这个topology里面 来。TestWordSpout从["nathan", "mike", "jackson", "golda", "bertels"]里面随机选择一个单词发射出来。TestWordSpout里面的nextTuple()方法是这样定义的:

public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike",
                     "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}
可以看到,实现很简单。
ExclamationBolt把”!!!”拼接到输入tuple后面。我们来看下ExclamationBolt的完整实现。

public static class ExclamationBolt implements IRichBolt {
    OutputCollector _collector;
 
    public void prepare(Map conf, TopologyContext context,
                        OutputCollector collector) {
        _collector = collector;
    }
 
    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }
 
    public void cleanup() {
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}


repare方法提供给bolt一个Outputcollector用来发射tuple。Bolt可以在任何时候发射tuple — 在prepare, execute或者cleanup方法里面, 或者甚至在另一个线程里面异步发射。这里prepare方法只是简单地把OutputCollector作为一个类字段保存下来给后面execute方法 使用。

execute方法从bolt的一个输入接收tuple(一个bolt可能有多个输入源). ExclamationBolt获取tuple的第一个字段,加上”!!!”之后再发射出去。如果一个bolt有多个输入源,你可以通过调用 Tuple#getSourceComponent方法来知道它是来自哪个输入源的。

execute方法里面还有其它一些事情值得一提: 输入tuple被作为emit方法的第一个参数,并且输入tuple在最后一行被ack。这些呢都是Storm可靠性API的一部分,后面会解释。

cleanup方法在bolt被关闭的时候调用, 它应该清理所有被打开的资源。但是集群不保证这个方法一定会被执行。比如执行task的机器down掉了,那么根本就没有办法来调用那个方法。cleanup设计的时候是被用来在local mode的时候才被调用(也就是说在一个进程里面模拟整个storm集群), 并且你想在关闭一些topology的时候避免资源泄漏

6、Trident简介

Trident是基于Storm基本组件(spout/bolt)上提供的一种高层抽象。它提供了高吞吐、有状态的流处理、低延迟查询等多种功能。Trident同Pig、Cascading一样,提供了joins、aggregations、groupings、functions、filters等多种高层抽象操作,也为有状态的、增量流处理等提供基本操作。同时实现了exactly-once语义,既保证吞吐有保证可靠性。

事务Pipeline

默认情况下,Trident一次处理一批消息。这批消息被封装在一个事务(Transactions)中;每个事务都有一个全局唯一的id。当这个事务处理完成后,才开始下一个事务的处理过程。为了增加吞吐,可以对事务进行Pipeline(同Redis的Pipeline概念)。允许配置在拓扑中有多个事务同时处理。这个属性是“topology.max.spout.pending”。

在事务的处理过程中,可以使乱序的,即多个事务可以交叉处理,但是Trident会保证事务状态提交是严格有序的。例如,当前拓扑有1-10号事务,这些事务内的tuple可以同时被处理,但是事务2的更新必须等待事务1更新完成之后。

Trident提供了下面的语义来实现有且只有一次被处理的目标。

  1. Tuples 是被分成小的集合被批量处理的
  2. 每一批tuples被给定一个唯一ID作为事务ID (txid). 当这一批tuple被重播时, txid不变.
  3. 批与批之间的状态更新时严格顺序的。比如说第三批tuple的状态的更新必须要等到第二批tuple的状态更新成功之后才可以进行.

事务Spout

根据容错等级,可以分为三种类型spout: 非事务的,事务的,以及不透明事务的spout。对应的,也有3种容错的State:非事务的,事务的,以及不透明事务的状态。

记住,Trident是以小批量(batch)的形式在处理tuple,并且每一批都会分配一个唯一的transaction id。 不同的spout会根据他们可以给予不同的批量tuple的guarantee的能力有不同的属性。一个transactional spout会有如下这些属性:

  1. 有着同样txid的batch一定是一样的。当重播一个txid对应的batch时,一定会重播和之前对应txid的batch中同样的tuples。

  2. 各个batch之间是没有交集的。每个tuple只能属于一个batch

  3. 每一个tuple都属于一个batch,无一例外

这是一类非常容易理解的spout, tuple 流被划分为固定的batch并且永不改变。

为什么我们不总是使用transactional spout?这很容易理解。一个原因是并不是所有的地方都需要容错的。举例来说,TransactionalTridentKafkaSpout 工作的方式是给定一个txid的batch所包含的一个属于一个topic的来自于所有Kafka partition的tuple序列。一旦这个batch被发出,在任何时候如果这个batch被重新发出时,它必须包含原来所有的tuple以满足 transactional spout的语义。现在我们假定一个batch被TransactionalTridentKafkaSpout 所发出,这个batch没有被成功处理,并且同时kafka的一个节点也down掉了。你就无法像之前一样重播一个完全一样的batch(因为kakfa 的节点down掉,该topic的一部分partition可能会无法使用),整个处理会被中断。

也就是说事务性的Spout,对于可能不能多次拉取到同样消息的消息源没有容错能力,可能导致整个拓扑block住。

这也就是OpaqueTransactionalSpouts存在的原因- 它对于丢失源节点这种情况是容错的,仍然能够帮你达到有且只有一次处理的语义。

一个opaque transactional spout有如下属性:

  1. 每个tuple只在一个batch中被成功处理。然而,一个tuple在一个batch中被处理失败后,有可能会在另外的一个batch中被成功处理

OpaqueTridentKafkaSpout 是一个拥有这种属性的spout,并且它是容错的,即使Kafak的节点丢失。当OpaqueTridentKafkaSpout 发送一个batch的时候, 它会从上个batch成功结束发送的位置开始发送一个tuple序列。这就确保了永远没有任何一个tuple会被跳过或者被放在多个batch中被多次成功处理的情况.

使用opaque transactional spout,再使用和transactional spout相同的处理方式:判断数据库中存放的txid和当前txid去做对比已经不好用了。这是因为在state的更新过程之间,batch可能已经变了。

事务State

事务性的State对事务性的Spout是幂等的。

透明事务的State对事务性Spout和透明事务Spout更新是幂等的。

persistentAggregate是在partitionPersist之上的另外一层抽象。它知道怎么去使用一个Trident聚合器来更新State。在这个例子当中,因为这是一个group好的stream,Trident会期待你提供的state是实现了MapState接口的(如果没有Group过,必须实现SnapshottableMap接口)。用来进行group的字段会以key的形式存在于State当中,聚合后的结果会以value的形式存储在State当中。MapState接口看上去如下所示:

public interface MapState<T> extends State { 
 
    List<T> multiGet(List<List<Object>> keys); 
 
    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters); 
 
    void multiPut(List<List<Object>> keys, List<T> vals); 
 
}
当你在一个未经过group的stream上面进行聚合的话,Trident会期待你的state实现 Snapshottable接口:

public interface Snapshottable<T> extends State { 
    T get();  
 
    T update(ValueUpdater updater);  
 
    void set(T o); 
}

MemoryMapState 和 MemcachedState 都实现了上面的2个接口。

在Trident中实现MapState是非常简单的,它几乎帮你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 类实现了所有相关的逻辑,包括容错的逻辑。你只需要将一个IBackingMap 的实现提供给这些类就可以了。IBackingMap接口看上去如下所示:


public interface IBackingMap<T> { 
 
    List<T> multiGet(List<List<Object>> keys);  
 
    void multiPut(List<List<Object>> keys, List<T> vals);  
 
}

OpaqueMap's会用OpaqueValue的value来调用multiPut方法,TransactionalMap's会提供TransactionalValue中的value,而NonTransactionalMaps只是简单的把从Topology获取的object传递给multiPut。

 Trident还提供了一种CachedMap类来进行自动的LRU cache。 

另外,Trident 提供了 SnapshottableMap 类将一个MapState 转换成一个 Snapshottable 对象. 

可以看看 MemcachedState的实现,从而学习一下怎样将这些工具组合在一起形成一个高性能的MapState实现。MemcachedState是允许大家选择使用opaque transactional, transactional, 还是 non-transactional 语义的。


参考:

http://storm.incubator.apache.org/documentation/Trident-tutorial.html

http://storm.incubator.apache.org/documentation/Trident-state.html

http://storm.incubator.apache.org/documentation/Tutorial.html

http://storm.incubator.apache.org/documentation/Documentation.html

http://xumingming.sinaapp.com/138/twitter-storm%E5%85%A5%E9%97%A8/

http://blog.linezing.com/?cat=92

http://in.sdo.com/?p=542

相关文章
相关标签/搜索