Skip to content

Commit 6b3d007

Browse files
committed
Merge branch 'master' into feat/seqera-auto-labels-filter [ci fast]
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
2 parents 171e140 + 979f684 commit 6b3d007

28 files changed

Lines changed: 2634 additions & 1991 deletions

docs/reference/cli.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -815,7 +815,7 @@ The `launch` command launches a pipeline run in Seqera Platform. To log in and c
815815
: A mnemonic name to assign to the run.
816816

817817
`-main-script`
818-
: The script file to be executed when launching a project directory or repository.
818+
: The script file to execute when launching a project directory or repository. Should be a path relative to the project root, e.g. `-main-script subproject/main.nf`.
819819

820820
`-params-file`
821821
: A JSON or YAML file to load parameters from.
@@ -918,6 +918,11 @@ The `lint` command parses and analyzes the given Nextflow scripts and config fil
918918
: File pattern to exclude from linting (default: `.git, .lineage, .nextflow, .nf-test, nf-test.config, work`).
919919
: Can be specified multiple times.
920920

921+
`-files-from`
922+
: :::{versionadded} 26.04.0
923+
:::
924+
: Read list of paths to lint from a text file. Use `-` to read from standard input.
925+
921926
`-format`
922927
: Format scripts and config files that have no errors.
923928

@@ -1598,7 +1603,8 @@ The `run` command is used to execute a local pipeline script or remote pipeline
15981603
: Library extension path.
15991604

16001605
`-main-script` (`main.nf`)
1601-
: The script file to be executed when launching a project directory or repository.
1606+
: The script file to execute when launching a project directory or repository. Should be a path relative to the project root, e.g. `-main-script subproject/main.nf`.
1607+
: The project `nextflow.config` is always read from the project root regardless of the main script location.
16021608

16031609
`-name`
16041610
: Assign a mnemonic name to the a pipeline run.

docs/reference/config.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1418,9 +1418,6 @@ The following settings are available:
14181418
`seqera.executor.autoLabels`
14191419
: When `true`, automatically adds workflow metadata labels to the session with the `nextflow.io/` prefix (default: `false`). The following labels are added: `projectName`, `userName`, `runName`, `sessionId`, `resume`, `revision`, `commitId`, `repository`, `manifestName`, `runtimeVersion`. A `seqera.io/runId` label is also added, computed as a SipHash of the session ID and run name.
14201420

1421-
`seqera.executor.labels`
1422-
: Custom labels to apply to AWS resources for cost tracking and resource organization. Labels are propagated to ECS tasks, capacity providers, and EC2 instances. When used together with `autoLabels`, user-defined labels take precedence over auto-generated labels.
1423-
14241421
`seqera.executor.machineRequirement.arch`
14251422
: The CPU architecture for task execution, e.g. `'x86_64'` or `'arm64'`.
14261423

docs/reference/env-vars.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ The following environment variables control the configuration of the Nextflow ru
123123
: The file storage path against which relative file paths are resolved.
124124
: For example, with `NXF_FILE_ROOT=/some/root/path`, the use of `file('hello')` will be resolved to the absolute path `/some/root/path/hello`. A remote root path can be specified using the usual protocol prefix, e.g. `NXF_FILE_ROOT=s3://my-bucket/data`. Files defined using an absolute path are not affected by this setting.
125125

126+
`NXF_FUSION_TRACE`
127+
: :::{versionadded} 26.04.0
128+
:::
129+
: When set to `true`, collect task resource metrics (CPU, memory, I/O) from the Fusion trace file (`.fusion/trace.json`) produced in the task work directory, replacing the metrics collected by the default bash command-trace wrapper. Requires Fusion to be enabled. GPU metrics from Fusion are always collected regardless of this setting.
130+
126131
`NXF_HOME`
127132
: Nextflow home directory (default: `$HOME/.nextflow`).
128133

