Kafka 2.3 集群部署
基础概念
Broker(节点)
一个kafka节点就是一个broker,一个集群包含多个broker
Topic(主题)
信息类别,kafka根据topic对消息进行归类
Producer(生产者)
信息生产者,采用push模式向broker发送消息的一端,Producer可以发布到一个或更多Topic,并且可以选择用于存储数据的Partition
Partition(分区)
一个Topic包含一个或者多个Partition(类似于es的分片),Producer能决定将消息推送到Topic的哪些Partition,并且producer能直接发送消息到对应partition的Leader处
某个topic中的某个partition同时只能有一个Consumer消费
Consumer(消费者)
信息消费者,采用pull模式从broker中读取消息的一端,Consumer可以使用一个或多个Topic或Partition
Consumer Group(消费组)
每个Consumer属于一个特定的Consumer Group,topic会复制到各Consumer Group,但该Consumer Group中只能有一个Consumer能够消费该消息
replication(副本)
Topic的备份,消息以partition为单位分配到多个server,并以partition为单位进行备份,replication有leader和follower两种类型,leader提供服务,follower复制数据
leader 负责跟踪所有的 follower 状态,若Topic下的某个分区的flower比leader落后了太多或超过特定时间未发起数据的Pull求,则leader将其重ISR中移除
每个Topic下的每个分区的leader维护着与其基本保持同步的Replica列表,该列表称为ISR
复制类型:
同步复制: 只有所有的follower把数据拿过去后才commit,一致性好,可用性不高
异步复制: 只要leader拿到数据立即commit,等follower慢慢复制,可用性高,立即返回,一致性差
Commit: 是指leader告诉客户端这条数据写成功了(kafka尽量保证commit后若leader立即挂掉,其他flower都有该条数据)
在Producer端可以设置究竟使用那种同步方式:
request.required.asks=
0 相当于异步,不需要leader给予回复,producer立即返回,发送就是成功
1 当leader接收到消息之后发送ack,丢会重发,丢的概率很小
-1 当所有的follower都同步消息成功后发送ack. 丢失消息可能性比较低
segment
partition物理上由多个segment组成,同一个topic下有多个不同partition,每一个partition为一个文件夹,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1
offset
offset用于在partition内部唯一标识一条消息
Rebalance(重平衡)
Consumer Group内某个Consumer 故障后,其他Consumer自动重新订阅topic的过程
API(接口)
Kafka 有四个核心API,它们分别是:
Producer API: 它允许应用程序向一个或多个 topics 上发送消息记录
Consumer API: 允许应用程序订阅一个或多个 topics 并处理为其生成的记录流
Streams API: 它允许应用程序作为流处理器,从一个或多个Topic中消费输入流并为其生成输出流,有效的将输入流转换为输出流
Connector API: 它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者(关系数据库的连接器可能会捕获对表的所有更改)
集群部署
集群规划
节点ip
192.168.1.13 kafka1
192.168.1.14 kafka2
192.168.1.15 kafka3
192.168.1.16 zookeeper
数据目录: kafka: /usr/local/kafka jdk : /usr/local/jdk
软件下载
https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.1/kafka_2.11-2.3.1.tgz https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
配置文件
zookeeper zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
server.1=192.168.1.16:30001:20001
参数说明:
tickTime: Client和Server或server之间的通信心跳时间(单位毫秒)
initLimit: Leader和Follower之间初始连接时能容忍的最多心跳数
syncLimit:follower与leader之间请求和应答之间能容忍的最多心跳数
dataDir:数据文件目录
clientPort:客户端连接端口
server.N(YYY:A:B): 服务器名称与地址(N服务器编号,Y服务器地址,A是LF通信端口,B选举端口)
zk1,zk2,zk3依次改server.1并且在dataDir下创建一个myid一次写入0,1,2即可 启动zk:
/usr/local/zookeeper/bin/zkServer.sh start
Kafka01 server.properties
broker.id=0
listeners=PLAINTEXT://192.168.1.13:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
num.replica.fetchers=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.16:2181,192.168.1.17:2181,192.168.1.18:2181
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
group.initial.rebalance.delay.ms=0
参数说明:
broker.id:broker编号,集群中所有节点之间不能重复
listeners:监听地址(旧版本为host.name)
num.network.threads:处理网络请求的线程数,线程先将收到的消息放到内存,再从内存写入磁盘
num.io.threads:消息从内存写入磁盘时使用的线程数,处理磁盘IO的线程数
socket.send.buffer.bytes:发送套接字的缓冲区大小
socket.receive.buffer.bytes:接受套接字的缓冲区大小
socket.request.max.bytes:请求套接字的缓冲区大小
log.dirs:数据存放路径(事先创建)
num.partitions:若创建topic时没有给出划分partitions个数,则使用此默认数值代替
num.replica.fetchers: leader中进行复制的线程数,增大这个数值会增加relipca的IO
num.recovery.threads.per.data.dir: 设置恢复和清理data下数据时使用的的线程数,用于在启动时日志恢复/关闭时刷新
log.segment.bytes: 日志文件中每个segment文件的上限容量,默认1G
log.retention.check.interval.ms:定期检查segment文件有没有到达上面指定的限制容量的周期(毫秒)
log.retention.hours:segment文件保留的最长时间,默认7天,超时将被删除,单位hour
delete.topic.enable:物理删除topic需设为true,否则只是标记删除
启动kafka集群: /usr/local/kafka/bin/kafka-server-start.sh -daemon ./usr/local/kafka/config/server.properties
Kafka01,Kafka02,Kafka03只需要将broker.id,listeners修改即可
操作实战
创建topic
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.1.13:9092 --replication-factor 1 --partitions 3 --topic test-message
查看topic列表
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.1.13:9092
发送消息
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.1.13:9092 --topic test-message
消费消息
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.15:9092 --topic test-message --from-beginning
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.14:9092 --topic test-message --from-beginning
因为kafka02,kafka03是两个不同的Consumer所以都可以消费此消息
使用Consumer Group
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.14:9092 --topic test-message --from-beginning --group testgroup
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.15:9092 --topic test-message --from-beginning --group testgroup
此时便发现只有一个broker消费
查看topic详情
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.1.13:9092 -describe -topic test-message`
删除topic
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.1.13:9092 --delete --topic test-message`
修改topic Partition数量
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.1.13:9092 --alter --partitions 3 --topic test-message
只能增加不能减少,若原有分散策略是hash的方式,将会受影响,发送端(默认10min会刷新本地元信息),消费端无需重启即生效
修改消息过期时间
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.1.13:9092 –alter –-topic test-message --config delete.retention.ms=1
查看正在进行消费的group id
/usr/local/kafka/bin/kafka-topics.sh --new-consumer --bootstrap-server 192.168.1.13:9092 --list
通过 group ID 查看当前详细的消费情况
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.13:9092 --new-consumer --group testgroup --describe
重平衡leader
/usr/local/kafka/bin/kafka-preferred-replica-election.sh --zookeeper 192.168.1.13:9092
查看所有kafka节点
/usr/local/zookeeper/bin/zkCli.sh
ls /brokers/ids // 查看节点id
get /brokers/ids/{#} //根据id获取具体节点信息
获取使用者组中所有活动成员的列表
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.13:9092 --describe --group testgroup --members --verbose --all-topics
查看某特定TOPIC在各Broker上的分区容量信息
/usr/local/kafka/bin/kafka-log-dirs.sh --bootstrap-server 192.168.1.13:9092 --describe --topic-list <topics> --broker-list <broker.id_list> | grep '^{' | jq '[ ..|.size? | numbers ] | add'
--bootstrap-server: 必填项, <broker:port>.
--broker-list: 可选, 可指定查看某个broker.id上topic-partitions的size, 默认所有Broker!
--describe: 描述
--topic-list: 要查询的特定 topic 分区在disk上的空间占用情况,默认所有Topic
重启分配分区的topic
新建json文件
topics-to-move.json:
{
"topics": [
{"topic": "foo1"},
{"topic": "foo2"}
],
"version":1
}
使用kafka-reassign-partitions.sh生成分配规则到5/6机器(新节点)
/usr/local/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.1.13:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" -generate`
此时控制台显示出分配规则的json语句到控制台,复制到新建的expand-cluster-reassignment.json文件中,例如:
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]}]
}
开始分配
/usr/local/kafka/bin/kafka-reassign-partitions.sh--bootstrap-server 192.168.1.13:9092 --reassignment-json-file expand-cluster-reassignment.json –execute`
验证是否完成
/usr/local/kafka/bin/kafka-reassign-partitions.sh--bootstrap-server 192.168.1.13:9092 --reassignment-json-file expand-cluster-reassignment.json –verify`
kafka新建topic时的分区分配策略:随机选取第一个分区节点 ( broker_id递增的顺序 ),然后往后依次增加,在生产的同时进行数据迁移会出现重复数据,
所以迁移的时候避免重复生产数据,应停止迁移主题的生产。同时消费之后出现短暂的leader报错,会自动恢复
logstash 消费 kafka
input{
kafka {
zk_connect => "192.168.1.16:2181,192.168.1.17:2181,192.168.1.18:2181"
group_id => "logstash"
topic_id => "javalog"
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
}
性能测试
消费者测试
/usr/local/kafka/bin/kafka-consumer-perf-test.sh --bootstrap-server 192.168.1.13:9092 --messages 50000000 --topic test-message --threads 1
生产者测试
/usr/local/kafka/bin/kafka-producer-perf-test.sh --broker-list 172.22.241.162:9092 --threads 3 --messages 10000 --batch-size 1 --message-size 1024 --topics test-message --sync
参数说明:
--messages 生产者发送的消息总量
--message-size 每条消息大小
--batch-size 每次批量发送消息的数量
--topics 生产者发送的topic
--threads 生产者使用几个线程同时发送
--producer-num-retries 每条消息失败发送重试次数
--request-timeout-ms 每条消息请求发送超时时间
--compression-codec 设置生产端压缩数据的codec,可选参数:"none","gzip", "snappy"
--producer.config CONFIG-FILE 生成器配置属性文件
--producer-props PROP-NAME = PROP-VALUE [PROP-NAME = PROP-VALUE ...] 生成器相关的配置属性,如bootstrap.servers,client.id等。这些优先于通过--producer.config传递的配置
常见问题
1:消息积压太多,如何处理?
答: 可能max.poll.records(一次poll返回的最大记录数默认是500)和max.poll.interval.ms(两次poll方法最大时间间隔 参数默认是300s)的设置,kafka使用poll()方法拉取信息,每一次拉取500条信息,当超过300s未处理完没有向服务端 发送poll()请求,而心跳heartbeat线程仍然在继续,会认为该consumer锁死,就会将该consumer退出group,并进行 再分配,当新的consumer同样问题时,频繁rebalance,会一直重复导致消息积压,因此调大时间间隔数或改小拉取数 量值(生产中做好消息积压监控和消费速度的监控)
2: kafka为什么这么快?
答:kafka使用零拷贝来处理数据,将记录数据以批次的形式发送并压缩,并按顺序读写的方式来处理数据