Skip to content

Commit edb46ce

Browse files
committed
Prevent TaskPath leakage from typed record outputs
1 parent d30e48d commit edb46ce

7 files changed

Lines changed: 211 additions & 2 deletions

File tree

modules/nextflow/src/main/groovy/nextflow/processor/TaskOutputResolver.groovy

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package nextflow.processor
1818

19+
import java.io.UncheckedIOException
20+
import java.lang.reflect.Modifier
1921
import java.nio.file.Path
2022

2123
import groovy.transform.CompileDynamic
@@ -25,7 +27,10 @@ import groovy.util.logging.Slf4j
2527
import nextflow.exception.IllegalArityException
2628
import nextflow.exception.MissingFileException
2729
import nextflow.exception.MissingValueException
30+
import nextflow.extension.Bolts
2831
import nextflow.script.params.v2.ProcessFileOutput
32+
import nextflow.script.types.Record
33+
import nextflow.util.RecordMap
2934
import org.codehaus.groovy.runtime.InvokerHelper
3035
/**
3136
* Implements the resolution of task outputs.
@@ -49,6 +54,75 @@ class TaskOutputResolver implements Map<String,Object> {
4954
this.delegate = task.context
5055
}
5156

57+
/**
58+
* Resolve and normalize an output expression before it is emitted.
59+
*
60+
* Values from the task context may contain {@link TaskPath}, which is a
61+
* task-local view of an input file. It is valid for script interpolation,
62+
* but it must not escape through output channels because downstream tasks
63+
* need durable source/work-directory paths for hashing and staging.
64+
*
65+
* @param value
66+
* A lazy output expression, such as a closure or GString
67+
* @return
68+
* The resolved output value with nested TaskPath instances converted
69+
* back to durable Path values
70+
*/
71+
Object resolveOutput(Object value) {
72+
return normalizeOutputValue(Bolts.resolveLazy(this, value))
73+
}
74+
75+
static Object normalizeOutputValue(Object value) {
76+
if( value instanceof TaskPath ) {
77+
try {
78+
return value.toRealPath()
79+
}
80+
catch( IOException e ) {
81+
throw new UncheckedIOException(e)
82+
}
83+
}
84+
85+
if( value instanceof RecordMap ) {
86+
final normalized = new LinkedHashMap<String,Object>()
87+
for( final entry : value.entrySet() )
88+
normalized.put(entry.key, normalizeOutputValue(entry.value))
89+
return new RecordMap(normalized)
90+
}
91+
92+
if( value instanceof Map ) {
93+
final normalized = new LinkedHashMap<Object,Object>()
94+
for( final entry : value.entrySet() )
95+
normalized.put(entry.key, normalizeOutputValue(entry.value))
96+
return normalized
97+
}
98+
99+
if( value instanceof Set ) {
100+
final normalized = new LinkedHashSet<Object>()
101+
for( final item : value )
102+
normalized.add(normalizeOutputValue(item))
103+
return normalized
104+
}
105+
106+
if( value instanceof Collection ) {
107+
final normalized = new ArrayList<Object>()
108+
for( final item : value )
109+
normalized.add(normalizeOutputValue(item))
110+
return normalized
111+
}
112+
113+
if( value instanceof Record ) {
114+
final fields = value.getClass().getFields()
115+
.findAll { field -> !Modifier.isStatic(field.modifiers) && !field.synthetic }
116+
.sort { it.name }
117+
final normalized = new LinkedHashMap<String,Object>()
118+
for( final field : fields )
119+
normalized.put(field.name, normalizeOutputValue(field.get(value)))
120+
return new RecordMap(normalized)
121+
}
122+
123+
return value
124+
}
125+
52126
/**
53127
* Get an environment variable from the task environment.
54128
*

modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1421,12 +1421,12 @@ class TaskProcessor {
14211421
final resolver = new TaskOutputResolver(declaredOutputs.getFiles(), task)
14221422

14231423
for( final param : declaredOutputs.getParams() ) {
1424-
final value = resolver.resolveLazy(param.getLazyValue())
1424+
final value = resolver.resolveOutput(param.getLazyValue())
14251425
task.setOutput(param, value)
14261426
}
14271427

14281428
for( final topic : declaredOutputs.getTopics() ) {
1429-
final value = resolver.resolveLazy(topic.getLazyValue())
1429+
final value = resolver.resolveOutput(topic.getLazyValue())
14301430
topic.getChannel().bind(value)
14311431
}
14321432

modules/nextflow/src/test/groovy/nextflow/processor/TaskOutputResolverTest.groovy

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import nextflow.exception.IllegalArityException
2222
import nextflow.exception.MissingFileException
2323
import nextflow.exception.MissingValueException
2424
import nextflow.script.params.v2.ProcessFileOutput
25+
import nextflow.util.RecordMap
2526
import spock.lang.Specification
2627
import spock.lang.TempDir
2728

@@ -30,6 +31,12 @@ import spock.lang.TempDir
3031
*/
3132
class TaskOutputResolverTest extends Specification {
3233

34+
static class SampleRecord implements nextflow.script.types.Record {
35+
public String id
36+
public Path path
37+
public List<Path> paths
38+
}
39+
3340
@TempDir
3441
Path tempDir
3542

@@ -290,4 +297,47 @@ class TaskOutputResolverTest extends Specification {
290297
def e = thrown(MissingValueException)
291298
e.message.contains('Missing variable in process output')
292299
}
300+
301+
def 'should normalize task paths in output values'() {
302+
given:
303+
def source = tempDir.resolve('input.txt')
304+
def taskPath = new TaskPath(source, 'input.txt')
305+
def record = new SampleRecord(id: 'alpha', path: taskPath, paths: [taskPath])
306+
def value = new RecordMap([
307+
id: 'alpha',
308+
path: taskPath,
309+
nested: [path: taskPath],
310+
paths: [taskPath] as Set,
311+
record: record,
312+
])
313+
314+
when:
315+
def result = TaskOutputResolver.normalizeOutputValue(value)
316+
317+
then:
318+
result instanceof RecordMap
319+
result.path == source
320+
!(result.path instanceof TaskPath)
321+
result.nested.path == source
322+
!(result.nested.path instanceof TaskPath)
323+
result.paths == [source] as Set
324+
result.record instanceof RecordMap
325+
result.record.path == source
326+
result.record.paths == [source]
327+
}
328+
329+
def 'should normalize resolved lazy output values'() {
330+
given:
331+
def source = tempDir.resolve('input.txt')
332+
def task = makeTask([sample: new RecordMap(path: new TaskPath(source, 'input.txt'))])
333+
def resolver = new TaskOutputResolver([:], task)
334+
335+
when:
336+
def result = resolver.resolveOutput({ -> sample })
337+
338+
then:
339+
result instanceof RecordMap
340+
result.path == source
341+
!(result.path instanceof TaskPath)
342+
}
293343
}

