From 2ea2e38d66d4ed8108a9e23006cbf911ec89ee5f Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Wed, 16 Jul 2025 10:30:22 -0500 Subject: [PATCH] Fix submitted count when aborting submitted tasks Signed-off-by: Ben Sherman --- .../nextflow/trace/WorkflowStats.groovy | 22 +++-- .../trace/WorkflowStatsObserver.groovy | 2 +- .../nextflow/trace/WorkflowStatsTest.groovy | 87 +++++++++++++++++-- 3 files changed, 97 insertions(+), 14 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/WorkflowStats.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/WorkflowStats.groovy index 9cd5d37bec..1ebffe4659 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/WorkflowStats.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/WorkflowStats.groovy @@ -26,6 +26,7 @@ import groovy.util.logging.Slf4j import nextflow.processor.ErrorStrategy import nextflow.processor.TaskProcessor import nextflow.processor.TaskRun +import nextflow.processor.TaskStatus import nextflow.util.Duration import nextflow.util.MemoryUnit /** @@ -345,17 +346,24 @@ class WorkflowStats implements Cloneable { } - void markCompleted(TaskRun task, TraceRecord trace) { + void markCompleted(TaskRun task, TraceRecord trace, TaskStatus status) { ProgressRecord state = getOrCreateRecord(task.processor) state.taskName = task.name state.hash = task.hashLog - state.running -- - state.loadCpus -= task.getConfig().getCpus() - state.loadMemory -= (task.getConfig().getMemory()?.toBytes() ?: 0) - this.runningCount -- - this.loadCpus -= task.getConfig().getCpus() - this.loadMemory -= (task.getConfig().getMemory()?.toBytes() ?: 0) + if( status == TaskStatus.SUBMITTED ) { + state.submitted -- + this.submittedCount -- + } + else { + state.running -- + state.loadCpus -= task.getConfig().getCpus() + state.loadMemory -= (task.getConfig().getMemory()?.toBytes() ?: 0) + + this.runningCount -- + this.loadCpus -= task.getConfig().getCpus() + this.loadMemory -= (task.getConfig().getMemory()?.toBytes() ?: 0) + } if( task.failed ) { state.failed ++ diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/WorkflowStatsObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/WorkflowStatsObserver.groovy index f0487095ff..e947201dee 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/WorkflowStatsObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/WorkflowStatsObserver.groovy @@ -81,7 +81,7 @@ class WorkflowStatsObserver implements TraceObserverV2 { @Override void onTaskComplete(TaskEvent event) { log.trace "== event complete pid=${event.handler.task.processor.id}; status=$event.handler.status" - agent.send { data.markCompleted(event.handler.task, event.trace) } + agent.send { data.markCompleted(event.handler.task, event.trace, event.handler.status) } } @Override diff --git a/modules/nextflow/src/test/groovy/nextflow/trace/WorkflowStatsTest.groovy b/modules/nextflow/src/test/groovy/nextflow/trace/WorkflowStatsTest.groovy index b2b74a5b81..a1bc2e4b9d 100644 --- a/modules/nextflow/src/test/groovy/nextflow/trace/WorkflowStatsTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/trace/WorkflowStatsTest.groovy @@ -20,6 +20,7 @@ import nextflow.processor.ErrorStrategy import nextflow.processor.TaskConfig import nextflow.processor.TaskProcessor import nextflow.processor.TaskRun +import nextflow.processor.TaskStatus import nextflow.util.Duration import nextflow.util.MemoryUnit import spock.lang.Specification @@ -339,7 +340,7 @@ class WorkflowStatsTest extends Specification { def trace = Mock(TraceRecord) when: - stats.markCompleted(task, trace) + stats.markCompleted(task, trace, TaskStatus.COMPLETED) then: 1 * trace.get('realtime') >> DURATION.millis @@ -406,7 +407,7 @@ class WorkflowStatsTest extends Specification { def trace = Mock(TraceRecord) when: - stats.markCompleted(task, trace) + stats.markCompleted(task, trace, TaskStatus.COMPLETED) then: task.failed >> true task.getHashLog() >> HASH @@ -474,7 +475,7 @@ class WorkflowStatsTest extends Specification { def trace = Mock(TraceRecord) when: - stats.markCompleted(task, trace) + stats.markCompleted(task, trace, TaskStatus.COMPLETED) then: task.failed >> true task.getHashLog() >> HASH @@ -501,11 +502,12 @@ class WorkflowStatsTest extends Specification { !rec.errored } - def 'should mark aborted' () { + def 'should mark aborted from running' () { given: def FAILED = 10 def RETRIES = 2 def IGNORED = 3 + def SUBMITTED = 0 def RUNNING = 4 def ABORTED = 5 def SUCCEEDED = 6 @@ -520,6 +522,7 @@ class WorkflowStatsTest extends Specification { getConfig() >> Mock(TaskConfig) { getCpus() >> CPUS; getMemory() >> MEM } } and: def rec = new ProgressRecord(0, 'foo') + rec.submitted = SUBMITTED rec.running = RUNNING rec.failed = FAILED rec.retries = RETRIES @@ -531,6 +534,7 @@ class WorkflowStatsTest extends Specification { and: def stats = new WorkflowStats( records: [0:rec], + submittedCount: SUBMITTED, runningCount: RUNNING, failedCount: FAILED, retriesCount: RETRIES, @@ -543,17 +547,19 @@ class WorkflowStatsTest extends Specification { def trace = Mock(TraceRecord) when: - stats.markCompleted(task,trace) + stats.markCompleted(task, trace, TaskStatus.RUNNING) then: task.aborted >> true task.getHashLog() >> HASH and: + stats.submittedCount == SUBMITTED stats.runningCount == RUNNING -1 stats.loadCpus == LOAD_CPUS - CPUS stats.loadMemory == (LOAD_MEM - MEM).bytes stats.abortedCount == ABORTED +1 and: + rec.submitted == SUBMITTED rec.running == RUNNING -1 rec.loadCpus == LOAD_CPUS - CPUS rec.loadMemory == (LOAD_MEM - MEM).bytes @@ -565,6 +571,75 @@ class WorkflowStatsTest extends Specification { rec.succeeded == SUCCEEDED } + def 'should mark aborted from submitted' () { + given: + def FAILED = 10 + def RETRIES = 2 + def IGNORED = 3 + def SUBMITTED = 4 + def RUNNING = 0 + def ABORTED = 5 + def SUCCEEDED = 6 + def CPUS = 2 + def MEM = 4.GB + def LOAD_CPUS = 0 + def LOAD_MEM = 0.GB + def HASH = 'xyz' + and: + def task = Mock(TaskRun) { + getProcessor() >> Mock(TaskProcessor) { getId() >> 0 } + getConfig() >> Mock(TaskConfig) { getCpus() >> CPUS; getMemory() >> MEM } } + and: + def rec = new ProgressRecord(0, 'foo') + rec.submitted = SUBMITTED + rec.running = RUNNING + rec.failed = FAILED + rec.retries = RETRIES + rec.ignored = IGNORED + rec.aborted = ABORTED + rec.succeeded = SUCCEEDED + rec.loadCpus = LOAD_CPUS + rec.loadMemory = LOAD_MEM.bytes + and: + def stats = new WorkflowStats( + records: [0:rec], + submittedCount: SUBMITTED, + runningCount: RUNNING, + failedCount: FAILED, + retriesCount: RETRIES, + ignoredCount: IGNORED, + succeededCount: SUCCEEDED, + abortedCount: ABORTED, + loadCpus: LOAD_CPUS, + loadMemory: LOAD_MEM.bytes) + and: + def trace = Mock(TraceRecord) + + when: + stats.markCompleted(task, trace, TaskStatus.SUBMITTED) + + then: + task.aborted >> true + task.getHashLog() >> HASH + and: + stats.submittedCount == SUBMITTED -1 + stats.runningCount == RUNNING + stats.loadCpus == LOAD_CPUS + stats.loadMemory == LOAD_MEM.bytes + stats.abortedCount == ABORTED +1 + and: + rec.submitted == SUBMITTED -1 + rec.running == RUNNING + rec.loadCpus == LOAD_CPUS + rec.loadMemory == LOAD_MEM.bytes + and: + rec.aborted == ABORTED +1 + rec.failed == FAILED + rec.retries == RETRIES + rec.ignored == IGNORED + rec.succeeded == SUCCEEDED + } + def 'should mark succeeded' () { given: def FAILED = 10 @@ -607,7 +682,7 @@ class WorkflowStatsTest extends Specification { def trace = Mock(TraceRecord) when: - stats.markCompleted(task,trace) + stats.markCompleted(task, trace, TaskStatus.COMPLETED) then: task.aborted >> false