Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c59a017
Add Volume Manager integration
mateus-cardoso-reef Jul 14, 2025
de97e93
Add tests for Volume Manager integration
mateus-cardoso-reef Jul 14, 2025
6634216
Add documentation for Custom Volume Managers in README.md
mateus-cardoso-reef Jul 14, 2025
491bdc3
Refactor job metadata preparation in JobRunner to use full_job_reques…
mateus-cardoso-reef Jul 14, 2025
4b17de6
Remove httpx dependency from pyproject.toml and uv.lock files
mateus-cardoso-reef Jul 14, 2025
5014b8f
Update job metadata structure in README.md to include new fields for …
mateus-cardoso-reef Jul 14, 2025
88890a9
Update environment variable names for Volume Manager in README.md and…
mateus-cardoso-reef Jul 16, 2025
2beb94d
Add new test for Huggingface model and volume manager integration
mateus-cardoso-reef Jul 21, 2025
244455c
ruff check & format changes
mateus-cardoso-reef Jul 21, 2025
3273301
Enhance Volume Manager integration.
mateus-cardoso-reef Jul 31, 2025
9df7f24
add type ignore
mateus-cardoso-reef Jul 31, 2025
4eb46c0
Testing Volume Manager CI integration
mateus-cardoso-reef Aug 6, 2025
0fa92c1
ci test
mateus-cardoso-reef Aug 6, 2025
d589619
quick test
mateus-cardoso-reef Aug 6, 2025
c132b09
uncomment changes
mateus-cardoso-reef Aug 6, 2025
fcb27e7
Lint and docs
mateus-cardoso-reef Aug 6, 2025
eb6e13b
comment volume manager var
mateus-cardoso-reef Aug 6, 2025
e295996
last test
mateus-cardoso-reef Aug 6, 2025
cf50948
quick CI test
mateus-cardoso-reef Aug 6, 2025
bf9d7b9
Final Ci test for Volume Manager Integration
mateus-cardoso-reef Aug 6, 2025
c4c1711
Final changes
mateus-cardoso-reef Aug 6, 2025
cc1e665
Final CI changes
mateus-cardoso-reef Aug 6, 2025
2215139
refactor CI test
mateus-cardoso-reef Aug 6, 2025
6aa8eae
Set up CI for Volume Manager integration
mateus-cardoso-reef Aug 6, 2025
4b2f394
CI final changes for Volume Manager Integration
mateus-cardoso-reef Aug 6, 2025
ff0cad6
Merge pull request #1 from mateus-cardoso-reef/ci_test
mateus-cardoso-reef Aug 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions .github/workflows/volume_manager_ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
name: Run Volume Manager Integration Tests

on:
push:
branches: [master, main]
pull_request:

env:
PYTHON_DEFAULT_VERSION: "3.11"

jobs:
volume-manager-test:
timeout-minutes: 10
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0

- name: Set up Python ${{ env.PYTHON_DEFAULT_VERSION }}
uses: actions/setup-python@v4
with:
python-version: ${{ env.PYTHON_DEFAULT_VERSION }}
cache: "pip"

- name: Install uv
uses: astral-sh/setup-uv@v6
with:
version: "0.6.x"
enable-cache: true

- name: Set VOLUME_MANAGER_ADDRESS env variable
run: |
sed -i 's/# update_env_var "VOLUME_MANAGER_ADDRESS"/update_env_var "VOLUME_MANAGER_ADDRESS"/' local_stack/prepare.sh

- name: Start all services
run: local_stack/run_and_await_readiness.sh /tmp/volume_manager_test_logs/

- name: Test volume manager integration
working-directory: ./examples/volume_manager
run: ./run_tests.sh -i -d

