From 69198c5ad0096eed2f959f299b378d4f4dad30dd Mon Sep 17 00:00:00 2001 From: Leonid Andreev Date: Tue, 4 Nov 2025 12:38:29 -0500 Subject: [PATCH 1/5] With this patch Globus uploads are no longer locking datasets for edits and/or further uploads (when the async. task management mode is enabled). --- .../edu/harvard/iq/dataverse/DatasetPage.java | 24 +++++++++++++------ .../iq/dataverse/PermissionServiceBean.java | 20 +++++++++++++--- .../dataverse/globus/GlobusServiceBean.java | 20 ++++++++++++++++ src/main/java/propertyFiles/Bundle.properties | 1 + src/main/webapp/dataset.xhtml | 15 ++++++++---- 5 files changed, 66 insertions(+), 14 deletions(-) diff --git a/src/main/java/edu/harvard/iq/dataverse/DatasetPage.java b/src/main/java/edu/harvard/iq/dataverse/DatasetPage.java index b41e8d4ac35..c2f41beb393 100644 --- a/src/main/java/edu/harvard/iq/dataverse/DatasetPage.java +++ b/src/main/java/edu/harvard/iq/dataverse/DatasetPage.java @@ -157,6 +157,7 @@ import edu.harvard.iq.dataverse.search.SearchFields; import edu.harvard.iq.dataverse.search.SearchUtil; import edu.harvard.iq.dataverse.search.SolrClientService; +import edu.harvard.iq.dataverse.settings.FeatureFlags; import edu.harvard.iq.dataverse.settings.JvmSettings; import edu.harvard.iq.dataverse.util.SignpostingResources; import edu.harvard.iq.dataverse.util.FileMetadataUtil; @@ -2367,7 +2368,8 @@ public boolean isValidOrCanReviewIncomplete() { private void displayLockInfo(Dataset dataset) { // Various info messages, when the dataset is locked (for various reasons): - if (dataset.isLocked() && canUpdateDataset()) { + boolean globusUploadInProgress = globusService.isUploadTaskInProgressForDataset(dataset.getId()); + if ((dataset.isLocked() || globusUploadInProgress) && canUpdateDataset()) { if (dataset.isLockedFor(DatasetLock.Reason.Workflow)) { JH.addMessage(FacesMessage.SEVERITY_WARN, BundleUtil.getStringFromBundle("dataset.locked.message"), BundleUtil.getStringFromBundle("dataset.locked.message.details")); @@ -2381,9 +2383,17 @@ private void displayLockInfo(Dataset dataset) { BundleUtil.getStringFromBundle("file.rsyncUpload.inProgressMessage.details")); lockedDueToDcmUpload = true; } - if (dataset.isLockedFor(DatasetLock.Reason.GlobusUpload)) { - JH.addMessage(FacesMessage.SEVERITY_WARN, BundleUtil.getStringFromBundle("file.globusUpload.inProgressMessage.summary"), - BundleUtil.getStringFromBundle("file.globusUpload.inProgressMessage.details")); + if (dataset.isLockedFor(DatasetLock.Reason.GlobusUpload) + || globusUploadInProgress) { + // (prod. patch 6.8) fall back to the old-style Globus lock message unless + // the new, async task mgmt model is used. + if (FeatureFlags.GLOBUS_USE_EXPERIMENTAL_ASYNC_FRAMEWORK.enabled()) { + JH.addMessage(FacesMessage.SEVERITY_WARN, BundleUtil.getStringFromBundle("file.globusUpload.inProgressMessage.summary"), + BundleUtil.getStringFromBundle("file.globusUpload.inProgressMessageAsync.details")); + } else { + JH.addMessage(FacesMessage.SEVERITY_WARN, BundleUtil.getStringFromBundle("file.globusUpload.inProgressMessage.summary"), + BundleUtil.getStringFromBundle("file.globusUpload.inProgressMessage.details")); + } } //This is a hack to remove dataset locks for File PID registration if //the dataset is released @@ -4357,10 +4367,10 @@ public boolean isStillLockedForAnyReason() { if (dataset.getId() != null) { Dataset testDataset = datasetService.find(dataset.getId()); if (testDataset != null && testDataset.getId() != null) { + // Refresh the info messages, in case the dataset has been + // re-locked with a different lock type (or a Globus upload task is in progress): + displayLockInfo(testDataset); if (testDataset.getLocks().size() > 0) { - // Refresh the info messages, in case the dataset has been - // re-locked with a different lock type: - displayLockInfo(testDataset); return true; } } diff --git a/src/main/java/edu/harvard/iq/dataverse/PermissionServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/PermissionServiceBean.java index d492991bb62..f106a218d81 100644 --- a/src/main/java/edu/harvard/iq/dataverse/PermissionServiceBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/PermissionServiceBean.java @@ -27,6 +27,8 @@ import edu.harvard.iq.dataverse.engine.command.exception.IllegalCommandException; import edu.harvard.iq.dataverse.engine.command.impl.PublishDatasetCommand; import edu.harvard.iq.dataverse.engine.command.impl.UpdateDatasetVersionCommand; +import edu.harvard.iq.dataverse.globus.GlobusServiceBean; +import edu.harvard.iq.dataverse.settings.FeatureFlags; import edu.harvard.iq.dataverse.util.BundleUtil; import edu.harvard.iq.dataverse.workflow.PendingWorkflowInvocation; import edu.harvard.iq.dataverse.workflow.WorkflowServiceBean; @@ -85,6 +87,9 @@ public class PermissionServiceBean { @EJB GroupServiceBean groupService; + + @EJB + GlobusServiceBean globusService; @Inject DataverseSession session; @@ -779,8 +784,14 @@ else if (dataset.isLockedFor(DatasetLock.Reason.Workflow)) { else if (dataset.isLockedFor(DatasetLock.Reason.DcmUpload)) { throw new IllegalCommandException(BundleUtil.getStringFromBundle("dataset.message.locked.editNotAllowed"), command); } + /** + * prod. patch 6.8: as an experiment, not locking datasets for edits while Globus uploads in progress: + */ else if (dataset.isLockedFor(DatasetLock.Reason.GlobusUpload)) { - throw new IllegalCommandException(BundleUtil.getStringFromBundle("dataset.message.locked.editNotAllowed"), command); + // ... but we'll keep it locked for edits unless the new, async task mgmt. is in use + if (!FeatureFlags.GLOBUS_USE_EXPERIMENTAL_ASYNC_FRAMEWORK.enabled()) { + throw new IllegalCommandException(BundleUtil.getStringFromBundle("dataset.message.locked.editNotAllowed"), command); + } } else if (dataset.isLockedFor(DatasetLock.Reason.EditInProgress)) { throw new IllegalCommandException(BundleUtil.getStringFromBundle("dataset.message.locked.editNotAllowed"), command); @@ -806,7 +817,9 @@ public void checkUpdateDatasetVersionLock(Dataset dataset, DataverseRequest data } public void checkPublishDatasetLock(Dataset dataset, DataverseRequest dataverseRequest, Command command) throws IllegalCommandException { - if (dataset.isLocked()) { + // prod. patch 6.8: + boolean globusUploadInProgress = globusService.isUploadTaskInProgressForDataset(dataset.getId()); + if (dataset.isLocked() || globusUploadInProgress) { if (dataset.isLockedFor(DatasetLock.Reason.Ingest)) { throw new IllegalCommandException(BundleUtil.getStringFromBundle("dataset.message.locked.publishNotAllowed"), command); } @@ -825,7 +838,8 @@ else if (dataset.isLockedFor(DatasetLock.Reason.Workflow)) { else if (dataset.isLockedFor(DatasetLock.Reason.DcmUpload)) { throw new IllegalCommandException(BundleUtil.getStringFromBundle("dataset.message.locked.publishNotAllowed"), command); } - else if (dataset.isLockedFor(DatasetLock.Reason.GlobusUpload)) { + else if (dataset.isLockedFor(DatasetLock.Reason.GlobusUpload) + || globusUploadInProgress) { throw new IllegalCommandException(BundleUtil.getStringFromBundle("dataset.message.locked.publishNotAllowed"), command); } else if (dataset.isLockedFor(DatasetLock.Reason.EditInProgress)) { diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java index 789e0883a7c..0aea1b29425 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java @@ -51,6 +51,7 @@ import org.primefaces.PrimeFaces; import com.google.gson.Gson; +import com.rometools.utils.Lists; import edu.harvard.iq.dataverse.api.ApiConstants; import edu.harvard.iq.dataverse.authorization.AuthenticationServiceBean; import edu.harvard.iq.dataverse.authorization.users.ApiToken; @@ -1700,6 +1701,25 @@ public List findAllOngoingTasks(GlobusTaskInProgress.TaskT return em.createQuery("select object(o) from GlobusTaskInProgress as o where o.taskType=:taskType order by o.startTime", GlobusTaskInProgress.class).setParameter("taskType", taskType).getResultList(); } + public List findAllOngoingTasksForDataset(GlobusTaskInProgress.TaskType taskType, Long datasetId) { + return em.createQuery("select object(o) from GlobusTaskInProgress as o where o.taskType=:taskType and o.dataset.id=:datasetId order by o.startTime", GlobusTaskInProgress.class) + .setParameter("taskType", taskType) + .setParameter("datasetId", datasetId) + .getResultList(); + } + + /** + * (prod. patch 6.8) + * @param datasetId + * @return + */ + public boolean isUploadTaskInProgressForDataset(Long datasetId) { + if (!FeatureFlags.GLOBUS_USE_EXPERIMENTAL_ASYNC_FRAMEWORK.enabled()) { + return false; + } + return Lists.isNotEmpty(findAllOngoingTasksForDataset(GlobusTaskInProgress.TaskType.UPLOAD, datasetId)); + } + public boolean isRuleInUseByOtherTasks(String ruleId) { Long numTask = em.createQuery("select count(o) from GlobusTaskInProgress as o where o.ruleId=:ruleId", Long.class).setParameter("ruleId", ruleId).getSingleResult(); return numTask > 1; diff --git a/src/main/java/propertyFiles/Bundle.properties b/src/main/java/propertyFiles/Bundle.properties index f6ca8a9d4f0..551339d46fb 100644 --- a/src/main/java/propertyFiles/Bundle.properties +++ b/src/main/java/propertyFiles/Bundle.properties @@ -1956,6 +1956,7 @@ file.rsyncUpload.rsyncUploadDisabledDueFileUploadedViaHttp=Upload with rsync + S file.rsyncUpload.rsyncUploadDisabledDueFileUploadedViaHttpAndPublished=Upload with rsync + SSH is disabled for this dataset because you have already uploaded files via HTTP and published the dataset. file.globusUpload.inProgressMessage.summary=Globus Transfer in Progress file.globusUpload.inProgressMessage.details=This dataset is locked while the data files are being transferred and verified. Large transfers may take significant time. You can check transfer status at https://app.globus.org/activity. +file.globusUpload.inProgressMessageAsync.details=This dataset cannot be published while Globus transfers are in progress. Large transfers may take significant time. You can check transfer status at https://app.globus.org/activity. file.metaData.checksum.copy=Click to copy file.metaData.dataFile.dataTab.unf=UNF file.metaData.dataFile.dataTab.variables=Variables diff --git a/src/main/webapp/dataset.xhtml b/src/main/webapp/dataset.xhtml index 7729b6da442..bd2f21e7c02 100644 --- a/src/main/webapp/dataset.xhtml +++ b/src/main/webapp/dataset.xhtml @@ -360,16 +360,23 @@
- - + + - - + + + + + + + + + #{showPublishLink ? bundle['dataset.publishBtn'] : (latestVersionInReview ? bundle['dataset.disabledSubmittedBtn'] : bundle['dataset.submitBtn'])} From 93478fa57dc6ec3171cb1ba403b15a9f0508e09d Mon Sep 17 00:00:00 2001 From: Leonid Andreev Date: Tue, 18 Nov 2025 11:00:27 -0500 Subject: [PATCH 2/5] Unused commented-out code removed. --- src/main/webapp/dataset.xhtml | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/webapp/dataset.xhtml b/src/main/webapp/dataset.xhtml index 031d987dea5..9c94dbe130a 100644 --- a/src/main/webapp/dataset.xhtml +++ b/src/main/webapp/dataset.xhtml @@ -374,9 +374,6 @@ - - - #{showPublishLink ? bundle['dataset.publishBtn'] : (latestVersionInReview ? bundle['dataset.disabledSubmittedBtn'] : bundle['dataset.submitBtn'])} From 39ec05aac428a709cf0458d05f7b2f288df9ec5f Mon Sep 17 00:00:00 2001 From: Leonid Andreev Date: Fri, 16 Jan 2026 14:54:18 -0500 Subject: [PATCH 3/5] An extra Globus patch, to make sure access rules that may still be in use by other tasks are not removed when upload transfers are processed. --- .../dataverse/globus/GlobusServiceBean.java | 43 ++++++++++----- .../globus/TaskMonitoringServiceBean.java | 53 ++++++++----------- 2 files changed, 51 insertions(+), 45 deletions(-) diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java index 0aea1b29425..1e6108eea4a 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java @@ -941,6 +941,8 @@ public void globusUpload(JsonObject jsonData, Dataset dataset, String httpReques fileHandler.close(); } } + + /** * As the name suggests, the method completes and finalizes an upload task, * whether it completed successfully or failed. (In the latter case, it @@ -955,6 +957,7 @@ public void globusUpload(JsonObject jsonData, Dataset dataset, String httpReques * user will need to be obtained from the saved api token, when this * method is called via the TaskMonitoringService * @param ruleId Globus rule/permission id associated with the task + * @param deleteRule delete the rule above when done * @param myLogger the Logger; if null, the main logger of the service bean will be used * @param fileHandler FileHandler associated with the Logger, when not null * @param taskSuccess boolean task status of the completed task @@ -965,16 +968,20 @@ public void globusUpload(JsonObject jsonData, Dataset dataset, String httpReques private void processCompletedUploadTask(Dataset dataset, JsonArray filesJsonArray, AuthenticatedUser authUser, - String ruleId, + String ruleId, + boolean deleteRule, Logger globusLogger, boolean taskSuccess, String taskStatus) { Logger myLogger = globusLogger == null ? logger : globusLogger; - if (ruleId != null) { - // Transfer is complete, so delete rule - deletePermission(ruleId, dataset, myLogger); + if (deleteRule) { + // Transfer is complete, and there must be no other tasks using the rule so, delete it + myLogger.fine("Deleting access (upload) rule "+ruleId); + if (ruleId != null) { + deletePermission(ruleId, dataset, myLogger); + } } // If success, switch to an EditInProgress lock - do this before removing the @@ -1048,16 +1055,22 @@ private void processCompletedUploadTask(Dataset dataset, myLogger.info("Exception from processUploadedFiles call " + e.getMessage()); datasetSvc.removeDatasetLocks(dataset, DatasetLock.Reason.EditInProgress); } - } - - // @todo: this appears to be redundant - it was already deleted above - ? - if (ruleId != null) { - deletePermission(ruleId, dataset, myLogger); - myLogger.info("Removed upload permission: " + ruleId); - } - + } } + /** + * Conveniece version of the method above that defaults to deleting the + * access rule. + */ + private void processCompletedUploadTask(Dataset dataset, + JsonArray filesJsonArray, + AuthenticatedUser authUser, + String ruleId, + Logger globusLogger, + boolean taskSuccess, + String taskStatus) { + processCompletedUploadTask(dataset, filesJsonArray, authUser, ruleId, true, globusLogger, taskSuccess, taskStatus); + } /** * The code in this method is copy-and-pasted from the previous Borealis @@ -1766,7 +1779,7 @@ public void processCompletedTask(GlobusTaskInProgress globusTask, JsonArray filesJsonArray = filesJsonArrayBuilder.build(); - processCompletedUploadTask(dataset, filesJsonArray, authUser, ruleId, taskLogger, taskSuccess, taskStatus); + processCompletedUploadTask(dataset, filesJsonArray, authUser, ruleId, deleteRule, taskLogger, taskSuccess, taskStatus); break; case DOWNLOAD: @@ -1806,8 +1819,10 @@ private void processCompletedDownloadTask(GlobusTaskState taskState, // It is possible that, for whatever reason, we failed to look up // the rule id when the monitoring of the task was initiated - but // now that it has completed, let's try and look it up again: - getRuleId(endpoint, taskState.getOwner_id(), "r"); + ruleId = getRuleId(endpoint, taskState.getOwner_id(), "r"); } + + taskLogger.fine("Deleting access (download) rule "+ruleId); if (ruleId != null) { deletePermission(ruleId, endpoint, taskLogger); diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java index 63c58f9d422..a73a42d374f 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java @@ -66,6 +66,11 @@ public void init() { this.scheduler.scheduleWithFixedDelay(this::checkOngoingDownloadTasks, 0, pollingInterval, TimeUnit.SECONDS); + + // The purpose of having 2 separate scheduling queues is to avoid + // potentially expensive/long-running processing of upload tasks + // slowing down the handling of completed download transfers that + // are always cheap. } else { logger.info("Skipping Globus task monitor initialization"); @@ -82,28 +87,8 @@ public void init() { public void checkOngoingUploadTasks() { logger.fine("Performing a scheduled external Globus UPLOAD task check"); List tasks = globusService.findAllOngoingTasks(GlobusTaskInProgress.TaskType.UPLOAD); - - tasks.forEach(t -> { - GlobusTaskState retrieved = checkTaskState(t); - - if (GlobusUtil.isTaskCompleted(retrieved)) { - FileHandler taskLogHandler = getTaskLogHandler(t); - Logger taskLogger = getTaskLogger(t, taskLogHandler); - - // Do our thing, finalize adding the files to the dataset - globusService.processCompletedTask(t, retrieved, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getCompletedTaskStatus(retrieved), true, taskLogger); - // Whether it finished successfully, or failed in the process, - // there's no need to keep monitoring this task, so we can - // delete it. - //globusService.deleteExternalUploadRecords(t.getTaskId()); - globusService.deleteTask(t); - - if (taskLogHandler != null) { - taskLogHandler.close(); - } - } - - }); + + processTasksQueue(tasks); } /** @@ -115,10 +100,17 @@ public void checkOngoingDownloadTasks() { logger.fine("Performing a scheduled external Globus DOWNLOAD task check"); List tasks = globusService.findAllOngoingTasks(GlobusTaskInProgress.TaskType.DOWNLOAD); - // Unlike with uploads, it is now possible for a user to run several - // download transfers on the same dataset - with several download - // tasks using the same access rule on the corresponding Globus - // pseudofolder. This means that we'll need to be careful not to + processTasksQueue(tasks); + } + + /** + * The workhorse method that checks on Globus transfer tasks in the active + * queue. + */ + private void processTasksQueue(List tasks) { + // It is now possible for a user to run several transfer tasks + // using the same access rule on the corresponding Globus + // pseudofolder. That means that need to be careful not to // delete any rule, without checking if there are still other // active tasks using it: Map rulesInUse = new HashMap<>(); @@ -136,23 +128,23 @@ public void checkOngoingDownloadTasks() { tasks.forEach(t -> { - GlobusTaskState retrieved = checkTaskState(t); + GlobusTaskState retrieved = checkTaskState(t); + String taskStatus = retrieved == null ? "N/A" : retrieved.getStatus(); if (GlobusUtil.isTaskCompleted(retrieved)) { FileHandler taskLogHandler = getTaskLogHandler(t); Logger taskLogger = getTaskLogger(t, taskLogHandler); - String taskStatus = retrieved == null ? "N/A" : retrieved.getStatus(); taskLogger.info("Processing completed task " + t.getTaskId() + ", status: " + taskStatus); boolean deleteRule = true; if (t.getRuleId() == null || rulesInUse.get(t.getRuleId()) > 1) { - taskLogger.info("Access rule " + t.getRuleId() + " is still in use by other tasks."); + taskLogger.fine("Access rule " + t.getRuleId() + " is still in use by other tasks."); deleteRule = false; rulesInUse.put(t.getRuleId(), rulesInUse.get(t.getRuleId()) - 1); } else { - taskLogger.info("Access rule " + t.getRuleId() + " is no longer in use by other tasks; will delete."); + taskLogger.fine("Access rule " + t.getRuleId() + " is no longer in use by other tasks; will delete."); } globusService.processCompletedTask(t, retrieved, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getCompletedTaskStatus(retrieved), deleteRule, taskLogger); @@ -165,7 +157,6 @@ public void checkOngoingDownloadTasks() { taskLogHandler.close(); } } else { - String taskStatus = retrieved == null ? "N/A" : retrieved.getStatus(); logger.fine("task "+t.getTaskId()+" is still running; " + ", status: " + taskStatus); } From 0d88fecf8c3c44580417d16ab2acc6ab19c6bec0 Mon Sep 17 00:00:00 2001 From: Leonid Andreev Date: Fri, 16 Jan 2026 16:25:14 -0500 Subject: [PATCH 4/5] typo. #11971 --- .../harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java index a73a42d374f..659fa14a92e 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java @@ -110,7 +110,7 @@ public void checkOngoingDownloadTasks() { private void processTasksQueue(List tasks) { // It is now possible for a user to run several transfer tasks // using the same access rule on the corresponding Globus - // pseudofolder. That means that need to be careful not to + // pseudofolder. That means that we need to be careful not to // delete any rule, without checking if there are still other // active tasks using it: Map rulesInUse = new HashMap<>(); From 62e4be2bf70baeb48a8f63f55a654d4f4f30919d Mon Sep 17 00:00:00 2001 From: Leonid Andreev Date: Mon, 2 Feb 2026 14:08:33 -0500 Subject: [PATCH 5/5] One more minor improvement - this eliminates an OptimisticLock when processing 2 completed Globus transfers results in adding files to the same dataset back to back. This is a real, practical condition, when the queue processing gets stuck and then a number of completed tasks accumulated over some time get processed all at once sequentially. Note that I had to abandon a lambda .forEach() notation since I needed a non-final lastDataset variable in the loop. --- .../globus/TaskMonitoringServiceBean.java | 37 +++++++++++++++++-- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java index 659fa14a92e..912c16422c2 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java @@ -17,6 +17,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.logging.FileHandler; import java.util.logging.Logger; @@ -126,7 +127,9 @@ private void processTasksQueue(List tasks) { } }); - tasks.forEach(t -> { + Long lastProcessedUploadDatasetId = null; + + for (GlobusTaskInProgress t : tasks) { GlobusTaskState retrieved = checkTaskState(t); String taskStatus = retrieved == null ? "N/A" : retrieved.getStatus(); @@ -147,6 +150,35 @@ private void processTasksQueue(List tasks) { taskLogger.fine("Access rule " + t.getRuleId() + " is no longer in use by other tasks; will delete."); } + // At HDV we have seen cases where the processing queue would get + // stuck (for reasons that are still unknown). This is not a fatal + // condition, since the state of every transfer is stored in the database, + // and therefore all the tasks will still get properly processed + // next time the application is restarted or redeployed. In particular, + // successfully completed Globus transfers will get finalized and the + // corresponding DataFile objects etc. will be added to the datasets. + // However, one potential issue has been encountered: you may end + // up with several upload tasks on the same dataset waiting to be + // finalized one immediately after another, with the resulting + // addFiles calls encountering OptimisticLockExceptions. With this + // in mind, we'll just sleep for 10 sec. between such calls on + // the same dataset, to make sure all the indexing etc. tasks have + // been properly finalized. + + if (GlobusTaskInProgress.TaskType.UPLOAD.equals(t.getTaskType()) && + GlobusUtil.isTaskSucceeded(retrieved)) { + if (t.getDataset() != null) { + if (lastProcessedUploadDatasetId != null && lastProcessedUploadDatasetId.equals(t.getDataset().getId())) { + try { + Thread.sleep(10000L); + } catch (InterruptedException iex) { + logger.warning("Failed to sleep for 10 sec. between finalizing globus uploads on the same dataset ("+lastProcessedUploadDatasetId+")"); + } + } + lastProcessedUploadDatasetId = t.getDataset().getId(); + } + } + globusService.processCompletedTask(t, retrieved, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getCompletedTaskStatus(retrieved), deleteRule, taskLogger); // Whether it finished successfully or failed, the entry for the @@ -159,8 +191,7 @@ private void processTasksQueue(List tasks) { } else { logger.fine("task "+t.getTaskId()+" is still running; " + ", status: " + taskStatus); } - - }); + } } private GlobusTaskState checkTaskState(GlobusTaskInProgress task) {