Kafka 版本

kafka_2.11-1.0.0.tar.gz

  • 2.11 : 表示的是 Scala 语言版本
  • 1.0.0 : 表示的是 Kafka 的版本

下载安装及使用

Download

cd ~/Downloads
tar -xvf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0

启动

要先安装好 Java 环境. 至少 JDK 1.7 及以上版本

zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

后台运行的话:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &

成功后可看到

binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

zookeeper 监听了 2181 端口

kafka

bin/kafka-server-start.sh config/server.properties

后台运行的话
bin/kafka-server-start.sh -daemon config/server.properties
或
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
其实在脚本中 `-daemon` 参数效果等同于 `nohup xxx`

成功后可看到

INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

默认情况下 kafka 监听的是 9092 端口

关闭

bin/kafka-server-stop.sh

注意, 这会搜索当前机器中所有 Kafka broker 进程, 然后关闭它们.

简单使用测试

创建 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1
Created topic "test".


查看 topic 状态
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0


发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
	输入完后, 就可以不断输入消息了. 每一行一条消息. 最后按 `Ctrl-C` 退出


消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
....这里会输出上面产生的消息内容了....

Kafka 设计

高吞吐, 低延时设计目标

  • 大量使用OS页缓存, 内存操作速度快且命中率高
  • Kafka 不直接参与物理 I/O 操作, 而是交由最擅长此事的OS来完成
  • 采用追加方式写入, 摒弃了缓慢的磁盘随机读/写操作
  • 使用 sendfile 为代表的 zero-copy 技术加强网络间的数据传输效率

消息持久化

所有的数据会立即被写入文件系统的持久化日志中, 之后 Kafka 服务器才会返回结果给客户端

负载均衡和故障转移

负载均衡

默认情况下 Kafka 每台服务都有均等机会为 Kafka 客户端提供服务

故障转移

通过会话机制. 在 Kafka 启动后以会话形式注册到 Zookeeper 上, 如果会话超时, Kafka 集群会选举出另一台服务器来完全代替这台服务器继续提供服务

伸缩性

通过 Zookeeper 保存轻量级状态来扩展 Kafka 集群

总结

  • 生产者 => Kafka 服务器 ( Broker )
  • Kafka 服务器 ( Broker ) => 消费者
  • Kafka 服务器 ( Broker ) 依托 Zookeeper 集群进行服务协调管理

概念

消息

消息格式(图片来源于书)

image-20181228142903374

普通用度一般主要关注

  • Key : 消息键
  • Value : 消息体
  • Timestamp: 消息发送时间戳. ( 于用流式处理及其依赖时间的处理说语义). 如果不指定则是当前时间
  • 属性字段: 目前只使用最低的3 bit用于保存消息的压缩类型, 其余5 bit 尙未使用.
    • 0: 无压缩
    • 1: gzip
    • 2: snappy
    • 3: lz4

Kafka 避开繁重的 Java 堆中内存分配, 直接使用紧凑二进制字节数组 ByteBuffer 而不是独立的对象, 减少内存占用. 通常运行 Java 的OS默认都开启了页缓存机制, 也就是说堆上保存的对象很可能在页缓存中还保留了一份, 这造成极大的资源浪费.

页缓存还有一个好处: 当出现 Broker 进程崩溃时, 堆内存上的数据也一并消失, 但页缓存的数据依然存在. 下次 Broker 重启后可以继续提供服务, 不需要再单独”热”缓存了.

topic 和 partition

  • topic

只是一个逻辑概念, 代表一类消息, 也可以认为是消息被发送的地方.

  • partiton

它是 topic 的分区, 并没有太多的业务含义, 它的引入单纯就是为了提升系统的吞吐量.

image-20181228144119453

offset

  • partition 中消息的 offset : 它是固定的.
  • 消费者消费该partition 中的消息时, 它自己也有个 offset 会随着消费进度不断前移.
  • 唯一定位消息 <topic, partition, offset>

image-20181228144556532

replica

分布式系统实现高可靠性, 目前主要依靠冗余机制. 对 Kakfa 就是备份多份日志. 这备份日志在 Kafka 中被称为 replica (副本). 它的唯一 目的就是防止数据丢失 !

