在Linux上部署kafka集群
在Linux上部署kafka集群
00 kafak优点
异步/缓存、缓存/消峰、解耦
数据保存在服务器磁盘上。
- **
Producer
**:消息生产者,就是向Kafka broker发消息的客户端。 - **
Consumer
**:消息消费者,向Kafka broker取消息的客户端。 - **
Consumer Group
**(CG):消费者组,由多个consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。 - **
Broker
**:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。 - **
Topic
**:可以理解为一个队列,生产者和消费者面向的都是一个topic。 - **
Partition
**:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic 可以分为多个partition,每个partition是一个有序的队列。 - **
Replica
**:副本。一个topic的每个分区都有若干个副本,一个Leader 和若干个Follower。 - **
Leader
**:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。 - **
Follower
**:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader。
01 测试环境信息
虚拟机:VMware Workstation Pro 16.2.5 build-20904516
OS:Linux CentOS 7 2009
zookeeper:3.8.1
kafka:2.13-3.4.0
java:openjdk version “20”
ip地址:10.10.0.110,10.10.0.111,10.10.0.112
02 添加hosts配置,设置免密登录
2.1 设置免密登录
生成私钥和公钥
1 |
|
将公钥分发到kafka02、kafka03集群上
1 |
|
2.2 添加hosts配置
1 |
|
03 配置java环境
1 |
|
04 安装zookeeper cluster
zookeeper下载地址:https://dlcdn.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz
zookeeper默认监听端口为:
2181
安装zookeeper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42# 下载zookeeper
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz -O /opt/apache-zookeeper-3.8.1-bin.tar.gz --no-check-certificate
tar -zxvf /opt/apache-zookeeper-3.8.1-bin.tar.gz -C /opt/
# 编辑zookeeper配置文件,配置文件名为zoo.cfg
# 将/opt/apache-zookeeper-3.8.1-bin/conf/zoo_sample.cfg 复制为zoo.cfg配置文件。
cp /opt/apache-zookeeper-3.8.1-bin/conf/zoo_sample.cfg /opt/apache-zookeeper-3.8.1-bin/conf/zoo.cfg
# 修改zoo.cfg 配置文件内容
vim /opt/apache-zookeeper-3.8.1-bin/conf/zoo.cfg
# 将 dataDir 路径修改为:
dataDir=/var/kafka/zookeeper
# 添加集群配置
# cluster
server.1=kafka01:2888:3888
server.2=kafka02:2888:3888
server.3=kafka03:2888:3888
# 格式: server.A=B:C:D
# A 是一个数字,表示这个是第几号服务器;集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个 server。
# B 是这个服务器的地址;
# C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
# D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
# 配置zookeeper环境变量,三台测试服务器都需要进行配置
vim /etc/profile
export ZOOKEEPER_HOME="/opt/apache-zookeeper-3.8.1-bin"
export PATH=$PATH:$ZOOKEEPER_HOME/bin
# 刷新配置
source /etc/profile
# 创建zookeeper数据保存目录
mkdir -p /var/kafka/zookeeper
# 创建集群myid文件,myid需要具有唯一性
# 10.10.0.110 1
# 10.10.0.111 2
# 10.10.0.112 3
vim /var/kafka/zookeeper/myid
1完成配置后,将zookeeper安装包分发到其他两台测试服务器上,需要修改
/var/kafka/zookeeper/myid
为唯一值。1
scp -r /opt/apache-zookeeper-3.8.1-bin 10.10.0.111:`pwd`
创建zookeeper.service配置文件,使用systemd管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20vim /etc/systemd/system/zookeeper.service
cat /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache ZooKeeper server
After=network.target
[Service]
Type=forking
User=root
Group=root
# java 环境变量
Environment=JAVA_HOME=/usr/local/jdk-20
# zookeeper 日志保存目录
Environment=ZOO_LOG_DIR=/var/kafka/zookeeper
ExecStart=/opt/apache-zookeeper-3.8.1-bin/bin/zkServer.sh start
ExecStop=/opt/apache-zookeeper-3.8.1-bin/bin/zkServer.sh stop
ExecReload=/opt/apache-zookeeper-3.8.1-bin/bin/zkServer.sh restart
[Install]
WantedBy=multi-user.target命令 描述 systemctl start zookeeper 启动zookeeper systemctl stop zookeeper 停止zookeeper systemctl restart zookeeper 重启zookeeper systemctl status zookeeper 查看zookeeper状态 systemctl enable zookeeper 设置zookeeper开机自启动 systemctl disable zookeeper 关闭zookeeper开机自启动 启动zookeeper
1
systemctl start zookeeper
查看zookeeper角色
1
2
3
4
5/opt/apache-zookeeper-3.8.1-bin/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/apache-zookeeper-3.8.1-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
05 安装kafka cluster
kafka 下载地址:https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
kafka 默认监听端口为:
9092
kafka server.properties
1 |
|
1 |
|
完成配置后,将kafka安装包分发到其他两台测试服务器上,需要修改broker.id=0
为唯一值。
1 |
|
设置kafka 环境暴变量,三台测试服务器都需要进行配置
1 |
|
创建kafka.service配置文件,使用systemd管理
1 |
|
命令 | 描述 |
---|---|
systemctl start kafka | 启动kafka |
systemctl stop kafka | 停止kafka |
systemctl restart kafka | 重启kafka |
systemctl status kafka | 查看kafka状态 |
systemctl enable kafka | 设置kafka开机自启动 |
systemctl disable kafka | 关闭kafka开机自启动 |
06 kafka常用命令
6.1 topic
bin/kafka-topics.sh
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的Kafka Broker主机名称和端口号 |
–topic <String: topic> | 操作的topic名称 |
–create | 创建主题 |
–delete | 删除主题。 |
–alter | 修改主题。 |
–list | 查看所有主题 |
–describe | 查看主题详细描述 |
–partitions <Integer: # of partitions> | 设置分区数 |
–replication-factor<Integer: replication factor> | 设置分区副本 |
–config <String: name=value> | 更新系统默认的配置 |
Topic |
|
kafka-topics.sh –bootstrap-server kafka03:9092 –create –replication-factor 1 –partitions 1 –topic hellokafka | 创建hellokafka topics |
kafka-topics.sh –bootstrap-server kafka03:9092 –list | 查看所有的topics |
kafka-topics.sh –bootstrap-server kafka03:9092 –describe –topic hellokafka | 查看hellokafka topics 详细信息 |
kafka-topics.sh –bootstrap-server kafka03:9092 –alter –topic hellokafka –partitions 3 | 增加Topic 的partition 数,partition 只能增加不能减少。 |
kafka-topics.sh –bootstrap-server kafka03:9092 –topic hellokafka –delete | 删除topics |
kafka-run-class.sh kafka.tools.GetOffsetShell –topic hellokafka –time -1 –broker-list 127.0.0.1:9092 –partitions 0 | 查看topics 指定分区offset 的最大值或最小值。time 为-1时表示最大值,为-2 时表示最小值。 |
6.2 生产者命令行操作
bin/kafka-console-producer.sh
命令 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的**Kafka Broker **主机名称和端口号 |
–topic <String: topic> | 操作的**topic **名称 |
生产者生产消息 |
|
kafka-console-producer.sh –bootstrap-server kafka03:9092 –topic hellokafka | 创建生产者,给指定**topic **生产消息 |
6.3 消费者命令行操作
命令 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号 |
–topic <String: topic> | 操作的 topic 名称 |
–from-beginning | 从头开始消费 |
–group <String: consumer group id> | 指定消费者组名称 |
消费消息 |
|
kafka-console-consumer.sh –bootstrap-server kafka03:9092 –topic hellokafka –from-beginning | 从头开始消费hellokafka topic 数据 |
kafka-console-consumer.sh –bootstrap-server kafka03:9092 –topic hellokafka –offset latest –partition 0 | 从尾部开始,从尾部开始取数据,必需要指定分区。 |
kafka-console-consumer.sh –bootstrap-server kafka03:9092 –topic hellokafka –offset latest –partition 0 | 指定分区 |
kafka-console-consumer.sh –bootstrap-server kafka03:9092 –topic hellokafka –offset latest –partition 0 –max-messages 1 | 取指定个数--max-messages 1 为个数 |
消费者 Group |
|
kafka-console-consumer.sh –bootstrap-server kafka03:9092 –topic hellokafka –group hellokafka_group –from-beginning | 指定消费组**Group ** |
kafka-consumer-groups.sh –bootstrap-server kafka03:9092 –list | 消费者**Group **列表 |
kafka-consumer-groups.sh –bootstrap-server kafka03:9092 –group hellokafka_group –describe | 查看**Group **详情 |
kafka-consumer-groups.sh –bootstrap-server kafka03:9092 –group hellokafka_group –topic hellokafka01 –delete | 删除消费**Group 中Topic ** |
07 安装kafka Eagle监控kafka集群
7.1 环境准备
7.1.1 安装kafka-Eagle需要mysql数据库
安装Mysql方法参考:https://www.hesc.info/archives/9
7.1.2 关闭zookeeper和kafka集群
systemctl stop zookeeper.service
systemctl stop kafka.service
7.1.3 修改$KAFKA_HOME/bin/kafka-server-start.sh
修改如下参数:
1
2
3if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi为
1
2
3
4
5
6
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
# export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi完成修改后将kafka-service-start.sh 分发到其他的kafka节点。
1scp $KAFKA_HOME/bin/kafka-server-start.sh root@kafka02:$KAFKA_HOME/bin/
7.2 安装kafka-Eagle
7.2.1 下载kafka-Eagle安装包
1 |
|
7.2.2 解压kafka-Eagle安装包
1 |
|
7.2.3 修改kafka-Eagle配置文件
1 |
|
7.3 创建kafka-eagle.service使用systemd管理
1 |
|