diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/LogsCheckpoint.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/LogsCheckpoint.groovy index 39d3c2e173..fd5a4969b0 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/LogsCheckpoint.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/LogsCheckpoint.groovy @@ -74,14 +74,13 @@ class LogsCheckpoint implements TraceObserverV2 { log.debug "Starting logs checkpoint thread - interval: ${interval}" try { while( true ) { - await(interval) - if( Thread.currentThread().isInterrupted() ) - break + final interrupted = await(interval) + Thread.interrupted() // clear flag so NIO writes in saveFiles() succeed synchronized(lock) { - if( Thread.currentThread().isInterrupted() ) - break handler.saveFiles() } + if( interrupted ) + break } } finally { @@ -89,13 +88,14 @@ class LogsCheckpoint implements TraceObserverV2 { } } - protected void await(Duration interval) { + protected boolean await(Duration interval) { try { Thread.sleep(interval.toMillis()) + return Thread.currentThread().isInterrupted() } catch (InterruptedException e) { log.debug "Interrupted logs checkpoint thread" - Thread.currentThread().interrupt() + return true } } } diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/LogsCheckpointTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/LogsCheckpointTest.groovy index 4115a32637..ff37c36483 100644 --- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/LogsCheckpointTest.groovy +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/LogsCheckpointTest.groovy @@ -80,4 +80,26 @@ class LogsCheckpointTest extends Specification { cleanup: SysEnv.pop() } + + def 'should perform final saveFiles when interrupted mid-sleep' () { + given: + SysEnv.push(TOWER_LOGS_CHECKPOINT_INTERVAL: '60s') + def session = Mock(Session) { + getWorkDir() >> TestHelper.createInMemTempDir() + getConfig() >> [:] + } + def handler = Mock(LogsHandler) + def checkpoint = new LogsCheckpoint() + + when: + checkpoint.onFlowCreate(session) + checkpoint.@handler = handler // inject mock before thread wakes up + checkpoint.onFlowComplete() + + then: + 1 * handler.saveFiles() >> { assert !Thread.currentThread().isInterrupted() } + + cleanup: + SysEnv.pop() + } }