@@ -0,0 +1,218 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# ⚡ Dask: Faster-than-Pandas (CPU Benchmark)\n",
"\n",
"### **Template Review**\n",
"This template demonstrates how to handle **larger-than-memory** datasets using **Dask** on a standard CPU environment. Optimized for **Saturn Cloud Jupyter Notebooks**, it provides a direct performance comparison between standard Pandas (single-core) and Dask (multi-core parallel processing).\n",
"\n",
"**Core Workflow:** We will generate a massive **2GB+ Synthetic Dataset** locally with complex data types (Dates, Strings). This ensures the workload is heavy enough to make Pandas struggle, highlighting Dask's ability to process data in chunks without crashing memory.\n",
"\n",
"### **Tech Stack**\n",
"* **Dask**: Parallel computing library for scaling Python.\n",
"* **Pandas**: Standard data analysis library (the baseline).\n",
"* **Infrastructure**: [Saturn Cloud](https://saturncloud.io/) CPU Jupyter Instance."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Install Dask with complete dependencies (Dashboard + Distributed)\n",
"# Quotes are used to prevent ZSH/Shell errors with brackets\n",
"!pip install \"dask[complete]\" pandas numpy -q"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Step 1: Generate Massive Synthetic Data (2GB)**\n",
"We generate **20 Million rows** of complex data. We include **Dates** and **String Categories** because these data types are computationally expensive and slow down Pandas significantly."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"import os\n",
"from datetime import datetime, timedelta\n",
"\n",
"FILENAME = \"heavy_data.csv\"\n",
"NUM_ROWS = 20_000_000 # 20 Million Rows\n",
"CHUNK_SIZE = 1_000_000 # Process in chunks to save memory during creation\n",
"\n",
"def generate_data():\n",
" print(f\"🔨 Generating {NUM_ROWS} rows (Approx 2GB)... Please wait.\")\n",
" \n",
" # Categories for string complexity (Strings are slower than Ints)\n",
" categories = ['Electronics', 'Furniture', 'Clothing', 'Food', 'Auto']\n",
" start_date = datetime(2020, 1, 1)\n",
" \n",
" # Remove old file if exists\n",
" if os.path.exists(FILENAME):\n",
" os.remove(FILENAME)\n",
" \n",
" for i in range(0, NUM_ROWS, CHUNK_SIZE):\n",
" # Create complex data: Dates + Strings + Floats\n",
" df = pd.DataFrame({\n",
" 'transaction_id': np.arange(i, i + CHUNK_SIZE),\n",
" 'date': [start_date + timedelta(days=x % 365) for x in range(CHUNK_SIZE)],\n",
" 'category': np.random.choice(categories, CHUNK_SIZE),\n",
" 'amount': np.random.uniform(10.0, 500.0, CHUNK_SIZE),\n",
" 'discount': np.random.uniform(0.0, 0.3, CHUNK_SIZE)\n",
" })\n",
" \n",
" # Append to CSV\n",
" mode = 'a' if i > 0 else 'w'\n",
" header = (i == 0)\n",
" df.to_csv(FILENAME, index=False, mode=mode, header=header)\n",
" \n",
" if (i // CHUNK_SIZE) % 5 == 0:\n",
" print(f\"... Written {(i + CHUNK_SIZE) / 1_000_000:.0f} Million rows\")\n",
" \n",
" print(f\"✅ Done! File size: {os.path.getsize(FILENAME) / (1024**3):.2f} GB\")\n",
"\n",
"# Check if file exists and is big enough (>1.5GB)\n",
"if not os.path.exists(FILENAME) or os.path.getsize(FILENAME) < 1.5 * 1024**3:\n",
" generate_data()\n",
"else:\n",
" print(\"✅ Large file already exists. Skipping generation.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Step 2: Initialize Dask Client**\n",
"Dask uses a \"Client\" to manage parallel workers. Running this cell will provide a **Dashboard Link** where you can watch your CPU cores light up in real-time."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client\n",
"import dask.dataframe as dd\n",
"import time\n",
"\n",
"# Start a local Dask cluster using all available CPU cores\n",
"client = Client()\n",
"print(f\"🚀 Dask Dashboard available at: {client.dashboard_link}\")\n",
"client"
]
},
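{
"cell_type": "markdown",
"metadata": {},
"source": [
"> **Optional:** `Client()` with no arguments chooses sensible defaults for your machine. If you want explicit control, the sketch below shows one way to size the local cluster; the worker and memory values are only examples, so adjust them to your instance."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: configure the local cluster explicitly instead of relying on defaults.\n",
"# The values below are illustrative; tune n_workers/memory_limit to your instance.\n",
"# from dask.distributed import Client, LocalCluster\n",
"# cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit=\"4GB\")\n",
"# client = Client(cluster)"
]
},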
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Step 3: The Pandas Benchmark (The Struggle)**\n",
"We attempt to read the 2GB file and parse dates into memory. \n",
"\n",
"> **Note:** We use `parse_dates` specifically because it is an expensive operation that forces Pandas to work hard. This step usually takes **60-90 seconds** or triggers a MemoryError on smaller machines."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"start_time = time.time()\n",
"print(\"🐢 Pandas: Reading & Processing...\")\n",
"\n",
"try:\n",
" # We force date parsing to ensure CPU load is high\n",
" df_pd = pd.read_csv(FILENAME, parse_dates=['date'])\n",
" \n",
" # Complex GroupBy Operation\n",
" print(\"🐢 Pandas: Grouping & Aggregating...\")\n",
" res_pd = df_pd.groupby('category')['amount'].mean()\n",
" \n",
" pd_duration = time.time() - start_time\n",
" print(f\"⏱️ Pandas Time: {pd_duration:.2f} seconds\")\n",
"except MemoryError:\n",
" print(\"❌ Pandas Crashed (MemoryError)! This proves the data is too big for RAM.\")\n",
" pd_duration = float('inf')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Step 4: The Dask Benchmark (The Solution)**\n",
"Dask processes this **lazily**. It scans the file structure instantly and then processes chunks in parallel across all your CPU cores. It never loads the entire file at once."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"start_time = time.time()\n",
"print(\"🐇 Dask: Lazy Read & Parallel Compute...\")\n",
"\n",
"# Dask handles date parsing efficiently across cores\n",
"ddf = dd.read_csv(FILENAME, parse_dates=['date'])\n",
"\n",
"# .compute() triggers the actual parallel execution\n",
"res_dask = ddf.groupby('category')['amount'].mean().compute()\n",
"\n",
"dask_duration = time.time() - start_time\n",
"print(f\"⏱️ Dask Time: {dask_duration:.2f} seconds\")\n",
"\n",
"# Calculate Speedup Factor\n",
"if pd_duration != float('inf'):\n",
" print(f\"\\n🚀 Speedup: {pd_duration / dask_duration:.2f}x Faster\")\n",
"else:\n",
" print(\"\\n🏆 Dask Wins (Pandas Crashed)\")"
]
},
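{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Optional: Convert to Parquet for Faster Re-Reads**\n",
"CSV is a slow, text-based format. If you plan to re-run this analysis, a common next step is letting Dask rewrite the data as **Parquet** (columnar and compressed). The snippet below is a minimal sketch: the output directory name is arbitrary, and Parquet support requires `pyarrow` (or `fastparquet`) to be installed."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: persist the dataset as Parquet so later reads skip CSV parsing.\n",
"# \"heavy_data_parquet\" is just an example output directory name.\n",
"# ddf.to_parquet(\"heavy_data_parquet\", write_index=False)\n",
"# ddf_pq = dd.read_parquet(\"heavy_data_parquet\")\n",
"# ddf_pq.groupby('category')['amount'].mean().compute()"
]
},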
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"## 🏁 Conclusion & Next Steps\n",
"We have successfully demonstrated that **Dask** can handle heavy datasets that slow down standard Pandas workflows. By parallelizing the `read_csv` and `groupby` operations, Dask maximizes the utility of your **Saturn Cloud CPU** instance.\n",
"\n",
"### **Resources & Backlinks**\n",
"* **Cloud Infrastructure**: [Deploy on Saturn Cloud](https://saturncloud.io/)\n",
"* **Dask Documentation**: [Best Practices](https://docs.dask.org/en/stable/best-practices.html)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "cpu-plotly-env",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
@@ -0,0 +1,168 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 🚀 Ray Local Tasks/Actors\n",
"\n",
"<div align=\"center\">\n",
" <img src=\"./rayBasicPatterns.png\" width=\"250\">\n",
"</div>\n",
"\n",
"### **Template Review**\n",
"This template introduces the core primitives of **Ray**: **Tasks** (Stateless functions) and **Actors** (Stateful classes). Optimized for **Saturn Cloud Jupyter Notebooks**, it demonstrates how to parallelize standard Python code across all available CPU cores on a single instance.\n",
"\n",
"**Core Workflow:** We will compare serial Python execution with **Ray Tasks** for parallel work and use **Ray Actors** to manage a shared distributed state.\n",
"\n",
"### **Tech Stack**\n",
"* **Ray**: Distributed computing framework.\n",
"* **Infrastructure**: [Saturn Cloud](https://saturncloud.io/) CPU Jupyter Instance."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Install Ray\n",
"!pip install ray -q"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Step 1: Initialize Ray Locally**\n",
"Running `ray.init()` starts a local scheduler and worker processes on your machine. It will automatically detect the number of CPUs available."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import ray\n",
"import time\n",
"\n",
"# Initialize Ray on the local machine\n",
"if ray.is_initialized():\n",
" ray.shutdown()\n",
"ray.init()"
]
},
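{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can confirm what Ray detected with `ray.cluster_resources()`, which reports the CPUs and memory available to the local cluster."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Inspect the resources Ray detected on this machine\n",
"print(ray.cluster_resources())"
]
},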
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Step 2: Ray Tasks (Stateless Parallelism)**\n",
"Tasks are functions decorated with `@ray.remote`. They are executed asynchronously and return 'Object Refs' (futures)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A standard Python function\n",
"def slow_function(i):\n",
" time.sleep(1)\n",
" return i * i\n",
"\n",
"# A Ray Task\n",
"@ray.remote\n",
"def remote_slow_function(i):\n",
" time.sleep(1)\n",
" return i * i\n",
"\n",
"print(\"🐢 Running Serially...\")\n",
"start = time.time()\n",
"results_serial = [slow_function(i) for i in range(4)]\n",
"print(f\"Serial Duration: {time.time() - start:.2f}s\")\n",
"\n",
"print(\"\\n🐇 Running Parallel Ray Tasks...\")\n",
"start = time.time()\n",
"# Task calls return futures immediately\n",
"futures = [remote_slow_function.remote(i) for i in range(4)]\n",
"# ray.get() blocks until all tasks are finished\n",
"results_ray = ray.get(futures)\n",
"print(f\"Ray Task Duration: {time.time() - start:.2f}s\")\n",
"print(f\"Results: {results_ray}\")"
]
},
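{
"cell_type": "markdown",
"metadata": {},
"source": [
"> **Tip:** `ray.get(futures)` blocks until *every* task has finished. If you want to consume results as soon as each task completes, `ray.wait()` returns finished Object Refs incrementally, as in the short sketch below."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Consume results one at a time as tasks finish, instead of waiting for the batch\n",
"pending = [remote_slow_function.remote(i) for i in range(4)]\n",
"while pending:\n",
"    done, pending = ray.wait(pending, num_returns=1)\n",
"    print(f\"Task finished with result: {ray.get(done[0])}\")"
]
},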
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Step 3: Ray Actors (Stateful Parallelism)**\n",
"Actors are classes decorated with `@ray.remote`. They allow you to maintain state (like a counter or a model) across multiple tasks in a distributed environment."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@ray.remote\n",
"class Counter:\n",
" def __init__(self):\n",
" self.value = 0\n",
"\n",
" def increment(self):\n",
" self.value += 1\n",
" return self.value\n",
"\n",
" def get_count(self):\n",
" return self.value\n",
"\n",
"# Instantiate the actor\n",
"counter = Counter.remote()\n",
"\n",
"print(\"🚀 Incrementing actor state from multiple calls...\")\n",
"results = ray.get([counter.increment.remote() for _ in range(5)])\n",
"final_count = ray.get(counter.get_count.remote())\n",
"\n",
"print(f\"Progressive increments: {results}\")\n",
"print(f\"Final Actor State: {final_count}\")"
]
},
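{
"cell_type": "markdown",
"metadata": {},
"source": [
"> **Note:** Actor handles can also be passed into Ray Tasks, so many parallel tasks can update one shared piece of state. The sketch below illustrates the pattern (the `bump_counter` task is just an example name)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pass the actor handle into tasks so parallel tasks share the same state\n",
"@ray.remote\n",
"def bump_counter(counter_handle):\n",
"    # Each task asks the same actor to increment its value\n",
"    return ray.get(counter_handle.increment.remote())\n",
"\n",
"ray.get([bump_counter.remote(counter) for _ in range(3)])\n",
"print(f\"Count after task updates: {ray.get(counter.get_count.remote())}\")"
]
},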
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"## Conclusion & Next Steps\n",
"We have successfully used **Ray Tasks** to parallelize functions and **Ray Actors** to manage state across a distributed system. Ray is designed to scale seamlessly from local CPU to a cluster of thousands of nodes.\n",
"\n",
"### **Resources**\n",
"* **Cloud Infrastructure**: [Deploy on Saturn Cloud](https://saturncloud.io/)\n",
"* **Ray Documentation**: [Key Concepts](https://docs.ray.io/en/latest/ray-core/concepts.html)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "cpu-plotly-env",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}