基础概念

Broker(节点)

一个kafka节点就是一个broker,一个集群包含多个broker

Topic(主题)

信息类别,kafka根据topic对消息进行归类

Producer(生产者)

信息生产者,采用push模式向broker发送消息的一端,Producer可以发布到一个或更多Topic,并且可以选择用于存储数据的Partition

Partition(分区)

一个Topic包含一个或者多个Partition(类似于es的分片),Producer能决定将消息推送到Topic的哪些Partition,并且producer能直接发送消息到对应partition的Leader处
某个topic中的某个partition同时只能有一个Consumer消费

Consumer(消费者)

信息消费者,采用pull模式从broker中读取消息的一端,Consumer可以使用一个或多个Topic或Partition

Consumer Group(消费组)

每个Consumer属于一个特定的Consumer Group,topic会复制到各Consumer Group,但该Consumer Group中只能有一个Consumer能够消费该消息

replication(副本)

Topic的备份,消息以partition为单位分配到多个server,并以partition为单位进行备份,replication有leader和follower两种类型,leader提供服务,follower复制数据
leader 负责跟踪所有的 follower 状态,若Topic下的某个分区的flower比leader落后了太多或超过特定时间未发起数据的Pull求,则leader将其重ISR中移除
每个Topic下的每个分区的leader维护着与其基本保持同步的Replica列表,该列表称为ISR 
复制类型:
同步复制:  只有所有的follower把数据拿过去后才commit,一致性好,可用性不高
异步复制:  只要leader拿到数据立即commit,等follower慢慢复制,可用性高,立即返回,一致性差
Commit:   是指leader告诉客户端这条数据写成功了(kafka尽量保证commit后若leader立即挂掉,其他flower都有该条数据)
在Producer端可以设置究竟使用那种同步方式:
    request.required.asks=
        0   相当于异步,不需要leader给予回复,producer立即返回,发送就是成功
        1   当leader接收到消息之后发送ack,丢会重发,丢的概率很小
       -1   当所有的follower都同步消息成功后发送ack.  丢失消息可能性比较低

segment

partition物理上由多个segment组成,同一个topic下有多个不同partition,每一个partition为一个文件夹,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1

offset

    offset用于在partition内部唯一标识一条消息

Rebalance(重平衡)

Consumer Group内某个Consumer 故障后,其他Consumer自动重新订阅topic的过程

API(接口)

Kafka 有四个核心API,它们分别是:
Producer API:   它允许应用程序向一个或多个 topics 上发送消息记录
Consumer API:   允许应用程序订阅一个或多个 topics 并处理为其生成的记录流
Streams  API:   它允许应用程序作为流处理器,从一个或多个Topic中消费输入流并为其生成输出流,有效的将输入流转换为输出流
Connector API:  它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者(关系数据库的连接器可能会捕获对表的所有更改)

集群部署

集群规划

节点ip

192.168.1.13        kafka1
192.168.1.14        kafka2
192.168.1.15        kafka3
192.168.1.16        zookeeper

数据目录: kafka: /usr/local/kafka jdk : /usr/local/jdk

软件下载

https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.1/kafka_2.11-2.3.1.tgz https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html

配置文件

zookeeper zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
server.1=192.168.1.16:30001:20001

参数说明:

tickTime: Client和Server或server之间的通信心跳时间(单位毫秒)
initLimit: Leader和Follower之间初始连接时能容忍的最多心跳数
syncLimit:follower与leader之间请求和应答之间能容忍的最多心跳数
dataDir:数据文件目录
clientPort:客户端连接端口
server.N(YYY:A:B): 服务器名称与地址(N服务器编号,Y服务器地址,A是LF通信端口,B选举端口)

zk1,zk2,zk3依次改server.1并且在dataDir下创建一个myid一次写入0,1,2即可 启动zk: /usr/local/zookeeper/bin/zkServer.sh start

Kafka01 server.properties