在创建主题时 --replication-factor N 参数中的 N = leader + follower , 如果N为1, 则只有 leader

并且这个N, 不能大于当前可用的 broker 数量, 否则会报如下异常

Replication factor: 2 larger than available brokers: 1

leader replica

领导者副本. 对外提供服务

follower replica

追随者副本. 它是不能提供服务给客户端的. 它只是被动地向领导者副本 (leader replica) 获取数据, 而一旦 leader replica 所在的 broker 宕机, kafka 会从剩余的 replica 中选举出新的 leader 继续提供服务.

Kafka 保证, 同一个 partition 的多个 replica 一定不会分配在同一台 broker 中.

ISR 系统

in-sync-replica

即与 leader replica 保持同步的 replica 集合.这是 Kafka 动态维护一个 replica 的集合, 只有在这个集合中的 replica 才能被选举为 leader , 也只有在该集合中所有 replica 都接收到了同一条消息, Kafka 才会将该消息标识为 已提交 状态(即认为这条消息发送成功).

Kafka 承诺, 只要这集合中至少存在一个 replica, 那 已提交 的消息就不会丢失.

Kafka 适用场景

  • 消息传输
  • 网站行为日志追踪
  • 审计数据收集
  • 日志收集
  • Event Souring
  • 流式处理

Kafka 与 Confluent

confluent 分开源版本与企业版本. 开源版本跟 Kafka 并无太大区别

confluent.io

线上环境部署

OS

推荐 Linux. 新clients在底层时采用了 Java 的 Selector 机制, 它在 Linux 上的实现就是 Epoll. 但在 Windows 上, NIO 的 Selector 使用的是 Select 而非 IOCP (真正的异步I/O模型, 性能比较高). Java NIO2 才是使用 IOCP 实现的.因此, 用 Linux 部署 Kafka , I/O 上更高效

Zero-Copy 在 Linux 上支持良好. 如 sendfile, mmap

Java 8u60 版本Windows平台的, 才正式支持FileChannel.transferTo 调用类似Linux 的 sendfile 技术

磁盘

机械磁盘也够. 当然SSD更好

容量

  • 新增消息数
  • 消息留存时间
  • 平均消息大小
  • 副本数
  • 是否启用压缩

内存

Kafka 只是将消息写入 page cache, 之后由OS刷到磁盘的.

consumer 读取时, 也会优先从该区域中查询, 如果直接命中则完全不用执行耗时的物理I/O操作

  • 尽量分配更多的内存给OS的 page cache
  • 不要为 broker 设置过大的堆内存, 最好不超过6GB
  • page cache 大小至少要大于一个日志段的大小

CPU

追求多核而非高时钟频率.

使用 0.10.0.0 之前的版本或 clients 与 broker 端消息版本不一致, 则要防止消息解压操作消耗过多的CPU

带宽

  • 尽量使用高速网络
  • 避免使用跨机房网络

多节点环境安装

由一套多节点 Zookeeper 和一套多节点 Kafka 集群组成

image-20181228155156090

zookeeper 集群

一个 zookeeper 集群通常称为一个 ensemble , 最好使用 奇数 个服务器, 即 2n+1, 这样 zookeeper 最多可容忍 n 台服务器宕机而保证依然提供服务.

zoo.cfg

tickTime=2000
dataDir=/usr/zookeeper/data_dir
clientPort=2181
initLimit=5
syncLimit=2
server.1=主机名或IP:2888:3888
server.2=主机名或IP:2888:3888
server.3=主机名或IP:2888:3888
  • tickTime: 小最时间单位, 用于丈量心跳时间和超时时间. 单位 ms
  • dataDir: 保存内存快照的数据目录
  • clientPort: 监听客户端的连接端口
  • initLimit: follower 初始时连接 leader 的最大 tick 次数. 即 syncLimit * tickTime 内要连上 leader , 否则将被视为超时
  • syncLimit: follower 与 leader 进步同步的最大时间. 类似 initLimit, 以 tickTime 为单位来指定
  • server.X=host:port1:port2: X必须是全局唯一的数字, 且需要与 myid 文件中的数字相对应. port1 用于 follower 节点连接 leader 节点. port2 用于 leader 选举

myid

这个要保存在 zoo.cfg 文件中的 dataDir 中, 内容就是上面指定server.X的一个数字X 即可.

多节点配置时配置文件的差别

  • clientPort 这个如果在同一台机器上, 则需要不同的端口. 如果是不同的机器上, 则可以为相同端口(只要确保没有被占用即可)
  • dataDir : 集群中的每个节点的数据目录也要不同.
  • server.X=主机名或IP:2888:3888 这些要修改为相应节点的不同的配置. 并确保这些端口没有被其他程序占用

启动集群

bin/zkServer.sh start conf/zoo1.cfg
bin/zkServer.sh start conf/zoo2.cfg
bin/zkServer.sh start conf/zoo3.cfg

查看集群状态

bin/zkServer.sh status conf/zoo1.cfg
bin/zkServer.sh status conf/zoo2.cfg
bin/zkServer.sh status conf/zoo3.cfg

Kafka 集群

创建多份配置. 每份配置的主要不同内容为(假设为 config/server1.properties, config/server2.properties, config/server3.properties )

  • broker.id: 0, 1, 2…等必须是全局唯一的
  • listeners = listener_name://host_name:port : port 指定为不同的端口
  • log.dirs日志数据文件目录. 最好修改为其他的而不是 /tmp
  • zookeeper.connect : 连接 zookeeper 集群. 要同时指定 zookeeper 集群中的所有 zookper 节点. 例如 zookeeper.connect=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002

启动

bin/kafka-server-start.sh -damon config/server1.properties
bin/kafka-server-start.sh -damon config/server2.properties
bin/kafka-server-start.sh -damon config/server3.properties

验证

创建主题
bin/kafka-topics.sh --zookeeper zk1:8001,zk2:8002,zk3:8003 --create --topic test-topic --partitions 3 --replication-factor 3

描述主题
bin/kafka-topics.sh --zookeeper zk1:8001,zk2:8002,zk3:8003 --describe --topic test-topic

删除主题
bin/kafka-topics.sh --zookeeper zk1:8001,zk2:8002,zk3:8003 --delete --topic test-topic

列出主题
bin/kafka-topics.sh --zookeeper zk1:8001,zk2:8002,zk3:8003 --list

发送消息
bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka:9093,kafka:9094 --topic test-topic
然后输入内容, 最后 Ctrl-C 退出

消费消息
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka:9093,kafka:9094 --topic test-topic --from-beginning

生产者吞吐测试
bin/kafka-producer-perf-test.sh --topic test-topic --num-records 500000 --record-size 200 --throughput -1 --producer-props bootstrap.servers=kafka1:9092,kafka:9093,kafka:9094 acks=-1

消费者吞吐测试
bin/kafka-consumer-perf-test.sh --broker-list kafka1:9092,kafka:9093,kafka:9094  --fetch-size 2000 --messages 500000 --topic test-topic


参数设置

broker

config/server.properties

