1717import java .util .HashMap ;
1818import java .util .List ;
1919import java .util .Map ;
20+ import java .util .Objects ;
2021import java .util .concurrent .TimeUnit ;
2122import java .util .logging .FileHandler ;
2223import java .util .logging .Logger ;
@@ -126,7 +127,9 @@ private void processTasksQueue(List<GlobusTaskInProgress> tasks) {
126127 }
127128 });
128129
129- tasks .forEach (t -> {
130+ Long lastProcessedUploadDatasetId = null ;
131+
132+ for (GlobusTaskInProgress t : tasks ) {
130133
131134 GlobusTaskState retrieved = checkTaskState (t );
132135 String taskStatus = retrieved == null ? "N/A" : retrieved .getStatus ();
@@ -147,6 +150,35 @@ private void processTasksQueue(List<GlobusTaskInProgress> tasks) {
147150 taskLogger .fine ("Access rule " + t .getRuleId () + " is no longer in use by other tasks; will delete." );
148151 }
149152
153+ // At HDV we have seen cases where the processing queue would get
154+ // stuck (for reasons that are still unknown). This is not a fatal
155+ // condition, since the state of every transfer is stored in the database,
156+ // and therefore all the tasks will still get properly processed
157+ // next time the application is restarted or redeployed. In particular,
158+ // successfully completed Globus transfers will get finalized and the
159+ // corresponding DataFile objects etc. will be added to the datasets.
160+ // However, one potential issue has been encountered: you may end
161+ // up with several upload tasks on the same dataset waiting to be
162+ // finalized one immediately after another, with the resulting
163+ // addFiles calls encountering OptimisticLockExceptions. With this
164+ // in mind, we'll just sleep for 10 sec. between such calls on
165+ // the same dataset, to make sure all the indexing etc. tasks have
166+ // been properly finalized.
167+
168+ if (GlobusTaskInProgress .TaskType .UPLOAD .equals (t .getTaskType ()) &&
169+ GlobusUtil .isTaskSucceeded (retrieved )) {
170+ if (t .getDataset () != null ) {
171+ if (lastProcessedUploadDatasetId != null && lastProcessedUploadDatasetId .equals (t .getDataset ().getId ())) {
172+ try {
173+ Thread .sleep (10000L );
174+ } catch (InterruptedException iex ) {
175+ logger .warning ("Failed to sleep for 10 sec. between finalizing globus uploads on the same dataset (" +lastProcessedUploadDatasetId +")" );
176+ }
177+ }
178+ lastProcessedUploadDatasetId = t .getDataset ().getId ();
179+ }
180+ }
181+
150182 globusService .processCompletedTask (t , retrieved , GlobusUtil .isTaskSucceeded (retrieved ), GlobusUtil .getCompletedTaskStatus (retrieved ), deleteRule , taskLogger );
151183
152184 // Whether it finished successfully or failed, the entry for the
@@ -159,8 +191,7 @@ private void processTasksQueue(List<GlobusTaskInProgress> tasks) {
159191 } else {
160192 logger .fine ("task " +t .getTaskId ()+" is still running; " + ", status: " + taskStatus );
161193 }
162-
163- });
194+ }
164195 }
165196
166197 private GlobusTaskState checkTaskState (GlobusTaskInProgress task ) {
0 commit comments