单个Agent虽然强大,但面对复杂任务时往往力不从心。多Agent系统通过协作分工,能够处理更复杂的问题,实现1+1>2的效果。本文深入探讨多Agent系统的设计原理、协作机制和实践案例。
1. 多Agent系统架构#
graph TB
subgraph "多Agent系统拓扑"
subgraph "层次结构"
M[管理Agent]
M --> A1[执行Agent1]
M --> A2[执行Agent2]
M --> A3[执行Agent3]
end
subgraph "对等网络"
P1[Agent1] <--> P2[Agent2]
P2 <--> P3[Agent3]
P3 <--> P1
end
subgraph "黑板模式"
BB[共享黑板]
B1[Agent1] --> BB
B2[Agent2] --> BB
B3[Agent3] --> BB
BB --> B1
BB --> B2
BB --> B3
end
end
style M fill:#ffeb3b,stroke:#f57c00,stroke-width:2px
style BB fill:#e1f5fe,stroke:#01579b,stroke-width:2px
1.1 系统拓扑结构#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| from enum import Enum
from typing import List, Dict, Any
class TopologyType(Enum):
HIERARCHICAL = "hierarchical" # 层次结构
PEER_TO_PEER = "p2p" # 对等网络
BLACKBOARD = "blackboard" # 黑板模式
PIPELINE = "pipeline" # 流水线
HYBRID = "hybrid" # 混合模式
class MultiAgentSystem:
def __init__(self, topology: TopologyType):
self.topology = topology
self.agents = {}
self.communication_channels = {}
self.shared_memory = {}
def add_agent(self, agent_id: str, agent: Any):
"""添加Agent到系统"""
self.agents[agent_id] = agent
self.setup_communication(agent_id)
def setup_communication(self, agent_id: str):
"""设置通信通道"""
if self.topology == TopologyType.HIERARCHICAL:
self._setup_hierarchical_comm(agent_id)
elif self.topology == TopologyType.PEER_TO_PEER:
self._setup_p2p_comm(agent_id)
|
1.2 Agent角色定义#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| class AgentRole(Enum):
COORDINATOR = "coordinator" # 协调者
EXECUTOR = "executor" # 执行者
VALIDATOR = "validator" # 验证者
MONITOR = "monitor" # 监控者
SPECIALIST = "specialist" # 专家
class BaseAgent:
def __init__(self, agent_id: str, role: AgentRole, capabilities: List[str]):
self.agent_id = agent_id
self.role = role
self.capabilities = capabilities
self.message_queue = []
self.state = AgentState.IDLE
async def receive_message(self, message: Dict):
"""接收消息"""
self.message_queue.append(message)
await self.process_message(message)
async def send_message(self, recipient: str, content: Dict):
"""发送消息"""
message = {
"sender": self.agent_id,
"recipient": recipient,
"content": content,
"timestamp": time.time()
}
await self.communication_channel.send(message)
|
2. 通信协议设计#
2.1 消息格式定义#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| from dataclasses import dataclass
from typing import Optional, Any
@dataclass
class Message:
sender_id: str
recipient_id: str
message_type: str # REQUEST, RESPONSE, BROADCAST, etc.
content: Any
conversation_id: str
timestamp: float
priority: int = 0
requires_response: bool = False
class MessageProtocol:
"""消息协议定义"""
# 消息类型
REQUEST = "REQUEST"
RESPONSE = "RESPONSE"
BROADCAST = "BROADCAST"
SUBSCRIBE = "SUBSCRIBE"
UNSUBSCRIBE = "UNSUBSCRIBE"
HEARTBEAT = "HEARTBEAT"
# 内容格式
@staticmethod
def create_task_request(task: str, requirements: Dict) -> Dict:
return {
"type": MessageProtocol.REQUEST,
"task": task,
"requirements": requirements,
"deadline": None
}
@staticmethod
def create_capability_announcement(capabilities: List[str]) -> Dict:
return {
"type": MessageProtocol.BROADCAST,
"capabilities": capabilities,
"availability": True
}
|
2.2 通信中间件#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| import asyncio
from collections import defaultdict
class CommunicationMiddleware:
def __init__(self):
self.subscribers = defaultdict(list)
self.message_buffer = asyncio.Queue()
self.routing_table = {}
async def publish(self, topic: str, message: Message):
"""发布消息到主题"""
subscribers = self.subscribers.get(topic, [])
tasks = []
for subscriber in subscribers:
task = asyncio.create_task(
subscriber.receive_message(message)
)
tasks.append(task)
await asyncio.gather(*tasks)
def subscribe(self, topic: str, agent: BaseAgent):
"""订阅主题"""
self.subscribers[topic].append(agent)
async def route_message(self, message: Message):
"""路由消息"""
if message.recipient_id == "BROADCAST":
await self.broadcast(message)
else:
recipient = self.routing_table.get(message.recipient_id)
if recipient:
await recipient.receive_message(message)
|
3. 任务分配与协调#
3.1 任务分解策略#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
| class TaskDecomposer:
def __init__(self, llm):
self.llm = llm
def decompose_task(self, task: str) -> List[Dict]:
"""将复杂任务分解为子任务"""
prompt = f"""
将以下任务分解为独立的子任务:
任务:{task}
要求:
1. 每个子任务应该是独立可执行的
2. 标注每个子任务所需的能力
3. 标注子任务之间的依赖关系
输出JSON格式:
{{
"subtasks": [
{{
"id": "task_1",
"description": "...",
"required_capabilities": [...],
"dependencies": [],
"estimated_time": 0
}}
]
}}
"""
response = self.llm.invoke(prompt)
return json.loads(response)["subtasks"]
class TaskCoordinator:
def __init__(self, agents: Dict[str, BaseAgent]):
self.agents = agents
self.task_queue = asyncio.Queue()
self.task_assignments = {}
async def assign_task(self, task: Dict):
"""分配任务给合适的Agent"""
# 找到具备所需能力的Agent
capable_agents = self.find_capable_agents(
task["required_capabilities"]
)
if not capable_agents:
return None
# 选择最优Agent(考虑负载均衡)
selected_agent = self.select_optimal_agent(capable_agents)
# 分配任务
self.task_assignments[task["id"]] = selected_agent.agent_id
await selected_agent.receive_message(
MessageProtocol.create_task_request(
task["description"],
task.get("requirements", {})
)
)
return selected_agent.agent_id
def find_capable_agents(self, required_capabilities: List[str]):
"""查找具备所需能力的Agent"""
capable = []
for agent in self.agents.values():
if all(cap in agent.capabilities for cap in required_capabilities):
capable.append(agent)
return capable
|
3.2 协商机制#
sequenceDiagram
participant C as 协调Agent
participant A1 as Agent1
participant A2 as Agent2
participant A3 as Agent3
C->>A1: 任务公告
C->>A2: 任务公告
C->>A3: 任务公告
Note over A1,A3: 评估任务能力
A1-->>C: 投标(成本:10, 时间:5min)
A2-->>C: 投标(成本:8, 时间:7min)
A3-->>C: 不投标
Note over C: 评估投标
C->>A2: 授予合同
A2->>C: 接受合同
Note over A2: 执行任务
A2->>C: 任务完成报告
C->>A2: 确认完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
| class ContractNetProtocol:
"""合同网协议实现"""
def __init__(self, coordinator: BaseAgent):
self.coordinator = coordinator
self.bids = {}
async def announce_task(self, task: Dict):
"""发布任务公告"""
announcement = {
"type": "TASK_ANNOUNCEMENT",
"task": task,
"deadline": time.time() + 10 # 10秒投标期
}
# 广播任务
await self.coordinator.send_message(
"BROADCAST",
announcement
)
# 等待投标
await asyncio.sleep(10)
# 选择中标者
winner = self.select_winner()
if winner:
await self.award_contract(winner, task)
async def submit_bid(self, agent_id: str, bid: Dict):
"""提交投标"""
self.bids[agent_id] = {
"cost": bid.get("cost", float('inf')),
"time": bid.get("estimated_time", float('inf')),
"confidence": bid.get("confidence", 0)
}
def select_winner(self) -> Optional[str]:
"""选择中标Agent"""
if not self.bids:
return None
# 综合评分(可自定义权重)
best_agent = None
best_score = float('-inf')
for agent_id, bid in self.bids.items():
score = (
bid["confidence"] * 0.5 -
bid["cost"] * 0.3 -
bid["time"] * 0.2
)
if score > best_score:
best_score = score
best_agent = agent_id
return best_agent
|
4. 知识共享机制#
4.1 黑板系统#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| class BlackboardSystem:
"""黑板系统实现"""
def __init__(self):
self.blackboard = {}
self.knowledge_sources = []
self.controller = None
self.locks = {}
async def write(self, key: str, value: Any, agent_id: str):
"""写入知识"""
async with self.get_lock(key):
self.blackboard[key] = {
"value": value,
"author": agent_id,
"timestamp": time.time(),
"version": self.get_version(key) + 1
}
# 通知订阅者
await self.notify_subscribers(key, value)
async def read(self, key: str) -> Any:
"""读取知识"""
entry = self.blackboard.get(key)
return entry["value"] if entry else None
def subscribe(self, pattern: str, callback):
"""订阅知识更新"""
self.knowledge_sources.append({
"pattern": pattern,
"callback": callback
})
async def notify_subscribers(self, key: str, value: Any):
"""通知订阅者"""
for source in self.knowledge_sources:
if self.match_pattern(key, source["pattern"]):
await source["callback"](key, value)
|
4.2 知识图谱共享#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
| from typing import Tuple
import networkx as nx
class SharedKnowledgeGraph:
def __init__(self):
self.graph = nx.DiGraph()
self.embeddings = {} # 节点嵌入
def add_knowledge(self, subject: str, predicate: str, object: str,
agent_id: str, confidence: float = 1.0):
"""添加知识三元组"""
# 添加节点
if subject not in self.graph:
self.graph.add_node(subject, type="entity")
if object not in self.graph:
self.graph.add_node(object, type="entity")
# 添加边
self.graph.add_edge(
subject, object,
predicate=predicate,
contributor=agent_id,
confidence=confidence,
timestamp=time.time()
)
def query(self, query_type: str, **kwargs) -> List:
"""查询知识"""
if query_type == "neighbors":
node = kwargs.get("node")
return list(self.graph.neighbors(node))
elif query_type == "path":
source = kwargs.get("source")
target = kwargs.get("target")
try:
path = nx.shortest_path(self.graph, source, target)
return path
except nx.NetworkXNoPath:
return []
elif query_type == "subgraph":
nodes = kwargs.get("nodes", [])
return self.graph.subgraph(nodes)
def merge_knowledge(self, other_graph: 'SharedKnowledgeGraph'):
"""合并其他Agent的知识"""
for edge in other_graph.graph.edges(data=True):
source, target, data = edge
existing_edge = self.graph.get_edge_data(source, target)
if existing_edge:
# 更新置信度(加权平均)
new_confidence = (
existing_edge["confidence"] + data["confidence"]
) / 2
self.graph[source][target]["confidence"] = new_confidence
else:
self.add_knowledge(
source, data["predicate"], target,
data["contributor"], data["confidence"]
)
|
5. 冲突解决机制#
5.1 投票机制#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
| class VotingMechanism:
def __init__(self, voting_type: str = "majority"):
self.voting_type = voting_type
self.votes = {}
async def collect_votes(self, issue: str, options: List[str],
voters: List[BaseAgent]):
"""收集投票"""
self.votes[issue] = defaultdict(list)
# 并行收集投票
tasks = []
for voter in voters:
task = asyncio.create_task(
self.get_vote(voter, issue, options)
)
tasks.append(task)
votes = await asyncio.gather(*tasks)
# 统计投票
for voter, vote in zip(voters, votes):
self.votes[issue][vote].append(voter.agent_id)
return self.determine_winner(issue)
async def get_vote(self, voter: BaseAgent, issue: str,
options: List[str]) -> str:
"""获取单个Agent的投票"""
vote_request = {
"type": "VOTE_REQUEST",
"issue": issue,
"options": options
}
response = await voter.process_vote_request(vote_request)
return response["vote"]
def determine_winner(self, issue: str) -> str:
"""确定获胜选项"""
vote_counts = {
option: len(voters)
for option, voters in self.votes[issue].items()
}
if self.voting_type == "majority":
return max(vote_counts, key=vote_counts.get)
elif self.voting_type == "unanimous":
if len(vote_counts) == 1:
return list(vote_counts.keys())[0]
return None
|
5.2 协商与妥协#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| class NegotiationProtocol:
def __init__(self):
self.negotiation_history = []
async def negotiate(self, agents: List[BaseAgent], issue: Dict):
"""多轮协商"""
max_rounds = 5
current_round = 0
while current_round < max_rounds:
proposals = await self.collect_proposals(agents, issue)
# 评估提案
evaluations = await self.evaluate_proposals(
agents, proposals
)
# 检查是否达成共识
consensus = self.check_consensus(evaluations)
if consensus:
return consensus
# 生成反提案
issue = self.generate_counter_proposal(evaluations)
current_round += 1
# 未达成共识,使用仲裁
return await self.arbitrate(agents, issue)
async def collect_proposals(self, agents: List[BaseAgent],
issue: Dict) -> List[Dict]:
"""收集提案"""
proposals = []
for agent in agents:
proposal = await agent.generate_proposal(issue)
proposals.append({
"agent_id": agent.agent_id,
"proposal": proposal
})
return proposals
|
6. 实际应用案例#
6.1 软件开发团队#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| class SoftwareDevelopmentTeam:
def __init__(self):
self.product_manager = self.create_pm_agent()
self.architect = self.create_architect_agent()
self.developers = self.create_developer_agents(3)
self.qa_engineer = self.create_qa_agent()
self.devops = self.create_devops_agent()
def create_pm_agent(self):
return Agent(
agent_id="pm_001",
role=AgentRole.COORDINATOR,
capabilities=["requirement_analysis", "planning"],
llm=ChatOpenAI(model="gpt-4")
)
def create_architect_agent(self):
return Agent(
agent_id="architect_001",
role=AgentRole.SPECIALIST,
capabilities=["system_design", "tech_selection"],
llm=ChatOpenAI(model="gpt-4")
)
async def develop_feature(self, feature_request: str):
"""开发新功能的完整流程"""
# 1. PM分析需求
requirements = await self.product_manager.analyze_requirements(
feature_request
)
# 2. 架构师设计系统
design = await self.architect.create_design(requirements)
# 3. 分配开发任务
tasks = self.decompose_development_tasks(design)
development_results = await self.parallel_development(
tasks, self.developers
)
# 4. QA测试
test_results = await self.qa_engineer.test_feature(
development_results
)
# 5. DevOps部署
if test_results["passed"]:
deployment = await self.devops.deploy(development_results)
return deployment
else:
# 返回开发阶段修复bug
return await self.fix_bugs(test_results["issues"])
|
6.2 研究分析团队#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| class ResearchTeam:
def __init__(self):
self.lead_researcher = Agent(
"lead_001",
AgentRole.COORDINATOR,
["research_planning", "synthesis"]
)
self.data_collectors = [
Agent(f"collector_{i}", AgentRole.EXECUTOR,
["web_search", "data_extraction"])
for i in range(3)
]
self.analysts = [
Agent(f"analyst_{i}", AgentRole.SPECIALIST,
["data_analysis", "visualization"])
for i in range(2)
]
self.fact_checker = Agent(
"checker_001",
AgentRole.VALIDATOR,
["fact_checking", "source_verification"]
)
async def conduct_research(self, topic: str):
"""执行研究项目"""
# 1. 制定研究计划
research_plan = await self.lead_researcher.create_plan(topic)
# 2. 并行数据收集
data_collection_tasks = []
for collector in self.data_collectors:
task = asyncio.create_task(
collector.collect_data(research_plan["queries"])
)
data_collection_tasks.append(task)
raw_data = await asyncio.gather(*data_collection_tasks)
# 3. 事实核查
verified_data = await self.fact_checker.verify_data(raw_data)
# 4. 数据分析
analysis_results = []
for analyst in self.analysts:
result = await analyst.analyze(verified_data)
analysis_results.append(result)
# 5. 综合报告
final_report = await self.lead_researcher.synthesize(
analysis_results
)
return final_report
|
7. 性能优化#
7.1 负载均衡#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| class LoadBalancer:
def __init__(self, agents: List[BaseAgent]):
self.agents = agents
self.agent_loads = {agent.agent_id: 0 for agent in agents}
self.agent_performance = {
agent.agent_id: {"avg_time": 0, "success_rate": 1.0}
for agent in agents
}
def select_agent(self, task: Dict) -> BaseAgent:
"""选择负载最低的Agent"""
# 计算综合得分
scores = {}
for agent in self.agents:
if self.is_capable(agent, task):
load_score = 1 / (1 + self.agent_loads[agent.agent_id])
perf_score = self.agent_performance[agent.agent_id]["success_rate"]
scores[agent.agent_id] = load_score * perf_score
if not scores:
return None
# 选择得分最高的Agent
best_agent_id = max(scores, key=scores.get)
selected_agent = next(
a for a in self.agents if a.agent_id == best_agent_id
)
# 更新负载
self.agent_loads[best_agent_id] += 1
return selected_agent
def update_performance(self, agent_id: str, execution_time: float,
success: bool):
"""更新Agent性能指标"""
perf = self.agent_performance[agent_id]
# 更新平均执行时间(指数移动平均)
alpha = 0.3
perf["avg_time"] = (
alpha * execution_time +
(1 - alpha) * perf["avg_time"]
)
# 更新成功率
perf["success_rate"] = (
perf["success_rate"] * 0.9 +
(1.0 if success else 0.0) * 0.1
)
# 更新负载
self.agent_loads[agent_id] = max(0, self.agent_loads[agent_id] - 1)
|
7.2 缓存与共享#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| class SharedCache:
def __init__(self, max_size: int = 1000):
self.cache = {}
self.access_count = defaultdict(int)
self.max_size = max_size
async def get(self, key: str) -> Any:
"""获取缓存"""
if key in self.cache:
self.access_count[key] += 1
return self.cache[key]["value"]
return None
async def set(self, key: str, value: Any, ttl: int = 3600):
"""设置缓存"""
if len(self.cache) >= self.max_size:
# LFU淘汰策略
self.evict_least_frequent()
self.cache[key] = {
"value": value,
"expire_at": time.time() + ttl,
"set_by": None # 可以记录是哪个Agent设置的
}
def evict_least_frequent(self):
"""淘汰最少使用的缓存"""
if not self.cache:
return
least_used = min(
self.cache.keys(),
key=lambda k: self.access_count.get(k, 0)
)
del self.cache[least_used]
del self.access_count[least_used]
|
8. 监控与调试#
8.1 系统监控#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| class SystemMonitor:
def __init__(self):
self.metrics = {
"message_count": 0,
"task_completed": 0,
"task_failed": 0,
"avg_response_time": 0,
"agent_utilization": {}
}
self.event_log = []
def log_event(self, event_type: str, details: Dict):
"""记录事件"""
event = {
"timestamp": time.time(),
"type": event_type,
"details": details
}
self.event_log.append(event)
# 更新指标
self.update_metrics(event)
def update_metrics(self, event: Dict):
"""更新系统指标"""
if event["type"] == "MESSAGE_SENT":
self.metrics["message_count"] += 1
elif event["type"] == "TASK_COMPLETED":
self.metrics["task_completed"] += 1
elif event["type"] == "TASK_FAILED":
self.metrics["task_failed"] += 1
def generate_report(self) -> Dict:
"""生成监控报告"""
return {
"metrics": self.metrics,
"health_status": self.check_health(),
"bottlenecks": self.identify_bottlenecks(),
"recommendations": self.generate_recommendations()
}
def check_health(self) -> str:
"""检查系统健康状态"""
success_rate = (
self.metrics["task_completed"] /
max(1, self.metrics["task_completed"] + self.metrics["task_failed"])
)
if success_rate > 0.95:
return "HEALTHY"
elif success_rate > 0.8:
return "DEGRADED"
else:
return "CRITICAL"
|
8.2 调试工具#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| class Debugger:
def __init__(self, system: MultiAgentSystem):
self.system = system
self.breakpoints = set()
self.watch_list = {}
def set_breakpoint(self, agent_id: str, event_type: str):
"""设置断点"""
self.breakpoints.add((agent_id, event_type))
async def trace_execution(self, duration: int = 60):
"""追踪执行过程"""
trace_data = []
start_time = time.time()
while time.time() - start_time < duration:
for agent_id, agent in self.system.agents.items():
state = {
"agent_id": agent_id,
"state": agent.state,
"queue_size": len(agent.message_queue),
"timestamp": time.time()
}
trace_data.append(state)
await asyncio.sleep(1)
return self.analyze_trace(trace_data)
def analyze_trace(self, trace_data: List[Dict]):
"""分析追踪数据"""
analysis = {
"deadlocks": self.detect_deadlocks(trace_data),
"performance_issues": self.detect_performance_issues(trace_data),
"communication_patterns": self.analyze_communication(trace_data)
}
return analysis
|
9. 最佳实践#
- 明确的角色定义:每个Agent应有清晰的职责边界
- 高效的通信协议:减少不必要的消息传递
- 容错机制:处理Agent失败的情况
- 可扩展性设计:支持动态添加/移除Agent
- 监控和日志:全面的系统监控
- 测试策略:包括单元测试和集成测试
多Agent系统通过协作能够解决单个Agent无法处理的复杂问题。关键在于设计合理的架构、高效的通信机制和智能的协调策略。
参考资源#