要生效的话, 要重启该 broker.

  • broker.id : 唯一的整数标识 broker. 如果不指定, kafka 会自动生成一个唯一值.
  • log.dirs : 持久化消息的目录. 可以有多个目录, 以逗号分隔. 如 /home/kafka1,/home/kafka2. 如果有N块磁盘, 则填写N个目录目录, 则可以极大提升同时写操作.
  • zookeeper.connect : 连接 zookeeper 集群. 如果要管理多套kafka 集群, 则要指定 zookeeper 的 chroot. 比如 zk1:8181/kafka. 默认使用 zookeeper 的根路径.
  • listeners : broker 监听器的CSV列表. 格式为 协议://主机名:port,协议://主机名:port,协议://主机名:port . 该参数主要用于客户端连接 broker 使用, 可以认为是 broker 开放给 clients 的监听端口. 如果不指定主机, 则表示绑定默认网卡; 如果主机名为 0.0.0.0 则表示绑定所有网卡.
  • advertised.listeners : 类似 listeners, 但主要用于 IaaS 环境. 通常用来绑定公网IP提供外部 clients 使用. 然后上面的 listeners 绑定私网IP供 broker 间通信使用.
  • unclean.leader.election.enable : 是否开启 unclean leader 选举. 默认为 false, 如果允许, 虽然 Kafka 继续提供服务, 但会造成消息数据的丢失. 对于可以容忍数据丢失的, 可以选为 true.
  • delete.topic.enable : 是否允许 kafka 删除 topic.
  • log.retention.{hours|minutes.ms} : 消息数据留存时间. 优先级为 ms > minutes > hours. (新版本的Kafka会根据消息的时间戳来进行判断, 旧的版本没有时间戳格式, 则根据日志文件的最近修改时间来判断)
  • log.retention.bytes: 控制每个消息日志保存多大的数据. 默认为 -1, 即不根据大小来删除日志.
  • min.insync.replicas : 与 produceracks 配合使用. acks=-1, 表示最高级别的持久化保证. min.insync.replicas 也只有 acks=-1 时才有意义. 指定 broker 成功响应 clients 消息发送的最少副本数. 假如无法满足的话, 消息会被认为发送不成功. min.insync.replicas=-1 表示要将将消息写入所有副本中才算成功.
  • num.network.threads : 控制 broker 在后台用于处理网络请求的线程数.(其实只是负责转发请求).
  • num.io.threads : 实际处理网络请求的线程数.
  • message.max.bytes : 能够接收的最大消息大小. 默认为 977KB.

topic

  • delete.retention.ms : 每个 topic 可以设置自己的日志留存时间以覆盖全局默认值
  • max.message.bytes : 覆盖全局的 message.max.bytes
  • retension.bytes: 覆盖全局的 log.retension.bytes

GC

作者在书的给出的参数(JDK 8)

-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50  -XX:MaxMetaspaceFreeRatio=80

OS

  • ulimit -n 100000
  • Socket 缓存区大小
  • Ext4或XFS文件系统
  • 关闭 Swap sysctl vm.swappiness=一个较小的数值 降低对 swap 空间的使用
  • 设置更长的 flush 时间
    • /proc/sys/vm/dirty_writeback_centisecs单位为 百分秒. 默认值为 500, 即5秒.
    • /proc/sys/vm/dirty_expire_centisecs 百分秒默认为3000, 即30秒

Producer 开发

image-20181228181629014

步骤

  • 构造一个 java.util.Properties 对象. 至少指定 bootstrap.servers, key.serializervalue.serializer
  • 通过 Properties 对象构造 KafkaProducer 对象
  • 构造 ProducerRecord , 指定要发送到的 topic, partition 以及对应的 key, value. (partition 和 value 可以不指定而由 Kafka 自行确定)
  • 调用 KafkaProducer.send 方法
  • 关闭 KafkaProducer

可重试异常

org.apache.kafka.common.errors.RetriableException 所有继承这个类的都是可重试异常, 否则一般是不可重试异常.

image-20181228182541150

常见配置

配置文档 producer config

  • acks

    • 0 : 表示完全不理 leader broker 的处理结果, 然后可立即发送下一条消息 (通常这种情况下吞吐量是最高)
    • all 或 -1 : 等待 leader broker 和所有 ISR 中所有副本都成功写入才将响应结果发送给 producer .(通常这种情况下, 吞吐量是最低的)
    • 1 : 默认值. 仅 leader broker 成功写入就响应给 producer, 而无须等待 ISR 中其他副本是否成功写入.(吞吐介于 0 和 -1 之间)

直接在 properties 对象中增加 properties.put(ProducerConfig.ACKS_CONFIG, "1"); 即可. 注意, value是 String 类型

  • buffer.memory

缓存消息的缓冲区大小, 单位是字节. 默认为 33554432, 即 32MB.

producer => buffer.memory => io thread => broker

  properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  • compression.type

    • lz4
    • none
    • gzip
    • snappy

是否压缩, 默认为 none. 开启压缩会额外CPU消耗.

!!!!!

如果 producer 与 broker 的压缩设置不同, 则broker 会在写入消息之前对相应的消息进行解压 => 重压缩 操作.

  properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
  • retries

表示重试次数.

  • 注意可能导致消息重复发送. 这要求在 consumer 执行去重

  • 消息乱序. (可通过 max.in.flight.requests.per.connection 设置为1, 这会确保同一时刻只能发送一次请求)

    properties.put(ProducerConfig.RETRIES_CONFIG, 100);
    
  • retry.backoff.ms

