Akemi

Prometheus自定义metrics指标与集成Deepseek

2026/01/20

Prometheus-client模块自定义metrics指标

特征维度 SDK(直接暴露) Exporter(转换代理) Pushgateway(推送中转)
数据流向 拉取 (Pull) 拉取 (Pull) 推送 → 暂存 → 拉取 (Push → Pull)
侵入性 (需改代码) (独立进程) (调用推送API即可)
适用对象 可控的、自研的服务 不可控的、第三方的服务 生命周期短、无法被拉取的任务
模型契合度 完全契合 Prometheus 拉模型 完美契合,是拉模型的延伸 妥协方案,破坏了部分语义(如实例状态)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# python3.12.12
pip install prometheus-client

from prometheus_client import start_http_server,Summary,Counter
import random
import time

# 定义计数器类型指标Counter
# 标签:
# user_type: 用户类型
# login_status: 登录状态
# ip_region:ip所属地区
LOGIN_COUNT = Counter(
"user_login_total", # 指标名称
'用户登录总次数', # 指标描述
['user_type', 'login_status', 'ip_region'] # 标签
)

# 定义一个函数,模拟用户登录行为
def simulate_login():
# 模拟用户类型
user_type = random.choices(['admin', 'user'], weights=[0.1, 0.9])
# 模拟登录状态
login_status = random.choices(['success', 'fail'], weights=[0.9, 0.1])
# 模拟IP地区
ip_region = random.choices(['china', 'usa', 'japan','korea','india'])

# 记录一次登录事件,每次吊用+1
LOGIN_COUNT.labels(
user_type=user_type,
login_status=login_status,
ip_region=ip_region
).inc()

if __name__ == '__main__':
# 启动http服务,端口为9092
start_http_server(9092)
print("用户登录监控服务启动,端口:9092")
while True:
simulate_login()
time.sleep(3) # 每三秒模拟一次用户登录行为

# 运行脚本
/root/miniconda3/envs/langchain/bin/python /root/prometheus-deepseek/custom_exporter.py
用户登录监控服务启动,端口:9092

定义Prometheus抓取指标(传统方式)

因为这里的服务并非k8s中的业务,所以没法使用serviceMonitor或者podMonitor,转而使用传统方式进行添加

对应到我使用的helm部署方式,就是在values.yaml的additionalScrapeConfigs中进行添加:

1
2
3
4
5
6
7
8
9
10
11
12
  # 如果这个配置添加错误,会直接导致prometheus无法启动
additionalScrapeConfigs:
- job_name: 'user-login-monitor'
static_configs:
- targets:
- '192.168.10.100:9092'
scrape_interval: 30s
scrape_timeout: 10s
metrics_path: /
scheme: http
# 升级release
helm upgrade prometheus ./ -f values.yaml

