Linux安装Java、Zookeeper、Kafka


安装Java

下载

https://mirrors.tuna.tsinghua.edu.cn/AdoptOpenJDK/8/jdk/x64/linux/

[root@zookeeper ~]#cd /usr/local/src
[root@zookeeper src]# wget https://mirrors.tuna.tsinghua.edu.cn/AdoptOpenJDK/8/jdk/x64/linux/OpenJDK8U-jdk_x64_linux_hotspot_8u265b01.tar.gz
[root@zookeeper src]# tar -zxf OpenJDK8U-jdk_x64_linux_hotspot_8u252b09.tar.gz
[root@zookeeper src]# mv jdk8u252-b09/ /usr/local/jdk8
[root@zookeeper src]# cd /usr/local/jdk8

环境变量

[root@zookeeper src]# vim /etc/profile
export JAVA_HOME=/usr/local/jdk8

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export PATH=$PATH:$JAVA_HOME/bin

[root@zookeeper src]# source /etc/profile

查看是否安装成功

[root@zookeeper src]# java -version

安装Zookeeper

下载

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

[root@zookeeper src]# wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
[root@zookeeper src]# tar -zxf apache-zookeeper-3.6.1-bin.tar.gz
[root@zookeeper src]# mv apache-zookeeper-3.6.1-bin /usr/local/zookeeper
[root@zookeeper src]# cd /usr/local/zookeeper

配置

[root@zookeeper zookeeper]# mkdir data
[root@zookeeper zookeeper]# cp conf/zoo_sample.cfg conf/zoo.cfg
[root@zookeeper zookeeper]# vim conf/zoo.cfg ## 修改 dataDir=/usr/local/zookeeper/data

启动

[root@zookeeper zookeeper]# ./bin/zkServer.sh start

重启

[root@zookeeper zookeeper]# ./bin/zkServer.sh restart

停止

[root@zookeeper zookeeper]# ./bin/zkServer.sh stop

状态

[root@zookeeper zookeeper]# ./bin/zkServer.sh status

安装Kafka

下载

https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/

[root@zookeeper src]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
[root@zookeeper src]# tar -zxf kafka_2.13-2.5.0.tgz
[root@zookeeper src]# mv kafka_2.13-2.5.0 /usr/local/kafka

配置

[root@zookeeper src]# cd /usr/local/kafka/config
[root@zookeeper config]# vim server.properties
listeners=PLAINTEXT://192.168.0.203:9092

advertised.listeners=PLAINTEXT://192.168.0.203:9092

启动

1、启动Zookeeper

[root@zookeeper kafka] /usr/local/zookeeper/bin/zkServer.sh start

2、启动kafka

#前台启动
[root@zookeeper kafka] bin/kafka-server-start.sh config/server.properties

#后台启动
[root@zookeeper kafka] bin/kafka-server-start.sh -daemon config/server.properties

状态

#查看java process status
[root@zookeeper kafka] jps
1561 QuorumPeerMain
3711 Kafka
3791 Jps

[root@zookeeper kafka] yum -y install net-tools
[root@zookeeper kafka] netstat -tunlp|egrep "(2181|9092)"
tcp6       0      0 192.168.0.203:9092      :::*                    LISTEN      3711/java           
tcp6       0      0 :::2181                 :::*                    LISTEN      1561/java

停止

[root@zookeeper kafka] bin/kafka-server-stop.sh config/server.properties

主题(Topic)

1、创建

创建了一个名为demo的主题,其中包含一个分区和一个副本因子,会在/tmp/kafka-logs里面创建一个demo的文件夹来存放主题日志
[root@zookeeper kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo

2、列表

[root@zookeeper kafka]# bin/kafka-topics.sh --list --zookeeper localhost:2181

3、信息

[root@zookeeper kafka]# bin/kafka-topics.sh --describe --topic demo --zookeeper localhost:2181
Topic: demo     PartitionCount: 1       ReplicationFactor: 1    Configs: 
        Topic: demo     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

4、删除

[root@zookeeper kafka]# bin/kafka-topics.sh --delete --topic demo --zookeeper localhost:2181
Topic demo is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

5、生产者

#执行后输入消息,消费者那边将受到消息
[root@zookeeper kafka]# bin/kafka-console-producer.sh --broker-list 192.168.0.203:9092 --topic demo
> hello timo

6、消费者

#在另一个shell里面执行
[root@zookeeper kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.203:9092 --topic demo --from-beginning
hello timo

实践

发布消息

package main

import (
    "context"
    "github.com/segmentio/kafka-go"
)

func main()  {
    // make a writer that produces to topic-A, using the least-bytes distribution
    w := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{"192.168.0.203:9092"},
        Topic:   "topic-A",
        Balancer: &kafka.LeastBytes{},
    })

    w.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte("Key-A"),
            Value: []byte("Timo Go"),
        },
        kafka.Message{
            Key:   []byte("Key-B"),
            Value: []byte("Kafka Go"),
        },
        kafka.Message{
            Key:   []byte("Key-C"),
            Value: []byte("Lets Go"),
        },
    )

    w.Close()
}

消费消息

package main

import (
    "context"
    "fmt"
    "github.com/segmentio/kafka-go"
)

func main() {
    // make a new reader that consumes from topic-A, partition 0, at offset 42
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{"192.168.0.203:9092"},
        Topic:     "topic-A",
        Partition: 0,
        MinBytes:  10e3, // 10KB
        MaxBytes:  10e6, // 10MB
    })
    r.SetOffset(0)

    for {
        m, err := r.ReadMessage(context.Background())
        if err != nil {
            break
        }
        fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
    }

    r.Close()
}

参考文档

Linux安装(Java、Zookeeper、Kafka)

Linux安装Kafka

Zookeeper-3.6.1安装(单机版)

apache kafka技术分享系列(目录索引)