• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

childe/gohangout: 使用 golang 模仿的 Logstash。用于消费 Kafka 数据,处理后写入 E ...

原作者: [db:作者] 来自: 网络 收藏 邀请

开源软件名称:

childe/gohangout

开源软件地址:

https://github.com/childe/gohangout

开源编程语言:

Go 97.3%

开源软件介绍:

ENG

之前因为 logstash 处理数据的效率比较低, 用 java 模仿 Logstash 写了一个java版本的 https://github.com/childe/hangout. 不知道现在 Logstash 效率怎么样了, 很久不用了.

后来因为Java的太吃内存了, 而且自己对java不熟, 又加上想学习一下golang, 就用golang又写了一次. 内存问题得到了很大的缓解. 目前我们使用golang版本的gohangout每天处理2000亿条以上的数据.

创建一个 QQ 群交流吧

QQ

安装

从源码编译

使用 go module 管理依赖. 直接 make 就可

make

go get

go get github.com/childe/gohangout

第三方 Plugin

使用 Plugin 的话,自己编译一下,将 CGO_ENABLED 打开:CGO_ENABLED=1

运行

gohangout --config config.yml

一个简单的配置文件如下,从标准输入读取数据,输出到标准输出。具体的配置说明见 配置一节

inputs:
    - Stdin: {}

outputs:
    - Stdout: {}

日志

日志模块使用 github.com/golang/glog , 几个常用参数如下:

  • -logtostderr 日志打印出标准错误

  • -v 5 设置日志级别. 我这边一般设置到 5. 如果要看更详细的日志, 可以设置到 10 或者20

pprof debug

  • -pprof=true (默认是不开启 pprof的)

  • -pprof-address 127.0.0.1:8899 pprof 的http地址

prometheus metrics

运行时加参数 --prometheus 0.0.0.0:2112,可以开一个 prometheus 监听服务。

在 Input/Output/Filter 里面配置 prometheus_counter

如下例子表示,如果数据通过 if 条件,则此 Add Filter 的计数加 1。

Add:
    prometheus_counter: 
        name: gohangout_dot_output
        namespace: rack_a
        help: 'rack_a gohangout dot output counter'
        constLabels:
            env: prod
    if:
    - 'EQ(a,nil)'
    fields:
        op: xyz

多线程处理

默认是一个线程

--worker 4

使用四个线程(goroutine)处理数据. 每个线程拥有自己的filter, output. 比如说translate filter, 每个线程有自己的字典, 他们占用多份内存. elasticsearch output也是一样的, 如果每个 elasticsearch 设置了2并发, 那一共就是8个并发.

进一步说明一下为什么添加了这个配置:

最开始是没有这个配置的, 如果需要多线程并发处理数据, 依赖 Input 里面的配置, 比如说 Kafka 配置 topicname: 2 就是两个线程去消费(需要 Topic 有至少2个Partition, 保证每个线程可以消费到一个 Partition 里面的数据).

但是后面出现一些矛盾, 比如说, Kafka 的 Consumer 个数多的情况下, 给 Kafka 带来更大压力, 可能导致 Rebalance 更频繁等. 所以如果 Kafka 消费数据没有瓶颈的情况下, 希望控制尽量少的 Consumer, 后面多线程的处理这些数据.

自动更新配置

默认不会监听文件系统更新,只在首次初始化时加载配置 --reload

开启这个参数后,当配置文件发生改变会马上触发shutdown,然后重新加载配置文件后运行

除此之外,kill -USR1 $pid也会触发重新加载配置文件

开发新的插件

配置

配置文件是 Yaml 格式

一个简单的配置示例

filters 是一个列表,会依次执行里面的每一个 Filter。

如下例,会先执行第一个 Grok Filter,解析 message 字段,按正则表达式提取出一些其他字段。

再执行第二个 Grok Filter,在这个 Grok 中,会首先判断 if 条件是不是符合,如果不符合就跑过不执行这个 Grok 了。

然后执行第三个 Date Filter,将 logtime 字符串转成 Date 类型的字段,存到 timestamp 字段中。

如果有多个 Output,数据会串行写到每一个 Output。

如果有多个 Input,每个 Input 进来的数据会并行处理后面的 Filter 和 Output。

