rllm中的推理流程

打印一条推理路径

上文中,我们跑通了rllm框架,下面,让我们仔细分析一下examples/math_tool/run_math_with_tool.py中的内部过程。
run_math_with_tool.py的大致代码如下:

	agent_args = {"tools": ["python"], "parser_name": "qwen", "system_prompt": "You are a math assistant that can write python to solve math problems."}  	env_args = { 		"tools": ["python"], 		"reward_fn": math_reward_fn, 	} 	     engine = AgentExecutionEngine(         agent_class=ToolAgent,         agent_args=agent_args,         env_class=ToolEnvironment,         env_args=env_args,         engine_name="openai",         rollout_engine_args={"base_url": "http://localhost:30000/v1", "api_key": "None"},         tokenizer=tokenizer,         sampling_params=sampling_params,         max_response_length=16384,         max_prompt_length=2048,         n_parallel_agents=n_parallel_agents,     )      test_dataset = DatasetRegistry.load_dataset("aime2024", "test")     ...     tasks = test_dataset.repeat(n=8)  # repeat to evaluate pass@k 	...     results = asyncio.run(engine.execute_tasks(tasks[:5])) # 只跑前10条 

我们打印出一条推理路径看看效果

first_traj = results[0]  print("n======= 示例轨迹 =======")  print("问题:", first_traj.task)  for i, step in enumerate(first_traj.steps): 	print(f"n--- Step {i} ---") 	print("Observation:", step.observation) 	print("Model response:", step.model_response) 	print("Action:", step.action) 	print("Reward:", step.reward) 	print("Done:", step.done) 	 print("======================n") 

打印出来的结果为(一共有5步,第0步为LLM接受问题;第5步为LLM输出答案,中间步骤都是根据工具调用结果生成推理的过程。Observation是模型接受到的信息,包括问题,工具调用结果等;Action是模型产生的动作,包括工具调用,最终回复等)