broker.id=0
listeners=PLAINTEXT://192.168.1.13:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
num.replica.fetchers=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.16:2181,192.168.1.17:2181,192.168.1.18:2181
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true  
group.initial.rebalance.delay.ms=0

参数说明:

broker.id:broker编号,集群中所有节点之间不能重复
listeners:监听地址(旧版本为host.name)
num.network.threads:处理网络请求的线程数,线程先将收到的消息放到内存,再从内存写入磁盘
num.io.threads:消息从内存写入磁盘时使用的线程数,处理磁盘IO的线程数
socket.send.buffer.bytes:发送套接字的缓冲区大小
socket.receive.buffer.bytes:接受套接字的缓冲区大小
socket.request.max.bytes:请求套接字的缓冲区大小
log.dirs:数据存放路径(事先创建)
num.partitions:若创建topic时没有给出划分partitions个数,则使用此默认数值代替
num.replica.fetchers: leader中进行复制的线程数,增大这个数值会增加relipca的IO
num.recovery.threads.per.data.dir: 设置恢复和清理data下数据时使用的的线程数,用于在启动时日志恢复/关闭时刷新
log.segment.bytes: 日志文件中每个segment文件的上限容量,默认1G
log.retention.check.interval.ms:定期检查segment文件有没有到达上面指定的限制容量的周期(毫秒)
log.retention.hours:segment文件保留的最长时间,默认7天,超时将被删除,单位hour
delete.topic.enable:物理删除topic需设为true,否则只是标记删除

启动kafka集群: /usr/local/kafka/bin/kafka-server-start.sh -daemon ./usr/local/kafka/config/server.properties

Kafka01,Kafka02,Kafka03只需要将broker.id,listeners修改即可

操作实战

创建topic

/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.1.13:9092 --replication-factor 1 --partitions 3 --topic test-message

查看topic列表

/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.1.13:9092

发送消息

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list  192.168.1.13:9092  --topic test-message

消费消息

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.15:9092  --topic test-message --from-beginning
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.14:9092  --topic test-message --from-beginning

因为kafka02,kafka03是两个不同的Consumer所以都可以消费此消息

使用Consumer Group

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.14:9092 --topic test-message --from-beginning --group testgroup
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.15:9092 --topic test-message --from-beginning --group testgroup

此时便发现只有一个broker消费

查看topic详情

/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.1.13:9092 -describe -topic test-message`

删除topic

/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.1.13:9092 --delete --topic test-message`

修改topic Partition数量

/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.1.13:9092 --alter --partitions 3 --topic test-message

只能增加不能减少,若原有分散策略是hash的方式,将会受影响,发送端(默认10min会刷新本地元信息),消费端无需重启即生效

修改消息过期时间

/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.1.13:9092 –alter –-topic test-message --config delete.retention.ms=1

查看正在进行消费的group id

/usr/local/kafka/bin/kafka-topics.sh --new-consumer --bootstrap-server 192.168.1.13:9092 --list

通过 group ID 查看当前详细的消费情况

/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.13:9092 --new-consumer  --group testgroup --describe

重平衡leader

/usr/local/kafka/bin/kafka-preferred-replica-election.sh --zookeeper 192.168.1.13:9092

查看所有kafka节点

