写在前面:什么是贝叶斯优化
参考这里
主要包含两个部分
-
一个代理模型(surrogate model),用于对目标函数进行建模。代理模型通常有确定的公式或者能计算梯度,又或者有已知的凹凸性、线性等特性,总之就是更容易用于优化。更泛化地讲,其实它就是一个学习模型,输入是所有观测到的函数值点,训练后可以在给定任意x的情况下给出对f(x)的估计。
-
一个优化策略(optimization strategy),决定下一个采样点的位置,即下一步应在哪个输入x
处观测函数值f(x)。通常它是通过采集函数(acquisition function) 来实现的:
采集函数通常是一个由代理模型推出的函数,它的输入是可行集(feasible set)A上的任意值,输出值衡量了每个输入x有多值得被观测。通常会从以下两方面考虑:- 有多大的可能性在x处取得最优值
- 评估x是否能减少贝叶斯统计模型的不确定性
采集函数通常也是容易求最优值的函数(例如:有公式/能算梯度等),下一个采样点就是可行集上的最大值点,即使采集函数的取最大值的点。
本文主要学习代理模型,模型代码参考SCOOT(WWW2025 oral)中hebo/model库的代码实现
常用的贝叶斯代理模型
GP:标准高斯过程
class GP(BaseModel): # support_grad = True 表示该模型支持梯度计算,这对于贝叶斯优化等需要梯度信息的场景非常重要 support_grad = True def __init__(self, num_cont, num_enum, num_out, **conf): """ 初始化高斯过程回归模型 参数: num_cont (int): 连续变量的数量 num_enum (int): 离散/枚举变量的数量 num_out (int): 输出变量的数量(通常为1,表示单目标优化) **conf: 配置参数字典,可包含以下键值对: - lr (float): 学习率,默认为3e-2 - num_epochs (int): 训练轮数,默认为100 - verbose (bool): 是否打印训练过程信息,默认为False - print_every (int): 打印训练信息的频率,默认为10 - pred_likeli (bool): 预测时是否考虑似然噪声,默认为True - noise_lb (float): 噪声下界,默认为1e-5 - optimizer (str): 优化器类型,可选'lbfgs'、'psgld'或'adam',默认为'psgld' - noise_guess (float): 噪声初始猜测值,默认为0.01 - ard_kernel (bool): 是否使用ARD核(自动相关性判定),默认为True """ # 调用父类BaseModel的初始化方法 super().__init__(num_cont, num_enum, num_out, **conf) # 从配置中提取参数,如果未提供则使用默认值 self.lr = conf.get('lr', 3e-2) # 学习率 self.num_epochs = conf.get('num_epochs', 100) # 训练轮数 self.verbose = conf.get('verbose', False) # 是否打印训练过程信息 self.print_every = conf.get('print_every', 10) # 打印训练信息的频率 self.pred_likeli = conf.get('pred_likeli', True) # 预测时是否考虑似然噪声 self.noise_lb = conf.get('noise_lb', 1e-5) # 噪声下界 self.optimizer = conf.get('optimizer', 'psgld') # 优化器类型 self.noise_guess = conf.get('noise_guess', 0.01) # 噪声初始猜测值 self.ard_kernel = conf.get('ard_kernel', True) # 是否使用ARD核 # 初始化数据标准化器 # xscaler: 用于连续变量的最小-最大标准化器,将数据缩放到[-1, 1]区间 self.xscaler = TorchMinMaxScaler((-1, 1)) # yscaler: 用于目标变量的标准标准化器,使其均值为0,标准差为1 self.yscaler = TorchStandardScaler() def fit_scaler(self, Xc : Tensor, Xe : Tensor, y : Tensor): """ 拟合数据标准化器 参数: Xc: 连续特征张量 Xe: 离散特征张量 y: 目标变量张量 """ # 如果存在连续变量且数量大于0,则拟合连续变量标准化器 if Xc is not None and Xc.shape[1] > 0: self.xscaler.fit(Xc) # 拟合目标变量标准化器 self.yscaler.fit(y) def xtrans(self, Xc : Tensor, Xe : Tensor, y : Tensor = None): """ 对输入数据进行转换(标准化和类型转换) 参数: Xc: 连续特征张量 Xe: 离散特征张量 y: 可选的目标变量张量 返回: 转换后的特征和目标变量(如果提供了y) """ # 处理连续变量:如果存在则标准化,否则创建空张量 if Xc is not None and Xc.shape[1] > 0: Xc_t = self.xscaler.transform(Xc) # 标准化连续变量 else: Xc_t = torch.zeros(Xe.shape[0], 0) # 创建空的连续变量张量 # 处理离散变量:如果存在则转换为long类型,否则创建空张量 if Xe is None: Xe_t = torch.zeros(Xc.shape[0], 0).long() # 创建空的离散变量张量 else: Xe_t = Xe.long() # 转换为long类型,通常用于嵌入层 # 如果提供了目标变量,则也进行标准化 if y is not None: y_t = self.yscaler.transform(y) # 标准化目标变量 return Xc_t, Xe_t, y_t else: return Xc_t, Xe_t def fit(self, Xc : Tensor, Xe : Tensor, y : Tensor): """ 训练高斯过程模型 参数: Xc: 连续特征张量 Xe: 离散特征张量 y: 目标变量张量 """ # 1. 数据预处理:过滤NaN值 Xc, Xe, y = filter_nan(Xc, Xe, y, 'all') # 2. 拟合标准化器并转换数据 self.fit_scaler(Xc, Xe, y) Xc, Xe, y = self.xtrans(Xc, Xe, y) # 3. 验证数据维度 assert(Xc.shape[1] == self.num_cont) assert(Xe.shape[1] == self.num_enum) assert(y.shape[1] == self.num_out) # 4. 保存训练数据 self.Xc = Xc self.Xe = Xe self.y = y # 5. 设置似然函数(噪声模型) n_constr = GreaterThan(self.noise_lb) # 噪声下界约束 n_prior = LogNormalPrior(np.log(self.noise_guess), 0.5) # 对数正态先验 # 创建高斯似然函数,包含噪声约束和先验 self.lik = GaussianLikelihood(noise_constraint = n_constr, noise_prior = n_prior) # 6. 创建GPyTorch模型 self.gp = GPyTorchModel(self.Xc, self.Xe, self.y, self.lik, **self.conf) # 7. 初始化噪声参数 self.gp.likelihood.noise = max(1e-2, self.noise_lb) # 8. 设置模型为训练模式 self.gp.train() self.lik.train() # 9. 选择优化器 if self.optimizer.lower() == 'lbfgs': # L-BFGS优化器,适用于小批量数据的准牛顿方法 opt = torch.optim.LBFGS(self.gp.parameters(), lr = self.lr, max_iter = 5, line_search_fn = 'strong_wolfe') elif self.optimizer == 'psgld': # 预处理随机梯度Langevin动力学,用于贝叶斯采样 opt = pSGLD(self.gp.parameters(), lr = self.lr, factor = 1. / y.shape[0], pretrain_step = self.num_epochs // 10) else: # Adam优化器,默认选择 opt = torch.optim.Adam(self.gp.parameters(), lr = self.lr) # 10. 创建边际对数似然目标函数 mll = gpytorch.mlls.ExactMarginalLogLikelihood(self.lik, self.gp) # 11. 训练循环 for epoch in range(self.num_epochs): def closure(): """ 优化器的闭包函数,计算损失并梯度 """ # 前向传播:获取GP的后验分布 dist = self.gp(self.Xc, self.Xe) # 计算负边际对数似然损失 loss = -1 * mll(dist, self.y.squeeze()) # 清零梯度 opt.zero_grad() # 反向传播 loss.backward() return loss # 优化器步进 opt.step(closure) # 打印训练信息 if self.verbose and ((epoch + 1) % self.print_every == 0 or epoch == 0): print('After %d epochs, loss = %g' % (epoch + 1, closure().item()), flush = True) # 12. 设置模型为评估模式 self.gp.eval() self.lik.eval() def predict(self, Xc, Xe): """ 使用训练好的模型进行预测 参数: Xc: 连续特征张量 Xe: 离散特征张量 返回: mu: 预测均值 var: 预测方差 """ # 1. 转换输入数据 Xc, Xe = self.xtrans(Xc, Xe) # 2. 使用快速预测设置进行预测 with gpytorch.settings.fast_pred_var(), gpytorch.settings.debug(False): # 获取GP预测 pred = self.gp(Xc, Xe) # 如果考虑似然噪声,通过似然函数进行预测 if self.pred_likeli: pred = self.lik(pred) # 提取均值和方差 mu_ = pred.mean.reshape(-1, self.num_out) var_ = pred.variance.reshape(-1, self.num_out) # 3. 将预测结果转换回原始尺度 mu = self.yscaler.inverse_transform(mu_) # 逆标准化均值 var = var_ * self.yscaler.std**2 # 调整方差到原始尺度 # 4. 确保方差不为零(避免数值问题) return mu, var.clamp(min = torch.finfo(var.dtype).eps) def sample_y(self, Xc, Xe, n_samples = 1) -> FloatTensor: """ 从后验分布中采样目标变量 参数: Xc: 连续特征张量 Xe: 离散特征张量 n_samples: 采样数量 返回: 采样结果,形状为(n_samples, 样本数, 输出维度) """ # 1. 转换输入数据 Xc, Xe = self.xtrans(Xc, Xe) # 2. 进行采样 with gpytorch.settings.debug(False): if self.pred_likeli: # 考虑噪声的采样 pred = self.lik(self.gp(Xc, Xe)) else: # 不考虑噪声的采样(函数值采样) pred = self.gp(Xc, Xe) # 从后验分布中采样 samp = pred.rsample(torch.Size((n_samples,))).view(n_samples, Xc.shape[0], self.num_out) # 将采样结果转换回原始尺度 return self.yscaler.inverse_transform(samp) def sample_f(self): """ 采样函数值(不支持,使用sample_y代替) """ raise NotImplementedError('Thompson sampling is not supported for GP, use `sample_y` instead') @property def noise(self): """ 获取估计的噪声水平(转换回原始尺度) """ return (self.gp.likelihood.noise * self.yscaler.std**2).view(self.num_out).detach() class GPyTorchModel(gpytorch.models.ExactGP): """ GPyTorch模型实现类,继承自ExactGP(精确高斯过程) """ def __init__(self, x : torch.Tensor, # 训练数据的连续特征 xe : torch.Tensor, # 训练数据的离散特征 y : torch.Tensor, # 训练目标值 lik : GaussianLikelihood, # 似然函数 **conf): # 配置参数 # 调用父类构造函数,传入训练数据和似然函数 super().__init__((x, xe), y.squeeze(), lik) # 特征提取器:处理连续和离散特征的组合 self.fe = deepcopy(conf.get('fe', DummyFeatureExtractor(x.shape[1], xe.shape[1], conf.get('num_uniqs'), conf.get('emb_sizes')))) # 均值函数:默认使用常数均值 self.mean = deepcopy(conf.get('mean', ConstantMean())) # 核函数选择:根据配置选择默认核或随机分解核 if conf.get("rd", False): # 使用随机分解核(Random Decomposition) self.cov = deepcopy(conf.get('kern', default_kern_rd(x, xe, y, self.fe.total_dim, conf.get('ard_kernel', True), conf.get('fe'), E=conf.get("E", 0.2)))) else: # 使用默认核函数 self.cov = deepcopy(conf.get('kern', default_kern(x, xe, y, self.fe.total_dim, conf.get('ard_kernel', True), conf.get('fe')))) def forward(self, x, xe): """ 前向传播,定义高斯过程的行为 参数: x: 连续特征 xe: 离散特征 返回: MultivariateNormal: 多元正态分布,表示高斯过程的预测 """ # 1. 特征提取:将连续和离散特征组合成统一表示 x_all = self.fe(x, xe) # 2. 计算均值函数 m = self.mean(x_all) # 3. 计算协方差矩阵(核函数) K = self.cov(x_all) # 4. 返回多元正态分布 return MultivariateNormal(m, K)
GPyGP:输入扭曲的高斯过程模型
class GPyGP(BaseModel): """ Input warped GP model implemented using GPy instead of GPyTorch 使用GPy库实现的输入扭曲高斯过程模型(而非GPyTorch) Why doing so: 为什么这样做: - Input warped GP 支持输入扭曲的高斯过程 """ def __init__(self, num_cont, num_enum, num_out, **conf): super().__init__(num_cont, num_enum, num_out, **conf) # 调用父类初始化 total_dim = num_cont # 总维度初始化为连续变量数量 if num_enum > 0: # 如果存在离散变量 self.one_hot = OneHotTransform(self.conf['num_uniqs']) # 创建one-hot编码器 total_dim += self.one_hot.num_out # 增加离散变量编码后的维度 self.xscaler = TorchMinMaxScaler((-1, 1)) # 连续变量标准化器,范围[-1,1] self.yscaler = TorchStandardScaler() # 目标变量标准化器 self.verbose = self.conf.get('verbose', False) # 是否显示训练信息 self.num_epochs = self.conf.get('num_epochs', 200) # 训练轮数 self.warp = self.conf.get('warp', True) # 是否使用输入扭曲 self.space = self.conf.get('space') # 设计空间定义 self.num_restarts = self.conf.get('num_restarts', 10) # 优化重启次数 self.rd = self.conf.get('rd', False) # 是否使用随机分解 self.E = self.conf.get('E', 0.2) # 随机分解的边概率 if self.space is None and self.warp: # 如果没有设计空间但启用了扭曲 warnings.warn('Space not provided, set warp to False') # 发出警告 self.warp = False # 禁用扭曲 if self.warp: # 如果启用扭曲 for i in range(total_dim): # 为每个维度禁用日志记录器 logging.getLogger(f'a{i}').disabled = True # 禁用参数a的日志 logging.getLogger(f'b{i}').disabled = True # 禁用参数b的日志 def fit_scaler(self, Xc : FloatTensor, y : FloatTensor): if Xc is not None and Xc.shape[1] > 0: # 如果存在连续变量 if self.space is not None: # 如果有设计空间信息 cont_lb = self.space.opt_lb[:self.space.num_numeric].view(1, -1).float() # 获取连续变量下界 cont_ub = self.space.opt_ub[:self.space.num_numeric].view(1, -1).float() # 获取连续变量上界 self.xscaler.fit(torch.cat([Xc, cont_lb, cont_ub], dim = 0)) # 结合数据和边界来拟合标准化器 else: self.xscaler.fit(Xc) # 仅使用数据拟合标准化器 self.yscaler.fit(y) # 拟合目标变量标准化器 def trans(self, Xc : Tensor, Xe : Tensor, y : Tensor = None): if Xc is not None and Xc.shape[1] > 0: # 处理连续变量 Xc_t = self.xscaler.transform(Xc) # 标准化连续变量 else: Xc_t = torch.zeros(Xe.shape[0], 0) # 创建空的连续变量张量 if Xe is None or Xe.shape[1] == 0: # 处理离散变量 Xe_t = torch.zeros(Xc.shape[0], 0) # 创建空的离散变量张量 else: Xe_t = self.one_hot(Xe.long()) # one-hot编码离散变量 Xall = torch.cat([Xc_t, Xe_t], dim = 1) # 合并连续和离散特征 if y is not None: # 如果提供了目标变量 y_t = self.yscaler.transform(y) # 标准化目标变量 return Xall.numpy(), y_t.numpy() # 转换为numpy数组返回 return Xall.numpy() # 只返回特征数据 def fit(self, Xc : FloatTensor, Xe : LongTensor, y : LongTensor): Xc, Xe, y = filter_nan(Xc, Xe, y, 'all') # 过滤NaN值 self.fit_scaler(Xc, y) # 拟合标准化器 X, y = self.trans(Xc, Xe, y) # 转换数据格式 if self.rd: # 如果使用随机分解 cliques = get_random_graph(X.shape[1], self.E) # 生成随机图团结构 # process first clique 处理第一个团 pair = cliques[0] # 获取第一个团 k1 = GPy.kern.Linear(len(pair), active_dims=pair, ARD = False) # 线性核 k2 = GPy.kern.Matern32(len(pair), active_dims=pair, ARD = True) # Matern32核,启用ARD k2.lengthscale = np.std(X, axis = 0)[pair] # 设置长度尺度为特征标准差 k2.variance = 0.5 # 设置核方差 k2.variance.set_prior(GPy.priors.Gamma(0.5, 1)) # 设置方差先验为Gamma分布 kern = k1 + k2 # 线性核 + Matern核 # process remaining cliques 处理剩余团 for pair in cliques[1:]: k1 = GPy.kern.Linear(len(pair), active_dims=pair, ARD = False) # 线性核 k2 = GPy.kern.Matern32(len(pair), active_dims=pair, ARD = True) # Matern32核 geo_mean = 1 # 初始化几何均值 for d in pair: # 计算团内特征的几何标准差 geo_mean *= np.std(X, axis = 0)[d] k2.lengthscale = geo_mean**(1/len(pair)) # 设置长度尺度为几何均值 k2.variance = 0.5 # 设置核方差 k2.variance.set_prior(GPy.priors.Gamma(0.5, 1)) # 设置方差先验 kern += k1 + k2 # 累加到总核函数 else: # 如果不使用随机分解 k1 = GPy.kern.Linear(X.shape[1], ARD = False) # 全局线性核 k2 = GPy.kern.Matern32(X.shape[1], ARD = True) # 全局Matern32核,启用ARD k2.lengthscale = np.std(X, axis = 0).clip(min = 0.02) # 设置长度尺度,最小0.02 k2.variance = 0.5 # 设置核方差 k2.variance.set_prior(GPy.priors.Gamma(0.5, 1), warning = False) # 设置方差先验 kern = k1 + k2 # 线性核 + Matern核 if not self.warp: # 如果不使用输入扭曲 self.gp = GPy.models.GPRegression(X, y, kern) # 创建标准高斯过程回归 else: # 如果使用输入扭曲 xmin = np.zeros(X.shape[1]) # 初始化最小边界 xmax = np.ones(X.shape[1]) # 初始化最大边界 xmin[:Xc.shape[1]] = -1 # 设置连续变量范围为[-1,1] warp_f = GPy.util.input_warping_functions.KumarWarping(X, Xmin = xmin, Xmax = xmax) # 创建Kumar扭曲函数 self.gp = GPy.models.InputWarpedGP(X, y, kern, warping_function = warp_f) # 创建输入扭曲高斯过程 self.gp.likelihood.variance.set_prior(GPy.priors.LogGaussian(-4.63, 0.5), warning = False) # 设置噪声先验 # 多重启优化:最大迭代次数、是否显示信息、重启次数、鲁棒模式 self.gp.optimize_restarts(max_iters = self.num_epochs, verbose = self.verbose, num_restarts = self.num_restarts, robust = True) return self # 返回自身用于链式调用 def predict(self, Xc : FloatTensor, Xe : LongTensor) -> (FloatTensor, FloatTensor): Xall = self.trans(Xc, Xe) # 转换输入数据 py, ps2 = self.gp.predict(Xall) # 使用GPy模型预测(均值和方差) mu = self.yscaler.inverse_transform(FloatTensor(py).view(-1, 1)) # 逆标准化均值 var = self.yscaler.std**2 * FloatTensor(ps2).view(-1, 1) # 调整方差到原始尺度 return mu, var.clamp(torch.finfo(var.dtype).eps) # 返回均值和确保非负的方差 def sample_f(self): raise NotImplementedError('Thompson sampling is not supported for GP, use `sample_y` instead') # 不支持函数采样 @property def noise(self): var_normalized = self.gp.likelihood.variance[0] # 获取标准化后的噪声方差 return (var_normalized * self.yscaler.std**2).view(self.num_out) # 转换到原始尺度并调整形状
RF:随机森林回归
class RF(BaseModel): """ 随机森林回归模型实现 """ def __init__(self, num_cont, num_enum, num_out, **conf): super().__init__(num_cont, num_enum, num_out, **conf) # 调用父类初始化 self.n_estimators = self.conf.get('n_estimators', 100) # 树的数量,默认100 self.rf = RandomForestRegressor(n_estimators = self.n_estimators) # 创建随机森林回归器 self.est_noise = torch.zeros(self.num_out) # 初始化估计噪声为零张量 if self.num_enum > 0: # 如果存在离散变量 self.one_hot = OneHotTransform(self.conf['num_uniqs']) # 创建one-hot编码器 def xtrans(self, Xc : FloatTensor, Xe: LongTensor) -> np.ndarray: """ 转换输入数据为numpy数组格式 """ if self.num_enum == 0: # 如果没有离散变量 return Xc.detach().numpy() # 直接返回连续变量的numpy数组 else: # 如果有离散变量 Xe_one_hot = self.one_hot(Xe) # 对离散变量进行one-hot编码 if Xc is None: # 如果没有连续变量 Xc = torch.zeros(Xe.shape[0], 0) # 创建空的连续变量张量 return torch.cat([Xc, Xe_one_hot], dim = 1).numpy() # 合并连续和离散特征并转为numpy def fit(self, Xc : torch.Tensor, Xe : torch.Tensor, y : torch.Tensor): """ 训练随机森林模型 """ Xc, Xe, y = filter_nan(Xc, Xe, y, 'all') # 过滤包含NaN的数据点 Xtr = self.xtrans(Xc, Xe) # 转换输入特征为numpy格式 ytr = y.numpy().reshape(-1) # 转换目标变量为一维numpy数组 self.rf.fit(Xtr, ytr) # 训练随机森林模型 # 计算训练集上的MSE作为噪声估计 mse = np.mean((self.rf.predict(Xtr).reshape(-1) - ytr)**2).reshape(self.num_out) self.est_noise = torch.FloatTensor(mse) # 保存估计的噪声水平 @property def noise(self): """ 返回估计的噪声水平 """ return self.est_noise def predict(self, Xc : torch.Tensor, Xe : torch.Tensor): """ 使用训练好的模型进行预测 返回: (预测均值, 预测方差 + 估计噪声) """ X = self.xtrans(Xc, Xe) # 转换输入数据 mean = self.rf.predict(X).reshape(-1, 1) # 预测均值(所有树的平均) preds = [] # 存储每棵树的独立预测 for estimator in self.rf.estimators_: # 遍历所有决策树 preds.append(estimator.predict(X).reshape([-1,1])) # 收集每棵树的预测 var = np.var(np.concatenate(preds, axis=1), axis=1) # 计算树间预测方差 # 返回均值和总方差(模型方差 + 估计噪声) return torch.FloatTensor(mean.reshape([-1,1])), torch.FloatTensor(var.reshape([-1,1])) + self.noise
SVGP:稀疏变分高斯过程(Stochastic Variational Gaussian Process)
class SVGP(BaseModel): """ 稀疏变分高斯过程 (Stochastic Variational Gaussian Process) 适用于大规模数据的近似高斯过程模型 """ support_grad = True # 支持梯度计算 support_multi_output = True # 支持多输出任务 def __init__(self, num_cont, num_enum, num_out, **conf): super().__init__(num_cont, num_enum, num_out, **conf) # 调用父类初始化 # 配置参数 self.use_ngd = conf.get('use_ngd', False) # 是否使用自然梯度下降 self.lr = conf.get('lr', 1e-2) # 基础学习率 self.lr_vp = conf.get('lr_vp', 1e-1) # 变分参数学习率 self.lr_fe = conf.get('lr_fe', 1e-3) # 特征提取器学习率 self.num_inducing = conf.get('num_inducing', 128) # 诱导点数量 self.ard_kernel = conf.get('ard_kernel', True) # 是否使用ARD核 self.pred_likeli = conf.get('pred_likeli', True) # 预测时是否考虑噪声 self.beta = conf.get('beta', 1.0) # ELBO的beta参数 # 训练参数 self.batch_size = conf.get('batch_size', 64) # 批大小 self.num_epochs = conf.get('num_epochs', 300) # 训练轮数 self.verbose = conf.get('verbose', False) # 是否显示训练信息 self.print_every = conf.get('print_every', 10) # 打印频率 self.noise_lb = conf.get('noise_lb', 1e-5) # 噪声下界 # 数据标准化器 self.xscaler = TorchMinMaxScaler((-1, 1)) # 输入标准化器 self.yscaler = TorchStandardScaler() # 输出标准化器 def fit_scaler(self, Xc : FloatTensor, Xe : LongTensor, y : FloatTensor): """拟合数据标准化器""" if Xc is not None and Xc.shape[1] > 0: # 如果有连续变量 self.xscaler.fit(Xc) # 拟合输入标准化器 self.yscaler.fit(y) # 拟合输出标准化器 def xtrans(self, Xc : FloatTensor, Xe : LongTensor, y : FloatTensor = None): """转换输入数据格式""" if Xc is not None and Xc.shape[1] > 0: # 处理连续变量 Xc_t = self.xscaler.transform(Xc) # 标准化连续变量 else: Xc_t = torch.zeros(Xe.shape[0], 0) # 创建空的连续变量张量 if Xe is None: # 处理离散变量 Xe_t = torch.zeros(Xc.shape[0], 0).long() else: Xe_t = Xe.long() # 确保离散变量为long类型 if y is not None: # 如果提供了目标变量 y_t = self.yscaler.transform(y) # 标准化目标变量 return Xc_t, Xe_t, y_t else: return Xc_t, Xe_t def fit(self, Xc : FloatTensor, Xe : LongTensor, y : FloatTensor): """训练SVGP模型""" Xc, Xe, y = filter_nan(Xc, Xe, y, 'any') # 过滤包含NaN的数据点 self.fit_scaler(Xc, Xe, y) # 拟合标准化器 Xc, Xe, y = self.xtrans(Xc, Xe, y) # 转换数据格式 # 验证数据维度 assert(Xc.shape[1] == self.num_cont) assert(Xe.shape[1] == self.num_enum) assert(y.shape[1] == self.num_out) # 设置噪声约束和创建模型 n_constr = GreaterThan(self.noise_lb) # 噪声下界约束 self.gp = SVGPModel(Xc, Xe, y, **self.conf) # 创建SVGP模型 # 为每个输出创建独立的高斯似然函数 self.lik = nn.ModuleList([GaussianLikelihood(noise_constraint = n_constr) for _ in range(self.num_out)]) # 设置模型为训练模式 self.gp.train() self.lik.train() # 创建数据加载器 ds = TensorDataset(Xc, Xe, y) # 创建Tensor数据集 dl = DataLoader(ds, batch_size = self.batch_size, shuffle = True, drop_last = y.shape[0] > self.batch_size) # 数据加载器 # 配置优化器 if self.use_ngd: # 如果使用自然梯度下降 opt = torch.optim.Adam([ {'params' : self.gp.fe.parameters(), 'lr' : self.lr_fe}, # 特征提取器参数 {'params' : self.gp.gp.hyperparameters()}, # GP超参数 {'params' : self.lik.parameters()}, # 似然函数参数 ], lr = self.lr) opt_ng = gpytorch.optim.NGD(self.gp.variational_parameters(), lr = self.lr_vp, num_data = y.shape[0]) # 自然梯度优化器 else: # 使用标准Adam优化器 opt = torch.optim.Adam([ {'params' : self.gp.fe.parameters(), 'lr' : self.lr_fe}, # 特征提取器参数 {'params' : self.gp.gp.hyperparameters()}, # GP超参数 {'params' : self.gp.gp.variational_parameters(), 'lr' : self.lr_vp}, # 变分参数 {'params' : self.lik.parameters()}, # 似然函数参数 ], lr = self.lr) # 为每个输出创建变分ELBO目标函数 mll = [gpytorch.mlls.VariationalELBO(self.lik[i], self.gp.gp[i], num_data = y.shape[0], beta = self.beta) for i in range(self.num_out)] # 训练循环 for epoch in range(self.num_epochs): epoch_loss = 0. # 累计损失 epoch_cnt = 1e-6 # 批次计数(避免除零) for bxc, bxe, by in dl: # 遍历数据批次 dist_list = self.gp(bxc, bxe, by) # 前向传播,获取分布列表 loss = 0 # 初始化损失 valid = torch.isfinite(by) # 检查有效值 # 计算每个输出的损失 for i, dist in enumerate(dist_list): loss += -1 * mll[i](dist, by[valid[:, i], i]) * valid[:, i].sum() # 加权ELBO损失 loss /= by.shape[0] # 平均损失 # 反向传播和优化 if self.use_ngd: opt.zero_grad() opt_ng.zero_grad() loss.backward() opt.step() opt_ng.step() else: opt.zero_grad() loss.backward() opt.step() # 累计损失和计数 epoch_loss += loss.item() epoch_cnt += 1 epoch_loss /= epoch_cnt # 计算平均epoch损失 if self.verbose and ((epoch + 1) % self.print_every == 0 or epoch == 0): print('After %d epochs, loss = %g' % (epoch + 1, epoch_loss), flush = True) # 设置模型为评估模式 self.gp.eval() self.lik.eval() def predict(self, Xc, Xe): """模型预测""" Xc, Xe = self.xtrans(Xc, Xe) # 转换输入数据 with gpytorch.settings.fast_pred_var(), gpytorch.settings.debug(False): # 快速预测设置 pred = self.gp(Xc, Xe) # 获取预测分布 if self.pred_likeli: # 如果考虑似然噪声 for i in range(self.num_out): pred[i] = self.lik[i](pred[i]) # 通过似然函数转换 # 合并所有输出的均值和方差 mu_ = torch.cat([pred[i].mean.reshape(-1, 1) for i in range(self.num_out)], dim = 1) var_ = torch.cat([pred[i].variance.reshape(-1, 1) for i in range(self.num_out)], dim = 1) # 逆标准化到原始尺度 mu = self.yscaler.inverse_transform(mu_) var = var_ * self.yscaler.std**2 return mu, var.clamp(min = torch.finfo(var.dtype).eps) # 确保方差非负 def sample_y(self, Xc, Xe, n_samples = 1) -> FloatTensor: """ 从后验分布采样目标变量 返回: (n_samples, 样本数, 输出维度) """ Xc, Xe = self.xtrans(Xc, Xe) # 转换输入数据 with gpytorch.settings.debug(False): pred = self.gp(Xc, Xe) # 获取预测分布 if self.pred_likeli: # 如果考虑似然噪声 for i in range(self.num_out): pred[i] = self.lik[i](pred[i]) # 通过似然函数转换 # 从每个输出分布采样并合并 samp = [pred[i].rsample(torch.Size((n_samples, ))).reshape(n_samples, -1, 1) for i in range(self.num_out)] samp = torch.cat(samp, dim = -1) return self.yscaler.inverse_transform(samp) # 逆标准化采样结果 def sample_f(self): """不支持函数采样""" raise NotImplementedError('Thompson sampling is not supported for GP, use `sample_y` instead') @property def noise(self): """获取估计的噪声水平""" noise = torch.FloatTensor([lik.noise for lik in self.lik]).view(self.num_out).detach() # 各输出噪声 return noise * self.yscaler.std**2 # 转换到原始尺度
SVIDKL:稀疏变分深度核学习(Stochastic Variational Deep Kernel Learning)
class DKLFe(nn.Module): """ 深度核学习特征提取器 (Deep Kernel Learning Feature Extractor) 使用神经网络自动学习特征表示,替代手动设计的核函数 """ def __init__(self, num_cont, num_enum, num_out, **conf): super().__init__() # 神经网络配置参数 self.num_hiddens = conf.get('num_hiddens', 64) # 隐藏层维度,默认64 self.num_layers = conf.get('num_layers', 2) # 隐藏层数量,默认2层 self.act = conf.get('act', nn.LeakyReLU()) # 激活函数,默认LeakyReLU self.sn_norm = conf.get('sn_norm') # 谱归一化,默认不使用 # 特征预处理:处理连续和离散变量的嵌入转换 self.emb_trans = DummyFeatureExtractor(num_cont, num_enum, conf.get('num_uniqs'), conf.get('emb_sizes')) # 构建深度神经网络特征提取器 self.fe = construct_hidden(self.emb_trans.total_dim, self.num_layers, self.num_hiddens, self.act, self.sn_norm) self.total_dim = self.num_hiddens # 输出特征维度 def forward(self, x, xe): """ 前向传播:将原始输入转换为深度特征表示 """ x_all = self.emb_trans(x, xe) # 首先进行嵌入转换,处理混合类型输入 return self.fe(x_all) # 通过深度网络提取高级特征 class SVIDKL(SVGP): """ 稀疏变分深度核学习 (Stochastic Variational Deep Kernel Learning) 在SVGP基础上集成深度特征学习的扩展 """ def __init__(self, num_cont, num_enum, num_out, **conf): # 创建配置的深拷贝,避免修改原始配置 new_conf = deepcopy(conf) # 关键变化1:禁用ARD核 new_conf.setdefault('ard_kernel', False) # 解释:当有神经网络特征提取器时,不需要使用ARD核 # 因为神经网络已经自动学习了特征的重要性权重 # 关键变化2:使用深度特征提取器 new_conf.setdefault('fe', DKLFe(num_cont, num_enum, num_out, **new_conf)) # 解释:用深度神经网络替代简单的特征转换,自动学习数据表示 # 关键变化3:使用简化的核函数 new_conf.setdefault('kern', ScaleKernel(MaternKernel(nu = 2.5))) # 解释:由于特征提取器已经处理了复杂性,可以使用更简单的核函数 # 调用父类SVGP的初始化,传入修改后的配置 super().__init__(num_cont, num_enum, num_out, **new_conf)
CatBoost:梯度提升树模型
class CatBoost(BaseModel): """ CatBoost梯度提升树模型实现 专门优化类别特征处理的梯度提升算法 """ def __init__(self, num_cont, num_enum, num_out, **conf): super().__init__(num_cont, num_enum, num_out, **conf) # 调用父类初始化 # CatBoost配置参数 self.num_epochs = self.conf.get('num_epochs', 100) # 最大树的数量(迭代次数) self.lr = self.conf.get('lr', 0.2) # 学习率 self.depth = self.conf.get('depth', 10) # 树深度,推荐范围[1, 10] self.loss_function = self.conf.get('loss_function', 'RMSEWithUncertainty') # 损失函数,支持不确定性估计 self.posterior_sampling = self.conf.get('posterior_sampling', True) # 是否使用后验采样 self.verbose = self.conf.get('verbose', False) # 是否显示训练过程 self.random_seed = self.conf.get('random_seed', 42) # 随机种子 self.num_ensembles = self.conf.get('num_ensembles', 10) # 虚拟集成数量 # 确保迭代次数足够用于集成 if self.num_epochs < 2 * self.num_ensembles: self.num_epochs = self.num_ensembles * 2 # 至少是集成数量的2倍 # 创建CatBoost回归器实例 self.model = CatBoostRegressor( iterations=self.num_epochs, # 迭代次数 learning_rate=self.lr, # 学习率 depth=self.depth, # 树深度 loss_function=self.loss_function, # 损失函数(支持不确定性) posterior_sampling=self.posterior_sampling, # 后验采样 verbose=self.verbose, # 是否显示训练信息 random_seed=self.random_seed, # 随机种子 allow_writing_files=False) # 禁止写入文件 def xtrans(self, Xc: FloatTensor, Xe: LongTensor) -> FeaturesData: """ 转换输入数据为CatBoost所需的格式 """ # 处理连续变量:转换为numpy float32格式 num_feature_data = Xc.numpy().astype(np.float32) if self.num_cont != 0 else None # 处理离散变量:转换为字符串格式(CatBoost要求) cat_feature_data = Xe.numpy().astype(str).astype(object) if self.num_enum != 0 else None # 返回CatBoost特征数据对象 return FeaturesData( num_feature_data=num_feature_data, # 数值特征 cat_feature_data=cat_feature_data) # 类别特征 def fit(self, Xc: FloatTensor, Xe: LongTensor, y: FloatTensor): """ 训练CatBoost模型 """ Xc, Xe, y = filter_nan(Xc, Xe, y, 'all') # 过滤NaN值 # 创建训练数据池 train_data = Pool( data=self.xtrans(Xc=Xc, Xe=Xe), # 转换特征数据 label=y.numpy().reshape(-1)) # 转换标签为一维数组 self.model.fit(train_data) # 训练模型 def predict(self, Xc: FloatTensor, Xe: LongTensor) -> (FloatTensor, FloatTensor): """ 使用CatBoost进行预测,返回均值和方差 """ test_data = Pool(data=self.xtrans(Xc=Xc, Xe=Xe)) # 创建测试数据池 # 使用虚拟集成进行预测,获取不确定性估计 preds = self.model.virtual_ensembles_predict( data=test_data, # 测试数据 prediction_type='TotalUncertainty', # 预测类型:总不确定性 virtual_ensembles_count=self.num_ensembles) # 虚拟集成数量 # 解析预测结果: # preds[:, 0] = 预测均值 # preds[:, 1] = 知识不确定性(模型不确定性) # preds[:, 2] = 偶然不确定性(数据噪声) mean = preds[:, 0] # 预测均值 var = preds[:, 1] + preds[:, 2] # 总方差 = 模型方差 + 数据噪声 # 返回PyTorch张量格式的结果 return torch.FloatTensor(mean.reshape([-1,1])), torch.FloatTensor(var.reshape([-1,1]))
DeepEnsemble:深度集成模型
class DeepEnsemble(BaseModel): """ 深度集成模型 - 通过多个独立训练的神经网络集成来估计不确定性 基于Lakshminarayanan et al. (2017) "Simple and Scalable Predictive Uncertainty Estimation using Deep Ensembles" """ # 模型能力标识 support_ts = True # 支持Thompson采样 support_grad = True # 支持梯度计算 support_multi_output = True # 支持多输出任务 support_warm_start = True # 支持热启动(从已有模型继续训练) def __init__(self, num_cont, num_enum, num_out, **conf): """ 初始化深度集成模型 参数: num_cont: 连续变量数量 num_enum: 离散变量数量 num_out: 输出维度 **conf: 配置参数 """ super().__init__(num_cont, num_enum, num_out, **conf) # 调用父类初始化 # 集成策略配置 self.bootstrap = self.conf.setdefault('bootstrap', False) # 是否使用自助采样创建数据多样性 self.rand_prior = self.conf.setdefault('rand_prior', False) # 是否使用随机先验网络 self.output_noise = self.conf.setdefault('output_noise', True) # 是否输出噪声估计 self.num_ensembles = self.conf.setdefault('num_ensembles', 5) # 集成模型数量 self.num_process = self.conf.setdefault('num_processes', 1) # 并行训练进程数 self.num_epochs = self.conf.setdefault('num_epochs', 500) # 每个模型的训练轮数 self.print_every = self.conf.setdefault('print_every', 50) # 训练信息打印频率 # 网络架构配置 self.num_layers = self.conf.setdefault('num_layers', 1) # 隐藏层数量 self.num_hiddens = self.conf.setdefault('num_hiddens', 128) # 隐藏层维度 self.l1 = self.conf.setdefault('l1', 1e-3) # L1正则化系数 self.batch_size = self.conf.setdefault('batch_size', 32) # 批大小 self.lr = self.conf.setdefault('lr', 5e-3) # 学习率 self.adv_eps = self.conf.setdefault('adv_eps', 0.) # 对抗训练扰动大小(未使用) self.verbose = self.conf.setdefault('verbose', False) # 是否显示训练详情 self.basenet_cls = self.conf.setdefault('basenet_cls', BaseNet) # 基础网络类 assert self.num_ensembles > 0 # 确保至少有一个集成模型 # 数据标准化器 self.xscaler = TorchMinMaxScaler((-1, 1)) # 输入标准化器,范围[-1,1] self.yscaler = TorchStandardScaler() # 输出标准化器,均值为0标准差为1 # 损失函数配置 self.loss = self.loss_likelihood if self.output_noise else self.loss_mse # 选择损失函数 self.loss_name = "NLL" if self.output_noise else "MSE" # 损失函数名称 self.models = None # 存储集成模型的列表 self.sample_idx = 0 # Thompson采样索引 self.noise_est = torch.zeros(self.num_out) # 估计的噪声水平 @property def fitted(self): """检查模型是否已经训练""" return self.models is not None @property def noise(self) -> FloatTensor: """返回估计的噪声水平""" return self.noise_est def fit(self, Xc_ : FloatTensor, Xe_ : LongTensor, y_ : FloatTensor, **fitting_conf): """ 训练深度集成模型 参数: Xc_: 连续特征 Xe_: 离散特征 y_: 目标变量 """ # 1. 数据预处理:过滤无效值 valid = torch.isfinite(y_).any(dim = 1) # 找到有效的目标值 Xc = Xc_[valid] if Xc_ is not None else None # 过滤连续特征 Xe = Xe_[valid] if Xe_ is not None else None # 过滤离散特征 y = y_[valid] # 过滤目标变量 # 2. 如果是首次训练,拟合标准化器 if self.models is None: self.fit_scaler(Xc, Xe, y) # 3. 转换数据格式 Xc, Xe, y = self.trans(Xc, Xe, y) # 4. 并行或串行训练集成模型 if self.num_process > 1: # 多进程并行训练 with Pool(self.num_process) as p: self.models = p.starmap(self.fit_one, [ (Xc.clone(), Xe.clone(), y.clone(), model_idx) for model_idx in range(self.num_ensembles) ]) else: # 单进程串行训练 self.models = [self.fit_one(Xc, Xe, y, i) for i in range(self.num_ensembles)] # 5. 验证所有模型都成功训练 assert None not in self.models self.sample_idx = 0 # 重置采样索引 # 6. 估计噪声水平(基于训练集误差) with torch.no_grad(): py, _ = self.predict(Xc_, Xe_) # 预测训练数据 err = (py - y_)[valid] # 计算预测误差 self.noise_est = (err**2).mean(dim = 0).detach().clone() # 计算MSE作为噪声估计 def predict(self, Xc_ : FloatTensor, Xe_ : LongTensor) -> (FloatTensor, FloatTensor): """ 使用集成模型进行预测 返回: mu: 预测均值 var: 预测方差 """ # 1. 转换输入数据 Xc, Xe = self.trans(Xc_, Xe_) # 2. 收集所有模型的预测 preds = torch.stack([self.models[i](Xc, Xe) for i in range(self.num_ensembles)]) # 3. 计算均值和方差 if not self.output_noise: # 如果不输出噪声,直接计算均值和方差 py = preds.mean(dim = 0) # 模型平均 ps2 = 1e-8 + preds.var(dim = 0, unbiased = False) # 模型方差,添加小值避免数值问题 else: # 如果输出噪声,分解为均值和方差部分 mu = preds[:, :, :self.num_out] # 均值预测 sigma2 = preds[:, :, self.num_out:] # 方差预测 py = mu.mean(dim = 0) # 集成均值 ps2 = mu.var(dim = 0, unbiased = False) + sigma2.mean(axis = 0) # 总方差 = 模型方差 + 平均数据方差 # 4. 将预测转换回原始尺度 return self.yscaler.inverse_transform(py), ps2 * self.yscaler.std**2 def sample_f(self): """ Thompson采样:随机选择一个集成模型进行预测 返回: 采样函数,用于贝叶斯优化 """ assert(self.fitted) # 确保模型已训练 idx = self.sample_idx # 获取当前采样索引 self.sample_idx = (self.sample_idx + 1) % self.num_ensembles # 循环更新索引 def f(Xc : FloatTensor, Xe : LongTensor) -> FloatTensor: """采样函数""" model = self.models[idx] # 选择第idx个模型 Xc, Xe = self.trans(Xc, Xe) # 转换输入 # 返回预测(只取均值部分,逆标准化) return self.yscaler.inverse_transform(model(Xc, Xe)[:, :self.num_out]) return f def fit_scaler(self, Xc : Tensor, Xe : Tensor, y : Tensor): """拟合数据标准化器""" if Xc is not None and Xc.shape[1] > 0: self.xscaler.fit(Xc) # 拟合连续变量标准化器 self.yscaler.fit(y) # 拟合目标变量标准化器 def trans(self, Xc : Tensor, Xe : Tensor, y : Tensor = None): """转换输入数据格式""" # 处理连续变量 if Xc is not None and Xc.shape[1] > 0: Xc_t = self.xscaler.transform(Xc) # 标准化连续变量 else: Xc_t = torch.zeros(Xe.shape[0], 0) # 创建空的连续变量张量 # 处理离散变量 if Xe is None: Xe_t = torch.zeros(Xc.shape[0], 0).long() # 创建空的离散变量张量 else: Xe_t = Xe.long() # 转换为long类型 # 处理目标变量(如果提供) if y is not None: y_t = self.yscaler.transform(y) # 标准化目标变量 return Xc_t, Xe_t, y_t else: return Xc_t, Xe_t def loss_mse(self, pred, target): """均方误差损失函数""" mask = torch.isfinite(target) # 找到有效目标值 return nn.MSELoss()(pred[mask], target[mask]) # 计算MSE def loss_likelihood(self, pred, target): """负对数似然损失函数(用于不确定性估计)""" mask = torch.isfinite(target) # 找到有效目标值 mu = pred[:, :self.num_out][mask] # 均值预测 sigma2 = pred[:, self.num_out:][mask] # 方差预测 # 高斯负对数似然:0.5 * (y-μ)²/σ² + 0.5 * log(σ²) loss = 0.5 * (target[mask] - mu)**2 / sigma2 + 0.5 * torch.log(sigma2) return torch.mean(loss) def fit_one(self, Xc, Xe, y, idx, **fitting_conf): """ 训练单个神经网络模型 参数: Xc, Xe, y: 训练数据 idx: 模型索引 """ torch.seed() # 设置随机种子(确保每个模型初始化不同) # 1. 准备数据加载器 dataset = TensorDataset(Xc, Xe, y) # 创建数据集 loader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True, drop_last=y.shape[0] > self.batch_size) # 创建数据加载器 # 2. 初始化模型(支持热启动) if self.models is not None and len(self.models) == self.num_ensembles: model = deepcopy(self.models[idx]) # 热启动:复制已有模型 else: model = self.basenet_cls(self.num_cont, self.num_enum, self.num_out, **self.conf) # 创建新模型 # 3. 设置优化器 opt = torch.optim.Adam(model.parameters(), lr=self.lr) model.train() # 设置训练模式 # 4. 训练循环 for epoch in range(self.num_epochs): epoch_loss = 0 # 累计损失 for bxc, bxe, by in loader: # 遍历批次 # 前向传播 py = model(bxc, bxe) # 计算损失 data_loss = self.loss(py, by) # 数据损失 reg_loss = 0. # 正则化损失 for p in model.parameters(): reg_loss += self.l1 * p.abs().sum() / (y.shape[0] * y.shape[1]) # L1正则化 loss = data_loss + reg_loss # 总损失 # 反向传播 opt.zero_grad() loss.backward() opt.step() # 累计批次损失 epoch_loss += data_loss * bxc.shape[0] # 打印训练信息 if epoch % self.print_every == 0: if self.verbose: print("Epoch %d, %s loss = %g" % (epoch, self.loss_name, epoch_loss / Xc.shape[0]), flush=True) model.eval() # 设置评估模式 return model class BaseNet(nn.Module): """ 基础神经网络架构 - 支持不确定性估计和随机先验 """ def __init__(self, num_cont, num_enum, num_out, **conf): super().__init__() # 基础配置 self.num_cont = num_cont self.num_enum = num_enum self.num_out = num_out self.noise_lb = conf.get('noise_lb', 1e-4) # 噪声下界(避免除零) self.num_layers = conf.get('num_layers', 1) # 隐藏层数量 self.num_hiddens = conf.get('num_hiddens', 128) # 隐藏层维度 self.output_noise = conf.get('output_noise', True) # 是否输出噪声估计 self.rand_prior = conf.get('rand_prior', False) # 是否使用随机先验 self.act = conf.get('act', nn.ReLU()) # 激活函数 # 计算有效输入维度 self.eff_dim = num_cont # 初始化为连续变量维度 # 离散变量处理 if self.num_enum > 0: assert 'num_uniqs' in conf, "num_uniqs not in algorithm configuration" num_uniqs = conf['num_uniqs'] # 每个离散变量的类别数 enum_trans = conf.get('enum_trans', 'embedding') # 离散变量转换方式 if enum_trans == 'embedding': self.enum_layer = EmbTransform(num_uniqs) # 嵌入转换 elif enum_trans == 'onehot': self.enum_layer = OneHotTransform(num_uniqs) # 独热编码 else: raise RuntimeError(f'Unknown enum processing type {enum_trans}, can only be [embedding|onehot]') self.eff_dim += self.enum_layer.num_out # 增加离散变量编码后的维度 # 构建隐藏层 self.hidden = construct_hidden(self.eff_dim, self.num_layers, self.num_hiddens, act=self.act) # 均值输出层 self.mu = nn.Linear(self.num_hiddens, self.num_out) # 方差输出层(如果启用不确定性估计) if self.output_noise: self.sigma2 = nn.Sequential( nn.Linear(self.num_hiddens, self.num_out), # 线性层 nn.Softplus() # 确保输出为正数 ) # 随机先验网络(如果启用) if self.rand_prior: # 基于Osband et al. (2018) "Randomized Prior Functions for Deep Reinforcement Learning" self.prior_net = construct_hidden(self.eff_dim, self.num_layers, self.num_hiddens, act=self.act) self.prior_net.add_module('prior_net_out', nn.Linear(self.num_hiddens, self.num_out)) # 参数初始化 for n, p in self.named_parameters(): if "bias" in n: nn.init.zeros_(p) # 偏置初始化为0 else: nn.init.xavier_uniform_(p, gain=nn.init.calculate_gain('relu')) # 权重使用Xavier初始化 def xtrans(self, Xc : FloatTensor, Xe : LongTensor) -> FloatTensor: """转换输入特征为统一格式""" Xall = Xc.clone() if self.num_cont > 0 else torch.zeros(Xe.shape[0], 0) # 处理连续变量 if self.num_enum > 0: # 处理离散变量并拼接 Xall = torch.cat([Xall, self.enum_layer(Xe)], dim=1) return Xall def forward(self, Xc : FloatTensor, Xe : LongTensor) -> FloatTensor: """前向传播""" # 1. 特征转换 inputs = self.xtrans(Xc, Xe) # 2. 随机先验输出(如果启用) prior_out = 0. if self.rand_prior: with torch.no_grad(): # 随机先验网络不参与梯度计算 prior_out = self.prior_net(inputs).detach() # 3. 主网络前向传播 hidden = self.hidden(inputs) # 隐藏层 mu = self.mu(hidden) + prior_out # 均值输出 + 先验 # 4. 构建最终输出 if self.output_noise: # 输出均值和方差:[mu, sigma2] sigma2_output = self.noise_lb + self.sigma2(hidden) # 方差输出,确保大于下界 out = torch.cat((mu, sigma2_output), dim=1) else: # 只输出均值 out = mu return out
pSGLDEnsemble:基于预处理随机梯度Langevin动力学优化器的深度集成模型
class SGLD(torch.optim.SGD): """ 随机梯度Langevin动力学 (Stochastic Gradient Langevin Dynamics) 一种贝叶斯采样优化器,在SGD基础上添加高斯噪声,用于从后验分布中采样 基于: Welling & Teh (2011) "Bayesian Learning via Stochastic Gradient Langevin Dynamics" """ def __init__(self, params, lr, factor = 1., pretrain_step = 0, **kw_args): """ 初始化SGLD优化器 参数: params: 要优化的模型参数 lr: 学习率 factor: 噪声缩放因子。如果损失函数是nn.MSELoss(reduction='mean'),则factor应为1./N,N是数据量 pretrain_step: 预训练步数,在此步数之前只使用SGD(不添加噪声) **kw_args: 传递给父类SGD的其他参数 """ super().__init__(params, lr, **kw_args) # 调用父类SGD初始化 self.factor = factor # 噪声缩放因子 self.pretrain_step = pretrain_step # 预训练步数(纯SGD阶段) self.n_step = 0 # 当前训练步数计数器 @torch.no_grad() # 不计算梯度,因为这是优化步骤 def step(self, **kw_args): """ 执行一步优化(包含Langevin噪声) """ super().step(**kw_args) # 首先执行标准的SGD步骤 self.n_step += 1 # 更新步数计数器 # 遍历所有参数组 for group in self.param_groups: # 遍历组内的每个参数 for p in group['params']: if p.grad is None: # 如果参数没有梯度,跳过 continue # 如果超过了预训练步数,开始添加Langevin噪声 if self.n_step > self.pretrain_step: lr = group['lr'] # 获取当前学习率 noise_var = 2. * lr # 计算噪声方差:2 * 学习率 noise_std = np.sqrt(noise_var) # 计算噪声标准差 # 添加高斯噪声:factor * √(2*lr) * N(0,1) p.add_(self.factor * noise_std * torch.randn_like(p)) class pSGLD(torch.optim.RMSprop): """ 预处理随机梯度Langevin动力学 (preconditioned Stochastic Gradient Langevin Dynamics) SGLD的改进版本,使用RMSProp的预处理矩阵来调整噪声尺度 基于: Li et al. (2016) "Preconditioned Stochastic Gradient Langevin Dynamics for Deep Neural Networks" """ def __init__(self, params, factor = 1., pretrain_step = 0, lr=1e-2, alpha=0.99, eps=1e-8, weight_decay=0): """ 初始化pSGLD优化器 参数: params: 要优化的模型参数 factor: 噪声缩放因子 pretrain_step: 预训练步数 lr: 学习率 alpha: RMSProp的平滑常数 eps: 数值稳定性常数 weight_decay: 权重衰减系数 """ # 调用父类RMSprop初始化(禁用动量和中心化) super().__init__(params, lr=lr, alpha=alpha, eps=eps, weight_decay=weight_decay, momentum=0, centered=False) self.factor = factor # 噪声缩放因子 self.pretrain_step = pretrain_step # 预训练步数 self.n_step = 0 # 当前训练步数计数器 @torch.no_grad() def step(self, closure = None): """ 执行一步优化(包含预处理Langevin噪声) """ super().step(closure) # 执行标准的RMSprop步骤 self.n_step += 1 # 更新步数计数器 # 遍历所有参数组 for group in self.param_groups: # 遍历组内的每个参数 for p in group['params']: if p.grad is None: # 如果参数没有梯度,跳过 continue # 如果超过了预训练步数,开始添加预处理Langevin噪声 if self.n_step > self.pretrain_step: # 获取RMSProp状态:平方梯度的指数移动平均 square_avg = self.state[p]['square_avg'] lr = group['lr'] # 学习率 eps = group['eps'] # 数值稳定性常数 # 计算预处理矩阵:G = √(square_avg + eps) avg = square_avg.sqrt().add_(eps) # 计算预处理后的噪声方差:2 * lr / G noise_var = 2 * lr / avg # 添加预处理高斯噪声:factor * √(2*lr/G) * N(0,1) p.add_(self.factor * noise_var.sqrt() * torch.randn_like(p)) class pSGLDEnsemble(DeepEnsemble): """ 基于pSGLD的深度集成模型 使用pSGLD优化器训练每个网络,从后验分布中采样多个模型 """ def __init__(self, num_cont, num_enum, num_out, **conf): """ 初始化pSGLD集成模型 参数: num_cont: 连续变量数量 num_enum: 离散变量数量 num_out: 输出维度 **conf: 配置参数 """ super().__init__(num_cont, num_enum, num_out, **conf) # 调用父类DeepEnsemble初始化 self.factor = conf.get('factor') # 噪声缩放因子 self.pretrain_epochs = conf.get('pretrain_epochs', 50) # 预训练轮数 def fit_one(self, Xc, Xe, y, idx, **fitting_conf): """ 使用pSGLD优化器训练单个神经网络模型 参数: Xc, Xe, y: 训练数据 idx: 模型索引 """ torch.seed() # 设置随机种子(确保每个模型初始化不同) # 1. 准备数据加载器 dataset = TensorDataset(Xc, Xe, y) # 创建数据集 # 创建数据加载器,当数据量大于批次大小时丢弃最后不完整的批次 loader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True, drop_last=y.shape[0] > self.batch_size) # 2. 初始化模型(支持热启动) if self.models is not None and len(self.models) == self.num_ensembles: model = deepcopy(self.models[idx]) # 热启动:复制已有模型 else: # 创建新的基础网络 model = self.basenet_cls(self.num_cont, self.num_enum, self.num_out, **self.conf) # 3. 配置pSGLD优化器 # 计算噪声因子:默认为1/数据元素总数,确保正确的贝叶斯缩放 factor = 1. / y.numel() if self.factor is None else self.factor # 计算预训练步数:轮数 * 每轮批次数 pretrain_steps = self.pretrain_epochs * y.shape[0] // self.batch_size # 创建pSGLD优化器 opt = pSGLD(model.parameters(), lr = self.lr, # 学习率 factor = factor, # 噪声缩放因子 pretrain_step = pretrain_steps) # 预训练步数 # 4. 设置模型为训练模式 model.train() # 5. 训练循环 for epoch in range(self.num_epochs): epoch_loss = 0 # 累计损失 for bxc, bxe, by in loader: # 遍历数据批次 # 前向传播 py = model(bxc, bxe) # 计算损失 data_loss = self.loss(py, by) # 数据损失(似然或MSE) reg_loss = 0. # 正则化损失 # 计算L1正则化 for p in model.parameters(): reg_loss += self.l1 * p.abs().sum() / (y.shape[0] * y.shape[1]) # 总损失 = 数据损失 + 正则化损失 loss = data_loss + reg_loss # 反向传播和优化 opt.zero_grad() # 清零梯度 loss.backward() # 反向传播计算梯度 opt.step() # pSGLD优化步骤(可能添加Langevin噪声) # 注意:对抗训练未使用(代码注释XXX) # 累计批次损失(仅数据损失部分) epoch_loss += data_loss * bxc.shape[0] # 打印训练信息 if epoch % self.print_every == 0: if self.verbose: print("Epoch %d, %s loss = %g" % (epoch, self.loss_name, epoch_loss / Xc.shape[0]), flush=True) # 6. 设置模型为评估模式 model.eval() return model
MCBNEnsemble:蒙特卡洛批量归一化集成
class MLPBN(BaseNet): """ 带批量归一化的多层感知机 (Multilayer Perceptron with Batch Normalization) BaseNet的扩展版本,在隐藏层中添加批量归一化层 """ def __init__(self, num_cont, num_enum, num_out, **conf): """ 初始化MLPBN网络 参数: num_cont: 连续变量数量 num_enum: 离散变量数量 num_out: 输出维度 **conf: 配置参数 """ super().__init__(num_cont, num_enum, num_out, **conf) # 调用父类BaseNet初始化 # 配置批量归一化的位置:在激活函数之前或之后 self.bn_before_act = conf.get('bn_before_act', True) # True: BN -> Act, False: Act -> BN # 首先构建标准的隐藏层(没有BN) self.hidden = construct_hidden(self.eff_dim, self.num_layers, self.num_hiddens) net_list = list(self.hidden) # 将Sequential转换为列表以便修改 # 根据配置在适当位置插入批量归一化层 if self.bn_before_act: # 模式:Linear -> BatchNorm -> Activation # 处理除最后两层外的所有层(假设最后两层是最后一个Linear和可能的Activation) net_list = sum([ # 如果是线性层,保持不变;如果是激活函数,在前面添加BatchNorm [ele] if isinstance(ele, nn.Linear) else [nn.BatchNorm1d(self.num_hiddens), ele] for ele in net_list[:-2] # 处理除最后两层外的所有层 ], []) + net_list[-2:] # 保留最后两层不变 else: # 模式:Linear -> Activation -> BatchNorm # 处理除最后两层外的所有层 net_list = sum([ # 如果是线性层,保持不变;如果是激活函数,在后面添加BatchNorm [ele] if isinstance(ele, nn.Linear) else [ele, nn.BatchNorm1d(self.num_hiddens)] for ele in net_list[:-2] # 处理除最后两层外的所有层 ], []) + net_list[-2:] # 保留最后两层不变 # 重新构建隐藏层为Sequential self.hidden = nn.Sequential(*net_list) class MCBNEnsemble(DeepEnsemble): """ 蒙特卡洛批量归一化集成 (Monte Carlo Batch Normalization Ensemble) 实现论文'Bayesian Uncertainty Estimation for Batch Normalized Deep Networks'中的思想 http://proceedings.mlr.press/v80/teye18a/teye18a.pdf 核心思想:利用批量归一化层在测试时的随机性来产生模型多样性, 而不需要训练多个独立的模型,从而高效地估计不确定性 """ def __init__(self, num_cont, num_enum, num_out, **conf): """ 初始化MCBN集成模型 参数: num_cont: 连续变量数量 num_enum: 离散变量数量 num_out: 输出维度 **conf: 配置参数 """ super().__init__(num_cont, num_enum, num_out, **conf) # 调用父类DeepEnsemble初始化 # 使用MLPBN作为基础网络类(包含批量归一化) self.basenet_cls = MLPBN # 推理时的批次大小,用于设置BN层统计量 self.inf_batch_size = self.conf.get('inf_batch_size', self.batch_size) def fit_one(self, Xc, Xe, y, idx, **fitting_conf): """ 训练单个模型,并设置批量归一化层的统计量 参数: Xc, Xe, y: 训练数据 idx: 模型索引 """ # 1. 使用父类方法训练模型 model = super().fit_one(Xc, Xe, y, idx, **fitting_conf) # 2. 将模型设置为训练模式(为了使用BN的随机性) model.train() # 3. 使用训练数据的一个批次来设置BN层的运行统计量 # 注意:这里使用训练数据的一个随机子集来初始化BN统计量 num_data = y.shape[0] # 数据总量 # 随机选择一批数据索引 batch_idx = np.random.choice(num_data, self.inf_batch_size) # 获取批次数据 bxc = Xc[batch_idx] bxe = Xe[batch_idx] # 4. 配置所有批量归一化层使用当前批次的统计量 for layer in model.hidden: if isinstance(layer, nn.BatchNorm1d): # 将momentum设置为1.0,意味着完全使用当前批次的统计量 # 而不是累积的运行平均值 layer.momentum = 1.0 # 5. 前向传播一次,使用选定的批次数据来设置BN层的统计量 model(bxc, bxe) # 6. 将模型设置回评估模式 model.eval() return model
MaskedDeepEnsemble:基于掩码的深度集成模型
class MaskedBaseNet(BaseNet): """ 掩码基础网络 - 支持条件参数和层次化参数空间的神经网络 用于处理具有条件依赖关系的参数,例如当某些参数的存在依赖于其他参数的值时 """ def __init__(self, num_cont, num_enum, num_out, **conf): """ 初始化掩码基础网络 参数: num_cont: 连续变量数量 num_enum: 离散变量数量 num_out: 输出维度 **conf: 配置参数,包含space和stages信息 """ super().__init__(num_cont, num_enum, num_out, **conf) # 调用父类BaseNet初始化 # 获取设计空间和阶段定义 self.space = conf.get('space', None) # 参数空间定义,包含参数依赖关系 self.stages = conf.get('stages', None) # 参数阶段列表,定义参数层次结构 def xtrans(self, Xc_ : FloatTensor, Xe_ : LongTensor) -> FloatTensor: """ 扩展的特征转换方法,处理条件参数掩码 参数: Xc_: 连续特征张量 Xe_: 离散特征张量 返回: 应用掩码后的特征张量 """ # 如果没有定义空间或阶段,回退到标准特征转换 if self.space is None or self.stages is None: return super().xtrans(Xc_, Xe_) num_data = Xe_.shape[0] # 数据样本数量 Xc = Xc_.clone() # 克隆连续特征(避免修改原始数据) Xe = Xe_.clone() # 克隆离散特征(避免修改原始数据) # 首先转换离散特征(使用父类的枚举层) Xe_trans = self.enum_layer(Xe) # 将标准化后的特征转换回原始参数空间,以便分析依赖关系 params = self.space.inverse_transform(Xc, Xe) # 处理阶段依赖关系:如果前一阶段为'null',则后续阶段也设为'null' for i, stage in enumerate(self.stages): stage_null = params[stage] == 'null' # 找到当前阶段为'null'的样本 rest_stages = self.stages[i+1:] # 获取后续阶段列表 # 将后续阶段中对应样本设为'null' params.loc[stage_null, rest_stages] = 'null' # 处理连续变量的条件掩码 if self.space.numeric_names: # 遍历所有连续变量 for i, name in enumerate(self.space.numeric_names): # 解析参数名获取阶段和依赖信息 # 格式假设:dependency@value#stage stage = name.split('#')[-1] # 提取阶段名 depend = name.split('#')[0].split('@')[-1] # 提取依赖条件 # 创建有效性掩码:只有当依赖条件满足时该参数才有效 valid = torch.FloatTensor((params[stage].values == depend).astype(float)) # 应用掩码:无效的参数置为0 Xc[:, i] *= valid else: # 如果没有连续变量,创建空的连续特征张量 Xc = torch.zeros(num_data, 0) # 处理离散变量的条件掩码 start_idx = 0 # 跟踪当前在编码后特征中的起始位置 # 遍历所有离散变量 for i, name in enumerate(self.space.enum_names): # 如果当前变量本身就是一个阶段变量,跳过掩码处理 if name in self.stages: # 更新起始索引(跳过这个阶段变量的编码维度) start_idx += self.enum_layer.num_out_list[i] else: # 解析非阶段离散变量的依赖关系 stage = name.split('#')[-1] # 提取阶段名 depend = name.split('#')[0].split('@')[-1] # 提取依赖条件 # 创建有效性掩码 valid = torch.FloatTensor((params[stage].values == depend).astype(float)).view(-1, 1) # 计算当前变量在编码特征中的结束位置 end_idx = start_idx + self.enum_layer.num_out_list[i] # 应用掩码:将无效的离散特征编码置为0 Xe_trans[:, start_idx:end_idx] *= valid # 更新起始索引到下一个变量 start_idx = end_idx # 合并处理后的连续和离散特征 return torch.cat([Xc, Xe_trans], axis = 1) class MaskedDeepEnsemble(DeepEnsemble): """ 掩码深度集成模型 - 使用MaskedBaseNet的深度集成 专门设计用于处理具有条件依赖关系的参数空间 """ def __init__(self, num_cont, num_enum, num_out, **conf): """ 初始化掩码深度集成模型 参数: num_cont: 连续变量数量 num_enum: 离散变量数量 num_out: 输出维度 **conf: 配置参数 """ super().__init__(num_cont, num_enum, num_out, **conf) # 调用父类DeepEnsemble初始化 # 获取阶段定义和参数空间 self.stages = self.conf.get('stages', None) # 参数阶段列表 self.space = self.conf.get('space', None) # 参数空间定义 # 使用MaskedBaseNet作为基础网络类 self.basenet_cls = MaskedBaseNet
FeDeepEnsemble:特征选择深度集成模型 (Feature Selection Deep Ensemble)
class FeNet(BaseNet): """ 特征选择网络 (Feature Selection Network) 在基础网络基础上增加特征选择层,自动学习特征重要性 基于STG (Stochastic Gate) 或类似的特征选择方法 """ def __init__(self, num_cont, num_enum, num_out, **conf): """ 初始化特征选择网络 参数: num_cont: 连续变量数量 num_enum: 离散变量数量 num_out: 输出维度 **conf: 配置参数 """ super().__init__(num_cont, num_enum, num_out, **conf) # 调用父类BaseNet初始化 # 获取特征选择层类型,默认为'stg'(随机门) self.fe_layer = get_fe_layer(conf.get('fe_layer', 'stg')) self.temperature = conf.get('temperature') # Gumbel-Softmax温度参数 # 初始化特征选择层 if self.temperature: # 如果指定了温度参数,传递给特征选择层 self.feature_select = self.fe_layer(self.eff_dim, self.temperature) else: # 否则使用默认温度 self.feature_select = self.fe_layer(self.eff_dim) def forward(self, xc, xe): """ 前向传播,包含特征选择步骤 参数: xc: 连续特征 xe: 离散特征 返回: 网络输出(均值和方差) """ # 注意:我们忽略随机先验网络,因为它会破坏不相关特征的选择 # 1. 特征转换和选择 inputs = self.feature_select(self.xtrans(xc, xe)) # 2. 隐藏层前向传播 hidden = self.hidden(inputs) # 3. 均值输出 mu = self.mu(hidden) # 4. 构建最终输出(包含方差估计如果启用) if self.output_noise: # 输出均值和方差:[mu, sigma2] sigma2_output = self.noise_lb + self.sigma2(hidden) out = torch.cat((mu, sigma2_output), dim=1) else: # 只输出均值 out = mu return out class FeDeepEnsemble(DeepEnsemble): """ 特征选择深度集成模型 (Feature Selection Deep Ensemble) 使用特征选择网络的深度集成,自动学习特征重要性 """ support_grad = False # 禁用梯度计算(因为特征选择可能不可微) def __init__(self, num_cont, num_enum, num_out, **conf): """ 初始化特征选择深度集成模型 参数: num_cont: 连续变量数量 num_enum: 离散变量数量 num_out: 输出维度 **conf: 配置参数 """ super().__init__(num_cont, num_enum, num_out, **conf) # 调用父类DeepEnsemble初始化 # 使用FeNet作为基础网络类 self.basenet_cls = FeNet # 特征选择相关配置 self.mask_reg = conf.get('mask_reg', 0.1) # 掩码正则化系数 self.end_temp = conf.get('end_temp', 0.1) # 退火结束温度 self.start_temp = conf.get('start_temp', 1.0) # 退火开始温度 self.anneal_base = conf.get('anneal_base', 0.99) # 退火基数 def fit_one(self, Xc, Xe, y, idx): """ 训练单个特征选择网络模型 参数: Xc, Xe, y: 训练数据 idx: 模型索引 """ torch.seed() # 设置随机种子 # 1. 准备数据加载器 dataset = TensorDataset(Xc, Xe, y) loader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True, drop_last=y.shape[0] > self.batch_size) # 2. 初始化模型(支持热启动) if self.models is not None and len(self.models) == self.num_ensembles: model = deepcopy(self.models[idx]) # 热启动 else: model = self.basenet_cls(self.num_cont, self.num_enum, self.num_out, **self.conf) # 3. 打印第一个模型的架构(如果启用详细模式) if idx == 0 and self.verbose: print(model, flush=True) # 4. 设置优化器 opt = torch.optim.Adam(model.parameters(), lr=self.lr) model.train() # 设置训练模式 # 5. 训练循环 for epoch in range(self.num_epochs): epoch_loss = 0 # 累计数据损失 # 6. 温度退火(如果不是随机门,则应用退火) if not isinstance(model.feature_select, StochasticGate): # 计算当前温度:start_temp * anneal_base^epoch,限制不低于end_temp temperature = torch.tensor(self.start_temp * self.anneal_base ** epoch).clamp(self.end_temp) model.feature_select.temperature = temperature # 7. 批次训练 for bxc, bxe, by in loader: # 前向传播 py = model(bxc, bxe) # 计算数据损失 data_loss = self.loss(py, by) # 计算L1正则化损失(只对权重参数) reg_loss = 0. for n, p in model.named_parameters(): if 'weight' in n: # 只对权重参数应用L1正则化 reg_loss += self.l1 * p.abs().sum() / (y.shape[0] * y.shape[1]) # 计算掩码正则化损失(鼓励特征选择稀疏性) mask_loss = 0. if Xc.shape[1] > 0: # 如果有连续特征 # 使用掩码范数作为正则化项,鼓励选择少量特征 mask_loss = self.mask_reg * model.feature_select.mask_norm / (y.shape[0] * y.shape[1]) # 总损失 = 数据损失 + L1正则化 + 掩码正则化 loss = data_loss + reg_loss + mask_loss # 反向传播和优化 opt.zero_grad() loss.backward() opt.step() # 9. 设置模型为评估模式 model.eval() return model
GumbelDeepEnsemble:
class GumbelSelectionLayer(nn.Module): """ Gumbel-Softmax特征选择层 使用Gumbel-Softmax重参数化技巧实现可微分的特征选择 基于论文: Jang et al. (2017) "Categorical Reparameterization with Gumbel-Softmax" """ def __init__(self, in_features, out_features, temperature = 0.1): """ 初始化Gumbel选择层 参数: in_features: 输入特征维度 out_features: 输出特征维度(选择的特征数量) temperature: Gumbel-Softmax温度参数,控制选择的随机性 """ super().__init__() self.in_features = in_features # 输入特征数量 self.out_features = out_features # 输出特征数量(要选择的特征数) # 初始化选择logits参数(决定每个特征被选择的概率) self.logits = nn.Parameter(torch.zeros(out_features, in_features)) self.temperature = temperature # 温度参数:低温→接近离散选择,高温→更随机 @property def dist(self): """创建Gumbel-Softmax分布""" return RelaxedOneHotCategorical(temperature = self.temperature, logits = self.logits) def forward(self, x): """ 前向传播:使用Gumbel-Softmax采样特征权重 参数: x: 输入特征张量 返回: 特征选择后的输出 """ # TODO: 分别处理训练和评估模式 # 从Gumbel-Softmax分布中采样权重(可微分) w = self.dist.rsample() # 应用线性变换(特征选择) out = F.linear(x, weight = w) return out class GumbelNet(BaseNet): """ 基于Gumbel-Softmax的特征选择网络 自动学习哪些连续特征对预测最重要 """ def __init__(self, num_cont, num_enum, num_out, **conf): """ 初始化GumbelNet 参数: num_cont: 连续变量数量 num_enum: 离散变量数量 num_out: 输出维度 **conf: 配置参数 """ super().__init__(num_cont, num_enum, num_out, **conf) # 注意:这里我们只对连续参数应用特征选择,为了更快的实现 # 如果要处理离散变量,需要处理枚举转换器(嵌入或独热编码), # 这不难但需要更多工作... self.reduced_dim = conf.setdefault('reduced_dim', num_cont // 2 + 1) # 降维后的特征数 # 创建特征选择层 self.feature_select = GumbelSelectionLayer(self.num_cont, self.reduced_dim) if self.num_cont > 0: # 注意:即使没有数值参数,我们仍然保留`feature_select`, # 因为在`fit_one`中,温度仍然会被退火 # 更新有效维度:减去原始连续维度,加上降维后的维度 self.eff_dim = (self.eff_dim - self.num_cont) + self.reduced_dim # 重新构建隐藏层以适应新的维度 self.hidden = construct_hidden(self.eff_dim, self.num_layers, self.num_hiddens) def forward(self, xc, xe): """ 前向传播:先进行特征选择,然后通过标准网络 参数: xc: 连续特征 xe: 离散特征 返回: 网络输出 """ if self.num_cont > 0: # 对连续特征进行选择 xc = self.feature_select(xc) # 调用父类的前向传播 return super().forward(xc, xe) class GumbelDeepEnsemble(DeepEnsemble): """ Gumbel深度集成模型 使用GumbelNet作为基础网络的深度集成,实现自动特征选择 """ support_grad = False # 禁用梯度支持(因为特征选择引入随机性) def __init__(self, num_cont, num_enum, num_out, **conf): """ 初始化Gumbel深度集成模型 参数: num_cont: 连续变量数量 num_enum: 离散变量数量 num_out: 输出维度 **conf: 配置参数 """ super().__init__(num_cont, num_enum, num_out, **conf) # 使用GumbelNet作为基础网络 self.basenet_cls = GumbelNet def fit_one(self, Xc, Xe, y, idx): """ 训练单个GumbelNet模型 参数: Xc, Xe, y: 训练数据 idx: 模型索引 返回: 训练好的模型 """ torch.seed() # 设置随机种子确保可重复性 # 准备数据加载器 dataset = TensorDataset(Xc, Xe, y) loader = DataLoader(dataset, batch_size = self.batch_size, shuffle = True) # 初始化模型(支持热启动) if self.models is not None and len(self.models) == self.num_ensembles: model = deepcopy(self.models[idx]) else: model = self.basenet_cls(self.num_cont, self.num_enum, self.num_out, **self.conf) # 设置优化器 opt = torch.optim.Adam(model.parameters(), lr = self.lr) model.train() # 设置训练模式 # 训练循环 for epoch in range(self.num_epochs): epoch_loss = 0 # 累计损失 # 温度退火:随着训练进行逐渐降低温度 # 从较高温度开始(更随机),逐渐降低到较低温度(更确定) temperature = 0.8**(epoch) + 0.1 # 指数退火公式 model.feature_select.temperature = temperature # 更新温度 for bxc, bxe, by in loader: # 遍历批次数据 # 前向传播 py = model(bxc, bxe) # 计算损失 data_loss = self.loss(py, by) # 数据损失 reg_loss = 0. # 正则化损失 # L1正则化(只对权重参数,不包括偏置) for n, p in model.named_parameters(): if 'weight' in n: reg_loss += self.l1 * p.abs().sum() / (y.shape[0] * y.shape[1]) mask_loss = 0. # 掩码损失(当前未使用) # 总损失 = 数据损失 + 正则化损失 loss = data_loss + reg_loss # 反向传播和优化 opt.zero_grad() loss.backward() opt.step() # 累计批次损失 epoch_loss += data_loss * bxc.shape[0] # 打印训练信息 if epoch % self.print_every == 0: if self.verbose: print("Epoch %d, %s loss = %g, temperature = %.3f" % (epoch, self.loss_name, epoch_loss / Xc.shape[0], temperature), flush = True) model.eval() # 设置评估模式 return model
贝叶斯优化代理模型对比分析
| 模型名称 | 数学实现原理 | 优势 | 劣势 | 训练及预测开销 | 大模型参数调优场景适配度(9参数,4布尔值场景) |
|---|---|---|---|---|---|
| GP(标准高斯过程) | 基于随机过程理论,假设函数值服从多元高斯先验,通过核函数(如Matern、RBF)定义协方差结构。后验分布通过贝叶斯定理推导,使用精确推断计算边际似然。核超参数通过梯度-based方法最大化对数边际似然估计。数学形式为:f(x) ~ GP(m(x), k(x,x')),其中m为均值函数,k为协方差核函数。 | 提供解析的不确定性量化,预测输出为完整概率分布而非点估计。理论保证回归一致性,对小样本数据效率极高。核函数可融入领域知识,ARD核能自动学习特征相关性。适用于数学建模严谨且需可靠不确定性估计的场景。 | 计算复杂度随样本数立方增长(O(n³)),万级以上样本不可行。核函数选择对性能影响显著但无普适准则。高维空间中出现协方差矩阵病态问题,需要正则化处理。假设数据服从高斯先验,对非高斯、多模态分布建模能力有限。 | 训练阶段需计算协方差矩阵逆和行列式,复杂度O(n³)。存储需求O(n²)。预测阶段均值和方差计算分别为O(n)和O(n²)。实际应用中,千样本量训练需秒级,万样本需小时级,超过十万样本基本不可行。 | 极高适配度。在9维参数空间下计算复杂度完全可控,O(n³)的瓶颈不会显现。可通过设计混合核函数(如对连续参数使用Matern核,对布尔参数使用Hamming核或类别核)精确建模参数间关系。布尔参数可编码为0/1连续变量或使用专门的类别核处理。小样本效率高的特性在此场景下充分发挥,通常50-200个评估点即可获得良好代理模型。能提供高质量的不确定性估计指导贝叶斯优化。 |
| GPyGP(输入扭曲的高斯过程) | 在标准GP前引入输入扭曲函数φ(x),将原始输入映射到潜空间:f(x) = GP(m(φ(x)), k(φ(x), φ(x')))。常用KumarWarping等单调可微扭曲函数,通过参数化函数族增强非平稳性建模能力。联合优化核参数与扭曲函数参数。 | 能有效建模目标函数的非平稳特性,解决标准GP在全局平稳性假设下的局限性。扭曲函数自动调整输入空间尺度,对参数敏感度不均问题有显著改善。在响应曲面存在突变、不同区域光滑度差异大时表现优越。 | 扭曲函数引入额外超参数,大幅增加模型选择复杂度。优化过程易陷入局部极值,需要谨慎初始化。扭曲函数形式需要先验指定,错误选择可能导致性能下降。计算开销比标准GP增加30%-50%。 | 训练复杂度O(n³ + p³),p为扭曲函数参数维度。预测阶段需额外计算扭曲变换,但渐进复杂度与标准GP相同。实际运行时间比标准GP增加25%-40%,取决于扭曲函数复杂性。 | 中等适配度。在低维空间中输入扭曲的优势有限,反而因额外超参数增加调优复杂度。当布尔参数与连续参数存在复杂交互时可能略有帮助,但通常标准GP已足够。扭曲函数在9维空间中的表达能力与计算开销不成正比。建议仅在先验知识表明响应曲面存在强烈非平稳性时使用。 |
| RF(随机森林回归) | 基于集成学习理论,构建多棵决策树,每棵树使用bootstrap样本和随机特征子集训练。预测为所有树的平均:ŷ = (1/B)∑f_b(x)。不确定性通过树间预测方差估计,提供一种非参数不确定性量化方法。 | 训练高效,支持并行化,能处理高维特征且无需特征缩放。对异常值和噪声鲁棒性强,能自然处理混合类型特征。通过特征重要性排序提供可解释性。在中等数据规模下平衡效率与精度。 | 不确定性估计为启发式,缺乏严格概率基础。外推能力差,超出训练数据范围的预测不可靠。树结构导致预测函数不连续,可能影响优化收敛。密集数据下可能过拟合,泛化能力受限。 | 训练复杂度O(B·n·log(n)·d),B为树数量,d为特征数。预测复杂度O(B·深度)。内存占用O(B·节点数)。实际千样本量训练秒级完成,万样本分钟级,可很好扩展到百万样本。 | 高适配度。天然支持混合类型参数,布尔变量可直接作为类别特征处理无需特殊编码。在9维空间中树模型能有效捕获参数交互效应。训练速度快,适合需要快速迭代的场景。不确定性估计虽为启发式但在实践中足够指导优化。当评估预算有限(<1000次)时表现可靠。 |
| SVGP(稀疏变分高斯过程) | 使用变分推断引入诱导点u = f(Z)作为潜变量,近似真实后验p(f|X,y) ≈ q(f) = ∫p(f|u)q(u)du。通过最大化证据下界(ELBO)学习变分分布q(u)~N(m,S)和核超参数。将复杂度从O(n³)降至O(m²n),m为诱导点数。 | 突破GP计算瓶颈,可处理十万级数据。保持GP良好校准的不确定性估计能力。随机变分推断支持小批量训练,适合内存受限场景。通过诱导点位置优化自适应捕捉函数重要区域。 | 变分近似引入偏差,预测不确定性可能被低估。诱导点数量和位置选择敏感,影响模型容量和近似质量。ELBO优化可能收敛到次优解,需要多次重启。边缘似然无闭式解,模型比较困难。 | 训练复杂度O(m²n),-预测复杂度O(m²)。千诱导点、十万样本训练需分钟到小时级。内存占用O(m² + mn)。预测速度比标准GP快数个量级,适合实时应用。 | 中等适配度。在9维小参数空间中稀疏近似的优势不明显,反而因变分推断引入近似误差。诱导点机制在低维空间中的自适应学习能力有限。只有当评估次数预期超过数千次时才有必要考虑,但此类场景中参数维度通常更高。计算开销相对于收益偏高。 |
| SVIDKL(稀疏变分深度核学习) | 结合深度神经网络特征提取与GP概率建模:f(x) = GP(m(h(x;θ)), k(h(x;θ), h(x';θ)))。其中h(x;θ)为DNN提取的特征表示。使用变分推断联合学习网络参数θ和GP超参数。深度核自适应学习数据的非线性嵌入。 | 深度特征提取自动学习高维数据的低维表示,缓解GP维度灾难问题。核在学习特征空间定义,更易捕获复杂模式。结合DNN表达力和GP不确定性量化优势。在非平稳、高维问题中表现显著优于传统GP。 | 训练复杂度高,需交替优化DNN和GP参数。易过拟合,需要谨慎正则化。梯度消失/爆炸问题在深度架构中仍存在。不确定性估计质量依赖于特征学习稳定性。 | 训练复杂度O(m²n + L·n·d²),L为网络层数。预测需前向传播和GP推断。实际训练时间比SVGP增加2-5倍,取决于网络深度。GPU加速对训练效率提升显著。 | 低适配度。9维参数空间无需复杂的深度特征学习,DNN的表达能力优势无法发挥反而容易过拟合。网络参数与GP超参数的联合优化在数据有限时不稳定。计算成本高且在小空间中收益有限。仅在参数间存在极其复杂的非线性耦合时可能有用,但这种情况在9维空间中罕见。 |
| CatBoost | 基于对称决策树的梯度提升算法,使用Ordered Boosting解决预测偏移,采用特征组合自动处理交互效应。不确定性通过虚拟集成估计:从训练过程中不同迭代创建模型集合,计算预测分布。 | 内置类别特征处理,无需独热编码。有序提升减少过拟合,提高泛化能力。训练高效且默认参数表现良好。不确定性估计无需显著额外计算。在表格数据中常达到state-of-the-art。 | 不确定性估计基于启发式,概率校准不如贝叶斯方法。模型解释性低于单一决策树。内存消耗随树数量和深度线性增长。对超参数敏感,特别是学习率和树深度。 | 训练复杂度O(n·log(n)·d·B),预测复杂度O(B·深度)。实际训练速度与RF相当,但通常需要更多树。内存使用较高,特别是使用大规模类别特征时。 | 高适配度。专门优化的类别特征处理对4个布尔参数非常有效。有序提升机制在小样本下泛化能力好。在表格数据上的卓越表现使其适合此类混合参数优化。不确定性估计通过虚拟集成实现,虽非严格贝叶斯但在实践中有效。训练效率高,适合中等规模评估预算。 |
| DeepEnsemble | 基于Lakshminarayanan等人的方法,训练多个独立神经网络,使用负对数似然损失学习均值和方差:L = -logN(y∣μ(x),σ²(x))。集成通过简单平均组合各模型预测,不确定性来自模型间差异和个体不确定性。 | 实现简单且经验表现优越,在不确定性量化基准测试中常优于单一贝叶斯神经网络。训练可完全并行化。无需修改网络架构,与现有代码兼容性好。点预测精度通常很高。 | 计算成本随集成规模线性增长。缺乏单一连贯概率模型,不确定性分解临时。模型间独立性假设在实践中难保证。深度网络训练不稳定性导致个别模型可能性能差。 | 训练复杂度O(K·E·n·d²),K为模型数,E为epoch数。预测复杂度O(K·前向传播)。实际5模型集成训练时间约为单模型的5倍,但并行化可减少实际时间。 | 中等偏高适配度。神经网络能灵活处理混合类型输入(布尔参数可嵌入或直接输入)。集成提供稳健的性能和有意义的不确定性估计。但在小样本场景下(<500评估点),多个独立网络的训练可能不稳定。当评估预算充足时可提供优秀性能,但需要仔细调参。 |
| pSGLDEnsemble | 结合预处理随机梯度Langevin动力学和集成学习。pSGLD在SGD基础上添加自适应噪声:Δθ = -εG⁻¹∇L + N(0,2εG⁻¹),其中G为预处理矩阵。从马尔可夫链中采集多个模型作为近似后验样本。 | 提供原则性贝叶斯不确定性估计,避免DeepEnsemble的临时性。预处理矩阵适应曲率,提高采样效率。单一训练过程获得模型分布,无需多次独立训练。理论上收敛到真实后验。 | 采样效率依赖预处理矩阵质量,估计不准可能导致混合速度慢。需要精细调参(步长、预处理更新频率)。计算成本高于标准优化器。预热阶段长度对性能影响显著。 | 训练复杂度与标准优化器相同,但需要更多迭代达到稳定。预测复杂度与DeepEnsemble相同。实际训练时间比Adam等优化器增加20%-50%,取决于噪声调度策略。 | 中等适配度。贝叶斯深度集成提供原则性不确定性估计,但在低维小样本场景中采样效率优势有限。预处理矩阵在9维空间中的估计相对稳定,但计算开销仍高于标准优化器。适合对不确定性校准要求极高的场景,但通常标准DeepEnsemble已足够。 |
| MCBNEnsemble | 基于Teye等人的工作,利用批量归一化层在测试时的随机性。训练单一网络,预测时对同一输入多次前向传播,利用BN的随机批统计量估计不确定性:Var[y] ≈ Var[f(x,θ,γ_t,β_t)],其中γ_t,β_t为随机BN参数。 | 极大降低计算成本,单一模型即可获得不确定性估计。实现简单,只需在预测时启用BN训练模式。训练过程与标准网络完全相同。适合资源受限环境。 | 不确定性估计质量依赖BN层随机性,可能不够可靠。仅适用于包含BN的架构。不确定性校准对BN配置敏感。深度网络中随机性可能随着传播而衰减。 | 训练复杂度与单网络相同。预测复杂度O(T·前向传播),T为采样次数。实际预测时间比单次预测增加T倍,但训练成本远低于其他集成方法。 | 中等适配度。计算高效的不确定性估计机制在资源受限时有用,但批量归一化在小型网络中的随机性可能不足以产生有意义的模型多样性。当评估预算极其有限(<100次)时可能有用,但在此类小预算场景中更简单的模型往往更可靠。 |
| MaskedDeepEnsemble | 针对条件参数空间设计,引入掩码机制处理参数间依赖关系。通过条件掩码函数m(x)使无效参数不影响预测:f(x) = Ensemble[f_i(m(x)∘x)]。掩码根据参数约束动态激活/禁用特定特征。 | 显式建模参数间条件依赖,提高在结构化参数空间中的样本效率。避免无效区域探索,加速优化收敛。通用性强,可结合各种基础代理模型。在层次化超参数调优中表现突出。 | 需要先验知识定义参数依赖图。掩码函数引入额外复杂性。训练可能不稳定,特别是依赖关系复杂时。预测开销随条件分支数量增加。 | 训练复杂度与基础集成相同,但收敛更快因搜索空间减小。预测需计算掩码函数,增加O(d)开销。实际训练时间与标准DeepEnsemble相当,但优化周期更短。 | 低适配度。针对条件参数空间的设计在9个独立参数场景中无法发挥优势。掩码机制引入不必要的复杂性。只有当布尔参数与其他参数存在明确的条件依赖关系时才考虑使用,但您描述的场景中参数似乎是独立的。 |
| FeDeepEnsemble | 集成特征选择机制,如随机门(STG)或L1正则化,自动学习特征重要性。通过稀疏约束促进特征选择:L_total = L_NLL + λ‖w‖₁。集成提供鲁棒的特征重要性估计。 | 自动识别相关特征,提高模型可解释性。减少噪声特征干扰,改善泛化能力。特征选择与模型训练端到端进行。在高维无关特征存在时表现显著优于标准方法。 | 特征选择稳定性对集成规模敏感。稀疏约束使优化更困难,需要谨慎调参λ。连续松弛(如STG)引入额外超参数。训练时间显著增加。 | 训练复杂度比DeepEnsemble增加20%-40%,因特征选择机制需要更精细优化。预测复杂度相同。实际收敛速度可能更快,因有效维度降低。 | 低适配度。特征选择在9维空间中意义有限,所有参数都可能相关。稀疏正则化反而可能剔除重要参数。Gumbel-Softmax等复杂机制在低维下的收益无法抵消计算开销。仅在明确知道部分参数完全无关时有用,但此类先验知识通常不可得。 |
| GumbelDeepEnsemble | 使用Gumbel-Softmax重参数化进行可微分特征选择:z = softmax((logπ + g)/τ),其中g为Gumbel噪声,τ为温度参数。通过退火τ使选择逐渐趋向离散。集成提供鲁棒的特征子集估计。 | 提供可微分特征选择,梯度可贯穿整个网络。Gumbel噪声增加探索,避免局部最优。温度退火平衡探索与利用。理论上有更好特征选择一致性保证。 | 温度调度需要精细设计,影响最终性能。Gumbel噪声在训练早期可能主导,导致不稳定。计算开销大于硬选择方法。实际特征选择质量对初始化和噪声规模敏感。 | 训练复杂度比FeDeepEnsemble略高,因Gumbel-Softmax计算和温度调度。预测复杂度相同。实际训练时间增加10%-20%,主要来自重参数化操作。 | 低适配度。Gumbel-Softmax特征选择在9维参数空间中过于复杂,温度退火调度引入额外调参负担。重参数化技巧的计算开销在低维下不划算。特征选择的不稳定性可能损害优化效果。适合超高维特征选择,不适用于当前场景。 |