在Linux上部署kafka集群

logo-kafka

在Linux上部署kafka集群

00 kafak优点

异步/缓存、缓存/消峰、解耦

数据保存在服务器磁盘上。

image-20230406235150920

  1. **Producer**:消息生产者,就是向Kafka broker发消息的客户端。
  2. **Consumer**:消息消费者,向Kafka broker取消息的客户端。
  3. **Consumer Group**(CG):消费者组,由多个consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  4. **Broker**:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  5. **Topic**:可以理解为一个队列,生产者和消费者面向的都是一个topic。
  6. **Partition**:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic 可以分为多个partition,每个partition是一个有序的队列。
  7. **Replica**:副本。一个topic的每个分区都有若干个副本,一个Leader 和若干个Follower。
  8. **Leader**:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
  9. **Follower**:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader。

01 测试环境信息

虚拟机:VMware Workstation Pro 16.2.5 build-20904516
OS:Linux CentOS 7 2009
zookeeper:3.8.1
kafka:2.13-3.4.0
java:openjdk version “20”
ip地址:10.10.0.110,10.10.0.111,10.10.0.112

02 添加hosts配置,设置免密登录

2.1 设置免密登录

生成私钥和公钥

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@kafka01 ~]# ssh-keygen 
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:YbR3Ixp4Naf4wLFGCIHmgc05moFa0N0bbOBhSzSef5I root@kafka02
The key's randomart image is:
+---[RSA 2048]----+
|o* =X* .+ o . |
|+ @=.=*= * + |
|.B o=..o% + o |
|+ . ..= B o . |
| E S . |
| o |
| |
| |
| |
+----[SHA256]-----+

将公钥分发到kafka02、kafka03集群上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@linux-kafka01 ~]# ssh-copy-id root@kafka02
/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/root/.ssh/id_rsa.pub"
The authenticity of host 'kafka02 (10.10.0.111)' can't be established.
ECDSA key fingerprint is SHA256:j9zdg1ssn53fjKEGbDn+M9ytFgGfWlPo/Aw9taSxKOI.
ECDSA key fingerprint is MD5:20:fb:c8:10:e0:95:c1:fc:b3:dd:f4:f9:70:00:43:00.
Are you sure you want to continue connecting (yes/no)? yes
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
root@kafka01's password:

Number of key(s) added: 1

Now try logging into the machine, with: "ssh 'root@kafka02'"
and check to make sure that only the key(s) you wanted were added.

2.2 添加hosts配置

1
2
3
4
vim /etc/hosts
10.10.0.110 kafka01
10.10.0.111 kafka02
10.10.0.112 kafka03

03 配置java环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# java安装包下载链接:https://d6.injdk.cn/openjdk/openjdk/20/openjdk-20_linux-x64_bin.tar.gz
# 下载java安装包
wget https://d6.injdk.cn/openjdk/openjdk/20/openjdk-20_linux-x64_bin.tar.gz -O /opt/openjdk-20_linux-x64_bin.tar.gz
tar -zxvf /opt/openjdk-20_linux-x64_bin.tar.gz -C /usr/local/

# 配置Java环境变量
vim /etc/profile
export JAVA_HOME="/usr/local/jdk-20"
export PATH=$PATH:$JAVA_HOME/bin

# 刷新配置
source /etc/profile

# 检查Java环境变量是否生效
java -version
openjdk version "20" 2023-03-21
OpenJDK Runtime Environment (build 20+36-2344)
OpenJDK 64-Bit Server VM (build 20+36-2344, mixed mode, sharing)

04 安装zookeeper cluster

zookeeper下载地址:https://dlcdn.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz

