JobSync - Distributed Task Coordination

A Python library for coordinating distributed task processing across multiple worker nodes with automatic load balancing, failover, and zero-downtime deployments.

What It Does

JobSync coordinates multiple worker processes so each task is processed exactly once, even when workers come and go. It uses PostgreSQL for state management and provides:

  • Automatic load balancing - Tasks distributed evenly across all active workers
  • Zero-downtime deployments - Add/remove workers without stopping processing
  • Automatic failover - When workers die, their tasks are automatically reassigned
  • Task pinning - Lock specific tasks to specific workers (e.g., GPU tasks to GPU nodes)
  • Health monitoring - Built-in health checks for load balancers

Quick Start

Installation

pip install jobsync

Basic Example

from jobsync import Job, Task, CoordinationConfig

# Configure coordination
coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_'  # Prefix for database tables
)

# Each worker runs this code
with Job('worker-01', coordination_config=coord_config) as job:
    for item_id in get_pending_items():
        task = Task(item_id)
        
        # Only process if this task belongs to this worker
        if job.can_claim_task(task):
            job.add_task(task)
            process_item(item_id)

Long-Running Task Example

import logging
import time

from jobsync import Job, CoordinationConfig

logger = logging.getLogger(__name__)
shutdown = False  # flip to True from a signal handler to stop cleanly

# For subscriptions, WebSockets, or continuous processing
def on_rebalance():
    """Called when cluster membership changes.

    Re-evaluate which tasks this node should process and update
    long-running connections or subscriptions accordingly.
    """
    current_tokens = job.my_tokens  # `job` is bound by the with-block below
    # Update your subscriptions/connections based on token ownership
    logger.info(f'Rebalance: now own {len(current_tokens)} tokens')

coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_'
)

with Job('worker-01', coordination_config=coord_config,
         on_rebalance=on_rebalance) as job:
    # Callback notifies you of cluster changes
    while not shutdown:
        time.sleep(1)

Run multiple workers - they automatically coordinate:

# Terminal 1
python worker.py --node-name worker-01

# Terminal 2  
python worker.py --node-name worker-02

# Terminal 3
python worker.py --node-name worker-03

Each worker processes different tasks - no duplicate work.

How It Works

Token-Based Distribution: JobSync divides a pool of 10,000 tokens evenly across all active workers. Each task hashes consistently to exactly one token, so each worker knows which tasks it owns.

Leader-Based Coordination: The oldest worker becomes the leader and monitors cluster health. When workers join or leave, the leader redistributes tokens to maintain balance.
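
A minimal sketch of the oldest-worker rule. The elect_leader helper is hypothetical (not JobSync's API); it just picks the earliest registration among live nodes:

from datetime import datetime

def elect_leader(nodes: list[tuple[str, datetime]]) -> str:
    """Pick the worker with the earliest registration time."""
    # nodes holds (node_name, registered_at) for workers with fresh heartbeats
    return min(nodes, key=lambda node: node[1])[0]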

Consistent Hashing: Tasks always hash to the same token, so the same task always goes to the same worker (until the cluster changes).
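
A rough sketch of the task-to-token mapping, assuming the default double_sha256 hash. task_token below is illustrative, not JobSync's API, and the exact digest-to-integer reduction may differ:

import hashlib

def task_token(task_id, total_tokens=10_000):
    """Hash the task id twice with SHA-256, then reduce into the token pool."""
    digest = hashlib.sha256(hashlib.sha256(str(task_id).encode()).digest()).digest()
    return int.from_bytes(digest[:8], 'big') % total_tokens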

Lifecycle:

  1. Workers register and send a heartbeat every 5 seconds
  2. A leader is elected (the oldest worker by registration time)
  3. The leader distributes tokens evenly across all workers
  4. Workers claim only tasks that hash to their tokens
  5. The leader monitors for dead workers and triggers rebalancing
  6. When the leader dies, the next oldest worker becomes leader

Configuration

Configure database connection and coordination settings using CoordinationConfig:

from jobsync import CoordinationConfig

coord_config = CoordinationConfig(
    host='localhost',
    port=5432,
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_',      # Prefix for database tables (e.g., myapp_node, myapp_token)
    total_tokens=10000,    # Optional: customize token pool size
    hash_function='double_sha256'  # Optional: 'md5', 'sha256', or 'double_sha256' (default, recommended)
)

See Usage Guide for detailed configuration options and Operator Guide for tuning parameters.

Usage Examples

Basic Task Processing

from jobsync import Job, Task, CoordinationConfig

coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_'
)

with Job('worker-01', coordination_config=coord_config) as job:
    for item_id in get_pending_items():
        task = Task(item_id)
        if job.can_claim_task(task):
            job.add_task(task)
            process_item(item_id)
    job.write_audit()

Task Pinning (Lock GPU tasks to GPU workers)

from jobsync import Job, Task, CoordinationConfig

def register_gpu_locks(job):
    gpu_tasks = get_gpu_task_ids()
    locks = [(task_id, '%gpu%', 'requires_gpu') for task_id in gpu_tasks]
    job.register_locks_bulk(locks)

coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_'
)

with Job('worker-gpu-01', coordination_config=coord_config,
         lock_provider=register_gpu_locks) as job:
    for task_id in get_all_tasks():
        if job.can_claim_task(Task(task_id)):
            process_gpu_task(task_id)

See Usage Guide for complete examples including WebSocket subscriptions, Kafka consumers, ETL pipelines, and more.

Key Features

Automatic Load Balancing and Failover

  • Token-based distribution - 10,000 tokens evenly distributed across all active workers
  • Consistent hashing - Each task always hashes to the same token
  • Automatic rebalancing - When workers join/leave, tokens are redistributed with minimal movement
  • Leader-based coordination - Oldest worker manages cluster state and triggers rebalancing
  • Zero-downtime deployments - Add new workers, wait for tokens, then stop old workers

Task Locking and Pinning

Lock specific tasks to specific workers using SQL LIKE patterns with ordered fallback support. Locks are registered by task_id and stored in the database. During distribution, each task_id is hashed to a token_id, and lock patterns determine which node receives that token.

from jobsync import Job, CoordinationConfig

def register_locks(job):
    gpu_tasks = get_gpu_task_ids()
    
    # Ordered fallback: try primary GPU first, then any GPU node
    locks = [
        (task_id, ['gpu-primary-01', '%-gpu'], 'requires_gpu')
        for task_id in gpu_tasks
    ]
    job.register_locks_bulk(locks)

coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_'
)

with Job('worker-gpu-01', coordination_config=coord_config,
         lock_provider=register_locks,
         clear_existing_locks=True) as job:
    process_tasks(job)
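
Lock patterns follow SQL LIKE semantics: % matches any run of characters and _ matches a single character. As a rough illustration only (an emulation for intuition, not JobSync's internal matcher):

import re

def like_match(pattern, node_name):
    """Emulate SQL LIKE by translating % to .* and _ to . in a regex."""
    regex = re.escape(pattern).replace('%', '.*').replace('_', '.')
    return re.fullmatch(regex, node_name) is not None

like_match('%-gpu', 'worker-01-gpu')       # True
like_match('gpu-primary-01', 'node-02')    # False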

Use cases:

  • Pin GPU tasks to GPU-enabled workers
  • Route high-memory tasks to large-memory nodes
  • Ensure data locality (tasks process data in same region)
  • Resource-aware scheduling

See Usage Guide for complete lock API including pattern matching, fallback chains, expiration, and lifecycle management.

Health Monitoring and Custom Configuration

Health checks for load balancers:

from flask import Flask, jsonify
from jobsync import Job, CoordinationConfig

app = Flask(__name__)
# `job` is the Job instance created at worker startup

@app.route('/health/ready')
def health():
    if job.am_i_healthy():
        return jsonify({'status': 'ready'}), 200
    return jsonify({'status': 'not_ready'}), 503

Fine-tune coordination:

coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_',
    total_tokens=50000,                   # More tokens for large clusters
    minimum_nodes=2,                      # Wait for 2 nodes before distribution
    heartbeat_interval_sec=3,             # Faster heartbeat
    heartbeat_timeout_sec=9,              # Quicker failure detection
    rebalance_check_interval_sec=15       # More responsive rebalancing
)

See Usage Guide and Operator Guide for tuning recommendations.

Documentation

Usage Guide - Developer reference with examples, configuration, and patterns

Operator Guide - Deployment, monitoring, and troubleshooting

SQL Cheatsheet - Common monitoring queries

Monitoring

Check cluster health:

-- Quick health check (table name follows your appname prefix, e.g. myapp_node)
SELECT COUNT(*) AS active_nodes FROM sync_node
WHERE last_heartbeat > NOW() - INTERVAL '15 seconds';

See Operator Guide for metrics, alerts, and troubleshooting.

Performance

  • Memory: <1 MB per worker
  • Database load: <10 queries/minute per worker
  • Scalability: Tested with 2-50 workers
  • Rebalancing: Only ~14% of tokens move when a worker fails (see the note after this list)
  • Distribution speed: 100-500ms for 10,000 tokens
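
The rebalancing figure follows from the even split: assuming only the dead worker's share of tokens is reassigned, a single failure among n workers moves roughly 1/n of the pool, so ~14% suggests a seven-worker cluster (1/7 ≈ 14%). A quick sanity check:

# Even-split assumption: only the dead worker's share of tokens moves.
for n in (2, 7, 50):
    print(f'{n} workers: ~{1 / n:.0%} of tokens reassigned on one failure')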

Testing

# Install test dependencies
pip install pytest pytest-cov

# Run all tests
pytest tests/

# Run with coverage
pytest --cov=jobsync tests/

# Run specific test suite
pytest tests/test_coordination.py -v

Requirements

  • Python 3.10+
  • PostgreSQL 12+
  • Required packages: psycopg (psycopg3), sqlalchemy (installed automatically)

License

MIT
