@@ -74,28 +74,28 @@ class LogsCheckpoint implements TraceObserverV2 {
         log.debug "Starting logs checkpoint thread - interval: ${interval}"
         try {
             while( true ) {
-                await(interval)
-                if( Thread.currentThread().isInterrupted() )
-                    break
+                final interrupted = await(interval)
+                Thread.interrupted() // clear flag so NIO writes in saveFiles() succeed
Contributor

@jorgee, May 6, 2026

From the PR description, it is not clear to me what was causing the issue or why you call Thread.interrupted() in every case. Doesn't that cause problems during normal execution?
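
For context on the inline comment attached to the changed line ("clear flag so NIO writes in saveFiles() succeed"): on the JVM, a blocking operation on an interruptible NIO channel fails with `ClosedByInterruptException` when the calling thread's interrupt flag is already set. The sketch below reproduces only that JDK behavior. The class name `InterruptFlagDemo`, its methods, and the temporary file are hypothetical and are not part of the PR; whether `saveFiles()` actually writes through such a channel is an assumption taken from that comment.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class, not part of the PR.
public class InterruptFlagDemo {

    /** Writes a few bytes through a FileChannel; returns false if the write was rejected. */
    public static boolean tryWrite(Path file) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap("checkpoint".getBytes(StandardCharsets.UTF_8)));
            return true;
        } catch (ClosedByInterruptException e) {
            // The channel was closed because the interrupt flag was set on this thread.
            return false;
        }
    }

    /** Returns { write result with the flag set, write result after clearing the flag }. */
    public static boolean[] demo() {
        try {
            Path file = Files.createTempFile("interrupt-flag-demo", ".log");
            try {
                Thread.currentThread().interrupt();   // simulate a pending interrupt
                boolean whileInterrupted = tryWrite(file);
                Thread.interrupted();                 // read and clear the flag
                boolean afterClear = tryWrite(file);
                return new boolean[] { whileInterrupted, afterClear };
            } finally {
                Thread.interrupted();                 // leave the calling thread clean
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("write with flag set:  " + result[0]);
        System.out.println("write after clearing: " + result[1]);
    }
}
```

On a standard JDK the first write is rejected and the second succeeds. Note that `Thread.interrupted()` both reads and clears the flag, whereas `Thread.currentThread().isInterrupted()` only reads it.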

Member

I think the main mistake is using interrupt to signal termination, which was introduced in a previous iteration:

    synchronized(lock) {
        thread.interrupt()
    }

interrupt should be used only to force a thread's interruption, which is not the case here.

I'd suggest going through the change history, reverting that implementation, and exiting the thread properly.
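
One way to read "exiting the thread properly" is a cooperative stop: a flag guarded by the existing lock, with `wait`/`notifyAll` replacing `Thread.sleep` plus `interrupt`. The following is a minimal sketch under that assumption and is not the implementation from the change history. `CheckpointLoopSketch`, `stopped`, `awaitStop`, and the `Runnable` standing in for `handler.saveFiles()` are all hypothetical names. It also assumes one final save is wanted after a stop request, which is what the test added in this PR expects.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the LogsCheckpoint implementation.
public class CheckpointLoopSketch {
    private final Object lock = new Object();
    private final long intervalMillis;
    private final Runnable saveFiles;   // stands in for handler.saveFiles()
    private boolean stopped;            // guarded by lock
    private Thread thread;

    public CheckpointLoopSketch(long intervalMillis, Runnable saveFiles) {
        this.intervalMillis = intervalMillis;
        this.saveFiles = saveFiles;
    }

    public void start() {
        thread = new Thread(this::run, "checkpoint-sketch");
        thread.setDaemon(true);
        thread.start();
    }

    /** Requests termination without interrupting, then waits for the final save. */
    public void stop() {
        synchronized (lock) {
            stopped = true;
            lock.notifyAll();           // wake the worker if it is waiting
        }
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void run() {
        boolean stopRequested = false;
        while (!stopRequested) {
            stopRequested = awaitStop();
            saveFiles.run();            // runs once more after a stop request
        }
    }

    /** Waits up to one interval; returns true if a stop was requested. */
    private boolean awaitStop() {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMillis);
        synchronized (lock) {
            while (!stopped) {
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMs <= 0)
                    break;              // interval elapsed normally
                try {
                    lock.wait(remainingMs);
                } catch (InterruptedException e) {
                    stopped = true;     // unexpected here; treat it as a stop request
                }
            }
            return stopped;
        }
    }

    public static void main(String[] args) {
        AtomicInteger saves = new AtomicInteger();
        CheckpointLoopSketch loop = new CheckpointLoopSketch(60_000, saves::incrementAndGet);
        loop.start();
        loop.stop();
        System.out.println("saves performed: " + saves.get());
    }
}
```

Because the worker is never interrupted, its interrupt flag stays clear during the final save, so there is nothing to reset before writing. The save runs outside the lock here so that `stop()` is not blocked by an upload in progress; the code under review instead calls `saveFiles()` while holding the lock.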

Contributor

I think it was added in #6787, and I see it is in stable 25.10.x. However, this branch doesn't include #6939. So the reported problem may be caused by the race condition, with the interrupt arriving during the upload. Otherwise, an interrupted thread should break out of the loop without calling saveFiles.

@rnaidu-seqera, is the issue happening in 25.10.4?

                 synchronized(lock) {
-                    if( Thread.currentThread().isInterrupted() )
-                        break
                     handler.saveFiles()
                 }
+                if( interrupted )
+                    break
             }
         }
         finally {
             log.debug "Terminating logs checkpoint thread"
         }
     }

-    protected void await(Duration interval) {
+    protected boolean await(Duration interval) {
         try {
             Thread.sleep(interval.toMillis())
+            return Thread.currentThread().isInterrupted()
         }
         catch (InterruptedException e) {
             log.debug "Interrupted logs checkpoint thread"
-            Thread.currentThread().interrupt()
+            return true
         }
     }
 }
@@ -80,4 +80,26 @@ class LogsCheckpointTest extends Specification {
         cleanup:
         SysEnv.pop()
     }
+
+    def 'should perform final saveFiles when interrupted mid-sleep' () {
+        given:
+        SysEnv.push(TOWER_LOGS_CHECKPOINT_INTERVAL: '60s')
+        def session = Mock(Session) {
+            getWorkDir() >> TestHelper.createInMemTempDir()
+            getConfig() >> [:]
+        }
+        def handler = Mock(LogsHandler)
+        def checkpoint = new LogsCheckpoint()
+
+        when:
+        checkpoint.onFlowCreate(session)
+        checkpoint.@handler = handler // inject mock before thread wakes up
+        checkpoint.onFlowComplete()
+
+        then:
+        1 * handler.saveFiles() >> { assert !Thread.currentThread().isInterrupted() }
+
+        cleanup:
+        SysEnv.pop()
+    }
 }