diff --git a/projects/deep_rl_python/samples/dynamic_programming/utils.py b/projects/deep_rl_python/samples/dynamic_programming/utils.py
index b83c5b5..17aa803 100644
--- a/projects/deep_rl_python/samples/dynamic_programming/utils.py
+++ b/projects/deep_rl_python/samples/dynamic_programming/utils.py
@@ -1,5 +1,6 @@
-from typing import List
+from enum import Enum
+from typing import Dict, List, Optional, Tuple
 
 import numpy as np
 from gym import Env
 
@@ -18,18 +19,122 @@ def extract_policy(
     Returns:
         policy: a list of actions for each state
     """
-    _n_states = env.observation_space.n
-    _n_actions = env.action_space.n
-    _policy = np.zeros(_n_states)
+    n_states = env.observation_space.n
+    n_actions = env.action_space.n
+    next_policy = np.zeros(n_states)
 
-    for s in range(_n_states):
-        _q_states_actions = [
+    for s in range(n_states):
+        q_states_actions = [
             sum(
                 trans_proba * (reward + gamma * value_states[next_state])
                 for trans_proba, next_state, reward, _ in env.P[s][a]
             )
-            for a in range(_n_actions)
+            for a in range(n_actions)
         ]
-        _policy[s] = np.argmax(np.array(_q_states_actions))
+        next_policy[s] = np.argmax(np.array(q_states_actions))
+
+    return next_policy
+
+
+def compute_value_state_action(
+    states_actions: Dict[int, Dict[int, List[Tuple[float, int, float, bool]]]],
+    next_value_states: List[float],
+    state: int,
+    action: int,
+    gamma: float = 1.0,
+) -> List[float]:
+    """Per-transition terms of Q(state, action); their sum is the Q-value."""
+    return [
+        trans_proba * (reward + gamma * next_value_states[next_state])
+        for trans_proba, next_state, reward, _ in states_actions[state][action]
+    ]
+
+
+class IterationStrategy(Enum):
+    VALUE = 1
+    POLICY = 2
+
+
+class MarkovDecisionProcess:
+    """Wraps a Gym environment and computes state values by dynamic programming."""
+
+    def __init__(
+        self,
+        env: Env,
+        n_iterations: int = 1000,
+        convergence_threshold: float = 1e-20,
+        gamma: float = 1.0,
+    ):
+        self.env = env
+        self.n_iterations = n_iterations
+        self.convergence_threshold = convergence_threshold
+        self.gamma = gamma
+        self.n_states = self.env.observation_space.n
+        self.n_actions = self.env.action_space.n
+        self.states_actions = self.env.P
+        self.value_states = np.zeros(self.n_states)
+
+    def compute_value_function(
+        self,
+        iteration_strategy: IterationStrategy,
+        policy: Optional[List[int]] = None,
+    ) -> np.ndarray:
+        if (iteration_strategy == IterationStrategy.POLICY) and (
+            policy is None
+        ):
+            raise ValueError(
+                "A policy is required for the policy iteration strategy."
+            )
+        for i in range(self.n_iterations):
+            next_value_states = np.copy(self.value_states)
+            for s in range(self.n_states):
+                if iteration_strategy == IterationStrategy.VALUE:
+                    # Value iteration: back up each state with its best action.
+                    q_states_actions = [
+                        sum(
+                            compute_value_state_action(
+                                self.states_actions,
+                                next_value_states,
+                                s,
+                                a,
+                                self.gamma,
+                            )
+                        )
+                        for a in range(self.n_actions)
+                    ]
+                    self.value_states[s] = max(q_states_actions)
+                elif iteration_strategy == IterationStrategy.POLICY:
+                    # Policy evaluation: back up with the policy's action.
+                    a = policy[s]
+                    self.value_states[s] = sum(
+                        compute_value_state_action(
+                            self.states_actions,
+                            next_value_states,
+                            s,
+                            a,
+                            self.gamma,
+                        )
+                    )
+                else:
+                    raise ValueError("Invalid iteration strategy.")
+
+            if (
+                np.sum(np.fabs(next_value_states - self.value_states))
+                <= self.convergence_threshold
+            ):
+                print(f"Converged at iteration {i + 1}.")
+                break
+        return np.round(self.value_states, 4)
+
+
+class OptimalPolicy:
+    def __init__(
+        self,
+        mdp: MarkovDecisionProcess,
+        iteration_strategy: IterationStrategy,
+    ):
+        self.mdp = mdp
+        self.iteration_strategy = iteration_strategy
 
-    return _policy
+    def extract(self):
+        pass
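
Usage sketch (illustrative, not part of the patch): one way the new MarkovDecisionProcess class and the existing extract_policy helper might be driven together. The environment id ("FrozenLake-v1" on recent gym releases, "FrozenLake-v0" on older ones), the .unwrapped access to env.P, the flat import path, and the extract_policy call signature (env, value_states, gamma), which is only inferred from its body, are assumptions rather than something this diff guarantees.

    import gym

    from utils import IterationStrategy, MarkovDecisionProcess, extract_policy

    # Tabular environment whose transition table env.P the DP code reads.
    env = gym.make("FrozenLake-v1").unwrapped

    # Value iteration: sweep until the value function stops changing
    # (or n_iterations is exhausted).
    mdp = MarkovDecisionProcess(env, n_iterations=1000, gamma=1.0)
    value_states = mdp.compute_value_function(IterationStrategy.VALUE)

    # Greedy policy implied by the converged value function.
    policy = extract_policy(env, value_states, 1.0)
    print(policy)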