Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/reference/env-vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ The following environment variables control the configuration of the Nextflow ru
: The file storage path against which relative file paths are resolved.
: For example, with `NXF_FILE_ROOT=/some/root/path`, the use of `file('hello')` will be resolved to the absolute path `/some/root/path/hello`. A remote root path can be specified using the usual protocol prefix, e.g. `NXF_FILE_ROOT=s3://my-bucket/data`. Files defined using an absolute path are not affected by this setting.

`NXF_FUSION_TRACE`
: :::{versionadded} 26.04.0
:::
: When set to `true`, collect task resource metrics (CPU, memory, I/O) from the Fusion trace file (`.fusion/trace.json`) produced in the task work directory, replacing the metrics collected by the default bash command-trace wrapper. Requires Fusion to be enabled. GPU metrics from Fusion are always collected regardless of this setting.

`NXF_HOME`
: Nextflow home directory (default: `$HOME/.nextflow`).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import java.nio.file.Path
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.Global
import nextflow.SysEnv
import nextflow.container.ContainerBuilder
import nextflow.container.ContainerHelper
Expand Down Expand Up @@ -582,7 +583,9 @@ class BashWrapperBuilder {
}

protected boolean isTraceRequired() {
statsEnabled || fixOwnership()
if( fusionEnabled && Global.isFusionTraceEnabled() )
return fixOwnership()
return statsEnabled || fixOwnership()
}

