Agent记忆系统设计:构建有记忆的智能体
引言 记忆是智能的基石。一个没有记忆的Agent就像得了健忘症的助手,无法积累经验、学习模式或维持上下文。本文深入探讨如何为AI Agent构建高效的记忆系统。 1. 记忆系统架构 1.1 记忆类型分层 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 from enum import Enum from typing import Any, List, Dict, Optional import time class MemoryType(Enum): SENSORY = "sensory" # 感官记忆(<1秒) WORKING = "working" # 工作记忆(秒级) SHORT_TERM = "short_term" # 短期记忆(分钟级) LONG_TERM = "long_term" # 长期记忆(永久) EPISODIC = "episodic" # 情景记忆 SEMANTIC = "semantic" # 语义记忆 PROCEDURAL = "procedural" # 程序记忆 class MemorySystem: def __init__(self): self.sensory_buffer = SensoryMemory(capacity=10, ttl=1) self.working_memory = WorkingMemory(capacity=7) self.short_term_memory = ShortTermMemory(capacity=100, ttl=300) self.long_term_memory = LongTermMemory() self.episodic_memory = EpisodicMemory() self.semantic_memory = SemanticMemory() self.procedural_memory = ProceduralMemory() def process_input(self, input_data: Any): """处理输入信息的记忆流程""" # 1. 感官记忆 self.sensory_buffer.store(input_data) # 2. 注意力机制筛选 if self.is_important(input_data): # 3. 进入工作记忆 self.working_memory.add(input_data) # 4. 编码到短期记忆 encoded = self.encode_memory(input_data) self.short_term_memory.store(encoded) # 5. 巩固到长期记忆 if self.should_consolidate(encoded): self.consolidate_to_long_term(encoded) 1.2 工作记忆实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 class WorkingMemory: """Miller's Magic Number: 7±2""" def __init__(self, capacity: int = 7): self.capacity = capacity self.buffer = [] self.attention_weights = [] def add(self, item: Any): """添加项目到工作记忆""" if len(self.buffer) >= self.capacity: # 移除注意力权重最低的项 min_idx = self.attention_weights.index(min(self.attention_weights)) self.buffer.pop(min_idx) self.attention_weights.pop(min_idx) self.buffer.append(item) self.attention_weights.append(1.0) def update_attention(self, idx: int, weight_delta: float): """更新注意力权重""" if 0 <= idx < len(self.attention_weights): self.attention_weights[idx] += weight_delta # 归一化 total = sum(self.attention_weights) self.attention_weights = [w/total for w in self.attention_weights] def get_context(self) -> List[Any]: """获取当前工作记忆上下文""" # 按注意力权重排序 sorted_items = sorted( zip(self.buffer, self.attention_weights), key=lambda x: x[1], reverse=True ) return [item for item, _ in sorted_items] 2. 长期记忆管理 2.1 记忆编码与存储 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 import hashlib import numpy as np from dataclasses import dataclass from datetime import datetime @dataclass class Memory: id: str content: Any embedding: np.ndarray timestamp: float access_count: int = 0 importance: float = 0.5 last_accessed: float = None metadata: Dict = None class LongTermMemory: def __init__(self, vector_dim: int = 768): self.memories = {} self.embeddings = [] self.index = None # FAISS索引 self.vector_dim = vector_dim self._init_index() def _init_index(self): """初始化向量索引""" import faiss self.index = faiss.IndexFlatL2(self.vector_dim) def store(self, content: Any, importance: float = 0.5): """存储记忆""" # 生成唯一ID memory_id = self._generate_id(content) # 生成嵌入向量 embedding = self._encode_content(content) # 创建记忆对象 memory = Memory( id=memory_id, content=content, embedding=embedding, timestamp=time.time(), importance=importance, metadata=self._extract_metadata(content) ) # 存储 self.memories[memory_id] = memory self.index.add(np.array([embedding])) return memory_id def retrieve(self, query: Any, k: int = 5) -> List[Memory]: """检索相关记忆""" query_embedding = self._encode_content(query) # 向量检索 distances, indices = self.index.search( np.array([query_embedding]), k ) # 获取记忆对象 retrieved = [] for idx in indices[0]: if idx < len(self.memories): memory_id = list(self.memories.keys())[idx] memory = self.memories[memory_id] # 更新访问信息 memory.access_count += 1 memory.last_accessed = time.time() retrieved.append(memory) # 按相关性和重要性重排 return self._rerank_memories(retrieved, query) def _rerank_memories(self, memories: List[Memory], query: Any) -> List[Memory]: """重排记忆(考虑时间衰减、重要性等)""" current_time = time.time() scored_memories = [] for memory in memories: # 时间衰减因子 time_decay = np.exp(-(current_time - memory.timestamp) / 86400) # 日衰减 # 访问频率因子 access_factor = np.log1p(memory.access_count) / 10 # 综合得分 score = ( 0.4 * memory.importance + 0.3 * time_decay + 0.3 * access_factor ) scored_memories.append((memory, score)) # 按得分排序 scored_memories.sort(key=lambda x: x[1], reverse=True) return [memory for memory, _ in scored_memories] 2.2 记忆巩固机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 class MemoryConsolidation: def __init__(self, consolidation_threshold: float = 0.7): self.threshold = consolidation_threshold self.consolidation_queue = [] def evaluate_for_consolidation(self, memory: Memory) -> bool: """评估是否需要巩固到长期记忆""" # 重要性评分 importance_score = memory.importance # 重复接触评分 repetition_score = min(1.0, memory.access_count / 5) # 情感强度评分 emotion_score = self._evaluate_emotional_intensity(memory.content) # 新颖性评分 novelty_score = self._evaluate_novelty(memory.content) # 综合评分 consolidation_score = ( 0.3 * importance_score + 0.2 * repetition_score + 0.3 * emotion_score + 0.2 * novelty_score ) return consolidation_score >= self.threshold def consolidate(self, short_term_memories: List[Memory]) -> List[Memory]: """巩固短期记忆到长期记忆""" consolidated = [] for memory in short_term_memories: if self.evaluate_for_consolidation(memory): # 增强记忆编码 enhanced_memory = self._enhance_memory(memory) # 创建关联连接 self._create_associations(enhanced_memory, consolidated) consolidated.append(enhanced_memory) return consolidated def _enhance_memory(self, memory: Memory) -> Memory: """增强记忆编码(添加更多细节和关联)""" # 提取关键概念 concepts = self._extract_concepts(memory.content) # 生成记忆摘要 summary = self._generate_summary(memory.content) # 更新元数据 memory.metadata.update({ "concepts": concepts, "summary": summary, "consolidation_time": time.time() }) return memory 3. 情景记忆系统 3.1 情景记忆结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 @dataclass class Episode: id: str start_time: float end_time: float events: List[Dict] context: Dict outcome: Any emotional_valence: float # -1到1,负面到正面 class EpisodicMemory: def __init__(self): self.episodes = {} self.current_episode = None self.episode_index = {} # 用于快速检索 def start_episode(self, context: Dict): """开始新情景""" episode_id = f"ep_{int(time.time() * 1000)}" self.current_episode = Episode( id=episode_id, start_time=time.time(), end_time=None, events=[], context=context, outcome=None, emotional_valence=0 ) return episode_id def add_event(self, event: Dict): """向当前情景添加事件""" if self.current_episode: event["timestamp"] = time.time() self.current_episode.events.append(event) # 更新情感效价 if "emotion" in event: self._update_emotional_valence(event["emotion"]) def end_episode(self, outcome: Any): """结束当前情景""" if self.current_episode: self.current_episode.end_time = time.time() self.current_episode.outcome = outcome # 存储情景 self.episodes[self.current_episode.id] = self.current_episode # 建立索引 self._index_episode(self.current_episode) # 重置当前情景 self.current_episode = None def recall_similar_episodes(self, query_context: Dict, k: int = 3) -> List[Episode]: """回忆相似情景""" similar_episodes = [] for episode in self.episodes.values(): similarity = self._calculate_context_similarity( query_context, episode.context ) similar_episodes.append((episode, similarity)) # 排序并返回top-k similar_episodes.sort(key=lambda x: x[1], reverse=True) return [ep for ep, _ in similar_episodes[:k]] def extract_patterns(self) -> Dict: """从情景中提取行为模式""" patterns = { "successful_patterns": [], "failure_patterns": [], "emotional_triggers": [] } for episode in self.episodes.values(): # 分析成功模式 if self._is_successful_outcome(episode.outcome): pattern = self._extract_action_sequence(episode) patterns["successful_patterns"].append(pattern) # 分析失败模式 elif self._is_failure_outcome(episode.outcome): pattern = self._extract_action_sequence(episode) patterns["failure_patterns"].append(pattern) # 分析情感触发器 if abs(episode.emotional_valence) > 0.5: trigger = self._identify_emotional_trigger(episode) patterns["emotional_triggers"].append(trigger) return patterns 3.2 情景压缩与摘要 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 class EpisodeCompression: def __init__(self, compression_ratio: float = 0.3): self.compression_ratio = compression_ratio def compress_episode(self, episode: Episode) -> Dict: """压缩情景为摘要""" # 识别关键事件 key_events = self._identify_key_events(episode.events) # 提取转折点 turning_points = self._find_turning_points(episode.events) # 生成叙事摘要 narrative = self._generate_narrative( key_events, turning_points, episode.outcome ) compressed = { "id": episode.id, "duration": episode.end_time - episode.start_time, "key_events": key_events, "turning_points": turning_points, "narrative": narrative, "outcome": episode.outcome, "emotional_arc": self._extract_emotional_arc(episode) } return compressed def _identify_key_events(self, events: List[Dict]) -> List[Dict]: """识别关键事件""" if len(events) <= 5: return events # 计算事件重要性 event_scores = [] for event in events: score = self._calculate_event_importance(event) event_scores.append((event, score)) # 选择top事件 event_scores.sort(key=lambda x: x[1], reverse=True) num_key_events = max(3, int(len(events) * self.compression_ratio)) key_events = [event for event, _ in event_scores[:num_key_events]] # 保持时间顺序 key_events.sort(key=lambda x: x["timestamp"]) return key_events 4. 语义记忆网络 4.1 概念网络构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 import networkx as nx class SemanticMemory: def __init__(self): self.concept_graph = nx.DiGraph() self.concept_embeddings = {} self.relation_types = [ "is_a", "part_of", "has_property", "causes", "prevents", "related_to" ] def add_concept(self, concept: str, properties: Dict = None): """添加概念节点""" if concept not in self.concept_graph: self.concept_graph.add_node( concept, properties=properties or {}, activation=0.0, last_activated=None ) # 生成概念嵌入 self.concept_embeddings[concept] = self._embed_concept(concept) def add_relation(self, concept1: str, relation: str, concept2: str, strength: float = 1.0): """添加概念关系""" self.add_concept(concept1) self.add_concept(concept2) self.concept_graph.add_edge( concept1, concept2, relation=relation, strength=strength, created_at=time.time() ) def activate_concept(self, concept: str, activation: float = 1.0): """激活概念(扩散激活)""" if concept not in self.concept_graph: return # 设置初始激活 self.concept_graph.nodes[concept]["activation"] = activation self.concept_graph.nodes[concept]["last_activated"] = time.time() # 扩散激活 self._spread_activation(concept, activation, decay=0.5, depth=3) def _spread_activation(self, source: str, activation: float, decay: float, depth: int): """扩散激活算法""" if depth <= 0 or activation < 0.1: return # 激活相邻节点 for neighbor in self.concept_graph.neighbors(source): edge_data = self.concept_graph[source][neighbor] spread_activation = activation * edge_data["strength"] * decay current_activation = self.concept_graph.nodes[neighbor].get("activation", 0) new_activation = current_activation + spread_activation self.concept_graph.nodes[neighbor]["activation"] = min(1.0, new_activation) # 递归扩散 self._spread_activation(neighbor, spread_activation, decay, depth - 1) def query_concepts(self, query: str, k: int = 5) -> List[str]: """查询相关概念""" # 激活查询相关概念 query_concepts = self._extract_concepts_from_text(query) for concept in query_concepts: self.activate_concept(concept) # 获取激活度最高的概念 activated_concepts = [ (node, data["activation"]) for node, data in self.concept_graph.nodes(data=True) if data["activation"] > 0 ] activated_concepts.sort(key=lambda x: x[1], reverse=True) return [concept for concept, _ in activated_concepts[:k]] 4.2 知识推理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 class SemanticReasoning: def __init__(self, semantic_memory: SemanticMemory): self.semantic_memory = semantic_memory def infer_relations(self, concept1: str, concept2: str) -> List[Dict]: """推理两个概念之间的关系""" inferences = [] # 直接关系 if self.semantic_memory.concept_graph.has_edge(concept1, concept2): edge_data = self.semantic_memory.concept_graph[concept1][concept2] inferences.append({ "type": "direct", "relation": edge_data["relation"], "confidence": edge_data["strength"] }) # 传递关系 try: paths = list(nx.all_simple_paths( self.semantic_memory.concept_graph, concept1, concept2, cutoff=3 )) for path in paths: if len(path) > 2: inference = self._analyze_path(path) inferences.append(inference) except nx.NetworkXNoPath: pass # 类比推理 analogies = self._find_analogies(concept1, concept2) inferences.extend(analogies) return inferences def _find_analogies(self, concept1: str, concept2: str) -> List[Dict]: """查找类比关系""" analogies = [] # 获取concept1的关系模式 patterns1 = self._get_relation_patterns(concept1) # 查找相似模式 for node in self.semantic_memory.concept_graph.nodes(): if node != concept1: patterns = self._get_relation_patterns(node) similarity = self._pattern_similarity(patterns1, patterns) if similarity > 0.7: analogies.append({ "type": "analogy", "base": concept1, "target": node, "mapped_to": concept2, "confidence": similarity }) return analogies 5. 程序记忆系统 5.1 技能学习 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 @dataclass class Skill: name: str steps: List[Dict] preconditions: List[str] postconditions: List[str] success_rate: float = 0.0 execution_count: int = 0 class ProceduralMemory: def __init__(self): self.skills = {} self.skill_hierarchy = nx.DiGraph() self.execution_history = [] def learn_skill(self, demonstration: List[Dict]) -> Skill: """从演示中学习技能""" # 提取动作序列 action_sequence = self._extract_actions(demonstration) # 识别前置和后置条件 preconditions = self._identify_preconditions(demonstration[0]) postconditions = self._identify_postconditions(demonstration[-1]) # 创建技能 skill = Skill( name=self._generate_skill_name(action_sequence), steps=action_sequence, preconditions=preconditions, postconditions=postconditions ) # 存储技能 self.skills[skill.name] = skill # 更新技能层次 self._update_skill_hierarchy(skill) return skill def execute_skill(self, skill_name: str, context: Dict) -> Dict: """执行技能""" if skill_name not in self.skills: return {"success": False, "error": "Skill not found"} skill = self.skills[skill_name] # 检查前置条件 if not self._check_preconditions(skill.preconditions, context): return {"success": False, "error": "Preconditions not met"} # 执行步骤 result = {"success": True, "steps_executed": []} for step in skill.steps: step_result = self._execute_step(step, context) result["steps_executed"].append(step_result) if not step_result["success"]: result["success"] = False result["error"] = f"Failed at step: {step}" break # 更新技能统计 skill.execution_count += 1 if result["success"]: skill.success_rate = ( (skill.success_rate * (skill.execution_count - 1) + 1) / skill.execution_count ) else: skill.success_rate = ( (skill.success_rate * (skill.execution_count - 1)) / skill.execution_count ) # 记录执行历史 self.execution_history.append({ "skill": skill_name, "context": context, "result": result, "timestamp": time.time() }) return result def compose_skills(self, goal: str) -> List[str]: """组合技能以达成目标""" # 查找能达成目标的技能序列 relevant_skills = self._find_relevant_skills(goal) # 规划技能执行顺序 skill_plan = self._plan_skill_sequence(relevant_skills, goal) return skill_plan 5.2 技能优化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 class SkillOptimizer: def __init__(self, procedural_memory: ProceduralMemory): self.procedural_memory = procedural_memory def optimize_skill(self, skill_name: str): """优化技能执行""" skill = self.procedural_memory.skills.get(skill_name) if not skill: return # 分析执行历史 history = [ h for h in self.procedural_memory.execution_history if h["skill"] == skill_name ] # 识别失败模式 failure_patterns = self._identify_failure_patterns(history) # 优化步骤 optimized_steps = self._optimize_steps( skill.steps, failure_patterns ) # 创建优化版本 optimized_skill = Skill( name=f"{skill_name}_optimized", steps=optimized_steps, preconditions=skill.preconditions, postconditions=skill.postconditions ) self.procedural_memory.skills[optimized_skill.name] = optimized_skill return optimized_skill def _identify_failure_patterns(self, history: List[Dict]) -> List[Dict]: """识别失败模式""" failures = [h for h in history if not h["result"]["success"]] patterns = [] for failure in failures: failed_step = failure["result"].get("error", "") context = failure["context"] pattern = { "step": failed_step, "context_conditions": self._extract_conditions(context), "frequency": 1 } # 合并相似模式 merged = False for existing_pattern in patterns: if self._patterns_similar(pattern, existing_pattern): existing_pattern["frequency"] += 1 merged = True break if not merged: patterns.append(pattern) return patterns 6. 记忆检索优化 6.1 上下文感知检索 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 class ContextAwareRetrieval: def __init__(self, memory_system: MemorySystem): self.memory_system = memory_system self.context_window = 10 # 考虑最近10个交互 def retrieve(self, query: str, context: List[Dict]) -> List[Memory]: """上下文感知的记忆检索""" # 1. 提取上下文特征 context_features = self._extract_context_features(context) # 2. 扩展查询 expanded_query = self._expand_query_with_context(query, context_features) # 3. 多源检索 candidates = [] # 从长期记忆检索 ltm_results = self.memory_system.long_term_memory.retrieve( expanded_query, k=10 ) candidates.extend(ltm_results) # 从情景记忆检索 episodes = self.memory_system.episodic_memory.recall_similar_episodes( context_features, k=3 ) for episode in episodes: candidates.extend(self._extract_memories_from_episode(episode)) # 从语义记忆检索 concepts = self.memory_system.semantic_memory.query_concepts( query, k=5 ) for concept in concepts: candidates.extend(self._get_concept_memories(concept)) # 4. 重排和去重 unique_memories = self._deduplicate_memories(candidates) ranked_memories = self._rank_by_relevance( unique_memories, query, context_features ) return ranked_memories[:5] def _rank_by_relevance(self, memories: List[Memory], query: str, context: Dict) -> List[Memory]: """按相关性排序记忆""" scored_memories = [] for memory in memories: # 查询相关性 query_relevance = self._calculate_similarity( memory.content, query ) # 上下文相关性 context_relevance = self._calculate_context_relevance( memory, context ) # 时间相关性 time_relevance = self._calculate_time_relevance(memory) # 综合评分 score = ( 0.4 * query_relevance + 0.3 * context_relevance + 0.2 * time_relevance + 0.1 * memory.importance ) scored_memories.append((memory, score)) scored_memories.sort(key=lambda x: x[1], reverse=True) return [memory for memory, _ in scored_memories] 6.2 记忆链构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 class MemoryChain: def __init__(self): self.chain_graph = nx.DiGraph() def build_memory_chain(self, seed_memory: Memory, memory_pool: List[Memory], max_length: int = 5) -> List[Memory]: """构建记忆链""" chain = [seed_memory] current = seed_memory while len(chain) < max_length: # 找到最相关的下一个记忆 next_memory = self._find_next_memory( current, memory_pool, chain ) if next_memory is None: break chain.append(next_memory) current = next_memory return chain def _find_next_memory(self, current: Memory, candidates: List[Memory], chain: List[Memory]) -> Optional[Memory]: """找到链中的下一个记忆""" best_memory = None best_score = -1 for candidate in candidates: if candidate in chain: continue # 计算连接强度 connection_strength = self._calculate_connection_strength( current, candidate ) # 计算多样性奖励 diversity_bonus = self._calculate_diversity_bonus( candidate, chain ) score = connection_strength + 0.2 * diversity_bonus if score > best_score: best_score = score best_memory = candidate return best_memory if best_score > 0.3 else None 7. 记忆更新与遗忘 7.1 自适应遗忘 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 class AdaptiveForgetting: def __init__(self, base_decay_rate: float = 0.01): self.base_decay_rate = base_decay_rate def update_memories(self, memories: Dict[str, Memory]): """更新记忆(包括遗忘)""" current_time = time.time() to_forget = [] for memory_id, memory in memories.items(): # 计算遗忘曲线 time_since_access = current_time - (memory.last_accessed or memory.timestamp) time_since_creation = current_time - memory.timestamp # Ebbinghaus遗忘曲线 retention = self._calculate_retention( time_since_access, memory.access_count, memory.importance ) # 更新记忆强度 memory.strength = retention # 标记需要遗忘的记忆 if retention < 0.1 and time_since_creation > 86400: # 24小时 to_forget.append(memory_id) # 执行遗忘 for memory_id in to_forget: self._forget_memory(memories, memory_id) def _calculate_retention(self, time_elapsed: float, access_count: int, importance: float) -> float: """计算记忆保持率""" # 基础遗忘率 base_retention = np.exp(-self.base_decay_rate * time_elapsed / 3600) # 重复强化因子 repetition_factor = 1 + np.log1p(access_count) * 0.1 # 重要性调节 importance_factor = 1 + importance * 0.5 # 最终保持率 retention = min(1.0, base_retention * repetition_factor * importance_factor) return retention def _forget_memory(self, memories: Dict, memory_id: str): """遗忘记忆(不是删除,而是转为痕迹)""" memory = memories[memory_id] # 保留痕迹 trace = { "id": memory_id, "summary": self._create_summary(memory.content), "timestamp": memory.timestamp, "importance": memory.importance * 0.1 } # 存储痕迹(可以用于后续的重建) self._store_trace(trace) # 从活跃记忆中移除 del memories[memory_id] 8. 记忆系统集成 8.1 统一记忆接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 class UnifiedMemoryInterface: def __init__(self): self.memory_system = MemorySystem() self.retrieval = ContextAwareRetrieval(self.memory_system) self.forgetting = AdaptiveForgetting() async def remember(self, content: Any, memory_type: MemoryType = None): """记住信息""" # 自动判断记忆类型 if memory_type is None: memory_type = self._infer_memory_type(content) if memory_type == MemoryType.EPISODIC: self.memory_system.episodic_memory.add_event(content) elif memory_type == MemoryType.SEMANTIC: concepts = self._extract_concepts(content) for concept in concepts: self.memory_system.semantic_memory.add_concept(concept) elif memory_type == MemoryType.PROCEDURAL: if self._is_skill_demonstration(content): self.memory_system.procedural_memory.learn_skill(content) else: # 默认存储到长期记忆 self.memory_system.long_term_memory.store(content) async def recall(self, query: str, context: List[Dict] = None) -> List[Any]: """回忆信息""" # 并行从各种记忆类型检索 tasks = [ self._recall_from_ltm(query), self._recall_from_episodic(query, context), self._recall_from_semantic(query), self._recall_from_procedural(query) ] results = await asyncio.gather(*tasks) # 合并和排序结果 all_memories = [] for result in results: all_memories.extend(result) # 去重和排序 unique_memories = self._deduplicate(all_memories) ranked_memories = self._rank_memories(unique_memories, query) return ranked_memories[:10] def reflect(self) -> Dict: """反思和总结记忆""" reflection = { "patterns": self.memory_system.episodic_memory.extract_patterns(), "important_concepts": self._get_important_concepts(), "skill_improvements": self._suggest_skill_improvements(), "memory_statistics": self._get_memory_stats() } return reflection 9. 最佳实践 分层存储:根据访问频率和重要性分层 智能遗忘:模拟人类遗忘曲线 关联构建:自动构建记忆间的关联 上下文感知:考虑当前上下文进行检索 持续学习:从交互中不断优化记忆系统 压缩策略:定期压缩和总结记忆 结论 一个优秀的Agent记忆系统不仅要能存储和检索信息,还要能够学习、关联、遗忘和总结。通过模拟人类记忆的多层次结构和处理机制,我们可以构建出更智能、更有"记忆"的AI Agent。 ...