目录
  1. 1. Kafka原理
    1. 1.1. 基础概念
  2. 2. ZooKeeper原理
    1. 2.1. 基础概念
  3. 3. ZooKeeper集群部署
    1. 3.1. 安装
    2. 3.2. 集群配置
    3. 3.3. 启动ZooKeeper集群
    4. 3.4. ZooKeeper日志维护
    5. 3.5. ZooKeeper常用操作
      1. 3.5.1. 命令行连接ZooKeeper
      2. 3.5.2. 四字命令
  4. 4. Kafka集群部署
    1. 4.1. Kafka安装
    2. 4.2. Kafka集群配置
      1. 4.2.1. 配置文件
      2. 4.2.2. Kafka JVM设置
      3. 4.2.3. 启动Kafka
    3. 4.3. Kafka常用操作
      1. 4.3.1. 创建topic
      2. 4.3.2. 查看topic
      3. 4.3.3. 删除topic
      4. 4.3.4. 动态修改topic的数据保留时间
      5. 4.3.5. 生产消费测试
      6. 4.3.6. consumer group 操作
      7. 4.3.7. 动态增加partition数量
      8. 4.3.8. 手动修改Kafka topic offset
      9. 4.3.9. Kafka集群扩容
      10. 4.3.10. 增加topic的replication数量
      11. 4.3.11. Kafka集群间数据同步——MirrorMaker
      12. 4.3.12. KafkaOffsetMonitor监控部署

Kafka原理

基础概念

  • producer: 生产者,将消息发到topic里的为生产者

  • consumer: 消费者,从topic里读取消息的为消费者。Kafka以group为单位组织consumer,同一个topic可有多个consumer group,一条消息会发到所有consumer group中,一个consumer group可能会有多个consumer,但一条消息只会被consumer group内的一个consumer消费,即一条消息在同一个consumer group内只会被消费一次。Kafka有consumer rebalance机制,自动平衡消费者,但在平衡期间不会进行消费,consumer或broker的增减都会触发rebalance,Kafka也会将无法消费或消费过慢的consumer剔除后自动引发rebalance。

  • broker: 每个Kafka实例称之为broker

  • topic: Kafka以topic为单位对消息进行分类,相同类型的消息分到同一个topic进行生成消费管理。一个topic由一个或多个partition组成。

  • partition: Kafka中可将一个topic切分成一个或多个partitionpartition是Kafka横向扩展及并行处理的关键。每个partition都是一个顺序、不可变的消息队列。Kafka会将消息持久化到磁盘,但官方建议消息的高可用应由partition replication来保证。可为每个partition设置replicas,每个partition会选举出唯一一个leader partition,所有的读写请求都由leader处理,replicas只负责将消息同步到本地确保高可用。

    • Isr: In-Sync Replica,表示能与leader保持数据同步的replicas。如果replicas同步leader消息的延迟时间延迟条数其中一个超出阀值,则会将延迟的replicas剔出Isr。当leader partition挂掉后,只有在Isr队列里的replicas才可参与leader partition的选举过程
    • replication-factor: 复制因子,topic partition的副本数目,默认1,即无副本。若replication-factor=N,则允许N-1台机器宕机以确保消息的高可用。
  • offset: 每个partition内的消息都有一个序列号,称之为offsetoffsetpartition内是有序的,不能跨partition使用offsetoffset的移动表示partition内消息的消费位置。在Kafka的设计中,一个partition中消费offset完全是由消费者自己控制的。假若一个消费者将offset设置成0,则意味着该消费者会从该partition中最开始的消息开始消费。一个消费者的offset操作不会影响到该partition内的其他消费者,因为每个消费者都维护着自己的消费offset

    关于offset还涉及到以下4个概念

    1
    Last Commit Offset————Current Posision————High Watermark————Log End Offset
    • Last Commit Offset: consumer group最新一次commit的offset位置,Last Commit Offset表示在此之前的所有消息都以消费完并确认。
    • Current Posision: 当前consumer group消费消息的offset位置。Last Commit OffsetCurrent Posision之间的消息表示已经被consumer group读取但还在处理中,没有确认(commit)
    • High Watermark: 表示High Watermark之前的数据都已被同步到所有replicas中,High Watermarkoffset之前的数据是高可用的(消息有多个副本)。
    • Log End Offset: 表示producer发送到该partition中最新消息的offset。High WatermarkLog End Offset之间的数据表示消息已写入leader partition但未同步至replicas中,这之间的数据没高可用,是不安全的,不允许消费

    0.9版本后的Kafka开始,新版的consumer api是将offset存放在Kafka的__consumer_offsets topic中。该topic默认有50个partition和3个replicas。某个consumer group的offset具体存放在哪个partition是通过abs(GroupId.hashCode()) % NumPartitions 计算出来的(NumPartitions默认50)。旧版本的Kafka默认是将offset存在ZooKeeper中。

  • ack: ack确认机制是producer发送消息给Kafka,确认消息高可用的机制,依赖于producer的request.required.acks设置

    • 0: producer只管发送,不管Kafka是否接收成功
    • 1: Kafka partition的leader写入后即返回成功,replicas异步同步消息。风险在于leader挂了,消息还没同步的话则会丢失数据。
    • all/-1: 旧版Kafka(0.8)值为-1,新版Kafka值为all。等待所有replicas都同步消息后才返回成功,数据高可用最安全,生产消息性能会受影响。
  • ZeroCopy: Kafka使用ZeroCopy机制以提高性能,以从磁盘读取数据返回给消费者为例

    • 传统方式(涉及4次切换):
      1. 进程通过read()系统调用从用户态切换到内核态[1],内核向磁盘发起请求,将数据从磁盘读取到内核缓存区
      2. 进程将数据从内核空间拷贝到用户空间,从内核态切换到用户态[2]
      3. 进程将数据发送到Socket,从用户态切换到内核态[3],将数据写入到Socket Buffer
      4. 内核将Socket Buffer中的数据拷贝到NIC Buffer中,最终发送给消费者。从内核态切换用户态[4],完成操作
    • ZeroCopy方式(涉及2次切换):
      1. 进程通过sendfile()从用户态切换到内核态[1],内核从磁盘读取数据到内核缓存区,然后直接将数据拷贝到Socket Buffer,再将数据从Socket Buffer拷贝到NIC Buffer
      2. 完成操作后从内核态切换用户态[2]

