diff --git a/README.md b/README.md index 8ff05d8..e15bdd0 100644 --- a/README.md +++ b/README.md @@ -53,12 +53,29 @@ LDP gives you a realistic local environment to develop and test before deploying - **Apache Iceberg** - Modern table format with ACID transactions - **Jupyter** - Interactive development environment +## 📚 Getting Started Tutorial + +**New to LDP?** Start with our comprehensive tutorial: + +👉 **[Getting Started Tutorial](docs/getting-started-tutorial.md)** - Complete hands-on guide with tested examples + +The tutorial covers: +- ✅ Platform setup for Windows, Linux, and macOS +- ✅ Working with MinIO (S3-compatible storage) +- ✅ Processing data with Spark +- ✅ Managing Iceberg tables (ACID transactions, time travel) +- ✅ Orchestrating workflows with Airflow +- ✅ Building your own data pipelines +- ✅ Production-ready examples and best practices + +**All tutorial code is tested and ready to use!** + ## Quick Start LDP works on **macOS**, **Windows**, and **Linux**. Choose your platform: -- **[macOS](docs/platform-guides/macos.md)** - Use Homebrew and native tools - **[Windows](docs/platform-guides/windows.md)** - Use PowerShell scripts and Chocolatey/winget +- **[macOS](docs/platform-guides/macos.md)** - Use Homebrew and native tools - **[Linux](docs/setup-guide.md#linux-setup)** - Standard package managers ### Prerequisites @@ -376,7 +393,34 @@ This copies all examples to their respective directories for testing and learnin ## Documentation -See the **[Documentation Index](docs/)** for detailed guides, architecture documentation, and troubleshooting. 
+### Getting Started +- **[📚 Getting Started Tutorial](docs/getting-started-tutorial.md)** - **START HERE!** Complete hands-on guide +- [Setup Guide](docs/setup-guide.md) - Detailed installation instructions +- [Writing Code Guide](docs/writing-code.md) - Best practices for developing pipelines +- [Platform Guides](docs/platform-guides/) - Windows, macOS, Linux specific guides + +### Understanding LDP +- [Project Structure](docs/project-structure.md) - Directory layout and organization +- [Hive vs Iceberg](docs/hive-vs-iceberg.md) - Why we use Iceberg +- [Iceberg Catalog](docs/iceberg-hadoop-catalog.md) - HadoopCatalog explained + +### Operations & Deployment +- [Production Guide](docs/production-guide.md) - Deploying to production +- [CI/CD Testing](docs/ci-testing.md) - Automated testing documentation +- [Troubleshooting](docs/troubleshooting.md) - Common issues and solutions + +### Directory READMEs +Each major directory has its own README explaining its purpose: +- [airflow/](airflow/README.md) - Airflow DAG development +- [spark/](spark/README.md) - Spark job development +- [examples/](examples/README.md) - Example code library +- [docker/](docker/README.md) - Custom Docker images +- [config/](config/README.md) - Configuration files +- [terraform/](terraform/README.md) - Infrastructure as Code +- [scripts/](scripts/README.md) - Utility scripts +- [tests/](tests/README.md) - Testing strategies + +See the **[Documentation Index](docs/)** for the complete list. 
## Contributing @@ -389,6 +433,32 @@ See the **[Documentation Index](docs/)** for detailed guides, architecture docum MIT License +## Recent Updates + +### December 2024 + +**🎉 Major Documentation Update** +- Added comprehensive [Getting Started Tutorial](docs/getting-started-tutorial.md) with tested examples +- Added README files to all major directories explaining their purpose +- Cross-platform support documentation (Windows PowerShell + Linux/macOS Bash) +- Examples directory is now clearly optional and can be deleted if desired + +**🔧 Dependency Updates** +- Fixed: Pinned s3fs==2024.12.0 and fsspec==2024.12.0 to avoid yanked PyPI versions +- Updated: Python 3.13, Airflow 3.1.5, PySpark 4.0.1 +- Updated: NumPy 2.3.5, Pandas 2.3.3, PyArrow 22.0.0 +- See [UPGRADE-PLAN-2025](docs/UPGRADE-PLAN-2025.md) for migration details + +**📝 Documentation Improvements** +- Clarified that LDP uses Minikube + Terraform (not docker-compose) +- Added Windows-first documentation with PowerShell scripts +- Tutorial uses actual scripts instead of make commands for clarity +- Added examples of Iceberg CRUD, MinIO operations, and Airflow DAGs + +**🗑️ Cleanup** +- Removed Hive configuration (LDP uses Iceberg only) +- Clarified examples/ directory is optional reference material + ## Support For issues and questions, please open an issue in the repository. diff --git a/airflow/README.md b/airflow/README.md new file mode 100644 index 0000000..752c23c --- /dev/null +++ b/airflow/README.md @@ -0,0 +1,115 @@ +# Airflow Directory + +This directory contains Apache Airflow configuration and DAG files for workflow orchestration. + +## Structure + +``` +airflow/ +├── dags/ # Your Airflow DAG files go here +├── logs/ # Airflow execution logs (auto-generated) +├── plugins/ # Custom Airflow plugins +└── README.md # This file +``` + +## What is Airflow? + +Apache Airflow is a platform to programmatically author, schedule, and monitor workflows (DAGs - Directed Acyclic Graphs). 
+ +## Adding DAGs + +### Option 1: Copy from Examples + +Copy tested DAG examples to this directory: + +```bash +# Simple example +cp examples/simple_dag.py airflow/dags/ + +# Production examples +cp examples/dags/data_ingestion/ingest_daily.py airflow/dags/ +cp examples/dags/data_transformation/transform_pipeline.py airflow/dags/ +``` + +### Option 2: Create Your Own + +Create a new DAG file in `airflow/dags/`: + +```python +from datetime import datetime, timedelta +from airflow import DAG +from airflow.operators.bash import BashOperator + +with DAG( + 'my_pipeline', + default_args={ + 'owner': 'ldp', + 'start_date': datetime(2024, 1, 1), + 'retries': 1, + }, + description='My data pipeline', + schedule='@daily', + catchup=False, +) as dag: + + task = BashOperator( + task_id='my_task', + bash_command='echo "Hello LDP!"', + ) +``` + +## DAG Best Practices + +1. **Use `catchup=False`** - Don't backfill historical runs automatically +2. **Set proper retries** - Allow tasks to retry on transient failures +3. **Tag your DAGs** - Use tags for organization: `tags=['ingestion', 'daily']` +4. **Use logical_date** - Instead of deprecated `execution_date` (Airflow 3.0+) +5. **Make tasks idempotent** - Tasks should be safe to re-run + +## Useful Commands + +```bash +# Trigger a DAG +make airflow-trigger DAG=my_pipeline + +# List all DAGs +make airflow-dags + +# Check DAG for errors +make airflow-check + +# View logs +make airflow-logs +``` + +## Accessing Airflow UI + +- **URL**: http://localhost:8080 +- **Username**: admin +- **Password**: admin + +## Example DAGs + +See the `examples/` directory for tested, production-ready DAG examples: + +- `examples/simple_dag.py` - Basic DAG structure +- `examples/dags/data_ingestion/ingest_daily.py` - Daily data ingestion +- `examples/dags/data_transformation/transform_pipeline.py` - Spark transformation pipeline + +## Common Issues + +### DAG not appearing in UI + +1. 
Check for Python syntax errors: `python airflow/dags/your_dag.py` +2. Wait 1-2 minutes for Airflow to scan for new DAGs +3. Check Airflow logs: `make airflow-logs` + +### Import errors + +Ensure all required packages are in `docker/airflow/requirements.txt` + +## Learn More + +- [Airflow Documentation](https://airflow.apache.org/docs/) +- Tutorial: `docs/getting-started-tutorial.md` +- Production Guide: `docs/production-guide.md` diff --git a/config/README.md b/config/README.md new file mode 100644 index 0000000..c4acd77 --- /dev/null +++ b/config/README.md @@ -0,0 +1,114 @@ +# Config Directory + +This directory contains configuration files for all LDP services. + +## Structure + +``` +config/ +├── iceberg/ +│ └── catalog.properties # Iceberg catalog configuration +└── README.md # This file +``` + +## Configuration Files + +### Iceberg Configuration + +**File**: `iceberg/catalog.properties` + +Configures the Apache Iceberg table format: + +```properties +# Catalog type - using HadoopCatalog (file-based) +catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog + +# Warehouse location - where Iceberg tables are stored +warehouse=s3a://warehouse/ + +# S3/MinIO Configuration +s3.endpoint=http://minio:9000 +s3.access-key-id=admin +s3.secret-access-key=minioadmin +s3.path-style-access=true + +# File format settings +write.format.default=parquet +write.parquet.compression-codec=snappy + +# Metadata settings +commit.retry.num-retries=3 +commit.retry.min-wait-ms=100 +``` + +**Key Settings:** + +- **`catalog-impl`**: Uses HadoopCatalog (file-based, no external metastore) +- **`warehouse`**: All Iceberg tables stored in MinIO's `warehouse` bucket +- **`s3.endpoint`**: Points to MinIO service +- **`s3.path-style-access=true`**: Required for MinIO compatibility + +## Understanding Iceberg Catalog + +The HadoopCatalog is a simple, file-based catalog suitable for development and small deployments. 
+ +**Pros:** +- No external metastore needed +- Simple setup +- Works well with object storage (MinIO/S3) + +**Cons:** +- Limited concurrency +- Not recommended for production multi-user environments + +**Learn more:** +- `docs/iceberg-hadoop-catalog.md` - Detailed explanation +- `docs/hive-vs-iceberg.md` - Why we use Iceberg + +## Environment-Specific Configuration + +For different environments (dev, staging, prod), you can: + +1. Create environment-specific config files: + ``` + config/iceberg/ + ├── catalog.dev.properties + ├── catalog.staging.properties + └── catalog.prod.properties + ``` + +2. Use environment variables in your Spark configurations to switch between them + +## Adding New Configurations + +When adding new services or configuration files: + +1. Create a subdirectory: `config/service_name/` +2. Add configuration files +3. Update this README +4. Document in service README + +## Service Configuration Locations + +Some services have configuration embedded in other locations: + +- **Airflow**: Environment variables in `docker-compose.yml` +- **Spark**: Configuration in Spark session creation (see `examples/iceberg_crud.py`) +- **MinIO**: Environment variables in `docker-compose.yml` +- **PostgreSQL**: Environment variables in `docker-compose.yml` + +## Security Note + +**⚠️ Important**: The current configuration uses default credentials suitable for local development only. 
+ +**For production:** +- Use strong passwords +- Store credentials in secrets management (AWS Secrets Manager, HashiCorp Vault) +- Never commit production credentials to git +- Use environment variables or secret files + +## Learn More + +- Iceberg Configuration: https://iceberg.apache.org/docs/latest/configuration/ +- Getting Started: `docs/getting-started-tutorial.md` +- Production Setup: `docs/production-guide.md` diff --git a/docker/README.md b/docker/README.md new file mode 100644 index 0000000..bd2215d --- /dev/null +++ b/docker/README.md @@ -0,0 +1,221 @@ +# Docker Directory + +This directory contains Dockerfiles and requirements files for building custom Docker images used in LDP. + +## Structure + +``` +docker/ +├── airflow/ +│ ├── Dockerfile # Custom Airflow image +│ └── requirements.txt # Python packages for Airflow +├── jupyter/ +│ ├── Dockerfile # Jupyter Lab image with Spark + Iceberg +│ └── requirements.txt # Python packages for Jupyter +├── spark/ +│ ├── Dockerfile # Custom Spark image +│ └── requirements.txt # Python packages for Spark workers +└── README.md # This file +``` + +## Docker Images + +### Airflow Image + +**Base**: `apache/airflow:3.1.5-python3.13` + +**Custom additions:** +- AWS provider for MinIO integration +- Spark provider for Spark job submission +- Additional data processing libraries (pandas, numpy, pyarrow) + +**Build:** +```bash +docker-compose build airflow +``` + +**Modify dependencies:** +Edit `docker/airflow/requirements.txt` and rebuild. + +### Spark Image + +**Base**: `apache/spark:4.0.1-python3` + +**Custom additions:** +- PySpark 4.0.1 +- Data processing libraries (pandas, numpy, pyarrow) +- S3/MinIO support (boto3, s3fs) +- PyIceberg for Iceberg table operations + +**Build:** +```bash +docker-compose build spark-master spark-worker +``` + +**Modify dependencies:** +Edit `docker/spark/requirements.txt` and rebuild. 
+ +### Jupyter Image + +**Base**: `jupyter/pyspark-notebook` + +**Custom additions:** +- JupyterLab for interactive development +- PySpark 4.0.1 with Iceberg support +- Data science libraries (pandas, matplotlib, seaborn, plotly) +- Database connectivity (SQLAlchemy, psycopg2) + +**Build:** +```bash +docker-compose build jupyter +``` + +**Access Jupyter:** +- URL: http://localhost:8888 +- Token: Check logs with `docker logs ldp-jupyter` + +**Modify dependencies:** +Edit `docker/jupyter/requirements.txt` and rebuild. + +## Requirements Files + +### Pinned Versions + +All `requirements.txt` files use **pinned versions** to ensure reproducibility: + +```txt +# Good (pinned) +pandas==2.3.3 +numpy==2.3.5 + +# Avoid (unpinned) +pandas>=2.0.0 +numpy +``` + +**Why pin versions?** +- Reproducible builds +- Avoid breaking changes +- Easier debugging + +### Recent Version Updates + +The project has been upgraded to modern versions: +- **Python**: 3.13 +- **Airflow**: 3.1.5 +- **PySpark**: 4.0.1 +- **NumPy**: 2.3.5 +- **Pandas**: 2.3.3 +- **PyArrow**: 22.0.0 + +See `docs/UPGRADE-PLAN-2025.md` for migration details. + +## Adding New Dependencies + +### For Airflow + +1. Edit `docker/airflow/requirements.txt` +2. Add the package with pinned version: + ```txt + my-package==1.2.3 + ``` +3. Rebuild and restart: + ```bash + docker-compose build airflow + docker-compose up -d airflow-webserver airflow-scheduler + ``` + +### For Spark + +1. Edit `docker/spark/requirements.txt` +2. Add the package +3. Rebuild all Spark services: + ```bash + docker-compose build spark-master spark-worker + docker-compose up -d spark-master spark-worker + ``` + +### For Jupyter + +1. Edit `docker/jupyter/requirements.txt` +2. Add the package +3. Rebuild: + ```bash + docker-compose build jupyter + docker-compose up -d jupyter + ``` + +## Dockerfile Best Practices + +### 1. Use Official Base Images +All our Dockerfiles use official images from Apache and Jupyter. + +### 2. 
Pin Package Versions +Always specify exact versions in `requirements.txt`. + +### 3. Layer Caching +Requirements are copied before other files to leverage Docker layer caching: +```dockerfile +COPY requirements.txt /tmp/ +RUN pip install -r /tmp/requirements.txt +# Other files copied after +``` + +### 4. Security +- Run as non-root user where possible +- Keep base images updated +- Scan for vulnerabilities: `docker scan image-name` + +## Common Issues + +### Build Failures + +**Dependency conflicts:** +```bash +# Clear Docker cache and rebuild +docker-compose build --no-cache service-name +``` + +**Version not found:** +- Check if the version exists on PyPI +- See recent fix for fsspec/s3fs version pinning + +### Image Size Too Large + +- Use `.dockerignore` to exclude unnecessary files +- Combine RUN commands to reduce layers +- Use multi-stage builds if needed + +## Rebuilding All Images + +To rebuild all services: + +```bash +# Stop services +make down + +# Rebuild all images +docker-compose build + +# Start services +make up +``` + +## Image Management + +```bash +# List images +docker images | grep ldp + +# Remove unused images +docker image prune + +# Remove all LDP images (careful!) +docker-compose down --rmi all +``` + +## Learn More + +- Dockerfile reference: https://docs.docker.com/engine/reference/builder/ +- Docker Compose: https://docs.docker.com/compose/ +- Getting Started: `docs/getting-started-tutorial.md` diff --git a/docs/getting-started-tutorial.md b/docs/getting-started-tutorial.md new file mode 100644 index 0000000..131ad2f --- /dev/null +++ b/docs/getting-started-tutorial.md @@ -0,0 +1,861 @@ +# Getting Started with LDP (Local Data Platform) + +This tutorial will guide you through using the Local Data Platform (LDP) - a modern data platform built on Apache Spark, Apache Airflow, Apache Iceberg, and MinIO. + +## What is LDP? 
+ +LDP is a local data platform that provides: +- **Apache Spark** for distributed data processing +- **Apache Airflow** for workflow orchestration +- **Apache Iceberg** for table management with ACID transactions +- **MinIO** for S3-compatible object storage +- **PostgreSQL** for metadata storage + +## Prerequisites + +Before you begin, ensure you have installed: +- **Minikube** - Local Kubernetes cluster +- **kubectl** - Kubernetes command-line tool +- **Terraform** - Infrastructure as Code tool +- **Helm** - Kubernetes package manager +- Basic knowledge of Python and SQL +- Understanding of data pipelines + +### Platform Support + +LDP works on: +- ✅ **Windows** (Windows 10/11) +- ✅ **Linux** (Ubuntu, Fedora, etc.) +- ✅ **macOS** (Intel and Apple Silicon) + +### Installation + +**💻 Windows (PowerShell - Run as Administrator):** + +Using Chocolatey (recommended): +```powershell +choco install minikube kubectl terraform kubernetes-helm +``` + +Using winget: +```powershell +winget install Kubernetes.minikube +winget install Kubernetes.kubectl +winget install Hashicorp.Terraform +winget install Helm.Helm +``` + +**🐧 Linux:** +```bash +# Follow installation guides at: +# - Minikube: https://minikube.sigs.k8s.io/docs/start/ +# - kubectl: https://kubernetes.io/docs/tasks/tools/ +# - Terraform: https://www.terraform.io/downloads +# - Helm: https://helm.sh/docs/intro/install/ +``` + +**🍎 macOS:** +```bash +brew install minikube kubectl terraform helm +``` + +**For Windows users**: All commands in this tutorial use PowerShell scripts. Look for the 💻 and 🐧 icons. + +## Starting the Platform + +### 1. Initial Setup (First Time Only) + +**💻 Windows (PowerShell):** +```powershell +.\scripts\windows\setup.ps1 +``` + +**🐧 Linux/macOS:** +```bash +./scripts/setup.sh +``` + +This will: +- Start Minikube (local Kubernetes cluster) +- Enable required addons +- Verify prerequisites + +### 2. 
Deploy the Platform
+
+**💻 Windows (PowerShell):**
+```powershell
+.\scripts\windows\start.ps1
+```
+
+**🐧 Linux/macOS:**
+```bash
+./scripts/start.sh
+```
+
+This will:
+- Initialize Terraform
+- Deploy all services to Kubernetes (Airflow, Spark, MinIO, PostgreSQL)
+- Take 10-15 minutes on the first run
+
+### 3. Verify services are running
+
+**💻 Windows (PowerShell):**
+```powershell
+.\scripts\windows\check-health.ps1
+```
+
+**🐧 Linux/macOS:**
+```bash
+./scripts/check-health.sh
+```
+
+### 4. Get service URLs
+
+**💻 Windows (PowerShell):**
+```powershell
+$minikubeIp = minikube ip
+Write-Host "Airflow UI: http://${minikubeIp}:30080"
+Write-Host "MinIO Console: http://${minikubeIp}:30901"
+Write-Host "Spark Master: http://${minikubeIp}:30707"
+Write-Host "Jupyter: http://${minikubeIp}:30888"
+```
+
+**🐧 Linux/macOS:**
+```bash
+minikube ip
+# Use the IP with these ports:
+# Airflow UI: http://<minikube-ip>:30080
+# MinIO Console: http://<minikube-ip>:30901
+# Spark Master: http://<minikube-ip>:30707
+# Jupyter: http://<minikube-ip>:30888
+```
+
+**Default Credentials:**
+- Airflow: username `admin`, password `admin`
+- MinIO: username `admin`, password `minioadmin`
+
+## Your First Data Pipeline
+
+Let's walk through a complete data pipeline using the tested example code provided in the `examples/` directory.
+
+**Important**: The `examples/` directory contains reference code that should **never be run directly**. Always copy examples to the appropriate location first:
+- Spark jobs → `spark/jobs/`
+- Airflow DAGs → `airflow/dags/`
+- Scripts → `scripts/`
+
+This keeps the examples clean as reference material and teaches you the proper workflow.
+
+**Note for Windows users**: Replace forward slashes (`/`) with backslashes (`\`) in file paths, or use PowerShell, which supports both.
+
+### Example 1: Working with MinIO (Object Storage)
+
+MinIO provides S3-compatible object storage for your data lake.
+
+**Reference**: `examples/minio_operations.py`
+
+```python
+"""
+MinIO operations example using boto3.
+""" +import boto3 +from botocore.client import Config + + +def create_s3_client(): + """Create S3 client for MinIO.""" + return boto3.client( + 's3', + endpoint_url='http://localhost:30900', + aws_access_key_id='admin', + aws_secret_access_key='minioadmin', + config=Config(signature_version='s3v4'), + region_name='us-east-1' + ) + + +def main(): + """Demonstrate MinIO operations.""" + s3 = create_s3_client() + + # List buckets + print("Listing buckets:") + response = s3.list_buckets() + for bucket in response['Buckets']: + print(f" - {bucket['Name']}") + + # Upload file + print("\nUploading file...") + bucket_name = 'datalake' + file_key = 'examples/test.txt' + s3.put_object( + Bucket=bucket_name, + Key=file_key, + Body=b'Hello from MinIO!' + ) + print(f"Uploaded {file_key} to {bucket_name}") + + # List objects + print(f"\nListing objects in {bucket_name}:") + response = s3.list_objects_v2(Bucket=bucket_name, Prefix='examples/') + if 'Contents' in response: + for obj in response['Contents']: + print(f" - {obj['Key']} ({obj['Size']} bytes)") + + # Download file + print(f"\nDownloading {file_key}:") + response = s3.get_object(Bucket=bucket_name, Key=file_key) + content = response['Body'].read().decode('utf-8') + print(f"Content: {content}") + + # Delete file + print(f"\nDeleting {file_key}...") + s3.delete_object(Bucket=bucket_name, Key=file_key) + print("Deleted successfully") + + +if __name__ == "__main__": + main() +``` + +**To run this example:** + +1. First, copy it to your working directory: + + **🐧 Linux/macOS:** + ```bash + cp examples/minio_operations.py scripts/ + ``` + + **💻 Windows (PowerShell):** + ```powershell + Copy-Item examples\minio_operations.py scripts\ + ``` + + **💻 Windows (Command Prompt):** + ```cmd + copy examples\minio_operations.py scripts\ + ``` + +2. 
Then run it: + ```bash + python scripts/minio_operations.py + ``` + *(Same command for all platforms)* + +**Note**: Never run code directly from `examples/` - always copy it to the appropriate location first. This keeps the examples clean as reference material. + +### Example 2: Processing Data with Spark + +Spark allows you to process large datasets in a distributed manner. + +**Reference**: `examples/spark_job.py` + +```python +""" +Simple Spark job example. +""" +from pyspark.sql import SparkSession +from pyspark.sql.functions import col, count + + +def main(): + """Simple Spark job.""" + # Create Spark session + spark = SparkSession.builder \ + .appName("SimpleSparkJob") \ + .getOrCreate() + + # Create sample data + data = [ + ("Alice", 25, "Engineering"), + ("Bob", 30, "Sales"), + ("Charlie", 35, "Engineering"), + ("David", 28, "Sales"), + ("Eve", 32, "Engineering"), + ] + + # Create DataFrame + df = spark.createDataFrame(data, ["name", "age", "department"]) + + # Show data + print("Sample Data:") + df.show() + + # Perform aggregation + print("Department Summary:") + df.groupBy("department") \ + .agg(count("*").alias("count")) \ + .show() + + # Calculate average age by department + print("Average Age by Department:") + df.groupBy("department") \ + .avg("age") \ + .show() + + spark.stop() + + +if __name__ == "__main__": + main() +``` + +**To run this example:** + +1. First, copy it to the Spark jobs directory: + + **🐧 Linux/macOS:** + ```bash + cp examples/spark_job.py spark/jobs/ + ``` + + **💻 Windows (PowerShell):** + ```powershell + Copy-Item examples\spark_job.py spark\jobs\ + ``` + + **💻 Windows (Command Prompt):** + ```cmd + copy examples\spark_job.py spark\jobs\ + ``` + +2. 
Then submit to the Spark cluster: + + **Both Windows and Linux/macOS:** + ```bash + kubectl exec -n ldp deployment/spark-master -- spark-submit --master spark://spark-master:7077 /opt/spark/jobs/spark_job.py + ``` + +**Note**: Always copy examples to `spark/jobs/` before running, not from `examples/` directly. + +### Example 3: Iceberg Tables (ACID Transactions) + +Iceberg provides ACID transactions, schema evolution, and time travel for your data lake. + +**Reference**: `examples/iceberg_crud.py` + +This example demonstrates: +- Creating Iceberg tables +- Inserting data +- Updating records +- Deleting records +- Querying table history (time travel) + +**Key features demonstrated:** + +```python +# Create Iceberg table +spark.sql(""" + CREATE TABLE IF NOT EXISTS local.demo.users ( + id BIGINT, + name STRING, + email STRING, + created_at TIMESTAMP + ) USING iceberg +""") + +# Insert data +df.writeTo("local.demo.users").append() + +# Update records +spark.sql(""" + UPDATE local.demo.users + SET email = 'alice.new@example.com' + WHERE id = 1 +""") + +# Delete records +spark.sql("DELETE FROM local.demo.users WHERE id = 3") + +# View table history (time travel) +spark.sql("SELECT * FROM local.demo.users.history").show() +``` + +**To run this example:** + +1. First, copy it to the Spark jobs directory: + + **🐧 Linux/macOS:** + ```bash + cp examples/iceberg_crud.py spark/jobs/ + ``` + + **💻 Windows (PowerShell):** + ```powershell + Copy-Item examples\iceberg_crud.py spark\jobs\ + ``` + + **💻 Windows (Command Prompt):** + ```cmd + copy examples\iceberg_crud.py spark\jobs\ + ``` + +2. Then submit to the Spark cluster: + + **Both Windows and Linux/macOS:** + ```bash + kubectl exec -n ldp deployment/spark-master -- spark-submit --master spark://spark-master:7077 /opt/spark/jobs/iceberg_crud.py + ``` + +### Example 4: Airflow DAG (Workflow Orchestration) + +Airflow orchestrates your data pipelines, scheduling and monitoring tasks. 
+ +**Reference**: `examples/simple_dag.py` + +```python +""" +Simple Airflow DAG example. +""" +from datetime import datetime, timedelta +from airflow import DAG +from airflow.operators.bash import BashOperator +from airflow.operators.python import PythonOperator + + +def hello_world(): + """Simple Python function.""" + print("Hello from Local Data Platform!") + return "Success" + + +with DAG( + 'simple_example', + default_args={ + 'owner': 'ldp', + 'start_date': datetime(2024, 1, 1), + 'retries': 1, + 'retry_delay': timedelta(minutes=5), + }, + description='A simple example DAG', + schedule=timedelta(days=1), + catchup=False, + tags=['example'], +) as dag: + + task1 = BashOperator( + task_id='print_date', + bash_command='date', + ) + + task2 = PythonOperator( + task_id='hello_world', + python_callable=hello_world, + ) + + task3 = BashOperator( + task_id='finish', + bash_command='echo "Pipeline completed!"', + ) + + task1 >> task2 >> task3 +``` + +**To run this example:** + +1. **First, copy** the DAG file to the Airflow DAGs folder: + + **🐧 Linux/macOS:** + ```bash + cp examples/simple_dag.py airflow/dags/ + ``` + + **💻 Windows (PowerShell):** + ```powershell + Copy-Item examples\simple_dag.py airflow\dags\ + ``` + + **💻 Windows (Command Prompt):** + ```cmd + copy examples\simple_dag.py airflow\dags\ + ``` + +2. Wait for Airflow to detect it (1-2 minutes) + +3. Trigger the DAG from the Airflow UI or CLI: + + **Both Windows and Linux/macOS:** + ```bash + kubectl exec -n ldp deployment/airflow-webserver -- airflow dags trigger simple_example + ``` + +**Important**: DAGs must be in `airflow/dags/` to be discovered by Airflow. Never reference `examples/` directly in your DAG paths. 
+
+## Production-Ready Examples
+
+The `examples/dags/` directory contains more advanced, production-ready examples:
+
+### Daily Data Ingestion
+
+**Reference**: `examples/dags/data_ingestion/ingest_daily.py`
+
+This DAG demonstrates:
+- Ingesting data daily at 1 AM
+- Using Airflow's S3Hook to interact with MinIO
+- Proper error handling with retries
+- Date-based partitioning
+
+**Key concepts:**
+```python
+def upload_to_minio(**context):
+    """Upload sample data to MinIO."""
+    s3_hook = S3Hook(aws_conn_id='minio_default')
+
+    # Use logical_date (Airflow 3.0 best practice)
+    logical_date = context['logical_date'].strftime('%Y-%m-%d')
+    s3_hook.load_string(
+        string_data=f"Sample data for {logical_date}",
+        key=f"raw/daily/{logical_date}/data.txt",
+        bucket_name='datalake',
+        replace=True
+    )
+```
+
+**To use this example:**
+
+1. **Copy to your DAGs directory:**
+
+   **🐧 Linux/macOS:**
+   ```bash
+   cp examples/dags/data_ingestion/ingest_daily.py airflow/dags/
+   ```
+
+   **💻 Windows (PowerShell):**
+   ```powershell
+   Copy-Item examples\dags\data_ingestion\ingest_daily.py airflow\dags\
+   ```
+
+   **💻 Windows (Command Prompt):**
+   ```cmd
+   copy examples\dags\data_ingestion\ingest_daily.py airflow\dags\
+   ```
+
+2. Wait for Airflow to detect it, then trigger from the UI
+
+### Data Transformation Pipeline
+
+**Reference**: `examples/dags/data_transformation/transform_pipeline.py`
+
+This DAG demonstrates:
+- Orchestrating Spark jobs with Airflow
+- Using SparkSubmitOperator
+- Task dependencies
+- Passing parameters to Spark jobs
+
+**Key concepts:**
+```python
+transform_raw_data = SparkSubmitOperator(
+    task_id='transform_raw_data',
+    application='/opt/spark/jobs/batch_processing.py',
+    conn_id='spark_default',
+    application_args=['--date', '{{ ds }}'],
+    conf={
+        # Match the runtime artifact to your Spark/Scala versions (Spark 4.0 / Scala 2.13 here)
+        'spark.jars.packages': 'org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0',
+    },
+)
+```
+
+**To use this example:**
+
+1.
**Copy to your DAGs directory:** + + **🐧 Linux/macOS:** + ```bash + cp examples/dags/data_transformation/transform_pipeline.py airflow/dags/ + ``` + + **💻 Windows (PowerShell):** + ```powershell + Copy-Item examples\dags\data_transformation\transform_pipeline.py airflow\dags\ + ``` + + **💻 Windows (Command Prompt):** + ```cmd + copy examples\dags\data_transformation\transform_pipeline.py airflow\dags\ + ``` + +2. Wait for Airflow to detect it, then trigger from the UI + +## Building Your Own Pipeline + +Now that you've seen the examples, here's how to build your own pipeline: + +### Step 1: Design Your Data Flow + +1. **Source**: Where is your data coming from? (files, APIs, databases) +2. **Storage**: Where will raw data be stored? (MinIO buckets) +3. **Processing**: What transformations are needed? (Spark jobs) +4. **Destination**: Where will processed data go? (Iceberg tables) +5. **Schedule**: How often should this run? (Airflow schedule) + +### Step 2: Create Your Spark Job + +1. Copy an example as a starting point: + + **🐧 Linux/macOS:** + ```bash + cp examples/spark_job.py spark/jobs/my_job.py + ``` + + **💻 Windows (PowerShell):** + ```powershell + Copy-Item examples\spark_job.py spark\jobs\my_job.py + ``` + +2. Edit `spark/jobs/my_job.py` with your transformations + +3. Test it: + + **Both Windows and Linux/macOS:** + ```bash + kubectl exec -n ldp deployment/spark-master -- spark-submit --master spark://spark-master:7077 /opt/spark/jobs/my_job.py + ``` + +### Step 3: Create Your Airflow DAG + +1. Copy an example as a starting point: + + **🐧 Linux/macOS:** + ```bash + cp examples/simple_dag.py airflow/dags/my_pipeline.py + ``` + + **💻 Windows (PowerShell):** + ```powershell + Copy-Item examples\simple_dag.py airflow\dags\my_pipeline.py + ``` + +2. Edit `airflow/dags/my_pipeline.py` with your logic + +3. Airflow will automatically detect it + +4. 
Test and monitor in the Airflow UI + +### Step 4: Monitor and Debug + +- **Airflow UI**: Monitor task execution, view logs +- **Spark UI**: Monitor Spark job performance +- **MinIO Console**: Verify data files + +## Common Use Cases + +### Use Case 1: Batch Data Processing + +1. Ingest data daily from external sources → MinIO +2. Process with Spark → transform and aggregate +3. Write to Iceberg tables → queryable data lake +4. Orchestrate with Airflow → schedule and monitor + +**Example Flow:** +``` +External API → Airflow (ingest) → MinIO (raw/) +→ Airflow (trigger Spark) → Spark Processing +→ Iceberg Table (processed/) → Analytics +``` + +### Use Case 2: Data Quality Pipeline + +1. Read data from Iceberg tables +2. Run data quality checks with Spark +3. Write results back to quality tables +4. Alert on failures + +See `examples/tests/spark/test_data_quality.py` for testing patterns. + +### Use Case 3: Incremental Processing + +1. Track last processed timestamp +2. Read only new data +3. Process incrementally +4. Update Iceberg tables (ACID safe) + +## Best Practices + +### 1. Never Run Code Directly from examples/ +- **Always copy** example code to the proper location before running +- Spark jobs: Copy to `spark/jobs/` +- Airflow DAGs: Copy to `airflow/dags/` +- Scripts: Copy to `scripts/` +- This keeps examples clean as reference and prevents accidental modifications + +### 2. Use Iceberg for All Tables +- ACID transactions prevent data corruption +- Schema evolution allows changes without breaking queries +- Time travel enables auditing and rollback + +### 3. Organize Your Data in MinIO +``` +datalake/ +├── raw/ # Unprocessed data +├── staging/ # Intermediate processing +├── processed/ # Final clean data +└── archive/ # Historical data +``` + +### 4. Make Your DAGs Idempotent +- Tasks should produce the same result when re-run +- Use date parameters for partitioning +- Clean up before writing (or use upserts) + +### 5. 
Use the Tested Examples +- All code in `examples/` is tested and working +- Copy and modify rather than starting from scratch +- Run tests: `make test` + +## Troubleshooting + +### Services won't start + +**💻 Windows (PowerShell):** +```powershell +# Stop and clean up +.\scripts\windows\stop.ps1 + +# Restart Minikube +minikube stop +minikube delete +minikube start --cpus=4 --memory=8192 + +# Deploy again +.\scripts\windows\setup.ps1 +.\scripts\windows\start.ps1 +``` + +**🐧 Linux/macOS:** +```bash +./scripts/cleanup.sh +./scripts/setup.sh +./scripts/start.sh +``` + +### Can't connect to services + +**Check if services are running:** + +**💻 Windows:** +```powershell +kubectl get pods -n ldp +``` + +**🐧 Linux/macOS:** +```bash +kubectl get pods -n ldp +``` + +**Get service URLs:** +```bash +minikube ip +# Access services at http://<minikube-ip>:<port> +# Ports: Airflow=30080, MinIO=30901, Spark=30707, Jupyter=30888 +``` + +### Airflow DAG not showing up + +**💻 Windows:** +```powershell +# Check syntax +python airflow/dags/your_dag.py + +# View logs +kubectl logs -n ldp deployment/airflow-webserver +``` + +**🐧 Linux/macOS:** +```bash +# Check syntax +python airflow/dags/your_dag.py + +# View logs +kubectl logs -n ldp deployment/airflow-webserver +``` + +### Spark job failing + +**💻 Windows:** +```powershell +kubectl logs -n ldp deployment/spark-master +``` + +**🐧 Linux/macOS:** +```bash +kubectl logs -n ldp deployment/spark-master +``` + +**All platforms:** +- Verify Iceberg configuration in `config/iceberg/catalog.properties` + +## Next Steps + +1. **Run all examples**: Work through each example in order +2. **Explore the tests**: See `examples/tests/` for integration test patterns +3. **Read the docs**: + - `docs/hive-vs-iceberg.md` - Understand why we use Iceberg + - `docs/iceberg-hadoop-catalog.md` - Learn about catalog configuration +4. **Build your pipeline**: Start with a simple use case +5. 
**Join the community**: Contribute examples and improvements + +## Additional Resources + +- **Scripts**: + - Linux/macOS: Use bash scripts in `scripts/` + - Windows: Use PowerShell scripts in `scripts/windows/` + - Advanced users: See `Makefile` for convenience wrappers +- **Configuration Files**: See `config/` directory for all service configurations +- **Testing**: See `examples/tests/` for test examples +- **Production Guide**: See `docs/production-guide.md` for deployment guidance + +## Platform-Specific Notes + +### Windows Users + +**Available PowerShell Scripts:** +- `.\scripts\windows\setup.ps1` - Initial setup (first time only) +- `.\scripts\windows\start.ps1` - Deploy/start the platform +- `.\scripts\windows\stop.ps1` - Stop the platform +- `.\scripts\windows\check-health.ps1` - Health checks + +**File Paths:** +- Windows uses backslashes `\` for paths (e.g., `airflow\dags\`) +- PowerShell supports both `/` and `\` +- Always use forward slashes `/` inside Kubernetes pods + +**Common Commands:** +| Task | Linux/macOS | Windows (PowerShell) | +|------|-------------|----------------------| +| Copy file | `cp source dest` | `Copy-Item source dest` | +| List files | `ls` | `dir` or `Get-ChildItem` | +| Setup platform | `./scripts/setup.sh` | `.\scripts\windows\setup.ps1` | +| Start platform | `./scripts/start.sh` | `.\scripts\windows\start.ps1` | +| Stop platform | `./scripts/stop.sh` | `.\scripts\windows\stop.ps1` | +| Health check | `./scripts/check-health.sh` | `.\scripts\windows\check-health.ps1` | +| View logs | `kubectl logs -n ldp deployment/service-name` | `kubectl logs -n ldp deployment/service-name` | +| List pods | `kubectl get pods -n ldp` | `kubectl get pods -n ldp` | + +**Prerequisites:** +- Install Minikube, kubectl, Terraform, Helm (see Prerequisites section) +- Hyper-V or Docker Desktop for Minikube driver +- Run PowerShell as Administrator + +### Linux/macOS Users + +**Available Bash Scripts:** +- `./scripts/setup.sh` - Initial setup (first 
time only) +- `./scripts/start.sh` - Deploy/start the platform +- `./scripts/stop.sh` - Stop the platform +- `./scripts/check-health.sh` - Health checks +- `./scripts/cleanup.sh` - Complete cleanup + +**Advanced:** +- For convenience, you can use `make` commands which wrap these scripts +- Run `make help` to see all available make targets +- No additional drivers needed for Minikube + +## Summary + +You've learned how to: +- ✅ Start and access LDP services +- ✅ Work with MinIO for object storage +- ✅ Process data with Spark +- ✅ Manage tables with Iceberg (ACID, time travel) +- ✅ Orchestrate workflows with Airflow +- ✅ Use tested example code for your own pipelines + +All the code in this tutorial is tested and ready to use. Start by running the examples, then modify them for your specific use cases. + +Happy data engineering! 🚀 diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..9f35e90 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,417 @@ +# Examples Directory + +This directory contains **reference examples** and **template code** for working with LDP. + +## Important: This Directory is Optional + +**The `examples/` directory is for learning and reference only. 
You can:** +- ✅ **Copy examples** to the appropriate folders and modify them for your needs +- ✅ **Delete this entire directory** if you want to start fresh without examples +- ✅ **Keep it** as a reference library for tested code patterns + +**The LDP platform does not require this directory to function.** + +## Structure + +``` +examples/ +├── dags/ # Example Airflow DAGs +│ ├── data_ingestion/ +│ │ └── ingest_daily.py # Daily data ingestion to MinIO +│ └── data_transformation/ +│ └── transform_pipeline.py # Spark transformation pipeline +├── spark-jobs/ # Example Spark jobs +├── tests/ # Integration and E2E tests +│ ├── airflow/ # Airflow DAG tests +│ ├── spark/ # Spark job tests +│ ├── integration/ # Integration tests +│ └── e2e/ # End-to-end pipeline tests +├── iceberg_crud.py # Iceberg CRUD operations +├── minio_operations.py # MinIO/S3 operations +├── spark_job.py # Simple Spark job +├── simple_dag.py # Simple Airflow DAG +└── README.md # This file +``` + +## How to Use These Examples + +**Important**: Never run code directly from `examples/`. Always copy to the appropriate location first: + +| Example Type | Copy To | Purpose | +|-------------|---------|---------| +| Spark jobs | `spark/jobs/` | Data processing jobs | +| Airflow DAGs | `airflow/dags/` | Workflow orchestration | +| Scripts | `scripts/` | Utility scripts | + +## Quick Start + +Follow these steps to try the examples: + +### 1. MinIO Operations (Storage) + +**Copy and run:** +```bash +# Copy to scripts directory +cp examples/minio_operations.py scripts/ + +# Run it +python scripts/minio_operations.py +``` + +**What it does:** +- Connects to MinIO (S3-compatible storage) +- Lists buckets +- Uploads/downloads files +- Demonstrates basic object storage operations + +### 2. 
Simple Spark Job (Processing) + +**Copy and run:** +```bash +# Copy to Spark jobs directory +cp examples/spark_job.py spark/jobs/ + +# Submit to Spark +make spark-submit APP=spark/jobs/spark_job.py +``` + +**What it does:** +- Creates a Spark DataFrame +- Performs aggregations +- Shows basic Spark operations + +### 3. Iceberg CRUD (Table Management) + +**Copy and run:** +```bash +# Copy to Spark jobs directory +cp examples/iceberg_crud.py spark/jobs/ + +# Submit to Spark +make spark-submit APP=spark/jobs/iceberg_crud.py +``` + +**What it does:** +- Creates Iceberg tables with ACID support +- Inserts, updates, and deletes data +- Demonstrates time travel (view table history) + +### 4. Simple Airflow DAG (Orchestration) + +```bash +# Copy DAG to Airflow +cp examples/simple_dag.py airflow/dags/ + +# Wait for Airflow to detect it (1-2 minutes) + +# Trigger from UI or CLI +make airflow-trigger DAG=simple_example +``` + +**What it does:** +- Defines a simple 3-task workflow +- Shows task dependencies +- Demonstrates BashOperator and PythonOperator + +## Production-Ready Examples + +### Daily Data Ingestion + +**File**: `dags/data_ingestion/ingest_daily.py` + +**Features:** +- Scheduled to run daily at 1 AM +- Uses Airflow's S3Hook for MinIO +- Proper error handling with retries +- Date-based partitioning + +**Use case:** Ingesting daily data from external sources into your data lake + +**To use:** +```bash +# Copy to Airflow DAGs directory +cp examples/dags/data_ingestion/ingest_daily.py airflow/dags/ + +# Wait for Airflow to detect it +``` + +### Data Transformation Pipeline + +**File**: `dags/data_transformation/transform_pipeline.py` + +**Features:** +- Orchestrates Spark jobs with SparkSubmitOperator +- Task dependencies (start → transform → end) +- Passes date parameters to Spark jobs +- Includes Iceberg dependencies + +**Use case:** Daily batch processing of raw data + +**To use:** +```bash +# Copy to Airflow DAGs directory +cp 
examples/dags/data_transformation/transform_pipeline.py airflow/dags/ + +# Wait for Airflow to detect it +``` + +## Testing Examples + +The `tests/` directory contains comprehensive testing patterns: + +### Unit Tests + +**Airflow DAG tests** (`tests/airflow/test_dags.py`): +- Validate DAG structure +- Check for cycles +- Verify task configurations + +**Spark tests** (`tests/spark/`): +- Data transformation tests +- Data quality validation +- Schema validation + +### Integration Tests + +**Location**: `tests/integration/` + +- **Iceberg tables** (`test_iceberg_tables.py`): Test Iceberg table operations +- **MinIO access** (`test_minio_access.py`): Test S3/MinIO connectivity +- **Airflow + Spark** (`test_airflow_spark.py`): Test full pipeline integration + +### End-to-End Tests + +**Location**: `tests/e2e/test_pipeline.py` + +Tests complete data pipeline from ingestion to transformation. + +**Run tests:** +```bash +# All tests +make test + +# Specific test file +pytest examples/tests/integration/test_iceberg_tables.py +``` + +## Working with Examples + +### Approach 1: Copy and Modify (Recommended) + +Best for learning and building new features: + +```bash +# 1. Copy the example +cp examples/simple_dag.py airflow/dags/my_first_dag.py + +# 2. Edit the copied file +vim airflow/dags/my_first_dag.py + +# 3. Run/deploy it +``` + +### Approach 2: Use as Reference Only + +For building custom pipelines: + +1. Read the example code in `examples/` +2. Understand the pattern +3. Write your own from scratch in the appropriate directory +4. 
Reference examples when stuck + +### Approach 3: Start Fresh Without Examples + +If you prefer to start with a clean slate: + +```bash +# Remove the entire examples directory +rm -rf examples/ + +# The LDP platform will continue to work normally +# Build your own code in: +# - spark/jobs/ +# - airflow/dags/ +# - scripts/ +``` + +## Example Code Patterns + +### Pattern 1: Spark Session Creation + +All Spark examples show how to create a properly configured Spark session: + +```python +spark = SparkSession.builder \ + .appName("MyApp") \ + .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.0") \ + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.local.type", "hadoop") \ + .config("spark.sql.catalog.local.warehouse", "s3a://warehouse/") \ + .getOrCreate() +``` + +### Pattern 2: MinIO Connection + +All MinIO examples use boto3 with proper configuration: + +```python +s3 = boto3.client( + 's3', + endpoint_url='http://localhost:30900', + aws_access_key_id='admin', + aws_secret_access_key='minioadmin', + config=Config(signature_version='s3v4'), + region_name='us-east-1' +) +``` + +### Pattern 3: Airflow Best Practices + +All DAG examples follow Airflow 3.0+ best practices: + +```python +with DAG( + 'dag_name', + default_args={ + 'owner': 'ldp', + 'start_date': datetime(2024, 1, 1), + 'retries': 1, + 'retry_delay': timedelta(minutes=5), + }, + schedule='@daily', + catchup=False, # Don't backfill + tags=['category'], # For organization +) as dag: + # Tasks here +``` + +### Pattern 4: Error Handling + +Examples include proper error handling: + +```python +try: + # Your operation + result = spark.table("my_table") +except Exception as e: + logging.error(f"Failed to read table: {e}") + raise +finally: + spark.stop() +``` + +## Customizing Examples + +### Change MinIO Endpoint + 
+Update endpoint URLs based on your deployment: + +- **Local**: `http://localhost:30900` +- **Docker network**: `http://minio:9000` +- **Kubernetes**: `http://minio.default.svc.cluster.local:9000` + +### Change Catalog Configuration + +Modify Iceberg catalog settings in `config/iceberg/catalog.properties` + +### Change Schedules + +Adjust DAG schedules: + +```python +schedule='@daily' # Once per day +schedule='@hourly' # Once per hour +schedule='0 */6 * * *' # Every 6 hours +schedule='0 2 * * *' # Daily at 2 AM +``` + +## Common Modifications + +### Add Authentication + +For production, add proper authentication: + +```python +# MinIO with temporary credentials +s3 = boto3.client( + 's3', + endpoint_url=os.getenv('MINIO_ENDPOINT'), + aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'), + aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'), +) +``` + +### Add Data Validation + +Extend examples with data quality checks: + +```python +# Example validation +if df.count() == 0: + raise ValueError("DataFrame is empty!") + +if df.filter(col("id").isNull()).count() > 0: + raise ValueError("Found null IDs!") +``` + +### Add Monitoring + +Add logging and metrics: + +```python +import logging + +logging.info(f"Processing {df.count()} records") +logging.info(f"Write completed to {table_name}") +``` + +## Best Practices + +1. **Never run from examples/ directly** - Always copy to the appropriate folder first +2. **Use examples as templates** - Copy, then modify for your needs +3. **Keep examples/ as reference** - Or delete it if you prefer a clean start +4. **Test locally first** - Use `make spark-submit` or `python` to test +5. **Keep credentials separate** - Use environment variables +6. **Add error handling** - Don't let pipelines fail silently +7. **Make tasks idempotent** - Safe to re-run +8. **Use proper logging** - For debugging and monitoring + +## Next Steps + +1. **Run all examples** - Get familiar with each component +2. 
**Read the tutorial** - See `docs/getting-started-tutorial.md` +3. **Check the tests** - Learn testing patterns +4. **Build your pipeline** - Use examples as foundation + +## Troubleshooting + +### Import errors + +Ensure packages are installed: +```bash +# For Spark jobs +cat docker/spark/requirements.txt + +# For Airflow DAGs +cat docker/airflow/requirements.txt +``` + +### Connection errors + +- Verify services are running: `make status` +- Check endpoints match your deployment +- Verify credentials + +### Spark job failures + +- Check logs: `make spark-logs` +- Verify Iceberg configuration +- Test locally first + +## Learn More + +- **Getting Started Tutorial**: `docs/getting-started-tutorial.md` +- **Iceberg vs Hive**: `docs/hive-vs-iceberg.md` +- **Production Guide**: `docs/production-guide.md` +- **Project Structure**: `docs/project-structure.md` diff --git a/scripts/README.md b/scripts/README.md new file mode 100644 index 0000000..a6c720a --- /dev/null +++ b/scripts/README.md @@ -0,0 +1,408 @@ +# Scripts Directory + +This directory contains utility scripts for managing and operating the LDP platform. 
+ +## Structure + +``` +scripts/ +├── init/ # Initialization scripts +├── setup/ # Setup and configuration scripts +├── validation/ # Validation and health check scripts +└── README.md # This file +``` + +## Purpose + +Scripts in this directory are used for: + +- **Platform initialization** - Setting up services on first run +- **Health checks** - Verifying services are running correctly +- **Data seeding** - Loading initial or test data +- **Maintenance** - Cleanup, backups, and other operations +- **Validation** - Checking configurations and dependencies + +## Using Scripts + +### Running Scripts + +Most scripts are designed to be run from the project root: + +```bash +# Example +./scripts/init/setup_minio.sh +``` + +Or via Make targets (recommended): + +```bash +# Make wraps scripts with proper error handling +make init +make validate +``` + +## Common Script Categories + +### Initialization Scripts + +**Purpose**: Set up services and create required resources on first deployment + +**Examples:** +- Create MinIO buckets +- Initialize Iceberg warehouse +- Set up Airflow connections +- Create database schemas + +**When to run**: First deployment or after `make clean` + +### Validation Scripts + +**Purpose**: Verify platform health and configuration + +**Examples:** +- Check service connectivity +- Validate configuration files +- Verify required buckets/tables exist +- Test authentication + +**When to run**: After deployment, before production use, in CI/CD + +### Maintenance Scripts + +**Purpose**: Ongoing platform maintenance + +**Examples:** +- Clean up old logs +- Compact Iceberg tables +- Backup metadata +- Rotate credentials + +**When to run**: Scheduled (cron) or as needed + +## Best Practices for Scripts + +### 1. Make Scripts Idempotent + +Scripts should be safe to run multiple times: + +```bash +# Good - check before creating +if ! 
bucket_exists "my-bucket"; then + create_bucket "my-bucket" +fi + +# Bad - fails on second run +create_bucket "my-bucket" +``` + +### 2. Add Error Handling + +Always check for errors: + +```bash +#!/bin/bash +set -e # Exit on error +set -u # Exit on undefined variable +set -o pipefail # Exit on pipe failure + +# Your script here +``` + +### 3. Use Logging + +Add informative logging: + +```bash +echo "[INFO] Starting initialization..." +echo "[SUCCESS] Bucket created: datalake" +echo "[ERROR] Failed to connect to MinIO" >&2 +``` + +### 4. Document Parameters + +If a script takes arguments, document them: + +```bash +#!/bin/bash +# Usage: ./script.sh <environment> <region> +# Example: ./script.sh prod us-east-1 + +if [ $# -ne 2 ]; then + echo "Usage: $0 <environment> <region>" + exit 1 +fi +``` + +### 5. Make Scripts Executable + +```bash +chmod +x scripts/your_script.sh +``` + +## Integration with Makefile + +Many scripts are wrapped in Makefile targets for ease of use: + +```makefile +# In Makefile +.PHONY: init +init: + @./scripts/init/setup_platform.sh +``` + +**Benefits:** +- Consistent interface (`make init`) +- Error handling +- Documentation (`make help`) +- Dependencies between targets + +## Creating New Scripts + +When adding a new script: + +1. **Choose the right directory**: + - Initialization? → `scripts/init/` + - Validation? → `scripts/validation/` + - Other? → Appropriate subdirectory + +2. **Follow naming conventions**: + - Use descriptive names: `setup_minio_buckets.sh` + - Use underscores, not hyphens: `check_services.sh` + +3. **Add a header**: + ```bash + #!/bin/bash + # Description: What this script does + # Usage: How to run it + # Author: Your name + ``` + +4. **Make it executable**: + ```bash + chmod +x scripts/your_script.sh + ``` + +5. **Test thoroughly**: + - Test on clean environment + - Test idempotency (run twice) + - Test error cases + +6. **Document in README**: + - Add to this file + - Update main README if needed + - Add to Makefile help if appropriate + +7. 
**Add to version control**: + ```bash + git add scripts/your_script.sh + ``` + +## Script Templates + +### Basic Bash Script + +```bash +#!/bin/bash +set -euo pipefail + +# Description of what this script does +# Usage: ./script_name.sh [args] + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +# Helper functions +log_info() { + echo -e "${GREEN}[INFO]${NC} $1" +} + +log_error() { + echo -e "${RED}[ERROR]${NC} $1" >&2 +} + +log_warn() { + echo -e "${YELLOW}[WARN]${NC} $1" +} + +# Main script logic +main() { + log_info "Starting script..." + + # Your logic here + + log_info "Script completed successfully" +} + +# Run main function +main "$@" +``` + +### Python Script + +```python +#!/usr/bin/env python3 +""" +Description: What this script does +Usage: python script_name.py [args] +""" +import sys +import logging + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +def main(): + """Main script logic.""" + logger.info("Starting script...") + + try: + # Your logic here + pass + except Exception as e: + logger.error(f"Script failed: {e}") + sys.exit(1) + + logger.info("Script completed successfully") + + +if __name__ == "__main__": + main() +``` + +## Common Operations + +### Check Service Health + +```bash +#!/bin/bash +# Check if all services are healthy + +services=("airflow" "spark-master" "minio" "postgres") + +for service in "${services[@]}"; do + if docker ps | grep -q "$service"; then + echo "✓ $service is running" + else + echo "✗ $service is NOT running" + exit 1 + fi +done +``` + +### Create MinIO Bucket + +```bash +#!/bin/bash +# Create MinIO bucket if it doesn't exist + +BUCKET_NAME="${1:-datalake}" + +# Using mc (MinIO Client) +mc alias set minio http://localhost:9000 admin minioadmin + +if mc ls minio/$BUCKET_NAME >/dev/null 2>&1; then + echo "Bucket $BUCKET_NAME already exists" +else + 
mc mb minio/$BUCKET_NAME + echo "Created bucket $BUCKET_NAME" +fi +``` + +### Validate Configuration + +```bash +#!/bin/bash +# Validate Iceberg configuration + +CONFIG_FILE="config/iceberg/catalog.properties" + +if [ ! -f "$CONFIG_FILE" ]; then + echo "ERROR: Configuration file not found: $CONFIG_FILE" + exit 1 +fi + +# Check required properties +required_props=("catalog-impl" "warehouse" "s3.endpoint") + +for prop in "${required_props[@]}"; do + if ! grep -q "^$prop=" "$CONFIG_FILE"; then + echo "ERROR: Missing required property: $prop" + exit 1 + fi +done + +echo "Configuration validated successfully" +``` + +## Security Considerations + +### Handling Secrets + +**Never hardcode secrets in scripts:** + +```bash +# Bad +PASSWORD="secret123" + +# Good - use environment variables +PASSWORD="${MINIO_PASSWORD:-}" + +# Good - read from secrets file +PASSWORD=$(cat /run/secrets/minio_password) + +# Good - prompt user +read -s -p "Enter password: " PASSWORD +``` + +### File Permissions + +Protect sensitive scripts: + +```bash +# Make script readable/executable only by owner +chmod 700 scripts/sensitive_script.sh + +# Or for group access +chmod 750 scripts/sensitive_script.sh +``` + +## Troubleshooting + +### Permission Denied + +```bash +# Make script executable +chmod +x scripts/your_script.sh +``` + +### Command Not Found + +```bash +# Check if command is installed +which python3 +which docker + +# Install if missing +apt-get install python3 +``` + +### Script Fails in CI/CD + +- Test locally first +- Check for environment-specific assumptions +- Use relative paths, not absolute +- Verify all dependencies are available + +## Learn More + +- Bash scripting guide: https://www.gnu.org/software/bash/manual/ +- Shell script best practices: https://google.github.io/styleguide/shellguide.html +- Makefile documentation: See `Makefile` in project root diff --git a/spark/README.md b/spark/README.md new file mode 100644 index 0000000..4800ab2 --- /dev/null +++ b/spark/README.md 
@@ -0,0 +1,531 @@ +# Spark Directory + +This directory contains Apache Spark job files and configurations for distributed data processing. + +## Structure + +``` +spark/ +├── jobs/ # Your Spark job scripts go here +└── README.md # This file +``` + +## What is Spark? + +Apache Spark is a unified analytics engine for large-scale data processing. It provides: + +- **Fast processing**: In-memory computing +- **Distributed**: Processes data across multiple nodes +- **Rich APIs**: Python (PySpark), Scala, Java, SQL, R +- **Libraries**: SQL, Streaming, ML, Graph processing + +## Adding Spark Jobs + +### Option 1: Copy from Examples + +Use tested job examples: + +```bash +# Simple Spark job +cp examples/spark_job.py spark/jobs/ + +# Iceberg operations +cp examples/iceberg_crud.py spark/jobs/ +``` + +### Option 2: Create Your Own + +Create a new job file in `spark/jobs/`: + +```python +""" +My Spark Job +Description: What this job does +""" +from pyspark.sql import SparkSession +from pyspark.sql.functions import col, count, avg + + +def create_spark_session(): + """Create Spark session with Iceberg support.""" + return SparkSession.builder \ + .appName("MySparkJob") \ + .config("spark.jars.packages", + "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.0") \ + .config("spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.local", + "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.local.type", "hadoop") \ + .config("spark.sql.catalog.local.warehouse", "s3a://warehouse/") \ + .getOrCreate() + + +def main(): + """Main job logic.""" + spark = create_spark_session() + + try: + # Your data processing logic here + df = spark.read.parquet("s3a://datalake/raw/data.parquet") + + # Transform + result = df.groupBy("category") \ + .agg(count("*").alias("count"), + avg("amount").alias("avg_amount")) + + # Write to Iceberg table + result.writeTo("local.analytics.summary") \ + 
.using("iceberg") \ + .createOrReplace() + + print(f"Processed {df.count()} records") + + finally: + spark.stop() + + +if __name__ == "__main__": + main() +``` + +## Running Spark Jobs + +### Local Development + +Submit jobs to your local Spark cluster: + +```bash +# Using Makefile (recommended) +make spark-submit APP=spark/jobs/my_job.py + +# Or directly with docker-compose +docker-compose exec spark-master spark-submit \ + --master spark://spark-master:7077 \ + /opt/spark/jobs/my_job.py +``` + +### With Arguments + +Pass arguments to your Spark job: + +```bash +make spark-submit APP=spark/jobs/my_job.py ARGS="--date 2024-12-20 --env prod" +``` + +In your Python script: + +```python +import argparse + +if __name__ == "__main__": + # Parse the --date/--env flags passed via ARGS + parser = argparse.ArgumentParser() + parser.add_argument("--date", default="2024-01-01") + parser.add_argument("--env", default="dev") + args = parser.parse_args() + + main(args.date, args.env) +``` + +### From Airflow + +Use SparkSubmitOperator in your DAG: + +```python +from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator + +submit_job = SparkSubmitOperator( + task_id='run_spark_job', + application='/opt/spark/jobs/my_job.py', + conn_id='spark_default', + application_args=['--date', '{{ ds }}'], + conf={ + 'spark.jars.packages': 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.0', + }, +) +``` + +## Spark Configuration + +### Basic Configuration + +All Spark jobs should include Iceberg support: + +```python +spark = SparkSession.builder \ + .appName("MyJob") \ + .config("spark.jars.packages", + "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.0") \ + .config("spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.local", + "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.local.type", "hadoop") \ + .config("spark.sql.catalog.local.warehouse", "s3a://warehouse/") \ + .getOrCreate() +``` + +### Memory Configuration + +Adjust memory based on your 
data size: + +```python +spark = SparkSession.builder \ + .config("spark.executor.memory", "4g") \ + .config("spark.driver.memory", "2g") \ + .getOrCreate() +``` + +### S3/MinIO Configuration + +For reading/writing to MinIO: + +```python +spark = SparkSession.builder \ + .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \ + .config("spark.hadoop.fs.s3a.access.key", "admin") \ + .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \ + .config("spark.hadoop.fs.s3a.path.style.access", "true") \ + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ + .getOrCreate() +``` + +## Common Spark Operations + +### Reading Data + +```python +# Parquet from S3/MinIO +df = spark.read.parquet("s3a://datalake/raw/data.parquet") + +# CSV with schema inference +df = spark.read.csv("s3a://datalake/raw/data.csv", header=True, inferSchema=True) + +# Iceberg table +df = spark.table("local.db.table_name") + +# JSON +df = spark.read.json("s3a://datalake/raw/data.json") +``` + +### Writing Data + +```python +# Write to Parquet +df.write.mode("overwrite").parquet("s3a://datalake/processed/output.parquet") + +# Write to Iceberg table (append) +df.writeTo("local.db.table_name").append() + +# Write to Iceberg table (replace) +df.writeTo("local.db.table_name").createOrReplace() + +# Partitioned write +df.write.partitionBy("year", "month") \ + .parquet("s3a://datalake/processed/data/") +``` + +### Transformations + +```python +from pyspark.sql.functions import col, when, lit, concat + +# Filter +df_filtered = df.filter(col("age") > 18) + +# Select and rename +df_selected = df.select( + col("id"), + col("name").alias("full_name"), + col("age") +) + +# Add columns +df_with_col = df.withColumn("status", + when(col("age") >= 18, lit("adult")) + .otherwise(lit("minor")) +) + +# Join +df_joined = df1.join(df2, df1.id == df2.user_id, "left") + +# Aggregate +df_agg = df.groupBy("department") \ + .agg( + count("*").alias("count"), + 
    avg("salary").alias("avg_salary")
)
```

### Working with Iceberg

```python
# Create table
spark.sql("""
    CREATE TABLE IF NOT EXISTS local.db.users (
        id BIGINT,
        name STRING,
        email STRING,
        created_at TIMESTAMP
    ) USING iceberg
    PARTITIONED BY (days(created_at))
""")

# Insert
df.writeTo("local.db.users").append()

# Update
spark.sql("""
    UPDATE local.db.users
    SET email = 'newemail@example.com'
    WHERE id = 123
""")

# Delete
spark.sql("DELETE FROM local.db.users WHERE id = 456")

# Time travel: read a specific snapshot by ID
df_historical = spark.read \
    .option("snapshot-id", "123456789") \
    .table("local.db.users")

# View table history
spark.sql("SELECT * FROM local.db.users.history").show()
```

## Job Patterns

### Pattern 1: ETL Job

```python
def etl_job(date: str):
    """Extract, Transform, Load pattern."""
    spark = create_spark_session()
    try:
        # Extract
        raw_df = spark.read.parquet(f"s3a://datalake/raw/{date}/")

        # Transform
        clean_df = raw_df \
            .filter(col("value").isNotNull()) \
            .withColumn("processed_date", lit(date))

        # Load
        clean_df.writeTo("local.warehouse.cleaned_data").append()
    finally:
        spark.stop()
```

### Pattern 2: Incremental Processing

```python
def incremental_process(last_processed_id: int):
    """Process only new records."""
    spark = create_spark_session()
    try:
        # Read only new records
        new_records = spark.table("local.raw.events") \
            .filter(col("id") > last_processed_id)

        if new_records.count() == 0:
            print("No new records to process")
            return

        # Process
        processed = new_records.transform(apply_business_logic)

        # Write
        processed.writeTo("local.processed.events").append()
    finally:
        spark.stop()
```

### Pattern 3: Data Quality Checks

```python
def run_quality_checks(df):
    """Run data quality validations."""
    errors = []

    # Check for nulls
    null_count = df.filter(col("id").isNull()).count()
    if null_count > 0:
        errors.append(f"Found {null_count} null IDs")

    # Check for duplicates
    dup_count = df.groupBy("id").count() \
        .filter(col("count") > 1).count()
    if dup_count > 0:
        errors.append(f"Found {dup_count} duplicate IDs")

    # Check value ranges
    invalid_ages = df.filter((col("age") < 0) | (col("age") > 150)).count()
    if invalid_ages > 0:
        errors.append(f"Found {invalid_ages} invalid ages")

    if errors:
        raise ValueError(f"Data quality checks failed: {errors}")

    return True
```

## Best Practices

### 1. Always Stop the Spark Session

Use try-finally to guarantee cleanup even when a job fails:

```python
def main():
    spark = create_spark_session()
    try:
        # Your processing logic
        pass
    finally:
        spark.stop()
```

### 2. Use Appropriate File Formats

- **Parquet**: Default choice for analytics (columnar, compressed)
- **Iceberg**: For tables needing ACID, updates, or time travel
- **CSV**: Only for human-readable data or external systems
- **JSON**: For nested/semi-structured data

### 3. Partition Large Datasets

```python
# Partition by date for time-series data
df.write \
    .partitionBy("year", "month", "day") \
    .parquet("s3a://datalake/data/")
```

### 4. Cache Reused DataFrames

```python
# Cache if a DataFrame is used multiple times
df = spark.read.parquet("s3a://datalake/data/")
df.cache()

# Use it multiple times
df.filter(condition1).count()
df.filter(condition2).show()

# Unpersist when done
df.unpersist()
```

### 5. Broadcast Small Tables

```python
from pyspark.sql.functions import broadcast

# For joins with small dimension tables
result = large_df.join(broadcast(small_df), "key")
```

### 6. Handle Schema Evolution

With Iceberg, schemas can evolve safely:

```python
# Add a column
spark.sql("ALTER TABLE local.db.users ADD COLUMN phone STRING")

# Rename a column
spark.sql("ALTER TABLE local.db.users RENAME COLUMN email TO email_address")
```

## Monitoring and Debugging

### View the Spark UI

Access the Spark UI for monitoring:

- **Master UI**: http://localhost:8081
- **Worker UI**: http://localhost:8082
- **Application UI**: Available while a job is running

### Check Logs

```bash
# View Spark master logs
make spark-logs

# Or directly
docker logs ldp-spark-master

# Worker logs
docker logs ldp-spark-worker
```

### Enable Debug Logging

```python
# In your job
spark.sparkContext.setLogLevel("DEBUG")
```

## Common Issues

### Out of Memory

Increase executor and driver memory:

```python
spark = SparkSession.builder \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()
```

### Slow Shuffles

- Increase shuffle partitions
- Use broadcast joins for small tables
- Repartition data appropriately

```python
spark.conf.set("spark.sql.shuffle.partitions", "200")
```

### Connection to MinIO Failed

Verify the configuration:

- Endpoint: `http://minio:9000` (inside the Docker network)
- Credentials: `admin` / `minioadmin`
- Path-style access: `true`

## Testing Spark Jobs

See `examples/tests/spark/` for testing patterns:

```bash
# Run Spark tests
pytest examples/tests/spark/
```

## Dependencies

Python packages for the Spark workers are defined in:

```
docker/spark/requirements.txt
```

To add new dependencies:

1. Edit `docker/spark/requirements.txt`
2. Rebuild the Spark images:
   ```bash
   docker-compose build spark-master spark-worker
   docker-compose up -d
   ```

## Learn More

- **Spark Documentation**: https://spark.apache.org/docs/latest/
- **PySpark API**: https://spark.apache.org/docs/latest/api/python/
- **Iceberg Spark**: https://iceberg.apache.org/docs/latest/spark-getting-started/
- **Getting Started Tutorial**: `docs/getting-started-tutorial.md`
- **Examples**: `examples/` directory

diff --git a/terraform/README.md b/terraform/README.md
new file mode 100644
index 0000000..4648ba3
--- /dev/null
+++ b/terraform/README.md
@@ -0,0 +1,654 @@
# Terraform Directory

This directory contains Infrastructure as Code (IaC) for deploying LDP to cloud environments.

## Structure

```
terraform/
├── modules/          # Reusable Terraform modules
├── environments/     # Environment-specific configurations
│   ├── dev/
│   ├── staging/
│   └── prod/
└── README.md         # This file
```

## What is Terraform?

Terraform is an Infrastructure as Code tool that lets you define and provision infrastructure in a declarative configuration language.

## Purpose

Use Terraform to deploy LDP infrastructure to:

- **AWS** - EKS, S3, RDS, etc.
- **Azure** - AKS, Blob Storage, etc.
- **GCP** - GKE, Cloud Storage, etc.
- **On-premises** - Kubernetes clusters

## Quick Start

### Prerequisites

```bash
# Install Terraform
# macOS
brew install terraform

# Linux
wget https://releases.hashicorp.com/terraform/1.6.0/terraform_1.6.0_linux_amd64.zip
unzip terraform_1.6.0_linux_amd64.zip
sudo mv terraform /usr/local/bin/

# Verify
terraform version
```

### Initialize

```bash
cd terraform/environments/dev
terraform init
```

### Plan

Preview changes before applying:

```bash
terraform plan
```

### Apply

Deploy infrastructure:

```bash
terraform apply
```

### Destroy

Remove all infrastructure:

```bash
terraform destroy
```

## Directory Structure

### Modules

**Location**: `terraform/modules/`

Reusable infrastructure components:

```
modules/
├── kubernetes/    # Kubernetes cluster (EKS, AKS, GKE)
├── storage/       # Object storage (S3, Blob, GCS)
├── database/      # Managed databases
├── networking/    # VPC, subnets, security groups
└── monitoring/    # CloudWatch, Prometheus, Grafana
```

**Example module usage:**

```hcl
module "kubernetes" {
  source = "../../modules/kubernetes"

  cluster_name    = "ldp-prod"
  cluster_version = "1.28"
  node_count      = 3
  instance_type   = "m5.xlarge"
}
```

### Environments

**Location**: `terraform/environments/`

Environment-specific configurations:

- **dev**: Development environment (small, cost-optimized)
- **staging**: Pre-production environment (production-like)
- **prod**: Production environment (high availability, scaled)

**Example directory structure:**

```
environments/dev/
├── main.tf           # Main configuration
├── variables.tf      # Input variables
├── outputs.tf        # Output values
├── terraform.tfvars  # Variable values
└── backend.tf        # Remote state configuration
```

## Basic Terraform Configuration

### Main Configuration (main.tf)

```hcl
terraform {
  required_version = ">= 1.5.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    kubernetes = {
      source  = "hashicorp/kubernetes"
      version = "~> 2.23"
    }
  }
}

provider "aws" {
  region = var.aws_region
}

# Deploy Kubernetes cluster
module "kubernetes" {
  source = "../../modules/kubernetes"

  cluster_name = "ldp-${var.environment}"
  node_count   = var.node_count
}

# Deploy S3 buckets
module "storage" {
  source = "../../modules/storage"

  bucket_prefix = "ldp-${var.environment}"
  buckets = [
    "datalake",
    "warehouse",
    "archive"
  ]
}
```

### Variables (variables.tf)

```hcl
variable "environment" {
  description = "Environment name (dev, staging, prod)"
  type        = string
}

variable "aws_region" {
  description = "AWS region"
  type        = string
  default     = "us-east-1"
}

variable "node_count" {
  description = "Number of Kubernetes nodes"
  type        = number
  default     = 3
}
```

### Variable Values (terraform.tfvars)

```hcl
environment = "dev"
aws_region  = "us-east-1"
node_count  = 2
```

### Outputs (outputs.tf)

```hcl
output "cluster_endpoint" {
  description = "Kubernetes cluster endpoint"
  value       = module.kubernetes.endpoint
}

output "bucket_names" {
  description = "Created S3 bucket names"
  value       = module.storage.bucket_names
}
```

## Remote State Management

Use remote state for team collaboration:

**Backend configuration (backend.tf):**

```hcl
terraform {
  backend "s3" {
    bucket = "ldp-terraform-state"
    key    = "dev/terraform.tfstate"
    region = "us-east-1"

    dynamodb_table = "ldp-terraform-locks"
    encrypt        = true
  }
}
```

## Common Resources

### AWS Deployment

```hcl
# EKS Cluster
module "eks" {
  source = "terraform-aws-modules/eks/aws"

  cluster_name    = "ldp-cluster"
  cluster_version = "1.28"

  vpc_id     = module.vpc.vpc_id
  subnet_ids = module.vpc.private_subnets

  eks_managed_node_groups = {
    workers = {
      min_size     = 2
      max_size     = 10
      desired_size = 3

      instance_types = ["m5.large"]
    }
  }
}

# S3 Buckets
resource "aws_s3_bucket" "datalake" {
  bucket = "ldp-datalake-${var.environment}"

  tags = {
    Environment = var.environment
    Project     = "LDP"
  }
}

# RDS for PostgreSQL (Airflow metadata)
resource "aws_db_instance" "postgres" {
  identifier        = "ldp-postgres-${var.environment}"
  engine            = "postgres"
  engine_version    = "16"
  instance_class    = "db.t3.medium"
  allocated_storage = 20

  db_name  = "airflow"
  username = "airflow"
  password = var.db_password

  vpc_security_group_ids = [aws_security_group.database.id]
  db_subnet_group_name   = aws_db_subnet_group.main.name

  skip_final_snapshot = var.environment != "prod"
}
```

### Kubernetes Deployment

```hcl
# Deploy Airflow
resource "kubernetes_deployment" "airflow" {
  metadata {
    name      = "airflow-webserver"
    namespace = kubernetes_namespace.ldp.metadata[0].name
  }

  spec {
    replicas = 2

    selector {
      match_labels = {
        app = "airflow-webserver"
      }
    }

    template {
      metadata {
        labels = {
          app = "airflow-webserver"
        }
      }

      spec {
        container {
          name  = "webserver"
          image = "apache/airflow:3.1.5"

          env {
            # Airflow 2.3+ reads the DB connection from the [database] section
            name  = "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"
            value = var.database_connection_string
          }
        }
      }
    }
  }
}
```

## Best Practices

### 1. Use Modules

Organize reusable infrastructure into modules:

```
modules/
├── storage/
│   ├── main.tf
│   ├── variables.tf
│   └── outputs.tf
```

### 2. Separate Environments

Keep environments isolated:

```
environments/
├── dev/
├── staging/
└── prod/
```

### 3. Use Remote State

Store state remotely for collaboration:

```hcl
terraform {
  backend "s3" {
    bucket = "terraform-state"
    key    = "ldp/terraform.tfstate"
  }
}
```

### 4. Tag Resources

Tag all resources for organization and cost tracking:

```hcl
tags = {
  Environment = var.environment
  Project     = "LDP"
  ManagedBy   = "Terraform"
}
```

### 5. Use Variables

Make configurations flexible, for example with a per-environment lookup map:

```hcl
variable "instance_type" {
  type = map(string)
  default = {
    dev     = "t3.medium"
    staging = "m5.large"
    prod    = "m5.xlarge"
  }
}
```

### 6. Validate Before Apply

Always review changes:

```bash
terraform plan -out=tfplan
# Review the plan
terraform apply tfplan
```

### 7. Lock State

Use state locking to prevent concurrent modifications:

```hcl
terraform {
  backend "s3" {
    dynamodb_table = "terraform-locks"
  }
}
```

## Security Best Practices

### 1. Never Commit Secrets

Use AWS Secrets Manager, HashiCorp Vault, or similar:

```hcl
data "aws_secretsmanager_secret_version" "db_password" {
  secret_id = "ldp/database/password"
}
```

### 2. Use IAM Roles

Don't hardcode AWS credentials:

```hcl
# Use instance profiles or EKS pod identity
```

### 3. Encrypt State

Enable encryption for remote state:

```hcl
terraform {
  backend "s3" {
    encrypt    = true
    kms_key_id = "arn:aws:kms:..."
  }
}
```

### 4. Restrict Access

Use least-privilege IAM policies:

```hcl
resource "aws_iam_policy" "terraform" {
  name = "terraform-ldp-policy"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        # Narrow these wildcards to the specific actions and
        # resources your deployment actually needs
        Action = [
          "eks:*",
          "ec2:*",
          "s3:*"
        ]
        Resource = "*"
      }
    ]
  })
}
```

## Workflow

### Initial Setup

```bash
# 1. Configure AWS credentials
aws configure

# 2. Initialize Terraform
cd terraform/environments/dev
terraform init

# 3. Create workspace
terraform workspace new dev

# 4. Plan and apply
terraform plan
terraform apply
```

### Making Changes

```bash
# 1. Edit Terraform files
vim main.tf

# 2. Format code
terraform fmt -recursive

# 3. Validate syntax
terraform validate

# 4. Review changes
terraform plan

# 5. Apply changes
terraform apply
```

### Managing Multiple Environments

```bash
# Switch to staging
terraform workspace select staging

# Or pass a var file
terraform apply -var-file=staging.tfvars
```

## Common Commands

```bash
# Initialize
terraform init

# Format code
terraform fmt

# Validate configuration
terraform validate

# Plan changes
terraform plan

# Apply changes
terraform apply

# Destroy infrastructure
terraform destroy

# Show current state
terraform show

# List resources
terraform state list

# Import existing resource
terraform import aws_s3_bucket.example my-bucket
```

## Troubleshooting

### State Lock Issues

```bash
# Force unlock (use carefully!)
terraform force-unlock LOCK_ID
```

### Import Existing Resources

```bash
# Import a resource into state
terraform import module.storage.aws_s3_bucket.datalake my-bucket-name
```

### Debug

```bash
# Enable debug logging
export TF_LOG=DEBUG
terraform apply
```

## Integration with CI/CD

### GitHub Actions Example

```yaml
name: Terraform

on:
  push:
    branches: [main]
    paths:
      - 'terraform/**'

jobs:
  terraform:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v3

      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v2

      - name: Terraform Init
        run: terraform init
        working-directory: terraform/environments/prod

      - name: Terraform Plan
        run: terraform plan
        working-directory: terraform/environments/prod

      - name: Terraform Apply
        if: github.ref == 'refs/heads/main'
        run: terraform apply -auto-approve
        working-directory: terraform/environments/prod
```

## Cost Optimization

### Use Auto-scaling

```hcl
eks_managed_node_groups = {
  workers = {
    min_size     = 1
    max_size     = 10
    desired_size = 2
  }
}
```

### Use Spot Instances

```hcl
eks_managed_node_groups = {
  spot = {
    capacity_type  = "SPOT"
    instance_types = ["m5.large", "m5a.large"]
  }
}
```

### Tag for Cost Tracking

```hcl
tags = {
  Environment = var.environment
  Project     = "LDP"
  CostCenter  = "DataEngineering"
}
```

## Learn More

- **Terraform Documentation**: https://www.terraform.io/docs
- **AWS Provider**: https://registry.terraform.io/providers/hashicorp/aws/latest/docs
- **Kubernetes Provider**: https://registry.terraform.io/providers/hashicorp/kubernetes/latest/docs
- **Best Practices**: https://www.terraform-best-practices.com/
- **Production Guide**: `docs/production-guide.md`

diff --git a/tests/README.md b/tests/README.md
new file mode 100644
index 0000000..3d542f3
--- /dev/null
+++ b/tests/README.md
@@ -0,0 +1,656 @@
# Tests Directory

This directory contains test suites for validating LDP components and pipelines.

## Structure

```
tests/
├── unit/           # Unit tests for individual components
├── integration/    # Integration tests for service interactions
├── e2e/            # End-to-end pipeline tests
├── conftest.py     # Pytest configuration and fixtures
└── README.md       # This file
```

## Testing Framework

LDP uses **pytest** as the primary testing framework.

### Installation

```bash
pip install pytest pytest-cov
```

### Running Tests

```bash
# Run all tests
make test

# Or directly with pytest
pytest tests/

# Run a specific test file
pytest tests/integration/test_iceberg.py

# Run with coverage
pytest --cov=. tests/

# Run tests matching a pattern
pytest -k "test_iceberg"

# Verbose output
pytest -v tests/
```

## Test Categories

### Unit Tests

**Location**: `tests/unit/`

Test individual functions and components in isolation.
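Not every unit test needs a Spark session; pure-Python helpers are best covered with plain pytest, which keeps the suite fast. A minimal sketch (the `validate_email` helper is hypothetical, shown only to illustrate the test shape):

```python
import re


def validate_email(email):
    """Hypothetical helper: return True for a plausible email address."""
    return bool(re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", email or ""))


def test_validate_email_accepts_well_formed_address():
    assert validate_email("alice@example.com")


def test_validate_email_rejects_missing_at_sign():
    assert not validate_email("alice.example.com")


def test_validate_email_rejects_none():
    assert not validate_email(None)
```

Tests like these run in milliseconds, so they can gate every commit without slowing anyone down.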

**Example:**

```python
"""
Unit tests for data transformations
"""
import pytest
from pyspark.sql import SparkSession
from spark.jobs.transformations import clean_data


@pytest.fixture
def spark():
    """Create Spark session for testing."""
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()
    yield spark
    spark.stop()


def test_clean_data_removes_nulls(spark):
    """Test that clean_data removes null values."""
    # Arrange
    data = [
        (1, "Alice", "alice@example.com"),
        (2, None, "bob@example.com"),
        (3, "Charlie", None),
    ]
    df = spark.createDataFrame(data, ["id", "name", "email"])

    # Act
    result = clean_data(df)

    # Assert
    assert result.count() == 1
    assert result.filter("name IS NOT NULL").count() == 1
    assert result.filter("email IS NOT NULL").count() == 1
```

**Best practices:**

- Test one thing per test
- Use descriptive names: `test_<function>_<scenario>_<expected>`
- Use the arrange-act-assert pattern
- Keep tests fast (no external dependencies)

### Integration Tests

**Location**: `tests/integration/`

Test interactions between multiple components.
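When the full stack isn't running, the service boundary can be replaced with an in-memory fake so the same test shape still executes; with LDP up, a fixture would instead return a real boto3 client pointed at `http://localhost:9000`. A sketch (both `FakeS3Client` and `upload_csv` are hypothetical illustrations, not part of LDP):

```python
import io


class FakeS3Client:
    """Minimal in-memory stand-in for a boto3 S3 client (hypothetical)."""

    def __init__(self):
        self._store = {}

    def put_object(self, Bucket, Key, Body):
        # Store the object bytes under (bucket, key)
        self._store[(Bucket, Key)] = Body

    def get_object(self, Bucket, Key):
        # Mirror boto3's response shape: a dict with a file-like "Body"
        return {"Body": io.BytesIO(self._store[(Bucket, Key)])}


def upload_csv(client, bucket, key, rows):
    """Hypothetical helper: serialize rows as CSV and upload."""
    body = "\n".join(",".join(map(str, r)) for r in rows).encode()
    client.put_object(Bucket=bucket, Key=key, Body=body)


def test_upload_csv_round_trip():
    client = FakeS3Client()
    upload_csv(client, "datalake", "raw/users.csv", [(1, "Alice"), (2, "Bob")])
    data = client.get_object(Bucket="datalake", Key="raw/users.csv")["Body"].read()
    assert data.decode().splitlines() == ["1,Alice", "2,Bob"]
```

The fake mirrors only the two boto3 calls the helper uses (`put_object`, `get_object`), which keeps the test dependency-free while exercising the same code path the real client would.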

**Example:**

```python
"""
Integration tests for Iceberg table operations
"""
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="module")
def spark_with_iceberg():
    """Create Spark session with Iceberg configuration."""
    spark = SparkSession.builder \
        .appName("integration-test") \
        .config("spark.jars.packages",
                "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.0") \
        .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.test",
                "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.test.type", "hadoop") \
        .config("spark.sql.catalog.test.warehouse",
                "/tmp/test-warehouse") \
        .getOrCreate()
    yield spark
    spark.stop()


def test_create_and_query_iceberg_table(spark_with_iceberg):
    """Test creating and querying an Iceberg table."""
    spark = spark_with_iceberg

    # Create table
    spark.sql("""
        CREATE TABLE IF NOT EXISTS test.db.users (
            id BIGINT,
            name STRING
        ) USING iceberg
    """)

    # Insert data
    data = [(1, "Alice"), (2, "Bob")]
    df = spark.createDataFrame(data, ["id", "name"])
    df.writeTo("test.db.users").append()

    # Query
    result = spark.table("test.db.users")

    # Assert
    assert result.count() == 2
    assert result.filter("id = 1").first()["name"] == "Alice"

    # Cleanup
    spark.sql("DROP TABLE IF EXISTS test.db.users")
```

**Examples included:**

- `test_iceberg_tables.py` - Iceberg CRUD operations
- `test_minio_access.py` - MinIO connectivity and operations
- `test_airflow_spark.py` - Airflow + Spark integration

### End-to-End Tests

**Location**: `tests/e2e/`

Test complete pipelines from start to finish.

**Example:**

```python
"""
End-to-end pipeline test
"""
import pytest
from datetime import datetime


def test_complete_data_pipeline(spark, minio_client):
    """Test complete data pipeline: ingest → transform → load."""
    # 1. Ingest: Upload raw data to MinIO
    test_data = "user_id,name,age\n1,Alice,25\n2,Bob,30"
    minio_client.put_object(
        Bucket="datalake",
        Key="raw/users.csv",
        Body=test_data.encode()
    )

    # 2. Transform: Process with Spark (inferSchema so age is numeric)
    df = spark.read.csv("s3a://datalake/raw/users.csv",
                        header=True, inferSchema=True)
    transformed = df.filter(df.age >= 18)

    # 3. Load: Write to Iceberg table
    transformed.writeTo("test.analytics.users").createOrReplace()

    # 4. Verify: Query the final table
    result = spark.table("test.analytics.users")

    assert result.count() == 2
    assert all(row.age >= 18 for row in result.collect())

    # Cleanup
    spark.sql("DROP TABLE IF EXISTS test.analytics.users")
    minio_client.delete_object(Bucket="datalake", Key="raw/users.csv")
```

**Use cases:**

- Verify complete workflows
- Test data quality end-to-end
- Validate error handling
- Check monitoring and alerts

## Pytest Configuration

### conftest.py

Shared fixtures and configuration:

```python
"""
Pytest configuration and shared fixtures
"""
import pytest
from pyspark.sql import SparkSession
import boto3


@pytest.fixture(scope="session")
def spark():
    """Create Spark session for all tests."""
    spark = SparkSession.builder \
        .appName("ldp-tests") \
        .master("local[*]") \
        .config("spark.driver.memory", "2g") \
        .getOrCreate()

    yield spark

    spark.stop()


@pytest.fixture(scope="session")
def minio_client():
    """Create MinIO client for tests."""
    return boto3.client(
        's3',
        endpoint_url='http://localhost:9000',
        aws_access_key_id='admin',
        aws_secret_access_key='minioadmin'
    )


@pytest.fixture
def clean_database(spark):
    """Clean test database before and after tests."""
    # Setup: Drop all test tables
    spark.sql("DROP DATABASE IF EXISTS test_db CASCADE")
    spark.sql("CREATE DATABASE IF NOT EXISTS test_db")

    yield

    # Teardown: Clean up
    spark.sql("DROP DATABASE IF EXISTS test_db CASCADE")
```

### pytest.ini

Project-level pytest configuration:

```ini
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
    -v
    --strict-markers
    --tb=short
    --cov=.
    --cov-report=html
    --cov-report=term-missing

markers =
    slow: marks tests as slow (deselect with '-m "not slow"')
    integration: marks tests as integration tests
    e2e: marks tests as end-to-end tests
```

## Test Data

### Using Fixtures

Create reusable test data:

```python
@pytest.fixture
def sample_users_df(spark):
    """Sample users DataFrame."""
    data = [
        (1, "Alice", 25, "Engineering"),
        (2, "Bob", 30, "Sales"),
        (3, "Charlie", 35, "Engineering"),
    ]
    return spark.createDataFrame(data, ["id", "name", "age", "department"])


def test_filter_by_department(sample_users_df):
    """Test filtering by department."""
    result = sample_users_df.filter("department = 'Engineering'")
    assert result.count() == 2
```

### Test Data Files

Store test data in `tests/data/`:

```
tests/data/
├── sample_users.csv
├── sample_events.json
└── sample_transactions.parquet
```

Load in tests:

```python
def test_read_csv(spark):
    """Test reading a CSV file."""
    df = spark.read.csv("tests/data/sample_users.csv", header=True)
    assert df.count() > 0
```

## Markers

Use markers to categorize tests:

```python
import pytest


@pytest.mark.slow
def test_large_dataset_processing(spark):
    """Test processing a large dataset (marked as slow)."""
    # This test takes a long time
    pass


@pytest.mark.integration
def test_minio_connection():
    """Test MinIO connectivity (marked as integration)."""
    pass


@pytest.mark.e2e
def test_full_pipeline():
    """Test complete pipeline (marked as e2e)."""
    pass
```

Run specific markers:

```bash
# Run only integration tests
pytest -m integration

# Skip slow tests
pytest -m "not slow"

# Run integration and e2e tests
pytest -m "integration or e2e"
```

## Testing Best Practices

### 1. Arrange-Act-Assert Pattern

```python
def test_example():
    # Arrange: Set up test data and conditions
    data = [1, 2, 3]
    expected = 6

    # Act: Execute the code being tested
    result = sum(data)

    # Assert: Verify the result
    assert result == expected
```

### 2. Descriptive Test Names

```python
# Good
def test_clean_data_removes_null_emails():
    pass

# Bad
def test_clean():
    pass
```

### 3. One Assertion per Test (when possible)

```python
# Good
def test_user_has_valid_email():
    user = create_user()
    assert "@" in user.email

def test_user_has_valid_name():
    user = create_user()
    assert len(user.name) > 0

# Less ideal
def test_user_validation():
    user = create_user()
    assert "@" in user.email
    assert len(user.name) > 0
```

### 4. Use Fixtures for Setup/Teardown

```python
@pytest.fixture
def test_table(spark):
    """Create test table, clean up after."""
    # Setup
    spark.sql("CREATE TABLE test.users (id INT, name STRING)")

    yield "test.users"

    # Teardown
    spark.sql("DROP TABLE IF EXISTS test.users")


def test_insert_data(test_table):
    """Test using the fixture."""
    # The table is created automatically
    # and will be cleaned up automatically
    pass
```

### 5. Isolate Tests

Tests should not depend on each other:

```python
# Bad - depends on test order
def test_create_user():
    global user
    user = User("Alice")

def test_user_name():
    assert user.name == "Alice"  # Fails if test_create_user didn't run

# Good - self-contained
def test_create_user():
    user = User("Alice")
    assert user.name == "Alice"

def test_user_name():
    user = User("Alice")
    assert user.name == "Alice"
```

### 6. Test Edge Cases

```python
def test_divide_by_zero():
    """Test edge case: division by zero."""
    with pytest.raises(ZeroDivisionError):
        result = 10 / 0


def test_empty_dataframe(spark):
    """Test edge case: empty DataFrame."""
    df = spark.createDataFrame([], schema="id INT, name STRING")
    result = process_data(df)
    assert result.count() == 0
```

## Coverage

### Generate Coverage Reports

```bash
# Run tests with coverage
pytest --cov=spark --cov=airflow --cov-report=html tests/

# View report
open htmlcov/index.html
```

### Coverage Configuration

In `setup.cfg` (coverage.py reads `[coverage:*]` sections from `setup.cfg`, not from `pytest.ini`):

```ini
[coverage:run]
source = spark,airflow
omit =
    */tests/*
    */venv/*
    */__pycache__/*
```

## Continuous Integration

### GitHub Actions

```yaml
name: Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.13'

      - name: Install dependencies
        run: |
          pip install -r requirements-test.txt

      - name: Run tests
        run: |
          pytest --cov --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v3
```

## Example Tests

### Test Airflow DAG

```python
"""
Test Airflow DAG validation
"""
from airflow.models import DagBag


def test_dag_loads_with_no_errors():
    """Test that all DAGs load without errors."""
    dag_bag = DagBag(include_examples=False)
    assert len(dag_bag.import_errors) == 0


def test_simple_dag_has_correct_tasks():
    """Test simple_dag has the expected tasks."""
    dag_bag = DagBag(include_examples=False)
    dag = dag_bag.get_dag('simple_example')

    assert dag is not None
    assert len(dag.tasks) == 3

    task_ids = [task.task_id for task in dag.tasks]
    assert 'print_date' in task_ids
    assert 'hello_world' in task_ids
    assert 'finish' in task_ids
```

### Test Spark Transformation

```python
"""
Test Spark data transformations
"""
from pyspark.sql import SparkSession


def test_filter_adults(spark):
    """Test filtering users by age."""
    data = [(1, "Alice", 25), (2, "Bob", 17), (3, "Charlie", 30)]
    df = spark.createDataFrame(data, ["id", "name", "age"])

    result = df.filter(df.age >= 18)

    assert result.count() == 2
    ages = [row.age for row in result.collect()]
    assert all(age >= 18 for age in ages)
```

### Test Data Quality

```python
"""
Test data quality checks
"""
def test_no_null_ids(sample_df):
    """Test that there are no null IDs."""
    null_count = sample_df.filter("id IS NULL").count()
    assert null_count == 0


def test_no_duplicate_ids(sample_df):
    """Test that IDs are unique."""
    total_count = sample_df.count()
    distinct_count = sample_df.select("id").distinct().count()
    assert total_count == distinct_count


def test_email_format(sample_df):
    """Test that emails contain @."""
    invalid_emails = sample_df.filter(~sample_df.email.contains("@")).count()
    assert invalid_emails == 0
```

## Troubleshooting

### Tests Fail Locally

```bash
# Clear the pytest cache
pytest --cache-clear

# Reinstall dependencies
pip install -r requirements-test.txt
```

### Spark Tests Hang

- Reduce executor memory in test fixtures
- Use `master("local[1]")` for single-threaded testing
- Ensure `spark.stop()` is called

### MinIO Connection Fails

- Verify MinIO is running: `docker ps | grep minio`
- Check that the endpoint URL matches your setup
- Verify credentials

## Learn More

- **Pytest Documentation**: https://docs.pytest.org/
- **Testing Best Practices**: https://testdriven.io/blog/testing-best-practices/
- **PySpark Testing**: https://spark.apache.org/docs/latest/api/python/user_guide/testing.html
- **Example Tests**: `examples/tests/` directory
- **CI Configuration**: `.github/workflows/` directory