多Agent系统协作:构建智能团队的艺术
## 引言

单个Agent虽然强大,但面对复杂任务时往往力不从心。多Agent系统通过协作分工,能够处理更复杂的问题,实现1+1>2的效果。本文深入探讨多Agent系统的设计原理、协作机制和实践案例。

## 1. 多Agent系统架构

```mermaid
graph TB
    subgraph "多Agent系统拓扑"
        subgraph "层次结构"
            M[管理Agent]
            M --> A1[执行Agent1]
            M --> A2[执行Agent2]
            M --> A3[执行Agent3]
        end

        subgraph "对等网络"
            P1[Agent1] <--> P2[Agent2]
            P2 <--> P3[Agent3]
            P3 <--> P1
        end

        subgraph "黑板模式"
            BB[共享黑板]
            B1[Agent1] --> BB
            B2[Agent2] --> BB
            B3[Agent3] --> BB
            BB --> B1
            BB --> B2
            BB --> B3
        end
    end

    style M fill:#ffeb3b,stroke:#f57c00,stroke-width:2px
    style BB fill:#e1f5fe,stroke:#01579b,stroke-width:2px
```

### 1.1 系统拓扑结构

```python
from enum import Enum
from typing import List, Dict, Any

class TopologyType(Enum):
    HIERARCHICAL = "hierarchical"  # 层次结构
    PEER_TO_PEER = "p2p"           # 对等网络
    BLACKBOARD = "blackboard"      # 黑板模式
    PIPELINE = "pipeline"          # 流水线
    HYBRID = "hybrid"              # 混合模式

class MultiAgentSystem:
    def __init__(self, topology: TopologyType):
        self.topology = topology
        self.agents = {}
        self.communication_channels = {}
        self.shared_memory = {}

    def add_agent(self, agent_id: str, agent: Any):
        """添加Agent到系统"""
        self.agents[agent_id] = agent
        self.setup_communication(agent_id)

    def setup_communication(self, agent_id: str):
        """设置通信通道"""
        if self.topology == TopologyType.HIERARCHICAL:
            self._setup_hierarchical_comm(agent_id)
        elif self.topology == TopologyType.PEER_TO_PEER:
            self._setup_p2p_comm(agent_id)
```

### 1.2 Agent角色定义

```python
class AgentRole(Enum):
    COORDINATOR = "coordinator"  # 协调者
    EXECUTOR = "executor"        # 执行者
    VALIDATOR = "validator"      # 验证者
    MONITOR = "monitor"          # 监控者
    SPECIALIST = "specialist"    # 专家

class BaseAgent:
    def __init__(self, agent_id: str, role: AgentRole, capabilities: List[str]):
        self.agent_id = agent_id
        self.role = role
        self.capabilities = capabilities
        self.message_queue = []
        self.state = AgentState.IDLE

    async def receive_message(self, message: Dict):
        """接收消息"""
        self.message_queue.append(message)
        await self.process_message(message)

    async def send_message(self, recipient: str, content: Dict):
        """发送消息"""
        message = {
            "sender": self.agent_id,
            "recipient": recipient,
            "content": content,
            "timestamp": time.time()
        }
        await self.communication_channel.send(message)
```

## 2. 通信协议设计

### 2.1 消息格式定义

```python
from dataclasses import dataclass
from typing import Optional, Any

@dataclass
class Message:
    sender_id: str
    recipient_id: str
    message_type: str  # REQUEST, RESPONSE, BROADCAST, etc.
    content: Any
    conversation_id: str
    timestamp: float
    priority: int = 0
    requires_response: bool = False

class MessageProtocol:
    """消息协议定义"""

    # 消息类型
    REQUEST = "REQUEST"
    RESPONSE = "RESPONSE"
    BROADCAST = "BROADCAST"
    SUBSCRIBE = "SUBSCRIBE"
    UNSUBSCRIBE = "UNSUBSCRIBE"
    HEARTBEAT = "HEARTBEAT"

    # 内容格式
    @staticmethod
    def create_task_request(task: str, requirements: Dict) -> Dict:
        return {
            "type": MessageProtocol.REQUEST,
            "task": task,
            "requirements": requirements,
            "deadline": None
        }

    @staticmethod
    def create_capability_announcement(capabilities: List[str]) -> Dict:
        return {
            "type": MessageProtocol.BROADCAST,
            "capabilities": capabilities,
            "availability": True
        }
```

