🚀 GraphFlow

Intelligent Graph-Based Data Processing Framework with Automatic Context Management

Built with ❤️ by UnicoLab.ai

Python 3.11+ License: MIT Code style: black Documentation Tests UnicoLab.ai

🌐 Website • 📚 Documentation • 💬 Community • 🐛 Issues

GraphFlow is a production-ready Python framework for intelligent data processing pipelines that automatically manage context and dependencies while scaling from prototype to production without code changes.

🚀 Ready for Production: Built-in caching, retries, monitoring, validation, and flexible execution backends make GraphFlow production-ready from day one.

🚀 Key Features

  • 🎯 Zero-friction Development: Write pandas/polars code, get distributed execution
  • 🧠 Automatic Context Resolution: Smart parameter detection and dependency injection based on graph analysis
  • 🚀 Production-Grade: Built-in caching, retries, monitoring, and data quality validation
  • ⚡ Flexible Execution: Seamlessly choose between local, distributed, or cloud execution
  • 📊 Dynamic Graph Exports: Export and visualize pipelines in multiple formats (HTML, GraphViz, Mermaid, JSON, YAML)
  • 🔧 Broad Applicability: Suited to any data processing workflow - ETL, ML features, analytics, streaming
  • 🔄 Content-Addressed Caching: Automatic incremental recomputation based on content hashes
  • 📈 Real-time Monitoring: Pipeline inspector with live analysis and performance profiling
  • 💾 Memory Optimization: Built-in streaming, chunking, and memory management for big data processing
  • 🔍 Auto-Discovery: Automatically discover and add nodes from modules or scopes
  • ✅ Pipeline Validation: Comprehensive validation with dependency checking and cycle detection
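Content-addressed caching, for example, can be pictured as keying each node's output on a hash of everything the node depends on: its name, its input data, and its parameters. A minimal sketch of the idea (illustrative only, not GraphFlow's internal implementation):

```python
import hashlib
import pickle

def content_key(node_name, *inputs, **params):
    """Hash the node name, its input data, and its parameters;
    a cached output is reused only while this key stays the same."""
    payload = pickle.dumps((node_name, inputs, sorted(params.items())))
    return hashlib.sha256(payload).hexdigest()

k_old = content_key("process_customers", [1, 2, 3], lookback_days=30)
k_new = content_key("process_customers", [1, 2, 3], lookback_days=31)
# k_old != k_new: changing a parameter invalidates the cached output
```

Because the key is derived from content rather than timestamps, re-running a pipeline with unchanged inputs and parameters can skip recomputation entirely.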

πŸ—οΈ Quick Start

Installation

# Basic installation
pip install graphflow

# With all optional dependencies
pip install "graphflow[all]"

# For distributed execution
pip install "graphflow[distributed]"

# For cloud execution
pip install "graphflow[cloud]"

Simple Example

from graphflow import Pipeline, node, context, dataset
import pandas as pd

# Create context - parameters automatically flow where needed
ctx = context(
    lookback_days=30,
    min_samples=100,
    target_col="churn"
)

# Create pipeline
pipeline = Pipeline(
    name="my_data_pipeline",
    base_uri="s3://my-bucket/data/",
    context=ctx
)

# Define processing node - context params auto-detected and injected!
@node(
    inputs=[dataset("raw/customers")],
    outputs=[dataset("processed/customer_features")]
)
def process_customers(df: pd.DataFrame,
                      lookback_days: int,   # auto-injected from context
                      min_samples: int) -> pd.DataFrame:  # auto-injected from context
    # Context automatically provides lookback_days and min_samples
    return df.groupby('customer_id').tail(lookback_days)

# Add to pipeline and run
pipeline.add_node(process_customers)

# Choose execution backend dynamically
result = pipeline.run(
    executor="auto"  # or "local", "ray", "cloud"
)

print(result.summary())

# Export pipeline graph
pipeline.export_graph(format="html", output="my_pipeline.html")

Memory Optimization Example

from graphflow import Pipeline, node, Context, Dataset
import pandas as pd

# Create pipeline with memory optimization enabled
pipeline = Pipeline(
    name="big_data_pipeline",
    base_uri="./data",
    context=Context({"chunk_size": 10000}),
    memory_optimization_enabled=True,
    chunk_size=10000,
    max_memory_mb=1000  # Limit memory usage to 1GB
)

@node(
    inputs=[Dataset("large_dataset")],
    outputs=[Dataset("processed_data")]
)
def process_large_data(df: pd.DataFrame, chunk_size: int = 10000) -> pd.DataFrame:
    """Process large dataset in chunks for memory efficiency."""
    result_chunks = []
    
    for i in range(0, len(df), chunk_size):
        chunk = df.iloc[i:i + chunk_size].copy()
        # Process chunk
        chunk['processed'] = chunk['value'] * 2
        result_chunks.append(chunk)
    
    return pd.concat(result_chunks, ignore_index=True)

# Auto-discover nodes from current module
pipeline.auto_discover_nodes(globals())

# Run with memory optimization
result = pipeline.run()

# Check memory usage
memory_usage = pipeline.get_memory_usage()
print(f"Memory usage: {memory_usage['rss_mb']:.1f}MB")

# Clean up memory
pipeline.cleanup_memory()

🎯 Core Concepts

Automatic Context Management

  • No manual dependency wiring: Context parameters are automatically detected and injected
  • Smart categorization: Parameters are automatically organized by type (data, processing, ML, infrastructure)
  • Graph-aware: Context flows intelligently through the pipeline graph
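The core mechanism can be sketched with Python's `inspect` module: match context keys against a node function's parameter names and pass only the ones it asks for. This is an illustration of the concept, not GraphFlow's actual code:

```python
import inspect

def inject_context(func, context, **inputs):
    """Pass each context key that matches one of func's parameter
    names, alongside the explicitly supplied data inputs."""
    params = inspect.signature(func).parameters
    extras = {name: context[name] for name in params
              if name in context and name not in inputs}
    return func(**inputs, **extras)

ctx = {"lookback_days": 30, "min_samples": 100, "unused": "ignored"}

def process(df, lookback_days, min_samples=50):
    return df, lookback_days, min_samples

result = inject_context(process, ctx, df="DATA")
# result == ("DATA", 30, 100); "unused" is never injected
```

Nodes declare what they need simply by naming a parameter; context entries that no node requests are never passed around.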

Flexible Execution

  • Auto-selection: Framework chooses the best executor for your workload
  • Hybrid execution: Different nodes can run on different backends
  • Scale transparently: Same code runs locally or on 1000+ node clusters
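Auto-selection can be imagined as a heuristic over workload size and which backends are available. A toy sketch of such a policy (the thresholds and names here are hypothetical, not GraphFlow's real selection logic):

```python
def pick_executor(n_rows: int, ray_available: bool = False) -> str:
    """Toy heuristic: small workloads stay local; large ones go
    distributed when a cluster backend is available."""
    if n_rows < 1_000_000:
        return "local"
    return "ray" if ray_available else "process_pool"

pick_executor(10_000)                         # "local"
pick_executor(5_000_000, ray_available=True)  # "ray"
```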

Dynamic Graph Visualization

  • Multiple formats: Export to HTML, GraphViz, Mermaid, JSON, or YAML
  • Real-time inspection: Live pipeline analysis and filtering
  • Rich metadata: Show context dependencies, cache status, and execution history
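A Mermaid export, for instance, boils down to emitting one edge per dependency. A minimal sketch of the format (not the library's actual exporter):

```python
def to_mermaid(edges):
    """Render a list of (src, dst) dependencies as a Mermaid graph."""
    lines = ["graph TD"]
    for src, dst in edges:
        lines.append(f"    {src} --> {dst}")
    return "\n".join(lines)

print(to_mermaid([("raw_customers", "customer_features"),
                  ("customer_features", "churn_model")]))
```

The resulting text renders directly in any Mermaid-aware viewer, including GitHub markdown.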

Memory Optimization & Big Data Support

  • Streaming Processing: Process large datasets in chunks without loading everything into memory
  • Memory Management: Automatic garbage collection and memory usage monitoring
  • Chunked Processing: Configurable chunk sizes for optimal memory usage
  • Memory-Mapped Files: Efficient file-based data processing for very large datasets
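Cycle detection on a node graph is essentially a topological-sort check, which the standard library already provides. A compact sketch (illustrative, not GraphFlow's validator):

```python
from graphlib import TopologicalSorter, CycleError

def validate_dag(deps):
    """Return True if the dependency mapping {node: {its deps}} is acyclic."""
    try:
        list(TopologicalSorter(deps).static_order())
        return True
    except CycleError:
        return False

validate_dag({"features": {"raw"}, "model": {"features"}})  # True
validate_dag({"a": {"b"}, "b": {"a"}})                      # False: a <-> b
```

`static_order` also yields a valid execution order as a by-product, which is exactly what a scheduler needs once validation passes.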

📚 Documentation

📖 Quick Links

📊 Current Status

Version: 0.1.0 (Development)

✅ Implemented Features

  • Core Framework: Pipeline engine, context management, dataset abstraction
  • Node System: Decorator-based node definition with automatic dependency detection
  • Execution Backends: Local, thread pool, and process pool executors
  • Context Management: Automatic parameter categorization and injection
  • Graph Analysis: Pipeline inspection and dependency analysis
  • Documentation: Comprehensive documentation with examples and API reference
  • Testing: Unit tests with parallel execution support
  • Development Tools: Makefile, pre-commit hooks, linting, formatting

✅ Recently Implemented

  • Distributed Executors: Ray and Dask integration
  • Advanced Caching: Content-addressed caching system
  • Data Validation: Schema validation and data quality checks
  • Graph Visualization: Dynamic graph export and visualization
  • Jupyter Notebooks: Interactive examples and tutorials
  • Enhanced CLI: Rich output and better error handling
  • Memory Optimization: Streaming processing and memory management
  • Auto-Discovery: Automatic node discovery from modules
  • Pipeline Validation: Comprehensive validation with dependency checking
  • Big Data Support: Chunked processing and memory-mapped files

🚧 In Development

  • Cloud Executors: Vertex AI, AWS Batch, Azure ML support
  • Performance Profiling: Built-in performance monitoring
  • Streaming Support: Real-time data processing capabilities

🎯 Roadmap

  • v0.2.0: ✅ Distributed execution, caching, validation, and visualization (Current)
  • v0.3.0: Cloud backends and performance profiling
  • v0.4.0: Streaming support and advanced ML features
  • v1.0.0: Production-ready release with full feature set

πŸ› οΈ Development

Setup Development Environment

# Clone the repository
git clone https://github.com/UnicoLab/GraphFlow.git
cd GraphFlow

# Install development dependencies
make install-dev

# Run tests
make test

# Build documentation
make docs

# Serve documentation locally
make docs-serve

Available Commands

make help          # Show all available commands
make install       # Install package
make test          # Run tests
make lint          # Run linting
make format        # Format code
make docs          # Build documentation
make docs-serve    # Serve documentation locally
make build         # Build package
make publish       # Publish to PyPI

🤝 Contributing

We welcome contributions! Please see our Contributing Guide for details.

Ways to Contribute

  • πŸ› Bug Reports: Found a bug? Please report it!
  • ✨ Feature Requests: Have an idea? We'd love to hear it!
  • πŸ“ Documentation: Help improve our docs
  • πŸ§ͺ Testing: Add tests or improve test coverage
  • πŸ’» Code: Submit pull requests for bug fixes or features

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🏢 About UnicoLab.ai

GraphFlow is proudly developed by UnicoLab.ai

Empowering the future of intelligent data processing

UnicoLab.ai is a cutting-edge technology company specializing in intelligent data processing solutions and AI-powered automation. We're committed to building open-source tools that make complex data workflows accessible, scalable, and production-ready.

🌟 Our Mission

To democratize advanced data processing by creating intuitive, powerful frameworks that scale from prototype to production without complexity.

🚀 What We Build

  • Intelligent Data Pipelines: Self-managing, context-aware processing frameworks
  • AI-Powered Automation: Smart systems that adapt and optimize automatically
  • Production-Ready Tools: Enterprise-grade solutions with built-in monitoring and reliability
  • Open Source Innovation: Community-driven development with transparent, accessible technology

🤝 Connect With Us

💡 Why We Built GraphFlow

GraphFlow represents our vision for the future of data processing: intelligent, self-managing pipelines that understand context, optimize automatically, and scale seamlessly. We believe that powerful data processing shouldn't require complex configuration or manual dependency management.

Join us in building the next generation of intelligent data tools.

πŸ™ Acknowledgments

GraphFlow builds on the excellent work of:
