Akemi

Two-Week EFK Review, Day 2: Deploying and Using Logstash + Filebeat + ES

2026/03/23

Create an Index Template

Yesterday I manually PUT the mapping for the syslog-security-2026.03.19 index, but adding them one by one obviously won't scale.

From now on, any index whose name starts with syslog-security- will automatically get 3 shards and 1 replica, and src_ip will always be mapped as the ip type.
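A template with that behavior can be registered through Elasticsearch's `_index_template` API. The sketch below is an assumption based on the description above (the host matches the rest of this setup; the API key is a placeholder, and your mappings may include more fields):

```shell
# Sketch: register an index template so that every new syslog-security-*
# index is created with 3 shards, 1 replica, and src_ip typed as ip.
curl -k -X PUT "https://192.168.10.100:9200/_index_template/syslog-security" \
  -H "Content-Type: application/json" \
  -H "Authorization: ApiKey <your-api-key>" \
  -d '{
    "index_patterns": ["syslog-security-*"],
    "template": {
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
          "src_ip": { "type": "ip" }
        }
      }
    }
  }'
```

You can confirm it took effect with `curl -k "https://192.168.10.100:9200/_index_template/syslog-security"`.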

Deploy and Use Logstash

Logstash supports an input → filter → output pipeline. The filter stage in particular can recognize IPs and timestamp formats, split raw data into fields, and emit clean, well-structured JSON.

Reference docs:
Binary install: https://www.elastic.co/docs/reference/logstash/installing-logstash

Since we're still beginners, let's start with the binary install; the Helm-based Kubernetes deployment can wait.

# Ubuntu
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elastic-keyring.gpg
sudo apt-get install apt-transport-https
echo "deb [signed-by=/usr/share/keyrings/elastic-keyring.gpg] https://artifacts.elastic.co/packages/9.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-9.x.list
sudo apt-get update && sudo apt-get install logstash

systemctl enable logstash.service --now
systemctl is-active logstash.service
active

# RHEL family
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
cat > /etc/yum.repos.d/logstash.repo <<EOF
[logstash-9.x]
name=Elastic repository for 9.x packages
baseurl=https://artifacts.elastic.co/packages/9.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
yum -y install logstash

# Create a new pipeline config
cat > /etc/logstash/conf.d/ssh_filter.conf <<'EOF'
input {
  beats {
    port => 5044
  }
}
filter {
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} sshd\[\d+\]: %{WORD:auth_result} password for %{USER:username} from %{IP:src_ip} port %{NUMBER:port}" }
  }
  if [auth_result] == "Failed" {
    mutate {
      add_field => { "event_type" => "ssh_brute_force" }
      replace => { "severity" => 3 }
    }
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["https://192.168.10.100:9200"]
    index => "syslog-security-%{+YYYY.MM.dd}"
    api_key => "MnQxUUJaMEIzMlk0TE9TWnFaeEQ6UjAyLVloeUFNRGw4Rl85d25BeEM3dw=="
    ssl_verification_mode => "none"
  }
}
EOF

Pipeline walkthrough:
1. The beats input plugin listens on port 5044 for data shipped by Beats agents.
2. The filter stage uses the grok plugin to parse the message field against the defined pattern.
3. The data is then enriched: if auth_result is "Failed", the event is tagged ssh_brute_force and severity is set to 3.
4. Output goes to stdout (for debugging) and to the ES cluster at the same time.
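Under the hood the grok pattern is just a regex, so you can sanity-check its core locally with grep before restarting Logstash. This is only a rough approximation (grok's %{IP}, %{USER}, etc. are stricter than the character classes here):

```shell
# Approximate the grok pattern with a plain ERE against a sample sshd line
line='Mar 23 15:30:00 1panel sshd[12345]: Failed password for root from 10.2.3.6 port 5566 ssh2'
echo "$line" | grep -oE '(Failed|Accepted) password for [^ ]+ from [0-9.]+ port [0-9]+'
# prints: Failed password for root from 10.2.3.6 port 5566
```

If the line doesn't match, grep prints nothing, which is a quick hint that grok would also fail and tag the event with _grokparsefailure.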

systemctl restart logstash.service
# Check the Logstash log for errors
tail -f /var/log/logstash/logstash-plain.log

# Verify that port 5044 is listening
ss -tunlp | grep 5044
tcp LISTEN 0 4096 *:5044 *:* users:(("java",pid=62517,fd=154))

Deploy and Use Filebeat

An extremely lightweight log shipper: it only moves data and never transforms it, and it can resume where it left off after a restart (by tracking byte offsets).
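Those offsets are persisted in Filebeat's registry. On a package install you can peek at them like this (the path and file layout vary by Filebeat version, so treat this as an assumption rather than gospel):

```shell
# Filebeat's registry stores per-file read state (offset, inode, path).
# Default location for rpm/deb installs; layout differs across versions.
sudo ls /var/lib/filebeat/registry/filebeat/
sudo head -n 3 /var/lib/filebeat/registry/filebeat/log.json
```

Deleting the registry forces Filebeat to re-read all files from the beginning, which is occasionally useful when testing.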

It supports both binary and Kubernetes deployments; my k8s cluster has some issues at the moment, so I'll use the binary install for now.

Reference docs: https://www.elastic.co/docs/reference/beats/filebeat

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-9.3.2-x86_64.rpm
rpm -vi filebeat-9.3.2-x86_64.rpm

# Edit /etc/filebeat/filebeat.yml; the file is easy to read, organized section by section

# Add an input block. "type" decides which engine handles the input:
# filestream reads plain log files, syslog listens on the network,
# container parses container JSON
filebeat.inputs:
- type: filestream
  id: ssh-security-logs
  enabled: true
  paths:
    - /var/log/secure


# Add the output block. The default file ships with an Elasticsearch output
# section that must be removed, since Logstash does the aggregation now
output.logstash:
  hosts: ["192.168.10.100:5044"]

systemctl restart filebeat.service
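Before trusting the restart, Filebeat ships with built-in self-checks that are worth running (assuming the default package paths; otherwise pass -c with your config file):

```shell
# Validate the YAML syntax of the config
filebeat test config

# Verify Filebeat can actually reach the Logstash endpoint on 5044
filebeat test output
```

`test output` performs a real TCP handshake against the configured host, so it catches firewall and address mistakes that a syntax check would miss.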

Test the Pipeline

# Generate a fake log entry
logger "Mar 21 22:00:01 prod-web-01 sshd[123]: Failed password for root from 192.168.10.105 port 5566 ssh2"

# The parsed event now shows up in the Logstash logs
journalctl -u logstash -f
Mar 23 15:30:42 1panel logstash[128105]: "@version" => "1",
Mar 23 15:30:42 1panel logstash[128105]: "timestamp" => "Mar 23 15:30:00",
Mar 23 15:30:42 1panel logstash[128105]: "auth_result" => "Failed",
Mar 23 15:30:42 1panel logstash[128105]: "port" => "5566",
Mar 23 15:30:42 1panel logstash[128105]: "ecs" => {
Mar 23 15:30:42 1panel logstash[128105]: "version" => "8.0.0"
Mar 23 15:30:42 1panel logstash[128105]: },
Mar 23 15:30:42 1panel logstash[128105]: "log" => {
Mar 23 15:30:42 1panel logstash[128105]: "file" => {
Mar 23 15:30:42 1panel logstash[128105]: "inode" => "67682980",
Mar 23 15:30:42 1panel logstash[128105]: "fingerprint" => "98136ef7ebcc4f2aaf3c2f6ec8556cf7cb6674e1d26ad145fd40c8f4b98bad95",
Mar 23 15:30:42 1panel logstash[128105]: "path" => "/var/log/secure",
Mar 23 15:30:42 1panel logstash[128105]: "device_id" => "64768"
Mar 23 15:30:42 1panel logstash[128105]: },
Mar 23 15:30:42 1panel logstash[128105]: "offset" => 14796257
Mar 23 15:30:42 1panel logstash[128105]: },
Mar 23 15:30:42 1panel logstash[128105]: "tags" => [
Mar 23 15:30:42 1panel logstash[128105]: [0] "beats_input_codec_plain_applied"
Mar 23 15:30:42 1panel logstash[128105]: ],
Mar 23 15:30:42 1panel logstash[128105]: "input" => {
Mar 23 15:30:42 1panel logstash[128105]: "type" => "filestream"
Mar 23 15:30:42 1panel logstash[128105]: },
Mar 23 15:30:42 1panel logstash[128105]: "severity" => "3",
Mar 23 15:30:42 1panel logstash[128105]: "src_ip" => "10.2.3.6",
Mar 23 15:30:42 1panel logstash[128105]: "event_type" => "ssh_brute_force",
Mar 23 15:30:42 1panel logstash[128105]: "username" => "root",
Mar 23 15:30:42 1panel logstash[128105]: "@timestamp" => 2026-03-23T07:30:40.640Z,
Mar 23 15:30:42 1panel logstash[128105]: "message" => "Mar 23 15:30:38 1panel root[344379]: Mar 23 15:30:00 1panel sshd[12345]: Failed password for root from 10.2.3.6 port 5566 ssh2"

First Steps with Kibana

KQL also supports some more advanced syntax, for example:
Exclusion: NOT src_ip : "192.168.10.1"  # exclude the internal gateway IP
Multiple conditions: event_type : "ssh_brute_force" AND username : "root"
Range query: port > 1024
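The same filters can also be expressed as raw Query DSL against ES directly, bypassing Kibana. A sketch under the assumptions of this setup (same host; depending on your dynamic mapping, exact-match filters on text fields may need the .keyword subfield):

```shell
# Sketch: failed root logins excluding the gateway IP, as Query DSL
curl -k -X GET "https://192.168.10.100:9200/syslog-security-*/_search" \
  -H "Content-Type: application/json" \
  -d '{
    "query": {
      "bool": {
        "must": [
          { "match": { "event_type": "ssh_brute_force" } },
          { "match": { "username": "root" } },
          { "range": { "port": { "gt": 1024 } } }
        ],
        "must_not": [
          { "term": { "src_ip": "192.168.10.1" } }
        ]
      }
    }
  }'
```

Kibana compiles KQL into roughly this kind of bool query behind the scenes, so knowing the DSL form helps when scripting searches.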