分类 语法/操作符 示例 说明/用途
基础查询 metric_name http_requests_total 即时查询,返回最新值
时间范围 [时间单位] [5m], [1h], [1d] 区间查询,单位:s,m,h,d,w,y
时间偏移 offset 时间 metric offset 1h 查询历史数据
标签过滤 {label="value"} {method="GET", status="200"} 精确匹配标签
标签过滤 {label!="value"} {status!="500"} 排除匹配标签
标签过滤 {label=~"regex"} {job=~"api.*"} 正则表达式匹配
标签过滤 {label!~"regex"} {method!~"POST.*"} 正则表达式排除
算术运算 + - * / % ^ requests * 2, errors/requests 数学运算
比较运算 == != > < >= <= response_time > 1, errors != 0 条件筛选
聚合函数 sum() sum(http_requests_total) 求和
聚合函数 avg() avg(cpu_usage) 平均值
聚合函数 min()/max() min(temp), max(memory) 最小/最大值
聚合函数 count() count(instances) 计数
聚合函数 stddev() stddev(latency) 标准差
聚合分组 by(label) sum(...) by (method) 按标签分组聚合
聚合排除 without(label) avg(...) without (instance) 排除标签后聚合
速率函数 rate() rate(http_requests_total[5m]) 每秒平均增长率(Counter用)
瞬时速率 irate() irate(metric[5m]) 瞬时增长率(更敏感)
增长量 increase() increase(metric[1h]) 指定时间增长量
差值函数 delta() delta(temperature[2h]) Gauge类型差值
时间窗口 avg_over_time() avg_over_time(metric[5m]) 时间窗口平均值
时间窗口 max_over_time() max_over_time(metric[1h]) 时间窗口最大值
时间窗口 quantile_over_time() quantile_over_time(0.95, metric[30m]) 时间窗口分位数
数学函数 abs() abs(value) 绝对值
数学函数 ceil()/floor() ceil(value), floor(value) 向上/下取整
数学函数 round() round(value, 0.01) 四舍五入
数学函数 clamp_max()/clamp_min() clamp_max(value, 100) 限制最大/最小值
标签函数 label_replace() label_replace(metric, "new", "$1", "old", "(.*)") 替换标签值
标签函数 label_join() label_join(metric, "new", ",", "label1", "label2") 连接多个标签
时间函数 time() time() 当前时间戳
时间函数 timestamp() timestamp(metric) 指标时间戳
布尔修饰 bool metric > bool 100 返回0/1而非过滤
逻辑运算 and or unless metric1 and metric2 集合运算
注释 # # 这是注释 单行注释

配置告警规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 创建PrometheusRule,user-login-monitor.yaml
---
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
name: user-login-alert-rules
labels:
release: prometheus # 匹配的选择器
spec:
groups:
- name: user-login-alerts
rules:
- alert: UserLoginFailedRate
expr: |
sum(rate(user_login_total{login_status="['fail']"}[1m]))
/
sum(rate(user_login_total[1m]))
> 0.05 # 失败总数/登录总数,失败错误率 > 5%就报警
for: 2m
labels:
severity: warning # 告警级别
ip_region: "{{ $labels.ip_region }}" # 自定义标签
user_type: "{{ $labels.user_type }}" # 自定义标签
annotations:
summary: "用户登录失败率过高"
description: "用户登录失败率 {{ $value }} > 5%,持续 2 分钟"

kubectl apply -f user-login-monitor.yaml
# 并重启prometheus

配置altermanager转发

Alertmanager的路由是基于标签匹配的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
alertmanager.yml
├── global: # 全局设置
├── route: # 路由设置
│ ├── receiver: # 默认接收器(名称)
│ ├── group_by: # 分组设置
│ └── routes: # 子路由列表
│ └── receiver: # 子路由接收器(名称)
└── receivers: # 接收器定义列表
├── name: 'default-email'
└── name: 'webhook_handler'

alertmanager:
config:
global:
resolve_timeout: 5m
smtp_smarthost: 'smtp.163.com:25'
smtp_from: 'alertwarning@163.com'
smtp_auth_username: 'alertwarning@163.com'
smtp_auth_password: 'GAANBCIHILSARBYD'
smtp_require_tls: false
route: # 根路由,若告警未匹配任何子路由,则直接使用根路由的receiver,即默认路由
receiver: 'default-email'
group_wait: 10s # 触发告警后等待10秒,若同组有其他告警则合并发送
group_interval: 1m
repeat_interval: 5m
routes:
- receiver: 'webhook_handler' # 所有warning告警 → webhook
matchers: # 标签匹配
- severity = "warning"
continue: false # 是否继续匹配
receivers: # 接收器配置
- name: 'default-email'
email_configs:
- to: '1320991378@qq.com'
send_resolved: true
headers:
Subject: 'Alert: {{ .CommonLabels.alertname }}'
- name: 'webhook_handler'
webhook_configs:
- url: 'http://192.168.10.100:5008/alter' # 告警处理服务地址,设置为本机5008端口
send_resolved: true


helm upgrade prometheus ./ -f values.yaml
kubectl rollout restart statefulset prometheus-prometheus-kube-prometheus-prometheus

处理告警并发送给Deepseek

