本栏目专注于AI Agent系统的核心技术,包括:
- Agent架构设计 - 从ReAct到AutoGPT的架构演进
- 多Agent协作 - 分布式智能体系统的协同机制
- RAG技术 - 检索增强生成的实战应用
- LangChain框架 - 构建复杂Agent应用的最佳实践
- 记忆系统 - Agent的长短期记忆设计
- 知识图谱 - 结构化知识在Agent中的应用
引言

记忆是智能的基石。一个没有记忆的Agent就像得了健忘症的助手,无法积累经验、学习模式或维持上下文。本文深入探讨如何为AI Agent构建高效的记忆系统。

1. 记忆系统架构

1.1 记忆类型分层

from enum import Enum
from typing import Any, List, Dict, Optional
import time

class MemoryType(Enum):
    SENSORY = "sensory"            # 感官记忆(<1秒)
    WORKING = "working"            # 工作记忆(秒级)
    SHORT_TERM = "short_term"      # 短期记忆(分钟级)
    LONG_TERM = "long_term"        # 长期记忆(永久)
    EPISODIC = "episodic"          # 情景记忆
    SEMANTIC = "semantic"          # 语义记忆
    PROCEDURAL = "procedural"      # 程序记忆

class MemorySystem:
    def __init__(self):
        self.sensory_buffer = SensoryMemory(capacity=10, ttl=1)
        self.working_memory = WorkingMemory(capacity=7)
        self.short_term_memory = ShortTermMemory(capacity=100, ttl=300)
        self.long_term_memory = LongTermMemory()
        self.episodic_memory = EpisodicMemory()
        self.semantic_memory = SemanticMemory()
        self.procedural_memory = ProceduralMemory()

    def process_input(self, input_data: Any):
        """处理输入信息的记忆流程"""
        # 1. 感官记忆
        self.sensory_buffer.store(input_data)

        # 2. 注意力机制筛选
        if self.is_important(input_data):
            # 3. 进入工作记忆
            self.working_memory.add(input_data)

            # 4. 编码到短期记忆
            encoded = self.encode_memory(input_data)
            self.short_term_memory.store(encoded)

            # 5. 巩固到长期记忆
            if self.should_consolidate(encoded):
                self.consolidate_to_long_term(encoded)

1.2 工作记忆实现

class WorkingMemory:
    """Miller's Magic Number: 7±2"""

    def __init__(self, capacity: int = 7):
        self.capacity = capacity
        self.buffer = []
        self.attention_weights = []

    def add(self, item: Any):
        """添加项目到工作记忆"""
        if len(self.buffer) >= self.capacity:
            # 移除注意力权重最低的项
            min_idx = self.attention_weights.index(min(self.attention_weights))
            self.buffer.pop(min_idx)
            self.attention_weights.pop(min_idx)

        self.buffer.append(item)
        self.attention_weights.append(1.0)

    def update_attention(self, idx: int, weight_delta: float):
        """更新注意力权重"""
        if 0 <= idx < len(self.attention_weights):
            self.attention_weights[idx] += weight_delta
            # 归一化
            total = sum(self.attention_weights)
            self.attention_weights = [w / total for w in self.attention_weights]

    def get_context(self) -> List[Any]:
        """获取当前工作记忆上下文"""
        # 按注意力权重排序
        sorted_items = sorted(
            zip(self.buffer, self.attention_weights),
            key=lambda x: x[1],
            reverse=True
        )
        return [item for item, _ in sorted_items]

2.
长期记忆管理 2.1 记忆编码与存储 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 import hashlib import numpy as np from dataclasses import dataclass from datetime import datetime @dataclass class Memory: id: str content: Any embedding: np.ndarray timestamp: float access_count: int = 0 importance: float = 0.5 last_accessed: float = None metadata: Dict = None class LongTermMemory: def __init__(self, vector_dim: int = 768): self.memories = {} self.embeddings = [] self.index = None # FAISS索引 self.vector_dim = vector_dim self._init_index() def _init_index(self): """初始化向量索引""" import faiss self.index = faiss.IndexFlatL2(self.vector_dim) def store(self, content: Any, importance: float = 0.5): """存储记忆""" # 生成唯一ID memory_id = self._generate_id(content) # 生成嵌入向量 embedding = self._encode_content(content) # 创建记忆对象 memory = Memory( id=memory_id, content=content, embedding=embedding, timestamp=time.time(), importance=importance, metadata=self._extract_metadata(content) ) # 存储 self.memories[memory_id] = memory self.index.add(np.array([embedding])) return memory_id def retrieve(self, query: Any, k: int = 5) -> List[Memory]: """检索相关记忆""" query_embedding = self._encode_content(query) # 向量检索 distances, indices = self.index.search( np.array([query_embedding]), k ) # 获取记忆对象 retrieved = [] for idx in indices[0]: if idx < len(self.memories): memory_id = list(self.memories.keys())[idx] memory = self.memories[memory_id] # 更新访问信息 memory.access_count += 1 memory.last_accessed = time.time() retrieved.append(memory) # 按相关性和重要性重排 return self._rerank_memories(retrieved, query) def _rerank_memories(self, memories: List[Memory], query: Any) -> List[Memory]: """重排记忆(考虑时间衰减、重要性等)""" current_time = time.time() scored_memories = [] for memory in memories: # 时间衰减因子 time_decay = np.exp(-(current_time - memory.timestamp) / 86400) # 日衰减 # 访问频率因子 access_factor = np.log1p(memory.access_count) / 10 # 综合得分 score = ( 0.4 * memory.importance + 0.3 * time_decay + 0.3 * access_factor ) scored_memories.append((memory, score)) # 按得分排序 scored_memories.sort(key=lambda x: x[1], reverse=True) return [memory for memory, _ in scored_memories] 2.2 记忆巩固机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 class MemoryConsolidation: def __init__(self, consolidation_threshold: float = 0.7): self.threshold = consolidation_threshold self.consolidation_queue = [] def evaluate_for_consolidation(self, memory: Memory) -> bool: """评估是否需要巩固到长期记忆""" # 重要性评分 importance_score = memory.importance # 重复接触评分 repetition_score = min(1.0, memory.access_count / 5) # 情感强度评分 emotion_score = self._evaluate_emotional_intensity(memory.content) # 新颖性评分 novelty_score = self._evaluate_novelty(memory.content) # 综合评分 consolidation_score = ( 0.3 * importance_score + 0.2 * repetition_score + 0.3 * emotion_score + 0.2 * novelty_score ) return consolidation_score >= self.threshold def consolidate(self, short_term_memories: List[Memory]) -> List[Memory]: """巩固短期记忆到长期记忆""" consolidated = [] for memory in short_term_memories: if self.evaluate_for_consolidation(memory): # 增强记忆编码 enhanced_memory = self._enhance_memory(memory) # 创建关联连接 self._create_associations(enhanced_memory, consolidated) consolidated.append(enhanced_memory) 
return consolidated def _enhance_memory(self, memory: Memory) -> Memory: """增强记忆编码(添加更多细节和关联)""" # 提取关键概念 concepts = self._extract_concepts(memory.content) # 生成记忆摘要 summary = self._generate_summary(memory.content) # 更新元数据 memory.metadata.update({ "concepts": concepts, "summary": summary, "consolidation_time": time.time() }) return memory 3. 情景记忆系统 3.1 情景记忆结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 @dataclass class Episode: id: str start_time: float end_time: float events: List[Dict] context: Dict outcome: Any emotional_valence: float # -1到1,负面到正面 class EpisodicMemory: def __init__(self): self.episodes = {} self.current_episode = None self.episode_index = {} # 用于快速检索 def start_episode(self, context: Dict): """开始新情景""" episode_id = f"ep_{int(time.time() * 1000)}" self.current_episode = Episode( id=episode_id, start_time=time.time(), end_time=None, events=[], context=context, outcome=None, emotional_valence=0 ) return episode_id def add_event(self, event: Dict): """向当前情景添加事件""" if self.current_episode: event["timestamp"] = time.time() self.current_episode.events.append(event) # 更新情感效价 if "emotion" in event: self._update_emotional_valence(event["emotion"]) def end_episode(self, outcome: Any): """结束当前情景""" if self.current_episode: self.current_episode.end_time = time.time() self.current_episode.outcome = outcome # 存储情景 self.episodes[self.current_episode.id] = self.current_episode # 建立索引 self._index_episode(self.current_episode) # 重置当前情景 self.current_episode = None def recall_similar_episodes(self, query_context: Dict, k: int = 3) -> List[Episode]: """回忆相似情景""" similar_episodes = [] for episode in self.episodes.values(): similarity = self._calculate_context_similarity( query_context, episode.context ) similar_episodes.append((episode, similarity)) # 排序并返回top-k similar_episodes.sort(key=lambda x: x[1], reverse=True) return [ep for ep, _ in similar_episodes[:k]] def extract_patterns(self) -> Dict: """从情景中提取行为模式""" patterns = { "successful_patterns": [], "failure_patterns": [], "emotional_triggers": [] } for episode in self.episodes.values(): # 分析成功模式 if self._is_successful_outcome(episode.outcome): pattern = self._extract_action_sequence(episode) patterns["successful_patterns"].append(pattern) # 分析失败模式 elif self._is_failure_outcome(episode.outcome): pattern = self._extract_action_sequence(episode) patterns["failure_patterns"].append(pattern) # 分析情感触发器 if abs(episode.emotional_valence) > 0.5: trigger = self._identify_emotional_trigger(episode) patterns["emotional_triggers"].append(trigger) return patterns 3.2 情景压缩与摘要 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 class EpisodeCompression: def __init__(self, compression_ratio: float = 0.3): self.compression_ratio = compression_ratio def compress_episode(self, episode: Episode) -> Dict: """压缩情景为摘要""" # 识别关键事件 key_events = self._identify_key_events(episode.events) # 提取转折点 turning_points = self._find_turning_points(episode.events) # 生成叙事摘要 narrative = self._generate_narrative( key_events, turning_points, episode.outcome ) compressed = { "id": episode.id, "duration": episode.end_time - episode.start_time, "key_events": key_events, "turning_points": turning_points, "narrative": narrative, "outcome": episode.outcome, 
"emotional_arc": self._extract_emotional_arc(episode) } return compressed def _identify_key_events(self, events: List[Dict]) -> List[Dict]: """识别关键事件""" if len(events) <= 5: return events # 计算事件重要性 event_scores = [] for event in events: score = self._calculate_event_importance(event) event_scores.append((event, score)) # 选择top事件 event_scores.sort(key=lambda x: x[1], reverse=True) num_key_events = max(3, int(len(events) * self.compression_ratio)) key_events = [event for event, _ in event_scores[:num_key_events]] # 保持时间顺序 key_events.sort(key=lambda x: x["timestamp"]) return key_events 4. 语义记忆网络 4.1 概念网络构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 import networkx as nx class SemanticMemory: def __init__(self): self.concept_graph = nx.DiGraph() self.concept_embeddings = {} self.relation_types = [ "is_a", "part_of", "has_property", "causes", "prevents", "related_to" ] def add_concept(self, concept: str, properties: Dict = None): """添加概念节点""" if concept not in self.concept_graph: self.concept_graph.add_node( concept, properties=properties or {}, activation=0.0, last_activated=None ) # 生成概念嵌入 self.concept_embeddings[concept] = self._embed_concept(concept) def add_relation(self, concept1: str, relation: str, concept2: str, strength: float = 1.0): """添加概念关系""" self.add_concept(concept1) self.add_concept(concept2) self.concept_graph.add_edge( concept1, concept2, relation=relation, strength=strength, created_at=time.time() ) def activate_concept(self, concept: str, activation: float = 1.0): """激活概念(扩散激活)""" if concept not in self.concept_graph: return # 设置初始激活 self.concept_graph.nodes[concept]["activation"] = activation self.concept_graph.nodes[concept]["last_activated"] = time.time() # 扩散激活 self._spread_activation(concept, activation, decay=0.5, depth=3) def _spread_activation(self, source: str, activation: float, decay: float, depth: int): """扩散激活算法""" if depth <= 0 or activation < 0.1: return # 激活相邻节点 for neighbor in self.concept_graph.neighbors(source): edge_data = self.concept_graph[source][neighbor] spread_activation = activation * edge_data["strength"] * decay current_activation = self.concept_graph.nodes[neighbor].get("activation", 0) new_activation = current_activation + spread_activation self.concept_graph.nodes[neighbor]["activation"] = min(1.0, new_activation) # 递归扩散 self._spread_activation(neighbor, spread_activation, decay, depth - 1) def query_concepts(self, query: str, k: int = 5) -> List[str]: """查询相关概念""" # 激活查询相关概念 query_concepts = self._extract_concepts_from_text(query) for concept in query_concepts: self.activate_concept(concept) # 获取激活度最高的概念 activated_concepts = [ (node, data["activation"]) for node, data in self.concept_graph.nodes(data=True) if data["activation"] > 0 ] activated_concepts.sort(key=lambda x: x[1], reverse=True) return [concept for concept, _ in activated_concepts[:k]] 4.2 知识推理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 class SemanticReasoning: def __init__(self, semantic_memory: SemanticMemory): self.semantic_memory = semantic_memory def infer_relations(self, concept1: str, concept2: str) -> List[Dict]: """推理两个概念之间的关系""" inferences = [] # 直接关系 if self.semantic_memory.concept_graph.has_edge(concept1, concept2): edge_data = 
self.semantic_memory.concept_graph[concept1][concept2] inferences.append({ "type": "direct", "relation": edge_data["relation"], "confidence": edge_data["strength"] }) # 传递关系 try: paths = list(nx.all_simple_paths( self.semantic_memory.concept_graph, concept1, concept2, cutoff=3 )) for path in paths: if len(path) > 2: inference = self._analyze_path(path) inferences.append(inference) except nx.NetworkXNoPath: pass # 类比推理 analogies = self._find_analogies(concept1, concept2) inferences.extend(analogies) return inferences def _find_analogies(self, concept1: str, concept2: str) -> List[Dict]: """查找类比关系""" analogies = [] # 获取concept1的关系模式 patterns1 = self._get_relation_patterns(concept1) # 查找相似模式 for node in self.semantic_memory.concept_graph.nodes(): if node != concept1: patterns = self._get_relation_patterns(node) similarity = self._pattern_similarity(patterns1, patterns) if similarity > 0.7: analogies.append({ "type": "analogy", "base": concept1, "target": node, "mapped_to": concept2, "confidence": similarity }) return analogies 5. 程序记忆系统 5.1 技能学习 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 @dataclass class Skill: name: str steps: List[Dict] preconditions: List[str] postconditions: List[str] success_rate: float = 0.0 execution_count: int = 0 class ProceduralMemory: def __init__(self): self.skills = {} self.skill_hierarchy = nx.DiGraph() self.execution_history = [] def learn_skill(self, demonstration: List[Dict]) -> Skill: """从演示中学习技能""" # 提取动作序列 action_sequence = self._extract_actions(demonstration) # 识别前置和后置条件 preconditions = self._identify_preconditions(demonstration[0]) postconditions = self._identify_postconditions(demonstration[-1]) # 创建技能 skill = Skill( name=self._generate_skill_name(action_sequence), steps=action_sequence, preconditions=preconditions, postconditions=postconditions ) # 存储技能 self.skills[skill.name] = skill # 更新技能层次 self._update_skill_hierarchy(skill) return skill def execute_skill(self, skill_name: str, context: Dict) -> Dict: """执行技能""" if skill_name not in self.skills: return {"success": False, "error": "Skill not found"} skill = self.skills[skill_name] # 检查前置条件 if not self._check_preconditions(skill.preconditions, context): return {"success": False, "error": "Preconditions not met"} # 执行步骤 result = {"success": True, "steps_executed": []} for step in skill.steps: step_result = self._execute_step(step, context) result["steps_executed"].append(step_result) if not step_result["success"]: result["success"] = False result["error"] = f"Failed at step: {step}" break # 更新技能统计 skill.execution_count += 1 if result["success"]: skill.success_rate = ( (skill.success_rate * (skill.execution_count - 1) + 1) / skill.execution_count ) else: skill.success_rate = ( (skill.success_rate * (skill.execution_count - 1)) / skill.execution_count ) # 记录执行历史 self.execution_history.append({ "skill": skill_name, "context": context, "result": result, "timestamp": time.time() }) return result def compose_skills(self, goal: str) -> List[str]: """组合技能以达成目标""" # 查找能达成目标的技能序列 relevant_skills = self._find_relevant_skills(goal) # 规划技能执行顺序 skill_plan = self._plan_skill_sequence(relevant_skills, goal) return skill_plan 5.2 技能优化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 
57 58 59 60 61 62 63 64 class SkillOptimizer: def __init__(self, procedural_memory: ProceduralMemory): self.procedural_memory = procedural_memory def optimize_skill(self, skill_name: str): """优化技能执行""" skill = self.procedural_memory.skills.get(skill_name) if not skill: return # 分析执行历史 history = [ h for h in self.procedural_memory.execution_history if h["skill"] == skill_name ] # 识别失败模式 failure_patterns = self._identify_failure_patterns(history) # 优化步骤 optimized_steps = self._optimize_steps( skill.steps, failure_patterns ) # 创建优化版本 optimized_skill = Skill( name=f"{skill_name}_optimized", steps=optimized_steps, preconditions=skill.preconditions, postconditions=skill.postconditions ) self.procedural_memory.skills[optimized_skill.name] = optimized_skill return optimized_skill def _identify_failure_patterns(self, history: List[Dict]) -> List[Dict]: """识别失败模式""" failures = [h for h in history if not h["result"]["success"]] patterns = [] for failure in failures: failed_step = failure["result"].get("error", "") context = failure["context"] pattern = { "step": failed_step, "context_conditions": self._extract_conditions(context), "frequency": 1 } # 合并相似模式 merged = False for existing_pattern in patterns: if self._patterns_similar(pattern, existing_pattern): existing_pattern["frequency"] += 1 merged = True break if not merged: patterns.append(pattern) return patterns 6. 记忆检索优化 6.1 上下文感知检索 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 class ContextAwareRetrieval: def __init__(self, memory_system: MemorySystem): self.memory_system = memory_system self.context_window = 10 # 考虑最近10个交互 def retrieve(self, query: str, context: List[Dict]) -> List[Memory]: """上下文感知的记忆检索""" # 1. 提取上下文特征 context_features = self._extract_context_features(context) # 2. 扩展查询 expanded_query = self._expand_query_with_context(query, context_features) # 3. 多源检索 candidates = [] # 从长期记忆检索 ltm_results = self.memory_system.long_term_memory.retrieve( expanded_query, k=10 ) candidates.extend(ltm_results) # 从情景记忆检索 episodes = self.memory_system.episodic_memory.recall_similar_episodes( context_features, k=3 ) for episode in episodes: candidates.extend(self._extract_memories_from_episode(episode)) # 从语义记忆检索 concepts = self.memory_system.semantic_memory.query_concepts( query, k=5 ) for concept in concepts: candidates.extend(self._get_concept_memories(concept)) # 4. 
重排和去重 unique_memories = self._deduplicate_memories(candidates) ranked_memories = self._rank_by_relevance( unique_memories, query, context_features ) return ranked_memories[:5] def _rank_by_relevance(self, memories: List[Memory], query: str, context: Dict) -> List[Memory]: """按相关性排序记忆""" scored_memories = [] for memory in memories: # 查询相关性 query_relevance = self._calculate_similarity( memory.content, query ) # 上下文相关性 context_relevance = self._calculate_context_relevance( memory, context ) # 时间相关性 time_relevance = self._calculate_time_relevance(memory) # 综合评分 score = ( 0.4 * query_relevance + 0.3 * context_relevance + 0.2 * time_relevance + 0.1 * memory.importance ) scored_memories.append((memory, score)) scored_memories.sort(key=lambda x: x[1], reverse=True) return [memory for memory, _ in scored_memories] 6.2 记忆链构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 class MemoryChain: def __init__(self): self.chain_graph = nx.DiGraph() def build_memory_chain(self, seed_memory: Memory, memory_pool: List[Memory], max_length: int = 5) -> List[Memory]: """构建记忆链""" chain = [seed_memory] current = seed_memory while len(chain) < max_length: # 找到最相关的下一个记忆 next_memory = self._find_next_memory( current, memory_pool, chain ) if next_memory is None: break chain.append(next_memory) current = next_memory return chain def _find_next_memory(self, current: Memory, candidates: List[Memory], chain: List[Memory]) -> Optional[Memory]: """找到链中的下一个记忆""" best_memory = None best_score = -1 for candidate in candidates: if candidate in chain: continue # 计算连接强度 connection_strength = self._calculate_connection_strength( current, candidate ) # 计算多样性奖励 diversity_bonus = self._calculate_diversity_bonus( candidate, chain ) score = connection_strength + 0.2 * diversity_bonus if score > best_score: best_score = score best_memory = candidate return best_memory if best_score > 0.3 else None 7. 
记忆更新与遗忘 7.1 自适应遗忘 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 class AdaptiveForgetting: def __init__(self, base_decay_rate: float = 0.01): self.base_decay_rate = base_decay_rate def update_memories(self, memories: Dict[str, Memory]): """更新记忆(包括遗忘)""" current_time = time.time() to_forget = [] for memory_id, memory in memories.items(): # 计算遗忘曲线 time_since_access = current_time - (memory.last_accessed or memory.timestamp) time_since_creation = current_time - memory.timestamp # Ebbinghaus遗忘曲线 retention = self._calculate_retention( time_since_access, memory.access_count, memory.importance ) # 更新记忆强度 memory.strength = retention # 标记需要遗忘的记忆 if retention < 0.1 and time_since_creation > 86400: # 24小时 to_forget.append(memory_id) # 执行遗忘 for memory_id in to_forget: self._forget_memory(memories, memory_id) def _calculate_retention(self, time_elapsed: float, access_count: int, importance: float) -> float: """计算记忆保持率""" # 基础遗忘率 base_retention = np.exp(-self.base_decay_rate * time_elapsed / 3600) # 重复强化因子 repetition_factor = 1 + np.log1p(access_count) * 0.1 # 重要性调节 importance_factor = 1 + importance * 0.5 # 最终保持率 retention = min(1.0, base_retention * repetition_factor * importance_factor) return retention def _forget_memory(self, memories: Dict, memory_id: str): """遗忘记忆(不是删除,而是转为痕迹)""" memory = memories[memory_id] # 保留痕迹 trace = { "id": memory_id, "summary": self._create_summary(memory.content), "timestamp": memory.timestamp, "importance": memory.importance * 0.1 } # 存储痕迹(可以用于后续的重建) self._store_trace(trace) # 从活跃记忆中移除 del memories[memory_id] 8. 记忆系统集成 8.1 统一记忆接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 class UnifiedMemoryInterface: def __init__(self): self.memory_system = MemorySystem() self.retrieval = ContextAwareRetrieval(self.memory_system) self.forgetting = AdaptiveForgetting() async def remember(self, content: Any, memory_type: MemoryType = None): """记住信息""" # 自动判断记忆类型 if memory_type is None: memory_type = self._infer_memory_type(content) if memory_type == MemoryType.EPISODIC: self.memory_system.episodic_memory.add_event(content) elif memory_type == MemoryType.SEMANTIC: concepts = self._extract_concepts(content) for concept in concepts: self.memory_system.semantic_memory.add_concept(concept) elif memory_type == MemoryType.PROCEDURAL: if self._is_skill_demonstration(content): self.memory_system.procedural_memory.learn_skill(content) else: # 默认存储到长期记忆 self.memory_system.long_term_memory.store(content) async def recall(self, query: str, context: List[Dict] = None) -> List[Any]: """回忆信息""" # 并行从各种记忆类型检索 tasks = [ self._recall_from_ltm(query), self._recall_from_episodic(query, context), self._recall_from_semantic(query), self._recall_from_procedural(query) ] results = await asyncio.gather(*tasks) # 合并和排序结果 all_memories = [] for result in results: all_memories.extend(result) # 去重和排序 unique_memories = self._deduplicate(all_memories) ranked_memories = self._rank_memories(unique_memories, query) return ranked_memories[:10] def reflect(self) -> Dict: """反思和总结记忆""" reflection = { "patterns": self.memory_system.episodic_memory.extract_patterns(), "important_concepts": self._get_important_concepts(), "skill_improvements": self._suggest_skill_improvements(), "memory_statistics": self._get_memory_stats() } return reflection 9. 
最佳实践

- 分层存储:根据访问频率和重要性分层
- 智能遗忘:模拟人类遗忘曲线
- 关联构建:自动构建记忆间的关联
- 上下文感知:考虑当前上下文进行检索
- 持续学习:从交互中不断优化记忆系统
- 压缩策略:定期压缩和总结记忆

(结论之后附有一个结合"分层存储"与"智能遗忘"的最小示意实现。)

结论

一个优秀的Agent记忆系统不仅要能存储和检索信息,还要能够学习、关联、遗忘和总结。通过模拟人类记忆的多层次结构和处理机制,我们可以构建出更智能、更有"记忆"的AI Agent。
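作为上述最佳实践的补充,下面给出一个示意性的最小草图,演示如何把"分层存储"与"智能遗忘"结合起来:按保持率与重要性决定记忆晋升到哪一层,并用指数衰减模拟遗忘。其中 TieredMemoryStore、decay_rate、promote_threshold 等命名与参数均为本文之外的假设,仅作说明,并非文中 MemorySystem 的正式实现。

```python
import math
import time
from dataclasses import dataclass, field

# 示意性草图:TieredMemoryStore 等命名为假设,非文中正式实现。

@dataclass
class MemoryItem:
    content: str
    importance: float                              # 0~1,重要性
    created_at: float = field(default_factory=time.time)
    last_accessed: float = field(default_factory=time.time)
    access_count: int = 0

class TieredMemoryStore:
    """分层存储 + 智能遗忘的最小示意实现。"""

    def __init__(self, promote_threshold: float = 0.6, decay_rate: float = 0.01):
        self.short_term: list[MemoryItem] = []     # 短期层
        self.long_term: list[MemoryItem] = []      # 长期层
        self.promote_threshold = promote_threshold
        self.decay_rate = decay_rate               # 每小时的基础衰减率

    def remember(self, content: str, importance: float = 0.5):
        self.short_term.append(MemoryItem(content, importance))

    def _retention(self, item: MemoryItem) -> float:
        """模拟Ebbinghaus遗忘曲线:随时间指数衰减,访问次数起强化作用。"""
        hours = (time.time() - item.last_accessed) / 3600
        base = math.exp(-self.decay_rate * hours)
        reinforcement = 1 + 0.1 * math.log1p(item.access_count)
        return min(1.0, base * reinforcement)

    def consolidate(self):
        """按"保持率 × 重要性"打分:晋升长期层、留在短期层,或直接遗忘。"""
        remaining = []
        for item in self.short_term:
            score = self._retention(item) * (0.5 + 0.5 * item.importance)
            if score >= self.promote_threshold:
                self.long_term.append(item)        # 分层存储:晋升到长期层
            elif score > 0.1:
                remaining.append(item)             # 暂留短期层
            # score <= 0.1 的记忆被遗忘,不再保留
        self.short_term = remaining

    def recall(self, keyword: str) -> list[str]:
        """简单的关键词召回,命中即更新访问信息(重复访问会强化记忆)。"""
        hits = []
        for item in self.long_term + self.short_term:
            if keyword in item.content:
                item.access_count += 1
                item.last_accessed = time.time()
                hits.append(item.content)
        return hits

if __name__ == "__main__":
    store = TieredMemoryStore()
    store.remember("用户偏好深色主题", importance=0.9)   # 高重要性,巩固后晋升长期层
    store.remember("一次性验证码 482913", importance=0.1)  # 低重要性,留在短期层直至遗忘
    store.consolidate()
    print(store.recall("主题"))
```

这个草图只保留了文中"分层 + 遗忘"两条主线;真实系统还应像正文那样引入向量检索、情景/语义记忆等组件。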
引言 检索增强生成(Retrieval-Augmented Generation, RAG)是提升LLM准确性和时效性的关键技术。本文详细介绍如何构建生产级的RAG Agent系统,包括文档处理、向量存储、检索优化和生成策略。 1. RAG架构设计 flowchart TB subgraph "RAG系统架构" U[用户查询] --> QP[查询处理] subgraph "检索模块" QP --> VR[向量检索] QP --> KR[关键词检索] VR --> HM[混合匹配] KR --> HM end subgraph "知识库" DS[(文档存储)] VS[(向量数据库)] IS[(倒排索引)] end HM --> RR[重排序] RR --> CA[上下文聚合] subgraph "生成模块" CA --> PE[提示工程] PE --> LLM[大语言模型] LLM --> PP[后处理] end PP --> R[响应输出] DS -.-> VR VS -.-> VR IS -.-> KR end style U fill:#e8f5e9,stroke:#4caf50,stroke-width:2px style R fill:#fff3e0,stroke:#ff9800,stroke-width:2px style LLM fill:#e3f2fd,stroke:#2196f3,stroke-width:3px 1.1 系统架构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from typing import List, Dict, Any, Optional import numpy as np from dataclasses import dataclass @dataclass class RAGConfig: chunk_size: int = 1000 chunk_overlap: int = 200 embedding_model: str = "text-embedding-ada-002" vector_store: str = "chromadb" retrieval_top_k: int = 5 rerank_top_k: int = 3 temperature: float = 0.7 class RAGAgent: def __init__(self, config: RAGConfig): self.config = config self.document_processor = DocumentProcessor(config) self.vector_store = self._init_vector_store() self.retriever = HybridRetriever(self.vector_store) self.reranker = CrossEncoderReranker() self.generator = AugmentedGenerator() def _init_vector_store(self): """初始化向量存储""" if self.config.vector_store == "chromadb": from chromadb import Client return ChromaVectorStore(Client()) elif self.config.vector_store == "pinecone": import pinecone return PineconeVectorStore(pinecone) elif self.config.vector_store == "weaviate": import weaviate return WeaviateVectorStore(weaviate.Client()) 1.2 文档处理管道 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 class DocumentProcessor: def __init__(self, config: RAGConfig): self.config = config self.text_splitter = self._init_splitter() self.metadata_extractor = MetadataExtractor() def _init_splitter(self): """初始化文本分割器""" from langchain.text_splitter import RecursiveCharacterTextSplitter return RecursiveCharacterTextSplitter( chunk_size=self.config.chunk_size, chunk_overlap=self.config.chunk_overlap, separators=["\n\n", "\n", "。", "!", "?", ".", "!", "?", " ", ""], length_function=len ) def process_documents(self, documents: List[Dict]) -> List[Dict]: """处理文档""" processed_chunks = [] for doc in documents: # 提取元数据 metadata = self.metadata_extractor.extract(doc) # 预处理文本 cleaned_text = self.preprocess_text(doc["content"]) # 智能分块 chunks = self.smart_chunking(cleaned_text, metadata) # 添加上下文 chunks_with_context = self.add_context(chunks) processed_chunks.extend(chunks_with_context) return processed_chunks def smart_chunking(self, text: str, metadata: Dict) -> List[str]: """智能分块策略""" # 根据文档类型选择分块策略 doc_type = metadata.get("type", "general") if doc_type == "code": return self.chunk_code(text) elif doc_type == "table": return self.chunk_table(text) elif doc_type == "conversation": return self.chunk_conversation(text) else: return self.text_splitter.split_text(text) def chunk_code(self, code: str) -> List[str]: """代码分块""" import ast chunks = [] try: tree = ast.parse(code) for node in ast.walk(tree): if isinstance(node, (ast.FunctionDef, ast.ClassDef)): chunk = ast.get_source_segment(code, node) if chunk: chunks.append(chunk) except: # 回退到普通分块 chunks = self.text_splitter.split_text(code) return chunks 2. 
向量存储与索引 2.1 多级索引策略 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 class HierarchicalIndex: def __init__(self): self.document_index = {} # 文档级索引 self.chunk_index = {} # 块级索引 self.summary_index = {} # 摘要索引 def build_index(self, documents: List[Dict]): """构建多级索引""" for doc in documents: doc_id = doc["id"] # 文档级索引 self.document_index[doc_id] = { "title": doc.get("title", ""), "summary": self.generate_summary(doc["content"]), "metadata": doc.get("metadata", {}), "chunk_ids": [] } # 块级索引 chunks = self.create_chunks(doc["content"]) for i, chunk in enumerate(chunks): chunk_id = f"{doc_id}_chunk_{i}" self.chunk_index[chunk_id] = { "content": chunk, "doc_id": doc_id, "position": i, "embedding": None # 后续填充 } self.document_index[doc_id]["chunk_ids"].append(chunk_id) # 摘要索引 summary_embedding = self.embed_text( self.document_index[doc_id]["summary"] ) self.summary_index[doc_id] = summary_embedding def generate_summary(self, content: str) -> str: """生成文档摘要""" from transformers import pipeline summarizer = pipeline("summarization", model="facebook/bart-large-cnn") summary = summarizer(content, max_length=150, min_length=50)[0]["summary_text"] return summary 2.2 向量数据库操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 class VectorStore: def __init__(self, embedding_model): self.embedding_model = embedding_model self.index = None self.metadata = {} async def add_documents(self, documents: List[Dict]): """添加文档到向量存储""" embeddings = [] metadatas = [] for doc in documents: # 生成嵌入 embedding = await self.embedding_model.embed(doc["content"]) embeddings.append(embedding) # 保存元数据 metadata = { "id": doc["id"], "source": doc.get("source", ""), "timestamp": doc.get("timestamp", time.time()), "type": doc.get("type", "text") } metadatas.append(metadata) # 批量插入 await self.batch_insert(embeddings, metadatas) async def batch_insert(self, embeddings: List[np.ndarray], metadatas: List[Dict]): """批量插入向量""" batch_size = 100 for i in range(0, len(embeddings), batch_size): batch_embeddings = embeddings[i:i+batch_size] batch_metadatas = metadatas[i:i+batch_size] # 插入到向量数据库 ids = self.index.add( embeddings=batch_embeddings, metadatas=batch_metadatas ) # 更新元数据映射 for id, metadata in zip(ids, batch_metadatas): self.metadata[id] = metadata 3. 混合检索策略 graph LR subgraph "混合检索流程" Q[查询] --> QE[查询扩展] QE --> V[向量检索] QE --> K[关键词检索] QE --> S[语义缓存] V --> VR[向量结果] K --> KR[关键词结果] S --> SR[缓存结果] VR --> F[结果融合RRF算法] KR --> F SR --> F F --> R[重排序] R --> T[Top-K结果] end style Q fill:#e1f5fe,stroke:#01579b,stroke-width:2px style T fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px style F fill:#fff3e0,stroke:#e65100,stroke-width:2px 3.1 多路检索 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 class HybridRetriever: def __init__(self, vector_store): self.vector_store = vector_store self.bm25_index = BM25Index() self.semantic_cache = {} async def retrieve(self, query: str, top_k: int = 5) -> List[Dict]: """混合检索""" # 1. 向量检索 vector_results = await self.vector_search(query, top_k * 2) # 2. 关键词检索 keyword_results = self.keyword_search(query, top_k * 2) # 3. 语义缓存检索 cache_results = self.search_cache(query) # 4. 
融合结果 fused_results = self.fuse_results( vector_results, keyword_results, cache_results ) return fused_results[:top_k] async def vector_search(self, query: str, top_k: int) -> List[Dict]: """向量相似度检索""" query_embedding = await self.embed_query(query) results = self.vector_store.similarity_search( query_embedding, k=top_k, filter=self.build_filter(query) ) return results def keyword_search(self, query: str, top_k: int) -> List[Dict]: """BM25关键词检索""" # 分词 tokens = self.tokenize(query) # BM25评分 scores = self.bm25_index.get_scores(tokens) # 获取top-k top_indices = np.argsort(scores)[-top_k:][::-1] results = [] for idx in top_indices: results.append({ "content": self.bm25_index.documents[idx], "score": scores[idx], "type": "keyword" }) return results def fuse_results(self, *result_sets) -> List[Dict]: """结果融合(RRF算法)""" from collections import defaultdict k = 60 # RRF常数 scores = defaultdict(float) documents = {} for results in result_sets: for rank, result in enumerate(results): doc_id = result.get("id", str(result)) scores[doc_id] += 1 / (k + rank + 1) documents[doc_id] = result # 按融合分数排序 sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True) fused_results = [] for doc_id in sorted_ids: doc = documents[doc_id] doc["fusion_score"] = scores[doc_id] fused_results.append(doc) return fused_results 3.2 查询扩展 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 class QueryExpander: def __init__(self, llm): self.llm = llm self.synonym_dict = self.load_synonyms() async def expand_query(self, query: str) -> List[str]: """扩展查询""" expanded_queries = [query] # 1. 同义词扩展 synonyms = self.get_synonyms(query) expanded_queries.extend(synonyms) # 2. LLM改写 rephrased = await self.rephrase_query(query) expanded_queries.extend(rephrased) # 3. 假设文档生成(HyDE) hypothetical = await self.generate_hypothetical_document(query) expanded_queries.append(hypothetical) return expanded_queries async def rephrase_query(self, query: str) -> List[str]: """使用LLM改写查询""" prompt = f""" 请将以下查询改写成3个不同的版本,保持语义相同: 原始查询:{query} 改写版本: 1. 2. 3. """ response = await self.llm.agenerate([prompt]) rephrased = self.parse_rephrased(response) return rephrased async def generate_hypothetical_document(self, query: str) -> str: """生成假设文档""" prompt = f""" 假设你正在写一个文档来回答以下问题: {query} 请写出这个文档的第一段: """ response = await self.llm.agenerate([prompt]) return response.generations[0][0].text 4. 
重排序机制 4.1 交叉编码器重排 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class CrossEncoderReranker: def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"): from sentence_transformers import CrossEncoder self.model = CrossEncoder(model_name) def rerank(self, query: str, documents: List[Dict], top_k: int = 3) -> List[Dict]: """重排序文档""" # 准备输入对 pairs = [] for doc in documents: pairs.append([query, doc["content"]]) # 计算相关性分数 scores = self.model.predict(pairs) # 添加分数并排序 for doc, score in zip(documents, scores): doc["rerank_score"] = float(score) # 按分数排序 reranked = sorted(documents, key=lambda x: x["rerank_score"], reverse=True) return reranked[:top_k] 4.2 多样性重排 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 class DiversityReranker: def __init__(self, lambda_param: float = 0.5): self.lambda_param = lambda_param def rerank_with_diversity(self, documents: List[Dict], top_k: int = 5) -> List[Dict]: """MMR(最大边际相关性)重排""" import numpy as np from sklearn.metrics.pairwise import cosine_similarity # 获取文档嵌入 embeddings = np.array([doc["embedding"] for doc in documents]) # 初始化 selected = [] selected_embeddings = [] remaining = list(range(len(documents))) # 选择第一个文档(相关性最高) first_idx = 0 selected.append(first_idx) selected_embeddings.append(embeddings[first_idx]) remaining.remove(first_idx) # 迭代选择 while len(selected) < min(top_k, len(documents)): mmr_scores = [] for idx in remaining: # 相关性分数 relevance = documents[idx].get("score", 0) # 与已选文档的最大相似度 similarities = cosine_similarity( [embeddings[idx]], selected_embeddings )[0] max_similarity = max(similarities) if similarities.size > 0 else 0 # MMR分数 mmr = self.lambda_param * relevance - (1 - self.lambda_param) * max_similarity mmr_scores.append((idx, mmr)) # 选择MMR最高的文档 best_idx = max(mmr_scores, key=lambda x: x[1])[0] selected.append(best_idx) selected_embeddings.append(embeddings[best_idx]) remaining.remove(best_idx) # 返回重排后的文档 return [documents[idx] for idx in selected] 5. 生成策略优化 5.1 上下文优化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 class ContextOptimizer: def __init__(self, max_tokens: int = 4000): self.max_tokens = max_tokens def optimize_context(self, query: str, documents: List[Dict]) -> str: """优化上下文""" # 1. 压缩文档 compressed_docs = self.compress_documents(documents) # 2. 排序和截断 prioritized_docs = self.prioritize_content(query, compressed_docs) # 3. 格式化上下文 context = self.format_context(prioritized_docs) # 4. 
确保不超过token限制 context = self.truncate_to_limit(context) return context def compress_documents(self, documents: List[Dict]) -> List[Dict]: """压缩文档内容""" compressed = [] for doc in documents: # 提取关键句子 key_sentences = self.extract_key_sentences(doc["content"]) compressed.append({ **doc, "compressed_content": " ".join(key_sentences) }) return compressed def extract_key_sentences(self, text: str, num_sentences: int = 3) -> List[str]: """提取关键句子""" from sentence_transformers import SentenceTransformer from sklearn.metrics.pairwise import cosine_similarity import numpy as np # 分句 sentences = text.split("。") if len(sentences) <= num_sentences: return sentences # 计算句子嵌入 model = SentenceTransformer('paraphrase-MiniLM-L6-v2') embeddings = model.encode(sentences) # 计算中心性 similarity_matrix = cosine_similarity(embeddings) scores = similarity_matrix.sum(axis=1) # 选择得分最高的句子 top_indices = np.argsort(scores)[-num_sentences:] # 保持原始顺序 top_indices = sorted(top_indices) return [sentences[i] for i in top_indices] 5.2 增强生成 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 class AugmentedGenerator: def __init__(self, llm): self.llm = llm self.prompt_templates = self.load_templates() async def generate(self, query: str, context: str, generation_config: Dict = None) -> str: """增强生成""" # 1. 选择提示模板 template = self.select_template(query) # 2. 构建提示 prompt = self.build_prompt(template, query, context) # 3. 生成回答 response = await self.llm.agenerate( [prompt], **generation_config or {} ) # 4. 后处理 answer = self.postprocess(response.generations[0][0].text) # 5. 验证答案 if not self.validate_answer(answer, context): answer = await self.regenerate_with_constraints(query, context) return answer def build_prompt(self, template: str, query: str, context: str) -> str: """构建提示""" return template.format( context=context, query=query, current_date=datetime.now().strftime("%Y-%m-%d") ) def validate_answer(self, answer: str, context: str) -> bool: """验证答案""" # 检查是否基于上下文 if "根据提供的信息" not in answer and len(answer) > 100: return False # 检查是否包含幻觉 facts_in_answer = self.extract_facts(answer) facts_in_context = self.extract_facts(context) for fact in facts_in_answer: if not self.fact_supported(fact, facts_in_context): return False return True 6. 
评估与优化 6.1 检索质量评估 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 class RetrievalEvaluator: def __init__(self): self.metrics = {} def evaluate(self, queries: List[str], ground_truth: List[List[str]], retrieved: List[List[str]]) -> Dict: """评估检索质量""" metrics = { "mrr": self.calculate_mrr(ground_truth, retrieved), "map": self.calculate_map(ground_truth, retrieved), "ndcg": self.calculate_ndcg(ground_truth, retrieved), "recall@k": {}, "precision@k": {} } for k in [1, 3, 5, 10]: metrics["recall@k"][k] = self.calculate_recall_at_k( ground_truth, retrieved, k ) metrics["precision@k"][k] = self.calculate_precision_at_k( ground_truth, retrieved, k ) return metrics def calculate_mrr(self, ground_truth: List[List[str]], retrieved: List[List[str]]) -> float: """计算MRR(平均倒数排名)""" mrr = 0 for gt, ret in zip(ground_truth, retrieved): for i, doc in enumerate(ret): if doc in gt: mrr += 1 / (i + 1) break return mrr / len(ground_truth) def calculate_ndcg(self, ground_truth: List[List[str]], retrieved: List[List[str]], k: int = 10) -> float: """计算NDCG""" import numpy as np def dcg(relevances, k): relevances = np.array(relevances)[:k] if relevances.size: return np.sum(relevances / np.log2(np.arange(2, relevances.size + 2))) return 0 ndcg_scores = [] for gt, ret in zip(ground_truth, retrieved): relevances = [1 if doc in gt else 0 for doc in ret[:k]] ideal_relevances = [1] * min(len(gt), k) + [0] * (k - min(len(gt), k)) dcg_score = dcg(relevances, k) idcg_score = dcg(ideal_relevances, k) if idcg_score > 0: ndcg_scores.append(dcg_score / idcg_score) else: ndcg_scores.append(0) return np.mean(ndcg_scores) 6.2 生成质量评估 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 class GenerationEvaluator: def __init__(self): self.faithfulness_model = self.load_faithfulness_model() self.relevance_model = self.load_relevance_model() async def evaluate_generation(self, query: str, context: str, answer: str) -> Dict: """评估生成质量""" metrics = { "faithfulness": await self.evaluate_faithfulness(answer, context), "relevance": await self.evaluate_relevance(answer, query), "completeness": self.evaluate_completeness(answer, query), "fluency": self.evaluate_fluency(answer) } return metrics async def evaluate_faithfulness(self, answer: str, context: str) -> float: """评估忠实度""" prompt = f""" 请评估答案是否忠实于给定的上下文。 上下文:{context} 答案:{answer} 评分(0-1): """ response = await self.faithfulness_model.agenerate([prompt]) score = self.extract_score(response) return score def evaluate_fluency(self, answer: str) -> float: """评估流畅度""" from transformers import GPT2LMHeadModel, GPT2Tokenizer import torch model = GPT2LMHeadModel.from_pretrained('gpt2') tokenizer = GPT2Tokenizer.from_pretrained('gpt2') inputs = tokenizer(answer, return_tensors='pt') with torch.no_grad(): outputs = model(**inputs, labels=inputs["input_ids"]) loss = outputs.loss perplexity = torch.exp(loss) # 将困惑度转换为0-1分数 fluency_score = 1 / (1 + perplexity.item() / 100) return fluency_score 7. 
生产部署 7.1 API服务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 from fastapi import FastAPI, HTTPException from pydantic import BaseModel import asyncio app = FastAPI() class RAGRequest(BaseModel): query: str top_k: int = 5 temperature: float = 0.7 max_tokens: int = 500 class RAGResponse(BaseModel): answer: str sources: List[Dict] confidence: float # 初始化RAG系统 rag_agent = RAGAgent(RAGConfig()) @app.post("/rag/query", response_model=RAGResponse) async def query_rag(request: RAGRequest): try: # 检索相关文档 documents = await rag_agent.retriever.retrieve( request.query, request.top_k ) # 重排序 reranked = rag_agent.reranker.rerank( request.query, documents, top_k=3 ) # 生成答案 answer = await rag_agent.generator.generate( request.query, reranked, { "temperature": request.temperature, "max_tokens": request.max_tokens } ) return RAGResponse( answer=answer, sources=[{"content": doc["content"][:200], "score": doc.get("score", 0)} for doc in reranked], confidence=calculate_confidence(reranked) ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/rag/index") async def index_documents(documents: List[Dict]): """索引新文档""" try: processed = rag_agent.document_processor.process_documents(documents) await rag_agent.vector_store.add_documents(processed) return {"status": "success", "indexed": len(processed)} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) 7.2 性能优化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class RAGOptimizer: def __init__(self, rag_agent: RAGAgent): self.rag_agent = rag_agent self.cache = LRUCache(maxsize=1000) self.query_embeddings_cache = {} async def optimized_retrieve(self, query: str) -> List[Dict]: """优化的检索""" # 1. 检查缓存 cache_key = self.get_cache_key(query) if cache_key in self.cache: return self.cache[cache_key] # 2. 批量嵌入优化 if query in self.query_embeddings_cache: query_embedding = self.query_embeddings_cache[query] else: query_embedding = await self.batch_embed([query])[0] self.query_embeddings_cache[query] = query_embedding # 3. 并行检索 tasks = [ self.rag_agent.vector_store.search(query_embedding), self.rag_agent.bm25_index.search(query) ] vector_results, keyword_results = await asyncio.gather(*tasks) # 4. 融合和缓存结果 results = self.rag_agent.retriever.fuse_results( vector_results, keyword_results ) self.cache[cache_key] = results return results 8. 高级特性 8.1 增量学习 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 class IncrementalLearningRAG: def __init__(self): self.feedback_buffer = [] self.update_frequency = 100 async def learn_from_feedback(self, query: str, answer: str, feedback: Dict): """从反馈中学习""" self.feedback_buffer.append({ "query": query, "answer": answer, "feedback": feedback, "timestamp": time.time() }) if len(self.feedback_buffer) >= self.update_frequency: await self.update_model() async def update_model(self): """更新模型""" # 1. 分析反馈 positive_examples = [] negative_examples = [] for item in self.feedback_buffer: if item["feedback"]["rating"] >= 4: positive_examples.append(item) else: negative_examples.append(item) # 2. 生成训练数据 training_data = self.prepare_training_data( positive_examples, negative_examples ) # 3. 微调嵌入模型 await self.finetune_embeddings(training_data) # 4. 
更新检索策略
        self.update_retrieval_strategy(training_data)

        # 清空缓冲区
        self.feedback_buffer = []

8.2 多模态RAG

class MultiModalRAG:
    def __init__(self):
        self.text_encoder = self.load_text_encoder()
        self.image_encoder = self.load_image_encoder()
        self.cross_modal_encoder = self.load_cross_modal_encoder()

    async def process_multimodal_query(self, query: Dict) -> str:
        """处理多模态查询"""
        text_query = query.get("text", "")
        image_query = query.get("image")

        # 1. 编码查询
        if text_query and image_query:
            query_embedding = await self.encode_multimodal(
                text_query, image_query
            )
        elif text_query:
            query_embedding = await self.text_encoder.encode(text_query)
        elif image_query:
            query_embedding = await self.image_encoder.encode(image_query)

        # 2. 多模态检索
        results = await self.retrieve_multimodal(query_embedding)

        # 3. 生成答案
        answer = await self.generate_from_multimodal(results, query)

        return answer

9. 最佳实践

- 文档预处理:清洗、去重、格式统一
- 智能分块:根据文档类型选择分块策略
- 混合检索:结合向量和关键词检索
- 重排序:使用交叉编码器提升精度
- 上下文优化:压缩和优先级排序
- 缓存策略:多级缓存提升性能
- 持续优化:基于反馈改进系统

(结论之后附有一个串联检索与生成流程的最小示意草图。)

结论

RAG Agent通过结合检索和生成,显著提升了LLM的准确性和实用性。关键在于优化检索质量、上下文管理和生成策略。
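为了把上文的检索、上下文构建与生成串起来,下面给出一个示意性的端到端最小草图。其中 toy_embed 只是用词袋哈希模拟的玩具向量化函数,build_prompt 的模板也是假设性的;实际系统应替换为正文中的嵌入模型、向量数据库与真实的 LLM 调用。

```python
import numpy as np

# 示意性草图:toy_embed 为玩具向量化函数,实际应替换为嵌入模型;
# 末尾仅拼出提示词,未真正调用 LLM。

def toy_embed(text: str, dim: int = 256) -> np.ndarray:
    """极简的词袋哈希嵌入,仅用于演示检索流程。"""
    vec = np.zeros(dim)
    for token in text.split():
        vec[hash(token) % dim] += 1.0
    norm = np.linalg.norm(vec)
    return vec / norm if norm > 0 else vec

def retrieve(query: str, chunks: list[str], top_k: int = 3) -> list[tuple[str, float]]:
    """向量相似度检索:按余弦相似度返回得分最高的 top_k 文本块。"""
    q = toy_embed(query)
    scored = [(chunk, float(np.dot(q, toy_embed(chunk)))) for chunk in chunks]
    scored.sort(key=lambda x: x[1], reverse=True)
    return scored[:top_k]

def build_prompt(query: str, contexts: list[str]) -> str:
    """把检索到的上下文拼进提示词(模板为假设)。"""
    context_block = "\n".join(f"- {c}" for c in contexts)
    return (
        "请仅依据以下资料回答问题,资料中没有的内容请回答\"不知道\"。\n"
        f"资料:\n{context_block}\n"
        f"问题:{query}\n回答:"
    )

if __name__ == "__main__":
    # 文档块用空格分词,只为配合玩具嵌入函数
    docs = [
        "RAG 系统 先 检索 相关 文档 再 交给 LLM 生成 答案",
        "向量 数据库 常用 FAISS Chroma 等",
        "今天 天气 晴朗",
    ]
    query = "RAG 系统 如何 生成 答案"
    hits = retrieve(query, docs, top_k=2)
    prompt = build_prompt(query, [c for c, _ in hits])
    print(prompt)  # 实际系统中此处应调用 LLM,例如假设的 llm.generate(prompt)
```

真实部署时,检索与重排应换成正文中的 HybridRetriever 与 CrossEncoderReranker,提示模板与生成参数则按业务需求调整。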
引言 单个Agent虽然强大,但面对复杂任务时往往力不从心。多Agent系统通过协作分工,能够处理更复杂的问题,实现1+1>2的效果。本文深入探讨多Agent系统的设计原理、协作机制和实践案例。 1. 多Agent系统架构 graph TB subgraph "多Agent系统拓扑" subgraph "层次结构" M[管理Agent] M --> A1[执行Agent1] M --> A2[执行Agent2] M --> A3[执行Agent3] end subgraph "对等网络" P1[Agent1] <--> P2[Agent2] P2 <--> P3[Agent3] P3 <--> P1 end subgraph "黑板模式" BB[共享黑板] B1[Agent1] --> BB B2[Agent2] --> BB B3[Agent3] --> BB BB --> B1 BB --> B2 BB --> B3 end end style M fill:#ffeb3b,stroke:#f57c00,stroke-width:2px style BB fill:#e1f5fe,stroke:#01579b,stroke-width:2px 1.1 系统拓扑结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from enum import Enum from typing import List, Dict, Any class TopologyType(Enum): HIERARCHICAL = "hierarchical" # 层次结构 PEER_TO_PEER = "p2p" # 对等网络 BLACKBOARD = "blackboard" # 黑板模式 PIPELINE = "pipeline" # 流水线 HYBRID = "hybrid" # 混合模式 class MultiAgentSystem: def __init__(self, topology: TopologyType): self.topology = topology self.agents = {} self.communication_channels = {} self.shared_memory = {} def add_agent(self, agent_id: str, agent: Any): """添加Agent到系统""" self.agents[agent_id] = agent self.setup_communication(agent_id) def setup_communication(self, agent_id: str): """设置通信通道""" if self.topology == TopologyType.HIERARCHICAL: self._setup_hierarchical_comm(agent_id) elif self.topology == TopologyType.PEER_TO_PEER: self._setup_p2p_comm(agent_id) 1.2 Agent角色定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class AgentRole(Enum): COORDINATOR = "coordinator" # 协调者 EXECUTOR = "executor" # 执行者 VALIDATOR = "validator" # 验证者 MONITOR = "monitor" # 监控者 SPECIALIST = "specialist" # 专家 class BaseAgent: def __init__(self, agent_id: str, role: AgentRole, capabilities: List[str]): self.agent_id = agent_id self.role = role self.capabilities = capabilities self.message_queue = [] self.state = AgentState.IDLE async def receive_message(self, message: Dict): """接收消息""" self.message_queue.append(message) await self.process_message(message) async def send_message(self, recipient: str, content: Dict): """发送消息""" message = { "sender": self.agent_id, "recipient": recipient, "content": content, "timestamp": time.time() } await self.communication_channel.send(message) 2. 通信协议设计 2.1 消息格式定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 from dataclasses import dataclass from typing import Optional, Any @dataclass class Message: sender_id: str recipient_id: str message_type: str # REQUEST, RESPONSE, BROADCAST, etc. 
content: Any conversation_id: str timestamp: float priority: int = 0 requires_response: bool = False class MessageProtocol: """消息协议定义""" # 消息类型 REQUEST = "REQUEST" RESPONSE = "RESPONSE" BROADCAST = "BROADCAST" SUBSCRIBE = "SUBSCRIBE" UNSUBSCRIBE = "UNSUBSCRIBE" HEARTBEAT = "HEARTBEAT" # 内容格式 @staticmethod def create_task_request(task: str, requirements: Dict) -> Dict: return { "type": MessageProtocol.REQUEST, "task": task, "requirements": requirements, "deadline": None } @staticmethod def create_capability_announcement(capabilities: List[str]) -> Dict: return { "type": MessageProtocol.BROADCAST, "capabilities": capabilities, "availability": True } 2.2 通信中间件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import asyncio from collections import defaultdict class CommunicationMiddleware: def __init__(self): self.subscribers = defaultdict(list) self.message_buffer = asyncio.Queue() self.routing_table = {} async def publish(self, topic: str, message: Message): """发布消息到主题""" subscribers = self.subscribers.get(topic, []) tasks = [] for subscriber in subscribers: task = asyncio.create_task( subscriber.receive_message(message) ) tasks.append(task) await asyncio.gather(*tasks) def subscribe(self, topic: str, agent: BaseAgent): """订阅主题""" self.subscribers[topic].append(agent) async def route_message(self, message: Message): """路由消息""" if message.recipient_id == "BROADCAST": await self.broadcast(message) else: recipient = self.routing_table.get(message.recipient_id) if recipient: await recipient.receive_message(message) 3. 任务分配与协调 3.1 任务分解策略 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 class TaskDecomposer: def __init__(self, llm): self.llm = llm def decompose_task(self, task: str) -> List[Dict]: """将复杂任务分解为子任务""" prompt = f""" 将以下任务分解为独立的子任务: 任务:{task} 要求: 1. 每个子任务应该是独立可执行的 2. 标注每个子任务所需的能力 3. 
标注子任务之间的依赖关系 输出JSON格式: {{ "subtasks": [ {{ "id": "task_1", "description": "...", "required_capabilities": [...], "dependencies": [], "estimated_time": 0 }} ] }} """ response = self.llm.invoke(prompt) return json.loads(response)["subtasks"] class TaskCoordinator: def __init__(self, agents: Dict[str, BaseAgent]): self.agents = agents self.task_queue = asyncio.Queue() self.task_assignments = {} async def assign_task(self, task: Dict): """分配任务给合适的Agent""" # 找到具备所需能力的Agent capable_agents = self.find_capable_agents( task["required_capabilities"] ) if not capable_agents: return None # 选择最优Agent(考虑负载均衡) selected_agent = self.select_optimal_agent(capable_agents) # 分配任务 self.task_assignments[task["id"]] = selected_agent.agent_id await selected_agent.receive_message( MessageProtocol.create_task_request( task["description"], task.get("requirements", {}) ) ) return selected_agent.agent_id def find_capable_agents(self, required_capabilities: List[str]): """查找具备所需能力的Agent""" capable = [] for agent in self.agents.values(): if all(cap in agent.capabilities for cap in required_capabilities): capable.append(agent) return capable 3.2 协商机制 sequenceDiagram participant C as 协调Agent participant A1 as Agent1 participant A2 as Agent2 participant A3 as Agent3 C->>A1: 任务公告 C->>A2: 任务公告 C->>A3: 任务公告 Note over A1,A3: 评估任务能力 A1-->>C: 投标(成本:10, 时间:5min) A2-->>C: 投标(成本:8, 时间:7min) A3-->>C: 不投标 Note over C: 评估投标 C->>A2: 授予合同 A2->>C: 接受合同 Note over A2: 执行任务 A2->>C: 任务完成报告 C->>A2: 确认完成 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 class ContractNetProtocol: """合同网协议实现""" def __init__(self, coordinator: BaseAgent): self.coordinator = coordinator self.bids = {} async def announce_task(self, task: Dict): """发布任务公告""" announcement = { "type": "TASK_ANNOUNCEMENT", "task": task, "deadline": time.time() + 10 # 10秒投标期 } # 广播任务 await self.coordinator.send_message( "BROADCAST", announcement ) # 等待投标 await asyncio.sleep(10) # 选择中标者 winner = self.select_winner() if winner: await self.award_contract(winner, task) async def submit_bid(self, agent_id: str, bid: Dict): """提交投标""" self.bids[agent_id] = { "cost": bid.get("cost", float('inf')), "time": bid.get("estimated_time", float('inf')), "confidence": bid.get("confidence", 0) } def select_winner(self) -> Optional[str]: """选择中标Agent""" if not self.bids: return None # 综合评分(可自定义权重) best_agent = None best_score = float('-inf') for agent_id, bid in self.bids.items(): score = ( bid["confidence"] * 0.5 - bid["cost"] * 0.3 - bid["time"] * 0.2 ) if score > best_score: best_score = score best_agent = agent_id return best_agent 4. 
知识共享机制 4.1 黑板系统 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class BlackboardSystem: """黑板系统实现""" def __init__(self): self.blackboard = {} self.knowledge_sources = [] self.controller = None self.locks = {} async def write(self, key: str, value: Any, agent_id: str): """写入知识""" async with self.get_lock(key): self.blackboard[key] = { "value": value, "author": agent_id, "timestamp": time.time(), "version": self.get_version(key) + 1 } # 通知订阅者 await self.notify_subscribers(key, value) async def read(self, key: str) -> Any: """读取知识""" entry = self.blackboard.get(key) return entry["value"] if entry else None def subscribe(self, pattern: str, callback): """订阅知识更新""" self.knowledge_sources.append({ "pattern": pattern, "callback": callback }) async def notify_subscribers(self, key: str, value: Any): """通知订阅者""" for source in self.knowledge_sources: if self.match_pattern(key, source["pattern"]): await source["callback"](key, value) 4.2 知识图谱共享 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 from typing import Tuple import networkx as nx class SharedKnowledgeGraph: def __init__(self): self.graph = nx.DiGraph() self.embeddings = {} # 节点嵌入 def add_knowledge(self, subject: str, predicate: str, object: str, agent_id: str, confidence: float = 1.0): """添加知识三元组""" # 添加节点 if subject not in self.graph: self.graph.add_node(subject, type="entity") if object not in self.graph: self.graph.add_node(object, type="entity") # 添加边 self.graph.add_edge( subject, object, predicate=predicate, contributor=agent_id, confidence=confidence, timestamp=time.time() ) def query(self, query_type: str, **kwargs) -> List: """查询知识""" if query_type == "neighbors": node = kwargs.get("node") return list(self.graph.neighbors(node)) elif query_type == "path": source = kwargs.get("source") target = kwargs.get("target") try: path = nx.shortest_path(self.graph, source, target) return path except nx.NetworkXNoPath: return [] elif query_type == "subgraph": nodes = kwargs.get("nodes", []) return self.graph.subgraph(nodes) def merge_knowledge(self, other_graph: 'SharedKnowledgeGraph'): """合并其他Agent的知识""" for edge in other_graph.graph.edges(data=True): source, target, data = edge existing_edge = self.graph.get_edge_data(source, target) if existing_edge: # 更新置信度(加权平均) new_confidence = ( existing_edge["confidence"] + data["confidence"] ) / 2 self.graph[source][target]["confidence"] = new_confidence else: self.add_knowledge( source, data["predicate"], target, data["contributor"], data["confidence"] ) 5. 
冲突解决机制 5.1 投票机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 class VotingMechanism: def __init__(self, voting_type: str = "majority"): self.voting_type = voting_type self.votes = {} async def collect_votes(self, issue: str, options: List[str], voters: List[BaseAgent]): """收集投票""" self.votes[issue] = defaultdict(list) # 并行收集投票 tasks = [] for voter in voters: task = asyncio.create_task( self.get_vote(voter, issue, options) ) tasks.append(task) votes = await asyncio.gather(*tasks) # 统计投票 for voter, vote in zip(voters, votes): self.votes[issue][vote].append(voter.agent_id) return self.determine_winner(issue) async def get_vote(self, voter: BaseAgent, issue: str, options: List[str]) -> str: """获取单个Agent的投票""" vote_request = { "type": "VOTE_REQUEST", "issue": issue, "options": options } response = await voter.process_vote_request(vote_request) return response["vote"] def determine_winner(self, issue: str) -> str: """确定获胜选项""" vote_counts = { option: len(voters) for option, voters in self.votes[issue].items() } if self.voting_type == "majority": return max(vote_counts, key=vote_counts.get) elif self.voting_type == "unanimous": if len(vote_counts) == 1: return list(vote_counts.keys())[0] return None 5.2 协商与妥协 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 class NegotiationProtocol: def __init__(self): self.negotiation_history = [] async def negotiate(self, agents: List[BaseAgent], issue: Dict): """多轮协商""" max_rounds = 5 current_round = 0 while current_round < max_rounds: proposals = await self.collect_proposals(agents, issue) # 评估提案 evaluations = await self.evaluate_proposals( agents, proposals ) # 检查是否达成共识 consensus = self.check_consensus(evaluations) if consensus: return consensus # 生成反提案 issue = self.generate_counter_proposal(evaluations) current_round += 1 # 未达成共识,使用仲裁 return await self.arbitrate(agents, issue) async def collect_proposals(self, agents: List[BaseAgent], issue: Dict) -> List[Dict]: """收集提案""" proposals = [] for agent in agents: proposal = await agent.generate_proposal(issue) proposals.append({ "agent_id": agent.agent_id, "proposal": proposal }) return proposals 6. 实际应用案例 6.1 软件开发团队 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 class SoftwareDevelopmentTeam: def __init__(self): self.product_manager = self.create_pm_agent() self.architect = self.create_architect_agent() self.developers = self.create_developer_agents(3) self.qa_engineer = self.create_qa_agent() self.devops = self.create_devops_agent() def create_pm_agent(self): return Agent( agent_id="pm_001", role=AgentRole.COORDINATOR, capabilities=["requirement_analysis", "planning"], llm=ChatOpenAI(model="gpt-4") ) def create_architect_agent(self): return Agent( agent_id="architect_001", role=AgentRole.SPECIALIST, capabilities=["system_design", "tech_selection"], llm=ChatOpenAI(model="gpt-4") ) async def develop_feature(self, feature_request: str): """开发新功能的完整流程""" # 1. PM分析需求 requirements = await self.product_manager.analyze_requirements( feature_request ) # 2. 架构师设计系统 design = await self.architect.create_design(requirements) # 3. 分配开发任务 tasks = self.decompose_development_tasks(design) development_results = await self.parallel_development( tasks, self.developers ) # 4. QA测试 test_results = await self.qa_engineer.test_feature( development_results ) # 5. 
DevOps部署 if test_results["passed"]: deployment = await self.devops.deploy(development_results) return deployment else: # 返回开发阶段修复bug return await self.fix_bugs(test_results["issues"]) 6.2 研究分析团队 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 class ResearchTeam: def __init__(self): self.lead_researcher = Agent( "lead_001", AgentRole.COORDINATOR, ["research_planning", "synthesis"] ) self.data_collectors = [ Agent(f"collector_{i}", AgentRole.EXECUTOR, ["web_search", "data_extraction"]) for i in range(3) ] self.analysts = [ Agent(f"analyst_{i}", AgentRole.SPECIALIST, ["data_analysis", "visualization"]) for i in range(2) ] self.fact_checker = Agent( "checker_001", AgentRole.VALIDATOR, ["fact_checking", "source_verification"] ) async def conduct_research(self, topic: str): """执行研究项目""" # 1. 制定研究计划 research_plan = await self.lead_researcher.create_plan(topic) # 2. 并行数据收集 data_collection_tasks = [] for collector in self.data_collectors: task = asyncio.create_task( collector.collect_data(research_plan["queries"]) ) data_collection_tasks.append(task) raw_data = await asyncio.gather(*data_collection_tasks) # 3. 事实核查 verified_data = await self.fact_checker.verify_data(raw_data) # 4. 数据分析 analysis_results = [] for analyst in self.analysts: result = await analyst.analyze(verified_data) analysis_results.append(result) # 5. 综合报告 final_report = await self.lead_researcher.synthesize( analysis_results ) return final_report 7. 性能优化 7.1 负载均衡 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 class LoadBalancer: def __init__(self, agents: List[BaseAgent]): self.agents = agents self.agent_loads = {agent.agent_id: 0 for agent in agents} self.agent_performance = { agent.agent_id: {"avg_time": 0, "success_rate": 1.0} for agent in agents } def select_agent(self, task: Dict) -> BaseAgent: """选择负载最低的Agent""" # 计算综合得分 scores = {} for agent in self.agents: if self.is_capable(agent, task): load_score = 1 / (1 + self.agent_loads[agent.agent_id]) perf_score = self.agent_performance[agent.agent_id]["success_rate"] scores[agent.agent_id] = load_score * perf_score if not scores: return None # 选择得分最高的Agent best_agent_id = max(scores, key=scores.get) selected_agent = next( a for a in self.agents if a.agent_id == best_agent_id ) # 更新负载 self.agent_loads[best_agent_id] += 1 return selected_agent def update_performance(self, agent_id: str, execution_time: float, success: bool): """更新Agent性能指标""" perf = self.agent_performance[agent_id] # 更新平均执行时间(指数移动平均) alpha = 0.3 perf["avg_time"] = ( alpha * execution_time + (1 - alpha) * perf["avg_time"] ) # 更新成功率 perf["success_rate"] = ( perf["success_rate"] * 0.9 + (1.0 if success else 0.0) * 0.1 ) # 更新负载 self.agent_loads[agent_id] = max(0, self.agent_loads[agent_id] - 1) 7.2 缓存与共享 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 class SharedCache: def __init__(self, max_size: int = 1000): self.cache = {} self.access_count = defaultdict(int) self.max_size = max_size async def get(self, key: str) -> Any: """获取缓存""" if key in self.cache: self.access_count[key] += 1 return self.cache[key]["value"] return None async def set(self, key: str, value: Any, ttl: int = 3600): """设置缓存""" if len(self.cache) >= self.max_size: # LFU淘汰策略 self.evict_least_frequent() self.cache[key] = { "value": value, "expire_at": time.time() + ttl, "set_by": None # 
可以记录是哪个Agent设置的 } def evict_least_frequent(self): """淘汰最少使用的缓存""" if not self.cache: return least_used = min( self.cache.keys(), key=lambda k: self.access_count.get(k, 0) ) del self.cache[least_used] del self.access_count[least_used] 8. 监控与调试 8.1 系统监控 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 class SystemMonitor: def __init__(self): self.metrics = { "message_count": 0, "task_completed": 0, "task_failed": 0, "avg_response_time": 0, "agent_utilization": {} } self.event_log = [] def log_event(self, event_type: str, details: Dict): """记录事件""" event = { "timestamp": time.time(), "type": event_type, "details": details } self.event_log.append(event) # 更新指标 self.update_metrics(event) def update_metrics(self, event: Dict): """更新系统指标""" if event["type"] == "MESSAGE_SENT": self.metrics["message_count"] += 1 elif event["type"] == "TASK_COMPLETED": self.metrics["task_completed"] += 1 elif event["type"] == "TASK_FAILED": self.metrics["task_failed"] += 1 def generate_report(self) -> Dict: """生成监控报告""" return { "metrics": self.metrics, "health_status": self.check_health(), "bottlenecks": self.identify_bottlenecks(), "recommendations": self.generate_recommendations() } def check_health(self) -> str: """检查系统健康状态""" success_rate = ( self.metrics["task_completed"] / max(1, self.metrics["task_completed"] + self.metrics["task_failed"]) ) if success_rate > 0.95: return "HEALTHY" elif success_rate > 0.8: return "DEGRADED" else: return "CRITICAL" 8.2 调试工具 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class Debugger: def __init__(self, system: MultiAgentSystem): self.system = system self.breakpoints = set() self.watch_list = {} def set_breakpoint(self, agent_id: str, event_type: str): """设置断点""" self.breakpoints.add((agent_id, event_type)) async def trace_execution(self, duration: int = 60): """追踪执行过程""" trace_data = [] start_time = time.time() while time.time() - start_time < duration: for agent_id, agent in self.system.agents.items(): state = { "agent_id": agent_id, "state": agent.state, "queue_size": len(agent.message_queue), "timestamp": time.time() } trace_data.append(state) await asyncio.sleep(1) return self.analyze_trace(trace_data) def analyze_trace(self, trace_data: List[Dict]): """分析追踪数据""" analysis = { "deadlocks": self.detect_deadlocks(trace_data), "performance_issues": self.detect_performance_issues(trace_data), "communication_patterns": self.analyze_communication(trace_data) } return analysis 9. 最佳实践 明确的角色定义:每个Agent应有清晰的职责边界 高效的通信协议:减少不必要的消息传递 容错机制:处理Agent失败的情况 可扩展性设计:支持动态添加/移除Agent 监控和日志:全面的系统监控 测试策略:包括单元测试和集成测试 结论 多Agent系统通过协作能够解决单个Agent无法处理的复杂问题。关键在于设计合理的架构、高效的通信机制和智能的协调策略。 ...
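针对上面最佳实践中提到的容错机制,这里补充一个最小示意:协调方在Agent执行失败时做有限次重试,重试耗尽后把任务转交给备用Agent。为便于独立运行,示例中用异步函数代替文中的 BaseAgent 对象,重试次数与退避时间均为假设值:

import asyncio

class AgentExecutionError(Exception):
    """Agent执行失败时抛出的异常"""

async def run_with_failover(task: dict, primary, backups: list, max_retries: int = 2) -> str:
    """容错调度:主Agent重试失败后,依次切换到备用Agent"""
    for agent in [primary, *backups]:
        for attempt in range(1, max_retries + 1):
            try:
                return await agent(task)
            except AgentExecutionError as exc:
                print(f"{agent.__name__} 第{attempt}次执行失败:{exc}")
                await asyncio.sleep(0.5 * attempt)  # 简单的线性退避
    raise AgentExecutionError(f"任务 {task['id']} 在所有Agent上均执行失败")

# 示意用的Agent:此处用异步函数代替真实的Agent对象
async def flaky_agent(task: dict) -> str:
    raise AgentExecutionError("模型调用超时")

async def stable_agent(task: dict) -> str:
    return f"任务 {task['id']} 已由备用Agent完成"

async def main():
    print(await run_with_failover({"id": "task_1"}, flaky_agent, [stable_agent]))

asyncio.run(main())

实际系统中,重试与转移的决策还应写入监控日志,便于后续分析失败模式。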
前言 LangChain作为最流行的LLM应用开发框架,其Agent系统提供了强大的工具来构建智能对话系统。本文将全面介绍LangChain Agent的核心概念、实现方式和最佳实践。 1. LangChain Agent基础 1.1 核心概念 LangChain Agent由以下核心组件构成: 1 2 3 4 5 6 7 8 9 10 11 12 from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent from langchain.prompts import StringPromptTemplate from langchain import OpenAI, SerpAPIWrapper, LLMChain # Agent的核心组件 components = { "llm": "大语言模型", "tools": "可用工具集", "prompt": "提示模板", "output_parser": "输出解析器", "memory": "记忆系统" } 1.2 Agent类型 LangChain提供多种预构建的Agent类型: 1 2 3 4 5 6 7 8 9 10 11 # 1. Zero-shot ReAct Agent from langchain.agents import create_react_agent # 2. Conversational Agent from langchain.agents import create_openai_functions_agent # 3. Plan-and-Execute Agent from langchain_experimental.plan_and_execute import PlanAndExecute # 4. Self-ask with search from langchain.agents import create_self_ask_with_search_agent 2. 构建第一个Agent 2.1 基础设置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from langchain.agents import create_react_agent, AgentExecutor from langchain.tools import Tool from langchain_openai import ChatOpenAI from langchain.prompts import PromptTemplate # 初始化LLM llm = ChatOpenAI( temperature=0, model="gpt-4-turbo-preview" ) # 定义工具 def calculate(expression: str) -> str: """计算数学表达式""" try: result = eval(expression) return str(result) except: return "计算错误" tools = [ Tool( name="Calculator", func=calculate, description="用于计算数学表达式。输入应该是有效的Python表达式。" ) ] 2.2 创建Agent 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 # 定义提示模板 prompt = PromptTemplate.from_template(""" Answer the following questions as best you can. You have access to the following tools: {tools} Use the following format: Question: the input question you must answer Thought: you should always think about what to do Action: the action to take, should be one of [{tool_names}] Action Input: the input to the action Observation: the result of the action ... (this Thought/Action/Action Input/Observation can repeat N times) Thought: I now know the final answer Final Answer: the final answer to the original input question Begin! Question: {input} Thought: {agent_scratchpad} """) # 创建Agent agent = create_react_agent( llm=llm, tools=tools, prompt=prompt ) # 创建执行器 agent_executor = AgentExecutor( agent=agent, tools=tools, verbose=True, max_iterations=5 ) 3. 高级工具集成 3.1 自定义工具类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from langchain.tools import BaseTool from typing import Optional, Type from pydantic import BaseModel, Field class SearchInput(BaseModel): query: str = Field(description="搜索查询词") class CustomSearchTool(BaseTool): name = "custom_search" description = "搜索互联网信息" args_schema: Type[BaseModel] = SearchInput def _run(self, query: str) -> str: """执行搜索""" # 实现搜索逻辑 return f"搜索结果:{query}" async def _arun(self, query: str) -> str: """异步执行搜索""" # 实现异步搜索逻辑 return await self.async_search(query) 3.2 工具链组合 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from langchain.tools import StructuredTool def multi_step_analysis(data: dict) -> str: """多步骤数据分析工具""" steps = [] # 步骤1:数据清洗 cleaned_data = clean_data(data) steps.append("数据清洗完成") # 步骤2:统计分析 statistics = analyze_statistics(cleaned_data) steps.append(f"统计分析:{statistics}") # 步骤3:生成报告 report = generate_report(statistics) steps.append("报告生成完成") return "\n".join(steps) analysis_tool = StructuredTool.from_function( func=multi_step_analysis, name="DataAnalysis", description="执行多步骤数据分析" ) 4. 
记忆系统集成 4.1 对话记忆 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from langchain.memory import ConversationBufferMemory, ConversationSummaryMemory from langchain.memory import ConversationBufferWindowMemory # 完整对话记忆 memory = ConversationBufferMemory( memory_key="chat_history", return_messages=True ) # 窗口记忆(只保留最近N轮) window_memory = ConversationBufferWindowMemory( k=5, # 保留最近5轮对话 memory_key="chat_history", return_messages=True ) # 摘要记忆 summary_memory = ConversationSummaryMemory( llm=llm, memory_key="chat_history", return_messages=True ) 4.2 实体记忆 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from langchain.memory import ConversationEntityMemory entity_memory = ConversationEntityMemory( llm=llm, entity_extraction_prompt=ENTITY_EXTRACTION_PROMPT, entity_summarization_prompt=ENTITY_SUMMARIZATION_PROMPT ) # 使用实体记忆的Agent agent_with_memory = AgentExecutor( agent=agent, tools=tools, memory=entity_memory, verbose=True ) 5. 自定义Agent类型 5.1 计划执行Agent 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 class PlanExecuteAgent: def __init__(self, llm, tools): self.llm = llm self.tools = tools self.planner = self._create_planner() self.executor = self._create_executor() def _create_planner(self): """创建计划生成器""" planner_prompt = PromptTemplate( template=""" 给定目标:{objective} 创建一个详细的执行计划,将任务分解为具体步骤。 每个步骤应该清晰、可执行。 输出格式: 1. [步骤描述] 2. [步骤描述] ... """, input_variables=["objective"] ) return LLMChain(llm=self.llm, prompt=planner_prompt) def _create_executor(self): """创建执行器""" return AgentExecutor( agent=create_react_agent(self.llm, self.tools, REACT_PROMPT), tools=self.tools ) def run(self, objective: str): # 生成计划 plan = self.planner.run(objective) steps = self._parse_plan(plan) # 执行计划 results = [] for step in steps: result = self.executor.run(step) results.append(result) return self._synthesize_results(results) 5.2 自反思Agent 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 class SelfReflectionAgent: def __init__(self, llm): self.llm = llm self.reflection_prompt = PromptTemplate( template=""" 任务:{task} 初次回答:{initial_answer} 请评估这个回答: 1. 准确性如何? 2. 完整性如何? 3. 有什么可以改进的地方? 提供改进后的答案: """, input_variables=["task", "initial_answer"] ) def run(self, task: str): # 初次回答 initial_answer = self.llm.invoke(task) # 自我反思 reflection_chain = LLMChain( llm=self.llm, prompt=self.reflection_prompt ) improved_answer = reflection_chain.run( task=task, initial_answer=initial_answer ) return improved_answer 6. 
流式输出与异步处理 6.1 流式响应 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler # 配置流式输出 streaming_llm = ChatOpenAI( temperature=0, streaming=True, callbacks=[StreamingStdOutCallbackHandler()] ) # 创建支持流式的Agent streaming_agent = create_react_agent( llm=streaming_llm, tools=tools, prompt=prompt ) # 流式执行 async def stream_agent_response(query: str): async for chunk in streaming_agent.astream({"input": query}): if "output" in chunk: yield chunk["output"] 6.2 异步Agent 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import asyncio from langchain.agents import create_openai_tools_agent async def async_agent_execution(): # 创建异步Agent async_agent = create_openai_tools_agent( llm=llm, tools=tools, prompt=prompt ) # 异步执行器 async_executor = AgentExecutor( agent=async_agent, tools=tools, verbose=True ) # 并发执行多个查询 queries = ["问题1", "问题2", "问题3"] tasks = [async_executor.ainvoke({"input": q}) for q in queries] results = await asyncio.gather(*tasks) return results 7. 错误处理与重试机制 7.1 错误处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from langchain.agents import AgentExecutor from langchain.callbacks import CallbackManager from tenacity import retry, stop_after_attempt, wait_exponential class RobustAgent: def __init__(self, agent, tools): self.executor = AgentExecutor( agent=agent, tools=tools, max_iterations=5, early_stopping_method="generate", handle_parsing_errors=True ) @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10) ) def run_with_retry(self, input_text: str): try: result = self.executor.run(input_text) return result except Exception as e: print(f"错误: {e}") # 降级处理 return self.fallback_response(input_text) def fallback_response(self, input_text: str): """降级响应""" return f"抱歉,无法处理您的请求:{input_text}" 7.2 验证机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class ValidatedAgent: def __init__(self, agent, validator): self.agent = agent self.validator = validator def run(self, input_text: str): # 输入验证 if not self.validator.validate_input(input_text): return "输入无效" # 执行Agent result = self.agent.run(input_text) # 输出验证 if not self.validator.validate_output(result): return self.agent.run_with_correction(input_text) return result 8. 
性能优化 8.1 缓存策略 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from langchain.cache import InMemoryCache, RedisCache from langchain.globals import set_llm_cache # 内存缓存 set_llm_cache(InMemoryCache()) # Redis缓存 import redis redis_client = redis.Redis.from_url("redis://localhost:6379") set_llm_cache(RedisCache(redis_client)) # 自定义缓存 class CustomCache: def __init__(self): self.cache = {} def lookup(self, prompt: str, llm_string: str): key = f"{llm_string}:{prompt}" return self.cache.get(key) def update(self, prompt: str, llm_string: str, return_val: list): key = f"{llm_string}:{prompt}" self.cache[key] = return_val 8.2 批处理优化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from langchain.llms import OpenAI from concurrent.futures import ThreadPoolExecutor class BatchProcessor: def __init__(self, agent, max_workers=5): self.agent = agent self.executor = ThreadPoolExecutor(max_workers=max_workers) def process_batch(self, inputs: list): """批量处理输入""" futures = [] for input_text in inputs: future = self.executor.submit(self.agent.run, input_text) futures.append(future) results = [] for future in futures: try: result = future.result(timeout=30) results.append(result) except Exception as e: results.append(f"处理失败: {e}") return results 9. 监控与日志 9.1 自定义回调 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from langchain.callbacks.base import BaseCallbackHandler import time class PerformanceCallbackHandler(BaseCallbackHandler): def __init__(self): self.start_times = {} self.metrics = { "total_tokens": 0, "total_cost": 0, "execution_times": [] } def on_llm_start(self, serialized, prompts, **kwargs): self.start_times["llm"] = time.time() def on_llm_end(self, response, **kwargs): duration = time.time() - self.start_times.get("llm", time.time()) self.metrics["execution_times"].append(duration) # 记录token使用 if hasattr(response, "llm_output"): tokens = response.llm_output.get("token_usage", {}) self.metrics["total_tokens"] += tokens.get("total_tokens", 0) def on_tool_start(self, serialized, input_str, **kwargs): self.start_times[f"tool_{serialized.get('name')}"] = time.time() def on_tool_end(self, output, **kwargs): # 记录工具执行时间 pass 9.2 追踪系统 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from langchain.callbacks import LangChainTracer # 配置追踪 tracer = LangChainTracer( project_name="my_agent_project", client=None # 可以配置自定义客户端 ) # 使用追踪的Agent traced_agent = AgentExecutor( agent=agent, tools=tools, callbacks=[tracer], verbose=True ) 10. 
生产部署 10.1 API封装 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from fastapi import FastAPI, HTTPException from pydantic import BaseModel import uvicorn app = FastAPI() class AgentRequest(BaseModel): query: str session_id: str = None class AgentResponse(BaseModel): result: str metadata: dict = {} @app.post("/agent/chat", response_model=AgentResponse) async def chat(request: AgentRequest): try: # 获取或创建会话 session = get_or_create_session(request.session_id) # 执行Agent result = await agent_executor.ainvoke({ "input": request.query, "chat_history": session.get_history() }) # 更新会话历史 session.add_message(request.query, result["output"]) return AgentResponse( result=result["output"], metadata={ "session_id": session.id, "tools_used": result.get("intermediate_steps", []) } ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000) 10.2 Docker部署 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # Dockerfile FROM python:3.11-slim WORKDIR /app # 安装依赖 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # 复制代码 COPY . . # 环境变量 ENV OPENAI_API_KEY=${OPENAI_API_KEY} ENV LANGCHAIN_TRACING_V2=true ENV LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY} # 运行 CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] 11. 测试策略 11.1 单元测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import pytest from unittest.mock import Mock, patch class TestAgent: @pytest.fixture def mock_llm(self): llm = Mock() llm.invoke.return_value = "模拟响应" return llm @pytest.fixture def test_agent(self, mock_llm): return create_react_agent( llm=mock_llm, tools=[], prompt=PromptTemplate.from_template("{input}") ) def test_agent_execution(self, test_agent): result = test_agent.run("测试输入") assert result is not None @patch('langchain.tools.Calculator') def test_tool_usage(self, mock_calculator): mock_calculator.run.return_value = "42" # 测试工具调用 11.2 集成测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class IntegrationTests: def test_end_to_end_flow(self): """端到端测试""" # 1. 创建Agent agent = create_production_agent() # 2. 测试简单查询 simple_result = agent.run("What is 2+2?") assert "4" in simple_result # 3. 测试复杂查询 complex_result = agent.run( "Search for the latest AI news and summarize it" ) assert len(complex_result) > 100 # 4. 测试错误处理 error_result = agent.run("Invalid input %$#@") assert "error" not in error_result.lower() 12. 最佳实践总结 明确的工具描述:确保工具描述准确、详细 适当的提示工程:根据任务调整提示模板 合理的迭代限制:防止无限循环 完善的错误处理:处理各种异常情况 性能监控:跟踪token使用和执行时间 渐进式复杂度:从简单Agent开始,逐步增加功能 测试覆盖:包括单元测试和集成测试 文档完善:记录Agent能力和限制 结论 LangChain Agent提供了构建智能对话系统的强大框架。通过合理使用其提供的工具和模式,我们可以快速构建出功能丰富、性能优秀的AI Agent应用。 ...
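作为上面“性能监控:跟踪token使用和执行时间”这条实践的补充,下面给出一个示意写法:用 LangChain 的 get_openai_callback 统计一次Agent调用的token用量与估算费用。假设 agent_executor 即前文创建的执行器;该回调的导入路径在不同LangChain版本中可能有所不同:

import time
from langchain.callbacks import get_openai_callback

def run_with_metrics(agent_executor, query: str):
    """执行一次查询,同时记录token用量、估算费用与耗时"""
    start = time.time()
    with get_openai_callback() as cb:
        result = agent_executor.invoke({"input": query})
    elapsed = time.time() - start
    print(f"总token数: {cb.total_tokens}(提示 {cb.prompt_tokens} / 生成 {cb.completion_tokens})")
    print(f"估算费用: ${cb.total_cost:.4f},耗时: {elapsed:.2f}s")
    return result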
引言
AI Agent作为人工智能领域的前沿技术,正在彻底改变我们与AI系统交互的方式。本文将深入探讨AI Agent的核心架构、设计模式和实践经验。

1. AI Agent核心架构

1.1 基础组件
AI Agent系统通常包含以下核心组件:

class AgentCore:
    def __init__(self):
        self.llm = LanguageModel()        # 大语言模型
        self.memory = MemorySystem()      # 记忆系统
        self.tools = ToolRegistry()       # 工具注册表
        self.planner = TaskPlanner()      # 任务规划器
        self.executor = ActionExecutor()  # 执行器

1.2 感知-推理-行动循环
Agent的核心运行机制基于感知-推理-行动(Perception-Reasoning-Action)循环:

graph LR
    A[环境输入] --> B[感知模块]
    B --> C[推理引擎]
    C --> D[行动执行]
    D --> E[环境反馈]
    E --> B
    C --> F[记忆系统]
    F --> C
    C --> G[知识库]
    G --> C
    style A fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    style E fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
    style C fill:#fff3e0,stroke:#e65100,stroke-width:2px

感知阶段:接收环境输入,理解用户意图
推理阶段:基于记忆和知识进行决策
行动阶段:执行具体操作,产生输出

2. 高级架构模式

2.1 ReAct架构
ReAct(Reasoning and Acting)模式结合了推理和行动:
...
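下面给出一个ReAct循环的最小可运行示意(并非任何框架的官方实现):其中 call_llm 用预置回复模拟模型输出,Search 工具也只是占位函数,仅用于说明“思考 → 行动 → 观察”的循环结构:

_RESPONSES = iter([
    "我需要先检索相关资料。\nAction: Search\nAction Input: AI Agent 架构",
    "我已经得到足够的信息。\nFinal Answer: AI Agent 通过感知-推理-行动循环完成任务。",
])

def call_llm(prompt: str) -> str:
    """假设的LLM接口:此处用预置回复模拟,实际使用时替换为任意模型客户端"""
    return next(_RESPONSES)

def search_tool(query: str) -> str:
    """示意工具:返回检索结果文本"""
    return f"关于『{query}』的检索摘要……"

TOOLS = {"Search": search_tool}

def parse_action(llm_output: str):
    """解析LLM输出中的Action与Action Input(假设模型按固定格式输出)"""
    if "Final Answer:" in llm_output:
        return "FINISH", llm_output.split("Final Answer:")[-1].strip()
    action = llm_output.split("Action:")[-1].split("\n")[0].strip()
    action_input = llm_output.split("Action Input:")[-1].split("\n")[0].strip()
    return action, action_input

def react_loop(question: str, max_steps: int = 5) -> str:
    """最小ReAct循环:思考 -> 行动 -> 观察,直到产生最终答案或达到步数上限"""
    scratchpad = ""
    for _ in range(max_steps):
        output = call_llm(f"Question: {question}{scratchpad}\nThought:")
        action, action_input = parse_action(output)
        if action == "FINISH":
            return action_input
        observation = TOOLS.get(action, lambda _: "未知工具")(action_input)
        scratchpad += f"\nThought: {output}\nObservation: {observation}"
    return "未能在限定步数内得到最终答案"

print(react_loop("什么是AI Agent?"))

接入真实模型时,只需替换 call_llm 并扩充 TOOLS 字典即可,循环结构本身不需要改动。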
引言
在人工智能和大语言模型(LLM)的应用中,知识的表示与组织方式直接影响系统的推理能力和智能水平。LangChain Graph 作为LangChain生态系统中的重要组件,提供了一套强大的工具,使开发者能够轻松地从文本中提取结构化知识,构建知识图谱,并基于图进行复杂推理。本文将深入探讨LangChain Graph的概念、工作原理、应用场景以及实践技巧,帮助您全面理解和应用这一强大工具。

知识图谱与LangChain Graph基础

什么是知识图谱?
知识图谱(Knowledge Graph)是一种结构化数据模型,用于表示实体(Entities)之间的关系(Relations)。它以图的形式组织信息,其中:
节点(Nodes):代表实体或概念
边(Edges):代表实体间的关系

graph LR
    A["艾伦·图灵"] -->|"发明"| B["图灵机"]
    A -->|"出生于"| C["英国"]
    A -->|"被誉为"| D["计算机科学之父"]
    B -->|"是"| E["理论计算模型"]

LangChain Graph的定义与价值
LangChain Graph是LangChain框架中专注于知识图谱构建、存储和查询的模块集合。它将LLM的自然语言处理能力与图数据库的结构化表示结合,实现了:
自动从文本中提取实体和关系
构建和维护知识图谱
基于图结构进行复杂查询和推理
增强LLM应用的上下文理解和回答质量

LangChain Graph架构
LangChain Graph的整体架构可以通过以下图示来理解:

flowchart TB
    subgraph "输入层"
        A["文本文档"] --> B["网页内容"]
        C["结构化数据"] --> D["用户查询"]
    end
    subgraph "处理层"
        E["实体提取 EntityExtractor"]
        F["关系提取 RelationExtractor"]
        G["知识图谱构建 KnowledgeGraphCreator"]
    end
    subgraph "存储层"
        H["图数据库 Neo4j/NetworkX"]
        I["向量存储 VectorStores"]
    end
    subgraph "应用层"
        J["图查询 GraphQuery"]
        K["图推理 GraphReasoning"]
        L["QA系统 GraphQAChain"]
    end
    A --> E
    B --> E
    C --> F
    D --> F
    E --> G
    F --> G
    G --> H
    G --> I
    H --> J
    H --> K
    I --> L

核心组件详解

1. 实体和关系提取器
这些组件负责从文本中识别实体和它们之间的关系:
...
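下面用一段示意代码串联上述流程:从文本抽取三元组、构建内存图谱、再基于图谱问答。其中 GraphIndexCreator 与 GraphQAChain 是 LangChain 早期版本提供的组件,具体导入路径和可用性随版本变化,此处仅作概念演示:

from langchain.indexes import GraphIndexCreator
from langchain.chains import GraphQAChain
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0)

# 1. 从非结构化文本中抽取实体与关系,构建内存中的知识图谱(基于NetworkX)
text = "艾伦·图灵发明了图灵机。图灵机是一种理论计算模型。艾伦·图灵出生于英国。"
index_creator = GraphIndexCreator(llm=llm)
graph = index_creator.from_text(text)

# 2. 查看抽取出的三元组
for triple in graph.get_triples():
    print(triple)

# 3. 基于图谱问答:链会先在图中定位相关实体,再由LLM组织答案
qa_chain = GraphQAChain.from_llm(llm, graph=graph, verbose=True)
print(qa_chain.run("图灵机是由谁发明的?"))

若需要持久化或处理更大规模的图谱,可按上图所示把存储层替换为 Neo4j 等图数据库。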
前言
随着人工智能技术的快速发展,实时Agent系统正在成为推动产业智能化升级的核心驱动力。从多模态感知到自主决策,从工作流协作到人机协同,Agent技术正在重新定义人机交互的未来。本文将深入探讨实时Agent系统的技术演进路径、应用场景及其对未来产业发展的深远影响。

一、实时Agent系统的技术架构与核心原理

1.1 多模态与自主决策技术突破
实时Agent系统的核心能力建立在多模态感知与自主决策的技术融合上。根据斯坦福大学的研究,多模态智能体通过整合视觉、听觉等传感器数据,实现了对物理和虚拟环境的交互式理解。

技术亮点:
GPT-4多模态融合:通过插件系统调用外部工具时,需同步处理文本指令与图像数据,其多模态融合准确率较单模态提升40%以上
硬件加速优化:GPU/TPU集群将推理延迟从秒级降至毫秒级
算法优化:知识蒸馏技术使模型参数量减少70%的同时保持90%的原始性能

实时Agent系统架构图:

graph TD
    A["用户输入"] --> B["语音识别ASR"]
    B --> C["自然语言理解NLU"]
    C --> D["Agent决策引擎"]
    D --> E["任务执行模块"]
    D --> F["知识库查询"]
    E --> G["自然语言生成NLG"]
    F --> G
    G --> H["语音合成TTS"]
    H --> I["实时输出"]
    subgraph "核心处理层"
        D
        E
        F
    end
    subgraph "感知层"
        A
        B
        C
    end
    subgraph "表达层"
        G
        H
        I
    end
    style A fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    style I fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
    style D fill:#fff3e0,stroke:#e65100,stroke-width:2px

1.2 工作流与多智能体协作机制
当前技术流派主要分为两类:

Workflow流(工作流模式)
以BabyAGI为代表,采用固定流程协调Agent:
...
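上述Workflow流(固定流程)可以用如下极简代码示意:任务队列驱动“执行 → 生成新任务 → 重新排序”的循环。其中 llm、create_new_tasks 等均为占位的假设实现,并非BabyAGI的原始代码:

from collections import deque

def llm(prompt: str) -> str:
    """占位的LLM调用:实际系统中替换为真实模型"""
    return f"[模型输出] {prompt[:30]}..."

def execute_task(objective: str, task: str) -> str:
    """执行Agent:完成单个任务"""
    return llm(f"目标:{objective}\n当前任务:{task}\n请给出执行结果。")

def create_new_tasks(result: str, pending: deque) -> list:
    """任务生成Agent:根据执行结果派生新任务(此处用固定规则示意)"""
    if len(pending) < 3:
        return [f"基于『{result[:20]}』的后续分析"]
    return []

def prioritize(tasks: deque) -> deque:
    """优先级Agent:重新排序任务队列(此处按任务长度示意)"""
    return deque(sorted(tasks, key=len))

def run_workflow(objective: str, max_iterations: int = 5) -> None:
    """固定流程:取任务 -> 执行 -> 生成新任务 -> 重新排序"""
    tasks = deque(["拆解目标并确定初始任务"])
    for _ in range(max_iterations):
        if not tasks:
            break
        task = tasks.popleft()
        result = execute_task(objective, task)
        print(f"任务:{task}\n结果:{result}\n")
        tasks.extend(create_new_tasks(result, tasks))
        tasks = prioritize(tasks)

run_workflow("调研实时Agent系统的行业应用")

这种模式的特点是流程固定、可预测,代价是灵活性较低;与之相对的多智能体自主协作模式则把流程决策也交给Agent。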