贝叶斯优化之采集函数 0基础学习

写在前面:什么是贝叶斯优化
参考这里

主要包含两个部分

  • 一个代理模型(surrogate model),用于对目标函数进行建模。代理模型通常有确定的公式或者能计算梯度,又或者有已知的凹凸性、线性等特性,总之就是更容易用于优化。更泛化地讲,其实它就是一个学习模型,输入是所有观测到的函数值点,训练后可以在给定任意x的情况下给出对f(x)的估计。

  • 一个优化策略(optimization strategy),决定下一个采样点的位置,即下一步应在哪个输入x
    处观测函数值f(x)。通常它是通过采集函数(acquisition function) 来实现的:
    采集函数通常是一个由代理模型推出的函数,它的输入是可行集(feasible set)A上的任意值,输出值衡量了每个输入x有多值得被观测。通常会从以下两方面考虑:

    • 有多大的可能性在x处取得最优值
    • 评估x是否能减少贝叶斯统计模型的不确定性

采集函数通常也是容易求最优值的函数(例如:有公式/能算梯度等),下一个采样点就是可行集上的最大值点,即使采集函数的取最大值的点。

本文主要学习采集函数,模型代码参考SCOOT(WWW2025 oral)中hebo/acqusitions库的代码实现
代理模型内容见此

HEBO的简要介绍

SCOOT代码大部分都是基于HEBO(异质方差进化贝叶斯优化)算法实现,这里先简要介绍下,具体见https://github.com/huawei-noah/HEBO

背景

  • 建模假设
    使用代理模型来近似黑盒函数问题。常见的代理模型是高斯过程回归模型(Gaussian processes,GPs)。GPs模型本身是同方差的(Heteroscedastic),但其实际建模的问题是带异质方差噪音,同时同方差的GPs模型不能处理非平稳的评估点集,这种现象在机器学习超参数优化中尤为常见。
  • 采集函数和优化器假设
    在采集函数阶段,通常是最大化采集函数并输出下一个候选点。这一过程将引入额外的限制和假设。如假设黑盒函数中只包含连续变量,采取一阶或二阶(如LBFGS,ADAM)来求解,而忽略了超参数优化中包含的离散变量(如深度网络中的隐层大小)。另外在整一个贝叶斯优化框架中采集函数只选择一种,忽略了多个采集函数组合的情况。

问题及回答

问题1:超参数优化任务是平稳的吗?

回答:即使是简单的机器学习任务也显示出极大的非平稳性( non-stationarity)

问题2:超参数优化任务是同质的吗?

回答:即使是简单的机器学习任务也显示出极大的异方差性(heteroscedasticity)

问题3:在超参数优化中,不同采集函数输出的结果是否是冲突的呢?

回答:没有“一招鲜吃遍天”的方法。不同采集函数通常是冲突,经常会输出相反的候选点

总结

  • 针对不平稳性,对输入做转化(Transformation)
  • 针对异质方差,对输出做转化(Transformation)
  • 不同采集函数在不同参数设置下,其对应的采集函数值相互冲突。 对此,作者从如下两方面进行优化:
    • 带鲁棒性的采集函数;
    • 多目标采集函数。

整体主要由如下部分组成:

伪随机序列生成,具体可用参考笔者《超参数优化(三): 实验设计》部分
对X进行变换
对y进行变换
调用GPs模型,进行训练建模
GPs模型输出的最优候选点
构建采集函数
多目标采集函数优化
输出最优候选点

本文介绍的HEBO方法,在2020 BBO大赛和QQ浏览器2021AI算法大赛中表现都非常优异。HEBO算法框架仍然是贝叶斯优化(Bayesian Optimisation)。作者通过分析经典贝叶斯优化的数据输入、数据输出、代理模型和采集函数存在的问题和局限性,对每一步的问题都做了相应的优化:

对数据输入、数据输出进行变换校准;
将数据变换校准和GPs核函数联合在一起优化;
引入多目标采集函数,进行更鲁棒的探索(HEBO方法最大亮点)。

采集函数

从SCOOT的代码结构上来看,使用了两个优化器
当优化目标是单目标时使用的是HEBOConstr,采集函数使用的MACEConstr
当优化目标是多目标时使用的是GeneralBO,采集函数使用的GeneralAcq

MACEConstr

