Go日志收集项目流程梳理

简介

该项目主要分为三个部分,web端负责对etcd的写入以及从mysql读写数据进行展示。logAgent负责日志的收集,主要是通过tail从etcd中实时获取所要收集的日志项,然后通过sarama从相应的日志文件读取日志信息发送到Kafka。logTransfer负责从Kafka中读取日志,并将日志写入elasticsearch最后通过Kibana进行日志的检索。

logBeegoWeb

前端使用Beego框架完成。在运行项目之前,首先安装bee脚手架。

1
go get github.com/beego/bee

执行bee命令查看是否安装成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
caoyifan@MacBookPro ~ % bee
Bee is a Fast and Flexible tool for managing your Beego Web Application.

USAGE
bee command [arguments]

AVAILABLE COMMANDS

version Prints the current Bee version
migrate Runs database migrations
api Creates a Beego API application
bale Transforms non-Go files to Go source files
fix Fixes your application by making it compatible with newer versions of Beego
pro Source code generator
dlv Start a debugging session using Delve
dockerize Generates a Dockerfile for your Beego application
generate Source code generator
hprose Creates an RPC application based on Hprose and Beego frameworks
new Creates a Beego application
pack Compresses a Beego application into a single file
rs Run customized scripts
run Run the application by starting a local development server
server serving static content over HTTP on port
update Update Bee

Use bee help [command] for more information about a command.

ADDITIONAL HELP TOPICS


Use bee help [topic] for more information about that topic.

进入logBeegoWeb文件夹,bee run启动项目

1
2
3
4
5
6
7
8
9
10
11
12
13
caoyifan@MacBookPro logBeegoWeb % bee run
______
| ___ \
| |_/ / ___ ___
| ___ \ / _ \ / _ \
| |_/ /| __/| __/
\____/ \___| \___| v1.12.0
2021/11/12 15:09:30 INFO ▶ 0001 Using 'logBeegoWeb' as 'appname'
2021/11/12 15:09:30 INFO ▶ 0002 Initializing watcher...
2021/11/12 15:09:32 SUCCESS ▶ 0003 Built Successfully!
2021/11/12 15:09:32 INFO ▶ 0004 Restarting 'logBeegoWeb'...
2021/11/12 15:09:32 SUCCESS ▶ 0005 './logBeegoWeb' is running...
2021/11/12 15:09:32.451 [I] [asm_amd64.s:1374] http server Running on http://127.0.0.1:8080

查看本地8080端口,项目启动成功

1

数据库

创建数据库的sql源码如下,创建名为logCollect 的数据库,并创建三张表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
create database logCollect;

create table tbl_app_info(
app_id int auto_increment primary key,
app_name varchar(1024) not null,
app_type varchar(64) not null,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
develop_path varchar(256) not null
) ENGINE=InnoDB DEFAULT CHARSET=utf8 AUTO_INCREMENT=1;

