Kafka介绍

Kafka初窥

Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输
  • 同时支持离线数据处理和实时数据处理

消息系统特点

解耦

在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。 消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。 这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

冗余

有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。

扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。

灵活性 & 峰值处理能力

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

可恢复性

当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。

送达保证

消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个”只送达一次”保证。无论有多少进程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是”预定”了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。

顺序保证

在大多使用场景下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。IronMO保证消息通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。

缓冲

在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行–写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。

理解数据流

在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息队列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。

异步通信

很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。

常用消息系统一览

名称 一览
RabbitMQ RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
ZeroMQ ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演了这个服务角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。
ActiveMQ ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。
Kafka Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制来统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

为什么选择Kafka

  • Apache子项目,轻量级的消息系统,版本稳定并持续更新
  • 快速持久化,可以在O(1)的系统开销下进行消息持久化
  • 高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率
  • 完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡
  • 支持离线数据处理和实时数据处理

Kafka使用场景

  • 消息系统: 应用程度使用Kafka作为传统的消息系统实现标准的队列和消息的发布—订阅,例如搜索和内容提要(Content Feed)。
  • 监控:主机通过Kafka发送与系统和应用程序健康相关的指标,然后这些信息会被收集和处理从而创建监控仪表盘并发送警告。除此之外,LinkedIn还利用Apache Samza实现了一个能够实时处理事件的富调用图分析系统。
  • 分析: 为了更好地理解用户行为,改善用户体验,LinkedIn公司将用户查看了哪个页面、点击了哪些内容等信息发送到每个数据中心的Kafka集群上,并通过Hadoop进行分析、生成日常报告。
  • 作为分布式应用程序或平台的构件(日志):大数据仓库解决方案Pinot等产品将Kafka作为核心构件(分布式日志),分布式数据库Espresso将其作为内部副本并改变传播层。
  • 日志聚合: kafka的特性决定它非常适合作为“日志收集中心”;application可以将操作日志“批量”“异步”的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统。

Kafka深入

基本术语

  • topic:将Kafka中的消息分类,一类消息可以称为topic,topic也可以看作消息队列
  • produce:将消息发布到Kafka对应topic中的进程
  • consumer:订阅Kafka的某个topic,并从该topic中消费消息
  • broker:Kafka由一个或多个服务器构成的分布式系统,每个服务器称作broker。一台物理机器可能有多个broker。 alt text

图从上层结构反映:producers通过网络向Kafka集群发送消息,Kafka又将消息发送给consumers。Produce和Kafka集群,以及Kafka集群和consumer之间是通过TCP协议通信的。

Topic和日志

Topic是属于同一类消息的集合,Kafka采用分区的方法维护每个topic,格式如下: 每个分区都是一个有序的,不变的消息队列。分区中的消息在本分区中都有一个唯一序列id,该序列id称为offset。新的消息直接添加到分区的末尾。 Kafka会保留设置时间范围内的所有消息记录,尽管某些记录已经被consumer消费过。每个consumer会保存其在日志中的位置信息,该位置信息称为offset,offset由consumer管 alt text

理。Consumer增加offset的值,依次读取队列中新的消息。但是,consumer控制offset,使其能够读取任意offset的消息。 Consumer这种读取消息的机制,使得consumer非常灵活,可以随时读取topic中任意位置的消息,consumer的运行和Kafka集群以及其他consumer没有影响。 Partition是物理上的分区,每个partition有若干segment组成。具体参考如下:

分布式

日志的partitions分布在Kafka集群的每个server上,每个服务器负责一部分partitions的数据处理和请求。每个partition都可以设置若干备份分布在Kafka集群的其他server上,并且每个partition都有一个server作为leader,0个或多个server作为辅助的follows。Leader处理其负责的partition所有的读写请求,follows对leader上所有的读写请求等元数据进行备份,以便leader宕机之后follow充当leader的角色。扮演leader的服务器处理部分partitions,其他的follows处理其他的partitions,这样Kafka集群就实现了负载均衡的功能。

Producers

Produce将消息发布到它选择的topic中,并负责将消息分发到其对应topic的partition中。

Consumers

传统消息有排队和发布-订阅两种模型。在排队模型中,consumer池中的consumer从一个server读取消息,每条消息只能被一个consumer取到。在分发-订阅模型中,每条消息被广播到所有的consumers。Kafka用consumer group整合了这两种模型的特点。 每个consumer都属于一个consumer group。Kafka集群和consumer group之间的消息分发采用了发布-订阅模型,consumer group中的consumer通过consumer group和Kafka集群采用了排队模型。

alt text

alt text

上图所示:每个server中的partition都广播到各个consumer group,每个consumer group中的consumer都从group中获取消息。 两种极端的情况: 1. 若所有的consumer都属于同一个consumer group,每个consumer都从Kafka集群以队列模型获取消息 2. 若所有的consumer属于不同的consumer group,kafka集群相当于将消息广播到每个consumer Kafka通过将topic分成partitions的方式维持并行性,并通过consumer线程池确保消息的有序性和负载均衡。具体做法为:将topic分成若干partitions,在consumer group中创建若干consumers,每个partition对应一个consumer,这样每个consumer都有序的读取其对应partition上的消息。从整个topic来看,若干consumers并行读取topic中的消息。

High-Level 消费者设计原则

  1. 如果线程数目大于partition数目,有些线程将没法取到topic中的消息

安装和使用

