写在前面:什么是贝叶斯优化
参考这里
主要包含两个部分
-
一个代理模型(surrogate model),用于对目标函数进行建模。代理模型通常有确定的公式或者能计算梯度,又或者有已知的凹凸性、线性等特性,总之就是更容易用于优化。更泛化地讲,其实它就是一个学习模型,输入是所有观测到的函数值点,训练后可以在给定任意x的情况下给出对f(x)的估计。
-
一个优化策略(optimization strategy),决定下一个采样点的位置,即下一步应在哪个输入x
处观测函数值f(x)。通常它是通过采集函数(acquisition function) 来实现的:
采集函数通常是一个由代理模型推出的函数,它的输入是可行集(feasible set)A上的任意值,输出值衡量了每个输入x有多值得被观测。通常会从以下两方面考虑:- 有多大的可能性在x处取得最优值
- 评估x是否能减少贝叶斯统计模型的不确定性
采集函数通常也是容易求最优值的函数(例如:有公式/能算梯度等),下一个采样点就是可行集上的最大值点,即使采集函数的取最大值的点。
本文主要学习采集函数,模型代码参考SCOOT(WWW2025 oral)中hebo/acqusitions库的代码实现。
代理模型内容见此
HEBO的简要介绍
SCOOT代码大部分都是基于HEBO(异质方差进化贝叶斯优化)算法实现,这里先简要介绍下,具体见https://github.com/huawei-noah/HEBO
背景
- 建模假设
使用代理模型来近似黑盒函数问题。常见的代理模型是高斯过程回归模型(Gaussian processes,GPs)。GPs模型本身是同方差的(Heteroscedastic),但其实际建模的问题是带异质方差噪音,同时同方差的GPs模型不能处理非平稳的评估点集,这种现象在机器学习超参数优化中尤为常见。 - 采集函数和优化器假设
在采集函数阶段,通常是最大化采集函数并输出下一个候选点。这一过程将引入额外的限制和假设。如假设黑盒函数中只包含连续变量,采取一阶或二阶(如LBFGS,ADAM)来求解,而忽略了超参数优化中包含的离散变量(如深度网络中的隐层大小)。另外在整一个贝叶斯优化框架中采集函数只选择一种,忽略了多个采集函数组合的情况。
问题及回答
问题1:超参数优化任务是平稳的吗?
回答:即使是简单的机器学习任务也显示出极大的非平稳性( non-stationarity)
问题2:超参数优化任务是同质的吗?
回答:即使是简单的机器学习任务也显示出极大的异方差性(heteroscedasticity)
问题3:在超参数优化中,不同采集函数输出的结果是否是冲突的呢?
回答:没有“一招鲜吃遍天”的方法。不同采集函数通常是冲突,经常会输出相反的候选点
总结
- 针对不平稳性,对输入做转化(Transformation)
- 针对异质方差,对输出做转化(Transformation)
- 不同采集函数在不同参数设置下,其对应的采集函数值相互冲突。 对此,作者从如下两方面进行优化:
- 带鲁棒性的采集函数;
- 多目标采集函数。
整体主要由如下部分组成:
伪随机序列生成,具体可用参考笔者《超参数优化(三): 实验设计》部分
对X进行变换
对y进行变换
调用GPs模型,进行训练建模
GPs模型输出的最优候选点
构建采集函数
多目标采集函数优化
输出最优候选点
本文介绍的HEBO方法,在2020 BBO大赛和QQ浏览器2021AI算法大赛中表现都非常优异。HEBO算法框架仍然是贝叶斯优化(Bayesian Optimisation)。作者通过分析经典贝叶斯优化的数据输入、数据输出、代理模型和采集函数存在的问题和局限性,对每一步的问题都做了相应的优化:
对数据输入、数据输出进行变换校准;
将数据变换校准和GPs核函数联合在一起优化;
引入多目标采集函数,进行更鲁棒的探索(HEBO方法最大亮点)。
采集函数
从SCOOT的代码结构上来看,使用了两个优化器
当优化目标是单目标时使用的是HEBOConstr,采集函数使用的MACEConstr
当优化目标是多目标时使用的是GeneralBO,采集函数使用的GeneralAcq
MACEConstr
class MACEConstr(Acquisition): def __init__(self, model, best_y, num_model_constr, y_thres, **conf): # 调用父类 Acquisition 的初始化方法 super().__init__(model, **conf) # 设置LCB中的探索系数kappa,默认值为2.0 self.kappa = conf.get('kappa', 2.0) # 设置数值稳定性参数,避免除零错误,默认1e-4 self.eps = conf.get('eps', 1e-4) # 设置硬约束数量,默认3个 self._num_hard_constr = conf.get('num_hard_constr', 3) # 设置隐藏约束数量,默认1个 self._num_hidden_constr = conf.get('num_hidden_constr', 1) # 获取随机森林模型及阈值,用于评估隐藏约束 self.rf_with_thres = conf.get('rf_with_thres', None) # 设置最大序列长度,用于硬约束检查,默认12 self.max_sequence_length = conf.get('max_sequence_length', 12) # 获取搜索空间定义 self.space = conf.get('space', None) # 设置当前最佳目标值tau,用于EI和PI计算 self.tau = best_y # 设置约束阈值,约束函数需要小于该阈值 self.y_thres = y_thres # 设置模型约束数量(通过高斯过程建模的约束) self._num_model_constr= num_model_constr # 如果没有定义搜索空间,则禁用硬约束和隐藏约束 if not self.space: self._num_hard_constr = 0 self._num_hidden_constr = 0 # 如果有隐藏约束,必须提供随机森林模型和阈值 if self._num_hidden_constr > 0: assert isinstance(self.rf_with_thres, tuple), f'num_hiddent_constr is {self._num_hidden_constr}, the rf model and corresponding threshold should be passed' @property def num_obj(self): """返回目标函数数量,固定为3(LCB, EI, PI三个采集准则)""" return 3 @property def num_model_constr(self) -> int: """返回模型约束数量的属性""" return self._num_model_constr @property def num_hidden_constr(self) -> int: """返回隐藏约束数量的属性""" return self._num_hidden_constr @property def num_constr(self) -> int: """返回总约束数量的属性(模型约束+硬约束+隐藏约束)""" return self._num_model_constr + self._num_hard_constr + self._num_hidden_constr @property def num_hard_constr(self) -> int: """返回硬约束数量的属性""" return self._num_hard_constr def eval(self, x : torch.FloatTensor, xe : torch.LongTensor) -> torch.FloatTensor: """ 采集函数评估方法 最小化三个目标:(-1 * EI的负对数, -1 * PI的负对数, LCB) 即同时优化三种采集准则:EI、PI和LCB """ # 不计算梯度,提高计算效率 with torch.no_grad(): # 使用高斯过程模型预测均值和方差 py, ps2 = self.model.predict(x, xe) ### 目标函数部分:计算三种采集准则 ### # 提取目标函数的预测(第一列) py_obj = py[:,:1] # 提取目标函数的方差(第一列) ps2_obj = ps2[:, :1] # 计算噪声标准差,乘以sqrt(2)用于EI/PI计算 noise = np.sqrt(2.0) * self.model.noise.sqrt()[0] # 计算目标函数的标准差,确保数值稳定性 ps_obj = ps2_obj.sqrt().clamp(min = torch.finfo(ps2_obj.dtype).eps) # 计算LCB(Lower Confidence Bound):均值 - kappa * 标准差 lcb_obj = (py_obj + noise * torch.randn(py_obj.shape)) - self.kappa * ps_obj # 计算标准化改进量:(当前最佳值 - 预测值) / 标准差 normed = ((self.tau - self.eps - py_obj - noise * torch.randn(py_obj.shape)) / ps_obj) # 创建标准正态分布对象 dist = Normal(0., 1.) # 计算标准正态分布在normed处的对数概率密度 log_phi = dist.log_prob(normed) # 计算标准正态分布在normed处的累积分布函数 Phi = dist.cdf(normed) # 计算改进概率PI:P(f(x) < tau) PI = Phi # 计算期望改进EI:E[max(tau - f(x), 0)] EI = ps_obj * (Phi * normed + log_phi.exp()) # EI的对数近似公式,用于数值不稳定情况 logEIapp = ps_obj.log() - 0.5 * normed**2 - (normed**2 - 1).log() # PI的对数近似公式,用于数值不稳定情况 logPIapp = -0.5 * normed**2 - torch.log(-1 * normed) - torch.log(torch.sqrt(torch.tensor(2 * np.pi))) # 判断哪些点需要使用近似公式: # 当normed > -6 且 EI和PI的对数有限时使用精确计算,否则使用近似 use_app = ~((normed > -6) & torch.isfinite(EI.log()) & torch.isfinite(PI.log())).reshape(-1) # 初始化输出张量,3列对应3个采集准则 out = torch.zeros(x.shape[0], 3) # 第一列:LCB目标(探索性) out[:, 0] = lcb_obj.reshape(-1) # 第二列:EI准则(使用近似或精确计算) out[:, 1][use_app] = -1 * logEIapp[use_app].reshape(-1) # 使用近似 out[:, 1][~use_app] = -1 * EI[~use_app].log().reshape(-1) # 使用精确 # 第三列:PI准则(使用近似或精确计算) out[:, 2][use_app] = -1 * logPIapp[use_app].reshape(-1) # 使用近似 out[:, 2][~use_app] = -1 * PI[~use_app].log().reshape(-1) # 使用精确 ### 模型约束部分 ### if self.y_thres is not None: # 提取约束函数的预测(第1列之后的所有列) py_constr = py[:, 1:] # 提取约束函数的方差 ps2_constr = ps2[:, 1:] # 计算约束函数的标准差 ps_constr = ps2_constr.sqrt().clamp(min = torch.finfo(ps2_constr.dtype).eps) # 计算约束违反程度:预测值 + kappa*标准差 - 阈值 # 如果结果>0表示可能违反约束 out_model_constr = (py_constr + noise * torch.randn(py_constr.shape)) + self.kappa * ps_constr - self.y_thres else: # 没有模型约束时返回空张量 out_model_constr = torch.zeros([out.size()[0], 0]) # 获取硬约束违反程度(基于先验知识的约束) out_hard_constr = get_hard_constr(x, self.max_sequence_length, self.num_hard_constr, py, self.space) # 获取隐藏约束违反程度(基于随机森林模型的约束) out_hidden_constr = get_hidden_constr(x, xe, self.rf_with_thres, self.num_hidden_constr, py, self.space) # 合并所有输出:3个采集准则目标 + 模型约束 + 硬约束 + 隐藏约束 return torch.concat([out, out_model_constr, out_hard_constr, out_hidden_constr], dim=1)
GeneralAcq
class GeneralAcq(Acquisition): def __init__(self, model, num_obj, num_model_constr, **conf): # 调用父类 Acquisition 的初始化方法 super().__init__(model, **conf) # 设置目标函数数量 self._num_obj = num_obj # 设置模型约束数量(通过高斯过程建模的约束) self._num_model_constr = num_model_constr # 从配置中获取硬约束数量,默认为3 self._num_hard_constr = conf.get('num_hard_constr', 3) # 从配置中获取隐藏约束数量,默认为1 self._num_hidden_constr = conf.get('num_hidden_constr', 1) # 获取随机森林模型及阈值,用于隐藏约束评估 self.rf_with_thres = conf.get('rf_with_thres', None) # 获取搜索空间定义 self.space = conf.get('space', None) # 获取目标函数的kappa参数(探索系数),默认为2.0 self.kappa = conf.get('kappa', 2.0) # 获取约束函数的c_kappa参数(约束探索系数),默认为0 self.c_kappa = conf.get('c_kappa', 0.) # 是否在预测时添加噪声,默认为True self.use_noise = conf.get('use_noise', True) # 最大序列长度,用于硬约束检查,默认为12 self.max_sequence_length = conf.get('max_sequence_length', 12) # 如果没有定义搜索空间,则禁用硬约束和隐藏约束 if not self.space: self._num_hard_constr = 0 self._num_hidden_constr = 0 # 验证模型输出维度等于目标数加模型约束数 assert self.model.num_out == self.num_obj + self.num_model_constr # 验证至少有一个目标函数 assert self.num_obj >= 1 # 如果有隐藏约束,必须提供随机森林模型和阈值 if self._num_hidden_constr > 0: assert isinstance(self.rf_with_thres, tuple), f'num_hiddent_constr is {self._num_hidden_constr}, the rf model and corresponding threshold should be passed' @property def num_obj(self) -> int: """返回目标函数数量的属性""" return self._num_obj @property def num_model_constr(self) -> int: """返回模型约束数量的属性""" return self._num_model_constr @property def num_hidden_constr(self) -> int: """返回隐藏约束数量的属性""" return self._num_hidden_constr @property def num_constr(self) -> int: """返回总约束数量的属性(模型约束+硬约束+隐藏约束)""" return self._num_model_constr+self._num_hard_constr + self._num_hidden_constr @property def num_hard_constr(self) -> int: """返回硬约束数量的属性""" return self._num_hard_constr def eval(self, x : torch.FloatTensor, xe : torch.LongTensor) -> torch.FloatTensor: """ 采集函数评估方法,处理一般约束多目标优化问题 假设有 $om$ 个目标和 $cn$ 个约束,问题应被转化为: 最小化 (o1, o2, ..., om) 满足 c1 < 0, c2 < 0, ... cb_cn < 0 在这个 GeneralAcq 采集函数中,我们计算目标和约束的下置信界, 并解决以下问题: 最小化 (lcb_o1, lcb_o2, ..., lcb_om) 满足 lcb_c1 < 0, lcb_c2 < 0, ... lcb_cn < 0 """ # 不计算梯度,提高计算效率 with torch.no_grad(): # 使用模型预测均值和方差 py, ps2 = self.model.predict(x, xe) # 计算标准差,并确保数值稳定性(避免除零错误) ps = ps2.sqrt().clamp(min = torch.finfo(ps2.dtype).eps) # 如果启用噪声,在预测值上添加高斯噪声 if self.use_noise: noise = self.model.noise.sqrt() # 获取噪声标准差 py += noise * torch.randn(py.shape) # 添加随机噪声 # 初始化输出张量,形状与预测值相同 out = torch.ones(py.shape) # 计算目标函数的下置信界(LCB) # 对于最小化问题,LCB = 均值 - kappa * 标准差 out[:, :self.num_obj] = py[:, :self.num_obj] - self.kappa * ps[:, :self.num_obj] # 计算约束函数的下置信界 # 对于约束c<0,LCB = 均值 - c_kappa * 标准差 out[:, self.num_obj:] = py[:, self.num_obj:] - self.c_kappa * ps[:, self.num_obj:] # 获取硬约束违反程度(基于先验知识的约束) out_hard_constr = get_hard_constr(x, self.max_sequence_length, self.num_hard_constr, py, self.space) # 获取隐藏约束违反程度(基于随机森林模型的约束) out_hidden_constr = get_hidden_constr(x, xe, self.rf_with_thres, self.num_hidden_constr, py, self.space) # 合并所有输出:目标LCB + 模型约束LCB + 硬约束 + 隐藏约束 return torch.concat([out, out_hard_constr, out_hidden_constr], dim=1)
区别
在单目标问题中,没有一种采集准则在任何情况下都是最优的:
EI:在中等不确定性下表现好
PI:在低不确定性下偏向利用
LCB/UCB:在高不确定性下偏向探索
MACE的解决方案则是同时优化三种准则,让算法自动选择平衡:minimize [LCB, -log(EI), -log(PI)](间接转化为了一个多目标优化)
多目标问题本身就有多个需要权衡的目标,引入多种采集准则会使问题过于复杂,而且LCB可以很自然地扩展到多目标情况,对于M个目标,需要计算M次EI和M次PI,计算量随目标数线性增长,且数值稳定性问题更严重
优化器
介绍下HEBOConstr和GeneralBO是怎么用这些acq的
HEBOConstr
class HEBOConstr(AbstractOptimizer): # 类属性:声明支持的优化特性 support_parallel_opt = True # 支持并行优化,可同时评估多个候选点 support_combinatorial = True # 支持组合优化,可处理离散和连续混合参数 support_contextual = True # 支持上下文优化,可处理固定参数约束 def __init__(self, space, num_model_constr, y_thres = None, model_name = 'multi_task', base_model_name='gpy', rand_sample = None, acq_cls = MACEConstr, es = 'nsga2', model_config = None, scramble_seed: Optional[int] = None, num_hard_constr: Optional[int] = None, num_hidden_constr: Optional[int] = 1, max_sequence_length: Optional[int] = 12): # 最大序列长度默认12,对应2^12=4096 """ 构造函数:初始化约束贝叶斯优化器 参数说明: space: 搜索空间定义,包含参数类型、范围等信息 num_model_constr: 模型约束数量,通过高斯过程建模的约束个数 y_thres: 约束阈值,约束函数需要小于该阈值才可行 model_name: 代理模型名称,默认'multi_task'多任务模型 base_model_name: 基础模型类型,默认'gpy'使用GPy库的高斯过程 rand_sample: 随机采样迭代次数,在开始阶段使用随机探索 acq_cls: 采集函数类,默认MACEConstr用于约束优化的多准则采集 es: 进化算法类型,默认'nsga2'用于多目标优化 model_config: 模型配置字典,可自定义模型参数 scramble_seed: Sobol序列随机种子,用于初始点采样的可重复性 num_hard_constr: 硬约束数量,问题固有的不可违反约束 num_hidden_constr: 隐藏约束数量,通过随机森林等模型学习的约束 max_sequence_length: 最大序列长度,用于硬约束检查 """ # 调用父类AbstractOptimizer的构造函数,初始化基础功能 super().__init__(space) # 存储搜索空间对象,用于参数转换和边界检查 self.space = space # 设置进化算法类型,用于优化采集函数寻找候选点 self.es = es # 初始化观测数据存储:X存储历史参数配置,y存储对应的目标值和约束值 # X是DataFrame,列名为参数名称;y是numpy数组,第一列是目标值,后面是约束值 self.X = pd.DataFrame(columns = self.space.para_names) self.y = np.zeros((0, num_model_constr + 1)) # 初始为空数组,随着观测增加 # 设置代理模型名称,决定使用哪种机器学习模型作为目标函数的代理 self.model_name = model_name # 设置随机采样次数:在开始阶段使用随机探索建立初始模型 # 默认值为空间维度+1,确保至少有2次随机采样 self.rand_sample = 1 + self.space.num_paras if rand_sample is None else max(2, rand_sample) # 初始化Sobol序列生成器,用于准随机采样(比纯随机采样覆盖更均匀) # scramble=True启用序列打乱,避免在低维度的相关性 self.scramble_seed = scramble_seed # 保存种子用于 reproducibility self.sobol = SobolEngine(self.space.num_paras, scramble = True, seed = scramble_seed) # 设置采集函数类,默认使用MACEConstr(多准则采集函数,带约束处理) self.acq_cls = acq_cls # 设置基础模型名称,决定底层使用的建模技术 self.base_model_name = base_model_name # 存储模型配置,如果为None则使用默认配置 self._model_config = model_config # 约束相关参数设置 self.num_hard_constr = num_hard_constr # 硬约束:问题固有的物理/逻辑约束 self.num_hidden_constr = num_hidden_constr # 隐藏约束:数据驱动的约束模型 self.max_sequence_length = max_sequence_length # 用于硬约束检查的最大序列长度 # 处理约束阈值y_thres的多种输入格式 if isinstance(y_thres,list): # 如果输入是列表,转换为numpy数组并重塑为1行多列 self.y_thres = np.array(y_thres).reshape([1, -1]) # 验证列表长度与模型约束数量一致 assert self.y_thres.shape[1] == num_model_constr, 'If give list, please give each output dim one threshold' elif isinstance(y_thres, int): # 如果输入是整数,转换为numpy数组并重塑为1行1列 self.y_thres = np.array([y_thres]).reshape([1, -1]) # 验证当输入为整数时,模型约束数量必须为1 assert num_model_constr == 1, 'If give int, only one ouput dim is supported' elif y_thres is None: # 如果没有提供约束阈值,设置为None(无约束优化) self.y_thres = y_thres def quasi_sample(self, n, fix_input = None): """ 准随机采样方法:使用Sobol序列在搜索空间内生成均匀分布的初始点 参数: n: 需要生成的采样点数量 fix_input: 需要固定的输入参数字典,{参数名: 固定值} 返回: df_samp: DataFrame,包含n个采样点的参数配置 """ # 从Sobol序列生成n个点在[0,1]^d单位超立方体内 # Sobol序列提供低差异采样,比随机采样覆盖更均匀 samp = self.sobol.draw(n) # 将[0,1]区间的采样映射到实际参数空间:x_actual = lb + (ub-lb) * x_normalized # 这里将单位超立方体映射到参数的实际取值范围 samp = samp * (self.space.opt_ub - self.space.opt_lb) + self.space.opt_lb # 分离连续变量和离散变量:前num_numeric列是连续变量,后面是离散变量 x = samp[:, :self.space.num_numeric] # 连续变量部分 xe = samp[:, self.space.num_numeric:] # 离散变量部分(已编码) # 对离散化后的连续变量进行取整处理:有些连续参数在变换后需要离散值 for i, n in enumerate(self.space.numeric_names): if self.space.paras[n].is_discrete_after_transform: # 如果参数在变换后应该是离散的,对采样值进行四舍五入 x[:, i] = x[:, i].round() # 将数值转换回原始参数空间:将标准化后的数值转换为实际参数值 # 包括连续变量的缩放和离散变量的解码 df_samp = self.space.inverse_transform(x, xe) # 如果有需要固定的输入参数,覆盖采样中的对应参数值 if fix_input is not None: for k, v in fix_input.items(): # 将指定参数的所有采样点设置为固定值 df_samp[k] = v return df_samp def suggest(self, n_suggestions=1, fix_input = None, rf_with_thres = None): """ 生成建议点的主方法:根据历史数据选择下一个评估点 参数: n_suggestions: 需要生成的建议点数量 fix_input: 需要固定的输入参数字典,在优化过程中保持这些参数不变 rf_with_thres: 随机森林模型及阈值元组,用于评估隐藏约束 返回: rec_selected: DataFrame,包含选择的n_suggestions个建议点 """ # 检查并行优化支持:只有MACEConstr采集函数支持并行建议 if self.acq_cls != MACEConstr and n_suggestions != 1: raise RuntimeError('Parallel optimization is supported only for MACE acquisition') # 初始随机采样阶段:当历史数据量小于随机采样阈值时 if self.X.shape[0] < self.rand_sample: # 使用准随机采样生成初始点,建立初始数据集 sample = self.quasi_sample(n_suggestions, fix_input) # 如果有硬约束,确保采样点满足所有硬约束条件 if self.num_hard_constr: sample = ensure_hard_constr(sample, max_sequence_length=2**self.max_sequence_length) return sample # 返回随机采样点 else: # 贝叶斯优化阶段:当有足够历史数据时,使用模型指导的智能采样 # 转换输入数据:将原始参数转换为模型可处理的数值格式 # X: 连续变量数值,Xe: 离散变量编码 X, Xe = self.space.transform(self.X) # 处理目标函数数据:使用幂变换增强模型拟合效果 # 幂变换可以使数据更接近正态分布,提高高斯过程模型性能 y_obj = self.y[:,0:1] # 提取目标值(第一列) y = enable_power_transform(y_obj) # 应用幂变换 # 处理约束数据:如果有模型约束,同样进行变换处理 y_constr = self.y[:, 1:] # 提取约束值(第二列及以后) y_thres = None # 初始化约束阈值变量 if y_constr.shape[1] > 0: # 将约束阈值与历史约束数据拼接,一起进行幂变换 # 这样可以确保约束值和阈值在相同的变换空间中 y_constr_with_prefer = np.concatenate([y_constr, self.y_thres], axis = 0) y_constr_with_prefer = enable_power_transform(y_constr_with_prefer) # 将变换后的目标值和约束值拼接为完整输出 y = torch.cat([y, y_constr_with_prefer[:-1, ]], axis=1) # 提取变换后的约束阈值(最后一行) y_thres = y_constr_with_prefer[-1: ] # 创建并训练代理模型:使用指定的模型类型和配置 # 模型输入维度:连续变量数 + 离散变量数,输出维度:目标数 + 约束数 model = get_model(self.model_name, self.space.num_numeric, self.space.num_categorical, y.shape[1], **self.model_config) # 使用历史数据拟合模型,学习目标函数和约束的映射关系 model.fit(X, Xe, y) # 获取当前最佳点的索引和具体参数配置 best_id = self.get_best_id(fix_input) best_x = self.X.iloc[[best_id]] # 保持DataFrame结构,使用双括号 # 预测当前最佳点的目标值和不确定性(用于采集函数) py_best, ps2_best = model.predict(*self.space.transform(best_x)) py_best = py_best[:,0].detach().numpy().squeeze() # 提取目标值预测均值 ps_best = ps2_best[:,0].sqrt().detach().numpy().squeeze() # 提取目标值预测标准差 # 计算自适应kappa参数(探索系数):随着迭代增加探索性 # kappa控制LCB中的探索程度,值越大越倾向于探索高不确定性区域 iter = max(1, self.X.shape[0] // n_suggestions) # 估算当前迭代次数 upsi = 0.5 # 探索权重参数 delta = 0.01 # 置信水平参数 # 根据理论公式计算kappa,随迭代对数增长 kappa = np.sqrt(upsi * 2 * ((2.0 + self.X.shape[1] / 2.0) * np.log(iter) + np.log(3 * np.pi**2 / (3 * delta)))) # 创建采集函数实例,用于评估候选点的"好坏" acq = self.acq_cls( model, # 训练好的代理模型 space= self.space, # 搜索空间 num_model_constr = self.y.shape[1]-1, # 模型约束数量 num_hard_constr = self.num_hard_constr, # 硬约束数量 num_hidden_constr = self.num_hidden_constr, # 隐藏约束数量 rf_with_thres = rf_with_thres, # 随机森林约束模型 best_y = py_best, # 当前最佳目标值预测 kappa = kappa, # 探索系数 y_thres = y_thres, # 变换后的约束阈值 max_sequence_length = self.max_sequence_length) # 序列长度限制 # 创建均值和标准差函数,用于后续的候选点选择策略 mu = Mean(model.models[0]) # 均值预测函数 sig = Sigma(model.models[0], linear_a = -1.) # 标准差预测函数,线性系数-1用于后续处理 # 使用进化算法优化采集函数,寻找最有希望的候选点 # pop=100: 种群大小,iters=100: 进化迭代次数 opt = EvolutionOpt(self.space, acq, pop = 100, iters = 100, verbose = False, es=self.es) # 执行优化,从当前最佳点开始搜索,去除重复点 rec = opt.optimize(initial_suggest = best_x, fix_input = fix_input).drop_duplicates() # 检查候选点是否与历史观测重复,只保留唯一点 rec = rec[self.check_unique(rec)] # 如果生成的建议点数量不足,使用随机采样补充 cnt = 0 # 补充尝试计数器 while rec.shape[0] < n_suggestions: # 生成随机采样点补充不足部分 rand_rec = self.quasi_sample(n_suggestions - rec.shape[0], fix_input) # 确保随机采样点也满足硬约束 if self.num_hard_constr: rand_rec = ensure_hard_constr(rand_rec, max_sequence_length=2**self.max_sequence_length) # 检查随机采样点的唯一性 rand_rec = rand_rec[self.check_unique(rand_rec)] # 合并到候选点集中 rec = pd.concat([rec, rand_rec], axis = 0, ignore_index = True) cnt += 1 # 增加尝试计数 if cnt > 3: # 如果尝试3次后仍不足,可能设计空间很小,重复采样不可避免 break # 最终检查:如果候选点数量仍不足,强制补充随机采样 if rec.shape[0] < n_suggestions: rand_rec = self.quasi_sample(n_suggestions - rec.shape[0], fix_input) if self.num_hard_constr: rand_rec = ensure_hard_constr(rand_rec, max_sequence_length=2**self.max_sequence_length) rec = pd.concat([rec, rand_rec], axis = 0, ignore_index = True) # 从候选点集中选择最终的建议点 # 首先随机选择n_suggestions个点作为基础选择 select_id = np.random.choice(rec.shape[0], n_suggestions, replace = False).tolist() x_guess = [] # 初始化猜测点列表(未使用) # 在不计算梯度的情况下评估候选点质量 with torch.no_grad(): # 预测所有候选点的目标值均值和标准差 py_all = mu(*self.space.transform(rec)).squeeze().numpy() # 均值预测 ps_all = -1 * sig(*self.space.transform(rec)).squeeze().numpy() # 标准差预测(取负) # 找到预测最佳点(均值最小)和不确定性最大点(标准差最大) best_pred_id = np.argmin(py_all) # 预测目标值最小的点索引 best_unce_id = np.argmax(ps_all) # 不确定性最大的点索引 # 确保选择集中包含重要点:不确定性最大点和预测最佳点 if best_unce_id not in select_id and n_suggestions > 2: # 如果选择集中没有不确定性最大点且可替换,将其加入 select_id[0]= best_unce_id if best_pred_id not in select_id and n_suggestions > 2: # 如果选择集中没有预测最佳点且可替换,将其加入 select_id[1]= best_pred_id # 从候选点集中提取最终选择的建议点 rec_selected = rec.iloc[select_id].copy() # 使用copy避免视图问题 return rec_selected # 返回最终选择的建议点
GeneralBO
class GeneralBO(AbstractOptimizer): """ 通用的贝叶斯优化器,支持多目标优化和约束优化 可以处理多个目标函数、模型约束、硬约束和隐藏约束 """ def __init__(self, space : DesignSpace, # 设计空间,定义优化参数的取值范围和类型 num_obj: int = 1, # 目标函数的数量,默认为单目标优化 num_model_constr: int = 0, # 模型约束的数量,通过高斯过程建模的约束 num_hard_constr: int = 0, # 硬约束的数量,问题固有的不可违反约束 num_hidden_constr: int = 1, # 隐藏约束的数量,通过随机森林等模型学习的约束 max_sequence_length: int = 12, # 最大序列长度,用于硬约束检查,默认2^12=4096 rand_sample: int = None, # 随机采样阶段的数据量,None时使用默认值 model_name: str = 'multi_task', # 代理模型名称,默认多任务模型 model_config: dict = None, # 模型配置参数字典 kappa: float = 2., # 目标函数的探索系数,控制LCB中的探索程度 c_kappa: float = 0., # 约束函数的探索系数,控制约束边界的探索 use_noise: bool = False, # 是否在预测时添加噪声,增强鲁棒性 evo_pop: int = 100, # 进化算法的种群大小 evo_iters: int = 200, # 进化算法的迭代次数 ref_point: np.ndarray = None, # 多目标优化的参考点,用于计算超体积 scramble_seed: Optional[int] = None # Sobol序列的随机种子,用于可重复的初始采样 ): # 调用父类AbstractOptimizer的构造函数 super().__init__(space) # 初始化搜索空间和优化问题参数 self.space = space # 设计空间对象 self.num_obj = num_obj # 目标函数数量 self.num_model_constr = num_model_constr # 模型约束数量 self.num_hard_constr = num_hard_constr # 硬约束数量 self.num_hidden_constr = num_hidden_constr # 隐藏约束数量 self.max_sequence_length = max_sequence_length # 最大序列长度限制 # 设置随机采样阶段的数据量:默认值为参数维度+1,确保至少有2次采样 self.rand_sample = 1 + self.space.num_paras if rand_sample is None else rand_sample # 模型相关参数 self.model_name = model_name # 代理模型类型 self.model_config = model_config if model_config is not None else {} # 模型配置,空字典为默认配置 # 数据存储:X存储历史参数配置,y存储对应的目标值和约束值 self.X = pd.DataFrame(columns = self.space.para_names) # 空的DataFrame,列名为参数名 self.y = np.zeros((0, num_obj + num_model_constr)) # 空的numpy数组,列数为目标+约束 # 采集函数参数 self.kappa = kappa # 目标函数的探索系数 self.c_kappa = c_kappa # 约束函数的探索系数 self.use_noise = use_noise # 是否使用噪声 # 优化过程参数 self.model = None # 代理模型实例,初始为None self.evo_pop = evo_pop # 进化算法种群大小 self.evo_iters = evo_iters # 进化算法迭代次数 self.iter = 0 # 优化迭代计数器 # 多目标优化参数 self.ref_point = ref_point # 参考点,用于多目标优化的超体积计算 # 初始化Sobol序列生成器,用于准随机初始采样 # scramble=True启用序列打乱,避免低维度相关性 self.sobol = SobolEngine(self.space.num_paras, scramble = True, seed = scramble_seed) # 验证模型是否支持多输出:如果目标+约束总数>1,模型必须支持多输出 if num_obj + num_model_constr > 1: assert get_model_class(model_name).support_multi_output def quasi_sample(self, n, fix_input = None): """ 准随机采样方法:使用Sobol序列在搜索空间内生成均匀分布的采样点 参数: n: 需要生成的采样点数量 fix_input: 需要固定的输入参数字典,{参数名: 固定值} 返回: df_samp: DataFrame,包含n个采样点的参数配置 """ # 从Sobol序列生成n个点在[0,1]^d单位超立方体内 samp = self.sobol.draw(n) # 将[0,1]区间的采样映射到实际参数空间:x_actual = lb + (ub-lb) * x_normalized samp = samp * (self.space.opt_ub - self.space.opt_lb) + self.space.opt_lb # 分离连续变量和离散变量:前num_numeric列是连续变量,后面是离散变量 x = samp[:, :self.space.num_numeric] # 连续变量部分 xe = samp[:, self.space.num_numeric:] # 离散变量部分(已编码) # 对离散化后的连续变量进行取整处理 for i, n in enumerate(self.space.numeric_names): if self.space.paras[n].is_discrete_after_transform: # 如果参数在变换后应该是离散的,对采样值进行四舍五入 x[:, i] = x[:, i].round() # 将数值转换回原始参数空间:将标准化后的数值转换为实际参数值 df_samp = self.space.inverse_transform(x, xe) # 如果有需要固定的输入参数,覆盖采样中的对应参数值 if fix_input is not None: for k, v in fix_input.items(): df_samp[k] = v return df_samp def suggest(self, n_suggestions = 1, fix_input = None, rf_with_thres = None): """ 生成建议点的主方法:根据历史数据选择下一个评估点 参数: n_suggestions: 需要生成的建议点数量 fix_input: 需要固定的输入参数字典 rf_with_thres: 随机森林模型及阈值元组,用于评估隐藏约束 返回: 建议点DataFrame,包含n_suggestions个参数配置 """ # 增加迭代计数器 self.iter += 1 # 初始随机采样阶段:当历史数据量小于随机采样阈值时 if self.X.shape[0] < self.rand_sample: # 使用准随机采样生成初始点,建立初始数据集 sample = self.quasi_sample(n_suggestions, fix_input) # 如果有硬约束,确保采样点满足所有硬约束条件 if self.num_hard_constr: sample = ensure_hard_constr(sample, max_sequence_length=2**self.max_sequence_length) return sample else: # 贝叶斯优化阶段:当有足够历史数据时,使用模型指导的智能采样 # 转换输入数据:将原始参数转换为模型可处理的数值格式 X, Xe = self.space.transform(self.X) # 将目标值和约束值转换为PyTorch张量 y = torch.FloatTensor(self.y) # 获取离散变量的类别数量(用于模型初始化) num_uniqs = None if Xe.shape[1] > 0: # 如果有离散变量 num_uniqs = [len(self.space.paras[name].categories) for name in self.space.enum_names] # 创建并训练代理模型 self.model = get_model(self.model_name, X.shape[1], Xe.shape[1], y.shape[1], num_uniqs=num_uniqs, **self.model_config) self.model.fit(X, Xe, y) # 使用历史数据训练模型 # 设置探索系数:如果未提供则根据理论公式计算自适应值 kappa = self.kappa c_kappa = self.c_kappa upsi = 0.1 # 探索权重参数 delta = 0.01 # 置信水平参数 if kappa is None: # 自适应kappa计算:随迭代次数对数增长,增加探索性 kappa = np.sqrt(upsi * 2 * ((2.0 + self.X.shape[1] / 2.0) * np.log(self.iter) + np.log(3 * np.pi**2 / (3 * delta)))) if c_kappa is None: # 自适应c_kappa计算:约束探索系数的类似公式 c_kappa = np.sqrt(upsi * 2 * ((2.0 + self.X.shape[1] / 2.0) * np.log(self.iter) + np.log(3 * np.pi**2 / (3 * delta)))) # 创建通用采集函数,处理多目标和约束优化 acq = GeneralAcq( self.model, # 训练好的代理模型 self.num_obj, # 目标函数数量 self.num_model_constr, # 模型约束数量 num_hard_constr = self.num_hard_constr, # 硬约束数量 num_hidden_constr = self.num_hidden_constr, # 隐藏约束数量 max_sequence_length = self.max_sequence_length, # 序列长度限制 rf_with_thres = rf_with_thres, # 随机森林约束模型 space = self.space, # 搜索空间 kappa = kappa, # 目标探索系数 c_kappa = c_kappa, # 约束探索系数 use_noise = self.use_noise) # 是否使用噪声 # 使用进化算法优化采集函数,寻找最有希望的候选点 opt = EvolutionOpt(self.space, acq, pop=self.evo_pop, iters=self.evo_iters) suggest = opt.optimize() # 执行优化,返回候选点 # 如果进化算法找到的候选点数量不足,使用随机采样补充 if suggest.shape[0] < n_suggestions: # 生成随机采样点补充不足部分 rand_samp = self.quasi_sample(n_suggestions - suggest.shape[0], fix_input) # 如果有硬约束,确保随机采样点也满足约束 if self.num_hard_constr: print('GeneralBO before ensure:') print(rand_samp) # 应用硬约束确保 rand_samp = ensure_hard_constr(rand_samp, max_sequence_length=2**self.max_sequence_length) print('GeneralBO after ensure:') print(rand_samp) # 合并进化算法结果和随机采样结果 suggest = pd.concat([suggest, rand_samp], axis=0, ignore_index=True) return suggest # 如果没有提供参考点(单目标或简单的多目标选择策略) elif self.ref_point is None: # 在不计算梯度的情况下评估候选点 with torch.no_grad(): # 预测所有候选点的均值和方差 py, ps2 = self.model.predict(*self.space.transform(suggest)) # 找到不确定性最大的点(各目标对数方差之和最大) largest_uncert_id = np.argmax(np.log(ps2).sum(axis=1)) # 随机选择n_suggestions个候选点 select_id = np.random.choice(suggest.shape[0], n_suggestions, replace=False).tolist() # 确保选择集中包含不确定性最大的点(促进探索) if largest_uncert_id not in select_id: select_id[0] = largest_uncert_id return suggest.iloc[select_id] # 返回选择的建议点 else: # 多目标优化且有参考点的情况:使用EHVI(期望超体积改进)进行选择 assert self.num_obj > 1 # 必须是多目标问题 assert self.num_model_constr == 0 # EHVI目前不支持约束 n_mc = 10 # Monte Carlo采样次数 # 初始化超体积计算器 hv = HV(ref_point=self.ref_point.reshape(-1)) with torch.no_grad(): # 预测候选点的均值和方差 py, ps2 = self.model.predict(*self.space.transform(suggest)) # 从后验分布中采样,获得目标函数的蒙特卡洛样本 y_samp = self.model.sample_y(*self.space.transform(suggest), n_mc).numpy() # 获取当前的Pareto前沿 y_curr = self.get_pf(self.y).copy() select_id = [] # 存储选择的点索引 # 依次选择n_suggestions个点 for i in range(n_suggestions): ehvi_lst = [] # 存储每个候选点的期望超体积改进 base_hv = hv.do(y_curr) # 计算当前Pareto前沿的超体积 # 对每个候选点计算EHVI for j in range(suggest.shape[0]): samp = y_samp[:, j] # 候选点j的蒙特卡洛样本 hvi_est = 0 # 超体积改进估计值 # 对每个蒙特卡洛样本计算超体积改进 for k in range(n_mc): # 将当前样本添加到Pareto前沿中 y_tmp = np.vstack([y_curr, samp[[k]]]) # 计算新的超体积 hvi_est += hv.do(y_tmp) - base_hv # 超体积改进 hvi_est /= n_mc # 取平均得到期望超体积改进 ehvi_lst.append(hvi_est) # 选择EHVI最大的点,如果没有正改进则随机选择 best_id = np.argmax(ehvi_lst) if max(ehvi_lst) > 0 else np.random.choice(suggest.shape[0]) # 将选择点的最优样本(各目标最小值)添加到当前Pareto前沿 y_curr = np.vstack([y_curr, y_samp[:, best_id].min(axis=0, keepdims=True)]) select_id.append(best_id) # 记录选择的点索引 # 确保选择的点不重复,如果重复则用其他点补充 select_id = list(set(select_id)) # 去重 if len(select_id) < n_suggestions: # 找出未被选择的候选点 candidate_id = [i for i in range(suggest.shape[0]) if i not in select_id] # 随机选择补充点 select_id += np.random.choice(candidate_id, n_suggestions - len(select_id), replace=False).tolist() # 返回最终选择的建议点 return suggest.iloc[select_id]
区别
HEBOConstr:默认使用 MACEConstr(LCB + EI + PI),将三种采集函数组合为多目标优化,使用 Power Transform(Box-Cox 或 Yeo-Johnson)对目标函数进行变换,用 Mean 和 Sigma 选择建议点,优先选择预测值最小和不确定性最大的点
GeneralBO:使用 GeneralAcq(LCB),每个目标都使用 LCB,直接使用原始观测值,不做变换,若提供 ref_point,使用 EHVI(Expected Hypervolume Improvement)选择,否则基于不确定性选择
进化算法选择最优配置
class EvolutionOpt: """ 进化算法优化器:使用进化算法优化采集函数,寻找最优候选点 支持单目标和多目标优化,处理混合变量(连续和离散)优化问题 """ def __init__(self, design_space : DesignSpace, # 设计空间,定义优化参数的取值范围和类型 acq : Acquisition, # 采集函数实例,用于评估候选点的质量 es : str = None, # 进化算法类型,None时自动选择 **conf): # 其他配置参数 # 初始化基本参数 self.space = design_space # 设计空间对象 self.es = es # 进化算法类型 self.acq = acq # 采集函数实例 self.pop = conf.get('pop', 100) # 种群大小,默认100 self.iter = conf.get('iters',500) # 进化迭代次数,默认500 self.verbose = conf.get('verbose', False) # 是否输出详细信息,默认False self.repair = conf.get('repair', None) # 修复算子,用于处理不可行解 self.sobol_init = conf.get('sobol_init', True) # 是否使用Sobol序列初始化种群,默认True # 验证采集函数至少有一个目标 assert(self.acq.num_obj > 0) # 如果没有指定进化算法类型,根据目标数量自动选择 if self.es is None: # 单目标使用遗传算法(GA),多目标使用NSGA-II self.es = 'nsga2' if self.acq.num_obj > 1 else 'ga' def optimize(self, initial_suggest : pd.DataFrame = None, fix_input : dict = None, return_pop = False) -> pd.DataFrame: """ 执行进化算法优化过程 参数: initial_suggest: 初始建议点,用于引导搜索方向 fix_input: 需要固定的输入参数字典 return_pop: 是否返回整个种群而不仅仅是最优解 返回: df_opt: DataFrame,包含优化得到的候选点参数配置 """ # 获取设计空间的上下界(转换为numpy数组) lb = self.space.opt_lb.numpy() # 参数下界 ub = self.space.opt_ub.numpy() # 参数上界 # 创建贝叶斯优化问题实例,将采集函数包装成优化问题 prob = BOProblem(self.acq, self.space, fix_input) # 初始化种群:使用Sobol序列或随机采样生成初始种群 init_pop = get_init_pop(self.space, self.pop, initial_suggest, self.sobol_init) # 根据目标数量选择不同的进化算法 if self.acq.num_obj == 1: # 单目标优化:使用混合变量遗传算法 algo = MixedVariableGA( pop_size = self.pop, # 种群大小 repair = self.repair, # 修复算子 sampling = init_pop # 初始种群 ) else: # 多目标优化:使用NSGA-II算法 algo = NSGA2( pop_size = self.pop, # 种群大小 sampling = init_pop, # 初始种群 mating = MixedVariableMating(eliminate_duplicates = MixedVariableDuplicateElimination()), # 混合变量交配算子 eliminate_duplicates = MixedVariableDuplicateElimination() # 重复个体消除 ) # 执行进化算法优化过程 # ('n_gen', self.iter) 表示运行self.iter代 res = minimize(prob, algo, ('n_gen', self.iter), verbose = self.verbose) # 处理优化结果 if res.X is not None and not return_pop: # 如果找到了最优解且不需要返回整个种群 x = res.X # 最优解 # 处理不同格式的解 if isinstance(x, dict): # 如果解是字典格式,转换为列表 x = [x] if isinstance(x, np.ndarray): # 如果解是numpy数组,转换为列表 x = x.tolist() # 提取参数值并转换为DataFrame opt_x = pd.DataFrame(x)[self.space.para_names].values.astype(float) else: # 返回整个种群或当没有明确最优解时 # 从种群中提取所有个体的参数 opt_x = pd.DataFrame([p.X for p in res.pop])[self.space.para_names].values.astype(float) # 如果是单目标优化且不需要返回整个种群,随机选择一个个体 if self.acq.num_obj == 1 and not return_pop: opt_x = opt_x[[np.random.choice(opt_x.shape[0])]] # 保存优化结果(用于调试和分析) self.res = res # 分离连续变量和离散变量 opt_xcont = torch.from_numpy(opt_x[:, :self.space.num_numeric]) # 连续变量部分 opt_xenum = torch.from_numpy(opt_x[:, self.space.num_numeric:]) # 离散变量部分 # 将优化结果转换回原始参数空间 df_opt = self.space.inverse_transform(opt_xcont, opt_xenum) # 如果有需要固定的输入参数,覆盖优化结果中的对应参数 if fix_input is not None: for k, v in fix_input.items(): df_opt[k] = v return df_opt # 返回最终的优化结果
输入:采集函数(定义了一个复杂的优化景观)
输出:在采集函数上得分最高的参数配置