下载地址:http://archive.cloudera.com/kafka/kafka/4/kafka-2.2.1-kafka4.1.0.tar.gz
解压
tar -zxvf kafka-2.2.1-kafka4.1.0.tar.gz -C ../app
在部署kafka之前 ,检测zookeeper是ok的
[zk: localhost:2181(CONNECTED) 0] ls /
[zktest, tunan, zookeeper, kafka]编辑config/server.properties文件
broker.id=0
log.dirs=/tmp/kafka-logs
broker.id=0
host.name=hadoop
port=9090
log.dirs=/home/hadoop/tmp/kafka-logs00
zookeeper.connect=hadoop:2181/kafka复制Kafka文件夹为三份
cp -R kafka_2.11-2.2.1-kafka-4.1.0/ kafka01
cp -R kafka_2.11-2.2.1-kafka-4.1.0/ kafka02
mv kafka_2.11-2.2.1-kafka-4.1.0/ kafka03修改kafka02的config/server.properties文件
broker.id=1
host.name=hadoop
port=9091
log.dirs=/home/hadoop/tmp/kafka-logs01
zookeeper.connect=hadoop:2181/kafka修改kafka03的config/server.properties文件
broker.id=2
host.name=hadoop
port=9092
log.dirs=/home/hadoop/tmp/kafka-logs02
zookeeper.connect=hadoop:2181/kafka启动(三台都需要输入命令)
bin/kafka-server-start.sh -daemon config/server.properties
创建topic
./kafka-topics.sh \
--create \
--zookeeper hadoop:2181/kafka \
--partitions 3 \
--replication-factor 2 \
--topic test查看topic
./kafka-topics.sh \
--list \
--zookeeper hadoop:2181/kafka查看指定topic的状况
./kafka-topics.sh \
--describe \
--zookeeper hadoop:2181/kafka \
--topic test测试
启动生产者
./kafka-console-producer.sh \
--broker-list hadoop:9090,hadoop:9091,hadoop:9092 \
--topic test启动消费者
bin/kafka-console-consumer.sh \
--bootstrap-server hadoop:9090,hadoop:9091,hadoop:9092 \
--from-beginning \
--topic test发送数据
a
a
...接收数据
a
a
...关闭kafka集群(每台都要执行)
修改kafka-server-stop.sh
PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
修改为
PIDS=$(jps -lm | grep -i 'kafka'| awk '{print $1}')
命令详解:使用jps -lm命令列出所有的java进程,然后通过管道,利用grep -i ‘kafka.Kafka’命令将kafka进程筛出来,最后再接一管道命令,利用awk将进程号取出来。
分别执行kafka-server-stop.sh