ZooKeeper原理

基础概念

  • 角色: ZK中分为3中角色。ZK所有节点都可接收并处理读请求(exists/getData/getChildren…),但只有Leader角色节点才能处理写请求(create/setData/delete)

    • Leader: 主节点,响应所有其余节点请求(包括写请求),维护集群状态
    • Follower: 从Leader中同步消息和状态,接收并处理读请求,转发写请求到Leader。可参与选举
    • Observer: 3.3.0版本开始引入,仅处理读请求,提高读请求吞吐量,转发写请求到Leader。数据不持久化到磁盘,不可参与选举
  • ZNODE: ZK使用类似文件系统方式组织资源,资源中的每个节点称为ZNODE。ZNODE分4中类型

    • 持久节点(persistent): 只能通过delete指令删除,数据会被持久化
    • 临时节点(ephemeral): 创建该ZNODE的客户端断开连接后,临时节点会被自动删除
    • 有序节点: 创建节点时会自动在节点名后添加一串单调递增序号,/tasks/task0000000001
    • 无序节点: 默认创建无序节点
  • ZNODE stat结构: 每个ZNODE都会有一个stat信息,可通过stat </zookeeper/test/node>ls2 </zookeeper/test/node>查看

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    [zk: localhost(CONNECTED) 0] stat /zookeeper
    cZxid = 0x0
    ctime = Thu Jan 01 05:30:00 IST 1970
    mZxid = 0x0
    mtime = Thu Jan 01 05:30:00 IST 1970
    pZxid = 0x0
    cversion = -1
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 0
    numChildren = 1
    • cZxid: 导致创建znode更改的事务ID
    • mZxid: 最后修改znode更改的事务ID
    • pZxid: 用于添加或删除子节点的znode更改的事务ID
    • ctime: 表示从1970-01-01T00:00:00Z开始以毫秒为单位的znode创建时间
    • mtime: 表示从1970-01-01T00:00:00Z开始以毫秒为单位的znode最近修改时间
    • dataVersion: 表示对该znode的数据所做的更改次数
    • cversion: 这表示对此znode的子节点进行的更改次数
    • aclVersion: 表示对此znode的ACL进行更改的次数
    • ephemeralOwner: 如果znode是临时节点,则为此znode所有者的 session ID;如果znode不是临时节点,则该字段设置为零
    • dataLength: 这是znode数据字段的长度
    • numChildren: 这表示znode的子节点的数量
  • 监视通知机制: ZK自身提供监视通知机制,可对某ZNODE设置一个监视watcher,当监视事件发生时会发送通知。可设置2种类似watcher

    • Data Watches: ZNODE数据变更触发通知(create/setData)
    • Child Watches: ZNODE子节点变更触发通知(delete/create[child znode])
  • 会话(session): client连接到ZK时,便会创建一个会话(session)。会话失效后ZK会清除会话相关信息(临时节点/Watcher)。每个会话session都有4个基本属性

    • SID: 会话ID,唯一标识每个会话
    • TimeOut: 会话超时时间。会话异常断开后,client会尝试重连,若在TimeOut时间内重连上,则收到syncconnected事件重用会话;若超过TimeOut时间重连上,则收到expired事件,结束会话重新发起新连接。
    • TickTime: 下次会话超时时间点,13位long型数据,TickTime ~= NowTime + TimeOut
    • isClosing: 会话超时标记,会话超时失效时,将会话标记为已关闭
  • Leader选举:

    • ZAB协议: ZK通过ZAB(Zookeeper Atomic Broadcast)协议进行消息传递,选举消息自然使用ZAB协议进行
    • 选举算法: 3.4.10为止,有4种选举算法,默认3
      • 0: 基于UDP的LeaderElection
      • 1: 基于UDP的FastLeaderElection
      • 2: 基于UDP和认证的FastLeaderElection
      • 3: 基于TCP的FastLeaderElection
    • 选举消息
      • sid: ZK配置文件中配置server.<X>,ZK集群的唯一ID
      • zxid: 事务ID,每个zxid由64位数字表示,zxid全局单调递增
      • epoch: zxid高32位为epoch,从1开始单调递增,每选举投票一轮epoch + 1。低32位为epoch内序号,epoch变化则重置
    • 选举优先级原则
      • epoch > zxid(低32位) > sid
        • 只有epoch为最新的follower才能参加选举(收到的epoch比自身大,无条件放弃选举)
        • zxid大的优先级高
        • sid大的优先级高
      • 过半原则: 得票数 > 集群总节点数/2,则产生新Leader
    • 选举大致过程
      • 产生选举信息。所有ZK节点(Observer除外)产生(sid, zxid)选举信息,并都认为自己是Leader,投自己一票
      • 接收选举信息。ZK节点之间会建立TCP连接,为避免重复建立连接,ZK节点只允许sid大于自身的节点与自己建立连接,否则断开连接,并主动和对方建立连接
      • 处理选举信息。
        • 检查epoch,收到的epoch大于自身epoch,退出选举,更新投票信息,发送投票结果
        • epoch相同,比较zxid,较大节点胜出
        • zxid相同,比较sid,较大节点胜出
      • 统计选举信息。每轮投票结束都会统计选举结果,若过半则产生新Leader,否则继续下一轮选举投票

