A Python library for coordinating distributed task processing across multiple worker nodes with automatic load balancing, failover, and zero-downtime deployments.
JobSync coordinates multiple worker processes so each task is processed exactly once, even when workers come and go. It uses PostgreSQL for state management and provides:
- Automatic load balancing - Tasks distributed evenly across all active workers
- Zero-downtime deployments - Add/remove workers without stopping processing
- Automatic failover - When workers die, their tasks are automatically reassigned
- Task pinning - Lock specific tasks to specific workers (e.g., GPU tasks to GPU nodes)
- Health monitoring - Built-in health checks for load balancers
```bash
pip install jobsync
```

```python
from jobsync import Job, Task, CoordinationConfig

# Configure coordination
coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_'  # Prefix for database tables
)

# Each worker runs this code
with Job('worker-01', coordination_config=coord_config) as job:
    for item_id in get_pending_items():
        task = Task(item_id)
        # Only process if this task belongs to this worker
        if job.can_claim_task(task):
            job.add_task(task)
            process_item(item_id)
```
```python
import logging
import time

from jobsync import Job, CoordinationConfig

logger = logging.getLogger(__name__)

# For subscriptions, WebSockets, or continuous processing
def on_rebalance():
    """Called when cluster membership changes.

    Re-evaluate which tasks this node should process and update
    long-running connections or subscriptions accordingly.
    """
    current_tokens = job.my_tokens
    # Update your subscriptions/connections based on token ownership
    logger.info(f'Rebalance: now own {len(current_tokens)} tokens')

coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_'
)

with Job('worker-01', coordination_config=coord_config,
         on_rebalance=on_rebalance) as job:
    # Callback notifies you of cluster changes
    while not shutdown:  # e.g. a flag flipped by a signal handler
        time.sleep(1)
```

Run multiple workers - they automatically coordinate:
```bash
# Terminal 1
python worker.py --node-name worker-01

# Terminal 2
python worker.py --node-name worker-02

# Terminal 3
python worker.py --node-name worker-03
```

Each worker processes different tasks - no duplicate work.
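The commands above assume a `worker.py` entry point. A minimal sketch might look like this (the `--node-name` parsing and the `get_pending_items`/`process_item` helpers are placeholders, not part of JobSync):

```python
# worker.py - illustrative entry point; helpers are placeholders
import argparse

from jobsync import CoordinationConfig, Job, Task

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--node-name', required=True)
    args = parser.parse_args()

    coord_config = CoordinationConfig(
        host='localhost', dbname='jobsync',
        user='postgres', password='postgres', appname='myapp_')

    # Register under the given node name; coordination is automatic
    with Job(args.node_name, coordination_config=coord_config) as job:
        for item_id in get_pending_items():    # your task source
            if job.can_claim_task(Task(item_id)):
                process_item(item_id)          # your handler

if __name__ == '__main__':
    main()
```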
Token-Based Distribution: JobSync divides a pool of 10,000 tokens evenly across all active workers. Each task hashes consistently to exactly one token, so each worker knows which tasks it owns.
Leader-Based Coordination: The oldest worker becomes the leader and monitors cluster health. When workers join or leave, the leader redistributes tokens to maintain balance.
Consistent Hashing: Tasks always hash to the same token, so the same task always goes to the same worker (until the cluster changes).
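To make the mapping concrete, here is a sketch of how a task id could hash to a token under the default `double_sha256` scheme (illustrative only; JobSync's internal implementation may differ):

```python
import hashlib

TOTAL_TOKENS = 10_000  # matches the default pool size

def task_to_token(task_id: str) -> int:
    """Hash a task id to one of TOTAL_TOKENS tokens (sketch)."""
    # double SHA-256: hash the first digest again
    digest = hashlib.sha256(hashlib.sha256(task_id.encode()).digest()).digest()
    return int.from_bytes(digest[:8], 'big') % TOTAL_TOKENS

# Deterministic: the same task always maps to the same token,
# so ownership only changes when tokens are redistributed.
assert task_to_token('item-42') == task_to_token('item-42')
```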
Lifecycle:
- Workers register and send heartbeat every 5 seconds
- Leader is elected (oldest worker by registration time)
- Leader distributes tokens evenly across all workers (see the sketch after this list)
- Workers claim only tasks that hash to their tokens
- Leader monitors for dead workers and triggers rebalancing
- When leader dies, next oldest worker becomes leader
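The even-distribution step is plain integer division. As a sketch of what the leader computes (not JobSync's actual code):

```python
def split_tokens(total_tokens: int, nodes: list[str]) -> dict[str, int]:
    """Even split: a base share per node, remainder to the first nodes (sketch)."""
    base, extra = divmod(total_tokens, len(nodes))
    return {node: base + (1 if i < extra else 0)
            for i, node in enumerate(nodes)}

# 10,000 tokens across 3 workers -> 3334 / 3333 / 3333
print(split_tokens(10_000, ['worker-01', 'worker-02', 'worker-03']))
```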
Configure database connection and coordination settings using CoordinationConfig:
```python
from jobsync import CoordinationConfig

coord_config = CoordinationConfig(
    host='localhost',
    port=5432,
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_',              # Prefix for database tables (e.g., myapp_node, myapp_token)
    total_tokens=10000,            # Optional: customize token pool size
    hash_function='double_sha256'  # Optional: 'md5', 'sha256', or 'double_sha256' (default, recommended)
)
```

See Usage Guide for detailed configuration options and Operator Guide for tuning parameters.
A typical batch worker claims its items and writes an audit record when the run completes:

```python
from jobsync import Job, Task, CoordinationConfig

coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_'
)

with Job('worker-01', coordination_config=coord_config) as job:
    for item_id in get_pending_items():
        task = Task(item_id)
        if job.can_claim_task(task):
            job.add_task(task)
            process_item(item_id)
    job.write_audit()
```
Pin tasks that need particular hardware by registering locks through a `lock_provider`:

```python
from jobsync import Job, Task, CoordinationConfig

def register_gpu_locks(job):
    gpu_tasks = get_gpu_task_ids()
    locks = [(task_id, '%gpu%', 'requires_gpu') for task_id in gpu_tasks]
    job.register_locks_bulk(locks)

coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_'
)

with Job('worker-gpu-01', coordination_config=coord_config,
         lock_provider=register_gpu_locks) as job:
    for task_id in get_all_tasks():
        if job.can_claim_task(Task(task_id)):
            process_gpu_task(task_id)
```

See Usage Guide for complete examples including WebSocket subscriptions, Kafka consumers, ETL pipelines, and more.
- Token-based distribution - 10,000 tokens evenly distributed across all active workers
- Consistent hashing - Each task always hashes to the same token
- Automatic rebalancing - When workers join/leave, tokens are redistributed with minimal movement (illustrated after this list)
- Leader-based coordination - Oldest worker manages cluster state and triggers rebalancing
- Zero-downtime deployments - Add new workers, wait for tokens, then stop old workers
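The minimal-movement property falls out of the token model: when one of N workers dies, only that worker's share (about 1/N of the pool) needs a new owner. A back-of-the-envelope check with illustrative numbers:

```python
# With 7 workers, losing one moves only ~1/7 of the 10,000-token pool
total_tokens, workers = 10_000, 7
moved = total_tokens // workers   # tokens owned by the failed worker
print(f'{moved} tokens reassigned ({moved / total_tokens:.0%})')  # -> 1428 tokens reassigned (14%)
```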
Lock specific tasks to specific workers using SQL LIKE patterns with ordered fallback support. Locks are registered by task_id and stored in the database. During distribution, each task_id is hashed to a token_id, and lock patterns determine which node receives that token.
```python
from jobsync import Job, CoordinationConfig

def register_locks(job):
    gpu_tasks = get_gpu_task_ids()
    # Ordered fallback: try primary GPU first, then any GPU node
    locks = [
        (task_id, ['gpu-primary-01', '%-gpu'], 'requires_gpu')
        for task_id in gpu_tasks
    ]
    job.register_locks_bulk(locks)

coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_'
)

with Job('worker-gpu-01', coordination_config=coord_config,
         lock_provider=register_locks,
         clear_existing_locks=True) as job:
    process_tasks(job)
```

Use cases:
- Pin GPU tasks to GPU-enabled workers
- Route high-memory tasks to large-memory nodes
- Ensure data locality (tasks process data in same region)
- Resource-aware scheduling
See Usage Guide for complete lock API including pattern matching, fallback chains, expiration, and lifecycle management.
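Because lock patterns are SQL LIKE expressions, `%` matches any run of characters. A quick way to reason about which node names a pattern selects is to translate it to a regex (an illustration, not JobSync code; LIKE's single-character wildcard `_` is ignored here):

```python
import re

def like_to_regex(pattern: str) -> re.Pattern:
    """Translate a SQL LIKE pattern ('%' wildcard only) to an anchored regex."""
    return re.compile('^' + '.*'.join(map(re.escape, pattern.split('%'))) + '$')

nodes = ['gpu-primary-01', 'worker-gpu', 'etl-gpu', 'worker-02']
for pat in ['gpu-primary-01', '%-gpu', '%gpu%']:
    print(pat, '->', [n for n in nodes if like_to_regex(pat).match(n)])
# gpu-primary-01 -> ['gpu-primary-01']
# %-gpu          -> ['worker-gpu', 'etl-gpu']
# %gpu%          -> ['gpu-primary-01', 'worker-gpu', 'etl-gpu']
```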
Health checks for load balancers:
```python
from flask import Flask, jsonify
from jobsync import Job, CoordinationConfig

app = Flask(__name__)

@app.route('/health/ready')
def health():
    # assumes `job` is this worker's active Job instance (see examples above)
    if job.am_i_healthy():
        return jsonify({'status': 'ready'}), 200
    return jsonify({'status': 'not_ready'}), 503
```

Fine-tune coordination:
```python
coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_',
    total_tokens=50000,               # More tokens for large clusters
    minimum_nodes=2,                  # Wait for 2 nodes before distribution
    heartbeat_interval_sec=3,         # Faster heartbeat
    heartbeat_timeout_sec=9,          # Quicker failure detection
    rebalance_check_interval_sec=15   # More responsive rebalancing
)
```

See Usage Guide and Operator Guide for tuning recommendations.
- Usage Guide - Developer reference with examples, configuration, and patterns
- Operator Guide - Deployment, monitoring, and troubleshooting
- SQL Cheatsheet - Common monitoring queries
Check cluster health:
```sql
-- Quick health check
SELECT COUNT(*) AS active_nodes
FROM sync_node
WHERE last_heartbeat > NOW() - INTERVAL '15 seconds';
```

See Operator Guide for metrics, alerts, and troubleshooting.
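Token distribution per node can be checked the same way, assuming token assignments live in a `sync_token` table with a `node` column (a guess at the schema; adjust names to your `appname` prefix):

```sql
-- Tokens currently owned by each node (table/column names assumed)
SELECT node, COUNT(*) AS tokens
FROM sync_token
GROUP BY node
ORDER BY tokens DESC;
```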
- Memory: <1 MB per worker
- Database load: <10 queries/minute per worker
- Scalability: Tested with 2-50 workers
- Rebalancing: Only ~14% of tokens move when a worker fails
- Distribution speed: 100-500ms for 10,000 tokens
```bash
# Install test dependencies
pip install pytest pytest-cov

# Run all tests
pytest tests/

# Run with coverage
pytest --cov=jobsync tests/

# Run specific test suite
pytest tests/test_coordination.py -v
```

- Python 3.10+
- PostgreSQL 12+
- Required packages: psycopg (psycopg3), sqlalchemy (installed automatically)
MIT
- Issues: https://github.com/bissli/jobsync/issues
- Documentation: See the docs/ folder
- Examples: See the tests/ folder