单节点部署三台kafka
  1. 下载地址:http://archive.cloudera.com/kafka/kafka/4/kafka-2.2.1-kafka4.1.0.tar.gz

  2. 解压

    tar -zxvf kafka-2.2.1-kafka4.1.0.tar.gz -C ../app
  3. 在部署kafka之前 ,检测zookeeper是ok的

    [zk: localhost:2181(CONNECTED) 0] ls /
    [zktest, tunan, zookeeper, kafka]
  4. 编辑config/server.properties文件

    #broker.id=0
    #log.dirs=/tmp/kafka-logs

    broker.id=0
    host.name=hadoop
    port=9090
    log.dirs=/home/hadoop/tmp/kafka-logs00
    zookeeper.connect=hadoop:2181/kafka
  5. 复制Kafka文件夹为三份

    cp -R kafka_2.11-2.2.1-kafka-4.1.0/ kafka01
    cp -R kafka_2.11-2.2.1-kafka-4.1.0/ kafka02
    mv kafka_2.11-2.2.1-kafka-4.1.0/ kafka03
  6. 修改kafka02的config/server.properties文件

    broker.id=1
    host.name=hadoop
    port=9091
    log.dirs=/home/hadoop/tmp/kafka-logs01
    zookeeper.connect=hadoop:2181/kafka
  7. 修改kafka03的config/server.properties文件

    broker.id=2
    host.name=hadoop
    port=9092
    log.dirs=/home/hadoop/tmp/kafka-logs02
    zookeeper.connect=hadoop:2181/kafka
  8. 启动(三台都需要输入命令)

    bin/kafka-server-start.sh -daemon config/server.properties
  9. 创建topic

    ./kafka-topics.sh \
    --create \
    --zookeeper hadoop:2181/kafka \
    --partitions 3 \
    --replication-factor 2 \
    --topic test
  10. 查看topic

    ./kafka-topics.sh \
    --list \
    --zookeeper hadoop:2181/kafka
  11. 查看指定topic的状况

    ./kafka-topics.sh \
    --describe \
    --zookeeper hadoop:2181/kafka \
    --topic test
  12. 测试

    启动生产者

    ./kafka-console-producer.sh \
    --broker-list hadoop:9090,hadoop:9091,hadoop:9092 \
    --topic test

    启动消费者

    bin/kafka-console-consumer.sh \
    --bootstrap-server hadoop:9090,hadoop:9091,hadoop:9092 \
    --from-beginning \
    --topic test

    发送数据

    >a
    >a
    ...

    接收数据

    a
    a
    ...
  13. 关闭kafka集群(每台都要执行)

    修改kafka-server-stop.sh

    PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')

    修改为

    PIDS=$(jps -lm | grep -i 'kafka'| awk '{print $1}')

    命令详解:使用jps -lm命令列出所有的java进程,然后通过管道,利用grep -i ‘kafka.Kafka’命令将kafka进程筛出来,最后再接一管道命令,利用awk将进程号取出来。

    分别执行kafka-server-stop.sh

Author: Tunan
Link: http://yerias.github.io/2020/04/22/kafka/2/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.