Kafka原理
基础概念
- producer: 生产者,将消息发到 - topic里的为生产者
- consumer: 消费者,从 - topic里读取消息的为消费者。Kafka以- group为单位组织- consumer,同一个- topic可有多个- consumer group,一条消息会发到所有- consumer group中,一个- consumer group可能会有多个- consumer,但一条消息只会被- consumer group内的一个- consumer消费,即一条消息在同一个- consumer group内只会被消费一次。Kafka有consumer rebalance机制,自动平衡消费者,但在平衡期间不会进行消费,consumer或broker的增减都会触发rebalance,Kafka也会将无法消费或消费过慢的consumer剔除后自动引发rebalance。
- broker: 每个Kafka实例称之为 - broker
- topic: Kafka以 - topic为单位对消息进行分类,相同类型的消息分到同一个- topic进行生成消费管理。一个- topic由一个或多个- partition组成。
- partition: Kafka中可将一个 - topic切分成一个或多个- partition,- partition是Kafka横向扩展及并行处理的关键。每个- partition都是一个顺序、不可变的消息队列。Kafka会将消息持久化到磁盘,但官方建议消息的高可用应由partition replication来保证。可为每个partition设置replicas,每个partition会选举出唯一一个leader partition,所有的读写请求都由leader处理,replicas只负责将消息同步到本地确保高可用。- Isr: In-Sync Replica,表示能与leader保持数据同步的replicas。如果replicas同步leader消息的延迟时间或延迟条数其中一个超出阀值,则会将延迟的replicas剔出Isr。当leader partition挂掉后,只有在Isr队列里的replicas才可参与leader partition的选举过程。
- replication-factor: 复制因子,topic partition的副本数目,默认1,即无副本。若replication-factor=N,则允许N-1台机器宕机以确保消息的高可用。
 
- offset: 每个 - partition内的消息都有一个序列号,称之为- offset。- offset在- partition内是有序的,不能跨- partition使用- offset,- offset的移动表示- partition内消息的消费位置。在Kafka的设计中,一个- partition中消费- offset完全是由消费者自己控制的。假若一个消费者将- offset设置成- 0,则意味着该消费者会从该- partition中最开始的消息开始消费。一个消费者的- offset操作不会影响到该- partition内的其他消费者,因为每个消费者都维护着自己的消费- offset。- 关于 - offset还涉及到以下4个概念- 1 - Last Commit Offset————Current Posision————High Watermark————Log End Offset - Last Commit Offset: consumer group最新一次commit的offset位置,- Last Commit Offset表示在此之前的所有消息都以消费完并确认。
- Current Posision: 当前consumer group消费消息的offset位置。- Last Commit Offset与- Current Posision之间的消息表示已经被consumer group读取但还在处理中,没有确认(commit)。
- High Watermark: 表示- High Watermark之前的数据都已被同步到所有replicas中,- High Watermarkoffset之前的数据是高可用的(消息有多个副本)。
- Log End Offset: 表示producer发送到该partition中最新消息的offset。- High Watermark和- Log End Offset之间的数据表示消息已写入leader partition但未同步至replicas中,这之间的数据没高可用,是不安全的,不允许消费
 - 从 - 0.9版本后的Kafka开始,新版的consumer api是将- offset存放在Kafka的- __consumer_offsetstopic中。该topic默认有- 50个partition和- 3个replicas。某个consumer group的offset具体存放在哪个partition是通过- abs(GroupId.hashCode()) % NumPartitions计算出来的(- NumPartitions默认- 50)。旧版本的Kafka默认是将offset存在ZooKeeper中。
- ack: ack确认机制是producer发送消息给Kafka,确认消息高可用的机制,依赖于producer的 - request.required.acks设置- 0: producer只管发送,不管Kafka是否接收成功
- 1: Kafka partition的leader写入后即返回成功,replicas异步同步消息。风险在于leader挂了,消息还没同步的话则会丢失数据。
- all/-1: 旧版Kafka(- 0.8)值为- -1,新版Kafka值为- all。等待所有replicas都同步消息后才返回成功,数据高可用最安全,生产消息性能会受影响。
 
