前言

LangChain作为最流行的LLM应用开发框架,其Agent系统提供了强大的工具来构建智能对话系统。本文将全面介绍LangChain Agent的核心概念、实现方式和最佳实践。

1. LangChain Agent基础

1.1 核心概念

LangChain Agent由以下核心组件构成:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent
from langchain.prompts import StringPromptTemplate
from langchain import OpenAI, SerpAPIWrapper, LLMChain

# Agent的核心组件
components = {
    "llm": "大语言模型",
    "tools": "可用工具集",
    "prompt": "提示模板",
    "output_parser": "输出解析器",
    "memory": "记忆系统"
}

1.2 Agent类型

LangChain提供多种预构建的Agent类型:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 1. Zero-shot ReAct Agent
from langchain.agents import create_react_agent

# 2. Conversational Agent
from langchain.agents import create_openai_functions_agent

# 3. Plan-and-Execute Agent
from langchain_experimental.plan_and_execute import PlanAndExecute

# 4. Self-ask with search
from langchain.agents import create_self_ask_with_search_agent

2. 构建第一个Agent

2.1 基础设置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from langchain.agents import create_react_agent, AgentExecutor
from langchain.tools import Tool
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate

# 初始化LLM
llm = ChatOpenAI(
    temperature=0,
    model="gpt-4-turbo-preview"
)

# 定义工具
def calculate(expression: str) -> str:
    """计算数学表达式"""
    try:
        result = eval(expression)
        return str(result)
    except:
        return "计算错误"

tools = [
    Tool(
        name="Calculator",
        func=calculate,
        description="用于计算数学表达式。输入应该是有效的Python表达式。"
    )
]

2.2 创建Agent

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 定义提示模板
prompt = PromptTemplate.from_template("""
Answer the following questions as best you can. You have access to the following tools:

{tools}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin!

Question: {input}
Thought: {agent_scratchpad}
""")

# 创建Agent
agent = create_react_agent(
    llm=llm,
    tools=tools,
    prompt=prompt
)

# 创建执行器
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=5
)

3. 高级工具集成

3.1 自定义工具类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
from langchain.tools import BaseTool
from typing import Optional, Type
from pydantic import BaseModel, Field

class SearchInput(BaseModel):
    query: str = Field(description="搜索查询词")

class CustomSearchTool(BaseTool):
    name = "custom_search"
    description = "搜索互联网信息"
    args_schema: Type[BaseModel] = SearchInput
    
    def _run(self, query: str) -> str:
        """执行搜索"""
        # 实现搜索逻辑
        return f"搜索结果:{query}"
    
    async def _arun(self, query: str) -> str:
        """异步执行搜索"""
        # 实现异步搜索逻辑
        return await self.async_search(query)

3.2 工具链组合

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from langchain.tools import StructuredTool

def multi_step_analysis(data: dict) -> str:
    """多步骤数据分析工具"""
    steps = []
    
    # 步骤1:数据清洗
    cleaned_data = clean_data(data)
    steps.append("数据清洗完成")
    
    # 步骤2:统计分析
    statistics = analyze_statistics(cleaned_data)
    steps.append(f"统计分析:{statistics}")
    
    # 步骤3:生成报告
    report = generate_report(statistics)
    steps.append("报告生成完成")
    
    return "\n".join(steps)

analysis_tool = StructuredTool.from_function(
    func=multi_step_analysis,
    name="DataAnalysis",
    description="执行多步骤数据分析"
)

4. 记忆系统集成

4.1 对话记忆

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
from langchain.memory import ConversationBufferMemory, ConversationSummaryMemory
from langchain.memory import ConversationBufferWindowMemory

# 完整对话记忆
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

# 窗口记忆(只保留最近N轮)
window_memory = ConversationBufferWindowMemory(
    k=5,  # 保留最近5轮对话
    memory_key="chat_history",
    return_messages=True
)

# 摘要记忆
summary_memory = ConversationSummaryMemory(
    llm=llm,
    memory_key="chat_history",
    return_messages=True
)

4.2 实体记忆

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from langchain.memory import ConversationEntityMemory

entity_memory = ConversationEntityMemory(
    llm=llm,
    entity_extraction_prompt=ENTITY_EXTRACTION_PROMPT,
    entity_summarization_prompt=ENTITY_SUMMARIZATION_PROMPT
)

# 使用实体记忆的Agent
agent_with_memory = AgentExecutor(
    agent=agent,
    tools=tools,
    memory=entity_memory,
    verbose=True
)

5. 自定义Agent类型

5.1 计划执行Agent

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class PlanExecuteAgent:
    def __init__(self, llm, tools):
        self.llm = llm
        self.tools = tools
        self.planner = self._create_planner()
        self.executor = self._create_executor()
    
    def _create_planner(self):
        """创建计划生成器"""
        planner_prompt = PromptTemplate(
            template="""
            给定目标:{objective}
            
            创建一个详细的执行计划,将任务分解为具体步骤。
            每个步骤应该清晰、可执行。
            
            输出格式:
            1. [步骤描述]
            2. [步骤描述]
            ...
            """,
            input_variables=["objective"]
        )
        return LLMChain(llm=self.llm, prompt=planner_prompt)
    
    def _create_executor(self):
        """创建执行器"""
        return AgentExecutor(
            agent=create_react_agent(self.llm, self.tools, REACT_PROMPT),
            tools=self.tools
        )
    
    def run(self, objective: str):
        # 生成计划
        plan = self.planner.run(objective)
        steps = self._parse_plan(plan)
        
        # 执行计划
        results = []
        for step in steps:
            result = self.executor.run(step)
            results.append(result)
            
        return self._synthesize_results(results)

5.2 自反思Agent

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class SelfReflectionAgent:
    def __init__(self, llm):
        self.llm = llm
        self.reflection_prompt = PromptTemplate(
            template="""
            任务:{task}
            初次回答:{initial_answer}
            
            请评估这个回答:
            1. 准确性如何?
            2. 完整性如何?
            3. 有什么可以改进的地方?
            
            提供改进后的答案:
            """,
            input_variables=["task", "initial_answer"]
        )
    
    def run(self, task: str):
        # 初次回答
        initial_answer = self.llm.invoke(task)
        
        # 自我反思
        reflection_chain = LLMChain(
            llm=self.llm,
            prompt=self.reflection_prompt
        )
        
        improved_answer = reflection_chain.run(
            task=task,
            initial_answer=initial_answer
        )
        
        return improved_answer

6. 流式输出与异步处理

6.1 流式响应

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# 配置流式输出
streaming_llm = ChatOpenAI(
    temperature=0,
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)

# 创建支持流式的Agent
streaming_agent = create_react_agent(
    llm=streaming_llm,
    tools=tools,
    prompt=prompt
)

# 流式执行
async def stream_agent_response(query: str):
    async for chunk in streaming_agent.astream({"input": query}):
        if "output" in chunk:
            yield chunk["output"]

6.2 异步Agent

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
from langchain.agents import create_openai_tools_agent

async def async_agent_execution():
    # 创建异步Agent
    async_agent = create_openai_tools_agent(
        llm=llm,
        tools=tools,
        prompt=prompt
    )
    
    # 异步执行器
    async_executor = AgentExecutor(
        agent=async_agent,
        tools=tools,
        verbose=True
    )
    
    # 并发执行多个查询
    queries = ["问题1", "问题2", "问题3"]
    tasks = [async_executor.ainvoke({"input": q}) for q in queries]
    results = await asyncio.gather(*tasks)
    
    return results

7. 错误处理与重试机制

7.1 错误处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from langchain.agents import AgentExecutor
from langchain.callbacks import CallbackManager
from tenacity import retry, stop_after_attempt, wait_exponential

class RobustAgent:
    def __init__(self, agent, tools):
        self.executor = AgentExecutor(
            agent=agent,
            tools=tools,
            max_iterations=5,
            early_stopping_method="generate",
            handle_parsing_errors=True
        )
    
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def run_with_retry(self, input_text: str):
        try:
            result = self.executor.run(input_text)
            return result
        except Exception as e:
            print(f"错误: {e}")
            # 降级处理
            return self.fallback_response(input_text)
    
    def fallback_response(self, input_text: str):
        """降级响应"""
        return f"抱歉,无法处理您的请求:{input_text}"

7.2 验证机制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class ValidatedAgent:
    def __init__(self, agent, validator):
        self.agent = agent
        self.validator = validator
    
    def run(self, input_text: str):
        # 输入验证
        if not self.validator.validate_input(input_text):
            return "输入无效"
        
        # 执行Agent
        result = self.agent.run(input_text)
        
        # 输出验证
        if not self.validator.validate_output(result):
            return self.agent.run_with_correction(input_text)
        
        return result

8. 性能优化

8.1 缓存策略

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from langchain.cache import InMemoryCache, RedisCache
from langchain.globals import set_llm_cache

# 内存缓存
set_llm_cache(InMemoryCache())

# Redis缓存
import redis
redis_client = redis.Redis.from_url("redis://localhost:6379")
set_llm_cache(RedisCache(redis_client))

# 自定义缓存
class CustomCache:
    def __init__(self):
        self.cache = {}
    
    def lookup(self, prompt: str, llm_string: str):
        key = f"{llm_string}:{prompt}"
        return self.cache.get(key)
    
    def update(self, prompt: str, llm_string: str, return_val: list):
        key = f"{llm_string}:{prompt}"
        self.cache[key] = return_val

8.2 批处理优化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from langchain.llms import OpenAI
from concurrent.futures import ThreadPoolExecutor

