@slin1237
Collaborator

  • Add dedicated deleteTaskChan for delete tasks to ensure deletions
      are never blocked by downloads (even with 100 concurrent downloads)
  • Scout sends delete tasks to deleteChan instead of gopherChan
  • Gopher runs dedicated runDeleteWorker for immediate delete processing
  • Skip download if already seeding the model (prevents infinite loop
      when update events trigger DownloadOverride for already-downloaded models)
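A minimal sketch of the dedicated delete channel idea, not the PR's actual code: the `task` type, `runWorkers`, and `slowDownload` are hypothetical names used only for illustration. It shows why a delete queued on its own channel does not wait behind a slow download.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// slowDownload is an illustrative stand-in for a long-running download.
const slowDownload = 100 * time.Millisecond

// task is a hypothetical stand-in for the agent's task type.
type task struct {
	kind  string // "download" or "delete"
	model string
}

// runWorkers starts one worker per channel and returns tasks in the order
// they completed. Deletes have their own channel and worker, so they never
// queue behind a download.
func runWorkers(downloads, deletes <-chan task, downloadCost time.Duration) []string {
	var (
		mu   sync.Mutex
		done []string
		wg   sync.WaitGroup
	)
	record := func(t task) {
		mu.Lock()
		defer mu.Unlock()
		done = append(done, t.kind+":"+t.model)
	}

	wg.Add(2)
	go func() { // download worker: each task takes downloadCost
		defer wg.Done()
		for t := range downloads {
			time.Sleep(downloadCost)
			record(t)
		}
	}()
	go func() { // delete worker: processes tasks immediately
		defer wg.Done()
		for t := range deletes {
			record(t)
		}
	}()
	wg.Wait()
	return done
}

func main() {
	downloads := make(chan task, 4)
	deletes := make(chan task, 4)
	downloads <- task{kind: "download", model: "model-a"}
	deletes <- task{kind: "delete", model: "model-b"}
	close(downloads)
	close(deletes)

	fmt.Println(runWorkers(downloads, deletes, slowDownload))
}
```

With a single shared channel, the delete would instead be picked up only after a worker finished its current download.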

Add BitTorrent library dependency (anacrolix/torrent) and define
constants for P2P model distribution:
- Lease coordination constants (prefix, labels, durations)
- Default configuration values (ports, rates, timeouts)
- Environment variable keys for P2P configuration

Introduce the P2P distributor package with:
- Config struct with validation and defaults
- ConfigFromEnv for environment-based configuration
- Comprehensive Prometheus metrics for P2P operations
  - Download metrics (total, duration, failures)
  - Peer discovery and connection metrics
  - Lease and seeding metrics
  - Metainfo server metrics

Implement ModelDistributor for P2P model distribution:
- BitTorrent client management with rate limiting
- Peer discovery via Kubernetes headless service DNS
- Metainfo fetching from peers for torrent coordination
- Model seeding and download operations
- Active torrent tracking with proper cleanup

Fix API compatibility with anacrolix/torrent v1.57.1:
- Use bencode.Marshal instead of info.MarshalBencode
- Use t.Complete().Bool() instead of t.Complete.Bool
- Handle PeerRemoteAddr type assertion for peer addresses

Implement MetainfoServer to enable peers to discover available models:
- GET /metainfo/{modelHash} - serve torrent metainfo
- GET /health - health check endpoint
- GET /stats - P2P distribution statistics
- GET /models - list available models with seeding status
- Graceful shutdown support

Fix API compatibility with anacrolix/torrent v1.57.1:
- Add exists() helper function
- Use bencode.Marshal instead of info.MarshalBencode

Add comprehensive tests for the distributor package:
- Config validation tests (valid, missing fields, invalid ports)
- ConfigWithDefaults tests
- Metrics recording tests
- Stats struct tests
- Test helper functions for integration tests

Fix test config to include required LeaseDurationSeconds field.

Implement P2PLeaseManager for coordinating model downloads:
- Lease acquisition with expired lease takeover
- Lease renewal for long-running downloads
- Complete/release lifecycle management
- Ensures only one node downloads from HuggingFace

Tests cover:
- Lease acquisition (new, existing, expired)
- Lease expiration detection
- Lease name generation with hash truncation
- Renewal and holder verification
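The expiry and takeover rules listed above might look roughly like the sketch below. It assumes a lease records a holder, a last-renew time, and a duration in seconds; `leaseState`, `canAcquire`, and `at` are hypothetical names, not the PR's P2PLeaseManager API.

```go
package main

import (
	"fmt"
	"time"
)

// leaseState is a hypothetical, simplified view of a coordination lease.
type leaseState struct {
	Holder          string
	RenewTime       time.Time
	DurationSeconds int
}

// expired reports whether the lease has gone unrenewed for longer than its
// duration, measured at time now.
func (l leaseState) expired(now time.Time) bool {
	deadline := l.RenewTime.Add(time.Duration(l.DurationSeconds) * time.Second)
	return now.After(deadline)
}

// canAcquire reports whether node may take the lease: it is unheld, already
// held by node, or held by someone else but expired.
func canAcquire(l leaseState, node string, now time.Time) bool {
	return l.Holder == "" || l.Holder == node || l.expired(now)
}

// at builds a timestamp a given number of seconds after the Unix epoch.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

func main() {
	l := leaseState{Holder: "node-a", RenewTime: at(0), DurationSeconds: 120}
	fmt.Println(canAcquire(l, "node-b", at(60)))  // lease still live
	fmt.Println(canAcquire(l, "node-b", at(121))) // lease expired, takeover allowed
}
```

Renewal in this model is just moving `RenewTime` forward before the deadline passes.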
Integrate P2P model distribution into the Gopher download workflow:
- Add P2P fields to Gopher struct (distributor, lease manager, timeout)
- EnableP2P() and SetP2PTimeout() configuration methods
- computeModelHash() for consistent model identification
- downloadWithP2P() orchestrates P2P-first download strategy
- downloadWithLeaseHeld() handles HF download with lease coordination
- waitForP2PAvailability() with exponential backoff for waiting nodes
- startSeeding() begins seeding after successful download

Flow: Check P2P peers → Try P2P download → Acquire lease → HF download → Seed
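The flow above can be sketched with function-valued hooks standing in for the Gopher's real methods. Everything here is hypothetical (`steps`, `download`, `errNoPeers`), and the branch taken when the lease is held by another node (wait, then use P2P) is an assumption based on the waitForP2PAvailability() description.

```go
package main

import (
	"errors"
	"fmt"
)

var errNoPeers = errors.New("no peers have the model")

// steps holds hypothetical hooks for each stage of the flow.
type steps struct {
	peersHaveModel func() bool
	p2pDownload    func() error
	acquireLease   func() bool
	hfDownload     func() error
	waitForPeers   func() error
	seed           func()
}

// download runs the P2P-first strategy and returns the source that was
// used: "p2p" or "hf".
func download(s steps) (string, error) {
	// 1. Prefer peers that already have the model.
	if s.peersHaveModel() {
		if err := s.p2pDownload(); err == nil {
			s.seed()
			return "p2p", nil
		}
		// P2P failed: fall through to lease coordination.
	}
	// 2. The lease holder is the only node that downloads from HuggingFace.
	if s.acquireLease() {
		if err := s.hfDownload(); err != nil {
			return "", err
		}
		s.seed()
		return "hf", nil
	}
	// 3. Another node holds the lease: wait for it to seed, then use P2P.
	if err := s.waitForPeers(); err != nil {
		return "", err
	}
	if err := s.p2pDownload(); err != nil {
		return "", err
	}
	s.seed()
	return "p2p", nil
}

func main() {
	seeded := false
	s := steps{
		peersHaveModel: func() bool { return false },
		p2pDownload:    func() error { return errNoPeers },
		acquireLease:   func() bool { return true },
		hfDownload:     func() error { return nil },
		waitForPeers:   func() error { return nil },
		seed:           func() { seeded = true },
	}
	src, err := download(s)
	fmt.Println(src, err, seeded)
}
```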
Add deployment configuration for P2P model distribution:
- Headless Service for peer discovery via DNS
- DaemonSet with P2P-enabled model-agent
- Documentation with architecture overview and usage instructions

Integrate P2P model distribution into the model-agent entry point:
- Add P2P configuration fields (ports, rates, encryption, timeout)
- Read P2P settings from environment variables
- Create ModelDistributor when P2P_ENABLED=true
- Create P2PLeaseManager for download coordination
- Call gopher.EnableP2P() to activate P2P download flow
- Start MetainfoServer for peer discovery
- Add graceful shutdown for P2P resources

Environment variables:
- P2P_ENABLED: Enable/disable P2P (default: false)
- PEERS_SERVICE: Headless service DNS for peer discovery
- P2P_TORRENT_PORT: BitTorrent port (default: 6881)
- P2P_METAINFO_PORT: Metainfo HTTP port (default: 8081)
- P2P_MAX_DOWNLOAD_RATE: Max download rate in bytes/s
- P2P_MAX_UPLOAD_RATE: Max upload rate in bytes/s
- P2P_ENCRYPTION_ENABLED: Enable BitTorrent encryption
- P2P_DOWNLOAD_TIMEOUT: P2P download timeout in seconds
@github-actions bot added the documentation, model-agent, tests, config, and dependencies labels on Dec 31, 2025
@gemini-code-assist
Contributor

Summary of Changes

Hello @slin1237, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the model-agent's efficiency and robustness by introducing a peer-to-peer model distribution system and optimizing task processing. The P2P mechanism leverages BitTorrent to enable nodes to share models directly, drastically reducing external download bandwidth and preventing rate limits. Concurrently, a dedicated deletion task pipeline ensures that cleanup operations are prioritized and executed without delay, improving overall system responsiveness and resource management.

Highlights

  • Dedicated Delete Channel: Introduced a dedicated channel and worker for deletion tasks, ensuring that model deletions are processed immediately and are never blocked by ongoing downloads, even under high concurrency.
  • Peer-to-Peer (P2P) Model Distribution: Implemented P2P model distribution using the BitTorrent protocol to efficiently transfer large model files between nodes within the Kubernetes cluster, reducing reliance on external sources like HuggingFace and mitigating rate limiting issues.
  • Intelligent Download Skipping: Added logic to skip redundant downloads if a model is already being seeded locally, preventing infinite download loops triggered by update events for already-present models.
  • Kubernetes Lease-based P2P Coordination: Integrated Kubernetes Leases to coordinate P2P downloads, ensuring only one node downloads a model from HuggingFace while others wait for P2P availability, and then seed the model to other nodes.
  • P2P Configuration and Observability: Added extensive P2P configuration options via environment variables and introduced Prometheus metrics for monitoring P2P download activity, peer discovery, and seeding status.

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a peer-to-peer (P2P) model distribution system using BitTorrent, which is a significant feature enhancement. The goal is to reduce load on HuggingFace and improve download times within a cluster. Key changes include a new distributor package for BitTorrent logic, Kubernetes lease management for download coordination, and a dedicated channel for delete tasks to improve responsiveness. My review focuses on the new P2P logic, configuration handling, and potential correctness issues. I've provided suggestions to improve configuration parsing, strengthen hash usage to prevent collisions, and refactor code for better maintainability.

Comment on lines +49 to +51
if len(modelHash) > 16 {
	modelHash = modelHash[:16]
}

high

Truncating the SHA256 hash to the first 16 characters significantly increases the risk of hash collisions, especially in a system managing a large number of models. A collision would cause two different models to contend for the same lease, leading to incorrect behavior. Kubernetes resource names can be up to 253 characters, so a much longer portion of the hash (e.g., 32 or even the full 64 characters) can be used to minimize this risk.

Suggested change
- if len(modelHash) > 16 {
- 	modelHash = modelHash[:16]
- }
+ if len(modelHash) > 50 {
+ 	modelHash = modelHash[:50]
+ }
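For context, 16 hex characters keep 64 bits of the SHA-256 digest, while 50 keep 200 bits and the full digest is 64 hex characters. Below is a hedged sketch of a lease-name helper that keeps a configurable number of hex characters while staying within the 253-character limit mentioned above. `leaseName`, its parameters, and the `p2p-lease-` prefix are hypothetical; the PR's real prefix constant is not shown in this conversation.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// maxNameLen is the resource-name limit cited in the review comment.
const maxNameLen = 253

// leaseName returns prefix followed by the first hexChars characters of the
// hex-encoded SHA-256 of modelID, clamped so the result fits in maxNameLen.
func leaseName(prefix, modelID string, hexChars int) string {
	sum := sha256.Sum256([]byte(modelID))
	digest := hex.EncodeToString(sum[:]) // always 64 hex characters

	if hexChars > len(digest) {
		hexChars = len(digest)
	}
	if room := maxNameLen - len(prefix); hexChars > room {
		hexChars = room
	}
	if hexChars < 0 {
		hexChars = 0
	}
	return prefix + digest[:hexChars]
}

func main() {
	// "p2p-lease-" is an illustrative prefix, not the project's constant.
	fmt.Println(leaseName("p2p-lease-", "org/model-a", 16))
	fmt.Println(leaseName("p2p-lease-", "org/model-a", 50))
	fmt.Println(leaseName("p2p-lease-", "org/model-a", 64))
}
```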

Comment on lines +120 to +127
func getEnvInt(key string, defaultVal int) int {
	if val := os.Getenv(key); val != "" {
		if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 {
			return defaultVal
		}
	}
	return defaultVal
}

medium

The use of fmt.Sscanf here can be brittle. For example, if the environment variable is "123foo", Sscanf will successfully parse 123 and return no error, which might not be the desired behavior. Using strconv.Atoi is more idiomatic and provides stricter parsing. It would also be clearer to parse into a new variable rather than overwriting defaultVal. This feedback also applies to getEnvInt64.

Suggested change
- func getEnvInt(key string, defaultVal int) int {
- 	if val := os.Getenv(key); val != "" {
- 		if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 {
- 			return defaultVal
- 		}
- 	}
- 	return defaultVal
- }
+ func getEnvInt(key string, defaultVal int) int {
+ 	if valStr := os.Getenv(key); valStr != "" {
+ 		if i, err := strconv.Atoi(valStr); err == nil {
+ 			return i
+ 		}
+ 	}
+ 	return defaultVal
+ }
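The difference the comment describes can be checked with a small standalone program. `parseLenient` and `parseStrict` are hypothetical wrappers written for this comparison; only `fmt.Sscanf` and `strconv.Atoi` come from the standard library.

```go
package main

import (
	"fmt"
	"strconv"
)

// parseLenient mirrors the Sscanf-based parsing: it stops at the first
// non-digit and ignores any trailing input.
func parseLenient(s string) (int, bool) {
	var v int
	n, err := fmt.Sscanf(s, "%d", &v)
	return v, err == nil && n == 1
}

// parseStrict uses strconv.Atoi, which rejects any input that is not
// entirely a base-10 integer.
func parseStrict(s string) (int, bool) {
	v, err := strconv.Atoi(s)
	return v, err == nil
}

func main() {
	for _, in := range []string{"8081", "123foo", "foo"} {
		lv, lok := parseLenient(in)
		sv, sok := parseStrict(in)
		fmt.Printf("%q lenient=(%d,%t) strict=(%d,%t)\n", in, lv, lok, sv, sok)
	}
}
```

For a value such as `123foo`, the lenient path silently yields 123, whereas the strict path falls back to the default.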

Comment on lines +130 to 137
func getEnvInt64(key string, defaultVal int64) int64 {
	if val := os.Getenv(key); val != "" {
		if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 {
			return defaultVal
		}
	}
	return defaultVal
}

medium

Similar to getEnvInt, using strconv.ParseInt would be more robust and idiomatic for parsing the int64 value from the environment variable compared to fmt.Sscanf.

Suggested change
- func getEnvInt64(key string, defaultVal int64) int64 {
- 	if val := os.Getenv(key); val != "" {
- 		if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 {
- 			return defaultVal
- 		}
- 	}
- 	return defaultVal
- }
+ func getEnvInt64(key string, defaultVal int64) int64 {
+ 	if valStr := os.Getenv(key); valStr != "" {
+ 		if i, err := strconv.ParseInt(valStr, 10, 64); err == nil {
+ 			return i
+ 		}
+ 	}
+ 	return defaultVal
+ }
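Since the two helpers differ only in the parse call, one possible follow-up, not proposed in the review itself, is a single generic helper. The sketch below assumes Go 1.18+ generics; `envOr`, `envInt`, and `envInt64` are hypothetical names, and the lookup function is injected so the logic can be exercised without touching the process environment.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// envOr returns the parsed value of key, or def when the variable is unset,
// empty, or fails to parse.
func envOr[T any](getenv func(string) string, key string, def T, parse func(string) (T, error)) T {
	raw := getenv(key)
	if raw == "" {
		return def
	}
	v, err := parse(raw)
	if err != nil {
		return def
	}
	return v
}

// envInt parses key strictly as an int.
func envInt(getenv func(string) string, key string, def int) int {
	return envOr(getenv, key, def, strconv.Atoi)
}

// envInt64 parses key strictly as a base-10 int64.
func envInt64(getenv func(string) string, key string, def int64) int64 {
	return envOr(getenv, key, def, func(s string) (int64, error) {
		return strconv.ParseInt(s, 10, 64)
	})
}

func main() {
	port := envInt(os.Getenv, "P2P_TORRENT_PORT", 6881)
	rate := envInt64(os.Getenv, "P2P_MAX_DOWNLOAD_RATE", 0)
	fmt.Println(port, rate)
}
```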

Comment on lines +389 to +455
var p2pDistributor *distributor.ModelDistributor
var metainfoServer *distributor.MetainfoServer
if cfg.p2pEnabled {
	logger.Info("P2P model distribution is enabled, initializing...")

	// Get POD_NAME and POD_IP for P2P peer identification
	podName := os.Getenv("POD_NAME")
	if podName == "" {
		logger.Warn("POD_NAME not set, P2P coordination may not work correctly")
	}
	podIP := os.Getenv("POD_IP")
	if podIP == "" {
		logger.Warn("POD_IP not set, P2P peer discovery may not work correctly")
	}

	// Create distributor configuration
	distCfg := distributor.Config{
		DataDir:                   cfg.modelsRootDir,
		PodName:                   podName,
		PodIP:                     podIP,
		PeersService:              cfg.p2pPeersService,
		TorrentPort:               cfg.p2pTorrentPort,
		MetainfoPort:              cfg.p2pMetainfoPort,
		MaxDownloadRate:           cfg.p2pMaxDownloadRate,
		MaxUploadRate:             cfg.p2pMaxUploadRate,
		EnableEncryption:          cfg.p2pEnableEncryption,
		RequireEncryption:         cfg.p2pRequireEncryption,
		Namespace:                 cfg.namespace,
		LeaseDurationSeconds:      120, // 2 minutes
		LeaseRenewIntervalSeconds: 30,  // renew every 30 seconds
		P2PTimeoutSeconds:         int(cfg.p2pDownloadTimeout.Seconds()),
		EnableP2P:                 true,
	}

	// Create the P2P distributor
	p2pDistributor, err = distributor.New(distCfg, logger)
	if err != nil {
		logger.Errorf("Failed to create P2P distributor: %v", err)
		logger.Warn("Continuing without P2P support")
	} else {
		// Create lease manager for P2P coordination
		leaseManager := modelagent.NewP2PLeaseManager(kubeClient, cfg.namespace, cfg.nodeName, logger)

		// Enable P2P on the gopher
		gopher.EnableP2P(p2pDistributor, leaseManager)
		gopher.SetP2PTimeout(cfg.p2pDownloadTimeout)

		// Create and start metainfo server
		metainfoServer = distributor.NewMetainfoServer(
			cfg.modelsRootDir,
			cfg.p2pMetainfoPort,
			p2pDistributor,
			logger,
		)

		go func() {
			logger.Infof("Starting P2P metainfo server on port %d", cfg.p2pMetainfoPort)
			if err := metainfoServer.ServeWithContext(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {
				logger.Errorf("Metainfo server error: %v", err)
			}
		}()

		logger.Info("P2P model distribution initialized successfully")
	}
} else {
	logger.Info("P2P model distribution is disabled")
}

medium

This block of code for P2P initialization is quite long and makes the runCommand function harder to read. Consider extracting this logic into a separate helper function, for example initializeP2PComponents, to improve modularity and readability.
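One possible shape for such a helper is sketched below with simplified stand-in types, so that it compiles on its own. `p2pSettings`, `p2pRuntime`, `errDistributor`, and the injected `newDistributor` hook are all hypothetical; the real helper would construct the distributor, lease manager, and metainfo server from the project's own packages.

```go
package main

import (
	"errors"
	"fmt"
)

var errDistributor = errors.New("torrent port in use")

// p2pSettings is a hypothetical subset of the agent's P2P config fields.
type p2pSettings struct {
	Enabled      bool
	MetainfoPort int
}

// p2pRuntime is a hypothetical bundle of what the caller needs afterwards.
type p2pRuntime struct {
	PodName  string
	PodIP    string
	Warnings []string
}

// initializeP2PComponents returns (nil, nil) when P2P is disabled, a runtime
// on success, and an error the caller can log before continuing without P2P.
func initializeP2PComponents(
	s p2pSettings,
	getenv func(string) string,
	newDistributor func(podName, podIP string) error,
) (*p2pRuntime, error) {
	if !s.Enabled {
		return nil, nil
	}
	rt := &p2pRuntime{PodName: getenv("POD_NAME"), PodIP: getenv("POD_IP")}
	if rt.PodName == "" {
		rt.Warnings = append(rt.Warnings, "POD_NAME not set")
	}
	if rt.PodIP == "" {
		rt.Warnings = append(rt.Warnings, "POD_IP not set")
	}
	if err := newDistributor(rt.PodName, rt.PodIP); err != nil {
		return nil, fmt.Errorf("create P2P distributor: %w", err)
	}
	return rt, nil
}

func main() {
	env := map[string]string{"POD_NAME": "agent-0"}
	getenv := func(k string) string { return env[k] }
	settings := p2pSettings{Enabled: true, MetainfoPort: 8081}

	rt, err := initializeP2PComponents(settings, getenv, func(string, string) error { return nil })
	fmt.Println(rt.Warnings, err)

	_, err = initializeP2PComponents(settings, getenv, func(string, string) error { return errDistributor })
	fmt.Println("continuing without P2P:", err)
}
```

The caller in runCommand would then shrink to one call plus the decision about whether to continue without P2P.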

Comment on lines +80 to +109
func ConfigFromEnv() Config {
	cfg := DefaultConfig()

	if v := os.Getenv("MODEL_DIR"); v != "" {
		cfg.DataDir = v
	}
	if v := os.Getenv("POD_NAMESPACE"); v != "" {
		cfg.Namespace = v
	}
	if v := os.Getenv("POD_NAME"); v != "" {
		cfg.PodName = v
	}
	if v := os.Getenv("POD_IP"); v != "" {
		cfg.PodIP = v
	}
	if v := os.Getenv("PEERS_SERVICE"); v != "" {
		cfg.PeersService = v
	}
	if v := os.Getenv("P2P_ENABLED"); v == "false" {
		cfg.EnableP2P = false
	}
	if v := os.Getenv("P2P_ENCRYPTION_ENABLED"); v == "true" {
		cfg.EnableEncryption = true
	}
	if v := os.Getenv("P2P_ENCRYPTION_REQUIRED"); v == "true" {
		cfg.RequireEncryption = true
	}

	return cfg
}

medium

The function ConfigFromEnv appears to be unused. The configuration for the P2P distributor is constructed manually in cmd/model-agent/main.go. This function seems to be dead code and should either be used to centralize configuration logic or be removed to avoid confusion. The same applies to the WithDefaults function.

// createMetainfo builds a torrent metainfo for the given path.
func (d *ModelDistributor) createMetainfo(path, name string) (*metainfo.MetaInfo, error) {
	info := metainfo.Info{
		PieceLength: 4 * 1024 * 1024, // 4MB pieces

medium

The BitTorrent piece length is hardcoded to 4MB. While this is a reasonable default, it may not be optimal for all scenarios. For very large models (e.g., terabytes), a larger piece size (like 16MB or 32MB) can improve efficiency by reducing the size of the .torrent metainfo file. Consider making this value configurable in the distributor.Config to allow for tuning.
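To put rough numbers on this trade-off, the sketch below computes the piece count and the size of the concatenated piece hashes for a given payload, assuming BitTorrent v1 metainfo with one 20-byte SHA-1 hash per piece. `pieceStats` is a hypothetical helper, not part of anacrolix/torrent.

```go
package main

import "fmt"

// sha1Size is the number of bytes per piece hash in a v1 torrent's
// "pieces" field.
const sha1Size = 20

// pieceStats returns the number of pieces needed to cover totalBytes and
// the size in bytes of the concatenated piece hashes.
func pieceStats(totalBytes, pieceLength int64) (pieces, hashBytes int64) {
	if totalBytes <= 0 || pieceLength <= 0 {
		return 0, 0
	}
	pieces = (totalBytes + pieceLength - 1) / pieceLength // ceiling division
	return pieces, pieces * sha1Size
}

func main() {
	const (
		MiB = int64(1) << 20
		TiB = int64(1) << 40
	)
	for _, pieceLen := range []int64{4 * MiB, 16 * MiB, 32 * MiB} {
		n, b := pieceStats(TiB, pieceLen)
		fmt.Printf("piece=%d MiB pieces=%d hashes=%d bytes\n", pieceLen/MiB, n, b)
	}
}
```

For a 1 TiB payload, 4 MiB pieces mean 262,144 hashes (about 5 MiB of metainfo), while 16 MiB pieces cut that to 65,536 hashes (about 1.25 MiB).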

Comment on lines +112 to +117
relPath, err := filepath.Rel(absBase, modelPath)
if err != nil || strings.HasPrefix(relPath, "..") || relPath == "." {
	s.logger.Warnf("Rejected metainfo request with path traversal attempt: %s", modelHash)
	http.Error(w, "Invalid model path", http.StatusBadRequest)
	return
}

medium

The path traversal check is good, but the condition relPath == "." is likely redundant. filepath.Clean(modelHash) is called on line 76, and the check modelHash == "." on line 78 already prevents this case. You can simplify this check.

Suggested change
- relPath, err := filepath.Rel(absBase, modelPath)
- if err != nil || strings.HasPrefix(relPath, "..") || relPath == "." {
- 	s.logger.Warnf("Rejected metainfo request with path traversal attempt: %s", modelHash)
- 	http.Error(w, "Invalid model path", http.StatusBadRequest)
- 	return
- }
+ relPath, err := filepath.Rel(absBase, modelPath)
+ if err != nil || strings.HasPrefix(relPath, "..") {
+ 	s.logger.Warnf("Rejected metainfo request with path traversal attempt: %s", modelHash)
+ 	http.Error(w, "Invalid model path", http.StatusBadRequest)
+ 	return
+ }
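As a standalone illustration of the containment check, here is a sketch of a helper built on filepath.Rel. `withinBase` is a hypothetical name. Unlike the handler, it keeps the "." case because it has no earlier guard, and it compares against ".." as a whole path element, so a directory literally named `..hidden` is not rejected by the prefix test. The example paths assume a Unix-style filesystem.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// withinBase reports whether candidate resolves to a location strictly
// inside base. Both paths should be absolute, or both relative.
func withinBase(base, candidate string) bool {
	rel, err := filepath.Rel(base, candidate)
	if err != nil {
		return false
	}
	if rel == "." {
		return false // candidate is base itself, not something inside it
	}
	// Reject ".." itself and anything that starts with "../".
	if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return false
	}
	return true
}

func main() {
	base := "/models"
	for _, p := range []string{
		"/models/abc123",
		"/models/../etc/passwd",
		"/models",
		"/models/..hidden",
	} {
		fmt.Printf("%-24s inside=%t\n", p, withinBase(base, p))
	}
}
```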

@slin1237 force-pushed the feature/p2p-model-distribution-n/10 branch 6 times, most recently from 6e44c5b to 953287c, on December 31, 2025 21:02
@slin1237 force-pushed the feature/p2p-model-distribution-n/10 branch from 953287c to 1bcb721 on December 31, 2025 21:47