The Enterprise-Grade ML Pipeline Framework for Humans
FlowyML is the comprehensive ML pipeline framework that combines the simplicity of a Python script with the power of an enterprise MLOps platform.
| Feature | FlowyML | Traditional Orchestrators |
|---|---|---|
| Developer Experience | Native Python - No DSLs, no YAML hell. | Complex YAML or rigid DSLs. |
| Context Awareness | Auto-Injection - Params are just function args. | Manual wiring of every parameter. |
| Caching | Multi-Level - Smart content-hashing & memoization. | Basic file-timestamp checking. |
| Asset Management | First-Class Assets - Models & Datasets with lineage. | Generic file paths only. |
| Architecture | Modular Stacks - Local, Cloud, Hybrid. | Vendor lock-in or complex setup. |
| Deployment | Local or Centralized - Run locally or deploy as a company-wide hub. | Fragmented or cloud-only. |
| Flexibility | Extensive Plugin Ecosystem | Fixed integrations; custom tooling must be built in-house. |
| Separation of Concerns | Step grouping, branching, and conditions | Limited to orchestration logic and task execution. |
| Rich Features | Built-in experiment tracking, model leaderboard, human-in-the-loop, notifications, scheduling | Few or no extra features. |
FlowyML is a complete toolkit for building, debugging, and deploying ML applications.
Write pipelines as standard Python functions. No YAML, no DSLs.
```python
from flowyml import Pipeline, step, Model

@step(outputs=["data"])
def load(): return [1, 2, 3]

@step(inputs=["data"], outputs=["model"])
def train(data): return Model.train(data)

# It's just Python!
pipeline = Pipeline("simple").add_step(load).add_step(train)
pipeline.run()
```

Don't waste time re-running successful steps.
- Code Hash: Re-runs only when code changes.
- Input Hash: Re-runs only when data changes.
```python
# Only re-runs if 'data' changes, ignoring code changes
@step(cache="input_hash", outputs=["processed"])
def expensive_processing(data):
    return process(data)
```
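The code-level counterpart works the same way; a minimal sketch, assuming the mode string `"code_hash"` mirrors the `"input_hash"` shown above (the exact value is an assumption):

```python
# Re-runs only when this function's code changes, even if 'data' differs.
# The "code_hash" mode name is inferred from the Code Hash level above.
@step(cache="code_hash", outputs=["features"])
def build_features(data):
    return [x * 2 for x in data]
```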
Trace token usage, latency, and costs automatically with built-in observability.

```python
@step
@trace_llm(model="gpt-4", tags=["production"])
def generate_summary(text: str):
    # flowyml automatically tracks:
    # - Token usage (prompt/completion)
    # - Cost estimation
    # - Latency & Success/Failure rates
    return openai.ChatCompletion.create(...)
```

Group related steps to run in the same container - reduce overhead, maintain clarity, and keep logic separate from configuration.
```python
# Run preprocessing steps in the same container (shares resources)
@step(outputs=["raw"], execution_group="preprocessing")
def load(): return fetch_data()

@step(inputs=["raw"], outputs=["clean"], execution_group="preprocessing")
def clean(raw): return preprocess(raw)

# flowyml automatically aggregates resources (max CPU, memory, best GPU)
# and executes the group consecutively in the same environment
```

Build adaptive workflows with conditional logic.
```python
# Run 'deploy' only if model accuracy > 0.9
pipeline.add_step(
    If(condition=lambda ctx: ctx["accuracy"] > 0.9)
    .then(deploy_model)
    .else_(notify_team)
)
```

Extend flowyml with any tool - even wrap ZenML components!
```python
from flowyml.stacks.plugins import load_component

# Use any ZenML orchestrator, artifact store, or integration
k8s_orch = load_component("zenml:zenml.integrations.kubernetes.orchestrators.KubernetesOrchestrator")
mlflow = load_component("zenml:zenml.integrations.mlflow.MLflowExperimentTracker")
```

Pause pipelines for manual approval or review.
```python
from flowyml import approval

approval_step = approval(
    name="approve_deployment",
    approver="ml-team",
    timeout_seconds=3600
)
```
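The returned step then slots into a pipeline like any other; a usage sketch, where the exact wiring is an assumption:

```python
# Assumed usage: the run pauses here until "ml-team" approves or the timeout expires
pipeline.add_step(approval_step)
```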
No external tools needed - tracking is built-in.

```python
from flowyml.tracking import Experiment

exp = Experiment("baseline_training")
exp.log_run(run_id="run_001", metrics={"accuracy": 0.95}, parameters={"lr": 0.01})
best = exp.get_best_run("accuracy", maximize=True)
```

Track, compare, and version your models.
```python
from flowyml import ModelLeaderboard
from flowyml.core import Model

# Track performance
leaderboard = ModelLeaderboard(metric="accuracy")
leaderboard.add_score(model_name="bert-base", score=0.92)

# Register models with stages
model = Model.create(artifact={...})
model.register(name="text_classifier", stage="production")
```

Schedule recurring jobs without external orchestrators.
```python
from flowyml import PipelineScheduler

scheduler = PipelineScheduler()
scheduler.schedule_daily(
    name="daily_training",
    pipeline_func=lambda: pipeline.run(),
    hour=2, minute=0
)
scheduler.start()
```
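For cadences beyond daily runs, the built-in cron scheduler (see the feature list further down) could be driven with a cron expression; in this sketch the `schedule_cron` name and signature are assumptions, not confirmed API:

```python
# Hypothetical cron-style entry point -- name and signature are assumptions
scheduler.schedule_cron(
    name="weekly_retrain",
    pipeline_func=lambda: pipeline.run(),
    cron="0 2 * * 1",  # every Monday at 02:00
)
```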
Slack, Email, and custom alerts built-in.

```python
from flowyml import configure_notifications

configure_notifications(
    slack_webhook="https://hooks.slack.com/...",
    email_config={...}
)
# Automatic notifications on pipeline success/failure
```

Set breakpoints and inspect state mid-pipeline.
```python
from flowyml import StepDebugger

debugger = StepDebugger()
debugger.set_breakpoint("train_model")
pipeline.run(debug=True)  # Pauses at breakpoint
```

Assets are not just files; they are first-class citizens with lineage, metadata, and versioning.
```python
from flowyml import Dataset, Model, Metrics, FeatureSet

# Assets track their producer, lineage, and metadata automatically
dataset = Dataset.create(data=df, name="training_data", metadata={"source": "s3"})
model = Model.create(artifact=trained_model, score=0.95)
metrics = Metrics.create(values={"accuracy": 0.95})
```
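Once created, that recorded metadata and lineage can presumably be read back after a run; in this sketch the accessor names are assumptions for illustration:

```python
# Accessor names below are assumptions, not confirmed FlowyML API
print(dataset.metadata["source"])  # "s3"
print(model.lineage)               # which step and inputs produced this model
```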
Handle failures gracefully.

```python
@step(retry=3, timeout=300)
def flaky_api_call():
    return external_api.fetch()

# Circuit breakers prevent cascading failures
```

Monitor distribution shifts automatically.
```python
from flowyml import detect_drift

drift = detect_drift(
    reference_data=train_feature,
    current_data=prod_feature,
    threshold=0.1
)
if drift['drift_detected']:
    trigger_retraining()
```

- StepDebugger: Set breakpoints in your pipeline.
- Artifact Inspection: View intermediate data in the UI.
- Local Execution: Run the exact same code locally as in production.
- Automatic Retries: Handle transient failures.
- Scheduling: Built-in cron scheduler.
- Notifications: Slack/Email alerts.
- Circuit Breakers: Stop cascading failures (see the sketch below).
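A circuit breaker stops calling a dependency after repeated failures so one outage doesn't cascade through the pipeline. A minimal sketch of how this could attach to a step; the `circuit_breaker` parameter and its keys are assumptions, not confirmed FlowyML API:

```python
# 'circuit_breaker' is a hypothetical parameter, shown for illustration only
@step(retry=3, timeout=300, circuit_breaker={"failure_threshold": 5, "reset_seconds": 60})
def call_feature_service():
    # After 5 consecutive failures the breaker opens and the step
    # fails fast for 60 seconds instead of hammering the dependency
    return feature_service.fetch()
```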
Ready for the enterprise. Run locally per project or deploy as a centralized hub for the whole company.
- Docker Ready: Backend and frontend are fully dockerized.
- Centralized Hub: Share pipelines, artifacts, and experiments across the team.
- Remote Execution: Configure local clients to execute on the remote hub (see the sketch below).
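Pointing a local client at a shared hub might look like the following; the `set_hub` helper and the URL are assumptions for illustration, not confirmed API:

```python
# Hypothetical client configuration -- function name and URL are illustrative
from flowyml import set_hub

set_hub("https://flowyml.internal.example.com")
# Subsequent runs would execute on the remote hub instead of locally
pipeline.run()
```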
- ML Frameworks: PyTorch, TensorFlow, Keras, Scikit-learn, HuggingFace.
- Cloud Providers: AWS, GCP, Azure (via plugins).
- Tools: MLflow, Weights & Biases, Great Expectations.
FlowyML automatically captures and visualizes your model training progress with zero manual intervention.
```python
from flowyml.integrations.keras import FlowymlKerasCallback

# Just add the callback - that's it!
callback = FlowymlKerasCallback(
    experiment_name="my-experiment",
    project="my-project"
)

model.fit(
    x_train, y_train,
    validation_data=(x_val, y_val),
    epochs=50,
    callbacks=[callback]  # Auto-tracks all metrics!
)
```

What gets captured automatically:
- Loss (train & validation) per epoch
- Accuracy (train & validation) per epoch
- All custom metrics (F1, precision, recall, etc.)
- Model architecture and parameters
- Interactive charts in the UI
View beautiful training graphs in the UI:
- Navigate to your project's Structure tab
- Click on any model artifact
- See interactive loss/accuracy charts over epochs!
No external tools needed - all visualization built into FlowyML.
Built-in multi-tenancy for managing multiple teams and initiatives.
```python
from flowyml import Project

project = Project("recommendation_system")
pipeline = project.create_pipeline("training")

# All runs, artifacts, and metadata are automatically scoped to the project
runs = project.list_runs()
stats = project.get_stats()
```

Stop reinventing the wheel. Use pre-built templates for common ML patterns.
```python
from flowyml.core.templates import create_from_template

# Create a standard training pipeline in one line
pipeline = create_from_template(
    "ml_training",
    data_loader=my_loader,
    trainer=my_trainer,
    evaluator=my_evaluator
)
```

```bash
# Install core
pip install flowyml

# Install with UI support
pip install "flowyml[ui]"

# Install with all features (recommended for dev)
pip install "flowyml[all]"
```
```python
from flowyml import Pipeline, step, context, Dataset, Model

# 1. Define your configuration (Auto-injected!)
ctx = context(
    learning_rate=0.01,
    batch_size=32,
    epochs=10
)

# 2. Define your steps (Pure Python)
@step(outputs=["dataset"])
def load_data(batch_size: int):
    # 'batch_size' is automatically injected from context!
    print(f"Loading data with batch size: {batch_size}")
    return Dataset.create(data=[1, 2, 3], name="mnist")

@step(inputs=["dataset"], outputs=["model"])
def train(dataset: Dataset, learning_rate: float, epochs: int):
    print(f"Training on {dataset.name} with lr={learning_rate}")
    # Simulate training...
    return Model.create(artifact={"weights": "..."}, score=0.98)

# 3. Run it!
pipeline = Pipeline("mnist_training", context=ctx)
pipeline.add_step(load_data)
pipeline.add_step(train)
result = pipeline.run()

print(f"Run ID: {result.run_id}")
print(f"Model Score: {result.outputs['model'].score}")
```

Git-like versioning for pipelines. Track changes, compare, rollback.
```python
from flowyml import VersionedPipeline

pipeline = VersionedPipeline("training", version="v1.0.0")
pipeline.add_step(load_data)
pipeline.save_version()

# ... make changes ...
pipeline.version = "v1.1.0"
pipeline.save_version()

# Compare versions to see exactly what changed (steps, code, context)
diff = pipeline.compare_with("v1.0.0")
print(diff["modified_steps"])    # ['train_model']
print(diff["context_changes"])   # {'learning_rate': {'old': 0.01, 'new': 0.001}}
```

Visualize your workflows, inspect artifacts, and monitor runs in real-time.
```bash
# Start the UI server
flowyml ui start --open-browser
```

Visit http://localhost:8080 to access the dashboard.
- Getting Started: Your first 5 minutes with flowyml.
- Core Concepts: Deep dive into Pipelines, Steps, and Context.
- Advanced Features: Learn about Caching, Parallelism, and Conditional Execution.
- API Reference: Detailed class and function documentation.
We love contributions! Check out our Contributing Guide to get started.
Apache 2.0 - See LICENSE for details.
Built with ❤️ by UnicoLab