# Logstash pipeline: consume JSON events from three Kafka topics, pick a
# per-topic Elasticsearch index name, and ship the events to Elasticsearch.
#
# NOTE: every "#" comment must sit on its own line (or at the end of one).
# A comment runs to the end of the line, so keeping this file on a single
# line would comment out everything after the first "#".

input {
  kafka {
    # Documented type is a string: "host1:port1,host2:port2".
    bootstrap_servers => "172.16.10.108:9092"
    group_id          => "1"
    topics            => ["trans", "service-log", "test"]
    consumer_threads  => 5
    # Required by the filter section: it routes on [@metadata][kafka][topic].
    decorate_events   => true
    codec             => "json"
  }
}

filter {
  # Route on the source topic; the chosen index name is stored under
  # [@metadata] and read back by the elasticsearch output's "index" setting.
  if [@metadata][kafka][topic] == "service-log" {
    mutate {
      add_field => { "[@metadata][index]" => "service-log-%{+YYYY.MM.dd}" }
    }
    grok {
      # Alternative access-log pattern, kept for reference:
      #match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:className}\:%{GREEDYDATA:msg}"}
      # Since the fields have been extracted, the original "message" field is
      # no longer needed; it contains all the fields previously sent by beat.
      #remove_field => ["message"]
    }
  } else if [@metadata][kafka][topic] == "trans" {
    mutate {
      add_field => { "[@metadata][index]" => "trans-%{+YYYY.MM}" }
    }
  } else if [@metadata][kafka][topic] == "test" {
    mutate {
      add_field => { "[@metadata][index]" => "test-%{+YYYY.MM}" }
    }
  } else {
    # Fallback so an event from a topic without its own branch (e.g. one
    # added to "topics" later) never reaches the output with an unresolved
    # "%{[@metadata][index]}" index name.
    mutate {
      add_field => { "[@metadata][index]" => "unrouted-%{+YYYY.MM}" }
    }
  }
}

output {
  elasticsearch {
    hosts => ["172.16.10.108:9200"]
    #index => "service-%{type}-%{host}-%{+YYYY.MM.dd}"
    index => "%{[@metadata][index]}"
    # NOTE(review): this codec setting is probably unnecessary for the
    # elasticsearch output - confirm against the plugin docs before removing.
    codec => "json"
  }
}