Akemi

Langchain模块Callback组件

2025/12/12

回调的基本概念:

在llm应用执行的每个阶段,比如日志记录、监控、流处理等

回调处理器是实现了callbackhandler接口的对象,每个可以订阅的事件都有一个方法,当事件被触发时,CallbackManager会在每个处理器上调用相应的方法

在哪些地方定义回调函数:

  • 构造函数时定义:llmchain=(callbacks=[handler])
  • 发出请求时定义:llm.invoke(callbacks=…
  • 构造函数中设置verbose为true,调用StdOutCallHandler

使用最简单回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from langchain.callbacks import StdOutCallbackHandler
from langchain_deepseek import ChatDeepSeek
from langchain.prompts import PromptTemplate
from dotenv import load_dotenv
from langchain.schema.output_parser import StrOutputParser

load_dotenv()
llm = ChatDeepSeek(model="deepseek-chat")
prompt = PromptTemplate.from_template("1 + {number} = ")

# 创建回调处理器
handler = StdOutCallbackHandler()

# 初始化时定义回调
chain = prompt | llm | StrOutputParser()

# 调用时传入回调
print(chain.invoke(
{"number": 2},
config={"callbacks": [handler]}
))

# > Entering new RunnableSequence chain...
# > Entering new PromptTemplate chain...
# > Finished chain.
# > Entering new StrOutputParser chain...
# > Finished chain.
# > Finished chain.
# Let’s add the numbers together:
# \[
# 1 + 2 = 3
# \]
# So the answer is **3**.

自定义同步/异步回调触发器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import asyncio
from typing import Any,Dict,List
from langchain_deepseek import ChatDeepSeek
from langchain.schema import LLMResult,HumanMessage
from langchain.callbacks import AsyncIteratorCallbackHandler
from langchain_core.callbacks import BaseCallbackHandler
from dotenv import load_dotenv

load_dotenv()

class CustomSyncHandler(BaseCallbackHandler):
def on_llm_new_token(self,token: str,**kwargs) -> None:
print(f"同步处理器: token: {token}")
def on_llm_start(self,serialized: dict[str, any],prompts: List[str],**kwargs: any) -> None:
print("同步处理:LLM正在开始")
def on_llm_end(self,response: LLMResult,**kwargs: any) -> None:
print("同步处理:LLM正在结束")

class CustomAsyncHandler(BaseCallbackHandler):
def on_llm_new_token(self,token: str,**kwargs) -> None:
print(f"异步处理器: token: {token}")
async def on_llm_start(self,serialized: dict[str, any],prompts: List[str],**kwargs: any) -> None:
print("异步处理:LLM正在开始")
async def on_llm_end(self,response: LLMResult,**kwargs: any) -> None:
print("同步处理:LLM正在结束")

llm_sync = ChatDeepSeek(model="deepseek-chat",callbacks=[CustomSyncHandler()])
llm_async = ChatDeepSeek(model="deepseek-chat",streaming=True,callbacks=[CustomAsyncHandler()])

def main():
# 同步调用(使用同步处理器)
print("=== 同步调用示例 ===")
result_sync = llm_sync.invoke(input=[HumanMessage(content="给我讲一个笑话")])
print(f"同步结果: {result_sync.content}")

# 异步调用(使用异步处理器)
print("\n=== 异步调用示例 ===")
async def async_call():
result_async = await llm_async.ainvoke(
input=[HumanMessage(content="给我讲一个笑话")]
)
print(f"异步结果: {result_async.content}")

asyncio.run(async_call())

if __name__ == "__main__":
main()

# === 同步调用示例 ===
# 同步处理:LLM正在开始
# 同步处理:LLM正在结束
# 同步结果: 有一天,一位程序员去面试。

# 面试官问:“你简历上写着精通多种编程语言,那请问你如何向一个完全不懂技术的人解释‘递归’这个概念?”

# 程序员想了想,然后转身对面试官说:“这样,我给你讲个故事吧——从前有个面试官,他让程序员解释递归,程序员就讲了个故事:‘从前有个面试官,他让程序员解释递归,程序员就讲了个故事……’”

# 面试官沉默两秒:“好了,你被录取了。不过出门左转记得把刚刚的‘无限递归’加个终止条件。”
...

FileCallbackHandler文件回调处理器

将链的执行过程和日志写入文件,而不是输出到控制台,同时使用loguru库可以记录其他输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from loguru import logger
from langchain.callbacks import FileCallbackHandler
from langchain_deepseek import ChatDeepSeek
from langchain.prompts import PromptTemplate
from langchain.schema.output_parser import StrOutputParser
from dotenv import load_dotenv

load_dotenv()
logfile = "./output.log"

# 配置 loguru 日志记录器:
# - colorize=True:启用彩色输出
# - enqueue=True:启用队列,确保线程安全的日志记录
logger.add(
logfile,
colorize=True,
enqueue=True,
)

# 创建 FileCallbackHandler 实例,指定将 LangChain 的执行日志写入到 logfile
handler = FileCallbackHandler(logfile)

llm = ChatDeepSeek(model="deepseek-chat",streaming=True)
prompt = PromptTemplate.from_template("1 + {number} =")

# 构建LangChain链,StrOutputParser:将模型输出解析为字符串
chain = prompt | llm | StrOutputParser()

# 调用链
response = chain.invoke(
{"number": 3},
config={"callbacks": [handler]}
)
logger.info(response)

# cat ./output.log
# > Entering new RunnableSequence chain...

# > Entering new PromptTemplate chain...

# > Finished chain.

# > Entering new StrOutputParser chain...

# > Finished chain.

# > Finished chain.
# 2025-12-11 15:46:22.100 | INFO | __main__:<module>:28 - Let’s solve it step-by-step:

# \[
# 1 + 3 = 4
# \]

# **Answer:** 4

CATALOG
  1. 1. 使用最简单回调
  2. 2. 自定义同步/异步回调触发器
  3. 3. FileCallbackHandler文件回调处理器