-
Notifications
You must be signed in to change notification settings - Fork 57
fix(p2p): add dedicated delete channel and skip redundant downloads #482
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Add BitTorrent library dependency (anacrolix/torrent) and define constants for P2P model distribution: - Lease coordination constants (prefix, labels, durations) - Default configuration values (ports, rates, timeouts) - Environment variable keys for P2P configuration
Introduce the P2P distributor package with: - Config struct with validation and defaults - ConfigFromEnv for environment-based configuration - Comprehensive Prometheus metrics for P2P operations - Download metrics (total, duration, failures) - Peer discovery and connection metrics - Lease and seeding metrics - Metainfo server metrics
Implement ModelDistributor for P2P model distribution: - BitTorrent client management with rate limiting - Peer discovery via Kubernetes headless service DNS - Metainfo fetching from peers for torrent coordination - Model seeding and download operations - Active torrent tracking with proper cleanup Fix API compatibility with anacrolix/torrent v1.57.1: - Use bencode.Marshal instead of info.MarshalBencode - Use t.Complete().Bool() instead of t.Complete.Bool - Handle PeerRemoteAddr type assertion for peer addresses
Implement MetainfoServer to enable peers to discover available models:
- GET /metainfo/{modelHash} - serve torrent metainfo
- GET /health - health check endpoint
- GET /stats - P2P distribution statistics
- GET /models - list available models with seeding status
- Graceful shutdown support
Fix API compatibility with anacrolix/torrent v1.57.1:
- Add exists() helper function
- Use bencode.Marshal instead of info.MarshalBencode
Add comprehensive tests for the distributor package: - Config validation tests (valid, missing fields, invalid ports) - ConfigWithDefaults tests - Metrics recording tests - Stats struct tests - Test helper functions for integration tests Fix test config to include required LeaseDurationSeconds field.
Implement P2PLeaseManager for coordinating model downloads: - Lease acquisition with expired lease takeover - Lease renewal for long-running downloads - Complete/release lifecycle management - Ensures only one node downloads from HuggingFace Tests cover: - Lease acquisition (new, existing, expired) - Lease expiration detection - Lease name generation with hash truncation - Renewal and holder verification
Integrate P2P model distribution into the Gopher download workflow: - Add P2P fields to Gopher struct (distributor, lease manager, timeout) - EnableP2P() and SetP2PTimeout() configuration methods - computeModelHash() for consistent model identification - downloadWithP2P() orchestrates P2P-first download strategy - downloadWithLeaseHeld() handles HF download with lease coordination - waitForP2PAvailability() with exponential backoff for waiting nodes - startSeeding() begins seeding after successful download Flow: Check P2P peers → Try P2P download → Acquire lease → HF download → Seed
Add deployment configuration for P2P model distribution: - Headless Service for peer discovery via DNS - DaemonSet with P2P-enabled model-agent - Documentation with architecture overview and usage instructions
Integrate P2P model distribution into the model-agent entry point: - Add P2P configuration fields (ports, rates, encryption, timeout) - Read P2P settings from environment variables - Create ModelDistributor when P2P_ENABLED=true - Create P2PLeaseManager for download coordination - Call gopher.EnableP2P() to activate P2P download flow - Start MetainfoServer for peer discovery - Add graceful shutdown for P2P resources Environment variables: - P2P_ENABLED: Enable/disable P2P (default: false) - PEERS_SERVICE: Headless service DNS for peer discovery - P2P_TORRENT_PORT: BitTorrent port (default: 6881) - P2P_METAINFO_PORT: Metainfo HTTP port (default: 8081) - P2P_MAX_DOWNLOAD_RATE: Max download rate in bytes/s - P2P_MAX_UPLOAD_RATE: Max upload rate in bytes/s - P2P_ENCRYPTION_ENABLED: Enable BitTorrent encryption - P2P_DOWNLOAD_TIMEOUT: P2P download timeout in seconds
Summary of ChangesHello @slin1237, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the model-agent's efficiency and robustness by introducing a peer-to-peer model distribution system and optimizing task processing. The P2P mechanism leverages BitTorrent to enable nodes to share models directly, drastically reducing external download bandwidth and preventing rate limits. Concurrently, a dedicated deletion task pipeline ensures that cleanup operations are prioritized and executed without delay, improving overall system responsiveness and resource management. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a peer-to-peer (P2P) model distribution system using BitTorrent, which is a significant feature enhancement. The goal is to reduce load on HuggingFace and improve download times within a cluster. Key changes include a new distributor package for BitTorrent logic, Kubernetes lease management for download coordination, and a dedicated channel for delete tasks to improve responsiveness. My review focuses on the new P2P logic, configuration handling, and potential correctness issues. I've provided suggestions to improve configuration parsing, strengthen hash usage to prevent collisions, and refactor code for better maintainability.
| if len(modelHash) > 16 { | ||
| modelHash = modelHash[:16] | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Truncating the SHA256 hash to the first 16 characters significantly increases the risk of hash collisions, especially in a system managing a large number of models. A collision would cause two different models to contend for the same lease, leading to incorrect behavior. Kubernetes resource names can be up to 253 characters, so a much longer portion of the hash (e.g., 32 or even the full 64 characters) can be used to minimize this risk.
| if len(modelHash) > 16 { | |
| modelHash = modelHash[:16] | |
| } | |
| if len(modelHash) > 50 { | |
| modelHash = modelHash[:50] | |
| } |
| func getEnvInt(key string, defaultVal int) int { | ||
| if val := os.Getenv(key); val != "" { | ||
| if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 { | ||
| return defaultVal | ||
| } | ||
| } | ||
| return defaultVal | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use of fmt.Sscanf here can be brittle. For example, if the environment variable is "123foo", Sscanf will successfully parse 123 and return no error, which might not be the desired behavior. Using strconv.Atoi is more idiomatic and provides stricter parsing. It would also be clearer to parse into a new variable rather than overwriting defaultVal. This feedback also applies to getEnvInt64.
| func getEnvInt(key string, defaultVal int) int { | |
| if val := os.Getenv(key); val != "" { | |
| if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 { | |
| return defaultVal | |
| } | |
| } | |
| return defaultVal | |
| } | |
| func getEnvInt(key string, defaultVal int) int { | |
| if valStr := os.Getenv(key); valStr != "" { | |
| if i, err := strconv.Atoi(valStr); err == nil { | |
| return i | |
| } | |
| } | |
| return defaultVal | |
| } |
| func getEnvInt64(key string, defaultVal int64) int64 { | ||
| if val := os.Getenv(key); val != "" { | ||
| if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 { | ||
| return defaultVal | ||
| } | ||
| } | ||
| return defaultVal | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to getEnvInt, using strconv.ParseInt would be more robust and idiomatic for parsing the int64 value from the environment variable compared to fmt.Sscanf.
| func getEnvInt64(key string, defaultVal int64) int64 { | |
| if val := os.Getenv(key); val != "" { | |
| if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 { | |
| return defaultVal | |
| } | |
| } | |
| return defaultVal | |
| } | |
| func getEnvInt64(key string, defaultVal int64) int64 { | |
| if valStr := os.Getenv(key); valStr != "" { | |
| if i, err := strconv.ParseInt(valStr, 10, 64); err == nil { | |
| return i | |
| } | |
| } | |
| return defaultVal | |
| } |
| var p2pDistributor *distributor.ModelDistributor | ||
| var metainfoServer *distributor.MetainfoServer | ||
| if cfg.p2pEnabled { | ||
| logger.Info("P2P model distribution is enabled, initializing...") | ||
|
|
||
| // Get POD_NAME and POD_IP for P2P peer identification | ||
| podName := os.Getenv("POD_NAME") | ||
| if podName == "" { | ||
| logger.Warn("POD_NAME not set, P2P coordination may not work correctly") | ||
| } | ||
| podIP := os.Getenv("POD_IP") | ||
| if podIP == "" { | ||
| logger.Warn("POD_IP not set, P2P peer discovery may not work correctly") | ||
| } | ||
|
|
||
| // Create distributor configuration | ||
| distCfg := distributor.Config{ | ||
| DataDir: cfg.modelsRootDir, | ||
| PodName: podName, | ||
| PodIP: podIP, | ||
| PeersService: cfg.p2pPeersService, | ||
| TorrentPort: cfg.p2pTorrentPort, | ||
| MetainfoPort: cfg.p2pMetainfoPort, | ||
| MaxDownloadRate: cfg.p2pMaxDownloadRate, | ||
| MaxUploadRate: cfg.p2pMaxUploadRate, | ||
| EnableEncryption: cfg.p2pEnableEncryption, | ||
| RequireEncryption: cfg.p2pRequireEncryption, | ||
| Namespace: cfg.namespace, | ||
| LeaseDurationSeconds: 120, // 2 minutes | ||
| LeaseRenewIntervalSeconds: 30, // renew every 30 seconds | ||
| P2PTimeoutSeconds: int(cfg.p2pDownloadTimeout.Seconds()), | ||
| EnableP2P: true, | ||
| } | ||
|
|
||
| // Create the P2P distributor | ||
| p2pDistributor, err = distributor.New(distCfg, logger) | ||
| if err != nil { | ||
| logger.Errorf("Failed to create P2P distributor: %v", err) | ||
| logger.Warn("Continuing without P2P support") | ||
| } else { | ||
| // Create lease manager for P2P coordination | ||
| leaseManager := modelagent.NewP2PLeaseManager(kubeClient, cfg.namespace, cfg.nodeName, logger) | ||
|
|
||
| // Enable P2P on the gopher | ||
| gopher.EnableP2P(p2pDistributor, leaseManager) | ||
| gopher.SetP2PTimeout(cfg.p2pDownloadTimeout) | ||
|
|
||
| // Create and start metainfo server | ||
| metainfoServer = distributor.NewMetainfoServer( | ||
| cfg.modelsRootDir, | ||
| cfg.p2pMetainfoPort, | ||
| p2pDistributor, | ||
| logger, | ||
| ) | ||
|
|
||
| go func() { | ||
| logger.Infof("Starting P2P metainfo server on port %d", cfg.p2pMetainfoPort) | ||
| if err := metainfoServer.ServeWithContext(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { | ||
| logger.Errorf("Metainfo server error: %v", err) | ||
| } | ||
| }() | ||
|
|
||
| logger.Info("P2P model distribution initialized successfully") | ||
| } | ||
| } else { | ||
| logger.Info("P2P model distribution is disabled") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| func ConfigFromEnv() Config { | ||
| cfg := DefaultConfig() | ||
|
|
||
| if v := os.Getenv("MODEL_DIR"); v != "" { | ||
| cfg.DataDir = v | ||
| } | ||
| if v := os.Getenv("POD_NAMESPACE"); v != "" { | ||
| cfg.Namespace = v | ||
| } | ||
| if v := os.Getenv("POD_NAME"); v != "" { | ||
| cfg.PodName = v | ||
| } | ||
| if v := os.Getenv("POD_IP"); v != "" { | ||
| cfg.PodIP = v | ||
| } | ||
| if v := os.Getenv("PEERS_SERVICE"); v != "" { | ||
| cfg.PeersService = v | ||
| } | ||
| if v := os.Getenv("P2P_ENABLED"); v == "false" { | ||
| cfg.EnableP2P = false | ||
| } | ||
| if v := os.Getenv("P2P_ENCRYPTION_ENABLED"); v == "true" { | ||
| cfg.EnableEncryption = true | ||
| } | ||
| if v := os.Getenv("P2P_ENCRYPTION_REQUIRED"); v == "true" { | ||
| cfg.RequireEncryption = true | ||
| } | ||
|
|
||
| return cfg | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function ConfigFromEnv appears to be unused. The configuration for the P2P distributor is constructed manually in cmd/model-agent/main.go. This function seems to be dead code and should either be used to centralize configuration logic or be removed to avoid confusion. The same applies to the WithDefaults function.
| // createMetainfo builds a torrent metainfo for the given path. | ||
| func (d *ModelDistributor) createMetainfo(path, name string) (*metainfo.MetaInfo, error) { | ||
| info := metainfo.Info{ | ||
| PieceLength: 4 * 1024 * 1024, // 4MB pieces |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The BitTorrent piece length is hardcoded to 4MB. While this is a reasonable default, it may not be optimal for all scenarios. For very large models (e.g., terabytes), a larger piece size (like 16MB or 32MB) can improve efficiency by reducing the size of the .torrent metainfo file. Consider making this value configurable in the distributor.Config to allow for tuning.
| relPath, err := filepath.Rel(absBase, modelPath) | ||
| if err != nil || strings.HasPrefix(relPath, "..") || relPath == "." { | ||
| s.logger.Warnf("Rejected metainfo request with path traversal attempt: %s", modelHash) | ||
| http.Error(w, "Invalid model path", http.StatusBadRequest) | ||
| return | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The path traversal check is good, but the condition relPath == "." is likely redundant. filepath.Clean(modelHash) is called on line 76, and the check modelHash == "." on line 78 already prevents this case. You can simplify this check.
| relPath, err := filepath.Rel(absBase, modelPath) | |
| if err != nil || strings.HasPrefix(relPath, "..") || relPath == "." { | |
| s.logger.Warnf("Rejected metainfo request with path traversal attempt: %s", modelHash) | |
| http.Error(w, "Invalid model path", http.StatusBadRequest) | |
| return | |
| } | |
| relPath, err := filepath.Rel(absBase, modelPath) | |
| if err != nil || strings.HasPrefix(relPath, "..") { | |
| s.logger.Warnf("Rejected metainfo request with path traversal attempt: %s", modelHash) | |
| http.Error(w, "Invalid model path", http.StatusBadRequest) | |
| return | |
| } |
6e44c5b to
953287c
Compare
- Add dedicated deleteTaskChan for delete tasks to ensure deletions are never blocked by downloads (even with 100 concurrent downloads) - Scout sends delete tasks to deleteChan instead of gopherChan - Gopher runs dedicated runDeleteWorker for immediate delete processing - Skip download if already seeding the model (prevents infinite loop when update events trigger DownloadOverride for already-downloaded models)
953287c to
1bcb721
Compare
are never blocked by downloads (even with 100 concurrent downloads)
when update events trigger DownloadOverride for already-downloaded models)