### 2.2 通信中间件

```python
import asyncio
from collections import defaultdict

class CommunicationMiddleware:
    def __init__(self):
        self.subscribers = defaultdict(list)
        self.message_buffer = asyncio.Queue()
        self.routing_table = {}

    async def publish(self, topic: str, message: Message):
        """发布消息到主题"""
        subscribers = self.subscribers.get(topic, [])

        tasks = []
        for subscriber in subscribers:
            task = asyncio.create_task(
                subscriber.receive_message(message)
            )
            tasks.append(task)

        await asyncio.gather(*tasks)

    def subscribe(self, topic: str, agent: BaseAgent):
        """订阅主题"""
        self.subscribers[topic].append(agent)

    async def route_message(self, message: Message):
        """路由消息"""
        if message.recipient_id == "BROADCAST":
            await self.broadcast(message)
        else:
            recipient = self.routing_table.get(message.recipient_id)
            if recipient:
                await recipient.receive_message(message)
```

## 3. 任务分配与协调

### 3.1 任务分解策略

```python
class TaskDecomposer:
    def __init__(self, llm):
        self.llm = llm

    def decompose_task(self, task: str) -> List[Dict]:
        """将复杂任务分解为子任务"""
        prompt = f"""
        将以下任务分解为独立的子任务:
        任务:{task}

        要求:
        1. 每个子任务应该是独立可执行的
        2. 标注每个子任务所需的能力
        3. 标注子任务之间的依赖关系

        输出JSON格式:
        {{
            "subtasks": [
                {{
                    "id": "task_1",
                    "description": "...",
                    "required_capabilities": [...],
                    "dependencies": [],
                    "estimated_time": 0
                }}
            ]
        }}
        """

        response = self.llm.invoke(prompt)
        return json.loads(response)["subtasks"]

class TaskCoordinator:
    def __init__(self, agents: Dict[str, BaseAgent]):
        self.agents = agents
        self.task_queue = asyncio.Queue()
        self.task_assignments = {}

    async def assign_task(self, task: Dict):
        """分配任务给合适的Agent"""
        # 找到具备所需能力的Agent
        capable_agents = self.find_capable_agents(
            task["required_capabilities"]
        )

        if not capable_agents:
            return None

        # 选择最优Agent(考虑负载均衡)
        selected_agent = self.select_optimal_agent(capable_agents)

        # 分配任务
        self.task_assignments[task["id"]] = selected_agent.agent_id
        await selected_agent.receive_message(
            MessageProtocol.create_task_request(
                task["description"],
                task.get("requirements", {})
            )
        )

        return selected_agent.agent_id

    def find_capable_agents(self, required_capabilities: List[str]):
        """查找具备所需能力的Agent"""
        capable = []
        for agent in self.agents.values():
            if all(cap in agent.capabilities for cap in required_capabilities):
                capable.append(agent)
        return capable
```

### 3.2 协商机制

```mermaid
sequenceDiagram
    participant C as 协调Agent
    participant A1 as Agent1
    participant A2 as Agent2
    participant A3 as Agent3

    C->>A1: 任务公告
    C->>A2: 任务公告
    C->>A3: 任务公告

    Note over A1,A3: 评估任务能力

    A1-->>C: 投标(成本:10, 时间:5min)
    A2-->>C: 投标(成本:8, 时间:7min)
    A3-->>C: 不投标

    Note over C: 评估投标

    C->>A2: 授予合同
    A2->>C: 接受合同

    Note over A2: 执行任务

    A2->>C: 任务完成报告
    C->>A2: 确认完成
```

