一、搭建kafka集群 ( 消息中间件集群 )
1. kafka集群拓扑结构如下
2. 首先要保证zookeeper集群先启动,启动命令:zkServer.sh start
未安装zookeeper集群的小伙伴可参考我这篇博客,地址:https://blog.csdn.net/weixin_44422604/article/details/107145297
3. 下载kafka到node1节点/usr/local/目录下解压,配置环境变量
(1) 进入目录命令:cd /usr/local
(2) 下载kafka命令:wget https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz
(3) 解压命令:tar -xvf kafka_2.11-2.0.0.tgz
(4) 重命名命令:mv kafka_2.11-2.0.0 kafka211_200
(5) 配置环境变量:vi /etc/profile
(6) 刷新环境变量:source /etc/profile
4. 编写 kafka集群自启动脚本到 kafka的bin目录,粘贴后保存
进入目录命令:cd /usr/local/kafka211_200/bin
编辑脚本命令:vi kafka-all.sh
赋予脚本权限:chmod +x kafka-all.sh
#! /bin/bash
# Kafka代理节点地址
hosts=(node1 node2 node3)
# 打印启动分布式脚本信息
mill=`date "+%N"`
tdate=`date "+%Y-%m-%d %H:%M:%S,${mill:0:3}"`
echo [$tdate] INFO [Kafka Cluster] begins to execute the $1 operation.
# 执行分布式开启命令
function start()
{
for i in ${hosts[@]}
do
smill=`date "+%N"`
stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the startup operation.;kafka-server-start.sh $KAFKA_HOME/config/server.properties>/dev/null" &
sleep 1
done
}
# 执行分布式关闭命令
function stop()
{
for i in ${hosts[@]}
do
smill=`date "+%N"`
stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the shutdown operation.;kafka-server-stop.sh>/dev/null;" &
sleep 1
done
}
# 查看Kafka代理节点状态
function status()
{
for i in ${hosts[@]}
do
smill=`date "+%N"`
stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] status message is :;jps | grep Kafka;" &
sleep 1
done
}
# 判断输入的Kafka命令参数是否有效
case "$1" in
start)
start
;;
stop)
stop
;;
status)
status
;;
*)
echo "Usage: $0 {start|stop|status}"
RETVAL=1
esac
5. 修改 kafka安装目录conf里的配置文件server.properties
进入目录命令:cd /usr/local/kafka211_200/config
编辑配置命令(修改为下面的配置):vi server.properties
#节点id
broker.id=0
#设置消息记录存储路径
log.dirs=/opt/kafka-logs
#设置zk地址(根据自己zookeeper节点位置配置)
zookeeper.connect=node1:2181,node2:2181,node3:2181
#开启删除kafka主题属性(这个自己添加,没有修改项)
delete.topic.enable=true
#非sasl模式配置kafka集群,注意别的节点中这里要设置为node2,node3
listeners=PLAINTEXT://:9092
#下面内容自己添加,注意修改为自己的ip位置
port=9092
host.name=192.168.28.201
advertised.host.name=192.168.28.201
advertised.port=9092
#设置网络请求处理线程数
num.network.threads=10
#设置磁盘io请求线程数
num.io.threads=20
#设置发送buffer字节数
socket.send.buffer.bytes=1024000
#设置接收buffer字节数
socket.receive.buffer.bytes=1024000
#设置最大请求字节数
socket.request.max.bytes=1048576000
#设置分区数
num.partitions=6
#设置主题保留时间
log.retention.hours=168
6. 同步配置完成node1中的kafka到node2,node3节点上
进入kafka安装目录:cd /usr/local
同步node2命令:scp -r kafka211_200 node2:/usr/local
同步node3命令:scp -r kafka211_200 node3:/usr/local
像node1中一样,修改环境变量
刷新环境变量:source /etc/profile
7. 修改node2,3中,kafka目录config中的server.properties配置
修改broker.id分别为 1 2
修改ip为对应节点ip
8. 在node1节点上执行kafka-all.sh脚本,启动kafka集群
执行命令:kafka-all.sh start
9. 设置kafka服务自启动,注意:在 node1,2,3 上完成
(1) 进入到 /etc/rc.d/init.d目录下,创建一个 kafkaCluster 脚本,命令:vi /etc/rc.d/init.d/kafkaCluster
#! /bin/bash
#chkconfig: 345 93 88
#description: kafka cluster node1 node2 node3
#processname:kafka_cluster
# Kafka代理节点地址
hosts=(node1 node2 node3)
# 打印启动分布式脚本信息
mill=`date "+%N"`
tdate=`date "+%Y-%m-%d %H:%M:%S,${mill:0:3}"`
echo [$tdate] INFO [Kafka Cluster] begins to execute the $1 operation.
# 执行分布式开启命令
function start()
{
for i in ${hosts[@]}
do
smill=`date "+%N"`
stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the startup operation.;kafka-server-start.sh $KAFKA_HOME/config/server.properties>/dev/null" &
sleep 1
done
}
# 执行分布式关闭命令
function stop()
{
for i in ${hosts[@]}
do
smill=`date "+%N"`
stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the shutdown operation.;kafka-server-stop.sh>/dev/null;" &
sleep 1
done
}
# 查看Kafka代理节点状态
function status()
{
for i in ${hosts[@]}
do
smill=`date "+%N"`
stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] status message is :;jps | grep Kafka;" &
sleep 1
done
}
# 判断输入的Kafka命令参数是否有效
case "$1" in
start)
start
;;
stop)
stop
;;
status)
status
;;
*)
echo "Usage: $0 {start|stop|status}"
RETVAL=1
esac
(2) 粘贴保存后,修改执行权限
进入目录命令:cd /etc/rc.d/init.d
修改执行权限:chmod +x kafkaCluster
(3) 加入到 chkconfig 服务列表中
加入列表命令:chkconfig --add kafkaCluster
查看列表命令:chkconfig --list
(4) 以后就可以用以下命令启动服务了
启动命令:service kafkaCluster start
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/13725.html