- ZeroCopy: Kafka使用ZeroCopy机制以提高性能,以从磁盘读取数据返回给消费者为例 - 传统方式(涉及4次切换):- 进程通过read()系统调用从用户态切换到内核态[1],内核向磁盘发起请求,将数据从磁盘读取到内核缓存区
- 进程将数据从内核空间拷贝到用户空间,从内核态切换到用户态[2]
- 进程将数据发送到Socket,从用户态切换到内核态[3],将数据写入到Socket Buffer
- 内核将Socket Buffer中的数据拷贝到NIC Buffer中,最终发送给消费者。从内核态切换用户态[4],完成操作
 
- 进程通过
- ZeroCopy方式(涉及2次切换):- 进程通过sendfile()从用户态切换到内核态[1],内核从磁盘读取数据到内核缓存区,然后直接将数据拷贝到Socket Buffer,再将数据从Socket Buffer拷贝到NIC Buffer
- 完成操作后从内核态切换用户态[2]
 
- 进程通过
 
- 传统方式(涉及4次切换):
ZooKeeper原理
基础概念
- 角色: ZK中分为3中角色。ZK所有节点都可接收并处理读请求( - exists/- getData/- getChildren…),但只有Leader角色节点才能处理写请求(- create/- setData/- delete)- Leader: 主节点,响应所有其余节点请求(包括写请求),维护集群状态
- Follower: 从Leader中同步消息和状态,接收并处理读请求,转发写请求到Leader。可参与选举
- Observer: 3.3.0版本开始引入,仅处理读请求,提高读请求吞吐量,转发写请求到Leader。数据不持久化到磁盘,不可参与选举
 
- ZNODE: ZK使用类似文件系统方式组织资源,资源中的每个节点称为ZNODE。ZNODE分4中类型 - 持久节点(persistent): 只能通过delete指令删除,数据会被持久化
- 临时节点(ephemeral): 创建该ZNODE的客户端断开连接后,临时节点会被自动删除
- 有序节点: 创建节点时会自动在节点名后添加一串单调递增序号,/tasks/task0000000001
- 无序节点: 默认创建无序节点
 
- 持久节点(persistent): 只能通过
- ZNODE stat结构: 每个ZNODE都会有一个 - stat信息,可通过- stat </zookeeper/test/node>或- ls2 </zookeeper/test/node>查看- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12- [zk: localhost(CONNECTED) 0] stat /zookeeper 
 cZxid = 0x0
 ctime = Thu Jan 01 05:30:00 IST 1970
 mZxid = 0x0
 mtime = Thu Jan 01 05:30:00 IST 1970
 pZxid = 0x0
 cversion = -1
 dataVersion = 0
 aclVersion = 0
 ephemeralOwner = 0x0
 dataLength = 0
 numChildren = 1- cZxid: 导致创建znode更改的事务ID
- mZxid: 最后修改znode更改的事务ID
- pZxid: 用于添加或删除子节点的znode更改的事务ID
- ctime: 表示从1970-01-01T00:00:00Z开始以毫秒为单位的znode创建时间
- mtime: 表示从1970-01-01T00:00:00Z开始以毫秒为单位的znode最近修改时间
- dataVersion: 表示对该znode的数据所做的更改次数
- cversion: 这表示对此znode的子节点进行的更改次数
- aclVersion: 表示对此znode的ACL进行更改的次数
- ephemeralOwner: 如果znode是临时节点,则为此znode所有者的 session ID;如果znode不是临时节点,则该字段设置为零
- dataLength: 这是znode数据字段的长度
- numChildren: 这表示znode的子节点的数量
 
- 监视通知机制: ZK自身提供监视通知机制,可对某ZNODE设置一个监视 - watcher,当监视事件发生时会发送通知。可设置2种类似watcher- Data Watches: ZNODE数据变更触发通知(create/setData)
- Child Watches: ZNODE子节点变更触发通知(delete/create[child znode])
 
