Kafka原理
基础概念
producer: 生产者,将消息发到
topic
里的为生产者consumer: 消费者,从
topic
里读取消息的为消费者。Kafka以group
为单位组织consumer
,同一个topic
可有多个consumer group
,一条消息会发到所有consumer group
中,一个consumer group
可能会有多个consumer
,但一条消息只会被consumer group
内的一个consumer
消费,即一条消息在同一个consumer group
内只会被消费一次。Kafka有consumer rebalance机制,自动平衡消费者,但在平衡期间不会进行消费,consumer或broker的增减都会触发rebalance,Kafka也会将无法消费或消费过慢的consumer剔除后自动引发rebalance。broker: 每个Kafka实例称之为
broker
topic: Kafka以
topic
为单位对消息进行分类,相同类型的消息分到同一个topic
进行生成消费管理。一个topic
由一个或多个partition
组成。partition: Kafka中可将一个
topic
切分成一个或多个partition
,partition
是Kafka横向扩展及并行处理的关键。每个partition
都是一个顺序、不可变的消息队列。Kafka会将消息持久化到磁盘,但官方建议消息的高可用应由partition replication来保证。可为每个partition设置replicas,每个partition会选举出唯一一个leader partition,所有的读写请求都由leader处理,replicas只负责将消息同步到本地确保高可用。- Isr: In-Sync Replica,表示能与leader保持数据同步的replicas。如果replicas同步leader消息的延迟时间或延迟条数其中一个超出阀值,则会将延迟的replicas剔出Isr。当leader partition挂掉后,只有在Isr队列里的replicas才可参与leader partition的选举过程。
- replication-factor: 复制因子,topic partition的副本数目,默认
1
,即无副本。若replication-factor=N
,则允许N-1
台机器宕机以确保消息的高可用。
offset: 每个
partition
内的消息都有一个序列号,称之为offset
。offset
在partition
内是有序的,不能跨partition
使用offset
,offset
的移动表示partition
内消息的消费位置。在Kafka的设计中,一个partition
中消费offset
完全是由消费者自己控制的。假若一个消费者将offset
设置成0
,则意味着该消费者会从该partition
中最开始的消息开始消费。一个消费者的offset
操作不会影响到该partition
内的其他消费者,因为每个消费者都维护着自己的消费offset
。关于
offset
还涉及到以下4个概念1
Last Commit Offset————Current Posision————High Watermark————Log End Offset
Last Commit Offset
: consumer group最新一次commit的offset位置,Last Commit Offset
表示在此之前的所有消息都以消费完并确认。Current Posision
: 当前consumer group消费消息的offset位置。Last Commit Offset
与Current Posision
之间的消息表示已经被consumer group读取但还在处理中,没有确认(commit)。High Watermark
: 表示High Watermark
之前的数据都已被同步到所有replicas中,High Watermark
offset之前的数据是高可用的(消息有多个副本)。Log End Offset
: 表示producer发送到该partition中最新消息的offset。High Watermark
和Log End Offset
之间的数据表示消息已写入leader partition但未同步至replicas中,这之间的数据没高可用,是不安全的,不允许消费
从
0.9
版本后的Kafka开始,新版的consumer api是将offset
存放在Kafka的__consumer_offsets
topic中。该topic默认有50
个partition和3
个replicas。某个consumer group的offset具体存放在哪个partition是通过abs(GroupId.hashCode()) % NumPartitions
计算出来的(NumPartitions
默认50
)。旧版本的Kafka默认是将offset存在ZooKeeper中。ack: ack确认机制是producer发送消息给Kafka,确认消息高可用的机制,依赖于producer的
request.required.acks
设置0
: producer只管发送,不管Kafka是否接收成功1
: Kafka partition的leader写入后即返回成功,replicas异步同步消息。风险在于leader挂了,消息还没同步的话则会丢失数据。all/-1
: 旧版Kafka(0.8
)值为-1
,新版Kafka值为all
。等待所有replicas都同步消息后才返回成功,数据高可用最安全,生产消息性能会受影响。
ZeroCopy: Kafka使用ZeroCopy机制以提高性能,以从磁盘读取数据返回给消费者为例
- 传统方式(涉及4次切换):
- 进程通过
read()
系统调用从用户态切换到内核态[1],内核向磁盘发起请求,将数据从磁盘读取到内核缓存区 - 进程将数据从内核空间拷贝到用户空间,从内核态切换到用户态[2]
- 进程将数据发送到Socket,从用户态切换到内核态[3],将数据写入到Socket Buffer
- 内核将Socket Buffer中的数据拷贝到NIC Buffer中,最终发送给消费者。从内核态切换用户态[4],完成操作
- 进程通过
- ZeroCopy方式(涉及2次切换):
- 进程通过
sendfile()
从用户态切换到内核态[1],内核从磁盘读取数据到内核缓存区,然后直接将数据拷贝到Socket Buffer,再将数据从Socket Buffer拷贝到NIC Buffer - 完成操作后从内核态切换用户态[2]
- 进程通过
- 传统方式(涉及4次切换):
ZooKeeper原理
基础概念
角色: ZK中分为3中角色。ZK所有节点都可接收并处理读请求(
exists
/getData
/getChildren
…),但只有Leader角色节点才能处理写请求(create
/setData
/delete
)- Leader: 主节点,响应所有其余节点请求(包括写请求),维护集群状态
- Follower: 从Leader中同步消息和状态,接收并处理读请求,转发写请求到Leader。可参与选举
- Observer:
3.3.0
版本开始引入,仅处理读请求,提高读请求吞吐量,转发写请求到Leader。数据不持久化到磁盘,不可参与选举
ZNODE: ZK使用类似文件系统方式组织资源,资源中的每个节点称为ZNODE。ZNODE分4中类型
- 持久节点(persistent): 只能通过
delete
指令删除,数据会被持久化 - 临时节点(ephemeral): 创建该ZNODE的客户端断开连接后,临时节点会被自动删除
- 有序节点: 创建节点时会自动在节点名后添加一串单调递增序号,
/tasks/task0000000001
- 无序节点: 默认创建无序节点
- 持久节点(persistent): 只能通过
ZNODE stat结构: 每个ZNODE都会有一个
stat
信息,可通过stat </zookeeper/test/node>
或ls2 </zookeeper/test/node>
查看1
2
3
4
5
6
7
8
9
10
11
12[zk: localhost(CONNECTED) 0] stat /zookeeper
cZxid = 0x0
ctime = Thu Jan 01 05:30:00 IST 1970
mZxid = 0x0
mtime = Thu Jan 01 05:30:00 IST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1- cZxid: 导致创建znode更改的事务ID
- mZxid: 最后修改znode更改的事务ID
- pZxid: 用于添加或删除子节点的znode更改的事务ID
- ctime: 表示从1970-01-01T00:00:00Z开始以毫秒为单位的znode创建时间
- mtime: 表示从1970-01-01T00:00:00Z开始以毫秒为单位的znode最近修改时间
- dataVersion: 表示对该znode的数据所做的更改次数
- cversion: 这表示对此znode的子节点进行的更改次数
- aclVersion: 表示对此znode的ACL进行更改的次数
- ephemeralOwner: 如果znode是临时节点,则为此znode所有者的 session ID;如果znode不是临时节点,则该字段设置为零
- dataLength: 这是znode数据字段的长度
- numChildren: 这表示znode的子节点的数量
监视通知机制: ZK自身提供监视通知机制,可对某ZNODE设置一个监视
watcher
,当监视事件发生时会发送通知。可设置2种类似watcher- Data Watches: ZNODE数据变更触发通知(
create
/setData
) - Child Watches: ZNODE子节点变更触发通知(
delete
/create
[child znode])
- Data Watches: ZNODE数据变更触发通知(
会话(session): client连接到ZK时,便会创建一个会话(session)。会话失效后ZK会清除会话相关信息(临时节点/Watcher)。每个会话session都有4个基本属性
- SID: 会话ID,唯一标识每个会话
- TimeOut: 会话超时时间。会话异常断开后,client会尝试重连,若在TimeOut时间内重连上,则收到
syncconnected
事件重用会话;若超过TimeOut时间重连上,则收到expired
事件,结束会话重新发起新连接。 - TickTime: 下次会话超时时间点,13位long型数据,
TickTime ~= NowTime + TimeOut
- isClosing: 会话超时标记,会话超时失效时,将会话标记为已关闭
Leader选举:
- ZAB协议: ZK通过ZAB(Zookeeper Atomic Broadcast)协议进行消息传递,选举消息自然使用ZAB协议进行
- 选举算法:
3.4.10
为止,有4种选举算法,默认30
: 基于UDP的LeaderElection1
: 基于UDP的FastLeaderElection2
: 基于UDP和认证的FastLeaderElection3
: 基于TCP的FastLeaderElection
- 选举消息
- sid: ZK配置文件中配置
server.<X>
,ZK集群的唯一ID - zxid: 事务ID,每个zxid由64位数字表示,zxid全局单调递增
- epoch: zxid高32位为epoch,从
1
开始单调递增,每选举投票一轮epoch + 1
。低32位为epoch内序号,epoch变化则重置
- sid: ZK配置文件中配置
- 选举优先级原则
- epoch > zxid(低32位) > sid
- 只有epoch为最新的follower才能参加选举(收到的epoch比自身大,无条件放弃选举)
- zxid大的优先级高
- sid大的优先级高
- 过半原则: 得票数 > 集群总节点数/2,则产生新Leader
- epoch > zxid(低32位) > sid
- 选举大致过程
- 产生选举信息。所有ZK节点(Observer除外)产生
(sid, zxid)
选举信息,并都认为自己是Leader,投自己一票 - 接收选举信息。ZK节点之间会建立TCP连接,为避免重复建立连接,ZK节点只允许sid大于自身的节点与自己建立连接,否则断开连接,并主动和对方建立连接
- 处理选举信息。
- 检查epoch,收到的epoch大于自身epoch,退出选举,更新投票信息,发送投票结果
- epoch相同,比较zxid,较大节点胜出
- zxid相同,比较sid,较大节点胜出
- 统计选举信息。每轮投票结束都会统计选举结果,若过半则产生新Leader,否则继续下一轮选举投票
- 产生选举信息。所有ZK节点(Observer除外)产生
ZooKeeper集群部署
安装
JDK安装省略
1 | wget http://www.apache.org/dist/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz |
集群配置
zoo.cfg
1
2
3
4
5
6
7
8
9
10
11
12
13
14tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/datalog
clientPort=2181
server.1=10.201.3.222:2888:3888
server.2=10.201.3.223:2888:3888
server.3=10.201.3.224:2888:3888
maxClientCnxns=500
minSessionTimeout=30000
maxSessionTimeout=60000tickTime
: 基本时间单元,以毫秒为单位。它用来控制心跳和超时,默认情况下最小的会话超时时间为两倍的tickTimeinitLimit
: 允许follower连接并同步到leader的初始化连接时间,以tickTime倍数计算syncLimit
: Leader与Follower间请求和应答时间长度dataDir
: 快照文件目录dataLogDir
: 事务日志目录(dataDir
和dataLogDir
目录最好是在两个不同磁盘中,避免IO竞争)2181
: ZooKeeper监听端口(客户端连接端口)2888
: leader和follower之间数据同步使用的端口号3888
: leader选举专用的端口号maxClientCnxns
: 客户端并发连接数目
ServerID
集群每个节点都需要配置唯一的ServerID,ServerID保持和
zoo.cfg
中server.X
中的X
一致1
2
3
4
5
6# 10.201.3.222
echo 1 > /usr/local/zookeeper/data/myid
# 10.201.3.223
echo 2 > /usr/local/zookeeper/data/myid
# 10.201.3.224
echo 3 > /usr/local/zookeeper/data/myid优化配置——
zkEnv.sh
一些额外的优化配置建议写在
zkEnv.sh
中,如JVM、Log Level等。修改滚动日志选项(size等)则在conf/log4j.properties
1
2
3
4
5
6# zkEnv.sh
JAVA_HOME=/usr/share/java
ZOO_LOG_DIR=/usr/local/zookeeper/log
# 设置日志轮转
ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
JVMFLAGS="-Xms1024m -Xmx1024m $JVMFLAGS"
启动ZooKeeper集群
1 | # **所有节点执行,启动ZooKeeper集群** |
ZooKeeper日志维护
默认ZooKeeper不会定期清除日志,3.4.0
开始支持定期清理
1 | # **zoo.cfg** |
但一般采用计划任务选择在空闲时间段定期删除,避免占用大量IO影响ZooKeeper。ZooKeeper每次执行事务都会写入事务日志,而且需要过半节点同步。磁盘IO直接影响事务处理速度!
有鉴于此,尽量让dataDir
和dataLogDir
分开在不同的磁盘,避免IO竞争。ZooKeeper提供zkCleanup.sh
脚本帮助清理。
1 | /usr/local/zookeeper/bin/zkCleanup.sh -n <日志保留数目/默认3> |
ZooKeeper常用操作
命令行连接ZooKeeper
1 | /usr/local/zookeeper/bin/zkCli.sh -server 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 |
四字命令
可通过常用的ZooKeeper四字命令监控ZooKeeper,一般用法为:echo <四字命令> | nc localhost 2181
命令 | 解释 |
---|---|
conf | 3.3.0版本引入的。打印出服务相关配置的详细信息。 |
cons | 3.3.0版本引入的。列出所有连接到这台服务器的客户端全部连接/会话详细信息。包括”接受/发送”的包数量、会话id、操作延迟、最后的操作执行等等信息。 |
crst | 3.3.0版本引入的。重置所有连接的连接和会话统计信息。 |
dump | 列出那些比较重要的会话和临时节点。这个命令只能在leader节点上有用。 |
envi | 打印出服务环境的详细信息。 |
reqs | 列出未经处理的请求 |
ruok | 测试服务是否处于正确状态。如果确实如此,那么服务返回”imok”,否则不做任何相应。 |
stat | 输出关于性能和连接的客户端的列表。 |
srst | 重置服务器的统计。 |
srvr | 3.3.0版本引入的。列出连接服务器的详细信息 |
wchs | 3.3.0版本引入的。列出服务器watch的详细信息。 |
wchc | 3.3.0版本引入的。通过session列出服务器watch的详细信息,它的输出是一个与watch相关的会话的列表。 |
wchp | 3.3.0版本引入的。通过路径列出服务器watch的详细信息。它输出一个与session相关的路径。 |
mntr | 3.4.0版本引入的。输出可用于检测集群健康状态的变量列表 |
Kafka集群部署
Kafka安装
JDK安装省略
1 | wget 'http://mirror.bit.edu.cn/apache/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz' |
Kafka集群配置
配置文件
1 | # **config/server.properties** |
broker.id
: kafka broker唯一标识,集群各节点必须唯一。num.network.threads
: 网络IO线程数(优化配置成CPU核心数目一致)num.io.threads
: 磁盘IO线程数(优化配置成CPU核心数的2倍)socket.send.buffer.bytes/socket.receive.buffer.bytes
: 发送/接收缓冲区大小socket.request.max.bytes
: 最大请求接收大小log.dirs
: kafka数据存放目录,多目录间用,
分隔num.partitions
: 默认partition数目num.recovery.threads.per.data.dir
: 每个数据目录kafka启动恢复日志、关闭刷盘的线程数log.retention.hours
: 消息保留时间log.segment.bytes
: 日志段大小。日志文件达到该值则切割log.retention.check.interval.ms
: 检查消息/日志是否达到删除要求的间隔时间delete.topic.enable
: 允许删除topic而非仅标识为delete
default.replication.factor
: 默认消息备份数目,默认为1
不做复制
Kafka JVM设置
/usr/local/kafka/bin/kafka-server-start.sh
,kafka heap大小最好不要超过4G
1 | export KAFKA_HEAP_OPTS="-Xmx3G -Xms3G" |
启动Kafka
尽量使用Supervisor管理启动
1 | nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &> /usr/local/kafka/logs/kafka.log & |
Kafka常用操作
创建topic
replication-factor
的数目要小于等于broker
数目。replication-factor
为复制因子,默认值1
表示无副本,2
表示有一个副本。partitions
为partition数目,partition越多并发消费量越大,partition维护成本也越大。
1 | /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --replication-factor 2 --partitions 2 --topic <topic_name> |
查看topic
列出所有topic
1
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181
查看具体topic
1
2
3
4/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topic <topic_name>
Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
删除topic
如果delete.topic.enable=true
则topic会被直接删除,否则仅仅只是会被先标记为删除并不会实际删。
1 | /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topic test |
动态修改topic的数据保留时间
topic数据保留时间由配置文件中log.retention.hours
指定,运行时可动态修改具体topic保留时间
1 | # **旧方法** |
生产消费测试
生产消息
1
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.201.3.222:9092,10.201.3.223:9092,10.201.3.224:9092 --topic <topic_name>
消费消息
1
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topic <topic_name> --from-beginning
consumer group 操作
列出所有消费组
1
2
3
4# old consumer api
/usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --list
# new consumer api
/usr/local/kafka/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.201.3.222:9092,10.201.3.223:9092,10.201.3.224:9092 --list查看具体消费组
如果是使用logstash作为consumer的话,默认的grout_name为
logstash
1
2
3
4# old consumer api
/usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --describe --group <group_name>
# new consumer api
/usr/local/kafka/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.201.3.222:9092,10.201.3.223:9092,10.201.3.224:9092 --describe --group <group_name>删除具体消费组
1
/usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --delete --group <group-name>
动态增加partition数量
动态增加partition数目可能会影响消息消费的顺序
1 | /usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --alter --topic <topic_name> --partitions 8 |
手动修改Kafka topic offset
先手动导出Kafka topic的offset
1 | /usr/local/kafka/bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --group <group_id> --output-file kafka_offset.txt |
手动调整Kafka topic的offset
方法一
这种方法只能简单的将
offset
设置成earliest
或latest
配置文件——
manual_offset.properties
group.id
为consumer group的名称,针对特定的comsumer group来设置,切勿弄错!1
2
3
4zookeeper.connect=10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181
zookeeper.connection.timeout.ms=6000
#consumer group id
group.id=logstash执行
将topic的offset设置成最新,即忽略已经堆积的历史消息。
1
2# /usr/local/kafka/bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK [earliest|latest] <config_file> <topic_name>
/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest manual_offset.properties test
方法二
这种方法允许指定
offset
到具体位置,需要通过ZooKeeper修改进入ZooKeeper命令行
1
/usr/local/zookeeper/bin/zkCli.sh -server 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181
查看具体topic partition的offset
第一行的值就是当前
offset
值1
2
3
4
5
6
7
8
9
10
11
12
13get /consumers/logstash/offsets/test/0
13652
cZxid = 0x300007fe0
ctime = Tue Nov 01 16:11:03 CST 2016
mZxid = 0xd000ad433
mtime = Tue Jun 20 10:25:37 CST 2017
pZxid = 0x300007fe0
cversion = 0
dataVersion = 252
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0设置具体topic partition的offset
1
set /consumers/logstash/offsets/test/0 13650
命令详解
1
Usage: [get|set] /consumers/[groupId]/offsets/[topic]/[partitionId]
方法三:
方法一、方法二都适用于旧版的consumer api,新版Kafka使用了新的consumer api不再依赖ZooKeeper管理offset,而是直接将offset存放在
__consumer_offsets
这个内部topic中。最简单的识别方法是,新版api使用--bootstrap-server
而非--zookeeper
,如果consumer是使用--bootstrap-server
则使用的是新api,方法一、二并不适用新版api手动调整offset,需要使用方法三。方法三也有个限制,只适用与
kafka 0.11.0.0
之后的版本,而且consumer group的状态须为inactive,即没有消费者正在消费消息。0.11.0.0
之前的版本,新版consumer api只能通过编写Java程序手动调用KafkaConsumer seek
方法。新版consumer api查看consumer group情况
1
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.201.5.30:9092,10.201.5.31:9092 --describe --group mirror --new-consumer
手动调整offset
1
2#/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.201.5.30:9092,10.201.5.31:9092 --execute --group mirror --reset-offsets --all-topics --to-earliest
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.201.5.30:9092,10.201.5.31:9092 --execute --group mirror --reset-offsets --topic jr_tomcat --to-latest
Kafka集群扩容
Kafka集群扩容,新节点机器加入集群后,原本存在topic的partitions是不会自动迁移到新节点,只有新建的topic会分配到新节点上,这样新节点对于旧有的topic是无用的。为了旧有的topic也能用到新节点,在扩容后需要手动将旧有topic的partition迁移到新节点中。
配置新节点并启动Kafka
复制现有Kafka集群节点配置,修改
broker.id
后启动Kafka。通过ZooKeeper查看新增broker id
,验证Kafka已加入集群1
2
3
4
5
6/usr/local/zookeeper/bin/zkCli.sh -server 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181
[zk: 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181(CONNECTED) 0] ls /brokers/ids
[1, 2, 3]
[zk: 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181/(CONNECTED) 1] ls /brokers/ids
[1, 2, 3, 4]查看旧有topic partition的Leader、Replicas都是没有
broker.id=4
的新节点的,只有旧的节点1, 2, 3
1
2
3
4
5
6
7/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topic jr_tomcat
Topic:jr_tomcat PartitionCount:5 ReplicationFactor:2 Configs:
Topic: jr_tomcat Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: jr_tomcat Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: jr_tomcat Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: jr_tomcat Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: jr_tomcat Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2生成迁移分配规则文件
假若需要对
jr_tomcat
这个topic进行重新平衡创建需要迁移topic的json文件——
topic-to-move.json
多个topic写到
[]
数组里,用,
分隔1
2
3
4
5
6
7
8
9# topic-to-move.json
{"topics":
[{"topic":"jr_tomcat"}],
"version": 1
}
#{"topics": [{"topic": "topic_1"},
# {"topic": "topic_2"}],
# "version":1
#}生成迁移分配规则文件
broker-list
为所有集群节点的broker.id
Proposed partition reassignment configuration
下面内容是新节点加入后的partition分配情况1
2
3
4
5
6
7/usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topics-to-move-json-file topic-to-move.json --broker-list "1,2,3,4" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"jr_tomcat","partition":0,"replicas":[2,3]},{"topic":"jr_tomcat","partition":4,"replicas":[3,2]},{"topic":"jr_tomcat","partition":1,"replicas":[3,1]},{"topic":"jr_tomcat","partition":3,"replicas":[2,1]},{"topic":"jr_tomcat","partition":2,"replicas":[1,2]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"jr_tomcat","partition":0,"replicas":[1,2]},{"topic":"jr_tomcat","partition":4,"replicas":[1,3]},{"topic":"jr_tomcat","partition":1,"replicas":[2,3]},{"topic":"jr_tomcat","partition":3,"replicas":[4,1]},{"topic":"jr_tomcat","partition":2,"replicas":[3,4]}]}执行迁移分配
将上一步得到的新的partition分配情况保存成
expand-reassignment.json
文件并执行partition迁移分配1
2
3
4
5
6
7/usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file expand-reassignment.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"jr_tomcat","partition":0,"replicas":[2,3]},{"topic":"jr_tomcat","partition":4,"replicas":[3,2]},{"topic":"jr_tomcat","partition":1,"replicas":[3,1]},{"topic":"jr_tomcat","partition":3,"replicas":[2,1]},{"topic":"jr_tomcat","partition":2,"replicas":[1,2]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"jr_tomcat","partition":4,"replicas":[1,3]},{"topic":"jr_tomcat","partition":3,"replicas":[4,1]},{"topic":"jr_tomcat","partition":1,"replicas":[2,3]},{"topic":"jr_tomcat","partition":2,"replicas":[3,4]},{"topic":"jr_tomcat","partition":0,"replicas":[1,2]}]}检测执行状态
1
2
3
4
5
6
7/usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file expand-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [jr_tomcat,4] completed successfully
Reassignment of partition [jr_tomcat,3] completed successfully
Reassignment of partition [jr_tomcat,1] completed successfully
Reassignment of partition [jr_tomcat,2] completed successfully
Reassignment of partition [jr_tomcat,0] completed successfully
增加topic的replication数量
查看topic情况
1
2
3
4
5/usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --describe --topic test
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3对单独partition增加replication
先对
partition 0
增加replication准备文件——
increase-replication-factor.json
1
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[3,2,1]}]}
执行增加replication操作。执行后会显示旧有配置,提示保存以便回滚。
1
2
3
4
5
6/usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":1,"replicas":[1,2]},{"topic":"test","partition":2,"replicas":[2,3]},{"topic":"test","partition":0,"replicas":[3,2]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[3,2,1]}]}
确认执行情况
1
2
3
4/usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [test,0] completed successfully查看partition分区情况
partition 0
的replicas已从3,1
增加为3,2,1
1
2
3
4
5/usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --describe --topic test
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3批量增加剩余partition的replication
修改
increase-replication-factor.json
文件,执行增加命令1
2
3/usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file increase-replication-factor.json --execute
/usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file increase-replication-factor.json --verify
/usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --describe --topic test
Kafka集群间数据同步——MirrorMaker
KafkaMirrorMaker提供不同Kafka集群之间数据同步,基本就是从Source Kafka Cluster消费消息然后生产到Target Kafka Cluster中。若Target Kafka Cluster中不存在topic,KafkaMirrorMaker会自动在Target Kafka Cluster上创建和Source Kafka Cluster完全相同的topic(topic_name、partition、replication)。从Kafka0.9版本开始引入新的consumer api,默认是使用旧版api,使用--new.consumer
参数可指定使用新版api。
mirror_maker_consumer.config
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21#新版consumer
bootstrap.servers=10.201.5.30:9092,10.201.5.31:9092
#旧版comsumer
#zookeeper.connect=10.201.5.30:2181,10.201.5.31:2181
#default request timeout 40000
request.timeout.ms=50000
#default heartbeat interval 3000
heartbeat.interval.ms=15000
#default session timeout 30000
session.timeout.ms=40000
#consumer group id
group.id=mirrormaker
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
#default max poll records 2147483647
max.poll.records=20000
#default receive buffer 64kB, now 512kb
receive.buffer.bytes=524288
#default max amount of data per partition to override 1048576
max.partition.fetch.bytes=5248576
#consumer timeout
#consumer.timeout.ms=5000mirror_maker_producer.config
1
2
3
4
5
6
7bootstrap.servers=10.201.3.222:9092,10.201.3.223:9092,10.201.3.224:9092
#buffer.memory=134217728
batch.size=32768
receive.buffer.bytes=327680
send.buffer.bytes=262144
max.request.size=10485760
linger.ms=3000启动KafkaMirrorMaker
--whitelist
支持正则匹配符|
,也支持,
,比如--whitelist 'nginx_log|tomcat_log'
1
/usr/local/kafka/bin/kafka-mirror-maker.sh --new.consumer --consumer.config mirror_maker_consumer.config --num.streams 3 --producer.config mirror_maker_producer.config --whitelist="nginx_log"
查看生产消费情况
在mirror consumer端查看消费情况
1
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.201.5.30:9092,10.201.5.31:9092 --describe --group mirrormaker --new-consumer
KafkaOffsetMonitor监控部署
KafkaOffsetMonitor监控Kafka消费队列堆积情况
下载KafkaOffsetMonitor-assembly-0.2.1.jar
1 | java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \ |