LangGraph是一个用于构建有状态、多角色协作的LLM应用编排框架。它的核心思想是将工作流建模为图(Graph),由节点(Node)、边(Edge) 和共享状态(State) 构成
1. 定义状态 (State)
状态是一个在所有节点间共享的数据容器,是图运行的“记忆”。在Python中,通常通过继承TypedDict来定义其结构。
from typing import TypedDict, List, Annotated import operator # 定义状态结构 class AgentState(TypedDict): messages: Annotated[List[dict], operator.add] # 使用operator.add作为reducer,用于追加新消息[reference:5] question: str answer: str # ... 其他字段
关键点:Annotated类型和reducer函数(如operator.add)用于定义当多个节点更新同一状态键时,如何合并这些更新(例如,追加而非覆盖)
2. 构建图 (StateGraph)
图是工作流的蓝图。首先实例化StateGraph,并传入之前定义的状态类。
from langgraph.graph import StateGraph, START, END # 初始化图 workflow = StateGraph(AgentState)
3. 添加节点 (Node)
节点是图的基本执行单元,通常是一个接收当前状态并返回部分状态更新的函数
# 定义一个节点函数
def search_node(state: AgentState) -> dict:
# 从状态中读取数据
query = state["question"]
# 执行具体逻辑(如调用搜索API)
result = perform_search(query)
# 返回需要更新的状态字段
return {"search_result": result}
# 将节点函数添加到图中,并赋予一个名称
workflow.add_node("search", search_node)4. 添加边 (Edge)
边定义了节点之间的执行顺序和流转逻辑。
普通边:表示无条件地从一个节点执行到另一个节点。
workflow.add_edge("search", "generate") # search执行后,无条件进入generate节点
workflow.add_edge(START, "search") # 设置图的入口节点条件边:允许根据当前状态动态选择下一个要执行的节点,是实现分支和循环的关键
# 定义路由函数,返回下一个节点的名称
def route_decision(state: AgentState) -> str:
if state["need_retry"]:
return "search"
else:
return "generate"
# 添加条件边
workflow.add_conditional_edges(
"generate", # 起始节点
route_decision, # 路由函数
{
"search": "search", # 如果返回"search",则前往"search"节点
"generate": END # 如果返回"generate",则结束流程
}
)5. 编译与执行
在定义好所有节点和边之后,将图编译成可执行的应用程序。
# 编译图
app = workflow.compile()
# 执行图,输入初始状态
initial_state = {"question": "今天天气怎么样?"}
final_state = app.invoke(initial_state)
print(final_state["answer"])完整示例:带工具调用的Agent
下面是一个更完整的示例,展示了一个能够自主决定是否调用工具的Agent
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode
# 1. 定义状态
class GraphState(TypedDict):
messages: Annotated[List[BaseMessage], operator.add]
# 2. 定义工具
def lookup_weather(city: str) -> str:
return f"The weather in {city} is sunny."
tools = [lookup_weather]
tool_node = ToolNode(tools) # 工具节点,自动处理工具调用
# 3. 初始化支持工具调用的LLM
llm = ChatOpenAI(model="gpt-4o-mini").bind_tools(tools)
# 4. 定义Agent节点
def agent_step(state: GraphState):
response = llm.invoke(state["messages"])
return {"messages": [response]}
# 5. 定义路由函数
def route_decision(state: GraphState):
last_msg = state["messages"][-1]
if last_msg.tool_calls: # 如果LLM请求调用工具
return "tool"
return END # 否则结束
# 6. 构建并编译图
graph = StateGraph(GraphState)
graph.add_node("agent", agent_step)
graph.add_node("tool", tool_node)
graph.set_entry_point("agent")
graph.add_conditional_edges("agent", route_decision)
graph.add_edge("tool", "agent") # 工具执行后回到agent
app = graph.compile()
# 7. 运行
result = app.invoke(
{"messages": [HumanMessage(content="伦敦的天气怎么样?")]}
)
print(result["messages"][-1].content)高级特性与关键API
1. 持久化与检查点 (Persistence & Checkpointer)
通过启用检查点(checkpointer),LangGraph可以在每个节点执行后自动保存状态。这使得工作流具备持久化执行能力,可以从故障或中断中恢复。
from langgraph.checkpoint.memory import MemorySaver checkpointer = MemorySaver() # 也可使用PostgresSaver等生产级存储器[reference:13] app = workflow.compile(checkpointer=checkpointer)
2. 人机协同 (Human-in-the-loop)
通过interrupt函数,可以在节点中设置断点,暂停图执行以等待人工输入
from langgraph.types import interrupt, Command
def human_review_node(state: AgentState):
# 暂停执行,并将问题"请审核答案"发送给客户端
human_input = interrupt("请审核答案")
# 收到人工回复后,继续执行
return {"human_feedback": human_input}
# 恢复执行时,通过Command传入人工输入[reference:16]
app.stream(Command(resume="人工审核通过"))这为实现需要人工审批或干预的复杂工作流提供了基础
3. 命令 (Command)
Command对象提供了一种强大的方式来同时更新状态和路由,可以替代复杂的条件边逻辑
from langgraph.types import Command
def smart_node(state: State):
# 根据逻辑决定下一步
next_node = "node_b" if condition else "node_c"
# 返回一个Command,同时更新状态并指定下一跳
return Command(
update={"status": "processed"}, # 更新状态
goto=next_node # 路由到下一个节点
)总结
LangGraph通过将复杂的智能体工作流抽象为状态图,提供了清晰的编排模型。其关键代码模式围绕定义状态 → 构建图 → 添加节点和边 → 编译执行展开。在此基础上,检查点、中断和命令等高级特性,为构建健壮、可交互、生产级的LLM应用提供了坚实基础。
| 概念 | 说明 | 关键代码 |
|---|---|---|
| 状态定义 (State) | 定义节点间共享的数据结构,通过 Annotated 和 Reducer 函数(如 operator.add)指定相同字段被多次更新时的合并逻辑(如追加而非覆盖)。 | class AgentState(TypedDict):messages: Annotated[List[dict], operator.add]user_query: str |
| 图初始化 | 实例化状态图,绑定状态类,作为整个工作流的容器。 | from langgraph.graph import StateGraphworkflow = StateGraph(AgentState) |
| 添加节点 (Node) | 将执行函数注册为图中的节点。节点函数接收当前状态,返回需更新的状态字段(字典)。 | workflow.add_node("node_name", function_name)*(函数签名: def fn(state: AgentState) -> dict:) * |
| 普通边 (Edge) | 定义确定性的流转路径,即从一个节点无条件执行到另一个节点。使用 START 作为入口,END 作为终点。 | workflow.add_edge(START, "first_node")workflow.add_edge("node_a", "node_b")workflow.add_edge("node_c", END) |
| 条件边 (Conditional Edge) | 根据当前状态动态计算下一个节点,用于实现 if-else 分支或 while 循环。 | workflow.add_conditional_edges("source_node",routing_function, # 返回目标节点名字符串{"target_a": "node_a", "target_b": END}) |
| 编译与执行 | 将图编译为可运行的 Runnable 对象。invoke 接收初始输入并阻塞式返回最终状态。 | app = workflow.compile()final_state = app.invoke({"user_query": "Hello"}) |
| 流式输出 (Streaming) | 使用 stream 方法逐块获取输出,适合长耗时任务,能实时展示中间节点的状态更新。 | for chunk in app.stream(initial_state):print(chunk) |
| 持久化与检查点 | 启用检查点后,图在执行每个节点后自动保存状态快照,支持故障恢复、对话记忆和“回溯”执行。 | from langgraph.checkpoint.memory import MemorySavercheckpointer = MemorySaver()app = workflow.compile(checkpointer=checkpointer) |
| 人机协同 (中断) | 在节点中调用 interrupt 暂停图执行,并向客户端抛出需要人工干预的问题。恢复时通过 Command(resume=...) 传入人工输入。 | 暂停:human_input = interrupt("请审核此答案")恢复: app.stream(Command(resume="审核通过"), config=config) |
| 命令 (Command) | 在节点返回值中使用 Command,可以同时完成两件事:更新状态 (update) 并直接跳转到指定节点 (goto),可替代复杂的条件边。 | from langgraph.types import Commandreturn Command(update={"status": "ok"}, goto="next_node") |
| 子图 (Subgraph) | 将一个已编译好的完整图作为节点添加到主图中,实现复杂工作流的模块化封装和复用。 | workflow.add_node("sub_module", compiled_sub_graph) |
|
|---|