zookeeper默认监听端口为:2181

  • 安装zookeeper

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    # 下载zookeeper
    wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz -O /opt/apache-zookeeper-3.8.1-bin.tar.gz --no-check-certificate
    tar -zxvf /opt/apache-zookeeper-3.8.1-bin.tar.gz -C /opt/

    # 编辑zookeeper配置文件,配置文件名为zoo.cfg
    # 将/opt/apache-zookeeper-3.8.1-bin/conf/zoo_sample.cfg 复制为zoo.cfg配置文件。
    cp /opt/apache-zookeeper-3.8.1-bin/conf/zoo_sample.cfg /opt/apache-zookeeper-3.8.1-bin/conf/zoo.cfg

    # 修改zoo.cfg 配置文件内容
    vim /opt/apache-zookeeper-3.8.1-bin/conf/zoo.cfg
    # 将 dataDir 路径修改为:
    dataDir=/var/kafka/zookeeper

    # 添加集群配置
    # cluster
    server.1=kafka01:2888:3888
    server.2=kafka02:2888:3888
    server.3=kafka03:2888:3888

    # 格式: server.A=B:C:D
    # A 是一个数字,表示这个是第几号服务器;集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个 server。
    # B 是这个服务器的地址;
    # C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
    # D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

    # 配置zookeeper环境变量,三台测试服务器都需要进行配置
    vim /etc/profile
    export ZOOKEEPER_HOME="/opt/apache-zookeeper-3.8.1-bin"
    export PATH=$PATH:$ZOOKEEPER_HOME/bin

    # 刷新配置
    source /etc/profile

    # 创建zookeeper数据保存目录
    mkdir -p /var/kafka/zookeeper

    # 创建集群myid文件,myid需要具有唯一性
    # 10.10.0.110 1
    # 10.10.0.111 2
    # 10.10.0.112 3
    vim /var/kafka/zookeeper/myid
    1

    完成配置后,将zookeeper安装包分发到其他两台测试服务器上,需要修改/var/kafka/zookeeper/myid为唯一值。

    1
    scp -r /opt/apache-zookeeper-3.8.1-bin 10.10.0.111:`pwd`
  • 创建zookeeper.service配置文件,使用systemd管理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    vim /etc/systemd/system/zookeeper.service
    cat /etc/systemd/system/zookeeper.service
    [Unit]
    Description=Apache ZooKeeper server
    After=network.target

    [Service]
    Type=forking
    User=root
    Group=root
    # java 环境变量
    Environment=JAVA_HOME=/usr/local/jdk-20
    # zookeeper 日志保存目录
    Environment=ZOO_LOG_DIR=/var/kafka/zookeeper
    ExecStart=/opt/apache-zookeeper-3.8.1-bin/bin/zkServer.sh start
    ExecStop=/opt/apache-zookeeper-3.8.1-bin/bin/zkServer.sh stop
    ExecReload=/opt/apache-zookeeper-3.8.1-bin/bin/zkServer.sh restart

    [Install]
    WantedBy=multi-user.target
    命令 描述
    systemctl start zookeeper 启动zookeeper
    systemctl stop zookeeper 停止zookeeper
    systemctl restart zookeeper 重启zookeeper
    systemctl status zookeeper 查看zookeeper状态
    systemctl enable zookeeper 设置zookeeper开机自启动
    systemctl disable zookeeper 关闭zookeeper开机自启动
  • 启动zookeeper

    1
    systemctl start zookeeper
  • 查看zookeeper角色

    1
    2
    3
    4
    5
    /opt/apache-zookeeper-3.8.1-bin/bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/apache-zookeeper-3.8.1-bin/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost. Client SSL: false.
    Mode: follower

05 安装kafka cluster

kafka 下载地址:https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz

kafka 默认监听端口为:9092

kafka server.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# 每个broker在集群中唯一存在,数值必须是正数;kafka根据id来识别broker机器,当该服务的地址发生变化时,broker.id没有变化,则不会影响consumers的消息情况。
broker.id=0

# broker server服务端口
port=9092

# broker的主机地址
host.name=localhost

# 套接字服务器侦听的地址
listeners=PLAINTEXT://192.168.243.133:9092

# 代理将向生产者和消费者公布主机名和端口。如果没有设置,
# 如果已配置,它将使用“侦听器”的值。否则,它将使用该值从java.net.InetAddress.getCanonicalHostName()返回
advertised.listeners=PLAINTEXT://192.168.243.133:9092