ZooKeeper集群部署

安装

JDK安装省略

1
2
3
4
wget http://www.apache.org/dist/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz
tar -zxvf zookeeper-3.4.8.tar.gz
mv zookeeper-3.4.8 /usr/local/zookeeper
mkdir /usr/local/zookeeper/data /usr/local/zookeeper/log /usr/local/zookeeper/datalog

集群配置

  • zoo.cfg

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/usr/local/zookeeper/data
    dataLogDir=/usr/local/zookeeper/datalog
    clientPort=2181

    server.1=10.201.3.222:2888:3888
    server.2=10.201.3.223:2888:3888
    server.3=10.201.3.224:2888:3888

    maxClientCnxns=500
    minSessionTimeout=30000
    maxSessionTimeout=60000
    • tickTime: 基本时间单元,以毫秒为单位。它用来控制心跳和超时,默认情况下最小的会话超时时间为两倍的tickTime
    • initLimit: 允许follower连接并同步到leader的初始化连接时间,以tickTime倍数计算
    • syncLimit: Leader与Follower间请求和应答时间长度
    • dataDir: 快照文件目录
    • dataLogDir: 事务日志目录(dataDirdataLogDir目录最好是在两个不同磁盘中,避免IO竞争)
    • 2181: ZooKeeper监听端口(客户端连接端口)
    • 2888: leader和follower之间数据同步使用的端口号
    • 3888: leader选举专用的端口号
    • maxClientCnxns: 客户端并发连接数目
  • ServerID

    集群每个节点都需要配置唯一的ServerID,ServerID保持和zoo.cfgserver.X中的X一致

    1
    2
    3
    4
    5
    6
    # 10.201.3.222
    echo 1 > /usr/local/zookeeper/data/myid
    # 10.201.3.223
    echo 2 > /usr/local/zookeeper/data/myid
    # 10.201.3.224
    echo 3 > /usr/local/zookeeper/data/myid
  • 优化配置——zkEnv.sh

    一些额外的优化配置建议写在zkEnv.sh中,如JVM、Log Level等。修改滚动日志选项(size等)则在conf/log4j.properties

    1
    2
    3
    4
    5
    6
    # zkEnv.sh
    JAVA_HOME=/usr/share/java
    ZOO_LOG_DIR=/usr/local/zookeeper/log
    # 设置日志轮转
    ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
    JVMFLAGS="-Xms1024m -Xmx1024m $JVMFLAGS"