- name: Upload volume manager test logs
uses: actions/upload-artifact@v4
if: always()
with:
name: volume-manager-test-logs
path: /tmp/volume_manager_test_logs/*.log
105 changes: 105 additions & 0 deletions compute_horde_sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,111 @@ The SDK includes an optional `FallbackClient` that mirrors the standard `Compute

See the [Fallback docs](https://sdk.computehorde.io/master/fallback.html) for usage examples and limitations.

## 🔧 Custom Volume Managers

By default, executors download all input volumes before running a job. Miners can implement their own volume manager to handle volume preparation more efficiently (e.g., caching, custom storage strategies).

### How It Works

When an executor has a volume manager configured, it delegates volume preparation to the volume manager instead of downloading volumes directly:

1. Executor sends volume specifications to volume manager
2. Volume manager returns Docker mount options
3. Executor uses those mounts when running the job container
4. Executor notifies volume manager when job completes

### Configuration

Set the volume manager address as an environment variable:

```bash
export VOLUME_MANAGER_ADDRESS="http://localhost:8080"
```

You can also add custom headers for authentication or any other purpose. Any environment variable starting with `VOLUME_MANAGER_HEADER_` will be added as a header to each request sent to the volume manager.

**Examples:**

```bash
# Authentication header
export VOLUME_MANAGER_HEADER_AUTHORIZATION='Bearer tokentokentoken'

# Custom API key
export VOLUME_MANAGER_HEADER_X_API_KEY='your-api-key-here'

# Custom metadata
export VOLUME_MANAGER_HEADER_X_CUSTOM_METADATA='project:compute-horde'
```
**Local development**: The local stack setup script [`prepare.sh`](https://github.com/backend-developers-ltd/ComputeHorde/blob/master/local_stack/prepare.sh) includes a commented volume manager configuration. To test volume manager functionality, uncomment the section and run [`send_hello_world_job.py`](https://github.com/backend-developers-ltd/ComputeHorde/blob/master/local_stack/send_hello_world_job.py) with the `--test-volume` flag.

### Volume Manager API

Your volume manager must implement these endpoints:

#### `POST /prepare_volume`

**Request:**
```json
{
"job_uuid": "7b522daa-e807-4094-8d96-99b9a863f960",
"volume": {
"volume_type": "huggingface_volume",
"repo_id": "example/model",
"revision": "main",
"relative_path": "models",
"usage_type": "reusable"
},
"job_metadata": {
"message_type": "V0JobRequest",
"job_uuid": "7b522daa-e807-4094-8d96-99b9a863f960",
"executor_class": "always_on__llm__a6000",
"docker_image": "example/image:latest",
"raw_script": null,
"docker_run_options_preset": "nvidia_all",
"docker_run_cmd": ["python", "main.py"],
"volume": null,
"output_upload": {
"output_upload_type": "single_file_put",
"relative_path": "results.json",
"url": "https://s3.example.com/results.json"
},
"artifacts_dir": "/artifacts"
}
}
```

**Note:** Large volume downloads may take considerable time, so volume managers should not implement self-timeout mechanisms during volume preparation as the system already handles timeouts. For the same reason, the manager service should handle executor disconnections gracefully.

**Response:**
```json
{
"mounts": [
["-v", "/host/path/to/cached/model:/volume/models"],
["--tmpfs", "/tmp/cache:size=100m"],
["-v", "my_named_volume:/data", "--volume-driver", "local"],
["--device", "/dev/gpu0:/dev/gpu0:rwm"]
]
}
```

#### `POST /job_finished`

**Request:**
```json
{
"job_uuid": "7b522daa-e807-4094-8d96-99b9a863f960"
}
```

**Response:** `200 OK` (no body required)

### Volume Usage Hints

The `usage_type` field helps volume managers optimize storage:

- `single_use`: Volume is used once and can be cleaned up immediately
- `reusable`: Volume may be used multiple times (cache aggressively)

## Versioning

This package uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
Expand Down
6 changes: 6 additions & 0 deletions compute_horde_sdk/src/compute_horde_core/volume/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,10 @@
VolumeDownloadFailed,
ZipUrlVolumeDownloader,
)
from ._manager import (
VolumeManagerClient,
VolumeManagerError,
create_volume_manager_client,
get_volume_manager_headers,
)
from ._models import *
170 changes: 170 additions & 0 deletions compute_horde_sdk/src/compute_horde_core/volume/_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
from __future__ import annotations

import json
import logging
import os
from typing import Any

import httpx
import tenacity

from ._models import Volume

logger = logging.getLogger(__name__)


def get_volume_manager_headers() -> dict[str, str]:
"""Extract volume manager headers from environment variables."""
headers = {}
prefix = "COMPUTE_HORDE_VOLUME_MANAGER_HEADER_"
for key, value in os.environ.items():
if key.startswith(prefix):
header_name = key[len(prefix) :]
headers[header_name] = value
return headers


class VolumeManagerError(Exception):
"""Exception raised when volume manager operations fail."""

def __init__(self, description: str, error_detail: str | None = None) -> None:
self.description = description
self.error_detail = error_detail

def __str__(self) -> str:
if self.error_detail is not None:
return f"{self.description}: {self.error_detail}"
return self.description


class VolumeManagerClient:
"""Client for communicating with the Volume Manager service."""

def __init__(self, base_url: str, headers: dict[str, str] | None = None):
self.base_url = base_url.rstrip("/")
self.headers = headers or {}
self._client: httpx.AsyncClient | None = None

def _get_client(self) -> httpx.AsyncClient:
"""Get or create the HTTP client."""
if self._client is None:
self._client = httpx.AsyncClient()
return self._client

async def close(self) -> None:
"""Close the HTTP client."""
if self._client is not None:
await self._client.aclose()
self._client = None

async def prepare_volume(self, job_uuid: str, volume: Volume, job_metadata: dict[str, Any]) -> list[list[str]]:
"""
Request the volume manager to prepare a volume for the job.

Args:
job_uuid: Unique identifier for the job
volume: Volume specification to prepare
job_metadata: Additional metadata about the job

Returns:
List of mount flag lists for Docker

Raises:
VolumeManagerError: If the request fails

"""
url = f"{self.base_url}/prepare_volume"
volume_data = volume.model_dump()
payload = {"job_uuid": job_uuid, "volume": volume_data, "job_metadata": job_metadata}

response_data = await self._make_request(url, payload, "prepare_volume")

return response_data["mounts"]

async def job_finished(self, job_uuid: str) -> None:
"""
Notify the volume manager that a job has finished.

Args:
job_uuid: Unique identifier for the finished job

Raises:
VolumeManagerError: If the notification fails

"""
url = f"{self.base_url}/job_finished"
payload = {"job_uuid": job_uuid}

await self._make_request(url, payload, "job_finished")

@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_fixed(1),
retry=tenacity.retry_if_exception_type((httpx.RequestError, httpx.HTTPStatusError)),
)
async def _make_request(self, url: str, payload: dict[str, Any], operation: str) -> dict[str, list[list[str]]]:
"""
Make a POST request to the Volume Manager with standardized error handling.

Args:
url: The endpoint URL
payload: The JSON payload to send
operation: Operation name for error messages

Returns:
Mount options for the job

Raises:
VolumeManagerError: If the request fails or response is invalid

"""
logger.debug(f"Making {operation} request to Volume Manager at {url}")

try:
client = self._get_client()
response = await client.post(
url,
json=payload,
headers=self.headers,
timeout=httpx.Timeout(connect=30.0, read=None, write=None, pool=None),
)
response.raise_for_status()

logger.debug(f"Volume Manager {operation} response status: {response.status_code}")
logger.debug(f"Volume Manager {operation} response: {response.text}")

return response.json() # type: ignore

except httpx.HTTPStatusError as e:
# Handle non-retryable status codes
error_msg = f"Volume Manager {operation} returned status {e.response.status_code}"
error_detail = None

try:
error_data = e.response.json()
if "error" in error_data:
error_detail = error_data["error"]
except (json.JSONDecodeError, KeyError):
error_detail = e.response.text

raise VolumeManagerError(error_msg, error_detail=error_detail)
except httpx.RequestError as e:
raise VolumeManagerError(f"Network error during Volume Manager {operation}: {e}", error_detail=str(e))
except json.JSONDecodeError as e:
raise VolumeManagerError(f"Invalid JSON response from Volume Manager {operation}: {e}", error_detail=str(e))


def create_volume_manager_client(base_url: str, headers: dict[str, str] | None = None) -> VolumeManagerClient:
"""
Create a Volume Manager client with the specified configuration.

Args:
base_url: The base URL of the Volume Manager service
headers: Optional headers to include in requests

Returns:
A configured VolumeManagerClient instance

"""
logger.debug(f"Volume manager created at {base_url}")
return VolumeManagerClient(base_url=base_url, headers=headers)
14 changes: 14 additions & 0 deletions compute_horde_sdk/src/compute_horde_core/volume/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ def __str__(self) -> str:
return str.__str__(self)


class VolumeUsageType(str, enum.Enum):
"""Hint for volume managers about expected usage patterns."""

single_use = "single_use" # Volume is expected to be used just once
reusable = "reusable" # Volume is expected to be used many times

def __str__(self) -> str:
return str.__str__(self)


class HuggingfaceVolume(pydantic.BaseModel):
volume_type: Literal[VolumeType.huggingface_volume] = VolumeType.huggingface_volume
repo_id: str
Expand All @@ -32,6 +42,7 @@ class HuggingfaceVolume(pydantic.BaseModel):
# If provided, only files matching at least one pattern are downloaded.
allow_patterns: str | list[str] | None = None
token: str | None = None
usage_type: VolumeUsageType | None = None

def is_safe(self) -> bool:
return True
Expand All @@ -41,6 +52,7 @@ class InlineVolume(pydantic.BaseModel):
volume_type: Literal[VolumeType.inline] = VolumeType.inline
contents: str = pydantic.Field(repr=False)
relative_path: str | None = None
usage_type: VolumeUsageType | None = None

def is_safe(self) -> bool:
return True
Expand All @@ -50,6 +62,7 @@ class ZipUrlVolume(pydantic.BaseModel):
volume_type: Literal[VolumeType.zip_url] = VolumeType.zip_url
contents: str # backwards compatibility - this is the URL
relative_path: str | None = Field(default=None)
usage_type: VolumeUsageType | None = None

def is_safe(self) -> bool:
domain = urlparse(self.contents).netloc
Expand All @@ -62,6 +75,7 @@ class SingleFileVolume(pydantic.BaseModel):
volume_type: Literal[VolumeType.single_file] = VolumeType.single_file
url: str
relative_path: str
usage_type: VolumeUsageType | None = None

def is_safe(self) -> bool:
domain = urlparse(self.url).netloc
Expand Down
Loading