diff --git a/scripts/train.py b/scripts/train.py
index ef5b20c0..8971a19e 100644
--- a/scripts/train.py
+++ b/scripts/train.py
@@ -118,6 +118,11 @@ def clear_tm_instances():
     _, uncompiled_shared_network = make_untrained_iqn_network(jit=config_copy.use_jit, is_inference=False)
     uncompiled_shared_network.share_memory()
 
+    # Initialize the random number generator
+    seed = 275328254363729247691611008422666101254
+    # Create the RNG that is passed around; spawn() derives new, independent child generators from it
+    rng = np.random.default_rng(seed)
+
     # Start learner process
     learner_process = mp.Process(
         target=learner_process_fn,
@@ -129,6 +134,7 @@ def clear_tm_instances():
             base_dir,
             save_dir,
             tensorboard_base_dir,
+            rng.spawn(1)[0],
         ),
     )
     learner_process.start()
@@ -148,6 +154,7 @@ def clear_tm_instances():
                 base_dir,
                 save_dir,
                 config_copy.base_tmi_port + process_number,
+                rng.spawn(1)[0],
             ),
         )
         for rollout_queue, process_number in zip(rollout_queues, range(config_copy.gpu_collectors_count))
diff --git a/trackmania_rl/agents/iqn.py b/trackmania_rl/agents/iqn.py
index 8665dbae..adc2e866 100644
--- a/trackmania_rl/agents/iqn.py
+++ b/trackmania_rl/agents/iqn.py
@@ -4,6 +4,7 @@
 - The Trainer class, which implements the IQN training logic in method train_on_batch.
 - The Inferer class, which implements utilities for forward propagation with and without exploration.
 """
+
 import copy
 import math
 import random
@@ -350,15 +351,17 @@ class Inferer:
         "epsilon_boltzmann",
         "tau_epsilon_boltzmann",
         "is_explo",
+        "_rng",
     )
 
-    def __init__(self, inference_network, iqn_k, tau_epsilon_boltzmann):
+    def __init__(self, inference_network, iqn_k, tau_epsilon_boltzmann, rng: np.random.Generator):
         self.inference_network = inference_network
         self.iqn_k = iqn_k
         self.epsilon = None
         self.epsilon_boltzmann = None
         self.tau_epsilon_boltzmann = tau_epsilon_boltzmann
         self.is_explo = None
+        self._rng = rng
 
     def infer_network(self, img_inputs_uint8: npt.NDArray, float_inputs: npt.NDArray, tau=None) -> npt.NDArray:
         """
@@ -415,9 +418,9 @@ def get_exploration_action(self, img_inputs_uint8: npt.NDArray, float_inputs: np
 
         if self.is_explo and r < self.epsilon:
             # Choose a random action
-            get_argmax_on = np.random.randn(*q_values.shape)
+            get_argmax_on = self._rng.standard_normal(q_values.shape)
         elif self.is_explo and r < self.epsilon + self.epsilon_boltzmann:
-            get_argmax_on = q_values + self.tau_epsilon_boltzmann * np.random.randn(*q_values.shape)
+            get_argmax_on = q_values + self.tau_epsilon_boltzmann * self._rng.standard_normal(q_values.shape)
         else:
             get_argmax_on = q_values
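For context on the seeding scheme above: one master `Generator` is built in `train.py` and every process receives its own child via `spawn()`. A minimal sketch of that behavior (illustrative seed and counts; `Generator.spawn` requires NumPy >= 1.25, and `Generator` objects pickle cleanly, which is what lets them travel through `mp.Process` args):

```python
import numpy as np

master = np.random.default_rng(42)  # illustrative seed, not the one in train.py
learner_rng = master.spawn(1)[0]    # independent stream for the learner
collector_rngs = master.spawn(4)    # one independent stream per collector

# Re-running with the same master seed reproduces every child stream,
# and the children's streams are statistically independent of each other.
print([int(g.integers(0, 100)) for g in collector_rngs])
```

Note that unlike the legacy `np.random.randn(*shape)`, `Generator.standard_normal` takes a single size tuple, which is why the exploration branches above pass `q_values.shape` ununpacked.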
""" + import random from copy import deepcopy from typing import Any, Dict, Union @@ -17,7 +18,170 @@ from config_files import config_copy -to_torch_dtype = { + +class BufferUtil: + __slots__ = ("_rng",) + + def __init__( + self, + rng: np.random.Generator, + ): + self._rng = rng + + def _buffer_collate_function(self, batch): + ( + state_img, + state_float, + state_potential, + action, + rewards, + next_state_img, + next_state_float, + next_state_potential, + gammas, + terminal_actions, + n_steps, + ) = tuple( + map( + lambda attr_name: _fast_collate_cpu(batch, attr_name), + [ + "state_img", + "state_float", + "state_potential", + "action", + "rewards", + "next_state_img", + "next_state_float", + "next_state_potential", + "gammas", + "terminal_actions", + "n_steps", + ], + ) + ) + + temporal_mini_race_current_time_actions = ( + np.abs( + self._rng.integers( + low=-config_copy.oversample_long_term_steps + config_copy.oversample_maximum_term_steps, + high=config_copy.temporal_mini_race_duration_actions + config_copy.oversample_maximum_term_steps, + size=(len(state_img),), + dtype=int, + ) + ) + - config_copy.oversample_maximum_term_steps + ).clip(min=0) + + temporal_mini_race_next_time_actions = temporal_mini_race_current_time_actions + n_steps + + state_float[:, 0] = temporal_mini_race_current_time_actions + next_state_float[:, 0] = temporal_mini_race_next_time_actions + + possibly_reduced_n_steps = n_steps - (temporal_mini_race_next_time_actions - config_copy.temporal_mini_race_duration_actions).clip( + min=0 + ) + + terminal = (possibly_reduced_n_steps >= terminal_actions) | ( + temporal_mini_race_next_time_actions >= config_copy.temporal_mini_race_duration_actions + ) + + gammas = np.take_along_axis(gammas, possibly_reduced_n_steps[:, None] - 1, axis=1).squeeze(-1) + gammas = np.where(terminal, 0, gammas) + + rewards = np.take_along_axis(rewards, possibly_reduced_n_steps[:, None] - 1, axis=1).squeeze(-1) + + rewards += np.where(terminal, 0, gammas * next_state_potential) + rewards -= state_potential + + state_img, state_float, action, rewards, next_state_img, next_state_float, gammas = tuple( + map( + lambda batch, attr_name: _send_to_gpu(batch, attr_name), + [ + state_img, + state_float, + action, + rewards, + next_state_img, + next_state_float, + gammas, + ], + [ + "state_img", + "state_float", + "action", + "rewards", + "next_state_img", + "next_state_float", + "gammas", + ], + ) + ) + + state_img = (state_img.to(torch.float16) - 128) / 128 + next_state_img = (next_state_img.to(torch.float16) - 128) / 128 + + if config_copy.apply_randomcrop_augmentation: + # Same transformation is applied for state and next_state. + # Different transformation is applied to each element in a batch. 
+
+    def make_buffers(self, buffer_size: int) -> tuple[ReplayBuffer, ReplayBuffer]:
+        buffer = ReplayBuffer(
+            storage=ListStorage(buffer_size),
+            batch_size=config_copy.batch_size,
+            collate_fn=self._buffer_collate_function,
+            prefetch=1,
+            sampler=_CustomPrioritizedSampler(
+                self._rng, buffer_size, config_copy.prio_alpha, config_copy.prio_beta, config_copy.prio_epsilon, torch.float64
+            )
+            if config_copy.prio_alpha > 0
+            else RandomSampler(),
+        )
+        buffer_test = ReplayBuffer(
+            storage=ListStorage(int(buffer_size * config_copy.buffer_test_ratio)),
+            batch_size=config_copy.batch_size,
+            collate_fn=self._buffer_collate_function,
+            sampler=_CustomPrioritizedSampler(
+                self._rng, buffer_size, config_copy.prio_alpha, config_copy.prio_beta, config_copy.prio_epsilon, torch.float64
+            )
+            if config_copy.prio_alpha > 0
+            else RandomSampler(),
+        )
+        return buffer, buffer_test
+
+    def resize_buffers(self, buffer: ReplayBuffer, buffer_test: ReplayBuffer, new_buffer_size: int) -> tuple[ReplayBuffer, ReplayBuffer]:
+        new_buffer, new_buffer_test = self.make_buffers(new_buffer_size)
+        _copy_buffer_content_to_other_buffer(buffer, new_buffer)
+        _copy_buffer_content_to_other_buffer(buffer_test, new_buffer_test)
+        return new_buffer, new_buffer_test
+
+
+_to_torch_dtype = {
     "uint8": torch.uint8,
     "float32": torch.float32,
     "int64": torch.int64,
@@ -27,148 +191,25 @@
 }
 
 
-def fast_collate_cpu(batch, attr_name):
+def _fast_collate_cpu(batch, attr_name):
     elem = getattr(batch[0], attr_name)
     elem_array = hasattr(elem, "__len__")
     shape = (len(batch),) + (elem.shape if elem_array else ())
     data_type = elem.flat[0].dtype if elem_array else type(elem).__name__
-    data_type = to_torch_dtype[str(data_type)]
+    data_type = _to_torch_dtype[str(data_type)]
     buffer = torch.empty(size=shape, dtype=data_type, pin_memory=True).numpy()
     source = [getattr(memory, attr_name) for memory in batch]
     buffer[:] = source[:]
     return buffer
 
 
-def send_to_gpu(batch, attr_name):
+def _send_to_gpu(batch, attr_name):
     return torch.as_tensor(batch).to(
         non_blocking=True, device="cuda", memory_format=torch.channels_last if "img" in attr_name else torch.preserve_format
     )
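The two helpers above implement the classic pinned-host-buffer pattern: collate into page-locked CPU memory, then issue an asynchronous host-to-device copy. A self-contained sketch of the same idea (hypothetical `collate_pinned` helper, CUDA optional):

```python
import numpy as np
import torch

def collate_pinned(arrays: list) -> np.ndarray:
    # Allocate page-locked host memory once and view it as a numpy array;
    # writes land directly in pinned memory, so the later copy can be async.
    first = arrays[0]
    pinned = torch.empty((len(arrays), *first.shape), dtype=torch.from_numpy(first).dtype, pin_memory=True)
    buf = pinned.numpy()
    buf[:] = arrays
    return buf

batch = [np.full((3, 4), i, dtype=np.float32) for i in range(8)]
host = collate_pinned(batch)
if torch.cuda.is_available():
    # non_blocking=True only overlaps with CPU work because `host` is pinned.
    device_tensor = torch.as_tensor(host).to("cuda", non_blocking=True)
```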
-def buffer_collate_function(batch):
-    (
-        state_img,
-        state_float,
-        state_potential,
-        action,
-        rewards,
-        next_state_img,
-        next_state_float,
-        next_state_potential,
-        gammas,
-        terminal_actions,
-        n_steps,
-    ) = tuple(
-        map(
-            lambda attr_name: fast_collate_cpu(batch, attr_name),
-            [
-                "state_img",
-                "state_float",
-                "state_potential",
-                "action",
-                "rewards",
-                "next_state_img",
-                "next_state_float",
-                "next_state_potential",
-                "gammas",
-                "terminal_actions",
-                "n_steps",
-            ],
-        )
-    )
-
-    temporal_mini_race_current_time_actions = (
-        np.abs(
-            np.random.randint(
-                low=-config_copy.oversample_long_term_steps + config_copy.oversample_maximum_term_steps,
-                high=config_copy.temporal_mini_race_duration_actions + config_copy.oversample_maximum_term_steps,
-                size=(len(state_img),),
-                dtype=int,
-            )
-        )
-        - config_copy.oversample_maximum_term_steps
-    ).clip(min=0)
-
-    temporal_mini_race_next_time_actions = temporal_mini_race_current_time_actions + n_steps
-
-    state_float[:, 0] = temporal_mini_race_current_time_actions
-    next_state_float[:, 0] = temporal_mini_race_next_time_actions
-
-    possibly_reduced_n_steps = n_steps - (temporal_mini_race_next_time_actions - config_copy.temporal_mini_race_duration_actions).clip(
-        min=0
-    )
-
-    terminal = (possibly_reduced_n_steps >= terminal_actions) | (
-        temporal_mini_race_next_time_actions >= config_copy.temporal_mini_race_duration_actions
-    )
-
-    gammas = np.take_along_axis(gammas, possibly_reduced_n_steps[:, None] - 1, axis=1).squeeze(-1)
-    gammas = np.where(terminal, 0, gammas)
-
-    rewards = np.take_along_axis(rewards, possibly_reduced_n_steps[:, None] - 1, axis=1).squeeze(-1)
-
-    rewards += np.where(terminal, 0, gammas * next_state_potential)
-    rewards -= state_potential
-
-    state_img, state_float, action, rewards, next_state_img, next_state_float, gammas = tuple(
-        map(
-            lambda batch, attr_name: send_to_gpu(batch, attr_name),
-            [
-                state_img,
-                state_float,
-                action,
-                rewards,
-                next_state_img,
-                next_state_float,
-                gammas,
-            ],
-            [
-                "state_img",
-                "state_float",
-                "action",
-                "rewards",
-                "next_state_img",
-                "next_state_float",
-                "gammas",
-            ],
-        )
-    )
-
-    state_img = (state_img.to(torch.float16) - 128) / 128
-    next_state_img = (next_state_img.to(torch.float16) - 128) / 128
-
-    if config_copy.apply_randomcrop_augmentation:
-        # Same transformation is applied for state and next_state.
-        # Different transformation is applied to each element in a batch.
-        i = random.randint(0, 2 * config_copy.n_pixels_to_crop_on_each_side)
-        j = random.randint(0, 2 * config_copy.n_pixels_to_crop_on_each_side)
-        state_img = transforms.functional.crop(
-            transforms.functional.pad(state_img, padding=config_copy.n_pixels_to_crop_on_each_side, padding_mode="edge"),
-            i,
-            j,
-            config_copy.H_downsized,
-            config_copy.W_downsized,
-        )
-        next_state_img = transforms.functional.crop(
-            transforms.functional.pad(next_state_img, padding=config_copy.n_pixels_to_crop_on_each_side, padding_mode="edge"),
-            i,
-            j,
-            config_copy.H_downsized,
-            config_copy.W_downsized,
-        )
-
-    return (
-        state_img,
-        state_float,
-        action,
-        rewards,
-        next_state_img,
-        next_state_float,
-        gammas,
-    )
-
-
-class CustomPrioritizedSampler(PrioritizedSampler):
+class _CustomPrioritizedSampler(PrioritizedSampler):
     """
     Custom Prioritized Sampler which implements a slightly modified behavior compared to torchrl's original implementation.
@@ -178,6 +219,7 @@ class CustomPrioritizedSampler(PrioritizedSampler):
 
     def __init__(
         self,
+        rng: np.random.Generator,
         max_capacity: int,
         alpha: float,
         beta: float,
@@ -186,7 +228,8 @@ def __init__(
         reduction: str = "max",
         default_priority_ratio: float = 2.0,
     ) -> None:
-        super(CustomPrioritizedSampler, self).__init__(max_capacity, alpha, beta, eps, dtype, reduction)
+        super(_CustomPrioritizedSampler, self).__init__(max_capacity, alpha, beta, eps, dtype, reduction)
+        self._rng = rng
         self._average_priority = None
         self._default_priority_ratio = default_priority_ratio
         self._uninitialized_memories = 0.0
@@ -206,7 +249,7 @@ def sample(self, storage: Storage, batch_size: int) -> tuple[Tensor, dict[str, A
             self._average_priority = p_sum / len(storage)
         if p_sum <= 0:
             raise RuntimeError("negative p_sum")
-        mass = np.random.uniform(0.0, p_sum, size=batch_size)
+        mass = self._rng.uniform(0.0, p_sum, size=batch_size)
         index = self._sum_tree.scan_lower_bound(mass)
         if not isinstance(index, np.ndarray):
             index = np.array([index])
@@ -265,47 +308,15 @@ def load_state_dict(self, state_dict: Dict[str, Any]) -> None:
         self._sum_tree = state_dict.pop("_sum_tree")
 
 
-def copy_buffer_content_to_other_buffer(source_buffer: ReplayBuffer, target_buffer: ReplayBuffer) -> None:
+def _copy_buffer_content_to_other_buffer(source_buffer: ReplayBuffer, target_buffer: ReplayBuffer) -> None:
     assert source_buffer._storage.max_size <= target_buffer._storage.max_size
     target_buffer.extend(source_buffer._storage._storage)
-    if isinstance(source_buffer._sampler, CustomPrioritizedSampler) and isinstance(target_buffer._sampler, CustomPrioritizedSampler):
+    if isinstance(source_buffer._sampler, _CustomPrioritizedSampler) and isinstance(target_buffer._sampler, _CustomPrioritizedSampler):
         target_buffer._sampler._average_priority = source_buffer._sampler._average_priority
         target_buffer._sampler._uninitialized_memories = source_buffer._sampler._uninitialized_memories
     if isinstance(source_buffer._sampler, PrioritizedSampler) and isinstance(target_buffer._sampler, PrioritizedSampler):
         for i in range(len(source_buffer)):
             target_buffer._sampler._sum_tree[i] = source_buffer._sampler._sum_tree.at(i)
-
-
-def make_buffers(buffer_size: int) -> tuple[ReplayBuffer, ReplayBuffer]:
-    buffer = ReplayBuffer(
-        storage=ListStorage(buffer_size),
-        batch_size=config_copy.batch_size,
-        collate_fn=buffer_collate_function,
-        prefetch=1,
-        sampler=CustomPrioritizedSampler(
-            buffer_size, config_copy.prio_alpha, config_copy.prio_beta, config_copy.prio_epsilon, torch.float64
-        )
-        if config_copy.prio_alpha > 0
-        else RandomSampler(),
-    )
-    buffer_test = ReplayBuffer(
-        storage=ListStorage(int(buffer_size * config_copy.buffer_test_ratio)),
-        batch_size=config_copy.batch_size,
-        collate_fn=buffer_collate_function,
-        sampler=CustomPrioritizedSampler(
-            buffer_size, config_copy.prio_alpha, config_copy.prio_beta, config_copy.prio_epsilon, torch.float64
-        )
-        if config_copy.prio_alpha > 0
-        else RandomSampler(),
-    )
-    return buffer, buffer_test
-
-
-def resize_buffers(buffer: ReplayBuffer, buffer_test: ReplayBuffer, new_buffer_size: int) -> tuple[ReplayBuffer, ReplayBuffer]:
-    new_buffer, new_buffer_test = make_buffers(new_buffer_size)
-    copy_buffer_content_to_other_buffer(buffer, new_buffer)
-    copy_buffer_content_to_other_buffer(buffer_test, new_buffer_test)
-    return new_buffer, new_buffer_test
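The `sample()` change above keeps torchrl's inverse-CDF scheme but draws the uniform mass from the injected generator. A minimal sketch of that scheme, with a plain cumulative sum standing in for the sum tree:

```python
import numpy as np

rng = np.random.default_rng(7)
priorities = np.array([0.1, 3.0, 0.5, 1.4])  # illustrative per-memory priorities
cdf = np.cumsum(priorities)

# Uniform "mass" in [0, total priority); the first CDF entry strictly above
# each mass selects a memory with probability proportional to its priority.
mass = rng.uniform(0.0, cdf[-1], size=100_000)
index = np.searchsorted(cdf, mass, side="right")

print(np.bincount(index) / len(mass))  # ~ priorities / priorities.sum()
```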
diff --git a/trackmania_rl/multiprocess/collector_process.py b/trackmania_rl/multiprocess/collector_process.py
index 7cd2b79c..740a485e 100644
--- a/trackmania_rl/multiprocess/collector_process.py
+++ b/trackmania_rl/multiprocess/collector_process.py
@@ -1,6 +1,7 @@
 """
 This file implements a single multithreaded worker that handles a Trackmania game instance and provides rollout results to the learner process.
 """
+
 import importlib
 import time
 from itertools import chain, count, cycle
@@ -24,6 +25,7 @@ def collector_process_fn(
     base_dir: Path,
     save_dir: Path,
     tmi_port: int,
+    rng: np.random.Generator,
 ):
     from trackmania_rl.map_loader import analyze_map_cycle, load_next_map_zone_centers
     from trackmania_rl.tmi_interaction import game_instance_manager
@@ -44,7 +46,7 @@ def collector_process_fn(
     except Exception as e:
         print("Worker could not load weights, exception:", e)
 
-    inferer = iqn.Inferer(inference_network, config_copy.iqn_k, config_copy.tau_epsilon_boltzmann)
+    inferer = iqn.Inferer(inference_network, config_copy.iqn_k, config_copy.tau_epsilon_boltzmann, rng)
 
     def update_network():
         # Update weights of the inference network
@@ -67,8 +69,8 @@ def update_network():
     # ========================================================
     for _ in range(5):
         inferer.infer_network(
-            np.random.randint(low=0, high=255, size=(1, config_copy.H_downsized, config_copy.W_downsized), dtype=np.uint8),
-            np.random.rand(config_copy.float_input_dim).astype(np.float32),
+            rng.integers(low=0, high=255, size=(1, config_copy.H_downsized, config_copy.W_downsized), dtype=np.uint8),
+            rng.random(config_copy.float_input_dim).astype(np.float32),
         )
 
     # game_instance_manager.update_current_zone_idx(0, zone_centers, np.zeros(3))
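With the generator injected, the collector's exploration becomes reproducible. A simplified standalone rendition of the branch in `Inferer.get_exploration_action` (made-up q-values and thresholds):

```python
import numpy as np

rng = np.random.default_rng(123)
q_values = np.array([0.2, 0.8, 0.5], dtype=np.float32)  # illustrative only
epsilon, epsilon_boltzmann, tau = 0.05, 0.10, 0.2

r = rng.random()
if r < epsilon:
    # Argmax over i.i.d. noise picks a uniformly random action.
    get_argmax_on = rng.standard_normal(q_values.shape)
elif r < epsilon + epsilon_boltzmann:
    # Boltzmann-style exploration: tau-scaled noise perturbs the ranking.
    get_argmax_on = q_values + tau * rng.standard_normal(q_values.shape)
else:
    get_argmax_on = q_values  # greedy

action = int(np.argmax(get_argmax_on))
```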
""" + import copy import importlib import math @@ -29,7 +30,7 @@ race_time_left_curves, tau_curves, ) -from trackmania_rl.buffer_utilities import make_buffers, resize_buffers +from trackmania_rl.buffer_utilities import BufferUtil from trackmania_rl.map_reference_times import reference_times @@ -41,6 +42,7 @@ def learner_process_fn( base_dir: Path, save_dir: Path, tensorboard_base_dir: Path, + rng: np.random.Generator, ): layout_version = "lay_mono" SummaryWriter(log_dir=str(tensorboard_base_dir / layout_version)).add_custom_scalars( @@ -163,7 +165,8 @@ def learner_process_fn( memory_size, memory_size_start_learn = utilities.from_staircase_schedule( config_copy.memory_size_schedule, accumulated_stats["cumul_number_memories_generated"] ) - buffer, buffer_test = make_buffers(memory_size) + buffer_util = BufferUtil(rng) + buffer, buffer_test = buffer_util.make_buffers(memory_size) offset_cumul_number_single_memories_used = memory_size_start_learn * config_copy.number_times_single_memory_is_used_before_discard # noinspection PyBroadException @@ -202,6 +205,7 @@ def learner_process_fn( inference_network=online_network, iqn_k=config_copy.iqn_k, tau_epsilon_boltzmann=config_copy.tau_epsilon_boltzmann, + rng=rng, ) while True: # Trainer loop @@ -242,7 +246,7 @@ def learner_process_fn( accumulated_stats["cumul_number_memories_generated"], ) if new_memory_size != memory_size: - buffer, buffer_test = resize_buffers(buffer, buffer_test, new_memory_size) + buffer, buffer_test = buffer_util.resize_buffers(buffer, buffer_test, new_memory_size) offset_cumul_number_single_memories_used += ( new_memory_size_start_learn - memory_size_start_learn ) * config_copy.number_times_single_memory_is_used_before_discard @@ -521,9 +525,9 @@ def learner_process_fn( >= accumulated_stats["cumul_number_single_memories_used_next_target_network_update"] ): accumulated_stats["cumul_number_target_network_updates"] += 1 - accumulated_stats[ - "cumul_number_single_memories_used_next_target_network_update" - ] += config_copy.number_memories_trained_on_between_target_network_updates + accumulated_stats["cumul_number_single_memories_used_next_target_network_update"] += ( + config_copy.number_memories_trained_on_between_target_network_updates + ) # print("UPDATE") utilities.soft_copy_param(target_network, online_network, config_copy.soft_update_tau) print("", flush=True)