create table tbl_app_ip(
app_id int,
ip varchar(64),
Key app_id_ip_index (app_id, ip)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 AUTO_INCREMENT=1;

create table tbl_log_info(
log_id int auto_increment primary key,
app_id varchar(1024) not null,
log_path varchar(64) not null,
topic varchar(1024) not null,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status tinyint default 1
) ENGINE=InnoDB DEFAULT CHARSET=utf8 AUTO_INCREMENT=1;

数据库创建成功

1
2
3
4
5
6
7
8
9
mysql> show tables;
+----------------------+
| Tables_in_logcollect |
+----------------------+
| tbl_app_info |
| tbl_app_ip |
| tbl_log_info |
+----------------------+
3 rows in set (0.01 sec)

在前端主页面,项目列表对应tbl_app_info这张表,日志列表对应tbl_log_info这张表。tbl_app_ip这张表用来保存项目所在的ip地址。

etcd

当我们申请好项目之后,就可以针对该项目所产生的日志进行收集。对于所要收集的日志配置信息,可以采用etcd进行存储。

例如我们可以在日志申请页写入所要收集的项目信息,前提是项目的名字必须在项目列表中是存在的,否则日志申请就会失败。

2

etcd中获取key的值

1
2
3
caoyifan@MacBookPro ~ % etcdctl get /backend/logagent/config/10.100.163.201
/backend/logagent/config/10.100.163.201
[{"logpath":"/Users/caoyifan/test.log","topic":"kafka_test"}]

logAgent

logAgent项目树结构如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
logAgent
├── conf
│   └── logAgent.conf
├── kafka
│   └── kafka.go
├── logs
│   └── my.log
├── main
│   ├── config.go
│   ├── etcd.go
│   ├── ip.go
│   ├── log.go
│   ├── main.go
│   └── server.go
├── tailf
│   └── tail.go
└── tools
└── SetConf
└── main.go

其中每个文件的具体作用如下

1
2
3
4
5
6
7
8
9
10
11
logagent.conf :配置文件
kafka.go:对kafka的操作,包括初始化kafka连接,以及给kafka发送消息
my.log:产生的日志文件
config.go:用于初始化读取配置文件中的内容,这里的配置文件加载是通过之前自己实现的配置文件热加载包处理的
etcd.go:对etcd的操作,包括初始化etcd和监听etcd
ip.go:获取本机所有的网卡ip,连接etcd
log.go:日志的处理与序列化
main.go: 初始化入口文件,与执行server的入口函数
server.go:主要是tail 的相关操作,用于去读日志文件并将内容放到channel中
tail.go: 用于去读日志文件
SetConf.main.go:将设置的配置信息导入到etcd中

进入main文件夹中,执行go build命令,之后执行./main启动项目,项目启动日志输出如下

1
2
3
4
5
6
7
8
9
10
2021/11/12 15:32:11.007 [D]  get config from etcd success, [{/Users/caoyifan/test.log kafka_test}]
2021/11/12 15:41:14.985 [D] 导入日志成功&{debug /Users/caoyifan/go/src/Golang_logCollect/logAgent/logs/my.log 100 0.0.0.0:9092 [] 0.0.0.0:2379 /backend/logagent/config/}
2021/11/12 15:41:14.987 [D] resp from etcd:[key:"/backend/logagent/config/10.100.163.201" create_revision:41 mod_revision:61 version:14 value:"[{\"logpath\":\"/Users/caoyifan/test.log\",\"topic\":\"kafka_test\"}]" ]
2021/11/12 15:41:14.987 [D] 日志设置为[{/Users/caoyifan/test.log kafka_test}]
2021/11/12 15:41:14.987 [D] 连接etcd成功
2021/11/12 15:41:14.987 [D] 初始化etcd成功!
2021/11/12 15:41:14.987 [D] 初始化tailf成功!
2021/11/12 15:41:14.988 [D] 开始监控key: /backend/logagent/config/10.100.163.201
2021/11/12 15:41:14.997 [D] 初始化Kafka producer成功,地址为: 0.0.0.0:9092
2021/11/12 15:41:14.997 [D] 初始化Kafka成功!

可以看到,项目启动之后首先会从etcd中去获取前端写入etcd中的日志收集配置信息,之后etcd会持续监控key的变化,一旦有新的日志配置加入etcd中,就会更新配置文件,随后启动kafka准备从日志文件中读取日志信息。

测试日志文件读取

/Users/caoyifan目录下创建test.log文件并尝试写入一些信息。

3

发现kafka能够成功读取写入的文件信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
2021/11/12 15:46:50.530 [D]  导入日志成功&{debug /Users/caoyifan/go/src/Golang_logCollect/logAgent/logs/my.log 100 0.0.0.0:9092 [] 0.0.0.0:2379 /backend/logagent/config/}
2021/11/12 15:46:50.533 [D] resp from etcd:[key:"/backend/logagent/config/10.100.163.201" create_revision:41 mod_revision:61 version:14 value:"[{\"logpath\":\"/Users/caoyifan/test.log\",\"topic\":\"kafka_test\"}]" ]
2021/11/12 15:46:50.533 [D] 日志设置为[{/Users/caoyifan/test.log kafka_test}]
2021/11/12 15:46:50.533 [D] 连接etcd成功
2021/11/12 15:46:50.533 [D] 初始化etcd成功!
2021/11/12 15:46:50.533 [D] 初始化tailf成功!
2021/11/12 15:46:50.533 [D] 开始监控key: /backend/logagent/config/10.100.163.201
2021/11/12 15:46:50.536 [D] 初始化Kafka producer成功,地址为: 0.0.0.0:9092
2021/11/12 15:46:50.536 [D] 初始化Kafka成功!
2021/11/12 15:47:21.637 [D] read success, pid:0, offset:0, topic:kafka_test

2021/11/12 15:47:21.641 [D] read success, pid:0, offset:1, topic:kafka_test

2021/11/12 15:47:21.642 [D] read success, pid:0, offset:2, topic:kafka_test

logTransfer

确保已经启动了kafkaElasticSearchkibana

进入main文件夹下,执行go build,输出日志如下

1
2
caoyifan@MacBookPro main % ./main  
2021/11/17 17:51:56.784 [D] 初始化配置成功

这个时候我们查看一下etcd中的日志配置信息

1
2
3
caoyifan@MacBookPro ~ % etcdctl get /backend/logagent/config/10.100.163.201
/backend/logagent/config/10.100.163.201
[{"logpath":"/Users/caoyifan/kafka.log","topic":"kafka_log"}]

接着我们尝试向kafka.log中写入一些测试信息

4

这个时候我们查看logAgent中的日志,发现消息已经成功写入了kafka中

5

再查看logTransfer中的日志,发现已经成功收集到日志信息

6

Kibana可视化

这个时候我们已经成功收集到了kafka中的消息,并且已经将消息写入到了ElasticSearch中,接下来我们配置一下Kibana查看收集到的相关信息。

首先我们在Kibana中创建索引,索引名就是我们在etcd中获取到的topic名称。

7

通过postman,我们看到当前ElasticSearch中一共有三个索引,其中有一个就是我们创建好的kafka_log索引

8

最后我们在Kibana的Discover模块下,选择相应的索引,就可以看到索引下的相关数据。

9