class BatchProcessor:
    def __init__(self, agent, max_workers=5):
        self.agent = agent
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
    
    def process_batch(self, inputs: list):
        """批量处理输入"""
        futures = []
        for input_text in inputs:
            future = self.executor.submit(self.agent.run, input_text)
            futures.append(future)
        
        results = []
        for future in futures:
            try:
                result = future.result(timeout=30)
                results.append(result)
            except Exception as e:
                results.append(f"处理失败: {e}")
        
        return results

9. 监控与日志

9.1 自定义回调

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from langchain.callbacks.base import BaseCallbackHandler
import time

class PerformanceCallbackHandler(BaseCallbackHandler):
    def __init__(self):
        self.start_times = {}
        self.metrics = {
            "total_tokens": 0,
            "total_cost": 0,
            "execution_times": []
        }
    
    def on_llm_start(self, serialized, prompts, **kwargs):
        self.start_times["llm"] = time.time()
    
    def on_llm_end(self, response, **kwargs):
        duration = time.time() - self.start_times.get("llm", time.time())
        self.metrics["execution_times"].append(duration)
        
        # 记录token使用
        if hasattr(response, "llm_output"):
            tokens = response.llm_output.get("token_usage", {})
            self.metrics["total_tokens"] += tokens.get("total_tokens", 0)
    
    def on_tool_start(self, serialized, input_str, **kwargs):
        self.start_times[f"tool_{serialized.get('name')}"] = time.time()
    
    def on_tool_end(self, output, **kwargs):
        # 记录工具执行时间
        pass

9.2 追踪系统

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from langchain.callbacks import LangChainTracer

# 配置追踪
tracer = LangChainTracer(
    project_name="my_agent_project",
    client=None  # 可以配置自定义客户端
)

# 使用追踪的Agent
traced_agent = AgentExecutor(
    agent=agent,
    tools=tools,
    callbacks=[tracer],
    verbose=True
)

10. 生产部署

10.1 API封装

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn

app = FastAPI()

class AgentRequest(BaseModel):
    query: str
    session_id: str = None

class AgentResponse(BaseModel):
    result: str
    metadata: dict = {}

@app.post("/agent/chat", response_model=AgentResponse)
async def chat(request: AgentRequest):
    try:
        # 获取或创建会话
        session = get_or_create_session(request.session_id)
        
        # 执行Agent
        result = await agent_executor.ainvoke({
            "input": request.query,
            "chat_history": session.get_history()
        })
        
        # 更新会话历史
        session.add_message(request.query, result["output"])
        
        return AgentResponse(
            result=result["output"],
            metadata={
                "session_id": session.id,
                "tools_used": result.get("intermediate_steps", [])
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

10.2 Docker部署

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码
COPY . .

# 环境变量
ENV OPENAI_API_KEY=${OPENAI_API_KEY}
ENV LANGCHAIN_TRACING_V2=true
ENV LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY}

# 运行
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

11. 测试策略

11.1 单元测试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import pytest
from unittest.mock import Mock, patch

class TestAgent:
    @pytest.fixture
    def mock_llm(self):
        llm = Mock()
        llm.invoke.return_value = "模拟响应"
        return llm
    
    @pytest.fixture
    def test_agent(self, mock_llm):
        return create_react_agent(
            llm=mock_llm,
            tools=[],
            prompt=PromptTemplate.from_template("{input}")
        )
    
    def test_agent_execution(self, test_agent):
        result = test_agent.run("测试输入")
        assert result is not None
    
    @patch('langchain.tools.Calculator')
    def test_tool_usage(self, mock_calculator):
        mock_calculator.run.return_value = "42"
        # 测试工具调用

11.2 集成测试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class IntegrationTests:
    def test_end_to_end_flow(self):
        """端到端测试"""
        # 1. 创建Agent
        agent = create_production_agent()
        
        # 2. 测试简单查询
        simple_result = agent.run("What is 2+2?")
        assert "4" in simple_result
        
        # 3. 测试复杂查询
        complex_result = agent.run(
            "Search for the latest AI news and summarize it"
        )
        assert len(complex_result) > 100
        
        # 4. 测试错误处理
        error_result = agent.run("Invalid input %$#@")
        assert "error" not in error_result.lower()

12. 最佳实践总结

  1. 明确的工具描述:确保工具描述准确、详细
  2. 适当的提示工程:根据任务调整提示模板
  3. 合理的迭代限制:防止无限循环
  4. 完善的错误处理:处理各种异常情况
  5. 性能监控:跟踪token使用和执行时间
  6. 渐进式复杂度:从简单Agent开始,逐步增加功能
  7. 测试覆盖:包括单元测试和集成测试
  8. 文档完善:记录Agent能力和限制

结论

LangChain Agent提供了构建智能对话系统的强大框架。通过合理使用其提供的工具和模式,我们可以快速构建出功能丰富、性能优秀的AI Agent应用。

参考资源