Go语言实现向kafka中发送数据

前言

今天用go语言简单实现kafka发送文件示例。

kafla和zookeeper搭建

安装kafka之前首先要安装zookeeper。本次安装是在Mac os下实现,使用homebrew进行安装

安装zookeeper
1
brew install zookeeper
安装kafka
1
brew install kafka

启动zookeeper

1
sh zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &

启动kafka

1
sh kafka-server-start /usr/local/etc/kafka/server.properties &

下载sarama

在下载sarama过程中遇到很多问题。使用go get命令会提示下面类似错误

1
LibreSSL SSL_connect: SSL_ERROR_SYSCALL in connection to github.com:443

解决方法如下:

首先查看自己go的版本,我的是1.15,对于go版本1.13及以上采用如下两条命令

1
2
go env -w GO!!!MODULE=on
go env -w GOPROXY=https://goproxy.io,direct

对于其他版本请参考如下链接:https://www.jianshu.com/p/9a476a40e16e

kafka实例

创建main.go文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"fmt"
"github.com/Shopify/sarama"
)

//kafka client demo


func main() {
//生产者配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll //ACK
config.Producer.Partitioner = sarama.NewRandomPartitioner //分区
config.Producer.Return.Successes = true //确认

//连接kafka
client,err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"},config)
if err != nil {
fmt.Println("producer closed err :",err)
return
}
defer client.Close()

//封装消息
msg := &sarama.ProducerMessage{}
msg.Topic = "shopping"
msg.Value = sarama.StringEncoder("this is a test log")


//发送消息
pid,offset,err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed,err:",err)
return
}
fmt.Printf("pid:%v offset:%v\n",pid,offset)

}
文件打包
1
go build main.go

在打包过程中又遇到了一些问题,类似报错如下,大致原因就是因为我们上面采用了代理下载go包,开启了 GO111MODULE,导致包管理非官方所说的在GOPATH\src,而是去了GOPATH\src\pkg\目录下。

1
cannot find module providing package github.com/xxx

此时就需要用go mod引入这些包 require github.com/gin-gonic/gin@latest ,从而可以解决import获取不了包的问题。运行命令如下

1
2
go mod init gin
go mod edit -require github.com/gin-gonic/gin@latest

成功打包之后执行命令

1

查看kafka日志

kafka日志文件在/usr/local/var/lib/目录下的kafka-logs中。kafka-logs文件夹下有一个shopping-0文件夹,这个就是我们在代码里设置的Topic,里面有四个文件,.log结尾的就是我们的日志文件。

2

消费者打印日志

启动消费者
1
sh kafka-console-consumer --bootstrap-server localhost:9092 --topic shopping --from-beginning
打印日志

3