class MACEConstr(Acquisition):     def __init__(self, model, best_y, num_model_constr, y_thres, **conf):         # 调用父类 Acquisition 的初始化方法         super().__init__(model, **conf)                  # 设置LCB中的探索系数kappa,默认值为2.0         self.kappa = conf.get('kappa', 2.0)                  # 设置数值稳定性参数,避免除零错误,默认1e-4         self.eps   = conf.get('eps', 1e-4)                  # 设置硬约束数量,默认3个         self._num_hard_constr = conf.get('num_hard_constr', 3)                  # 设置隐藏约束数量,默认1个         self._num_hidden_constr = conf.get('num_hidden_constr', 1)                  # 获取随机森林模型及阈值,用于评估隐藏约束         self.rf_with_thres = conf.get('rf_with_thres', None)                  # 设置最大序列长度,用于硬约束检查,默认12         self.max_sequence_length = conf.get('max_sequence_length', 12)          # 获取搜索空间定义         self.space = conf.get('space', None)                  # 设置当前最佳目标值tau,用于EI和PI计算         self.tau   = best_y                  # 设置约束阈值,约束函数需要小于该阈值         self.y_thres = y_thres                  # 设置模型约束数量(通过高斯过程建模的约束)         self._num_model_constr= num_model_constr                  # 如果没有定义搜索空间,则禁用硬约束和隐藏约束         if not self.space:             self._num_hard_constr = 0              self._num_hidden_constr = 0          # 如果有隐藏约束,必须提供随机森林模型和阈值         if self._num_hidden_constr > 0:             assert isinstance(self.rf_with_thres, tuple), f'num_hiddent_constr is {self._num_hidden_constr}, the rf model and corresponding threshold should be passed'          @property     def num_obj(self):         """返回目标函数数量,固定为3(LCB, EI, PI三个采集准则)"""         return 3      @property     def num_model_constr(self) -> int:         """返回模型约束数量的属性"""         return self._num_model_constr          @property     def num_hidden_constr(self) -> int:         """返回隐藏约束数量的属性"""         return self._num_hidden_constr      @property     def num_constr(self) -> int:         """返回总约束数量的属性(模型约束+硬约束+隐藏约束)"""         return self._num_model_constr + self._num_hard_constr + self._num_hidden_constr          @property     def num_hard_constr(self) -> int:         """返回硬约束数量的属性"""         return self._num_hard_constr      def eval(self, x : torch.FloatTensor, xe : torch.LongTensor) -> torch.FloatTensor:         """         采集函数评估方法         最小化三个目标:(-1 * EI的负对数, -1 * PI的负对数, LCB)         即同时优化三种采集准则:EI、PI和LCB         """         # 不计算梯度,提高计算效率         with torch.no_grad():             # 使用高斯过程模型预测均值和方差             py, ps2   = self.model.predict(x, xe)              ### 目标函数部分:计算三种采集准则 ###                          # 提取目标函数的预测(第一列)             py_obj = py[:,:1]             # 提取目标函数的方差(第一列)             ps2_obj = ps2[:, :1]                          # 计算噪声标准差,乘以sqrt(2)用于EI/PI计算             noise     = np.sqrt(2.0) * self.model.noise.sqrt()[0]                          # 计算目标函数的标准差,确保数值稳定性             ps_obj    = ps2_obj.sqrt().clamp(min = torch.finfo(ps2_obj.dtype).eps)                          # 计算LCB(Lower Confidence Bound):均值 - kappa * 标准差             lcb_obj   = (py_obj + noise * torch.randn(py_obj.shape)) - self.kappa * ps_obj                          # 计算标准化改进量:(当前最佳值 - 预测值) / 标准差             normed    = ((self.tau - self.eps - py_obj - noise * torch.randn(py_obj.shape)) / ps_obj)                          # 创建标准正态分布对象             dist      = Normal(0., 1.)                          # 计算标准正态分布在normed处的对数概率密度             log_phi   = dist.log_prob(normed)                          # 计算标准正态分布在normed处的累积分布函数             Phi       = dist.cdf(normed)                          # 计算改进概率PI:P(f(x) < tau)             PI        = Phi                          # 计算期望改进EI:E[max(tau - f(x), 0)]             EI        = ps_obj * (Phi * normed +  log_phi.exp())                          # EI的对数近似公式,用于数值不稳定情况             logEIapp  = ps_obj.log() - 0.5 * normed**2 - (normed**2 - 1).log()                          # PI的对数近似公式,用于数值不稳定情况             logPIapp  = -0.5 * normed**2 - torch.log(-1 * normed) - torch.log(torch.sqrt(torch.tensor(2 * np.pi)))              # 判断哪些点需要使用近似公式:             # 当normed > -6 且 EI和PI的对数有限时使用精确计算,否则使用近似             use_app             = ~((normed > -6) & torch.isfinite(EI.log()) & torch.isfinite(PI.log())).reshape(-1)                          # 初始化输出张量,3列对应3个采集准则             out                 = torch.zeros(x.shape[0], 3)                          # 第一列:LCB目标(探索性)             out[:, 0]           = lcb_obj.reshape(-1)                          # 第二列:EI准则(使用近似或精确计算)             out[:, 1][use_app]  = -1 * logEIapp[use_app].reshape(-1)  # 使用近似             out[:, 1][~use_app] = -1 * EI[~use_app].log().reshape(-1) # 使用精确                          # 第三列:PI准则(使用近似或精确计算)             out[:, 2][use_app]  = -1 * logPIapp[use_app].reshape(-1)  # 使用近似             out[:, 2][~use_app] = -1 * PI[~use_app].log().reshape(-1) # 使用精确              ### 模型约束部分 ###             if self.y_thres is not None:                 # 提取约束函数的预测(第1列之后的所有列)                 py_constr = py[:, 1:]                 # 提取约束函数的方差                 ps2_constr = ps2[:, 1:]                 # 计算约束函数的标准差                 ps_constr    = ps2_constr.sqrt().clamp(min = torch.finfo(ps2_constr.dtype).eps)                 # 计算约束违反程度:预测值 + kappa*标准差 - 阈值                 # 如果结果>0表示可能违反约束                 out_model_constr = (py_constr + noise * torch.randn(py_constr.shape)) + self.kappa * ps_constr - self.y_thres             else:                 # 没有模型约束时返回空张量                 out_model_constr = torch.zeros([out.size()[0], 0])                          # 获取硬约束违反程度(基于先验知识的约束)             out_hard_constr = get_hard_constr(x, self.max_sequence_length,                                               self.num_hard_constr, py, self.space)                          # 获取隐藏约束违反程度(基于随机森林模型的约束)             out_hidden_constr = get_hidden_constr(x, xe, self.rf_with_thres,                                                   self.num_hidden_constr, py, self.space)                          # 合并所有输出:3个采集准则目标 + 模型约束 + 硬约束 + 隐藏约束             return torch.concat([out, out_model_constr, out_hard_constr, out_hidden_constr], dim=1) 

GeneralAcq

