发新帖

[LangGraph] 核心概念与关键代码

零下一度 14小时前 17

LangGraph是一个用于构建有状态、多角色协作的LLM应用编排框架。它的核心思想是将工作流建模为图(Graph),由节点(Node)、边(Edge) 和共享状态(State) 构成


1. 定义状态 (State)

状态是一个在所有节点间共享的数据容器,是图运行的“记忆”。在Python中,通常通过继承TypedDict来定义其结构。

from typing import TypedDict, List, Annotated
import operator
# 定义状态结构
class AgentState(TypedDict):
    messages: Annotated[List[dict], operator.add]  # 使用operator.add作为reducer,用于追加新消息[reference:5]
    question: str
    answer: str
    # ... 其他字段

关键点:Annotated类型和reducer函数(如operator.add)用于定义当多个节点更新同一状态键时,如何合并这些更新(例如,追加而非覆盖)


2. 构建图 (StateGraph)

图是工作流的蓝图。首先实例化StateGraph,并传入之前定义的状态类。

from langgraph.graph import StateGraph, START, END
# 初始化图
workflow = StateGraph(AgentState)


3. 添加节点 (Node)

节点是图的基本执行单元,通常是一个接收当前状态并返回部分状态更新的函数

# 定义一个节点函数
def search_node(state: AgentState) -> dict:
    # 从状态中读取数据
    query = state["question"]
    # 执行具体逻辑(如调用搜索API)
    result = perform_search(query)
    # 返回需要更新的状态字段
    return {"search_result": result}
# 将节点函数添加到图中,并赋予一个名称
workflow.add_node("search", search_node)


4. 添加边 (Edge)

边定义了节点之间的执行顺序和流转逻辑。

普通边:表示无条件地从一个节点执行到另一个节点。

workflow.add_edge("search", "generate")  # search执行后,无条件进入generate节点
workflow.add_edge(START, "search")       # 设置图的入口节点

条件边:允许根据当前状态动态选择下一个要执行的节点,是实现分支和循环的关键

# 定义路由函数,返回下一个节点的名称
def route_decision(state: AgentState) -> str:
    if state["need_retry"]:
        return "search"
    else:
        return "generate"
# 添加条件边
workflow.add_conditional_edges(
    "generate",          # 起始节点
    route_decision,      # 路由函数
    {
        "search": "search",   # 如果返回"search",则前往"search"节点
        "generate": END       # 如果返回"generate",则结束流程
    }
)


5. 编译与执行

在定义好所有节点和边之后,将图编译成可执行的应用程序。

# 编译图
app = workflow.compile()
# 执行图,输入初始状态
initial_state = {"question": "今天天气怎么样?"}
final_state = app.invoke(initial_state)
print(final_state["answer"])


完整示例:带工具调用的Agent

下面是一个更完整的示例,展示了一个能够自主决定是否调用工具的Agent

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode
# 1. 定义状态
class GraphState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
# 2. 定义工具
def lookup_weather(city: str) -> str:
    return f"The weather in {city} is sunny."
tools = [lookup_weather]
tool_node = ToolNode(tools)  # 工具节点,自动处理工具调用
# 3. 初始化支持工具调用的LLM
llm = ChatOpenAI(model="gpt-4o-mini").bind_tools(tools)
# 4. 定义Agent节点
def agent_step(state: GraphState):
    response = llm.invoke(state["messages"])
    return {"messages": [response]}
# 5. 定义路由函数
def route_decision(state: GraphState):
    last_msg = state["messages"][-1]
    if last_msg.tool_calls:  # 如果LLM请求调用工具
        return "tool"
    return END  # 否则结束
# 6. 构建并编译图
graph = StateGraph(GraphState)
graph.add_node("agent", agent_step)
graph.add_node("tool", tool_node)
graph.set_entry_point("agent")
graph.add_conditional_edges("agent", route_decision)
graph.add_edge("tool", "agent")  # 工具执行后回到agent
app = graph.compile()
# 7. 运行
result = app.invoke(
    {"messages": [HumanMessage(content="伦敦的天气怎么样?")]}
)
print(result["messages"][-1].content)


高级特性与关键API

1. 持久化与检查点 (Persistence & Checkpointer)

