Storm连接Kafka

Storm提供了操作Kafka的工具包,可以从Kafka中读取数据,也可以写数据到Kafka。本篇文章主要介绍Storm读取Kafka中的数据,Storm往Kafka中写数据,Storm操作kafka的常见问题汇总。

1. 读取数据。

通过创建Spout从Kafka连接数据。创建连接Kafka的Spout,需要Kafka的位置配置信息,连接配置信息。

1.1 Kafka位置配置-BrokerHosts

BrokerHosts是Kafka位置配置信息的接口,它有两种实现,对应以下两种连接Kafka的方式:
(1)提供Kafka集群的zookeeper地址。通过ZkHosts创建这种连接。通过public ZkHosts(String brokerZkStr, String brokerZkPath),或public ZkHosts(String brokerZkStr)创建ZKHosts。参数brokerZkStr是Kafka集群的ZK地址,格式为“ ip1:port,ip2:port:ip3:port”。参数brokerZkPath为Kafka集群在ZK上的目录,默认为Kafka的默认设置“/borkers”。
这种连接方式获取Kafka集群的信息是动态的,支持Kafka集群节点和Topic的partition的变化。通过这种方式,在Topology运行过程中,即使Kafka集群增加了节点或者对使用的Topic的partition做了修改,也不会影响Topology正常的运行。
(2)提供Kafka集群的Brokers列表。通过StaticHosts创建这种连接。这种连接方式需要提供Topic的partition和Broker的对应关系,且一旦创建就不会改变。为了使用这种连接方式,需要创建GlobalPartitionInformation的示例。使用示例如下。
Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);

1.2 Kafka连接配置

创建一个KafkaSpout的第二步是创建连接Kafka的配置-KafkaConfig。通过public KafkaConfig(BrokerHosts hosts, String topic),或public KafkaConfig(BrokerHosts hosts, String topic, String clientId)创建。BrokerHosts可以是1.1中提到的两种方式之一。参数topic是要读取数据的Kafka topic,参数client是可选设置,会作为消费offset在ZK上的路径的一部分。
KafkaConfig有两个继承类:SpoutConfig和TridentKafkaConfig。
创建普通的Topology需要使用SpoutConfig,而创建Trident的Spout需要使用TridentKafkaConfig。
KafkaConfig中的配置:
public int fetchSizeBytes = 1024 * 1024; //每次从Kafka中拉取的数据量
    public int socketTimeoutMs = 10000; //连接kafka的过期时长
    public int fetchMaxWait = 10000;    //每次从Kafka中拉取数据的最大等待时长
    public int bufferSizeBytes = 1024 * 1024; //KafkaConsumer的缓存大小
    public MultiScheme scheme = new RawMultiScheme();
    public boolean forceFromStart = false;
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    public long maxOffsetBehind = Long.MAX_VALUE;
    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
    public int metricsTimeBucketSizeInSecs = 60; //发动消费kafka的metrics周期
SpoutConfig中还有一个配置项,public long stateUpdateIntervalMs = 2000,用于确定往ZK注册消费offset信息的周期(单位为ms)。
KafkaConfig中的scheme配置项用来设置如何将Kafka中获得的byte[]转化为另一种类型。这个工具包提供了RawMultiScheme,SchemeAsMultiScheme和KeyValueSchemeAsMultiScheme。RawMultiScheme,直接将Kafka中获得的byte[]数组返回,outFields名字为“bytes”。SchemeAsMultiScheme和KeyValueSchemeAsMultiScheme可以读到的值转为其他类型。

1.3 创建连接Kafka的Spout示例

Core Spout
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Trident Spout

TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

2. 写入数据。

Topology可以通过KafkaBolt向Kafka写数据。在Trident中,可以通过使用TridentState,TridentStateFactory和TridentKafkaUpdater向Kafka写数据。
向Kafka写数据需要实现以下两个接口。