启动ZooKeeper集群

1
2
3
4
# **所有节点执行,启动ZooKeeper集群**
/usr/local/zookeeper/bin/zkServer.sh start
# **查看集群状态**
/usr/local/zookeeper/bin/zkServer.sh status

ZooKeeper日志维护

默认ZooKeeper不会定期清除日志,3.4.0开始支持定期清理

1
2
3
4
5
# **zoo.cfg**
# **清理频率,单位小时**
autopurge.purgeInterval=24
# **保留文件数目,默认3**
autopurge.snapRetainCount=10

但一般采用计划任务选择在空闲时间段定期删除,避免占用大量IO影响ZooKeeper。ZooKeeper每次执行事务都会写入事务日志,而且需要过半节点同步。磁盘IO直接影响事务处理速度!

有鉴于此,尽量让dataDirdataLogDir分开在不同的磁盘,避免IO竞争。ZooKeeper提供zkCleanup.sh脚本帮助清理。

1
/usr/local/zookeeper/bin/zkCleanup.sh -n <日志保留数目/默认3>

ZooKeeper常用操作

命令行连接ZooKeeper

1
/usr/local/zookeeper/bin/zkCli.sh -server 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181

四字命令

可通过常用的ZooKeeper四字命令监控ZooKeeper,一般用法为:echo <四字命令> | nc localhost 2181

命令 解释
conf 3.3.0版本引入的。打印出服务相关配置的详细信息。
cons 3.3.0版本引入的。列出所有连接到这台服务器的客户端全部连接/会话详细信息。包括”接受/发送”的包数量、会话id、操作延迟、最后的操作执行等等信息。
crst 3.3.0版本引入的。重置所有连接的连接和会话统计信息。
dump 列出那些比较重要的会话和临时节点。这个命令只能在leader节点上有用。
envi 打印出服务环境的详细信息。
reqs 列出未经处理的请求
ruok 测试服务是否处于正确状态。如果确实如此,那么服务返回”imok”,否则不做任何相应。
stat 输出关于性能和连接的客户端的列表。
srst 重置服务器的统计。
srvr 3.3.0版本引入的。列出连接服务器的详细信息
wchs 3.3.0版本引入的。列出服务器watch的详细信息。
wchc 3.3.0版本引入的。通过session列出服务器watch的详细信息,它的输出是一个与watch相关的会话的列表。
wchp 3.3.0版本引入的。通过路径列出服务器watch的详细信息。它输出一个与session相关的路径。
mntr 3.4.0版本引入的。输出可用于检测集群健康状态的变量列表

