From 1953c7f6a86344a35782eed44e24a307f0e4a184 Mon Sep 17 00:00:00 2001 From: Antoine Bon Date: Fri, 12 Dec 2025 15:15:20 +0100 Subject: [PATCH 01/11] wip --- mostlyai/engine/_tabular/transformer.py | 279 +++++++++++ mostlyai/engine/_tabular/transformer_data.py | 374 ++++++++++++++ .../engine/_tabular/transformer_training.py | 456 ++++++++++++++++++ .../test_transformer_synthetic.py | 221 +++++++++ scripts/foundational/train_transformer_t4.py | 149 ++++++ 5 files changed, 1479 insertions(+) create mode 100644 mostlyai/engine/_tabular/transformer.py create mode 100644 mostlyai/engine/_tabular/transformer_data.py create mode 100644 mostlyai/engine/_tabular/transformer_training.py create mode 100644 scripts/foundational/test_transformer_synthetic.py create mode 100644 scripts/foundational/train_transformer_t4.py diff --git a/mostlyai/engine/_tabular/transformer.py b/mostlyai/engine/_tabular/transformer.py new file mode 100644 index 00000000..027e99f3 --- /dev/null +++ b/mostlyai/engine/_tabular/transformer.py @@ -0,0 +1,279 @@ +""" +Foundational Tabular Transformer for categorical data. + +A unified architecture for pre-training (MCM) and classification on tabular data. +Uses feature hashing for universal value representation and no positional encoding +(permutation-invariant over columns). +""" + +from dataclasses import dataclass +from typing import Optional + +import torch +import torch.nn as nn +import torch.nn.functional as F + + +# Special token IDs +PAD_TOKEN_ID = 0 +MASK_TOKEN_ID = 1 +SPECIAL_TOKENS_COUNT = 2 # PAD and MASK + + +@dataclass +class TabularTransformerConfig: + """Configuration for FoundationalTabularTransformer.""" + + hash_vocab_size: int = 32768 + max_columns: int = 64 + hidden_size: int = 128 + num_layers: int = 4 + num_heads: int = 4 + intermediate_size: int = 512 + dropout: float = 0.1 + attention_dropout: float = 0.1 + + +class TabularEmbeddings(nn.Module): + """ + Embedding layer for tabular data. + + Maps hashed value IDs to dense vectors. No positional encoding - + the model is permutation-invariant over columns. + """ + + def __init__(self, config: TabularTransformerConfig): + super().__init__() + self.config = config + + # Value embeddings: hash_vocab_size + special tokens (PAD, MASK) + total_vocab_size = config.hash_vocab_size + SPECIAL_TOKENS_COUNT + self.value_embeddings = nn.Embedding( + num_embeddings=total_vocab_size, + embedding_dim=config.hidden_size, + padding_idx=PAD_TOKEN_ID, + ) + + self.layer_norm = nn.LayerNorm(config.hidden_size) + self.dropout = nn.Dropout(config.dropout) + + def forward(self, value_ids: torch.Tensor) -> torch.Tensor: + """ + Embed value IDs. + + Args: + value_ids: (batch_size, seq_len) tensor of hashed value IDs + + Returns: + embeddings: (batch_size, seq_len, hidden_size) + """ + embeddings = self.value_embeddings(value_ids) + embeddings = self.layer_norm(embeddings) + embeddings = self.dropout(embeddings) + return embeddings + + +class FoundationalTabularTransformer(nn.Module): + """ + Foundational transformer for tabular data. 
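A minimal shape-check for the embedding path defined above; the toy sizes are assumptions, everything else follows the definitions in this file:

    import torch

    config = TabularTransformerConfig(hash_vocab_size=1024, hidden_size=32)
    embeddings = TabularEmbeddings(config)
    value_ids = torch.randint(2, 1026, (4, 8))  # 4 rows, 8 columns; IDs past the PAD/MASK slots
    out = embeddings(value_ids)
    assert out.shape == (4, 8, 32)  # (batch_size, seq_len, hidden_size)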
+ + Unified architecture for: + - Pre-training: Masked Cell Modeling (MCM) - mask random cells, predict original + - Classification: Mask target column, predict at that position + + Key design choices: + - Feature hashing: Values are hashed with column name to create unique IDs + - No positional encoding: Columns are treated as an unordered set + - No pooling: Predictions are made directly at masked positions + """ + + def __init__(self, config: TabularTransformerConfig): + super().__init__() + self.config = config + + # Embeddings (no positional encoding) + self.embeddings = TabularEmbeddings(config) + + # Transformer encoder with Pre-LN (norm_first=True) + encoder_layer = nn.TransformerEncoderLayer( + d_model=config.hidden_size, + nhead=config.num_heads, + dim_feedforward=config.intermediate_size, + dropout=config.dropout, + activation="gelu", + batch_first=True, + norm_first=True, # Pre-LN for better training stability + ) + self.encoder = nn.TransformerEncoder( + encoder_layer, + num_layers=config.num_layers, + ) + + # Output projection to vocabulary + total_vocab_size = config.hash_vocab_size + SPECIAL_TOKENS_COUNT + self.output_projection = nn.Linear(config.hidden_size, total_vocab_size) + + # Initialize weights + self._init_weights() + + def _init_weights(self): + """Initialize weights with Xavier uniform.""" + for module in self.modules(): + if isinstance(module, nn.Linear): + nn.init.xavier_uniform_(module.weight) + if module.bias is not None: + nn.init.zeros_(module.bias) + elif isinstance(module, nn.Embedding): + nn.init.normal_(module.weight, mean=0.0, std=0.02) + if module.padding_idx is not None: + nn.init.zeros_(module.weight[module.padding_idx]) + + def forward( + self, + value_ids: torch.Tensor, + attention_mask: torch.Tensor, + labels: Optional[torch.Tensor] = None, + ) -> dict[str, torch.Tensor]: + """ + Forward pass. + + Args: + value_ids: (batch_size, seq_len) hashed value IDs (with MASK tokens for prediction) + attention_mask: (batch_size, seq_len) True for real tokens, False for padding + labels: (batch_size, seq_len) original value IDs for loss computation + Use -100 for positions to ignore in loss + + Returns: + dict with: + - logits: (batch_size, seq_len, vocab_size) predictions at each position + - loss: scalar loss (if labels provided) + - hidden_states: (batch_size, seq_len, hidden_size) encoder outputs + """ + # Embed inputs + hidden_states = self.embeddings(value_ids) + + # Create attention mask for transformer (True = masked/ignored) + # PyTorch transformer expects True for positions to IGNORE + src_key_padding_mask = ~attention_mask + + # Encode + hidden_states = self.encoder( + hidden_states, + src_key_padding_mask=src_key_padding_mask, + ) + + # Project to vocabulary + logits = self.output_projection(hidden_states) + + output = { + "logits": logits, + "hidden_states": hidden_states, + } + + # Compute loss if labels provided + if labels is not None: + # Flatten for cross-entropy + loss = F.cross_entropy( + logits.view(-1, logits.size(-1)), + labels.view(-1), + ignore_index=-100, + ) + output["loss"] = loss + + return output + + def predict_masked( + self, + value_ids: torch.Tensor, + attention_mask: torch.Tensor, + masked_positions: torch.Tensor, + ) -> torch.Tensor: + """ + Get predictions at masked positions. 
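A usage sketch for this method, assuming the `create_model` factory and the `MASK_TOKEN_ID` constant defined elsewhere in this file:

    import torch

    model = create_model("small")
    model.eval()
    ids = torch.randint(2, 32770, (2, 5))  # valid hash IDs for the "small" vocabulary
    ids[:, 3] = MASK_TOKEN_ID              # hide column 3 in both rows
    mask = torch.ones(2, 5, dtype=torch.bool)
    probs = model.predict_masked(ids, mask, torch.tensor([3, 3]))
    assert probs.shape == (2, 32770)       # one distribution per row over the full vocabulary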
+ + Args: + value_ids: (batch_size, seq_len) with MASK tokens at positions to predict + attention_mask: (batch_size, seq_len) + masked_positions: (batch_size,) indices of masked positions to predict + + Returns: + probs: (batch_size, vocab_size) probability distribution at masked positions + """ + output = self.forward(value_ids, attention_mask) + logits = output["logits"] + + # Gather logits at masked positions + batch_size = logits.size(0) + batch_indices = torch.arange(batch_size, device=logits.device) + masked_logits = logits[batch_indices, masked_positions] # (batch_size, vocab_size) + + # Convert to probabilities + probs = F.softmax(masked_logits, dim=-1) + return probs + + @staticmethod + def hash_value(value: str, column_name: str, vocab_size: int = 32768) -> int: + """ + Hash a categorical value to an integer ID. + + Combines column name and value to create unique hashes even for + same values in different columns. + + Args: + value: The categorical value (converted to string) + column_name: Name of the column + vocab_size: Size of hash vocabulary + + Returns: + Hash ID in range [SPECIAL_TOKENS_COUNT, vocab_size + SPECIAL_TOKENS_COUNT) + """ + combined = f"{column_name}:{value}" + # Use Python's built-in hash (for simplicity; can switch to mmh3 for production) + hash_int = hash(combined) + # Map to vocabulary range, offset by special tokens + return (hash_int % vocab_size) + SPECIAL_TOKENS_COUNT + + +# Model size presets +MODEL_CONFIGS = { + "small": TabularTransformerConfig( + hash_vocab_size=32768, + max_columns=64, + hidden_size=128, + num_layers=4, + num_heads=4, + intermediate_size=512, + ), + "medium": TabularTransformerConfig( + hash_vocab_size=65536, + max_columns=128, + hidden_size=256, + num_layers=6, + num_heads=8, + intermediate_size=1024, + ), + "large": TabularTransformerConfig( + hash_vocab_size=131072, + max_columns=256, + hidden_size=512, + num_layers=12, + num_heads=8, + intermediate_size=2048, + ), +} + + +def create_model(size: str = "small") -> FoundationalTabularTransformer: + """ + Create a model with preset configuration. + + Args: + size: One of "small", "medium", "large" + + Returns: + Initialized FoundationalTabularTransformer + """ + if size not in MODEL_CONFIGS: + raise ValueError(f"Unknown model size: {size}. Choose from {list(MODEL_CONFIGS.keys())}") + config = MODEL_CONFIGS[size] + return FoundationalTabularTransformer(config) diff --git a/mostlyai/engine/_tabular/transformer_data.py b/mostlyai/engine/_tabular/transformer_data.py new file mode 100644 index 00000000..6bef4c5c --- /dev/null +++ b/mostlyai/engine/_tabular/transformer_data.py @@ -0,0 +1,374 @@ +""" +Data processing utilities for the Foundational Tabular Transformer. + +Provides tokenization (feature hashing), dataset wrappers, and masking utilities +for Masked Cell Modeling (MCM) pre-training. +""" + +from typing import Optional + +import pandas as pd +import torch +from torch.utils.data import Dataset + +from .transformer import MASK_TOKEN_ID, PAD_TOKEN_ID, SPECIAL_TOKENS_COUNT + + +class TabularTokenizer: + """ + Tokenizer for tabular data using feature hashing. + + Converts categorical values to integer IDs by hashing the combination + of column name and value. This enables handling arbitrary vocabularies + without pre-building a vocabulary. + """ + + def __init__(self, vocab_size: int = 32768): + """ + Initialize tokenizer. 
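A quick illustration of the hashing scheme, using the method defined just below:

    tok = TabularTokenizer(vocab_size=32768)
    a = tok.hash_value("red", "color")
    b = tok.hash_value("red", "size")
    print(a != b)                     # almost surely True: the column name is part of the hash input
    print(a >= SPECIAL_TOKENS_COUNT)  # True: IDs 0 and 1 stay reserved for PAD and MASK
    # Note: Python's built-in hash() is salted per process unless PYTHONHASHSEED is fixed,
    # so these IDs are only stable within a single run.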
+ + Args: + vocab_size: Size of hash vocabulary (excluding special tokens) + """ + self.vocab_size = vocab_size + + def hash_value(self, value: str, column_name: str) -> int: + """ + Hash a categorical value to an integer ID. + + Args: + value: The categorical value (will be converted to string) + column_name: Name of the column + + Returns: + Hash ID in range [SPECIAL_TOKENS_COUNT, vocab_size + SPECIAL_TOKENS_COUNT) + """ + combined = f"{column_name}:{value}" + hash_int = hash(combined) + return (hash_int % self.vocab_size) + SPECIAL_TOKENS_COUNT + + def tokenize_row(self, row: dict, columns: list[str]) -> list[int]: + """ + Tokenize a single row. + + Args: + row: Dictionary mapping column names to values + columns: Ordered list of column names to include + + Returns: + List of hashed value IDs + """ + return [self.hash_value(str(row[col]), col) for col in columns] + + def tokenize_dataframe( + self, + df: pd.DataFrame, + columns: Optional[list[str]] = None, + max_columns: int = 64, + ) -> tuple[torch.Tensor, torch.Tensor, list[str]]: + """ + Tokenize an entire DataFrame. + + Args: + df: pandas DataFrame to tokenize + columns: Column names to use (defaults to all columns) + max_columns: Maximum number of columns to include + + Returns: + Tuple of: + - value_ids: (num_rows, num_cols) tensor of hashed IDs + - attention_mask: (num_rows, num_cols) tensor of True for real values + - columns: List of column names used + """ + if columns is None: + columns = df.columns.tolist() + + # Truncate to max columns + columns = columns[:max_columns] + + # Tokenize all rows + value_ids = [] + for _, row in df.iterrows(): + row_ids = self.tokenize_row(row.to_dict(), columns) + value_ids.append(row_ids) + + value_ids = torch.tensor(value_ids, dtype=torch.long) + attention_mask = torch.ones_like(value_ids, dtype=torch.bool) + + return value_ids, attention_mask, columns + + def pad_batch( + self, + value_ids_list: list[torch.Tensor], + attention_masks_list: list[torch.Tensor], + max_length: Optional[int] = None, + ) -> tuple[torch.Tensor, torch.Tensor]: + """ + Pad a batch of tokenized rows to the same length. + + Args: + value_ids_list: List of (seq_len,) tensors + attention_masks_list: List of (seq_len,) tensors + max_length: Pad to this length (defaults to max in batch) + + Returns: + Tuple of: + - value_ids: (batch_size, max_length) padded tensor + - attention_mask: (batch_size, max_length) mask tensor + """ + if max_length is None: + max_length = max(v.size(0) for v in value_ids_list) + + batch_size = len(value_ids_list) + padded_ids = torch.full((batch_size, max_length), PAD_TOKEN_ID, dtype=torch.long) + padded_mask = torch.zeros(batch_size, max_length, dtype=torch.bool) + + for i, (ids, mask) in enumerate(zip(value_ids_list, attention_masks_list)): + length = ids.size(0) + padded_ids[i, :length] = ids + padded_mask[i, :length] = mask + + return padded_ids, padded_mask + + +class TabularDataset(Dataset): + """ + PyTorch Dataset wrapper for tabular data. + + Stores pre-tokenized data for efficient batch loading during training. + """ + + def __init__( + self, + df: pd.DataFrame, + tokenizer: TabularTokenizer, + columns: Optional[list[str]] = None, + max_columns: int = 64, + ): + """ + Initialize dataset. 
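A minimal end-to-end sketch for this dataset wrapper, assuming the toy DataFrame below:

    import pandas as pd

    df = pd.DataFrame({"color": ["red", "blue", "red"], "size": ["S", "M", "L"]})
    ds = TabularDataset(df, TabularTokenizer())
    print(len(ds))                  # 3
    print(ds[0]["value_ids"])       # two hashed IDs, one per column
    print(ds[0]["attention_mask"])  # tensor([True, True])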
+ + Args: + df: pandas DataFrame with categorical data + tokenizer: TabularTokenizer instance + columns: Column names to use (defaults to categorical columns) + max_columns: Maximum number of columns + """ + self.tokenizer = tokenizer + + # Select categorical columns if not specified + if columns is None: + columns = df.select_dtypes(include=["object", "string", "category"]).columns.tolist() + + self.columns = columns[:max_columns] + + # Tokenize entire dataframe + self.value_ids, self.attention_mask, self.columns = tokenizer.tokenize_dataframe( + df, self.columns, max_columns + ) + + def __len__(self) -> int: + return len(self.value_ids) + + def __getitem__(self, idx: int) -> dict[str, torch.Tensor]: + return { + "value_ids": self.value_ids[idx], + "attention_mask": self.attention_mask[idx], + } + + +class MultiTableDataset(Dataset): + """ + Dataset that samples rows from multiple tables. + + For pre-training on diverse tabular data (e.g., T4 dataset). + Each table may have different columns. + """ + + def __init__( + self, + tables: list[pd.DataFrame], + tokenizer: TabularTokenizer, + max_columns: int = 64, + rows_per_table: Optional[int] = None, + ): + """ + Initialize multi-table dataset. + + Args: + tables: List of DataFrames + tokenizer: TabularTokenizer instance + max_columns: Maximum columns per table + rows_per_table: Max rows to sample per table (None = all) + """ + self.tokenizer = tokenizer + self.max_columns = max_columns + self.samples = [] # List of (value_ids, attention_mask) + + for df in tables: + # Select categorical columns + cat_cols = df.select_dtypes(include=["object", "string", "category"]).columns.tolist() + if len(cat_cols) < 2: + continue # Skip tables with too few categorical columns + + cat_cols = cat_cols[:max_columns] + + # Sample rows if needed + if rows_per_table is not None and len(df) > rows_per_table: + df = df.sample(n=rows_per_table, random_state=42) + + # Tokenize + value_ids, attention_mask, _ = tokenizer.tokenize_dataframe(df, cat_cols, max_columns) + + # Add each row as a sample + for i in range(len(value_ids)): + self.samples.append((value_ids[i], attention_mask[i])) + + def __len__(self) -> int: + return len(self.samples) + + def __getitem__(self, idx: int) -> dict[str, torch.Tensor]: + value_ids, attention_mask = self.samples[idx] + return { + "value_ids": value_ids, + "attention_mask": attention_mask, + } + + +def apply_masking( + value_ids: torch.Tensor, + attention_mask: torch.Tensor, + mask_prob: float = 0.15, + random_prob: float = 0.1, + keep_prob: float = 0.1, + vocab_size: int = 32768, +) -> tuple[torch.Tensor, torch.Tensor]: + """ + Apply BERT-style masking to value IDs for MCM pre-training. 
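A small sketch of the masking behaviour described here; the toy shapes and vocabulary size are assumptions:

    import torch

    ids = torch.randint(2, 1002, (8, 6))  # toy batch: 8 rows, 6 columns
    mask = torch.ones(8, 6, dtype=torch.bool)
    masked_ids, labels = apply_masking(ids, mask, mask_prob=0.15, vocab_size=1000)
    print((labels != -100).float().mean())       # roughly 0.15: the fraction selected for prediction
    print((masked_ids == MASK_TOKEN_ID).sum())   # most selected cells become [MASK]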
+ + Of the selected positions: + - 80% are replaced with [MASK] + - 10% are replaced with a random token + - 10% are kept as original + + Args: + value_ids: (batch_size, seq_len) original value IDs + attention_mask: (batch_size, seq_len) True for real tokens + mask_prob: Probability of selecting a position for masking + random_prob: Of selected positions, probability of random replacement + keep_prob: Of selected positions, probability of keeping original + vocab_size: Size of vocabulary for random replacement + + Returns: + Tuple of: + - masked_ids: (batch_size, seq_len) with masking applied + - labels: (batch_size, seq_len) original IDs at masked positions, -100 elsewhere + """ + device = value_ids.device + batch_size, seq_len = value_ids.shape + + # Only mask positions where attention_mask is True (real tokens) + can_mask = attention_mask.clone() + + # Randomly select positions to mask + mask_probs = torch.full_like(value_ids, mask_prob, dtype=torch.float) + mask_probs = mask_probs * can_mask.float() + selected = torch.bernoulli(mask_probs).bool() + + # Create labels: original values at selected positions, -100 elsewhere + labels = value_ids.clone() + labels[~selected] = -100 + + # Create masked input + masked_ids = value_ids.clone() + + # Determine which selected positions get which treatment + # 80% -> [MASK], 10% -> random, 10% -> keep original + rand_vals = torch.rand(batch_size, seq_len, device=device) + + # [MASK] replacement (80% of selected) + mask_replace = selected & (rand_vals < (1 - random_prob - keep_prob)) + masked_ids[mask_replace] = MASK_TOKEN_ID + + # Random replacement (10% of selected) + random_replace = selected & (rand_vals >= (1 - random_prob - keep_prob)) & (rand_vals < (1 - keep_prob)) + random_tokens = torch.randint( + SPECIAL_TOKENS_COUNT, + vocab_size + SPECIAL_TOKENS_COUNT, + (batch_size, seq_len), + device=device, + ) + masked_ids[random_replace] = random_tokens[random_replace] + + # Keep original (10% of selected) - already in masked_ids + + return masked_ids, labels + + +def mask_target_column( + value_ids: torch.Tensor, + attention_mask: torch.Tensor, + target_column_idx: int, +) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """ + Mask a specific column for classification inference. + + Args: + value_ids: (batch_size, seq_len) original value IDs + attention_mask: (batch_size, seq_len) attention mask + target_column_idx: Index of column to mask + + Returns: + Tuple of: + - masked_ids: (batch_size, seq_len) with target column masked + - labels: (batch_size, seq_len) original IDs at target position, -100 elsewhere + - target_positions: (batch_size,) indices of target column + """ + batch_size = value_ids.size(0) + + # Create masked input + masked_ids = value_ids.clone() + masked_ids[:, target_column_idx] = MASK_TOKEN_ID + + # Create labels + labels = torch.full_like(value_ids, -100) + labels[:, target_column_idx] = value_ids[:, target_column_idx] + + # Target positions + target_positions = torch.full((batch_size,), target_column_idx, dtype=torch.long, device=value_ids.device) + + return masked_ids, labels, target_positions + + +def collate_fn(batch: list[dict[str, torch.Tensor]]) -> dict[str, torch.Tensor]: + """ + Collate function for DataLoader. + + Pads sequences to the same length within a batch. 
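A worked example of the padding behaviour, assuming two rows of different widths:

    import torch

    batch = [
        {"value_ids": torch.tensor([5, 6, 7]), "attention_mask": torch.tensor([True, True, True])},
        {"value_ids": torch.tensor([8, 9]), "attention_mask": torch.tensor([True, True])},
    ]
    out = collate_fn(batch)
    print(out["value_ids"])       # tensor([[5, 6, 7], [8, 9, 0]]) since PAD_TOKEN_ID is 0
    print(out["attention_mask"])  # the padded position comes back False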
+ + Args: + batch: List of samples from TabularDataset + + Returns: + Batched and padded tensors + """ + value_ids_list = [item["value_ids"] for item in batch] + attention_masks_list = [item["attention_mask"] for item in batch] + + # Find max length in batch + max_length = max(v.size(0) for v in value_ids_list) + + # Pad + batch_size = len(batch) + padded_ids = torch.full((batch_size, max_length), PAD_TOKEN_ID, dtype=torch.long) + padded_mask = torch.zeros(batch_size, max_length, dtype=torch.bool) + + for i, (ids, mask) in enumerate(zip(value_ids_list, attention_masks_list)): + length = ids.size(0) + padded_ids[i, :length] = ids + padded_mask[i, :length] = mask + + return { + "value_ids": padded_ids, + "attention_mask": padded_mask, + } diff --git a/mostlyai/engine/_tabular/transformer_training.py b/mostlyai/engine/_tabular/transformer_training.py new file mode 100644 index 00000000..3a0a9c47 --- /dev/null +++ b/mostlyai/engine/_tabular/transformer_training.py @@ -0,0 +1,456 @@ +""" +Training utilities for the Foundational Tabular Transformer. + +Provides pre-training (MCM) and classification functions. +""" + +import logging +from pathlib import Path +from typing import Iterator, Optional + +import pandas as pd +import torch +from torch.optim import AdamW +from torch.optim.lr_scheduler import CosineAnnealingLR +from torch.utils.data import DataLoader + +from .transformer import ( + FoundationalTabularTransformer, + TabularTransformerConfig, + create_model, +) +from .transformer_data import ( + MultiTableDataset, + TabularDataset, + TabularTokenizer, + apply_masking, + collate_fn, + mask_target_column, +) + +_LOG = logging.getLogger(__name__) + + +def pretrain_mcm( + model: FoundationalTabularTransformer, + tables: list[pd.DataFrame], + num_epochs: int = 10, + batch_size: int = 256, + mask_prob: float = 0.15, + learning_rate: float = 1e-4, + weight_decay: float = 0.01, + warmup_steps: int = 1000, + max_columns: int = 64, + device: Optional[str] = None, + checkpoint_dir: Optional[Path] = None, + log_every: int = 100, +) -> FoundationalTabularTransformer: + """ + Pre-train model with Masked Cell Modeling (MCM) on multiple tables. + + Args: + model: The transformer model to train + tables: List of DataFrames to train on + num_epochs: Number of training epochs + batch_size: Batch size + mask_prob: Probability of masking each cell + learning_rate: Peak learning rate + weight_decay: AdamW weight decay + warmup_steps: Linear warmup steps + max_columns: Maximum columns per table + device: Device to train on (defaults to CUDA if available) + checkpoint_dir: Directory to save checkpoints + log_every: Log metrics every N steps + + Returns: + Trained model + """ + if device is None: + device = "cuda" if torch.cuda.is_available() else "cpu" + + model = model.to(device) + model.train() + + # Create tokenizer and dataset + tokenizer = TabularTokenizer(vocab_size=model.config.hash_vocab_size) + dataset = MultiTableDataset(tables, tokenizer, max_columns=max_columns) + + if len(dataset) == 0: + raise ValueError("No valid samples found in tables. 
Ensure tables have categorical columns.") + + _LOG.info(f"Pre-training on {len(dataset)} samples from {len(tables)} tables") + + dataloader = DataLoader( + dataset, + batch_size=batch_size, + shuffle=True, + collate_fn=collate_fn, + num_workers=0, # Keep simple for now + ) + + # Optimizer and scheduler + optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay) + total_steps = len(dataloader) * num_epochs + scheduler = CosineAnnealingLR(optimizer, T_max=total_steps) + + # Training loop + global_step = 0 + for epoch in range(num_epochs): + epoch_loss = 0.0 + num_batches = 0 + + for batch in dataloader: + value_ids = batch["value_ids"].to(device) + attention_mask = batch["attention_mask"].to(device) + + # Apply masking + masked_ids, labels = apply_masking( + value_ids, + attention_mask, + mask_prob=mask_prob, + vocab_size=model.config.hash_vocab_size, + ) + + # Skip batch if no tokens were masked (would cause NaN loss) + num_masked = (labels != -100).sum() + if num_masked == 0: + continue + + # Forward pass + optimizer.zero_grad() + output = model(masked_ids, attention_mask, labels=labels) + loss = output["loss"] + + # Skip if loss is NaN (shouldn't happen now but safety check) + if torch.isnan(loss): + _LOG.warning("NaN loss detected, skipping batch") + continue + + # Backward pass + loss.backward() + torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) + optimizer.step() + scheduler.step() + + epoch_loss += loss.item() + num_batches += 1 + global_step += 1 + + # Logging + if global_step % log_every == 0: + avg_loss = epoch_loss / num_batches + lr = scheduler.get_last_lr()[0] + _LOG.info(f"Step {global_step}, Epoch {epoch + 1}, Loss: {avg_loss:.4f}, LR: {lr:.2e}") + + # End of epoch logging + avg_epoch_loss = epoch_loss / max(num_batches, 1) + _LOG.info(f"Epoch {epoch + 1}/{num_epochs} completed. Average loss: {avg_epoch_loss:.4f}") + + # Save checkpoint + if checkpoint_dir is not None: + checkpoint_path = checkpoint_dir / f"checkpoint_epoch_{epoch + 1}.pt" + torch.save( + { + "epoch": epoch + 1, + "model_state_dict": model.state_dict(), + "optimizer_state_dict": optimizer.state_dict(), + "loss": avg_epoch_loss, + "config": model.config, + }, + checkpoint_path, + ) + _LOG.info(f"Saved checkpoint to {checkpoint_path}") + + return model + + +def pretrain_mcm_streaming( + model: FoundationalTabularTransformer, + table_iterator: Iterator[pd.DataFrame], + num_steps: int = 10000, + batch_size: int = 256, + mask_prob: float = 0.15, + learning_rate: float = 1e-4, + weight_decay: float = 0.01, + max_columns: int = 64, + device: Optional[str] = None, + checkpoint_dir: Optional[Path] = None, + log_every: int = 100, + checkpoint_every: int = 1000, +) -> FoundationalTabularTransformer: + """ + Pre-train model with MCM using a streaming table iterator. + + Designed for large datasets like T4 that don't fit in memory. 
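A runnable sketch of this streaming entry point, using a toy in-memory iterator in place of a real T4 stream:

    import pandas as pd

    def toy_tables():
        # stand-in for a real streaming source such as the T4 iterator in the training script
        while True:
            yield pd.DataFrame({"color": ["red", "blue"] * 16, "size": ["S", "M"] * 16})

    model = create_model("small")
    model = pretrain_mcm_streaming(model, toy_tables(), num_steps=5, batch_size=16, device="cpu")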
+ + Args: + model: The transformer model to train + table_iterator: Iterator yielding DataFrames (e.g., from HuggingFace streaming) + num_steps: Total training steps + batch_size: Batch size + mask_prob: Probability of masking each cell + learning_rate: Learning rate + weight_decay: AdamW weight decay + max_columns: Maximum columns per table + device: Device to train on + checkpoint_dir: Directory to save checkpoints + log_every: Log metrics every N steps + checkpoint_every: Save checkpoint every N steps + + Returns: + Trained model + """ + if device is None: + device = "cuda" if torch.cuda.is_available() else "cpu" + + model = model.to(device) + model.train() + + tokenizer = TabularTokenizer(vocab_size=model.config.hash_vocab_size) + optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay) + scheduler = CosineAnnealingLR(optimizer, T_max=num_steps) + + # Buffer for accumulating samples + sample_buffer = [] + running_loss = 0.0 + steps_since_log = 0 + + _LOG.info(f"Starting streaming pre-training for {num_steps} steps") + + for step in range(num_steps): + # Fill buffer if needed + while len(sample_buffer) < batch_size: + try: + table = next(table_iterator) + except StopIteration: + _LOG.warning("Table iterator exhausted") + break + + # Filter to categorical columns + cat_cols = table.select_dtypes(include=["object", "string", "category"]).columns.tolist() + if len(cat_cols) < 2: + continue + + cat_cols = cat_cols[:max_columns] + + # Tokenize and add to buffer + value_ids, attention_mask, _ = tokenizer.tokenize_dataframe(table, cat_cols, max_columns) + for i in range(len(value_ids)): + sample_buffer.append((value_ids[i], attention_mask[i])) + + if len(sample_buffer) < batch_size: + _LOG.warning(f"Buffer has only {len(sample_buffer)} samples, ending training") + break + + # Sample a batch + batch_samples = sample_buffer[:batch_size] + sample_buffer = sample_buffer[batch_size:] + + # Prepare batch tensors + value_ids_list = [s[0] for s in batch_samples] + attention_masks_list = [s[1] for s in batch_samples] + + # Pad to same length + max_len = max(v.size(0) for v in value_ids_list) + value_ids = torch.zeros(batch_size, max_len, dtype=torch.long) + attention_mask = torch.zeros(batch_size, max_len, dtype=torch.bool) + + for i, (ids, mask) in enumerate(zip(value_ids_list, attention_masks_list)): + length = ids.size(0) + value_ids[i, :length] = ids + attention_mask[i, :length] = mask + + value_ids = value_ids.to(device) + attention_mask = attention_mask.to(device) + + # Apply masking + masked_ids, labels = apply_masking( + value_ids, attention_mask, mask_prob=mask_prob, vocab_size=model.config.hash_vocab_size + ) + + # Skip batch if no tokens were masked + num_masked = (labels != -100).sum() + if num_masked == 0: + continue + + # Forward pass + optimizer.zero_grad() + output = model(masked_ids, attention_mask, labels=labels) + loss = output["loss"] + + # Skip if loss is NaN + if torch.isnan(loss): + _LOG.warning("NaN loss detected, skipping batch") + continue + + # Backward pass + loss.backward() + torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) + optimizer.step() + scheduler.step() + + running_loss += loss.item() + steps_since_log += 1 + + # Logging + if (step + 1) % log_every == 0: + avg_loss = running_loss / steps_since_log + lr = scheduler.get_last_lr()[0] + _LOG.info(f"Step {step + 1}/{num_steps}, Loss: {avg_loss:.4f}, LR: {lr:.2e}") + running_loss = 0.0 + steps_since_log = 0 + + # Checkpoint + if checkpoint_dir is not None and (step + 1) % 
checkpoint_every == 0: + checkpoint_path = checkpoint_dir / f"checkpoint_step_{step + 1}.pt" + torch.save( + { + "step": step + 1, + "model_state_dict": model.state_dict(), + "optimizer_state_dict": optimizer.state_dict(), + "config": model.config, + }, + checkpoint_path, + ) + _LOG.info(f"Saved checkpoint to {checkpoint_path}") + + return model + + +def classify( + model: FoundationalTabularTransformer, + df: pd.DataFrame, + target_column: str, + target_classes: list[str], + columns: Optional[list[str]] = None, + device: Optional[str] = None, +) -> pd.DataFrame: + """ + Classify rows by masking target column and predicting. + + Args: + model: Trained transformer model + df: DataFrame with features (target column can have any values) + target_column: Name of column to predict + target_classes: List of possible class values + columns: Feature columns to use (defaults to all categorical) + device: Device for inference + + Returns: + DataFrame with probability columns for each class + """ + if device is None: + device = "cuda" if torch.cuda.is_available() else "cpu" + + model = model.to(device) + model.eval() + + tokenizer = TabularTokenizer(vocab_size=model.config.hash_vocab_size) + + # Determine columns + if columns is None: + columns = df.select_dtypes(include=["object", "string", "category"]).columns.tolist() + + # Ensure target column is in columns + if target_column not in columns: + columns = columns + [target_column] + + columns = columns[: model.config.max_columns] + + # Find target column index + target_idx = columns.index(target_column) + + # Pre-compute hash IDs for target classes + class_hash_ids = {cls: tokenizer.hash_value(str(cls), target_column) for cls in target_classes} + + # Tokenize data + value_ids, attention_mask, _ = tokenizer.tokenize_dataframe(df, columns, model.config.max_columns) + value_ids = value_ids.to(device) + attention_mask = attention_mask.to(device) + + # Mask target column + masked_ids, _, target_positions = mask_target_column(value_ids, attention_mask, target_idx) + + # Get predictions + with torch.no_grad(): + probs = model.predict_masked(masked_ids, attention_mask, target_positions) + + # Extract probabilities for target classes + probs_np = probs.cpu().numpy() + result_data = {} + for cls in target_classes: + hash_id = class_hash_ids[cls] + result_data[f"prob_{cls}"] = probs_np[:, hash_id] + + return pd.DataFrame(result_data) + + +def evaluate_classification( + model: FoundationalTabularTransformer, + df: pd.DataFrame, + target_column: str, + columns: Optional[list[str]] = None, + device: Optional[str] = None, +) -> dict[str, float]: + """ + Evaluate classification accuracy on a labeled dataset. 
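A usage sketch, assuming `model` is an already pre-trained FoundationalTabularTransformer and the toy frame below:

    import pandas as pd

    df = pd.DataFrame({
        "feature1": ["A", "B"] * 50,
        "feature2": ["X", "Y"] * 50,
        "target": ["pos", "neg"] * 50,
    })
    metrics = evaluate_classification(model, df, target_column="target", device="cpu")
    print(metrics)  # {"accuracy": ..., "num_samples": 100, "num_classes": 2}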
+ + Args: + model: Trained transformer model + df: DataFrame with features and true labels + target_column: Name of target column + columns: Feature columns to use + device: Device for inference + + Returns: + Dictionary with accuracy metrics + """ + # Get unique classes from data + target_classes = df[target_column].dropna().unique().tolist() + + # Get predictions + probs_df = classify(model, df, target_column, target_classes, columns, device) + + # Get predicted class (highest probability) + prob_cols = [f"prob_{cls}" for cls in target_classes] + predicted_idx = probs_df[prob_cols].values.argmax(axis=1) + predicted_classes = [target_classes[i] for i in predicted_idx] + + # Calculate accuracy + true_labels = df[target_column].tolist() + correct = sum(p == t for p, t in zip(predicted_classes, true_labels)) + accuracy = correct / len(true_labels) + + return { + "accuracy": accuracy, + "num_samples": len(true_labels), + "num_classes": len(target_classes), + } + + +def save_model(model: FoundationalTabularTransformer, path: Path): + """Save model to disk.""" + torch.save( + { + "model_state_dict": model.state_dict(), + "config": model.config, + }, + path, + ) + _LOG.info(f"Model saved to {path}") + + +def load_model(path: Path, device: Optional[str] = None) -> FoundationalTabularTransformer: + """Load model from disk.""" + if device is None: + device = "cuda" if torch.cuda.is_available() else "cpu" + + checkpoint = torch.load(path, map_location=device) + config = checkpoint["config"] + + model = FoundationalTabularTransformer(config) + model.load_state_dict(checkpoint["model_state_dict"]) + model = model.to(device) + + _LOG.info(f"Model loaded from {path}") + return model diff --git a/scripts/foundational/test_transformer_synthetic.py b/scripts/foundational/test_transformer_synthetic.py new file mode 100644 index 00000000..a5a704b6 --- /dev/null +++ b/scripts/foundational/test_transformer_synthetic.py @@ -0,0 +1,221 @@ +#!/usr/bin/env python +""" +Test the Foundational Tabular Transformer with synthetic data. + +This script validates the model works end-to-end without requiring T4 access. 
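Typical invocation from the repository root (the path is taken from the diff header above):

    python scripts/foundational/test_transformer_synthetic.py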
+""" + +import logging +import random + +import pandas as pd +import torch + +from mostlyai.engine._tabular.transformer import create_model +from mostlyai.engine._tabular.transformer_training import ( + classify, + evaluate_classification, + pretrain_mcm, +) + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) +_LOG = logging.getLogger(__name__) + + +def generate_synthetic_tables(num_tables: int = 20, rows_per_table: int = 100) -> list[pd.DataFrame]: + """Generate synthetic categorical tables for testing.""" + tables = [] + + # Different "schemas" to simulate diverse tables + schemas = [ + { + "columns": ["color", "size", "shape", "material"], + "values": { + "color": ["red", "blue", "green", "yellow", "black", "white"], + "size": ["small", "medium", "large", "xlarge"], + "shape": ["round", "square", "triangle", "oval"], + "material": ["wood", "metal", "plastic", "glass"], + }, + }, + { + "columns": ["country", "city", "weather", "season"], + "values": { + "country": ["USA", "UK", "Germany", "France", "Japan", "Brazil"], + "city": ["New York", "London", "Berlin", "Paris", "Tokyo", "Rio"], + "weather": ["sunny", "rainy", "cloudy", "snowy", "windy"], + "season": ["spring", "summer", "fall", "winter"], + }, + }, + { + "columns": ["animal", "habitat", "diet", "size_class"], + "values": { + "animal": ["lion", "eagle", "shark", "elephant", "snake", "wolf"], + "habitat": ["forest", "ocean", "desert", "jungle", "arctic"], + "diet": ["carnivore", "herbivore", "omnivore"], + "size_class": ["tiny", "small", "medium", "large", "huge"], + }, + }, + { + "columns": ["food", "cuisine", "taste", "temperature"], + "values": { + "food": ["pizza", "sushi", "tacos", "curry", "pasta", "burger"], + "cuisine": ["italian", "japanese", "mexican", "indian", "american"], + "taste": ["sweet", "salty", "sour", "spicy", "savory"], + "temperature": ["hot", "cold", "warm", "room_temp"], + }, + }, + { + "columns": ["genre", "mood", "tempo", "instrument"], + "values": { + "genre": ["rock", "jazz", "classical", "electronic", "hip-hop"], + "mood": ["happy", "sad", "energetic", "calm", "angry"], + "tempo": ["slow", "medium", "fast", "very_fast"], + "instrument": ["guitar", "piano", "drums", "violin", "synth"], + }, + }, + ] + + for i in range(num_tables): + schema = random.choice(schemas) + data = {} + for col in schema["columns"]: + data[col] = [random.choice(schema["values"][col]) for _ in range(rows_per_table)] + tables.append(pd.DataFrame(data)) + + return tables + + +def generate_classification_dataset(num_samples: int = 200) -> pd.DataFrame: + """Generate a dataset with a predictable target for classification testing.""" + data = { + "feature1": [], + "feature2": [], + "feature3": [], + "target": [], + } + + # Create a simple rule: target depends on feature1 and feature2 + feature1_values = ["A", "B", "C"] + feature2_values = ["X", "Y", "Z"] + feature3_values = ["P", "Q", "R", "S"] + + for _ in range(num_samples): + f1 = random.choice(feature1_values) + f2 = random.choice(feature2_values) + f3 = random.choice(feature3_values) + + # Simple rule: if f1 is A and f2 is X, target is "positive" + # Otherwise, mostly "negative" with some noise + if f1 == "A" and f2 == "X": + target = "positive" if random.random() < 0.9 else "negative" + elif f1 == "B" and f2 == "Y": + target = "positive" if random.random() < 0.7 else "negative" + else: + target = "negative" if random.random() < 0.8 else "positive" + + data["feature1"].append(f1) + data["feature2"].append(f2) + 
data["feature3"].append(f3) + data["target"].append(target) + + return pd.DataFrame(data) + + +def main(): + device = "cuda" if torch.cuda.is_available() else "cpu" + _LOG.info(f"Using device: {device}") + + # Step 1: Create model + _LOG.info("Creating small model...") + model = create_model("small") + num_params = sum(p.numel() for p in model.parameters()) + _LOG.info(f"Model has {num_params:,} parameters") + + # Step 2: Generate synthetic pre-training data + _LOG.info("Generating synthetic tables for pre-training...") + tables = generate_synthetic_tables(num_tables=50, rows_per_table=100) + _LOG.info(f"Generated {len(tables)} tables with ~100 rows each") + + # Step 3: Pre-train with MCM + _LOG.info("Pre-training with Masked Cell Modeling...") + model = pretrain_mcm( + model=model, + tables=tables, + num_epochs=3, + batch_size=32, + learning_rate=1e-3, + device=device, + log_every=50, + ) + _LOG.info("Pre-training complete!") + + # Step 4: Generate classification test data + _LOG.info("Generating classification dataset...") + train_df = generate_classification_dataset(num_samples=500) + test_df = generate_classification_dataset(num_samples=100) + + _LOG.info(f"Train set: {len(train_df)} samples") + _LOG.info(f"Test set: {len(test_df)} samples") + _LOG.info(f"Target distribution (train): {train_df['target'].value_counts().to_dict()}") + + # Step 5: Evaluate classification (zero-shot after pre-training) + _LOG.info("Evaluating zero-shot classification...") + metrics = evaluate_classification( + model=model, + df=test_df, + target_column="target", + device=device, + ) + _LOG.info(f"Zero-shot accuracy: {metrics['accuracy']:.2%}") + + # Step 6: Fine-tune on classification task (optional - just more MCM on task data) + _LOG.info("Fine-tuning on classification data...") + model = pretrain_mcm( + model=model, + tables=[train_df], + num_epochs=5, + batch_size=32, + learning_rate=5e-4, + device=device, + log_every=50, + ) + + # Step 7: Evaluate after fine-tuning + _LOG.info("Evaluating after fine-tuning...") + metrics = evaluate_classification( + model=model, + df=test_df, + target_column="target", + device=device, + ) + _LOG.info(f"Fine-tuned accuracy: {metrics['accuracy']:.2%}") + + # Step 8: Show sample predictions + _LOG.info("\nSample predictions:") + sample = test_df.head(5).copy() + probs = classify( + model=model, + df=sample, + target_column="target", + target_classes=["positive", "negative"], + device=device, + ) + + for i in range(len(sample)): + true_label = sample.iloc[i]["target"] + prob_pos = probs.iloc[i]["prob_positive"] + prob_neg = probs.iloc[i]["prob_negative"] + predicted = "positive" if prob_pos > prob_neg else "negative" + _LOG.info( + f" Row {i}: f1={sample.iloc[i]['feature1']}, f2={sample.iloc[i]['feature2']} " + f"-> pred={predicted} (p={max(prob_pos, prob_neg):.2f}), true={true_label}" + ) + + _LOG.info("\nTest complete!") + + +if __name__ == "__main__": + main() diff --git a/scripts/foundational/train_transformer_t4.py b/scripts/foundational/train_transformer_t4.py new file mode 100644 index 00000000..a57399df --- /dev/null +++ b/scripts/foundational/train_transformer_t4.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python +""" +Train the Foundational Tabular Transformer on the T4 dataset. 
+ +Usage: + python scripts/train_transformer_t4.py --num_steps 1000 --batch_size 64 + +Requirements: + pip install datasets +""" + +import argparse +import logging +from pathlib import Path + +import pandas as pd +import torch +from datasets import load_dataset + +from mostlyai.engine._tabular.transformer import create_model +from mostlyai.engine._tabular.transformer_training import ( + pretrain_mcm_streaming, + save_model, +) + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +_LOG = logging.getLogger(__name__) + + +def t4_table_iterator(max_tables: int | None = None): + """ + Iterator that yields DataFrames from the T4 dataset. + + Args: + max_tables: Maximum number of tables to yield (None = unlimited) + + Yields: + pandas DataFrames with categorical columns only + """ + _LOG.info("Loading T4 dataset (streaming mode)...") + + # Load dataset in streaming mode + dataset = load_dataset( + "mlfoundations/t4-full", + split="train", + streaming=True, + ) + + tables_yielded = 0 + for example in dataset: + # Each example should be a table - convert to DataFrame + # The exact structure depends on how T4 stores tables + # Let's handle different possible formats + + if isinstance(example, dict): + # Try to convert dict to DataFrame + try: + # T4 stores tables as dicts with column names as keys + df = pd.DataFrame(example) + except Exception as e: + _LOG.debug(f"Skipping table: {e}") + continue + elif isinstance(example, pd.DataFrame): + df = example + else: + _LOG.debug(f"Unknown example type: {type(example)}") + continue + + # Filter to categorical columns only + cat_cols = df.select_dtypes(include=["object", "string", "category"]).columns.tolist() + + # Skip if too few categorical columns + if len(cat_cols) < 2: + continue + + # Keep only categorical columns + df = df[cat_cols] + + # Skip empty tables + if len(df) == 0: + continue + + yield df + tables_yielded += 1 + + if tables_yielded % 100 == 0: + _LOG.info(f"Yielded {tables_yielded} tables") + + if max_tables is not None and tables_yielded >= max_tables: + _LOG.info(f"Reached max_tables limit: {max_tables}") + break + + +def main(): + parser = argparse.ArgumentParser(description="Train transformer on T4 dataset") + parser.add_argument("--num_steps", type=int, default=10000, help="Number of training steps") + parser.add_argument("--batch_size", type=int, default=64, help="Batch size") + parser.add_argument("--learning_rate", type=float, default=1e-4, help="Learning rate") + parser.add_argument("--model_size", type=str, default="small", choices=["small", "medium", "large"]) + parser.add_argument("--max_tables", type=int, default=None, help="Max tables to use (for testing)") + parser.add_argument("--checkpoint_dir", type=str, default="checkpoints", help="Checkpoint directory") + parser.add_argument("--log_every", type=int, default=100, help="Log every N steps") + parser.add_argument("--checkpoint_every", type=int, default=1000, help="Save checkpoint every N steps") + parser.add_argument("--device", type=str, default=None, help="Device (cuda/cpu)") + args = parser.parse_args() + + # Setup + checkpoint_dir = Path(args.checkpoint_dir) + checkpoint_dir.mkdir(parents=True, exist_ok=True) + + device = args.device + if device is None: + device = "cuda" if torch.cuda.is_available() else "cpu" + _LOG.info(f"Using device: {device}") + + # Create model + _LOG.info(f"Creating {args.model_size} model...") + model = create_model(args.model_size) + num_params = sum(p.numel() for p in 
model.parameters()) + _LOG.info(f"Model has {num_params:,} parameters") + + # Create table iterator + table_iter = t4_table_iterator(max_tables=args.max_tables) + + # Pre-train + _LOG.info(f"Starting pre-training for {args.num_steps} steps...") + model = pretrain_mcm_streaming( + model=model, + table_iterator=table_iter, + num_steps=args.num_steps, + batch_size=args.batch_size, + learning_rate=args.learning_rate, + device=device, + checkpoint_dir=checkpoint_dir, + log_every=args.log_every, + checkpoint_every=args.checkpoint_every, + ) + + # Save final model + final_path = checkpoint_dir / "model_final.pt" + save_model(model, final_path) + _LOG.info(f"Training complete! Model saved to {final_path}") + + +if __name__ == "__main__": + main() From eb1d12c998eac7047d28bbf0272efcf32e6ab8eb Mon Sep 17 00:00:00 2001 From: Antoine Bon Date: Fri, 12 Dec 2025 15:31:55 +0100 Subject: [PATCH 02/11] wip --- scripts/foundational/train_transformer_t4.py | 132 ++++++++++++------- 1 file changed, 82 insertions(+), 50 deletions(-) diff --git a/scripts/foundational/train_transformer_t4.py b/scripts/foundational/train_transformer_t4.py index a57399df..2cd9fb64 100644 --- a/scripts/foundational/train_transformer_t4.py +++ b/scripts/foundational/train_transformer_t4.py @@ -3,19 +3,22 @@ Train the Foundational Tabular Transformer on the T4 dataset. Usage: - python scripts/train_transformer_t4.py --num_steps 1000 --batch_size 64 + python scripts/foundational/train_transformer_t4.py --num_steps 1000 --batch_size 64 Requirements: - pip install datasets + pip install huggingface_hub pyarrow """ import argparse +import io import logging +import zipfile from pathlib import Path import pandas as pd +import pyarrow.parquet as pq import torch -from datasets import load_dataset +from huggingface_hub import HfFileSystem from mostlyai.engine._tabular.transformer import create_model from mostlyai.engine._tabular.transformer_training import ( @@ -34,64 +37,93 @@ def t4_table_iterator(max_tables: int | None = None): """ Iterator that yields DataFrames from the T4 dataset. + Each parquet file in T4 is a separate table with its own schema. + We load them individually using HfFileSystem to avoid schema conflicts. 
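A quick smoke test for this iterator; note that it needs network access to the Hugging Face Hub:

    for i, df in enumerate(t4_table_iterator(max_tables=3)):
        print(i, df.shape, list(df.columns)[:5])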
+ Args: max_tables: Maximum number of tables to yield (None = unlimited) Yields: pandas DataFrames with categorical columns only """ - _LOG.info("Loading T4 dataset (streaming mode)...") - - # Load dataset in streaming mode - dataset = load_dataset( - "mlfoundations/t4-full", - split="train", - streaming=True, - ) + _LOG.info("Connecting to T4 dataset via HfFileSystem...") - tables_yielded = 0 - for example in dataset: - # Each example should be a table - convert to DataFrame - # The exact structure depends on how T4 stores tables - # Let's handle different possible formats - - if isinstance(example, dict): - # Try to convert dict to DataFrame - try: - # T4 stores tables as dicts with column names as keys - df = pd.DataFrame(example) - except Exception as e: - _LOG.debug(f"Skipping table: {e}") - continue - elif isinstance(example, pd.DataFrame): - df = example - else: - _LOG.debug(f"Unknown example type: {type(example)}") - continue + fs = HfFileSystem() + base_path = "datasets/mlfoundations/t4-full" - # Filter to categorical columns only - cat_cols = df.select_dtypes(include=["object", "string", "category"]).columns.tolist() + # List all chunk zip files + try: + all_files = fs.ls(base_path, detail=False) + chunk_zips = sorted([f for f in all_files if f.endswith(".zip")]) + _LOG.info(f"Found {len(chunk_zips)} chunk zip files") + except Exception as e: + _LOG.error(f"Failed to list dataset: {e}") + raise - # Skip if too few categorical columns - if len(cat_cols) < 2: - continue - - # Keep only categorical columns - df = df[cat_cols] - - # Skip empty tables - if len(df) == 0: + tables_yielded = 0 + tables_skipped = 0 + + for chunk_path in chunk_zips: + chunk_name = chunk_path.split("/")[-1] + _LOG.info(f"Processing {chunk_name}...") + + try: + # Download and open the zip file + with fs.open(chunk_path, "rb") as f: + zip_data = f.read() + + with zipfile.ZipFile(io.BytesIO(zip_data)) as zf: + parquet_files = [n for n in zf.namelist() if n.endswith(".parquet")] + _LOG.info(f" Found {len(parquet_files)} parquet files in {chunk_name}") + + for pq_name in parquet_files: + try: + # Read parquet file from zip + with zf.open(pq_name) as pq_file: + pq_data = pq_file.read() + table = pq.read_table(io.BytesIO(pq_data)) + df = table.to_pandas() + + # Filter to categorical columns only (object/string dtype) + cat_cols = df.select_dtypes( + include=["object", "string", "category"] + ).columns.tolist() + + # Skip if too few categorical columns + if len(cat_cols) < 2: + tables_skipped += 1 + continue + + # Keep only categorical columns + df = df[cat_cols] + + # Skip empty or tiny tables + if len(df) < 10: + tables_skipped += 1 + continue + + yield df + tables_yielded += 1 + + if tables_yielded % 100 == 0: + _LOG.info( + f" Progress: yielded {tables_yielded} tables, skipped {tables_skipped}" + ) + + if max_tables is not None and tables_yielded >= max_tables: + _LOG.info(f"Reached max_tables limit: {max_tables}") + return + + except Exception as e: + _LOG.debug(f" Failed to read {pq_name}: {e}") + tables_skipped += 1 + continue + + except Exception as e: + _LOG.warning(f"Failed to process {chunk_name}: {e}") continue - yield df - tables_yielded += 1 - - if tables_yielded % 100 == 0: - _LOG.info(f"Yielded {tables_yielded} tables") - - if max_tables is not None and tables_yielded >= max_tables: - _LOG.info(f"Reached max_tables limit: {max_tables}") - break + _LOG.info(f"Finished: yielded {tables_yielded} tables, skipped {tables_skipped}") def main(): From a5e5710aa72e5cc8c86fb10f6db6db2ae040e5be Mon Sep 17 
00:00:00 2001 From: Antoine Bon Date: Fri, 12 Dec 2025 16:57:33 +0100 Subject: [PATCH 03/11] wip --- .../engine/_tabular/transformer_training.py | 29 ++++++++++++------- uv.lock | 2 +- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/mostlyai/engine/_tabular/transformer_training.py b/mostlyai/engine/_tabular/transformer_training.py index 3a0a9c47..a1ea33f6 100644 --- a/mostlyai/engine/_tabular/transformer_training.py +++ b/mostlyai/engine/_tabular/transformer_training.py @@ -13,6 +13,7 @@ from torch.optim import AdamW from torch.optim.lr_scheduler import CosineAnnealingLR from torch.utils.data import DataLoader +from tqdm import tqdm from .transformer import ( FoundationalTabularTransformer, @@ -93,13 +94,17 @@ def pretrain_mcm( total_steps = len(dataloader) * num_epochs scheduler = CosineAnnealingLR(optimizer, T_max=total_steps) + _LOG.info(f"Training config: {num_epochs} epochs, {len(dataloader)} batches/epoch, {total_steps} total steps") + _LOG.info(f"Batch size: {batch_size}, Learning rate: {learning_rate}, Device: {device}") + # Training loop global_step = 0 for epoch in range(num_epochs): epoch_loss = 0.0 num_batches = 0 - for batch in dataloader: + pbar = tqdm(dataloader, desc=f"Epoch {epoch + 1}/{num_epochs}", leave=True) + for batch in pbar: value_ids = batch["value_ids"].to(device) attention_mask = batch["attention_mask"].to(device) @@ -136,11 +141,9 @@ def pretrain_mcm( num_batches += 1 global_step += 1 - # Logging - if global_step % log_every == 0: - avg_loss = epoch_loss / num_batches - lr = scheduler.get_last_lr()[0] - _LOG.info(f"Step {global_step}, Epoch {epoch + 1}, Loss: {avg_loss:.4f}, LR: {lr:.2e}") + # Update progress bar + avg_loss = epoch_loss / num_batches + pbar.set_postfix({"loss": f"{avg_loss:.4f}", "lr": f"{scheduler.get_last_lr()[0]:.2e}"}) # End of epoch logging avg_epoch_loss = epoch_loss / max(num_batches, 1) @@ -216,8 +219,10 @@ def pretrain_mcm_streaming( steps_since_log = 0 _LOG.info(f"Starting streaming pre-training for {num_steps} steps") + _LOG.info(f"Batch size: {batch_size}, Learning rate: {learning_rate}, Device: {device}") - for step in range(num_steps): + pbar = tqdm(range(num_steps), desc="Pre-training", leave=True) + for step in pbar: # Fill buffer if needed while len(sample_buffer) < batch_size: try: @@ -292,11 +297,13 @@ def pretrain_mcm_streaming( running_loss += loss.item() steps_since_log += 1 - # Logging - if (step + 1) % log_every == 0: + # Update progress bar + if steps_since_log > 0: avg_loss = running_loss / steps_since_log - lr = scheduler.get_last_lr()[0] - _LOG.info(f"Step {step + 1}/{num_steps}, Loss: {avg_loss:.4f}, LR: {lr:.2e}") + pbar.set_postfix({"loss": f"{avg_loss:.4f}", "lr": f"{scheduler.get_last_lr()[0]:.2e}", "buf": len(sample_buffer)}) + + # Reset running stats periodically + if (step + 1) % log_every == 0: running_loss = 0.0 steps_since_log = 0 diff --git a/uv.lock b/uv.lock index 5c767336..b322e48c 100644 --- a/uv.lock +++ b/uv.lock @@ -2282,7 +2282,7 @@ wheels = [ [[package]] name = "mostlyai-engine" -version = "2.3.1" +version = "2.3.3" source = { editable = "." 
} dependencies = [ { name = "accelerate" }, From 6ca6b2bfb84f2d87601db947e9eb6c9332916641 Mon Sep 17 00:00:00 2001 From: Antoine Bon Date: Fri, 12 Dec 2025 17:33:22 +0100 Subject: [PATCH 04/11] wip --- mostlyai/engine/_tabular/interface.py | 53 ++++++++++++++++++++++ mostlyai/engine/_tabular/probability.py | 10 ++-- tests/end_to_end/test_tabular_interface.py | 13 ++++-- 3 files changed, 64 insertions(+), 12 deletions(-) diff --git a/mostlyai/engine/_tabular/interface.py b/mostlyai/engine/_tabular/interface.py index 4cf11930..b322d698 100644 --- a/mostlyai/engine/_tabular/interface.py +++ b/mostlyai/engine/_tabular/interface.py @@ -32,12 +32,15 @@ from mostlyai.engine._common import ( ensure_dataframe, + get_cardinalities, + get_columns_from_cardinalities, list_fn, load_generated_data, mean_fn, median_fn, mode_fn, ) +from mostlyai.engine._tabular.common import get_argn_column_names from mostlyai.engine._tabular.probability import log_prob as _log_prob from mostlyai.engine._tabular.probability import predict_proba as _predict_proba from mostlyai.engine._workspace import Workspace @@ -284,6 +287,49 @@ def __del__(self): """Fallback cleanup if context manager wasn't used.""" self.close() + def _validate_column_order( + self, + data: pd.DataFrame, + workspace_dir: str | Path, + target_columns: list[str] | None = None, + ) -> None: + """ + Validate that data columns (and optionally target columns) appear in training order. + + Args: + data: DataFrame with seed columns to validate + workspace_dir: Path to workspace containing training statistics + target_columns: Optional list of target column names. If provided, validates + the combined seed+target column order. If None, validates only seed columns. + + Raises: + ValueError: If column order doesn't match training order and enable_flexible_generation=False + """ + if not self.enable_flexible_generation: + workspace = Workspace(workspace_dir) + tgt_stats = workspace.tgt_stats.read() + tgt_cardinalities = get_cardinalities(tgt_stats) + all_columns = get_columns_from_cardinalities(tgt_cardinalities) + + # Convert data columns to ARGN format + data_df = ensure_dataframe(data) + seed_columns_argn = get_argn_column_names(tgt_stats["columns"], list(data_df.columns)) + + # If target_columns provided, validate combined seed+target order + if target_columns is not None: + target_columns_argn = get_argn_column_names(tgt_stats["columns"], target_columns) + columns_to_check = seed_columns_argn + target_columns_argn + else: + columns_to_check = seed_columns_argn + + # Check that columns appear in training order + expected_order = [col for col in all_columns if col in columns_to_check] + if columns_to_check != expected_order: + raise ValueError( + "Column order does not match training order. " + "A change in column order is only permitted for models that were trained with `enable_flexible_generation=True`." 
+ ) + def sample( self, n_samples: int | None = None, @@ -349,6 +395,10 @@ def sample( if n_samples is None: n_samples = 1 + # Validate seed_data column order when flexible generation is disabled + if seed_data is not None: + self._validate_column_order(seed_data, workspace_dir) + # Generate synthetic data using configured parameters generate( ctx_data=ctx_data_df, @@ -706,6 +756,9 @@ def predict_proba( _LOG.info(f"Dropping target columns from seed data: {target_cols_in_X}") X_df = X_df.drop(columns=target_cols_in_X) + # Validate seed+target column order when flexible generation is disabled + self._validate_column_order(X_df, self.workspace_dir, target_columns=target_columns) + # Call new predict_proba utility that returns probabilities in-memory workspace = Workspace(self.workspace_dir) diff --git a/mostlyai/engine/_tabular/probability.py b/mostlyai/engine/_tabular/probability.py index 74a1a094..855207dd 100644 --- a/mostlyai/engine/_tabular/probability.py +++ b/mostlyai/engine/_tabular/probability.py @@ -362,15 +362,11 @@ def predict_proba( ) ) - # Get seed column names (needed for column order check and _generate_marginal_probs) + # Get seed column names (needed for _generate_marginal_probs) seed_columns = list(seed_data.columns) - # Check column order when flexible generation is disabled - if not enable_flexible_generation: - seed_columns_argn = get_argn_column_names(tgt_stats["columns"], seed_columns) - target_columns_argn = get_argn_column_names(tgt_stats["columns"], target_columns) - gen_column_order = seed_columns_argn + target_columns_argn - check_column_order(gen_column_order, all_columns) + # Note: Column order validation is now handled in the interface layer (TabularARGN.predict_proba) + # to centralize validation logic with sample() method # Encode seed data (features to condition on) - common for both single and multi-target # seed_data should NOT include any target columns diff --git a/tests/end_to_end/test_tabular_interface.py b/tests/end_to_end/test_tabular_interface.py index 601049db..5d0bdaac 100644 --- a/tests/end_to_end/test_tabular_interface.py +++ b/tests/end_to_end/test_tabular_interface.py @@ -347,8 +347,8 @@ def test_predict_proba_multi_target( # Numeric binned values (may be bin labels or ranges) assert len(col_values) >= 3 # At least some bins present - def test_predict_proba_wrong_column_order_raises(self, classification_data, tmp_path_factory): - """Test predict_proba raises error with different column order when flexible generation is disabled.""" + def test_wrong_column_order_raises(self, classification_data, tmp_path_factory): + """Test that wrong column order raises error when flexible generation is disabled.""" data = classification_data X = data[["feature1", "feature2"]] y = data["target"] @@ -362,11 +362,14 @@ def test_predict_proba_wrong_column_order_raises(self, classification_data, tmp_ ) argn.fit(X=X, y=y) - # Reorder columns in test data - test_X = X.head(10)[["feature2", "feature1"]] + # Reorder columns + X_reordered = X.head(5)[["feature2", "feature1"]] with pytest.raises(ValueError, match="(?i)column order.*does not match"): - argn.predict_proba(test_X, target="target") + argn.predict_proba(X_reordered, target="target") + + with pytest.raises(ValueError, match="(?i)column order.*does not match"): + argn.sample(n_samples=10, seed_data=X_reordered) class TestTabularARGNRegression: From 3bd5b76ad7f79df5ec41069a1586d27dba23ef44 Mon Sep 17 00:00:00 2001 From: Antoine Bon Date: Mon, 15 Dec 2025 14:20:58 +0100 Subject: [PATCH 05/11] wip --- 
mostlyai/engine/_tabular/transformer.py | 279 ----------- mostlyai/engine/_tabular/transformer_data.py | 374 -------------- .../engine/_tabular/transformer_training.py | 463 ------------------ .../test_transformer_synthetic.py | 221 --------- scripts/foundational/train_transformer_t4.py | 181 ------- 5 files changed, 1518 deletions(-) delete mode 100644 mostlyai/engine/_tabular/transformer.py delete mode 100644 mostlyai/engine/_tabular/transformer_data.py delete mode 100644 mostlyai/engine/_tabular/transformer_training.py delete mode 100644 scripts/foundational/test_transformer_synthetic.py delete mode 100644 scripts/foundational/train_transformer_t4.py diff --git a/mostlyai/engine/_tabular/transformer.py b/mostlyai/engine/_tabular/transformer.py deleted file mode 100644 index 027e99f3..00000000 --- a/mostlyai/engine/_tabular/transformer.py +++ /dev/null @@ -1,279 +0,0 @@ -""" -Foundational Tabular Transformer for categorical data. - -A unified architecture for pre-training (MCM) and classification on tabular data. -Uses feature hashing for universal value representation and no positional encoding -(permutation-invariant over columns). -""" - -from dataclasses import dataclass -from typing import Optional - -import torch -import torch.nn as nn -import torch.nn.functional as F - - -# Special token IDs -PAD_TOKEN_ID = 0 -MASK_TOKEN_ID = 1 -SPECIAL_TOKENS_COUNT = 2 # PAD and MASK - - -@dataclass -class TabularTransformerConfig: - """Configuration for FoundationalTabularTransformer.""" - - hash_vocab_size: int = 32768 - max_columns: int = 64 - hidden_size: int = 128 - num_layers: int = 4 - num_heads: int = 4 - intermediate_size: int = 512 - dropout: float = 0.1 - attention_dropout: float = 0.1 - - -class TabularEmbeddings(nn.Module): - """ - Embedding layer for tabular data. - - Maps hashed value IDs to dense vectors. No positional encoding - - the model is permutation-invariant over columns. - """ - - def __init__(self, config: TabularTransformerConfig): - super().__init__() - self.config = config - - # Value embeddings: hash_vocab_size + special tokens (PAD, MASK) - total_vocab_size = config.hash_vocab_size + SPECIAL_TOKENS_COUNT - self.value_embeddings = nn.Embedding( - num_embeddings=total_vocab_size, - embedding_dim=config.hidden_size, - padding_idx=PAD_TOKEN_ID, - ) - - self.layer_norm = nn.LayerNorm(config.hidden_size) - self.dropout = nn.Dropout(config.dropout) - - def forward(self, value_ids: torch.Tensor) -> torch.Tensor: - """ - Embed value IDs. - - Args: - value_ids: (batch_size, seq_len) tensor of hashed value IDs - - Returns: - embeddings: (batch_size, seq_len, hidden_size) - """ - embeddings = self.value_embeddings(value_ids) - embeddings = self.layer_norm(embeddings) - embeddings = self.dropout(embeddings) - return embeddings - - -class FoundationalTabularTransformer(nn.Module): - """ - Foundational transformer for tabular data. 
- - Unified architecture for: - - Pre-training: Masked Cell Modeling (MCM) - mask random cells, predict original - - Classification: Mask target column, predict at that position - - Key design choices: - - Feature hashing: Values are hashed with column name to create unique IDs - - No positional encoding: Columns are treated as an unordered set - - No pooling: Predictions are made directly at masked positions - """ - - def __init__(self, config: TabularTransformerConfig): - super().__init__() - self.config = config - - # Embeddings (no positional encoding) - self.embeddings = TabularEmbeddings(config) - - # Transformer encoder with Pre-LN (norm_first=True) - encoder_layer = nn.TransformerEncoderLayer( - d_model=config.hidden_size, - nhead=config.num_heads, - dim_feedforward=config.intermediate_size, - dropout=config.dropout, - activation="gelu", - batch_first=True, - norm_first=True, # Pre-LN for better training stability - ) - self.encoder = nn.TransformerEncoder( - encoder_layer, - num_layers=config.num_layers, - ) - - # Output projection to vocabulary - total_vocab_size = config.hash_vocab_size + SPECIAL_TOKENS_COUNT - self.output_projection = nn.Linear(config.hidden_size, total_vocab_size) - - # Initialize weights - self._init_weights() - - def _init_weights(self): - """Initialize weights with Xavier uniform.""" - for module in self.modules(): - if isinstance(module, nn.Linear): - nn.init.xavier_uniform_(module.weight) - if module.bias is not None: - nn.init.zeros_(module.bias) - elif isinstance(module, nn.Embedding): - nn.init.normal_(module.weight, mean=0.0, std=0.02) - if module.padding_idx is not None: - nn.init.zeros_(module.weight[module.padding_idx]) - - def forward( - self, - value_ids: torch.Tensor, - attention_mask: torch.Tensor, - labels: Optional[torch.Tensor] = None, - ) -> dict[str, torch.Tensor]: - """ - Forward pass. - - Args: - value_ids: (batch_size, seq_len) hashed value IDs (with MASK tokens for prediction) - attention_mask: (batch_size, seq_len) True for real tokens, False for padding - labels: (batch_size, seq_len) original value IDs for loss computation - Use -100 for positions to ignore in loss - - Returns: - dict with: - - logits: (batch_size, seq_len, vocab_size) predictions at each position - - loss: scalar loss (if labels provided) - - hidden_states: (batch_size, seq_len, hidden_size) encoder outputs - """ - # Embed inputs - hidden_states = self.embeddings(value_ids) - - # Create attention mask for transformer (True = masked/ignored) - # PyTorch transformer expects True for positions to IGNORE - src_key_padding_mask = ~attention_mask - - # Encode - hidden_states = self.encoder( - hidden_states, - src_key_padding_mask=src_key_padding_mask, - ) - - # Project to vocabulary - logits = self.output_projection(hidden_states) - - output = { - "logits": logits, - "hidden_states": hidden_states, - } - - # Compute loss if labels provided - if labels is not None: - # Flatten for cross-entropy - loss = F.cross_entropy( - logits.view(-1, logits.size(-1)), - labels.view(-1), - ignore_index=-100, - ) - output["loss"] = loss - - return output - - def predict_masked( - self, - value_ids: torch.Tensor, - attention_mask: torch.Tensor, - masked_positions: torch.Tensor, - ) -> torch.Tensor: - """ - Get predictions at masked positions. 
- - Args: - value_ids: (batch_size, seq_len) with MASK tokens at positions to predict - attention_mask: (batch_size, seq_len) - masked_positions: (batch_size,) indices of masked positions to predict - - Returns: - probs: (batch_size, vocab_size) probability distribution at masked positions - """ - output = self.forward(value_ids, attention_mask) - logits = output["logits"] - - # Gather logits at masked positions - batch_size = logits.size(0) - batch_indices = torch.arange(batch_size, device=logits.device) - masked_logits = logits[batch_indices, masked_positions] # (batch_size, vocab_size) - - # Convert to probabilities - probs = F.softmax(masked_logits, dim=-1) - return probs - - @staticmethod - def hash_value(value: str, column_name: str, vocab_size: int = 32768) -> int: - """ - Hash a categorical value to an integer ID. - - Combines column name and value to create unique hashes even for - same values in different columns. - - Args: - value: The categorical value (converted to string) - column_name: Name of the column - vocab_size: Size of hash vocabulary - - Returns: - Hash ID in range [SPECIAL_TOKENS_COUNT, vocab_size + SPECIAL_TOKENS_COUNT) - """ - combined = f"{column_name}:{value}" - # Use Python's built-in hash (for simplicity; can switch to mmh3 for production) - hash_int = hash(combined) - # Map to vocabulary range, offset by special tokens - return (hash_int % vocab_size) + SPECIAL_TOKENS_COUNT - - -# Model size presets -MODEL_CONFIGS = { - "small": TabularTransformerConfig( - hash_vocab_size=32768, - max_columns=64, - hidden_size=128, - num_layers=4, - num_heads=4, - intermediate_size=512, - ), - "medium": TabularTransformerConfig( - hash_vocab_size=65536, - max_columns=128, - hidden_size=256, - num_layers=6, - num_heads=8, - intermediate_size=1024, - ), - "large": TabularTransformerConfig( - hash_vocab_size=131072, - max_columns=256, - hidden_size=512, - num_layers=12, - num_heads=8, - intermediate_size=2048, - ), -} - - -def create_model(size: str = "small") -> FoundationalTabularTransformer: - """ - Create a model with preset configuration. - - Args: - size: One of "small", "medium", "large" - - Returns: - Initialized FoundationalTabularTransformer - """ - if size not in MODEL_CONFIGS: - raise ValueError(f"Unknown model size: {size}. Choose from {list(MODEL_CONFIGS.keys())}") - config = MODEL_CONFIGS[size] - return FoundationalTabularTransformer(config) diff --git a/mostlyai/engine/_tabular/transformer_data.py b/mostlyai/engine/_tabular/transformer_data.py deleted file mode 100644 index 6bef4c5c..00000000 --- a/mostlyai/engine/_tabular/transformer_data.py +++ /dev/null @@ -1,374 +0,0 @@ -""" -Data processing utilities for the Foundational Tabular Transformer. - -Provides tokenization (feature hashing), dataset wrappers, and masking utilities -for Masked Cell Modeling (MCM) pre-training. -""" - -from typing import Optional - -import pandas as pd -import torch -from torch.utils.data import Dataset - -from .transformer import MASK_TOKEN_ID, PAD_TOKEN_ID, SPECIAL_TOKENS_COUNT - - -class TabularTokenizer: - """ - Tokenizer for tabular data using feature hashing. - - Converts categorical values to integer IDs by hashing the combination - of column name and value. This enables handling arbitrary vocabularies - without pre-building a vocabulary. - """ - - def __init__(self, vocab_size: int = 32768): - """ - Initialize tokenizer. 
- - Args: - vocab_size: Size of hash vocabulary (excluding special tokens) - """ - self.vocab_size = vocab_size - - def hash_value(self, value: str, column_name: str) -> int: - """ - Hash a categorical value to an integer ID. - - Args: - value: The categorical value (will be converted to string) - column_name: Name of the column - - Returns: - Hash ID in range [SPECIAL_TOKENS_COUNT, vocab_size + SPECIAL_TOKENS_COUNT) - """ - combined = f"{column_name}:{value}" - hash_int = hash(combined) - return (hash_int % self.vocab_size) + SPECIAL_TOKENS_COUNT - - def tokenize_row(self, row: dict, columns: list[str]) -> list[int]: - """ - Tokenize a single row. - - Args: - row: Dictionary mapping column names to values - columns: Ordered list of column names to include - - Returns: - List of hashed value IDs - """ - return [self.hash_value(str(row[col]), col) for col in columns] - - def tokenize_dataframe( - self, - df: pd.DataFrame, - columns: Optional[list[str]] = None, - max_columns: int = 64, - ) -> tuple[torch.Tensor, torch.Tensor, list[str]]: - """ - Tokenize an entire DataFrame. - - Args: - df: pandas DataFrame to tokenize - columns: Column names to use (defaults to all columns) - max_columns: Maximum number of columns to include - - Returns: - Tuple of: - - value_ids: (num_rows, num_cols) tensor of hashed IDs - - attention_mask: (num_rows, num_cols) tensor of True for real values - - columns: List of column names used - """ - if columns is None: - columns = df.columns.tolist() - - # Truncate to max columns - columns = columns[:max_columns] - - # Tokenize all rows - value_ids = [] - for _, row in df.iterrows(): - row_ids = self.tokenize_row(row.to_dict(), columns) - value_ids.append(row_ids) - - value_ids = torch.tensor(value_ids, dtype=torch.long) - attention_mask = torch.ones_like(value_ids, dtype=torch.bool) - - return value_ids, attention_mask, columns - - def pad_batch( - self, - value_ids_list: list[torch.Tensor], - attention_masks_list: list[torch.Tensor], - max_length: Optional[int] = None, - ) -> tuple[torch.Tensor, torch.Tensor]: - """ - Pad a batch of tokenized rows to the same length. - - Args: - value_ids_list: List of (seq_len,) tensors - attention_masks_list: List of (seq_len,) tensors - max_length: Pad to this length (defaults to max in batch) - - Returns: - Tuple of: - - value_ids: (batch_size, max_length) padded tensor - - attention_mask: (batch_size, max_length) mask tensor - """ - if max_length is None: - max_length = max(v.size(0) for v in value_ids_list) - - batch_size = len(value_ids_list) - padded_ids = torch.full((batch_size, max_length), PAD_TOKEN_ID, dtype=torch.long) - padded_mask = torch.zeros(batch_size, max_length, dtype=torch.bool) - - for i, (ids, mask) in enumerate(zip(value_ids_list, attention_masks_list)): - length = ids.size(0) - padded_ids[i, :length] = ids - padded_mask[i, :length] = mask - - return padded_ids, padded_mask - - -class TabularDataset(Dataset): - """ - PyTorch Dataset wrapper for tabular data. - - Stores pre-tokenized data for efficient batch loading during training. - """ - - def __init__( - self, - df: pd.DataFrame, - tokenizer: TabularTokenizer, - columns: Optional[list[str]] = None, - max_columns: int = 64, - ): - """ - Initialize dataset. 
- - Args: - df: pandas DataFrame with categorical data - tokenizer: TabularTokenizer instance - columns: Column names to use (defaults to categorical columns) - max_columns: Maximum number of columns - """ - self.tokenizer = tokenizer - - # Select categorical columns if not specified - if columns is None: - columns = df.select_dtypes(include=["object", "string", "category"]).columns.tolist() - - self.columns = columns[:max_columns] - - # Tokenize entire dataframe - self.value_ids, self.attention_mask, self.columns = tokenizer.tokenize_dataframe( - df, self.columns, max_columns - ) - - def __len__(self) -> int: - return len(self.value_ids) - - def __getitem__(self, idx: int) -> dict[str, torch.Tensor]: - return { - "value_ids": self.value_ids[idx], - "attention_mask": self.attention_mask[idx], - } - - -class MultiTableDataset(Dataset): - """ - Dataset that samples rows from multiple tables. - - For pre-training on diverse tabular data (e.g., T4 dataset). - Each table may have different columns. - """ - - def __init__( - self, - tables: list[pd.DataFrame], - tokenizer: TabularTokenizer, - max_columns: int = 64, - rows_per_table: Optional[int] = None, - ): - """ - Initialize multi-table dataset. - - Args: - tables: List of DataFrames - tokenizer: TabularTokenizer instance - max_columns: Maximum columns per table - rows_per_table: Max rows to sample per table (None = all) - """ - self.tokenizer = tokenizer - self.max_columns = max_columns - self.samples = [] # List of (value_ids, attention_mask) - - for df in tables: - # Select categorical columns - cat_cols = df.select_dtypes(include=["object", "string", "category"]).columns.tolist() - if len(cat_cols) < 2: - continue # Skip tables with too few categorical columns - - cat_cols = cat_cols[:max_columns] - - # Sample rows if needed - if rows_per_table is not None and len(df) > rows_per_table: - df = df.sample(n=rows_per_table, random_state=42) - - # Tokenize - value_ids, attention_mask, _ = tokenizer.tokenize_dataframe(df, cat_cols, max_columns) - - # Add each row as a sample - for i in range(len(value_ids)): - self.samples.append((value_ids[i], attention_mask[i])) - - def __len__(self) -> int: - return len(self.samples) - - def __getitem__(self, idx: int) -> dict[str, torch.Tensor]: - value_ids, attention_mask = self.samples[idx] - return { - "value_ids": value_ids, - "attention_mask": attention_mask, - } - - -def apply_masking( - value_ids: torch.Tensor, - attention_mask: torch.Tensor, - mask_prob: float = 0.15, - random_prob: float = 0.1, - keep_prob: float = 0.1, - vocab_size: int = 32768, -) -> tuple[torch.Tensor, torch.Tensor]: - """ - Apply BERT-style masking to value IDs for MCM pre-training. 
- - Of the selected positions: - - 80% are replaced with [MASK] - - 10% are replaced with a random token - - 10% are kept as original - - Args: - value_ids: (batch_size, seq_len) original value IDs - attention_mask: (batch_size, seq_len) True for real tokens - mask_prob: Probability of selecting a position for masking - random_prob: Of selected positions, probability of random replacement - keep_prob: Of selected positions, probability of keeping original - vocab_size: Size of vocabulary for random replacement - - Returns: - Tuple of: - - masked_ids: (batch_size, seq_len) with masking applied - - labels: (batch_size, seq_len) original IDs at masked positions, -100 elsewhere - """ - device = value_ids.device - batch_size, seq_len = value_ids.shape - - # Only mask positions where attention_mask is True (real tokens) - can_mask = attention_mask.clone() - - # Randomly select positions to mask - mask_probs = torch.full_like(value_ids, mask_prob, dtype=torch.float) - mask_probs = mask_probs * can_mask.float() - selected = torch.bernoulli(mask_probs).bool() - - # Create labels: original values at selected positions, -100 elsewhere - labels = value_ids.clone() - labels[~selected] = -100 - - # Create masked input - masked_ids = value_ids.clone() - - # Determine which selected positions get which treatment - # 80% -> [MASK], 10% -> random, 10% -> keep original - rand_vals = torch.rand(batch_size, seq_len, device=device) - - # [MASK] replacement (80% of selected) - mask_replace = selected & (rand_vals < (1 - random_prob - keep_prob)) - masked_ids[mask_replace] = MASK_TOKEN_ID - - # Random replacement (10% of selected) - random_replace = selected & (rand_vals >= (1 - random_prob - keep_prob)) & (rand_vals < (1 - keep_prob)) - random_tokens = torch.randint( - SPECIAL_TOKENS_COUNT, - vocab_size + SPECIAL_TOKENS_COUNT, - (batch_size, seq_len), - device=device, - ) - masked_ids[random_replace] = random_tokens[random_replace] - - # Keep original (10% of selected) - already in masked_ids - - return masked_ids, labels - - -def mask_target_column( - value_ids: torch.Tensor, - attention_mask: torch.Tensor, - target_column_idx: int, -) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: - """ - Mask a specific column for classification inference. - - Args: - value_ids: (batch_size, seq_len) original value IDs - attention_mask: (batch_size, seq_len) attention mask - target_column_idx: Index of column to mask - - Returns: - Tuple of: - - masked_ids: (batch_size, seq_len) with target column masked - - labels: (batch_size, seq_len) original IDs at target position, -100 elsewhere - - target_positions: (batch_size,) indices of target column - """ - batch_size = value_ids.size(0) - - # Create masked input - masked_ids = value_ids.clone() - masked_ids[:, target_column_idx] = MASK_TOKEN_ID - - # Create labels - labels = torch.full_like(value_ids, -100) - labels[:, target_column_idx] = value_ids[:, target_column_idx] - - # Target positions - target_positions = torch.full((batch_size,), target_column_idx, dtype=torch.long, device=value_ids.device) - - return masked_ids, labels, target_positions - - -def collate_fn(batch: list[dict[str, torch.Tensor]]) -> dict[str, torch.Tensor]: - """ - Collate function for DataLoader. - - Pads sequences to the same length within a batch. 
- - Args: - batch: List of samples from TabularDataset - - Returns: - Batched and padded tensors - """ - value_ids_list = [item["value_ids"] for item in batch] - attention_masks_list = [item["attention_mask"] for item in batch] - - # Find max length in batch - max_length = max(v.size(0) for v in value_ids_list) - - # Pad - batch_size = len(batch) - padded_ids = torch.full((batch_size, max_length), PAD_TOKEN_ID, dtype=torch.long) - padded_mask = torch.zeros(batch_size, max_length, dtype=torch.bool) - - for i, (ids, mask) in enumerate(zip(value_ids_list, attention_masks_list)): - length = ids.size(0) - padded_ids[i, :length] = ids - padded_mask[i, :length] = mask - - return { - "value_ids": padded_ids, - "attention_mask": padded_mask, - } diff --git a/mostlyai/engine/_tabular/transformer_training.py b/mostlyai/engine/_tabular/transformer_training.py deleted file mode 100644 index a1ea33f6..00000000 --- a/mostlyai/engine/_tabular/transformer_training.py +++ /dev/null @@ -1,463 +0,0 @@ -""" -Training utilities for the Foundational Tabular Transformer. - -Provides pre-training (MCM) and classification functions. -""" - -import logging -from pathlib import Path -from typing import Iterator, Optional - -import pandas as pd -import torch -from torch.optim import AdamW -from torch.optim.lr_scheduler import CosineAnnealingLR -from torch.utils.data import DataLoader -from tqdm import tqdm - -from .transformer import ( - FoundationalTabularTransformer, - TabularTransformerConfig, - create_model, -) -from .transformer_data import ( - MultiTableDataset, - TabularDataset, - TabularTokenizer, - apply_masking, - collate_fn, - mask_target_column, -) - -_LOG = logging.getLogger(__name__) - - -def pretrain_mcm( - model: FoundationalTabularTransformer, - tables: list[pd.DataFrame], - num_epochs: int = 10, - batch_size: int = 256, - mask_prob: float = 0.15, - learning_rate: float = 1e-4, - weight_decay: float = 0.01, - warmup_steps: int = 1000, - max_columns: int = 64, - device: Optional[str] = None, - checkpoint_dir: Optional[Path] = None, - log_every: int = 100, -) -> FoundationalTabularTransformer: - """ - Pre-train model with Masked Cell Modeling (MCM) on multiple tables. - - Args: - model: The transformer model to train - tables: List of DataFrames to train on - num_epochs: Number of training epochs - batch_size: Batch size - mask_prob: Probability of masking each cell - learning_rate: Peak learning rate - weight_decay: AdamW weight decay - warmup_steps: Linear warmup steps - max_columns: Maximum columns per table - device: Device to train on (defaults to CUDA if available) - checkpoint_dir: Directory to save checkpoints - log_every: Log metrics every N steps - - Returns: - Trained model - """ - if device is None: - device = "cuda" if torch.cuda.is_available() else "cpu" - - model = model.to(device) - model.train() - - # Create tokenizer and dataset - tokenizer = TabularTokenizer(vocab_size=model.config.hash_vocab_size) - dataset = MultiTableDataset(tables, tokenizer, max_columns=max_columns) - - if len(dataset) == 0: - raise ValueError("No valid samples found in tables. 
Ensure tables have categorical columns.") - - _LOG.info(f"Pre-training on {len(dataset)} samples from {len(tables)} tables") - - dataloader = DataLoader( - dataset, - batch_size=batch_size, - shuffle=True, - collate_fn=collate_fn, - num_workers=0, # Keep simple for now - ) - - # Optimizer and scheduler - optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay) - total_steps = len(dataloader) * num_epochs - scheduler = CosineAnnealingLR(optimizer, T_max=total_steps) - - _LOG.info(f"Training config: {num_epochs} epochs, {len(dataloader)} batches/epoch, {total_steps} total steps") - _LOG.info(f"Batch size: {batch_size}, Learning rate: {learning_rate}, Device: {device}") - - # Training loop - global_step = 0 - for epoch in range(num_epochs): - epoch_loss = 0.0 - num_batches = 0 - - pbar = tqdm(dataloader, desc=f"Epoch {epoch + 1}/{num_epochs}", leave=True) - for batch in pbar: - value_ids = batch["value_ids"].to(device) - attention_mask = batch["attention_mask"].to(device) - - # Apply masking - masked_ids, labels = apply_masking( - value_ids, - attention_mask, - mask_prob=mask_prob, - vocab_size=model.config.hash_vocab_size, - ) - - # Skip batch if no tokens were masked (would cause NaN loss) - num_masked = (labels != -100).sum() - if num_masked == 0: - continue - - # Forward pass - optimizer.zero_grad() - output = model(masked_ids, attention_mask, labels=labels) - loss = output["loss"] - - # Skip if loss is NaN (shouldn't happen now but safety check) - if torch.isnan(loss): - _LOG.warning("NaN loss detected, skipping batch") - continue - - # Backward pass - loss.backward() - torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) - optimizer.step() - scheduler.step() - - epoch_loss += loss.item() - num_batches += 1 - global_step += 1 - - # Update progress bar - avg_loss = epoch_loss / num_batches - pbar.set_postfix({"loss": f"{avg_loss:.4f}", "lr": f"{scheduler.get_last_lr()[0]:.2e}"}) - - # End of epoch logging - avg_epoch_loss = epoch_loss / max(num_batches, 1) - _LOG.info(f"Epoch {epoch + 1}/{num_epochs} completed. Average loss: {avg_epoch_loss:.4f}") - - # Save checkpoint - if checkpoint_dir is not None: - checkpoint_path = checkpoint_dir / f"checkpoint_epoch_{epoch + 1}.pt" - torch.save( - { - "epoch": epoch + 1, - "model_state_dict": model.state_dict(), - "optimizer_state_dict": optimizer.state_dict(), - "loss": avg_epoch_loss, - "config": model.config, - }, - checkpoint_path, - ) - _LOG.info(f"Saved checkpoint to {checkpoint_path}") - - return model - - -def pretrain_mcm_streaming( - model: FoundationalTabularTransformer, - table_iterator: Iterator[pd.DataFrame], - num_steps: int = 10000, - batch_size: int = 256, - mask_prob: float = 0.15, - learning_rate: float = 1e-4, - weight_decay: float = 0.01, - max_columns: int = 64, - device: Optional[str] = None, - checkpoint_dir: Optional[Path] = None, - log_every: int = 100, - checkpoint_every: int = 1000, -) -> FoundationalTabularTransformer: - """ - Pre-train model with MCM using a streaming table iterator. - - Designed for large datasets like T4 that don't fit in memory. 
- - Args: - model: The transformer model to train - table_iterator: Iterator yielding DataFrames (e.g., from HuggingFace streaming) - num_steps: Total training steps - batch_size: Batch size - mask_prob: Probability of masking each cell - learning_rate: Learning rate - weight_decay: AdamW weight decay - max_columns: Maximum columns per table - device: Device to train on - checkpoint_dir: Directory to save checkpoints - log_every: Log metrics every N steps - checkpoint_every: Save checkpoint every N steps - - Returns: - Trained model - """ - if device is None: - device = "cuda" if torch.cuda.is_available() else "cpu" - - model = model.to(device) - model.train() - - tokenizer = TabularTokenizer(vocab_size=model.config.hash_vocab_size) - optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay) - scheduler = CosineAnnealingLR(optimizer, T_max=num_steps) - - # Buffer for accumulating samples - sample_buffer = [] - running_loss = 0.0 - steps_since_log = 0 - - _LOG.info(f"Starting streaming pre-training for {num_steps} steps") - _LOG.info(f"Batch size: {batch_size}, Learning rate: {learning_rate}, Device: {device}") - - pbar = tqdm(range(num_steps), desc="Pre-training", leave=True) - for step in pbar: - # Fill buffer if needed - while len(sample_buffer) < batch_size: - try: - table = next(table_iterator) - except StopIteration: - _LOG.warning("Table iterator exhausted") - break - - # Filter to categorical columns - cat_cols = table.select_dtypes(include=["object", "string", "category"]).columns.tolist() - if len(cat_cols) < 2: - continue - - cat_cols = cat_cols[:max_columns] - - # Tokenize and add to buffer - value_ids, attention_mask, _ = tokenizer.tokenize_dataframe(table, cat_cols, max_columns) - for i in range(len(value_ids)): - sample_buffer.append((value_ids[i], attention_mask[i])) - - if len(sample_buffer) < batch_size: - _LOG.warning(f"Buffer has only {len(sample_buffer)} samples, ending training") - break - - # Sample a batch - batch_samples = sample_buffer[:batch_size] - sample_buffer = sample_buffer[batch_size:] - - # Prepare batch tensors - value_ids_list = [s[0] for s in batch_samples] - attention_masks_list = [s[1] for s in batch_samples] - - # Pad to same length - max_len = max(v.size(0) for v in value_ids_list) - value_ids = torch.zeros(batch_size, max_len, dtype=torch.long) - attention_mask = torch.zeros(batch_size, max_len, dtype=torch.bool) - - for i, (ids, mask) in enumerate(zip(value_ids_list, attention_masks_list)): - length = ids.size(0) - value_ids[i, :length] = ids - attention_mask[i, :length] = mask - - value_ids = value_ids.to(device) - attention_mask = attention_mask.to(device) - - # Apply masking - masked_ids, labels = apply_masking( - value_ids, attention_mask, mask_prob=mask_prob, vocab_size=model.config.hash_vocab_size - ) - - # Skip batch if no tokens were masked - num_masked = (labels != -100).sum() - if num_masked == 0: - continue - - # Forward pass - optimizer.zero_grad() - output = model(masked_ids, attention_mask, labels=labels) - loss = output["loss"] - - # Skip if loss is NaN - if torch.isnan(loss): - _LOG.warning("NaN loss detected, skipping batch") - continue - - # Backward pass - loss.backward() - torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) - optimizer.step() - scheduler.step() - - running_loss += loss.item() - steps_since_log += 1 - - # Update progress bar - if steps_since_log > 0: - avg_loss = running_loss / steps_since_log - pbar.set_postfix({"loss": f"{avg_loss:.4f}", "lr": 
f"{scheduler.get_last_lr()[0]:.2e}", "buf": len(sample_buffer)}) - - # Reset running stats periodically - if (step + 1) % log_every == 0: - running_loss = 0.0 - steps_since_log = 0 - - # Checkpoint - if checkpoint_dir is not None and (step + 1) % checkpoint_every == 0: - checkpoint_path = checkpoint_dir / f"checkpoint_step_{step + 1}.pt" - torch.save( - { - "step": step + 1, - "model_state_dict": model.state_dict(), - "optimizer_state_dict": optimizer.state_dict(), - "config": model.config, - }, - checkpoint_path, - ) - _LOG.info(f"Saved checkpoint to {checkpoint_path}") - - return model - - -def classify( - model: FoundationalTabularTransformer, - df: pd.DataFrame, - target_column: str, - target_classes: list[str], - columns: Optional[list[str]] = None, - device: Optional[str] = None, -) -> pd.DataFrame: - """ - Classify rows by masking target column and predicting. - - Args: - model: Trained transformer model - df: DataFrame with features (target column can have any values) - target_column: Name of column to predict - target_classes: List of possible class values - columns: Feature columns to use (defaults to all categorical) - device: Device for inference - - Returns: - DataFrame with probability columns for each class - """ - if device is None: - device = "cuda" if torch.cuda.is_available() else "cpu" - - model = model.to(device) - model.eval() - - tokenizer = TabularTokenizer(vocab_size=model.config.hash_vocab_size) - - # Determine columns - if columns is None: - columns = df.select_dtypes(include=["object", "string", "category"]).columns.tolist() - - # Ensure target column is in columns - if target_column not in columns: - columns = columns + [target_column] - - columns = columns[: model.config.max_columns] - - # Find target column index - target_idx = columns.index(target_column) - - # Pre-compute hash IDs for target classes - class_hash_ids = {cls: tokenizer.hash_value(str(cls), target_column) for cls in target_classes} - - # Tokenize data - value_ids, attention_mask, _ = tokenizer.tokenize_dataframe(df, columns, model.config.max_columns) - value_ids = value_ids.to(device) - attention_mask = attention_mask.to(device) - - # Mask target column - masked_ids, _, target_positions = mask_target_column(value_ids, attention_mask, target_idx) - - # Get predictions - with torch.no_grad(): - probs = model.predict_masked(masked_ids, attention_mask, target_positions) - - # Extract probabilities for target classes - probs_np = probs.cpu().numpy() - result_data = {} - for cls in target_classes: - hash_id = class_hash_ids[cls] - result_data[f"prob_{cls}"] = probs_np[:, hash_id] - - return pd.DataFrame(result_data) - - -def evaluate_classification( - model: FoundationalTabularTransformer, - df: pd.DataFrame, - target_column: str, - columns: Optional[list[str]] = None, - device: Optional[str] = None, -) -> dict[str, float]: - """ - Evaluate classification accuracy on a labeled dataset. 
- - Args: - model: Trained transformer model - df: DataFrame with features and true labels - target_column: Name of target column - columns: Feature columns to use - device: Device for inference - - Returns: - Dictionary with accuracy metrics - """ - # Get unique classes from data - target_classes = df[target_column].dropna().unique().tolist() - - # Get predictions - probs_df = classify(model, df, target_column, target_classes, columns, device) - - # Get predicted class (highest probability) - prob_cols = [f"prob_{cls}" for cls in target_classes] - predicted_idx = probs_df[prob_cols].values.argmax(axis=1) - predicted_classes = [target_classes[i] for i in predicted_idx] - - # Calculate accuracy - true_labels = df[target_column].tolist() - correct = sum(p == t for p, t in zip(predicted_classes, true_labels)) - accuracy = correct / len(true_labels) - - return { - "accuracy": accuracy, - "num_samples": len(true_labels), - "num_classes": len(target_classes), - } - - -def save_model(model: FoundationalTabularTransformer, path: Path): - """Save model to disk.""" - torch.save( - { - "model_state_dict": model.state_dict(), - "config": model.config, - }, - path, - ) - _LOG.info(f"Model saved to {path}") - - -def load_model(path: Path, device: Optional[str] = None) -> FoundationalTabularTransformer: - """Load model from disk.""" - if device is None: - device = "cuda" if torch.cuda.is_available() else "cpu" - - checkpoint = torch.load(path, map_location=device) - config = checkpoint["config"] - - model = FoundationalTabularTransformer(config) - model.load_state_dict(checkpoint["model_state_dict"]) - model = model.to(device) - - _LOG.info(f"Model loaded from {path}") - return model diff --git a/scripts/foundational/test_transformer_synthetic.py b/scripts/foundational/test_transformer_synthetic.py deleted file mode 100644 index a5a704b6..00000000 --- a/scripts/foundational/test_transformer_synthetic.py +++ /dev/null @@ -1,221 +0,0 @@ -#!/usr/bin/env python -""" -Test the Foundational Tabular Transformer with synthetic data. - -This script validates the model works end-to-end without requiring T4 access. 
-""" - -import logging -import random - -import pandas as pd -import torch - -from mostlyai.engine._tabular.transformer import create_model -from mostlyai.engine._tabular.transformer_training import ( - classify, - evaluate_classification, - pretrain_mcm, -) - -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s", -) -_LOG = logging.getLogger(__name__) - - -def generate_synthetic_tables(num_tables: int = 20, rows_per_table: int = 100) -> list[pd.DataFrame]: - """Generate synthetic categorical tables for testing.""" - tables = [] - - # Different "schemas" to simulate diverse tables - schemas = [ - { - "columns": ["color", "size", "shape", "material"], - "values": { - "color": ["red", "blue", "green", "yellow", "black", "white"], - "size": ["small", "medium", "large", "xlarge"], - "shape": ["round", "square", "triangle", "oval"], - "material": ["wood", "metal", "plastic", "glass"], - }, - }, - { - "columns": ["country", "city", "weather", "season"], - "values": { - "country": ["USA", "UK", "Germany", "France", "Japan", "Brazil"], - "city": ["New York", "London", "Berlin", "Paris", "Tokyo", "Rio"], - "weather": ["sunny", "rainy", "cloudy", "snowy", "windy"], - "season": ["spring", "summer", "fall", "winter"], - }, - }, - { - "columns": ["animal", "habitat", "diet", "size_class"], - "values": { - "animal": ["lion", "eagle", "shark", "elephant", "snake", "wolf"], - "habitat": ["forest", "ocean", "desert", "jungle", "arctic"], - "diet": ["carnivore", "herbivore", "omnivore"], - "size_class": ["tiny", "small", "medium", "large", "huge"], - }, - }, - { - "columns": ["food", "cuisine", "taste", "temperature"], - "values": { - "food": ["pizza", "sushi", "tacos", "curry", "pasta", "burger"], - "cuisine": ["italian", "japanese", "mexican", "indian", "american"], - "taste": ["sweet", "salty", "sour", "spicy", "savory"], - "temperature": ["hot", "cold", "warm", "room_temp"], - }, - }, - { - "columns": ["genre", "mood", "tempo", "instrument"], - "values": { - "genre": ["rock", "jazz", "classical", "electronic", "hip-hop"], - "mood": ["happy", "sad", "energetic", "calm", "angry"], - "tempo": ["slow", "medium", "fast", "very_fast"], - "instrument": ["guitar", "piano", "drums", "violin", "synth"], - }, - }, - ] - - for i in range(num_tables): - schema = random.choice(schemas) - data = {} - for col in schema["columns"]: - data[col] = [random.choice(schema["values"][col]) for _ in range(rows_per_table)] - tables.append(pd.DataFrame(data)) - - return tables - - -def generate_classification_dataset(num_samples: int = 200) -> pd.DataFrame: - """Generate a dataset with a predictable target for classification testing.""" - data = { - "feature1": [], - "feature2": [], - "feature3": [], - "target": [], - } - - # Create a simple rule: target depends on feature1 and feature2 - feature1_values = ["A", "B", "C"] - feature2_values = ["X", "Y", "Z"] - feature3_values = ["P", "Q", "R", "S"] - - for _ in range(num_samples): - f1 = random.choice(feature1_values) - f2 = random.choice(feature2_values) - f3 = random.choice(feature3_values) - - # Simple rule: if f1 is A and f2 is X, target is "positive" - # Otherwise, mostly "negative" with some noise - if f1 == "A" and f2 == "X": - target = "positive" if random.random() < 0.9 else "negative" - elif f1 == "B" and f2 == "Y": - target = "positive" if random.random() < 0.7 else "negative" - else: - target = "negative" if random.random() < 0.8 else "positive" - - data["feature1"].append(f1) - data["feature2"].append(f2) - 
data["feature3"].append(f3) - data["target"].append(target) - - return pd.DataFrame(data) - - -def main(): - device = "cuda" if torch.cuda.is_available() else "cpu" - _LOG.info(f"Using device: {device}") - - # Step 1: Create model - _LOG.info("Creating small model...") - model = create_model("small") - num_params = sum(p.numel() for p in model.parameters()) - _LOG.info(f"Model has {num_params:,} parameters") - - # Step 2: Generate synthetic pre-training data - _LOG.info("Generating synthetic tables for pre-training...") - tables = generate_synthetic_tables(num_tables=50, rows_per_table=100) - _LOG.info(f"Generated {len(tables)} tables with ~100 rows each") - - # Step 3: Pre-train with MCM - _LOG.info("Pre-training with Masked Cell Modeling...") - model = pretrain_mcm( - model=model, - tables=tables, - num_epochs=3, - batch_size=32, - learning_rate=1e-3, - device=device, - log_every=50, - ) - _LOG.info("Pre-training complete!") - - # Step 4: Generate classification test data - _LOG.info("Generating classification dataset...") - train_df = generate_classification_dataset(num_samples=500) - test_df = generate_classification_dataset(num_samples=100) - - _LOG.info(f"Train set: {len(train_df)} samples") - _LOG.info(f"Test set: {len(test_df)} samples") - _LOG.info(f"Target distribution (train): {train_df['target'].value_counts().to_dict()}") - - # Step 5: Evaluate classification (zero-shot after pre-training) - _LOG.info("Evaluating zero-shot classification...") - metrics = evaluate_classification( - model=model, - df=test_df, - target_column="target", - device=device, - ) - _LOG.info(f"Zero-shot accuracy: {metrics['accuracy']:.2%}") - - # Step 6: Fine-tune on classification task (optional - just more MCM on task data) - _LOG.info("Fine-tuning on classification data...") - model = pretrain_mcm( - model=model, - tables=[train_df], - num_epochs=5, - batch_size=32, - learning_rate=5e-4, - device=device, - log_every=50, - ) - - # Step 7: Evaluate after fine-tuning - _LOG.info("Evaluating after fine-tuning...") - metrics = evaluate_classification( - model=model, - df=test_df, - target_column="target", - device=device, - ) - _LOG.info(f"Fine-tuned accuracy: {metrics['accuracy']:.2%}") - - # Step 8: Show sample predictions - _LOG.info("\nSample predictions:") - sample = test_df.head(5).copy() - probs = classify( - model=model, - df=sample, - target_column="target", - target_classes=["positive", "negative"], - device=device, - ) - - for i in range(len(sample)): - true_label = sample.iloc[i]["target"] - prob_pos = probs.iloc[i]["prob_positive"] - prob_neg = probs.iloc[i]["prob_negative"] - predicted = "positive" if prob_pos > prob_neg else "negative" - _LOG.info( - f" Row {i}: f1={sample.iloc[i]['feature1']}, f2={sample.iloc[i]['feature2']} " - f"-> pred={predicted} (p={max(prob_pos, prob_neg):.2f}), true={true_label}" - ) - - _LOG.info("\nTest complete!") - - -if __name__ == "__main__": - main() diff --git a/scripts/foundational/train_transformer_t4.py b/scripts/foundational/train_transformer_t4.py deleted file mode 100644 index 2cd9fb64..00000000 --- a/scripts/foundational/train_transformer_t4.py +++ /dev/null @@ -1,181 +0,0 @@ -#!/usr/bin/env python -""" -Train the Foundational Tabular Transformer on the T4 dataset. 
- -Usage: - python scripts/foundational/train_transformer_t4.py --num_steps 1000 --batch_size 64 - -Requirements: - pip install huggingface_hub pyarrow -""" - -import argparse -import io -import logging -import zipfile -from pathlib import Path - -import pandas as pd -import pyarrow.parquet as pq -import torch -from huggingface_hub import HfFileSystem - -from mostlyai.engine._tabular.transformer import create_model -from mostlyai.engine._tabular.transformer_training import ( - pretrain_mcm_streaming, - save_model, -) - -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", -) -_LOG = logging.getLogger(__name__) - - -def t4_table_iterator(max_tables: int | None = None): - """ - Iterator that yields DataFrames from the T4 dataset. - - Each parquet file in T4 is a separate table with its own schema. - We load them individually using HfFileSystem to avoid schema conflicts. - - Args: - max_tables: Maximum number of tables to yield (None = unlimited) - - Yields: - pandas DataFrames with categorical columns only - """ - _LOG.info("Connecting to T4 dataset via HfFileSystem...") - - fs = HfFileSystem() - base_path = "datasets/mlfoundations/t4-full" - - # List all chunk zip files - try: - all_files = fs.ls(base_path, detail=False) - chunk_zips = sorted([f for f in all_files if f.endswith(".zip")]) - _LOG.info(f"Found {len(chunk_zips)} chunk zip files") - except Exception as e: - _LOG.error(f"Failed to list dataset: {e}") - raise - - tables_yielded = 0 - tables_skipped = 0 - - for chunk_path in chunk_zips: - chunk_name = chunk_path.split("/")[-1] - _LOG.info(f"Processing {chunk_name}...") - - try: - # Download and open the zip file - with fs.open(chunk_path, "rb") as f: - zip_data = f.read() - - with zipfile.ZipFile(io.BytesIO(zip_data)) as zf: - parquet_files = [n for n in zf.namelist() if n.endswith(".parquet")] - _LOG.info(f" Found {len(parquet_files)} parquet files in {chunk_name}") - - for pq_name in parquet_files: - try: - # Read parquet file from zip - with zf.open(pq_name) as pq_file: - pq_data = pq_file.read() - table = pq.read_table(io.BytesIO(pq_data)) - df = table.to_pandas() - - # Filter to categorical columns only (object/string dtype) - cat_cols = df.select_dtypes( - include=["object", "string", "category"] - ).columns.tolist() - - # Skip if too few categorical columns - if len(cat_cols) < 2: - tables_skipped += 1 - continue - - # Keep only categorical columns - df = df[cat_cols] - - # Skip empty or tiny tables - if len(df) < 10: - tables_skipped += 1 - continue - - yield df - tables_yielded += 1 - - if tables_yielded % 100 == 0: - _LOG.info( - f" Progress: yielded {tables_yielded} tables, skipped {tables_skipped}" - ) - - if max_tables is not None and tables_yielded >= max_tables: - _LOG.info(f"Reached max_tables limit: {max_tables}") - return - - except Exception as e: - _LOG.debug(f" Failed to read {pq_name}: {e}") - tables_skipped += 1 - continue - - except Exception as e: - _LOG.warning(f"Failed to process {chunk_name}: {e}") - continue - - _LOG.info(f"Finished: yielded {tables_yielded} tables, skipped {tables_skipped}") - - -def main(): - parser = argparse.ArgumentParser(description="Train transformer on T4 dataset") - parser.add_argument("--num_steps", type=int, default=10000, help="Number of training steps") - parser.add_argument("--batch_size", type=int, default=64, help="Batch size") - parser.add_argument("--learning_rate", type=float, default=1e-4, help="Learning rate") - parser.add_argument("--model_size", type=str, 
default="small", choices=["small", "medium", "large"]) - parser.add_argument("--max_tables", type=int, default=None, help="Max tables to use (for testing)") - parser.add_argument("--checkpoint_dir", type=str, default="checkpoints", help="Checkpoint directory") - parser.add_argument("--log_every", type=int, default=100, help="Log every N steps") - parser.add_argument("--checkpoint_every", type=int, default=1000, help="Save checkpoint every N steps") - parser.add_argument("--device", type=str, default=None, help="Device (cuda/cpu)") - args = parser.parse_args() - - # Setup - checkpoint_dir = Path(args.checkpoint_dir) - checkpoint_dir.mkdir(parents=True, exist_ok=True) - - device = args.device - if device is None: - device = "cuda" if torch.cuda.is_available() else "cpu" - _LOG.info(f"Using device: {device}") - - # Create model - _LOG.info(f"Creating {args.model_size} model...") - model = create_model(args.model_size) - num_params = sum(p.numel() for p in model.parameters()) - _LOG.info(f"Model has {num_params:,} parameters") - - # Create table iterator - table_iter = t4_table_iterator(max_tables=args.max_tables) - - # Pre-train - _LOG.info(f"Starting pre-training for {args.num_steps} steps...") - model = pretrain_mcm_streaming( - model=model, - table_iterator=table_iter, - num_steps=args.num_steps, - batch_size=args.batch_size, - learning_rate=args.learning_rate, - device=device, - checkpoint_dir=checkpoint_dir, - log_every=args.log_every, - checkpoint_every=args.checkpoint_every, - ) - - # Save final model - final_path = checkpoint_dir / "model_final.pt" - save_model(model, final_path) - _LOG.info(f"Training complete! Model saved to {final_path}") - - -if __name__ == "__main__": - main() From 41298e294d822c9df0e853ff2c4ca634c59bad47 Mon Sep 17 00:00:00 2001 From: Antoine Bon Date: Tue, 16 Dec 2025 12:56:38 +0100 Subject: [PATCH 06/11] wip --- mostlyai/engine/_tabular/probability.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/mostlyai/engine/_tabular/probability.py b/mostlyai/engine/_tabular/probability.py index 1e8074aa..5f66303e 100644 --- a/mostlyai/engine/_tabular/probability.py +++ b/mostlyai/engine/_tabular/probability.py @@ -366,9 +366,6 @@ def predict_proba( # Get seed column names (needed for _generate_marginal_probs) seed_columns = list(seed_data.columns) - # Note: Column order validation is now handled in the interface layer (TabularARGN.predict_proba) - # to centralize validation logic with sample() method - # Encode seed data (features to condition on) - common for both single and multi-target # seed_data should NOT include any target columns seed_encoded, _, _ = encode_df( From 3fa952139a71095a4014a7b911485c9a631b89d4 Mon Sep 17 00:00:00 2001 From: Antoine Bon Date: Tue, 16 Dec 2025 13:27:31 +0100 Subject: [PATCH 07/11] wip --- mostlyai/engine/_tabular/generation.py | 1 + mostlyai/engine/_tabular/interface.py | 53 ------------------------- mostlyai/engine/_tabular/probability.py | 1 - 3 files changed, 1 insertion(+), 54 deletions(-) diff --git a/mostlyai/engine/_tabular/generation.py b/mostlyai/engine/_tabular/generation.py index e36426bb..c8ec78f6 100644 --- a/mostlyai/engine/_tabular/generation.py +++ b/mostlyai/engine/_tabular/generation.py @@ -820,6 +820,7 @@ def generate( if not enable_flexible_generation: check_column_order(gen_column_order, trn_column_order) + _LOG.info(f"{rare_category_replacement_method=}") rare_token_fixed_probs = fix_rare_token_probs(tgt_stats, rare_category_replacement_method) imputation_fixed_probs = _fix_imputation_probs(tgt_stats, 
imputation) diff --git a/mostlyai/engine/_tabular/interface.py b/mostlyai/engine/_tabular/interface.py index b322d698..4cf11930 100644 --- a/mostlyai/engine/_tabular/interface.py +++ b/mostlyai/engine/_tabular/interface.py @@ -32,15 +32,12 @@ from mostlyai.engine._common import ( ensure_dataframe, - get_cardinalities, - get_columns_from_cardinalities, list_fn, load_generated_data, mean_fn, median_fn, mode_fn, ) -from mostlyai.engine._tabular.common import get_argn_column_names from mostlyai.engine._tabular.probability import log_prob as _log_prob from mostlyai.engine._tabular.probability import predict_proba as _predict_proba from mostlyai.engine._workspace import Workspace @@ -287,49 +284,6 @@ def __del__(self): """Fallback cleanup if context manager wasn't used.""" self.close() - def _validate_column_order( - self, - data: pd.DataFrame, - workspace_dir: str | Path, - target_columns: list[str] | None = None, - ) -> None: - """ - Validate that data columns (and optionally target columns) appear in training order. - - Args: - data: DataFrame with seed columns to validate - workspace_dir: Path to workspace containing training statistics - target_columns: Optional list of target column names. If provided, validates - the combined seed+target column order. If None, validates only seed columns. - - Raises: - ValueError: If column order doesn't match training order and enable_flexible_generation=False - """ - if not self.enable_flexible_generation: - workspace = Workspace(workspace_dir) - tgt_stats = workspace.tgt_stats.read() - tgt_cardinalities = get_cardinalities(tgt_stats) - all_columns = get_columns_from_cardinalities(tgt_cardinalities) - - # Convert data columns to ARGN format - data_df = ensure_dataframe(data) - seed_columns_argn = get_argn_column_names(tgt_stats["columns"], list(data_df.columns)) - - # If target_columns provided, validate combined seed+target order - if target_columns is not None: - target_columns_argn = get_argn_column_names(tgt_stats["columns"], target_columns) - columns_to_check = seed_columns_argn + target_columns_argn - else: - columns_to_check = seed_columns_argn - - # Check that columns appear in training order - expected_order = [col for col in all_columns if col in columns_to_check] - if columns_to_check != expected_order: - raise ValueError( - "Column order does not match training order. " - "A change in column order is only permitted for models that were trained with `enable_flexible_generation=True`." 
- ) - def sample( self, n_samples: int | None = None, @@ -395,10 +349,6 @@ def sample( if n_samples is None: n_samples = 1 - # Validate seed_data column order when flexible generation is disabled - if seed_data is not None: - self._validate_column_order(seed_data, workspace_dir) - # Generate synthetic data using configured parameters generate( ctx_data=ctx_data_df, @@ -756,9 +706,6 @@ def predict_proba( _LOG.info(f"Dropping target columns from seed data: {target_cols_in_X}") X_df = X_df.drop(columns=target_cols_in_X) - # Validate seed+target column order when flexible generation is disabled - self._validate_column_order(X_df, self.workspace_dir, target_columns=target_columns) - # Call new predict_proba utility that returns probabilities in-memory workspace = Workspace(self.workspace_dir) diff --git a/mostlyai/engine/_tabular/probability.py b/mostlyai/engine/_tabular/probability.py index 5f66303e..11c86ed8 100644 --- a/mostlyai/engine/_tabular/probability.py +++ b/mostlyai/engine/_tabular/probability.py @@ -559,7 +559,6 @@ def log_prob( device=device, ) - # Check column order of input data when flexible generation is disabled if not enable_flexible_generation: check_column_order(list(data.columns), all_columns) From 0a3100193f3659bef2766ac63f64a62482f88b5c Mon Sep 17 00:00:00 2001 From: Antoine Bon Date: Tue, 16 Dec 2025 13:33:08 +0100 Subject: [PATCH 08/11] wip --- mostlyai/engine/_tabular/probability.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/mostlyai/engine/_tabular/probability.py b/mostlyai/engine/_tabular/probability.py index 11c86ed8..69a7d708 100644 --- a/mostlyai/engine/_tabular/probability.py +++ b/mostlyai/engine/_tabular/probability.py @@ -366,6 +366,14 @@ def predict_proba( # Get seed column names (needed for _generate_marginal_probs) seed_columns = list(seed_data.columns) + # Check column order when flexible generation is disabled + if not enable_flexible_generation: + seed_columns_argn = get_argn_column_names(tgt_stats["columns"], seed_columns) + target_columns_argn = get_argn_column_names(tgt_stats["columns"], target_columns) + columns_to_check = seed_columns_argn + target_columns_argn + expected_order = [col for col in all_columns if col in columns_to_check] + check_column_order(columns_to_check, expected_order) + # Encode seed data (features to condition on) - common for both single and multi-target # seed_data should NOT include any target columns seed_encoded, _, _ = encode_df( From aba2aad5e0a844c632c44d1f2e12f754ee2dcd9d Mon Sep 17 00:00:00 2001 From: Antoine Bon Date: Tue, 16 Dec 2025 13:54:50 +0100 Subject: [PATCH 09/11] wip --- tests/end_to_end/test_tabular_interface.py | 35 ++++++++++++++++------ 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/tests/end_to_end/test_tabular_interface.py b/tests/end_to_end/test_tabular_interface.py index 5d0bdaac..f6d82547 100644 --- a/tests/end_to_end/test_tabular_interface.py +++ b/tests/end_to_end/test_tabular_interface.py @@ -347,11 +347,15 @@ def test_predict_proba_multi_target( # Numeric binned values (may be bin labels or ranges) assert len(col_values) >= 3 # At least some bins present - def test_wrong_column_order_raises(self, classification_data, tmp_path_factory): + def test_wrong_column_order_raises(self, tmp_path_factory): """Test that wrong column order raises error when flexible generation is disabled.""" - data = classification_data - X = data[["feature1", "feature2"]] - y = data["target"] + data = pd.DataFrame( + { + "col_a": ["x", "y", "z"] * 20, + "col_b": ["p", "q", "r"] * 20, + 
"col_c": ["1", "2", "3"] * 20, + } + ) argn = TabularARGN( model="MOSTLY_AI/Small", @@ -360,16 +364,29 @@ def test_wrong_column_order_raises(self, classification_data, tmp_path_factory): enable_flexible_generation=False, workspace_dir=tmp_path_factory.mktemp("workspace"), ) - argn.fit(X=X, y=y) + argn.fit(X=data) + + # Wrong seed order for sample + X_wrong_seed = data.head(5)[["col_b", "col_a"]] # wrong: should be col_a, col_b + with pytest.raises(ValueError, match="(?i)column order.*does not match"): + argn.sample(n_samples=5, seed_data=X_wrong_seed) - # Reorder columns - X_reordered = X.head(5)[["feature2", "feature1"]] + # Wrong seed order for predict_proba + with pytest.raises(ValueError, match="(?i)column order.*does not match"): + argn.predict_proba(X_wrong_seed, target="col_c") + # Wrong seed order for predict with pytest.raises(ValueError, match="(?i)column order.*does not match"): - argn.predict_proba(X_reordered, target="target") + argn.predict(X_wrong_seed, target="col_c") + # Wrong target order for predict_proba (computes joint probabilities in order) + X_seed = data.head(5)[["col_a"]] with pytest.raises(ValueError, match="(?i)column order.*does not match"): - argn.sample(n_samples=10, seed_data=X_reordered) + argn.predict_proba(X_seed, target=["col_c", "col_b"]) # wrong: should be col_b, col_c + + # predict() doesn't require target order - it generates all columns and extracts targets + result = argn.predict(X_seed, target=["col_c", "col_b"]) + assert list(result.columns) == ["col_c", "col_b"] class TestTabularARGNRegression: From a927a6c84c86f1f2c43e10da62151969f3df4dd3 Mon Sep 17 00:00:00 2001 From: Antoine Bon Date: Tue, 16 Dec 2025 13:57:22 +0100 Subject: [PATCH 10/11] wip --- mostlyai/engine/_tabular/probability.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mostlyai/engine/_tabular/probability.py b/mostlyai/engine/_tabular/probability.py index 69a7d708..43c59a5e 100644 --- a/mostlyai/engine/_tabular/probability.py +++ b/mostlyai/engine/_tabular/probability.py @@ -363,10 +363,8 @@ def predict_proba( ) ) - # Get seed column names (needed for _generate_marginal_probs) seed_columns = list(seed_data.columns) - # Check column order when flexible generation is disabled if not enable_flexible_generation: seed_columns_argn = get_argn_column_names(tgt_stats["columns"], seed_columns) target_columns_argn = get_argn_column_names(tgt_stats["columns"], target_columns) @@ -567,6 +565,7 @@ def log_prob( device=device, ) + # Check column order of input data when flexible generation is disabled if not enable_flexible_generation: check_column_order(list(data.columns), all_columns) From 111b07747ed2b122318bada2f82238869d80f6b7 Mon Sep 17 00:00:00 2001 From: Antoine Bon Date: Tue, 16 Dec 2025 14:04:35 +0100 Subject: [PATCH 11/11] wip --- tests/end_to_end/test_tabular_interface.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/end_to_end/test_tabular_interface.py b/tests/end_to_end/test_tabular_interface.py index f6d82547..be6d9ee4 100644 --- a/tests/end_to_end/test_tabular_interface.py +++ b/tests/end_to_end/test_tabular_interface.py @@ -388,6 +388,10 @@ def test_wrong_column_order_raises(self, tmp_path_factory): result = argn.predict(X_seed, target=["col_c", "col_b"]) assert list(result.columns) == ["col_c", "col_b"] + # predict() works even with targets completely out of original order + result = argn.predict(X_seed, target=["col_c", "col_b", "col_a"]) + assert list(result.columns) == ["col_c", "col_b", "col_a"] + class TestTabularARGNRegression: """Test 
regression: predict numeric target."""
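
Notes on the final state of the series: after PATCH 07, the order checks are enforced inside the engine functions (`generate` for sampling with seed data, and `predict_proba`/`log_prob` in probability.py) rather than in the `TabularARGN` interface layer, with PATCH 08 restoring the combined seed+target check in `predict_proba`. The check projects the training-time column order onto the requested columns (after mapping them to ARGN names) and rejects any permutation. A minimal self-contained sketch of that comparison, with `check_training_order` as a hypothetical stand-in for the real `get_argn_column_names`/`check_column_order` plumbing:

    def check_training_order(columns_to_check: list[str], all_columns: list[str]) -> None:
        # Keep only the requested columns, in the order they had at training time;
        # any reordering by the caller then shows up as a mismatch.
        expected_order = [col for col in all_columns if col in columns_to_check]
        if columns_to_check != expected_order:
            raise ValueError("Column order does not match training order.")

    # Trained column order: col_a, col_b, col_c (as in the PATCH 09 test).
    check_training_order(["col_a", "col_b"], ["col_a", "col_b", "col_c"])   # passes
    try:
        check_training_order(["col_b", "col_a"], ["col_a", "col_b", "col_c"])
    except ValueError as err:
        print(err)  # raised, matching test_wrong_column_order_raises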