https://mirrors.tuna.tsinghua.edu.cn/AdoptOpenJDK/8/jdk/x64/linux/
[root@zookeeper ~]# cd /usr/local/src
[root@zookeeper src]# wget https://mirrors.tuna.tsinghua.edu.cn/AdoptOpenJDK/8/jdk/x64/linux/OpenJDK8U-jdk_x64_linux_hotspot_8u265b01.tar.gz
[root@zookeeper src]# tar -zxf OpenJDK8U-jdk_x64_linux_hotspot_8u265b01.tar.gz
[root@zookeeper src]# mv jdk8u265-b01/ /usr/local/jdk8
[root@zookeeper src]# cd /usr/local/jdk8
[root@zookeeper src]# vim /etc/profile
export JAVA_HOME=/usr/local/jdk8
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
[root@zookeeper src]# source /etc/profile
[root@zookeeper src]# java -version
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
[root@zookeeper src]# wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
[root@zookeeper src]# tar -zxf apache-zookeeper-3.6.1-bin.tar.gz
[root@zookeeper src]# mv apache-zookeeper-3.6.1-bin /usr/local/zookeeper
[root@zookeeper src]# cd /usr/local/zookeeper
[root@zookeeper zookeeper]# mkdir data
[root@zookeeper zookeeper]# cp conf/zoo_sample.cfg conf/zoo.cfg
[root@zookeeper zookeeper]# vim conf/zoo.cfg ## 修改 dataDir=/usr/local/zookeeper/data
[root@zookeeper zookeeper]# ./bin/zkServer.sh start
[root@zookeeper zookeeper]# ./bin/zkServer.sh restart
[root@zookeeper zookeeper]# ./bin/zkServer.sh stop
[root@zookeeper zookeeper]# ./bin/zkServer.sh status
https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
[root@zookeeper src]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
[root@zookeeper src]# tar -zxf kafka_2.13-2.5.0.tgz
[root@zookeeper src]# mv kafka_2.13-2.5.0 /usr/local/kafka
[root@zookeeper src]# cd /usr/local/kafka
[root@zookeeper kafka]# vim config/server.properties
listeners=PLAINTEXT://192.168.0.203:9092
advertised.listeners=PLAINTEXT://192.168.0.203:9092
[root@zookeeper kafka]# /usr/local/zookeeper/bin/zkServer.sh start
#前台启动
[root@zookeeper kafka]# bin/kafka-server-start.sh config/server.properties
#后台启动
[root@zookeeper kafka]# bin/kafka-server-start.sh -daemon config/server.properties
#查看java process status
[root@zookeeper kafka]# jps
1561 QuorumPeerMain
3711 Kafka
3791 Jps
[root@zookeeper kafka]# yum -y install net-tools
[root@zookeeper kafka]# netstat -tunlp | grep -E "(2181|9092)"
tcp6 0 0 192.168.0.203:9092 :::* LISTEN 3711/java
tcp6 0 0 :::2181 :::* LISTEN 1561/java
[root@zookeeper kafka]# bin/kafka-server-stop.sh
创建一个名为 demo 的主题,包含 1 个分区,副本因子为 1;Kafka 会在日志目录(默认 /tmp/kafka-logs)下创建 demo-0 目录来存放该分区的日志
[root@zookeeper kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
[root@zookeeper kafka]# bin/kafka-topics.sh --list --zookeeper localhost:2181
[root@zookeeper kafka]# bin/kafka-topics.sh --describe --topic demo --zookeeper localhost:2181
Topic: demo PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: demo Partition: 0 Leader: 0 Replicas: 0 Isr: 0
[root@zookeeper kafka]# bin/kafka-topics.sh --delete --topic demo --zookeeper localhost:2181
Topic demo is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
#执行后输入消息,消费者那边将收到消息
[root@zookeeper kafka]# bin/kafka-console-producer.sh --broker-list 192.168.0.203:9092 --topic demo
> hello timo
#在另一个shell里面执行
[root@zookeeper kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.203:9092 --topic demo --from-beginning
hello timo
package main
import (
"context"
"github.com/segmentio/kafka-go"
)
// main produces three keyed messages to topic-A on the broker at
// 192.168.0.203:9092 and then closes the writer.
//
// Both the write and the close can fail (for example when the broker is
// unreachable); either failure aborts the program with a panic instead of
// being silently dropped.
func main() {
	// Make a writer that produces to topic-A, using the least-bytes distribution.
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"192.168.0.203:9092"},
		Topic:    "topic-A",
		Balancer: &kafka.LeastBytes{},
	})

	writeErr := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("Timo Go"),
		},
		kafka.Message{
			Key:   []byte("Key-B"),
			Value: []byte("Kafka Go"),
		},
		kafka.Message{
			Key:   []byte("Key-C"),
			Value: []byte("Lets Go"),
		},
	)

	// Always close the writer, even when the write failed, so its resources
	// are released before the error is reported.
	closeErr := w.Close()

	if writeErr != nil {
		panic("failed to write messages: " + writeErr.Error())
	}
	if closeErr != nil {
		panic("failed to close writer: " + closeErr.Error())
	}
}
package main
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
)
// main consumes topic-A, partition 0, from offset 0 on the broker at
// 192.168.0.203:9092 and prints every message it reads.
//
// The loop runs until ReadMessage returns an error; that error is printed
// before the reader is closed so the reason for stopping is visible.
func main() {
	// Make a new reader that consumes from topic-A, partition 0.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"192.168.0.203:9092"},
		Topic:     "topic-A",
		Partition: 0,
		MinBytes:  10e3, // 10KB
		MaxBytes:  10e6, // 10MB
	})
	// Close the reader on every exit path and surface a failed close.
	defer func() {
		if err := r.Close(); err != nil {
			fmt.Printf("failed to close reader: %v\n", err)
		}
	}()

	// Start reading from the first offset of the partition.
	if err := r.SetOffset(0); err != nil {
		fmt.Printf("failed to set offset: %v\n", err)
		return
	}

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			// Report why consumption stopped instead of exiting silently.
			fmt.Printf("stopped reading: %v\n", err)
			break
		}
		fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
	}
}