# broker处理消息的最大线程数,一般情况下不需要去修改
num.network.threads=3

# broker处理磁盘IO的线程数,数值应该大于你的硬盘数
num.io.threads=8

# socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes=102400

# socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes=102400

# socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes=104857600

# kafka数据的存放地址,多个地址的话用逗号分割/root/kafka_2.12-2.8.0/log-1,/root/kafka_2.12-2.8.0/log-2
log.dirs=/root/kafka_2.12-2.8.0/logs

# 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
num.partitions=1

# 在启动时用于日志恢复和在关闭时刷新的每个数据目录的线程数。
num.recovery.threads.per.data.dir=1

# 组元数据内部主题“__consumer_offsets”和“__transaction_state”的复制因子
# 对于除开发测试之外的任何其他内容,建议使用大于1的值以确保可用性,例如3。
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# 日志文件因使用年限而有资格删除的最短使用年限
log.retention.hours=168

# topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes=1073741824

# 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=300000

# 必须配置项,zookeeper集群的地址,可以是多个,以逗号分隔开,一般端口都为2181。hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect=192.168.243.133:2181

# ZooKeeper的连接超时时间
zookeeper.connection.timeout.ms=18000

# ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
#zookeeper.session.timeout.ms=6000

#ZooKeeper集群中leader和follower之间的同步实际时间
#zookeeper.sync.time.ms =2000

#随着新成员加入组,重新平衡将进一步延迟的时长,默认最大延迟为3秒,这里将其覆盖为0,因为这有助于为开发和测试提供更好的开箱即用体验。
#但是,在生产环境中,默认值3秒更合适,因为这将有助于避免在应用程序启动期间进行不必要且可能代价高昂的重新平衡。
group.initial.rebalance.delay.ms=0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 下载kafka
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz -O /opt/kafka_2.13-3.4.0.tgz
tar -zxvf /opt/kafka_2.13-3.4.0.tgz -C /opt/

# 编辑server.properties
vim /opt/kafka_2.13-3.4.0/config/server.properties

# 修改broker.id,集群里面的修改broker.id需要设置为唯一值。
# 10.10.0.110 0
# 10.10.0.111 1
# 10.10.0.112 2
broker.id=0

# 修改log.dirs kafka日志保存路径
log.dirs=/var/kafka/kafka-logs

# 修改zookeeper链接配置。有多少台zookeeper需要配置多少台,2181为zookeeper通信端口。
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181

完成配置后,将kafka安装包分发到其他两台测试服务器上,需要修改broker.id=0为唯一值。

1
scp -r /opt/kafka_2.13-3.4.0 10.10.0.111:`pwd`

设置kafka 环境暴变量,三台测试服务器都需要进行配置

1
2
3
4
5
6
vim /etc/profile
export KAFKA_HOME="/opt/kafka_2.13-3.4.0"
export PATH=$PATH:$KAFKA_HOME/bin

# 刷新配置
source /etc/profile

创建kafka.service配置文件,使用systemd管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
vim /var/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server
After=network.target

[Service]
Type=simple
User=root
Group=root
# java 环境变量
Environment=JAVA_HOME=/usr/local/jdk-20
# kafka 环境变量
Environment=KAFKA_HOME=/opt/kafka_2.13-3.4.0
ExecStart=/opt/kafka_2.13-3.4.0/bin/kafka-server-start.sh /opt/kafka_2.13-3.4.0/config/server.properties
ExecStop=/opt/kafka_2.13-3.4.0/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
命令 描述
systemctl start kafka 启动kafka
systemctl stop kafka 停止kafka
systemctl restart kafka 重启kafka
systemctl status kafka 查看kafka状态
systemctl enable kafka 设置kafka开机自启动
systemctl disable kafka 关闭kafka开机自启动

06 kafka常用命令

6.1 topic

bin/kafka-topics.sh

