引言

单个Agent虽然强大,但面对复杂任务时往往力不从心。多Agent系统通过协作分工,能够处理更复杂的问题,实现1+1>2的效果。本文深入探讨多Agent系统的设计原理、协作机制和实践案例。

1. 多Agent系统架构

graph TB
    subgraph "多Agent系统拓扑"
        subgraph "层次结构"
            M[管理Agent]
            M --> A1[执行Agent1]
            M --> A2[执行Agent2]
            M --> A3[执行Agent3]
        end
        
        subgraph "对等网络"
            P1[Agent1] <--> P2[Agent2]
            P2 <--> P3[Agent3]
            P3 <--> P1
        end
        
        subgraph "黑板模式"
            BB[共享黑板]
            B1[Agent1] --> BB
            B2[Agent2] --> BB
            B3[Agent3] --> BB
            BB --> B1
            BB --> B2
            BB --> B3
        end
    end
    
    style M fill:#ffeb3b,stroke:#f57c00,stroke-width:2px
    style BB fill:#e1f5fe,stroke:#01579b,stroke-width:2px

1.1 系统拓扑结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from enum import Enum
from typing import List, Dict, Any

class TopologyType(Enum):
    HIERARCHICAL = "hierarchical"  # 层次结构
    PEER_TO_PEER = "p2p"           # 对等网络
    BLACKBOARD = "blackboard"      # 黑板模式
    PIPELINE = "pipeline"          # 流水线
    HYBRID = "hybrid"              # 混合模式

class MultiAgentSystem:
    def __init__(self, topology: TopologyType):
        self.topology = topology
        self.agents = {}
        self.communication_channels = {}
        self.shared_memory = {}
        
    def add_agent(self, agent_id: str, agent: Any):
        """添加Agent到系统"""
        self.agents[agent_id] = agent
        self.setup_communication(agent_id)
    
    def setup_communication(self, agent_id: str):
        """设置通信通道"""
        if self.topology == TopologyType.HIERARCHICAL:
            self._setup_hierarchical_comm(agent_id)
        elif self.topology == TopologyType.PEER_TO_PEER:
            self._setup_p2p_comm(agent_id)

1.2 Agent角色定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class AgentRole(Enum):
    COORDINATOR = "coordinator"      # 协调者
    EXECUTOR = "executor"           # 执行者
    VALIDATOR = "validator"         # 验证者
    MONITOR = "monitor"             # 监控者
    SPECIALIST = "specialist"       # 专家

class BaseAgent:
    def __init__(self, agent_id: str, role: AgentRole, capabilities: List[str]):
        self.agent_id = agent_id
        self.role = role
        self.capabilities = capabilities
        self.message_queue = []
        self.state = AgentState.IDLE
        
    async def receive_message(self, message: Dict):
        """接收消息"""
        self.message_queue.append(message)
        await self.process_message(message)
    
    async def send_message(self, recipient: str, content: Dict):
        """发送消息"""
        message = {
            "sender": self.agent_id,
            "recipient": recipient,
            "content": content,
            "timestamp": time.time()
        }
        await self.communication_channel.send(message)

2. 通信协议设计

2.1 消息格式定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from dataclasses import dataclass
from typing import Optional, Any

@dataclass
class Message:
    sender_id: str
    recipient_id: str
    message_type: str  # REQUEST, RESPONSE, BROADCAST, etc.
    content: Any
    conversation_id: str
    timestamp: float
    priority: int = 0
    requires_response: bool = False
    
class MessageProtocol:
    """消息协议定义"""
    
    # 消息类型
    REQUEST = "REQUEST"
    RESPONSE = "RESPONSE"
    BROADCAST = "BROADCAST"
    SUBSCRIBE = "SUBSCRIBE"
    UNSUBSCRIBE = "UNSUBSCRIBE"
    HEARTBEAT = "HEARTBEAT"
    
    # 内容格式
    @staticmethod
    def create_task_request(task: str, requirements: Dict) -> Dict:
        return {
            "type": MessageProtocol.REQUEST,
            "task": task,
            "requirements": requirements,
            "deadline": None
        }
    
    @staticmethod
    def create_capability_announcement(capabilities: List[str]) -> Dict:
        return {
            "type": MessageProtocol.BROADCAST,
            "capabilities": capabilities,
            "availability": True
        }

2.2 通信中间件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
from collections import defaultdict

class CommunicationMiddleware:
    def __init__(self):
        self.subscribers = defaultdict(list)
        self.message_buffer = asyncio.Queue()
        self.routing_table = {}
        
    async def publish(self, topic: str, message: Message):
        """发布消息到主题"""
        subscribers = self.subscribers.get(topic, [])
        tasks = []
        for subscriber in subscribers:
            task = asyncio.create_task(
                subscriber.receive_message(message)
            )
            tasks.append(task)
        await asyncio.gather(*tasks)
    
    def subscribe(self, topic: str, agent: BaseAgent):
        """订阅主题"""
        self.subscribers[topic].append(agent)
    
    async def route_message(self, message: Message):
        """路由消息"""
        if message.recipient_id == "BROADCAST":
            await self.broadcast(message)
        else:
            recipient = self.routing_table.get(message.recipient_id)
            if recipient:
                await recipient.receive_message(message)

3. 任务分配与协调

3.1 任务分解策略

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class TaskDecomposer:
    def __init__(self, llm):
        self.llm = llm
        
    def decompose_task(self, task: str) -> List[Dict]:
        """将复杂任务分解为子任务"""
        prompt = f"""
        将以下任务分解为独立的子任务:
        任务:{task}
        
        要求:
        1. 每个子任务应该是独立可执行的
        2. 标注每个子任务所需的能力
        3. 标注子任务之间的依赖关系
        
        输出JSON格式:
        {{
            "subtasks": [
                {{
                    "id": "task_1",
                    "description": "...",
                    "required_capabilities": [...],
                    "dependencies": [],
                    "estimated_time": 0
                }}
            ]
        }}
        """
        
        response = self.llm.invoke(prompt)
        return json.loads(response)["subtasks"]

class TaskCoordinator:
    def __init__(self, agents: Dict[str, BaseAgent]):
        self.agents = agents
        self.task_queue = asyncio.Queue()
        self.task_assignments = {}
        
    async def assign_task(self, task: Dict):
        """分配任务给合适的Agent"""
        # 找到具备所需能力的Agent
        capable_agents = self.find_capable_agents(
            task["required_capabilities"]
        )
        
        if not capable_agents:
            return None
        
        # 选择最优Agent(考虑负载均衡)
        selected_agent = self.select_optimal_agent(capable_agents)
        
        # 分配任务
        self.task_assignments[task["id"]] = selected_agent.agent_id
        await selected_agent.receive_message(
            MessageProtocol.create_task_request(
                task["description"],
                task.get("requirements", {})
            )
        )
        
        return selected_agent.agent_id
    
    def find_capable_agents(self, required_capabilities: List[str]):
        """查找具备所需能力的Agent"""
        capable = []
        for agent in self.agents.values():
            if all(cap in agent.capabilities for cap in required_capabilities):
                capable.append(agent)
        return capable

3.2 协商机制

sequenceDiagram
    participant C as 协调Agent
    participant A1 as Agent1
    participant A2 as Agent2
    participant A3 as Agent3
    
    C->>A1: 任务公告
    C->>A2: 任务公告
    C->>A3: 任务公告
    
    Note over A1,A3: 评估任务能力
    
    A1-->>C: 投标(成本:10, 时间:5min)
    A2-->>C: 投标(成本:8, 时间:7min)
    A3-->>C: 不投标
    
    Note over C: 评估投标
    
    C->>A2: 授予合同
    A2->>C: 接受合同
    
    Note over A2: 执行任务
    
    A2->>C: 任务完成报告
    C->>A2: 确认完成
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class ContractNetProtocol:
    """合同网协议实现"""
    
    def __init__(self, coordinator: BaseAgent):
        self.coordinator = coordinator
        self.bids = {}
        
    async def announce_task(self, task: Dict):
        """发布任务公告"""
        announcement = {
            "type": "TASK_ANNOUNCEMENT",
            "task": task,
            "deadline": time.time() + 10  # 10秒投标期
        }
        
        # 广播任务
        await self.coordinator.send_message(
            "BROADCAST",
            announcement
        )
        
        # 等待投标
        await asyncio.sleep(10)
        
        # 选择中标者
        winner = self.select_winner()
        if winner:
            await self.award_contract(winner, task)
    
    async def submit_bid(self, agent_id: str, bid: Dict):
        """提交投标"""
        self.bids[agent_id] = {
            "cost": bid.get("cost", float('inf')),
            "time": bid.get("estimated_time", float('inf')),
            "confidence": bid.get("confidence", 0)
        }
    
    def select_winner(self) -> Optional[str]:
        """选择中标Agent"""
        if not self.bids:
            return None
        
        # 综合评分(可自定义权重)
        best_agent = None
        best_score = float('-inf')
        
        for agent_id, bid in self.bids.items():
            score = (
                bid["confidence"] * 0.5 -
                bid["cost"] * 0.3 -
                bid["time"] * 0.2
            )
            if score > best_score:
                best_score = score
                best_agent = agent_id
        
        return best_agent

4. 知识共享机制

4.1 黑板系统

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class BlackboardSystem:
    """黑板系统实现"""
    
    def __init__(self):
        self.blackboard = {}
        self.knowledge_sources = []
        self.controller = None
        self.locks = {}
        
    async def write(self, key: str, value: Any, agent_id: str):
        """写入知识"""
        async with self.get_lock(key):
            self.blackboard[key] = {
                "value": value,
                "author": agent_id,
                "timestamp": time.time(),
                "version": self.get_version(key) + 1
            }
            
            # 通知订阅者
            await self.notify_subscribers(key, value)
    
    async def read(self, key: str) -> Any:
        """读取知识"""
        entry = self.blackboard.get(key)
        return entry["value"] if entry else None
    
    def subscribe(self, pattern: str, callback):
        """订阅知识更新"""
        self.knowledge_sources.append({
            "pattern": pattern,
            "callback": callback
        })
    
    async def notify_subscribers(self, key: str, value: Any):
        """通知订阅者"""
        for source in self.knowledge_sources:
            if self.match_pattern(key, source["pattern"]):
                await source["callback"](key, value)

4.2 知识图谱共享

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from typing import Tuple
import networkx as nx

class SharedKnowledgeGraph:
    def __init__(self):
        self.graph = nx.DiGraph()
        self.embeddings = {}  # 节点嵌入
        
    def add_knowledge(self, subject: str, predicate: str, object: str, 
                     agent_id: str, confidence: float = 1.0):
        """添加知识三元组"""
        # 添加节点
        if subject not in self.graph:
            self.graph.add_node(subject, type="entity")
        if object not in self.graph:
            self.graph.add_node(object, type="entity")
        
        # 添加边
        self.graph.add_edge(
            subject, object,
            predicate=predicate,
            contributor=agent_id,
            confidence=confidence,
            timestamp=time.time()
        )
    
    def query(self, query_type: str, **kwargs) -> List:
        """查询知识"""
        if query_type == "neighbors":
            node = kwargs.get("node")
            return list(self.graph.neighbors(node))
        
        elif query_type == "path":
            source = kwargs.get("source")
            target = kwargs.get("target")
            try:
                path = nx.shortest_path(self.graph, source, target)
                return path
            except nx.NetworkXNoPath:
                return []
        
        elif query_type == "subgraph":
            nodes = kwargs.get("nodes", [])
            return self.graph.subgraph(nodes)
    
    def merge_knowledge(self, other_graph: 'SharedKnowledgeGraph'):
        """合并其他Agent的知识"""
        for edge in other_graph.graph.edges(data=True):
            source, target, data = edge
            existing_edge = self.graph.get_edge_data(source, target)
            
            if existing_edge:
                # 更新置信度(加权平均)
                new_confidence = (
                    existing_edge["confidence"] + data["confidence"]
                ) / 2
                self.graph[source][target]["confidence"] = new_confidence
            else:
                self.add_knowledge(
                    source, data["predicate"], target,
                    data["contributor"], data["confidence"]
                )

5. 冲突解决机制

5.1 投票机制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class VotingMechanism:
    def __init__(self, voting_type: str = "majority"):
        self.voting_type = voting_type
        self.votes = {}
        
    async def collect_votes(self, issue: str, options: List[str], 
                           voters: List[BaseAgent]):
        """收集投票"""
        self.votes[issue] = defaultdict(list)
        
        # 并行收集投票
        tasks = []
        for voter in voters:
            task = asyncio.create_task(
                self.get_vote(voter, issue, options)
            )
            tasks.append(task)
        
        votes = await asyncio.gather(*tasks)
        
        # 统计投票
        for voter, vote in zip(voters, votes):
            self.votes[issue][vote].append(voter.agent_id)
        
        return self.determine_winner(issue)
    
    async def get_vote(self, voter: BaseAgent, issue: str, 
                      options: List[str]) -> str:
        """获取单个Agent的投票"""
        vote_request = {
            "type": "VOTE_REQUEST",
            "issue": issue,
            "options": options
        }
        
        response = await voter.process_vote_request(vote_request)
        return response["vote"]
    
    def determine_winner(self, issue: str) -> str:
        """确定获胜选项"""
        vote_counts = {
            option: len(voters) 
            for option, voters in self.votes[issue].items()
        }
        
        if self.voting_type == "majority":
            return max(vote_counts, key=vote_counts.get)
        elif self.voting_type == "unanimous":
            if len(vote_counts) == 1:
                return list(vote_counts.keys())[0]
            return None

5.2 协商与妥协

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class NegotiationProtocol:
    def __init__(self):
        self.negotiation_history = []
        
    async def negotiate(self, agents: List[BaseAgent], issue: Dict):
        """多轮协商"""
        max_rounds = 5
        current_round = 0
        
        while current_round < max_rounds:
            proposals = await self.collect_proposals(agents, issue)
            
            # 评估提案
            evaluations = await self.evaluate_proposals(
                agents, proposals
            )
            
            # 检查是否达成共识
            consensus = self.check_consensus(evaluations)
            if consensus:
                return consensus
            
            # 生成反提案
            issue = self.generate_counter_proposal(evaluations)
            current_round += 1
        
        # 未达成共识,使用仲裁
        return await self.arbitrate(agents, issue)
    
    async def collect_proposals(self, agents: List[BaseAgent], 
                               issue: Dict) -> List[Dict]:
        """收集提案"""
        proposals = []
        for agent in agents:
            proposal = await agent.generate_proposal(issue)
            proposals.append({
                "agent_id": agent.agent_id,
                "proposal": proposal
            })
        return proposals

6. 实际应用案例

6.1 软件开发团队

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class SoftwareDevelopmentTeam:
    def __init__(self):
        self.product_manager = self.create_pm_agent()
        self.architect = self.create_architect_agent()
        self.developers = self.create_developer_agents(3)
        self.qa_engineer = self.create_qa_agent()
        self.devops = self.create_devops_agent()
        
    def create_pm_agent(self):
        return Agent(
            agent_id="pm_001",
            role=AgentRole.COORDINATOR,
            capabilities=["requirement_analysis", "planning"],
            llm=ChatOpenAI(model="gpt-4")
        )
    
    def create_architect_agent(self):
        return Agent(
            agent_id="architect_001",
            role=AgentRole.SPECIALIST,
            capabilities=["system_design", "tech_selection"],
            llm=ChatOpenAI(model="gpt-4")
        )
    
    async def develop_feature(self, feature_request: str):
        """开发新功能的完整流程"""
        
        # 1. PM分析需求
        requirements = await self.product_manager.analyze_requirements(
            feature_request
        )
        
        # 2. 架构师设计系统
        design = await self.architect.create_design(requirements)
        
        # 3. 分配开发任务
        tasks = self.decompose_development_tasks(design)
        development_results = await self.parallel_development(
            tasks, self.developers
        )
        
        # 4. QA测试
        test_results = await self.qa_engineer.test_feature(
            development_results
        )
        
        # 5. DevOps部署
        if test_results["passed"]:
            deployment = await self.devops.deploy(development_results)
            return deployment
        else:
            # 返回开发阶段修复bug
            return await self.fix_bugs(test_results["issues"])

