filebeat netflow模块输出kafka多topic问题

版本

1
2
3
4
5
~ cat /etc/centos-release
CentOS Linux release 7.6.1810 (Core)
~ yum list installed | grep filebeat
filebeat.x86_64 7.10.2-1 @elastic-7.x

需求

想让各出口的路由器,通过netflow、netstream将流量送到es保存起来进行分析。之前测试是直接用filebeat output到es的,但考虑到性能问题,决定先将数据送到kafka,通过logstash进行处理再送入es。

因为出口链路较多,设计不同的出口数据,送入不同的filebeat端口,以不同的topic存入kafka,方便管理。

当然,不同流量也可以采用相同的topic,后期在es上根据observer或tag来区分。

问题

按官网说明,在module中加上 fields.log_topic

1
2
3
4
5
6
7
8
9
10
11
~ grep -v "^\s*#" /etc/filebeat/modules.d/netflow_3055.yml | grep -v ^$
- module: netflow
log:
enabled: true
var:
netflow_host: 0.0.0.0
netflow_port: 3055
internal_networks:
- 10.0.0.0/8
fields:
log_topic: ipnce

在filebeat.yml中指定topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
~ grep -v "^\s*#" /etc/filebeat/filebeat.yml | grep -v ^$
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
keepfiles: 7
permissions: 0644
logging.to_syslog: false
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
output:
kafka:
output.kafka:
hosts: ["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"]
topic: "netstream-%{[fields.log_topic]}"
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~

按以上配置,一直报错

1
Dropping event: no topic could be selected

应该是fields中,没有log_topic值,虽然在module中已经设置了,但没有生效。问了google,有些说需要在配置中加上

1
fields_under_root: true

依然无效

解决

测试了几天,实在是找不头绪,只能使用全局topic传入kafka,在es上观察数据后,发现netflow的数据在存入es后,有一些特定的字段,比如

1
ecs.version: 1.5.0

这些字段应该是filebeat默认的模板通过processors加上去的,于是查看了netflow模块的默认模板,果然在processors中发现了ecs.version字段,该yml文件中,还有一些关于module中的变量替换。于是想通过在这里引用module中的vars,设定log_topic。

在末尾添加 log_topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
~ grep -v "^\s*#" /usr/share/filebeat/module/netflow/log/config/netflow.yml | grep -v ^$
type: netflow
protocols: [v1, v5, v6, v7, v8, v9, ipfix]
host: '{{.netflow_host}}:{{.netflow_port}}'
max_message_size: '{{.max_message_size}}'
expiration_timeout: '{{.expiration_timeout}}'
queue_size: {{.queue_size}}
{{if .timeout}}
timeout: '{{.timeout}}'
{{end}}
{{if .read_buffer}}
read_buffer: '{{.read_buffer}}'
{{end}}
{{ if .custom_definitions}}
custom_definitions:
{{range .custom_definitions}}
- '{{ . }}'
{{end}}
{{end}}
{{ if .detect_sequence_reset}}
detect_sequence_reset: {{.detect_sequence_reset}}
{{end}}
tags: {{.tags | tojson}}
publisher_pipeline.disable_host: {{ inList .tags "forwarded" }}
processors:
- add_fields:
target: ''
fields:
ecs.version: 1.5.0
log_topic: {{ .log_topic }}

修改module,添加log_topic变量

1
2
3
4
5
6
7
8
9
10
11
12
~ grep -v "^\s*#" /etc/filebeat/modules.d/netflow_3055.yml | grep -v ^$
- module: netflow
log:
enabled: true
var:
netflow_host: 0.0.0.0
netflow_port: 3055
tags:
- ipn_ce
log_topic: ipn_ce
internal_networks:
- 10.0.0.0/8

重启filebeat后,可根据log_topic在kafka中正常生成topic

1
2
3
~ kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
netstream-ipn_ce

能用了,为什么官方配置不行的问题,也不想深究了 。。。

0%