1. 概述
1.1. 各软件版本
软件名称 | 版本号 :—: | :—: Elasticsearch | 6.7.0 Logstash | 6.7.0 Kibana | 6.7.0 Filebeat | 6.7.0 Kafka | 2.12-2.2.0 zookeeper | 3.4.13
1.2. 日志收集处理流程
filebeat –> kafka –> logstash –> elasticsearch
1.3. 服务启动顺序
启动顺序不能颠倒,filebeat一定放在最后启动。否则会出现logstash读取不到kafka数据的问题。
1 、elasticsearch、kafka、kibana
2、logstash
3、filebeat
1.4. JDK版本
需先安装JDK 1.8及以上版本
1.5. 软件下载地址
elastic系列软件下载地址:https://www.elastic.co/downloads
2. Elasticsearch安装
2.1. 解压安装包
解压安装包到/home/efk
tar -zxvf elasticsearch-6.7.0.tar.gz
2.2. 创建用户
因Elasticsearch版本在6.0之后不允许用root用户启动,故必须创建新的用户
回到/home目录下,创建用户和组
1、创建用户组
groupadd es
2、创建用户
useradd es -g es -p Elasticsearch
3、更改efk目录所属用户
chown -R es:es efk
2.3. 创建data目录和日志目录
切换到es用户:
su es
创建data目录:
mkdir -p /home/efk/elasticsearch_data/data/
创建log目录:
mkdir -p /home/efk/elasticsearch_data/logs/
2.4. 修改配置文件
cluster.name: Elasticsearch
path.data: /home/elasticsearch-data/data
path.logs: /home/elasticsearch-data/logs
network.host: 0.0.0.0
2.5. 修改 Linux下/etc/security/limits.conf文件设置
vim /etc/security/limits.conf
在文件最后添加如下内容:
* soft nofile 261444
* hard nofile 262144
2.6. 修改配置 Linux下/etc/sysctl.conf文件设置
vim /etc/sysctl.conf
vm.max_map_count = 262144
并执行命令:
sysctl –p
可以用以下命令替代:
sysctl -w vm.max_map_count=262144
2.7. 启动
进入/home/efk/elasticsearch-6.7.0/bin路径
./elasticsearch
后台长期启动
./elasticsearch –d
查找ES进程
ps -ef | grep elasticsearch
杀掉ES进程
kill -9 2382(进程号)
重启ES
sh elasticsearch –d
3. Kibana安装
3.1. 解压安装包
解压安装包到/home/efk
tar -zxvf kibana-6.7.0-linux-x86_64.tar.gz
3.2. 修改配置
server.host: "0.0.0.0"
3.3. 启动
进入到bin目录下:
长期启动
nohup ./kibana &exit
4. Kafka安装
4.1. 下载
下载地址:https://kafka.apache.org/downloads
这里下载的版本为:kafka_2.12-2.2.0.tgz
4.2. 解压
tar -zxvf kafka_2.12-2.2.0.tgz
4.3. 启动zookeeper
用kafka自带的zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
默认使用的2181端口,可在配置文件修改
4.4. 启动kafka
进入到kafka所在路径
./bin/kafka-server-start.sh config/server.properties
长期启动:
nohup ./bin/kafka-server-start.sh config/server.properties &
查找进程号
ps –ef|grep kafka
4.5. 创建test主题
进入到kafka安装路径
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic test
增加分区
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic futurist --partitions 5
4.6. 查看主题
bin/kafka-topics.sh –list –zookeeper localhost:2181
5. Logstash安装
5.1. 解压安装包
解压安装包到/home/efk
tar -zxvf logstash-6.7.0.tar.gz
5.2. 创建配置文件
input {
kafka {
bootstrap_servers => ["127.0.0.1:9092"]
group_id =>"1"
topics => ["trans","service-log","test"]
consumer_threads => 5
decorate_events =>true
codec => "json"
}
}
filter {
if [@metadata][kafka][topic]=="service-log"{
mutate{
add_field => {"[@metadata][index]" => "service-log-%{+YYYY.MM.dd}"}
}
grok {
#match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:className}\:%{GREEDYDATA:msg}"}
#因为我们取出了字段,所以不需要原来这个message字段,这个字段里边包含之前beat 输入的所有字段。
#remove_field => ["message"]
}
}else if [@metadata][kafka][topic]=="trans" {
mutate{
add_field => {"[@metadata][index]" => "trans-%{+YYYY.MM}"}
}
}else if [@metadata][kafka][topic]=="test" {
mutate{
add_field => {"[@metadata][index]" => "test-%{+YYYY.MM}"}
}
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
#index => "service-%{type}-%{host}-%{+YYYY.MM.dd}"
index => "%{[@metadata][index]}"
codec => "json"
}
}
5.3. 启动
进入到Logstash安装目录
./bin/logstash -f config/logstash-kafka.conf
长期启动:
nohup ./bin/logstash -f config/ logstash-kafka.conf &
查找logstash进程
ps -ef | grep logstash
杀掉logstash进程
kill -9 2382(进程号)
5.4 启动多个配置
修改pipeline.yml配置文件,添加如下配置
https://segmentfault.com/a/1190000016592277
- pipeline.id: my-pipeline_1
path.config: "/etc/path/to/*.conf"
pipeline.workers: 3
长期启动:不需要再指定配置文件路径 nohup ./bin/logstash
6. Filebeat安装
6.1. 解压安装包
解压安装包到/home/efk
filebeat-6.7.0-linux-x86_64.tar.gz
6.2. 配置文件
filebeat.inputs:
- type: log
enabled: true
paths:
- /home/logs/*.log
tail_files: true
fields:
log_topics: "service-log"
multiline.match: after
# 增加该配置后,解决multiline丢失数据问题
multiline.max_lines: 10000
#multiline.pattern: '^\['
#multiline.negate: true
#multiline.match: after
#logging.to_files: true
#logging.files:
# path: /var/log/filebeat
# name: filebeat
# keepfiles: 7
# permissions: 0644
- type: log
enabled: true
paths:
- /data/apps/*/monitor/*infoNewest.txt
json.keys_under_root: true
json.overwrite_keys: true
fields:
log_topics: "trans"
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 3
#index.codec: best_compression
#_source.enabled: false
setup.kibana:
host: "192.168.0.2:5601"
output.kafka:
enabled: true
hosts: ["127.0.0.1:9092"]
topic: "%{[fields][log_topics]}"
max_message_bytes:"1000000"
processors:
- add_host_metadata: ~
#- add_cloud_metadata: ~
6.3. 启动
./filebeat -e -c filebeat.yml -d "publish"
长期启动:
nohup ./filebeat -e -c filebeat.yml &
nohup ./filebeat -e -c zam.yml >/dev/null 2>&1 &
>/dev/null 2>log &
查找进程ID并kill掉:
ps -ef |grep filebeat
kill -9 进程号