altermanager发送的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 此时接收到的json数据是这样的
# 可见alerts是一个列表,里面放的是聚合告警,我这里只有一个告警,所以没有体现。但实际上应该去遍历
{
"receiver": "webhook_handler",
"status": "firing",
"alerts": [
{
"status": "firing",
"labels": {
"alertname": "UserLoginFailedRate",
"prometheus": "default/prometheus-kube-prometheus-prometheus",
"severity": "warning"
},
"annotations": {
"description": "用户登录失败率 0.23333333333333334 > 5%,持续 2 分钟",
"summary": "用户登录失败率过高"
},
"startsAt": "2026-01-20T02:50:48.671Z",
"endsAt": "0001-01-01T00:00:00Z",
"generatorURL": "http://prometheus-kube-prometheus-prometheus.default:9090/graph?g0.expr=sum%28rate%28user_login_total%7Blogin_status%3D%22%5B%27fail%27%5D%22%7D%5B1m%5D%29%29+%2F+sum%28rate%28user_login_total%5B1m%5D%29%29+%3E+0.05&g0.tab=1",
"fingerprint": "4c7fe64e5b33b2f8"
}
],
"groupLabels": {},
"commonLabels": {
"alertname": "UserLoginFailedRate",
"prometheus": "default/prometheus-kube-prometheus-prometheus",
"severity": "warning"
},
"commonAnnotations": {
"description": "用户登录失败率 0.23333333333333334 > 5%,持续 2 分钟",
"summary": "用户登录失败率过高"
},
"externalURL": "http://prometheus-kube-prometheus-alertmanager.default:9093",
"version": "4",
"groupKey": "{}/{severity=\"warning\"}:{}",
"truncatedAlerts": 0
}

创建alerts处理程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from datetime import date, datetime
import os

# 定义logger
import logging
current_dir = os.path.dirname(os.path.abspath(__file__))
alerts_log = os.path.join(current_dir, 'alerts.log') # 告警日志文件路径
alter_report = os.path.join(current_dir, 'alter_report.txt') # 告警报告文件路径
format = '%(asctime)s - %(levelname)s - %(message)s' # 定义日志格式
logging.basicConfig(filename=alerts_log,filemode='a',format=format,level=logging.INFO)

# 初始化Deepseek
from langchain_deepseek import ChatDeepSeek
from dotenv import load_dotenv
load_dotenv()
def call_deepseek(message):
deepseek = ChatDeepSeek(model="deepseek-chat",temperature=0.5)
logging.debug(f"Deepseek初始化完成...")

system_prompt = """
你是一名资深运维与安全分析师,擅长分析业务系统异常告警。
请根据提供的登录监控告警信息,生成结构化分析报告,包含:
1. 告警基本信息(名称、级别、时间、关键标签)
2. 可能的原因分析(结合业务场景,如攻击、系统故障、用户行为异常等)
3. 具体处理建议(分步骤说明,可操作)
4. 预防措施(如何避免类似问题再次发生)
报告风格需专业、简洁,重点突出。
"""

user_prompt = f"""
告警详细信息如下:
{message}
"""

messages = [system_prompt,user_prompt]
response = deepseek.invoke(messages)
with open(alter_report, "w", encoding="utf-8") as f:
f.write(f"[{datetime.now()}] DeepSeek报告:\n")
f.write(f"{response.content}")
logging.info(f"DeepSeek响应已保存至 {alter_report}")

# 定义flask框架、路由
from flask import Flask, request, jsonify
app = Flask(__name__)

@app.route('/alter', methods=['POST'])
def handle_alert():
try:
data = request.get_json()

# 确保数据包含 alerts 字段
if 'alerts' not in data:
logging.error("数据格式错误:缺少 alerts 字段")
return jsonify({"status": "error", "message": "Invalid data format: missing alerts field"}), 400
# 确保 alerts 字段是一个列表
if not isinstance(data['alerts'], list):
logging.error("数据格式错误:alerts 字段不是列表")
return jsonify({"status": "error", "message": "alerts field is not a list"}), 400

