• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

logstash中fliter进行JDBC与ruby操作

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

fliter中JDBC操作有两个插件:

jdbc_streaming:这个是流式执行的,每来一条数据就会执行一次JDBC操作,具体介绍见官网https://www.elastic.co/guide/en/logstash/current/plugins-filters-jdbc_streaming.html,具体使用如下:

input{
        stdin{}
}
filter{
        grok{
                match => {
                        "message" => '^{"DEVICE_CODE":(?<code>(.*))}'
                }
        }

         mutate{
            remove_field => ["message"]
        }


        jdbc_streaming{
                jdbc_driver_library => "/root/logstashfile/test/mysql-connector-java-5.1.37.jar"
                jdbc_driver_class => "com.mysql.jdbc.Driver"
                jdbc_connection_string => "jdbc:mysql://10.10.0.101:3306/zeus?characterEncoding=UTF-8"
                jdbc_user => "root"
                jdbc_password => "root"
                parameters => { "d_code" =>"code" }
                statement => "select DISTINCT(CJDID) from HC_ZS_STATIC_CJD_ZP where DEVICE_CODE= :d_code"
                target => "cjdid"
        }

        mutate{
                replace => { "cjdid" => " %{[cjdid][0][CIDID]}"}
        }

}
output{
        stdout{}
}
注意:parameters 中的code是filed的名称,列子中对应grok中匹配出来的字段,还有输出结果cjdid是一个JSON数组的结构。

通过jdbc_streaming操作,结果中就多了个cjdid字段了,先当于执行了sql操作了。但是这种操作是每来一条数据执行一次,新建一个JDBC链接,资源极大浪费。

 

 

jdbc_static:

jdbc_streaming是直接查询其他数据库,jdbc_static提供了另一种思路,logstash自带derby数据库,jdbc_static将远程数据库的数据加载到derby数据库中,在每次消息来到时从自带的数据库中查询数据。相当于中间视图的功能,但要注意这里并不是将数据放到了缓存中,仍然时存储在硬盘中的,只不过相比于jdbc_streaming不需要频繁的去获取数据库链接了,提升了效率。这个组件的具体介绍见官网https://www.elastic.co/guide/en/logstash/current/plugins-filters-jdbc_static.html#plugins-filters-jdbc_static-loader_schedule,下边是案例:

input{
    stdin{}
}
filter{
    grok{
        match => {
            "message" => '^{"DEVICE_CODE":(?<code>(.*))}'
        }
    }

     mutate{
            remove_field => ["message"]
        }

    jdbc_static {
        loaders => [
             {
                       id => "ZPCJDID"
                    query => "select DEVICE_CODE,CJDID from HC_ZS_STATIC_CJD_ZP WHERE CJDID!=12"
                    local_table => "ZPCJDID"
                  }
        ]
        local_db_objects => [
            {
                name => "ZPCJDID"
                    index_columns => ["DEVICE_CODE"]
                        columns => [
                          ["DEVICE_CODE", "varchar(30)"],
                          ["CJDID", "varchar(20)"]
                    ]
            }
        ]
        local_lookups => [
            {
                    query => "select DISTINCT(CJDID) from ZPCJDID  WHERE DEVICE_CODE = :device_code"
                    parameters => {device_code => "code"}
                    target => "cjdid1"
                  }
        ]
        jdbc_user => "root"
           jdbc_password => "root"
           jdbc_driver_class => "com.mysql.jdbc.Driver"
           jdbc_driver_library => "/root/logstashfile/test/mysql-connector-java-5.1.37.jar"
            jdbc_connection_string => "jdbc:mysql://10.10.0.101:3306/zeus?characterEncoding=UTF-8"
        add_field => { cjdid => "%{[cjdid1][0][cjdid]}"}
        remove_field => ["cjdid1"]
    }

    

}
output{
    stdout{}
}
 

 

下面来介绍filter中的另一个插件:ruby,将其和jdbc_static结合使用

业务要求:

              输入:{"DEVICE_CODE":HC131-GC151226}类似的数据

              过滤:mysql中查询相关的CJDID,在判断CJDID是否满足相应要求

              输出:满足条件的数据

               数据库表数据:

ruby中采用文件方式执行,ruby具体说明见官网:https://www.elastic.co/guide/en/logstash/current/plugins-filters-ruby.html#plugins-filters-ruby-init

代码如下:

logstash配置文件:

input{
    stdin{}
}
filter{
    grok{
        match => {
            "message" => '^{"DEVICE_CODE":(?<code>(.*))}'
        }
    }

     mutate{
            remove_field => ["message"]
        }

    jdbc_static {
        loaders => [
             {
                       id => "ZPCJDID"
                    query => "select DEVICE_CODE,CJDID from HC_ZS_STATIC_CJD_ZP WHERE CJDID!=12"
                    local_table => "ZPCJDID"
                  }
        ]
        local_db_objects => [
            {
                name => "ZPCJDID"
                    index_columns => ["DEVICE_CODE"]
                        columns => [
                          ["DEVICE_CODE", "varchar(30)"],
                          ["CJDID", "varchar(20)"]
                    ]
            }
        ]
        local_lookups => [
            {
                    query => "select DISTINCT(CJDID) from ZPCJDID  WHERE DEVICE_CODE = :device_code"
                    parameters => {device_code => "code"}
                    target => "cjdid1"
                  }
        ]
        jdbc_user => "root"
           jdbc_password => "root"
           jdbc_driver_class => "com.mysql.jdbc.Driver"
           jdbc_driver_library => "/root/logstashfile/test/mysql-connector-java-5.1.37.jar"
            jdbc_connection_string => "jdbc:mysql://10.10.0.101:3306/zeus?characterEncoding=UTF-8"
        add_field => { cjdid => "%{[cjdid1][0][cjdid]}"}
        remove_field => ["cjdid1"]
    }
    

    ruby{
        path => "/root/logstashfile/test/rubycode.rb"
        script_params => {"ids"=>"NHKS10099|DZQ10028|DZQ10041"}
    }
    
    

}
output{
    stdout{}
}
 

 

 

 

ruby代码文件rubycode.rb:

def register(params)
    @arg = params["ids"]
end

def filter(event)
    [email protected]('|')
    if arg.include?(event.get('cjdid')) 
        then
        return [event]
    else
        return []    
    end
end
 


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Ruby加密(md5,sha1,base64)发布时间:2022-07-14
下一篇:
Ruby on rails开发从头来(windows)(十二)-订单(Order)发布时间:2022-07-14
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap