# The Complete Guide to LLM Fine-Tuning: From Principles to Production Deployment

## Introduction

Fine-tuning is the key technique for adapting a general-purpose large language model (LLM) to a specific task. This article covers LLM fine-tuning methods, tricks, and best practices end to end, including full-parameter fine-tuning, parameter-efficient fine-tuning (PEFT), and reinforcement-learning-based fine-tuning.

## 1. Fine-Tuning Architecture

```mermaid
graph TB
    subgraph "LLM Fine-Tuning Pipeline"
        D[Raw data] --> DP[Data preprocessing]
        DP --> DS[Dataset split]
        subgraph "Fine-tuning methods"
            DS --> FT[Full fine-tuning]
            DS --> LORA[LoRA]
            DS --> QLORA[QLoRA]
            DS --> PT[Prefix Tuning]
        end
        subgraph "Training loop"
            LORA --> TR[Training loop]
            TR --> VAL[Validation]
            VAL --> CK[Checkpointing]
            CK --> TR
        end
        subgraph "Optimizations"
            GC[Gradient accumulation]
            MP[Mixed precision]
            GCP[Gradient checkpointing]
            DS2[DeepSpeed]
        end
        TR -.-> GC
        TR -.-> MP
        TR -.-> GCP
        TR -.-> DS2
        CK --> MD[Model deployment]
    end
    style D fill:#e8f5e9,stroke:#4caf50,stroke-width:2px
    style MD fill:#fff3e0,stroke:#ff9800,stroke-width:2px
    style LORA fill:#e3f2fd,stroke:#2196f3,stroke-width:3px
```

### 1.1 Fine-Tuning Framework Design

```python
from dataclasses import dataclass
from typing import Optional, Dict, List, Union

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer


@dataclass
class FineTuningConfig:
    model_name: str = "meta-llama/Llama-2-7b-hf"
    dataset_path: str = "./data/train.jsonl"
    output_dir: str = "./checkpoints"

    # Training parameters
    learning_rate: float = 2e-5
    batch_size: int = 4
    gradient_accumulation_steps: int = 4
    num_epochs: int = 3
    warmup_ratio: float = 0.1
    weight_decay: float = 0.01

    # Optimization parameters
    max_seq_length: int = 2048
    gradient_checkpointing: bool = True
    mixed_precision: str = "fp16"  # fp16, bf16, or None

    # LoRA parameters
    use_lora: bool = True
    lora_rank: int = 16
    lora_alpha: int = 32
    lora_dropout: float = 0.1

    # Quantization parameters
    use_quantization: bool = False
    quantization_bits: int = 4


class LLMFineTuner:
    def __init__(self, config: FineTuningConfig):
        self.config = config
        self.model = None
        self.tokenizer = None
        self.optimizer = None
        self.scheduler = None

    def setup_model(self):
        """Set up the model and tokenizer."""
        # Load the tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.config.model_name,
            trust_remote_code=True
        )
        self.tokenizer.pad_token = self.tokenizer.eos_token

        # Load the model
        if self.config.use_quantization:
            self.model = self.load_quantized_model()
        else:
            self.model = AutoModelForCausalLM.from_pretrained(
                self.config.model_name,
                torch_dtype=torch.float16 if self.config.mixed_precision == "fp16" else torch.float32,
                device_map="auto",
                trust_remote_code=True
            )

        # Apply LoRA
        if self.config.use_lora:
            self.apply_lora()

        # Enable gradient checkpointing
        if self.config.gradient_checkpointing:
            self.model.gradient_checkpointing_enable()

    def apply_lora(self):
        """Attach LoRA adapters."""
        from peft import LoraConfig, get_peft_model, TaskType

        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            r=self.config.lora_rank,
            lora_alpha=self.config.lora_alpha,
            lora_dropout=self.config.lora_dropout,
            target_modules=["q_proj", "v_proj", "k_proj", "o_proj",
                            "gate_proj", "up_proj", "down_proj"],
            bias="none"
        )

        self.model = get_peft_model(self.model, lora_config)
        self.model.print_trainable_parameters()
```
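For context, a minimal sketch of how the config and tuner above might be wired together; the call sequence is my assumption, not part of the original pipeline:

```python
# Hypothetical wiring of the pieces above (paths are placeholders)
config = FineTuningConfig(
    model_name="meta-llama/Llama-2-7b-hf",
    dataset_path="./data/train.jsonl",
    use_lora=True,
)

tuner = LLMFineTuner(config)
tuner.setup_model()  # loads tokenizer + model, attaches LoRA, enables checkpointing
```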
### 1.2 Data Processing Pipeline

```python
import json
from typing import List, Dict

import torch
from torch.utils.data import Dataset, DataLoader


class InstructionDataset(Dataset):
    def __init__(self, data_path: str, tokenizer, max_length: int = 2048):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.data = self.load_data(data_path)

    def load_data(self, path: str) -> List[Dict]:
        """Load instruction data from a JSONL file."""
        data = []
        with open(path, 'r', encoding='utf-8') as f:
            for line in f:
                item = json.loads(line)
                data.append(item)
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]

        # Build the prompt
        prompt = self.build_prompt(item)

        # Tokenize
        encoded = self.tokenizer(
            prompt,
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt"
        )

        # Create labels (for loss computation)
        labels = encoded["input_ids"].clone()
        # Set padded positions to -100 so the loss ignores them
        labels[labels == self.tokenizer.pad_token_id] = -100

        return {
            "input_ids": encoded["input_ids"].squeeze(),
            "attention_mask": encoded["attention_mask"].squeeze(),
            "labels": labels.squeeze()
        }

    def build_prompt(self, item: Dict) -> str:
        """Assemble the instruction prompt."""
        system_prompt = item.get("system", "You are a helpful assistant.")
        instruction = item.get("instruction", "")
        input_text = item.get("input", "")
        output = item.get("output", "")

        if input_text:
            prompt = f"""<|system|>{system_prompt}</s>
<|user|>{instruction}
Input: {input_text}</s>
<|assistant|>{output}</s>"""
        else:
            prompt = f"""<|system|>{system_prompt}</s>
<|user|>{instruction}</s>
<|assistant|>{output}</s>"""

        return prompt


class DataCollator:
    def __init__(self, tokenizer):
        self.tokenizer = tokenizer

    def __call__(self, batch):
        """Collate a batch of examples."""
        input_ids = torch.stack([item["input_ids"] for item in batch])
        attention_mask = torch.stack([item["attention_mask"] for item in batch])
        labels = torch.stack([item["labels"] for item in batch])

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels
        }
```

## 2. Parameter-Efficient Fine-Tuning (PEFT)

### 2.1 LoRA Implementation

```python
import math

import torch
import torch.nn as nn


class LoRALayer(nn.Module):
    def __init__(self, in_features: int, out_features: int,
                 rank: int = 16, alpha: int = 32, dropout: float = 0.1):
        super().__init__()
        self.rank = rank
        self.alpha = alpha
        self.scaling = alpha / rank

        # LoRA parameters
        self.lora_A = nn.Parameter(torch.zeros(rank, in_features))
        self.lora_B = nn.Parameter(torch.zeros(out_features, rank))
        self.lora_dropout = nn.Dropout(dropout)

        # Initialization: A gets Kaiming init, B starts at zero so the
        # adapter is a no-op before training
        nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
        nn.init.zeros_(self.lora_B)

    def forward(self, x: torch.Tensor, base_output: torch.Tensor) -> torch.Tensor:
        """LoRA forward pass."""
        if self.training:
            x = self.lora_dropout(x)

        # Low-rank update: x @ A^T @ B^T
        lora_output = x @ self.lora_A.T @ self.lora_B.T

        # Scale and add to the base layer's output
        return base_output + lora_output * self.scaling


class LoRALinear(nn.Module):
    def __init__(self, base_layer: nn.Linear,
                 rank: int = 16, alpha: int = 32, dropout: float = 0.1):
        super().__init__()
        self.base_layer = base_layer
        self.lora = LoRALayer(
            base_layer.in_features,
            base_layer.out_features,
            rank, alpha, dropout
        )

        # Freeze the base layer
        for param in self.base_layer.parameters():
            param.requires_grad = False

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        base_output = self.base_layer(x)
        return self.lora(x, base_output)

    def merge_weights(self):
        """Merge the LoRA update into the base layer's weights."""
        with torch.no_grad():
            self.base_layer.weight.data += (
                self.lora.lora_B @ self.lora.lora_A
            ) * self.lora.scaling
```
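A quick way to check the adapter wiring (my own sanity test, not from the original): because `lora_B` starts at zero, a freshly wrapped layer must reproduce the base layer exactly, and only the LoRA matrices should be trainable.

```python
import torch
import torch.nn as nn

base = nn.Linear(64, 32)
wrapped = LoRALinear(base, rank=8, alpha=16, dropout=0.0)
wrapped.eval()

x = torch.randn(4, 64)
# lora_B is initialized to zero, so the LoRA delta is zero before training
assert torch.allclose(wrapped(x), base(x))

# Only the LoRA matrices should require gradients
trainable = [n for n, p in wrapped.named_parameters() if p.requires_grad]
print(trainable)  # ['lora.lora_A', 'lora.lora_B']
```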
### 2.2 QLoRA Implementation

```mermaid
graph LR
    subgraph "QLoRA Architecture"
        I[Input] --> Q4[4-bit quantized model]
        Q4 --> D[Dequantize]
        D --> B[Base computation]
        I --> LA[LoRA A matrix, FP16]
        LA --> LB[LoRA B matrix, FP16]
        B --> ADD[Add]
        LB --> ADD
        ADD --> O[Output]
        subgraph "Memory savings"
            M1[Model: 4-bit]
            M2[LoRA: FP16]
            M3[Gradients: FP16]
        end
    end
    style I fill:#e8f5e9,stroke:#4caf50,stroke-width:2px
    style O fill:#fff3e0,stroke:#ff9800,stroke-width:2px
    style Q4 fill:#ffebee,stroke:#f44336,stroke-width:2px
```

```python
import bitsandbytes as bnb
import torch
from transformers import BitsAndBytesConfig


class QLoRAFineTuner:
    def __init__(self, model_name: str):
        self.model_name = model_name
        self.bnb_config = None
        self.model = None

    def setup_quantization(self):
        """Configure 4-bit (NF4) quantization."""
        self.bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16
        )

    def load_quantized_model(self):
        """Load the model with 4-bit quantization."""
        from transformers import AutoModelForCausalLM

        model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            quantization_config=self.bnb_config,
            device_map="auto",
            trust_remote_code=True
        )

        # Prepare the model for k-bit training
        model = prepare_model_for_kbit_training(model)

        return model

    def apply_qlora(self, model):
        """Attach QLoRA adapters."""
        from peft import LoraConfig, get_peft_model

        # Find all quantized linear layers
        target_modules = self.find_linear_layers(model)

        config = LoraConfig(
            r=16,
            lora_alpha=32,
            target_modules=target_modules,
            lora_dropout=0.05,
            bias="none",
            task_type="CAUSAL_LM"
        )

        model = get_peft_model(model, config)
        return model

    def find_linear_layers(self, model):
        """Collect the names of all linear layers eligible for LoRA."""
        linear_cls = bnb.nn.Linear4bit
        lora_module_names = set()

        for name, module in model.named_modules():
            if isinstance(module, linear_cls):
                names = name.split('.')
                lora_module_names.add(names[-1])

        # Exclude the output head
        if 'lm_head' in lora_module_names:
            lora_module_names.remove('lm_head')

        return list(lora_module_names)


def prepare_model_for_kbit_training(model):
    """Prepare a quantized model for k-bit training
    (peft ships an equivalent helper under the same name)."""
    model.gradient_checkpointing_enable()

    # Freeze everything; cast 1-D params (norms, biases) to fp32 for stability
    for param in model.parameters():
        param.requires_grad = False
        if param.ndim == 1:
            param.data = param.data.to(torch.float32)

    # Allow gradients to flow into the input embeddings
    model.enable_input_require_grads()

    return model
```
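As a rough sketch, the class above would be driven like this; the call order is my assumption:

```python
tuner = QLoRAFineTuner("meta-llama/Llama-2-7b-hf")
tuner.setup_quantization()            # build the 4-bit NF4 config
model = tuner.load_quantized_model()  # load quantized weights, prep for k-bit training
model = tuner.apply_qlora(model)      # attach LoRA adapters to the 4-bit linear layers
model.print_trainable_parameters()    # peft models expose this helper
```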
## 3. Training Optimization Techniques

### 3.1 Distributed Training

```python
import functools

import torch
import torch.distributed as dist
import transformers
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed.fsdp import (
    FullyShardedDataParallel as FSDP,
    ShardingStrategy,
)
from torch.distributed.fsdp.fully_sharded_data_parallel import (
    CPUOffload,
    BackwardPrefetch,
)


class DistributedTrainer:
    def __init__(self, model, config):
        self.model = model
        self.config = config
        self.world_size = torch.cuda.device_count()

    def setup_ddp(self, rank: int):
        """Set up DDP training."""
        # Initialize the process group
        dist.init_process_group(
            backend='nccl',
            init_method='env://',
            world_size=self.world_size,
            rank=rank
        )

        # Bind this process to its GPU
        torch.cuda.set_device(rank)

        # Wrap the model
        self.model = self.model.to(rank)
        self.model = DDP(
            self.model,
            device_ids=[rank],
            output_device=rank,
            find_unused_parameters=False
        )

    def setup_fsdp(self):
        """Set up FSDP (fully sharded data parallel) training."""
        from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy

        # Auto-wrap each transformer block
        auto_wrap_policy = functools.partial(
            transformer_auto_wrap_policy,
            transformer_layer_cls={
                transformers.models.llama.modeling_llama.LlamaDecoderLayer
            }
        )

        # FSDP configuration
        self.model = FSDP(
            self.model,
            auto_wrap_policy=auto_wrap_policy,
            backward_prefetch=BackwardPrefetch.BACKWARD_PRE,
            cpu_offload=CPUOffload(offload_params=True),
            mixed_precision=self.get_mixed_precision_policy(),
            sharding_strategy=ShardingStrategy.FULL_SHARD,
            device_id=torch.cuda.current_device(),
            limit_all_gathers=True
        )

    def get_mixed_precision_policy(self):
        """Build the mixed-precision policy."""
        from torch.distributed.fsdp import MixedPrecision

        return MixedPrecision(
            param_dtype=torch.bfloat16,
            reduce_dtype=torch.bfloat16,
            buffer_dtype=torch.bfloat16,
        )
```
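`setup_ddp` uses the `env://` rendezvous, so something has to set the master address/port and spawn one process per GPU. A minimal launcher sketch; the worker body and `build_model`/`config` are assumptions:

```python
import os

import torch
import torch.multiprocessing as mp


def worker(rank: int, world_size: int):
    # build_model() and config are assumed to exist in the training script
    trainer = DistributedTrainer(build_model(), config)
    trainer.setup_ddp(rank)
    # ... run the training loop on this rank ...


if __name__ == "__main__":
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    world_size = torch.cuda.device_count()
    # mp.spawn calls worker(rank, *args) once per process
    mp.spawn(worker, args=(world_size,), nprocs=world_size)
```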
### 3.2 Gradient Optimization

```python
import torch


class GradientOptimizer:
    def __init__(self, model, config):
        self.model = model
        self.config = config

    def setup_optimizer(self):
        """Build the optimizer."""
        # Parameter groups
        param_groups = self.get_parameter_groups()

        # AdamW optimizer
        optimizer = torch.optim.AdamW(
            param_groups,
            lr=self.config.learning_rate,
            betas=(0.9, 0.95),
            eps=1e-8,
            weight_decay=self.config.weight_decay
        )

        return optimizer

    def get_parameter_groups(self):
        """Split parameters into groups with different hyperparameters."""
        # Parameters that should not get weight decay
        no_decay = ["bias", "LayerNorm.weight", "layer_norm.weight"]

        # LoRA parameters get a higher learning rate
        lora_params = []
        base_params_decay = []
        base_params_no_decay = []

        for name, param in self.model.named_parameters():
            if not param.requires_grad:
                continue

            if "lora_" in name:
                lora_params.append(param)
            elif any(nd in name for nd in no_decay):
                base_params_no_decay.append(param)
            else:
                base_params_decay.append(param)

        param_groups = [
            {
                "params": base_params_decay,
                "weight_decay": self.config.weight_decay,
                "lr": self.config.learning_rate
            },
            {
                "params": base_params_no_decay,
                "weight_decay": 0.0,
                "lr": self.config.learning_rate
            },
            {
                "params": lora_params,
                "weight_decay": 0.0,
                "lr": self.config.learning_rate * 2  # 2x learning rate for LoRA params
            }
        ]

        return param_groups

    def gradient_clipping(self, optimizer):
        """Clip gradients to a maximum norm."""
        torch.nn.utils.clip_grad_norm_(
            self.model.parameters(),
            max_norm=1.0
        )

    def gradient_accumulation_step(self, loss, step, optimizer):
        """One step of gradient accumulation."""
        loss = loss / self.config.gradient_accumulation_steps
        loss.backward()

        if (step + 1) % self.config.gradient_accumulation_steps == 0:
            self.gradient_clipping(optimizer)
            optimizer.step()
            optimizer.zero_grad()
```

## 4. Reinforcement Learning Fine-Tuning (RLHF/DPO)

### 4.1 RLHF Implementation

```python
import torch
import torch.nn.functional as F


class RLHFTrainer:
    # Note: a config object is required here (kl_coeff, clip_range, vf_coef,
    # entropy_coef); the original snippet used self.config without setting it.
    def __init__(self, policy_model, reward_model, reference_model, config):
        self.policy_model = policy_model
        self.reward_model = reward_model
        self.reference_model = reference_model
        self.config = config

        # Freeze the reference and reward models
        for param in self.reference_model.parameters():
            param.requires_grad = False
        for param in self.reward_model.parameters():
            param.requires_grad = False

    def compute_rewards(self, prompts, responses):
        """Compute rewards."""
        # Reward-model scores
        rewards = self.reward_model(prompts, responses)

        # KL penalty
        kl_penalty = self.compute_kl_penalty(prompts, responses)

        # Final reward
        final_rewards = rewards - self.config.kl_coeff * kl_penalty

        return final_rewards

    def compute_kl_penalty(self, prompts, responses):
        """KL-divergence penalty against the reference model."""
        with torch.no_grad():
            # Log-probabilities under the reference model
            ref_logprobs = self.get_logprobs(
                self.reference_model, prompts, responses
            )

            # Log-probabilities under the policy model
            policy_logprobs = self.get_logprobs(
                self.policy_model, prompts, responses
            )

            # KL divergence (logprob difference)
            kl = policy_logprobs - ref_logprobs

        return kl.mean()

    def ppo_step(self, prompts, responses, rewards, old_logprobs):
        """One PPO training step."""
        # Log-probabilities under the current policy
        # (get_logprobs / compute_entropy are helpers analogous to
        # DPOTrainer.get_logprobs below)
        logprobs = self.get_logprobs(self.policy_model, prompts, responses)

        # Importance ratio
        ratio = torch.exp(logprobs - old_logprobs)

        # Advantage estimate (simplified here to the raw reward)
        advantages = rewards

        # Clipped PPO objective
        surr1 = ratio * advantages
        surr2 = torch.clamp(
            ratio,
            1 - self.config.clip_range,
            1 + self.config.clip_range
        ) * advantages
        policy_loss = -torch.min(surr1, surr2).mean()

        # Value-function loss (if the policy has a value head)
        value_loss = 0
        if hasattr(self.policy_model, 'value_head'):
            values = self.policy_model.value_head(prompts, responses)
            value_loss = F.mse_loss(values, rewards)

        # Entropy bonus
        entropy = self.compute_entropy(logprobs)

        # Total loss
        loss = (
            policy_loss
            + self.config.vf_coef * value_loss
            - self.config.entropy_coef * entropy
        )

        return loss
```

### 4.2 DPO (Direct Preference Optimization)

```python
from contextlib import nullcontext

import torch
import torch.nn.functional as F


class DPOTrainer:
    # Note: the tokenizer is taken in the constructor; the original snippet
    # used self.tokenizer without setting it.
    def __init__(self, model, reference_model, tokenizer, beta: float = 0.1):
        self.model = model
        self.reference_model = reference_model
        self.tokenizer = tokenizer
        self.beta = beta

        # Freeze the reference model
        for param in self.reference_model.parameters():
            param.requires_grad = False

    def compute_dpo_loss(self, prompts, chosen, rejected):
        """Compute the DPO loss."""
        # Policy log-probabilities
        pi_logprobs_chosen = self.get_logprobs(self.model, prompts, chosen)
        pi_logprobs_rejected = self.get_logprobs(self.model, prompts, rejected)

        # Reference log-probabilities
        with torch.no_grad():
            ref_logprobs_chosen = self.get_logprobs(
                self.reference_model, prompts, chosen
            )
            ref_logprobs_rejected = self.get_logprobs(
                self.reference_model, prompts, rejected
            )

        # Log-ratios
        pi_logratios = pi_logprobs_chosen - pi_logprobs_rejected
        ref_logratios = ref_logprobs_chosen - ref_logprobs_rejected

        # DPO loss
        losses = -F.logsigmoid(self.beta * (pi_logratios - ref_logratios))

        # Implicit rewards, detached for logging
        chosen_rewards = self.beta * (
            pi_logprobs_chosen - ref_logprobs_chosen
        ).detach()
        rejected_rewards = self.beta * (
            pi_logprobs_rejected - ref_logprobs_rejected
        ).detach()

        return losses.mean(), chosen_rewards, rejected_rewards

    def get_logprobs(self, model, prompts, responses):
        """Sum the log-probabilities of each response given its prompt."""
        inputs = self.tokenizer(
            [p + r for p, r in zip(prompts, responses)],
            return_tensors="pt",
            padding=True,
            truncation=True
        )

        # Only disable gradients for the frozen reference model
        context = torch.no_grad() if model is self.reference_model else nullcontext()
        with context:
            outputs = model(**inputs, labels=inputs["input_ids"])

        logits = outputs.logits
        labels = inputs["input_ids"]

        # Token-level log-probabilities
        logprobs = F.log_softmax(logits, dim=-1)

        # Gather the log-probability of each target token
        selected_logprobs = torch.gather(
            logprobs, 2, labels.unsqueeze(-1)
        ).squeeze(-1)

        # Sum only over the response part
        prompt_lens = [len(self.tokenizer(p)["input_ids"]) for p in prompts]
        response_logprobs = []
        for i, prompt_len in enumerate(prompt_lens):
            response_logprobs.append(
                selected_logprobs[i, prompt_len:].sum()
            )

        return torch.stack(response_logprobs)
```
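To build intuition for the DPO objective, here is a tiny standalone check (my own illustration): when the policy assigns a relatively higher log-probability to the chosen response than the reference does, the loss drops below ln 2.

```python
import torch
import torch.nn.functional as F

beta = 0.1
# Toy summed log-probs for (chosen, rejected) under policy and reference
pi_chosen, pi_rejected = torch.tensor(-10.0), torch.tensor(-14.0)
ref_chosen, ref_rejected = torch.tensor(-12.0), torch.tensor(-12.0)

pi_logratios = pi_chosen - pi_rejected     # 4.0
ref_logratios = ref_chosen - ref_rejected  # 0.0
loss = -F.logsigmoid(beta * (pi_logratios - ref_logratios))
print(loss.item())  # ~0.513, below ln(2) ~ 0.693: the policy prefers the chosen answer
```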
"""获取响应的对数概率""" inputs = self.tokenizer( [p + r for p, r in zip(prompts, responses)], return_tensors="pt", padding=True, truncation=True ) with torch.no_grad() if model == self.reference_model else nullcontext(): outputs = model(**inputs, labels=inputs["input_ids"]) # 提取响应部分的对数概率 logits = outputs.logits labels = inputs["input_ids"] # 计算对数概率 logprobs = F.log_softmax(logits, dim=-1) # 获取标签对应的对数概率 selected_logprobs = torch.gather( logprobs, 2, labels.unsqueeze(-1) ).squeeze(-1) # 只计算响应部分 prompt_lens = [len(self.tokenizer(p)["input_ids"]) for p in prompts] response_logprobs = [] for i, prompt_len in enumerate(prompt_lens): response_logprobs.append( selected_logprobs[i, prompt_len:].sum() ) return torch.stack(response_logprobs) 5. 指令微调 5.1 指令数据构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 class InstructionDataBuilder: def __init__(self): self.templates = self.load_templates() def create_instruction_data(self, raw_data: List[Dict]) -> List[Dict]: """创建指令数据""" instruction_data = [] for item in raw_data: # 生成多样化的指令 instructions = self.generate_instructions(item) for instruction in instructions: formatted = self.format_instruction(instruction, item) instruction_data.append(formatted) return instruction_data def generate_instructions(self, item: Dict) -> List[str]: """生成多样化指令""" task_type = item.get("task_type", "general") instructions = [] if task_type == "qa": instructions.extend([ f"Answer the following question: {item['question']}", f"Please provide an answer to: {item['question']}", f"What is the answer to this question: {item['question']}", ]) elif task_type == "summarization": instructions.extend([ "Summarize the following text:", "Please provide a brief summary of:", "Create a concise summary for:", ]) elif task_type == "translation": instructions.extend([ f"Translate the following from {item['source_lang']} to {item['target_lang']}:", f"Please translate this text to {item['target_lang']}:", ]) return instructions def format_instruction(self, instruction: str, item: Dict) -> Dict: """格式化指令""" return { "instruction": instruction, "input": item.get("input", ""), "output": item.get("output", ""), "system": self.get_system_prompt(item.get("task_type", "general")) } def get_system_prompt(self, task_type: str) -> str: """获取系统提示""" system_prompts = { "qa": "You are a helpful question-answering assistant.", "summarization": "You are an expert at creating concise summaries.", "translation": "You are a professional translator.", "general": "You are a helpful AI assistant.", } return system_prompts.get(task_type, system_prompts["general"]) 5.2 Chain-of-Thought微调 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 class CoTFineTuning: def __init__(self, model, tokenizer): self.model = model self.tokenizer = tokenizer def create_cot_data(self, problems: List[Dict]) -> List[Dict]: """创建思维链数据""" cot_data = [] for problem in problems: # 生成思维链 cot_solution = self.generate_cot_solution(problem) # 格式化为训练数据 cot_item = { "instruction": problem["question"], "output": cot_solution, "system": "Let's think step by step." 
### 5.2 Chain-of-Thought Fine-Tuning

```python
from typing import Dict, List

import torch
from torch.utils.data import DataLoader


class CoTFineTuning:
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def create_cot_data(self, problems: List[Dict]) -> List[Dict]:
        """Create chain-of-thought training data."""
        cot_data = []

        for problem in problems:
            # Generate the reasoning chain
            cot_solution = self.generate_cot_solution(problem)

            # Format as a training example
            cot_item = {
                "instruction": problem["question"],
                "output": cot_solution,
                "system": "Let's think step by step."
            }
            cot_data.append(cot_item)

        return cot_data

    def generate_cot_solution(self, problem: Dict) -> str:
        """Render the solution steps as a reasoning chain."""
        steps = problem.get("solution_steps", [])

        cot_text = "Let me solve this step by step.\n\n"
        for i, step in enumerate(steps, 1):
            cot_text += f"Step {i}: {step['description']}\n"
            if "calculation" in step:
                cot_text += f"Calculation: {step['calculation']}\n"
            if "reasoning" in step:
                cot_text += f"Reasoning: {step['reasoning']}\n"
            cot_text += "\n"

        cot_text += f"Therefore, the answer is: {problem['answer']}"
        return cot_text

    def train_with_cot(self, train_data: List[Dict]):
        """Fine-tune on chain-of-thought data."""
        # Build the dataset (CoTDataset is analogous to InstructionDataset above)
        dataset = CoTDataset(train_data, self.tokenizer)
        dataloader = DataLoader(dataset, batch_size=4, shuffle=True)

        # Training loop
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=1e-5)

        for epoch in range(3):
            for batch in dataloader:
                # Forward pass
                outputs = self.model(**batch)
                loss = outputs.loss

                # Backward pass
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()
```

## 6. Data Augmentation Techniques

### 6.1 Automatic Data Augmentation

```python
class DataAugmentation:
    def __init__(self, base_model):
        self.base_model = base_model

    def paraphrase_augmentation(self, text: str, num_variants: int = 3):
        """Paraphrase-based augmentation."""
        paraphrases = []
        prompts = [
            "Rewrite the following in different words:",
            "Express the same idea differently:",
            "Paraphrase the following text:",
        ]

        for i in range(num_variants):
            prompt = f"{prompts[i % len(prompts)]} {text}"
            paraphrase = self.base_model.generate(prompt)
            paraphrases.append(paraphrase)

        return paraphrases

    def back_translation(self, text: str, intermediate_lang: str = "zh"):
        """Back-translation augmentation."""
        # Translate into the intermediate language
        translated = self.translate(text, "en", intermediate_lang)
        # Translate back to the original language
        back_translated = self.translate(translated, intermediate_lang, "en")
        return back_translated

    def instruction_augmentation(self, instruction: str, output: str):
        """Instruction-level augmentation."""
        augmented = []

        # Vary the instruction style
        styles = ["formal", "casual", "detailed", "concise"]
        for style in styles:
            new_instruction = self.restyle_instruction(instruction, style)
            augmented.append({
                "instruction": new_instruction,
                "output": output
            })

        # Add constraints
        constraints = [
            "Answer in one sentence.",
            "Provide a detailed explanation.",
            "Use simple language.",
            "Include examples.",
        ]
        for constraint in constraints:
            augmented.append({
                "instruction": f"{instruction} {constraint}",
                "output": self.modify_output(output, constraint)
            })

        return augmented
```
### 6.2 Synthetic Data Generation

```python
import random
from typing import List


class SyntheticDataGenerator:
    def __init__(self, generator_model):
        self.generator = generator_model

    def generate_qa_pairs(self, context: str, num_pairs: int = 5):
        """Generate question-answer pairs from a context."""
        qa_pairs = []

        # Generate questions
        questions = self.generate_questions(context, num_pairs)

        for question in questions:
            # Generate an answer for each question
            answer = self.generate_answer(context, question)
            qa_pairs.append({
                "question": question,
                "answer": answer,
                "context": context
            })

        return qa_pairs

    def generate_instructions(self, capability: str, num_instructions: int = 10):
        """Generate instruction data for a capability."""
        prompt = f"""Generate {num_instructions} diverse instructions that test the following capability: {capability}

Format each instruction as:
Instruction: [instruction text]
Expected Output: [expected output]
"""
        response = self.generator.generate(prompt)

        # Parse the response into structured instructions
        instructions = self.parse_instructions(response)
        return instructions

    def self_instruct(self, seed_tasks: List[str], num_iterations: int = 3):
        """The Self-Instruct bootstrapping method."""
        all_instructions = seed_tasks.copy()

        for iteration in range(num_iterations):
            # Sample from the current instruction pool
            sampled = random.sample(all_instructions, min(5, len(all_instructions)))

            # Generate new instructions similar to the sample
            new_instructions = self.generate_similar_instructions(sampled)

            # Filter out low-quality instructions
            filtered = self.filter_instructions(new_instructions)

            # Grow the pool
            all_instructions.extend(filtered)

        return all_instructions
```

## 7. Evaluation and Validation

### 7.1 Automatic Evaluation

```python
from typing import Dict, List

import numpy as np
import torch
from sklearn.metrics import accuracy_score, f1_score


class ModelEvaluator:
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def evaluate_generation_quality(self, test_data: List[Dict]):
        """Evaluate generation quality across several metrics."""
        metrics = {
            "bleu": [],
            "rouge": [],
            "perplexity": [],
            "diversity": [],
            "coherence": []
        }

        for item in test_data:
            # Generate a response
            generated = self.generate_response(item["instruction"])
            reference = item["output"]

            # BLEU score
            bleu = self.calculate_bleu(generated, reference)
            metrics["bleu"].append(bleu)

            # ROUGE score
            rouge = self.calculate_rouge(generated, reference)
            metrics["rouge"].append(rouge)

            # Perplexity
            perplexity = self.calculate_perplexity(generated)
            metrics["perplexity"].append(perplexity)

            # Diversity
            diversity = self.calculate_diversity([generated])
            metrics["diversity"].append(diversity)

            # Coherence
            coherence = self.calculate_coherence(generated)
            metrics["coherence"].append(coherence)

        # Aggregate
        summary = {
            metric: np.mean(values)
            for metric, values in metrics.items()
        }
        return summary

    def calculate_perplexity(self, text: str) -> float:
        """Compute the model's perplexity on a text."""
        inputs = self.tokenizer(text, return_tensors="pt")

        with torch.no_grad():
            outputs = self.model(**inputs, labels=inputs["input_ids"])
            loss = outputs.loss

        perplexity = torch.exp(loss)
        return perplexity.item()

    def human_eval_simulation(self, generated: str, reference: str):
        """Simulate human evaluation with a judge model."""
        judge_prompt = f"""
Please rate the quality of the generated response compared to the reference.

Generated: {generated}
Reference: {reference}

Rate on a scale of 1-5 for:
1. Relevance
2. Fluency
3. Informativeness
4. Correctness
"""
        # Score with a judge model
        scores = self.get_judge_scores(judge_prompt)
        return scores
```

### 7.2 A/B Testing

```python
from typing import List


class ABTesting:
    def __init__(self, model_a, model_b):
        self.model_a = model_a
        self.model_b = model_b
        self.results = {"model_a": [], "model_b": [], "ties": []}

    def run_comparison(self, test_prompts: List[str]):
        """Run the A/B comparison."""
        for prompt in test_prompts:
            # Generate both responses
            response_a = self.model_a.generate(prompt)
            response_b = self.model_b.generate(prompt)

            # Judge the pair
            winner = self.evaluate_responses(prompt, response_a, response_b)

            # Record the outcome
            if winner == "a":
                self.results["model_a"].append(prompt)
            elif winner == "b":
                self.results["model_b"].append(prompt)
            else:
                self.results["ties"].append(prompt)

        # Statistical summary
        stats = self.calculate_statistics()
        return stats

    def calculate_statistics(self):
        """Summarize win rates."""
        total = sum(len(v) for v in self.results.values())

        stats = {
            "model_a_win_rate": len(self.results["model_a"]) / total,
            "model_b_win_rate": len(self.results["model_b"]) / total,
            "tie_rate": len(self.results["ties"]) / total,
            "confidence": self.calculate_confidence_interval()
        }
        return stats
```
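`calculate_confidence_interval` is referenced above but never shown. One plausible implementation (my assumption) is a normal-approximation interval on model A's win rate over the decisive comparisons:

```python
import math


def calculate_confidence_interval(wins_a: int, wins_b: int, z: float = 1.96):
    """95% normal-approximation CI for model A's win rate, ignoring ties.
    (Hypothetical helper; the original does not show its implementation.)"""
    n = wins_a + wins_b
    if n == 0:
        return (0.0, 1.0)
    p = wins_a / n
    half_width = z * math.sqrt(p * (1 - p) / n)
    return (max(0.0, p - half_width), min(1.0, p + half_width))


print(calculate_confidence_interval(60, 40))  # ~(0.504, 0.696)
```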
## 8. Production Deployment

### 8.1 Model Optimization and Deployment

```python
import torch


class ModelDeployment:
    def __init__(self, model_path: str):
        self.model_path = model_path

    def optimize_for_inference(self):
        """Optimize the model for inference."""
        # Load the trained model
        model = torch.load(self.model_path)

        # Merge LoRA weights into the base model
        if hasattr(model, 'merge_and_unload'):
            model = model.merge_and_unload()

        # Convert to half precision
        model = model.half()

        # TorchScript compilation
        scripted_model = torch.jit.script(model)

        # Inference-oriented graph optimizations
        optimized = torch.jit.optimize_for_inference(scripted_model)

        return optimized

    def export_to_onnx(self, model, dummy_input):
        """Export to ONNX format."""
        torch.onnx.export(
            model,
            dummy_input,
            "model.onnx",
            export_params=True,
            opset_version=14,
            do_constant_folding=True,
            input_names=['input_ids', 'attention_mask'],
            output_names=['logits'],
            dynamic_axes={
                'input_ids': {0: 'batch_size', 1: 'sequence'},
                'attention_mask': {0: 'batch_size', 1: 'sequence'},
                'logits': {0: 'batch_size', 1: 'sequence'}
            }
        )

    def create_serving_endpoint(self):
        """Create a serving endpoint."""
        from fastapi import FastAPI

        app = FastAPI()

        # Load the optimized model once at startup
        model = self.load_optimized_model()

        @app.post("/generate")
        async def generate(prompt: str, max_length: int = 100):
            # Generate a response
            response = model.generate(prompt, max_length=max_length)
            return {"response": response}

        return app
```

## 9. Monitoring and Maintenance

### 9.1 Model Monitoring

```python
import time

import torch


class ModelMonitoring:
    def __init__(self):
        self.metrics_buffer = []

    def monitor_inference(self, model, inputs, outputs):
        """Record metrics for one inference call."""
        metrics = {
            "timestamp": time.time(),
            "input_length": len(inputs["input_ids"][0]),
            "output_length": len(outputs[0]),
            "latency": None,
            "perplexity": None
        }

        # Measure latency
        start_time = time.time()
        _ = model(inputs)
        metrics["latency"] = time.time() - start_time

        # Measure perplexity
        with torch.no_grad():
            loss = model(**inputs, labels=inputs["input_ids"]).loss
            metrics["perplexity"] = torch.exp(loss).item()

        self.metrics_buffer.append(metrics)

        # Check for anomalies
        self.detect_anomalies(metrics)

    def detect_drift(self, current_distribution, reference_distribution):
        """Detect distribution drift."""
        from scipy.stats import ks_2samp

        # Kolmogorov-Smirnov test
        statistic, p_value = ks_2samp(
            current_distribution,
            reference_distribution
        )

        # Alert on significant drift
        if p_value < 0.05:
            self.alert_drift_detected(statistic, p_value)

        return p_value
```

## 10. Best Practices

- Data quality first: a small amount of high-quality data is worth more than a large amount of low-quality data
- Fine-tune progressively: start with simple tasks and increase complexity step by step
- Prefer parameter efficiency: reach for PEFT methods such as LoRA/QLoRA first
- Evaluate continuously: build a thorough evaluation pipeline
- Manage versions: track the versions of data, models, and configurations
- Align for safety: make sure model outputs are safe and harmless

## Conclusion

LLM fine-tuning is a systems-engineering effort that requires optimizing data, algorithms, and infrastructure together. With sound technical choices and careful execution, a general-purpose model can be successfully adapted to a specific domain and task. ...

January 7, 2025 · 18 min · Chico Gong

# RAG Agents in Practice: Building a Knowledge-Augmented Assistant

## Introduction

Retrieval-Augmented Generation (RAG) is a key technique for improving the accuracy and freshness of LLMs. This article walks through building a production-grade RAG agent system: document processing, vector storage, retrieval optimization, and generation strategy.

## 1. RAG Architecture

```mermaid
flowchart TB
    subgraph "RAG System Architecture"
        U[User query] --> QP[Query processing]
        subgraph "Retrieval"
            QP --> VR[Vector retrieval]
            QP --> KR[Keyword retrieval]
            VR --> HM[Hybrid matching]
            KR --> HM
        end
        subgraph "Knowledge base"
            DS[(Document store)]
            VS[(Vector database)]
            IS[(Inverted index)]
        end
        HM --> RR[Reranking]
        RR --> CA[Context aggregation]
        subgraph "Generation"
            CA --> PE[Prompt engineering]
            PE --> LLM[Large language model]
            LLM --> PP[Post-processing]
        end
        PP --> R[Response]
        DS -.-> VR
        VS -.-> VR
        IS -.-> KR
    end
    style U fill:#e8f5e9,stroke:#4caf50,stroke-width:2px
    style R fill:#fff3e0,stroke:#ff9800,stroke-width:2px
    style LLM fill:#e3f2fd,stroke:#2196f3,stroke-width:3px
```

### 1.1 System Architecture

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import numpy as np


@dataclass
class RAGConfig:
    chunk_size: int = 1000
    chunk_overlap: int = 200
    embedding_model: str = "text-embedding-ada-002"
    vector_store: str = "chromadb"
    retrieval_top_k: int = 5
    rerank_top_k: int = 3
    temperature: float = 0.7


class RAGAgent:
    def __init__(self, config: RAGConfig):
        self.config = config
        self.document_processor = DocumentProcessor(config)
        self.vector_store = self._init_vector_store()
        self.retriever = HybridRetriever(self.vector_store)
        self.reranker = CrossEncoderReranker()
        self.generator = AugmentedGenerator()

    def _init_vector_store(self):
        """Initialize the vector store backend."""
        if self.config.vector_store == "chromadb":
            from chromadb import Client
            return ChromaVectorStore(Client())
        elif self.config.vector_store == "pinecone":
            import pinecone
            return PineconeVectorStore(pinecone)
        elif self.config.vector_store == "weaviate":
            import weaviate
            return WeaviateVectorStore(weaviate.Client())
```

### 1.2 Document Processing Pipeline

```python
class DocumentProcessor:
    def __init__(self, config: RAGConfig):
        self.config = config
        self.text_splitter = self._init_splitter()
        self.metadata_extractor = MetadataExtractor()

    def _init_splitter(self):
        """Initialize the text splitter."""
        from langchain.text_splitter import RecursiveCharacterTextSplitter

        return RecursiveCharacterTextSplitter(
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap,
            separators=["\n\n", "\n", "。", "!", "?", ".", "!", "?", " ", ""],
            length_function=len
        )

    def process_documents(self, documents: List[Dict]) -> List[Dict]:
        """Process raw documents into indexed chunks."""
        processed_chunks = []

        for doc in documents:
            # Extract metadata
            metadata = self.metadata_extractor.extract(doc)

            # Clean the text
            cleaned_text = self.preprocess_text(doc["content"])

            # Type-aware chunking
            chunks = self.smart_chunking(cleaned_text, metadata)

            # Attach surrounding context
            chunks_with_context = self.add_context(chunks)

            processed_chunks.extend(chunks_with_context)

        return processed_chunks

    def smart_chunking(self, text: str, metadata: Dict) -> List[str]:
        """Choose a chunking strategy based on document type."""
        doc_type = metadata.get("type", "general")

        if doc_type == "code":
            return self.chunk_code(text)
        elif doc_type == "table":
            return self.chunk_table(text)
        elif doc_type == "conversation":
            return self.chunk_conversation(text)
        else:
            return self.text_splitter.split_text(text)

    def chunk_code(self, code: str) -> List[str]:
        """Chunk code at function/class boundaries."""
        import ast

        chunks = []
        try:
            tree = ast.parse(code)
            for node in ast.walk(tree):
                if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
                    chunk = ast.get_source_segment(code, node)
                    if chunk:
                        chunks.append(chunk)
        except SyntaxError:
            # Fall back to plain text splitting
            chunks = self.text_splitter.split_text(code)

        return chunks
```
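To make the chunking parameters concrete, a standalone example of the splitter configuration used above (the sample text and sizes are illustrative):

```python
from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=100,
    chunk_overlap=20,
    separators=["\n\n", "\n", ". ", " ", ""],
)

text = ("Retrieval-augmented generation grounds an LLM in external documents. "
        "Chunks overlap slightly so that sentences straddling a boundary "
        "remain recoverable at query time.")
for chunk in splitter.split_text(text):
    print(repr(chunk))
```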
## 2. Vector Storage and Indexing

### 2.1 Multi-Level Indexing

```python
from typing import Dict, List


class HierarchicalIndex:
    def __init__(self):
        self.document_index = {}  # document-level index
        self.chunk_index = {}     # chunk-level index
        self.summary_index = {}   # summary index

    def build_index(self, documents: List[Dict]):
        """Build the multi-level index."""
        for doc in documents:
            doc_id = doc["id"]

            # Document-level entry
            self.document_index[doc_id] = {
                "title": doc.get("title", ""),
                "summary": self.generate_summary(doc["content"]),
                "metadata": doc.get("metadata", {}),
                "chunk_ids": []
            }

            # Chunk-level entries
            chunks = self.create_chunks(doc["content"])
            for i, chunk in enumerate(chunks):
                chunk_id = f"{doc_id}_chunk_{i}"
                self.chunk_index[chunk_id] = {
                    "content": chunk,
                    "doc_id": doc_id,
                    "position": i,
                    "embedding": None  # filled in later
                }
                self.document_index[doc_id]["chunk_ids"].append(chunk_id)

            # Summary embedding
            summary_embedding = self.embed_text(
                self.document_index[doc_id]["summary"]
            )
            self.summary_index[doc_id] = summary_embedding

    def generate_summary(self, content: str) -> str:
        """Generate a document summary."""
        from transformers import pipeline

        summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
        summary = summarizer(content, max_length=150, min_length=50)[0]["summary_text"]
        return summary
```

### 2.2 Vector Database Operations

```python
import time
from typing import Dict, List

import numpy as np


class VectorStore:
    def __init__(self, embedding_model):
        self.embedding_model = embedding_model
        self.index = None
        self.metadata = {}

    async def add_documents(self, documents: List[Dict]):
        """Add documents to the vector store."""
        embeddings = []
        metadatas = []

        for doc in documents:
            # Embed the content
            embedding = await self.embedding_model.embed(doc["content"])
            embeddings.append(embedding)

            # Collect metadata
            metadata = {
                "id": doc["id"],
                "source": doc.get("source", ""),
                "timestamp": doc.get("timestamp", time.time()),
                "type": doc.get("type", "text")
            }
            metadatas.append(metadata)

        # Bulk insert
        await self.batch_insert(embeddings, metadatas)

    async def batch_insert(self, embeddings: List[np.ndarray], metadatas: List[Dict]):
        """Insert vectors in batches."""
        batch_size = 100

        for i in range(0, len(embeddings), batch_size):
            batch_embeddings = embeddings[i:i + batch_size]
            batch_metadatas = metadatas[i:i + batch_size]

            # Insert into the vector database
            ids = self.index.add(
                embeddings=batch_embeddings,
                metadatas=batch_metadatas
            )

            # Update the metadata map
            for id, metadata in zip(ids, batch_metadatas):
                self.metadata[id] = metadata
```
## 3. Hybrid Retrieval

```mermaid
graph LR
    subgraph "Hybrid Retrieval Flow"
        Q[Query] --> QE[Query expansion]
        QE --> V[Vector retrieval]
        QE --> K[Keyword retrieval]
        QE --> S[Semantic cache]
        V --> VR[Vector results]
        K --> KR[Keyword results]
        S --> SR[Cached results]
        VR --> F[Result fusion, RRF]
        KR --> F
        SR --> F
        F --> R[Reranking]
        R --> T[Top-K results]
    end
    style Q fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    style T fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
    style F fill:#fff3e0,stroke:#e65100,stroke-width:2px
```

### 3.1 Multi-Path Retrieval

```python
from typing import Dict, List

import numpy as np


class HybridRetriever:
    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.bm25_index = BM25Index()
        self.semantic_cache = {}

    async def retrieve(self, query: str, top_k: int = 5) -> List[Dict]:
        """Hybrid retrieval."""
        # 1. Vector retrieval
        vector_results = await self.vector_search(query, top_k * 2)

        # 2. Keyword retrieval
        keyword_results = self.keyword_search(query, top_k * 2)

        # 3. Semantic-cache lookup
        cache_results = self.search_cache(query)

        # 4. Fuse the result sets
        fused_results = self.fuse_results(
            vector_results,
            keyword_results,
            cache_results
        )

        return fused_results[:top_k]

    async def vector_search(self, query: str, top_k: int) -> List[Dict]:
        """Vector similarity search."""
        query_embedding = await self.embed_query(query)

        results = self.vector_store.similarity_search(
            query_embedding,
            k=top_k,
            filter=self.build_filter(query)
        )
        return results

    def keyword_search(self, query: str, top_k: int) -> List[Dict]:
        """BM25 keyword search."""
        # Tokenize
        tokens = self.tokenize(query)

        # BM25 scoring
        scores = self.bm25_index.get_scores(tokens)

        # Take the top-k
        top_indices = np.argsort(scores)[-top_k:][::-1]

        results = []
        for idx in top_indices:
            results.append({
                "content": self.bm25_index.documents[idx],
                "score": scores[idx],
                "type": "keyword"
            })
        return results

    def fuse_results(self, *result_sets) -> List[Dict]:
        """Fuse result sets with reciprocal rank fusion (RRF)."""
        from collections import defaultdict

        k = 60  # RRF constant
        scores = defaultdict(float)
        documents = {}

        for results in result_sets:
            for rank, result in enumerate(results):
                doc_id = result.get("id", str(result))
                scores[doc_id] += 1 / (k + rank + 1)
                documents[doc_id] = result

        # Sort by fused score
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)

        fused_results = []
        for doc_id in sorted_ids:
            doc = documents[doc_id]
            doc["fusion_score"] = scores[doc_id]
            fused_results.append(doc)

        return fused_results
```

### 3.2 Query Expansion

```python
from typing import List


class QueryExpander:
    def __init__(self, llm):
        self.llm = llm
        self.synonym_dict = self.load_synonyms()

    async def expand_query(self, query: str) -> List[str]:
        """Expand the query into several variants."""
        expanded_queries = [query]

        # 1. Synonym expansion
        synonyms = self.get_synonyms(query)
        expanded_queries.extend(synonyms)

        # 2. LLM rephrasing
        rephrased = await self.rephrase_query(query)
        expanded_queries.extend(rephrased)

        # 3. Hypothetical document generation (HyDE)
        hypothetical = await self.generate_hypothetical_document(query)
        expanded_queries.append(hypothetical)

        return expanded_queries

    async def rephrase_query(self, query: str) -> List[str]:
        """Rephrase the query with an LLM."""
        prompt = f"""
Rewrite the following query into 3 different versions that keep the same meaning:

Original query: {query}

Rewrites:
1.
2.
3.
"""
        response = await self.llm.agenerate([prompt])
        rephrased = self.parse_rephrased(response)
        return rephrased

    async def generate_hypothetical_document(self, query: str) -> str:
        """Generate a hypothetical answer document (HyDE)."""
        prompt = f"""
Imagine you are writing a document that answers the question:
{query}

Write the first paragraph of that document:
"""
        response = await self.llm.agenerate([prompt])
        return response.generations[0][0].text
```
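Reciprocal rank fusion is easiest to see with toy numbers (this mini-example is mine): a document that appears in both ranked lists beats one that ranks higher but appears in only a single list.

```python
from collections import defaultdict

k = 60  # the standard RRF damping constant
vector_ranking = ["doc_a", "doc_b", "doc_c"]
keyword_ranking = ["doc_a", "doc_d", "doc_c"]

scores = defaultdict(float)
for ranking in (vector_ranking, keyword_ranking):
    for rank, doc_id in enumerate(ranking):
        scores[doc_id] += 1 / (k + rank + 1)

print(sorted(scores.items(), key=lambda kv: -kv[1]))
# doc_a ~0.0328 > doc_c ~0.0317 > doc_b = doc_d ~0.0161:
# doc_c shows up in both lists, so it beats doc_b despite its lower single-list rank
```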
## 4. Reranking

### 4.1 Cross-Encoder Reranking

```python
from typing import Dict, List


class CrossEncoderReranker:
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, documents: List[Dict], top_k: int = 3) -> List[Dict]:
        """Rerank documents against the query."""
        # Build (query, document) pairs
        pairs = [[query, doc["content"]] for doc in documents]

        # Relevance scores
        scores = self.model.predict(pairs)

        # Attach scores and sort
        for doc, score in zip(documents, scores):
            doc["rerank_score"] = float(score)

        reranked = sorted(documents, key=lambda x: x["rerank_score"], reverse=True)
        return reranked[:top_k]
```

### 4.2 Diversity Reranking

```python
from typing import Dict, List

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity


class DiversityReranker:
    def __init__(self, lambda_param: float = 0.5):
        self.lambda_param = lambda_param

    def rerank_with_diversity(self, documents: List[Dict], top_k: int = 5) -> List[Dict]:
        """MMR (maximal marginal relevance) reranking."""
        # Document embeddings
        embeddings = np.array([doc["embedding"] for doc in documents])

        # Initialize
        selected = []
        selected_embeddings = []
        remaining = list(range(len(documents)))

        # Pick the first document (highest relevance)
        first_idx = 0
        selected.append(first_idx)
        selected_embeddings.append(embeddings[first_idx])
        remaining.remove(first_idx)

        # Iteratively select the rest
        while len(selected) < min(top_k, len(documents)):
            mmr_scores = []

            for idx in remaining:
                # Relevance term
                relevance = documents[idx].get("score", 0)

                # Max similarity to already-selected documents
                similarities = cosine_similarity(
                    [embeddings[idx]],
                    selected_embeddings
                )[0]
                max_similarity = max(similarities) if similarities.size > 0 else 0

                # MMR score trades relevance against redundancy
                mmr = self.lambda_param * relevance - (1 - self.lambda_param) * max_similarity
                mmr_scores.append((idx, mmr))

            # Take the document with the highest MMR score
            best_idx = max(mmr_scores, key=lambda x: x[1])[0]
            selected.append(best_idx)
            selected_embeddings.append(embeddings[best_idx])
            remaining.remove(best_idx)

        # Return documents in selection order
        return [documents[idx] for idx in selected]
```
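A short usage sketch of the cross-encoder reranker from 4.1 (the query and documents are invented):

```python
reranker = CrossEncoderReranker()

docs = [
    {"content": "LoRA injects low-rank adapters into attention layers."},
    {"content": "The weather in Paris is mild in spring."},
    {"content": "QLoRA combines 4-bit quantization with LoRA adapters."},
]
top = reranker.rerank("How does LoRA fine-tuning work?", docs, top_k=2)
for doc in top:
    print(round(doc["rerank_score"], 3), doc["content"][:50])
```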
## 5. Generation Strategy Optimization

### 5.1 Context Optimization

```python
from typing import Dict, List


class ContextOptimizer:
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens

    def optimize_context(self, query: str, documents: List[Dict]) -> str:
        """Optimize the retrieved context."""
        # 1. Compress documents
        compressed_docs = self.compress_documents(documents)

        # 2. Order and prune by priority
        prioritized_docs = self.prioritize_content(query, compressed_docs)

        # 3. Format into a context string
        context = self.format_context(prioritized_docs)

        # 4. Enforce the token budget
        context = self.truncate_to_limit(context)

        return context

    def compress_documents(self, documents: List[Dict]) -> List[Dict]:
        """Compress document contents."""
        compressed = []

        for doc in documents:
            # Keep only the key sentences
            key_sentences = self.extract_key_sentences(doc["content"])
            compressed.append({
                **doc,
                "compressed_content": " ".join(key_sentences)
            })

        return compressed

    def extract_key_sentences(self, text: str, num_sentences: int = 3) -> List[str]:
        """Extract the most central sentences."""
        import numpy as np
        from sentence_transformers import SentenceTransformer
        from sklearn.metrics.pairwise import cosine_similarity

        # Split into sentences
        sentences = text.split("。")
        if len(sentences) <= num_sentences:
            return sentences

        # Sentence embeddings
        model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
        embeddings = model.encode(sentences)

        # Centrality: how similar each sentence is to all others
        similarity_matrix = cosine_similarity(embeddings)
        scores = similarity_matrix.sum(axis=1)

        # Take the highest-scoring sentences
        top_indices = np.argsort(scores)[-num_sentences:]

        # Restore original order
        top_indices = sorted(top_indices)

        return [sentences[i] for i in top_indices]
```

### 5.2 Augmented Generation

```python
from datetime import datetime
from typing import Dict


class AugmentedGenerator:
    def __init__(self, llm):
        self.llm = llm
        self.prompt_templates = self.load_templates()

    async def generate(self, query: str, context: str,
                       generation_config: Dict = None) -> str:
        """Context-augmented generation."""
        # 1. Pick a prompt template
        template = self.select_template(query)

        # 2. Build the prompt
        prompt = self.build_prompt(template, query, context)

        # 3. Generate the answer
        response = await self.llm.agenerate(
            [prompt],
            **generation_config or {}
        )

        # 4. Post-process
        answer = self.postprocess(response.generations[0][0].text)

        # 5. Validate; regenerate with constraints if validation fails
        if not self.validate_answer(answer, context):
            answer = await self.regenerate_with_constraints(query, context)

        return answer

    def build_prompt(self, template: str, query: str, context: str) -> str:
        """Fill in the prompt template."""
        return template.format(
            context=context,
            query=query,
            current_date=datetime.now().strftime("%Y-%m-%d")
        )

    def validate_answer(self, answer: str, context: str) -> bool:
        """Check that the answer is grounded in the context."""
        # Heuristic: long answers should cite the provided information
        # ("根据提供的信息" means "according to the provided information")
        if "根据提供的信息" not in answer and len(answer) > 100:
            return False

        # Check for hallucinated facts
        facts_in_answer = self.extract_facts(answer)
        facts_in_context = self.extract_facts(context)

        for fact in facts_in_answer:
            if not self.fact_supported(fact, facts_in_context):
                return False

        return True
```
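`truncate_to_limit`, used in step 4 of `optimize_context`, is referenced but never shown. One plausible method body (my sketch, assuming the `tiktoken` tokenizer as a dependency):

```python
import tiktoken


def truncate_to_limit(context: str, max_tokens: int = 4000) -> str:
    """Hard-truncate a context string to max_tokens tokens.
    (Hypothetical body for ContextOptimizer.truncate_to_limit.)"""
    encoding = tiktoken.get_encoding("cl100k_base")
    tokens = encoding.encode(context)
    if len(tokens) <= max_tokens:
        return context
    return encoding.decode(tokens[:max_tokens])
```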
## 6. Evaluation and Optimization

### 6.1 Retrieval Quality Evaluation

```python
from typing import Dict, List

import numpy as np


class RetrievalEvaluator:
    def __init__(self):
        self.metrics = {}

    def evaluate(self, queries: List[str],
                 ground_truth: List[List[str]],
                 retrieved: List[List[str]]) -> Dict:
        """Evaluate retrieval quality."""
        metrics = {
            "mrr": self.calculate_mrr(ground_truth, retrieved),
            "map": self.calculate_map(ground_truth, retrieved),
            "ndcg": self.calculate_ndcg(ground_truth, retrieved),
            "recall@k": {},
            "precision@k": {}
        }

        for k in [1, 3, 5, 10]:
            metrics["recall@k"][k] = self.calculate_recall_at_k(
                ground_truth, retrieved, k
            )
            metrics["precision@k"][k] = self.calculate_precision_at_k(
                ground_truth, retrieved, k
            )

        return metrics

    def calculate_mrr(self, ground_truth: List[List[str]],
                      retrieved: List[List[str]]) -> float:
        """Mean reciprocal rank."""
        mrr = 0
        for gt, ret in zip(ground_truth, retrieved):
            for i, doc in enumerate(ret):
                if doc in gt:
                    mrr += 1 / (i + 1)
                    break
        return mrr / len(ground_truth)

    def calculate_ndcg(self, ground_truth: List[List[str]],
                       retrieved: List[List[str]], k: int = 10) -> float:
        """Normalized discounted cumulative gain."""
        def dcg(relevances, k):
            relevances = np.array(relevances)[:k]
            if relevances.size:
                return np.sum(relevances / np.log2(np.arange(2, relevances.size + 2)))
            return 0

        ndcg_scores = []
        for gt, ret in zip(ground_truth, retrieved):
            relevances = [1 if doc in gt else 0 for doc in ret[:k]]
            ideal_relevances = [1] * min(len(gt), k) + [0] * (k - min(len(gt), k))

            dcg_score = dcg(relevances, k)
            idcg_score = dcg(ideal_relevances, k)

            if idcg_score > 0:
                ndcg_scores.append(dcg_score / idcg_score)
            else:
                ndcg_scores.append(0)

        return np.mean(ndcg_scores)
```

### 6.2 Generation Quality Evaluation

```python
from typing import Dict


class GenerationEvaluator:
    def __init__(self):
        self.faithfulness_model = self.load_faithfulness_model()
        self.relevance_model = self.load_relevance_model()

    async def evaluate_generation(self, query: str, context: str,
                                  answer: str) -> Dict:
        """Evaluate generation quality."""
        metrics = {
            "faithfulness": await self.evaluate_faithfulness(answer, context),
            "relevance": await self.evaluate_relevance(answer, query),
            "completeness": self.evaluate_completeness(answer, query),
            "fluency": self.evaluate_fluency(answer)
        }
        return metrics

    async def evaluate_faithfulness(self, answer: str, context: str) -> float:
        """LLM-as-judge faithfulness score."""
        prompt = f"""
Please rate whether the answer is faithful to the given context.

Context: {context}
Answer: {answer}

Score (0-1):
"""
        response = await self.faithfulness_model.agenerate([prompt])
        score = self.extract_score(response)
        return score

    def evaluate_fluency(self, answer: str) -> float:
        """Perplexity-based fluency score."""
        import torch
        from transformers import GPT2LMHeadModel, GPT2Tokenizer

        model = GPT2LMHeadModel.from_pretrained('gpt2')
        tokenizer = GPT2Tokenizer.from_pretrained('gpt2')

        inputs = tokenizer(answer, return_tensors='pt')
        with torch.no_grad():
            outputs = model(**inputs, labels=inputs["input_ids"])
            loss = outputs.loss

        perplexity = torch.exp(loss)

        # Map perplexity onto a 0-1 score
        fluency_score = 1 / (1 + perplexity.item() / 100)
        return fluency_score
```
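`calculate_recall_at_k` and `calculate_precision_at_k` are called in 6.1 but not defined; minimal versions might look like this (my sketch):

```python
from typing import List


def calculate_recall_at_k(ground_truth: List[List[str]],
                          retrieved: List[List[str]], k: int) -> float:
    """Mean fraction of relevant docs that appear in the top-k results."""
    scores = []
    for gt, ret in zip(ground_truth, retrieved):
        hits = len(set(gt) & set(ret[:k]))
        scores.append(hits / len(gt) if gt else 0.0)
    return sum(scores) / len(scores)


def calculate_precision_at_k(ground_truth: List[List[str]],
                             retrieved: List[List[str]], k: int) -> float:
    """Mean fraction of the top-k results that are relevant."""
    scores = []
    for gt, ret in zip(ground_truth, retrieved):
        hits = len(set(gt) & set(ret[:k]))
        scores.append(hits / k)
    return sum(scores) / len(scores)
```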
## 7. Production Deployment

### 7.1 API Service

```python
import asyncio
from typing import Dict, List

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()


class RAGRequest(BaseModel):
    query: str
    top_k: int = 5
    temperature: float = 0.7
    max_tokens: int = 500


class RAGResponse(BaseModel):
    answer: str
    sources: List[Dict]
    confidence: float


# Initialize the RAG system
rag_agent = RAGAgent(RAGConfig())


@app.post("/rag/query", response_model=RAGResponse)
async def query_rag(request: RAGRequest):
    try:
        # Retrieve relevant documents
        documents = await rag_agent.retriever.retrieve(
            request.query,
            request.top_k
        )

        # Rerank
        reranked = rag_agent.reranker.rerank(
            request.query,
            documents,
            top_k=3
        )

        # Generate the answer
        answer = await rag_agent.generator.generate(
            request.query,
            reranked,
            {
                "temperature": request.temperature,
                "max_tokens": request.max_tokens
            }
        )

        return RAGResponse(
            answer=answer,
            sources=[{"content": doc["content"][:200], "score": doc.get("score", 0)}
                     for doc in reranked],
            confidence=calculate_confidence(reranked)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/rag/index")
async def index_documents(documents: List[Dict]):
    """Index new documents."""
    try:
        processed = rag_agent.document_processor.process_documents(documents)
        await rag_agent.vector_store.add_documents(processed)
        return {"status": "success", "indexed": len(processed)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
```

### 7.2 Performance Optimization

```python
import asyncio
from typing import Dict, List


class RAGOptimizer:
    def __init__(self, rag_agent: RAGAgent):
        self.rag_agent = rag_agent
        self.cache = LRUCache(maxsize=1000)
        self.query_embeddings_cache = {}

    async def optimized_retrieve(self, query: str) -> List[Dict]:
        """Retrieval with caching and parallelism."""
        # 1. Check the result cache
        cache_key = self.get_cache_key(query)
        if cache_key in self.cache:
            return self.cache[cache_key]

        # 2. Reuse cached query embeddings
        if query in self.query_embeddings_cache:
            query_embedding = self.query_embeddings_cache[query]
        else:
            # Note the parentheses: await the coroutine first, then index
            query_embedding = (await self.batch_embed([query]))[0]
            self.query_embeddings_cache[query] = query_embedding

        # 3. Run vector and keyword retrieval in parallel
        tasks = [
            self.rag_agent.vector_store.search(query_embedding),
            self.rag_agent.bm25_index.search(query)
        ]
        vector_results, keyword_results = await asyncio.gather(*tasks)

        # 4. Fuse and cache the results
        results = self.rag_agent.retriever.fuse_results(
            vector_results, keyword_results
        )
        self.cache[cache_key] = results

        return results
```
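`LRUCache` is used above without a definition; a minimal dictionary-based version (my sketch) could be:

```python
from collections import OrderedDict


class LRUCache(OrderedDict):
    """Tiny LRU cache: evicts the least-recently-used key past maxsize.
    (Hypothetical helper; the original does not show its implementation.)"""

    def __init__(self, maxsize: int = 1000):
        super().__init__()
        self.maxsize = maxsize

    def __getitem__(self, key):
        value = super().__getitem__(key)
        self.move_to_end(key)  # mark as recently used
        return value

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self.move_to_end(key)
        if len(self) > self.maxsize:
            self.popitem(last=False)  # evict the oldest entry
```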
## 8. Advanced Features

### 8.1 Incremental Learning

```python
import time
from typing import Dict


class IncrementalLearningRAG:
    def __init__(self):
        self.feedback_buffer = []
        self.update_frequency = 100

    async def learn_from_feedback(self, query: str, answer: str, feedback: Dict):
        """Accumulate user feedback."""
        self.feedback_buffer.append({
            "query": query,
            "answer": answer,
            "feedback": feedback,
            "timestamp": time.time()
        })

        if len(self.feedback_buffer) >= self.update_frequency:
            await self.update_model()

    async def update_model(self):
        """Update the system from buffered feedback."""
        # 1. Split feedback by rating
        positive_examples = []
        negative_examples = []

        for item in self.feedback_buffer:
            if item["feedback"]["rating"] >= 4:
                positive_examples.append(item)
            else:
                negative_examples.append(item)

        # 2. Build training data
        training_data = self.prepare_training_data(
            positive_examples, negative_examples
        )

        # 3. Fine-tune the embedding model
        await self.finetune_embeddings(training_data)

        # 4. Update the retrieval strategy
        self.update_retrieval_strategy(training_data)

        # Clear the buffer
        self.feedback_buffer = []
```

### 8.2 Multi-Modal RAG

```python
from typing import Dict


class MultiModalRAG:
    def __init__(self):
        self.text_encoder = self.load_text_encoder()
        self.image_encoder = self.load_image_encoder()
        self.cross_modal_encoder = self.load_cross_modal_encoder()

    async def process_multimodal_query(self, query: Dict) -> str:
        """Handle a query that may contain text, an image, or both."""
        text_query = query.get("text", "")
        image_query = query.get("image")

        # 1. Encode the query
        if text_query and image_query:
            query_embedding = await self.encode_multimodal(
                text_query, image_query
            )
        elif text_query:
            query_embedding = await self.text_encoder.encode(text_query)
        elif image_query:
            query_embedding = await self.image_encoder.encode(image_query)

        # 2. Multi-modal retrieval
        results = await self.retrieve_multimodal(query_embedding)

        # 3. Generate the answer
        answer = await self.generate_from_multimodal(results, query)

        return answer
```

## 9. Best Practices

- Preprocess documents: clean, deduplicate, and normalize formats
- Chunk intelligently: pick a chunking strategy per document type
- Retrieve hybrid: combine vector and keyword retrieval
- Rerank: use a cross-encoder to sharpen precision
- Optimize context: compress and prioritize before prompting
- Cache strategically: multi-level caching improves performance
- Improve continuously: use feedback to refine the system

## Conclusion

By combining retrieval with generation, a RAG agent markedly improves the accuracy and practical usefulness of an LLM. The keys are retrieval quality, context management, and generation strategy. ...

January 4, 2025 · 14 min · Chico Gong

# The Complete LangChain Agent Guide: Building Intelligent Conversational Systems

## Preface

LangChain is the most popular framework for developing LLM applications, and its agent system provides powerful tools for building intelligent conversational systems. This article covers the core concepts, implementation patterns, and best practices of LangChain agents.

## 1. LangChain Agent Basics

### 1.1 Core Concepts

A LangChain agent is built from the following core components:

```python
from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent
from langchain.prompts import StringPromptTemplate
from langchain import OpenAI, SerpAPIWrapper, LLMChain

# The core components of an agent
components = {
    "llm": "the large language model",
    "tools": "the available tool set",
    "prompt": "the prompt template",
    "output_parser": "the output parser",
    "memory": "the memory system"
}
```

### 1.2 Agent Types

LangChain ships several pre-built agent types:

```python
# 1. Zero-shot ReAct agent
from langchain.agents import create_react_agent

# 2. Conversational agent (OpenAI functions)
from langchain.agents import create_openai_functions_agent

# 3. Plan-and-Execute agent
from langchain_experimental.plan_and_execute import PlanAndExecute

# 4. Self-ask with search
from langchain.agents import create_self_ask_with_search_agent
```

## 2. Building Your First Agent

### 2.1 Basic Setup

```python
from langchain.agents import create_react_agent, AgentExecutor
from langchain.tools import Tool
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate

# Initialize the LLM
llm = ChatOpenAI(
    temperature=0,
    model="gpt-4-turbo-preview"
)

# Define a tool
def calculate(expression: str) -> str:
    """Evaluate a math expression."""
    try:
        # NOTE: eval on untrusted input is unsafe; acceptable only for a demo
        result = eval(expression)
        return str(result)
    except Exception:
        return "Calculation error"

tools = [
    Tool(
        name="Calculator",
        func=calculate,
        description="Evaluates math expressions. Input should be a valid Python expression."
    )
]
```

### 2.2 Creating the Agent

```python
# Define the prompt template
prompt = PromptTemplate.from_template("""
Answer the following questions as best you can. You have access to the following tools:

{tools}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin!

Question: {input}
Thought: {agent_scratchpad}
""")

# Create the agent
agent = create_react_agent(
    llm=llm,
    tools=tools,
    prompt=prompt
)

# Create the executor
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=5
)
```

## 3. Advanced Tool Integration

### 3.1 Custom Tool Classes

```python
from typing import Optional, Type

from langchain.tools import BaseTool
from pydantic import BaseModel, Field


class SearchInput(BaseModel):
    query: str = Field(description="the search query")


class CustomSearchTool(BaseTool):
    name = "custom_search"
    description = "Search the internet for information"
    args_schema: Type[BaseModel] = SearchInput

    def _run(self, query: str) -> str:
        """Run the search."""
        # Implement the search logic here
        return f"Search results for: {query}"

    async def _arun(self, query: str) -> str:
        """Run the search asynchronously."""
        # Implement the async search logic here
        return await self.async_search(query)
```

### 3.2 Composing Tool Chains

```python
from langchain.tools import StructuredTool


def multi_step_analysis(data: dict) -> str:
    """Multi-step data-analysis tool."""
    steps = []

    # Step 1: clean the data
    cleaned_data = clean_data(data)
    steps.append("Data cleaning complete")

    # Step 2: statistical analysis
    statistics = analyze_statistics(cleaned_data)
    steps.append(f"Statistics: {statistics}")

    # Step 3: generate the report
    report = generate_report(statistics)
    steps.append("Report generated")

    return "\n".join(steps)


analysis_tool = StructuredTool.from_function(
    func=multi_step_analysis,
    name="DataAnalysis",
    description="Run a multi-step data analysis"
)
```
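One note before moving on: the `agent_executor` built in section 2.2 is never actually invoked above. Calling it looks like this (the query is illustrative):

```python
result = agent_executor.invoke({"input": "What is (17 + 3) * 4?"})
print(result["output"])  # the agent should call Calculator and answer 80
```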
## 4. Memory Integration

### 4.1 Conversation Memory

```python
from langchain.memory import (
    ConversationBufferMemory,
    ConversationBufferWindowMemory,
    ConversationSummaryMemory,
)

# Full conversation memory
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

# Window memory (keeps only the last N turns)
window_memory = ConversationBufferWindowMemory(
    k=5,  # keep the last 5 turns
    memory_key="chat_history",
    return_messages=True
)

# Summary memory
summary_memory = ConversationSummaryMemory(
    llm=llm,
    memory_key="chat_history",
    return_messages=True
)
```

### 4.2 Entity Memory

```python
from langchain.memory import ConversationEntityMemory

entity_memory = ConversationEntityMemory(
    llm=llm,
    entity_extraction_prompt=ENTITY_EXTRACTION_PROMPT,
    entity_summarization_prompt=ENTITY_SUMMARIZATION_PROMPT
)

# Agent with entity memory
agent_with_memory = AgentExecutor(
    agent=agent,
    tools=tools,
    memory=entity_memory,
    verbose=True
)
```

## 5. Custom Agent Types

### 5.1 Plan-and-Execute Agent

```python
class PlanExecuteAgent:
    def __init__(self, llm, tools):
        self.llm = llm
        self.tools = tools
        self.planner = self._create_planner()
        self.executor = self._create_executor()

    def _create_planner(self):
        """Create the plan generator."""
        planner_prompt = PromptTemplate(
            template="""
Given the objective: {objective}

Create a detailed execution plan that breaks the task into concrete steps.
Each step should be clear and executable.

Output format:
1. [step description]
2. [step description]
...
""",
            input_variables=["objective"]
        )
        return LLMChain(llm=self.llm, prompt=planner_prompt)

    def _create_executor(self):
        """Create the step executor."""
        return AgentExecutor(
            agent=create_react_agent(self.llm, self.tools, REACT_PROMPT),
            tools=self.tools
        )

    def run(self, objective: str):
        # Generate the plan
        plan = self.planner.run(objective)
        steps = self._parse_plan(plan)

        # Execute each step
        results = []
        for step in steps:
            result = self.executor.run(step)
            results.append(result)

        return self._synthesize_results(results)
```

### 5.2 Self-Reflection Agent

```python
class SelfReflectionAgent:
    def __init__(self, llm):
        self.llm = llm
        self.reflection_prompt = PromptTemplate(
            template="""
Task: {task}
First answer: {initial_answer}

Please evaluate this answer:
1. How accurate is it?
2. How complete is it?
3. What could be improved?

Provide an improved answer:
""",
            input_variables=["task", "initial_answer"]
        )

    def run(self, task: str):
        # First pass
        initial_answer = self.llm.invoke(task)

        # Self-reflection pass
        reflection_chain = LLMChain(
            llm=self.llm,
            prompt=self.reflection_prompt
        )
        improved_answer = reflection_chain.run(
            task=task,
            initial_answer=initial_answer
        )

        return improved_answer
```
6. Streaming Output and Async Processing

6.1 Streaming Responses

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# Configure a streaming LLM
streaming_llm = ChatOpenAI(
    temperature=0,
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)

# Build an agent on top of the streaming LLM
streaming_agent = create_react_agent(
    llm=streaming_llm,
    tools=tools,
    prompt=prompt
)

# Streaming happens through the executor, which runs the tool loop
streaming_executor = AgentExecutor(agent=streaming_agent, tools=tools)

async def stream_agent_response(query: str):
    async for chunk in streaming_executor.astream({"input": query}):
        if "output" in chunk:
            yield chunk["output"]

6.2 Async Agents

import asyncio
from langchain.agents import create_openai_tools_agent

async def async_agent_execution():
    # Create an agent for async use
    # (note: this agent type expects a chat prompt with an
    # agent_scratchpad messages placeholder)
    async_agent = create_openai_tools_agent(
        llm=llm,
        tools=tools,
        prompt=prompt
    )

    # Async executor
    async_executor = AgentExecutor(
        agent=async_agent,
        tools=tools,
        verbose=True
    )

    # Run several queries concurrently
    queries = ["question 1", "question 2", "question 3"]
    tasks = [async_executor.ainvoke({"input": q}) for q in queries]
    results = await asyncio.gather(*tasks)

    return results

7. Error Handling and Retries

7.1 Error Handling

from langchain.agents import AgentExecutor
from tenacity import retry, stop_after_attempt, wait_exponential

class RobustAgent:
    def __init__(self, agent, tools):
        self.executor = AgentExecutor(
            agent=agent,
            tools=tools,
            max_iterations=5,
            early_stopping_method="generate",
            handle_parsing_errors=True
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def run_with_retry(self, input_text: str):
        # Let exceptions propagate so tenacity can actually retry;
        # catching them inside this method would defeat the @retry decorator.
        return self.executor.run(input_text)

    def run(self, input_text: str):
        try:
            return self.run_with_retry(input_text)
        except Exception as e:
            print(f"Error: {e}")
            # Graceful degradation once all retries are exhausted
            return self.fallback_response(input_text)

    def fallback_response(self, input_text: str):
        """Fallback response."""
        return f"Sorry, your request could not be processed: {input_text}"

7.2 Validation

class ValidatedAgent:
    def __init__(self, agent, validator):
        self.agent = agent
        self.validator = validator

    def run(self, input_text: str):
        # Validate the input
        if not self.validator.validate_input(input_text):
            return "Invalid input"

        # Run the agent
        result = self.agent.run(input_text)

        # Validate the output
        if not self.validator.validate_output(result):
            # run_with_correction is assumed to exist on the wrapped agent
            return self.agent.run_with_correction(input_text)

        return result
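ValidatedAgent expects a validator object with validate_input and validate_output methods, neither of which is defined above. A minimal sketch (the length limit and keyword blocklist are placeholder heuristics, not part of the original design):

class SimpleValidator:
    def __init__(self, max_input_len: int = 2000,
                 blocked: tuple = ("drop table", "rm -rf")):
        self.max_input_len = max_input_len
        self.blocked = blocked

    def validate_input(self, text: str) -> bool:
        # Reject empty, oversized, or obviously malicious input
        if not text.strip() or len(text) > self.max_input_len:
            return False
        return not any(b in text.lower() for b in self.blocked)

    def validate_output(self, text: str) -> bool:
        # Require a non-empty answer that is not a raw stack trace
        return bool(text and text.strip()) and "traceback" not in text.lower()

validated_agent = ValidatedAgent(
    agent=RobustAgent(agent, tools),  # wraps the retrying agent from 7.1
    validator=SimpleValidator()
)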
8. Performance Optimization

8.1 Caching Strategies

from langchain.cache import InMemoryCache, RedisCache
from langchain.globals import set_llm_cache

# In-memory cache
set_llm_cache(InMemoryCache())

# Redis cache
import redis
redis_client = redis.Redis.from_url("redis://localhost:6379")
set_llm_cache(RedisCache(redis_client))

# Custom cache: implements the same lookup/update interface
# that LangChain's BaseCache expects
class CustomCache:
    def __init__(self):
        self.cache = {}

    def lookup(self, prompt: str, llm_string: str):
        key = f"{llm_string}:{prompt}"
        return self.cache.get(key)

    def update(self, prompt: str, llm_string: str, return_val: list):
        key = f"{llm_string}:{prompt}"
        self.cache[key] = return_val

8.2 Batch Processing

from concurrent.futures import ThreadPoolExecutor

class BatchProcessor:
    def __init__(self, agent, max_workers=5):
        self.agent = agent
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def process_batch(self, inputs: list):
        """Process a batch of inputs in parallel."""
        futures = []
        for input_text in inputs:
            future = self.executor.submit(self.agent.run, input_text)
            futures.append(future)

        results = []
        for future in futures:
            try:
                result = future.result(timeout=30)
                results.append(result)
            except Exception as e:
                results.append(f"Processing failed: {e}")

        return results

9. Monitoring and Logging

9.1 Custom Callbacks

from langchain.callbacks.base import BaseCallbackHandler
import time

class PerformanceCallbackHandler(BaseCallbackHandler):
    def __init__(self):
        self.start_times = {}
        self.metrics = {
            "total_tokens": 0,
            "total_cost": 0,
            "execution_times": []
        }

    def on_llm_start(self, serialized, prompts, **kwargs):
        self.start_times["llm"] = time.time()

    def on_llm_end(self, response, **kwargs):
        duration = time.time() - self.start_times.get("llm", time.time())
        self.metrics["execution_times"].append(duration)

        # Record token usage (llm_output may be None for some providers)
        if getattr(response, "llm_output", None):
            tokens = response.llm_output.get("token_usage", {})
            self.metrics["total_tokens"] += tokens.get("total_tokens", 0)

    def on_tool_start(self, serialized, input_str, **kwargs):
        self.start_times[f"tool_{serialized.get('name')}"] = time.time()

    def on_tool_end(self, output, **kwargs):
        # Record tool execution time here if needed
        pass

9.2 Tracing

from langchain.callbacks import LangChainTracer

# Configure tracing
tracer = LangChainTracer(
    project_name="my_agent_project",
    client=None  # a custom client can be supplied here
)

# An agent executor with tracing enabled
traced_agent = AgentExecutor(
    agent=agent,
    tools=tools,
    callbacks=[tracer],
    verbose=True
)
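Callbacks compose, so the custom handler from 9.1 and the tracer above can share one executor. A usage sketch (the query text and summary format are my own):

perf = PerformanceCallbackHandler()
monitored_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    callbacks=[tracer, perf],  # both callbacks observe every run
    verbose=False,
)

monitored_executor.invoke({"input": "What is 17 * 23?"})

times = perf.metrics["execution_times"]
avg = sum(times) / len(times) if times else 0.0
print(f"LLM calls: {len(times)}, avg latency: {avg:.2f}s, "
      f"total tokens: {perf.metrics['total_tokens']}")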
10. Production Deployment

10.1 Wrapping the Agent in an API

from typing import Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn

app = FastAPI()

class AgentRequest(BaseModel):
    query: str
    session_id: Optional[str] = None

class AgentResponse(BaseModel):
    result: str
    metadata: dict = {}

@app.post("/agent/chat", response_model=AgentResponse)
async def chat(request: AgentRequest):
    try:
        # Look up or create the session
        # (get_or_create_session: an application-level helper, e.g. an
        # in-memory dict or Redis store keyed by session_id)
        session = get_or_create_session(request.session_id)

        # Run the agent
        result = await agent_executor.ainvoke({
            "input": request.query,
            "chat_history": session.get_history()
        })

        # Update the session history
        session.add_message(request.query, result["output"])

        return AgentResponse(
            result=result["output"],
            metadata={
                "session_id": session.id,
                "tools_used": result.get("intermediate_steps", [])
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

10.2 Docker Deployment

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the code
COPY . .

# Environment variables: ${VAR} in ENV needs a matching ARG;
# prefer passing secrets at runtime via `docker run -e` instead
ARG OPENAI_API_KEY
ARG LANGCHAIN_API_KEY
ENV OPENAI_API_KEY=${OPENAI_API_KEY}
ENV LANGCHAIN_TRACING_V2=true
ENV LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY}

# Run
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

11. Testing Strategy

11.1 Unit Tests

import pytest
from unittest.mock import Mock, patch
from langchain_community.llms import FakeListLLM

class TestAgent:
    @pytest.fixture
    def mock_llm(self):
        # A deterministic fake LLM that always produces a final answer
        return FakeListLLM(responses=["Final Answer: mock response"])

    @pytest.fixture
    def test_agent(self, mock_llm):
        # Reuses the ReAct prompt from section 2.2
        return create_react_agent(
            llm=mock_llm,
            tools=[],
            prompt=prompt
        )

    def test_agent_execution(self, test_agent):
        executor = AgentExecutor(agent=test_agent, tools=[])
        result = executor.invoke({"input": "test input"})
        assert result["output"] == "mock response"

    @patch("myapp.tools.calculate")  # module path to the calculate() tool is assumed
    def test_tool_usage(self, mock_calculate):
        mock_calculate.return_value = "42"
        # Exercise a tool-calling path here

11.2 Integration Tests

class IntegrationTests:
    def test_end_to_end_flow(self):
        """End-to-end test."""
        # 1. Create the agent
        agent = create_production_agent()

        # 2. Test a simple query
        simple_result = agent.run("What is 2+2?")
        assert "4" in simple_result

        # 3. Test a complex query
        complex_result = agent.run(
            "Search for the latest AI news and summarize it"
        )
        assert len(complex_result) > 100

        # 4. Test error handling
        error_result = agent.run("Invalid input %$#@")
        assert "error" not in error_result.lower()
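The integration test above calls create_production_agent without defining it. A plausible factory (the configuration choices are assumptions, reusing the tools and prompt from section 2) might be:

def create_production_agent() -> AgentExecutor:
    # Production defaults: deterministic output, bounded iterations,
    # parsing-error recovery, and a conservative request timeout
    prod_llm = ChatOpenAI(temperature=0, model="gpt-4-turbo-preview", timeout=30)
    prod_agent = create_react_agent(llm=prod_llm, tools=tools, prompt=prompt)
    return AgentExecutor(
        agent=prod_agent,
        tools=tools,
        max_iterations=5,
        handle_parsing_errors=True,
    )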
12. Best Practices Summary

- Clear tool descriptions: make sure every tool's description is accurate and detailed
- Deliberate prompt engineering: adapt the prompt template to the task
- Sensible iteration limits: prevent infinite loops
- Thorough error handling: cover all the ways things can fail
- Performance monitoring: track token usage and execution time
- Progressive complexity: start with a simple agent and add capabilities step by step
- Test coverage: include both unit and integration tests
- Complete documentation: record the agent's capabilities and limitations

Conclusion

LangChain Agents provide a powerful framework for building intelligent conversational systems. By making good use of the tools and patterns the framework offers, we can quickly build feature-rich, high-performing AI agent applications. ...
January 2, 2025 · 9 min · Chico Gong

AI Agent Architecture Deep Dive: From Principles to Practice

Introduction

As a frontier technology in artificial intelligence, AI Agents are revolutionizing the way we interact with AI systems. This article digs into the core architecture, design patterns, and practical lessons of AI Agents.

1. AI Agent Core Architecture

1.1 Basic Components

An AI Agent system typically contains the following core components:

class AgentCore:
    def __init__(self):
        self.llm = LanguageModel()        # large language model
        self.memory = MemorySystem()      # memory system
        self.tools = ToolRegistry()       # tool registry
        self.planner = TaskPlanner()      # task planner
        self.executor = ActionExecutor()  # action executor

1.2 The Perception-Reasoning-Action Loop

The agent's core runtime mechanism is the perception-reasoning-action loop:

graph LR
    A[Environment input] --> B[Perception module]
    B --> C[Reasoning engine]
    C --> D[Action execution]
    D --> E[Environment feedback]
    E --> B
    C --> F[Memory system]
    F --> C
    C --> G[Knowledge base]
    G --> C
    style A fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    style E fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
    style C fill:#fff3e0,stroke:#e65100,stroke-width:2px

Perception: receive input from the environment and understand user intent
Reasoning: make decisions based on memory and knowledge
Action: execute concrete operations and produce output

2. Advanced Architecture Patterns

2.1 The ReAct Architecture

ReAct (Reasoning and Acting) combines reasoning with acting: ...
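To make the perception-reasoning-action cycle concrete, here is a rough sketch of how the AgentCore components above could drive such a loop (every method name below — parse_intent, recall, next_action, execute, store — is an assumed placeholder, not a fixed API):

class SimpleAgentLoop:
    def __init__(self, core, max_steps: int = 10):
        self.core = core
        self.max_steps = max_steps

    def run(self, user_input: str) -> str:
        observation = user_input
        for _ in range(self.max_steps):
            # Perception: interpret the latest observation
            intent = self.core.llm.parse_intent(observation)
            # Reasoning: choose the next action using memory and knowledge
            context = self.core.memory.recall(intent)
            action = self.core.planner.next_action(intent, context)
            if action.is_final:
                return action.output
            # Action: execute, then feed the result back as the next observation
            observation = self.core.executor.execute(action)
            self.core.memory.store(observation)
        return "Stopped: step limit reached"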

January 1, 2025 · 5 min · Chico Gong

LangChain Graph in Depth: Building Intelligent Knowledge Graphs

Introduction

In applications of artificial intelligence and large language models (LLMs), the way knowledge is represented and organized directly affects a system's reasoning ability and overall intelligence. LangChain Graph, an important component of the LangChain ecosystem, provides a powerful set of tools that let developers extract structured knowledge from text, build knowledge graphs, and perform complex reasoning over them. This article explores the concepts, inner workings, application scenarios, and practical techniques of LangChain Graph, to give you a thorough understanding of this tool.

Knowledge Graphs and LangChain Graph Basics

What Is a Knowledge Graph?

A knowledge graph is a structured data model for representing relations between entities. It organizes information as a graph, in which:

Nodes represent entities or concepts
Edges represent relations between entities

graph LR
    A["Alan Turing"] -->|"invented"| B["Turing machine"]
    A -->|"born in"| C["England"]
    A -->|"known as"| D["father of computer science"]
    B -->|"is a"| E["theoretical model of computation"]

What LangChain Graph Is and Why It Matters

LangChain Graph is the set of modules in the LangChain framework focused on building, storing, and querying knowledge graphs. It combines the natural-language capabilities of LLMs with the structured representation of graph databases, enabling:

Automatic extraction of entities and relations from text
Construction and maintenance of knowledge graphs
Complex queries and reasoning over the graph structure
Better contextual understanding and answer quality in LLM applications

LangChain Graph Architecture

The overall architecture of LangChain Graph can be understood through the following diagram:

flowchart TB
    subgraph "Input layer"
        A["Text documents"] --> B["Web content"]
        C["Structured data"] --> D["User queries"]
    end
    subgraph "Processing layer"
        E["Entity extraction EntityExtractor"]
        F["Relation extraction RelationExtractor"]
        G["Knowledge graph construction KnowledgeGraphCreator"]
    end
    subgraph "Storage layer"
        H["Graph database Neo4j/NetworkX"]
        I["Vector stores VectorStores"]
    end
    subgraph "Application layer"
        J["Graph query GraphQuery"]
        K["Graph reasoning GraphReasoning"]
        L["QA system GraphQAChain"]
    end
    A --> E
    B --> E
    C --> F
    D --> F
    E --> G
    F --> G
    G --> H
    G --> I
    H --> J
    H --> K
    I --> L

Core Components in Detail

1. Entity and Relation Extractors

These components identify entities, and the relations between them, in text: ...
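As a hedged sketch of the extraction step described here (the prompt wording, the extract_triples helper, and the use of NetworkX are illustrative assumptions, not LangChain Graph's built-in API):

import json

import networkx as nx
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0)

TRIPLE_PROMPT = (
    "Extract (subject, relation, object) triples from the text below. "
    "Reply with only a JSON list of 3-element lists.\n\nText: {text}"
)

def extract_triples(text: str) -> list:
    # Ask the LLM for triples and parse its JSON reply
    # (a robust version would validate and retry on malformed JSON)
    reply = llm.invoke(TRIPLE_PROMPT.format(text=text))
    return json.loads(reply.content)

def build_graph(text: str) -> nx.DiGraph:
    graph = nx.DiGraph()
    for subj, rel, obj in extract_triples(text):
        graph.add_edge(subj, obj, relation=rel)
    return graph

g = build_graph("Alan Turing invented the Turing machine and was born in England.")
print(list(g.edges(data=True)))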

December 27, 2024 · 7 min · Chico Gong