From 698975765b33090adf9d0beea53367cd4e60e310 Mon Sep 17 00:00:00 2001 From: Shahzad Malik Muzaffar Date: Tue, 20 Jan 2026 12:32:29 +0100 Subject: [PATCH 1/5] make sure the pending task is still in pendingJobs queue --- scheduler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scheduler.py b/scheduler.py index ffb23da..cb1fd2f 100644 --- a/scheduler.py +++ b/scheduler.py @@ -175,7 +175,8 @@ def __doRescheduleParallel(self): if dumpMsg: self.log("Pending tasks: %s: %s" % (taskId, pendingDeps),30) continue - allJobs.append({"id": taskId, "priorty": self.jobs[taskId]["priorty"]}) + if taskId in self.pendingJobs: + allJobs.append({"id": taskId, "priorty": self.jobs[taskId]["priorty"]}) buildJobs =[] downloadJobs = [] forceJobs = [] From 8efd5eab963b095da9d29195a1cef7731c2e09b9 Mon Sep 17 00:00:00 2001 From: Shahzad Malik Muzaffar Date: Mon, 26 Jan 2026 13:19:39 +0100 Subject: [PATCH 2/5] recursively mark job broken if a dependency is broken --- scheduler.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/scheduler.py b/scheduler.py index cb1fd2f..bb09e98 100644 --- a/scheduler.py +++ b/scheduler.py @@ -147,12 +147,18 @@ def __doRescheduleParallel(self): parallelJobs = [j for j in self.pendingJobs if self.jobs[j]["scheduler"] == "parallel"] # First of all clean up the pending parallel jobs from all those # which have broken dependencies. 
- for taskId in parallelJobs: - brokenDeps = [dep for dep in self.jobs[taskId]["deps"] if dep in self.brokenJobs] - if not brokenDeps: - continue - transition(taskId, self.pendingJobs, self.brokenJobs, "parallel:pending->broken") - self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps) + while True: + has_broken = False + for taskId in parallelJobs[:]: + brokenDeps = [dep for dep in self.jobs[taskId]["deps"] if dep in self.brokenJobs] + if not brokenDeps: + continue + has_broken = True + parallelJobs.remove(taskId) + transition(taskId, self.pendingJobs, self.brokenJobs, "parallel:pending->broken") + self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps) + if not has_broken: + break # If no tasks left, quit. Notice we need to check also for serial jobs # since they might queue more parallel payloads. @@ -175,8 +181,7 @@ def __doRescheduleParallel(self): if dumpMsg: self.log("Pending tasks: %s: %s" % (taskId, pendingDeps),30) continue - if taskId in self.pendingJobs: - allJobs.append({"id": taskId, "priorty": self.jobs[taskId]["priorty"]}) + allJobs.append({"id": taskId, "priorty": self.jobs[taskId]["priorty"]}) buildJobs =[] downloadJobs = [] forceJobs = [] From b7bcd059d8928ed0f02c5fa76b2d8ba41eda5b14 Mon Sep 17 00:00:00 2001 From: Shahzad Malik Muzaffar Date: Mon, 26 Jan 2026 23:26:21 +0100 Subject: [PATCH 3/5] do not fail on forceDone tasks --- scheduler.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/scheduler.py b/scheduler.py index bb09e98..c92a713 100644 --- a/scheduler.py +++ b/scheduler.py @@ -16,10 +16,10 @@ def transition(what, fromList, toList, key="unknow"): try: print("DEBUG:%s: transition %s" % (key, what)) fromList.remove(what) + toList.append(what) except ValueError as e: print (what + " not in source list") - raise e - toList.append(what) + return class Scheduler(object): # A simple job scheduler. 
@@ -181,7 +181,8 @@ def __doRescheduleParallel(self): if dumpMsg: self.log("Pending tasks: %s: %s" % (taskId, pendingDeps),30) continue - allJobs.append({"id": taskId, "priorty": self.jobs[taskId]["priorty"]}) + if taskId in self.pendingJobs: + allJobs.append({"id": taskId, "priorty": self.jobs[taskId]["priorty"]}) buildJobs =[] downloadJobs = [] forceJobs = [] From 5b2ea80d20542e982e894a3af3e02a31de87cac9 Mon Sep 17 00:00:00 2001 From: Shahzad Malik Muzaffar Date: Tue, 27 Jan 2026 11:30:45 +0100 Subject: [PATCH 4/5] release resources if fails to schedule a pending job --- scheduler.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/scheduler.py b/scheduler.py index c92a713..d458b65 100644 --- a/scheduler.py +++ b/scheduler.py @@ -19,7 +19,8 @@ def transition(what, fromList, toList, key="unknow"): toList.append(what) except ValueError as e: print (what + " not in source list") - return + return False + return True class Scheduler(object): # A simple job scheduler. @@ -206,10 +207,12 @@ def __doRescheduleParallel(self): else: buildJobs = buildJobs[:bldCount] for taskId in forceJobs + downloadJobs + buildJobs: - taskType = taskId.split("-")[0] - self.runningJobsCount[taskType] += 1 - transition(taskId, self.pendingJobs, self.runningJobs, "parallel:pending->running") - self.__scheduleParallel(taskId, self.jobs[taskId]["spec"], priorty=self.jobs[taskId]["priorty"]) + if transition(taskId, self.pendingJobs, self.runningJobs, "parallel:pending->running") + taskType = taskId.split("-")[0] + self.runningJobsCount[taskType] += 1 + self.__scheduleParallel(taskId, self.jobs[taskId]["spec"], priorty=self.jobs[taskId]["priorty"]) + elif self.resourceManager: + self.resourceManager.releaseResourcesForExternal(taskId) # Update the job with the result of running. 
def __updateJobStatus(self, taskId, error): From 7ab46ca8c1bd024aa1e3b14fe03c40869d00ecdf Mon Sep 17 00:00:00 2001 From: Malik Shahzad Muzaffar Date: Tue, 27 Jan 2026 11:58:37 +0100 Subject: [PATCH 5/5] Update scheduler.py --- scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler.py b/scheduler.py index d458b65..6659ae8 100644 --- a/scheduler.py +++ b/scheduler.py @@ -207,7 +207,7 @@ def __doRescheduleParallel(self): else: buildJobs = buildJobs[:bldCount] for taskId in forceJobs + downloadJobs + buildJobs: - if transition(taskId, self.pendingJobs, self.runningJobs, "parallel:pending->running") + if transition(taskId, self.pendingJobs, self.runningJobs, "parallel:pending->running"): taskType = taskId.split("-")[0] self.runningJobsCount[taskType] += 1 self.__scheduleParallel(taskId, self.jobs[taskId]["spec"], priorty=self.jobs[taskId]["priorty"])