- Data Watches: ZNODE数据变更触发通知(
- 会话(session): client连接到ZK时,便会创建一个会话(session)。会话失效后ZK会清除会话相关信息(临时节点/Watcher)。每个会话session都有4个基本属性 - SID: 会话ID,唯一标识每个会话
- TimeOut: 会话超时时间。会话异常断开后,client会尝试重连,若在TimeOut时间内重连上,则收到syncconnected事件重用会话;若超过TimeOut时间重连上,则收到expired事件,结束会话重新发起新连接。
- TickTime: 下次会话超时时间点,13位long型数据,TickTime ~= NowTime + TimeOut
- isClosing: 会话超时标记,会话超时失效时,将会话标记为已关闭
 
- Leader选举: - ZAB协议: ZK通过ZAB(Zookeeper Atomic Broadcast)协议进行消息传递,选举消息自然使用ZAB协议进行
- 选举算法: 3.4.10为止,有4种选举算法,默认3- 0: 基于UDP的LeaderElection
- 1: 基于UDP的FastLeaderElection
- 2: 基于UDP和认证的FastLeaderElection
- 3: 基于TCP的FastLeaderElection
 
- 选举消息- sid: ZK配置文件中配置server.<X>,ZK集群的唯一ID
- zxid: 事务ID,每个zxid由64位数字表示,zxid全局单调递增
- epoch: zxid高32位为epoch,从1开始单调递增,每选举投票一轮epoch + 1。低32位为epoch内序号,epoch变化则重置
 
- sid: ZK配置文件中配置
- 选举优先级原则- epoch > zxid(低32位) > sid- 只有epoch为最新的follower才能参加选举(收到的epoch比自身大,无条件放弃选举)
- zxid大的优先级高
- sid大的优先级高
 
- 过半原则: 得票数 > 集群总节点数/2,则产生新Leader
 
- epoch > zxid(低32位) > sid
- 选举大致过程- 产生选举信息。所有ZK节点(Observer除外)产生(sid, zxid)选举信息,并都认为自己是Leader,投自己一票
- 接收选举信息。ZK节点之间会建立TCP连接,为避免重复建立连接,ZK节点只允许sid大于自身的节点与自己建立连接,否则断开连接,并主动和对方建立连接
- 处理选举信息。- 检查epoch,收到的epoch大于自身epoch,退出选举,更新投票信息,发送投票结果
- epoch相同,比较zxid,较大节点胜出
- zxid相同,比较sid,较大节点胜出
 
- 统计选举信息。每轮投票结束都会统计选举结果,若过半则产生新Leader,否则继续下一轮选举投票
 
- 产生选举信息。所有ZK节点(Observer除外)产生
 
ZooKeeper集群部署
安装
JDK安装省略
| 1 | wget http://www.apache.org/dist/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz | 
集群配置
- zoo.cfg - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14- tickTime=2000 
 initLimit=10
 syncLimit=5
 dataDir=/usr/local/zookeeper/data
 dataLogDir=/usr/local/zookeeper/datalog
 clientPort=2181
 server.1=10.201.3.222:2888:3888
 server.2=10.201.3.223:2888:3888
 server.3=10.201.3.224:2888:3888
 maxClientCnxns=500
 minSessionTimeout=30000
 maxSessionTimeout=60000- tickTime: 基本时间单元,以毫秒为单位。它用来控制心跳和超时,默认情况下最小的会话超时时间为两倍的tickTime
- initLimit: 允许follower连接并同步到leader的初始化连接时间,以tickTime倍数计算
- syncLimit: Leader与Follower间请求和应答时间长度
- dataDir: 快照文件目录
- dataLogDir: 事务日志目录(- dataDir和- dataLogDir目录最好是在两个不同磁盘中,避免IO竞争)
- 2181: ZooKeeper监听端口(客户端连接端口)
- 2888: leader和follower之间数据同步使用的端口号
- 3888: leader选举专用的端口号
- maxClientCnxns: 客户端并发连接数目
 
- ServerID - 集群每个节点都需要配置唯一的ServerID,ServerID保持和 - zoo.cfg中- server.X中的- X一致- 1 
 2
 3
 4
 5
 6- # 10.201.3.222 
 echo 1 > /usr/local/zookeeper/data/myid
 # 10.201.3.223
 echo 2 > /usr/local/zookeeper/data/myid
 # 10.201.3.224
 echo 3 > /usr/local/zookeeper/data/myid
- 优化配置—— - zkEnv.sh- 一些额外的优化配置建议写在 - zkEnv.sh中,如JVM、Log Level等。修改滚动日志选项(size等)则在- conf/log4j.properties- 1 
 2
 3
 4
 5
 6- # zkEnv.sh 
 JAVA_HOME=/usr/share/java
 ZOO_LOG_DIR=/usr/local/zookeeper/log
 # 设置日志轮转
 ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
 JVMFLAGS="-Xms1024m -Xmx1024m $JVMFLAGS"
启动ZooKeeper集群
| 1 | # **所有节点执行,启动ZooKeeper集群** | 
ZooKeeper日志维护
默认ZooKeeper不会定期清除日志,3.4.0开始支持定期清理
| 1 | # **zoo.cfg** | 
但一般采用计划任务选择在空闲时间段定期删除,避免占用大量IO影响ZooKeeper。ZooKeeper每次执行事务都会写入事务日志,而且需要过半节点同步。磁盘IO直接影响事务处理速度!
有鉴于此,尽量让dataDir和dataLogDir分开在不同的磁盘,避免IO竞争。ZooKeeper提供zkCleanup.sh脚本帮助清理。
| 1 | /usr/local/zookeeper/bin/zkCleanup.sh -n <日志保留数目/默认3> | 
ZooKeeper常用操作
命令行连接ZooKeeper
| 1 | /usr/local/zookeeper/bin/zkCli.sh -server 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 | 
四字命令
可通过常用的ZooKeeper四字命令监控ZooKeeper,一般用法为:echo <四字命令> | nc localhost 2181
| 命令 | 解释 | 
|---|---|
| conf | 3.3.0版本引入的。打印出服务相关配置的详细信息。 | 
| cons | 3.3.0版本引入的。列出所有连接到这台服务器的客户端全部连接/会话详细信息。包括”接受/发送”的包数量、会话id、操作延迟、最后的操作执行等等信息。 | 
| crst | 3.3.0版本引入的。重置所有连接的连接和会话统计信息。 | 
| dump | 列出那些比较重要的会话和临时节点。这个命令只能在leader节点上有用。 | 
| envi | 打印出服务环境的详细信息。 | 
| reqs | 列出未经处理的请求 | 
| ruok | 测试服务是否处于正确状态。如果确实如此,那么服务返回”imok”,否则不做任何相应。 | 
| stat | 输出关于性能和连接的客户端的列表。 | 
| srst | 重置服务器的统计。 | 
| srvr | 3.3.0版本引入的。列出连接服务器的详细信息 | 
| wchs | 3.3.0版本引入的。列出服务器watch的详细信息。 | 
| wchc | 3.3.0版本引入的。通过session列出服务器watch的详细信息,它的输出是一个与watch相关的会话的列表。 | 
| wchp | 3.3.0版本引入的。通过路径列出服务器watch的详细信息。它输出一个与session相关的路径。 | 
| mntr | 3.4.0版本引入的。输出可用于检测集群健康状态的变量列表 | 
Kafka集群部署
Kafka安装
JDK安装省略
| 1 | wget 'http://mirror.bit.edu.cn/apache/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz' | 
Kafka集群配置
配置文件
| 1 | # **config/server.properties** | 
- broker.id: kafka broker唯一标识,集群各节点必须唯一。
- num.network.threads: 网络IO线程数(优化配置成CPU核心数目一致)
- num.io.threads: 磁盘IO线程数(优化配置成CPU核心数的2倍)
- socket.send.buffer.bytes/socket.receive.buffer.bytes: 发送/接收缓冲区大小
- socket.request.max.bytes: 最大请求接收大小
- log.dirs: kafka数据存放目录,多目录间用- ,分隔
- num.partitions: 默认partition数目
- num.recovery.threads.per.data.dir: 每个数据目录kafka启动恢复日志、关闭刷盘的线程数
- log.retention.hours: 消息保留时间
- log.segment.bytes: 日志段大小。日志文件达到该值则切割
- log.retention.check.interval.ms: 检查消息/日志是否达到删除要求的间隔时间
- delete.topic.enable: 允许删除topic而非仅标识为- delete
- default.replication.factor: 默认消息备份数目,默认为- 1不做复制
Kafka JVM设置
/usr/local/kafka/bin/kafka-server-start.sh,kafka heap大小最好不要超过4G
| 1 | export KAFKA_HEAP_OPTS="-Xmx3G -Xms3G" | 
启动Kafka
尽量使用Supervisor管理启动
| 1 | nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &> /usr/local/kafka/logs/kafka.log & | 
Kafka常用操作
创建topic
replication-factor的数目要小于等于broker数目。replication-factor为复制因子,默认值1表示无副本,2表示有一个副本。partitions为partition数目,partition越多并发消费量越大,partition维护成本也越大。
| 1 | /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --replication-factor 2 --partitions 2 --topic <topic_name> | 
查看topic
- 列出所有topic - 1 - /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 
- 查看具体topic - 1 
 2
 3
 4- /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topic <topic_name> 
 Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
 Topic: test Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
 Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
删除topic
如果delete.topic.enable=true则topic会被直接删除,否则仅仅只是会被先标记为删除并不会实际删。
| 1 | /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topic test | 
动态修改topic的数据保留时间
topic数据保留时间由配置文件中log.retention.hours指定,运行时可动态修改具体topic保留时间
| 1 | # **旧方法** | 
生产消费测试
- 生产消息 - 1 - /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.201.3.222:9092,10.201.3.223:9092,10.201.3.224:9092 --topic <topic_name> 
- 消费消息 - 1 - /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topic <topic_name> --from-beginning 
consumer group 操作
- 列出所有消费组 - 1 
 2
 3
 4- # old consumer api 
 /usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --list
 # new consumer api
 /usr/local/kafka/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.201.3.222:9092,10.201.3.223:9092,10.201.3.224:9092 --list
- 查看具体消费组 - 如果是使用logstash作为consumer的话,默认的grout_name为 - logstash- 1 
 2
 3
 4- # old consumer api 
 /usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --describe --group <group_name>
 # new consumer api
 /usr/local/kafka/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.201.3.222:9092,10.201.3.223:9092,10.201.3.224:9092 --describe --group <group_name>
- 删除具体消费组 - 1 - /usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --delete --group <group-name> 
动态增加partition数量
动态增加partition数目可能会影响消息消费的顺序
| 1 | /usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --alter --topic <topic_name> --partitions 8 | 
手动修改Kafka topic offset
先手动导出Kafka topic的offset
| 1 | /usr/local/kafka/bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --group <group_id> --output-file kafka_offset.txt | 
手动调整Kafka topic的offset
- 方法一 - 这种方法只能简单的将 - offset设置成- earliest或- latest- 配置文件—— - manual_offset.properties- group.id为consumer group的名称,针对特定的comsumer group来设置,切勿弄错!- 1 
 2
 3
 4- zookeeper.connect=10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 
 zookeeper.connection.timeout.ms=6000
 #consumer group id
 group.id=logstash
- 执行 - 将topic的offset设置成最新,即忽略已经堆积的历史消息。 - 1 
 2- # /usr/local/kafka/bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK [earliest|latest] <config_file> <topic_name> 
 /usr/local/kafka/bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest manual_offset.properties test
 
- 方法二 - 这种方法允许指定 - offset到具体位置,需要通过ZooKeeper修改- 进入ZooKeeper命令行 - 1 - /usr/local/zookeeper/bin/zkCli.sh -server 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 
- 查看具体topic partition的offset - 第一行的值就是当前 - offset值- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13- get /consumers/logstash/offsets/test/0 
 13652
 cZxid = 0x300007fe0
 ctime = Tue Nov 01 16:11:03 CST 2016
 mZxid = 0xd000ad433
 mtime = Tue Jun 20 10:25:37 CST 2017
 pZxid = 0x300007fe0
 cversion = 0
 dataVersion = 252
 aclVersion = 0
 ephemeralOwner = 0x0
 dataLength = 5
 numChildren = 0
- 设置具体topic partition的offset - 1 - set /consumers/logstash/offsets/test/0 13650 
- 命令详解 - 1 - Usage: [get|set] /consumers/[groupId]/offsets/[topic]/[partitionId] 
 
- 方法三: - 方法一、方法二都适用于旧版的consumer api,新版Kafka使用了新的consumer api不再依赖ZooKeeper管理offset,而是直接将offset存放在 - __consumer_offsets这个内部topic中。最简单的识别方法是,新版api使用- --bootstrap-server而非- --zookeeper,如果consumer是使用- --bootstrap-server则使用的是新api,方法一、二并不适用新版api手动调整offset,需要使用方法三。- 方法三也有个限制,只适用与 - kafka 0.11.0.0之后的版本,而且consumer group的状态须为inactive,即没有消费者正在消费消息。- 0.11.0.0之前的版本,新版consumer api只能通过编写Java程序手动调用- KafkaConsumer seek方法。- 新版consumer api查看consumer group情况 - 1 - /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.201.5.30:9092,10.201.5.31:9092 --describe --group mirror --new-consumer 
- 手动调整offset - 1 
 2- #/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.201.5.30:9092,10.201.5.31:9092 --execute --group mirror --reset-offsets --all-topics --to-earliest 
 /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.201.5.30:9092,10.201.5.31:9092 --execute --group mirror --reset-offsets --topic jr_tomcat --to-latest
 
Kafka集群扩容
Kafka集群扩容,新节点机器加入集群后,原本存在topic的partitions是不会自动迁移到新节点,只有新建的topic会分配到新节点上,这样新节点对于旧有的topic是无用的。为了旧有的topic也能用到新节点,在扩容后需要手动将旧有topic的partition迁移到新节点中。
- 配置新节点并启动Kafka - 复制现有Kafka集群节点配置,修改 - broker.id后启动Kafka。通过ZooKeeper查看新增- broker id,验证Kafka已加入集群- 1 
 2
 3
 4
 5
 6- /usr/local/zookeeper/bin/zkCli.sh -server 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 
 [zk: 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181(CONNECTED) 0] ls /brokers/ids
 [1, 2, 3]
 [zk: 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181/(CONNECTED) 1] ls /brokers/ids
 [1, 2, 3, 4]- 查看旧有topic partition的Leader、Replicas都是没有 - broker.id=4的新节点的,只有旧的节点- 1, 2, 3- 1 
 2
 3
 4
 5
 6
 7- /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topic jr_tomcat 
 Topic:jr_tomcat PartitionCount:5 ReplicationFactor:2 Configs:
 Topic: jr_tomcat Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
 Topic: jr_tomcat Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1
 Topic: jr_tomcat Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
 Topic: jr_tomcat Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
 Topic: jr_tomcat Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
- 生成迁移分配规则文件 - 假若需要对 - jr_tomcat这个topic进行重新平衡- 创建需要迁移topic的json文件—— - topic-to-move.json- 多个topic写到 - []数组里,用- ,分隔- 1 
 2
 3
 4
 5
 6
 7
 8
 9- # topic-to-move.json 
 {"topics":
 [{"topic":"jr_tomcat"}],
 "version": 1
 }
 #{"topics": [{"topic": "topic_1"},
 # {"topic": "topic_2"}],
 # "version":1
 #}
- 生成迁移分配规则文件 - broker-list为所有集群节点的- broker.id- Proposed partition reassignment configuration下面内容是新节点加入后的partition分配情况- 1 
 2
 3
 4
 5
 6
 7- /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topics-to-move-json-file topic-to-move.json --broker-list "1,2,3,4" --generate 
 Current partition replica assignment
 {"version":1,"partitions":[{"topic":"jr_tomcat","partition":0,"replicas":[2,3]},{"topic":"jr_tomcat","partition":4,"replicas":[3,2]},{"topic":"jr_tomcat","partition":1,"replicas":[3,1]},{"topic":"jr_tomcat","partition":3,"replicas":[2,1]},{"topic":"jr_tomcat","partition":2,"replicas":[1,2]}]}
 Proposed partition reassignment configuration
 {"version":1,"partitions":[{"topic":"jr_tomcat","partition":0,"replicas":[1,2]},{"topic":"jr_tomcat","partition":4,"replicas":[1,3]},{"topic":"jr_tomcat","partition":1,"replicas":[2,3]},{"topic":"jr_tomcat","partition":3,"replicas":[4,1]},{"topic":"jr_tomcat","partition":2,"replicas":[3,4]}]}
- 执行迁移分配 - 将上一步得到的新的partition分配情况保存成 - expand-reassignment.json文件并执行partition迁移分配- 1 
 2
 3
 4
 5
 6
 7- /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file expand-reassignment.json --execute 
 Current partition replica assignment
 {"version":1,"partitions":[{"topic":"jr_tomcat","partition":0,"replicas":[2,3]},{"topic":"jr_tomcat","partition":4,"replicas":[3,2]},{"topic":"jr_tomcat","partition":1,"replicas":[3,1]},{"topic":"jr_tomcat","partition":3,"replicas":[2,1]},{"topic":"jr_tomcat","partition":2,"replicas":[1,2]}]}
 Save this to use as the --reassignment-json-file option during rollback
 Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"jr_tomcat","partition":4,"replicas":[1,3]},{"topic":"jr_tomcat","partition":3,"replicas":[4,1]},{"topic":"jr_tomcat","partition":1,"replicas":[2,3]},{"topic":"jr_tomcat","partition":2,"replicas":[3,4]},{"topic":"jr_tomcat","partition":0,"replicas":[1,2]}]}
- 检测执行状态 - 1 
 2
 3
 4
 5
 6
 7- /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file expand-reassignment.json --verify 
 Status of partition reassignment:
 Reassignment of partition [jr_tomcat,4] completed successfully
 Reassignment of partition [jr_tomcat,3] completed successfully
 Reassignment of partition [jr_tomcat,1] completed successfully
 Reassignment of partition [jr_tomcat,2] completed successfully
 Reassignment of partition [jr_tomcat,0] completed successfully
 
增加topic的replication数量
- 查看topic情况 - 1 
 2
 3
 4
 5- /usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --describe --topic test 
 Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
 Topic: test Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
 Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
 Topic: test Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
- 对单独partition增加replication - 先对 - partition 0增加replication- 准备文件—— - increase-replication-factor.json- 1 - {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[3,2,1]}]} 
- 执行增加replication操作。执行后会显示旧有配置,提示保存以便回滚。 - 1 
 2
 3
 4
 5
 6- /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file increase-replication-factor.json --execute 
 Current partition replica assignment
 {"version":1,"partitions":[{"topic":"test","partition":1,"replicas":[1,2]},{"topic":"test","partition":2,"replicas":[2,3]},{"topic":"test","partition":0,"replicas":[3,2]}]}
 Save this to use as the --reassignment-json-file option during rollback
 Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[3,2,1]}]}
 