```python
class ContractNetProtocol:
    """合同网协议实现"""

    def __init__(self, coordinator: BaseAgent):
        self.coordinator = coordinator
        self.bids = {}

    async def announce_task(self, task: Dict):
        """发布任务公告"""
        announcement = {
            "type": "TASK_ANNOUNCEMENT",
            "task": task,
            "deadline": time.time() + 10  # 10秒投标期
        }

        # 广播任务
        await self.coordinator.send_message(
            "BROADCAST",
            announcement
        )

        # 等待投标
        await asyncio.sleep(10)

        # 选择中标者
        winner = self.select_winner()
        if winner:
            await self.award_contract(winner, task)

    async def submit_bid(self, agent_id: str, bid: Dict):
        """提交投标"""
        self.bids[agent_id] = {
            "cost": bid.get("cost", float('inf')),
            "time": bid.get("estimated_time", float('inf')),
            "confidence": bid.get("confidence", 0)
        }

    def select_winner(self) -> Optional[str]:
        """选择中标Agent"""
        if not self.bids:
            return None

        # 综合评分(可自定义权重)
        best_agent = None
        best_score = float('-inf')

        for agent_id, bid in self.bids.items():
            score = (
                bid["confidence"] * 0.5 -
                bid["cost"] * 0.3 -
                bid["time"] * 0.2
            )

            if score > best_score:
                best_score = score
                best_agent = agent_id

        return best_agent
```

## 4. 知识共享机制

### 4.1 黑板系统

```python
class BlackboardSystem:
    """黑板系统实现"""

    def __init__(self):
        self.blackboard = {}
        self.knowledge_sources = []
        self.controller = None
        self.locks = {}

    async def write(self, key: str, value: Any, agent_id: str):
        """写入知识"""
        async with self.get_lock(key):
            self.blackboard[key] = {
                "value": value,
                "author": agent_id,
                "timestamp": time.time(),
                "version": self.get_version(key) + 1
            }

            # 通知订阅者
            await self.notify_subscribers(key, value)

    async def read(self, key: str) -> Any:
        """读取知识"""
        entry = self.blackboard.get(key)
        return entry["value"] if entry else None

    def subscribe(self, pattern: str, callback):
        """订阅知识更新"""
        self.knowledge_sources.append({
            "pattern": pattern,
            "callback": callback
        })

    async def notify_subscribers(self, key: str, value: Any):
        """通知订阅者"""
        for source in self.knowledge_sources:
            if self.match_pattern(key, source["pattern"]):
                await source["callback"](key, value)
```

### 4.2 知识图谱共享

```python
from typing import Tuple
import networkx as nx

class SharedKnowledgeGraph:
    def __init__(self):
        self.graph = nx.DiGraph()
        self.embeddings = {}  # 节点嵌入

    def add_knowledge(self, subject: str, predicate: str, object: str,
                      agent_id: str, confidence: float = 1.0):
        """添加知识三元组"""
        # 添加节点
        if subject not in self.graph:
            self.graph.add_node(subject, type="entity")
        if object not in self.graph:
            self.graph.add_node(object, type="entity")

        # 添加边
        self.graph.add_edge(
            subject, object,
            predicate=predicate,
            contributor=agent_id,
            confidence=confidence,
            timestamp=time.time()
        )

    def query(self, query_type: str, **kwargs) -> List:
        """查询知识"""
        if query_type == "neighbors":
            node = kwargs.get("node")
            return list(self.graph.neighbors(node))
        elif query_type == "path":
            source = kwargs.get("source")
            target = kwargs.get("target")
            try:
                path = nx.shortest_path(self.graph, source, target)
                return path
            except nx.NetworkXNoPath:
                return []
        elif query_type == "subgraph":
            nodes = kwargs.get("nodes", [])
            return self.graph.subgraph(nodes)

    def merge_knowledge(self, other_graph: 'SharedKnowledgeGraph'):
        """合并其他Agent的知识"""
        for edge in other_graph.graph.edges(data=True):
            source, target, data = edge

            existing_edge = self.graph.get_edge_data(source, target)
            if existing_edge:
                # 更新置信度(加权平均)
                new_confidence = (
                    existing_edge["confidence"] + data["confidence"]
                ) / 2
                self.graph[source][target]["confidence"] = new_confidence
            else:
                self.add_knowledge(
                    source, data["predicate"], target,
                    data["contributor"], data["confidence"]
                )
```