protected String shellPath() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class TaskBean implements Serializable, Cloneable {

boolean statsEnabled

boolean fusionEnabled

List<String> outputEnvNames

Map<String,String> outputEvals
Expand Down Expand Up @@ -164,6 +166,7 @@ class TaskBean implements Serializable, Cloneable {
this.outputEnvNames = task.getOutputEnvNames()
this.outputEvals = task.getOutputEvals()
this.statsEnabled = task.getProcessor().getSession().statsEnabled
this.fusionEnabled = task.getProcessor().isFusionEnabled()

this.inputFiles = task.getInputFilesMap()
this.outputFiles = task.getOutputFilesNames()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
package nextflow.processor

import static nextflow.processor.TaskStatus.*
import static nextflow.trace.TraceRecord.toLong
import static nextflow.trace.TraceRecord.toFloat

import java.nio.file.NoSuchFileException
import java.util.concurrent.atomic.AtomicBoolean

import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Global
import nextflow.trace.TraceRecord
import nextflow.util.TestOnly
/**
Expand Down Expand Up @@ -241,15 +244,21 @@ abstract class TaskHandler {
}
}

final file = task.workDir?.resolve(TaskRun.CMD_TRACE)
try {
if(file) record.parseTraceFile(file)
}
catch( NoSuchFileException e ) {
// ignore it
}
catch( IOException e ) {
log.debug "[WARN] Cannot read trace file: $file -- Cause: ${e.message}"
// When Fusion trace is enabled, task metrics are collected by the Fusion client
// and reported in .fusion/trace.json — skip parsing the .command.trace file
// generated by the bash trace wrapper, as it won't exist
final fusionTraceEnabled = task.processor.executor.isFusionEnabled() && Global.isFusionTraceEnabled()
if( !fusionTraceEnabled ) {
final file = task.workDir?.resolve(TaskRun.CMD_TRACE)
try {
if(file) record.parseTraceFile(file)
}
catch( NoSuchFileException e ) {
log.trace "Unable to find trace file: $file"
}
catch( IOException e ) {
log.debug "[WARN] Cannot read trace file: $file -- Cause: ${e.message}"
}
}

parseFusionTrace(record)
Expand All @@ -261,22 +270,82 @@ abstract class TaskHandler {
protected void parseFusionTrace(TraceRecord record) {
if( !task.processor.executor.isFusionEnabled() )
return
// Resolve the Fusion trace file path (.fusion/trace.json) in the task work directory.
// This file is produced by the Fusion client and contains process, cgroup, and GPU metrics.
final fusionTrace = task.workDir?.resolve(TaskRun.FUSION_TRACE)
try {
if( fusionTrace ) {
final gpu = TraceRecord.parseFusionTraceFile(fusionTrace)
if( gpu )
record.gpuMetrics = gpu
}
if( !fusionTrace )
return
// Parse the full Fusion trace JSON — sections: 'proc', 'cgroup', 'gpu'
final json = TraceRecord.parseFusionTraceFile(fusionTrace)
// GPU metrics are always extracted when available, regardless of the
// NXF_FUSION_TRACE setting — this preserves backward compatibility
final gpu = (Map<String,Object>) json.get('gpu')
if( gpu )
record.gpuMetrics = gpu
// When NXF_FUSION_TRACE is disabled, stop here — only GPU metrics are collected.
// When enabled, also map the 'proc' and 'cgroup' sections into TraceRecord fields,
// replacing the metrics that would normally come from the bash trace wrapper
if( !Global.isFusionTraceEnabled() )
return
applyFusionMetrics(record, json)
}
catch( NoSuchFileException e ) {
// ignore it
log.trace "Unable to find Fusion trace file: $fusionTrace"
}
catch( Exception e ) {
log.debug "[WARN] Cannot read Fusion trace file: $fusionTrace -- Cause: ${e.message}"
}
}

protected void applyFusionMetrics(TraceRecord record, Map<String,Object> json) {
final proc = (Map<String,Object>) json.get('proc')
final cgroup = (Map<String,Object>) json.get('cgroup')

if( proc ) {
// CPU and timing metrics
record.store.put('realtime', toLong(proc.get('realtime')))
record.store.put('%cpu', toFloat(proc.get('pct_cpu')) / 10.0f as float)
record.store.put('cpu_model', proc.get('cpu_name')?.toString())
// I/O metrics
record.store.put('rchar', toLong(proc.get('rchar')))
record.store.put('wchar', toLong(proc.get('wchar')))
record.store.put('syscr', toLong(proc.get('syscr')))
record.store.put('syscw', toLong(proc.get('syscw')))
record.store.put('read_bytes', toLong(proc.get('read_bytes')))
record.store.put('write_bytes', toLong(proc.get('write_bytes')))
// context switches
record.store.put('vol_ctxt', toLong(proc.get('vol_ctxt')))
record.store.put('inv_ctxt', toLong(proc.get('inv_ctxt')))
}

// Prefer cgroup memory metrics (more accurate for containerized tasks)
if( cgroup ) {
record.store.put('vmem', toLong(cgroup.get('memory_current')))
record.store.put('rss', toLong(cgroup.get('memory_rss')))
record.store.put('peak_vmem', toLong(cgroup.get('memory_peak')))
record.store.put('peak_rss', toLong(cgroup.get('memory_peak_rss')))
// Compute %mem as peak RSS against the cgroup memory limit — i.e. how close the
// task got to its allocated memory. This overrides proc.pct_mem, which is RSS against
// total host memory (ps %MEM semantics) and underestimates utilization for containerized
// tasks whose limit is smaller than the host. Using peak (not current) RSS because Fusion
// overwrites memory_rss post-exit, making the last sample unrepresentative.
final memLimit = toLong(cgroup.get('memory_limit'))
final memPeakRss = toLong(cgroup.get('memory_peak_rss'))
if( memLimit > 0 ) {
record.store.put('%mem', (memPeakRss / (float)memLimit * 100.0f) as float)
}
}
else if( proc ) {
// Fallback to proc memory metrics when cgroup is not available
record.store.put('vmem', toLong(proc.get('vmem')))
record.store.put('rss', toLong(proc.get('rss')))
record.store.put('peak_vmem', toLong(proc.get('peak_vmem')))
record.store.put('peak_rss', toLong(proc.get('peak_rss')))
record.store.put('%mem', toFloat(proc.get('pct_mem')) / 10.0f as float)
}
}

/**
* Determine if a process can be forked i.e. can launch
* a parallel task execution. This is only enforced when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ class TaskProcessor {
*/
Executor getExecutor() { executor }

boolean isFusionEnabled() { executor?.isFusionEnabled() ?: false }

/**
* @return The {@code DataflowOperator} underlying this process
*/
Expand Down
42 changes: 40 additions & 2 deletions modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,47 @@ class TraceRecord implements Serializable {
return this
}

/**
* Parse the Fusion trace file (.fusion/trace.json) produced by the Fusion client.
* The file contains task metrics in the following format:
* <pre>
* {
* "proc": {
* "realtime": 660541, "pct_cpu": 1045, "cpu_name": "Intel Xeon ...",
* "rchar": 14112539262, "wchar": 12668821375, "syscr": 1823378, "syscw": 169293,
* "read_bytes": 8011776, "write_bytes": 102400,
* "vmem": 39015152, "rss": 14826068, "peak_vmem": 39047920, "peak_rss": 15775480,
* "pct_mem": 56, "vol_ctxt": 413015, "inv_ctxt": 1540
* },
* "gpu": {
* "name": "Tesla T4", "mem": 15360, "driver": "580.126.09",
* "pct": 75, "peak": 100, "pct_mem": 40.1, "peak_mem": 74.1
* },
* "cgroup": {
* "version": "v2", "memory_current": 25469927424, "memory_peak": 41178980352,
* "memory_rss": 67919872, "memory_peak_rss": 14783070208,
* "memory_limit": 77309411328, "cpu_usage_usec": 785302059
* }
* }
* </pre>
*
* @param file Path to the .fusion/trace.json file
* @return The parsed JSON as a map with keys: {@code proc}, {@code gpu}, {@code cgroup}
*/
static Map<String,Object> parseFusionTraceFile(Path file) {
final json = (Map) new JsonSlurper().parse(file)
return (Map<String,Object>) json.get('gpu')
return (Map<String,Object>) new JsonSlurper().parse(file)
}

static long toLong(Object value) {
if( value instanceof Number )
return value.longValue()
return value != null ? value.toString().toLong() : 0L
}

static float toFloat(Object value) {
if( value instanceof Number )
return value.floatValue()
return value != null ? value.toString().toFloat() : 0.0f
}

private long parseInt( String str, Path file, String row ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1447,6 +1447,42 @@ class BashWrapperBuilderTest extends Specification {
binding.kill_cmd == 'podman stop $NXF_BOXID'
}

def 'should skip trace wrapper when NXF_FUSION_TRACE is enabled and Fusion is active'() {
given:
SysEnv.push([NXF_FUSION_TRACE: 'true'])
def bean = new TaskBean()
bean.statsEnabled = true
bean.fusionEnabled = true
bean.workDir = Path.of('/work/xx/yy')
bean.script = 'echo hello'

def builder = new BashWrapperBuilder(bean)

expect:
!builder.isTraceRequired()

cleanup:
SysEnv.pop()
}

def 'should keep trace wrapper when NXF_FUSION_TRACE is disabled even with Fusion'() {
given:
SysEnv.push([NXF_FUSION_TRACE: 'false'])
def bean = new TaskBean()
bean.statsEnabled = true
bean.fusionEnabled = true
bean.workDir = Path.of('/work/xx/yy')
bean.script = 'echo hello'

def builder = new BashWrapperBuilder(bean)

expect:
builder.isTraceRequired()

cleanup:
SysEnv.pop()
}

@Unroll
def 'should check retryable errors' () {
expect:
Expand Down
Loading
Loading