参数 描述
–bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号
–topic <String: topic> 操作的topic名称
–create 创建主题
–delete 删除主题。
–alter 修改主题。
–list 查看所有主题
–describe 查看主题详细描述
–partitions <Integer: # of partitions> 设置分区数
–replication-factor<Integer: replication factor> 设置分区副本
–config <String: name=value> 更新系统默认的配置
Topic
kafka-topics.sh –bootstrap-server kafka03:9092 –create –replication-factor 1 –partitions 1 –topic hellokafka 创建hellokafka topics
kafka-topics.sh –bootstrap-server kafka03:9092 –list 查看所有的topics
kafka-topics.sh –bootstrap-server kafka03:9092 –describe –topic hellokafka 查看hellokafka topics详细信息
kafka-topics.sh –bootstrap-server kafka03:9092 –alter –topic hellokafka –partitions 3 增加Topicpartition数,partition只能增加不能减少。
kafka-topics.sh –bootstrap-server kafka03:9092 –topic hellokafka –delete 删除topics
kafka-run-class.sh kafka.tools.GetOffsetShell –topic hellokafka –time -1 –broker-list 127.0.0.1:9092 –partitions 0 查看topics指定分区offset的最大值或最小值。time为-1时表示最大值,为-2 时表示最小值。

6.2 生产者命令行操作

bin/kafka-console-producer.sh

命令 描述
–bootstrap-server <String: server toconnect to> 连接的**Kafka Broker**主机名称和端口号
–topic <String: topic> 操作的**topic**名称
生产者生产消息
kafka-console-producer.sh –bootstrap-server kafka03:9092 –topic hellokafka 创建生产者,给指定**topic**生产消息

6.3 消费者命令行操作

命令 描述
–bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号
–topic <String: topic> 操作的 topic 名称
–from-beginning 从头开始消费
–group <String: consumer group id> 指定消费者组名称
消费消息
kafka-console-consumer.sh –bootstrap-server kafka03:9092 –topic hellokafka –from-beginning 从头开始消费hellokafka topic数据
kafka-console-consumer.sh –bootstrap-server kafka03:9092 –topic hellokafka –offset latest –partition 0 从尾部开始,从尾部开始取数据,必需要指定分区。
kafka-console-consumer.sh –bootstrap-server kafka03:9092 –topic hellokafka –offset latest –partition 0 指定分区
kafka-console-consumer.sh –bootstrap-server kafka03:9092 –topic hellokafka –offset latest –partition 0 –max-messages 1 取指定个数--max-messages 1为个数
消费者 Group
kafka-console-consumer.sh –bootstrap-server kafka03:9092 –topic hellokafka –group hellokafka_group –from-beginning 指定消费组**Group**
kafka-consumer-groups.sh –bootstrap-server kafka03:9092 –list 消费者**Group**列表
kafka-consumer-groups.sh –bootstrap-server kafka03:9092 –group hellokafka_group –describe 查看**Group**详情
kafka-consumer-groups.sh –bootstrap-server kafka03:9092 –group hellokafka_group –topic hellokafka01 –delete 删除消费**GroupTopic**

07 安装kafka Eagle监控kafka集群

7.1 环境准备

7.1.1 安装kafka-Eagle需要mysql数据库

安装Mysql方法参考:https://www.hesc.info/archives/9

7.1.2 关闭zookeeper和kafka集群

systemctl stop zookeeper.service
systemctl stop kafka.service

7.1.3 修改$KAFKA_HOME/bin/kafka-server-start.sh

修改如下参数:

1
2
3
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

1
2
3
4
5
6
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
# export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

完成修改后将kafka-service-start.sh 分发到其他的kafka节点。

1
scp $KAFKA_HOME/bin/kafka-server-start.sh root@kafka02:$KAFKA_HOME/bin/

7.2 安装kafka-Eagle

7.2.1 下载kafka-Eagle安装包

1
2
# 下载kafka-Eagle安装包
wget https://codeload.github.com/smartloli/kafka-eagle-bin/tar.gz/refs/tags/v3.0.1 -O /opt/kafka-eagle-bin-2.0.8.tar.gz