class GeneralAcq(Acquisition):     def __init__(self, model, num_obj, num_model_constr, **conf):         # 调用父类 Acquisition 的初始化方法         super().__init__(model, **conf)                  # 设置目标函数数量         self._num_obj    = num_obj                  # 设置模型约束数量(通过高斯过程建模的约束)         self._num_model_constr = num_model_constr                  # 从配置中获取硬约束数量,默认为3         self._num_hard_constr = conf.get('num_hard_constr', 3)                  # 从配置中获取隐藏约束数量,默认为1         self._num_hidden_constr = conf.get('num_hidden_constr', 1)                  # 获取随机森林模型及阈值,用于隐藏约束评估         self.rf_with_thres = conf.get('rf_with_thres', None)                  # 获取搜索空间定义         self.space       = conf.get('space', None)                  # 获取目标函数的kappa参数(探索系数),默认为2.0         self.kappa       = conf.get('kappa', 2.0)                  # 获取约束函数的c_kappa参数(约束探索系数),默认为0         self.c_kappa     = conf.get('c_kappa', 0.)                  # 是否在预测时添加噪声,默认为True         self.use_noise   = conf.get('use_noise', True)                  # 最大序列长度,用于硬约束检查,默认为12         self.max_sequence_length = conf.get('max_sequence_length', 12)          # 如果没有定义搜索空间,则禁用硬约束和隐藏约束         if not self.space:             self._num_hard_constr = 0              self._num_hidden_constr = 0                      # 验证模型输出维度等于目标数加模型约束数         assert self.model.num_out == self.num_obj + self.num_model_constr                  # 验证至少有一个目标函数         assert self.num_obj >= 1                  # 如果有隐藏约束,必须提供随机森林模型和阈值         if self._num_hidden_constr > 0:             assert isinstance(self.rf_with_thres, tuple), f'num_hiddent_constr is {self._num_hidden_constr}, the rf model and corresponding threshold should be passed'      @property     def num_obj(self) -> int:         """返回目标函数数量的属性"""         return self._num_obj      @property     def num_model_constr(self) -> int:         """返回模型约束数量的属性"""         return self._num_model_constr          @property     def num_hidden_constr(self) -> int:         """返回隐藏约束数量的属性"""         return self._num_hidden_constr      @property     def num_constr(self) -> int:         """返回总约束数量的属性(模型约束+硬约束+隐藏约束)"""         return self._num_model_constr+self._num_hard_constr + self._num_hidden_constr          @property     def num_hard_constr(self) -> int:         """返回硬约束数量的属性"""         return self._num_hard_constr      def eval(self, x : torch.FloatTensor, xe : torch.LongTensor) -> torch.FloatTensor:         """         采集函数评估方法,处理一般约束多目标优化问题                  假设有 $om$ 个目标和 $cn$ 个约束,问题应被转化为:         最小化 (o1, o2, ..., om)         满足   c1 < 0,                 c2 < 0,                 ...                cb_cn < 0          在这个 GeneralAcq 采集函数中,我们计算目标和约束的下置信界,         并解决以下问题:                  最小化 (lcb_o1, lcb_o2, ..., lcb_om)         满足   lcb_c1 < 0,                 lcb_c2 < 0,                 ...                lcb_cn < 0         """         # 不计算梯度,提高计算效率         with torch.no_grad():             # 使用模型预测均值和方差             py, ps2 = self.model.predict(x, xe)                          # 计算标准差,并确保数值稳定性(避免除零错误)             ps      = ps2.sqrt().clamp(min = torch.finfo(ps2.dtype).eps)                          # 如果启用噪声,在预测值上添加高斯噪声             if self.use_noise:                 noise  = self.model.noise.sqrt()  # 获取噪声标准差                 py    += noise * torch.randn(py.shape)  # 添加随机噪声                          # 初始化输出张量,形状与预测值相同             out = torch.ones(py.shape)                          # 计算目标函数的下置信界(LCB)             # 对于最小化问题,LCB = 均值 - kappa * 标准差             out[:, :self.num_obj] = py[:, :self.num_obj]  - self.kappa   * ps[:, :self.num_obj]                          # 计算约束函数的下置信界             # 对于约束c<0,LCB = 均值 - c_kappa * 标准差             out[:, self.num_obj:] = py[:, self.num_obj:]  - self.c_kappa * ps[:, self.num_obj:]                          # 获取硬约束违反程度(基于先验知识的约束)             out_hard_constr = get_hard_constr(x, self.max_sequence_length,                                               self.num_hard_constr, py, self.space)                          # 获取隐藏约束违反程度(基于随机森林模型的约束)             out_hidden_constr = get_hidden_constr(x, xe, self.rf_with_thres,                                                   self.num_hidden_constr, py, self.space)                  # 合并所有输出:目标LCB + 模型约束LCB + 硬约束 + 隐藏约束         return torch.concat([out, out_hard_constr, out_hidden_constr], dim=1) 

区别

在单目标问题中,没有一种采集准则在任何情况下都是最优的:
EI:在中等不确定性下表现好
PI:在低不确定性下偏向利用
LCB/UCB:在高不确定性下偏向探索
MACE的解决方案则是同时优化三种准则,让算法自动选择平衡:minimize [LCB, -log(EI), -log(PI)](间接转化为了一个多目标优化)
多目标问题本身就有多个需要权衡的目标,引入多种采集准则会使问题过于复杂,而且LCB可以很自然地扩展到多目标情况,对于M个目标,需要计算M次EI和M次PI,计算量随目标数线性增长,且数值稳定性问题更严重

优化器

介绍下HEBOConstr和GeneralBO是怎么用这些acq的

HEBOConstr

class HEBOConstr(AbstractOptimizer):     # 类属性:声明支持的优化特性     support_parallel_opt  = True    # 支持并行优化,可同时评估多个候选点     support_combinatorial = True    # 支持组合优化,可处理离散和连续混合参数     support_contextual    = True    # 支持上下文优化,可处理固定参数约束          def __init__(self,                  space,                  num_model_constr,                  y_thres = None,                  model_name = 'multi_task',                  base_model_name='gpy',                  rand_sample = None,                  acq_cls = MACEConstr,                 es = 'nsga2',                  model_config = None,                 scramble_seed: Optional[int] = None,                  num_hard_constr: Optional[int] = None,                  num_hidden_constr: Optional[int] = 1,                 max_sequence_length: Optional[int] = 12):  # 最大序列长度默认12,对应2^12=4096         """         构造函数:初始化约束贝叶斯优化器                  参数说明:         space: 搜索空间定义,包含参数类型、范围等信息         num_model_constr: 模型约束数量,通过高斯过程建模的约束个数         y_thres: 约束阈值,约束函数需要小于该阈值才可行         model_name: 代理模型名称,默认'multi_task'多任务模型         base_model_name: 基础模型类型,默认'gpy'使用GPy库的高斯过程         rand_sample: 随机采样迭代次数,在开始阶段使用随机探索         acq_cls: 采集函数类,默认MACEConstr用于约束优化的多准则采集         es: 进化算法类型,默认'nsga2'用于多目标优化         model_config: 模型配置字典,可自定义模型参数         scramble_seed: Sobol序列随机种子,用于初始点采样的可重复性         num_hard_constr: 硬约束数量,问题固有的不可违反约束         num_hidden_constr: 隐藏约束数量,通过随机森林等模型学习的约束         max_sequence_length: 最大序列长度,用于硬约束检查         """                  # 调用父类AbstractOptimizer的构造函数,初始化基础功能         super().__init__(space)                  # 存储搜索空间对象,用于参数转换和边界检查         self.space       = space                  # 设置进化算法类型,用于优化采集函数寻找候选点         self.es          = es                  # 初始化观测数据存储:X存储历史参数配置,y存储对应的目标值和约束值         # X是DataFrame,列名为参数名称;y是numpy数组,第一列是目标值,后面是约束值         self.X           = pd.DataFrame(columns = self.space.para_names)         self.y           = np.zeros((0, num_model_constr + 1))  # 初始为空数组,随着观测增加          # 设置代理模型名称,决定使用哪种机器学习模型作为目标函数的代理         self.model_name  = model_name                  # 设置随机采样次数:在开始阶段使用随机探索建立初始模型         # 默认值为空间维度+1,确保至少有2次随机采样         self.rand_sample = 1 + self.space.num_paras if rand_sample is None else max(2, rand_sample)                  # 初始化Sobol序列生成器,用于准随机采样(比纯随机采样覆盖更均匀)         # scramble=True启用序列打乱,避免在低维度的相关性         self.scramble_seed = scramble_seed  # 保存种子用于 reproducibility         self.sobol       = SobolEngine(self.space.num_paras, scramble = True, seed = scramble_seed)                  # 设置采集函数类,默认使用MACEConstr(多准则采集函数,带约束处理)         self.acq_cls     = acq_cls                  # 设置基础模型名称,决定底层使用的建模技术         self.base_model_name = base_model_name                  # 存储模型配置,如果为None则使用默认配置         self._model_config = model_config                  # 约束相关参数设置         self.num_hard_constr = num_hard_constr      # 硬约束:问题固有的物理/逻辑约束         self.num_hidden_constr = num_hidden_constr  # 隐藏约束:数据驱动的约束模型         self.max_sequence_length = max_sequence_length  # 用于硬约束检查的最大序列长度                  # 处理约束阈值y_thres的多种输入格式         if isinstance(y_thres,list):             # 如果输入是列表,转换为numpy数组并重塑为1行多列             self.y_thres = np.array(y_thres).reshape([1, -1])             # 验证列表长度与模型约束数量一致             assert self.y_thres.shape[1] == num_model_constr, 'If give list, please give each output dim one threshold'         elif isinstance(y_thres, int):             # 如果输入是整数,转换为numpy数组并重塑为1行1列             self.y_thres = np.array([y_thres]).reshape([1, -1])             # 验证当输入为整数时,模型约束数量必须为1             assert num_model_constr == 1, 'If give int, only one ouput dim is supported'         elif y_thres is None:             # 如果没有提供约束阈值,设置为None(无约束优化)             self.y_thres = y_thres               def quasi_sample(self, n, fix_input = None):         """         准随机采样方法:使用Sobol序列在搜索空间内生成均匀分布的初始点                  参数:         n: 需要生成的采样点数量         fix_input: 需要固定的输入参数字典,{参数名: 固定值}                  返回:         df_samp: DataFrame,包含n个采样点的参数配置         """                  # 从Sobol序列生成n个点在[0,1]^d单位超立方体内         # Sobol序列提供低差异采样,比随机采样覆盖更均匀         samp    = self.sobol.draw(n)                  # 将[0,1]区间的采样映射到实际参数空间:x_actual = lb + (ub-lb) * x_normalized         # 这里将单位超立方体映射到参数的实际取值范围         samp    = samp * (self.space.opt_ub - self.space.opt_lb) + self.space.opt_lb                  # 分离连续变量和离散变量:前num_numeric列是连续变量,后面是离散变量         x       = samp[:, :self.space.num_numeric]      # 连续变量部分         xe      = samp[:, self.space.num_numeric:]      # 离散变量部分(已编码)                  # 对离散化后的连续变量进行取整处理:有些连续参数在变换后需要离散值         for i, n in enumerate(self.space.numeric_names):             if self.space.paras[n].is_discrete_after_transform:                 # 如果参数在变换后应该是离散的,对采样值进行四舍五入                 x[:, i] = x[:, i].round()                          # 将数值转换回原始参数空间:将标准化后的数值转换为实际参数值         # 包括连续变量的缩放和离散变量的解码         df_samp = self.space.inverse_transform(x, xe)                  # 如果有需要固定的输入参数,覆盖采样中的对应参数值         if fix_input is not None:             for k, v in fix_input.items():                 # 将指定参数的所有采样点设置为固定值                 df_samp[k] = v                          return df_samp      def suggest(self, n_suggestions=1, fix_input = None, rf_with_thres = None):         """         生成建议点的主方法:根据历史数据选择下一个评估点                  参数:         n_suggestions: 需要生成的建议点数量         fix_input: 需要固定的输入参数字典,在优化过程中保持这些参数不变         rf_with_thres: 随机森林模型及阈值元组,用于评估隐藏约束                  返回:         rec_selected: DataFrame,包含选择的n_suggestions个建议点         """                  # 检查并行优化支持:只有MACEConstr采集函数支持并行建议         if self.acq_cls != MACEConstr and n_suggestions != 1:             raise RuntimeError('Parallel optimization is supported only for MACE acquisition')                      # 初始随机采样阶段:当历史数据量小于随机采样阈值时         if self.X.shape[0] < self.rand_sample:             # 使用准随机采样生成初始点,建立初始数据集             sample = self.quasi_sample(n_suggestions, fix_input)                          # 如果有硬约束,确保采样点满足所有硬约束条件             if self.num_hard_constr:                 sample = ensure_hard_constr(sample, max_sequence_length=2**self.max_sequence_length)                              return sample  # 返回随机采样点                      else:             # 贝叶斯优化阶段:当有足够历史数据时,使用模型指导的智能采样                          # 转换输入数据:将原始参数转换为模型可处理的数值格式             # X: 连续变量数值,Xe: 离散变量编码             X, Xe = self.space.transform(self.X)              # 处理目标函数数据:使用幂变换增强模型拟合效果             # 幂变换可以使数据更接近正态分布,提高高斯过程模型性能             y_obj = self.y[:,0:1]  # 提取目标值(第一列)             y = enable_power_transform(y_obj)  # 应用幂变换              # 处理约束数据:如果有模型约束,同样进行变换处理             y_constr = self.y[:, 1:]  # 提取约束值(第二列及以后)             y_thres = None         # 初始化约束阈值变量                           if y_constr.shape[1] > 0:                 # 将约束阈值与历史约束数据拼接,一起进行幂变换                 # 这样可以确保约束值和阈值在相同的变换空间中                 y_constr_with_prefer = np.concatenate([y_constr, self.y_thres], axis = 0)                 y_constr_with_prefer = enable_power_transform(y_constr_with_prefer)                                  # 将变换后的目标值和约束值拼接为完整输出                 y = torch.cat([y, y_constr_with_prefer[:-1, ]], axis=1)                                  # 提取变换后的约束阈值(最后一行)                 y_thres = y_constr_with_prefer[-1: ]              # 创建并训练代理模型:使用指定的模型类型和配置             # 模型输入维度:连续变量数 + 离散变量数,输出维度:目标数 + 约束数             model = get_model(self.model_name, self.space.num_numeric, self.space.num_categorical, y.shape[1], **self.model_config)                          # 使用历史数据拟合模型,学习目标函数和约束的映射关系             model.fit(X, Xe, y)              # 获取当前最佳点的索引和具体参数配置             best_id = self.get_best_id(fix_input)             best_x  = self.X.iloc[[best_id]]  # 保持DataFrame结构,使用双括号              # 预测当前最佳点的目标值和不确定性(用于采集函数)             py_best, ps2_best = model.predict(*self.space.transform(best_x))             py_best = py_best[:,0].detach().numpy().squeeze()  # 提取目标值预测均值             ps_best = ps2_best[:,0].sqrt().detach().numpy().squeeze()  # 提取目标值预测标准差              # 计算自适应kappa参数(探索系数):随着迭代增加探索性             # kappa控制LCB中的探索程度,值越大越倾向于探索高不确定性区域             iter  = max(1, self.X.shape[0] // n_suggestions)  # 估算当前迭代次数             upsi  = 0.5    # 探索权重参数             delta = 0.01   # 置信水平参数             # 根据理论公式计算kappa,随迭代对数增长             kappa = np.sqrt(upsi * 2 * ((2.0 + self.X.shape[1] / 2.0) * np.log(iter) + np.log(3 * np.pi**2 / (3 * delta))))                          # 创建采集函数实例,用于评估候选点的"好坏"             acq = self.acq_cls( model,                    # 训练好的代理模型                                 space= self.space,        # 搜索空间                                 num_model_constr = self.y.shape[1]-1,  # 模型约束数量                                 num_hard_constr = self.num_hard_constr,  # 硬约束数量                                 num_hidden_constr = self.num_hidden_constr,  # 隐藏约束数量                                 rf_with_thres = rf_with_thres,  # 随机森林约束模型                                 best_y = py_best,         # 当前最佳目标值预测                                 kappa = kappa,            # 探索系数                                 y_thres = y_thres,        # 变换后的约束阈值                                 max_sequence_length = self.max_sequence_length)  # 序列长度限制                          # 创建均值和标准差函数,用于后续的候选点选择策略             mu  = Mean(model.models[0])   # 均值预测函数             sig = Sigma(model.models[0], linear_a = -1.)  # 标准差预测函数,线性系数-1用于后续处理                          # 使用进化算法优化采集函数,寻找最有希望的候选点             # pop=100: 种群大小,iters=100: 进化迭代次数             opt = EvolutionOpt(self.space, acq, pop = 100, iters = 100, verbose = False, es=self.es)                          # 执行优化,从当前最佳点开始搜索,去除重复点             rec = opt.optimize(initial_suggest = best_x, fix_input = fix_input).drop_duplicates()                          # 检查候选点是否与历史观测重复,只保留唯一点             rec = rec[self.check_unique(rec)]              # 如果生成的建议点数量不足,使用随机采样补充             cnt = 0  # 补充尝试计数器             while rec.shape[0] < n_suggestions:                 # 生成随机采样点补充不足部分                 rand_rec = self.quasi_sample(n_suggestions - rec.shape[0], fix_input)                                  # 确保随机采样点也满足硬约束                 if self.num_hard_constr:                     rand_rec = ensure_hard_constr(rand_rec, max_sequence_length=2**self.max_sequence_length)                                      # 检查随机采样点的唯一性                 rand_rec = rand_rec[self.check_unique(rand_rec)]                                  # 合并到候选点集中                 rec      = pd.concat([rec, rand_rec], axis = 0, ignore_index = True)                 cnt +=  1  # 增加尝试计数                                  if cnt > 3:                     # 如果尝试3次后仍不足,可能设计空间很小,重复采样不可避免                     break                                  # 最终检查:如果候选点数量仍不足,强制补充随机采样             if rec.shape[0] < n_suggestions:                 rand_rec = self.quasi_sample(n_suggestions - rec.shape[0], fix_input)                 if self.num_hard_constr:                     rand_rec = ensure_hard_constr(rand_rec, max_sequence_length=2**self.max_sequence_length)                 rec      = pd.concat([rec, rand_rec], axis = 0, ignore_index = True)              # 从候选点集中选择最终的建议点             # 首先随机选择n_suggestions个点作为基础选择             select_id = np.random.choice(rec.shape[0], n_suggestions, replace = False).tolist()             x_guess   = []  # 初始化猜测点列表(未使用)                          # 在不计算梯度的情况下评估候选点质量             with torch.no_grad():                 # 预测所有候选点的目标值均值和标准差                 py_all       = mu(*self.space.transform(rec)).squeeze().numpy()  # 均值预测                 ps_all       = -1 * sig(*self.space.transform(rec)).squeeze().numpy()  # 标准差预测(取负)                                  # 找到预测最佳点(均值最小)和不确定性最大点(标准差最大)                 best_pred_id = np.argmin(py_all)  # 预测目标值最小的点索引                 best_unce_id = np.argmax(ps_all)  # 不确定性最大的点索引                                  # 确保选择集中包含重要点:不确定性最大点和预测最佳点                 if best_unce_id not in select_id and n_suggestions > 2:                     # 如果选择集中没有不确定性最大点且可替换,将其加入                     select_id[0]= best_unce_id                 if best_pred_id not in select_id and n_suggestions > 2:                     # 如果选择集中没有预测最佳点且可替换,将其加入                     select_id[1]= best_pred_id                                      # 从候选点集中提取最终选择的建议点                 rec_selected = rec.iloc[select_id].copy()  # 使用copy避免视图问题                              return rec_selected  # 返回最终选择的建议点 