问题: {'id': 60, 'problem': '...', 'answer': '204', 'url': '...', 'year': '2024', 'question': 'Every morning Aya goes for a $9$-kilometer-long walk and stops at a coffee shop afterwards. When she walks at a constant speed of $s$ kilometers per hour, the walk takes her 4 hours, including $t$ minutes spent in the coffee shop. When she walks $s+2$ kilometers per hour, the walk takes her 2 hours and 24 minutes, including $t$ minutes spent in the coffee shop. Suppose Aya walks at $s+\frac{1}{2}$ kilometers per hour. Find the number of minutes the walk takes her, including the $t$ minutes spent in the coffee shop.', 'ground_truth': '204', 'data_source': 'math'}  --- Step 0 --- Observation: {'id': 60, 'problem': '...', 'answer': '204', 'url': '...', 'year': '2024', 'question': 'Every morning Aya goes for a $9$-kilometer-long walk and stops at a coffee shop afterwards. When she walks at a constant speed of $s$ kilometers per hour, the walk takes her 4 hours, including $t$ minutes spent in the coffee shop. When she walks $s+2$ kilometers per hour, the walk takes her 2 hours and 24 minutes, including $t$ minutes spent in the coffee shop. Suppose Aya walks at $s+\frac{1}{2}$ kilometers per hour. Find the number of minutes the walk takes her, including the $t$ minutes spent in the coffee shop.', 'ground_truth': '204', 'data_source': 'math'}  Model response:  .... <tool_call> {"name": "python", "arguments": {"code": "import mathnna = 1nb = 2nc = -11.25nndiscriminant = b**2 - 4*a*cnsqrt_discriminant = math.sqrt(discriminant)ns1 = (-b + sqrt_discriminant) / (2*a)ns2 = (-b - sqrt_discriminant) / (2*a)nnprint(s1, s2)"}} </tool_call> Action: [{'id': '5c7285c2-d967-4e60-a228-7947d8c87524', 'type': 'function', 'function': {'name': 'python', 'arguments': '{"code": "import math\n\na = 1\nb = 2\nc = -11.25\n\ndiscriminant = b**2 - 4*a*c\nsqrt_discriminant = math.sqrt(discriminant)\ns1 = (-b + sqrt_discriminant) / (2*a)\ns2 = (-b - sqrt_discriminant) / (2*a)\n\nprint(s1, s2)"}'}}] Reward: 0 Done: False  --- Step 1 --- Observation: {'tool_outputs': {'5c7285c2-d967-4e60-a228-7947d8c87524': '2.5 -4.5n'}} Model response:  ... <tool_call> {"name": "python", "arguments": {"code": "s = 2.5nn# Calculate t using equation (1)nt = (4 - (9 / s)) * 60nprint(t)"}} </tool_call> Action: [{'id': '6e6fc722-29c4-4806-917a-933e7ff87d1b', 'type': 'function', 'function': {'name': 'python', 'arguments': '{"code": "s = 2.5\n\n# Calculate t using equation (1)\nt = (4 - (9 / s)) * 60\nprint(t)"}'}}] Reward: 0 Done: False  --- Step 2 --- ... --- Step 3 --- ...  --- Step 4 --- Observation: {'tool_outputs': {'dc3840b8-3d86-477e-b51e-32a26ad668d4': 's = 2.5 km/hnt = 23.999999999999993 minutesnTotal time for third scenario: 204.0 minutesn'}} Model response:  ... ### Final Answer: $$ boxed{204} $$ Action: [{'id': 'b35c758b-ce5d-4f8b-a412-5b7fd86cf383', 'type': 'function', 'function': {'name': 'finish', 'arguments': {'response': "<think>nOkay, let me check the output again. The code output shows that s is 2.5 km/h, t is approximately 24 minutes (due to floating-point precision), and the total time for the third scenario is exactly 204 minutes. That matches our manual calculations. So the answer is 204 minutes.nnThe slight discrepancy in t is because of how floating-point numbers are handled in Python, but since t was originally calculated as 24 minutes, the total time comes out perfectly as 204 minutes when converted. Therefore, the final answer should be 204 minutes.n</think>nnThe number of minutes the walk takes Aya, including the $ t $ minutes spent in the coffee shop, when she walks at $ s + \frac{1}{2} $ kilometers per hour, is **204 minutes**.nn### Step-by-Step Explanation:n1. **Define Variables:**n   - Let $ s $ be Aya's walking speed (km/h).n   - Let $ t $ be the time spent in the coffee shop (minutes).nn2. **Set Up Equations:**n   - **First Scenario:**  n     Walking time: $ \frac{9}{s} $ hours.  n     Total time: $ \frac{9}{s} + \frac{t}{60} = 4 $ hours.  n     Equation: $ \frac{9}{s} + \frac{t}{60} = 4 $.nn   - **Second Scenario:**  n     Walking time: $ \frac{9}{s+2} $ hours.  n     Total time: $ \frac{9}{s+2} + \frac{t}{60} = 2.4 $ hours.  n     Equation: $ \frac{9}{s+2} + \frac{t}{60} = 2.4 $.nn3. **Solve for $ s $ and $ t $:**n   - Subtract the second equation from the first:  n     $ \frac{9}{s} - \frac{9}{s+2} = 1.6 $.  n     Simplify: $ \frac{18}{s(s+2)} = 1.6 $.  n     Solve: $ s^2 + 2s - 11.25 = 0 $.  n     Roots: $ s = 2.5 $ km/h (discarding the negative root).nn   - Substitute $ s = 2.5 $ into the first equation:  n     $ \frac{9}{2.5} + \frac{t}{60} = 4 $.  n     Solve: $ t = 24 $ minutes.nn4. **Third Scenario:**n   - Speed: $ s + 0.5 = 3 $ km/h.  n   - Walking time: $ \frac{9}{3} = 3 $ hours.  n   - Total time: $ 3 + \frac{24}{60} = 3.4 $ hours = $ 204 $ minutes.nn### Final Answer:n$$n\boxed{204}n$$"}}}] Reward: 1.0 Done: True ====================== 

由此,我们可以分析出来rllm中Agent 工具调用的流程:

  1. agent观察到问题后,思考并进行function call
  2. rllm框架识别到工具调用操作后,执行工具,并返回结果
  3. Agent根据工具返回的结果继续分析。

此外,在正式讲解代码之前,还要明确几个术语:

  1. 环境:负责将问题传递给Agent+执行工具
  2. 观察:告诉Agent当前时刻的信息(包括接受到的问题,工具执行结果等)
  3. 动作:Agent给环境的指令,也就是Agent生成的工具调用的参数
  4. 奖励:这一步表现的好不好

举个例子,Agent调用代码工具,首先要从环境中接受到用户问题,然后Agent从环境中接受(观察)到问题,生成思考,思考后生成代码工具的调用参数(<tool_call></tool_call>中包裹的内容,也就是Agent的动作)。然后在环境中执行Agent生成的代码,将执行结果返回给Agent,Agent观察到结果后,继续进行分析。

下面,我们对环境,和环境交互的Agent,以及奖励进行分析。至于AgentExecutionEngine本身,则是起到了统一协调的作用。

环境

定义在rllm.environments.tools.tool_env中,用于接受用户输入和执行工具调用。

主要代码如下:

class ToolEnvironment(BaseEnv): 	def step(self, action: list[dict] | str | dict): 		""" 		Take a step in the environment based on the action. 		Args: 			actions: List containing a single action string from the agent 	 		Returns: 			next_observations, rewards, terminateds, infos 		"""  		# 检查action中是否有finish字段(如果当前找不到任何工具调用的动作,那么Agent就会执行finish动作,并传入到环境中),如果有,代表回答完成 		if isinstance(action, list) and action: 			for tool_call in action: 				if tool_call.get("function", {}).get("name") == "finish": 					done = True 					break 		 		# 如果回答完成,那么提取llm的回答,并且计算奖励 		if done: 			# 提取llm的回答 			if isinstance(action, str): 				llm_response = action 			elif isinstance(action, list): 				... 	 			# 根据问题,真实值和llm的回答计算奖励 			task_info = self.task if self.task is not None else {} 			reward_output = self.reward_fn(task_info=task_info, action=llm_response) 			return {}, reward_output.reward, done, {"response": action, "metadata": reward_output.metadata, "is_correct": reward_output.is_correct} 	 		# 如果回答没有完成,那么执行工具并返回工具执行结果 		tool_calls = action 		tool_outputs = self._execute_tool_calls(tool_calls) # 执行工具是,会调用工具类的call方法(一般定义在rllm/tools 文件夹中) 		next_obs = {"tool_outputs": tool_outputs} 		# Return results as lists with single items to maintain batch structure 		return next_obs, reward, done, {"response": action, "metadata": {}} 

Agent

Agent主要用来维护一个消息队列,其中内容包括系统提示词,用户输入,模型回复以及工具调用

[ 	{"role": "system", "content": ""}, 	{"role": "user", "content": ""}, 	{"role": "assistant", "content": ""}, 	{"role": "tool", "content": "","tool_call_id": ""} 	.... 	.... ] 
class ToolAgent(BaseAgent):  	def _format_observation_as_messages(self, obs: Any) -> list[dict]: 		"""格式化从环境中接收到的观察""" 		messages = [] 		 		if isinstance(obs, dict): 			# 如果有question字段,代表是用户传入的,将role设为user,加入到历史消息中 			if "question" in obs: 				messages.append({"role": "user", "content": obs["question"]}) 			# 如果有tool_outputs字段,代表是工具返回结果,将role设为tool,加入到历史消息中 			elif "tool_outputs" in obs: 				# Format tool outputs from environment observation 				for tool_call_id, tool_output_str in obs["tool_outputs"].items(): 					messages.append( 						{ 						"role": "tool", 						"content": tool_output_str, 						"tool_call_id": tool_call_id, 						}) 		elif isinstance(obs, str): 			messages.append({"role": "user", "content": obs}) 		elif obs: 			messages.append({"role": "user", "content": str(obs)}) 		return messages  	def update_from_env(self, observation: Any, reward: float, done: bool, info: dict, **kwargs): 		""" 		将环境中获取到的观察加入到消息队列中 		""" 		obs_messages = self._format_observation_as_messages(observation) 		 		self.messages.extend(obs_messages)	  	def update_from_model(self, response: str, **kwargs) -> Action: 		""" 		从response中解析模型生成的工具调用参数 		""" 		tool_calls_dict = [] 		assistant_content = response 		# 从模型响应中解析回答 		try: 			tool_calls = self.tool_parser.parse(response) 			tool_calls_dict = [ 				{ 					"id": str(uuid.uuid4()), 					"type": "function", 					"function": tool_call.to_dict(), 				} 				for tool_call in tool_calls 			]  		# 将模型的完整响应加入到消息队列中 		assistant_message = {"role": "assistant", "content": assistant_content} 		 		if len(tool_calls_dict) > 0: 			# 进行简单的格式转换 			... 			 		# 如果没有工具调用,那么将当前的动作设置为finish 		else: 			tool_calls_dict = [ 				{ 					"id": str(uuid.uuid4()), 					"type": "function", 					"function": { 						"name": "finish", 						"arguments": { 							"response": assistant_content, 						}, 					}, 				} 			] 		# 将模型的响应加入到消息队列中 		self.messages.append(assistant_message) 		return Action(action=tool_calls_dict) 		 	def reset(self): 		"""初始化(设置system prompt)"""	 		self.messages = [{"role": "system", "content": self.system_prompt + self.tools_prompt}] 

Agent执行引擎

代码在rllm/engine/agent_execution_engine.py中(为了简化起见,这里面移除了很多并行和状态维护的代码)。

可以看到,Agent执行引擎用于协调Agent和环境,实现了ReAct的推理模式。

class AgentExecutionEngine: 	async def run_agent_trajectory_async(self, idx, application_id, seed=0, mode="Text", **kwargs): 		"""执行Agent推理的代码""" 		# 初始化 		env.reset() 		agent.reset() 		 		for step_idx in range(self.max_steps): 			# 拿到prompt 			prompt_messages = agent.chat_completions.copy() 			# 得到response 			response = self.get_model_response(prompt_messages, application_id, **kwargs) 			# 从response中解析出动作 			action: Action = agent.update_from_model(response) 			action = action.action 			# 执行动作 			env.step(action) 			# Agent更新 			agent.update_from_env(...) 			# 执行完成后跳出循环 			if done: 				break 

奖励函数

奖励函数定义在rllm/rewards/math_reward.py中,这里只使用了正确性奖励,主要代码如下:

class RewardMathFn:   def __call__(self, task_info: dict, action: str) -> RewardOutput:  	model_response = action 	 	# 剔除<think></think>标签里面的内容 	if THOUGHT_DELIMITER_END in model_response: 		model_solution = model_response.split(THOUGHT_DELIMITER_END)[1] 	else: 		model_solution = model_response  	# 提取模型的回答(一般都包裹在box{}中) 	model_answer = extract_answer(model_solution) 	 	# 获取真实标签 	ground_truths = task_info.get("ground_truth", None) 	# 从真实标签中的boxed字段里提取答案 	processed_ground_truths = [] 	for truth in ground_truths: 		truth = str(truth) 		if "\boxed" in truth: 			processed_truth = extract_answer(truth) 			if processed_truth is not None: 				processed_ground_truths.append(processed_truth) 		else: 			processed_ground_truths.append(truth)  	# 设置正确性奖励 	for ground_truth in processed_ground_truths: 		# 模型回答是否正确? 		is_correct = grade_answer_mathd(model_answer, ground_truth) or grade_answer_sympy(model_answer, ground_truth) 		if is_correct: 			# 设置正确性奖励 			reward = self.config.correct_reward 			return RewardOutput(reward=reward, is_correct=True) 			 	# 模型回答错误 	return RewardOutput(reward=self.config.incorrect_reward, is_correct=False) 

发表评论

您必须 [ 登录 ] 才能发表留言!

相关文章