重试间隔停顿时间, 默认为 100ms

  • batch.size

非常重要的参数之一. producer 会将发送到同一分区的多条消息封装进一个 batch 中. 当 batch 满了或超过时间 linger.ms 时就会发送给 broker.

默认为 16384, 即 16KB.

  properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576);
  • linger.ms

默认为0, 表示立即发送, 而无须关心 batch 是否被填满.

  properties.put(ProducerConfig.LINGER_MS_CONFIG, 100);
  • request.timeout.ms

指定 broker 返回给 producer 的超时时间. 默认为 30 秒.

  properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);

消息分区机制

默认分区机制

默认的分区器, 尽力确保具有相同的 key 的所有消息, 被发送到相同的分区上.

如果没有指定 key, 则选择轮询方式来确保消息在 topic 的所有分区中均匀分配. (但旧版的 producer 不是这样子实现)

默认情况下的分区器对有 key 的消息, 会根据 mermur2算法计算 hash, 然后对总分区数求模得到消息要被发送到的目标分区号.

自定义分区机制

  • 创建一个类, 实现 org.apache.kafka.clients.producer.Partitioner 接口.
  • 在 Properties 中设置 partitioner.class 参数.(完全限定类名)
package hello;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class AppPartitioner implements Partitioner {
    @Override
    public int partition(final String topic, final Object key, final byte[] keyBytes, final Object value, final byte[] valueBytes, final Cluster cluster) {
        //分区策略的实现
        return 0;
    }

    @Override
    public void close() {
        //释放资源
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        //初始化资源
    }
}

然后配置使用

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "hello.AppPartitioner");

序列化消息

默认

image-20181228191315996

自定义

package hello;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class AppSerializer implements Serializer {
    private ObjectMapper objectMapper;
    @Override
    public void configure(final Map configs, final boolean isKey) {
        objectMapper = new ObjectMapper();
    }

    @Override
    public byte[] serialize(final String topic, final Object data) {
        try {
            return objectMapper.writeValueAsString(data).getBytes();
        } catch (JsonProcessingException e) {
            //log.error("erro", e);
            return null;
        }
    }

    @Override
    public void close() {
        objectMapper = null;
    }
}

使用

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "hello.AppSerializer");

producer 拦截器

个人感觉类似 Web MVC 中的 interceptor.

实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口

使用

final List<String> interceptorList = new ArrayList<>();
interceptorList.add("hello.AppInterceptor");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorList);

无消息丢失配置

  • block.on.buffer.full=true

0.9.0.0 版本标记为 deprecated, 并使用 max.block.ms 参数替代.

  • acks=all或-1

  • retries=Integer.Max_VALUE

  • max.in.flight.reqeusts.per.connection=1

  • 使用带回调的 send 方法

  • callback 逻辑中, 显式地立即关闭 producer. 使用 close(0)

防乱序.

下面的是 broker 设置

  • unclean.leader.election.enable=false
  • replication.factor=3
  • min.insync.replicas=2
  • replicatioin.factor > min.insync.replicas
  • enable.auto.commit=false

多线程使用 KafkaProducer 是线程安全的

KafkaProducer 是线程安全的.

  • 多线程单实例 KafkaProducer
  • 多线程多实例 KafkaProducer

image-20181228192951371

旧版本 producer

也称为 Scala 版本 producer . 有很大的不同. 旧版本默认是同步发送, 新版本是异步发送.

不推荐使用旧版本了, 建议用新的 produder.

引入 kafka-core 依赖就可以使用旧版 producer

引入 kafka-clients 依赖就可以使用新版 producer

旧版本与 zookeeper 通信来发送数据

新版本直接与 kafka 集群来发送数据

Consumer 开发

consumer group

由多个消费者实例构成一个整体进行消费

消费者使用一个消费者组名(即 group.id ) 来标记自己, topic 的每条消息都只会被发送到每个订阅它的消费者组的一个消费者实例上.

