Bootstrap

Logstash 中 Ruby filter 使用指南

引言

许多小伙伴在使用ELK进行数据处理的过程中,对Logstash中rugy插件的使用产生许多疑问,Logstash插件功能强大,详细理清ruby插件的使用方法对小伙伴日常工作多有助益。本文将对ruby插件的使用方式、部分原理进行详细说明,并附带丰富示例帮助小伙伴们理解消化。

Logstash插件原理简述

数据在logstash中从input插件流入到filter插件,最终通过output插件输出。在fiter阶段,可以通过logstash提供的数十个filter插件处理event中各字段的数据,包括但不限于event过滤、字段增加、字段删除、字段匹配选择修改等。logstash工具本身由java和ruby实现,Ruby插件灵活性高,处理logstash事件实用高效。

ruby插件介绍

ruby插件的使用包含两种方式:inline和script,在 ruby 插件的配置选项中,配置参数如下所述,其中 code 和 init 用在 inline 方式中, path 和script_params 用在 script 方式中。 tag_on_exception 参数用在 ruby 代码运行出错时的处理中。

inline方式

inline方式通过将需要执行的ruby代码以字符串的形式配置在logstash的配置文件中,如下所示:

filter {
ruby {
init => "ruby code run in plugin init stage"
code => "ruby code run within every event"
}
}

在inline方式中, init 中的代码在插件启动时执行, code 中的代码在每次event中都会执行,所以 init 中可以放置外部库导入代码,实例参数初始化代码等, init 和 code 在inline方式中起到的作用和script中的 register 方法和 filter 方法起到的作用分别对应。后面介绍script方式时会具体介绍。

  • init:初始化中执行的ruby代码

  • code:每条event都会执行的代码

script方式

如果插件处理逻辑不复杂,代码量不大,可以直接用inline方式使用插件就好,但如果逻辑复杂且代码量大,可以考虑使用script方式使用插件,优点是代码易读易维护。script方式用到另外两个配置参数: path 和 script_params

  • path: 插件脚本在执行环境中的存储绝对路径

  • script_params:从配置中传递给脚本的外部参数

script方式和inline方式最大的不同就是script方式需要按照插件要求的格式完成两个函数的编写,脚本模板如下:

def register(params)
end
def filter(event)
return [event]
end

其中register起到的作用和inline模式中init类似,filter和code类似,两个方法有各自要求和特点:

register方法与script_params配置参数相关,可通过其传参数到register
filter方法返回有具体要求,即返回event对象组成的数组。

script方式配置文件和脚本文件缺一不可,配置方式如下:

ruby {
path => "path in logstash environment"
script_params => { "param_key" => "param_value" }
}

event对象

在使用这inline和script两种方式使用Ruby插件时,代码中能操作的对象并不多,通过如下代码,可以输出code setting中代码的运行context有哪些:

ruby {
code => "puts local_variables; "
}

logstash运行输出两个变量: event 和 new_event_block ,其中 new_event_block 只在需要主动生成新的event时才起作用,需要重点关注的是 event 对象。接下来,查看一下 event 对象有哪些变量和方法可以使用。

ruby {
code => "puts event.instance_variables; puts event.public_methods"
}

运行发现,event对象并没有任何实例变量,公共方法却有不少,其中,比较重要的方法有:

to_json, to_hash : 格式化输出
set, get, remove, cancel :方法用来对数据字段进行自定义化的操作

举例说明

为了更好理解ruby插件各个参数的作用,以及inline方式和script方式的具体使用方法,通过几个例子来具体说明,举例按照从易到难 的方式排列。

为了更好说明ruby插件的使用方法,下列所有举例都使用相同的运行环境和测试数据。

运行环境为: kafka producer —-> kafka —-> logstash —-> es

测试数据为:

{
"id" : "5f0970c84c1de6c4fdc907ee",
"comment_id" : "Y29tbWVudDoxMzg2MzkzNzMxNTU2NzM1XzEzODY0NDM4OTgyMTgzODU=",
"parent_comment_id" : 123 ,
"like_count" : 2 ,
"comment_url" : "test_url",
"update_time" : "2020-07-11 15:56:56",
"is_new" : 1 ,
"data": "test data"
}

两种方式读取event中数据data字段

配置代码:

ruby {
code => "p event.get('[data]')"
}

output: “test data”

脚本代码:

def register(params)
end
def filter(event)
p event.get('[data]')
return [event]
end

output: “test data”