## 5. 冲突解决机制

### 5.1 投票机制

```python
class VotingMechanism:
    def __init__(self, voting_type: str = "majority"):
        self.voting_type = voting_type
        self.votes = {}

    async def collect_votes(self, issue: str, options: List[str],
                            voters: List[BaseAgent]):
        """收集投票"""
        self.votes[issue] = defaultdict(list)

        # 并行收集投票
        tasks = []
        for voter in voters:
            task = asyncio.create_task(
                self.get_vote(voter, issue, options)
            )
            tasks.append(task)

        votes = await asyncio.gather(*tasks)

        # 统计投票
        for voter, vote in zip(voters, votes):
            self.votes[issue][vote].append(voter.agent_id)

        return self.determine_winner(issue)

    async def get_vote(self, voter: BaseAgent, issue: str, options: List[str]) -> str:
        """获取单个Agent的投票"""
        vote_request = {
            "type": "VOTE_REQUEST",
            "issue": issue,
            "options": options
        }

        response = await voter.process_vote_request(vote_request)
        return response["vote"]

    def determine_winner(self, issue: str) -> str:
        """确定获胜选项"""
        vote_counts = {
            option: len(voters)
            for option, voters in self.votes[issue].items()
        }

        if self.voting_type == "majority":
            return max(vote_counts, key=vote_counts.get)
        elif self.voting_type == "unanimous":
            if len(vote_counts) == 1:
                return list(vote_counts.keys())[0]
            return None
```

### 5.2 协商与妥协

```python
class NegotiationProtocol:
    def __init__(self):
        self.negotiation_history = []

    async def negotiate(self, agents: List[BaseAgent], issue: Dict):
        """多轮协商"""
        max_rounds = 5
        current_round = 0

        while current_round < max_rounds:
            proposals = await self.collect_proposals(agents, issue)

            # 评估提案
            evaluations = await self.evaluate_proposals(
                agents, proposals
            )

            # 检查是否达成共识
            consensus = self.check_consensus(evaluations)
            if consensus:
                return consensus

            # 生成反提案
            issue = self.generate_counter_proposal(evaluations)
            current_round += 1

        # 未达成共识,使用仲裁
        return await self.arbitrate(agents, issue)

    async def collect_proposals(self, agents: List[BaseAgent],
                                issue: Dict) -> List[Dict]:
        """收集提案"""
        proposals = []
        for agent in agents:
            proposal = await agent.generate_proposal(issue)
            proposals.append({
                "agent_id": agent.agent_id,
                "proposal": proposal
            })
        return proposals
```

## 6. 实际应用案例

### 6.1 软件开发团队

```python
class SoftwareDevelopmentTeam:
    def __init__(self):
        self.product_manager = self.create_pm_agent()
        self.architect = self.create_architect_agent()
        self.developers = self.create_developer_agents(3)
        self.qa_engineer = self.create_qa_agent()
        self.devops = self.create_devops_agent()

    def create_pm_agent(self):
        return Agent(
            agent_id="pm_001",
            role=AgentRole.COORDINATOR,
            capabilities=["requirement_analysis", "planning"],
            llm=ChatOpenAI(model="gpt-4")
        )

    def create_architect_agent(self):
        return Agent(
            agent_id="architect_001",
            role=AgentRole.SPECIALIST,
            capabilities=["system_design", "tech_selection"],
            llm=ChatOpenAI(model="gpt-4")
        )

    async def develop_feature(self, feature_request: str):
        """开发新功能的完整流程"""
        # 1. PM分析需求
        requirements = await self.product_manager.analyze_requirements(
            feature_request
        )

        # 2. 架构师设计系统
        design = await self.architect.create_design(requirements)

        # 3. 分配开发任务
        tasks = self.decompose_development_tasks(design)
        development_results = await self.parallel_development(
            tasks, self.developers
        )

        # 4. QA测试
        test_results = await self.qa_engineer.test_feature(
            development_results
        )

        # 5. DevOps部署
        if test_results["passed"]:
            deployment = await self.devops.deploy(development_results)
            return deployment
        else:
            # 返回开发阶段修复bug
            return await self.fix_bugs(test_results["issues"])
```

