kafka
阿里云兼容 kafka 消息队列:https://www.aliyun.com/product/ons
Kafka 是一种高吞吐量的分布式发布订阅消息系统,其具备分布式功能、并可以结合 zookeeper 可以实现动态扩容,用于构建实时数据管道和流应用程序
它具有水平可伸缩性、容错性、快速性
常用消息队列对比:
kafka 优势
kafka 为什么性能高?
- 顺序写入,kafka 数据写入磁盘,不是保存在内存,默认保存 168 小时
- kafka 通过 O(1)的磁盘数据结构提供消息的持久化,即使数以 TB 的消息存储也能够保持长时间的稳定性能
- O(1)是最低的时间复杂度,哈希算法就是典型的 O(1) 时间复杂度,无论数据规模多大,都可以在一次计算后找到目标
- 高吞吐量,即使是非常普通的硬件 Kafka 也可以支持每秒数百万的消息
- 支持通过 Kafka 服务器分区消息,可以将数据保存到不同的服务器
- 支持 Hadoop 并行数据加载
kafka 角色
broker:中文直译“中间人”,实际就是消息代理,是生产者和消费者中间代理保存消息的中转站,集群中每个 kafka 的 broker 都有唯一的 id,由 server.properties 中的 broker.id 指定,可以把每个 kafka 节点抽象的看成是一个 broker,也可以把整个 kafka 集群抽象的看成是一个 broker
topic:话题,生产者和消费者监听同一个 topic,生产者往里写消息,消费者从里面读消息
partition:分区,也叫分片,物理上的概念,每个分区对应一个文件夹,topic 可以将其消息分片储存,提高性能,然后每个分片做多个副本,保证高可用。
注意:分片数量不要超过 kafka 节点数量;副本数量也不要超过 kafka 节点数量;
- leader:分片副本的角色,主
- follower:分片副本的角色,从
对于一个分片,其副本只有一个是 leader,其他的都是 follower,leader 不能和 follower 在同一个节点,这样就失去了高可用的意义
高可用:当一个节点故障,其他的 follower 会选举出一个作为 leader
1
2
3
4上图中 topic1 分了两片:topic1-part1、topic1-part2;
上图中 topic2 只有一片:topic2-part1
上图中 topic1 和 topic2 的分片都做了三个副本:topicX-part1、topicX-part2、topicX-part3Producer:生产者,负责发布消息到 Kafka broker
Consumer:消费者,每个 consumer 属于一个特定的 consuer group(若不指定 group name 则属于默认 group),使用 consumer high level API 时,同一 topic 的一条消息只能被同一个 consumer group 内的一个 consumer 消费,但多个 consumer group 可同时消费这一消息
kafka 和 zookeeper 的关系:
kafka 自身无法实现集群和高可用,kafka 依赖 zookeeper 实现集群和高可用
zookeeper 和 kafka 都可以存储数据,zookeeper 储存单个数据在 1MB 以内,只用来保存服务的元数据,不保存业务信息
- Broker 依赖于 Zookeeper,每个 Broker 的 id 和 Topic、Partition 这些元数据信息都会写入 Zookeeper 的 ZNode 节点中
- Consumer 依赖于 Zookeeper,Consumer 在消费消息时,每消费完一条消息,会将产生的 offset 保存到 Zookeeper 中,下次消费在当前 offset 往后继续消费。注意:kafka0.9 之前 Consumer 的 offset 存储在 Zookeeper 中,kafka0,9 以后 offset 存储在本地
- Partition 依赖于 Zookeeper,Partition 完成 Replication 备份后,选举出一个 Leader,这个是依托于 Zookeeper 的选举机制实现的
kafka 部署
1 | kakfa1.ljk.cn:10.0.1.101 |
快速部署:http://kafka.apache.org/quickstart
安装 zookeeper,这里就不配置集群了,安装单机 zookeeper
安装 kafka
1
2
3
4
5
6# kafka 下载页面:http://kafka.apache.org/downloads
[root@kakfa1 src]$tar -xzf kafka_2.13-2.7.0.tgz
[root@kakfa1 src]$mv kafka_2.13-2.7.0 /usr/local/kafka
[root@kakfa1 src]$cd /usr/local/
[root@kakfa1 local]$scp -r ./kafka/ 10.0.1.102:/usr/local
[root@kakfa1 local]$scp -r ./kafka/ 10.0.1.103:/usr/local配置 kafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32[root@kakfa1 ~]$vim /etc/hosts
...
10.0.1.101 kafka1.ljk.cn
10.0.1.102 kafka2.ljk.cn
10.0.1.103 kafka3.ljk.cn
10.0.1.101 zk1.ljk.cn # zookeeper地址域名解析
[root@kakfa1 local]$vim kafka/config/server.properties
21 broker.id=1 # 每个 broker 在集群中的唯一标识,正整数
31 listeners=PLAINTEXT://kafka1.ljk.cn:9092
60 log.dirs=/usr/local/kafka/kafka-logs # kakfa用于保存数据的目录,所有的消都会存储在该目录当中
65 num.partitions=1 # 设置创建新topic的默认分区数量
103 log.retention.hours=168 # 设置kafka中消息保留时间,默认为168小时,即7天
# 指定连接的zookeeper的地址,zk中存储了broker的元数据信息,如果zk是集群,多个zk地址使用逗号分割,这里为了方便,使用单机zookeeper,推荐使用域名,如果使用ip可能无法启动,不知道为什么
123 zookeeper.connect=zk1.ljk.cn:2181
126 zookeeper.connection.timeout.ms=6000 # 设置连接zookeeper的超时时间,默认6s
[root@kakfa2 local]$vim kafka/config/server.properties
21 broker.id=2
31 listeners=PLAINTEXT://10.0.1.102:9092
60 log.dirs=/usr/local/kafka/kafka-logs
103 log.retention.hours=168
123 zookeeper.connect=zk1.ljk.cn:2181
126 zookeeper.connection.timeout.ms=6000
[root@kakfa3 local]$vim kafka/config/server.properties
21 broker.id=3
31 listeners=PLAINTEXT://10.0.1.103:9092
60 log.dirs=/usr/local/kafka/kafka-logs
103 log.retention.hours=168
123 zookeeper.connect=zk1.ljk.cn:2181
126 zookeeper.connection.timeout.ms=6000启动 kafka
1
2
3
4
5
6
7[root@kakfa1 bin]$pwd
/usr/local/kafka/bin
[root@kakfa1 bin]$./kafka-server-start.sh -daemon ../config/server.properties
[root@kakfa2 bin]$./kafka-server-start.sh -daemon ../config/server.properties
[root@kakfa3 bin]$./kafka-server-start.sh -daemon ../config/server.properties
测试 kafka 读写数据
http://kafka.apache.org/quickstart
创建 topic
1 | [root@kakfa1 bin]$./kafka-topics.sh --create \ |
- –create:创建 topic
- –zookeeper:指定 zk 地址,虽然配置文件中已经指定了,但是命令行还要指定
- –partitions:指定一个 topic 包含几个 partition,就是对 topic 分片,分片可以提高性能,但是一般不用分片,保持默认值 1 就可以,如果分片,也不要超过节点的数量
- –replication-factor:指定 partition 的副本数量,kafka 实现高可用全靠 partition 的副本,如果设置 3,则一个 partition 就存储 3 份,注意不是 4 份
- –topic:指定名称
假设集群有 4 个 broker,一个 topic 有 4 个 partition,每个 partition 有 3 个副本。下图是每个 broker 上的副本分配情况:
验证 topic
1 | [root@kakfa1 bin]$./kafka-topics.sh --describe \ |
说明:lujinkai 这个 topic 有三个分区分别为 0、1、2,分区 0 的 leader 是 3(broker.id),分区 0 有三个副本,并且状态都为 lsr(ln-sync,表示可以参加选举成为 leader)
获取所有 topic
1 | [root@kakfa1 bin]$./kafka-topics.sh --list --zookeeper zk1.ljk.cn:2181 |
测试发送消息
1 | [root@kakfa1 bin]$./kafka-console-producer.sh --topic lujinkai \ |
测试获取消息
1 | [root@kafka2 bin]$./kafka-console-consumer.sh --topic lujinkai \ |
- –bootstrap-server:kafak 集群的地址,实际只写一个地址也行
- –from-beginning:从最开始的数据进行消费
删除 topic
1 | [root@kakfa1 bin]$./kafka-topics.sh --delete \ |