LangChain Agent完全指南:构建智能对话系统

前言 LangChain作为最流行的LLM应用开发框架,其Agent系统提供了强大的工具来构建智能对话系统。本文将全面介绍LangChain Agent的核心概念、实现方式和最佳实践。 1. LangChain Agent基础 1.1 核心概念 LangChain Agent由以下核心组件构成: 1 2 3 4 5 6 7 8 9 10 11 12 from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent from langchain.prompts import StringPromptTemplate from langchain import OpenAI, SerpAPIWrapper, LLMChain # Agent的核心组件 components = { "llm": "大语言模型", "tools": "可用工具集", "prompt": "提示模板", "output_parser": "输出解析器", "memory": "记忆系统" } 1.2 Agent类型 LangChain提供多种预构建的Agent类型: 1 2 3 4 5 6 7 8 9 10 11 # 1. Zero-shot ReAct Agent from langchain.agents import create_react_agent # 2. Conversational Agent from langchain.agents import create_openai_functions_agent # 3. Plan-and-Execute Agent from langchain_experimental.plan_and_execute import PlanAndExecute # 4. Self-ask with search from langchain.agents import create_self_ask_with_search_agent 2. 构建第一个Agent 2.1 基础设置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from langchain.agents import create_react_agent, AgentExecutor from langchain.tools import Tool from langchain_openai import ChatOpenAI from langchain.prompts import PromptTemplate # 初始化LLM llm = ChatOpenAI( temperature=0, model="gpt-4-turbo-preview" ) # 定义工具 def calculate(expression: str) -> str: """计算数学表达式""" try: result = eval(expression) return str(result) except: return "计算错误" tools = [ Tool( name="Calculator", func=calculate, description="用于计算数学表达式。输入应该是有效的Python表达式。" ) ] 2.2 创建Agent 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 # 定义提示模板 prompt = PromptTemplate.from_template(""" Answer the following questions as best you can. You have access to the following tools: {tools} Use the following format: Question: the input question you must answer Thought: you should always think about what to do Action: the action to take, should be one of [{tool_names}] Action Input: the input to the action Observation: the result of the action ... (this Thought/Action/Action Input/Observation can repeat N times) Thought: I now know the final answer Final Answer: the final answer to the original input question Begin! Question: {input} Thought: {agent_scratchpad} """) # 创建Agent agent = create_react_agent( llm=llm, tools=tools, prompt=prompt ) # 创建执行器 agent_executor = AgentExecutor( agent=agent, tools=tools, verbose=True, max_iterations=5 ) 3. 高级工具集成 3.1 自定义工具类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from langchain.tools import BaseTool from typing import Optional, Type from pydantic import BaseModel, Field class SearchInput(BaseModel): query: str = Field(description="搜索查询词") class CustomSearchTool(BaseTool): name = "custom_search" description = "搜索互联网信息" args_schema: Type[BaseModel] = SearchInput def _run(self, query: str) -> str: """执行搜索""" # 实现搜索逻辑 return f"搜索结果:{query}" async def _arun(self, query: str) -> str: """异步执行搜索""" # 实现异步搜索逻辑 return await self.async_search(query) 3.2 工具链组合 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from langchain.tools import StructuredTool def multi_step_analysis(data: dict) -> str: """多步骤数据分析工具""" steps = [] # 步骤1:数据清洗 cleaned_data = clean_data(data) steps.append("数据清洗完成") # 步骤2:统计分析 statistics = analyze_statistics(cleaned_data) steps.append(f"统计分析:{statistics}") # 步骤3:生成报告 report = generate_report(statistics) steps.append("报告生成完成") return "\n".join(steps) analysis_tool = StructuredTool.from_function( func=multi_step_analysis, name="DataAnalysis", description="执行多步骤数据分析" ) 4. 记忆系统集成 4.1 对话记忆 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from langchain.memory import ConversationBufferMemory, ConversationSummaryMemory from langchain.memory import ConversationBufferWindowMemory # 完整对话记忆 memory = ConversationBufferMemory( memory_key="chat_history", return_messages=True ) # 窗口记忆(只保留最近N轮) window_memory = ConversationBufferWindowMemory( k=5, # 保留最近5轮对话 memory_key="chat_history", return_messages=True ) # 摘要记忆 summary_memory = ConversationSummaryMemory( llm=llm, memory_key="chat_history", return_messages=True ) 4.2 实体记忆 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from langchain.memory import ConversationEntityMemory entity_memory = ConversationEntityMemory( llm=llm, entity_extraction_prompt=ENTITY_EXTRACTION_PROMPT, entity_summarization_prompt=ENTITY_SUMMARIZATION_PROMPT ) # 使用实体记忆的Agent agent_with_memory = AgentExecutor( agent=agent, tools=tools, memory=entity_memory, verbose=True ) 5. 自定义Agent类型 5.1 计划执行Agent 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 class PlanExecuteAgent: def __init__(self, llm, tools): self.llm = llm self.tools = tools self.planner = self._create_planner() self.executor = self._create_executor() def _create_planner(self): """创建计划生成器""" planner_prompt = PromptTemplate( template=""" 给定目标:{objective} 创建一个详细的执行计划,将任务分解为具体步骤。 每个步骤应该清晰、可执行。 输出格式: 1. [步骤描述] 2. [步骤描述] ... """, input_variables=["objective"] ) return LLMChain(llm=self.llm, prompt=planner_prompt) def _create_executor(self): """创建执行器""" return AgentExecutor( agent=create_react_agent(self.llm, self.tools, REACT_PROMPT), tools=self.tools ) def run(self, objective: str): # 生成计划 plan = self.planner.run(objective) steps = self._parse_plan(plan) # 执行计划 results = [] for step in steps: result = self.executor.run(step) results.append(result) return self._synthesize_results(results) 5.2 自反思Agent 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 class SelfReflectionAgent: def __init__(self, llm): self.llm = llm self.reflection_prompt = PromptTemplate( template=""" 任务:{task} 初次回答:{initial_answer} 请评估这个回答: 1. 准确性如何? 2. 完整性如何? 3. 有什么可以改进的地方? 提供改进后的答案: """, input_variables=["task", "initial_answer"] ) def run(self, task: str): # 初次回答 initial_answer = self.llm.invoke(task) # 自我反思 reflection_chain = LLMChain( llm=self.llm, prompt=self.reflection_prompt ) improved_answer = reflection_chain.run( task=task, initial_answer=initial_answer ) return improved_answer 6. 流式输出与异步处理 6.1 流式响应 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler # 配置流式输出 streaming_llm = ChatOpenAI( temperature=0, streaming=True, callbacks=[StreamingStdOutCallbackHandler()] ) # 创建支持流式的Agent streaming_agent = create_react_agent( llm=streaming_llm, tools=tools, prompt=prompt ) # 流式执行 async def stream_agent_response(query: str): async for chunk in streaming_agent.astream({"input": query}): if "output" in chunk: yield chunk["output"] 6.2 异步Agent 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import asyncio from langchain.agents import create_openai_tools_agent async def async_agent_execution(): # 创建异步Agent async_agent = create_openai_tools_agent( llm=llm, tools=tools, prompt=prompt ) # 异步执行器 async_executor = AgentExecutor( agent=async_agent, tools=tools, verbose=True ) # 并发执行多个查询 queries = ["问题1", "问题2", "问题3"] tasks = [async_executor.ainvoke({"input": q}) for q in queries] results = await asyncio.gather(*tasks) return results 7. 错误处理与重试机制 7.1 错误处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from langchain.agents import AgentExecutor from langchain.callbacks import CallbackManager from tenacity import retry, stop_after_attempt, wait_exponential class RobustAgent: def __init__(self, agent, tools): self.executor = AgentExecutor( agent=agent, tools=tools, max_iterations=5, early_stopping_method="generate", handle_parsing_errors=True ) @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10) ) def run_with_retry(self, input_text: str): try: result = self.executor.run(input_text) return result except Exception as e: print(f"错误: {e}") # 降级处理 return self.fallback_response(input_text) def fallback_response(self, input_text: str): """降级响应""" return f"抱歉,无法处理您的请求:{input_text}" 7.2 验证机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class ValidatedAgent: def __init__(self, agent, validator): self.agent = agent self.validator = validator def run(self, input_text: str): # 输入验证 if not self.validator.validate_input(input_text): return "输入无效" # 执行Agent result = self.agent.run(input_text) # 输出验证 if not self.validator.validate_output(result): return self.agent.run_with_correction(input_text) return result 8. 性能优化 8.1 缓存策略 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from langchain.cache import InMemoryCache, RedisCache from langchain.globals import set_llm_cache # 内存缓存 set_llm_cache(InMemoryCache()) # Redis缓存 import redis redis_client = redis.Redis.from_url("redis://localhost:6379") set_llm_cache(RedisCache(redis_client)) # 自定义缓存 class CustomCache: def __init__(self): self.cache = {} def lookup(self, prompt: str, llm_string: str): key = f"{llm_string}:{prompt}" return self.cache.get(key) def update(self, prompt: str, llm_string: str, return_val: list): key = f"{llm_string}:{prompt}" self.cache[key] = return_val 8.2 批处理优化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from langchain.llms import OpenAI from concurrent.futures import ThreadPoolExecutor class BatchProcessor: def __init__(self, agent, max_workers=5): self.agent = agent self.executor = ThreadPoolExecutor(max_workers=max_workers) def process_batch(self, inputs: list): """批量处理输入""" futures = [] for input_text in inputs: future = self.executor.submit(self.agent.run, input_text) futures.append(future) results = [] for future in futures: try: result = future.result(timeout=30) results.append(result) except Exception as e: results.append(f"处理失败: {e}") return results 9. 监控与日志 9.1 自定义回调 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from langchain.callbacks.base import BaseCallbackHandler import time class PerformanceCallbackHandler(BaseCallbackHandler): def __init__(self): self.start_times = {} self.metrics = { "total_tokens": 0, "total_cost": 0, "execution_times": [] } def on_llm_start(self, serialized, prompts, **kwargs): self.start_times["llm"] = time.time() def on_llm_end(self, response, **kwargs): duration = time.time() - self.start_times.get("llm", time.time()) self.metrics["execution_times"].append(duration) # 记录token使用 if hasattr(response, "llm_output"): tokens = response.llm_output.get("token_usage", {}) self.metrics["total_tokens"] += tokens.get("total_tokens", 0) def on_tool_start(self, serialized, input_str, **kwargs): self.start_times[f"tool_{serialized.get('name')}"] = time.time() def on_tool_end(self, output, **kwargs): # 记录工具执行时间 pass 9.2 追踪系统 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from langchain.callbacks import LangChainTracer # 配置追踪 tracer = LangChainTracer( project_name="my_agent_project", client=None # 可以配置自定义客户端 ) # 使用追踪的Agent traced_agent = AgentExecutor( agent=agent, tools=tools, callbacks=[tracer], verbose=True ) 10. 生产部署 10.1 API封装 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from fastapi import FastAPI, HTTPException from pydantic import BaseModel import uvicorn app = FastAPI() class AgentRequest(BaseModel): query: str session_id: str = None class AgentResponse(BaseModel): result: str metadata: dict = {} @app.post("/agent/chat", response_model=AgentResponse) async def chat(request: AgentRequest): try: # 获取或创建会话 session = get_or_create_session(request.session_id) # 执行Agent result = await agent_executor.ainvoke({ "input": request.query, "chat_history": session.get_history() }) # 更新会话历史 session.add_message(request.query, result["output"]) return AgentResponse( result=result["output"], metadata={ "session_id": session.id, "tools_used": result.get("intermediate_steps", []) } ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000) 10.2 Docker部署 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # Dockerfile FROM python:3.11-slim WORKDIR /app # 安装依赖 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # 复制代码 COPY . . # 环境变量 ENV OPENAI_API_KEY=${OPENAI_API_KEY} ENV LANGCHAIN_TRACING_V2=true ENV LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY} # 运行 CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] 11. 测试策略 11.1 单元测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import pytest from unittest.mock import Mock, patch class TestAgent: @pytest.fixture def mock_llm(self): llm = Mock() llm.invoke.return_value = "模拟响应" return llm @pytest.fixture def test_agent(self, mock_llm): return create_react_agent( llm=mock_llm, tools=[], prompt=PromptTemplate.from_template("{input}") ) def test_agent_execution(self, test_agent): result = test_agent.run("测试输入") assert result is not None @patch('langchain.tools.Calculator') def test_tool_usage(self, mock_calculator): mock_calculator.run.return_value = "42" # 测试工具调用 11.2 集成测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class IntegrationTests: def test_end_to_end_flow(self): """端到端测试""" # 1. 创建Agent agent = create_production_agent() # 2. 测试简单查询 simple_result = agent.run("What is 2+2?") assert "4" in simple_result # 3. 测试复杂查询 complex_result = agent.run( "Search for the latest AI news and summarize it" ) assert len(complex_result) > 100 # 4. 测试错误处理 error_result = agent.run("Invalid input %$#@") assert "error" not in error_result.lower() 12. 最佳实践总结 明确的工具描述:确保工具描述准确、详细 适当的提示工程:根据任务调整提示模板 合理的迭代限制:防止无限循环 完善的错误处理:处理各种异常情况 性能监控:跟踪token使用和执行时间 渐进式复杂度:从简单Agent开始,逐步增加功能 测试覆盖:包括单元测试和集成测试 文档完善:记录Agent能力和限制 结论 LangChain Agent提供了构建智能对话系统的强大框架。通过合理使用其提供的工具和模式,我们可以快速构建出功能丰富、性能优秀的AI Agent应用。 ...

