Logstash filters offer two plugins for JDBC operations:
jdbc_streaming: this plugin works in streaming fashion: a JDBC query is executed for every incoming event. See the official documentation at https://www.elastic.co/guide/en/logstash/current/plugins-filters-jdbc_streaming.html. A usage example:
input{
  stdin{}
}
filter{
  grok{
    match => {
      "message" => '^{"DEVICE_CODE":(?<code>(.*))}'
    }
  }
  mutate{
    remove_field => ["message"]
  }
  jdbc_streaming{
    jdbc_driver_library => "/root/logstashfile/test/mysql-connector-java-5.1.37.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://10.10.0.101:3306/zeus?characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "root"
    parameters => { "d_code" => "code" }
    statement => "select DISTINCT(CJDID) from HC_ZS_STATIC_CJD_ZP where DEVICE_CODE = :d_code"
    target => "cjdid"
  }
  mutate{
    replace => { "cjdid" => "%{[cjdid][0][CJDID]}" }
  }
}
output{
  stdout{}
}
Note: in parameters, "code" is a field name; in this example it is the field extracted by grok. Also note that the lookup result cjdid is a JSON array of row objects, which is why the mutate/replace step reads element [0].
With jdbc_streaming, each event gains a cjdid field, as if a SQL query had been executed for it. The drawback is that this lookup runs once per incoming event, establishing a new JDBC connection each time, which is a significant waste of resources.
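The per-event behaviour can be sketched in plain Ruby. This is only an illustration: the in-memory hash and its sample row stand in for the real MySQL table `HC_ZS_STATIC_CJD_ZP`, and the CJDID value is a hypothetical example.

```ruby
# Hypothetical stand-in for the remote table queried by jdbc_streaming.
LOOKUP_TABLE = {
  "HC131-GC151226" => [{ "CJDID" => "NHKS10099" }]
}

def jdbc_streaming_lookup(event)
  # One "query" per event, keyed on the grok-extracted field `code`.
  # The result is always an array of row hashes, which is why the
  # config reads it back with %{[cjdid][0][CJDID]}.
  event["cjdid"] = LOOKUP_TABLE.fetch(event["code"], [])
  event
end

event = jdbc_streaming_lookup({ "code" => "HC131-GC151226" })
```

The array-of-rows shape is the key point: even a single-row result must be unwrapped with an index before it can be used as a scalar field.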
jdbc_static:
While jdbc_streaming queries the remote database directly, jdbc_static takes a different approach. Logstash ships with an embedded Apache Derby database; jdbc_static loads the remote database's data into Derby, and each incoming event is then looked up against that local copy. It acts like an intermediate view. Note that the data is not held in an in-memory cache: it is still stored on disk. But compared with jdbc_streaming, there is no need to repeatedly obtain remote database connections, which improves efficiency. See the official documentation at https://www.elastic.co/guide/en/logstash/current/plugins-filters-jdbc_static.html#plugins-filters-jdbc_static-loader_schedule. An example:
input{
  stdin{}
}
filter{
  grok{
    match => {
      "message" => '^{"DEVICE_CODE":(?<code>(.*))}'
    }
  }
  mutate{
    remove_field => ["message"]
  }
  jdbc_static {
    loaders => [
      {
        id => "ZPCJDID"
        query => "select DEVICE_CODE,CJDID from HC_ZS_STATIC_CJD_ZP WHERE CJDID!=12"
        local_table => "ZPCJDID"
      }
    ]
    local_db_objects => [
      {
        name => "ZPCJDID"
        index_columns => ["DEVICE_CODE"]
        columns => [
          ["DEVICE_CODE", "varchar(30)"],
          ["CJDID", "varchar(20)"]
        ]
      }
    ]
    local_lookups => [
      {
        query => "select DISTINCT(CJDID) from ZPCJDID WHERE DEVICE_CODE = :device_code"
        parameters => { device_code => "code" }
        target => "cjdid1"
      }
    ]
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_driver_library => "/root/logstashfile/test/mysql-connector-java-5.1.37.jar"
    jdbc_connection_string => "jdbc:mysql://10.10.0.101:3306/zeus?characterEncoding=UTF-8"
    add_field => { cjdid => "%{[cjdid1][0][cjdid]}" }
    remove_field => ["cjdid1"]
  }
}
output{
  stdout{}
}
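The load-once / query-many pattern behind jdbc_static can be sketched in plain Ruby. The arrays of hashes below are hypothetical stand-ins for the remote MySQL table and the local Derby table; the row values are invented for illustration.

```ruby
# Hypothetical remote rows (stand-in for HC_ZS_STATIC_CJD_ZP in MySQL).
REMOTE_ROWS = [
  { "DEVICE_CODE" => "HC131-GC151226", "CJDID" => "NHKS10099" },
  { "DEVICE_CODE" => "HC131-GC151226", "CJDID" => "12" },
  { "DEVICE_CODE" => "HC132-GC151300", "CJDID" => "DZQ10028" }
]

# Loader phase: runs once (or on loader_schedule), applying the
# loader query's filter (CJDID != 12) and indexing by DEVICE_CODE,
# like the local Derby table with its index_columns.
local_table = Hash.new { |h, k| h[k] = [] }
REMOTE_ROWS.reject { |r| r["CJDID"] == "12" }
           .each { |r| local_table[r["DEVICE_CODE"]] << r["CJDID"] }

# Lookup phase: each event queries only the local copy; no new
# remote JDBC connection is opened per event.
def lookup(table, device_code)
  table[device_code].uniq
end
```

The efficiency gain comes entirely from moving the expensive remote round trip out of the per-event path and into the loader phase.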
Next, another filter plugin: ruby, used here in combination with jdbc_static.
Business requirement:
Input: data such as {"DEVICE_CODE":HC131-GC151226}
Filter: query MySQL for the matching CJDID, then check whether that CJDID satisfies the requirement
Output: only the events that satisfy the condition
Table data in the database:
The ruby filter is used here in file mode; see the official documentation at https://www.elastic.co/guide/en/logstash/current/plugins-filters-ruby.html#plugins-filters-ruby-init
The code is as follows:
Logstash configuration file:
input{
  stdin{}
}
filter{
  grok{
    match => {
      "message" => '^{"DEVICE_CODE":(?<code>(.*))}'
    }
  }
  mutate{
    remove_field => ["message"]
  }
  jdbc_static {
    loaders => [
      {
        id => "ZPCJDID"
        query => "select DEVICE_CODE,CJDID from HC_ZS_STATIC_CJD_ZP WHERE CJDID!=12"
        local_table => "ZPCJDID"
      }
    ]
    local_db_objects => [
      {
        name => "ZPCJDID"
        index_columns => ["DEVICE_CODE"]
        columns => [
          ["DEVICE_CODE", "varchar(30)"],
          ["CJDID", "varchar(20)"]
        ]
      }
    ]
    local_lookups => [
      {
        query => "select DISTINCT(CJDID) from ZPCJDID WHERE DEVICE_CODE = :device_code"
        parameters => { device_code => "code" }
        target => "cjdid1"
      }
    ]
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_driver_library => "/root/logstashfile/test/mysql-connector-java-5.1.37.jar"
    jdbc_connection_string => "jdbc:mysql://10.10.0.101:3306/zeus?characterEncoding=UTF-8"
    add_field => { cjdid => "%{[cjdid1][0][cjdid]}" }
    remove_field => ["cjdid1"]
  }
  ruby{
    path => "/root/logstashfile/test/rubycode.rb"
    script_params => { "ids" => "NHKS10099|DZQ10028|DZQ10041" }
  }
}
output{
  stdout{}
}
The Ruby code file rubycode.rb:
def register(params)
  @arg = params["ids"]
end

def filter(event)
  arg = @arg.split('|')
  if arg.include?(event.get('cjdid'))
    return [event]
  else
    return []
  end
end
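The register/filter contract above can be exercised outside Logstash with a tiny stub. This is a self-contained sketch: StubEvent is a hypothetical stand-in for Logstash's Event API, implementing only the get method the filter needs.

```ruby
# Hypothetical stand-in for Logstash's Event object.
class StubEvent
  def initialize(fields)
    @fields = fields
  end

  def get(key)
    @fields[key]
  end
end

# Same contract as rubycode.rb: register receives script_params once,
# filter is called per event and returns the events to keep.
def register(params)
  @arg = params["ids"]
end

def filter(event)
  arg = @arg.split('|')
  # Keep the event only when its cjdid is in the allow-list.
  arg.include?(event.get('cjdid')) ? [event] : []
end

register("ids" => "NHKS10099|DZQ10028|DZQ10041")
kept    = filter(StubEvent.new("cjdid" => "NHKS10099"))
dropped = filter(StubEvent.new("cjdid" => "XXX999"))
```

Returning an empty array from filter drops the event entirely, which is what implements the "only the events that satisfy the condition" requirement.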