Kafka集群部署

Kafka安装

JDK安装省略

1
2
3
wget 'http://mirror.bit.edu.cn/apache/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz'
tar -zxf kafka_2.11-0.10.0.0.tgz && mv kafka_2.11-0.10.0.0 /usr/local/kafka
mkdir -p /usr/local/kafka/{logs,data}

Kafka集群配置

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# **config/server.properties**
broker.id=1
port=9092
advertised.host.name=10.201.3.222
host.name=10.201.3.222
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/data
num.partitions=5
num.recovery.threads.per.data.dir=1
log.retention.hours=72
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
default.replication.factor=2
  • broker.id: kafka broker唯一标识,集群各节点必须唯一。
  • num.network.threads: 网络IO线程数(优化配置成CPU核心数目一致)
  • num.io.threads: 磁盘IO线程数(优化配置成CPU核心数的2倍)
  • socket.send.buffer.bytes/socket.receive.buffer.bytes: 发送/接收缓冲区大小
  • socket.request.max.bytes: 最大请求接收大小
  • log.dirs: kafka数据存放目录,多目录间用,分隔
  • num.partitions: 默认partition数目
  • num.recovery.threads.per.data.dir: 每个数据目录kafka启动恢复日志、关闭刷盘的线程数
  • log.retention.hours: 消息保留时间
  • log.segment.bytes: 日志段大小。日志文件达到该值则切割
  • log.retention.check.interval.ms: 检查消息/日志是否达到删除要求的间隔时间
  • delete.topic.enable: 允许删除topic而非仅标识为delete
  • default.replication.factor: 默认消息备份数目,默认为1不做复制

Kafka JVM设置

/usr/local/kafka/bin/kafka-server-start.sh,kafka heap大小最好不要超过4G

1
export KAFKA_HEAP_OPTS="-Xmx3G -Xms3G"

启动Kafka

尽量使用Supervisor管理启动

1
nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &> /usr/local/kafka/logs/kafka.log &

Kafka常用操作

创建topic

replication-factor的数目要小于等于broker数目。replication-factor为复制因子,默认值1表示无副本,2表示有一个副本。partitions为partition数目,partition越多并发消费量越大,partition维护成本也越大。

1
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --replication-factor 2 --partitions 2 --topic <topic_name>

查看topic

  • 列出所有topic

    1
    /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181
  • 查看具体topic

    1
    2
    3
    4
    /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topic <topic_name>
    Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
    Topic: test Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
    Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

删除topic

如果delete.topic.enable=true则topic会被直接删除,否则仅仅只是会被先标记为删除并不会实际删。

1
/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topic test

动态修改topic的数据保留时间

topic数据保留时间由配置文件中log.retention.hours指定,运行时可动态修改具体topic保留时间

1
2
3
4
5
6
# **旧方法**
#/usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --alter --topic <topic_name> --config retention.ms=86400000
# **新方法**
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --alter --entity-type topics --entity-name <topic_name> --add-config retention.ms=86400000
# **查看修改**
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --describe --entity-type topics --entity-name <topic_name>

生产消费测试

  • 生产消息

    1
    /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.201.3.222:9092,10.201.3.223:9092,10.201.3.224:9092 --topic <topic_name>
  • 消费消息

    1
    /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topic <topic_name> --from-beginning

consumer group 操作

  • 列出所有消费组

    1
    2
    3
    4
    # old consumer api
    /usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --list
    # new consumer api
    /usr/local/kafka/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.201.3.222:9092,10.201.3.223:9092,10.201.3.224:9092 --list
  • 查看具体消费组

    如果是使用logstash作为consumer的话,默认的grout_name为logstash

    1
    2
    3
    4
    # old consumer api
    /usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --describe --group <group_name>
    # new consumer api
    /usr/local/kafka/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.201.3.222:9092,10.201.3.223:9092,10.201.3.224:9092 --describe --group <group_name>
  • 删除具体消费组

    1
    /usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --delete --group <group-name>

