Skip to content

Commit eebf5cb

Browse files
committed
Support mixing of legacy vs typed workflows
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
1 parent 11150a3 commit eebf5cb

8 files changed

Lines changed: 278 additions & 47 deletions

File tree

modules/nextflow/src/main/groovy/nextflow/dataflow/ValueImpl.groovy

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import groovyx.gpars.dataflow.DataflowVariable
2424
import nextflow.Global
2525
import nextflow.Session
2626
import nextflow.dag.NodeMarker
27+
import nextflow.dataflow.ops.CrossOpV2
28+
import nextflow.exception.ScriptRuntimeException
2729
import nextflow.extension.CH
2830
import nextflow.extension.DataflowHelper
2931

@@ -50,6 +52,21 @@ class ValueImpl {
5052
return source
5153
}
5254

55+
ValueImpl cross(Object other) {
56+
DataflowVariable left = this.getSource()
57+
DataflowVariable right
58+
if( other instanceof ValueImpl ) {
59+
right = other.getSource()
60+
}
61+
else {
62+
throw new ScriptRuntimeException("Operator `cross` expected a dataflow value, but received: ${other} [${other.class.simpleName}]")
63+
}
64+
65+
final target = (DataflowVariable) new CrossOpV2(left, right).apply()
66+
NodeMarker.addOperatorNode("cross", [left, right], [target])
67+
return new ValueImpl(target)
68+
}
69+
5370
ChannelImpl flatMap(Function<?,Iterable> transform = null) {
5471
final target = CH.create()
5572
final onNext = { value ->

modules/nextflow/src/main/groovy/nextflow/script/BaseScript.groovy

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ abstract class BaseScript extends Script implements ExecutionContext {
4242

4343
private ScriptMeta meta
4444

45+
private boolean typingEnabled
46+
4547
private WorkflowDef entryFlow
4648

4749
private OutputDef publisher
@@ -64,6 +66,10 @@ abstract class BaseScript extends Script implements ExecutionContext {
6466
session
6567
}
6668

69+
boolean isTypingEnabled() {
70+
return typingEnabled
71+
}
72+
6773
/**
6874
* Holds the configuration object which will be used to execute the user tasks
6975
*/
@@ -97,6 +103,13 @@ abstract class BaseScript extends Script implements ExecutionContext {
97103
binding.setVariable( 'secrets', SecretsLoader.secretContext() )
98104
}
99105

106+
/**
107+
* Enable static typing for the script.
108+
*/
109+
protected void enableTyping() {
110+
this.typingEnabled = true
111+
}
112+
100113
/**
101114
* Define a params block.
102115
*
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2013-2026, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package nextflow.script
18+
19+
import groovy.transform.CompileStatic
20+
import groovy.util.logging.Slf4j
21+
import groovyx.gpars.dataflow.DataflowVariable
22+
import groovyx.gpars.dataflow.DataflowWriteChannel
23+
import nextflow.dataflow.ChannelImpl
24+
import nextflow.dataflow.ValueImpl
25+
import nextflow.util.RecordMap
26+
/**
27+
* Utility functions for converting between v1 and v2 dataflow types.
28+
*
29+
* When a script enables the `nextflow.preview.types` flag, its workflows
30+
* are "typed workflows", otherwise they are "legacy workflows":
31+
*
32+
* - Typed workflows use v2 dataflow types: ChannelImpl, ValueImpl
33+
* - Legacy workflows use v1 dataflow types: DataflowBroadcast, DataflowVariable
34+
*
35+
* While a given script must be either typed or legacy, typed workflows
36+
* can be composed with legacy workflows and vice versa. In order to support
37+
* this, the calling workflow must be able to convert between v1 and v2 dataflow
38+
* types based on whether the callee is typed or legacy.
39+
*
40+
* @author Ben Sherman <bentshermann@gmail.com>
41+
*/
42+
@Slf4j
43+
@CompileStatic
44+
class DataflowTypeHelper {
45+
46+
/**
47+
* Normalize a source value to the appropriate dataflow type
48+
* based on the target context.
49+
*
50+
* If a v2 dataflow type (ChannelImpl or ValueImpl) is being
51+
* passed to a legacy process or workflow, normalize the source
52+
* value by unwrapping it into a DataflowWriteChannel.
53+
*
54+
* If a v1 dataflow type (DataflowWriteChannel) is being passed
55+
* to a typed process or workflow, normalize the source value by
56+
* wrapping it as a ChannelImpl or ValueImpl.
57+
*
58+
* @param source
59+
* @param typingEnabled
60+
*/
61+
static Object normalize(Object source, boolean typingEnabled) {
62+
return typingEnabled
63+
? normalizeV2(source)
64+
: normalizeV1(source)
65+
}
66+
67+
/**
68+
* Normalize a source value by converting v1 dataflow types
69+
* (DataflowWriteChannel, ChannelOut) to v2 dataflow types
70+
* (ChannelImpl, ValueImpl).
71+
*
72+
* @param source
73+
*/
74+
static Object normalizeV2(Object source) {
75+
if( source instanceof ChannelOut )
76+
return normalizeMultiChannelV2(source)
77+
if( source instanceof DataflowVariable )
78+
return new ValueImpl(source)
79+
if( source instanceof DataflowWriteChannel )
80+
return new ChannelImpl(source)
81+
return source
82+
}
83+
84+
private static Object normalizeMultiChannelV2(ChannelOut source) {
85+
final names = source.getNames()
86+
if( source.size() == 1 && (names.isEmpty() || names.first() == '$out') )
87+
return normalizeV2(source[0])
88+
final result = new HashMap<String,Object>()
89+
for( int i = 0; i < source.size(); i++ )
90+
result.put(names[i], normalizeV2(source[i]))
91+
return new RecordMap(result)
92+
}
93+
94+
/**
95+
* Normalize a source value by converting v2 dataflow types
96+
* (ChannelImpl, ValueImpl) to v1 dataflow types (DataflowWriteChannel,
97+
* ChannelOut).
98+
*
99+
* @param source
100+
*/
101+
static Object normalizeV1(Object source) {
102+
if( source instanceof ChannelImpl )
103+
return source.getSource()
104+
if( source instanceof ValueImpl )
105+
return source.getSource()
106+
return source
107+
}
108+
109+
}

modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,10 @@ import groovy.transform.CompileStatic
2020
import groovy.util.logging.Slf4j
2121
import groovyx.gpars.dataflow.DataflowBroadcast
2222
import groovyx.gpars.dataflow.DataflowReadChannel
23-
import groovyx.gpars.dataflow.DataflowVariable
23+
import groovyx.gpars.dataflow.DataflowWriteChannel
2424
import nextflow.Const
2525
import nextflow.Global
2626
import nextflow.Session
27-
import nextflow.dataflow.ChannelImpl
28-
import nextflow.dataflow.ValueImpl
2927
import nextflow.exception.ScriptRuntimeException
3028
import nextflow.extension.CH
3129
import nextflow.processor.TaskProcessor
@@ -144,9 +142,8 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef {
144142
output = runV1(args, processConfig)
145143

146144
// invoke process with typed inputs/outputs
147-
// NOTE: .out property is not accessible with typed dataflow
148145
else if( processConfig instanceof ProcessConfigV2 )
149-
return runV2(args, processConfig)
146+
output = runV2(args, processConfig)
150147

151148
// return process output
152149
return output
@@ -205,7 +202,8 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef {
205202
return output
206203
}
207204

208-
private Object runV2(Object[] args, ProcessConfigV2 config) {
205+
private ChannelOut runV2(Object[] args0, ProcessConfigV2 config) {
206+
final args = ChannelOut.spread(args0)
209207
final declaredInputs = config.getInputs()
210208
final declaredOutputs = config.getOutputs()
211209

@@ -217,16 +215,19 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef {
217215
for( int i = 0; i < declaredInputs.size(); i++ )
218216
declaredInputs[i].setChannel(createSourceChannel(args[i]))
219217

220-
// set output
218+
// set outputs
219+
final singleton = declaredInputs.isSingleton()
220+
221221
final feedbackChannels = getFeedbackChannels()
222222
if( feedbackChannels && feedbackChannels.size() != declaredOutputs.size() )
223223
throw new ScriptRuntimeException("Process `$processName` inputs and outputs do not have the same cardinality - Feedback loop is not supported" )
224224

225-
Object output = null
226-
if( declaredOutputs.size() > 0 ) {
227-
final singleton = declaredInputs.isSingleton()
228-
output = feedbackChannels ? feedbackChannels[0] : CH.create(singleton)
229-
declaredOutputs[0].setChannel(output)
225+
final channels = new LinkedHashMap<String,DataflowWriteChannel>()
226+
for( int i = 0; i < declaredOutputs.size(); i++ ) {
227+
final param = declaredOutputs[i]
228+
final ch = feedbackChannels ? feedbackChannels[i] : CH.create(singleton)
229+
param.setChannel(ch)
230+
channels.put(param.getName(), ch)
230231
}
231232

232233
for( final topic : declaredOutputs.getTopics() ) {
@@ -237,18 +238,12 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef {
237238
// start processor
238239
createTaskProcessor().run()
239240

240-
return \
241-
output instanceof DataflowVariable ? new ValueImpl(output) :
242-
output instanceof DataflowBroadcast ? new ChannelImpl(output) :
243-
null
241+
return new ChannelOut(channels)
244242
}
245243

246244
private DataflowReadChannel createSourceChannel(Object value) {
247-
if( value instanceof ChannelImpl )
248-
return CH.getReadChannel(value.getSource())
249-
250-
if( value instanceof ValueImpl )
251-
return value.getSource()
245+
if( value instanceof DataflowReadChannel || value instanceof DataflowBroadcast )
246+
return CH.getReadChannel(value)
252247

253248
final result = CH.value()
254249
result.bind(value)

modules/nextflow/src/main/groovy/nextflow/script/WorkflowBinding.groovy

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import groovy.transform.CompileStatic
2020
import groovy.transform.PackageScope
2121
import groovy.util.logging.Slf4j
2222
import groovyx.gpars.dataflow.DataflowWriteChannel
23+
import org.codehaus.groovy.runtime.InvokerHelper
2324
import nextflow.NF
2425
import nextflow.dataflow.ChannelImpl
2526
import nextflow.dataflow.ValueImpl
@@ -104,17 +105,26 @@ class WorkflowBinding extends Binding {
104105
final component = getComponent0(name)
105106
if( component ) {
106107
checkScope0(component)
107-
return component.invoke_o(args)
108+
return invoke0(component, args)
108109
}
109110

110111
// check it's an operator name
111-
if( NF.hasOperator(name) )
112+
if( !owner?.isTypingEnabled() && NF.hasOperator(name) )
112113
return OpCall.create(name, args)
113114
}
114115

115116
throw new MissingMethodException(name,this.getClass())
116117
}
117118

119+
private Object invoke0(ComponentDef component, Object args0) {
120+
final componentTyped = component instanceof WorkflowDef && component.getOwner().isTypingEnabled()
121+
final args = InvokerHelper.asArray(args0).stream()
122+
.map(arg -> DataflowTypeHelper.normalize(arg, componentTyped))
123+
.toArray()
124+
final result = component.invoke_a(args)
125+
return DataflowTypeHelper.normalize(result, owner?.isTypingEnabled())
126+
}
127+
118128
@Override
119129
void setProperty(String name, Object value) {
120130
// when a script variable name matches a BaseScript attribute name
@@ -149,8 +159,11 @@ class WorkflowBinding extends Binding {
149159
super.getVariable(name)
150160
}
151161
catch( MissingPropertyException e ) {
162+
if( owner?.isTypingEnabled() )
163+
throw e
164+
152165
if( !meta )
153-
throw e
166+
throw e
154167

155168
def component = getComponent0(name)
156169
if( component )

modules/nextflow/src/main/groovy/nextflow/script/WorkflowDef.groovy

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ import groovy.transform.CompileStatic
2020
import groovy.transform.PackageScope
2121
import groovy.util.logging.Slf4j
2222
import groovyx.gpars.dataflow.DataflowWriteChannel
23-
import nextflow.dataflow.ChannelImpl
24-
import nextflow.dataflow.ValueImpl
2523
import nextflow.exception.MissingProcessException
2624
import nextflow.exception.MissingValueException
2725
import nextflow.exception.ScriptRuntimeException
@@ -138,26 +136,19 @@ class WorkflowDef extends BindableDef implements ChainableDef, IterableDef, Exec
138136
}
139137
}
140138

141-
protected Object collectOutputs(List<String> emissions) {
139+
protected ChannelOut collectOutputs(List<String> emissions) {
142140
// make sure feedback channel cardinality matches
143141
if( feedbackChannels && feedbackChannels.size() != emissions.size() )
144142
throw new ScriptRuntimeException("Workflow `$name` inputs and outputs do not have the same cardinality - Feedback loop is not supported" )
145143

146-
final channels = new LinkedHashMap<String,?>(emissions.size())
147-
boolean typedDataflow = false
144+
final channels = new LinkedHashMap<String, DataflowWriteChannel>(emissions.size())
148145
for( int i=0; i<emissions.size(); i++ ) {
149146
final targetName = emissions[i]
150147
if( !binding.hasVariable(targetName) )
151148
throw new MissingValueException("Missing workflow output parameter: $targetName")
152-
final obj = binding.getVariable(targetName)
149+
final obj = DataflowTypeHelper.normalizeV1(binding.getVariable(targetName))
153150

154-
if( obj instanceof ChannelImpl || obj instanceof ValueImpl ) {
155-
// TODO: feedbackChannels
156-
typedDataflow = true
157-
channels.put(targetName, obj)
158-
}
159-
160-
else if( CH.isChannel(obj) ) {
151+
if( CH.isChannel(obj) ) {
161152
channels.put(targetName, target(i, obj))
162153
}
163154

@@ -177,9 +168,7 @@ class WorkflowDef extends BindableDef implements ChainableDef, IterableDef, Exec
177168
channels.put(targetName, value)
178169
}
179170
}
180-
return typedDataflow
181-
? (channels.size() == 1 && channels.containsKey('$out') ? channels['$out'] : channels)
182-
: new ChannelOut(channels as Map<String,DataflowWriteChannel>)
171+
return new ChannelOut(channels)
183172
}
184173

185174
protected DataflowWriteChannel target(int index, Object output) {
@@ -219,10 +208,8 @@ class WorkflowDef extends BindableDef implements ChainableDef, IterableDef, Exec
219208
}
220209
else {
221210
// otherwise collect the outputs from the workflow binding
222-
final out = collectOutputs(declaredOutputs)
223-
if( out instanceof ChannelOut )
224-
this.output = out
225-
return out
211+
output = collectOutputs(declaredOutputs)
212+
return output
226213
}
227214
}
228215

0 commit comments

Comments
 (0)