### 6.2 研究分析团队

```python
class ResearchTeam:
    def __init__(self):
        self.lead_researcher = Agent(
            "lead_001",
            AgentRole.COORDINATOR,
            ["research_planning", "synthesis"]
        )
        self.data_collectors = [
            Agent(f"collector_{i}", AgentRole.EXECUTOR,
                  ["web_search", "data_extraction"])
            for i in range(3)
        ]
        self.analysts = [
            Agent(f"analyst_{i}", AgentRole.SPECIALIST,
                  ["data_analysis", "visualization"])
            for i in range(2)
        ]
        self.fact_checker = Agent(
            "checker_001",
            AgentRole.VALIDATOR,
            ["fact_checking", "source_verification"]
        )

    async def conduct_research(self, topic: str):
        """执行研究项目"""
        # 1. 制定研究计划
        research_plan = await self.lead_researcher.create_plan(topic)

        # 2. 并行数据收集
        data_collection_tasks = []
        for collector in self.data_collectors:
            task = asyncio.create_task(
                collector.collect_data(research_plan["queries"])
            )
            data_collection_tasks.append(task)

        raw_data = await asyncio.gather(*data_collection_tasks)

        # 3. 事实核查
        verified_data = await self.fact_checker.verify_data(raw_data)

        # 4. 数据分析
        analysis_results = []
        for analyst in self.analysts:
            result = await analyst.analyze(verified_data)
            analysis_results.append(result)

        # 5. 综合报告
        final_report = await self.lead_researcher.synthesize(
            analysis_results
        )

        return final_report
```

## 7. 性能优化

### 7.1 负载均衡

```python
class LoadBalancer:
    def __init__(self, agents: List[BaseAgent]):
        self.agents = agents
        self.agent_loads = {agent.agent_id: 0 for agent in agents}
        self.agent_performance = {
            agent.agent_id: {"avg_time": 0, "success_rate": 1.0}
            for agent in agents
        }

    def select_agent(self, task: Dict) -> BaseAgent:
        """选择负载最低的Agent"""
        # 计算综合得分
        scores = {}
        for agent in self.agents:
            if self.is_capable(agent, task):
                load_score = 1 / (1 + self.agent_loads[agent.agent_id])
                perf_score = self.agent_performance[agent.agent_id]["success_rate"]
                scores[agent.agent_id] = load_score * perf_score

        if not scores:
            return None

        # 选择得分最高的Agent
        best_agent_id = max(scores, key=scores.get)
        selected_agent = next(
            a for a in self.agents if a.agent_id == best_agent_id
        )

        # 更新负载
        self.agent_loads[best_agent_id] += 1

        return selected_agent

    def update_performance(self, agent_id: str, execution_time: float, success: bool):
        """更新Agent性能指标"""
        perf = self.agent_performance[agent_id]

        # 更新平均执行时间(指数移动平均)
        alpha = 0.3
        perf["avg_time"] = (
            alpha * execution_time +
            (1 - alpha) * perf["avg_time"]
        )

        # 更新成功率
        perf["success_rate"] = (
            perf["success_rate"] * 0.9 +
            (1.0 if success else 0.0) * 0.1
        )

        # 更新负载
        self.agent_loads[agent_id] = max(0, self.agent_loads[agent_id] - 1)
```

### 7.2 缓存与共享

