Commit dfadf32

Record types [preview build 2]
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
1 parent 944d1ab commit dfadf32

11 files changed

Lines changed: 307 additions & 111 deletions

docs/process-typed.md

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -94,27 +94,28 @@ Inputs with type `Record` can declare the name and type of each record field:
 ```nextflow
 process fastqc {
     input:
-    (id: String, fastq: Path): Record
+    sample: Record {
+        id: String
+        fastq: Path
+    }
 
     script:
     """
-    echo 'id: ${id}`
-    echo 'fastq: ${fastq}'
+    echo 'id: ${sample.id}'
+    echo 'fastq: ${sample.fastq}'
     """
 }
 ```
 
-This pattern is called *record destructuring*. Each record field is staged into the task the same way as an individual input.
+In this example, the record is staged into the task as `sample`, and `sample.fastq` is staged as an input file since the `fastq` field is declared with type `Path`.
 
 When the process is invoked, the incoming record should contain the specified fields, or else the run will fail. If the record has additional fields not declared by the process input, they are ignored.
 
 :::{tip}
 Record inputs are a useful way to select a subset of fields from a larger record. This way, the process only stages what it needs, allowing you to keep related data together in your workflow logic.
 :::
 
-### Record type inputs
-
-Record inputs can also be declared using a custom record type:
+You can achieve the same behavior using an external record type:
 
 ```nextflow
 process fastqc {
@@ -134,9 +135,7 @@ record Sample {
 }
 ```
 
-In this example, the record is staged into the task as `sample`, and `sample.fastq` is staged as an input file since the `fastq` field is declared with type `Path`.
-
-When the process is invoked, the incoming record should contain the fields specified by the record type, or else the run will fail. If the record has additional fields not declared by the record type, they are ignored.
+This approach is useful when the record type can be re-used elsewhere in the pipeline.
 
 ### Tuple inputs
 
@@ -311,10 +310,13 @@ The `record()` standard library function can be used to create a record:
 ```nextflow
 process fastqc {
     input:
-    (id: String, fastq: Path): Record
+    sample: Record {
+        id: String
+        fastq: Path
+    }
 
     output:
-    record(id: id, fastqc: file('fastqc_logs'))
+    record(id: sample.id, fastqc: file('fastqc_logs'))
 
     script:
     // ...

docs/tutorials/records.md

Lines changed: 58 additions & 20 deletions
@@ -74,11 +74,15 @@ Is converted to:
 ```nextflow
 process ALIGN {
     input:
-    (meta: Map, reads: Path, index: Path): Record
+    sample: Record {
+        meta: Map
+        reads: Path
+        index: Path
+    }
 
     output:
     record(
-        meta: meta,
+        meta: sample.meta,
         bam: file('*.bam'),
         bai: file('*.bai')
     )
@@ -188,56 +192,67 @@ The `FASTQC` process is defined with the following inputs and outputs:
 
 ```nextflow
 process FASTQC {
-    // ...
+    tag id
+    conda 'bioconda::fastqc=0.12.1'
 
     input:
     (id, fastq_1, fastq_2): Tuple<String, Path, Path>
 
     output:
     tuple(id, file("fastqc_${id}_logs"))
 
-    // ...
+    script:
+    """
+    fastqc.sh "${id}" "${fastq_1} ${fastq_2}"
+    """
 }
 ```
 
 To migrate this process, rewrite the inputs and outputs as follows:
 
 ```nextflow
 process FASTQC {
-    // ...
+    tag sample.id
+    conda 'bioconda::fastqc=0.12.1'
 
     input:
-    (id: String, fastq_1: Path, fastq_2: Path): Record
+    sample: Record {
+        id: String
+        fastq_1: Path
+        fastq_2: Path
+    }
 
     output:
     record(
-        id: id,
-        fastqc: file("fastqc_${id}_logs")
+        id: sample.id,
+        fastqc: file("fastqc_${sample.id}_logs")
     )
 
-    // ...
+    script:
+    """
+    fastqc.sh "${sample.id}" "${sample.fastq_1} ${sample.fastq_2}"
+    """
 }
 ```
 
 In the above:
 
 - The tuple input is converted to a record input using the type `Record`. The field types are specified alongside the field names.
 
+- Since the record input cannot be destructured like a tuple, you must define a name for the record itself (e.g., `sample`), and you must update all references to tuple inputs (e.g., replace `id` with `sample.id`).
+
 - The tuple output is converted to a record by using the `record()` function and specifying a name for each record field.
 
 - Whereas tuple elements must be specified in a particular order, record fields can be specified in any order. The records supplied by the calling workflow must have the same field names and types as the process definition.
 
-:::{note}
-Other process sections, such as the directives and the `script:` block, are not shown because they do not require changes. As long as the inputs and outputs declare and reference the same variable names and file patterns, the other process sections will behave the same as before.
-:::
-
 <h4>QUANT</h4>
 
 The `QUANT` process is defined with the following inputs and outputs:
 
 ```nextflow
 process QUANT {
-    // ...
+    tag id
+    conda 'bioconda::salmon=1.10.3'
 
     input:
     (id, fastq_1, fastq_2): Tuple<String, Path, Path>
@@ -246,27 +261,50 @@ process QUANT {
     output:
     tuple(id, file("quant_${id}"))
 
-    // ...
+    script:
+    """
+    salmon quant \
+        --threads ${task.cpus} \
+        --libType=U \
+        -i ${index} \
+        -1 ${fastq_1} \
+        -2 ${fastq_2} \
+        -o quant_${id}
+    """
 }
 ```
 
 To migrate this process, rewrite the inputs and outputs as follows:
 
 ```nextflow
 process QUANT {
-    // ...
+    tag sample.id
+    conda 'bioconda::salmon=1.10.3'
 
     input:
-    (id: String, fastq_1: Path, fastq_2: Path): Record
+    sample: Record {
+        id: String
+        fastq_1: Path
+        fastq_2: Path
+    }
     index: Path
 
     output:
     record(
-        id: id,
-        quant: file("quant_${id}")
+        id: sample.id,
+        quant: file("quant_${sample.id}")
     )
 
-    // ...
+    script:
+    """
+    salmon quant \
+        --threads ${task.cpus} \
+        --libType=U \
+        -i ${index} \
+        -1 ${sample.fastq_1} \
+        -2 ${sample.fastq_2} \
+        -o quant_${sample.id}
+    """
 }
 ```
 

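The record-matching rules stated in the two documentation files above (every declared field must be present, undeclared fields are ignored, and field order does not matter) can be modelled outside Nextflow. The following Java sketch is only an illustration under stated assumptions: `RecordInputSketch` and `selectFields` are hypothetical names, the record is represented as a plain `Map`, and field types are not checked. It is not how Nextflow implements record inputs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of record-input matching; not Nextflow's implementation.
class RecordInputSketch {

    // Returns only the declared fields of the incoming record.
    // A missing declared field is an error; undeclared fields are dropped.
    static Map<String, Object> selectFields(List<String> declared, Map<String, Object> incoming) {
        Map<String, Object> selected = new LinkedHashMap<>();
        for (String name : declared) {
            if (!incoming.containsKey(name))
                throw new IllegalArgumentException("missing record field: " + name);
            selected.put(name, incoming.get(name));
        }
        return selected;
    }

    public static void main(String[] args) {
        // The incoming record carries an extra field ("batch") that the process does not declare.
        Map<String, Object> incoming = Map.of("id", "s1", "fastq", "s1.fq", "batch", 3);
        // prints {id=s1, fastq=s1.fq}
        System.out.println(selectFields(List.of("id", "fastq"), incoming));
    }
}
```

Because fields are looked up by name, the order in which the caller supplies them has no effect on the result, which is the contrast with tuple inputs drawn in the tutorial.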
modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy

Lines changed: 10 additions & 19 deletions
@@ -1768,7 +1768,7 @@ class TaskProcessor {
             final param = declaredInputs.getParams()[i]
             final value = values[i]
             if( param instanceof ProcessTupleInput )
-                assignTaskStructuredInput(task, param, value, i)
+                assignTaskTupleInput(task, param, value, i)
             else
                 assignTaskInput(task, param, value, i)
         }
@@ -1825,29 +1825,20 @@ class TaskProcessor {
     }
 
     @CompileStatic
-    private void assignTaskStructuredInput(TaskRun task, ProcessTupleInput param, Object value, int index) {
+    private void assignTaskTupleInput(TaskRun task, ProcessTupleInput param, Object value, int index) {
         if( value == null && !param.optional ) {
             throw new ProcessUnrecoverableException("[${safeTaskName(task)}] input at index ${index} cannot be null -- append `?` to the type annotation to mark it as nullable")
         }
-        if( value instanceof List ) {
-            final tupleParams = param.getComponents()
-            final tupleValues = value as List
-            if( tupleParams.size() != tupleValues.size() ) {
-                throw new ProcessUnrecoverableException("[${safeTaskName(task)}] input at index ${index} expected a tuple with ${tupleParams.size()} elements but received ${tupleValues.size()} -- offending value: $tupleValues")
-            }
-            for( int i = 0; i < tupleParams.size(); i++ ) {
-                assignTaskInput(task, tupleParams[i], tupleValues[i], index)
-            }
+        if( value !instanceof List ) {
+            throw new ProcessUnrecoverableException("[${safeTaskName(task)}] input at index ${index} expected a tuple but received: ${value} [${value.class.simpleName}]")
         }
-        else if( value instanceof Map ) {
-            final record = value as Map
-            for( final fieldParam : param.getComponents() ) {
-                final fieldName = fieldParam.getName()
-                assignTaskInput(task, fieldParam, record[fieldName], index)
-            }
+        final tupleParams = param.getComponents()
+        final tupleValues = value as List
+        if( tupleParams.size() != tupleValues.size() ) {
+            throw new ProcessUnrecoverableException("[${safeTaskName(task)}] input at index ${index} expected a tuple with ${tupleParams.size()} elements but received ${tupleValues.size()} -- offending value: $tupleValues")
         }
-        else {
-            throw new ProcessUnrecoverableException("[${safeTaskName(task)}] input at index ${index} expected a record or tuple but received: ${value} [${value.class.simpleName}]")
+        for( int i = 0; i < tupleParams.size(); i++ ) {
+            assignTaskInput(task, tupleParams[i], tupleValues[i], index)
         }
     }
 

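The control flow of the renamed `assignTaskTupleInput` method above (reject null, reject anything that is not a list, check the element count, then bind elements by position) can be sketched as a standalone Java class. This is a simplified illustration, not the Nextflow code: `TupleInputSketch` and `bindTuple` are hypothetical names, input names stand in for `ProcessTupleInput` components, `IllegalArgumentException` stands in for `ProcessUnrecoverableException`, and every null is treated as an error (the original only rejects null when the input is not optional).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical standalone model of the tuple checks; not the Nextflow implementation.
class TupleInputSketch {

    // Binds each tuple element to the input name at the same position.
    static Map<String, Object> bindTuple(List<String> names, Object value) {
        if (value == null)
            throw new IllegalArgumentException("tuple input cannot be null");
        if (!(value instanceof List))
            throw new IllegalArgumentException(
                "expected a tuple but received: " + value
                    + " [" + value.getClass().getSimpleName() + "]");
        List<?> values = (List<?>) value;
        if (names.size() != values.size())
            throw new IllegalArgumentException(
                "expected a tuple with " + names.size()
                    + " elements but received " + values.size());
        Map<String, Object> bound = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++)
            bound.put(names.get(i), values.get(i));
        return bound;
    }

    public static void main(String[] args) {
        // A list with the right number of elements is bound by position.
        System.out.println(bindTuple(List.of("id", "fastq"), List.of("s1", "s1.fq")));
        // A map (record-like value) is no longer accepted on the tuple path.
        try {
            bindTuple(List.of("id", "fastq"), Map.of("id", "s1", "fastq", "s1.fq"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The second call in `main` mirrors the behavioral change in this hunk: the `Map` branch was removed from the tuple path, so a record-like value now reaches the "expected a tuple" error.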
modules/nf-lang/src/main/antlr/ScriptParser.g4

Lines changed: 12 additions & 1 deletion
@@ -240,10 +240,21 @@ processInputs
 
 processInput
     :   identifier (COLON type)?
-    |   LPAREN nls nameTypePair (nls COMMA nls nameTypePair)* nls rparen (COLON type)?
+    |   processRecordInput
+    |   processTupleInput
     |   statement
     ;
 
+processRecordInput
+    :   identifier (COLON type)? nls LBRACE
+        nls recordBody?
+        nls RBRACE
+    ;
+
+processTupleInput
+    :   LPAREN identifier (COMMA identifier)* rparen (COLON type)?
+    ;
+
 processStage
     :   STAGE COLON nls statement (sep statement)*
     ;

modules/nf-lang/src/main/java/nextflow/script/ast/TupleParameter.java

Lines changed: 0 additions & 7 deletions
@@ -15,8 +15,6 @@
  */
 package nextflow.script.ast;
 
-import java.util.List;
-
 import org.codehaus.groovy.ast.ClassNode;
 import org.codehaus.groovy.ast.Parameter;
 
@@ -33,9 +31,4 @@ public TupleParameter(ClassNode type, Parameter[] components) {
         super(type, "");
         this.components = components;
     }
-
-    public boolean isRecord() {
-        return "Record".equals(getType().getUnresolvedName())
-            || "Record".equals(getType().getNameWithoutPackage());
-    }
 }

modules/nf-lang/src/main/java/nextflow/script/control/ProcessToGroovyVisitorV2.java

Lines changed: 6 additions & 0 deletions
@@ -24,6 +24,7 @@
 import nextflow.script.ast.AssignmentExpression;
 import nextflow.script.ast.ProcessNodeV2;
 import nextflow.script.ast.RecordNode;
+import nextflow.script.ast.ScriptNode;
 import nextflow.script.ast.TupleParameter;
 import org.codehaus.groovy.ast.ClassHelper;
 import org.codehaus.groovy.ast.ClassNode;
@@ -56,10 +57,13 @@ public class ProcessToGroovyVisitorV2 {
 
     private SourceUnit sourceUnit;
 
+    private ScriptNode moduleNode;
+
     private ScriptToGroovyHelper sgh;
 
     public ProcessToGroovyVisitorV2(SourceUnit sourceUnit) {
         this.sourceUnit = sourceUnit;
+        this.moduleNode = (ScriptNode) sourceUnit.getAST();
         this.sgh = new ScriptToGroovyHelper(sourceUnit);
     }
 
@@ -149,6 +153,8 @@ private void visitProcessInputType(Variable param, Expression target, BlockState
             stagers.addStatement(stager);
         }
         else if( isRecordType(cn) ) {
+            if( cn.getNameWithoutPackage().startsWith("__Record") )
+                moduleNode.addClass(cn);
             for( var fn : cn.getFields() )
                 visitProcessInputType(fn, propX(target, fn.getName()), stagers);
         }

modules/nf-lang/src/main/java/nextflow/script/control/ScriptResolveVisitor.java

Lines changed: 4 additions & 5 deletions
@@ -149,11 +149,10 @@ private void resolveTypedOutputs(Statement block) {
     @Override
     public void visitProcessV2(ProcessNodeV2 node) {
         for( var input : node.inputs ) {
-            resolver.resolveOrFail(input.getType(), input);
-            if( input instanceof TupleParameter tp && tp.isRecord() ) {
-                for( var component : tp.components )
-                    resolver.resolveOrFail(component.getType(), component);
-            }
+            var type = input.getType();
+            resolver.resolveOrFail(type, input);
+            if( type.getNameWithoutPackage().startsWith("__Record") )
+                visitRecord((RecordNode) type.redirect());
         }
         resolver.visit(node.directives);
         resolver.visit(node.stagers);