Centos安装Zookeeper

  • 解压zookeeper包
    ##以版本号zookeeper-3.4.8.tar.gz为例,将zookeeper-3.4.8.tar.gz解压到/home/kafka目录下

    sudo tar xvf zookeeper-3.4.8.tar.gz  -C    /home/kafka
    mv zookeeper-3.4.8 zookeeper
  • 配置/etc/profile文件

    sudo vi /etc/profile
    export ZOOKEEPER_HOME=/home/kafka/zookeeper
    export PATH=$ZOOKEEPER_HOME/bin:$PATH
    CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$ZOOKEEPER_HOME/libs
    export PATH
    source /etc/profile   ##使得配置文件生效
  • 配置zookeeper

    mkdir  -p /home/kafka/zookeeper/data
    cd   /home/kafka/zookeeper/bin
    cp zoo.sample.cfg  zoo.cfg
    vi zoo.cfg
    dataDir=/home/kafka/zookeeper/data ##修改 
  • 启动和关闭zookeeper

    bin/zkServer.sh start ##启动zookeeper bin/zkServer.sh stop ##关闭zookeeper

Centos安装Kafka

  • 解压和安装kafka
    ##以版本号kafka_2.11-0.9.0.1.tgz为例,将kafka_2.11-0.9.0.1.tgz解压到/home/kafka目录下

    sudo tar xvf kafka_2.11-0.9.0.1.tgz  -C    /home/kafka
    mv kafka_2.11-0.9.0.1.tgz kafka
  • 配置kafka

    cd ~/kafka/config
    vi server.properties ##打开server.properties配置文件
    log.dirs=/tmp/kafka-logs ##修改log路径
    zookeeper.connect=localhost:2181 ##配置zookeeper
  • 使用kafka

    cd ~/kafka/bin 
    bin/kafka-server-start.sh config/server.properties & ##启动kafka bin/kafka-server-stop.sh config/server.properties ##关闭kafka bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test & ##创建topic bin/kafka-console-producer.sh --broker-list localhost:9092 --sync --topic test ##创建生产者 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test ##创建消费者

Java代码:提交消息到Kafka

  • 配置kafka-provider.properties文件

    metadata.broker.list=10.21.10.21:9092
    serializer=kafka.serializer.StringEncoder
    key.serializer=kafka.serializer.StringEncoder
    request.required.acks=1
    topic=test
  • Java代码

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.cib.fintech.ifp.message.api.exception.bz.SmsBZException;
import com.cib.fintech.ifp.message.api.exception.rt.SmsCommitKafkaRTException;
import com.cib.fintech.ifp.message.api.exception.rt.SmsRTException;
import com.cib.fintech.ifp.message.api.model.SMS;
import com.cib.fintech.ifp.message.api.model.enums.SmsStatus;
import com.cib.fintech.ifp.message.api.model.response.BatchSmsResponseItem;
import com.cib.fintech.ifp.message.api.model.response.SendBatchSmsResponse;
import com.cib.fintech.ifp.message.api.model.response.SendSingleSmsResponse;
import com.cib.fintech.ifp.message.smsprovider.gateway.SmsGateway;
import com.cib.fintech.ifp.message.smsprovider.model.KafkaProperties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class KProducer {
    private final Producer<String, String> producer;
    public String singleTopic = "single-topic";
    public String batchTopic = "batch-topic";
    Properties props = new Properties();
    @Autowired
    public KProducer(KafkaProperties kafkaProperties) {

        //此处配置的是kafka的端口
        props.put("metadata.broker.list", kafkaProperties.getMetadata_broker_list());
        //配置value的序列化类
        props.put("serializer.class", kafkaProperties.getSerializer());
        //配置key的序列化类
        props.put("key.serializer.class", kafkaProperties.getKey_serializer());
        props.put("request.required.acks", kafkaProperties.getRequest_required_acks());
        singleTopic = kafkaProperties.getTopic();
        //初始化producer
        producer = new Producer<String, String>(new ProducerConfig(props));

    }

    public void send(String message) {
        producer.send(new KeyedMessage<String, String>(singleTopic, message));
    }
 }

Java代码:从Kafka中获取消息

  • 配置kafka-consumer.properties文件
zookeeper.connect=10.21.10.21:2181
group.id=jd-group
zookeeper.session.timeout.ms=40000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.StringEncoder
topic=test
  • Java代码
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.springframework.beans.factory.annotation.Autowired;
import com.cib.fintech.ifp.message.api.exception.bz.SmsBZException;
import com.cib.fintech.ifp.message.api.exception.rt.SmsRTException;
import com.cib.fintech.message.consumer.model.KafkaConsumerProperties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
public class KConsumer {
    private final ConsumerConnector consumer;
    private String topic = "test";
    String name="";
    @Autowired
    public KConsumer(KafkaConsumerProperties kCProperties) {
        Properties props = new Properties();
        //zookeeper 配置
        props.put("zookeeper.connect", kCProperties.getZookeeper_connect());
        //group 代表一个消费组
        props.put("group.id", kCProperties.getGroup_id());
        //zk连接超时
        props.put("zookeeper.session.timeout.ms", kCProperties.getZookeeper_session_timeout_ms());
        props.put("zookeeper.sync.time.ms", kCProperties.getZookeeper_sync_time_ms());
        props.put("auto.commit.interval.ms", kCProperties.getAuto_commit_interval_ms());
        props.put("auto.offset.reset", kCProperties.getAuto_offset_reset());
        //序列化类
        props.put("serializer.class", kCProperties.getSerializer_class());

        topic = kCProperties.getTopic();
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    public void setName(String name) {
        this.name = name;
    }

    public void consumeByte() throws SmsRTException, SmsBZException {

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<String, String> it = stream.iterator();

        while (it.hasNext()) {

            String str = new String(it.next().message());
            System.out.println(str);

        }
    }
}
相关文章

相关标签/搜索