GeneralBO

class GeneralBO(AbstractOptimizer):     """     通用的贝叶斯优化器,支持多目标优化和约束优化     可以处理多个目标函数、模型约束、硬约束和隐藏约束     """     def __init__(self,             space : DesignSpace,                    # 设计空间,定义优化参数的取值范围和类型             num_obj:            int   = 1,          # 目标函数的数量,默认为单目标优化             num_model_constr:   int   = 0,          # 模型约束的数量,通过高斯过程建模的约束             num_hard_constr:    int   = 0,          # 硬约束的数量,问题固有的不可违反约束             num_hidden_constr:   int  = 1,          # 隐藏约束的数量,通过随机森林等模型学习的约束             max_sequence_length: int  = 12,         # 最大序列长度,用于硬约束检查,默认2^12=4096             rand_sample:        int   = None,       # 随机采样阶段的数据量,None时使用默认值             model_name:         str   = 'multi_task', # 代理模型名称,默认多任务模型             model_config:       dict  = None,       # 模型配置参数字典             kappa:              float = 2.,         # 目标函数的探索系数,控制LCB中的探索程度             c_kappa:            float = 0.,         # 约束函数的探索系数,控制约束边界的探索             use_noise:          bool  = False,      # 是否在预测时添加噪声,增强鲁棒性             evo_pop:            int   = 100,        # 进化算法的种群大小             evo_iters:          int   = 200,        # 进化算法的迭代次数             ref_point:          np.ndarray = None,  # 多目标优化的参考点,用于计算超体积             scramble_seed:      Optional[int] = None # Sobol序列的随机种子,用于可重复的初始采样             ):         # 调用父类AbstractOptimizer的构造函数         super().__init__(space)                  # 初始化搜索空间和优化问题参数         self.space        = space                   # 设计空间对象         self.num_obj      = num_obj                 # 目标函数数量         self.num_model_constr   = num_model_constr  # 模型约束数量         self.num_hard_constr    = num_hard_constr   # 硬约束数量         self.num_hidden_constr  = num_hidden_constr # 隐藏约束数量         self.max_sequence_length = max_sequence_length # 最大序列长度限制                  # 设置随机采样阶段的数据量:默认值为参数维度+1,确保至少有2次采样         self.rand_sample  = 1 + self.space.num_paras if rand_sample is None else rand_sample                  # 模型相关参数         self.model_name   = model_name              # 代理模型类型         self.model_config = model_config if model_config is not None else {}  # 模型配置,空字典为默认配置                  # 数据存储:X存储历史参数配置,y存储对应的目标值和约束值         self.X            = pd.DataFrame(columns = self.space.para_names)  # 空的DataFrame,列名为参数名         self.y            = np.zeros((0, num_obj + num_model_constr))       # 空的numpy数组,列数为目标+约束                  # 采集函数参数         self.kappa        = kappa                  # 目标函数的探索系数         self.c_kappa      = c_kappa                # 约束函数的探索系数         self.use_noise    = use_noise              # 是否使用噪声                  # 优化过程参数         self.model        = None                   # 代理模型实例,初始为None         self.evo_pop      = evo_pop                # 进化算法种群大小         self.evo_iters    = evo_iters              # 进化算法迭代次数         self.iter         = 0                      # 优化迭代计数器                  # 多目标优化参数         self.ref_point    = ref_point              # 参考点,用于多目标优化的超体积计算                  # 初始化Sobol序列生成器,用于准随机初始采样         # scramble=True启用序列打乱,避免低维度相关性         self.sobol       = SobolEngine(self.space.num_paras, scramble = True, seed = scramble_seed)                  # 验证模型是否支持多输出:如果目标+约束总数>1,模型必须支持多输出         if num_obj + num_model_constr > 1:             assert get_model_class(model_name).support_multi_output      def quasi_sample(self, n, fix_input = None):         """         准随机采样方法:使用Sobol序列在搜索空间内生成均匀分布的采样点                  参数:         n: 需要生成的采样点数量         fix_input: 需要固定的输入参数字典,{参数名: 固定值}                  返回:         df_samp: DataFrame,包含n个采样点的参数配置         """         # 从Sobol序列生成n个点在[0,1]^d单位超立方体内         samp    = self.sobol.draw(n)                  # 将[0,1]区间的采样映射到实际参数空间:x_actual = lb + (ub-lb) * x_normalized         samp    = samp * (self.space.opt_ub - self.space.opt_lb) + self.space.opt_lb                  # 分离连续变量和离散变量:前num_numeric列是连续变量,后面是离散变量         x       = samp[:, :self.space.num_numeric]      # 连续变量部分         xe      = samp[:, self.space.num_numeric:]      # 离散变量部分(已编码)                  # 对离散化后的连续变量进行取整处理         for i, n in enumerate(self.space.numeric_names):             if self.space.paras[n].is_discrete_after_transform:                 # 如果参数在变换后应该是离散的,对采样值进行四舍五入                 x[:, i] = x[:, i].round()                          # 将数值转换回原始参数空间:将标准化后的数值转换为实际参数值         df_samp = self.space.inverse_transform(x, xe)                  # 如果有需要固定的输入参数,覆盖采样中的对应参数值         if fix_input is not None:             for k, v in fix_input.items():                 df_samp[k] = v                          return df_samp      def suggest(self, n_suggestions = 1, fix_input = None, rf_with_thres = None):         """         生成建议点的主方法:根据历史数据选择下一个评估点                  参数:         n_suggestions: 需要生成的建议点数量         fix_input: 需要固定的输入参数字典         rf_with_thres: 随机森林模型及阈值元组,用于评估隐藏约束                  返回:         建议点DataFrame,包含n_suggestions个参数配置         """         # 增加迭代计数器         self.iter += 1                  # 初始随机采样阶段:当历史数据量小于随机采样阈值时         if self.X.shape[0] < self.rand_sample:             # 使用准随机采样生成初始点,建立初始数据集             sample = self.quasi_sample(n_suggestions, fix_input)                          # 如果有硬约束,确保采样点满足所有硬约束条件             if self.num_hard_constr:                 sample = ensure_hard_constr(sample, max_sequence_length=2**self.max_sequence_length)             return sample                      else:             # 贝叶斯优化阶段:当有足够历史数据时,使用模型指导的智能采样                          # 转换输入数据:将原始参数转换为模型可处理的数值格式             X, Xe = self.space.transform(self.X)                          # 将目标值和约束值转换为PyTorch张量             y = torch.FloatTensor(self.y)                          # 获取离散变量的类别数量(用于模型初始化)             num_uniqs = None             if Xe.shape[1] > 0:  # 如果有离散变量                 num_uniqs = [len(self.space.paras[name].categories) for name in self.space.enum_names]                          # 创建并训练代理模型             self.model = get_model(self.model_name, X.shape[1], Xe.shape[1], y.shape[1],                                   num_uniqs=num_uniqs, **self.model_config)             self.model.fit(X, Xe, y)  # 使用历史数据训练模型                          # 设置探索系数:如果未提供则根据理论公式计算自适应值             kappa = self.kappa             c_kappa = self.c_kappa             upsi = 0.1     # 探索权重参数             delta = 0.01   # 置信水平参数                          if kappa is None:                 # 自适应kappa计算:随迭代次数对数增长,增加探索性                 kappa = np.sqrt(upsi * 2 * ((2.0 + self.X.shape[1] / 2.0) * np.log(self.iter) + np.log(3 * np.pi**2 / (3 * delta))))             if c_kappa is None:                 # 自适应c_kappa计算:约束探索系数的类似公式                 c_kappa = np.sqrt(upsi * 2 * ((2.0 + self.X.shape[1] / 2.0) * np.log(self.iter) + np.log(3 * np.pi**2 / (3 * delta))))                          # 创建通用采集函数,处理多目标和约束优化             acq = GeneralAcq(                   self.model,                    # 训练好的代理模型                   self.num_obj,                  # 目标函数数量                   self.num_model_constr,         # 模型约束数量                   num_hard_constr = self.num_hard_constr,     # 硬约束数量                   num_hidden_constr = self.num_hidden_constr, # 隐藏约束数量                   max_sequence_length = self.max_sequence_length, # 序列长度限制                   rf_with_thres = rf_with_thres, # 随机森林约束模型                   space = self.space,            # 搜索空间                   kappa = kappa,                 # 目标探索系数                   c_kappa = c_kappa,             # 约束探索系数                   use_noise = self.use_noise)    # 是否使用噪声                          # 使用进化算法优化采集函数,寻找最有希望的候选点             opt = EvolutionOpt(self.space, acq, pop=self.evo_pop, iters=self.evo_iters)             suggest = opt.optimize()  # 执行优化,返回候选点                          # 如果进化算法找到的候选点数量不足,使用随机采样补充             if suggest.shape[0] < n_suggestions:                 # 生成随机采样点补充不足部分                 rand_samp = self.quasi_sample(n_suggestions - suggest.shape[0], fix_input)                                  # 如果有硬约束,确保随机采样点也满足约束                 if self.num_hard_constr:                     print('GeneralBO before ensure:')                     print(rand_samp)                                          # 应用硬约束确保                     rand_samp = ensure_hard_constr(rand_samp, max_sequence_length=2**self.max_sequence_length)                      print('GeneralBO after ensure:')                     print(rand_samp)                  # 合并进化算法结果和随机采样结果                 suggest = pd.concat([suggest, rand_samp], axis=0, ignore_index=True)                 return suggest                              # 如果没有提供参考点(单目标或简单的多目标选择策略)             elif self.ref_point is None:                 # 在不计算梯度的情况下评估候选点                 with torch.no_grad():                     # 预测所有候选点的均值和方差                     py, ps2 = self.model.predict(*self.space.transform(suggest))                     # 找到不确定性最大的点(各目标对数方差之和最大)                     largest_uncert_id = np.argmax(np.log(ps2).sum(axis=1))                                  # 随机选择n_suggestions个候选点                 select_id = np.random.choice(suggest.shape[0], n_suggestions, replace=False).tolist()                                  # 确保选择集中包含不确定性最大的点(促进探索)                 if largest_uncert_id not in select_id:                     select_id[0] = largest_uncert_id                                      return suggest.iloc[select_id]  # 返回选择的建议点                              else:                 # 多目标优化且有参考点的情况:使用EHVI(期望超体积改进)进行选择                 assert self.num_obj > 1          # 必须是多目标问题                 assert self.num_model_constr == 0  # EHVI目前不支持约束                                  n_mc = 10  # Monte Carlo采样次数                 # 初始化超体积计算器                 hv = HV(ref_point=self.ref_point.reshape(-1))                                  with torch.no_grad():                     # 预测候选点的均值和方差                     py, ps2 = self.model.predict(*self.space.transform(suggest))                     # 从后验分布中采样,获得目标函数的蒙特卡洛样本                     y_samp = self.model.sample_y(*self.space.transform(suggest), n_mc).numpy()                                  # 获取当前的Pareto前沿                 y_curr = self.get_pf(self.y).copy()                 select_id = []  # 存储选择的点索引                                  # 依次选择n_suggestions个点                 for i in range(n_suggestions):                     ehvi_lst = []  # 存储每个候选点的期望超体积改进                     base_hv = hv.do(y_curr)  # 计算当前Pareto前沿的超体积                                          # 对每个候选点计算EHVI                     for j in range(suggest.shape[0]):                         samp = y_samp[:, j]  # 候选点j的蒙特卡洛样本                         hvi_est = 0  # 超体积改进估计值                                                  # 对每个蒙特卡洛样本计算超体积改进                         for k in range(n_mc):                             # 将当前样本添加到Pareto前沿中                             y_tmp = np.vstack([y_curr, samp[[k]]])                             # 计算新的超体积                             hvi_est += hv.do(y_tmp) - base_hv  # 超体积改进                                                  hvi_est /= n_mc  # 取平均得到期望超体积改进                         ehvi_lst.append(hvi_est)                                          # 选择EHVI最大的点,如果没有正改进则随机选择                     best_id = np.argmax(ehvi_lst) if max(ehvi_lst) > 0 else np.random.choice(suggest.shape[0])                                          # 将选择点的最优样本(各目标最小值)添加到当前Pareto前沿                     y_curr = np.vstack([y_curr, y_samp[:, best_id].min(axis=0, keepdims=True)])                     select_id.append(best_id)  # 记录选择的点索引                  # 确保选择的点不重复,如果重复则用其他点补充                 select_id = list(set(select_id))  # 去重                 if len(select_id) < n_suggestions:                     # 找出未被选择的候选点                     candidate_id = [i for i in range(suggest.shape[0]) if i not in select_id]                     # 随机选择补充点                     select_id += np.random.choice(candidate_id, n_suggestions - len(select_id), replace=False).tolist()                                  # 返回最终选择的建议点                 return suggest.iloc[select_id] 

区别

HEBOConstr:默认使用 MACEConstr(LCB + EI + PI),将三种采集函数组合为多目标优化,使用 Power Transform(Box-Cox 或 Yeo-Johnson)对目标函数进行变换,用 Mean 和 Sigma 选择建议点,优先选择预测值最小和不确定性最大的点
GeneralBO:使用 GeneralAcq(LCB),每个目标都使用 LCB,直接使用原始观测值,不做变换,若提供 ref_point,使用 EHVI(Expected Hypervolume Improvement)选择,否则基于不确定性选择

进化算法选择最优配置

class EvolutionOpt:     """     进化算法优化器:使用进化算法优化采集函数,寻找最优候选点     支持单目标和多目标优化,处理混合变量(连续和离散)优化问题     """     def __init__(self,             design_space : DesignSpace,  # 设计空间,定义优化参数的取值范围和类型             acq          : Acquisition,  # 采集函数实例,用于评估候选点的质量             es           : str = None,   # 进化算法类型,None时自动选择             **conf):                     # 其他配置参数         # 初始化基本参数         self.space      = design_space   # 设计空间对象         self.es         = es             # 进化算法类型         self.acq        = acq            # 采集函数实例         self.pop        = conf.get('pop', 100)        # 种群大小,默认100         self.iter       = conf.get('iters',500)       # 进化迭代次数,默认500         self.verbose    = conf.get('verbose', False)  # 是否输出详细信息,默认False         self.repair     = conf.get('repair', None)    # 修复算子,用于处理不可行解         self.sobol_init = conf.get('sobol_init', True) # 是否使用Sobol序列初始化种群,默认True                  # 验证采集函数至少有一个目标         assert(self.acq.num_obj > 0)          # 如果没有指定进化算法类型,根据目标数量自动选择         if self.es is None:             # 单目标使用遗传算法(GA),多目标使用NSGA-II             self.es = 'nsga2' if self.acq.num_obj > 1 else 'ga'       def optimize(self, initial_suggest : pd.DataFrame = None, fix_input : dict = None, return_pop = False) -> pd.DataFrame:         """         执行进化算法优化过程                  参数:         initial_suggest: 初始建议点,用于引导搜索方向         fix_input: 需要固定的输入参数字典         return_pop: 是否返回整个种群而不仅仅是最优解                  返回:         df_opt: DataFrame,包含优化得到的候选点参数配置         """         # 获取设计空间的上下界(转换为numpy数组)         lb        = self.space.opt_lb.numpy()  # 参数下界         ub        = self.space.opt_ub.numpy()  # 参数上界                  # 创建贝叶斯优化问题实例,将采集函数包装成优化问题         prob      = BOProblem(self.acq, self.space, fix_input)                  # 初始化种群:使用Sobol序列或随机采样生成初始种群         init_pop  = get_init_pop(self.space, self.pop, initial_suggest, self.sobol_init)                  # 根据目标数量选择不同的进化算法         if self.acq.num_obj == 1:             # 单目标优化:使用混合变量遗传算法             algo = MixedVariableGA(                 pop_size = self.pop,      # 种群大小                 repair = self.repair,     # 修复算子                 sampling = init_pop       # 初始种群             )         else:             # 多目标优化:使用NSGA-II算法             algo = NSGA2(                 pop_size = self.pop,      # 种群大小                 sampling = init_pop,      # 初始种群                 mating   = MixedVariableMating(eliminate_duplicates = MixedVariableDuplicateElimination()),  # 混合变量交配算子                 eliminate_duplicates = MixedVariableDuplicateElimination()  # 重复个体消除             )                  # 执行进化算法优化过程         # ('n_gen', self.iter) 表示运行self.iter代         res = minimize(prob, algo, ('n_gen', self.iter), verbose = self.verbose)                  # 处理优化结果         if res.X is not None and not return_pop:             # 如果找到了最优解且不需要返回整个种群             x = res.X  # 最优解                          # 处理不同格式的解             if isinstance(x, dict):                 # 如果解是字典格式,转换为列表                 x = [x]             if isinstance(x, np.ndarray):                 # 如果解是numpy数组,转换为列表                 x = x.tolist()                          # 提取参数值并转换为DataFrame             opt_x = pd.DataFrame(x)[self.space.para_names].values.astype(float)         else:             # 返回整个种群或当没有明确最优解时             # 从种群中提取所有个体的参数             opt_x = pd.DataFrame([p.X for p in res.pop])[self.space.para_names].values.astype(float)                          # 如果是单目标优化且不需要返回整个种群,随机选择一个个体             if self.acq.num_obj == 1 and not return_pop:                 opt_x = opt_x[[np.random.choice(opt_x.shape[0])]]                  # 保存优化结果(用于调试和分析)         self.res  = res                  # 分离连续变量和离散变量         opt_xcont = torch.from_numpy(opt_x[:, :self.space.num_numeric])  # 连续变量部分         opt_xenum = torch.from_numpy(opt_x[:, self.space.num_numeric:])  # 离散变量部分                  # 将优化结果转换回原始参数空间         df_opt    = self.space.inverse_transform(opt_xcont, opt_xenum)                  # 如果有需要固定的输入参数,覆盖优化结果中的对应参数         if fix_input is not None:             for k, v in fix_input.items():                 df_opt[k] = v                          return df_opt  # 返回最终的优化结果 

输入:采集函数(定义了一个复杂的优化景观)
输出:在采集函数上得分最高的参数配置

发表评论

评论已关闭。

相关文章

当前内容话题