LangGraph 完全指南:构建复杂的AI工作流
LangGraph 完全指南:构建复杂的AI工作流
LangGraph 简介
LangGraph是LangChain生态系统中的一个强大框架,专门用于构建有状态的、多步骤的AI应用。它使用图结构来表示复杂的工作流,支持循环、条件分支和状态管理。
核心概念
1. 状态图 (State Graph)
from langgraph.graph import StateGraph
from typing import TypedDict, List
# 定义状态结构
class AgentState(TypedDict):
messages: List[str]
current_step: str
context: dict
result: str
2. 节点 (Nodes)
def research_node(state: AgentState) -> AgentState:
"""研究节点:收集信息"""
query = state["messages"][-1]
# 执行研究逻辑
research_result = perform_research(query)
return {
**state,
"context": {"research": research_result},
"current_step": "analysis"
}
def analysis_node(state: AgentState) -> AgentState:
"""分析节点:分析信息"""
research_data = state["context"]["research"]
# 执行分析逻辑
analysis_result = analyze_data(research_data)
return {
**state,
"context": {**state["context"], "analysis": analysis_result},
"current_step": "synthesis"
}
3. 边 (Edges) 和条件路由
def should_continue(state: AgentState) -> str:
"""条件路由:决定下一步"""
if state["current_step"] == "research":
return "analysis"
elif state["current_step"] == "analysis":
if needs_more_research(state):
return "research" # 循环回研究
else:
return "synthesis"
else:
return "end"
构建第一个LangGraph应用
简单的问答系统
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
class QAState(TypedDict):
question: str
context: str
answer: str
confidence: float
def retrieve_context(state: QAState) -> QAState:
"""检索相关上下文"""
question = state["question"]
# 这里可以集成向量数据库检索
context = vector_store.similarity_search(question)
return {
**state,
"context": context
}
def generate_answer(state: QAState) -> QAState:
"""生成答案"""
llm = ChatOpenAI(model="gpt-4")
prompt = f"""
基于以下上下文回答问题:
上下文:{state['context']}
问题:{state['question']}
请提供准确的答案:
"""
response = llm.invoke(prompt)
return {
**state,
"answer": response.content,
"confidence": 0.8 # 可以添加置信度评估
}
# 构建图
workflow = StateGraph(QAState)
# 添加节点
workflow.add_node("retrieve", retrieve_context)
workflow.add_node("generate", generate_answer)
# 添加边
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", END)
# 设置入口点
workflow.set_entry_point("retrieve")
# 编译图
app = workflow.compile()
高级功能:多智能体协作
研究助手系统
from langgraph.graph import StateGraph, END
from typing import Literal
class ResearchState(TypedDict):
topic: str
research_plan: str
collected_info: List[dict]
analysis: str
report: str
current_agent: str
def planner_agent(state: ResearchState) -> ResearchState:
"""规划智能体:制定研究计划"""
llm = ChatOpenAI(model="gpt-4")
prompt = f"""
为以下主题制定详细的研究计划:
主题:{state['topic']}
请提供:
1. 研究的关键问题
2. 需要收集的信息类型
3. 研究的优先级
"""
plan = llm.invoke(prompt).content
return {
**state,
"research_plan": plan,
"current_agent": "researcher"
}
def researcher_agent(state: ResearchState) -> ResearchState:
"""研究智能体:收集信息"""
plan = state["research_plan"]
# 模拟信息收集
collected_info = []
for query in extract_queries_from_plan(plan):
info = search_information(query)
collected_info.append({
"query": query,
"info": info,
"source": "web_search"
})
return {
**state,
"collected_info": collected_info,
"current_agent": "analyst"
}
def analyst_agent(state: ResearchState) -> ResearchState:
"""分析智能体:分析信息"""
llm = ChatOpenAI(model="gpt-4")
info_summary = "\n".join([
f"查询:{item['query']}\n信息:{item['info']}\n"
for item in state["collected_info"]
])
prompt = f"""
分析以下研究信息:
{info_summary}
请提供:
1. 关键发现
2. 趋势分析
3. 潜在问题
4. 建议
"""
analysis = llm.invoke(prompt).content
return {
**state,
"analysis": analysis,
"current_agent": "writer"
}
def writer_agent(state: ResearchState) -> ResearchState:
"""写作智能体:生成报告"""
llm = ChatOpenAI(model="gpt-4")
prompt = f"""
基于以下信息生成研究报告:
主题:{state['topic']}
研究计划:{state['research_plan']}
分析结果:{state['analysis']}
请生成一份结构化的研究报告。
"""
report = llm.invoke(prompt).content
return {
**state,
"report": report,
"current_agent": "complete"
}
# 构建多智能体工作流
research_workflow = StateGraph(ResearchState)
# 添加智能体节点
research_workflow.add_node("planner", planner_agent)
research_workflow.add_node("researcher", researcher_agent)
research_workflow.add_node("analyst", analyst_agent)
research_workflow.add_node("writer", writer_agent)
# 添加顺序边
research_workflow.add_edge("planner", "researcher")
research_workflow.add_edge("researcher", "analyst")
research_workflow.add_edge("analyst", "writer")
research_workflow.add_edge("writer", END)
# 设置入口点
research_workflow.set_entry_point("planner")
# 编译
research_app = research_workflow.compile()
条件分支和循环
自适应问答系统
def quality_check(state: QAState) -> Literal["improve", "finalize"]:
"""质量检查:决定是否需要改进答案"""
confidence = state.get("confidence", 0)
if confidence < 0.7:
return "improve"
else:
return "finalize"
def improve_answer(state: QAState) -> QAState:
"""改进答案"""
llm = ChatOpenAI(model="gpt-4")
prompt = f"""
以下答案的置信度较低,请改进:
问题:{state['question']}
当前答案:{state['answer']}
上下文:{state['context']}
请提供更准确、更详细的答案:
"""
improved_answer = llm.invoke(prompt).content
return {
**state,
"answer": improved_answer,
"confidence": min(state["confidence"] + 0.2, 1.0)
}
# 添加条件分支
workflow.add_conditional_edges(
"generate",
quality_check,
{
"improve": "improve_answer",
"finalize": END
}
)
workflow.add_node("improve_answer", improve_answer)
workflow.add_edge("improve_answer", "generate") # 循环回检查
状态持久化
使用检查点保存状态
from langgraph.checkpoint.sqlite import SqliteSaver
# 创建检查点保存器
checkpointer = SqliteSaver.from_conn_string(":memory:")
# 编译时添加检查点
app = workflow.compile(checkpointer=checkpointer)
# 使用配置运行
config = {"configurable": {"thread_id": "conversation_1"}}
# 运行工作流
result = app.invoke(
{"question": "什么是机器学习?"},
config=config
)
# 获取状态历史
history = app.get_state_history(config)
for state in history:
print(f"步骤:{state.metadata['step']}")
print(f"状态:{state.values}")
流式处理
实时响应
# 流式运行
for chunk in app.stream(
{"question": "解释深度学习的原理"},
config=config
):
print(f"节点:{chunk.keys()}")
for node, output in chunk.items():
print(f"输出:{output}")
实际应用案例
1. 客户服务机器人
class CustomerServiceState(TypedDict):
customer_query: str
intent: str
customer_info: dict
solution: str
escalation_needed: bool
def intent_classifier(state: CustomerServiceState) -> CustomerServiceState:
"""意图分类"""
# 使用分类模型识别客户意图
intent = classify_intent(state["customer_query"])
return {**state, "intent": intent}
def info_retriever(state: CustomerServiceState) -> CustomerServiceState:
"""信息检索"""
# 根据意图检索相关信息
info = retrieve_customer_info(state["intent"])
return {**state, "customer_info": info}
def solution_generator(state: CustomerServiceState) -> CustomerServiceState:
"""解决方案生成"""
# 生成解决方案
solution = generate_solution(state["customer_query"], state["customer_info"])
return {**state, "solution": solution}
2. 代码审查助手
class CodeReviewState(TypedDict):
code: str
language: str
issues: List[dict]
suggestions: List[str]
score: float
def syntax_checker(state: CodeReviewState) -> CodeReviewState:
"""语法检查"""
issues = check_syntax(state["code"], state["language"])
return {**state, "issues": issues}
def style_checker(state: CodeReviewState) -> CodeReviewState:
"""代码风格检查"""
style_issues = check_code_style(state["code"], state["language"])
all_issues = state["issues"] + style_issues
return {**state, "issues": all_issues}
def security_checker(state: CodeReviewState) -> CodeReviewState:
"""安全检查"""
security_issues = check_security(state["code"], state["language"])
all_issues = state["issues"] + security_issues
return {**state, "issues": all_issues}
最佳实践
1. 状态设计
# 好的状态设计
class WellDesignedState(TypedDict):
# 输入数据
user_input: str
# 中间结果
processed_data: dict
context: dict
# 输出结果
final_result: str
# 元数据
step_count: int
execution_time: float
confidence: float
2. 错误处理
def safe_node(state: AgentState) -> AgentState:
"""安全的节点执行"""
try:
# 执行主要逻辑
result = process_data(state)
return result
except Exception as e:
# 记录错误并返回错误状态
logger.error(f"节点执行失败:{e}")
return {
**state,
"error": str(e),
"status": "failed"
}
3. 性能优化
# 并行执行
from langgraph.graph import StateGraph
import asyncio
async def parallel_processing(state: AgentState) -> AgentState:
"""并行处理多个任务"""
tasks = [
process_task_1(state),
process_task_2(state),
process_task_3(state)
]
results = await asyncio.gather(*tasks)
return {
**state,
"results": results
}
调试和监控
1. 可视化工作流
# 生成图的可视化
from IPython.display import Image, display
# 显示图结构
display(Image(app.get_graph().draw_mermaid_png()))
2. 日志记录
import logging
def logged_node(state: AgentState) -> AgentState:
"""带日志的节点"""
logging.info(f"执行节点,当前状态:{state}")
result = process_node(state)
logging.info(f"节点完成,结果:{result}")
return result
总结
LangGraph为构建复杂的AI工作流提供了强大的框架。通过状态图、条件分支、循环和检查点等功能,我们可以构建出智能、可靠、可扩展的AI应用。
关键优势:
- 状态管理:清晰的状态流转
- 可组合性:模块化的节点设计
- 可观测性:完整的执行历史
- 容错性:检查点和恢复机制
在实际应用中,合理设计状态结构、实现错误处理、添加监控日志是成功使用LangGraph的关键。
参考资源: