A minimal framework for building distributed AI pipelines. Write async functions, compose them with `.go()`, and scale them across clusters.
## Features

- Dynamic execution – Submit tasks with `.go()` and let the scheduler handle parallelism automatically
- Transparent distribution – `@service` marks classes for distributed execution
- Automatic parallelism – Independent tasks run in parallel without extra code
- Built-in profiling – Automatic dependency tracking and performance metrics
- Minimal – Clean API with no complex schedulers or DSLs

## Installation

```bash
pip install thinkagain
# or with uv
uv add thinkagain
```

## Quick Start

```python
import thinkagain as ta

@ta.op
async def add_one(x: int) -> int:
    return x + 1

@ta.op
async def multiply(x: int, factor: int) -> int:
    return x * factor

async def pipeline(x: int) -> int:
    # Submit tasks immediately (non-blocking)
    x_ref = add_one.go(x)
    result_ref = multiply.go(x_ref, 3)

    # Wait for result
    return await result_ref
result = await pipeline(5)  # Returns 18
```
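
Because `.go()` returns an `ObjectRef` immediately, tasks with no data dependency between them can run concurrently. Below is a minimal sketch of that fan-out pattern; the helper `fan_out` and the repeated `add_one` op are illustrative assumptions following the quickstart above, not part of the documented API.

```python
import thinkagain as ta

@ta.op
async def add_one(x: int) -> int:
    return x + 1

async def fan_out(xs: list[int]) -> list[int]:
    # Each .go() call returns an ObjectRef without blocking, so the
    # scheduler is free to run these independent tasks in parallel.
    refs = [add_one.go(x) for x in xs]
    # Only awaiting blocks; results come back in input order.
    return [await ref for ref in refs]

# e.g. await fan_out([1, 2, 3])  -> [2, 3, 4]
```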

## Distributed Execution

```python
import thinkagain as ta

# CPU-only service
@ta.service()
class Retriever:
    async def retrieve(self, query: str) -> list[str]:
        return ["doc1", "doc2", "doc3"]

# GPU service
@ta.service(gpus=4)
class Generator:
    def __init__(self):
        self.model = load_llm()

    async def generate(self, query: str, docs: list[str]) -> str:
        return f"Answer based on {len(docs)} documents"

# Define cluster resources
mesh = ta.Mesh([
ta.MeshNode("server1", gpus=8, cpus=32),
ta.MeshNode("server2", gpus=8, cpus=32),
])
# Create service handles
retriever = Retriever.init()
generator = Generator.init()
async def rag_pipeline(query: str) -> str:
    docs_ref = retriever.retrieve.go(query)
    return await generator.generate.go(query, docs_ref)

# Execute with mesh
with mesh:
    result = await rag_pipeline("What is ML?")
    print(result)
```

## Core Concepts

- `@op` – Decorator for async functions
- `@service` – Decorator for distributed classes
- `.go()` – Submit work and return an `ObjectRef` future
- `Mesh` – Define cluster resources (GPUs, CPUs)
- `ObjectRef` – Future that can be awaited or passed to other calls

## API Reference

### Decorators

- `@ta.op` – Mark async functions for dynamic composition
- `@ta.service(gpus=None)` – Mark classes for distributed execution

### Execution

- `fn.go(*args)` – Submit a call and return an `ObjectRef`
- `await object_ref` – Wait for the result

### Resources

- `ta.Mesh(devices)` – Define cluster topology
- `ta.GpuDevice(id)` – GPU device
- `ta.CpuDevice(count)` – CPU resources
- `ta.devices()` – Auto-detect GPUs
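
A minimal sketch of building a mesh from locally detected hardware. `ta.devices()` is documented above only as "auto-detect GPUs"; that its result can be mixed with `ta.CpuDevice` and passed straight to `ta.Mesh` is an assumption of this sketch, not a documented guarantee.

```python
import thinkagain as ta

# Build a mesh from whatever accelerators are visible locally,
# falling back to CPU resources if no GPUs are detected (assumption).
devices = ta.devices() or [ta.CpuDevice(count=8)]
mesh = ta.Mesh(devices)

with mesh:
    ...  # run pipelines against the local topology
```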

### Profiling

- `ta.profile()` – Context manager for profiling
- `ta.get_profiler()` – Access the profiler instance
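
A sketch of wiring profiling around a run, assuming `ta.profile()` brackets the work whose dependencies and timings are recorded. The reporting surface of the profiler instance is not documented in this README, so none is assumed here.

```python
import asyncio
import thinkagain as ta

@ta.op
async def double(x: int) -> int:
    return x * 2

async def main() -> None:
    # Profile everything submitted inside this block (assumed usage).
    with ta.profile():
        ref = double.go(21)
        print(await ref)  # 42

    # Inspect the profiler for the dependency graph and timing metrics
    # collected during the profiled block.
    profiler = ta.get_profiler()

asyncio.run(main())
```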

## Examples

See `examples/` for working demos:

- `demo.py` – Core API with dynamic execution
- `distributed_example.py` – Distributed execution with services

## Development

```bash
# Install dependencies
uv sync
# Run tests
uv run pytest
# Run examples
uv run python examples/demo.py
```

## License

Apache 2.0 – see LICENSE