From a504752ce04ffc9c276a662bd5b6bc1f3b637084 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Mon, 5 Jan 2026 16:22:15 +0100 Subject: [PATCH 1/6] New Priotity and UserPriority news types --- .../src/client/commands/submit/command.rs | 6 +-- .../src/client/commands/submit/defs.rs | 4 +- crates/hyperqueue/src/transfer/messages.rs | 4 +- crates/tako/src/gateway.rs | 4 +- crates/tako/src/internal/common/mod.rs | 2 + crates/tako/src/internal/common/priority.rs | 50 +++++++++++++++++++ crates/tako/src/internal/messages/worker.rs | 3 +- .../tako/src/internal/scheduler/multinode.rs | 21 +++----- crates/tako/src/internal/scheduler/state.rs | 9 +--- crates/tako/src/internal/server/client.rs | 3 +- crates/tako/src/internal/server/reactor.rs | 14 +----- crates/tako/src/internal/server/task.rs | 43 +++++----------- .../internal/tests/integration/utils/task.rs | 4 +- .../tako/src/internal/worker/data/download.rs | 4 +- crates/tako/src/internal/worker/rpc.rs | 6 +-- crates/tako/src/internal/worker/rqueue.rs | 20 ++++---- crates/tako/src/internal/worker/state.rs | 9 +--- crates/tako/src/internal/worker/task.rs | 4 +- crates/tako/src/lib.rs | 5 +- 19 files changed, 105 insertions(+), 110 deletions(-) create mode 100644 crates/tako/src/internal/common/priority.rs diff --git a/crates/hyperqueue/src/client/commands/submit/command.rs b/crates/hyperqueue/src/client/commands/submit/command.rs index 48b7e4c50..7fd125911 100644 --- a/crates/hyperqueue/src/client/commands/submit/command.rs +++ b/crates/hyperqueue/src/client/commands/submit/command.rs @@ -44,7 +44,7 @@ use tako::gateway::{ }; use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef}; use tako::resources::{AllocationRequest, CPU_RESOURCE_NAME, NumOfNodes, ResourceAmount}; -use tako::{JobId, JobTaskCount, Map}; +use tako::{JobId, JobTaskCount, Map, UserPriority}; const SUBMIT_ARRAY_LIMIT: JobTaskCount = 999; @@ -295,7 +295,7 @@ pub struct SubmitJobTaskConfOpts { /// Tasks priority #[arg(long, default_value_t = 0)] - priority: tako::Priority, + priority: i32, /// Task's time limit #[arg( @@ -727,7 +727,7 @@ pub async fn submit_computation( let task_desc = TaskDescription { kind: task_kind, - priority, + priority: UserPriority::new(priority), time_limit, crash_limit, }; diff --git a/crates/hyperqueue/src/client/commands/submit/defs.rs b/crates/hyperqueue/src/client/commands/submit/defs.rs index daa8805d8..91832d9e6 100644 --- a/crates/hyperqueue/src/client/commands/submit/defs.rs +++ b/crates/hyperqueue/src/client/commands/submit/defs.rs @@ -14,7 +14,7 @@ use std::time::Duration; use tako::gateway::{CrashLimit, ResourceRequest, ResourceRequestEntries, ResourceRequestEntry}; use tako::program::FileOnCloseBehavior; use tako::resources::{AllocationRequest, NumOfNodes, ResourceAmount}; -use tako::{JobTaskCount, JobTaskId, Map, Priority}; +use tako::{JobTaskCount, JobTaskId, Map, UserPriority}; #[derive(Deserialize)] #[serde(untagged)] @@ -244,7 +244,7 @@ pub struct TaskConfigDef { pub time_limit: Option, #[serde(default)] - pub priority: Priority, + pub priority: UserPriority, #[serde(default, deserialize_with = "deserialize_crash_limit")] pub crash_limit: CrashLimit, diff --git a/crates/hyperqueue/src/transfer/messages.rs b/crates/hyperqueue/src/transfer/messages.rs index 0bc17b391..7a1186bfc 100644 --- a/crates/hyperqueue/src/transfer/messages.rs +++ b/crates/hyperqueue/src/transfer/messages.rs @@ -21,7 +21,7 @@ use tako::program::ProgramDefinition; use tako::resources::{ResourceDescriptor, ResourceRqId}; use tako::server::TaskExplanation; use 
tako::worker::WorkerConfiguration; -use tako::{JobId, JobTaskCount, JobTaskId, Map, TaskId, WorkerId, define_id_type}; +use tako::{JobId, JobTaskCount, JobTaskId, Map, TaskId, UserPriority, WorkerId, define_id_type}; // Messages client -> server #[allow(clippy::large_enum_variant)] @@ -152,7 +152,7 @@ define_id_type!(LocalResourceRqId, u32); pub struct TaskDescription { pub kind: TaskKind, pub time_limit: Option, - pub priority: tako::Priority, + pub priority: UserPriority, pub crash_limit: CrashLimit, } diff --git a/crates/tako/src/gateway.rs b/crates/tako/src/gateway.rs index 20f9e85c0..05c0109e2 100644 --- a/crates/tako/src/gateway.rs +++ b/crates/tako/src/gateway.rs @@ -2,7 +2,7 @@ use crate::internal::common::error::DsError; use crate::internal::common::resources::ResourceRqId; use crate::internal::datasrv::dataobj::DataObjectId; use crate::resources::{AllocationRequest, CPU_RESOURCE_NAME, NumOfNodes, ResourceAmount}; -use crate::{InstanceId, Map, Priority, TaskId}; +use crate::{InstanceId, Map, TaskId, UserPriority}; use serde::{Deserialize, Serialize}; use smallvec::{SmallVec, smallvec}; use std::fmt::{Display, Formatter}; @@ -126,7 +126,7 @@ impl Display for CrashLimit { pub struct SharedTaskConfiguration { pub time_limit: Option, - pub priority: Priority, + pub priority: UserPriority, pub crash_limit: CrashLimit, diff --git a/crates/tako/src/internal/common/mod.rs b/crates/tako/src/internal/common/mod.rs index 70d13484d..c95a50fd4 100644 --- a/crates/tako/src/internal/common/mod.rs +++ b/crates/tako/src/internal/common/mod.rs @@ -5,6 +5,7 @@ pub(crate) mod data_structures; pub(crate) mod error; pub(crate) mod ids; pub(crate) mod index; +mod priority; pub mod resources; pub(crate) mod rpc; pub(crate) mod stablemap; @@ -13,4 +14,5 @@ pub(crate) mod utils; pub(crate) mod wrapped; pub use data_structures::{Map, Set}; +pub use priority::{Priority, UserPriority}; pub use wrapped::WrappedRcRefCell; diff --git a/crates/tako/src/internal/common/priority.rs b/crates/tako/src/internal/common/priority.rs new file mode 100644 index 000000000..f6b1d9d32 --- /dev/null +++ b/crates/tako/src/internal/common/priority.rs @@ -0,0 +1,50 @@ +use serde::{Deserialize, Serialize}; +use std::fmt::{Display, Formatter}; + +/// User-defined priority +/// +/// A larger number ==> a higher priority +/// From user perspective, negative value is allowed, but we store internally as an unsigned number, +/// So we can easily concatenate it with scheduler priorities +#[derive( + Default, Debug, Copy, Clone, Hash, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize, +)] +pub struct UserPriority(u32); + +impl UserPriority { + pub fn new(value: i32) -> Self { + Self(value as u32 ^ 0x8000_0000) + } +} + +impl Display for UserPriority { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", (self.0 ^ 0x8000_0000) as i32) + } +} + +/// Main priority type that mixes user priority, scheduler priority, resource priority +/// +/// A larger number ==> a higher priority +#[derive( + Default, Debug, Copy, Clone, Hash, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize, +)] +pub struct Priority(u64); + +impl Priority { + pub fn new(value: u64) -> Self { + Self(value) + } + + pub fn from_user_priority(user_priority: UserPriority) -> Self { + Priority((user_priority.0 as u64) << 32) + } + + pub fn remove_priority_u32(&self, value: u32) -> Priority { + Priority(self.0.saturating_add((u32::MAX - value) as u64)) + } + + pub fn combine(self, other: Priority) -> Priority { + 
Priority(self.0.saturating_add(other.0)) + } +} diff --git a/crates/tako/src/internal/messages/worker.rs b/crates/tako/src/internal/messages/worker.rs index c58fbacf2..06887e454 100644 --- a/crates/tako/src/internal/messages/worker.rs +++ b/crates/tako/src/internal/messages/worker.rs @@ -32,7 +32,7 @@ pub struct ComputeTaskSeparateData { pub id: TaskId, pub resource_rq_id: ResourceRqId, pub instance_id: InstanceId, - pub scheduler_priority: Priority, + pub priority: Priority, pub node_list: Vec, pub data_deps: Vec, pub entry: Option, @@ -40,7 +40,6 @@ pub struct ComputeTaskSeparateData { #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct ComputeTaskSharedData { - pub user_priority: Priority, pub time_limit: Option, pub data_flags: TaskDataFlags, pub body: Rc<[u8]>, diff --git a/crates/tako/src/internal/scheduler/multinode.rs b/crates/tako/src/internal/scheduler/multinode.rs index 484c441f3..4e77e6952 100644 --- a/crates/tako/src/internal/scheduler/multinode.rs +++ b/crates/tako/src/internal/scheduler/multinode.rs @@ -6,21 +6,21 @@ use crate::internal::server::worker::Worker; use crate::internal::server::workergroup::WorkerGroup; use crate::internal::server::workermap::WorkerMap; use crate::resources::ResourceRequest; -use crate::{Map, PriorityTuple, TaskId, WorkerId}; +use crate::{Map, Priority, TaskId, WorkerId}; use priority_queue::PriorityQueue; struct QueueForRequest { - queue: PriorityQueue, + queue: PriorityQueue, sleeping: bool, } impl QueueForRequest { - pub fn peek(&self) -> Option<(TaskId, PriorityTuple)> { + pub fn peek(&self) -> Option<(TaskId, Priority)> { self.queue .peek() - .map(|(task_id, priority)| (*task_id, (priority.0, priority.1))) + .map(|(task_id, priority)| (*task_id, *priority)) } - pub fn current_priority(&self) -> Option { + pub fn current_priority(&self) -> Option { self.peek().map(|x| x.1) } } @@ -31,13 +31,6 @@ pub(crate) struct MultiNodeQueue { requests: Vec, } -fn task_priority_tuple(task: &Task) -> PriorityTuple { - ( - task.configuration.user_priority, - task.get_scheduler_priority(), - ) -} - impl MultiNodeQueue { pub fn shrink_to_fit(&mut self) { self.queues.shrink_to_fit(); @@ -68,7 +61,7 @@ impl MultiNodeQueue { }) .queue }; - queue.push(task.id, task_priority_tuple(task)); + queue.push(task.id, task.priority()); } pub fn wakeup_sleeping_tasks(&mut self) { @@ -184,7 +177,7 @@ impl<'a> MultiNodeAllocator<'a> { pub fn try_allocate_task(self) -> Option<(TaskId, Vec)> { 'outer: loop { - let current_priority: PriorityTuple = if let Some(Some(priority)) = self + let current_priority = if let Some(Some(priority)) = self .mn_queue .queues .values() diff --git a/crates/tako/src/internal/scheduler/state.rs b/crates/tako/src/internal/scheduler/state.rs index fca141538..dec8c8743 100644 --- a/crates/tako/src/internal/scheduler/state.rs +++ b/crates/tako/src/internal/scheduler/state.rs @@ -231,7 +231,7 @@ impl SchedulerState { let mut task_msg_builder = ComputeTasksBuilder::default(); task_ids.sort_by_cached_key(|&task_id| { let task = task_map.get_task(task_id); - Reverse((task.configuration.user_priority, task.scheduler_priority)) + Reverse(task.priority()) }); for task_id in task_ids { let task = task_map.get_task_mut(task_id); @@ -515,12 +515,7 @@ impl SchedulerState { worker.id, cost ); - ( - u64::MAX - cost, - task.configuration.user_priority, - task.scheduler_priority, - difficulty, - ) + (u64::MAX - cost, task.priority(), difficulty) }); let len = ts.len(); underload_workers.push(( diff --git a/crates/tako/src/internal/server/client.rs 
b/crates/tako/src/internal/server/client.rs index ebf1cb0b4..ff2666987 100644 --- a/crates/tako/src/internal/server/client.rs +++ b/crates/tako/src/internal/server/client.rs @@ -42,7 +42,7 @@ pub(crate) fn handle_new_tasks( return Err(format!("Invalid configuration index {idx}").into()); } let conf = &configurations[idx]; - let mut task = Task::new( + let task = Task::new( task.id, task.resource_rq_id, task.task_deps, @@ -50,7 +50,6 @@ pub(crate) fn handle_new_tasks( task.entry, conf.clone(), ); - task.scheduler_priority = -(task.id.job_id().as_num() as i32); tasks.push(task); } if !task_submit.adjust_instance_id_and_crash_counters.is_empty() { diff --git a/crates/tako/src/internal/server/reactor.rs b/crates/tako/src/internal/server/reactor.rs index 02585c5b5..0071dff16 100644 --- a/crates/tako/src/internal/server/reactor.rs +++ b/crates/tako/src/internal/server/reactor.rs @@ -18,9 +18,6 @@ use crate::internal::server::workermap::WorkerMap; use crate::{TaskId, WorkerId}; use std::fmt::Write; -// Scheduler priority increase for each t-level -pub(crate) const T_LEVEL_WEIGHT: i32 = 256; - pub(crate) fn on_new_worker(core: &mut Core, comm: &mut impl Comm, worker: Worker) { comm.broadcast_worker_message(&ToWorkerMessage::NewWorker(NewWorkerMsg { worker_id: worker.id, @@ -174,26 +171,17 @@ pub(crate) fn on_new_tasks(core: &mut Core, comm: &mut impl Comm, new_tasks: Vec assert!(!new_tasks.is_empty()); for mut task in new_tasks.into_iter() { let mut count = 0; - // We assign scheduler priority here, the goal is to set scheduler_priority as follows = t-level * T_LEVEL_WEIGHT - job_id - // where t-level is the length of the maximal path from root tasks - // Goal is to prioritize task graph components that were partially computed + prioritize older tasks (according job_id) - // T-level is T_LEVEL_WEIGHT-times more important than job_id difference, - // but large job_id difference will overweight t-level which is usually bounded, that is done by design. - let mut priority = -(task.id.job_id().as_num() as i32); task.task_deps.retain(|t| { if let Some(task_dep) = core.find_task_mut(*t) { task_dep.add_consumer(task.id); if !task_dep.is_finished() { - priority = - std::cmp::max(priority, task_dep.scheduler_priority + T_LEVEL_WEIGHT); - count += 1 + count += 1; } true } else { false } }); - task.set_scheduler_priority(priority); assert!(matches!( task.state, TaskRuntimeState::Waiting(WaitingInfo { unfinished_deps: 0 }) diff --git a/crates/tako/src/internal/server/task.rs b/crates/tako/src/internal/server/task.rs index 8f5edba0b..71922877a 100644 --- a/crates/tako/src/internal/server/task.rs +++ b/crates/tako/src/internal/server/task.rs @@ -6,7 +6,7 @@ use thin_vec::ThinVec; use crate::internal::common::Set; use crate::internal::common::stablemap::ExtractKey; -use crate::{MAX_FRAME_SIZE, Map, ResourceVariantId, WorkerId}; +use crate::{MAX_FRAME_SIZE, Map, ResourceVariantId, UserPriority, WorkerId}; use crate::gateway::{CrashLimit, EntryType, TaskDataFlags}; use crate::internal::datasrv::dataobj::DataObjectId; @@ -97,7 +97,7 @@ bitflags::bitflags! 
{ pub struct TaskConfiguration { // Use Rc to avoid cloning the data when we serialize them pub body: Rc<[u8]>, - pub user_priority: Priority, + pub user_priority: UserPriority, pub time_limit: Option, pub crash_limit: CrashLimit, pub data_flags: TaskDataFlags, @@ -124,14 +124,13 @@ pub struct Task { pub flags: TaskFlags, pub resource_rq_id: ResourceRqId, pub configuration: Rc, - pub scheduler_priority: Priority, pub instance_id: InstanceId, pub crash_counter: u32, pub entry: Option, } // Task is a critical data structure, so we should keep its size in check -static_assert_size!(Task, 120); +static_assert_size!(Task, 112); impl fmt::Debug for Task { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -151,7 +150,6 @@ impl Task { "consumers": self.consumers, "task_deps": self.task_deps, "flags": self.flags.bits(), - "scheduler_priority": self.scheduler_priority, "instance_id": self.instance_id, "crash_counter": self.crash_counter, "configuration": self.configuration.dump(), @@ -185,7 +183,6 @@ impl Task { resource_rq_id, configuration, entry, - scheduler_priority: Default::default(), state: TaskRuntimeState::Waiting(WaitingInfo { unfinished_deps: 0 }), consumers: Default::default(), instance_id: InstanceId::new(0), @@ -193,6 +190,12 @@ impl Task { } } + #[inline] + pub(crate) fn priority(&self) -> Priority { + Priority::from_user_priority(self.configuration.user_priority) + .remove_priority_u32(self.id.job_id().as_num()) + } + #[inline] pub(crate) fn is_ready(&self) -> bool { matches!( @@ -359,24 +362,6 @@ impl Task { | TaskRuntimeState::Finished => None, } } - - #[inline] - pub(crate) fn get_scheduler_priority(&self) -> i32 { - /*match self.state { - TaskRuntimeState::Waiting(winfo) => winfo.scheduler_metric, - _ => unreachable!() - }*/ - self.scheduler_priority - } - - #[inline] - pub(crate) fn set_scheduler_priority(&mut self, value: i32) { - /*match &mut self.state { - TaskRuntimeState::Waiting(WaitingInfo { ref mut scheduler_metric, ..}) => { *scheduler_metric = value }, - _ => unreachable!() - }*/ - self.scheduler_priority = value; - } } impl ExtractKey for Task { @@ -424,7 +409,6 @@ impl ComputeTasksBuilder { .entry(conf.clone()) .or_insert_with(|| { let shared = ComputeTaskSharedData { - user_priority: conf.user_priority, time_limit: conf.time_limit, data_flags: conf.data_flags, body: conf.body.clone(), @@ -440,7 +424,7 @@ impl ComputeTasksBuilder { id: task.id, resource_rq_id: task.resource_rq_id, instance_id: task.instance_id, - scheduler_priority: task.scheduler_priority, + priority: task.priority(), node_list, data_deps: task.data_deps.iter().copied().collect(), entry: task.entry.clone(), @@ -488,7 +472,7 @@ fn estimate_task_data_size(data: &ComputeTaskSeparateData) -> usize { id, resource_rq_id, instance_id, - scheduler_priority, + priority, node_list, data_deps, entry, @@ -500,7 +484,7 @@ fn estimate_task_data_size(data: &ComputeTaskSeparateData) -> usize { + size_of_val(id) + size_of_val(resource_rq_id) + size_of_val(instance_id) - + size_of_val(scheduler_priority) + + size_of_val(priority) + size_of_val(node_list.as_slice()) + size_of_val(data_deps.as_slice()) + entry.as_ref().map(|e| e.len()).unwrap_or_default() @@ -509,12 +493,11 @@ fn estimate_task_data_size(data: &ComputeTaskSeparateData) -> usize { /// Estimate how much data it will take to serialize this shared task data fn estimate_shared_data_size(data: &ComputeTaskSharedData) -> usize { let ComputeTaskSharedData { - user_priority, time_limit, data_flags, body, } = data; - size_of_val(user_priority) + 
size_of_val(time_limit) + size_of_val(data_flags) + body.len() + size_of_val(time_limit) + size_of_val(data_flags) + body.len() } #[cfg(test)] diff --git a/crates/tako/src/internal/tests/integration/utils/task.rs b/crates/tako/src/internal/tests/integration/utils/task.rs index 582a25a03..7741fb35e 100644 --- a/crates/tako/src/internal/tests/integration/utils/task.rs +++ b/crates/tako/src/internal/tests/integration/utils/task.rs @@ -6,7 +6,6 @@ use derive_builder::Builder; use smallvec::smallvec; use thin_vec::ThinVec; -use crate::TaskId; use crate::gateway::{ CrashLimit, ResourceRequest, ResourceRequestEntry, ResourceRequestVariants, SharedTaskConfiguration, TaskConfiguration, TaskDataFlags, @@ -15,6 +14,7 @@ use crate::internal::common::Map; use crate::internal::common::resources::{NumOfNodes, ResourceRqId}; use crate::program::{ProgramDefinition, StdioDef}; use crate::resources::{AllocationRequest, ResourceAmount}; +use crate::{TaskId, UserPriority}; pub struct GraphBuilder { id_counter: u32, @@ -114,7 +114,7 @@ pub fn build_task_def_from_config( let conf = SharedTaskConfiguration { time_limit, - priority: 0, + priority: UserPriority::new(0), crash_limit: CrashLimit::default(), data_flags: TaskDataFlags::empty(), body: body.into(), diff --git a/crates/tako/src/internal/worker/data/download.rs b/crates/tako/src/internal/worker/data/download.rs index d8ab05c91..6a78bd380 100644 --- a/crates/tako/src/internal/worker/data/download.rs +++ b/crates/tako/src/internal/worker/data/download.rs @@ -1,4 +1,4 @@ -use crate::PriorityTuple; +use crate::Priority; use crate::datasrv::DataObjectId; use crate::internal::datasrv::{DataObjectRef, DownloadInterface, DownloadManagerRef}; use crate::internal::worker::state::WorkerStateRef; @@ -23,4 +23,4 @@ impl DownloadInterface for WorkerStateRef { } } -pub(crate) type WorkerDownloadManagerRef = DownloadManagerRef; +pub(crate) type WorkerDownloadManagerRef = DownloadManagerRef; diff --git a/crates/tako/src/internal/worker/rpc.rs b/crates/tako/src/internal/worker/rpc.rs index e102cd53b..10899b7a4 100644 --- a/crates/tako/src/internal/worker/rpc.rs +++ b/crates/tako/src/internal/worker/rpc.rs @@ -385,11 +385,7 @@ pub(crate) fn process_worker_message(state: &mut WorkerState, message: ToWorkerM task.data_deps.iter().for_each(|data_id| { if !state.data_storage.has_object(*data_id) { waiting += 1; - state.download_object( - *data_id, - task.id, - (shared.user_priority, task.scheduler_priority), - ) + state.download_object(*data_id, task.id, task.priority) } }); waiting diff --git a/crates/tako/src/internal/worker/rqueue.rs b/crates/tako/src/internal/worker/rqueue.rs index 1bb2b39cf..02fb56968 100644 --- a/crates/tako/src/internal/worker/rqueue.rs +++ b/crates/tako/src/internal/worker/rqueue.rs @@ -6,13 +6,11 @@ use crate::internal::worker::resources::allocator::ResourceAllocator; use crate::internal::worker::state::TaskMap; use crate::internal::worker::task::Task; use crate::resources::ResourceRqId; -use crate::{Priority, PriorityTuple, ResourceVariantId, Set, TaskId, WorkerId}; +use crate::{Priority, ResourceVariantId, Set, TaskId, WorkerId}; use priority_queue::PriorityQueue; use std::rc::Rc; use std::time::Duration; -type QueuePriorityTuple = (Priority, Priority, Priority); // user priority, resource priority, scheduler priority - /// QueueForRequest is priority queue of the tasks that has the same resource request /// The idea is that if we cannot schedule one task from this queue, we cannot schedule /// any task in this queue. 
@@ -33,7 +31,7 @@ type QueuePriorityTuple = (Priority, Priority, Priority); // user priority, reso #[derive(Debug)] pub(crate) struct QueueForRequest { resource_priority: Priority, - queue: PriorityQueue, + queue: PriorityQueue, is_blocked: bool, } @@ -46,21 +44,21 @@ impl QueueForRequest { self.is_blocked = true; } - pub fn current_priority(&self) -> Option { + pub fn current_priority(&self) -> Option { if self.is_blocked { None } else { - self.peek().map(|x| x.1) + self.peek().map(|x| x.1.combine(self.resource_priority)) } } - pub fn peek(&self) -> Option<(TaskId, QueuePriorityTuple)> { + pub fn peek(&self) -> Option<(TaskId, Priority)> { if self.is_blocked { return None; } self.queue .peek() - .map(|(task_id, priority)| (*task_id, (priority.0, self.resource_priority, priority.1))) + .map(|(task_id, priority)| (*task_id, *priority)) } } @@ -108,10 +106,10 @@ impl ResourceWaitQueue { let mut p = 0; for (r, s) in &self.worker_resources { if !r.is_capable_to_run(rqv) { - p += s.len() as Priority; + p += s.len() as u64; } } - p + Priority::new(p << 16) } pub fn release_allocation(&mut self, allocation: Rc) { @@ -217,7 +215,7 @@ impl ResourceWaitQueue { resource_rq_map: &ResourceRqMap, out: &mut Vec<(TaskId, Rc, ResourceVariantId)>, ) -> bool { - let current_priority: QueuePriorityTuple = if let Some(Some(priority)) = + let current_priority: Priority = if let Some(Some(priority)) = self.queues.values().map(|qfr| qfr.current_priority()).max() { priority diff --git a/crates/tako/src/internal/worker/state.rs b/crates/tako/src/internal/worker/state.rs index e1e2b36af..fca090f0c 100644 --- a/crates/tako/src/internal/worker/state.rs +++ b/crates/tako/src/internal/worker/state.rs @@ -27,7 +27,7 @@ use crate::internal::worker::task::{RunningState, Task, TaskState}; use crate::internal::worker::task_comm::RunningTaskComm; use crate::launcher::TaskLauncher; use crate::resources::ResourceRequestVariants; -use crate::{PriorityTuple, TaskId}; +use crate::{Priority, TaskId}; use orion::aead::SecretKey; use rand::SeedableRng; use rand::prelude::IndexedRandom; @@ -441,12 +441,7 @@ impl WorkerState { self.resource_rq_map.insert(rqv) } - pub fn download_object( - &mut self, - data_id: DataObjectId, - task_id: TaskId, - priority: PriorityTuple, - ) { + pub fn download_object(&mut self, data_id: DataObjectId, task_id: TaskId, priority: Priority) { self.tasks_waiting_for_data .entry(data_id) .or_default() diff --git a/crates/tako/src/internal/worker/task.rs b/crates/tako/src/internal/worker/task.rs index 99446febc..6b3d4e406 100644 --- a/crates/tako/src/internal/worker/task.rs +++ b/crates/tako/src/internal/worker/task.rs @@ -25,7 +25,7 @@ pub enum TaskState { pub struct Task { pub id: TaskId, pub state: TaskState, - pub priority: (Priority, Priority), + pub priority: Priority, pub instance_id: InstanceId, pub resource_rq_id: ResourceRqId, @@ -47,7 +47,7 @@ impl Task { Self { state: task_state, id: task.id, - priority: (shared.user_priority, task.scheduler_priority), + priority: task.priority, instance_id: task.instance_id, resource_rq_id: task.resource_rq_id, time_limit: shared.time_limit, diff --git a/crates/tako/src/lib.rs b/crates/tako/src/lib.rs index 560f18096..26e96d085 100644 --- a/crates/tako/src/lib.rs +++ b/crates/tako/src/lib.rs @@ -17,6 +17,7 @@ pub use crate::internal::common::index::{AsIdVec, ItemId}; pub use crate::internal::common::taskgroup::TaskGroup; pub use crate::internal::common::utils::format_comma_delimited; pub use crate::internal::common::{Map, Set}; +pub use 
crate::internal::common::{Priority, UserPriority}; pub use crate::internal::common::ids::{ InstanceId, JobId, JobTaskId, ResourceVariantId, TaskId, WorkerId, @@ -24,10 +25,6 @@ pub use crate::internal::common::ids::{ pub type JobTaskCount = u32; -// Priority: Bigger number -> Higher priority -pub type Priority = i32; -pub type PriorityTuple = (Priority, Priority); // user priority, scheduler priority - pub type Error = internal::common::error::DsError; pub type Result = std::result::Result; From af6dcd2098b0a525bc10e25834c614917d6240c1 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Tue, 6 Jan 2026 19:27:52 +0100 Subject: [PATCH 2/6] Fixed tako tests --- crates/tako/src/internal/common/priority.rs | 16 ++ .../tako/src/internal/tests/test_reactor.rs | 30 ++-- .../src/internal/tests/test_scheduler_mn.rs | 11 +- crates/tako/src/internal/tests/test_worker.rs | 5 +- crates/tako/src/internal/tests/utils/task.rs | 10 +- crates/tako/src/internal/worker/rqueue.rs | 13 +- .../tako/src/internal/worker/test_rqueue.rs | 162 ++++++++++++------ crates/tako/src/internal/worker/test_util.rs | 24 +-- 8 files changed, 154 insertions(+), 117 deletions(-) diff --git a/crates/tako/src/internal/common/priority.rs b/crates/tako/src/internal/common/priority.rs index f6b1d9d32..01402dd8a 100644 --- a/crates/tako/src/internal/common/priority.rs +++ b/crates/tako/src/internal/common/priority.rs @@ -17,6 +17,12 @@ impl UserPriority { } } +impl From for UserPriority { + fn from(value: i32) -> Self { + UserPriority::new(value) + } +} + impl Display for UserPriority { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}", (self.0 ^ 0x8000_0000) as i32) @@ -40,6 +46,10 @@ impl Priority { Priority((user_priority.0 as u64) << 32) } + pub fn new_resource_priority(value: u64) -> Self { + Priority(value << 16) + } + pub fn remove_priority_u32(&self, value: u32) -> Priority { Priority(self.0.saturating_add((u32::MAX - value) as u64)) } @@ -48,3 +58,9 @@ impl Priority { Priority(self.0.saturating_add(other.0)) } } + +impl From for Priority { + fn from(value: u64) -> Self { + Self(value) + } +} diff --git a/crates/tako/src/internal/tests/test_reactor.rs b/crates/tako/src/internal/tests/test_reactor.rs index a3cb2bd3f..cf66edd2b 100644 --- a/crates/tako/src/internal/tests/test_reactor.rs +++ b/crates/tako/src/internal/tests/test_reactor.rs @@ -11,8 +11,8 @@ use crate::internal::messages::worker::{StealResponse, StealResponseMsg}; use crate::internal::scheduler::state::SchedulerState; use crate::internal::server::core::Core; use crate::internal::server::reactor::{ - T_LEVEL_WEIGHT, on_cancel_tasks, on_new_tasks, on_new_worker, on_remove_worker, - on_steal_response, on_task_error, on_task_finished, on_task_running, + on_cancel_tasks, on_new_tasks, on_new_worker, on_remove_worker, on_steal_response, + on_task_error, on_task_finished, on_task_running, }; use crate::internal::server::task::{Task, TaskRuntimeState}; use crate::internal::server::worker::Worker; @@ -32,7 +32,7 @@ use crate::internal::worker::configuration::{ }; use crate::resources::{ResourceAmount, ResourceDescriptorItem, ResourceIdMap}; use crate::worker::{ServerLostPolicy, WorkerConfiguration}; -use crate::{TaskId, WorkerId}; +use crate::{Priority, TaskId, WorkerId}; #[test] fn test_worker_add() { @@ -171,24 +171,14 @@ fn test_scheduler_priority() { on_new_tasks(&mut core, &mut comm, vec![t1, t2, t3, t4, t5, t6, t7, t8]); - assert_eq!(core.get_task(TaskId::new_test(501)).scheduler_priority, 0); assert_eq!( - 
core.get_task(TaskId::new_test(502)).scheduler_priority, - T_LEVEL_WEIGHT + core.get_task(TaskId::new_test(501)).priority(), + Priority::from_user_priority(0.into()).remove_priority_u32(0) ); - assert_eq!(core.get_task(TaskId::new_test(503)).scheduler_priority, 0); assert_eq!( - core.get_task(TaskId::new_test(504)).scheduler_priority, - 2 * T_LEVEL_WEIGHT + core.get_task(task_id8).priority(), + Priority::from_user_priority(0.into()).remove_priority_u32(123) ); - - assert_eq!(core.get_task(task_id5).scheduler_priority, -123); - assert_eq!(core.get_task(task_id6).scheduler_priority, -122); - assert_eq!( - core.get_task(task_id7).scheduler_priority, - -123 + T_LEVEL_WEIGHT - ); - assert_eq!(core.get_task(task_id8).scheduler_priority, -123); } #[test] @@ -293,9 +283,9 @@ fn test_assignments_and_finish() { &msgs[0], ToWorkerMessage::ComputeTasks(ComputeTasksMsg { tasks, - shared_data - }) if tasks.len() == 2 && tasks[0].id.job_task_id().as_num() == 11 && shared_data[0].user_priority == 12 && - tasks[1].id.job_task_id().as_num() == 15 && tasks[0].shared_index == 0 && tasks[1].shared_index == 1 && shared_data[1].user_priority == 0 + shared_data: _, + }) if tasks.len() == 2 && tasks[0].id.job_task_id().as_num() == 11 && + tasks[1].id.job_task_id().as_num() == 15 && tasks[0].shared_index == 0 )); let msgs = comm.take_worker_msgs(101, 1); assert!(matches!( diff --git a/crates/tako/src/internal/tests/test_scheduler_mn.rs b/crates/tako/src/internal/tests/test_scheduler_mn.rs index 92c8b10fd..8ca4be994 100644 --- a/crates/tako/src/internal/tests/test_scheduler_mn.rs +++ b/crates/tako/src/internal/tests/test_scheduler_mn.rs @@ -10,16 +10,9 @@ use crate::internal::tests::utils::schedule::{ use crate::internal::tests::utils::task::TaskBuilder; use crate::resources::{ResourceDescriptor, ResourceIdMap}; -use crate::{Priority, TaskId, WorkerId}; +use crate::{TaskId, WorkerId}; use std::time::Duration; -/*fn get_mn_placement(task: &Task) -> Vec { - match &task.state { - TaskRuntimeState::RunningMultiNode(ws) => ws.clone(), - _ => unreachable!(), - } -}*/ - #[derive(Debug)] enum WorkerStatus { Root, @@ -68,7 +61,7 @@ fn test_schedule_mn_simple() { let tasks: Vec = (1..=4) .map(|i| { TaskBuilder::new(i) - .user_priority(i as Priority) + .user_priority(i as i32) .n_nodes(2) .build(rmap) }) diff --git a/crates/tako/src/internal/tests/test_worker.rs b/crates/tako/src/internal/tests/test_worker.rs index 406c5aed5..28e0c44f0 100644 --- a/crates/tako/src/internal/tests/test_worker.rs +++ b/crates/tako/src/internal/tests/test_worker.rs @@ -17,7 +17,7 @@ use crate::internal::worker::state::WorkerStateRef; use crate::launcher::{StopReason, TaskBuildContext, TaskLaunchData, TaskLauncher}; use crate::resources::{ResourceDescriptor, ResourceIdMap}; use crate::worker::{ServerLostPolicy, WorkerConfiguration}; -use crate::{Set, TaskId, WorkerId}; +use crate::{Priority, Set, TaskId, WorkerId}; use std::ops::Deref; use std::time::Duration; use tokio::sync::oneshot::Receiver; @@ -89,13 +89,12 @@ fn create_dummy_compute_msg(task_id: TaskId, resource_rq_id: ResourceRqId) -> Co id: task_id, resource_rq_id, instance_id: Default::default(), - scheduler_priority: 0, + priority: Priority::new(0), node_list: vec![], data_deps: vec![], entry: None, }], shared_data: vec![ComputeTaskSharedData { - user_priority: 0, time_limit: None, data_flags: TaskDataFlags::empty(), body: Default::default(), diff --git a/crates/tako/src/internal/tests/utils/task.rs b/crates/tako/src/internal/tests/utils/task.rs index 81a58bf27..be32dcbbf 100644 --- 
a/crates/tako/src/internal/tests/utils/task.rs +++ b/crates/tako/src/internal/tests/utils/task.rs @@ -8,7 +8,7 @@ use crate::internal::common::resources::{ use crate::internal::messages::worker::TaskRunningMsg; use crate::internal::server::task::{Task, TaskConfiguration}; use crate::resources::ResourceRequest; -use crate::{Priority, ResourceVariantId, Set, TaskId}; +use crate::{ResourceVariantId, Set, TaskId, UserPriority}; use smallvec::SmallVec; use std::rc::Rc; use thin_vec::ThinVec; @@ -19,7 +19,7 @@ pub struct TaskBuilder { data_deps: ThinVec, finished_resources: Vec, resources_builder: ResBuilder, - user_priority: Priority, + user_priority: UserPriority, crash_limit: CrashLimit, data_flags: TaskDataFlags, } @@ -32,14 +32,14 @@ impl TaskBuilder { data_deps: Default::default(), finished_resources: vec![], resources_builder: Default::default(), - user_priority: 0, + user_priority: 0.into(), crash_limit: CrashLimit::default(), data_flags: TaskDataFlags::empty(), } } - pub fn user_priority(mut self, value: Priority) -> TaskBuilder { - self.user_priority = value; + pub fn user_priority>(mut self, value: P) -> TaskBuilder { + self.user_priority = value.into(); self } diff --git a/crates/tako/src/internal/worker/rqueue.rs b/crates/tako/src/internal/worker/rqueue.rs index 02fb56968..e023b25d6 100644 --- a/crates/tako/src/internal/worker/rqueue.rs +++ b/crates/tako/src/internal/worker/rqueue.rs @@ -45,11 +45,7 @@ impl QueueForRequest { } pub fn current_priority(&self) -> Option { - if self.is_blocked { - None - } else { - self.peek().map(|x| x.1.combine(self.resource_priority)) - } + self.peek().map(|x| x.1) } pub fn peek(&self) -> Option<(TaskId, Priority)> { @@ -58,7 +54,7 @@ impl QueueForRequest { } self.queue .peek() - .map(|(task_id, priority)| (*task_id, *priority)) + .map(|(task_id, priority)| (*task_id, priority.combine(self.resource_priority))) } } @@ -109,7 +105,7 @@ impl ResourceWaitQueue { p += s.len() as u64; } } - Priority::new(p << 16) + Priority::new_resource_priority(p) } pub fn release_allocation(&mut self, allocation: Rc) { @@ -215,6 +211,7 @@ impl ResourceWaitQueue { resource_rq_map: &ResourceRqMap, out: &mut Vec<(TaskId, Rc, ResourceVariantId)>, ) -> bool { + dbg!(&self.queues); let current_priority: Priority = if let Some(Some(priority)) = self.queues.values().map(|qfr| qfr.current_priority()).max() { @@ -222,9 +219,11 @@ impl ResourceWaitQueue { } else { return true; }; + dbg!(current_priority); for rqv in &self.requests { let qfr = self.queues.get_mut(rqv).unwrap(); while let Some((_task_id, priority)) = qfr.peek() { + dbg!(priority); if current_priority != priority { break; } diff --git a/crates/tako/src/internal/worker/test_rqueue.rs b/crates/tako/src/internal/worker/test_rqueue.rs index 2c17d41e9..64a74df46 100644 --- a/crates/tako/src/internal/worker/test_rqueue.rs +++ b/crates/tako/src/internal/worker/test_rqueue.rs @@ -14,7 +14,7 @@ use crate::internal::tests::utils::shared::{ }; use crate::internal::worker::test_util::ResourceQueueBuilder as RB; use crate::resources::{ResourceDescriptorItem, ResourceRqId}; -use crate::{Map, Set, WorkerId}; +use crate::{Map, Priority, Set, WorkerId}; impl ResourceWaitQueue { pub fn requests(&self) -> &[ResourceRqId] { @@ -37,16 +37,16 @@ fn test_rqueue_resource_priority() { let w = worker_task( 10, ResBuilder::default().add_scatter(0, 3).finish(), - 1, + Priority::new(1), &mut rqs, ); rq.add_task(&rqs, w); - let w = worker_task(11, cpus_compact(4).finish(), 1, &mut rqs); + let w = worker_task(11, cpus_compact(4).finish(), 
Priority::new(1), &mut rqs); rq.add_task(&rqs, w); let w = worker_task( 12, ResBuilder::default().add_force_compact(0, 4).finish(), - 1, + Priority::new(1), &mut rqs, ); rq.add_task(&rqs, w); @@ -77,9 +77,10 @@ fn test_rqueue_resource_priority() { fn test_rqueue1() { let mut rqs = ResourceRqMap::default(); let mut rq = RB::new(wait_queue(ResourceDescriptor::sockets(3, 5))); - worker_task_add(&mut rq, &mut rqs, 10, cpus_compact(2).finish(), 1); - worker_task_add(&mut rq, &mut rqs, 11, cpus_compact(5).finish(), 1); - worker_task_add(&mut rq, &mut rqs, 12, cpus_compact(2).finish(), 1); + let p = Priority::new(1); + worker_task_add(&mut rq, &mut rqs, 10, cpus_compact(2).finish(), p); + worker_task_add(&mut rq, &mut rqs, 11, cpus_compact(5).finish(), p); + worker_task_add(&mut rq, &mut rqs, 12, cpus_compact(2).finish(), p); let a = rq.start_tasks(&rqs); assert_eq!(a.get(&10).unwrap().get_indices(0).len(), 2); @@ -92,9 +93,11 @@ fn test_rqueue2() { let mut rqs = ResourceRqMap::default(); let mut rq = RB::new(wait_queue(ResourceDescriptor::simple_cpus(4))); - worker_task_add(&mut rq, &mut rqs, 10, cpus_compact(2).finish(), 1); - worker_task_add(&mut rq, &mut rqs, 11, cpus_compact(1).finish(), 2); - worker_task_add(&mut rq, &mut rqs, 12, cpus_compact(2).finish(), 2); + let p1 = Priority::new(1); + let p2 = Priority::new(2); + worker_task_add(&mut rq, &mut rqs, 10, cpus_compact(2).finish(), p1); + worker_task_add(&mut rq, &mut rqs, 11, cpus_compact(1).finish(), p2); + worker_task_add(&mut rq, &mut rqs, 12, cpus_compact(2).finish(), p2); let a = rq.start_tasks(&rqs); assert!(!a.contains_key(&10)); @@ -108,9 +111,11 @@ fn test_rqueue3() { let mut rqs = ResourceRqMap::default(); let mut rq = RB::new(wait_queue(ResourceDescriptor::simple_cpus(4))); - worker_task_add(&mut rq, &mut rqs, 10, cpus_compact(2).finish(), 1); - worker_task_add(&mut rq, &mut rqs, 11, cpus_compact(1).finish(), 1); - worker_task_add(&mut rq, &mut rqs, 12, cpus_compact(2).finish(), 2); + let p1 = Priority::new(1); + let p2 = Priority::new(2); + worker_task_add(&mut rq, &mut rqs, 10, cpus_compact(2).finish(), p1); + worker_task_add(&mut rq, &mut rqs, 11, cpus_compact(1).finish(), p1); + worker_task_add(&mut rq, &mut rqs, 12, cpus_compact(2).finish(), p2); let a = rq.start_tasks(&rqs); assert!(a.contains_key(&10)); @@ -127,7 +132,7 @@ fn test_rqueue_time_request() { &mut rqs, 10, ResBuilder::default().add(0, 1).min_time_secs(10).finish(), - 1, + Priority::new(1), ); assert_eq!(rq.start_tasks_duration(&rqs, Duration::new(9, 0)).len(), 0); @@ -143,28 +148,28 @@ fn test_rqueue_time_request_priority1() { &mut rqs, 10, cpus_compact(2).min_time_secs(10).finish(), - 1, + Priority::new(1), ); worker_task_add( &mut rq, &mut rqs, 11, cpus_compact(2).min_time_secs(40).finish(), - 1, + Priority::new(1), ); worker_task_add( &mut rq, &mut rqs, 12, cpus_compact(2).min_time_secs(20).finish(), - 1, + Priority::new(1), ); worker_task_add( &mut rq, &mut rqs, 13, cpus_compact(2).min_time_secs(30).finish(), - 1, + Priority::new(1), ); let map = rq.start_tasks_duration(&rqs, Duration::new(40, 0)); @@ -182,28 +187,28 @@ fn test_rqueue_time_request_priority2() { &mut rqs, 10, cpus_compact(2).min_time_secs(10).finish(), - 1, + Priority::new(1), ); worker_task_add( &mut rq, &mut rqs, 11, cpus_compact(2).min_time_secs(40).finish(), - 1, + Priority::new(1), ); worker_task_add( &mut rq, &mut rqs, 12, cpus_compact(2).min_time_secs(20).finish(), - 1, + Priority::new(1), ); worker_task_add( &mut rq, &mut rqs, 13, cpus_compact(2).min_time_secs(30).finish(), - 1, + 
Priority::new(1), ); let map = rq.start_tasks_duration(&rqs, Duration::new(30, 0)); @@ -225,8 +230,9 @@ fn test_rqueue_generic_resource1_priorities() { let request: ResourceRequest = cpus_compact(2).add(1, 2).finish(); - worker_task_add(&mut rq, &mut rqs, 10, request, 1); - worker_task_add(&mut rq, &mut rqs, 11, cpus_compact(4).finish(), 1); + let p = Priority::new(1); + worker_task_add(&mut rq, &mut rqs, 10, request, p); + worker_task_add(&mut rq, &mut rqs, 11, cpus_compact(4).finish(), p); let map = rq.start_tasks(&rqs); assert!(!map.contains_key(&10)); @@ -246,13 +252,13 @@ fn test_rqueue_generic_resource2_priorities() { let mut rq = RB::new(wait_queue(resources)); let request: ResourceRequest = cpus_compact(2).add(1, 8).finish(); - worker_task_add(&mut rq, &mut rqs, 10, request, 1); + worker_task_add(&mut rq, &mut rqs, 10, request, Priority::new(1)); let request: ResourceRequest = cpus_compact(2).add(1, 12).finish(); - worker_task_add(&mut rq, &mut rqs, 11, request, 1); + worker_task_add(&mut rq, &mut rqs, 11, request, Priority::new(1)); let request: ResourceRequest = cpus_compact(2).add(2, 50_000_000).finish(); - worker_task_add(&mut rq, &mut rqs, 12, request, 1); + worker_task_add(&mut rq, &mut rqs, 12, request, Priority::new(1)); let map = rq.start_tasks(&rqs); assert!(!map.contains_key(&10)); @@ -272,14 +278,15 @@ fn test_rqueue_generic_resource3_priorities() { let mut rq = RB::new(wait_queue(resources)); + let p1 = Priority::new(1); let request: ResourceRequest = cpus_compact(2).add(1, 18).finish(); - worker_task_add(&mut rq, &mut rqs, 10, request, 1); + worker_task_add(&mut rq, &mut rqs, 10, request, p1); let request: ResourceRequest = cpus_compact(2).add(1, 10).add(2, 60_000_000).finish(); - worker_task_add(&mut rq, &mut rqs, 11, request, 1); + worker_task_add(&mut rq, &mut rqs, 11, request, p1); let request: ResourceRequest = cpus_compact(2).add(2, 99_000_000).finish(); - worker_task_add(&mut rq, &mut rqs, 12, request, 1); + worker_task_add(&mut rq, &mut rqs, 12, request, p1); let map = rq.start_tasks(&rqs); assert!(!map.contains_key(&10)); @@ -306,9 +313,10 @@ fn test_worker_resource_priorities() { .add(1, 1) .finish_v(); - assert_eq!(rq.resource_priority(&rq1), 0); - assert_eq!(rq.resource_priority(&rq2), 0); - assert_eq!(rq.resource_priority(&rq3), 0); + let p = Priority::new(0); + assert_eq!(rq.resource_priority(&rq1), p); + assert_eq!(rq.resource_priority(&rq2), p); + assert_eq!(rq.resource_priority(&rq3), p); let resource_map = ResourceRqMap::default(); @@ -320,9 +328,18 @@ fn test_worker_resource_priorities() { &resource_map, ); - assert_eq!(rq.resource_priority(&rq1), 0); - assert_eq!(rq.resource_priority(&rq2), 1); - assert_eq!(rq.resource_priority(&rq3), 1); + assert_eq!( + rq.resource_priority(&rq1), + Priority::new_resource_priority(0) + ); + assert_eq!( + rq.resource_priority(&rq2), + Priority::new_resource_priority(1) + ); + assert_eq!( + rq.resource_priority(&rq3), + Priority::new_resource_priority(1) + ); rq.new_worker( 401.into(), @@ -331,9 +348,18 @@ fn test_worker_resource_priorities() { }), &resource_map, ); - assert_eq!(rq.resource_priority(&rq1), 0); - assert_eq!(rq.resource_priority(&rq2), 2); - assert_eq!(rq.resource_priority(&rq3), 1); + assert_eq!( + rq.resource_priority(&rq1), + Priority::new_resource_priority(0) + ); + assert_eq!( + rq.resource_priority(&rq2), + Priority::new_resource_priority(2) + ); + assert_eq!( + rq.resource_priority(&rq3), + Priority::new_resource_priority(1) + ); for i in 500..540 { rq.new_worker( @@ -344,15 +370,33 @@ fn 
test_worker_resource_priorities() { &resource_map, ); } - assert_eq!(rq.resource_priority(&rq1), 0); - assert_eq!(rq.resource_priority(&rq2), 2); - assert_eq!(rq.resource_priority(&rq3), 41); + assert_eq!( + rq.resource_priority(&rq1), + Priority::new_resource_priority(0) + ); + assert_eq!( + rq.resource_priority(&rq2), + Priority::new_resource_priority(2) + ); + assert_eq!( + rq.resource_priority(&rq3), + Priority::new_resource_priority(41) + ); rq.remove_worker(504.into(), &resource_map); - assert_eq!(rq.resource_priority(&rq1), 0); - assert_eq!(rq.resource_priority(&rq2), 2); - assert_eq!(rq.resource_priority(&rq3), 40); + assert_eq!( + rq.resource_priority(&rq1), + Priority::new_resource_priority(0) + ); + assert_eq!( + rq.resource_priority(&rq2), + Priority::new_resource_priority(2) + ); + assert_eq!( + rq.resource_priority(&rq3), + Priority::new_resource_priority(40) + ); } #[test] @@ -369,7 +413,7 @@ fn test_uniq_resource_priorities1() { let request: ResourceRequest = cpus_compact(16).finish(); let wt = WorkerTaskBuilder::new(10) .resources(request) - .server_priority(1) + .priority(Priority::new(1)) .build(&mut rqs); rq.add_task(&rqs, wt); @@ -406,7 +450,7 @@ fn test_uniq_resource_priorities2() { let request: ResourceRequest = cpus_compact(16).finish(); let wt = WorkerTaskBuilder::new(10) .resources(request) - .server_priority(1) + .priority(Priority::new(1)) .build(&mut rqs); rq.add_task(&rqs, wt); @@ -443,7 +487,7 @@ fn test_uniq_resource_priorities3() { let request: ResourceRequest = cpus_compact(16).finish(); let wt = WorkerTaskBuilder::new(10) .resources(request) - .user_priority(1) + .priority(Priority::from_user_priority(1.into())) .build(&mut rqs); rq.add_task(&rqs, wt); @@ -471,7 +515,11 @@ fn test_different_resources_and_priorities() { let request: ResourceRequest = cpus_compact(1).add(1, 1).finish(); let wt = WorkerTaskBuilder::new(i) .resources(request) - .user_priority(if i % 2 == 0 { 0 } else { -1 }) + .priority(if i % 2 == 0 { + Priority::new(100) + } else { + Priority::new(99) + }) .build(&mut rqs); rq.add_task(&rqs, wt); } @@ -479,7 +527,7 @@ fn test_different_resources_and_priorities() { let request: ResourceRequest = cpus_compact(16).finish(); let wt = WorkerTaskBuilder::new(i + 20) .resources(request) - .user_priority(-3) + .priority(Priority::new(97)) .build(&mut rqs); rq.add_task(&rqs, wt); } @@ -507,7 +555,7 @@ fn test_different_resources_and_priorities1() { let request: ResourceRequest = cpus_compact(1).add(1, 1).finish(); let wt = WorkerTaskBuilder::new(i) .resources(request) - .user_priority(if i % 2 == 0 { 0 } else { -1 }) + .priority(if i % 2 == 0 { 100 } else { 99 }) .build(&mut rqs); rq.add_task(&rqs, wt); } @@ -515,7 +563,7 @@ fn test_different_resources_and_priorities1() { let request: ResourceRequest = cpus_compact(16).finish(); let wt = WorkerTaskBuilder::new(i + 20) .resources(request) - .user_priority(-3) + .priority(97) .build(&mut rqs); rq.add_task(&rqs, wt); } @@ -550,7 +598,7 @@ fn test_different_resources_and_priorities2() { let request: ResourceRequest = cpus_compact(1).add(1, 1).finish(); let wt = WorkerTaskBuilder::new(i + 10) .resources(request) - .user_priority(1) + .priority(101) .build(&mut rqs); rq.add_task(&rqs, wt); } @@ -560,7 +608,7 @@ fn test_different_resources_and_priorities2() { let request: ResourceRequest = cpus_compact(5).finish(); let wt = WorkerTaskBuilder::new(i + 20) .resources(request) - .user_priority(-3) + .priority(97) .build(&mut rqs); rq.add_task(&rqs, wt); } @@ -589,7 +637,7 @@ fn 
test_different_resources_and_priorities3() { let request: ResourceRequest = cpus_compact(1).add(1, 3).finish(); let wt = WorkerTaskBuilder::new(i + 10) .resources(request) - .user_priority(1) + .priority(101) .build(&mut rqs); rq.add_task(&rqs, wt); } @@ -599,7 +647,7 @@ fn test_different_resources_and_priorities3() { let request: ResourceRequest = cpus_compact(2).finish(); let wt = WorkerTaskBuilder::new(i + 20) .resources(request) - .user_priority(-3) + .priority(97) .build(&mut rqs); rq.add_task(&rqs, wt); } @@ -630,7 +678,7 @@ fn test_uniq_resource_priorities4() { let request: ResourceRequest = cpus_compact(16).finish(); let wt = WorkerTaskBuilder::new(10) .resources(request) - .server_priority(1) + .priority(101) .build(&mut rqs); rq.add_task(&rqs, wt); diff --git a/crates/tako/src/internal/worker/test_util.rs b/crates/tako/src/internal/worker/test_util.rs index 705148c33..1ae13db60 100644 --- a/crates/tako/src/internal/worker/test_util.rs +++ b/crates/tako/src/internal/worker/test_util.rs @@ -18,8 +18,7 @@ pub struct WorkerTaskBuilder { task_id: TaskId, instance_id: InstanceId, resources: Vec, - user_priority: Priority, - server_priority: Priority, + priority: Priority, data_deps: Vec, data_flags: TaskDataFlags, task_state: TaskState, @@ -31,8 +30,7 @@ impl WorkerTaskBuilder { task_id: task_id.into(), instance_id: 0.into(), resources: Vec::new(), - user_priority: 0, - server_priority: 0, + priority: Priority::new(0), data_deps: Vec::new(), data_flags: TaskDataFlags::empty(), task_state: TaskState::Waiting(0), @@ -43,13 +41,8 @@ impl WorkerTaskBuilder { self } - pub fn user_priority(mut self, priority: Priority) -> Self { - self.user_priority = priority; - self - } - - pub fn server_priority(mut self, priority: Priority) -> Self { - self.server_priority = priority; + pub fn priority>(mut self, priority: P) -> Self { + self.priority = priority.into(); self } @@ -67,13 +60,12 @@ impl WorkerTaskBuilder { shared_index: 0, id: self.task_id, instance_id: self.instance_id, - scheduler_priority: self.server_priority, + priority: self.priority, node_list: vec![], data_deps: self.data_deps, entry: None, }, ComputeTaskSharedData { - user_priority: self.user_priority, time_limit: None, data_flags: self.data_flags, body: Default::default(), @@ -94,15 +86,15 @@ pub fn worker_task_add>( rbuilder.add_task(resource_map, w); } -pub fn worker_task>( +pub fn worker_task, P: Into>( task_id: T, resources: ResourceRequest, - u_priority: Priority, + u_priority: P, requests: &mut ResourceRqMap, ) -> Task { WorkerTaskBuilder::new(task_id) .resources(resources) - .user_priority(u_priority) + .priority(u_priority) .build(requests) } From ff859e5c47ac13f4971466acb82ddf9d28df6b49 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Wed, 7 Jan 2026 10:50:50 +0100 Subject: [PATCH 3/6] Fixed HQ tests --- crates/hyperqueue/src/server/client/submit.rs | 20 +++++++++---------- crates/tako/src/internal/common/priority.rs | 10 ++++------ 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs index 42d348709..e5dc9fab2 100644 --- a/crates/hyperqueue/src/server/client/submit.rs +++ b/crates/hyperqueue/src/server/client/submit.rs @@ -547,7 +547,7 @@ mod tests { use tako::internal::tests::utils::sorted_vec; use tako::program::ProgramDefinition; use tako::resources::ResourceRqId; - use tako::{Priority, TaskId}; + use tako::{TaskId, UserPriority}; #[test] fn test_validate_submit() { @@ -565,7 +565,7 @@ mod tests { task_desc: 
JobTaskDescription::Array { ids: IntArray::from_range(100, 10), entries: None, - task_desc: task_desc(None, 0), + task_desc: task_desc(None, UserPriority::default()), resource_rq: ResourceRequestVariants::default(), }, submit_dir: Default::default(), @@ -576,7 +576,7 @@ mod tests { let job_task_desc = JobTaskDescription::Array { ids: IntArray::from_range(109, 2), entries: None, - task_desc: task_desc(None, 0), + task_desc: task_desc(None, UserPriority::default()), resource_rq: ResourceRequestVariants::default(), }; assert!(validate_submit(None, &job_task_desc).is_none()); @@ -590,7 +590,7 @@ mod tests { tasks: vec![TaskWithDependencies { id: 102.into(), resource_rq_id: LocalResourceRqId::new(0), - task_desc: task_desc(None, 0), + task_desc: task_desc(None, UserPriority::default()), task_deps: vec![], data_deps: vec![], data_flags: TaskDataFlags::empty(), @@ -607,7 +607,7 @@ mod tests { TaskWithDependencies { id: 2.into(), resource_rq_id: LocalResourceRqId::new(0), - task_desc: task_desc(None, 0), + task_desc: task_desc(None, UserPriority::default()), task_deps: vec![], data_deps: vec![], data_flags: TaskDataFlags::empty(), @@ -615,7 +615,7 @@ mod tests { TaskWithDependencies { id: 2.into(), resource_rq_id: LocalResourceRqId::new(0), - task_desc: task_desc(None, 0), + task_desc: task_desc(None, UserPriority::default()), task_deps: vec![], data_deps: vec![], data_flags: TaskDataFlags::empty(), @@ -631,7 +631,7 @@ mod tests { tasks: vec![TaskWithDependencies { id: 2.into(), resource_rq_id: LocalResourceRqId::new(0), - task_desc: task_desc(None, 0), + task_desc: task_desc(None, UserPriority::default()), task_deps: vec![3.into()], data_deps: vec![], data_flags: TaskDataFlags::empty(), @@ -646,7 +646,7 @@ mod tests { tasks: vec![TaskWithDependencies { id: 2.into(), resource_rq_id: LocalResourceRqId::new(0), - task_desc: task_desc(None, 0), + task_desc: task_desc(None, UserPriority::default()), task_deps: vec![2.into()], data_deps: vec![], data_flags: TaskDataFlags::empty(), @@ -660,7 +660,7 @@ mod tests { #[test] fn test_build_graph_with_dependencies() { - let desc = || task_desc(None, 0); + let desc = || task_desc(None, UserPriority::default()); let tasks = vec![ task(0, 0, desc(), vec![2, 1]), task(1, 0, desc(), vec![0]), @@ -696,7 +696,7 @@ mod tests { ); } - fn task_desc(time_limit: Option, priority: Priority) -> TaskDescription { + fn task_desc(time_limit: Option, priority: UserPriority) -> TaskDescription { TaskDescription { kind: TaskKind::ExternalProgram(TaskKindProgram { program: ProgramDefinition { diff --git a/crates/tako/src/internal/common/priority.rs b/crates/tako/src/internal/common/priority.rs index 01402dd8a..50d5591e7 100644 --- a/crates/tako/src/internal/common/priority.rs +++ b/crates/tako/src/internal/common/priority.rs @@ -4,16 +4,14 @@ use std::fmt::{Display, Formatter}; /// User-defined priority /// /// A larger number ==> a higher priority -/// From user perspective, negative value is allowed, but we store internally as an unsigned number, -/// So we can easily concatenate it with scheduler priorities #[derive( Default, Debug, Copy, Clone, Hash, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize, )] -pub struct UserPriority(u32); +pub struct UserPriority(i32); impl UserPriority { pub fn new(value: i32) -> Self { - Self(value as u32 ^ 0x8000_0000) + Self(value) } } @@ -25,7 +23,7 @@ impl From for UserPriority { impl Display for UserPriority { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", (self.0 ^ 0x8000_0000) as i32) + write!(f, "{}", 
self.0) } } @@ -43,7 +41,7 @@ impl Priority { } pub fn from_user_priority(user_priority: UserPriority) -> Self { - Priority((user_priority.0 as u64) << 32) + Priority((user_priority.0 as u64 ^ 0x8000_0000) << 32) } pub fn new_resource_priority(value: u64) -> Self { From af750a4c113e84046b2aec51aee98c4c60830e94 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Wed, 7 Jan 2026 12:09:54 +0100 Subject: [PATCH 4/6] Cleanup --- crates/pyhq/src/client/job.rs | 6 +++--- crates/tako/src/internal/tests/test_reactor.rs | 1 - crates/tako/src/internal/worker/rqueue.rs | 3 --- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/crates/pyhq/src/client/job.rs b/crates/pyhq/src/client/job.rs index 9135261b4..9c7ef362e 100644 --- a/crates/pyhq/src/client/job.rs +++ b/crates/pyhq/src/client/job.rs @@ -30,7 +30,7 @@ use tako::gateway::{ }; use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef}; use tako::resources::{AllocationRequest, NumOfNodes, ResourceAmount}; -use tako::{JobTaskCount, Map}; +use tako::{JobTaskCount, Map, UserPriority}; #[derive(Debug, FromPyObject)] enum AllocationValue { @@ -63,7 +63,7 @@ pub struct TaskDescription { stdin: Option>, dependencies: Vec, task_dir: bool, - priority: tako::Priority, + priority: i32, resource_request: Vec, crash_limit: Option, } @@ -251,7 +251,7 @@ fn build_task_desc(desc: TaskDescription, submit_dir: &Path) -> anyhow::Result, ResourceVariantId)>, ) -> bool { - dbg!(&self.queues); let current_priority: Priority = if let Some(Some(priority)) = self.queues.values().map(|qfr| qfr.current_priority()).max() { @@ -219,11 +218,9 @@ impl ResourceWaitQueue { } else { return true; }; - dbg!(current_priority); for rqv in &self.requests { let qfr = self.queues.get_mut(rqv).unwrap(); while let Some((_task_id, priority)) = qfr.peek() { - dbg!(priority); if current_priority != priority { break; } From 7c89eaee102495f72cf1e573ff580075d2dabdde Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Wed, 7 Jan 2026 14:38:10 +0100 Subject: [PATCH 5/6] Benchmarks fixed --- crates/tako/benches/benchmarks/worker.rs | 3 +-- crates/tako/benches/utils/mod.rs | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/tako/benches/benchmarks/worker.rs b/crates/tako/benches/benchmarks/worker.rs index d1a89dbb5..33d113e39 100644 --- a/crates/tako/benches/benchmarks/worker.rs +++ b/crates/tako/benches/benchmarks/worker.rs @@ -64,13 +64,12 @@ fn create_worker_task(id: u32, resource_rq_id: ResourceRqId) -> Task { id: TaskId::new_test(id), resource_rq_id, instance_id: Default::default(), - scheduler_priority: 0, + priority: Default::default(), node_list: vec![], data_deps: vec![], entry: None, }, ComputeTaskSharedData { - user_priority: 0, time_limit: None, data_flags: TaskDataFlags::empty(), body: Default::default(), diff --git a/crates/tako/benches/utils/mod.rs b/crates/tako/benches/utils/mod.rs index 648e4c643..4f866baf4 100644 --- a/crates/tako/benches/utils/mod.rs +++ b/crates/tako/benches/utils/mod.rs @@ -11,11 +11,11 @@ use tako::resources::{ }; use tako::worker::ServerLostPolicy; use tako::worker::WorkerConfiguration; -use tako::{TaskId, WorkerId}; +use tako::{TaskId, UserPriority, WorkerId}; pub fn create_task(id: TaskId, resource_rq_id: ResourceRqId) -> Task { let conf = TaskConfiguration { - user_priority: 0, + user_priority: UserPriority::default(), time_limit: None, crash_limit: CrashLimit::default(), data_flags: TaskDataFlags::empty(), From 31b4cc32d0e5a2468846e23870108dde6d6d4281 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Fri, 9 Jan 2026 
18:41:46 +0100 Subject: [PATCH 6/6] Improvements after code review --- .../hyperqueue/src/client/commands/submit/command.rs | 10 +++++++--- crates/tako/src/internal/common/priority.rs | 12 +++++++++++- crates/tako/src/internal/server/task.rs | 2 +- crates/tako/src/internal/tests/test_reactor.rs | 4 ++-- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/crates/hyperqueue/src/client/commands/submit/command.rs b/crates/hyperqueue/src/client/commands/submit/command.rs index 7fd125911..1774ce989 100644 --- a/crates/hyperqueue/src/client/commands/submit/command.rs +++ b/crates/hyperqueue/src/client/commands/submit/command.rs @@ -294,8 +294,8 @@ pub struct SubmitJobTaskConfOpts { array: Option, /// Tasks priority - #[arg(long, default_value_t = 0)] - priority: i32, + #[arg(long, default_value_t = UserPriority::new(0), value_parser = parse_user_priority)] + priority: UserPriority, /// Task's time limit #[arg( @@ -377,6 +377,10 @@ impl OptsWithMatches { } } +fn parse_user_priority(s: &str) -> Result { + Ok(UserPriority::new(s.parse()?)) +} + /// Returns true if the given parameter has been specified explicitly. fn has_parameter(matches: &ArgMatches, id: &str) -> bool { if let Ok(true) = matches.try_contains_id(id) { @@ -727,7 +731,7 @@ pub async fn submit_computation( let task_desc = TaskDescription { kind: task_kind, - priority: UserPriority::new(priority), + priority, time_limit, crash_limit, }; diff --git a/crates/tako/src/internal/common/priority.rs b/crates/tako/src/internal/common/priority.rs index 50d5591e7..48d1ffe1e 100644 --- a/crates/tako/src/internal/common/priority.rs +++ b/crates/tako/src/internal/common/priority.rs @@ -41,6 +41,9 @@ impl Priority { } pub fn from_user_priority(user_priority: UserPriority) -> Self { + // We want to set user priority as high 32b bits (low bits are scheduler priority) + // We also need to invert sign bit (via ^0x8000_0000) to make sure to maintain ordering + // of priorities since we are as casting from signed integer. Priority((user_priority.0 as u64 ^ 0x8000_0000) << 32) } @@ -48,7 +51,14 @@ impl Priority { Priority(value << 16) } - pub fn remove_priority_u32(&self, value: u32) -> Priority { + // Add priority to the lowest 32 bits, + pub fn add_priority_u32(&self, value: u32) -> Priority { + Priority(self.0.saturating_add(value as u64)) + } + + // Add "inverted" priority to the lowest 32 bits; i.e. a smaller number boosts priority more + // than a larger number. 
+ pub fn add_inverted_priority_u32(&self, value: u32) -> Priority { Priority(self.0.saturating_add((u32::MAX - value) as u64)) } diff --git a/crates/tako/src/internal/server/task.rs b/crates/tako/src/internal/server/task.rs index 71922877a..1ea45d0cf 100644 --- a/crates/tako/src/internal/server/task.rs +++ b/crates/tako/src/internal/server/task.rs @@ -193,7 +193,7 @@ impl Task { #[inline] pub(crate) fn priority(&self) -> Priority { Priority::from_user_priority(self.configuration.user_priority) - .remove_priority_u32(self.id.job_id().as_num()) + .add_inverted_priority_u32(self.id.job_id().as_num()) } #[inline] diff --git a/crates/tako/src/internal/tests/test_reactor.rs b/crates/tako/src/internal/tests/test_reactor.rs index 3047ee783..7d4a4e5a5 100644 --- a/crates/tako/src/internal/tests/test_reactor.rs +++ b/crates/tako/src/internal/tests/test_reactor.rs @@ -173,11 +173,11 @@ fn test_scheduler_priority() { assert_eq!( core.get_task(TaskId::new_test(501)).priority(), - Priority::from_user_priority(0.into()).remove_priority_u32(0) + Priority::from_user_priority(0.into()).add_inverted_priority_u32(0) ); assert_eq!( core.get_task(task_id8).priority(), - Priority::from_user_priority(0.into()).remove_priority_u32(123) + Priority::from_user_priority(0.into()).add_inverted_priority_u32(123) ); }
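
The net effect of the series is easiest to see by restating the formulas added in crates/tako/src/internal/common/priority.rs as standalone functions. The sketch below does exactly that (plain free functions, not the tako types themselves), so the ordering the encoding produces can be checked in isolation: the user priority sits in the high 32 bits with its sign bit flipped, the job id is added in inverted form to the low 32 bits, and the worker queue folds in a resource priority of "incapable worker count" shifted by 16.

fn from_user_priority(user: i32) -> u64 {
    // Same formula as Priority::from_user_priority in this series: flip the
    // sign bit so that unsigned comparison of the packed value matches the
    // signed ordering of the user priority, then park it in the high 32 bits.
    ((user as u64) ^ 0x8000_0000) << 32
}

fn add_inverted_priority_u32(base: u64, value: u32) -> u64 {
    // Same formula as Priority::add_inverted_priority_u32: a smaller value
    // adds a larger amount and therefore boosts the priority more. The addend
    // is always below 2^32, so it never spills into the user-priority bits.
    base.saturating_add((u32::MAX - value) as u64)
}

fn new_resource_priority(incapable_workers: u64) -> u64 {
    // Same formula as Priority::new_resource_priority: the number of workers
    // that cannot run the request, shifted into bits 16..32.
    incapable_workers << 16
}

fn combine(a: u64, b: u64) -> u64 {
    // Same as Priority::combine: a saturating add of the packed values.
    a.saturating_add(b)
}

fn task_priority(user_priority: i32, job_id: u32) -> u64 {
    // Mirrors Task::priority() as of PATCH 6/6.
    add_inverted_priority_u32(from_user_priority(user_priority), job_id)
}

fn main() {
    // A higher user priority wins even against a much older job.
    assert!(task_priority(1, 500) > task_priority(0, 1));
    // With equal user priority, the older job (smaller job id) wins.
    assert!(task_priority(0, 1) > task_priority(0, 2));
    // Negative user priorities sort below the default of 0.
    assert!(task_priority(-1, 1) < task_priority(0, 1));
    // On a worker, a request that more workers are unable to run gets a
    // larger resource priority, which is folded in via combine().
    let base = task_priority(0, 5);
    assert!(combine(base, new_resource_priority(2)) > combine(base, new_resource_priority(0)));
}

Note that the t-level component of the old scheduler_priority (T_LEVEL_WEIGHT, removed in the first patch) has no counterpart here: ties in user priority are broken by job age alone on the server, with the resource priority added on top only inside the worker queue.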