Intelligent Graph-Based Data Processing Framework with Automatic Context Management
Built with ❤️ by UnicoLab.ai
Website • Documentation • Community • Issues
GraphFlow is a production-ready Python framework designed for intelligent data processing pipelines that automatically manage context and dependencies while scaling from prototype to production without code changes.
Ready for Production: Built-in caching, retries, monitoring, validation, and flexible execution backends make GraphFlow production-ready from day one.
- Zero-friction Development: Write pandas/polars code, get distributed execution
- Automatic Context Resolution: Smart parameter detection and dependency injection based on graph analysis
- Production-Grade: Built-in caching, retries, monitoring, and data quality validation
- Flexible Execution: Seamlessly choose between local, distributed, or cloud execution
- Dynamic Graph Exports: Export and visualize pipelines in multiple formats (HTML, GraphViz, Mermaid, JSON, YAML)
- Broad Applicability: Optimized for any data processing workflow - ETL, ML features, analytics, streaming
- Content-Addressed Caching: Automatic incremental recomputation based on content hashes (see the sketch after this list)
- Real-time Monitoring: Pipeline inspector with live analysis and performance profiling
- Memory Optimization: Built-in streaming, chunking, and memory management for big data processing
- Auto-Discovery: Automatically discover and add nodes from modules or scopes
- Pipeline Validation: Comprehensive validation with dependency checking and cycle detection
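The content-addressed caching mentioned above works, conceptually, by hashing a node's input data and only recomputing when that hash changes. The sketch below illustrates the general idea in plain pandas; it is not GraphFlow's internal implementation, and the cache and helper names are hypothetical.

import hashlib
import pandas as pd

_cache = {}  # hypothetical in-memory cache, keyed by content hash

def content_hash(df: pd.DataFrame) -> str:
    # Hash the actual bytes of the input, so identical content always maps to the same key
    row_hashes = pd.util.hash_pandas_object(df, index=True).values
    return hashlib.sha256(row_hashes.tobytes()).hexdigest()

def run_with_cache(df: pd.DataFrame, transform) -> pd.DataFrame:
    key = content_hash(df)
    if key not in _cache:          # content changed (or never seen): recompute
        _cache[key] = transform(df)
    return _cache[key]             # content unchanged: reuse the cached result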
# Basic installation
pip install graphflow
# With all optional dependencies
pip install "graphflow[all]"
# For distributed execution
pip install "graphflow[distributed]"
# For cloud execution
pip install "graphflow[cloud]"from graphflow import Pipeline, node, context, dataset
import pandas as pd
# Create context - parameters automatically flow where needed
ctx = context(
    lookback_days=30,
    min_samples=100,
    target_col="churn"
)
# Create pipeline
pipeline = Pipeline(
    name="my_data_pipeline",
    base_uri="s3://my-bucket/data/",
    context=ctx
)
# Define processing node - context params auto-detected and injected!
@node(
    inputs=[dataset("raw/customers")],
    outputs=[dataset("processed/customer_features")]
)
def process_customers(df: pd.DataFrame,
                      lookback_days: int,   # Auto-injected from context
                      min_samples: int) -> pd.DataFrame:  # Auto-injected from context
    # Context automatically provides lookback_days and min_samples
    return df.groupby('customer_id').tail(lookback_days)
# Add to pipeline and run
pipeline.add_node(process_customers)
# Choose execution backend dynamically
result = pipeline.run(
    executor="auto"  # or "local", "ray", "cloud"
)
print(result.summary())
# Export pipeline graph
pipeline.export_graph(format="html", output="my_pipeline.html")

from graphflow import Pipeline, node, Context, Dataset
import pandas as pd
# Create pipeline with memory optimization enabled
pipeline = Pipeline(
    name="big_data_pipeline",
    base_uri="./data",
    context=Context({"chunk_size": 10000}),
    memory_optimization_enabled=True,
    chunk_size=10000,
    max_memory_mb=1000  # Limit memory usage to 1GB
)
@node(
    inputs=[Dataset("large_dataset")],
    outputs=[Dataset("processed_data")]
)
def process_large_data(df: pd.DataFrame, chunk_size: int = 10000) -> pd.DataFrame:
    """Process large dataset in chunks for memory efficiency."""
    result_chunks = []
    for i in range(0, len(df), chunk_size):
        chunk = df.iloc[i:i + chunk_size].copy()
        # Process chunk
        chunk['processed'] = chunk['value'] * 2
        result_chunks.append(chunk)
    return pd.concat(result_chunks, ignore_index=True)
# Auto-discover nodes from current module
pipeline.auto_discover_nodes(globals())
# Run with memory optimization
result = pipeline.run()
# Check memory usage
memory_usage = pipeline.get_memory_usage()
print(f"Memory usage: {memory_usage['rss_mb']:.1f}MB")
# Clean up memory
pipeline.cleanup_memory()

- No manual dependency wiring: Context parameters are automatically detected and injected
- Smart categorization: Parameters are automatically organized by type (data, processing, ML, infrastructure)
- Graph-aware: Context flows intelligently through the pipeline graph
- Auto-selection: Framework chooses the best executor for your workload
- Hybrid execution: Different nodes can run on different backends
- Scale transparently: Same code runs locally or on 1000+ node clusters
- Multiple formats: Export to HTML, GraphViz, Mermaid, JSON, or YAML
- Real-time inspection: Live pipeline analysis and filtering
- Rich metadata: Show context dependencies, cache status, and execution history
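For example, the same pipeline object can be pointed at different backends and then exported in several formats. This short sketch reuses the run(executor=...) and export_graph(...) calls from the quick start above; any option values beyond those shown there (such as the output file names) are illustrative assumptions.

# Same pipeline, different backends - only the executor argument changes
local_result = pipeline.run(executor="local")
ray_result = pipeline.run(executor="ray")  # assumes the [distributed] extra is installed

# Export the same graph in several formats for docs, review, or debugging
pipeline.export_graph(format="html", output="pipeline.html")      # interactive view
pipeline.export_graph(format="mermaid", output="pipeline.mmd")    # embeddable in markdown
pipeline.export_graph(format="json", output="pipeline.json")      # machine-readable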
- Streaming Processing: Process large datasets in chunks without loading everything into memory
- Memory Management: Automatic garbage collection and memory usage monitoring
- Chunked Processing: Configurable chunk sizes for optimal memory usage
- Memory-Mapped Files: Efficient file-based data processing for very large datasets
- Auto-Discovery: Automatically discover and add nodes from modules or scopes
- Pipeline Validation: Comprehensive validation with dependency checking and cycle detection
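A minimal sketch of combining auto-discovery with pre-run validation: auto_discover_nodes(globals()) appears in the memory-optimization example above, while the validate() call and its report fields are hypothetical placeholders for whatever the validation API exposes.

# Pick up every @node-decorated function defined in the current module
pipeline.auto_discover_nodes(globals())

# Validate before running: dependency checks, missing datasets, cycle detection
# (method name and report fields are illustrative, not the confirmed API)
report = pipeline.validate()
if not report.is_valid:
    for issue in report.issues:
        print(f"Validation issue: {issue}")
else:
    result = pipeline.run()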
- Getting Started - Installation and quick start guide
- Examples - Comprehensive examples and tutorials
- API Reference - Complete API documentation
- Best Practices - Guidelines for effective usage
- FAQ - Frequently asked questions
- Community - Contributing and community guidelines
- Live Documentation: https://unicolab.github.io/GraphFlow
- GitHub Repository: https://github.com/UnicoLab/GraphFlow
- PyPI Package: https://pypi.org/project/graphflow
Version: 0.1.0 (Development)
- Core Framework: Pipeline engine, context management, dataset abstraction
- Node System: Decorator-based node definition with automatic dependency detection
- Execution Backends: Local, thread pool, and process pool executors
- Context Management: Automatic parameter categorization and injection
- Graph Analysis: Pipeline inspection and dependency analysis
- Documentation: Comprehensive documentation with examples and API reference
- Testing: Unit tests with parallel execution support
- Development Tools: Makefile, pre-commit hooks, linting, formatting
- Distributed Executors: Ray and Dask integration ✅
- Advanced Caching: Content-addressed caching system ✅
- Data Validation: Schema validation and data quality checks ✅
- Graph Visualization: Dynamic graph export and visualization ✅
- Jupyter Notebooks: Interactive examples and tutorials ✅
- Enhanced CLI: Rich output and better error handling ✅
- Memory Optimization: Streaming processing and memory management ✅
- Auto-Discovery: Automatic node discovery from modules ✅
- Pipeline Validation: Comprehensive validation with dependency checking ✅
- Big Data Support: Chunked processing and memory-mapped files ✅
- Cloud Executors: Vertex AI, AWS Batch, Azure ML support
- Performance Profiling: Built-in performance monitoring
- Streaming Support: Real-time data processing capabilities
- v0.2.0: ✅ Distributed execution, caching, validation, and visualization (Current)
- v0.3.0: Cloud backends and performance profiling
- v0.4.0: Streaming support and advanced ML features
- v1.0.0: Production-ready release with full feature set
# Clone the repository
git clone https://github.com/UnicoLab/GraphFlow.git
cd GraphFlow
# Install development dependencies
make install-dev
# Run tests
make test
# Build documentation
make docs
# Serve documentation locally
make docs-serve

make help # Show all available commands
make install # Install package
make test # Run tests
make lint # Run linting
make format # Format code
make docs # Build documentation
make docs-serve # Serve documentation locally
make build # Build package
make publish # Publish to PyPI

We welcome contributions! Please see our Contributing Guide for details.
- Bug Reports: Found a bug? Please report it!
- Feature Requests: Have an idea? We'd love to hear it!
- Documentation: Help improve our docs
- Testing: Add tests or improve test coverage
- Code: Submit pull requests for bug fixes or features
This project is licensed under the MIT License - see the LICENSE file for details.
GraphFlow is proudly developed by UnicoLab.ai
Empowering the future of intelligent data processing
UnicoLab.ai is a cutting-edge technology company specializing in intelligent data processing solutions and AI-powered automation. We're committed to building open-source tools that make complex data workflows accessible, scalable, and production-ready.
To democratize advanced data processing by creating intuitive, powerful frameworks that scale from prototype to production without complexity.
- Intelligent Data Pipelines: Self-managing, context-aware processing frameworks
- AI-Powered Automation: Smart systems that adapt and optimize automatically
- Production-Ready Tools: Enterprise-grade solutions with built-in monitoring and reliability
- Open Source Innovation: Community-driven development with transparent, accessible technology
- Website: unicolab.ai
- LinkedIn: UnicoLab.ai
- Twitter: @UnicoLabAI
- Contact: contact@unicolab.ai
GraphFlow represents our vision for the future of data processing: intelligent, self-managing pipelines that understand context, optimize automatically, and scale seamlessly. We believe that powerful data processing shouldn't require complex configuration or manual dependency management.
Join us in building the next generation of intelligent data tools.
GraphFlow builds on the excellent work of: