diff --git a/distributed_training/averaging/avg_handler.py b/distributed_training/averaging/avg_handler.py index 75476b76..a15902a7 100644 --- a/distributed_training/averaging/avg_handler.py +++ b/distributed_training/averaging/avg_handler.py @@ -10,7 +10,7 @@ from typing import Any, Dict, List, Tuple from distributed_training.averaging.exceptions import AllReduceError, ModelStateError from distributed_training.protocol import AllReduce -from distributed_training.data.dataset import DatasetLoader +from distributed_training.data.dataset_loader import DatasetLoader from distributed_training.utils.dendrite import ( async_dendrite_forward, ) @@ -85,22 +85,22 @@ async def fetch_training_data(self, block): attempt = 0 while attempt < self.retry_limit: try: - pages = await DatasetLoader.next_pages( - offset=block, - n_pages=5, - seed=self.uid, - ) - random.seed(self.uid) - random.shuffle(pages) - - dataset = await DatasetLoader.create( - batch_size=4, - sequence_length=1024, - pages_info=pages, + loader = DatasetLoader( tokenizer=self.tokenizer, + seed_base=self.uid, + current_block=block, ) - return dataset + await loader.load_bucket_data_to_buffer() + + # 1) add method="truncate" for debugging + # 2) default buffer quantity is 2300000 so we set to small quantity of 460000 (20%) + # cause here we only verify + loader.reduce_buffer_size(target_size=460000) + + loader.prepare_batches() + + return loader except Exception as e: self.logger.error(f"Error fetching training data: {str(e)}") attempt += 1 diff --git a/distributed_training/base/neuron.py b/distributed_training/base/neuron.py index 06209984..5ed14eae 100644 --- a/distributed_training/base/neuron.py +++ b/distributed_training/base/neuron.py @@ -36,7 +36,7 @@ R2Access, R2Config, ) -from distributed_training.utils.logger import setup_logging +# from distributed_training.utils.logger import setup_logging from distributed_training.utils.misc import ttl_get_block from dotenv import load_dotenv @@ -171,7 +171,7 @@ def __init__(self, config=None): self.master_uid = master_uid[0].item() # Setup Logging - setup_logging(self, config=self.config) + # setup_logging(self, config=self.config) # Create the R2 data model r2 = R2Config( diff --git a/distributed_training/data/dataset.py b/distributed_training/data/dataset.py deleted file mode 100644 index 39751593..00000000 --- a/distributed_training/data/dataset.py +++ /dev/null @@ -1,555 +0,0 @@ -# The MIT License (MIT) -# Copyright © 2023 Yuma Rao -# Copyright © 2023 const - -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the “Software”), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, -# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: - -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of -# the Software. -# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO -# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -# DEALINGS IN THE SOFTWARE. -import asyncio -import random -import typing - -import aiohttp -import numpy as np -import torch -import time -import bittensor as bt -from torch.utils.data import IterableDataset -from transformers import AutoTokenizer - - -class SubsetLoader(IterableDataset): - """ - Base class for data-specific subset loader classes. - - """ - - def __init__( - self, - batch_size=None, - sequence_length=None, - num_pages=None, - tokenizer: AutoTokenizer = None, - pack_samples: bool = False, - ): - self.batch_size = batch_size - self.sequence_length = sequence_length - self.num_pages = num_pages - self.tokenizer = tokenizer - self.pack_samples = pack_samples - self.num_rows_per_page = 100 - - # Buffers - self.buffer = [] - self.used_buffer = [] - self.padded_buffer = [] - self.lock = asyncio.Lock() - - async def fetch_data_for_pages(self, pages): - """ - Set the pages to be used to fill the buffer. Then fetch the page data - to the buffer. - """ - - self.pages = pages - - # Empty the buffer if it is not. - self.buffer = [] - - async with aiohttp.ClientSession() as session: - tasks = [self._fetch_data_for_page(page, session) for page in self.pages] - await asyncio.gather(*tasks) - - async def _fetch_data_for_page(self, page, session): - retry_limit = 10 - attempt = 0 - while attempt < retry_limit: - config_name, page_number, split = page - - # Create the request parameters - params = dict( - dataset=self.name, - config=config_name, - split=split, - offset=page_number, - limit=self.num_rows_per_page, - ) - - try: - async with session.get(self.rows_base_url, params=params) as response: - response.raise_for_status() - data = await response.json() - - # Prepare the data to append - buffer_to_append = [] - for row in data["rows"]: - content = row["row"]["text"] - input_ids = self.tokenizer(content, truncation=True)[ - "input_ids" - ] - buffer_to_append.extend(input_ids) - buffer_to_append.append(self.tokenizer.eos_token_id) - - async with self.lock: - self.buffer.extend(buffer_to_append) - self.pages.append((config_name, page_number, split)) - break # Success, exit retry loop - - except aiohttp.ClientResponseError: - attempt += 1 - if attempt < retry_limit: - await asyncio.sleep(5) - else: - raise - - def _get_pad_size(self, input_ids): - """Calculate padding size for a sequence""" - if self.pack_samples: - return 1 - - sample_size = len(input_ids) - remainder = sample_size % self.sequence_length - return self.sequence_length - remainder if remainder != 0 else 0 - - def _refill_padded_buffer(self): - """ - This methods pulls one page from `self.buffer`, pads it and pushes - it to the `self.padded_buffer`. - """ - print(f"\n--- Starting _refill_padded_buffer ---") - print( - f"Initial state: buffer size={len(self.buffer)}, padded_buffer size={len(self.padded_buffer)}" - ) - - while self.buffer and len(self.padded_buffer) < self.sequence_length: - try: - # search for EOS token index and cut the buffer at it. - EOS_index = self.buffer.index(self.tokenizer.eos_token_id) - print(f"Found EOS token at index {EOS_index}") - - input_ids = self.buffer[: EOS_index + 1] - self.buffer = self.buffer[EOS_index + 1 :] - - self.used_buffer += input_ids - - # Add to padded buffer without the EOS token. 
- self.padded_buffer += input_ids[:-1] - - # Calculate and apply padding - pad_size = self._get_pad_size(input_ids=input_ids[:-1]) - print( - f"Adding sequence of length {len(input_ids[:-1])} with padding {pad_size}" - ) - - self.padded_buffer += [self.tokenizer.eos_token_id] * pad_size - - except ValueError: - print("No EOS token found in buffer!") - if len(self.buffer) > 0: - print(f"Buffer content preview: {self.buffer[:10]}...") - break - - print( - f"Final state: buffer size={len(self.buffer)}, padded_buffer size={len(self.padded_buffer)}" - ) - print("--- Finished _refill_padded_buffer ---\n") - - def __iter__(self): - return self - - def __next__(self): - # Check if we have enough tokens for at least one batch - required_tokens = self.sequence_length * self.batch_size - if len(self.buffer) < required_tokens: - raise StopIteration - - batch = [] - labels = [] - - for i in range(self.batch_size): - # Get input sequence and pad if necessary - current_seq = self.buffer[: self.sequence_length] - current_seq_len = len(current_seq) - - if current_seq_len != self.sequence_length: - input_seq = current_seq + [self.tokenizer.eos_token_id] * ( - self.sequence_length - current_seq_len - ) - else: - input_seq = current_seq - - # Get label sequence (shifted by 1) and pad if necessary - label_seq_raw = self.buffer[1 : self.sequence_length + 1] - label_seq_len = len(label_seq_raw) - - if label_seq_len != self.sequence_length: - label_seq = label_seq_raw + [self.tokenizer.eos_token_id] * ( - self.sequence_length - label_seq_len - ) - else: - label_seq = label_seq_raw - - # Add to batch - batch.append(torch.tensor(input_seq)) - labels.append(torch.tensor(label_seq)) - - # Move buffer forward - self.buffer = self.buffer[self.sequence_length :] - - stacked_batch = torch.stack(batch) - stacked_labels = torch.stack(labels) - - return stacked_batch, stacked_labels - - -class DatasetLoader(SubsetLoader): - name: str = "HuggingFaceFW/fineweb-edu" - rows_base_url: str = "https://datasets-server.huggingface.co/rows" - size_base_url: str = "https://datasets-server.huggingface.co/size" - - retry_limit: int = 5 # Number of retries - retry_delay: int = 60 # Seconds to wait between retries - num_rows_per_page: int = 100 - - logger = bt.logging - - @staticmethod - async def next_pages( - offset: int, n_pages: int, seed: str, num_rows_per_page: int = 100 - ): - configs_data = await DatasetLoader.fetch_dataset_configs() - keys = sorted(configs_data.keys()) - rng = np.random.default_rng( - hash(seed) & 0xFFFFFFFF - ) # Create a generator with a seed - rng.bit_generator.advance(offset) # Efficiently skip ahead `n` steps - result = [] - for _ in range(n_pages): - idx = rng.integers(0, len(keys)) - cfg = keys[idx] - config = rng.choice(list(configs_data.keys())) - choice = rng.integers( - 0, configs_data[cfg]["num_rows"] - 1 - num_rows_per_page - ) - result.append((cfg, int(choice), configs_data[cfg]["split"])) - return result - - def __init__( - self, - batch_size=None, - sequence_length=None, - num_pages=None, - pages_info=None, - tokenizer: AutoTokenizer = None, - pack_samples: bool = False, - ): - super().__init__( - batch_size, sequence_length, num_pages, tokenizer, pack_samples - ) - - # Initialize properties - self.configs_data = None - self.pages = [] - self.buffer = [] - self.lock = asyncio.Lock() # For thread-safe operations - - @classmethod - async def create( - cls, - batch_size=None, - sequence_length=None, - num_pages=None, - pages_info=None, - tokenizer: AutoTokenizer = None, - pack_samples: bool = False, 
- ): - self = cls( - batch_size=batch_size, - sequence_length=sequence_length, - num_pages=num_pages, - tokenizer=tokenizer, - pack_samples=pack_samples, - ) - - # Fetch dataset configs asynchronously - self.configs_data = await cls.fetch_dataset_configs() - - if pages_info is not None: - await self._fetch(pages_info) - elif self.num_pages: - await self._fetch_data_to_buffer(self.num_pages) - - return self - - async def _fetch(self, page_info: typing.Tuple[str, int, str], batch_size: int = 5): - self.pages = list(page_info) - - async with aiohttp.ClientSession() as session: - for i in range(0, len(self.pages), batch_size): - batch = self.pages[i : i + batch_size] - tasks = [ - self._fetch_data_for_page((config_name, page, split), session) - for (config_name, page, split) in batch - ] - await asyncio.gather(*tasks) - - async def _fetch_data_to_buffer(self, num_pages): - """ - Randomly sample pages and add their data to the buffer. - If a page is inaccessible, another one is sampled. - This method sets the `pages` property. - """ - self.pages = [] - pages_to_fetch = self.get_random_pages(num_pages) - - async with aiohttp.ClientSession() as session: - tasks = [ - self._fetch_data_for_page(page, session) for page in pages_to_fetch - ] - await asyncio.gather(*tasks) - - async def fetch_data_to_rows(self, num_pages): - rows = [] - pages_to_fetch = self.get_random_pages(num_pages) - - async with aiohttp.ClientSession() as session: - tasks = [ - self._fetch_rows_for_page(page, session) for page in pages_to_fetch - ] - results = await asyncio.gather(*tasks) - for page_rows in results: - rows.extend(page_rows) - - return rows - - async def _fetch_data_for_page(self, page, session): - """ - Fetches data asynchronously for a single page, processes it without blocking the event loop, - and appends the tokenized data to the buffer. - - Args: - page: A tuple containing the config name, page number, and split. - session: The HTTP session used for making requests. - - Raises: - Exception: If the maximum number of retry attempts is exceeded. - """ - retry_limit = self.retry_limit - attempt = 0 - while attempt < retry_limit: - config_name, page_number, split = page - - # Create the request parameters - params = { - "dataset": self.name, - "config": config_name, - "split": split, - "offset": page_number, - "limit": self.num_rows_per_page, - } - - try: - # Make an asynchronous HTTP GET request to fetch the data - async with session.get(self.rows_base_url, params=params) as response: - response.raise_for_status() # Raise an exception for HTTP errors - data = await response.json() - - # Prepare the data to append - buffer_to_append = [] - - # Asynchronously process each row without blocking the event loop - tasks = [ - self._tokenize_content(row["row"]["text"]) - for row in data["rows"] - ] - - # Gather the tokenized results concurrently - row_input_ids = await asyncio.gather(*tasks) - - # Flatten the list of input IDs and append them to the buffer - for input_ids in row_input_ids: - buffer_to_append.extend(input_ids) - - # Safely append the processed data to the shared buffer - async with self.lock: - self.buffer.extend(buffer_to_append) - self.pages.append((config_name, page_number, split)) - break # Success, exit retry loop - - except aiohttp.ClientResponseError as e: - # Handle HTTP client errors with a retry mechanism - attempt += 1 - if attempt < retry_limit: - self.logger.debug( - f"Retrying page {page} due to error: {e}. 
Attempt {attempt} of {retry_limit}" - ) - self.logger.debug( - f"Waiting {self.retry_delay * attempt} seconds before retrying..." - ) - await asyncio.sleep( - self.retry_delay * attempt - ) # Wait before retrying - else: - raise Exception( - f"Maximum retry attempts exceeded for page {page}" - ) from e - - async def _tokenize_content(self, content): - """ - Asynchronously tokenizes a string of content using the tokenizer in a separate thread. - - Args: - content: The text content to be tokenized. - - Returns: - The list of token IDs for the content, including the EOS token. - """ - # Offload the CPU-bound tokenization to a thread executor to prevent blocking the event loop - input_ids = await asyncio.to_thread( - self.tokenizer.encode, - content, - truncation=True, - max_length=self.sequence_length, - ) - input_ids.append(self.tokenizer.eos_token_id) - return input_ids - - async def _fetch_rows_for_page(self, page, session): - retry_limit = self.retry_limit - attempt = 0 - while attempt < retry_limit: - config_name, page_number, split = page - - # Create the request parameters - params = dict( - dataset=self.name, - config=config_name, - split=split, - offset=page_number, - limit=self.num_rows_per_page, - ) - - try: - async with session.get(self.rows_base_url, params=params) as response: - response.raise_for_status() - data = await response.json() - - # Collect the rows - return [row["row"]["text"] for row in data["rows"]] - - except aiohttp.ClientResponseError: - attempt += 1 - if attempt < retry_limit: - await asyncio.sleep(self.retry_delay) - else: - raise - - def get_random_pages(self, num_pages): - """ - Randomly sample pages. - A page is a row number of a given split of a given dataset dump. - """ - pages = [] - - for _ in range(num_pages): - # Choose a random config - config_name = random.choice(list(self.configs_data.keys())) - - # Choose a random page (row) - page = random.randint( - 0, - self.configs_data[config_name]["num_rows"] - 1 - self.num_rows_per_page, - ) - - split = self.configs_data[config_name]["split"] - - pages.append((config_name, page, split)) - - return pages - - def get_page_names(self): - """ - This is a utility function that returns the page names that were used. - Each page as a single string instead of a tuple. - """ - page_names = [] - - if hasattr(self, "pages"): - page_names = [ - f"{cfg_name}_{num_rows}_{split}" - for cfg_name, num_rows, split in self.pages - ] - - return page_names - - @staticmethod - async def fetch_dataset_configs() -> typing.Dict[str, typing.Dict]: - """ - Fetch the different dump names, aka configs, aka samples, of the - dataset. - The returned value is a dictionary with dump names as keys and - a dict of the number of rows and the split as values. 
- """ - # Request parameters - params = dict(dataset=DatasetLoader.name) - - attempt = 0 - while attempt < DatasetLoader.retry_limit: - try: - async with aiohttp.ClientSession() as session: - async with session.get( - DatasetLoader.size_base_url, params=params - ) as response: - response.raise_for_status() - - data = await response.json() - - # Extract the configs dict - configs_dict = data["size"]["splits"] - - # Now create a dict with config names (except 'default') as - # keys, and the number of rows as values - configs_data = { - entry["config"]: { - "num_rows": entry["num_rows"], - "split": entry["split"], - } - for entry in configs_dict - if entry["config"] != "default" - } - - return configs_data - - except aiohttp.ClientResponseError: - attempt += 1 - if attempt < DatasetLoader.retry_limit: - await asyncio.sleep(DatasetLoader.retry_delay) - else: - raise - - @staticmethod - async def next_pages_async( - offset: int, n_pages: int, seed: str, num_rows_per_page: int = 100 - ): - configs_data = await DatasetLoader.fetch_dataset_configs() - rng = np.random.default_rng( - hash(seed) & 0xFFFFFFFF - ) # Create a generator with a seed - rng.bit_generator.advance(offset) # Efficiently skip ahead `n` steps - result = [] - for _ in range(n_pages): - config = rng.choice(list(configs_data.keys())) - choice = rng.integers( - 0, configs_data[config]["num_rows"] - 1 - num_rows_per_page - ) - result.append((str(config), int(choice), configs_data[config]["split"])) - return result diff --git a/distributed_training/data/dataset_loader.py b/distributed_training/data/dataset_loader.py new file mode 100644 index 00000000..9a4cbd3b --- /dev/null +++ b/distributed_training/data/dataset_loader.py @@ -0,0 +1,361 @@ +import os +import json +import yaml +import time +import asyncio +import torch +import s3fs +import pyarrow.parquet as pq +from dotenv import load_dotenv, find_dotenv +from transformers import AutoTokenizer +import bittensor as bt +import random +import hashlib + + +class BatchLoader: + def __init__(self, tokenizer=None, batch_size=None, sequence_length=None): + self.tokenizer = tokenizer + self.batch_size = batch_size + self.sequence_length = sequence_length + + self.buffer = [] + self._data = None + self._num_batches = 0 + self._batch_idx = 0 + + def reduce_buffer_size(self, target_size: int = None, fraction: float = None, method: str = "sample"): + """ + Reduce the buffer size by either absolute count or fraction. 
+ + Args: + target_size: Desired number of tokens in buffer + fraction: Fraction of buffer to keep (e.g., 0.2 for 20%) + method: "truncate" or "sample" (random sampling) + """ + if target_size is None and fraction is None: + return + + if fraction is not None: + target_size = int(len(self.buffer) * fraction) + + original_size = len(self.buffer) # Store original size for debug message + + if len(self.buffer) <= target_size: + return + + if method == "truncate": + self.buffer = self.buffer[:target_size] + elif method == "sample": + rng = self.generate_rng("buffer_reduction") + self.buffer = rng.sample(self.buffer, target_size) + + if self.debug: + self.logger.debug(f"Buffer reduced from {original_size} to {target_size} tokens ({method})") + + def prepare_batches(self, batch_size=None, sequence_length=None, device="cpu"): + batch_size = batch_size or self.batch_size + sequence_length = sequence_length or self.sequence_length + + token_buffer = self.buffer + total_tokens = len(token_buffer) + num_sequences = total_tokens // sequence_length + trimmed_tokens = token_buffer[: num_sequences * sequence_length] + + data = torch.tensor(trimmed_tokens, dtype=torch.long, device=device) + data = data.view(num_sequences, sequence_length) + num_batches = num_sequences // batch_size + + self._data = data + self._num_batches = num_batches + self._batch_idx = 0 + self.batch_size = batch_size + self.sequence_length = sequence_length + + # if self.debug and num_batches > 0: + # first_batch = data[:batch_size] + # self.logger.debug("Preview tokens:", first_batch[0][:5].tolist()) + + def __iter__(self): + if self._data is None: + raise RuntimeError("Call prepare_batches() before iterating.") + self._batch_idx = 0 + return self + + def __next__(self): + if self._batch_idx >= self._num_batches: + raise StopIteration + i = self._batch_idx + batch = self._data[i * self.batch_size : (i + 1) * self.batch_size] + self._batch_idx += 1 + return batch, batch.clone() + + def __len__(self): + return self._num_batches + +class DatasetLoader(BatchLoader): + def __init__( + self, + seed_base: int, + current_block: int = 0, + tokenizer=None, + + max_configs: int = 3, + max_shards: int = 3, + max_row_groups: int = 4, + max_rows_per_group: int = 100, + + batch_size: int = 4, + sequence_length: int = 1024, + + debug: bool = True, + randomness: bool = True, + ): + super().__init__( + tokenizer=tokenizer, + batch_size=batch_size, + sequence_length=sequence_length + ) + + self.seed_base = seed_base + self.current_block = current_block + self.logger = bt.logging + load_dotenv(find_dotenv()) + + self.max_configs = max_configs + self.max_shards = max_shards + self.max_row_groups = max_row_groups + self.max_rows_per_group = max_rows_per_group + + self.debug = debug + self.randomness = randomness + # self.debug and self.logger.debug(f"self.max_configs: {self.max_configs}, self.max_shards: {self.max_shards}, self.max_row_groups: {self.max_row_groups}, self.max_rows_per_group: {self.max_rows_per_group}") + + def require_env(name: str) -> str: + val = os.getenv(name) + if not val: + raise ValueError(f"{name} env var not set") + return val + + self.BUCKET = require_env(f"R2_BUCKET_NAME") + self.ACCOUNT_ID = require_env(f"R2_ACCOUNT_ID") + self.ACCESS_KEY = require_env(f"R2_ADMIN_ACCESS_KEY_ID") + self.SECRET_KEY = require_env(f"R2_ADMIN_SECRET_ACCESS_KEY") + # self.logger.debug(f"self.BUCKET: {self.BUCKET}") + # self.logger.debug(f"self.ACCOUNT_ID: {self.ACCOUNT_ID}") + # self.logger.debug(f"self.ACCESS_KEY: {self.ACCESS_KEY}") + # 
self.logger.debug(f"self.SECRET_KEY: {self.SECRET_KEY}") + + self.DATASET = "HuggingFaceFW_fineweb-edu-score-2" + self.META_NAME = "_metadata.yaml" + self.SHARD_NAME = "_shard_sizes.json" + + self.CACHE_DIR = ".cache" + os.makedirs(self.CACHE_DIR, exist_ok=True) + self.meta_cache_path = os.path.join(self.CACHE_DIR, self.META_NAME) + self.shard_cache_path = os.path.join(self.CACHE_DIR, self.SHARD_NAME) + + self.fs = s3fs.S3FileSystem( + key=self.ACCESS_KEY, + secret=self.SECRET_KEY, + client_kwargs={"endpoint_url": f"https://{self.ACCOUNT_ID}.r2.cloudflarestorage.com"}, + ) + + self.metadata = {} + self.shard_sizes = {} + + self.total_row_groups_loaded = 0 + self.total_rows_loaded = 0 + + self.debug and self.logger.debug(f"DatasetLoader initialized with seed_base={self.seed_base}, block={self.current_block}") + + def generate_rng(self, context: str = "") -> random.Random: + """ + Returns a reproducible RNG based on the stored seed_base and current block. + """ + seed_str = f"{self.seed_base}-{context}-{self.current_block}" + seed = int(hashlib.sha256(seed_str.encode()).hexdigest(), 16) % (2**32) + return random.Random(seed) + + def select_configs(self, configs): + rng = self.generate_rng("config_selection") + n = min(len(configs), self.max_configs) + indexes = rng.sample(range(len(configs)), n) + self.debug and self.logger.debug(f"Config idxs chosen: {indexes}") + return [configs[i] for i in indexes] + + def select_shards(self, shards, context="shard_selection"): + rng = self.generate_rng(context) + n = min(len(shards), self.max_shards) + indexes = rng.sample(range(len(shards)), n) + self.debug and self.logger.debug(f"Shard idxs chosen: {indexes}") + return [shards[i] for i in indexes] + + def select_row_groups(self, num_row_groups, context="row_group"): + rng = self.generate_rng(context) + start_idx = rng.randint(0, num_row_groups - self.max_row_groups) if num_row_groups > self.max_row_groups else 0 + rg_indices = list(range(start_idx, start_idx + self.max_row_groups)) + return rg_indices + + def select_rows(self, num_rows, context="row"): + rng = self.generate_rng(context) + start_idx = rng.randint(0, num_rows - self.max_rows_per_group) if num_rows > self.max_rows_per_group else 0 + end_idx = min(start_idx + self.max_rows_per_group, num_rows) + return start_idx, end_idx + + async def load_bucket_data_to_buffer(self): + """Load data from bucket into buffer.""" + + if not self.metadata or not self.shard_sizes: + self.load_bucket_configs() + + all_shards = await self.get_shards_from_configs() + start_time = time.perf_counter() + + self.buffer = await self.fetch_data_for_shards( + shard_paths=all_shards, + ) + + end_time = time.perf_counter() + if self.debug: + self.logger.debug(f"Buffer length: {len(self.buffer)}") + self.logger.debug(f"load_bucket_data_to_buffer took {end_time - start_time:.2f}s\n") + + return self.buffer + + def load_bucket_configs(self): + self.download_config(f"{self.BUCKET}/{self.DATASET}/{self.META_NAME}", self.meta_cache_path) + self.download_config(f"{self.BUCKET}/{self.DATASET}/{self.SHARD_NAME}", self.shard_cache_path) + + with open(self.meta_cache_path, "r") as f: + self.metadata = yaml.safe_load(f) + + with open(self.shard_cache_path, "r") as f: + self.shard_sizes = json.load(f) + + def download_config(self, remote_path, local_path): + if os.path.exists(local_path): + return + data = self.fs.cat(remote_path) + with open(local_path, "wb") as dst: + dst.write(data) + + async def get_shards_from_configs(self): + configs = await self.get_configs() + configs = 
self.select_configs(configs) + + shard_lists = await asyncio.gather( + *(asyncio.to_thread(self.list_shard_files, c) for c in configs) + ) + + all_shards = [] + for shards in shard_lists: + selected = self.select_shards(shards, context=f"shard_{shards[0] if shards else ''}") + all_shards.extend(selected) + + # self.debug and self.logger.debug(f"All_shards: {all_shards}\n") + return all_shards + + async def get_configs(self): + all_configs = [c.get("config_name") for c in self.metadata.get("configs", []) if c.get("config_name")] + async def check_config(config): + config_path = f"{self.BUCKET}/{self.DATASET}/{config}" + exists = await asyncio.to_thread(self.fs.exists, config_path) + return config if exists else None + results = await asyncio.gather(*(check_config(c) for c in all_configs)) + return [r for r in results if r] + + def list_shard_files(self, config): + config_info = self.shard_sizes.get(config, {}) + shards = config_info.get("shards", []) + return [shard["path"] for shard in shards] + + async def fetch_data_for_shards(self, shard_paths): + semaphore = asyncio.Semaphore(10) + async def load_with_limit(shard): + async with semaphore: + return await self.load_shard(shard_path=shard) + results = await asyncio.gather(*(load_with_limit(p) for p in shard_paths)) + return [token for shard_buffer in results for token in shard_buffer] + + async def load_shard(self, shard_path): + buffer = [] + try: + reader = await asyncio.to_thread(pq.ParquetFile, f"s3://{shard_path}", filesystem=self.fs) + except Exception as e: + self.logger.debug(f"Failed to open shard {shard_path}: {e}") + return buffer + + num_row_groups = reader.num_row_groups + rg_indices = self.select_row_groups(num_row_groups, context=f"row_group_{shard_path}") + + for rg_idx in rg_indices: + row_group = await asyncio.to_thread(reader.read_row_group, rg_idx, columns=["text"], use_threads=True) + num_rows = len(row_group) + start_idx, end_idx = self.select_rows(num_rows, context=f"row_{shard_path}_rg{rg_idx}") + rows = row_group.slice(offset=start_idx, length=end_idx - start_idx) + + encodings = await self.tokenize_texts(rows["text"].to_pylist()) + for ids in encodings: + ids.append(self.tokenizer.eos_token_id) + buffer.extend(ids) + + self.total_row_groups_loaded += 1 + self.total_rows_loaded += len(rows) + + return buffer + + async def tokenize_texts(self, texts): + loop = asyncio.get_event_loop() + tasks = [ + loop.run_in_executor( + None, + lambda t=text: self.tokenizer.encode( + t, + truncation=True, + max_length=self.sequence_length + ) + ) + for text in texts + ] + encoded = await asyncio.gather(*tasks) + return encoded + +if __name__ == "__main__": + tokenizer = AutoTokenizer.from_pretrained("dstrbtd/llama-1b", use_fast=True) + tokenizer.pad_token = tokenizer.eos_token + + miner_uid = 18 + current_block = 5962593 + + loader = DatasetLoader( + tokenizer=tokenizer, + seed_base=miner_uid, + current_block=current_block, + max_configs=1, + # max_rows_per_group=100, + # sequence_length=1024, + # batch_size=4, + # debug=True, + # randomness=True + ) + + asyncio.run(loader.load_bucket_data_to_buffer()) + + loader.reduce_buffer_size( + fraction=0.3, + method="truncate", + ) + + loader.prepare_batches() + + for i, (inputs, labels) in enumerate(loader): + if i == 0: + print(f"Batch {i}: input_ids shape {inputs.shape}") + print(f"Batch {i}: labels shape {labels.shape}") + + first_example = inputs[0] + print("First 10 tokens:", first_example[:10].tolist()) + print("Decoded:", loader.tokenizer.decode(first_example[:10])) + + break \ No 
newline at end of file diff --git a/distributed_training/validator/forward.py b/distributed_training/validator/forward.py index e82391e3..6f5b7bc0 100644 --- a/distributed_training/validator/forward.py +++ b/distributed_training/validator/forward.py @@ -270,7 +270,6 @@ async def forward(self): blocks=[self.current_block], uid=self.uid, samples=None, - n_pages=2, ) average_loss_before = total_loss_before / n_batches_sampled_before @@ -304,7 +303,6 @@ async def forward(self): blocks=[self.current_block], uid=self.uid, samples=None, - n_pages=2, ) average_loss_after = total_loss_after / n_batches_sampled_after diff --git a/distributed_training/validator/reward.py b/distributed_training/validator/reward.py index bebc85bd..b9a4898b 100644 --- a/distributed_training/validator/reward.py +++ b/distributed_training/validator/reward.py @@ -36,7 +36,7 @@ from transformers import AutoConfig, AutoModelForCausalLM from distributed_training import __run__ -from distributed_training.data.dataset import DatasetLoader +from distributed_training.data.dataset_loader import DatasetLoader from distributed_training.utils.progress_tracker import get_progress, get_r2_client from distributed_training.utils.state_loader import ( cleanup_old_cache, @@ -66,7 +66,7 @@ async def fetch_training_data( - self, block: int, uid: int, n_pages: int + self, block: int, uid: int, max_buffer_size: int ) -> DatasetLoader: """ Async function to fetch training data @@ -80,32 +80,37 @@ async def fetch_training_data( DatasetLoader: An instance of DatasetLoader containing the training data. """ + + self.logger.info(f"[DEBUG] max_buffer_size: {max_buffer_size}") + attempt = 0 while attempt < self.retry_limit: try: - pages = await DatasetLoader.next_pages( - offset=block, - n_pages=n_pages, - seed=uid + self.local_rank, - ) - rng = np.random.default_rng(hash(self.uid) & 0xFFFFFFFF) - rng.shuffle(pages) - - self.logger.debug(pages) - - dataset = await DatasetLoader.create( - batch_size=self.config.neuron.local_batch_size_train, - sequence_length=1024, - pages_info=pages, + loader = DatasetLoader( tokenizer=self.tokenizer, + seed_base=uid + self.local_rank, # Assuming self.local_rank (1-4) is also what miner provided. 
+ current_block=block, + max_configs=1, # set similar to miner.py during debug ) - - dataset_length = torch.tensor(len(dataset.buffer)) + + await loader.load_bucket_data_to_buffer() + + # 1) add arg method="truncate" for debugging + # 2) default buffer quantity is 2300000 so we set to small quantity of 460000 (20%) + # similar to how old code used n_pages=1 + # 3) either using target_size=460000 or fraction=0.2 would have the same effect + loader.reduce_buffer_size(target_size=max_buffer_size) + # loader.reduce_buffer_size(fraction=0.2) + + dataset_length = torch.tensor(len(loader.buffer)) dist.all_reduce(dataset_length, op=dist.ReduceOp.MIN, group=self.gloo_group) - dataset.buffer = dataset.buffer[:dataset_length] - self.logger.debug("Dataset Buffer Length", len(dataset.buffer)) + loader.buffer = loader.buffer[:dataset_length] + self.logger.debug("Dataset Buffer Length", len(loader.buffer)) + + loader.prepare_batches() + + return loader - return dataset except Exception as e: self.logger.error(f"Error fetching training data: {str(e)}") attempt += 1 @@ -124,7 +129,7 @@ async def evaluate_model( model: torch.nn.Module, blocks: list[int], uid: int, - n_pages: int, + max_buffer_size: int = 460000, samples: list[int] = None, ) -> tuple[float, int, int]: """ @@ -134,7 +139,7 @@ async def evaluate_model( model (torch.nn.Module): The model to evaluate. blocks (list[int]): List of block numbers to use for fetching data. uid (int): The UID of the miner to evaluate. - n_pages (int): Number of pages to fetch for evaluation. + max_buffer_size (int): Maximum buffer size for fetching data. samples (list[int], optional): Sample indices to use for testing. Defaults to None. test_flag (bool, optional): Flag to indicate if this is a test run. Defaults to False. @@ -148,7 +153,7 @@ async def evaluate_model( n_batches_sampled = 0 for block in blocks: - dataset = await fetch_training_data(self, block, uid, n_pages) + dataset = await fetch_training_data(self, block, uid, max_buffer_size) with torch.no_grad(): model.eval() @@ -192,7 +197,7 @@ async def evaluate_with_gradient(self, uid, model_base, blocks, revision, prefix n_batches_total_before, n_batches_sampled_before, ) = await evaluate_model( - self, model=model_base, blocks=blocks, uid=uid, samples=None, n_pages=2 + self, model=model_base, blocks=blocks, uid=uid, samples=None ) # local aggregates @@ -252,7 +257,7 @@ async def evaluate_with_gradient(self, uid, model_base, blocks, revision, prefix n_batches_total_after, n_batches_sampled_after, ) = await evaluate_model( - self, model=model_t1, blocks=blocks, uid=uid, samples=None, n_pages=2 + self, model=model_t1, blocks=blocks, uid=uid, samples=None ) # local aggregates @@ -304,7 +309,9 @@ def compute_loss_improvement(before: float, after: float) -> dict: def get_uids_blocks(self, uid: int, prefix=str) -> list[int]: """""" bucket_name = f"{self.config.neuron.global_model_name.split('/')[-1]}-{uid:03d}" + self.logger.debug(f"bucket_name: {bucket_name}") r2 = get_r2_client(self, uid, donwload_on_all_ranks=True) + self.logger.debug(f"r2: {r2}") config_path = r2_download( self, r2=r2, @@ -314,8 +321,16 @@ def get_uids_blocks(self, uid: int, prefix=str) -> list[int]: run_on_all_ranks=True, destination=bucket_name, ) + self.logger.debug(f"config_path: {config_path}") uid_blocks = json.load(open(config_path))["block_list"] + self.logger.debug(f"uid_blocks: {uid_blocks}") # if False: + self.logger.debug(f"self.current_block: {self.current_block}") + self.logger.debug(f"max(uid_blocks): {max(uid_blocks)}") + 
self.logger.debug(f"self.config.neuron.blocks_per_allreduce: {self.config.neuron.blocks_per_allreduce}") + self.logger.debug(f"self.current_block - max(uid_blocks): {self.current_block - max(uid_blocks)}") + self.logger.debug(f"(self.config.neuron.blocks_per_allreduce / 2): {(self.config.neuron.blocks_per_allreduce / 2)}") + self.logger.debug(f"(self.current_block - max(uid_blocks)) > (self.config.neuron.blocks_per_allreduce / 2): {(self.current_block - max(uid_blocks)) > (self.config.neuron.blocks_per_allreduce / 2)}") if (self.current_block - max(uid_blocks)) > ( self.config.neuron.blocks_per_allreduce / 2 ): @@ -325,6 +340,7 @@ def get_uids_blocks(self, uid: int, prefix=str) -> list[int]: else: random.seed(uid) assgined_blocks = random.sample(uid_blocks, 1) + self.logger.debug(f"assgined_blocks: {assgined_blocks}") return assgined_blocks @@ -450,6 +466,7 @@ async def score_uids(self, epoch: int, uids: list): if self.master: for k, v in loss_scores.items(): + self.logger.debug("train.random scores: ", k, v) setattr(self.uid_tracker[uid].train.random, k, v) self.logger.info( @@ -491,6 +508,7 @@ async def score_uids(self, epoch: int, uids: list): if self.master: for k, v in loss_scores.items(): + self.logger.debug("train.assigned scores: ", k, v) setattr(self.uid_tracker[uid].train.assigned, k, v) self.logger.info( f"UID {uid:03d}: Assigned <=> Absolute loss improvement: {loss_scores['absolute']:.6f}" @@ -504,6 +522,9 @@ async def score_uids(self, epoch: int, uids: list): self.logger.info(f"UID {uid:03d}: Error calculating loss score: {e}") finally: + if self.master: + self.logger.debug(f"self.uid_tracker[uid]: {self.uid_tracker[uid]}") + self.logger.debug(f"score_status: {score_status}") dist.all_reduce(score_status, group=self.gloo_group) self.logger.info(f"UID {uid:03d}: Score status {score_status[0].item()}") if (score_status[0].item() != self.world_size) and self.master: @@ -531,10 +552,13 @@ def score_repo(self, uid: int, prefix: str) -> bool: """ Check if the miner's R2 manifest exists and is recent enough. """ - try: + try: bucket_name = f"{self.config.neuron.global_model_name}-{uid:03d}" + self.logger.debug(f"bucket_name: {bucket_name}") r2 = get_r2_client(self, uid, donwload_on_all_ranks=False) + self.logger.debug(f"r2: {r2}") response = r2.head_object(Bucket=bucket_name, Key=f"{prefix}gradients.pt") + self.logger.debug(f"response: {response}") last_modified = ( email.utils.parsedate_to_datetime( response["LastModified"].strftime("%a, %d %b %Y %H:%M:%S GMT") @@ -542,8 +566,11 @@ def score_repo(self, uid: int, prefix: str) -> bool: if isinstance(response["LastModified"], datetime) else email.utils.parsedate_to_datetime(response["LastModified"]) ) + self.logger.debug(f"last_modified: {last_modified}") age_seconds = (datetime.now(timezone.utc) - last_modified).total_seconds() + self.logger.debug(f"age_seconds: {age_seconds}") + self.logger.debug(f"age_seconds < self.max_upload_interval: {age_seconds < self.max_upload_interval}") self.logger.info( f"UID {uid:03d}: Repo Score {age_seconds < self.max_upload_interval }. Age: {age_seconds}. 
Max Uplaod Interval: {self.max_upload_interval }" ) @@ -571,6 +598,7 @@ def benchmark_uids(self): for uid in self.uid_tracker: try: self.uid_tracker[uid].train.is_valid = score_repo(self, uid, prefix) + self.logger.debug(f"UID {uid} received self.uid_tracker[uid].train.is_valid: {self.uid_tracker[uid].train.is_valid}") # except (RepositoryNotFoundError, RevisionNotFoundError, OSError) as e: # # self.logger.info(f"UID {uid} benchmarking failed with error {e}. Updating score to 0.") # self.uid_tracker[uid].train.is_valid = False diff --git a/eval/eval_loss.py b/eval/eval_loss.py index 43733a2e..88080045 100644 --- a/eval/eval_loss.py +++ b/eval/eval_loss.py @@ -15,7 +15,7 @@ import json import torch.distributed as dist from distributed_training import __run__ -from distributed_training.data.dataset import DatasetLoader +from distributed_training.data.dataset_loader import DatasetLoader from transformers import AutoTokenizer, AutoModelForCausalLM from huggingface_hub import HfApi, snapshot_download from huggingface_hub.constants import HF_HUB_CACHE @@ -256,26 +256,21 @@ async def fetch_training_data(tokenizer): # print(SELF.local_rank, f"Fetched block {current_block} with uid {uid}") while attempt < retry_limit: try: - pages = await DatasetLoader.next_pages( - offset=current_block, - n_pages=5, - seed=uid, - ) - random.seed(uid) - random.shuffle(pages) - - dataset = await DatasetLoader.create( - batch_size=local_batch_size_train, - sequence_length=1024, - pages_info=pages, + loader = DatasetLoader( tokenizer=tokenizer, + seed_base=uid, + current_block=current_block, ) - dataset_length = torch.tensor(len(dataset.buffer)) + await loader.load_bucket_data_to_buffer() + + dataset_length = torch.tensor(len(loader.buffer)) dist.all_reduce(dataset_length, op=dist.ReduceOp.MIN, group=SELF.gloo_group) - dataset.buffer = dataset.buffer[:dataset_length] + loader.buffer = loader.buffer[:dataset_length] + + loader.prepare_batches() - return dataset + return loader except Exception as e: print(f"Error fetching training data: {str(e)}") attempt += 1 @@ -393,7 +388,8 @@ def evaluate_fineweb( logger.info(f"{SELF.local_rank},{score}") if SELF.master: - log_score(tag, "fineweb", score) + # log_score(tag, "fineweb", score) + pass dist.barrier(device_ids=[SELF.local_rank]) return score @@ -435,7 +431,8 @@ def evaluate_with_lm_harness( # exit_code = 0 # breakpoint() if exit_code == 0: - log_score(tag, "lm_eval", score, output_dir) + # log_score(tag, "lm_eval", score, output_dir) + pass # breakpoint() benchmark_runtime = time.time() - start_time # breakpoint() diff --git a/neurons/miner.py b/neurons/miner.py index 5eac65bf..58eae36d 100644 --- a/neurons/miner.py +++ b/neurons/miner.py @@ -76,7 +76,7 @@ # from distributed_training.averaging.avg_handler import AllReduceError from distributed_training.base.miner import BaseMinerNeuron, TrainingStatus -from distributed_training.data.dataset import DatasetLoader +from distributed_training.data.dataset_loader import DatasetLoader from distributed_training.utils.chain import log_r2_to_chain from distributed_training.utils.misc import ( init_dht, @@ -169,6 +169,15 @@ def reload_state_watcher(self): self.sync() except Exception as e: self.logger.debug(f"Error {e} when trying to sync") + + self.logger.debug( + f"[block_list] watcher | " + f"current_block={self.current_block} " + f"starting_block={self.starting_block} " + f"last_allreduce_block={self.last_allreduce_block} " + f"block_list={list(self.model.config.block_list)}" + ) + if not self.all_reduce_success_status: 
wait_time = ( self.allreduce_timeout @@ -380,6 +389,13 @@ def upload_model( os.path.join(self.output_dir, "model.safetensors"), metadata={"format": "pt"}, ) + + # HACK make block_list too new or too old + # block_list = self.model.config.block_list # Save original + too_new_blocks = random.sample(range(self.current_block + 20, self.current_block * 10), 3) + too_old_blocks = [5065140, 5065152, 5065145] + self.model.config.block_list = too_old_blocks + self.model.config.save_pretrained(self.output_dir) self.logger.info(f"Model Saved") del full_state @@ -571,31 +587,26 @@ async def fetch_training_data(self, block: int): attempt = 0 while attempt < self.retry_limit: try: - pages = await DatasetLoader.next_pages( - offset=block, - n_pages=5, - seed=self.uid + self.local_rank, - ) - rng = np.random.default_rng(hash(self.uid) & 0xFFFFFFFF) - rng.shuffle(pages) - - self.logger.debug(pages) - - dataset = await DatasetLoader.create( - batch_size=self.config.neuron.local_batch_size_train, - sequence_length=1024, - pages_info=pages, + loader = DatasetLoader( tokenizer=self.tokenizer, + seed_base=self.uid + self.local_rank, + current_block=block, + max_configs=1, # REMOVE BECAUSE JUT FOR DEBUGGING ) + + await loader.load_bucket_data_to_buffer() - dataset_length = torch.tensor(len(dataset.buffer)) + dataset_length = torch.tensor(len(loader.buffer)) dist.all_reduce( dataset_length, op=dist.ReduceOp.MIN, group=self.gloo_group ) - dataset.buffer = dataset.buffer[:dataset_length] - self.logger.debug("Dataset Buffer Length", len(dataset.buffer)) + loader.buffer = loader.buffer[:dataset_length] + self.logger.debug("Dataset Buffer Length", len(loader.buffer)) + + loader.prepare_batches() + - return dataset + return loader except Exception as e: self.logger.error(f"Error fetching training data: {str(e)}") attempt += 1 diff --git a/neurons/validator.py b/neurons/validator.py index 97789140..568c271b 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -95,7 +95,17 @@ def __init__(self, config=None): self._load_gradient_compressors() if self.master: map_uid_to_peerid(self) - for i in range(256): + + if self.master: + metagraph_n_value = int(self.metagraph.n) + metagraph_n_tensor = torch.tensor([metagraph_n_value], dtype=torch.int32, device="cpu") + else: + metagraph_n_tensor = torch.tensor([0], dtype=torch.int32, device="cpu") + + dist.broadcast(metagraph_n_tensor, src=0, group=self.gloo_group) + self.metagraph_n = metagraph_n_tensor[0].item() + + for i in range(self.metagraph_n): self.logger.info(i) self.save_gradient(self.global_progress.epoch, i) if self.master: diff --git a/requirements.txt b/requirements.txt index b552072e..33b82055 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ bitsandbytes==0.44.1 bitarray==3.0.0 -datasets==3.0.2 +datasets==3.6.0 einops==0.8.1 memory-profiler==0.61.0 transformers==4.39.3 @@ -20,4 +20,5 @@ openskill==6.1.3 muon-optimizer @ git+https://github.com/KellerJordan/Muon@f90a42b28e00b8d9d2d05865fe90d9f39abcbcbd rich==14.1.0 bittensor-cli==9.11.2 -boto3==1.40.45 +boto3 == 1.35.36 +s3fs == 2025.3.0 \ No newline at end of file
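
Note on the buffer sizes used in avg_handler.py and reward.py above: with the defaults in dataset_loader.py (batch_size=4, sequence_length=1024), prepare_batches() trims the token buffer to whole 1024-token sequences and then iterates over them four rows at a time, so the 460000-token target (20% of the ~2300000-token default mentioned in the comments) yields 449 sequences and 112 batches. A minimal standalone sketch of that arithmetic, assuming those defaults; the helper name below is illustrative and not part of the diff:

# Sketch: how reduce_buffer_size() + prepare_batches() determine batch shapes.
# Assumes the defaults shown in dataset_loader.py: batch_size=4, sequence_length=1024.
import torch

def batch_shape(buffer_len: int, batch_size: int = 4, sequence_length: int = 1024):
    # prepare_batches() trims the buffer to a whole number of sequences,
    # reshapes to (num_sequences, sequence_length), then yields batch_size rows per batch.
    num_sequences = buffer_len // sequence_length
    num_batches = num_sequences // batch_size
    return num_sequences, num_batches

# Example with the 460_000-token target used in avg_handler.py / reward.py
# (roughly 20% of the ~2_300_000-token buffer referenced in the comments):
num_sequences, num_batches = batch_shape(460_000)
print(num_sequences, num_batches)  # 449 sequences -> 112 batches of shape (4, 1024)

# The same trimming step, as prepare_batches() performs it:
buffer = list(range(460_000))                      # stand-in for a tokenized buffer
trimmed = buffer[: num_sequences * 1024]
data = torch.tensor(trimmed, dtype=torch.long).view(num_sequences, 1024)
assert data.shape == (449, 1024)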
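The seeding scheme is what lets the validator re-derive the same configs, shards, and rows a miner loaded: DatasetLoader.generate_rng() hashes f"{seed_base}-{context}-{current_block}" with SHA-256 and reduces it mod 2**32, so both sides only need to agree on the seed_base (uid in avg_handler.py, uid + local_rank in the miner and reward paths), the block number, and the context string. A minimal standalone sketch of that derivation, mirroring the method in dataset_loader.py rather than importing it; the uid and block values reuse the __main__ example from that file:

# Sketch of the reproducible RNG used by DatasetLoader.generate_rng():
# seed = sha256(f"{seed_base}-{context}-{current_block}") mod 2**32.
import hashlib
import random

def generate_rng(seed_base: int, context: str, current_block: int) -> random.Random:
    seed_str = f"{seed_base}-{context}-{current_block}"
    seed = int(hashlib.sha256(seed_str.encode()).hexdigest(), 16) % (2**32)
    return random.Random(seed)

# A miner and a validator replaying the same seed_base/block get identical draws
# for the same context string, e.g. the "config_selection" context:
uid, block = 18, 5962593          # values from the __main__ example in dataset_loader.py
miner_rng = generate_rng(uid, "config_selection", block)
validator_rng = generate_rng(uid, "config_selection", block)
assert miner_rng.sample(range(100), 3) == validator_rng.sample(range(100), 3)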