/usr/local/zookeeper/bin/zkCli.sh
ls /brokers/ids         // 查看节点id
get /brokers/ids/{#}    //根据id获取具体节点信息

获取使用者组中所有活动成员的列表

/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.13:9092 --describe --group testgroup --members --verbose --all-topics

查看某特定TOPIC在各Broker上的分区容量信息

/usr/local/kafka/bin/kafka-log-dirs.sh --bootstrap-server 192.168.1.13:9092  --describe --topic-list <topics> --broker-list <broker.id_list> | grep '^{' | jq '[ ..|.size? | numbers ] | add'
--bootstrap-server:     必填项, <broker:port>.
--broker-list:  可选, 可指定查看某个broker.id上topic-partitions的size, 默认所有Broker!
--describe:     描述
--topic-list:   要查询的特定 topic 分区在disk上的空间占用情况,默认所有Topic

重启分配分区的topic

新建json文件

topics-to-move.json:

{
  "topics": [
    {"topic": "foo1"},     
    {"topic": "foo2"}
  ],
  "version":1
}
使用kafka-reassign-partitions.sh生成分配规则到5/6机器(新节点)
/usr/local/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.1.13:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6"  -generate`

此时控制台显示出分配规则的json语句到控制台,复制到新建的expand-cluster-reassignment.json文件中,例如:

{"version":1,
  "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},        
 		        {"topic":"foo1","partition":1,"replicas":[5,6]},
                {"topic":"foo1","partition":2,"replicas":[5,6]},
                {"topic":"foo2","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[5,6]},
                {"topic":"foo2","partition":2,"replicas":[5,6]}]
}
开始分配
/usr/local/kafka/bin/kafka-reassign-partitions.sh--bootstrap-server 192.168.1.13:9092 --reassignment-json-file expand-cluster-reassignment.json –execute`
验证是否完成
/usr/local/kafka/bin/kafka-reassign-partitions.sh--bootstrap-server 192.168.1.13:9092  --reassignment-json-file expand-cluster-reassignment.json –verify`
kafka新建topic时的分区分配策略:随机选取第一个分区节点 ( broker_id递增的顺序 ),然后往后依次增加,在生产的同时进行数据迁移会出现重复数据,
                              所以迁移的时候避免重复生产数据,应停止迁移主题的生产。同时消费之后出现短暂的leader报错,会自动恢复

logstash 消费 kafka

input{
    kafka {
        zk_connect => "192.168.1.16:2181,192.168.1.17:2181,192.168.1.18:2181"
        group_id => "logstash"
        topic_id => "javalog"
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true
    }
}

性能测试

消费者测试
/usr/local/kafka/bin/kafka-consumer-perf-test.sh --bootstrap-server 192.168.1.13:9092 --messages 50000000 --topic test-message --threads 1
生产者测试
/usr/local/kafka/bin/kafka-producer-perf-test.sh --broker-list 172.22.241.162:9092 --threads 3 --messages 10000 --batch-size 1 --message-size 1024 --topics test-message  --sync

参数说明:

--messages                          生产者发送的消息总量
--message-size                      每条消息大小
--batch-size                        每次批量发送消息的数量
--topics                            生产者发送的topic
--threads                           生产者使用几个线程同时发送
--producer-num-retries              每条消息失败发送重试次数
--request-timeout-ms                每条消息请求发送超时时间
--compression-codec                 设置生产端压缩数据的codec,可选参数:"none""gzip""snappy"
--producer.config CONFIG-FILE       生成器配置属性文件
--producer-props PROP-NAME = PROP-VALUE [PROP-NAME = PROP-VALUE ...]                  生成器相关的配置属性,如bootstrap.servers,client.id等。这些优先于通过--producer.config传递的配置

常见问题

1:消息积压太多,如何处理?

答: 可能max.poll.records(一次poll返回的最大记录数默认是500)和max.poll.interval.ms(两次poll方法最大时间间隔 参数默认是300s)的设置,kafka使用poll()方法拉取信息,每一次拉取500条信息,当超过300s未处理完没有向服务端 发送poll()请求,而心跳heartbeat线程仍然在继续,会认为该consumer锁死,就会将该consumer退出group,并进行 再分配,当新的consumer同样问题时,频繁rebalance,会一直重复导致消息积压,因此调大时间间隔数或改小拉取数 量值(生产中做好消息积压监控和消费速度的监控)

2: kafka为什么这么快?

答:kafka使用零拷贝来处理数据,将记录数据以批次的形式发送并压缩,并按顺序读写的方式来处理数据