January 2, 2025 · 9 min · Chico Gong

LangChain Graph 详解:构建智能知识图谱

引言 在人工智能和大语言模型(LLM)的应用中,知识的表示与组织方式直接影响系统的推理能力和智能水平。LangChain Graph 作为LangChain生态系统中的重要组件,提供了一套强大的工具,使开发者能够轻松地从文本中提取结构化知识,构建知识图谱,并基于图进行复杂推理。本文将深入探讨LangChain Graph的概念、工作原理、应用场景以及实践技巧,帮助您全面理解和应用这一强大工具。 知识图谱与LangChain Graph基础 什么是知识图谱? 知识图谱(Knowledge Graph)是一种结构化数据模型,用于表示实体(Entities)之间的关系(Relations)。它以图的形式组织信息,其中: 节点(Nodes):代表实体或概念 边(Edges):代表实体间的关系 graph LR A["艾伦·图灵"] -->|"发明"| B["图灵机"] A -->|"出生于"| C["英国"] A -->|"被誉为"| D["计算机科学之父"] B -->|"是"| E["理论计算模型"] LangChain Graph的定义与价值 LangChain Graph是LangChain框架中专注于知识图谱构建、存储和查询的模块集合。它将LLM的自然语言处理能力与图数据库的结构化表示结合,实现了: 自动从文本中提取实体和关系 构建和维护知识图谱 基于图结构进行复杂查询和推理 增强LLM应用的上下文理解和回答质量 LangChain Graph架构 LangChain Graph的整体架构可以通过以下图示来理解: flowchart TB subgraph "输入层" A["文本文档"] --> B["网页内容"] C["结构化数据"] --> D["用户查询"] end subgraph "处理层" E["实体提取 EntityExtractor"] F["关系提取 RelationExtractor"] G["知识图谱构建 KnowledgeGraphCreator"] end subgraph "存储层" H["图数据库 Neo4j/NetworkX"] I["向量存储 VectorStores"] end subgraph "应用层" J["图查询 GraphQuery"] K["图推理 GraphReasoning"] L["QA系统 GraphQAChain"] end A --> E B --> E C --> F D --> F E --> G F --> G G --> H G --> I H --> J H --> K I --> L 核心组件详解 1. 实体和关系提取器 这些组件负责从文本中识别实体和它们之间的关系: ...

December 27, 2024 · 7 min · Chico Gong