inputs:
    - Kafka:
        topic:
            weblog: 1
        codec: json
        consumer_settings:
            bootstrap.servers: "10.0.0.100:9092"
            group.id: gohangout.weblog
filters:
    - Grok:
        src: message
        match:
            - '^(?P<logtime>\S+) (?P<name>\w+) (?P<cmd>.+)$'
            - '^(?P<logtime>\S+) (?P<name>\w+) (?P<status>\d+)$'
        remove_fields: ['message']
    - Grok:
        if:
          - EQ($.name,"childe")
        src: cmd
        match:
            - '^gohangout .*--config (?P<config_file>\S+)'
    - Date:
        location: 'Asia/Shanghai'
        src: logtime
        target: timestamp
        formats:
            - 'RFC3339'
        remove_fields: ["logtime"]
outputs:
    - Elasticsearch:
        hosts:
            - 'http://admin:[email protected]:9200'
        index: 'web-%{appid}-%{+2006-01-02}' #golang里面的渲染方式就是用数字, 而不是用YYMM.
        index_type: "logs"
        bulk_actions: 5000
        bulk_size: 20
        flush_interval: 60

字段格式约定

以 Add Filter 举例

fields:
    logtime: '%{date} %{time}'
    type: 'weblog'
    hostname: '[host]'
    name: '{{.firstname}}.{{.lastname}}'
    name2: '$.name'
    city: '[geo][cityname]'
    '[a][b]': '[stored][message]'
    indename: 'app-%{@metadata}{kafka}{topic}-%{+2006-01-02}-log'

格式1 JSONPATH 格式

相比格式2, 更推荐使用这种格式. 更标准, 也灵活, 性能也足够

如果以 $. 开头, 认为是这种格式

给几个下面文中的例子

$.store.book[0].title

$['store']['book'][0]['title']

$.store.book[(@.length-1)].title

$.store.book[?(@.price < 10)].title

具体的格式和例子参见 https://goessner.net/articles/JsonPath/

格式2 [XX][YY]

不再推荐使用, 请使用格式1

city: '[geo][cityname]' 是把 geo.cityname 的值赋值给 city 字段. 必须严格 [XX][YY] 格式, 前后不能有别的内容

格式3 {{XXX}}

如果含有 {{XXX}} 的内容, 就认为是 golang template 格式, 具体语法可以参考 https://golang.org/pkg/text/template/. 前后及中间可以含有别的内容, 像 name: 'my name is {{.firstname}}.{{.lastname}}'

Gohangout 使用了 https://github.com/Masterminds/sprig/ 的函数库

来举个例子吧, Date Filter 得到一个 Time 类型的字段, 然后按自己的格式格式化一个字符串出来

Add:
  fields:
    ts: '{{ .ts.Format "2006.01.02" }}'  ## 这里是使用了 Time 类型的自己的函数, 相当于 ts = ts.Format("2006.01.02")
    c: '{{ add .a .b }}' ## add 是 sprig 库里面的函数,相当于 c = a + b

格式4 %{XXX}{YYY}

含有 %{XXX}{YYY} 的内容, 使用自己定义的格式处理, 像上面的 %{date} %{time} 是把 date 字段和 time 字段组合成一个 logtime 字段. 前后以及中间可以有任何内容. 像 Elasticsearch 中的 index: web-%{appid}-%{+2006-01-02} 也是这种格式, %{+XXX} , 前面一个加号, 代表时间字段, 会按时间格式做格式化处理.

2006 01 02 15 04 05 这几个数字是 golang 里面特定的数字, 代表年月日时分秒. 1月2号3点4分5秒06年. 其实就像hangout里面的YYYY MM dd HH mm SS. 如果日期月份包含英文,也可把01换成Jan,比如:02-Jan-2006.

格式5 除了1,2,3,4 之外的其它

在不同Filter中, 可能意义不同. 像 Date 中的 src: logtime, 是说取 logtime 字段的值. Elasticsearch 中的 index_type: logs , 这里的 logs 不是指字段名, 就是字面值.

INPUT

Stdin

Stdin:
    codec: json

从标准输入读取数据.

codec