动态增加partition数量

动态增加partition数目可能会影响消息消费的顺序

1
/usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --alter --topic <topic_name> --partitions 8

手动修改Kafka topic offset

先手动导出Kafka topic的offset

1
/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --group <group_id> --output-file kafka_offset.txt

手动调整Kafka topic的offset

  • 方法一

    这种方法只能简单的将offset设置成earliestlatest

    • 配置文件——manual_offset.properties

      group.id为consumer group的名称,针对特定的comsumer group来设置,切勿弄错!

      1
      2
      3
      4
      zookeeper.connect=10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181
      zookeeper.connection.timeout.ms=6000
      #consumer group id
      group.id=logstash
    • 执行

      将topic的offset设置成最新,即忽略已经堆积的历史消息。

      1
      2
      # /usr/local/kafka/bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK [earliest|latest] <config_file> <topic_name>
      /usr/local/kafka/bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest manual_offset.properties test
  • 方法二

    这种方法允许指定offset到具体位置,需要通过ZooKeeper修改

    • 进入ZooKeeper命令行

      1
      /usr/local/zookeeper/bin/zkCli.sh -server 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181
    • 查看具体topic partition的offset

      第一行的值就是当前offset

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      get /consumers/logstash/offsets/test/0
      13652
      cZxid = 0x300007fe0
      ctime = Tue Nov 01 16:11:03 CST 2016
      mZxid = 0xd000ad433
      mtime = Tue Jun 20 10:25:37 CST 2017
      pZxid = 0x300007fe0
      cversion = 0
      dataVersion = 252
      aclVersion = 0
      ephemeralOwner = 0x0
      dataLength = 5
      numChildren = 0
    • 设置具体topic partition的offset

      1
      set /consumers/logstash/offsets/test/0 13650
    • 命令详解

      1
      Usage: [get|set] /consumers/[groupId]/offsets/[topic]/[partitionId]
  • 方法三:

    方法一、方法二都适用于旧版的consumer api,新版Kafka使用了新的consumer api不再依赖ZooKeeper管理offset,而是直接将offset存放在__consumer_offsets这个内部topic中。最简单的识别方法是,新版api使用--bootstrap-server而非--zookeeper,如果consumer是使用--bootstrap-server则使用的是新api,方法一、二并不适用新版api手动调整offset,需要使用方法三。

    方法三也有个限制,只适用与kafka 0.11.0.0之后的版本,而且consumer group的状态须为inactive,即没有消费者正在消费消息。0.11.0.0之前的版本,新版consumer api只能通过编写Java程序手动调用KafkaConsumer seek方法。

    • 新版consumer api查看consumer group情况

      1
      /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.201.5.30:9092,10.201.5.31:9092 --describe --group mirror --new-consumer
    • 手动调整offset

      1
      2
      #/usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server 10.201.5.30:9092,10.201.5.31:9092 --execute --group mirror --reset-offsets --all-topics --to-earliest
      /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.201.5.30:9092,10.201.5.31:9092 --execute --group mirror --reset-offsets --topic jr_tomcat --to-latest

Kafka集群扩容

Kafka集群扩容,新节点机器加入集群后,原本存在topic的partitions是不会自动迁移到新节点,只有新建的topic会分配到新节点上,这样新节点对于旧有的topic是无用的。为了旧有的topic也能用到新节点,在扩容后需要手动将旧有topic的partition迁移到新节点中。

  • 配置新节点并启动Kafka

    复制现有Kafka集群节点配置,修改broker.id后启动Kafka。通过ZooKeeper查看新增broker id,验证Kafka已加入集群

    1
    2
    3
    4
    5
    6
    /usr/local/zookeeper/bin/zkCli.sh -server 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181
    [zk: 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181(CONNECTED) 0] ls /brokers/ids
    [1, 2, 3]

    [zk: 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181/(CONNECTED) 1] ls /brokers/ids
    [1, 2, 3, 4]

    查看旧有topic partition的Leader、Replicas都是没有broker.id=4的新节点的,只有旧的节点1, 2, 3

    1
    2
    3
    4
    5
    6
    7
    /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topic jr_tomcat
    Topic:jr_tomcat PartitionCount:5 ReplicationFactor:2 Configs:
    Topic: jr_tomcat Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
    Topic: jr_tomcat Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1
    Topic: jr_tomcat Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
    Topic: jr_tomcat Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
    Topic: jr_tomcat Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
  • 生成迁移分配规则文件

    假若需要对jr_tomcat这个topic进行重新平衡

    • 创建需要迁移topic的json文件——topic-to-move.json

      多个topic写到[]数组里,用,分隔

      1
      2
      3
      4
      5
      6
      7
      8
      9
      # topic-to-move.json
      {"topics":
      [{"topic":"jr_tomcat"}],
      "version": 1
      }
      #{"topics": [{"topic": "topic_1"},
      # {"topic": "topic_2"}],
      # "version":1
      #}
    • 生成迁移分配规则文件

      broker-list为所有集群节点的broker.id

      Proposed partition reassignment configuration下面内容是新节点加入后的partition分配情况

      1
      2
      3
      4
      5
      6
      7
      /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --topics-to-move-json-file topic-to-move.json --broker-list "1,2,3,4" --generate

      Current partition replica assignment
      {"version":1,"partitions":[{"topic":"jr_tomcat","partition":0,"replicas":[2,3]},{"topic":"jr_tomcat","partition":4,"replicas":[3,2]},{"topic":"jr_tomcat","partition":1,"replicas":[3,1]},{"topic":"jr_tomcat","partition":3,"replicas":[2,1]},{"topic":"jr_tomcat","partition":2,"replicas":[1,2]}]}

      Proposed partition reassignment configuration
      {"version":1,"partitions":[{"topic":"jr_tomcat","partition":0,"replicas":[1,2]},{"topic":"jr_tomcat","partition":4,"replicas":[1,3]},{"topic":"jr_tomcat","partition":1,"replicas":[2,3]},{"topic":"jr_tomcat","partition":3,"replicas":[4,1]},{"topic":"jr_tomcat","partition":2,"replicas":[3,4]}]}
    • 执行迁移分配

      将上一步得到的新的partition分配情况保存成expand-reassignment.json文件并执行partition迁移分配

      1
      2
      3
      4
      5
      6
      7
      /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file expand-reassignment.json --execute

      Current partition replica assignment
      {"version":1,"partitions":[{"topic":"jr_tomcat","partition":0,"replicas":[2,3]},{"topic":"jr_tomcat","partition":4,"replicas":[3,2]},{"topic":"jr_tomcat","partition":1,"replicas":[3,1]},{"topic":"jr_tomcat","partition":3,"replicas":[2,1]},{"topic":"jr_tomcat","partition":2,"replicas":[1,2]}]}
      Save this to use as the --reassignment-json-file option during rollback

      Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"jr_tomcat","partition":4,"replicas":[1,3]},{"topic":"jr_tomcat","partition":3,"replicas":[4,1]},{"topic":"jr_tomcat","partition":1,"replicas":[2,3]},{"topic":"jr_tomcat","partition":2,"replicas":[3,4]},{"topic":"jr_tomcat","partition":0,"replicas":[1,2]}]}
    • 检测执行状态

      1
      2
      3
      4
      5
      6
      7
      /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file expand-reassignment.json --verify
      Status of partition reassignment:
      Reassignment of partition [jr_tomcat,4] completed successfully
      Reassignment of partition [jr_tomcat,3] completed successfully
      Reassignment of partition [jr_tomcat,1] completed successfully
      Reassignment of partition [jr_tomcat,2] completed successfully
      Reassignment of partition [jr_tomcat,0] completed successfully