```python
class SharedCache:
    def __init__(self, max_size: int = 1000):
        self.cache = {}
        self.access_count = defaultdict(int)
        self.max_size = max_size

    async def get(self, key: str) -> Any:
        """获取缓存"""
        if key in self.cache:
            self.access_count[key] += 1
            return self.cache[key]["value"]
        return None

    async def set(self, key: str, value: Any, ttl: int = 3600):
        """设置缓存"""
        if len(self.cache) >= self.max_size:
            # LFU淘汰策略
            self.evict_least_frequent()

        self.cache[key] = {
            "value": value,
            "expire_at": time.time() + ttl,
            "set_by": None  # 可以记录是哪个Agent设置的
        }

    def evict_least_frequent(self):
        """淘汰最少使用的缓存"""
        if not self.cache:
            return

        least_used = min(
            self.cache.keys(),
            key=lambda k: self.access_count.get(k, 0)
        )

        del self.cache[least_used]
        del self.access_count[least_used]
```

## 8. 监控与调试

### 8.1 系统监控

```python
class SystemMonitor:
    def __init__(self):
        self.metrics = {
            "message_count": 0,
            "task_completed": 0,
            "task_failed": 0,
            "avg_response_time": 0,
            "agent_utilization": {}
        }
        self.event_log = []

    def log_event(self, event_type: str, details: Dict):
        """记录事件"""
        event = {
            "timestamp": time.time(),
            "type": event_type,
            "details": details
        }
        self.event_log.append(event)

        # 更新指标
        self.update_metrics(event)

    def update_metrics(self, event: Dict):
        """更新系统指标"""
        if event["type"] == "MESSAGE_SENT":
            self.metrics["message_count"] += 1
        elif event["type"] == "TASK_COMPLETED":
            self.metrics["task_completed"] += 1
        elif event["type"] == "TASK_FAILED":
            self.metrics["task_failed"] += 1

    def generate_report(self) -> Dict:
        """生成监控报告"""
        return {
            "metrics": self.metrics,
            "health_status": self.check_health(),
            "bottlenecks": self.identify_bottlenecks(),
            "recommendations": self.generate_recommendations()
        }

    def check_health(self) -> str:
        """检查系统健康状态"""
        success_rate = (
            self.metrics["task_completed"] /
            max(1, self.metrics["task_completed"] + self.metrics["task_failed"])
        )

        if success_rate > 0.95:
            return "HEALTHY"
        elif success_rate > 0.8:
            return "DEGRADED"
        else:
            return "CRITICAL"
```

### 8.2 调试工具

```python
class Debugger:
    def __init__(self, system: MultiAgentSystem):
        self.system = system
        self.breakpoints = set()
        self.watch_list = {}

    def set_breakpoint(self, agent_id: str, event_type: str):
        """设置断点"""
        self.breakpoints.add((agent_id, event_type))

    async def trace_execution(self, duration: int = 60):
        """追踪执行过程"""
        trace_data = []
        start_time = time.time()

        while time.time() - start_time < duration:
            for agent_id, agent in self.system.agents.items():
                state = {
                    "agent_id": agent_id,
                    "state": agent.state,
                    "queue_size": len(agent.message_queue),
                    "timestamp": time.time()
                }
                trace_data.append(state)

            await asyncio.sleep(1)

        return self.analyze_trace(trace_data)

    def analyze_trace(self, trace_data: List[Dict]):
        """分析追踪数据"""
        analysis = {
            "deadlocks": self.detect_deadlocks(trace_data),
            "performance_issues": self.detect_performance_issues(trace_data),
            "communication_patterns": self.analyze_communication(trace_data)
        }
        return analysis
```

## 9. 最佳实践

- 明确的角色定义:每个Agent应有清晰的职责边界
- 高效的通信协议:减少不必要的消息传递
- 容错机制:处理Agent失败的情况
- 可扩展性设计:支持动态添加/移除Agent
- 监控和日志:全面的系统监控
- 测试策略:包括单元测试和集成测试

## 结论

多Agent系统通过协作能够解决单个Agent无法处理的复杂问题。关键在于设计合理的架构、高效的通信机制和智能的协调策略。

...