modules/nextflow/src/main/groovy/nextflow/cli/CmdLint.groovy

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ class CmdLint extends CmdBase {
6565
)
6666
List<String> excludePatterns = ['.git', '.lineage', '.nextflow', '.nf-test', 'nf-test.config', 'work']
6767

68+
@Parameter(
69+
names = ['-files-from'],
70+
description = 'Read list of paths to lint from a text file (one per line, use - for stdin)'
71+
)
72+
String filesFrom
73+
6874
@Parameter(
6975
names = ['-o', '-output'],
7076
description = 'Output mode for reporting errors: full, extended, concise, json, markdown',
@@ -119,7 +125,10 @@ class CmdLint extends CmdBase {
119125

120126
@Override
121127
void run() {
122-
if( !args )
128+
// read input files from positional args and -files-from option
129+
final inputs = getInputs(args, filesFrom)
130+
131+
if( !inputs )
123132
throw new AbortOperationException("Error: No input files were specified")
124133

125134
if( spaces && tabs )
@@ -134,21 +143,21 @@ class CmdLint extends CmdBase {
134143

135144
scriptParser = new ScriptParser(baseDir, classLoader)
136145
configParser = new ConfigParser()
137-
errorListener = outputMode == 'json'
138-
? new JsonErrorListener()
139-
: outputMode == 'markdown'
140-
? new MarkdownErrorListener()
141-
: new StandardErrorListener(outputMode, launcher.options.ansiLog)
146+
errorListener = switch( outputMode ) {
147+
case 'json' -> new JsonErrorListener()
148+
case 'markdown' -> new MarkdownErrorListener()
149+
default -> new StandardErrorListener(outputMode, launcher.options.ansiLog, launcher.options.quiet)
150+
}
142151
formattingOptions = new FormattingOptions(spaces, !tabs, harhsilAlignment, false, sortDeclarations)
143152

144153
errorListener.beforeAll()
145154

146155
// collect files to lint
147156
final List<File> files = []
148157

149-
for( final arg : args ) {
158+
for( final input : inputs ) {
150159
PathUtils.visitFiles(
151-
Path.of(arg),
160+
Path.of(input),
152161
(path) -> !PathUtils.isExcluded(path, excludePatterns),
153162
(path) -> files.add(path.toFile()))
154163
}
@@ -178,6 +187,24 @@ class CmdLint extends CmdBase {
178187
throw new AbortOperationException()
179188
}
180189

190+
private static List<String> getInputs(List<String> args, String filesFrom) {
191+
final List<String> result = []
192+
result.addAll(args)
193+
194+
if( filesFrom ) {
195+
final lines = filesFrom == '-'
196+
? System.in.readLines()
197+
: Path.of(filesFrom).readLines()
198+
for( final line : lines ) {
199+
final trimmed = line.trim()
200+
if( trimmed )
201+
result.add(trimmed)
202+
}
203+
}
204+
205+
return result
206+
}
207+
181208
private void parse(File file) {
182209
final name = file.getName()
183210
if( name.endsWith('.nf') )

modules/nextflow/src/main/groovy/nextflow/cli/CmdRun.groovy

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -616,9 +616,7 @@ class CmdRun extends CmdBase implements HubOptions {
616616
/*
617617
* try to look for a pipeline in the repository
618618
*/
619-
try( final manager = new AssetManager(pipelineName, this) ) {
620-
if( revision )
621-
manager.setRevision(revision)
619+
try( final manager = new AssetManager(pipelineName, revision, mainScript, this) ) {
622620
final repo = manager.getProjectWithRevision()
623621
final remoteSource = !manager.isLocalScmSource()
624622

modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import java.nio.file.Path
2626
import groovy.transform.CompileStatic
2727
import groovy.transform.PackageScope
2828
import groovy.util.logging.Slf4j
29+
import nextflow.Global
2930
import nextflow.SysEnv
3031
import nextflow.container.ContainerBuilder
3132
import nextflow.container.ContainerHelper
@@ -582,7 +583,9 @@ class BashWrapperBuilder {
582583
}
583584

584585
protected boolean isTraceRequired() {
585-
statsEnabled || fixOwnership()
586+
if( fusionEnabled && Global.isFusionTraceEnabled() )
587+
return fixOwnership()
588+
return statsEnabled || fixOwnership()
586589
}
587590

588591
protected String shellPath() {

modules/nextflow/src/main/groovy/nextflow/processor/TaskBean.groovy

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ class TaskBean implements Serializable, Cloneable {
7373

7474
boolean statsEnabled
7575

76+
boolean fusionEnabled
77+
7678
List<String> outputEnvNames
7779

7880
Map<String,String> outputEvals
@@ -164,6 +166,7 @@ class TaskBean implements Serializable, Cloneable {
164166
this.outputEnvNames = task.getOutputEnvNames()
165167
this.outputEvals = task.getOutputEvals()
166168
this.statsEnabled = task.getProcessor().getSession().statsEnabled
169+
this.fusionEnabled = task.getProcessor().isFusionEnabled()
167170

168171
this.inputFiles = task.getInputFilesMap()
169172
this.outputFiles = task.getOutputFilesNames()

modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy

Lines changed: 84 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
package nextflow.processor
1818

1919
import static nextflow.processor.TaskStatus.*
20+
import static nextflow.trace.TraceRecord.toLong
21+
import static nextflow.trace.TraceRecord.toFloat
2022

2123
import java.nio.file.NoSuchFileException
2224
import java.util.concurrent.atomic.AtomicBoolean
2325

2426
import groovy.transform.CompileDynamic
2527
import groovy.transform.CompileStatic
2628
import groovy.util.logging.Slf4j
29+
import nextflow.Global
2730
import nextflow.trace.TraceRecord
2831
import nextflow.util.TestOnly
2932
/**
@@ -241,15 +244,21 @@ abstract class TaskHandler {
241244
}
242245
}
243246

244-
final file = task.workDir?.resolve(TaskRun.CMD_TRACE)
245-
try {
246-
if(file) record.parseTraceFile(file)
247-
}
248-
catch( NoSuchFileException e ) {
249-
// ignore it
250-
}
251-
catch( IOException e ) {
252-
log.debug "[WARN] Cannot read trace file: $file -- Cause: ${e.message}"
247+
// When Fusion trace is enabled, task metrics are collected by the Fusion client
248+
// and reported in .fusion/trace.json — skip parsing the .command.trace file
249+
// generated by the bash trace wrapper, as it won't exist
250+
final fusionTraceEnabled = task.processor.executor.isFusionEnabled() && Global.isFusionTraceEnabled()
251+
if( !fusionTraceEnabled ) {
252+
final file = task.workDir?.resolve(TaskRun.CMD_TRACE)
253+
try {
254+
if(file) record.parseTraceFile(file)
255+
}
256+
catch( NoSuchFileException e ) {
257+
log.trace "Unable to find trace file: $file"
258+
}
259+
catch( IOException e ) {
260+
log.debug "[WARN] Cannot read trace file: $file -- Cause: ${e.message}"
261+
}
253262
}
254263

255264
parseFusionTrace(record)
@@ -261,22 +270,82 @@ abstract class TaskHandler {
261270
protected void parseFusionTrace(TraceRecord record) {
262271
if( !task.processor.executor.isFusionEnabled() )
263272
return
273+
// Resolve the Fusion trace file path (.fusion/trace.json) in the task work directory.
274+
// This file is produced by the Fusion client and contains process, cgroup, and GPU metrics.
264275
final fusionTrace = task.workDir?.resolve(TaskRun.FUSION_TRACE)
265276
try {
266-
if( fusionTrace ) {
267-
final gpu = TraceRecord.parseFusionTraceFile(fusionTrace)
268-
if( gpu )
269-
record.gpuMetrics = gpu
270-
}
277+
if( !fusionTrace )
278+
return
279+
// Parse the full Fusion trace JSON — sections: 'proc', 'cgroup', 'gpu'
280+
final json = TraceRecord.parseFusionTraceFile(fusionTrace)
281+
// GPU metrics are always extracted when available, regardless of the
282+
// NXF_FUSION_TRACE setting — this preserves backward compatibility
283+
final gpu = (Map<String,Object>) json.get('gpu')
284+
if( gpu )
285+
record.gpuMetrics = gpu
286+
// When NXF_FUSION_TRACE is disabled, stop here — only GPU metrics are collected.
287+
// When enabled, also map the 'proc' and 'cgroup' sections into TraceRecord fields,
288+
// replacing the metrics that would normally come from the bash trace wrapper
289+
if( !Global.isFusionTraceEnabled() )
290+
return
291+
applyFusionMetrics(record, json)
271292
}
272293
catch( NoSuchFileException e ) {
273-
// ignore it
294+
log.trace "Unable to find Fusion trace file: $fusionTrace"
274295
}
275296
catch( Exception e ) {
276297
log.debug "[WARN] Cannot read Fusion trace file: $fusionTrace -- Cause: ${e.message}"
277298
}
278299
}
279300

301+
protected void applyFusionMetrics(TraceRecord record, Map<String,Object> json) {
302+
final proc = (Map<String,Object>) json.get('proc')
303+
final cgroup = (Map<String,Object>) json.get('cgroup')
304+
305+
if( proc ) {
306+
// CPU and timing metrics
307+
record.store.put('realtime', toLong(proc.get('realtime')))
308+
record.store.put('%cpu', toFloat(proc.get('pct_cpu')) / 10.0f as float)
309+
record.store.put('cpu_model', proc.get('cpu_name')?.toString())
310+
// I/O metrics
311+
record.store.put('rchar', toLong(proc.get('rchar')))
312+
record.store.put('wchar', toLong(proc.get('wchar')))
313+
record.store.put('syscr', toLong(proc.get('syscr')))
314+
record.store.put('syscw', toLong(proc.get('syscw')))
315+
record.store.put('read_bytes', toLong(proc.get('read_bytes')))
316+
record.store.put('write_bytes', toLong(proc.get('write_bytes')))
317+
// context switches
318+
record.store.put('vol_ctxt', toLong(proc.get('vol_ctxt')))
319+
record.store.put('inv_ctxt', toLong(proc.get('inv_ctxt')))
320+
}
321+
322+
// Prefer cgroup memory metrics (more accurate for containerized tasks)
323+
if( cgroup ) {
324+
record.store.put('vmem', toLong(cgroup.get('memory_current')))
325+
record.store.put('rss', toLong(cgroup.get('memory_rss')))
326+
record.store.put('peak_vmem', toLong(cgroup.get('memory_peak')))
327+
record.store.put('peak_rss', toLong(cgroup.get('memory_peak_rss')))
328+
// Compute %mem as peak RSS against the cgroup memory limit — i.e. how close the
329+
// task got to its allocated memory. This overrides proc.pct_mem, which is RSS against
330+
// total host memory (ps %MEM semantics) and underestimates utilization for containerized
331+
// tasks whose limit is smaller than the host. Using peak (not current) RSS because Fusion
332+
// overwrites memory_rss post-exit, making the last sample unrepresentative.
333+
final memLimit = toLong(cgroup.get('memory_limit'))
334+
final memPeakRss = toLong(cgroup.get('memory_peak_rss'))
335+
if( memLimit > 0 ) {
336+
record.store.put('%mem', (memPeakRss / (float)memLimit * 100.0f) as float)
337+
}
338+
}
339+
else if( proc ) {
340+
// Fallback to proc memory metrics when cgroup is not available
341+
record.store.put('vmem', toLong(proc.get('vmem')))
342+
record.store.put('rss', toLong(proc.get('rss')))
343+
record.store.put('peak_vmem', toLong(proc.get('peak_vmem')))
344+
record.store.put('peak_rss', toLong(proc.get('peak_rss')))
345+
record.store.put('%mem', toFloat(proc.get('pct_mem')) / 10.0f as float)
346+
}
347+
}
348+
280349
/**
281350
* Determine if a process can be forked i.e. can launch
282351
* a parallel task execution. This is only enforced when

modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,8 @@ class TaskProcessor {
331331
*/
332332
Executor getExecutor() { executor }
333333

334+
boolean isFusionEnabled() { executor?.isFusionEnabled() ?: false }
335+
334336
/**
335337
* @return The {@code DataflowOperator} underlying this process
336338
*/

modules/nextflow/src/main/groovy/nextflow/scm/AssetManager.groovy

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ class AssetManager implements Closeable {
114114
build(pipelineName, config)
115115
}
116116

117-
AssetManager( String pipelineName, String revision, HubOptions cliOpts = null ) {
117+
AssetManager( String pipelineName, String revision, String mainScript = null, HubOptions cliOpts = null ) {
118118
assert pipelineName
119-
// build the object
119+
// read the default config file (if available)
120120
def config = ProviderConfig.getDefault()
121121
// build the object
122-
build(pipelineName, config, cliOpts, revision)
122+
build(pipelineName, config, cliOpts, revision, mainScript)
123123
}
124124

125125
/**
@@ -131,23 +131,24 @@ class AssetManager implements Closeable {
131131
* @return The {@link AssetManager} object itself
132132
*/
133133
@PackageScope
134-
AssetManager build( String pipelineName, Map config = null, HubOptions cliOpts = null, String revision = null ) {
134+
AssetManager build( String pipelineName, Map config = null, HubOptions cliOpts = null, String revision = null, String mainScript = null ) {
135135

136136
this.providerConfigs = ProviderConfig.createFromMap(config)
137137

138138
this.project = resolveName(pipelineName)
139+
if( mainScript )
140+
this.mainScript = mainScript
141+
142+
if( !isValidProjectName(project) )
143+
throw new IllegalArgumentException("Not a valid project name: ${project}")
139144

140-
if( !isValidProjectName(this.project) ) {
141-
throw new IllegalArgumentException("Not a valid project name: ${this.project}")
142-
}
143145
// Initialize strategy based on environment and repository state
144146
initStrategy(revision)
145147
this.hub = checkHubProvider(cliOpts)
146148
this.provider = createHubProvider(hub)
147149

148-
if( revision ){
150+
if( revision )
149151
setRevision(revision)
150-
}
151152

152153
strategy.setProvider(this.provider)
153154

@@ -414,10 +415,13 @@ class AssetManager implements Closeable {
414415

415416
def parts = name.split('/') as List<String>
416417
def last = parts[-1]
417-
if( last.endsWith('.nf') || last.endsWith('.nxf') ) {
418+
if( last.endsWith('.nf') ) {
418419
if( parts.size()==1 )
419420
throw new AbortOperationException("Not a valid project name: $name")
420421

422+
if( mainScript )
423+
throw new AbortOperationException("Not a valid project name: $name -- Project name must be a directory when main script is provided")
424+
421425
if( parts.size()==2 ) {
422426
mainScript = last
423427
parts = [ parts.first() ]
@@ -462,6 +466,9 @@ class AssetManager implements Closeable {
462466
if( !isUrl )
463467
return null
464468

469+
if( repository.endsWith('.nf') )
470+
throw new AbortOperationException("Repository URL must not end with a script file extension (.nf) -- use `-main-script` to specify the relative script path")
471+
465472
try {
466473
def url = new GitUrl(repository)
467474

0 commit comments

Comments
 (0)