7.2.2 解压kafka-Eagle安装包

1
2
3
4
# 解压kafka-Eagle安装包,解压两次....
tar -zxvf /opt/kafka-eagle-bin-3.0.1.tar.gz
cd /opt/kafka-eagle-bin-3.0.1
tar -zxvf efak-webweb-3.0.1-bin.tar.gz -C /opt

7.2.3 修改kafka-Eagle配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# 修改kafka-Eagle配置文件
# 配置文件路径:/opt/efak-web-3.0.1/conf/system-config.properties
vim /opt/efak-web-3.0.1/conf/system-config.properties
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
# 需要修改,连接kafka集群配置
efak.zk.cluster.alias=cluster1
cluster1.zk.list=kafka01:2181,kafka02:2181,kafka03:2181
# cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.efak.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=16

######################################
# EFAK webui port
######################################
efak.webui.port=8048

######################################
# EFAK enable distributed
######################################
efak.distributed.enable=false
efak.cluster.mode.status=master
efak.worknode.master.host=localhost
efak.worknode.port=8085

######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456

######################################
# kafka offset storage
######################################
# 需要修改,offset保存在kafka
cluster1.efak.offset.storage=kafka
# cluster2.efak.offset.storage=zk

######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.charts=true
efak.metrics.retain=15

######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10

######################################
# delete kafka topic token
######################################
efak.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=

######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=

######################################
# kafka sqlite jdbc driver address
######################################
#efak.driver=org.sqlite.JDBC
#efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#efak.username=root
#efak.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address
######################################
# 需要修改,连接mysql数据客库配置
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://10.10.0.100:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=root

# 添加kafka-Eagle环境变量
vim /etc/profile
export KE_HOME="/opt/efak-web-3.0.1"
export PATH=$PATH:$KE_HOME/bin

# 刷新/etc/profile文件
source /etc/profile

# 启动前需要先启动zookeeper和kafka
systemctl start zookeeper
systemctl start kafka

# 使用jps查看服务状态
[root@kafka01 conf]# jps
1049 QuorumPeerMain
10715 Kafka
18215 Jps

# 启动kafka-Eagle
/opt/efak-web-3.0.1/bin/ke.sh start
[2023-04-06 23:43:26] INFO: Port Progress: [##################################################] | 100%
[2023-04-06 23:43:29] INFO: Config Progress: [##################################################] | 100%
[2023-04-06 23:43:32] INFO: Startup Progress: [##################################################] | 100%
[2023-04-06 23:43:22] INFO: Status Code[0]
[2023-04-06 23:43:22] INFO: [Job done!]
Welcome to
______ ______ ___ __ __
/ ____/ / ____/ / | / //_/
/ __/ / /_ / /| | / ,<
/ /___ / __/ / ___ | / /| |
/_____/ /_/ /_/ |_|/_/ |_|
( Eagle For Apache Kafka® )

Version v3.0.1 -- Copyright 2016-2022
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://10.10.0.110:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
## 启动成功!!!!!!!!!

7.3 创建kafka-eagle.service使用systemd管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
vim /etc/systemd/system/kafka-eagle.service
[Unit]
Description=Kafka Eagle
After=kafka.service

[Service]
Environment=KE_HOME=/opt/efak-web-3.0.1
Environment=JAVA_HOME=/usr/local/jdk-20
User=root
Group=root
Type=forking

ExecStart=/opt/efak-web-3.0.1/bin/ke.sh start
ExecReload=/opt/efak-web-3.0.1/bin/ke.sh restart
ExecStop=/opt/efak-web-3.0.1/bin/ke.sh stop

#启动失败后重启
Restart=on-failure
#每次尝试重启间隔60秒
StartLimitInterval=60
#最终尝试重启50次
StartLimitBurst=50

[Install]
WantedBy=multi-user.target

在Linux上部署kafka集群
https://hesc.info/d0d17af205b2/
作者
需要哈气的纸飞机
发布于
2024年8月14日
许可协议