通过启用检查点(checkpointer),LangGraph可以在每个节点执行后自动保存状态。这使得工作流具备持久化执行能力,可以从故障或中断中恢复。

from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()  # 也可使用PostgresSaver等生产级存储器[reference:13]
app = workflow.compile(checkpointer=checkpointer)

2. 人机协同 (Human-in-the-loop)

通过interrupt函数,可以在节点中设置断点,暂停图执行以等待人工输入

from langgraph.types import interrupt, Command
def human_review_node(state: AgentState):
    # 暂停执行,并将问题"请审核答案"发送给客户端
    human_input = interrupt("请审核答案")
    # 收到人工回复后,继续执行
    return {"human_feedback": human_input}
# 恢复执行时,通过Command传入人工输入[reference:16]
app.stream(Command(resume="人工审核通过"))

这为实现需要人工审批或干预的复杂工作流提供了基础

3. 命令 (Command)

Command对象提供了一种强大的方式来同时更新状态和路由,可以替代复杂的条件边逻辑

from langgraph.types import Command
def smart_node(state: State):
    # 根据逻辑决定下一步
    next_node = "node_b" if condition else "node_c"
    # 返回一个Command,同时更新状态并指定下一跳
    return Command(
        update={"status": "processed"},  # 更新状态
        goto=next_node                   # 路由到下一个节点
    )

总结

LangGraph通过将复杂的智能体工作流抽象为状态图,提供了清晰的编排模型。其关键代码模式围绕定义状态 → 构建图 → 添加节点和边 → 编译执行展开。在此基础上,检查点中断和命令等高级特性,为构建健壮、可交互、生产级的LLM应用提供了坚实基础。


LangGraph 关键代码速查表


概念说明关键代码
状态定义 (State)定义节点间共享的数据结构,通过 Annotated 和 Reducer 函数(如 operator.add)指定相同字段被多次更新时的合并逻辑(如追加而非覆盖)。class AgentState(TypedDict):
messages: Annotated[List[dict], operator.add]
user_query: str
图初始化实例化状态图,绑定状态类,作为整个工作流的容器。from langgraph.graph import StateGraph
workflow = StateGraph(AgentState)
添加节点 (Node)将执行函数注册为图中的节点。节点函数接收当前状态,返回需更新的状态字段(字典)。workflow.add_node("node_name", function_name)
*(函数签名:def fn(state: AgentState) -> dict:) *
普通边 (Edge)定义确定性的流转路径,即从一个节点无条件执行到另一个节点。使用 START 作为入口,END 作为终点。workflow.add_edge(START, "first_node")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_c", END)
条件边 (Conditional Edge)根据当前状态动态计算下一个节点,用于实现 if-else 分支或 while 循环。workflow.add_conditional_edges(
"source_node",
routing_function, # 返回目标节点名字符串
{"target_a": "node_a", "target_b": END}
)
编译与执行将图编译为可运行的 Runnable 对象。invoke 接收初始输入并阻塞式返回最终状态。app = workflow.compile()
final_state = app.invoke({"user_query": "Hello"})
流式输出 (Streaming)使用 stream 方法逐块获取输出,适合长耗时任务,能实时展示中间节点的状态更新。for chunk in app.stream(initial_state):
print(chunk)
持久化与检查点启用检查点后,图在执行每个节点后自动保存状态快照,支持故障恢复、对话记忆和“回溯”执行。from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)
人机协同 (中断)在节点中调用 interrupt 暂停图执行,并向客户端抛出需要人工干预的问题。恢复时通过 Command(resume=...) 传入人工输入。暂停human_input = interrupt("请审核此答案")
恢复app.stream(Command(resume="审核通过"), config=config)
命令 (Command)在节点返回值中使用 Command,可以同时完成两件事:更新状态 (update) 并直接跳转到指定节点 (goto),可替代复杂的条件边。from langgraph.types import Command
return Command(update={"status": "ok"}, goto="next_node")
子图 (Subgraph)将一个已编译好的完整图作为节点添加到主图中,实现复杂工作流的模块化封装和复用。workflow.add_node("sub_module", compiled_sub_graph)









最新回复 (0)
返回
零下一度
主题数
974
帖子数
0
注册排名
1