Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions crates/hyperqueue/src/client/commands/submit/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use tako::gateway::{
};
use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef};
use tako::resources::{AllocationRequest, CPU_RESOURCE_NAME, NumOfNodes, ResourceAmount};
use tako::{JobId, JobTaskCount, Map};
use tako::{JobId, JobTaskCount, Map, UserPriority};

const SUBMIT_ARRAY_LIMIT: JobTaskCount = 999;

Expand Down Expand Up @@ -294,8 +294,8 @@ pub struct SubmitJobTaskConfOpts {
array: Option<IntArray>,

/// Tasks priority
#[arg(long, default_value_t = 0)]
priority: tako::Priority,
#[arg(long, default_value_t = UserPriority::new(0), value_parser = parse_user_priority)]
priority: UserPriority,

/// Task's time limit
#[arg(
Expand Down Expand Up @@ -377,6 +377,10 @@ impl OptsWithMatches<SubmitJobTaskConfOpts> {
}
}

fn parse_user_priority(s: &str) -> Result<UserPriority, anyhow::Error> {
Ok(UserPriority::new(s.parse()?))
}

/// Returns true if the given parameter has been specified explicitly.
fn has_parameter(matches: &ArgMatches, id: &str) -> bool {
if let Ok(true) = matches.try_contains_id(id) {
Expand Down
4 changes: 2 additions & 2 deletions crates/hyperqueue/src/client/commands/submit/defs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::time::Duration;
use tako::gateway::{CrashLimit, ResourceRequest, ResourceRequestEntries, ResourceRequestEntry};
use tako::program::FileOnCloseBehavior;
use tako::resources::{AllocationRequest, NumOfNodes, ResourceAmount};
use tako::{JobTaskCount, JobTaskId, Map, Priority};
use tako::{JobTaskCount, JobTaskId, Map, UserPriority};

#[derive(Deserialize)]
#[serde(untagged)]
Expand Down Expand Up @@ -244,7 +244,7 @@ pub struct TaskConfigDef {
pub time_limit: Option<Duration>,

#[serde(default)]
pub priority: Priority,
pub priority: UserPriority,

#[serde(default, deserialize_with = "deserialize_crash_limit")]
pub crash_limit: CrashLimit,
Expand Down
20 changes: 10 additions & 10 deletions crates/hyperqueue/src/server/client/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ mod tests {
use tako::internal::tests::utils::sorted_vec;
use tako::program::ProgramDefinition;
use tako::resources::ResourceRqId;
use tako::{Priority, TaskId};
use tako::{TaskId, UserPriority};

#[test]
fn test_validate_submit() {
Expand All @@ -565,7 +565,7 @@ mod tests {
task_desc: JobTaskDescription::Array {
ids: IntArray::from_range(100, 10),
entries: None,
task_desc: task_desc(None, 0),
task_desc: task_desc(None, UserPriority::default()),
resource_rq: ResourceRequestVariants::default(),
},
submit_dir: Default::default(),
Expand All @@ -576,7 +576,7 @@ mod tests {
let job_task_desc = JobTaskDescription::Array {
ids: IntArray::from_range(109, 2),
entries: None,
task_desc: task_desc(None, 0),
task_desc: task_desc(None, UserPriority::default()),
resource_rq: ResourceRequestVariants::default(),
};
assert!(validate_submit(None, &job_task_desc).is_none());
Expand All @@ -590,7 +590,7 @@ mod tests {
tasks: vec![TaskWithDependencies {
id: 102.into(),
resource_rq_id: LocalResourceRqId::new(0),
task_desc: task_desc(None, 0),
task_desc: task_desc(None, UserPriority::default()),
task_deps: vec![],
data_deps: vec![],
data_flags: TaskDataFlags::empty(),
Expand All @@ -607,15 +607,15 @@ mod tests {
TaskWithDependencies {
id: 2.into(),
resource_rq_id: LocalResourceRqId::new(0),
task_desc: task_desc(None, 0),
task_desc: task_desc(None, UserPriority::default()),
task_deps: vec![],
data_deps: vec![],
data_flags: TaskDataFlags::empty(),
},
TaskWithDependencies {
id: 2.into(),
resource_rq_id: LocalResourceRqId::new(0),
task_desc: task_desc(None, 0),
task_desc: task_desc(None, UserPriority::default()),
task_deps: vec![],
data_deps: vec![],
data_flags: TaskDataFlags::empty(),
Expand All @@ -631,7 +631,7 @@ mod tests {
tasks: vec![TaskWithDependencies {
id: 2.into(),
resource_rq_id: LocalResourceRqId::new(0),
task_desc: task_desc(None, 0),
task_desc: task_desc(None, UserPriority::default()),
task_deps: vec![3.into()],
data_deps: vec![],
data_flags: TaskDataFlags::empty(),
Expand All @@ -646,7 +646,7 @@ mod tests {
tasks: vec![TaskWithDependencies {
id: 2.into(),
resource_rq_id: LocalResourceRqId::new(0),
task_desc: task_desc(None, 0),
task_desc: task_desc(None, UserPriority::default()),
task_deps: vec![2.into()],
data_deps: vec![],
data_flags: TaskDataFlags::empty(),
Expand All @@ -660,7 +660,7 @@ mod tests {

#[test]
fn test_build_graph_with_dependencies() {
let desc = || task_desc(None, 0);
let desc = || task_desc(None, UserPriority::default());
let tasks = vec![
task(0, 0, desc(), vec![2, 1]),
task(1, 0, desc(), vec![0]),
Expand Down Expand Up @@ -696,7 +696,7 @@ mod tests {
);
}

fn task_desc(time_limit: Option<Duration>, priority: Priority) -> TaskDescription {
fn task_desc(time_limit: Option<Duration>, priority: UserPriority) -> TaskDescription {
TaskDescription {
kind: TaskKind::ExternalProgram(TaskKindProgram {
program: ProgramDefinition {
Expand Down
4 changes: 2 additions & 2 deletions crates/hyperqueue/src/transfer/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tako::program::ProgramDefinition;
use tako::resources::{ResourceDescriptor, ResourceRqId};
use tako::server::TaskExplanation;
use tako::worker::WorkerConfiguration;
use tako::{JobId, JobTaskCount, JobTaskId, Map, TaskId, WorkerId, define_id_type};
use tako::{JobId, JobTaskCount, JobTaskId, Map, TaskId, UserPriority, WorkerId, define_id_type};

// Messages client -> server
#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -152,7 +152,7 @@ define_id_type!(LocalResourceRqId, u32);
pub struct TaskDescription {
pub kind: TaskKind,
pub time_limit: Option<Duration>,
pub priority: tako::Priority,
pub priority: UserPriority,
pub crash_limit: CrashLimit,
}

Expand Down
6 changes: 3 additions & 3 deletions crates/pyhq/src/client/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tako::gateway::{
};
use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef};
use tako::resources::{AllocationRequest, NumOfNodes, ResourceAmount};
use tako::{JobTaskCount, Map};
use tako::{JobTaskCount, Map, UserPriority};

#[derive(Debug, FromPyObject)]
enum AllocationValue {
Expand Down Expand Up @@ -63,7 +63,7 @@ pub struct TaskDescription {
stdin: Option<Vec<u8>>,
dependencies: Vec<u32>,
task_dir: bool,
priority: tako::Priority,
priority: i32,
resource_request: Vec<ResourceRequestDescription>,
crash_limit: Option<u16>,
}
Expand Down Expand Up @@ -251,7 +251,7 @@ fn build_task_desc(desc: TaskDescription, submit_dir: &Path) -> anyhow::Result<H
pin_mode: PinMode::None,
task_dir: desc.task_dir,
}),
priority: desc.priority,
priority: UserPriority::new(desc.priority),
time_limit: None,
crash_limit: desc
.crash_limit
Expand Down
3 changes: 1 addition & 2 deletions crates/tako/benches/benchmarks/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,12 @@ fn create_worker_task(id: u32, resource_rq_id: ResourceRqId) -> Task {
id: TaskId::new_test(id),
resource_rq_id,
instance_id: Default::default(),
scheduler_priority: 0,
priority: Default::default(),
node_list: vec![],
data_deps: vec![],
entry: None,
},
ComputeTaskSharedData {
user_priority: 0,
time_limit: None,
data_flags: TaskDataFlags::empty(),
body: Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions crates/tako/benches/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use tako::resources::{
};
use tako::worker::ServerLostPolicy;
use tako::worker::WorkerConfiguration;
use tako::{TaskId, WorkerId};
use tako::{TaskId, UserPriority, WorkerId};

pub fn create_task(id: TaskId, resource_rq_id: ResourceRqId) -> Task {
let conf = TaskConfiguration {
user_priority: 0,
user_priority: UserPriority::default(),
time_limit: None,
crash_limit: CrashLimit::default(),
data_flags: TaskDataFlags::empty(),
Expand Down
4 changes: 2 additions & 2 deletions crates/tako/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::internal::common::error::DsError;
use crate::internal::common::resources::ResourceRqId;
use crate::internal::datasrv::dataobj::DataObjectId;
use crate::resources::{AllocationRequest, CPU_RESOURCE_NAME, NumOfNodes, ResourceAmount};
use crate::{InstanceId, Map, Priority, TaskId};
use crate::{InstanceId, Map, TaskId, UserPriority};
use serde::{Deserialize, Serialize};
use smallvec::{SmallVec, smallvec};
use std::fmt::{Display, Formatter};
Expand Down Expand Up @@ -126,7 +126,7 @@ impl Display for CrashLimit {
pub struct SharedTaskConfiguration {
pub time_limit: Option<Duration>,

pub priority: Priority,
pub priority: UserPriority,

pub crash_limit: CrashLimit,

Expand Down
2 changes: 2 additions & 0 deletions crates/tako/src/internal/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub(crate) mod data_structures;
pub(crate) mod error;
pub(crate) mod ids;
pub(crate) mod index;
mod priority;
pub mod resources;
pub(crate) mod rpc;
pub(crate) mod stablemap;
Expand All @@ -13,4 +14,5 @@ pub(crate) mod utils;
pub(crate) mod wrapped;

pub use data_structures::{Map, Set};
pub use priority::{Priority, UserPriority};
pub use wrapped::WrappedRcRefCell;
74 changes: 74 additions & 0 deletions crates/tako/src/internal/common/priority.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};

/// User-defined priority
///
/// A larger number ==> a higher priority
#[derive(
Default, Debug, Copy, Clone, Hash, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize,
)]
pub struct UserPriority(i32);

impl UserPriority {
pub fn new(value: i32) -> Self {
Self(value)
}
}

impl From<i32> for UserPriority {
fn from(value: i32) -> Self {
UserPriority::new(value)
}
}

impl Display for UserPriority {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

/// Main priority type that mixes user priority, scheduler priority, resource priority
///
/// A larger number ==> a higher priority
#[derive(
Default, Debug, Copy, Clone, Hash, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize,
)]
pub struct Priority(u64);

impl Priority {
pub fn new(value: u64) -> Self {
Self(value)
}

pub fn from_user_priority(user_priority: UserPriority) -> Self {
// We want to set user priority as high 32b bits (low bits are scheduler priority)
// We also need to invert sign bit (via ^0x8000_0000) to make sure to maintain ordering
// of priorities since we are as casting from signed integer.
Priority((user_priority.0 as u64 ^ 0x8000_0000) << 32)
}

pub fn new_resource_priority(value: u64) -> Self {
Priority(value << 16)
}

// Add priority to the lowest 32 bits,
pub fn add_priority_u32(&self, value: u32) -> Priority {
Priority(self.0.saturating_add(value as u64))
}

// Add "inverted" priority to the lowest 32 bits; i.e. a smaller number boosts priority more
// than a larger number.
pub fn add_inverted_priority_u32(&self, value: u32) -> Priority {
Priority(self.0.saturating_add((u32::MAX - value) as u64))
}

pub fn combine(self, other: Priority) -> Priority {
Priority(self.0.saturating_add(other.0))
}
}

impl From<u64> for Priority {
fn from(value: u64) -> Self {
Self(value)
}
}
3 changes: 1 addition & 2 deletions crates/tako/src/internal/messages/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,14 @@ pub struct ComputeTaskSeparateData {
pub id: TaskId,
pub resource_rq_id: ResourceRqId,
pub instance_id: InstanceId,
pub scheduler_priority: Priority,
pub priority: Priority,
pub node_list: Vec<WorkerId>,
pub data_deps: Vec<DataObjectId>,
pub entry: Option<EntryType>,
}

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct ComputeTaskSharedData {
pub user_priority: Priority,
pub time_limit: Option<Duration>,
pub data_flags: TaskDataFlags,
pub body: Rc<[u8]>,
Expand Down
21 changes: 7 additions & 14 deletions crates/tako/src/internal/scheduler/multinode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@ use crate::internal::server::worker::Worker;
use crate::internal::server::workergroup::WorkerGroup;
use crate::internal::server::workermap::WorkerMap;
use crate::resources::ResourceRequest;
use crate::{Map, PriorityTuple, TaskId, WorkerId};
use crate::{Map, Priority, TaskId, WorkerId};
use priority_queue::PriorityQueue;

struct QueueForRequest {
queue: PriorityQueue<TaskId, PriorityTuple>,
queue: PriorityQueue<TaskId, Priority>,
sleeping: bool,
}

impl QueueForRequest {
pub fn peek(&self) -> Option<(TaskId, PriorityTuple)> {
pub fn peek(&self) -> Option<(TaskId, Priority)> {
self.queue
.peek()
.map(|(task_id, priority)| (*task_id, (priority.0, priority.1)))
.map(|(task_id, priority)| (*task_id, *priority))
}
pub fn current_priority(&self) -> Option<PriorityTuple> {
pub fn current_priority(&self) -> Option<Priority> {
self.peek().map(|x| x.1)
}
}
Expand All @@ -31,13 +31,6 @@ pub(crate) struct MultiNodeQueue {
requests: Vec<ResourceRqId>,
}

fn task_priority_tuple(task: &Task) -> PriorityTuple {
(
task.configuration.user_priority,
task.get_scheduler_priority(),
)
}

impl MultiNodeQueue {
pub fn shrink_to_fit(&mut self) {
self.queues.shrink_to_fit();
Expand Down Expand Up @@ -68,7 +61,7 @@ impl MultiNodeQueue {
})
.queue
};
queue.push(task.id, task_priority_tuple(task));
queue.push(task.id, task.priority());
}

pub fn wakeup_sleeping_tasks(&mut self) {
Expand Down Expand Up @@ -184,7 +177,7 @@ impl<'a> MultiNodeAllocator<'a> {

pub fn try_allocate_task(self) -> Option<(TaskId, Vec<WorkerId>)> {
'outer: loop {
let current_priority: PriorityTuple = if let Some(Some(priority)) = self
let current_priority = if let Some(Some(priority)) = self
.mn_queue
.queues
.values()
Expand Down
Loading
Loading