Kafka 通过 consumer group 来实现 基于队列和基于发布/订阅的两种消息模型.

  • 所有 consumer 都属于相同 group : 实现基于队列的模型. 每条消息只会被一个 consumer 实例处理
  • consumer 实例都属于不同 group : 实现基于发布/订阅的模型. 极端情况下每个 consumer 设置完全不同的 group, 这样消息就会被广播到所有 consumer 实例上

rebalance

这个概念只对 consumer group 有.

consumer group 中多个 consumer 可以同时读取 Kafka 消息, 一旦某个 consumer 挂了, consumer group 会立即将已崩溃的 consumer 负责的分区转交给其他 consumer 来负责. 这个过程称为 rebalance

这是个协议, 规定了一个 consumer group 下所有的 consumer 如何达成一致来分配订阅 topic 的所有分区.

触发条件

  • 组成员发生变更
  • 组订阅的 topic 数发生变更
  • 组订阅的 topic 的分区数发生变更

分配策略

通过参数 partition.asignment.strategy 来设置

  • range : 范围. 也是默认策略
  • round-robin : 轮询
  • sticky : 考虑历史数据 (0.11.0.0 版本才引入)
  • 自定义分配器

总结

  • consumer group 下可有一个或多个 consumer 实例
  • group.id 唯一标识一个 consumer group
  • 对某个 group 而言, 订阅 topic 的每个分区只能分配给该 group 下的一个 consumer 实例(当然, 该分区还可以分配给其他订阅该 topic 的 consumer group). 注意, 是该 topic 的每个分区, 只会分配给同一个 group 下的一个 consumer.

standalone consumer

单独执行消费操作

offset

每个 consumer 实例都会为它消费的分区维护属于自己的位置信息来记录当前消费了多少条消息, 这就是consumer 的 offset

image-20181229104828502

offset commit

consumer 需要定期向 Kafka 集群汇报自己消费数据的进度, 这一过程就是 offset commit

  • 旧版本 consumer 依赖 zookeeper 来保存 offset (当然旧版本也提供了一个参数 offsets.storage=kafka来将 offset 提交到 kafka (__consumer__offsets 这个 topic), 但默认是 zookeeper.)
  • 新版本 consumer 提交到 Kafka 内部的一个 topic (__consumer__offstes) 上, 所以不用再依赖 zookeeper

__consumer__offsets

这个是内部的 topic, 则 kafka 自己创建, 用来保存 consumer 或 consumer group 的 offset commit. 千万不要随时删除个 topic 的数据.

构建 consumer

package hello;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class AppConsumer {
    public static void main(String[] args) {
        final String topic = "test-topic";
        final String groupId = "test-group";
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset = " + record.offset());
                    System.out.println("key = " + record.key());
                    System.out.println("value = " + record.value());
                }
            }
        } finally {
            consumer.close();
        }

    }
}
  • 构建 Properties 对象, 至少要指定
    • ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
    • ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
    • ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
    • ConsumerConfig.GROUP_ID_CONFIG
  • 通过 Properties 对象构造 KafkaConsumer
  • 调用 KafkaConsumer.subscribe 订阅 consumer group 感兴趣的 topic
  • 循环调用 KafkaConsumer.poll 来获取 topic 的消息
  • 处理 ConsumerRecord 对象
  • 关闭 KafkaConsumer

订阅主题

  • 订阅一个
  • 订阅多个
  • 通过正则来匹配, 如 consumer.subscribe(Pattern.compile("log.app.*"));

注意 subscribe 方法并不是增量式的, 这意味着, 最后设置的订阅主题, 会覆盖前面的.

Consumer 主要参数