tests/checks/.IGNORE-PARSER-V2

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ params-dsl.nf
99
record-types.nf
1010
records.nf
1111
task-ext-block.nf
12+
typed-record-output-taskpath.nf
1213
topic-channel-typed.nf
1314
type-annotations.nf
1415
workflow-oncomplete-v2.nf
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
set -e
2+
3+
export NXF_SYNTAX_PARSER=v2
4+
5+
$NXF_RUN | tee stdout
6+
[[ $(grep INFO .nextflow.log | grep -c 'Submitted process > P1') == 1 ]]
7+
[[ $(grep INFO .nextflow.log | grep -c 'Submitted process > P2') == 1 ]]
8+
grep 'test1' stdout
9+
grep 'test2' stdout
10+
11+
TASK_DIR=$($NXF_CMD log last -F "process == 'P2'")
12+
grep "cat 'input.txt' 'test2.txt' > combined.txt" "$TASK_DIR/.command.sh"
13+
test -e "$TASK_DIR/input.txt"
14+
test -e "$TASK_DIR/test2.txt"
15+
16+
$NXF_RUN -resume | tee stdout
17+
[[ $(grep INFO .nextflow.log | grep -c 'Cached process > P1') == 1 ]]
18+
[[ $(grep INFO .nextflow.log | grep -c 'Cached process > P2') == 1 ]]
19+
grep 'test1' stdout
20+
grep 'test2' stdout
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#!/usr/bin/env nextflow
2+
/*
3+
* Copyright 2013-2026, Seqera Labs
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
nextflow.enable.types = true
19+
20+
workflow {
21+
sample = record(
22+
id: 'alpha',
23+
file: file("${projectDir}/typed-record-output-taskpath/input.txt")
24+
)
25+
26+
result = P2(P1(channel.of(sample)))
27+
result.view { it -> it.text.trim() }
28+
}
29+
30+
process P1 {
31+
input:
32+
sample: SampleRecord
33+
34+
output:
35+
sample + record(file2: file('test2.txt'))
36+
37+
script:
38+
"""
39+
echo 'test2' > test2.txt
40+
"""
41+
}
42+
43+
process P2 {
44+
input:
45+
sample: SampleRecord
46+
47+
output:
48+
file('combined.txt')
49+
50+
script:
51+
"""
52+
test '${sample.file}' = 'input.txt'
53+
test '${sample.file2}' = 'test2.txt'
54+
55+
cat '${sample.file}' '${sample.file2}' > combined.txt
56+
"""
57+
}
58+
59+
record SampleRecord {
60+
id: String
61+
file: Path
62+
file2: Path?
63+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
test1

0 commit comments

Comments
 (0)