6.2 研究分析团队

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class ResearchTeam:
    def __init__(self):
        self.lead_researcher = Agent(
            "lead_001",
            AgentRole.COORDINATOR,
            ["research_planning", "synthesis"]
        )
        self.data_collectors = [
            Agent(f"collector_{i}", AgentRole.EXECUTOR, 
                 ["web_search", "data_extraction"])
            for i in range(3)
        ]
        self.analysts = [
            Agent(f"analyst_{i}", AgentRole.SPECIALIST,
                 ["data_analysis", "visualization"])
            for i in range(2)
        ]
        self.fact_checker = Agent(
            "checker_001",
            AgentRole.VALIDATOR,
            ["fact_checking", "source_verification"]
        )
    
    async def conduct_research(self, topic: str):
        """执行研究项目"""
        
        # 1. 制定研究计划
        research_plan = await self.lead_researcher.create_plan(topic)
        
        # 2. 并行数据收集
        data_collection_tasks = []
        for collector in self.data_collectors:
            task = asyncio.create_task(
                collector.collect_data(research_plan["queries"])
            )
            data_collection_tasks.append(task)
        
        raw_data = await asyncio.gather(*data_collection_tasks)
        
        # 3. 事实核查
        verified_data = await self.fact_checker.verify_data(raw_data)
        
        # 4. 数据分析
        analysis_results = []
        for analyst in self.analysts:
            result = await analyst.analyze(verified_data)
            analysis_results.append(result)
        
        # 5. 综合报告
        final_report = await self.lead_researcher.synthesize(
            analysis_results
        )
        
        return final_report

7. 性能优化

7.1 负载均衡

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class LoadBalancer:
    def __init__(self, agents: List[BaseAgent]):
        self.agents = agents
        self.agent_loads = {agent.agent_id: 0 for agent in agents}
        self.agent_performance = {
            agent.agent_id: {"avg_time": 0, "success_rate": 1.0}
            for agent in agents
        }
    
    def select_agent(self, task: Dict) -> BaseAgent:
        """选择负载最低的Agent"""
        # 计算综合得分
        scores = {}
        for agent in self.agents:
            if self.is_capable(agent, task):
                load_score = 1 / (1 + self.agent_loads[agent.agent_id])
                perf_score = self.agent_performance[agent.agent_id]["success_rate"]
                scores[agent.agent_id] = load_score * perf_score
        
        if not scores:
            return None
        
        # 选择得分最高的Agent
        best_agent_id = max(scores, key=scores.get)
        selected_agent = next(
            a for a in self.agents if a.agent_id == best_agent_id
        )
        
        # 更新负载
        self.agent_loads[best_agent_id] += 1
        
        return selected_agent
    
    def update_performance(self, agent_id: str, execution_time: float, 
                          success: bool):
        """更新Agent性能指标"""
        perf = self.agent_performance[agent_id]
        
        # 更新平均执行时间(指数移动平均)
        alpha = 0.3
        perf["avg_time"] = (
            alpha * execution_time + 
            (1 - alpha) * perf["avg_time"]
        )
        
        # 更新成功率
        perf["success_rate"] = (
            perf["success_rate"] * 0.9 + 
            (1.0 if success else 0.0) * 0.1
        )
        
        # 更新负载
        self.agent_loads[agent_id] = max(0, self.agent_loads[agent_id] - 1)

7.2 缓存与共享

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class SharedCache:
    def __init__(self, max_size: int = 1000):
        self.cache = {}
        self.access_count = defaultdict(int)
        self.max_size = max_size
        
    async def get(self, key: str) -> Any:
        """获取缓存"""
        if key in self.cache:
            self.access_count[key] += 1
            return self.cache[key]["value"]
        return None
    
    async def set(self, key: str, value: Any, ttl: int = 3600):
        """设置缓存"""
        if len(self.cache) >= self.max_size:
            # LFU淘汰策略
            self.evict_least_frequent()
        
        self.cache[key] = {
            "value": value,
            "expire_at": time.time() + ttl,
            "set_by": None  # 可以记录是哪个Agent设置的
        }
    
    def evict_least_frequent(self):
        """淘汰最少使用的缓存"""
        if not self.cache:
            return
        
        least_used = min(
            self.cache.keys(),
            key=lambda k: self.access_count.get(k, 0)
        )
        del self.cache[least_used]
        del self.access_count[least_used]

8. 监控与调试

8.1 系统监控

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class SystemMonitor:
    def __init__(self):
        self.metrics = {
            "message_count": 0,
            "task_completed": 0,
            "task_failed": 0,
            "avg_response_time": 0,
            "agent_utilization": {}
        }
        self.event_log = []
        
    def log_event(self, event_type: str, details: Dict):
        """记录事件"""
        event = {
            "timestamp": time.time(),
            "type": event_type,
            "details": details
        }
        self.event_log.append(event)
        
        # 更新指标
        self.update_metrics(event)
    
    def update_metrics(self, event: Dict):
        """更新系统指标"""
        if event["type"] == "MESSAGE_SENT":
            self.metrics["message_count"] += 1
        elif event["type"] == "TASK_COMPLETED":
            self.metrics["task_completed"] += 1
        elif event["type"] == "TASK_FAILED":
            self.metrics["task_failed"] += 1
    
    def generate_report(self) -> Dict:
        """生成监控报告"""
        return {
            "metrics": self.metrics,
            "health_status": self.check_health(),
            "bottlenecks": self.identify_bottlenecks(),
            "recommendations": self.generate_recommendations()
        }
    
    def check_health(self) -> str:
        """检查系统健康状态"""
        success_rate = (
            self.metrics["task_completed"] / 
            max(1, self.metrics["task_completed"] + self.metrics["task_failed"])
        )
        
        if success_rate > 0.95:
            return "HEALTHY"
        elif success_rate > 0.8:
            return "DEGRADED"
        else:
            return "CRITICAL"

8.2 调试工具

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class Debugger:
    def __init__(self, system: MultiAgentSystem):
        self.system = system
        self.breakpoints = set()
        self.watch_list = {}
        
    def set_breakpoint(self, agent_id: str, event_type: str):
        """设置断点"""
        self.breakpoints.add((agent_id, event_type))
    
    async def trace_execution(self, duration: int = 60):
        """追踪执行过程"""
        trace_data = []
        start_time = time.time()
        
        while time.time() - start_time < duration:
            for agent_id, agent in self.system.agents.items():
                state = {
                    "agent_id": agent_id,
                    "state": agent.state,
                    "queue_size": len(agent.message_queue),
                    "timestamp": time.time()
                }
                trace_data.append(state)
            
            await asyncio.sleep(1)
        
        return self.analyze_trace(trace_data)
    
    def analyze_trace(self, trace_data: List[Dict]):
        """分析追踪数据"""
        analysis = {
            "deadlocks": self.detect_deadlocks(trace_data),
            "performance_issues": self.detect_performance_issues(trace_data),
            "communication_patterns": self.analyze_communication(trace_data)
        }
        return analysis

9. 最佳实践

  1. 明确的角色定义:每个Agent应有清晰的职责边界
  2. 高效的通信协议:减少不必要的消息传递
  3. 容错机制:处理Agent失败的情况
  4. 可扩展性设计:支持动态添加/移除Agent
  5. 监控和日志:全面的系统监控
  6. 测试策略:包括单元测试和集成测试

结论

多Agent系统通过协作能够解决单个Agent无法处理的复杂问题。关键在于设计合理的架构、高效的通信机制和智能的协调策略。

参考资源