diff --git a/scheduler.py b/scheduler.py index ffb23da..6659ae8 100644 --- a/scheduler.py +++ b/scheduler.py @@ -16,10 +16,11 @@ def transition(what, fromList, toList, key="unknow"): try: print("DEBUG:%s: transition %s" % (key, what)) fromList.remove(what) + toList.append(what) except ValueError as e: print (what + " not in source list") - raise e - toList.append(what) + return False + return True class Scheduler(object): # A simple job scheduler. @@ -147,12 +148,18 @@ def __doRescheduleParallel(self): parallelJobs = [j for j in self.pendingJobs if self.jobs[j]["scheduler"] == "parallel"] # First of all clean up the pending parallel jobs from all those # which have broken dependencies. - for taskId in parallelJobs: - brokenDeps = [dep for dep in self.jobs[taskId]["deps"] if dep in self.brokenJobs] - if not brokenDeps: - continue - transition(taskId, self.pendingJobs, self.brokenJobs, "parallel:pending->broken") - self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps) + while True: + has_broken = False + for taskId in parallelJobs[:]: + brokenDeps = [dep for dep in self.jobs[taskId]["deps"] if dep in self.brokenJobs] + if not brokenDeps: + continue + has_broken = True + parallelJobs.remove(taskId) + transition(taskId, self.pendingJobs, self.brokenJobs, "parallel:pending->broken") + self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps) + if not has_broken: + break # If no tasks left, quit. Notice we need to check also for serial jobs # since they might queue more parallel payloads. @@ -175,7 +182,8 @@ def __doRescheduleParallel(self): if dumpMsg: self.log("Pending tasks: %s: %s" % (taskId, pendingDeps),30) continue - allJobs.append({"id": taskId, "priorty": self.jobs[taskId]["priorty"]}) + if taskId in self.pendingJobs: + allJobs.append({"id": taskId, "priorty": self.jobs[taskId]["priorty"]}) buildJobs =[] downloadJobs = [] forceJobs = [] @@ -199,10 +207,12 @@ def __doRescheduleParallel(self): else: buildJobs = buildJobs[:bldCount] for taskId in forceJobs + downloadJobs + buildJobs: - taskType = taskId.split("-")[0] - self.runningJobsCount[taskType] += 1 - transition(taskId, self.pendingJobs, self.runningJobs, "parallel:pending->running") - self.__scheduleParallel(taskId, self.jobs[taskId]["spec"], priorty=self.jobs[taskId]["priorty"]) + if transition(taskId, self.pendingJobs, self.runningJobs, "parallel:pending->running"): + taskType = taskId.split("-")[0] + self.runningJobsCount[taskType] += 1 + self.__scheduleParallel(taskId, self.jobs[taskId]["spec"], priorty=self.jobs[taskId]["priorty"]) + elif self.resourceManager: + self.resourceManager.releaseResourcesForExternal(taskId) # Update the job with the result of running. def __updateJobStatus(self, taskId, error):