elk stack 6开始,logstash有内置的Azure Event hubs Plugin,如果要接入eventhub的日志,可以直接在input模块中配置来源为azure_event_hubs即可。
input {
azure_event_hubs {
config_mode => "advanced"
threads => 2
decorate_events => true
storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxx;AccountKey=xxx;EndpointSuffix=core.chinacloudapi.cn"
event_hubs => [
{"azurelog" => {
event_hub_connection => "Endpoint=sb://xxx.servicebus.chinacloudapi.cn/;SharedAccessKeyName=ElkAccess;SharedAccessKey=xxx"
consumer_group => "$Default"
}}
]
}
}
storage_connection & event_hub_connection可以从azure console获得。
我们使用advanced配置,可以同时配置多个eventhubs。在azure_event_hubs=>eventhubs列表中,每个项目即为每个eventhub实例。
azure console上需要做的操作主要就是创建一个consumer group。基本版的eventhub不支持自己创建consumer group,这时只能使用默认的"$Default"组。官方建议使用不同的group,主要是为了防止数据消费冲突。
threads的确定主要取决于eventhub的数量以及每个eventhub所拥有的partition的数量。公式如下:
(eventhub_numbers + 1) < threads < (partitions_per_eventhub x eventhub_numbers)
logstash从eventhub中收集的数据为json格式,大致内容如下: {"records": [{ RECORD1 },{ RECORD2 },{ ... }] }
因为收集到的数据是一个json array,可以在filter中使用split来把records列表分成不同的records,然后再按照json数据处理即可。
filter {
json {
source => "message"
}
split {
field => "[records]"
}
mutate {
remove_field => "message"
}
date {
match => ["records.time", "yyyy-MM-dd HH:mm:ss:SSS", "ISO8601"]
target => "@timestamp"
}
}
输出部分指定template,做一些基本的index设置。
output {
elasticsearch {
hosts => ['{{ es_url }}:{{ es_port }}']
user => "{{ es_user }}"
password => "{{ es_password }}"
template => "/usr/share/logstash/config/template/eventhub-log.json"
template_name => "eventhub-log"
template_overwrite => true
index => "eventhub-azurelogs-%{+YYYY.MM}"
}
}
eventhub-log.json:
{
"index_patterns": [ "eventhub-azurelogs-*" ],
"version" : 60003,
"settings" : {
"index.refresh_interval" : "5s",
"number_of_shards": 1,
"number_of_replicas": 1
}
}
收入到es之后,可以在kibana上看到,records字段已经被拆分成很多records.xxx字段,而且都是searchable的。这样就完成了eventhub日志的收入。