2.1 TupleToKafkaMapper和TridentTupleToKafkaMapper

这两个接口中包含如下两个方法:
K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);
这个方法分别定义了从Tuple到Kafka Key和Kafka message的转化。如果明确确定Tuple中Feild和Kafka key与Kafka Message的对应关系,可以使用FieldNameBasedTupleToKafkaMapper。

2.2 KafkaTopicSelector和trident KafkaTopicSelector

这个接口只包含一个方法:java public interface KafkaTopicSelector { String getTopics(Tuple/TridentTuple tuple); }。这个方法定义了一个tuple需要发送到Kafka的哪个Topic,如果返回null,则这个tuple会被忽略。如果你有一个确定的topic名字,则可以使用DefaultTopicSelector,在构造函数中指定Topic名字。

2.3 Kafka生产者的配置

Kafka生产者的属性配置,请参考 http://kafka.apache.org/documentation.html#producerconfigs

2.4 Storm向Kafka写数据的示例

Core Topology
TopologyBuilder builder = new TopologyBuilder();
Fields fields = new Fields("key", "message");
 FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
             new Values("storm", "1"),
             new Values("trident", "1"),
             new Values("needs", "1"),
             new Values("javadoc", "1")
 );
 spout.setCycle(true);
 builder.setSpout("spout", spout, 5);
 KafkaBolt bolt = new KafkaBolt()
         .withTopicSelector(new DefaultTopicSelector("test"))
         .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
 builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
 Config conf = new Config();
 //set producer properties.
 Properties props = new Properties();
 props.put("metadata.broker.list", "localhost:9092");
 props.put("request.required.acks", "1");
 props.put("serializer.class", "kafka.serializer.StringEncoder");
 conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
 StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
Trident

Fields fields = new Fields("word", "count");
    FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
            new Values("storm", "1"),
            new Values("trident", "1"),
            new Values("needs", "1"),
            new Values("javadoc", "1")
    );
    spout.setCycle(true);
    TridentTopology topology = new TridentTopology();
    Stream stream = topology.newStream("spout1", spout);
    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
            .withKafkaTopicSelector(new DefaultTopicSelector("test"))
            .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
    stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
    Config conf = new Config();
    //set producer properties.
    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:9092");
    props.put("request.required.acks", "1");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
    StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());

3. 常见问题。

3.1 Spout的并行度和Kafka Topic Partition的关系

Kafka Topic的Partition会均匀的分配给每个Spout的并行度。如果Spout的并行度大于Kafka Topic的Parttion数量,则会有一些任务一直处于空闲状态。

3.2 如何在一个Topology中读取多个Topic

一个Spout只能读取一个Kafka Topic。如果需要读取多个Topic做处理的话,可以使下游的Bolt的输入为多个KafkaSpout。

3.3 Topology对Kafka Topic的消费offset的记录位置

消费的Offset信息会记录到ZK上,默认记录到Storm所使用的ZK,用户可以通过设置SpoutConfig.zkServers指定其他的ZK。在ZK上的路径为SpoutConfig.zkRoot+“/" + SpoutConfig.id。

3.4 重新提交任务对消费位置的控制

如果是第一次提交一个Topology,Kafka Spout默认从Kafka中最老的offset开始读,可以通过KafkaConfig.forceFromStart修改该值为kafka.api.OffsetRequest.LatestTime(),使其从最新的offset开始读。Storm会将Kafka的消费offset信息记录到ZK上,下次重新启动的时候默认会从上次记录到ZK上的Offset位置开始读,但也可以通过设置SpoutConfig.forceFromStart = true,使其强制从KafkaConfig.forceFromStart设置的位置开始读。

3.5 监控消费Kafka Topic的Offset信息

对Kafka Topic的消费信息是由Storm自己管理的,消费信息的存储路径和存储格式都与默认的不同,使得大多数的监控Kafka Topic消费信息的工具都失效。
相关文章
相关标签/搜索