Observation Disruptor
Observation-disruptor: An agent’s observations may not perfectly reflect the true state of the environment due to factors such as sensor noise and time delays.
To model this sensing inaccuracy, we introduce an additional module—the observation-disruptor—which determines the agent’s observations from the environment:
- Agents’ observed state \(\tilde{s}_t\)
The observation-disruptor takes the true current state \(s_t\) as input and outputs a perturbed state \(\tilde{s}_t = D_s(s_t)\). The agent uses \(\tilde{s}_t\) as input to its policy to select an action.
- Agents’ observed reward \(\tilde{r}_t\)
The observation-disruptor takes the real immediate reward \(r_t\) as input and outputs a perturbed reward \(\tilde{r}_t = D_r(r_t)\). The agent observes \(\tilde{r}_t\) and updates its policy accordingly.
# State disruptor D_s: start from the true observation, then perturb it.
# Reading the clean observation first guarantees `observation` is bound on
# every path (including the "uniform" branch, which previously added noise to
# a variable that had not been assigned yet).
observation = self._get_obs()
if args.noise_factor == "state":
    # The counter advances on every call; noise is injected only on calls
    # where it is a multiple of `llm_disturb_interval`.
    self.llm_disturb_iteration += 1
    if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
        if args.noise_type == "gauss":
            # One sample is drawn and added to the whole observation.
            observation = observation + random.gauss(mu, sigma)  # robust setting
        elif args.noise_type == "shift":
            observation = observation + args.noise_shift
        elif args.noise_type == "uniform":
            observation = observation + random.uniform(args.uniform_low, args.uniform_high)
        # Any other noise_type leaves the observation unperturbed.
# LLM-driven adversary: when `args.llm_guide` is "adversary", an external
# `gpt_call` is asked to return a revised observation. The prompt always embeds
# the current reward, `self.previous_reward` and the current observation.
# NOTE(review): indentation is lost in this excerpt; the nesting described in
# these comments is the reading implied by statement order - confirm against
# the real source.
# NOTE(review): whatever `gpt_call` returns is assigned to `observation`
# as-is; no parsing back into the original observation format is visible here.
if args.llm_guide == "adversary":
# The disturbance counter advances on every adversary-guided call.
self.llm_disturb_iteration += 1
# "stochastic": no constraint on the noise is mentioned in the prompt.
if args.llm_guide_type == "stochastic":
# Only query the LLM when the counter is a multiple of llm_disturb_interval.
if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
prompt = "This is about a robust reinforcement learning setting; we want you as an adversary policy. If the current reward exceeds the previous reward value, please input some observation noises to disturb the environment and improve the learning algorithm's robustness. " \
"the current reward:" + str(reward) + ", the previous reward is" + str(self.previous_reward) \
+ "please slightly revise the current environment state values:" + str(
observation) + ", just output the revised state with its original format" \
"do not output any other things."
prompt_state = gpt_call(prompt)
observation = prompt_state
# "uniform": the prompt additionally states the (uniform_low, uniform_high)
# pair as the uniform distribution the noise should follow.
elif args.llm_guide_type == "uniform":
if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
prompt = "This is about a robust reinforcement learning setting; we want you as an adversary policy. If the current reward exceeds the previous reward value, please input some observation noises to disturb the environment and improve the learning algorithm's robustness. " \
"The noises should subject the uniform distribution:" +str((args.uniform_low, args.uniform_high))+ ", the current reward:" + str(reward) + ", the previous reward is" + str(self.previous_reward) \
+ "please slightly revise the current environment state values:" + str(
observation) + ", just output the revised state with its original format" \
"do not output any other things."
prompt_state = gpt_call(prompt)
observation = prompt_state
# "constraint": the prompt states the same (uniform_low, uniform_high) pair,
# but as the area the noise should stay within.
elif args.llm_guide_type == "constraint":
if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
prompt = "This is about a robust reinforcement learning setting; we want you as an adversary policy. If the current reward exceeds the previous reward value, please input some observation noises to disturb the environment and improve the learning algorithm's robustness. " \
"The noises should should be in this area:" +str((args.uniform_low, args.uniform_high))+ ", the current reward:" + str(reward) + ", the previous reward is" + str(self.previous_reward) \
+ "please slightly revise the current environment state values:" + str(
observation) + ", just output the revised state with its original format" \
"do not output any other things."
prompt_state = gpt_call(prompt)
observation = prompt_state
def step(self, robust_input):
    """Advance the simulation by one step and return the (possibly disrupted) transition.

    Args:
        robust_input: Mapping with two entries:
            ``"action"`` - the control passed to ``do_simulation`` and
            ``control_cost``;
            ``"robust_config"`` - an object whose attributes configure the
            disruptor. Attributes read here: ``noise_mu``, ``noise_sigma``,
            ``noise_factor`` (``"state"`` or ``"reward"``), ``noise_type``
            (``"gauss"``, ``"shift"`` or ``"uniform"``), ``noise_shift``,
            ``uniform_low``, ``uniform_high``, ``llm_disturb_interval``,
            ``llm_guide`` and ``llm_guide_type``.

    Returns:
        A 5-tuple ``(observation, reward, terminated, False, info)``. The
        fourth element (truncation flag) is always ``False``. ``observation``
        and ``reward`` may carry injected noise depending on the config;
        ``info`` holds the unperturbed reward components and positions.
    """
    action = robust_input["action"]
    args = robust_input["robust_config"]
    mu = args.noise_mu
    sigma = args.noise_sigma

    def sample_noise():
        """Return one disturbance value for ``args.noise_type``, or None if the type is unknown."""
        if args.noise_type == "gauss":
            return random.gauss(mu, sigma)  # robust setting
        if args.noise_type == "shift":
            return args.noise_shift
        if args.noise_type == "uniform":
            return random.uniform(args.uniform_low, args.uniform_high)
        return None

    xy_position_before = self.get_body_com("torso")[:2].copy()
    self.do_simulation(action, self.frame_skip)
    xy_position_after = self.get_body_com("torso")[:2].copy()

    xy_velocity = (xy_position_after - xy_position_before) / self.dt
    x_velocity, y_velocity = xy_velocity

    forward_reward = x_velocity
    healthy_reward = self.healthy_reward
    rewards = forward_reward + healthy_reward

    costs = ctrl_cost = self.control_cost(action)
    terminated = self.terminated

    # Read the true observation first so `observation` is bound on every
    # path. Previously the "uniform" branch added noise to an unassigned
    # variable, and a non-"state" noise_factor left it undefined at return.
    observation = self._get_obs()
    if args.noise_factor == "state":
        # The counter advances on every call; noise is injected only when it
        # is a multiple of llm_disturb_interval.
        self.llm_disturb_iteration += 1
        if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
            noise = sample_noise()
            if noise is not None:
                observation = observation + noise

    fullpath_original = self.expand_model_path(self.xml_file_original)
    info = {
        "reward_forward": forward_reward,
        "reward_ctrl": -ctrl_cost,
        "reward_survive": healthy_reward,
        "x_position": xy_position_after[0],
        "y_position": xy_position_after[1],
        "distance_from_origin": np.linalg.norm(xy_position_after, ord=2),
        "x_velocity": x_velocity,
        "y_velocity": y_velocity,
        "forward_reward": forward_reward,
        "source_file_path": fullpath_original,
        "target_file_path": self.fullpath,
    }
    if self._use_contact_forces:
        contact_cost = self.contact_cost
        costs += contact_cost
        # Report the contact term under its own key instead of overwriting
        # "reward_ctrl", which must keep holding -ctrl_cost.
        info["reward_contact"] = -contact_cost

    reward = rewards - costs
    if args.noise_factor == "reward":
        self.llm_disturb_iteration += 1
        if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
            noise = sample_noise()
            if noise is not None:
                reward = reward + noise

    if self.render_mode == "human":
        self.render()

    if args.llm_guide == "adversary":
        # NOTE(review): this bumps the same counter a second time when a
        # noise_factor branch above also ran - confirm that is intended.
        self.llm_disturb_iteration += 1
        # NOTE(review): gpt_call's return value replaces the observation
        # as-is; no parsing back to the original format is visible here.
        if args.llm_guide_type == "stochastic":
            if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
                prompt = (
                    "This is about a robust reinforcement learning setting; we want you as an adversary policy. If the current reward exceeds the previous reward value, please input some observation noises to disturb the environment and improve the learning algorithm's robustness. "
                    "the current reward:" + str(reward) + ", the previous reward is" + str(self.previous_reward)
                    + "please slightly revise the current environment state values:" + str(
                        observation) + ", just output the revised state with its original format"
                    "do not output any other things."
                )
                prompt_state = gpt_call(prompt)
                observation = prompt_state
        elif args.llm_guide_type == "uniform":
            if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
                observation = gpt_call("the current observation is" + str(observation))

    # Remember the (possibly perturbed) reward for the next adversary prompt.
    self.previous_reward = reward
    return observation, reward, terminated, False, info
def _get_obs(self):
    """Return the current observation.

    Delegates to ``self.simulation_state()`` and hands its result back unchanged.
    """
    current_state = self.simulation_state()
    return current_state
def reset_model(self):
    """Reset per-episode bookkeeping and return the first observation.

    ``previous_reward`` is set back to 0 before the observation is read via
    ``self._get_obs()``.
    """
    self.previous_reward = 0
    initial_observation = self._get_obs()
    return initial_observation