logging.info("成功接收到告警数据...")
alerts_list = data['alerts']

all_messages = []
# 遍历 alerts 列表,提取每个告警的详细信息
for alert in alerts_list:
# 提取 annotations 中的 description 和 summary
logging.debug("开始解析告警信息...")
annotations = alert.get('annotations', {})
description = annotations.get('description', 'No description provided')
summary = annotations.get('summary', 'No summary provided')

# 提取 labels 中的关键信息
labels = alert.get('labels', {})
alert_name = labels.get('alertname', 'Unknown')
severity = labels.get('severity', 'Unknown')

# 提取时间信息
starts_at = alert.get('startsAt', 'Unknown')
status = alert.get('status', 'Unknown')

# 将信息整合为message
message = f"告警名称: {alert_name}\n" \
f"严重程度: {severity}\n" \
f"状 态: {status}\n" \
f"开始时间: {starts_at}\n" \
f"摘要: {summary}\n" \
f"描述: {description}\n"
all_messages.append(message)
logging.debug("告警提取完成...")

if all_messages:
final_content = "\n" + "="*20 + " 批量告警汇总 " + "="*20 + "\n"
final_content += "\n---\n".join(all_messages)
logging.info(f"正在发送 {len(all_messages)} 条告警至 DeepSeek 分析...")
call_deepseek(final_content) # 这里传入整合后的字符串
return jsonify({"status": "success"}), 200

except Exception as e:
logging.error(f"处理告警时出错: {e}")
return jsonify({"status": "error", "message": str(e)}), 400

if __name__ == '__main__':
try:
print(f"告警日志文件路径: {alerts_log}")
app.run(host='0.0.0.0', port=5008, debug=True)
except Exception as e:
print(f"启动应用时出错: {e}")

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
2026-01-20 15:15:52,149 - INFO - 172.19.0.4 - - [20/Jan/2026 15:15:52] "POST /alter HTTP/1.1" 200 -
2026-01-20 15:17:58,854 - INFO - 成功接收到告警数据...
2026-01-20 15:17:58,854 - INFO - 正在发送 1 条告警至 DeepSeek 分析...
2026-01-20 15:17:59,010 - INFO - HTTP Request: POST https://api.deepseek.com/v1/chat/completions "HTTP/1.1 200 OK"
2026-01-20 15:18:47,779 - INFO - DeepSeek响应已保存至 /root/prometheus-deepseek/alter_report.txt

[2026-01-20 15:18:47.779645] DeepSeek报告:
### **用户登录失败率过高告警分析报告**

---

#### **1. 告警基本信息**
* **告警名称**: UserLoginFailedRate
* **严重级别**: Warning (警告)
* **触发时间**: 2026-01-20 02:50:48 (UTC)
* **关键标签/指标**:
* **告警阈值**: 登录失败率 > 5%
* **当前值**: 16.67% (显著高于阈值)
* **持续时间**: 2分钟
* **告警状态**: Firing (正在告警中)

#### **2. 可能的原因分析**
结合业务场景,高登录失败率可能由以下原因导致,需按优先级排查:

1. **恶意攻击行为 (高概率)**:
* **密码爆破/撞库攻击**:攻击者使用自动化工具,针对一个或多个用户账户尝试常用密码或泄露的密码组合。
* **凭证填充攻击**:攻击者利用从其他网站泄露的用户名/密码对,在目标系统上进行批量登录尝试。
* **扫描与探测**:攻击者使用工具扫描系统,尝试默认账户或弱密码,以寻找可入侵入口。

2. **系统或配置故障 (中概率)**:
* **认证服务异常**:后端认证服务(如LDAP、AD、OAuth服务、数据库)出现性能瓶颈、连接超时或暂时性故障,导致合法请求也无法通过。
* **网络或中间件问题**:负载均衡器、反向代理或防火墙规则配置错误,导致登录请求被异常拦截或无法到达认证端点。
* **密钥/证书过期**:如果使用单点登录(SSO)或令牌认证,相关证书可能已过期。