目前有json/plain/json:not_usenumber三种.

  • json 对数据做 json 解析, 如果解析失败, 则将整条数据写到 message 字段, 并添加当前时间到 @timestamp 字段. 如果解析成功而且数据中没有 @timestamp 字段, 则添加当前时间到 @timestamp 字段.
  • plain 将整条数据写到 message 字段, 并添加当前时间到 @timestamp 字段.
  • json:not_usenumber 因为数字类型的位数有限, 会有一个最高精度, 为了不损失精度, 默认的 json 配置情况下, 数字类型的值默认转成字符串保存. 如果需要存成数字, 比如后续是要写 clickhouse, 可以使用 json:not_usenumber. 如果使用 json codec, 也可以配置 Convert Filter 转换成数字.

TCP

TCP:
    network: tcp4
    address: 0.0.0.0:10000
    codec: plain

network

默认为 tcp , 可以明确指定使用 tcp4 或者 tcp6

address

监听端口, 无默认值, 必须设置

codec

默认 plain

Kafka

Kafka:
    decorate_events: false
    topic:
        weblog: 1
    #assign:
    #   weblog: [0,9]
    codec: json
    consumer_settings:
        bootstrap.servers: "10.0.0.100:9092,10.0.0.101:9092"
        group.id: gohangout.weblog
        max.partition.fetch.bytes: '10485760'
        auto.commit.interval.ms: '5000'
        from.beginning: 'true'
        messages_queue_length: 10

        # sasl.mechanism: PLAIN
        # sasl.user: admin
        # sasl.password: admin-secret

        # tls.enabled: true
        # tls:
        #     cert: 'path/to/cert'
        #     key: 'path/to/key'
        #     ca: 'path/to/ca'
        #     insecure.skip.verify: false
        #     servername: xx

特别注意 参数需要是字符串, 像 auto.commit.interval.ms: '5000' , 以及 from.beginning: 'true' , 等等

decorate_events

默认为 false 配置为 true 的话, 可以把 topic/partition/offset 信息添加到 ["@metadata"]["kafka"] 字段中

topic

weblog: 1 是指开一个goroutine去消费 weblog 这个topic. 可以配置多个topic, 多个goroutine, 但我这边在实践中都是使用多进程(docker), 而不是多goroutine.

assign

assign 配置用来只消费特定的partition, 和 topic 配置是冲突的, 只能选择一个.

consumer_settings

bootstrap.servers group.id 必须配置

auto.commit.interval.ms 是指多久commit一次offset, 太长的话有可能造成数据重复消费,太短的话可能会对kafka造成太大压力.

max.partition.fetch.bytes 是指kafka client一次从kafka server读取多少数据,默认是10MB

from.beginning 如果第一次消费此topic, 或者是offset已经失效, 是从头消费还是从最新消费. 默认是 false. 但是如果已经有过commit offset, 会接着之前的消费.

messages_queue_length: 内部使用的消息 channel 的长度,默认为10.

sasl.mechanism 认证方式, 目前还只支持 PLAIN 一种

sasl.user sasl认证的用户名

sasl.password sasl认证的密码

servername 如果 servername 不为空的话,证书中的 IP 或者 DNS 名字,需要包含servername

更多配置参见 https://github.com/childe/healer/blob/dev/config.go#L40

OUTPUT

Stdout

Stdout:
    if:
        - '{{if .error}}y{{end}}'

输出到标准输出

if的语法参考下面 IF语法

TCP

TCP:
    network: tcp4
    address: 127.0.0.1:10000
    concurrent: 2

network

默认为 tcp , 可以明确指定使用 tcp4 或者 tcp6

address

TCP 远端地址, 无默认值, 必须设置

concurrent

开几个 tcp 连接一起写, 默认1

Elasticsearch

Elasticsearch:
    hosts:
        - 'http://10.0.0.100:9200'
        - 'http://admin:[email protected]:9200'
    # sniff:
        # refresh_interval: 3600
        # match: 'EQ($.attributes.type,"hot")'
    index: 'web-%{appid}-%{+2006-01-02}' #golang里面的渲染方式就是用数字, 而不是用YYMM.
    index_time_location: 'Local'
    index_type: "logs"
    bulk_actions: 5000
    routing: '[domain]'
    id: '[orderid]'
    bulk_size: 20
    flush_interval: 60
    concurrent: 3
    compress: false
    es_version: 7
    retry_response_code: [401, 502]

sniff

功能需求 es output 支持特定节点名的 sniffer

  • refresh_interval 是指多后台长时间去 Sniff 一次, 设置为 0 的话不会在后台刷新
  • match 是过滤条件, 符合条件的节点才会加到 Bulk 使用的列表中

Sniff 会调用 _nodes/_all/http 获取节点信息, 返回 publish_address 信息

index_time_location

渲染索引名字时, 使用什么时区. 默认是 UTC. 北京时间 2019-10-25 07:00:00 的日志, 会写到 2019.10.24 这个索引中.

内容如 Asia/Shanghai 等, 参考 https://timezonedb.com/time-zones

两个特殊值: UTC Local

bulk_actions

多少次提交一次Bulk请求到ES集群. 默认 5000

bulk_size

单位是MB, 多少大写提交一次到ES. 默认 15MB

flush_interval

单位秒, 间隔多少时间提交一次到ES. 默认 30

concurrent

bulk 的goroutine 最大值, 默认1

举例来说, 如果Bulk 1W条数据到ES需要5秒, 1W条数据从Input处理完所有Filters然后到Output也需要5秒. 那么把concurrent设置为1就合适, Bulk是异步的, 这5秒钟gohangout会去Filter接下来的数据.

如果Bulk 1W条数据需要10秒, Filter只要5秒, 那么concurrent设置为2可以达到更大的吞吐量.

routing

默认为空, 不做routing

id

默认为空, 不设置id (文档id由ES生成)

compress

默认 true, http请求时做zip压缩

es_version

默认为6,可以适配es6的版本,如果设置为7,则可以适配Elasticsearch7以上版本

retry_response_code

默认 [401, 502] , 当Bulk请求的返回码是401或者502时, 会重试.

两个额外的配置

source_field: _source
bytes_source_field: _source

没有这个配置的时候, 会把日志做 json.dump, 拿到dump后的[]byte写ES. 如果source_field或者bytes_source_field配置了, 则直接把配置的字段(上面的例子是 _source 字段)做为[]byte写到ES.

bytes_source_field优先级高于source_field. bytes_source_field是指字段是[]byte类型, source_field是指字段是string类型

增加这个配置的来由是这样的. 上游数据源已经是 json.dump之后的[]byte数据, 做一次json.parse, 然后再json.dump, 耗费了大量CPU做无用功.

Kafka

特别注意 参数需要是字符串, 像 flush.interval.ms: "3000" , 等等

Kafka:
    topic: applog
    producer_settings:
        bootstrap.servers: node1.kafka.corp.com:9092,node2.kafka.corp.com:9092,node3.kafka.corp.com:9092
        flush.interval.ms: "3000"
        metadata.max.age.ms: "10000"
        # sasl.mechanism: PLAIN
        # sasl.user: admin
        # sasl.password: admin-secret

clickhouse

Clickhouse:
    table: 'hotel.weblog'
    conn_max_life_time: 1800
    username: admin
    password: XXX
    hosts:
    - 'tcp://10.100.0.101:9000'
    - 'tcp://10.100.0.102:9000'
    # fields: ['datetime', 'appid', 'c_ip', 'domain', 'cs_method', 'cs_uri', 's_ip', 'sc_status', 'time_taken']
    bulk_actions: 1000
    flush_interval: 30
    concurrent: 1

Notice: 如果表中字段有 default 值, 目前只支持字符串和数字 的 DEFAULT 表达式解析和处理, 如果像 IPv4设置了default 值, 是处理不了的. 代码中写死了 IPv4 和 IPv6 的默认值都是0

table

表名. 必须配置

hosts

clickhouse 节点列表. 必须配置

fields

初始化的时候会从 ClickHouse 里面读取所有字段。

也可以手工配置,会优先使用手工配置。 为了暂时缓解 #159

bulk_actions

多少次提交一次Bulk请求到ES集群. 默认 1000

flush_interval

单位秒, 间隔多少时间提交一次到ES. 默认 30

concurrent

bulk 的goroutine 最大值, 默认1

conn_max_life_time

到 ClickHouse 的连接的生存时间, 单位为秒. 默认不设置, 也就是生存时间无限长.

FILTER

通用字段

热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap