Source code for smpl.envs.atropineenv

# -*- coding: utf-8 -*-
"""
AtropineEnv simulates an atropine production environment.
"""

import io

import matplotlib.pyplot as plt
import numpy as np
import requests
from casadi import DM
from gym import spaces, Env

from .helpers.constants import USS, INPUT_REFS, OUTPUT_REF, SIM_TIME
from .helpers.helper_funcs import state_estimator
from .models.atropine_process import Plant, atropine_mpc_controller
from .models.config import ND1, ND2, ND3, V1, V2, V3, V4, dt
from .utils import *


class AtropineMPC:
    def __init__(self, model_loc='https://github.com/smpl-env/smpl-experiments/blob/main/configdata/atropineenv/model.npy?raw=true',
                 N=30, Nx=2, Nu=4,
                 uss_subtracted=True,
                 reward_on_ess_subtracted=False,
                 reward_on_steady=True,
                 reward_on_absolute_efactor=False,
                 reward_on_actions_penalty=0.0,
                 reward_on_reject_actions=True,
                 relaxed_max_min_actions=False,
                 observation_include_t=True,
                 observation_include_action=False,
                 observation_include_uss=True,
                 observation_include_ess=True,
                 observation_include_e=True,
                 observation_include_kf=True,
                 observation_include_z=True,
                 observation_include_x=False):
        response = requests.get(model_loc)
        response.raise_for_status()
        self.model_preconfig = np.load(io.BytesIO(response.content), allow_pickle=True)
        self.N = N
        self.Nx = Nx
        self.Nu = Nu
        self.uss_subtracted = uss_subtracted  # we assume that we can see the steady-state output during steps. If True, USS is added to the actions during steps.
        self.reward_on_ess_subtracted = reward_on_ess_subtracted
        self.reward_on_steady = reward_on_steady  # whether the reward is based on the e-factor (the smaller the better) or on how close it is to the steady-state e-factor
        self.reward_on_absolute_efactor = reward_on_absolute_efactor  # whether the reward is based on the absolute e-factor (only valid if reward_on_steady is False)
        self.reward_on_actions_penalty = reward_on_actions_penalty
        self.reward_on_reject_actions = reward_on_reject_actions  # when input actions exceed max_actions, reject them and end the episode immediately.
        self.relaxed_max_min_actions = relaxed_max_min_actions  # assumes uss_subtracted is False.
        # Select what to include in the observations. By default the format is:
        # USS1, USS2, USS3, USS4, U1, U2, U3, U4, ESS, E, KF_X1, KF_X2, Z1, Z2, ..., Z30
        self.observation_include_t = observation_include_t  # 1
        self.observation_include_action = observation_include_action  # 4
        self.observation_include_uss = observation_include_uss  # 4
        self.observation_include_ess = observation_include_ess  # yss, 1
        self.observation_include_e = observation_include_e  # y, e-factor, 1
        self.observation_include_kf = observation_include_kf  # Kalman-filtered state, 2
        self.observation_include_z = observation_include_z  # 30
        self.observation_include_x = observation_include_x  # 1694
        assert self.observation_include_kf
    def predict(self, state):
        # Observation layout (when all flags are on):
        # t (1), action (4), uss (4), ess/yss (1), e-factor (1), kf (2), z (30), x (1694)
        observations = list(state)
        if self.observation_include_t:
            t = observations[:1]
            observations = observations[1:]
        if self.observation_include_action:
            action = observations[:4]
            observations = observations[4:]
        if self.observation_include_uss:
            uss = observations[:4]
            observations = observations[4:]
        if self.observation_include_ess:
            yss = observations[:1]
            observations = observations[1:]
        if self.observation_include_e:
            efactor = observations[:1]
            observations = observations[1:]
        if self.observation_include_kf:
            kf = observations[:2]
            observations = observations[2:]
        if self.observation_include_z:
            z = observations[:30]
            observations = observations[30:]
        if self.observation_include_x:
            x = observations[:1694]
            observations = observations[1694:]
        # TODO: self.model_preconfig differs from self.model.A, B, C, D, K.
        uk = atropine_mpc_controller(
            np.array(kf), self.N, self.Nx, self.Nu,
            USS, INPUT_REFS, OUTPUT_REF,
            self.model_preconfig[0], self.model_preconfig[1],
            self.model_preconfig[2], self.model_preconfig[3],
        ) / 1000  # unscale
        if self.uss_subtracted is False:
            uk = uk + USS
        return uk
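# Usage sketch for the MPC baseline (illustrative, not part of the original module;
# AtropineEnvGym is defined below). It assumes the default observation flags, so the
# Kalman-filter slice that predict() relies on is present, and network access to the
# default x0/z0/model URLs:
#
#     env = AtropineEnvGym(normalize=False)   # MPC expects denormalized observations
#     mpc = AtropineMPC()
#     obs, done = env.reset(), False
#     while not done:
#         obs, reward, done, info = env.step(mpc.predict(obs))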
class AtropineEnvGym(smplEnvBase):
    def __init__(self, dense_reward=True, normalize=True, debug_mode=False, action_dim=4,
                 reward_function=None, done_calculator=None, observation_name=None, action_name=None,
                 np_dtype=np.float32, max_steps=60, error_reward=-100000.0,
                 x0_loc='https://raw.githubusercontent.com/smpl-env/smpl/main/smpl/configdata/atropineenv/x0.txt',
                 z0_loc='https://raw.githubusercontent.com/smpl-env/smpl/main/smpl/configdata/atropineenv/z0.txt',
                 model_loc='https://github.com/smpl-env/smpl-experiments/blob/main/configdata/atropineenv/model.npy?raw=true',
                 uss_subtracted=True,
                 reward_on_ess_subtracted=False,
                 reward_on_steady=True,
                 reward_on_absolute_efactor=False,
                 reward_on_actions_penalty=0.0,
                 reward_on_reject_actions=True,
                 reward_scaler=1.0,
                 relaxed_max_min_actions=False,
                 observation_include_t=True,
                 observation_include_action=False,
                 observation_include_uss=True,
                 observation_include_ess=True,
                 observation_include_e=True,
                 observation_include_kf=True,
                 observation_include_z=True,
                 observation_include_x=False,
                 # init_mpc_controller=False
                 ):
        # define arguments
        self.step_count = 0
        self.total_reward = 0
        self.done = False
        self.dense_reward = dense_reward
        self.normalize = normalize
        self.debug_mode = debug_mode
        self.action_dim = action_dim
        self.reward_function = reward_function
        self.done_calculator = done_calculator
        self.observation_name = observation_name
        self.action_name = action_name
        self.np_dtype = np_dtype
        self.max_steps = max_steps  # note that if self.max_steps == -1, the simulator runs forever (open-loop).
        self.error_reward = error_reward
        if self.reward_function is None:
            self.reward_function = self.reward_function_standard
        if self.done_calculator is None:
            self.done_calculator = self.done_calculator_standard

        self.uss_subtracted = uss_subtracted  # we assume that we can see the steady-state output during steps. If True, USS is added to the actions during steps.
        self.reward_on_ess_subtracted = reward_on_ess_subtracted
        self.reward_on_steady = reward_on_steady  # whether the reward is based on the e-factor (the smaller the better) or on how close it is to the steady-state e-factor
        self.reward_on_absolute_efactor = reward_on_absolute_efactor  # whether the reward is based on the absolute e-factor (only valid if reward_on_steady is False)
        self.reward_on_actions_penalty = reward_on_actions_penalty
        self.reward_on_reject_actions = reward_on_reject_actions  # when input actions exceed max_actions, reject them and end the episode immediately.
        self.reward_scaler = reward_scaler  # a scaler multiplied onto rewards to avoid numerical errors. Defaults to 1.0; in our paper it is 100000.0 (which takes readability into account).
        self.relaxed_max_min_actions = relaxed_max_min_actions  # assumes uss_subtracted is False.

        # Select what to include in the observations. By default the format is:
        # USS1, USS2, USS3, USS4, U1, U2, U3, U4, ESS, E, KF_X1, KF_X2, Z1, Z2, ..., Z30
        self.observation_include_t = observation_include_t  # 1
        self.observation_include_action = observation_include_action  # 4
        self.observation_include_uss = observation_include_uss  # 4
        self.observation_include_ess = observation_include_ess  # yss, 1
        self.observation_include_e = observation_include_e  # y, e-factor, 1
        self.observation_include_kf = observation_include_kf  # Kalman-filtered state, 2
        self.observation_include_z = observation_include_z  # 30
        self.observation_include_x = observation_include_x  # 1694

        if type(x0_loc) is str:
            response = requests.get(x0_loc)
            response.raise_for_status()
            self.x0 = np.loadtxt(io.BytesIO(response.content))
        elif type(x0_loc) is np.ndarray:
            self.x0 = x0_loc
        elif type(x0_loc) is list:
            self.x0 = np.array(x0_loc)
        else:
            raise Exception("x0_loc must be a string, a list or a numpy array")
        if type(z0_loc) is str:
            response = requests.get(z0_loc)
            response.raise_for_status()
            self.z0 = np.loadtxt(io.BytesIO(response.content))
        elif type(z0_loc) is np.ndarray:
            self.z0 = z0_loc
        elif type(z0_loc) is list:
            self.z0 = np.array(z0_loc)
        else:
            raise Exception("z0_loc must be a string, a list or a numpy array")
        response = requests.get(model_loc)
        response.raise_for_status()
        self.model_preconfig = np.load(io.BytesIO(response.content), allow_pickle=True)

        # for a fixed batch.
        self.ur = INPUT_REFS  # reference inputs
        self.yr = OUTPUT_REF  # reference output
        self.num_sim = int(SIM_TIME / dt)  # SIM_TIME (a 400-hour fixed batch) divided into dt-long steps.
        self.observation_dim = 1 * self.observation_include_t + 4 * self.observation_include_action \
            + 4 * self.observation_include_uss + 1 * self.observation_include_ess \
            + 1 * self.observation_include_e + 2 * self.observation_include_kf \
            + 30 * self.observation_include_z + 1694 * self.observation_include_x
        if self.observation_name is None:
            self.observation_name = [f'o_{i}' for i in range(self.observation_dim)]
        if self.action_name is None:
            self.action_name = [f'a_{i}' for i in range(self.action_dim)]

        max_observations = []
        if self.observation_include_t:
            max_observations.append(np.ones(1, dtype=np.float32) * 100.0)  # by convention
        if self.observation_include_action:
            max_observations.append(np.ones(4, dtype=np.float32) * 1.0)  # by convention
        if self.observation_include_uss:
            max_observations.append(np.ones(4, dtype=np.float32) * 0.5)  # from dataset
        if self.observation_include_ess:
            max_observations.append(np.ones(1, dtype=np.float32) * 15.0)  # from dataset
        if self.observation_include_e:
            max_observations.append(np.ones(1, dtype=np.float32) * 20.0)  # from dataset
        if self.observation_include_kf:
            max_observations.append(np.ones(2, dtype=np.float32) * 5.0)  # from dataset
        if self.observation_include_z:
            max_observations.append(np.ones(30, dtype=np.float32) * 0.5)  # by convention
        if self.observation_include_x:
            max_observations.append(np.ones(1694, dtype=np.float32) * 50.0)  # by convention
        try:
            self.max_observations = np.concatenate(max_observations)
        except ValueError:
            raise Exception("observations must contain something! Need at least one array to concatenate")

        min_observations = []
        if self.observation_include_t:
            min_observations.append(np.ones(1, dtype=np.float32) * 0.0)  # by convention
        if self.observation_include_action:
            min_observations.append(np.ones(4, dtype=np.float32) * 0.0)  # by convention
        if self.observation_include_uss:
            min_observations.append(np.ones(4, dtype=np.float32) * 0.0)  # from dataset
        if self.observation_include_ess:
            min_observations.append(np.ones(1, dtype=np.float32) * 0.0)  # from dataset
        if self.observation_include_e:
            min_observations.append(np.ones(1, dtype=np.float32) * 0.0)  # from dataset
        if self.observation_include_kf:
            min_observations.append(np.ones(2, dtype=np.float32) * -5.0)  # from dataset
        if self.observation_include_z:
            min_observations.append(np.ones(30, dtype=np.float32) * 0.0)  # by convention
        if self.observation_include_x:
            min_observations.append(np.ones(1694, dtype=np.float32) * 0.0)  # by convention
        try:
            self.min_observations = np.concatenate(min_observations)
        except ValueError:
            raise Exception("observations must contain something! Need at least one array to concatenate")

        if not self.uss_subtracted:
            self.max_actions = np.array([0.408, 0.125, 0.392, 0.214], dtype=np.float32)  # from dataset
            self.min_actions = np.array([0.4075, 0.105, 0.387, 0.208], dtype=np.float32)  # from dataset
            if self.relaxed_max_min_actions:
                self.max_actions = np.array([0.5, 0.2, 0.5, 0.4], dtype=np.float32)  # relaxed bounds
                self.min_actions = np.array([0.3, 0.0, 0.2, 0.1], dtype=np.float32)  # relaxed bounds
        else:
            self.max_actions = np.array([1.92476206e-05, 1.22118426e-02, 1.82154982e-03, 3.59729230e-04],
                                        dtype=np.float32)  # from dataset
            self.min_actions = np.array([-0.00015742, -0.00146234, -0.00021812, -0.00300454],
                                        dtype=np.float32)  # from dataset
            if self.relaxed_max_min_actions:
                self.max_actions = np.array([2.0e-05, 1.3e-02, 2.0e-03, 4.0e-04], dtype=np.float32)  # relaxed bounds
                self.min_actions = np.array([-0.00016, -0.0015, -0.00022, -0.00301], dtype=np.float32)  # relaxed bounds

        if self.normalize:
            self.observation_space = spaces.Box(low=-1, high=1, shape=(self.observation_dim,))
            self.action_space = spaces.Box(low=-1, high=1, shape=(self.action_dim,))
        else:
            self.observation_space = spaces.Box(low=self.min_observations, high=self.max_observations,
                                                shape=(self.observation_dim,))
            self.action_space = spaces.Box(low=self.min_actions, high=self.max_actions, shape=(self.action_dim,))

        self.plant = Plant(ND1, ND2, ND3, V1, V2, V3, V4, dt)
        self.yss = self.plant.calculate_Efactor(DM(self.z0))  # steady-state output, e-factor
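    # A worked example of the observation layout (illustrative, not part of the original
    # module): with the default flags (t, uss, ess, e, kf and z on; action and x off), the
    # observation is [t(1), USS(4), ESS(1), E(1), KF(2), Z(30)], so
    #
    #     env = AtropineEnvGym()          # assumes the default x0/z0/model URLs are reachable
    #     env.observation_dim             # == 1 + 4 + 1 + 1 + 2 + 30 == 39
    #     env.observation_space.shape     # == (39,)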
    def sample_initial_state(self, no_sample=False, lower_bound=0.99, upper_bound=1.01):
        if no_sample:
            x0 = self.x0
        else:
            x0_c = self.x0.clip(1e-30)  # treat all negative concentration values as numerical error and clip them to nearly zero to avoid numerical issues.
            low = x0_c * lower_bound
            up = x0_c * upper_bound
            x0 = np.random.uniform(low, up)
        self.zk = DM(self.z0)  # shape is (30,)
        self.xk = self.plant.mix_and_get_initial_condition(x0, USS)[0]  # shape is (1694,)
        self.previous_efactor = self.yss  # steady-state output (e-factor), the smaller the better
        observations = []
        if self.observation_include_t:
            observations.append(np.array([0], dtype=np.float32))  # the initial state has self.step_count = 0
        if self.observation_include_action:
            observations.append(np.zeros(4, dtype=np.float32))
        if self.observation_include_uss:
            observations.append(USS)
        if self.observation_include_ess:
            observations.append(np.array([self.yss], dtype=np.float32))
        if self.observation_include_e:
            observations.append(np.array([self.previous_efactor], dtype=np.float32))
        if self.observation_include_kf:
            self.xe = np.ones(2, dtype=np.float32) * 0.001
            observations.append(self.xe)
        if self.observation_include_z:
            observations.append(self.zk.full().flatten())
        if self.observation_include_x:
            observations.append(self.xk.full().flatten())
        try:
            observation = np.concatenate(observations)
        except ValueError:
            raise Exception("observations must contain something! Need at least one array to concatenate")
        return observation
    def reset(self, initial_state=None, normalize=None):
        """
        Required by gym, this function resets the environment and returns an initial observation.
        """
        self.step_count = 0
        self.total_reward = 0
        self.done = False
        if initial_state is not None:
            initial_state = np.array(initial_state, dtype=self.np_dtype)
            observation = initial_state
            self.init_observation = initial_state
        else:
            observation = self.sample_initial_state()
            self.init_observation = observation
        self.previous_observation = observation
        # TOMODIFY: reset your environment here.
        self.U = []  # inputs
        self.Y = []  # outputs (e-factors)
        normalize = self.normalize if normalize is None else normalize
        if normalize:
            observation, _, _ = normalize_spaces(observation, self.max_observations, self.min_observations)
        return observation
    def reward_function_standard(self, previous_observation, action, current_observation, reward=None):
        """The (s, a, r, s') reward calculation.

        Args:
            previous_observation ([np.ndarray]): This is denormalized observation, as usual.
            action ([np.ndarray]): This is denormalized action, as usual.
            current_observation ([np.ndarray]): This is denormalized observation, as usual.
            reward ([float]): If reward is provided, directly return the reward.

        Returns:
            [float]: reward.
        """
        if self.observation_beyond_box(current_observation) or self.action_beyond_box(action):
            return self.error_reward
        elif reward is not None:
            # TOMODIFY: insert your own reward function here.
            reward = reward * self.reward_scaler
        reward = max(self.error_reward, reward)  # reward cannot be smaller than the error_reward
        if self.debug_mode:
            print("reward:", reward)
        return reward
    def done_calculator_standard(self, current_observation, step_count, reward, done=None, done_info=None):
        """Check whether the current episode is considered finished.
        Returns a boolean indicating done or not, and a dictionary with information.
        Here in done_calculator_standard, done_info looks like {"terminal": boolean, "timeout": boolean},
        where "timeout" is True when the episode ends because the maximum episode length is reached, and
        "terminal" is True when "timeout" is True or the episode ends due to termination conditions such
        as an environment error being encountered (basically done).

        Args:
            current_observation ([np.ndarray]): This is denormalized observation, as usual.
            step_count ([int]): step_count.
            reward ([float]): reward.
            done ([bool], optional): Defaults to None.
            done_info ([dict], optional): how the environment is finished. Defaults to None.

        Returns:
            [(bool, dict)]: done and done_info.
        """
        if done is None:
            done = False
        else:
            if done_info is not None:
                return done, done_info
            else:
                raise Exception("When done is given, done_info should also be given.")
        if done_info is None:
            done_info = {"terminal": False, "timeout": False}
        else:
            if done_info["terminal"] or done_info["timeout"]:
                done = True
                return done, done_info
        if self.observation_beyond_box(current_observation):
            done_info["terminal"] = True
            done = True
        if reward == self.error_reward:
            done_info["terminal"] = True
            done = True
        if step_count >= self.max_steps and self.max_steps != -1:  # same as range(0, max_steps)
            done_info["terminal"] = True
            done_info["timeout"] = True
            done = True
        return done, done_info
    def _step(self, action, normalize=None):
        """
        Required by gym, this function performs one step within the environment and returns the observation,
        the reward, whether the episode is finished, and debug information, if any.
        """
        if self.debug_mode:
            print("action:", action)
        reward = None
        done = None
        done_info = {"terminal": False, "timeout": False}
        action = np.array(action, dtype=self.np_dtype)
        normalize = self.normalize if normalize is None else normalize
        if normalize:
            action, _, _ = denormalize_spaces(action, self.max_actions, self.min_actions)

        # TOMODIFY: proceed your environment here and collect the observation. The observation should be a numpy array.
        if self.uss_subtracted:
            uk = action
            uk_p_USS = [
                action[0] + USS[0],
                action[1] + USS[1],
                action[2] + USS[2],
                action[3] + USS[3]
            ]
        else:
            uk = action
            uk_p_USS = action
        self.U.append(uk_p_USS)
        _, xnext, znext = self.plant.simulate(self.xk, self.zk, uk_p_USS)
        efactor = self.plant.calculate_Efactor(znext)
        self.Y.append(efactor)

        # ---- reward calculation here, to avoid large if statements. ----
        reward_on_steady = -abs(efactor - self.yss)
        reward_on_absolute_efactor = -abs(efactor)
        reward_on_efactor_diff = self.previous_efactor - efactor
        previous_efactor = self.previous_efactor
        if self.reward_on_ess_subtracted:
            reward = self.yss - efactor  # e-factor: the smaller the better
        elif self.reward_on_steady:
            reward = reward_on_steady
        else:
            if self.reward_on_absolute_efactor:
                reward = reward_on_absolute_efactor
            else:
                reward = reward_on_efactor_diff
        reward += np.linalg.norm(action * self.reward_on_actions_penalty, ord=2)
        # ---- end of reward calculation. ----

        self.previous_efactor = efactor
        self.xk = xnext
        self.zk = znext

        observations = []
        if self.observation_include_t:
            observations.append(np.array([self.step_count], dtype=np.float32))
        if self.observation_include_action:
            observations.append(action)
        if self.observation_include_uss:
            observations.append(USS)
        if self.observation_include_ess:
            observations.append(np.array([self.yss], dtype=np.float32))
        if self.observation_include_e:
            observations.append(np.array([self.previous_efactor], dtype=np.float32))
        # TODO: also, shall we run an estimator step here? How do we align?
        if self.observation_include_kf:
            self.xe = state_estimator(
                self.xe, uk, efactor - self.yss,  # self.xe is the previous step's estimate
                self.model_preconfig[0], self.model_preconfig[1],
                self.model_preconfig[2], self.model_preconfig[4]
            )
            observations.append(self.xe)
        if self.observation_include_z:
            observations.append(self.zk.full().flatten())
        if self.observation_include_x:
            observations.append(self.xk.full().flatten())
        try:
            observation = np.concatenate(observations)
        except ValueError:
            raise Exception("observations must contain something! Need at least one array to concatenate")
        observation = np.array(observation, dtype=self.np_dtype)

        # compute reward
        reward = self.reward_function(self.previous_observation, action, observation, reward=reward)
        # compute done
        if not done:
            done, done_info = self.done_calculator(observation, self.step_count, reward, done=done, done_info=done_info)
        self.previous_observation = observation

        self.total_reward += reward
        if self.dense_reward:
            reward = reward  # conventional
        elif not done:
            reward = 0.0
        else:
            reward = self.total_reward
        # clip the observation so that it won't be beyond the box
        observation = observation.clip(self.min_observations, self.max_observations)
        if normalize:
            observation, _, _ = normalize_spaces(observation, self.max_observations, self.min_observations)
        self.step_count += 1
        info = {
            "efactor": efactor,
            "previous_efactor": previous_efactor,
            "reward_on_steady": reward_on_steady,
            "reward_on_absolute_efactor": reward_on_absolute_efactor,
            "reward_on_efactor_diff": reward_on_efactor_diff
        }
        info.update(done_info)
        return observation, reward, done, info
    def step(self, action, normalize=None):
        try:
            return self._step(action, normalize=normalize)
        except Exception:
            reward = self.error_reward
            done = True
            done_info = {"terminal": True, "timeout": False}
            observation = np.zeros(self.observation_dim, dtype=np.float32)
            info = {
                "efactor": -self.error_reward,
                "previous_efactor": self.previous_efactor,
                "reward_on_steady": reward,
                "reward_on_absolute_efactor": reward,
                "reward_on_efactor_diff": reward
            }
            info.update(done_info)
            return observation, reward, done, info
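    # A minimal rollout sketch with the default normalized spaces and a random policy
    # (illustrative only, not part of the original module):
    #
    #     env = AtropineEnvGym()                  # normalize=True by default
    #     obs, total = env.reset(), 0.0
    #     for _ in range(env.max_steps):
    #         obs, reward, done, info = env.step(env.action_space.sample())
    #         total += reward
    #         if done:
    #             break
    #     print(total, info["efactor"])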
    def plot(self, show=False, efactor_fig_name=None, input_fig_name=None):
        target_efactor = [self.yss + self.yr] * self.num_sim
        target_inputs = [USS + self.ur] * self.num_sim
        U = np.array(self.U) * 1000  # scale the solution to microlitres
        target_inputs = np.array(target_inputs) * 1000  # scale the solution to microlitres
        local_t = [k * dt for k in range(self.num_sim)]

        # plots
        plt.close("all")
        plt.figure(0)
        plt.plot(local_t, self.Y, label='Real Output')
        plt.plot(local_t, target_efactor, linestyle="--", label='Steady State Output')
        plt.xlabel('Time [min]')
        plt.ylabel('E-Factor [A.U.]')
        plt.legend()
        plt.grid()
        if efactor_fig_name is not None:
            plt.savefig(efactor_fig_name)
        plt.tight_layout()

        # create figure (fig) and array of axes (axs)
        fig, axs = plt.subplots(nrows=2, ncols=2)
        axs[0, 0].step(local_t, U[:, 0], where='post', label='Real Input')
        axs[0, 0].step(local_t, target_inputs[:, 0], where='post', linestyle="--", label='Steady State Input')
        axs[0, 0].set_ylabel(u'U1 [\u03bcL/min]')
        axs[0, 0].set_xlabel('time [min]')
        axs[0, 0].grid()
        axs[0, 1].step(local_t, U[:, 1], where='post', label='Real Input')
        axs[0, 1].step(local_t, target_inputs[:, 1], where='post', linestyle="--", label='Steady State Input')
        axs[0, 1].set_ylabel(u'U2 [\u03bcL/min]')
        axs[0, 1].set_xlabel('time [min]')
        axs[0, 1].grid()
        axs[1, 0].step(local_t, U[:, 2], where='post', label='Real Input')
        axs[1, 0].step(local_t, target_inputs[:, 2], where='post', linestyle="--", label='Steady State Input')
        axs[1, 0].set_ylabel(u'U3 [\u03bcL/min]')
        axs[1, 0].set_xlabel('time [min]')
        axs[1, 0].grid()
        axs[1, 1].step(local_t, U[:, 3], where='post', label='Real Input')
        axs[1, 1].step(local_t, target_inputs[:, 3], where='post', linestyle="--", label='Steady State Input')
        axs[1, 1].set_ylabel(u'U4 [\u03bcL/min]')
        axs[1, 1].set_xlabel('time [min]')
        axs[1, 1].legend()
        plt.tight_layout()
        plt.grid()
        if input_fig_name is not None:
            plt.savefig(input_fig_name)
        if show:
            plt.show()
        else:
            plt.close()
    def evalute_algorithms(self, algorithms, num_episodes=1, error_reward=None, initial_states=None, to_plt=True,
                           plot_dir='./plt_results'):
        """
        When executing evalute_algorithms, self.normalize should be False.
        algorithms: list of (algorithm, algorithm_name, normalize). algorithm has to have a method predict(observation) -> action: np.ndarray.
        num_episodes: number of episodes to run.
        error_reward: overwrite self.error_reward.
        initial_states: None, the location of a numpy file of initial states, or a (numpy) list of initial states.
        to_plt: whether to generate plots or not.
        plot_dir: None, or the directory to save plots to.
        returns: list of average_rewards over each episode and num of episodes.
        """
        try:
            assert self.normalize is False
        except AssertionError:
            print("env.normalize should be False when executing evalute_algorithms")
            self.normalize = False
        if error_reward is not None:
            self.error_reward = error_reward
        if plot_dir is not None:
            mkdir_p(plot_dir)
        initial_states = self.set_initial_states(initial_states, num_episodes)
        observations_list = [[] for _ in range(len(algorithms))]  # observations_list[i][j][t][k] is algorithm_i_game_j_observation_t_element_k
        actions_list = [[] for _ in range(len(algorithms))]  # actions_list[i][j][t][k] is algorithm_i_game_j_action_t_element_k
        rewards_list = [[] for _ in range(len(algorithms))]  # rewards_list[i][j][t] is algorithm_i_game_j_reward_t
        for n_epi in tqdm(range(num_episodes)):
            for n_algo in range(len(algorithms)):
                algo, algo_name, normalize = algorithms[n_algo]
                algo_observes = []
                algo_actions = []
                algo_rewards = []  # list of rewards of this trajectory for this algorithm.
                init_obs = self.reset(initial_state=initial_states[n_epi])
                # algo_observes.append(init_obs)
                o = init_obs
                done = False
                while not done:
                    if normalize:
                        o, _, _ = normalize_spaces(o, self.max_observations, self.min_observations)
                    a = algo.predict(o)
                    if normalize:
                        a, _, _ = denormalize_spaces(a, self.max_actions, self.min_actions)
                    algo_actions.append(a)
                    o, r, done, _ = self.step(a)
                    algo_observes.append(o)
                    algo_rewards.append(r)
                observations_list[n_algo].append(algo_observes)
                actions_list[n_algo].append(algo_actions)
                rewards_list[n_algo].append(algo_rewards)

            if to_plt:
                # plot observations
                for n_o in range(self.observation_dim):
                    o_name = self.observation_name[n_o]
                    plt.close("all")
                    plt.figure(0)
                    plt.title(f"{o_name}")
                    for n_algo in range(len(algorithms)):
                        alpha = 1 * (0.7 ** (len(algorithms) - 1 - n_algo))
                        _, algo_name, _ = algorithms[n_algo]
                        plt.plot(np.array(observations_list[n_algo][-1])[:, n_o], label=algo_name, alpha=alpha)
                    plt.plot([initial_states[n_epi][n_o] for _ in range(self.max_steps)], linestyle="--",
                             label=f"initial_{o_name}")
                    # plt.plot([self.steady_observations[n_o] for _ in range(self.max_steps)], linestyle="-.",
                    #          label=f"steady_{o_name}")  # add back when we confirm steady_observations yss+
                    plt.xticks(np.arange(1, self.max_steps + 2, 1))
                    plt.annotate(str(initial_states[n_epi][n_o]), xy=(0, initial_states[n_epi][n_o]))
                    # plt.annotate(str(self.steady_observations[n_o]), xy=(0, self.steady_observations[n_o]))  # add back when we confirm steady_observations
                    plt.legend()
                    if plot_dir is not None:
                        path_name = os.path.join(plot_dir, f"{n_epi}_observation_{o_name}.png")
                        plt.savefig(path_name)
                    plt.close()

                # plot actions
                for n_a in range(self.action_dim):
                    a_name = self.action_name[n_a]
                    plt.close("all")
                    plt.figure(0)
                    plt.title(f"{a_name}")
                    for n_algo in range(len(algorithms)):
                        alpha = 1 * (0.7 ** (len(algorithms) - 1 - n_algo))
                        _, algo_name, _ = algorithms[n_algo]
                        plt.plot(np.array(actions_list[n_algo][-1])[:, n_a], label=algo_name, alpha=alpha)
                    # plt.plot([self.steady_actions[n_a] for _ in range(self.max_steps)], linestyle="-.",
                    #          label=f"steady_{a_name}")  # add back when we confirm steady_actions USS+
                    plt.xticks(np.arange(1, self.max_steps + 2, 1))
                    plt.legend()
                    if plot_dir is not None:
                        path_name = os.path.join(plot_dir, f"{n_epi}_action_{a_name}.png")
                        plt.savefig(path_name)
                    plt.close()

                # plot rewards
                plt.close("all")
                plt.figure(0)
                plt.title("reward")
                for n_algo in range(len(algorithms)):
                    alpha = 1 * (0.7 ** (len(algorithms) - 1 - n_algo))
                    _, algo_name, _ = algorithms[n_algo]
                    plt.plot(np.array(rewards_list[n_algo][-1]), label=algo_name, alpha=alpha)
                plt.xticks(np.arange(1, self.max_steps + 2, 1))
                plt.legend()
                if plot_dir is not None:
                    path_name = os.path.join(plot_dir, f"{n_epi}_reward.png")
                    plt.savefig(path_name)
                plt.close()
        observations_list = np.array(observations_list)
        actions_list = np.array(actions_list)
        rewards_list = np.array(rewards_list)
        return observations_list, actions_list, rewards_list
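    # Sketch of driving the evaluation loop (illustrative, not part of the original module);
    # the environment should be constructed with normalize=False, and each entry of
    # `algorithms` is (algorithm, name, whether-to-normalize-its-observations-and-actions):
    #
    #     env = AtropineEnvGym(normalize=False)
    #     results = env.evalute_algorithms([(AtropineMPC(), "MPC", False)],
    #                                      num_episodes=2, plot_dir='./plt_results')
    #     observations_list, actions_list, rewards_list = results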
    # /---- standard ----
    def evaluate_rewards_mean_std_over_episodes(self, algorithms, num_episodes=1, error_reward=None,
                                                initial_states=None, to_plt=True, plot_dir='./plt_results',
                                                computer_on_episodes=False):
        """
        returns: mean and std of rewards over all episodes.
        Since the rewards_list is not aligned (e.g. some trajectories are shorter than others), we cannot
        directly convert it to a numpy array; we have to convert and unwrap the nested list.
        If computer_on_episodes is True, we first average the rewards_list over episodes, then compute the
        mean and std; otherwise, we directly compute the mean and std over all steps.
        """
        result_dict = {}
        observations_list, actions_list, rewards_list = self.evalute_algorithms(algorithms, num_episodes=num_episodes,
                                                                                error_reward=error_reward,
                                                                                initial_states=initial_states,
                                                                                to_plt=to_plt, plot_dir=plot_dir)
        from warnings import warn
        warn('The function evaluate_rewards_mean_std_over_episodes is deprecated. Please use report_rewards.',
             DeprecationWarning, stacklevel=2)
        for n_algo in range(len(algorithms)):
            _, algo_name, _ = algorithms[n_algo]
            rewards_list_curr_algo = rewards_list[n_algo]
            if computer_on_episodes:
                rewards_mean_over_episodes = []  # rewards_mean_over_episodes[n_epi] is the mean reward of episode n_epi
                for n_epi in range(num_episodes):
                    if rewards_list_curr_algo[n_epi][-1] == self.error_reward:
                        # if error_reward is provided, self.error_reward is overwritten in self.evalute_algorithms
                        rewards_mean_over_episodes.append(self.error_reward)
                    else:
                        rewards_mean_over_episodes.append(np.mean(rewards_list_curr_algo[n_epi]))
                rewards_mean = np.mean(rewards_mean_over_episodes)
                rewards_std = np.std(rewards_mean_over_episodes)
            else:
                unwrap_list = []
                for games_r_list in rewards_list_curr_algo:
                    unwrap_list += games_r_list
                rewards_mean = np.mean(unwrap_list)
                rewards_std = np.std(unwrap_list)
            print(f"{algo_name}_reward_mean: {rewards_mean}")
            result_dict[algo_name + "_reward_mean"] = rewards_mean
            print(f"{algo_name}_reward_std: {rewards_std}")
            result_dict[algo_name + "_reward_std"] = rewards_std
        if plot_dir is not None:
            f_dir = os.path.join(plot_dir, 'result.json')
        else:
            f_dir = 'result.json'
        json.dump(result_dict, open(f_dir, 'w+'))
        return observations_list, actions_list, rewards_list