consumer config

  • session.timeout.ms
    • consumer group 检测组内成员发送崩溃的时间.
    • consumer 消息处理逻辑的最大时间 – 两次 poll 之间的间隔超过了该参数指定的阈值. 这时 coordinator 会认为这个 consumer 追不上组内其他成员消费进度, 这时会将该 consumer 踢出该 consumer group, 该 consumer 负责的分区会被分配给其他的 consumer 来处理. 被踢出的 consumer 会无法 offset commit, 导致 rebalance 后, 会重新消费一次消息.
    • 0.10.1.0 版本及以后版本, 则只有一种含义: coordinator 检测失败的时间. 因此在这些版本中, 设置一个比较小的值来快速检测失败. 默认为 10 秒.
  • max.poll.interval.ms : 从上面 session.timeout.msconsumer 处理逻辑最大时间 的含义剥离出来的.
  • auto.offset.reset : 无位移信息(如刚开始时)位移越界 时Kafka 的对应策略.
    • earliest : 从最早开始(不一定是0, 比如最近一次最后提交的位置开始 )
    • latest : 从最新处开始
    • none : 抛出异常
  • enable.auto.commit : 是否自动提交 offset. (类似 RabbitMQ 的 autoAck)
  • fetch.max.bytes : consumer 单次获取数据的最大字节数.
  • max.poll.records : 单次 poll 调用返回的最大消息数. 默认 500 条
  • heartbeat.interval.ms : 当 coordinator 决定开启新一轮 rebalance 时, 它会将这个决定以 REBALANCE_IN_PROGRESS 异常的形式塞进 consumer 心跳请求中, 这样子其他成员拿到 response 后才能知道它需要重新加入 group. 这个参数就是用来做这件事的. 它要小于 session.timeout.ms heartbeat.interval.ms < session.timeout.ms
  • connections.max.idle.ms : 定期关闭空闲 socket 的时间. 默认为 9分钟. -1 的话表示不关闭空闲连接

Java Consumer 不是线程安全的

  • 要定期执行其他子任务: poll(小超时) + 运行标识布尔变量的方式
  • 不需要定期执行: poll(MAX_VALUE) + 捕获 WakeupException 方式

consumer offset

语义

  • 最多一次 at most once : 消息可能丢失, 但不会被重复处理 (消费之前提交)
  • 最少一次 at least once : 消息不会丢失, 但可能被处理多次 (消费之后提交)(producer 默认提供的就是这个语义)
  • 精确一次 exactly once : 消息一定会被处理且只会被处理一次

image-20181229114827925

consumer 最多只能读取到 水位之前的消息, 而不是读取到水位之后的消息.

水位 : offset <= 水位, 被认为是”已提交”或”已备份”的意思.

自动与手动提交 offset

默认情况下是自动提交的, 间隔是 5 秒.

要想手动提交:

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        consumer.subscribe(Pattern.compile("log.app.*"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset = " + record.offset());
                    System.out.println("key = " + record.key());
                    System.out.println("value = " + record.value());
                }
                consumer.commitSync();
                consumer.commitAsync();
            }
        } finally {
            consumer.close();
        }
  • 关闭自动提交
  • 调用 consumer.commitSync();consumer.commitAsync();

更细粒度的提交

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println("value " + record.value());
                    }
                    final long offset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1)));
                }
            }
        } finally {
            consumer.close();
        }

注意, 提交的位移一定是consumer下一条待取消息的位置, 所以上面的代码使用了 offset+1

独立 consumer

package hello;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class AppConsumer {
    public static void main(String[] args) {
        final String topic = "test-topic";
        final String groupId = "test-group";
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        final List<TopicPartition> topicPartitions = new ArrayList<>();
        final List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        for (PartitionInfo info : partitions) {
            topicPartitions.add(new TopicPartition(info.topic(), info.partition()));
        }
        consumer.assign(topicPartitions);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("value " + record.value());
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            //
        } finally {
            consumer.commitSync();
            consumer.close();
        }
    }
}

assign 固定地为 consumer 指定要消费的分区.

Broker 端设计

消息格式

V0 版本

image-20181229122002320

  • crc: 4 字节
  • magic : 1 字节, 版本号. v0为0, v1为1, v2为2
  • attribute: 1 字节
  • key长度 : 4字节. 未指定key时则为-1
  • key值 : key长度指定的大小. 如果 key 长度为-1, 则没有该字段
  • value 长度: 4 字节. 若没有, 则为-1
  • value 值: 消息value. 长度由上面指定. 如果长度值为-1, 则没有该字段.

除开 key 值value 值之外, 统称为消息头部 (message header), 共 14 字节. 也就是说, V0版本的消息, 最小是14字节.(crc + magic + attribute + key长度 + value 长度 = 4 + 1 + 1 + 4 + 4 = 14 字节)

