LLM微调完全指南:从原理到生产部署
引言 大语言模型(LLM)的微调是将通用模型适配到特定任务的关键技术。本文全面介绍LLM微调的方法、技巧和最佳实践,包括全量微调、参数高效微调(PEFT)、强化学习微调等技术。 1. 微调基础架构 graph TB subgraph "LLM微调流程" D[原始数据] --> DP[数据预处理] DP --> DS[数据集分割] subgraph "微调方法" DS --> FT[全量微调] DS --> LORA[LoRA微调] DS --> QLORA[QLoRA微调] DS --> PT[Prefix Tuning] end subgraph "训练过程" LORA --> TR[训练循环] TR --> VAL[验证评估] VAL --> CK[检查点保存] CK --> TR end subgraph "优化技术" GC[梯度累积] MP[混合精度] GCP[梯度检查点] DS2[DeepSpeed] end TR -.-> GC TR -.-> MP TR -.-> GCP TR -.-> DS2 CK --> MD[模型部署] end style D fill:#e8f5e9,stroke:#4caf50,stroke-width:2px style MD fill:#fff3e0,stroke:#ff9800,stroke-width:2px style LORA fill:#e3f2fd,stroke:#2196f3,stroke-width:3px 1.1 微调框架设计 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 from dataclasses import dataclass from typing import Optional, Dict, List, Union import torch from transformers import AutoModelForCausalLM, AutoTokenizer @dataclass class FineTuningConfig: model_name: str = "meta-llama/Llama-2-7b-hf" dataset_path: str = "./data/train.jsonl" output_dir: str = "./checkpoints" # 训练参数 learning_rate: float = 2e-5 batch_size: int = 4 gradient_accumulation_steps: int = 4 num_epochs: int = 3 warmup_ratio: float = 0.1 weight_decay: float = 0.01 # 优化参数 max_seq_length: int = 2048 gradient_checkpointing: bool = True mixed_precision: str = "fp16" # fp16, bf16, or None # LoRA参数 use_lora: bool = True lora_rank: int = 16 lora_alpha: int = 32 lora_dropout: float = 0.1 # 量化参数 use_quantization: bool = False quantization_bits: int = 4 class LLMFineTuner: def __init__(self, config: FineTuningConfig): self.config = config self.model = None self.tokenizer = None self.optimizer = None self.scheduler = None def setup_model(self): """设置模型和分词器""" # 加载分词器 self.tokenizer = AutoTokenizer.from_pretrained( self.config.model_name, trust_remote_code=True ) self.tokenizer.pad_token = self.tokenizer.eos_token # 加载模型 if self.config.use_quantization: self.model = self.load_quantized_model() else: self.model = AutoModelForCausalLM.from_pretrained( self.config.model_name, torch_dtype=torch.float16 if self.config.mixed_precision == "fp16" else torch.float32, device_map="auto", trust_remote_code=True ) # 应用LoRA if self.config.use_lora: self.apply_lora() # 启用梯度检查点 if self.config.gradient_checkpointing: self.model.gradient_checkpointing_enable() def apply_lora(self): """应用LoRA适配器""" from peft import LoraConfig, get_peft_model, TaskType lora_config = LoraConfig( task_type=TaskType.CAUSAL_LM, r=self.config.lora_rank, lora_alpha=self.config.lora_alpha, lora_dropout=self.config.lora_dropout, target_modules=["q_proj", "v_proj", "k_proj", "o_proj", "gate_proj", "up_proj", "down_proj"], bias="none" ) self.model = get_peft_model(self.model, lora_config) self.model.print_trainable_parameters() 1.2 数据处理pipeline 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 import json from torch.utils.data import Dataset, DataLoader from typing import List, Dict class InstructionDataset(Dataset): def __init__(self, data_path: str, tokenizer, max_length: int = 2048): self.tokenizer = tokenizer self.max_length = max_length self.data = self.load_data(data_path) def load_data(self, path: str) -> List[Dict]: """加载指令数据""" data = [] with open(path, 'r', encoding='utf-8') as f: for line in f: item = json.loads(line) data.append(item) return data def __len__(self): return len(self.data) def __getitem__(self, idx): item = self.data[idx] # 构建提示 prompt = self.build_prompt(item) # 分词 encoded = self.tokenizer( prompt, truncation=True, max_length=self.max_length, padding="max_length", return_tensors="pt" ) # 创建标签(用于计算损失) labels = encoded["input_ids"].clone() # 将padding部分的标签设为-100(忽略) labels[labels == self.tokenizer.pad_token_id] = -100 return { "input_ids": encoded["input_ids"].squeeze(), "attention_mask": encoded["attention_mask"].squeeze(), "labels": labels.squeeze() } def build_prompt(self, item: Dict) -> str: """构建指令提示""" system_prompt = item.get("system", "You are a helpful assistant.") instruction = item.get("instruction", "") input_text = item.get("input", "") output = item.get("output", "") if input_text: prompt = f"""<|system|>{system_prompt}</s> <|user|>{instruction} Input: {input_text}</s> <|assistant|>{output}</s>""" else: prompt = f"""<|system|>{system_prompt}</s> <|user|>{instruction}</s> <|assistant|>{output}</s>""" return prompt class DataCollator: def __init__(self, tokenizer): self.tokenizer = tokenizer def __call__(self, batch): """批处理数据""" input_ids = torch.stack([item["input_ids"] for item in batch]) attention_mask = torch.stack([item["attention_mask"] for item in batch]) labels = torch.stack([item["labels"] for item in batch]) return { "input_ids": input_ids, "attention_mask": attention_mask, "labels": labels } 2. 参数高效微调(PEFT) 2.1 LoRA实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 import torch.nn as nn import torch.nn.functional as F import math class LoRALayer(nn.Module): def __init__(self, in_features: int, out_features: int, rank: int = 16, alpha: int = 32, dropout: float = 0.1): super().__init__() self.rank = rank self.alpha = alpha self.scaling = alpha / rank # LoRA参数 self.lora_A = nn.Parameter(torch.zeros(rank, in_features)) self.lora_B = nn.Parameter(torch.zeros(out_features, rank)) self.lora_dropout = nn.Dropout(dropout) # 初始化 nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5)) nn.init.zeros_(self.lora_B) def forward(self, x: torch.Tensor, base_output: torch.Tensor) -> torch.Tensor: """LoRA前向传播""" if self.training: x = self.lora_dropout(x) # BA矩阵乘法 lora_output = x @ self.lora_A.T @ self.lora_B.T # 缩放并添加到基础输出 return base_output + lora_output * self.scaling class LoRALinear(nn.Module): def __init__(self, base_layer: nn.Linear, rank: int = 16, alpha: int = 32, dropout: float = 0.1): super().__init__() self.base_layer = base_layer self.lora = LoRALayer( base_layer.in_features, base_layer.out_features, rank, alpha, dropout ) # 冻结基础层 for param in self.base_layer.parameters(): param.requires_grad = False def forward(self, x: torch.Tensor) -> torch.Tensor: base_output = self.base_layer(x) return self.lora(x, base_output) def merge_weights(self): """合并LoRA权重到基础层""" with torch.no_grad(): self.base_layer.weight.data += ( self.lora.lora_B @ self.lora.lora_A ) * self.lora.scaling 2.2 QLoRA实现 graph LR subgraph "QLoRA架构" I[输入] --> Q4[4-bit量化模型] Q4 --> D[反量化] D --> B[基础计算] I --> LA[LoRA A矩阵FP16] LA --> LB[LoRA B矩阵FP16] B --> ADD[相加] LB --> ADD ADD --> O[输出] subgraph "内存优化" M1[模型: 4-bit] M2[LoRA: FP16] M3[梯度: FP16] end end style I fill:#e8f5e9,stroke:#4caf50,stroke-width:2px style O fill:#fff3e0,stroke:#ff9800,stroke-width:2px style Q4 fill:#ffebee,stroke:#f44336,stroke-width:2px 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 import bitsandbytes as bnb from transformers import BitsAndBytesConfig class QLoRAFineTuner: def __init__(self, model_name: str): self.model_name = model_name self.bnb_config = None self.model = None def setup_quantization(self): """设置4位量化配置""" self.bnb_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_use_double_quant=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16 ) def load_quantized_model(self): """加载量化模型""" from transformers import AutoModelForCausalLM model = AutoModelForCausalLM.from_pretrained( self.model_name, quantization_config=self.bnb_config, device_map="auto", trust_remote_code=True ) # 准备模型用于k-bit训练 model = prepare_model_for_kbit_training(model) return model def apply_qlora(self, model): """应用QLoRA""" from peft import LoraConfig, get_peft_model # 找到所有Linear层 target_modules = self.find_linear_layers(model) config = LoraConfig( r=16, lora_alpha=32, target_modules=target_modules, lora_dropout=0.05, bias="none", task_type="CAUSAL_LM" ) model = get_peft_model(model, config) return model def find_linear_layers(self, model): """找到所有可以应用LoRA的线性层""" linear_cls = bnb.nn.Linear4bit lora_module_names = set() for name, module in model.named_modules(): if isinstance(module, linear_cls): names = name.split('.') lora_module_names.add(names[-1]) # 排除一些层 if 'lm_head' in lora_module_names: lora_module_names.remove('lm_head') return list(lora_module_names) def prepare_model_for_kbit_training(model): """准备模型进行k-bit训练""" model.gradient_checkpointing_enable() # 将部分层转为fp32以提高稳定性 for param in model.parameters(): param.requires_grad = False if param.ndim == 1: param.data = param.data.to(torch.float32) # 启用输入层的梯度 model.enable_input_require_grads() return model 3. 训练优化技术 3.1 分布式训练 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 import torch.distributed as dist from torch.nn.parallel import DistributedDataParallel as DDP from torch.distributed.fsdp import FullyShardedDataParallel as FSDP from torch.distributed.fsdp.fully_sharded_data_parallel import ( CPUOffload, BackwardPrefetch, ) class DistributedTrainer: def __init__(self, model, config): self.model = model self.config = config self.world_size = torch.cuda.device_count() def setup_ddp(self, rank: int): """设置DDP训练""" # 初始化进程组 dist.init_process_group( backend='nccl', init_method='env://', world_size=self.world_size, rank=rank ) # 设置设备 torch.cuda.set_device(rank) # 包装模型 self.model = self.model.to(rank) self.model = DDP( self.model, device_ids=[rank], output_device=rank, find_unused_parameters=False ) def setup_fsdp(self): """设置FSDP训练(完全分片数据并行)""" from torch.distributed.fsdp.wrap import ( size_based_auto_wrap_policy, transformer_auto_wrap_policy, ) # 自动包装策略 auto_wrap_policy = functools.partial( transformer_auto_wrap_policy, transformer_layer_cls={ transformers.models.llama.modeling_llama.LlamaDecoderLayer } ) # FSDP配置 self.model = FSDP( self.model, auto_wrap_policy=auto_wrap_policy, backward_prefetch=BackwardPrefetch.BACKWARD_PRE, cpu_offload=CPUOffload(offload_params=True), mixed_precision=self.get_mixed_precision_policy(), sharding_strategy=ShardingStrategy.FULL_SHARD, device_id=torch.cuda.current_device(), limit_all_gathers=True ) def get_mixed_precision_policy(self): """获取混合精度策略""" from torch.distributed.fsdp import MixedPrecision return MixedPrecision( param_dtype=torch.bfloat16, reduce_dtype=torch.bfloat16, buffer_dtype=torch.bfloat16, ) 3.2 梯度优化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 class GradientOptimizer: def __init__(self, model, config): self.model = model self.config = config def setup_optimizer(self): """设置优化器""" # 参数分组 param_groups = self.get_parameter_groups() # AdamW优化器 optimizer = torch.optim.AdamW( param_groups, lr=self.config.learning_rate, betas=(0.9, 0.95), eps=1e-8, weight_decay=self.config.weight_decay ) return optimizer def get_parameter_groups(self): """获取参数组(不同学习率)""" # 不需要weight decay的参数 no_decay = ["bias", "LayerNorm.weight", "layer_norm.weight"] # LoRA参数使用更高的学习率 lora_params = [] base_params_decay = [] base_params_no_decay = [] for name, param in self.model.named_parameters(): if not param.requires_grad: continue if "lora_" in name: lora_params.append(param) elif any(nd in name for nd in no_decay): base_params_no_decay.append(param) else: base_params_decay.append(param) param_groups = [ { "params": base_params_decay, "weight_decay": self.config.weight_decay, "lr": self.config.learning_rate }, { "params": base_params_no_decay, "weight_decay": 0.0, "lr": self.config.learning_rate }, { "params": lora_params, "weight_decay": 0.0, "lr": self.config.learning_rate * 2 # LoRA参数使用2倍学习率 } ] return param_groups def gradient_clipping(self, optimizer): """梯度裁剪""" torch.nn.utils.clip_grad_norm_( self.model.parameters(), max_norm=1.0 ) def gradient_accumulation_step(self, loss, step, optimizer): """梯度累积""" loss = loss / self.config.gradient_accumulation_steps loss.backward() if (step + 1) % self.config.gradient_accumulation_steps == 0: self.gradient_clipping(optimizer) optimizer.step() optimizer.zero_grad() 4. 强化学习微调(RLHF/DPO) 4.1 RLHF实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 from transformers import AutoModelForCausalLM import torch.nn.functional as F class RLHFTrainer: def __init__(self, policy_model, reward_model, reference_model): self.policy_model = policy_model self.reward_model = reward_model self.reference_model = reference_model # 冻结参考模型和奖励模型 for param in self.reference_model.parameters(): param.requires_grad = False for param in self.reward_model.parameters(): param.requires_grad = False def compute_rewards(self, prompts, responses): """计算奖励""" # 获取奖励模型评分 rewards = self.reward_model(prompts, responses) # KL惩罚 kl_penalty = self.compute_kl_penalty(prompts, responses) # 最终奖励 final_rewards = rewards - self.config.kl_coeff * kl_penalty return final_rewards def compute_kl_penalty(self, prompts, responses): """计算KL散度惩罚""" with torch.no_grad(): # 获取参考模型的对数概率 ref_logprobs = self.get_logprobs( self.reference_model, prompts, responses ) # 获取策略模型的对数概率 policy_logprobs = self.get_logprobs( self.policy_model, prompts, responses ) # KL散度 kl = policy_logprobs - ref_logprobs return kl.mean() def ppo_step(self, prompts, responses, rewards, old_logprobs): """PPO训练步骤""" # 获取当前策略的对数概率 logprobs = self.get_logprobs(self.policy_model, prompts, responses) # 计算比率 ratio = torch.exp(logprobs - old_logprobs) # 优势函数(这里简化为奖励) advantages = rewards # PPO损失 surr1 = ratio * advantages surr2 = torch.clamp( ratio, 1 - self.config.clip_range, 1 + self.config.clip_range ) * advantages policy_loss = -torch.min(surr1, surr2).mean() # 值函数损失(如果有值头) value_loss = 0 if hasattr(self.policy_model, 'value_head'): values = self.policy_model.value_head(prompts, responses) value_loss = F.mse_loss(values, rewards) # 熵奖励 entropy = self.compute_entropy(logprobs) # 总损失 loss = ( policy_loss + self.config.vf_coef * value_loss - self.config.entropy_coef * entropy ) return loss 4.2 DPO(Direct Preference Optimization) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 class DPOTrainer: def __init__(self, model, reference_model, beta: float = 0.1): self.model = model self.reference_model = reference_model self.beta = beta # 冻结参考模型 for param in self.reference_model.parameters(): param.requires_grad = False def compute_dpo_loss(self, prompts, chosen, rejected): """计算DPO损失""" # 获取策略模型的对数概率 pi_logprobs_chosen = self.get_logprobs(self.model, prompts, chosen) pi_logprobs_rejected = self.get_logprobs(self.model, prompts, rejected) # 获取参考模型的对数概率 with torch.no_grad(): ref_logprobs_chosen = self.get_logprobs( self.reference_model, prompts, chosen ) ref_logprobs_rejected = self.get_logprobs( self.reference_model, prompts, rejected ) # 计算对数比率 pi_logratios = pi_logprobs_chosen - pi_logprobs_rejected ref_logratios = ref_logprobs_chosen - ref_logprobs_rejected # DPO损失 losses = -F.logsigmoid(self.beta * (pi_logratios - ref_logratios)) # 添加正则化 chosen_rewards = self.beta * ( pi_logprobs_chosen - ref_logprobs_chosen ).detach() rejected_rewards = self.beta * ( pi_logprobs_rejected - ref_logprobs_rejected ).detach() return losses.mean(), chosen_rewards, rejected_rewards def get_logprobs(self, model, prompts, responses): """获取响应的对数概率""" inputs = self.tokenizer( [p + r for p, r in zip(prompts, responses)], return_tensors="pt", padding=True, truncation=True ) with torch.no_grad() if model == self.reference_model else nullcontext(): outputs = model(**inputs, labels=inputs["input_ids"]) # 提取响应部分的对数概率 logits = outputs.logits labels = inputs["input_ids"] # 计算对数概率 logprobs = F.log_softmax(logits, dim=-1) # 获取标签对应的对数概率 selected_logprobs = torch.gather( logprobs, 2, labels.unsqueeze(-1) ).squeeze(-1) # 只计算响应部分 prompt_lens = [len(self.tokenizer(p)["input_ids"]) for p in prompts] response_logprobs = [] for i, prompt_len in enumerate(prompt_lens): response_logprobs.append( selected_logprobs[i, prompt_len:].sum() ) return torch.stack(response_logprobs) 5. 指令微调 5.1 指令数据构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 class InstructionDataBuilder: def __init__(self): self.templates = self.load_templates() def create_instruction_data(self, raw_data: List[Dict]) -> List[Dict]: """创建指令数据""" instruction_data = [] for item in raw_data: # 生成多样化的指令 instructions = self.generate_instructions(item) for instruction in instructions: formatted = self.format_instruction(instruction, item) instruction_data.append(formatted) return instruction_data def generate_instructions(self, item: Dict) -> List[str]: """生成多样化指令""" task_type = item.get("task_type", "general") instructions = [] if task_type == "qa": instructions.extend([ f"Answer the following question: {item['question']}", f"Please provide an answer to: {item['question']}", f"What is the answer to this question: {item['question']}", ]) elif task_type == "summarization": instructions.extend([ "Summarize the following text:", "Please provide a brief summary of:", "Create a concise summary for:", ]) elif task_type == "translation": instructions.extend([ f"Translate the following from {item['source_lang']} to {item['target_lang']}:", f"Please translate this text to {item['target_lang']}:", ]) return instructions def format_instruction(self, instruction: str, item: Dict) -> Dict: """格式化指令""" return { "instruction": instruction, "input": item.get("input", ""), "output": item.get("output", ""), "system": self.get_system_prompt(item.get("task_type", "general")) } def get_system_prompt(self, task_type: str) -> str: """获取系统提示""" system_prompts = { "qa": "You are a helpful question-answering assistant.", "summarization": "You are an expert at creating concise summaries.", "translation": "You are a professional translator.", "general": "You are a helpful AI assistant.", } return system_prompts.get(task_type, system_prompts["general"]) 5.2 Chain-of-Thought微调 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 class CoTFineTuning: def __init__(self, model, tokenizer): self.model = model self.tokenizer = tokenizer def create_cot_data(self, problems: List[Dict]) -> List[Dict]: """创建思维链数据""" cot_data = [] for problem in problems: # 生成思维链 cot_solution = self.generate_cot_solution(problem) # 格式化为训练数据 cot_item = { "instruction": problem["question"], "output": cot_solution, "system": "Let's think step by step." } cot_data.append(cot_item) return cot_data def generate_cot_solution(self, problem: Dict) -> str: """生成思维链解决方案""" steps = problem.get("solution_steps", []) cot_text = "Let me solve this step by step.\n\n" for i, step in enumerate(steps, 1): cot_text += f"Step {i}: {step['description']}\n" if "calculation" in step: cot_text += f"Calculation: {step['calculation']}\n" if "reasoning" in step: cot_text += f"Reasoning: {step['reasoning']}\n" cot_text += "\n" cot_text += f"Therefore, the answer is: {problem['answer']}" return cot_text def train_with_cot(self, train_data: List[Dict]): """使用思维链数据训练""" # 创建数据集 dataset = CoTDataset(train_data, self.tokenizer) dataloader = DataLoader(dataset, batch_size=4, shuffle=True) # 训练循环 optimizer = torch.optim.AdamW(self.model.parameters(), lr=1e-5) for epoch in range(3): for batch in dataloader: # 前向传播 outputs = self.model(**batch) loss = outputs.loss # 反向传播 loss.backward() optimizer.step() optimizer.zero_grad() 6. 数据增强技术 6.1 自动数据增强 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 class DataAugmentation: def __init__(self, base_model): self.base_model = base_model def paraphrase_augmentation(self, text: str, num_variants: int = 3): """释义增强""" paraphrases = [] prompts = [ "Rewrite the following in different words:", "Express the same idea differently:", "Paraphrase the following text:", ] for i in range(num_variants): prompt = f"{prompts[i % len(prompts)]} {text}" paraphrase = self.base_model.generate(prompt) paraphrases.append(paraphrase) return paraphrases def back_translation(self, text: str, intermediate_lang: str = "zh"): """回译增强""" # 翻译到中间语言 translated = self.translate(text, "en", intermediate_lang) # 翻译回原语言 back_translated = self.translate(translated, intermediate_lang, "en") return back_translated def instruction_augmentation(self, instruction: str, output: str): """指令增强""" augmented = [] # 改变指令风格 styles = ["formal", "casual", "detailed", "concise"] for style in styles: new_instruction = self.restyle_instruction(instruction, style) augmented.append({ "instruction": new_instruction, "output": output }) # 添加约束 constraints = [ "Answer in one sentence.", "Provide a detailed explanation.", "Use simple language.", "Include examples.", ] for constraint in constraints: augmented.append({ "instruction": f"{instruction} {constraint}", "output": self.modify_output(output, constraint) }) return augmented 6.2 合成数据生成 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 class SyntheticDataGenerator: def __init__(self, generator_model): self.generator = generator_model def generate_qa_pairs(self, context: str, num_pairs: int = 5): """生成问答对""" qa_pairs = [] # 生成问题 questions = self.generate_questions(context, num_pairs) for question in questions: # 生成答案 answer = self.generate_answer(context, question) qa_pairs.append({ "question": question, "answer": answer, "context": context }) return qa_pairs def generate_instructions(self, capability: str, num_instructions: int = 10): """生成指令数据""" prompt = f"""Generate {num_instructions} diverse instructions that test the following capability: {capability} Format each instruction as: Instruction: [instruction text] Expected Output: [expected output] """ response = self.generator.generate(prompt) # 解析响应 instructions = self.parse_instructions(response) return instructions def self_instruct(self, seed_tasks: List[str], num_iterations: int = 3): """Self-Instruct方法""" all_instructions = seed_tasks.copy() for iteration in range(num_iterations): # 采样现有指令 sampled = random.sample(all_instructions, min(5, len(all_instructions))) # 生成新指令 new_instructions = self.generate_similar_instructions(sampled) # 过滤低质量指令 filtered = self.filter_instructions(new_instructions) # 添加到集合 all_instructions.extend(filtered) return all_instructions 7. 评估与验证 7.1 自动评估 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 from sklearn.metrics import accuracy_score, f1_score import numpy as np class ModelEvaluator: def __init__(self, model, tokenizer): self.model = model self.tokenizer = tokenizer def evaluate_generation_quality(self, test_data: List[Dict]): """评估生成质量""" metrics = { "bleu": [], "rouge": [], "perplexity": [], "diversity": [], "coherence": [] } for item in test_data: # 生成响应 generated = self.generate_response(item["instruction"]) reference = item["output"] # 计算BLEU分数 bleu = self.calculate_bleu(generated, reference) metrics["bleu"].append(bleu) # 计算ROUGE分数 rouge = self.calculate_rouge(generated, reference) metrics["rouge"].append(rouge) # 计算困惑度 perplexity = self.calculate_perplexity(generated) metrics["perplexity"].append(perplexity) # 计算多样性 diversity = self.calculate_diversity([generated]) metrics["diversity"].append(diversity) # 计算连贯性 coherence = self.calculate_coherence(generated) metrics["coherence"].append(coherence) # 汇总指标 summary = { metric: np.mean(values) for metric, values in metrics.items() } return summary def calculate_perplexity(self, text: str) -> float: """计算困惑度""" inputs = self.tokenizer(text, return_tensors="pt") with torch.no_grad(): outputs = self.model(**inputs, labels=inputs["input_ids"]) loss = outputs.loss perplexity = torch.exp(loss) return perplexity.item() def human_eval_simulation(self, generated: str, reference: str): """模拟人类评估""" # 使用另一个模型作为评判者 judge_prompt = f""" Please rate the quality of the generated response compared to the reference. Generated: {generated} Reference: {reference} Rate on a scale of 1-5 for: 1. Relevance 2. Fluency 3. Informativeness 4. Correctness """ # 获取评分(这里应该使用评判模型) scores = self.get_judge_scores(judge_prompt) return scores 7.2 A/B测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 class ABTesting: def __init__(self, model_a, model_b): self.model_a = model_a self.model_b = model_b self.results = {"model_a": [], "model_b": [], "ties": []} def run_comparison(self, test_prompts: List[str]): """运行A/B测试""" for prompt in test_prompts: # 生成响应 response_a = self.model_a.generate(prompt) response_b = self.model_b.generate(prompt) # 评估响应 winner = self.evaluate_responses(prompt, response_a, response_b) # 记录结果 if winner == "a": self.results["model_a"].append(prompt) elif winner == "b": self.results["model_b"].append(prompt) else: self.results["ties"].append(prompt) # 统计分析 stats = self.calculate_statistics() return stats def calculate_statistics(self): """计算统计结果""" total = sum(len(v) for v in self.results.values()) stats = { "model_a_win_rate": len(self.results["model_a"]) / total, "model_b_win_rate": len(self.results["model_b"]) / total, "tie_rate": len(self.results["ties"]) / total, "confidence": self.calculate_confidence_interval() } return stats 8. 生产部署 8.1 模型优化与部署 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 class ModelDeployment: def __init__(self, model_path: str): self.model_path = model_path def optimize_for_inference(self): """推理优化""" # 加载模型 model = torch.load(self.model_path) # 合并LoRA权重 if hasattr(model, 'merge_and_unload'): model = model.merge_and_unload() # 转换为半精度 model = model.half() # TorchScript转换 scripted_model = torch.jit.script(model) # 优化 optimized = torch.jit.optimize_for_inference(scripted_model) return optimized def export_to_onnx(self, model, dummy_input): """导出ONNX格式""" torch.onnx.export( model, dummy_input, "model.onnx", export_params=True, opset_version=14, do_constant_folding=True, input_names=['input_ids', 'attention_mask'], output_names=['logits'], dynamic_axes={ 'input_ids': {0: 'batch_size', 1: 'sequence'}, 'attention_mask': {0: 'batch_size', 1: 'sequence'}, 'logits': {0: 'batch_size', 1: 'sequence'} } ) def create_serving_endpoint(self): """创建服务端点""" from fastapi import FastAPI import uvicorn app = FastAPI() # 加载模型 model = self.load_optimized_model() @app.post("/generate") async def generate(prompt: str, max_length: int = 100): # 生成响应 response = model.generate(prompt, max_length=max_length) return {"response": response} return app 9. 监控与维护 9.1 模型监控 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 class ModelMonitoring: def __init__(self): self.metrics_buffer = [] def monitor_inference(self, model, inputs, outputs): """监控推理""" metrics = { "timestamp": time.time(), "input_length": len(inputs["input_ids"][0]), "output_length": len(outputs[0]), "latency": None, "perplexity": None } # 计算延迟 start_time = time.time() _ = model(inputs) metrics["latency"] = time.time() - start_time # 计算困惑度 with torch.no_grad(): loss = model(**inputs, labels=inputs["input_ids"]).loss metrics["perplexity"] = torch.exp(loss).item() self.metrics_buffer.append(metrics) # 检测异常 self.detect_anomalies(metrics) def detect_drift(self, current_distribution, reference_distribution): """检测分布漂移""" from scipy.stats import ks_2samp # KS检验 statistic, p_value = ks_2samp( current_distribution, reference_distribution ) # 检测显著漂移 if p_value < 0.05: self.alert_drift_detected(statistic, p_value) return p_value 10. 最佳实践 数据质量优先:高质量的数据比大量低质量数据更有价值 渐进式微调:从简单任务开始,逐步增加复杂度 参数高效:优先使用LoRA/QLoRA等PEFT方法 持续评估:建立完善的评估体系 版本管理:跟踪数据、模型和配置的版本 安全对齐:确保模型输出安全、无害 结论 LLM微调是一个系统工程,需要在数据、算法、工程等多个方面进行优化。通过合理的技术选择和细致的实施,可以将通用大模型成功适配到特定领域和任务。 ...