3. **用户行为或客户端问题 (低概率)**:
* **批量操作失误**:内部员工或集成系统使用脚本批量处理任务时,使用了错误的凭证。
* **客户端缓存/配置错误**:特定客户端应用(如移动APP)版本更新后,缓存了旧的令牌或密码,引发集中性失败。
* **密码集中过期**:如果公司有统一的密码策略,可能导致一批用户密码在同一时间段内过期,用户集中尝试旧密码登录。

#### **3. 具体处理建议 (分步骤)**
**第一步:紧急确认与遏制 (5-15分钟内)**
1. **登录监控与日志系统**:立即查询登录审计日志、应用日志或WAF日志,聚焦告警时间段(02:50前后)。
2. **分析失败模式**:
* **源IP分析**:失败请求是否集中在少数几个IP(攻击特征)?还是分散在所有用户IP(系统故障特征)?
* **用户名分析**:失败是针对大量不同用户名(撞库),还是反复针对少数几个特定用户名(定向爆破)?
* **错误码分析**:失败返回的错误信息是“密码错误”、“用户不存在”还是“系统繁忙”、“连接超时”?
3. **初步响应**:
* 若确认为**攻击**:立即在防火墙、WAF或应用层对恶意源IP实施临时封禁,并启用或强化账户锁定策略(如连续失败5次锁定15分钟)。
* 若怀疑**系统故障**:检查认证相关服务的健康状态(CPU、内存、错误日志)、网络连通性和依赖服务状态。

**第二步:深入调查与根因确定 (30-60分钟)**
1. **关联分析**:将登录失败日志与同期其他告警(如网络流量异常、服务器错误率升高)进行关联。
2. **追溯源头**:
* 对于攻击,尝试判断攻击工具特征,并检查是否有账户已被成功入侵的迹象。
* 对于故障,进行配置回溯,检查近期是否有相关组件变更(发布、配置更新、证书轮换)。
3. **业务影响评估**:确认是否有真实用户因此无法登录,影响业务范围。

**第三步:解决与恢复**
1. **攻击场景**:确认攻击停止,解除对正常IP的误封。通知受影响用户(如有)修改密码,并考虑强制高危账户修改密码。
2. **故障场景**:根据根因修复问题,如重启异常服务、回滚错误配置、更新过期证书等。
3. **验证**:监控登录失败率指标,确认其已恢复至正常基线(远低于5%)并保持稳定。

#### **4. 预防措施**
1. **强化认证安全**:
* 强制实施**多因素认证(MFA)**,特别是对管理员和高权限账户。
* 部署**智能风险识别**(如通过IP信誉、登录频率、设备指纹、行为基线),对高风险登录尝试要求额外验证。
* 使用**强密码策略**并定期检查是否存在弱密码或已泄露密码(对接Have I Been Pwned等数据库)。
2. **优化监控与告警**:
* 在现有“失败率”告警基础上,增加**多维度告警**,如:“单一IP对多用户失败”、“单一用户从多地频繁失败”,以便更快定位攻击类型。
* 设置**分层阈值**,例如:5%持续2分钟(Warning),10%持续1分钟(Critical)。
3. **提升系统韧性**:
* 对认证服务进行**容量规划和压力测试**,确保能应对突发流量。
* 建立认证服务的**故障转移和熔断机制**,避免单点故障导致全站登录瘫痪。
* 定期审计和演练**密钥、证书的轮换流程**,避免过期。
4. **制定并演练响应预案**:
* 编写针对“登录失败率激增”的**标准化应急响应流程(SOP)**,明确各团队职责。
* 定期进行攻防演练,测试监控、告警、封禁、溯源全流程的有效性。
CATALOG
  1. 1. Prometheus-client模块自定义metrics指标
  2. 2. 定义Prometheus抓取指标(传统方式)
  3. 3. 配置告警规则
  4. 4. 配置altermanager转发
  5. 5. 处理告警并发送给Deepseek
    1. 5.1. altermanager发送的信息
    2. 5.2. 创建alerts处理程序