V1 版本

主要变化就是加入了时间戳字段.

image-20181229122846351

这个版本, 消息头部占22字节.

V1 VS v0 的区别

  • 添加了时间戳
  • attribute 字段中, 第4位用于指定时间戳的类型. CREATE_TIME (表示由 producer 指定) LOG_APPEND_TIME (表示由 broker 指定)

v0, v1 日志项格式

image-20181229123516504

v2 版本

image-20181229123651639

变化

  • 增加消息总长度字段
  • 保存时间戳增量
  • 保存位移增量
  • 增加消息头部. (v0, v1 是元数据, 对用户透明. v2这个 headers 是用户可见的.)
  • 去除 crc 检验
  • 废弃 attribute 字段. 统一放在 v2 版本中的 batch 格式字段

v2 batch 格式

image-20181229124117313

集群管理

通过 zookeeper . 每个 broker 启动时, 将自己注册到 zookeeper 下的一个节点. 路径为 chroot/brokers/ids/{broker.id}

历史各版本注册信息

image-20181229130920583

image-20181229130937301

image-20181229130953569

image-20181229131012762

zookeeper 路径

image-20181229131148838

ISR 系统

0.9.0.0 版本以前

  • 可通过 replica.lag.max.messages 来控制 follower 落后于 leader 的最大消息数. 超过的话, 则该 follower 为不同步状态, 从而被踢出 ISR
  • 可通过 replica.lag.time.max.ms 如果 follower 无法在该指定时间内向 leader 请求数据, 则该 follower 被视为不同步, 从而被踢出 ISR

0.9.0.0 版本之后

去掉了 replica.lag.max.messages , 只用 replica.lag.time.max.ms , 默认为10秒. (并且是持续性地超过这个时间才会被踢出 ISR)

底层文件系统

topic 在文件系统中创建了一个对应的子目录, 名字为 <topic>-<分区号>

xxxx.log 表示 xxx 位移 的信息.下一段的 yyyy.lo 表示 前一段共有 yyyy 条数据. 第一个log 文件为 00000000000000000000.log

xxxx.indexxxxx.timeindex 分别是位移索引和时间戳索引文件

controller

某个 broker 担任的角色. 管理集群中所有分区的状态并执行相应的管理操作.

image-20181229133347368

它的职责

  • 更新集群元数据信息
  • 创建 topic
  • 删除 topic
  • 分区重分配
  • preferred leader 副本选举
  • topic 分区扩展
  • broker 加入集群
  • broker 崩溃
  • 受控关闭
  • controller leader 选举

Producer 设计

image-20181229134006938

KafkaProducer 有一个专门的I/O线程负责将缓冲池中的消息分批次地发送给对应的 broker.

步骤

  1. 序列化 + 计算目标分区
  2. 追加写入消息缓冲区 (accumulator) . 大小由 buffer.memory参数指定)
  3. Sender 线程预处理及消息发送
  4. Sender 线程处理 Response

Consumer group 状态机

image-20181229134511342

事务

0.11.0.0 版本引入

幂等性 producer

若一个操作执行多次的结果与执行一次的结果是相同的, 那么该操作就是幂等操作.

对于单个 topic 分区而言, producer 这种幂等消除了各种错误导致的重复消息. 可在 producer 中设置参数 enable.idempotencetrue . (这只能保证单个 producer 的 EOS 语义, 即精确一次且只有一次处理, 无法在多个 producer 中保证)

        KafkaProducer producer = new KafkaProducer(properties);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(record0);
            producer.send(record1);
            producer.commitTransaction();
        } catch (KafkaException e) {
            producer.abortTransaction();
        } finally {
            producer.close();
        }

JMX

export JMX_PORT=9997 bin/kafka-server-start.sh -daemon config/server.properties

查看消息元数据

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ../datalogs/kafka_1/00000000000000000.log --print-data-log

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ../datalogs/kafka_1/00000000000000000.log --deep-iteration

也可以看索引文件的元数据

性能收集

bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --record-size 100 --throughput -1  --producer-props bootstrap.servers=localhost:9092 buffer.memory=104857600 acks=-1

杂项收集

从头开始消费

consumer.seekToBeginning(topicPartitions);