增加topic的replication数量

  • 查看topic情况

    1
    2
    3
    4
    5
    /usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --describe --topic test
    Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
    Topic: test Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
    Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
    Topic: test Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
  • 对单独partition增加replication

    先对partition 0增加replication

    • 准备文件——increase-replication-factor.json

      1
      {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[3,2,1]}]}
    • 执行增加replication操作。执行后会显示旧有配置,提示保存以便回滚。

      1
      2
      3
      4
      5
      6
      /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file increase-replication-factor.json --execute

      Current partition replica assignment
      {"version":1,"partitions":[{"topic":"test","partition":1,"replicas":[1,2]},{"topic":"test","partition":2,"replicas":[2,3]},{"topic":"test","partition":0,"replicas":[3,2]}]}
      Save this to use as the --reassignment-json-file option during rollback
      Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[3,2,1]}]}
  • 确认执行情况

    1
    2
    3
    4
    /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file increase-replication-factor.json --verify

    Status of partition reassignment:
    Reassignment of partition [test,0] completed successfully
  • 查看partition分区情况

    partition 0的replicas已从3,1增加为3,2,1

    1
    2
    3
    4
    5
    /usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --describe --topic test
    Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
    Topic: test Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
    Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
    Topic: test Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
  • 批量增加剩余partition的replication

    修改increase-replication-factor.json文件,执行增加命令

    1
    2
    3
    /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file increase-replication-factor.json --execute
    /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --reassignment-json-file increase-replication-factor.json --verify
    /usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 --describe --topic test

Kafka集群间数据同步——MirrorMaker

KafkaMirrorMaker提供不同Kafka集群之间数据同步,基本就是从Source Kafka Cluster消费消息然后生产到Target Kafka Cluster中。若Target Kafka Cluster中不存在topic,KafkaMirrorMaker会自动在Target Kafka Cluster上创建和Source Kafka Cluster完全相同的topic(topic_name、partition、replication)。从Kafka0.9版本开始引入新的consumer api,默认是使用旧版api,使用--new.consumer参数可指定使用新版api。

  • mirror_maker_consumer.config

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    #新版consumer
    bootstrap.servers=10.201.5.30:9092,10.201.5.31:9092
    #旧版comsumer
    #zookeeper.connect=10.201.5.30:2181,10.201.5.31:2181
    #default request timeout 40000
    request.timeout.ms=50000
    #default heartbeat interval 3000
    heartbeat.interval.ms=15000
    #default session timeout 30000
    session.timeout.ms=40000
    #consumer group id
    group.id=mirrormaker
    partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
    #default max poll records 2147483647
    max.poll.records=20000
    #default receive buffer 64kB, now 512kb
    receive.buffer.bytes=524288
    #default max amount of data per partition to override 1048576
    max.partition.fetch.bytes=5248576
    #consumer timeout
    #consumer.timeout.ms=5000
  • mirror_maker_producer.config

    1
    2
    3
    4
    5
    6
    7
    bootstrap.servers=10.201.3.222:9092,10.201.3.223:9092,10.201.3.224:9092
    #buffer.memory=134217728
    batch.size=32768
    receive.buffer.bytes=327680
    send.buffer.bytes=262144
    max.request.size=10485760
    linger.ms=3000
  • 启动KafkaMirrorMaker

    --whitelist支持正则匹配符|,也支持,,比如--whitelist 'nginx_log|tomcat_log'

    1
    /usr/local/kafka/bin/kafka-mirror-maker.sh --new.consumer --consumer.config mirror_maker_consumer.config --num.streams 3 --producer.config mirror_maker_producer.config --whitelist="nginx_log"
  • 查看生产消费情况

    在mirror consumer端查看消费情况

    1
    /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.201.5.30:9092,10.201.5.31:9092 --describe --group mirrormaker --new-consumer

KafkaOffsetMonitor监控部署

KafkaOffsetMonitor监控Kafka消费队列堆积情况

下载KafkaOffsetMonitor-assembly-0.2.1.jar

1
2
3
4
5
6
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk 10.201.3.222:2181,10.201.3.223:2181,10.201.3.224:2181 \
--port 8088 \
--refresh 30.seconds \
--retain 7.days

Powered: Hexo, Theme: Nadya remastered from NadyMain