- 确认执行情况 - 1 
 2
 3
 4- /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file increase-replication-factor.json --verify 
 Status of partition reassignment:
 Reassignment of partition [test,0] completed successfully
- 查看partition分区情况 - partition 0的replicas已从- 3,1增加为- 3,2,1- 1 
 2
 3
 4
 5- /usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --describe --topic test 
 Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
 Topic: test Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
 Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
 Topic: test Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
- 批量增加剩余partition的replication - 修改 - increase-replication-factor.json文件,执行增加命令- 1 
 2
 3- /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file increase-replication-factor.json --execute 
 /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file increase-replication-factor.json --verify
 /usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --describe --topic test
Kafka集群间数据同步——MirrorMaker
KafkaMirrorMaker提供不同Kafka集群之间数据同步,基本就是从Source Kafka Cluster消费消息然后生产到Target Kafka Cluster中。若Target Kafka Cluster中不存在topic,KafkaMirrorMaker会自动在Target Kafka Cluster上创建和Source Kafka Cluster完全相同的topic(topic_name、partition、replication)。从Kafka0.9版本开始引入新的consumer api,默认是使用旧版api,使用--new.consumer参数可指定使用新版api。
- mirror_maker_consumer.config - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21- #新版consumer 
 bootstrap.servers=10.201.5.30:9092,10.201.5.31:9092
 #旧版comsumer
 #zookeeper.connect=10.201.5.30:2181,10.201.5.31:2181
 #default request timeout 40000
 request.timeout.ms=50000
 #default heartbeat interval 3000
 heartbeat.interval.ms=15000
 #default session timeout 30000
 session.timeout.ms=40000
 #consumer group id
 group.id=mirrormaker
 partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
 #default max poll records 2147483647
 max.poll.records=20000
 #default receive buffer 64kB, now 512kb
 receive.buffer.bytes=524288
 #default max amount of data per partition to override 1048576
 max.partition.fetch.bytes=5248576
 #consumer timeout
 #consumer.timeout.ms=5000
- mirror_maker_producer.config - 1 
 2
 3
 4
 5
 6
 7- bootstrap.servers=10.201.3.222:9092,10.201.3.223:9092,10.201.3.224:9092 
 #buffer.memory=134217728
 batch.size=32768
 receive.buffer.bytes=327680
 send.buffer.bytes=262144
 max.request.size=10485760
 linger.ms=3000
- 启动KafkaMirrorMaker - --whitelist支持正则匹配符- |,也支持- ,,比如- --whitelist 'nginx_log|tomcat_log'- 1 - /usr/local/kafka/bin/kafka-mirror-maker.sh --new.consumer --consumer.config mirror_maker_consumer.config --num.streams 3 --producer.config mirror_maker_producer.config --whitelist="nginx_log" 
- 查看生产消费情况 - 在mirror consumer端查看消费情况 - 1 - /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.201.5.30:9092,10.201.5.31:9092 --describe --group mirrormaker --new-consumer 
KafkaOffsetMonitor监控部署
KafkaOffsetMonitor监控Kafka消费队列堆积情况
下载KafkaOffsetMonitor-assembly-0.2.1.jar
| 1 | java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \ |