Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import groovy.util.logging.Slf4j
import nextflow.processor.ErrorStrategy
import nextflow.processor.TaskProcessor
import nextflow.processor.TaskRun
import nextflow.processor.TaskStatus
import nextflow.util.Duration
import nextflow.util.MemoryUnit
/**
Expand Down Expand Up @@ -345,17 +346,24 @@ class WorkflowStats implements Cloneable {

}

void markCompleted(TaskRun task, TraceRecord trace) {
void markCompleted(TaskRun task, TraceRecord trace, TaskStatus status) {
ProgressRecord state = getOrCreateRecord(task.processor)
state.taskName = task.name
state.hash = task.hashLog
state.running --
state.loadCpus -= task.getConfig().getCpus()
state.loadMemory -= (task.getConfig().getMemory()?.toBytes() ?: 0)

this.runningCount --
this.loadCpus -= task.getConfig().getCpus()
this.loadMemory -= (task.getConfig().getMemory()?.toBytes() ?: 0)
if( status == TaskStatus.SUBMITTED ) {
state.submitted --
this.submittedCount --
}
else {
state.running --
state.loadCpus -= task.getConfig().getCpus()
state.loadMemory -= (task.getConfig().getMemory()?.toBytes() ?: 0)

this.runningCount --
this.loadCpus -= task.getConfig().getCpus()
this.loadMemory -= (task.getConfig().getMemory()?.toBytes() ?: 0)
}

if( task.failed ) {
state.failed ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class WorkflowStatsObserver implements TraceObserverV2 {
@Override
void onTaskComplete(TaskEvent event) {
log.trace "== event complete pid=${event.handler.task.processor.id}; status=$event.handler.status"
agent.send { data.markCompleted(event.handler.task, event.trace) }
agent.send { data.markCompleted(event.handler.task, event.trace, event.handler.status) }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import nextflow.processor.ErrorStrategy
import nextflow.processor.TaskConfig
import nextflow.processor.TaskProcessor
import nextflow.processor.TaskRun
import nextflow.processor.TaskStatus
import nextflow.util.Duration
import nextflow.util.MemoryUnit
import spock.lang.Specification
Expand Down Expand Up @@ -339,7 +340,7 @@ class WorkflowStatsTest extends Specification {
def trace = Mock(TraceRecord)

when:
stats.markCompleted(task, trace)
stats.markCompleted(task, trace, TaskStatus.COMPLETED)

then:
1 * trace.get('realtime') >> DURATION.millis
Expand Down Expand Up @@ -406,7 +407,7 @@ class WorkflowStatsTest extends Specification {
def trace = Mock(TraceRecord)

when:
stats.markCompleted(task, trace)
stats.markCompleted(task, trace, TaskStatus.COMPLETED)
then:
task.failed >> true
task.getHashLog() >> HASH
Expand Down Expand Up @@ -474,7 +475,7 @@ class WorkflowStatsTest extends Specification {
def trace = Mock(TraceRecord)

when:
stats.markCompleted(task, trace)
stats.markCompleted(task, trace, TaskStatus.COMPLETED)
then:
task.failed >> true
task.getHashLog() >> HASH
Expand All @@ -501,11 +502,12 @@ class WorkflowStatsTest extends Specification {
!rec.errored
}

def 'should mark aborted' () {
def 'should mark aborted from running' () {
given:
def FAILED = 10
def RETRIES = 2
def IGNORED = 3
def SUBMITTED = 0
def RUNNING = 4
def ABORTED = 5
def SUCCEEDED = 6
Expand All @@ -520,6 +522,7 @@ class WorkflowStatsTest extends Specification {
getConfig() >> Mock(TaskConfig) { getCpus() >> CPUS; getMemory() >> MEM } }
and:
def rec = new ProgressRecord(0, 'foo')
rec.submitted = SUBMITTED
rec.running = RUNNING
rec.failed = FAILED
rec.retries = RETRIES
Expand All @@ -531,6 +534,7 @@ class WorkflowStatsTest extends Specification {
and:
def stats = new WorkflowStats(
records: [0:rec],
submittedCount: SUBMITTED,
runningCount: RUNNING,
failedCount: FAILED,
retriesCount: RETRIES,
Expand All @@ -543,17 +547,19 @@ class WorkflowStatsTest extends Specification {
def trace = Mock(TraceRecord)

when:
stats.markCompleted(task,trace)
stats.markCompleted(task, trace, TaskStatus.RUNNING)

then:
task.aborted >> true
task.getHashLog() >> HASH
and:
stats.submittedCount == SUBMITTED
stats.runningCount == RUNNING -1
stats.loadCpus == LOAD_CPUS - CPUS
stats.loadMemory == (LOAD_MEM - MEM).bytes
stats.abortedCount == ABORTED +1
and:
rec.submitted == SUBMITTED
rec.running == RUNNING -1
rec.loadCpus == LOAD_CPUS - CPUS
rec.loadMemory == (LOAD_MEM - MEM).bytes
Expand All @@ -565,6 +571,75 @@ class WorkflowStatsTest extends Specification {
rec.succeeded == SUCCEEDED
}

def 'should mark aborted from submitted' () {
given:
def FAILED = 10
def RETRIES = 2
def IGNORED = 3
def SUBMITTED = 4
def RUNNING = 0
def ABORTED = 5
def SUCCEEDED = 6
def CPUS = 2
def MEM = 4.GB
def LOAD_CPUS = 0
def LOAD_MEM = 0.GB
def HASH = 'xyz'
and:
def task = Mock(TaskRun) {
getProcessor() >> Mock(TaskProcessor) { getId() >> 0 }
getConfig() >> Mock(TaskConfig) { getCpus() >> CPUS; getMemory() >> MEM } }
and:
def rec = new ProgressRecord(0, 'foo')
rec.submitted = SUBMITTED
rec.running = RUNNING
rec.failed = FAILED
rec.retries = RETRIES
rec.ignored = IGNORED
rec.aborted = ABORTED
rec.succeeded = SUCCEEDED
rec.loadCpus = LOAD_CPUS
rec.loadMemory = LOAD_MEM.bytes
and:
def stats = new WorkflowStats(
records: [0:rec],
submittedCount: SUBMITTED,
runningCount: RUNNING,
failedCount: FAILED,
retriesCount: RETRIES,
ignoredCount: IGNORED,
succeededCount: SUCCEEDED,
abortedCount: ABORTED,
loadCpus: LOAD_CPUS,
loadMemory: LOAD_MEM.bytes)
and:
def trace = Mock(TraceRecord)

when:
stats.markCompleted(task, trace, TaskStatus.SUBMITTED)

then:
task.aborted >> true
task.getHashLog() >> HASH
and:
stats.submittedCount == SUBMITTED -1
stats.runningCount == RUNNING
stats.loadCpus == LOAD_CPUS
stats.loadMemory == LOAD_MEM.bytes
stats.abortedCount == ABORTED +1
and:
rec.submitted == SUBMITTED -1
rec.running == RUNNING
rec.loadCpus == LOAD_CPUS
rec.loadMemory == LOAD_MEM.bytes
and:
rec.aborted == ABORTED +1
rec.failed == FAILED
rec.retries == RETRIES
rec.ignored == IGNORED
rec.succeeded == SUCCEEDED
}

def 'should mark succeeded' () {
given:
def FAILED = 10
Expand Down Expand Up @@ -607,7 +682,7 @@ class WorkflowStatsTest extends Specification {
def trace = Mock(TraceRecord)

when:
stats.markCompleted(task,trace)
stats.markCompleted(task, trace, TaskStatus.COMPLETED)

then:
task.aborted >> false
Expand Down