From 1068c3f2b0d078016c28d481b3cae56e2cd79f14 Mon Sep 17 00:00:00 2001
From: lejinvarghese
Date: Sun, 2 Oct 2022 19:41:59 -0400
Subject: [PATCH 1/2] first draft, created value function

Signed-off-by: lejinvarghese
---
 .../samples/dynamic_programming/utils.py      | 68 ++++++++++++++++++-
 1 file changed, 67 insertions(+), 1 deletion(-)

diff --git a/projects/deep_rl_python/samples/dynamic_programming/utils.py b/projects/deep_rl_python/samples/dynamic_programming/utils.py
index b83c5b5..2a4260f 100644
--- a/projects/deep_rl_python/samples/dynamic_programming/utils.py
+++ b/projects/deep_rl_python/samples/dynamic_programming/utils.py
@@ -1,5 +1,6 @@
+from random import gammavariate
 from typing import List
-
+from enum import Enum
 import numpy as np
 from gym import Env
 
@@ -33,3 +34,68 @@ def extract_policy(
         _policy[s] = np.argmax(np.array(_q_states_actions))
 
     return _policy
+
+
+class IterationStrategy(Enum):
+    VALUE = 1
+    POLICY = 2
+
+
+class ValueFunction:
+    def __init__(
+        self,
+        env: Env,
+        iteration_strategy: IterationStrategy,
+        policy: List[int] = None,
+        n_iterations: int = 1000,
+        convergence_threshold: float = 1e-20,
+        gamma: float = 1.0,
+    ):
+        self.env = env
+        self.iteration_strategy = iteration_strategy
+        if (policy is None) and (
+            iteration_strategy == IterationStrategy.POLICY
+        ):
+            raise ValueError("Policy is required for policy iteration")
+        else:
+            self.policy = policy
+        self.n_iterations = n_iterations
+        self.convergence_threshold = convergence_threshold
+        self.gamma = gamma
+        self.n_states = self.env.observation_space.n
+        self.n_actions = self.env.action_space.n
+        self.value_states = np.zeros(self.n_states)
+
+    def compute(self):
+        for i in range(self.n_iterations):
+            next_value_states = np.copy(self.value_states)
+            for s in range(self.n_states):
+                if self.iteration_strategy == IterationStrategy.VALUE:
+                    q_states_actions = [
+                        sum(self.__compute_value_state(s, a, next_value_states))
+                        for a in range(self.n_actions)
+                    ]
+                    self.value_states[s] = max(q_states_actions)
+                elif self.iteration_strategy == IterationStrategy.POLICY:
+                    a = self.policy[s]
+                    self.value_states[s] = sum(
+                        self.__compute_value_state(s, a, next_value_states)
+                    )
+                else:
+                    raise ValueError("Invalid iteration strategy")
+
+            if (
+                np.sum(np.fabs(next_value_states - self.value_states))
+                <= self.convergence_threshold
+            ):
+                print(f"Iteration converged at iteration {i+1}.")
+                break
+        return np.round(self.value_states, 4)
+
+    def __compute_value_state(
+        self, state: int, action: int, next_value_states: List[float]
+    ) -> List[float]:
+        return [
+            trans_proba * (reward + self.gamma * next_value_states[next_state])
+            for trans_proba, next_state, reward, _ in self.env.P[state][action]
+        ]
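Applied on its own, the first patch already supports end-to-end value iteration. The following is a minimal usage sketch, not part of the patch: it assumes a tabular gym environment such as FrozenLake-v1, assumes this module is importable as utils, and reads extract_policy's parameter names off its body.

    import gym
    from utils import IterationStrategy, ValueFunction, extract_policy

    # Any discrete environment that exposes a transition table env.P works;
    # FrozenLake-v1 is only an assumed example.
    env = gym.make("FrozenLake-v1")

    # Value iteration: sweep Bellman optimality backups until the values stop changing.
    value_fn = ValueFunction(env, IterationStrategy.VALUE, gamma=1.0)
    optimal_values = value_fn.compute()

    # Greedy policy with respect to the converged state values.
    optimal_policy = extract_policy(env=env, value_states=optimal_values, gamma=1.0)
    print(optimal_values, optimal_policy)
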
From bc75055b78ca2896f6245507feeb6f00bf40e07f Mon Sep 17 00:00:00 2001
From: lejinvarghese
Date: Sun, 2 Oct 2022 20:27:47 -0400
Subject: [PATCH 2/2] draft commit, mdp, op

Signed-off-by: lejinvarghese
---
 .../samples/dynamic_programming/utils.py      | 87 ++++++++++++-------
 1 file changed, 58 insertions(+), 29 deletions(-)

diff --git a/projects/deep_rl_python/samples/dynamic_programming/utils.py b/projects/deep_rl_python/samples/dynamic_programming/utils.py
index 2a4260f..17aa803 100644
--- a/projects/deep_rl_python/samples/dynamic_programming/utils.py
+++ b/projects/deep_rl_python/samples/dynamic_programming/utils.py
@@ -1,5 +1,5 @@
 from random import gammavariate
-from typing import List
+from typing import List, Dict, Tuple
 from enum import Enum
 import numpy as np
 from gym import Env
@@ -19,21 +19,34 @@ def extract_policy(
     Returns:
         policy: a list of actions for each state
     """
-    _n_states = env.observation_space.n
-    _n_actions = env.action_space.n
-    _policy = np.zeros(_n_states)
+    n_states = env.observation_space.n
+    n_actions = env.action_space.n
+    next_policy = np.zeros(n_states)
 
-    for s in range(_n_states):
-        _q_states_actions = [
+    for s in range(n_states):
+        q_states_actions = [
             sum(
                 trans_proba * (reward + gamma * value_states[next_state])
                 for trans_proba, next_state, reward, _ in env.P[s][a]
             )
-            for a in range(_n_actions)
+            for a in range(n_actions)
         ]
-        _policy[s] = np.argmax(np.array(_q_states_actions))
+        next_policy[s] = np.argmax(np.array(q_states_actions))
 
-    return _policy
+    return next_policy
+
+
+def compute_value_state_action(
+    states_actions: Dict[int, Dict[int, List[Tuple[float, int, float, bool]]]],
+    next_value_states: List[float],
+    state: int,
+    action: int,
+    gamma: float = 1.0,
+) -> List[float]:
+    return [
+        trans_proba * (reward + gamma * next_value_states[next_state])
+        for trans_proba, next_state, reward, _ in states_actions[state][action]
+    ]
 
 
 class IterationStrategy(Enum):
@@ -41,45 +54,56 @@ class IterationStrategy(Enum):
     POLICY = 2
 
 
-class ValueFunction:
+class MarkovDecisionProcess:
     def __init__(
        self,
         env: Env,
-        iteration_strategy: IterationStrategy,
-        policy: List[int] = None,
         n_iterations: int = 1000,
         convergence_threshold: float = 1e-20,
         gamma: float = 1.0,
     ):
         self.env = env
-        self.iteration_strategy = iteration_strategy
-        if (policy is None) and (
-            iteration_strategy == IterationStrategy.POLICY
-        ):
-            raise ValueError("Policy is required for policy iteration")
-        else:
-            self.policy = policy
         self.n_iterations = n_iterations
         self.convergence_threshold = convergence_threshold
         self.gamma = gamma
         self.n_states = self.env.observation_space.n
         self.n_actions = self.env.action_space.n
+        self.states_actions = self.env.P
         self.value_states = np.zeros(self.n_states)
 
-    def compute(self):
+    def compute_value_function(
+        self,
+        iteration_strategy: IterationStrategy,
+        policy: List[int] = None,
+    ):
+        if (iteration_strategy == IterationStrategy.POLICY) and (
+            policy is None
+        ):
+            raise ValueError(
+                "Policy is required for policy iteration strategy."
+            )
         for i in range(self.n_iterations):
             next_value_states = np.copy(self.value_states)
             for s in range(self.n_states):
-                if self.iteration_strategy == IterationStrategy.VALUE:
+                if iteration_strategy == IterationStrategy.VALUE:
                     q_states_actions = [
-                        sum(self.__compute_value_state(s, a, next_value_states))
+                        sum(
+                            compute_value_state_action(
+                                self.states_actions,
+                                next_value_states,
+                                s,
+                                a,
+                                self.gamma,
+                            )
+                        )
                         for a in range(self.n_actions)
                     ]
                     self.value_states[s] = max(q_states_actions)
-                elif self.iteration_strategy == IterationStrategy.POLICY:
-                    a = self.policy[s]
+                elif iteration_strategy == IterationStrategy.POLICY:
+                    a = policy[s]
                     self.value_states[s] = sum(
-                        self.__compute_value_state(s, a, next_value_states)
+                        compute_value_state_action(
+                            self.states_actions, next_value_states, s, a, self.gamma
+                        )
                     )
                 else:
                     raise ValueError("Invalid iteration strategy")
@@ -92,10 +116,15 @@ def compute(self):
                 break
         return np.round(self.value_states, 4)
 
-    def __compute_value_state(
-        self, state: int, action: int, next_value_states: List[float]
-    ) -> List[float]:
-        return [
-            trans_proba * (reward + self.gamma * next_value_states[next_state])
-            for trans_proba, next_state, reward, _ in self.env.P[state][action]
-        ]
+
+class OptimalPolicy:
+    def __init__(
+        self,
+        mdp: MarkovDecisionProcess,
+        iteration_strategy: IterationStrategy,
+    ):
+        self.mdp = mdp
+        self.iteration_strategy = iteration_strategy
+
+    def extract(self):
+        pass
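With the second patch, the iteration strategy (and, for policy evaluation, the policy) moves from the constructor to compute_value_function, and OptimalPolicy.extract is still a stub. The sketch below, under the same assumptions as above (FrozenLake-v1, a utils import path), shows how both strategies could be driven, using extract_policy for the greedy improvement step since OptimalPolicy does not implement it yet.

    import gym
    import numpy as np
    from utils import IterationStrategy, MarkovDecisionProcess, extract_policy

    env = gym.make("FrozenLake-v1")

    # Value iteration: no policy argument is needed.
    mdp = MarkovDecisionProcess(env, gamma=1.0)
    values = mdp.compute_value_function(IterationStrategy.VALUE)
    greedy_policy = extract_policy(env=env, value_states=values, gamma=1.0)

    # Policy iteration: evaluate the current policy, then improve it greedily,
    # until the greedy policy stops changing.
    policy = np.zeros(env.observation_space.n, dtype=int)
    for _ in range(20):  # improvement sweeps; small grids need only a handful
        mdp = MarkovDecisionProcess(env, gamma=1.0)  # fresh value table per evaluation
        values = mdp.compute_value_function(IterationStrategy.POLICY, policy=policy)
        improved = extract_policy(env=env, value_states=values, gamma=1.0).astype(int)
        if np.array_equal(improved, policy):
            break
        policy = improved
    # `policy` now holds the greedy policy found by policy iteration.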