
Commit 5f7628a

rhassaine and claude committed
Add K8s failure classification, terminationReason plumbing, and retryOn directive
When running Nextflow pipelines on Kubernetes, all task failures are treated identically: there is no distinction between infrastructure failures (OOMKilled, Evicted, Preempted) and application errors (exit code 1, script bugs). This makes it impossible to implement smart retry strategies such as "retry OOM with more memory" or "fail fast on application errors" without resorting to fragile exit-code matching in error strategy closures. Additionally, `task.terminationReason` was not accessible in error strategy closures because the value was set on `TaskRun` but never plumbed through to `TaskConfig` (the object exposed as `task` in closures).

The K8s task handler now extracts the container termination reason from the K8s API and classifies failures into categories:

- **Application failures** (OOMKilled, Error, etc.): `terminationReason` is set on the task but there is no special handling; the user's error strategy is respected.
- **Infrastructure failures** (Evicted, Preempting, DeadlineExceeded, Shutdown): sets `task.aborted = true` and throws `NodeTerminationException` for automatic retry, since these are transient platform issues not caused by the task itself.
- **Inferred reasons**: when K8s does not provide a reason but the exit code is informative (137 = SIGKILL/OOM, 143 = SIGTERM), a synthetic reason is set (e.g. `OOMKilled(exit137)`).

Also fixes the `exitcode` → `exitCode` typo in `K8sClient.jobStateFallback0()` and the `0 ?: readExitFile()` Groovy truthiness bug (0 is falsy in Groovy, so exit code 0 was incorrectly falling through to `readExitFile()`). Builds on the work in #6436 and #6442, which introduced reading the K8s exit code from the container terminated state.

- Added `volatile String terminationReason` field to `TaskRun`
- Plumbed `terminationReason` and `aborted` from `TaskRun` to `TaskConfig` in `TaskProcessor.resumeOrDie()`, following the existing `exitStatus` pattern
- `terminationReason` is now accessible as `task.terminationReason` in error strategy closures
- Native logging: retry messages now include `[reason: OOMKilled]` and hard-failure error blocks include a "Termination reason" section, so no custom error strategy closure is needed for visibility

The new process directive `retryOn` provides a declarative way to retry based on termination reasons without writing Groovy closures:

```groovy
process FOO {
    retryOn 'OOMKilled'
    memory { 2.GB * task.attempt }
    ...
}
```

Or in config files (assignment syntax):

```groovy
process {
    retryOn = ['OOMKilled', 'OOMKilled(exit137)']
}
```

When the task's `terminationReason` matches any value in the `retryOn` list, the error strategy is overridden to RETRY. The user's error strategy closure is still evaluated first (preserving side effects like logging), but its return value is overridden.

Tested on GKE with nf-core/oncoanalyser using a config that forces OOM:

```groovy
process {
    withName: "BWAMEM2_ALIGN" {
        memory = { 1500.MB * task.attempt }
        retryOn = ['OOMKilled']
    }
}
```

BWAMEM2_ALIGN OOMs at 1500MB and Nextflow logs:

```
[2b/54e1be] NOTE: Process `BWAMEM2_ALIGN (...)` terminated with an error exit status (137) [reason: OOMKilled] -- Execution is retried (1)
```

The task is retried with 3000MB (attempt 2), then 4500MB (attempt 3) if needed.
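For comparison, the same OOM handling can still be expressed as a closure now that `task.terminationReason` reaches `TaskConfig`. The sketch below is illustrative only; the process name, resource values, and the `'finish'` fallback are not part of this commit:

```nextflow
// Hedged sketch: closure-based equivalent of `retryOn 'OOMKilled'`, relying on the
// newly plumbed task.terminationReason. Names and values are illustrative only.
process ALIGN_EXAMPLE {
    memory { 2.GB * task.attempt }
    maxRetries 3
    errorStrategy {
        // retry when the platform reports an OOM kill, otherwise stop the workflow
        task.terminationReason == 'OOMKilled' ? 'retry' : 'finish'
    }

    script:
    """
    echo 'placeholder command'
    """
}
```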
- `TaskRun.groovy`: added the `terminationReason` field, cleared on `makeCopy()`
- `TaskConfig.groovy`: added the `getRetryOn()` getter
- `TaskProcessor.groovy`: plumbed `terminationReason`/`aborted` to the config, implemented the `retryOn` override in `checkErrorStrategy()`, added native `terminationReason` logging
- `ProcessBuilder.groovy`: registered `retryOn` as a valid directive
- `K8sTaskHandler.groovy`: failure classification, `terminationReason` extraction, infrastructure failure detection, `K8sOutOfCpu/MemoryException` in the catch clause
- `K8sClient.groovy`: fixed the `exitcode` → `exitCode` typo, explicit `exitCode: 0`
- `K8sTaskHandlerTest.groovy`: 6 new tests + 1 updated test
- `K8sClientTest.groovy`: 1 new test for the job fallback exit code

Signed-off-by: Rayan Hassaine <r.hassaine@hartwigmedicalfoundation.nl>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Rayan Hassaïne <r.hassaine@hartwigmedicalfoundation.nl>
1 parent c70376d commit 5f7628a

10 files changed

Lines changed: 231 additions & 16 deletions


modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy

Lines changed: 12 additions & 0 deletions
```diff
@@ -240,6 +240,18 @@ class TaskConfig extends LazyMap implements Cloneable {
         throw new IllegalArgumentException("Not a valid `ErrorStrategy` value: ${strategy}")
     }
 
+    List<String> getRetryOn() {
+        final value = get('retryOn')
+        if( value instanceof List )
+            return (List<String>) value
+        if( value == null )
+            return Collections.<String>emptyList()
+        // single string value
+        if( value instanceof CharSequence )
+            return [ value.toString() ]
+        throw new IllegalArgumentException("Not a valid `retryOn` value: ${value}")
+    }
+
     def getResourceLimit(String directive) {
         final limits = get('resourceLimits') as Map
         return limits?.get(directive)
```
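The getter accepts a list, a single string, or nothing at all. A hedged illustration of that normalisation, assuming `TaskConfig`'s no-arg and map-style constructors behave as they do elsewhere in the test suite:

```groovy
// Illustration only: how getRetryOn() normalises the directive value.
assert new TaskConfig(retryOn: ['OOMKilled', 'Evicted']).getRetryOn() == ['OOMKilled', 'Evicted']
assert new TaskConfig(retryOn: 'OOMKilled').getRetryOn() == ['OOMKilled']  // single string is wrapped in a list
assert new TaskConfig().getRetryOn() == []                                 // unset yields an empty list
```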

modules/nextflow/src/main/groovy/nextflow/processor/TaskErrorFormatter.groovy

Lines changed: 4 additions & 0 deletions
```diff
@@ -117,6 +117,10 @@ class TaskErrorFormatter {
         // -- the exit status
         message << "\nCommand exit status:\n  ${task.exitStatus != Integer.MAX_VALUE ? task.exitStatus : '-'}".toString()
 
+        // -- the termination reason (e.g. OOMKilled, Evicted) when provided by the executor
+        if( task.terminationReason )
+            message << "\nTermination reason:\n  ${task.terminationReason}".toString()
+
         // -- the tail of the process stdout
         message << "\nCommand output:"
         final max = 50
```
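The new section slots between the exit status and the command output in the task error report. An illustrative excerpt of the resulting layout (values and surrounding lines are examples, not captured output):

```
Command exit status:
  137

Termination reason:
  OOMKilled

Command output:
  ...
```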

modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy

Lines changed: 11 additions & 1 deletion
```diff
@@ -1075,6 +1075,8 @@
         if( task && error instanceof ProcessException ) {
             // expose current task exit status
             task.config.exitStatus = task.exitStatus
+            task.config.terminationReason = task.terminationReason
+            task.config.aborted = task.aborted
             task.config.errorCount = procErrCount
             task.config.retryCount = taskErrCount
             //Add trace of the previous execution in the task context for next execution
@@ -1085,6 +1087,8 @@
             errorStrategy = checkErrorStrategy(task, error, taskErrCount, procErrCount, submitRetries)
             if( errorStrategy.soft ) {
                 def msg = "[$task.hashLog] NOTE: ${submitTimeout ? submitErrMsg : error.message}"
+                if( task.terminationReason )
+                    msg += " [reason: ${task.terminationReason}]"
                 if( errorStrategy == IGNORE )
                     msg += " -- Error is ignored"
                 else if( errorStrategy == RETRY )
@@ -1150,7 +1154,13 @@
 
     protected ErrorStrategy checkErrorStrategy( TaskRun task, ProcessException error, final int taskErrCount, final int procErrCount, final submitRetries ) {
 
-        final action = task.config.getErrorStrategy()
+        // always evaluate the error strategy (may have side effects like logging)
+        final configAction = task.config.getErrorStrategy()
+        // retryOn directive: if the termination reason matches, override to RETRY
+        final retryReasons = task.config.getRetryOn()
+        final action = (retryReasons && task.terminationReason in retryReasons)
+            ? RETRY
+            : configAction
 
         // retry is not allowed when the script cannot be compiled or similar errors
         if( error instanceof ProcessUnrecoverableException || error.cause instanceof ProcessUnrecoverableException ) {
```
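The configured strategy is still evaluated (so closure side effects such as logging are preserved) and only its result is replaced when the reason matches. A minimal Groovy sketch of that selection, with plain strings standing in for the `ErrorStrategy` enum and `resolveStrategy` as a hypothetical helper, not code from this commit:

```groovy
// Minimal sketch of the retryOn override shown above; illustrative only.
def resolveStrategy = { String configAction, List<String> retryReasons, String terminationReason ->
    (retryReasons && terminationReason in retryReasons) ? 'retry' : configAction
}

assert resolveStrategy('terminate', ['OOMKilled'], 'OOMKilled') == 'retry'      // match -> override
assert resolveStrategy('terminate', ['OOMKilled'], 'Evicted')   == 'terminate'  // no match -> keep configured value
assert resolveStrategy('terminate', [],            'OOMKilled') == 'terminate'  // retryOn unset -> keep configured value
```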

modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy

Lines changed: 7 additions & 0 deletions
```diff
@@ -342,6 +342,12 @@
      */
     volatile boolean aborted
 
+    /**
+     * The executor-specific reason for task termination (e.g. OOMKilled, Evicted).
+     * Set by the executor when the platform provides a termination reason.
+     */
+    volatile String terminationReason
+
     /**
      * The action {@link ErrorStrategy} action applied if task has failed
      */
@@ -378,6 +384,7 @@
         copy.name = null // <-- force to re-evaluate the name that can include a dynamic tag
         copy.error = null
         copy.exitStatus = Integer.MAX_VALUE
+        copy.terminationReason = null
         return copy
     }
```

modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy

Lines changed: 1 addition & 0 deletions
```diff
@@ -73,6 +73,7 @@
         'queue',
         'resourceLabels',
         'resourceLimits',
+        'retryOn',
         'scratch',
         'secret',
         'shell',
```
'shell',

modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java

Lines changed: 6 additions & 0 deletions
```diff
@@ -303,6 +303,12 @@ void resourceLimits(
         Map<String,?> opts
     );
 
+    @Description("""
+        The `retryOn` directive allows you to specify termination reasons (e.g. OOMKilled) that should trigger a retry, overriding the error strategy.
+    """)
+    void retryOn(String value);
+    void retryOn(List<String> value);
+
     @Description("""
         The `scratch` directive allows you to execute each task in a temporary directory that is local to the compute node.
```

plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy

Lines changed: 40 additions & 3 deletions
```diff
@@ -28,6 +28,8 @@ import groovy.util.logging.Slf4j
 import nextflow.SysEnv
 import nextflow.container.ContainerHelper
 import nextflow.container.DockerBuilder
+import nextflow.exception.K8sOutOfCpuException
+import nextflow.exception.K8sOutOfMemoryException
 import nextflow.exception.NodeTerminationException
 import nextflow.k8s.client.PodUnschedulableException
 import nextflow.exception.ProcessSubmitException
@@ -358,7 +360,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
             }
             return state
         }
-        catch (NodeTerminationException | PodUnschedulableException e) {
+        catch (NodeTerminationException | PodUnschedulableException | K8sOutOfCpuException | K8sOutOfMemoryException e) {
            // create a synthetic `state` object adding an extra `nodeTermination`
            // attribute to return the error to the caller method
            final instant = Instant.now()
@@ -438,11 +440,28 @@
             // the K8s API is more reliable because the container may terminate before the exit file is written
             // See https://github.com/nextflow-io/nextflow/issues/6436
             // https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#containerstateterminated-v1-core
-            log.trace("[k8s] Container Terminated state ${state.terminated}")
-            final k8sExitCode = (state.terminated as Map)?.exitCode as Integer
+            log.debug("[k8s] Container Terminated state ${state.terminated}")
+            final terminated = state.terminated as Map
+            final k8sExitCode = terminated?.exitCode as Integer
+            final reason = terminated?.reason as String
             task.exitStatus = k8sExitCode != null ? k8sExitCode : readExitFile()
             task.stdout = outputFile
             task.stderr = errorFile
+            // expose termination reason so users can make decisions in errorStrategy
+            // K8s provides 'reason' when the container itself is killed (e.g. OOMKilled, Evicted)
+            // When the process inside is killed but the container exits normally, reason is null
+            // In that case, infer from exit code: 137 = SIGKILL (likely OOM), 143 = SIGTERM
+            if( reason && reason != 'Error' && reason != 'Completed' )
+                task.terminationReason = reason
+            else if( k8sExitCode == 137 )
+                task.terminationReason = 'OOMKilled(exit137)'
+            else if( k8sExitCode == 143 )
+                task.terminationReason = 'SignalTerm(exit143)'
+            // classify infrastructure failures so Nextflow can auto-retry
+            if( isInfrastructureFailure(reason) ) {
+                task.error = new NodeTerminationException("K8s infrastructure failure: ${reason}")
+                task.aborted = true
+            }
         }
         status = TaskStatus.COMPLETED
         saveJobLogOnError(task)
```
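The explicit `!= null` check above is what replaces the earlier Elvis-operator pattern mentioned in the commit message. A standalone Groovy illustration of why the Elvis form loses a zero exit code (`readExitFileStub` is a made-up stand-in for `readExitFile()`):

```groovy
// Why `0 ?: readExitFile()` was wrong: 0 is falsy in Groovy, so the Elvis operator
// silently discards a legitimate zero exit code.
def readExitFileStub = { 42 }   // hypothetical stand-in for readExitFile()
Integer k8sExitCode = 0

assert (k8sExitCode ?: readExitFileStub()) == 42                      // Elvis falls through on 0
assert (k8sExitCode != null ? k8sExitCode : readExitFileStub()) == 0  // explicit null check keeps the 0
```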
```diff
@@ -455,6 +474,24 @@
             return false
     }
 
+    /**
+     * Determine if the K8s container termination reason indicates an infrastructure
+     * failure (as opposed to an application failure).
+     */
+    protected static boolean isInfrastructureFailure(String reason) {
+        if( !reason )
+            return false
+        switch( reason ) {
+            case 'Evicted':
+            case 'Preempting':
+            case 'DeadlineExceeded':
+            case 'Shutdown':
+                return true
+            default:
+                return false
+        }
+    }
+
     protected void saveJobLogOnError(TaskRun task) {
         if( task.isSuccess() )
             return
```

plugins/nf-k8s/src/main/nextflow/k8s/client/K8sClient.groovy

Lines changed: 1 addition & 0 deletions
```diff
@@ -420,6 +420,7 @@ class K8sClient {
         final dummyPodStatus = [
             terminated: [
                 reason: "Completed",
+                exitCode: 0,
                 startedAt: jobStatus.startTime,
                 finishedAt: jobStatus.completionTime,
             ]
```

plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy

Lines changed: 140 additions & 0 deletions
```diff
@@ -22,6 +22,8 @@ import java.nio.file.Paths
 
 import nextflow.Session
 import nextflow.SysEnv
+import nextflow.exception.K8sOutOfCpuException
+import nextflow.exception.K8sOutOfMemoryException
 import nextflow.exception.NodeTerminationException
 import nextflow.file.http.XPath
 import nextflow.fusion.FusionConfig
@@ -500,8 +502,10 @@
         then:
         1 * handler.getState() >> fullState
         1 * handler.updateTimestamps(termState)
+        0 * handler.readExitFile()
         1 * handler.deleteJobIfSuccessful(task) >> null
         1 * handler.saveJobLogOnError(task) >> null
+        // exitCode 0 from K8s is now used directly instead of falling through to readExitFile()
         handler.task.exitStatus == 0
         handler.task.@stdout == OUT_FILE
         handler.task.@stderr == ERR_FILE
@@ -678,6 +682,142 @@
         state.nodeTermination.message == "Pod failed for unknown reason"
     }
 
+    def 'should return nodeTermination state for K8sOutOfMemoryException' () {
+        given:
+        def POD_NAME = 'pod-xyz'
+        def client = Mock(K8sClient)
+        def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))
+
+        when:
+        def state = handler.getState()
+        then:
+        1 * client.podState(POD_NAME) >> { throw new K8sOutOfMemoryException("Pod out of memory") }
+        then:
+        state.terminated.startedAt
+        state.terminated.finishedAt
+        and:
+        state.nodeTermination instanceof K8sOutOfMemoryException
+        state.nodeTermination.message == "Pod out of memory"
+    }
+
+    def 'should return nodeTermination state for K8sOutOfCpuException' () {
+        given:
+        def POD_NAME = 'pod-xyz'
+        def client = Mock(K8sClient)
+        def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))
+
+        when:
+        def state = handler.getState()
+        then:
+        1 * client.podState(POD_NAME) >> { throw new K8sOutOfCpuException("Pod out of CPU") }
+        then:
+        state.terminated.startedAt
+        state.terminated.finishedAt
+        and:
+        state.nodeTermination instanceof K8sOutOfCpuException
+        state.nodeTermination.message == "Pod out of CPU"
+    }
+
+    def 'should classify infrastructure failures' () {
+        expect:
+        K8sTaskHandler.isInfrastructureFailure(reason) == expected
+        where:
+        reason              | expected
+        'Evicted'           | true
+        'Preempting'        | true
+        'Shutdown'          | true
+        'DeadlineExceeded'  | true
+        'OOMKilled'         | false
+        'Completed'         | false
+        'Error'             | false
+        null                | false
+        ''                  | false
+    }
+
+    def 'should set task aborted for Evicted' () {
+        given:
+        def ERR_FILE = Paths.get('err.file')
+        def OUT_FILE = Paths.get('out.file')
+        def POD_NAME = 'pod-xyz'
+        def client = Mock(K8sClient)
+        def termState = [ reason: "Evicted",
+                          startedAt: "2018-01-13T10:09:36Z",
+                          finishedAt: "2018-01-13T10:19:36Z",
+                          exitCode: 143 ]
+        def task = new TaskRun()
+        def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))
+
+        when:
+        def result = handler.checkIfCompleted()
+        then:
+        1 * handler.getState() >> [terminated: termState]
+        1 * handler.updateTimestamps(termState)
+        1 * handler.deleteJobIfSuccessful(task) >> null
+        1 * handler.saveJobLogOnError(task) >> null
+        handler.task.exitStatus == 143
+        handler.task.aborted == true
+        handler.task.terminationReason == 'Evicted'
+        handler.task.error instanceof NodeTerminationException
+        handler.status == TaskStatus.COMPLETED
+        result == true
+    }
+
+    def 'should set terminationReason but not aborted for OOMKilled' () {
+        given:
+        def ERR_FILE = Paths.get('err.file')
+        def OUT_FILE = Paths.get('out.file')
+        def POD_NAME = 'pod-xyz'
+        def client = Mock(K8sClient)
+        def termState = [ reason: "OOMKilled",
+                          startedAt: "2018-01-13T10:09:36Z",
+                          finishedAt: "2018-01-13T10:19:36Z",
+                          exitCode: 137 ]
+        def task = new TaskRun()
+        def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))
+
+        when:
+        def result = handler.checkIfCompleted()
+        then:
+        1 * handler.getState() >> [terminated: termState]
+        1 * handler.updateTimestamps(termState)
+        1 * handler.deleteJobIfSuccessful(task) >> null
+        1 * handler.saveJobLogOnError(task) >> null
+        handler.task.exitStatus == 137
+        handler.task.aborted == false
+        handler.task.terminationReason == 'OOMKilled'
+        handler.task.error == null
+        handler.status == TaskStatus.COMPLETED
+        result == true
+    }
+
+    def 'should not set task aborted for application failure' () {
+        given:
+        def ERR_FILE = Paths.get('err.file')
+        def OUT_FILE = Paths.get('out.file')
+        def POD_NAME = 'pod-xyz'
+        def client = Mock(K8sClient)
+        def termState = [ reason: "Error",
+                          startedAt: "2018-01-13T10:09:36Z",
+                          finishedAt: "2018-01-13T10:19:36Z",
+                          exitCode: 1 ]
+        def task = new TaskRun()
+        def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))
+
+        when:
+        def result = handler.checkIfCompleted()
+        then:
+        1 * handler.getState() >> [terminated: termState]
+        1 * handler.updateTimestamps(termState)
+        1 * handler.deleteJobIfSuccessful(task) >> null
+        1 * handler.saveJobLogOnError(task) >> null
+        handler.task.exitStatus == 1
+        handler.task.aborted == false
+        handler.task.terminationReason == null
+        handler.task.error == null
+        handler.status == TaskStatus.COMPLETED
+        result == true
+    }
+
     def 'should return container mounts' () {
 
         given:
```

plugins/nf-k8s/src/test/nextflow/k8s/client/K8sClientTest.groovy

Lines changed: 9 additions & 12 deletions
```diff
@@ -1060,15 +1060,13 @@
         e.message == "K8s pod in Failed state"
     }
 
-    def 'should fallback to job status when pod is gone and not return hardcoded exit code' () {
+    def 'should fallback to job status when pod is gone and return exit code zero' () {
         given:
         def JOB_STATUS_JSON = '''
         {
             "apiVersion": "batch/v1",
             "kind": "Job",
-            "metadata": {
-                "name": "test-job"
-            },
+            "metadata": { "name": "nf-abc123" },
             "status": {
                 "succeeded": 1,
                 "startTime": "2025-01-15T10:00:00Z",
@@ -1084,22 +1082,21 @@
             }
         }
         '''
+        def JOB_NAME = 'nf-abc123'
         def client = Spy(K8sClient)
-        final JOB_NAME = 'test-job'
 
         when:
-        def result = client.jobStateFallback0(JOB_NAME)
-
+        def result = client.jobState(JOB_NAME)
         then:
+        // findPodNameForJob returns null (pod is gone)
+        1 * client.findPodNameForJob(JOB_NAME) >> null
+        // falls back to jobStateFallback0 which calls jobStatus
         1 * client.jobStatus(JOB_NAME) >> new K8sResponseJson(JOB_STATUS_JSON)
-
         and:
-        result.terminated != null
         result.terminated.reason == 'Completed'
         result.terminated.startedAt == '2025-01-15T10:00:00Z'
         result.terminated.finishedAt == '2025-01-15T10:05:00Z'
-        // The key assertion: exitCode should not be present (null) so fallback to .exitcode file works
-        result.terminated.exitCode == null
-        result.terminated.exitcode == null
+        // K8s only reports succeeded==1 when exit code is 0, so synthetic status should include it
+        result.terminated.exitCode == 0
     }
 }
```
