diff --git a/src/main/java/edu/harvard/iq/dataverse/DatasetPage.java b/src/main/java/edu/harvard/iq/dataverse/DatasetPage.java index 20617160a1c..847a15e6702 100644 --- a/src/main/java/edu/harvard/iq/dataverse/DatasetPage.java +++ b/src/main/java/edu/harvard/iq/dataverse/DatasetPage.java @@ -160,6 +160,7 @@ import edu.harvard.iq.dataverse.search.SearchFields; import edu.harvard.iq.dataverse.search.SearchUtil; import edu.harvard.iq.dataverse.search.SolrClientService; +import edu.harvard.iq.dataverse.settings.FeatureFlags; import edu.harvard.iq.dataverse.settings.JvmSettings; import edu.harvard.iq.dataverse.util.SignpostingResources; import edu.harvard.iq.dataverse.util.FileMetadataUtil; @@ -2391,7 +2392,8 @@ public boolean isValidOrCanReviewIncomplete() { private void displayLockInfo(Dataset dataset) { // Various info messages, when the dataset is locked (for various reasons): - if (dataset.isLocked() && canUpdateDataset()) { + boolean globusUploadInProgress = globusService.isUploadTaskInProgressForDataset(dataset.getId()); + if ((dataset.isLocked() || globusUploadInProgress) && canUpdateDataset()) { if (dataset.isLockedFor(DatasetLock.Reason.Workflow)) { JH.addMessage(FacesMessage.SEVERITY_WARN, BundleUtil.getStringFromBundle("dataset.locked.message"), BundleUtil.getStringFromBundle("dataset.locked.message.details")); @@ -2405,9 +2407,17 @@ private void displayLockInfo(Dataset dataset) { BundleUtil.getStringFromBundle("file.rsyncUpload.inProgressMessage.details")); lockedDueToDcmUpload = true; } - if (dataset.isLockedFor(DatasetLock.Reason.GlobusUpload)) { - JH.addMessage(FacesMessage.SEVERITY_WARN, BundleUtil.getStringFromBundle("file.globusUpload.inProgressMessage.summary"), - BundleUtil.getStringFromBundle("file.globusUpload.inProgressMessage.details")); + if (dataset.isLockedFor(DatasetLock.Reason.GlobusUpload) + || globusUploadInProgress) { + // (prod. patch 6.8) fall back to the old-style Globus lock message unless + // the new, async task mgmt model is used. 
+ if (FeatureFlags.GLOBUS_USE_EXPERIMENTAL_ASYNC_FRAMEWORK.enabled()) { + JH.addMessage(FacesMessage.SEVERITY_WARN, BundleUtil.getStringFromBundle("file.globusUpload.inProgressMessage.summary"), + BundleUtil.getStringFromBundle("file.globusUpload.inProgressMessageAsync.details")); + } else { + JH.addMessage(FacesMessage.SEVERITY_WARN, BundleUtil.getStringFromBundle("file.globusUpload.inProgressMessage.summary"), + BundleUtil.getStringFromBundle("file.globusUpload.inProgressMessage.details")); + } } //This is a hack to remove dataset locks for File PID registration if //the dataset is released @@ -4381,10 +4391,10 @@ public boolean isStillLockedForAnyReason() { if (dataset.getId() != null) { Dataset testDataset = datasetService.find(dataset.getId()); if (testDataset != null && testDataset.getId() != null) { + // Refresh the info messages, in case the dataset has been + // re-locked with a different lock type (or a Globus upload task is in progress): + displayLockInfo(testDataset); if (testDataset.getLocks().size() > 0) { - // Refresh the info messages, in case the dataset has been - // re-locked with a different lock type: - displayLockInfo(testDataset); return true; } } diff --git a/src/main/java/edu/harvard/iq/dataverse/PermissionServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/PermissionServiceBean.java index 402a1b06e3c..887df7252c1 100644 --- a/src/main/java/edu/harvard/iq/dataverse/PermissionServiceBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/PermissionServiceBean.java @@ -27,6 +27,8 @@ import edu.harvard.iq.dataverse.engine.command.exception.IllegalCommandException; import edu.harvard.iq.dataverse.engine.command.impl.PublishDatasetCommand; import edu.harvard.iq.dataverse.engine.command.impl.UpdateDatasetVersionCommand; +import edu.harvard.iq.dataverse.globus.GlobusServiceBean; +import edu.harvard.iq.dataverse.settings.FeatureFlags; import edu.harvard.iq.dataverse.util.BundleUtil; import 
edu.harvard.iq.dataverse.workflow.PendingWorkflowInvocation; import edu.harvard.iq.dataverse.workflow.WorkflowServiceBean; @@ -85,6 +87,9 @@ public class PermissionServiceBean { @EJB GroupServiceBean groupService; + + @EJB + GlobusServiceBean globusService; @Inject DataverseSession session; @@ -779,8 +784,14 @@ else if (dataset.isLockedFor(DatasetLock.Reason.Workflow)) { else if (dataset.isLockedFor(DatasetLock.Reason.DcmUpload)) { throw new IllegalCommandException(BundleUtil.getStringFromBundle("dataset.message.locked.editNotAllowed"), command); } + /** + * prod. patch 6.8: as an experiment, not locking datasets for edits while Globus uploads are in progress: + */ else if (dataset.isLockedFor(DatasetLock.Reason.GlobusUpload)) { - throw new IllegalCommandException(BundleUtil.getStringFromBundle("dataset.message.locked.editNotAllowed"), command); + // ... but we'll keep it locked for edits unless the new, async task mgmt. is in use + if (!FeatureFlags.GLOBUS_USE_EXPERIMENTAL_ASYNC_FRAMEWORK.enabled()) { + throw new IllegalCommandException(BundleUtil.getStringFromBundle("dataset.message.locked.editNotAllowed"), command); + } } else if (dataset.isLockedFor(DatasetLock.Reason.EditInProgress)) { throw new IllegalCommandException(BundleUtil.getStringFromBundle("dataset.message.locked.editNotAllowed"), command); @@ -806,7 +817,9 @@ public void checkUpdateDatasetVersionLock(Dataset dataset, DataverseRequest data } public void checkPublishDatasetLock(Dataset dataset, DataverseRequest dataverseRequest, Command command) throws IllegalCommandException { - if (dataset.isLocked()) { + // prod. 
patch 6.8: + boolean globusUploadInProgress = globusService.isUploadTaskInProgressForDataset(dataset.getId()); + if (dataset.isLocked() || globusUploadInProgress) { if (dataset.isLockedFor(DatasetLock.Reason.Ingest)) { throw new IllegalCommandException(BundleUtil.getStringFromBundle("dataset.message.locked.publishNotAllowed"), command); } @@ -825,7 +838,8 @@ else if (dataset.isLockedFor(DatasetLock.Reason.Workflow)) { else if (dataset.isLockedFor(DatasetLock.Reason.DcmUpload)) { throw new IllegalCommandException(BundleUtil.getStringFromBundle("dataset.message.locked.publishNotAllowed"), command); } - else if (dataset.isLockedFor(DatasetLock.Reason.GlobusUpload)) { + else if (dataset.isLockedFor(DatasetLock.Reason.GlobusUpload) + || globusUploadInProgress) { throw new IllegalCommandException(BundleUtil.getStringFromBundle("dataset.message.locked.publishNotAllowed"), command); } else if (dataset.isLockedFor(DatasetLock.Reason.EditInProgress)) { diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java index 789e0883a7c..1e6108eea4a 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java @@ -51,6 +51,7 @@ import org.primefaces.PrimeFaces; import com.google.gson.Gson; +import com.rometools.utils.Lists; import edu.harvard.iq.dataverse.api.ApiConstants; import edu.harvard.iq.dataverse.authorization.AuthenticationServiceBean; import edu.harvard.iq.dataverse.authorization.users.ApiToken; @@ -940,6 +941,8 @@ public void globusUpload(JsonObject jsonData, Dataset dataset, String httpReques fileHandler.close(); } } + + /** * As the name suggests, the method completes and finalizes an upload task, * whether it completed successfully or failed. 
(In the latter case, it @@ -954,6 +957,7 @@ public void globusUpload(JsonObject jsonData, Dataset dataset, String httpReques * user will need to be obtained from the saved api token, when this * method is called via the TaskMonitoringService * @param ruleId Globus rule/permission id associated with the task + * @param deleteRule delete the rule above when done * @param myLogger the Logger; if null, the main logger of the service bean will be used * @param fileHandler FileHandler associated with the Logger, when not null * @param taskSuccess boolean task status of the completed task @@ -964,16 +968,20 @@ private void processCompletedUploadTask(Dataset dataset, JsonArray filesJsonArray, AuthenticatedUser authUser, - String ruleId, + String ruleId, + boolean deleteRule, Logger globusLogger, boolean taskSuccess, String taskStatus) { Logger myLogger = globusLogger == null ? logger : globusLogger; - if (ruleId != null) { - // Transfer is complete, so delete rule - deletePermission(ruleId, dataset, myLogger); + if (deleteRule) { + // Transfer is complete, and there must be no other tasks using the rule, so delete it + myLogger.fine("Deleting access (upload) rule "+ruleId); + if (ruleId != null) { + deletePermission(ruleId, dataset, myLogger); + } } // If success, switch to an EditInProgress lock - do this before removing the @@ -1047,16 +1055,22 @@ private void processCompletedUploadTask(Dataset dataset, myLogger.info("Exception from processUploadedFiles call " + e.getMessage()); datasetSvc.removeDatasetLocks(dataset, DatasetLock.Reason.EditInProgress); } - } - - // @todo: this appears to be redundant - it was already deleted above - ? - if (ruleId != null) { - deletePermission(ruleId, dataset, myLogger); - myLogger.info("Removed upload permission: " + ruleId); - } - + } } + /** + * Convenience version of the method above that defaults to deleting the + * access rule. 
+ */ + private void processCompletedUploadTask(Dataset dataset, + JsonArray filesJsonArray, + AuthenticatedUser authUser, + String ruleId, + Logger globusLogger, + boolean taskSuccess, + String taskStatus) { + processCompletedUploadTask(dataset, filesJsonArray, authUser, ruleId, true, globusLogger, taskSuccess, taskStatus); + } /** * The code in this method is copy-and-pasted from the previous Borealis @@ -1700,6 +1714,25 @@ public List findAllOngoingTasks(GlobusTaskInProgress.TaskT return em.createQuery("select object(o) from GlobusTaskInProgress as o where o.taskType=:taskType order by o.startTime", GlobusTaskInProgress.class).setParameter("taskType", taskType).getResultList(); } + public List findAllOngoingTasksForDataset(GlobusTaskInProgress.TaskType taskType, Long datasetId) { + return em.createQuery("select object(o) from GlobusTaskInProgress as o where o.taskType=:taskType and o.dataset.id=:datasetId order by o.startTime", GlobusTaskInProgress.class) + .setParameter("taskType", taskType) + .setParameter("datasetId", datasetId) + .getResultList(); + } + + /** + * (prod. 
patch 6.8) + * @param datasetId + * @return + */ + public boolean isUploadTaskInProgressForDataset(Long datasetId) { + if (!FeatureFlags.GLOBUS_USE_EXPERIMENTAL_ASYNC_FRAMEWORK.enabled()) { + return false; + } + return Lists.isNotEmpty(findAllOngoingTasksForDataset(GlobusTaskInProgress.TaskType.UPLOAD, datasetId)); + } + public boolean isRuleInUseByOtherTasks(String ruleId) { Long numTask = em.createQuery("select count(o) from GlobusTaskInProgress as o where o.ruleId=:ruleId", Long.class).setParameter("ruleId", ruleId).getSingleResult(); return numTask > 1; @@ -1746,7 +1779,7 @@ public void processCompletedTask(GlobusTaskInProgress globusTask, JsonArray filesJsonArray = filesJsonArrayBuilder.build(); - processCompletedUploadTask(dataset, filesJsonArray, authUser, ruleId, taskLogger, taskSuccess, taskStatus); + processCompletedUploadTask(dataset, filesJsonArray, authUser, ruleId, deleteRule, taskLogger, taskSuccess, taskStatus); break; case DOWNLOAD: @@ -1786,8 +1819,10 @@ private void processCompletedDownloadTask(GlobusTaskState taskState, // It is possible that, for whatever reason, we failed to look up // the rule id when the monitoring of the task was initiated - but // now that it has completed, let's try and look it up again: - getRuleId(endpoint, taskState.getOwner_id(), "r"); + ruleId = getRuleId(endpoint, taskState.getOwner_id(), "r"); } + + taskLogger.fine("Deleting access (download) rule "+ruleId); if (ruleId != null) { deletePermission(ruleId, endpoint, taskLogger); diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java index 63c58f9d422..912c16422c2 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java @@ -17,6 +17,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import 
java.util.concurrent.TimeUnit; import java.util.logging.FileHandler; import java.util.logging.Logger; @@ -66,6 +67,11 @@ public void init() { this.scheduler.scheduleWithFixedDelay(this::checkOngoingDownloadTasks, 0, pollingInterval, TimeUnit.SECONDS); + + // The purpose of having 2 separate scheduling queues is to avoid + // potentially expensive/long-running processing of upload tasks + // slowing down the handling of completed download transfers that + // are always cheap. } else { logger.info("Skipping Globus task monitor initialization"); @@ -82,28 +88,8 @@ public void init() { public void checkOngoingUploadTasks() { logger.fine("Performing a scheduled external Globus UPLOAD task check"); List tasks = globusService.findAllOngoingTasks(GlobusTaskInProgress.TaskType.UPLOAD); - - tasks.forEach(t -> { - GlobusTaskState retrieved = checkTaskState(t); - - if (GlobusUtil.isTaskCompleted(retrieved)) { - FileHandler taskLogHandler = getTaskLogHandler(t); - Logger taskLogger = getTaskLogger(t, taskLogHandler); - - // Do our thing, finalize adding the files to the dataset - globusService.processCompletedTask(t, retrieved, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getCompletedTaskStatus(retrieved), true, taskLogger); - // Whether it finished successfully, or failed in the process, - // there's no need to keep monitoring this task, so we can - // delete it. 
- //globusService.deleteExternalUploadRecords(t.getTaskId()); - globusService.deleteTask(t); - - if (taskLogHandler != null) { - taskLogHandler.close(); - } - } - - }); + + processTasksQueue(tasks); } /** @@ -115,10 +101,17 @@ public void checkOngoingDownloadTasks() { logger.fine("Performing a scheduled external Globus DOWNLOAD task check"); List tasks = globusService.findAllOngoingTasks(GlobusTaskInProgress.TaskType.DOWNLOAD); - // Unlike with uploads, it is now possible for a user to run several - // download transfers on the same dataset - with several download - // tasks using the same access rule on the corresponding Globus - // pseudofolder. This means that we'll need to be careful not to + processTasksQueue(tasks); + } + + /** + * The workhorse method that checks on Globus transfer tasks in the active + * queue. + */ + private void processTasksQueue(List tasks) { + // It is now possible for a user to run several transfer tasks + // using the same access rule on the corresponding Globus + // pseudofolder. That means that we need to be careful not to // delete any rule, without checking if there are still other // active tasks using it: Map rulesInUse = new HashMap<>(); @@ -134,27 +127,58 @@ public void checkOngoingDownloadTasks() { } }); - tasks.forEach(t -> { + Long lastProcessedUploadDatasetId = null; + + for (GlobusTaskInProgress t : tasks) { - GlobusTaskState retrieved = checkTaskState(t); + GlobusTaskState retrieved = checkTaskState(t); + String taskStatus = retrieved == null ? "N/A" : retrieved.getStatus(); if (GlobusUtil.isTaskCompleted(retrieved)) { FileHandler taskLogHandler = getTaskLogHandler(t); Logger taskLogger = getTaskLogger(t, taskLogHandler); - String taskStatus = retrieved == null ? 
"N/A" : retrieved.getStatus(); taskLogger.info("Processing completed task " + t.getTaskId() + ", status: " + taskStatus); boolean deleteRule = true; if (t.getRuleId() == null || rulesInUse.get(t.getRuleId()) > 1) { - taskLogger.info("Access rule " + t.getRuleId() + " is still in use by other tasks."); + taskLogger.fine("Access rule " + t.getRuleId() + " is still in use by other tasks."); deleteRule = false; rulesInUse.put(t.getRuleId(), rulesInUse.get(t.getRuleId()) - 1); } else { - taskLogger.info("Access rule " + t.getRuleId() + " is no longer in use by other tasks; will delete."); + taskLogger.fine("Access rule " + t.getRuleId() + " is no longer in use by other tasks; will delete."); } + // At HDV we have seen cases where the processing queue would get + // stuck (for reasons that are still unknown). This is not a fatal + // condition, since the state of every transfer is stored in the database, + // and therefore all the tasks will still get properly processed + // next time the application is restarted or redeployed. In particular, + // successfully completed Globus transfers will get finalized and the + // corresponding DataFile objects etc. will be added to the datasets. + // However, one potential issue has been encountered: you may end + // up with several upload tasks on the same dataset waiting to be + // finalized one immediately after another, with the resulting + // addFiles calls encountering OptimisticLockExceptions. With this + // in mind, we'll just sleep for 10 sec. between such calls on + // the same dataset, to make sure all the indexing etc. tasks have + // been properly finalized. 
+ + if (GlobusTaskInProgress.TaskType.UPLOAD.equals(t.getTaskType()) && + GlobusUtil.isTaskSucceeded(retrieved)) { + if (t.getDataset() != null) { + if (lastProcessedUploadDatasetId != null && lastProcessedUploadDatasetId.equals(t.getDataset().getId())) { + try { + Thread.sleep(10000L); + } catch (InterruptedException iex) { + logger.warning("Failed to sleep for 10 sec. between finalizing globus uploads on the same dataset ("+lastProcessedUploadDatasetId+")"); + } + } + lastProcessedUploadDatasetId = t.getDataset().getId(); + } + } + globusService.processCompletedTask(t, retrieved, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getCompletedTaskStatus(retrieved), deleteRule, taskLogger); // Whether it finished successfully or failed, the entry for the @@ -165,11 +189,9 @@ public void checkOngoingDownloadTasks() { taskLogHandler.close(); } } else { - String taskStatus = retrieved == null ? "N/A" : retrieved.getStatus(); logger.fine("task "+t.getTaskId()+" is still running; " + ", status: " + taskStatus); } - - }); + } } private GlobusTaskState checkTaskState(GlobusTaskInProgress task) { diff --git a/src/main/java/propertyFiles/Bundle.properties b/src/main/java/propertyFiles/Bundle.properties index f6c0054a43a..56c921bc674 100644 --- a/src/main/java/propertyFiles/Bundle.properties +++ b/src/main/java/propertyFiles/Bundle.properties @@ -1970,6 +1970,7 @@ file.rsyncUpload.rsyncUploadDisabledDueFileUploadedViaHttp=Upload with rsync + S file.rsyncUpload.rsyncUploadDisabledDueFileUploadedViaHttpAndPublished=Upload with rsync + SSH is disabled for this dataset because you have already uploaded files via HTTP and published the dataset. file.globusUpload.inProgressMessage.summary=Globus Transfer in Progress file.globusUpload.inProgressMessage.details=This dataset is locked while the data files are being transferred and verified. Large transfers may take significant time. You can check transfer status at https://app.globus.org/activity. 
+file.globusUpload.inProgressMessageAsync.details=This dataset cannot be published while Globus transfers are in progress. Large transfers may take significant time. You can check transfer status at https://app.globus.org/activity. file.metaData.checksum.copy=Click to copy file.metaData.dataFile.dataTab.unf=UNF file.metaData.dataFile.dataTab.variables=Variables diff --git a/src/main/webapp/dataset.xhtml b/src/main/webapp/dataset.xhtml index 0b16e63ac94..9c94dbe130a 100644 --- a/src/main/webapp/dataset.xhtml +++ b/src/main/webapp/dataset.xhtml @@ -360,12 +360,16 @@
- - + + - - + + + + + +