Akemi

Dify创建工作流并处理Prometheus数据"

2026/03/18

在开始之前,我收回之前对于langchain的所有诋毁,langchain难啃是难啃,东西确实都是干货

总体思路

Prometheus → Crontab脚本 → Dify API → DeepSeek

少了一层Alertmanager和Webhook转发,只需要维护脚本

  • 提取参数,从用户自然语言中提取登录总人数
  • 得到方案,通过代码判断异常登录的比例,从而判断是否要继续
  • 细化方案,输入基本解决方案,通过知识库检索详细解决方案
  • 生成报表LLM,输出解决方案

dify安装部署

1
2
3
4
5
6
7
8
9
10
11
git clone https://github.com/langgenius/dify.git

cd dify/docker/
cp .env.example .env

sed -i 's/^EXPOSE_NGINX_PORT.*/EXPOSE_NGINX_PORT=8085/g' .env
sed -i 's/^EXPOSE_NGINX_SSL_PORT.*/EXPOSE_NGINX_SSL_PORT=8443/g' .env

docker compose up -d

网页访问192.168.10.100:8085进行注册登录

http://## 创建工作流

1.开始节点,用来接受用户输入
2.参数提取器,将参数从用户输入中提取出来
3.代码执行模块,根据提取出的参数提出解决方案(粗略
4.知识检索,添加知识库并选择所使用的嵌入式模型,提供详细的方案(详细
5.大模型根据代码执行的结果和知识检索,给出最终的方案

参数提取

代码执行模块

基于不同的登录失败率,生成初步的方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def main(arg1: str, arg2: str) -> dict:
"""
参数:
arg1: 总登录人数(字符串类型,需要转换为整数)
arg2: 异常登录人数(字符串类型,需要转换为整数)

返回:
包含处理建议的字典
"""
# 将字符串参数转换为整数
try:
total_login = int(arg1)
failed_login_count = int(arg2)
except ValueError:
return {"result": "参数类型错误,请输入有效的数字"}

# 边界情况处理
if total_login == 0:
return {"result": "总登录人数为0,无法计算异常率,请检查指标采集是否正常"}

# 计算异常率(保留2位小数)
anomaly_rate = round(failed_login_count / total_login * 100, 2)

# 1级:轻微异常(异常率≤2%)
if anomaly_rate <= 2:
suggestion = """
1. 监控层面:开启5分钟粒度的异常趋势跟踪,无需触发告警
2. 操作层面:抽样检查3-5条异常日志,确认是否为用户操作失误
3. 后续跟进:若10分钟内异常率持续上升至2%以上,手动升级处理
""".strip()

# 2级:一般异常(2%<异常率≤5%)
elif anomaly_rate <= 5:
suggestion = """
1. 服务检查:执行kubectl get pods | grep login-service确认服务存活
2. 日志分析:按错误类型统计(4xx/5xx),排查是否集中在特定区域
3. 处理动作:重启异常服务实例,清理登录相关缓存
4. 通知:同步当日值班运维工程师关注
""".strip()

# 3级:中度异常(5%<异常率≤10%)
elif anomaly_rate <= 10:
suggestion = """
1. 紧急排查:检查服务资源使用率(CPU<80%、内存<85%),分析登录SQL慢查询
2. 控制影响:封禁1分钟内失败次数>20次的IP(临时1小时),启用缓存降级
3. 协同响应:通知运维团队全员,同步开发查看近期代码变更
4. 进度跟踪:每15分钟输出一次异常处理进展
""".strip()

# 4级:严重异常(10%<异常率≤20%)
elif anomaly_rate <= 20:
suggestion = """
1. 资源扩容:登录服务实例紧急扩容至原数量×2,数据库连接数调至2000
2. 服务降级:关闭登录积分等非核心功能,对非会员启用验证码
3. 应急响应:电话通知运维负责人(15分钟内到场),启动应急预案
4. 进度同步:每10分钟向技术总监汇报处理进度
""".strip()

# 5级:紧急异常(异常率>20%)
else:
suggestion = """
1. 全网告警:触发短信+电话告警(技术总监、运维/开发负责人)
2. 系统处理:启动灾备系统,实施流量限流(仅允许30%正常流量)
3. 业务协同:通知业务负责人评估影响,成立临时应急小组
4. 高层汇报:每5分钟在应急群更新进展,30分钟向管理层汇报
""".strip()

return {"result": suggestion}

创建知识库

rerank就是细化检索#

将最后的结果送给大模型分析

添加最后的输出结果

Agent调用工作流

打包完成后发布为工具,可以将其作为一个agent进行调用

创建AI Agent。说实话学过langchain,这种东西看一眼就知道是啥意思了,langchain还是太权威了

这里显然告警等级有点问题,但是不重要,估计是代码执行的时候出了点问题

Crontab脚本调用工作流

使用crontab调用脚本,脚本中使用api进行调用(api_key可以在dify网页端创建)

脚本一方面使用curl从prometheus中获取数据,一方面将获取的数据格式化成自然语言后,调用dify的接口发给它来运行工作流

注:我这个脚本抓取的是有史以来所有的数据,但在实践中会对抓取窗口做限制

省略的步骤:
使用了prometheus的sdk,在本机上暴露了端口发送登录的信息,prometheus配置抓取该端口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/bin/bash

# ===== 配置部分 =====
API_KEY="app-RWFngEfUSKmWPVDHXQy4mHvB"
BASE_URL="http://192.168.10.100:8085/v1"
USER_ID="wangsheng" # 用户的ID,随意
PROM_URL="http://localhost:9090"
LOG_FILE="inspector.log"

# ===== jq 路径 =====
JQ_CMD="/usr/bin/jq"

# ===== 日志函数 =====
log() {
TIMESTAMP=$(date +"%Y-%m-%d %H:%M:%S")
echo "[$TIMESTAMP] $1" | tee -a "$LOG_FILE"
}

log "脚本开始执行"

# ===== 查询 Prometheus 指标 =====
log "查询 Prometheus: 总登录次数"
TOTAL_LOGINS=$(curl -s "${PROM_URL}/api/v1/query?query=user_login_total" \
| $JQ_CMD '[.data.result[] | .value[1] | tonumber] | add')
TOTAL_LOGINS=${TOTAL_LOGINS:-0}
log "总登录次数: $TOTAL_LOGINS"

log "查询 Prometheus: 登录成功次数"
SUCCESS_LOGINS=$(curl -s "${PROM_URL}/api/v1/query?query=user_login_total" \
| $JQ_CMD '[.data.result[] | select(.metric.login_status=="['\''success'\'']") | .value[1] | tonumber] | add')
SUCCESS_LOGINS=${SUCCESS_LOGINS:-0}
log "登录成功次数: $SUCCESS_LOGINS"

log "查询 Prometheus: 登录失败次数"
FAIL_LOGINS=$(curl -s "${PROM_URL}/api/v1/query?query=user_login_total" \
| $JQ_CMD '[.data.result[] | select(.metric.login_status=="['\''fail'\'']") | .value[1] | tonumber] | add')
FAIL_LOGINS=${FAIL_LOGINS:-0}
log "登录失败次数: $FAIL_LOGINS"

# ===== 生成 Workflow 输入文本 =====
INPUT_TEXT="用户登录统计:总登录次数: ${TOTAL_LOGINS},登录成功: ${SUCCESS_LOGINS},登录失败: ${FAIL_LOGINS}"
SAFE_INPUT_TEXT=$(echo "$INPUT_TEXT" | $JQ_CMD -Rs .)
log "生成 Workflow 输入文本: $INPUT_TEXT"

# ===== 调用 Workflow =====
log "🚀 正在调用 Workflow ..."
RESPONSE=$(curl -s -X POST "${BASE_URL}/workflows/run" \
-H "Authorization: Bearer ${API_KEY}" \
-H "Content-Type: application/json" \
-d "{
\"inputs\": {
\"user_query\": $SAFE_INPUT_TEXT
},
\"response_mode\": \"blocking\",
\"user\": \"${USER_ID}\"
}")

log "收到 Workflow 原始响应: $RESPONSE"

# ===== 输出结果 =====
echo "✅ Workflow 返回结果:"
RESULT=$(echo "${RESPONSE}" | $JQ_CMD -r '.data.outputs.tool_response')
echo "$RESULT"
log "Workflow 中文结果: $RESULT"

log "脚本执行完成"

# ===== 保存报告 =====
TIMESTAMP=$(date +"%Y%m%d%H%M%S")
REPORT_DIR="/Users/cuihao/docker/dify/report"
mkdir -p "$REPORT_DIR"
REPORT_FILE="${REPORT_DIR}/inspector_${TIMESTAMP}.json"
echo "$RESULT" > "$REPORT_FILE"
log "Workflow 结果已保存到: $REPORT_FILE"

log "脚本执行完成"

Crontab脚本调用结果

dify层面日志

CATALOG
  1. 1. 总体思路
  2. 2. dify安装部署
    1. 2.1. 参数提取
    2. 2.2. 代码执行模块
    3. 2.3. 创建知识库
    4. 2.4. 将最后的结果送给大模型分析
  3. 3. Agent调用工作流
  4. 4. Crontab脚本调用工作流