两种方式删除event数据字段

配置代码:

ruby {
code => "event.remove('data')"
}

脚本代码:

def register(params)
end
def filter(event)
event.remove('data')
return [event]
end

两种方式修改event中原有数据字段

配置代码:

ruby {
code => "event.set('[data]', 'new data')"
}

脚本代码:

def register(params)
end
def filter(event)
event.set('[data]', 'new data')
return [event]
end

两种方式新增event中数据字段

配置代码:

ruby {
code => "event.set('[new_key]', 'new data') if not event.include?('new_key')"
}

脚本代码:

def register(params)
end
def filter(event)
if not event.include?('new_key')
event.set('[new_key]', 'new data')
end
return [event]
end

两种方式匹配event中数据字段的字段⻓度

配置代码:

ruby {
code => "id_valid = event.get('[id]').length;event.set('[is_id_valid]', 1) if id_valid != 24"
}

脚本代码:

def register(params)
end
def filter(event)
	id_valid = event.get('[id]').length 
  if id_valid == 24
		event.set('[is_id_valid]', 0) 
  else
		event.set('[is_id_valid]', 1) 
  end
  return [event] 
end

两种方式匹配event中数据字段的字段是否包含html标签

配置代码: *

ruby {
code => "is_html = event.get('[data]').index(/<\/?[a-z][\s\S]*>/);event.set('[is_data_valid]', 0) if not is_html"
}

脚本代码:

def register(params)
end
def filter(event)
is_html = event.get('[data]').index(/<\/?[a-z][\s\S]*>/)
if is_html
event.set('[is_data_valid]', 1 )
else
event.set('[is_data_valid]', 0 )
end
return [event]
end

两种方式匹配event中数据字段中时间格式是否为UTC格式

配置代码:

ruby {
code => "is_utc = event.get('[update_time]').index(/\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}/);event.set('[is_time_valid]', 0) if
is_utc"
}

脚本代码:

def register(params)
end
def filter(event)
is_utc = event.get('[update_time]').index(/\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}/)
if is_utc
event.set('[is_time_valid]', 0 )
else
event.set('[is_data_valid]', 1 )
end
return [event]
end

两种方式匹配event中数据字段是否为合法URL

配置代码:

ruby {
code => "is_url = event.get('[comment_url]').index(/https?:\/\/[\S]+$/);event.set('[is_url_valid]', 0) if is_url"
}

脚本代码:

def register(params)
end
def filter(event)
is_url = event.get('[comment_url]').index(/https?:\/\/[\S]+$/)
if is_url
event.set('[is_url_valid]', 0 )
else
event.set('[is_url_valid]', 1 )
end
return [event]
end

插件中引入外部库

当event内置API和ruby string的各类方法不能满足需求时,可以引入ruby的公共库来解决相关问题。下面距离说明引入Ruby公共json解析库来解析event相关字段,示例如下:

inline 方式:

ruby {
init => "require 'json'"
code => "data=event.to_json;new_data=JSON.parse(data);event.set('[new_data]', new_data)"
}

output:

{
"id" : "5f0970c84c1de6c4fdc907ee",
"comment_id" : "Y29tbWVudDoxMzg2MzkzNzMxNTU2NzM1XzEzODY0NDM4OTgyMTgzODU=",
"parent_comment_id" : 123 ,
"like_count" : 2 ,
"comment_url" : "test_url",
"update_time" : "2020-07-11 15:56:56",
"is_new" : 1 ,
"data": "test data"
"new_data" => {
"id" : "5f0970c84c1de6c4fdc907ee",
"comment_id" : "Y29tbWVudDoxMzg2MzkzNzMxNTU2NzM1XzEzODY0NDM4OTgyMTgzODU=",
"parent_comment_id" : 123 ,
"like_count" : 2 ,
"comment_url" : "test_url",
"update_time" : "2020-07-11 15:56:56",
"is_new" : 1 ,
"data": "test data"
}
"@version" => "1",
"@timestamp" => 2020 - 07 - 16T02: 11 :39.081Z
}

脚本方式:

def register(params)
#para1 = params['param_key']
#p para
#output: "param_value"
require 'json'
end

def filter(event)
data=event.to_json
new_data=JSON.parse(data)
event.set('[newdata]', newdata)
return [event]
end

output: 与inline方式相同

至此,关于Logstash中ruby插件的使用方式、部分原理和示例说明完毕,对以上内容有问题的小伙伴欢迎评论区留言交流。