diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskContext.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskContext.groovy index dbeff8c884..389f84a438 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskContext.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskContext.groovy @@ -19,6 +19,7 @@ package nextflow.processor import nextflow.NF import nextflow.script.ScriptMeta +import java.lang.reflect.Modifier import java.nio.file.Path import java.nio.file.Paths import java.nio.file.Files @@ -33,7 +34,9 @@ import groovyx.gpars.dataflow.DataflowWriteChannel import nextflow.Global import nextflow.exception.ProcessException import nextflow.script.ScriptBinding +import nextflow.script.types.Record import nextflow.util.KryoHelper +import nextflow.util.RecordMap import nextflow.util.TestOnly /** * Map used to delegate variable resolution to script scope @@ -196,7 +199,7 @@ class TaskContext implements Map, Cloneable { map.remove(TaskProcessor.TASK_CONTEXT_PROPERTY_NAME) } - return KryoHelper.serialize(map) + return KryoHelper.serialize(sanitizeValue(map)) } catch( Exception e ) { log.warn "Cannot serialize context map. Cause: ${e.cause} -- Resume will not work on this process" @@ -210,6 +213,25 @@ class TaskContext implements Map, Cloneable { new TaskContext(processor, map) } + private static Object sanitizeValue(Object value) { + if( value instanceof Map ) { + final normalized = value.collectEntries { k, v -> [k, sanitizeValue(v)] } + return value instanceof RecordMap ? new RecordMap(normalized) : normalized + } + if( value instanceof Collection ) { + return value.collect { item -> sanitizeValue(item) } + } + if( value instanceof Record ) { + final fields = value.getClass().getFields() + .findAll { field -> !Modifier.isStatic(field.modifiers) && !field.synthetic } + .sort { it.name } + return new RecordMap(fields.collectEntries { field -> + [field.name, sanitizeValue(field.get(value))] + }) + } + return value + } + @PackageScope static String dumpMap( Map map ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskInputResolver.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskInputResolver.groovy index a965880259..af0115a29b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskInputResolver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskInputResolver.groovy @@ -16,6 +16,7 @@ package nextflow.processor +import java.lang.reflect.Modifier import java.nio.file.Path import java.util.regex.Matcher import java.util.regex.Pattern @@ -32,6 +33,7 @@ import nextflow.file.LogicalDataPath import nextflow.script.ScriptType import nextflow.script.params.FileInParam import nextflow.script.params.v2.ProcessFileInput +import nextflow.script.types.Record import nextflow.util.ArrayBag import nextflow.util.BlankSeparatedList import nextflow.util.RecordMap @@ -111,14 +113,47 @@ class TaskInputResolver { return value.collect { el -> normalizeValue(el, holders) } } + if( value instanceof RecordMap ) { + final normalized = new LinkedHashMap() + for( final entry : value.entrySet() ) + normalized.put(entry.key, normalizeValue(entry.value, holders)) + return new RecordMap(normalized) + } + if( value instanceof Map ) { final normalized = value.collectEntries { k, v -> [k, normalizeValue(v, holders)] } - return value instanceof RecordMap ? new RecordMap(normalized as Map) : normalized + return normalized + } + + if( value instanceof Record ) { + return normalizeRecord(value, holders) } return value } + private Object normalizeRecord(Record value, Map holders) { + final normalized = new LinkedHashMap() + final fields = value.getClass().getFields() + .findAll { field -> !Modifier.isStatic(field.modifiers) && !field.synthetic } + .sort { it.name } + + for( final field : fields ) + normalized.put(field.name, normalizeValue(field.get(value), holders)) + + try { + final result = value.getClass().getDeclaredConstructor().newInstance() + for( final field : fields ) { + field.set(result, normalized[field.name]) + } + return result + } + catch( Exception e ) { + log.debug "Unable to normalize record as ${value.getClass().name}; using RecordMap instead", e + return new RecordMap(normalized) + } + } + private Path normalizePath(Path value, Map holders) { return holders.containsKey(value) ? new TaskPath(holders[value]) diff --git a/modules/nextflow/src/main/groovy/nextflow/util/SerializationHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/util/SerializationHelper.groovy index 05f9af69f0..ffc058f652 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/SerializationHelper.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/SerializationHelper.groovy @@ -182,7 +182,7 @@ class KryoHelper { } finally { if( prev ) { - kryo.setClassLoader(loader) + kryo.setClassLoader(prev) } } } diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskContextTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskContextTest.groovy index 8c56a42e91..7c4e0d2c2e 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskContextTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskContextTest.groovy @@ -17,6 +17,7 @@ package nextflow.processor import java.nio.file.Files +import java.nio.file.Path import java.nio.file.Paths import groovy.runtime.metaclass.ExtensionProvider @@ -32,6 +33,7 @@ import nextflow.script.ScriptMeta import nextflow.util.BlankSeparatedList import nextflow.util.Duration import nextflow.util.MemoryUnit +import nextflow.util.RecordMap import spock.lang.Specification /** * @@ -39,6 +41,11 @@ import spock.lang.Specification */ class TaskContextTest extends Specification { + static class SampleRecord implements nextflow.script.types.Record { + public String id + public Path path + } + def setupSpec() { NF.init() } @@ -80,6 +87,27 @@ class TaskContextTest extends Specification { } + def 'should serialize typed records as record maps' () { + setup: + def processor = Mock(TaskProcessor) { + getTaskBody() >> new BodyDef(null,'source') + } + def path = Paths.get('/some/input.txt') + def map = new TaskContext(processor, [:]) + map.sample = new SampleRecord(id: 'alpha', path: path) + map.samples = [new SampleRecord(id: 'beta', path: path)] + + when: + def buffer = map.serialize() + def result = TaskContext.deserialize(processor, buffer) + + then: + result.sample instanceof RecordMap + result.sample == new RecordMap(id: 'alpha', path: path) + result.samples[0] instanceof RecordMap + result.samples[0] == new RecordMap(id: 'beta', path: path) + } + def 'should dehydrate rehydrate'() { setup: diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskInputResolverTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskInputResolverTest.groovy index 103375a006..6e5eab0300 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskInputResolverTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskInputResolverTest.groovy @@ -40,6 +40,12 @@ import spock.lang.Unroll */ class TaskInputResolverTest extends Specification { + static class SampleRecord implements nextflow.script.types.Record { + public String id + public Path path + public List paths + } + def holdersMap(List holders) { final result = [:] for( final holder : holders ) @@ -172,6 +178,26 @@ class TaskInputResolverTest extends Specification { task.context.input.bam.toString() == 'input.bam' } + def 'should normalize typed record fields' () { + given: + def resolver = new TaskInputResolver(Mock(TaskRun), Mock(FilePorter.Batch), Mock(Executor)) + def source = Path.of('/some/input.txt') + def holder = FileHolder.get(source, 'input.txt') + def record = new SampleRecord(id: 'alpha', path: source, paths: [source]) + + when: + def result = resolver.normalizeValue(record, holdersMap([holder])) + + then: + result instanceof SampleRecord + !result.is(record) + result.id == 'alpha' + result.path instanceof TaskPath + result.path.toString() == 'input.txt' + result.paths[0] instanceof TaskPath + result.paths[0].toString() == 'input.txt' + } + def 'should return single item or collection'() { setup: diff --git a/modules/nf-commons/src/main/nextflow/util/HashBuilder.java b/modules/nf-commons/src/main/nextflow/util/HashBuilder.java index 8c14e282cb..5e14d7c241 100644 --- a/modules/nf-commons/src/main/nextflow/util/HashBuilder.java +++ b/modules/nf-commons/src/main/nextflow/util/HashBuilder.java @@ -19,6 +19,8 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; @@ -29,6 +31,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -47,6 +50,7 @@ import nextflow.extension.FilesEx; import nextflow.io.SerializableMarker; import nextflow.script.types.Bag; +import nextflow.script.types.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static nextflow.Const.DEFAULT_ROOT; @@ -149,20 +153,19 @@ else if( value instanceof Object[]) { else if( value instanceof CacheFunnel ) ((CacheFunnel)value).funnel(hasher, mode); - else if( value instanceof Map ) - hashUnorderedCollection(hasher, ((Map) value).entrySet(), mode); + else if( value instanceof Map map ) + hashUnorderedCollection(hasher, map.entrySet(), mode, basePath); - else if( value instanceof Map.Entry ) { - Map.Entry entry = (Map.Entry)value; + else if( value instanceof Map.Entry entry ) { with(entry.getKey()); with(entry.getValue()); } - else if( value instanceof Bag || value instanceof Set ) - hashUnorderedCollection(hasher, (Collection) value, mode); + else if( value instanceof Bag || value instanceof Set ) + hashUnorderedCollection(hasher, (Collection) value, mode, basePath); - else if( value instanceof Collection) - for( Object item : ((Collection)value) ) + else if( value instanceof Collection collection) + for( Object item : collection ) with(item); else if( value instanceof Path ) @@ -179,6 +182,9 @@ else if( value instanceof UUID ) { else if( value instanceof VersionNumber ) hasher.putInt( value.hashCode() ); + else if( value instanceof Record ) + hashUnorderedCollection(hasher, recordFields((Record)value).entrySet(), mode, basePath); + else if( value instanceof SerializableMarker) hasher.putInt( value.hashCode() ); @@ -461,11 +467,11 @@ static HashCode hashContent( Path file, HashFunction function ) { return hashFileContent(hasher, file).hash(); } - static private Hasher hashUnorderedCollection(Hasher hasher, Collection collection, HashMode mode) { + static private Hasher hashUnorderedCollection(Hasher hasher, Collection collection, HashMode mode, Path basePath) { byte[] resultBytes = new byte[HASH_BYTES]; for (Object item : collection) { // hash ghe collection item - byte[] nextBytes = hashBytes(item, mode); + byte[] nextBytes = hashBytes(item, mode, basePath); // sum the hash bytes to the "resultBytes" accumulator // since the sum is a commutative operation the order does not matter sumBytes(resultBytes, nextBytes); @@ -475,7 +481,33 @@ static private Hasher hashUnorderedCollection(Hasher hasher, Collection collecti } static private byte[] hashBytes(Object item, HashMode mode) { - return hasher(defaultHasher(), item, mode).hash().asBytes(); + return hashBytes(item, mode, null); + } + + static private byte[] hashBytes(Object item, HashMode mode, Path basePath) { + return new HashBuilder() + .withHasher(defaultHasher()) + .withMode(mode) + .withBasePath(basePath) + .with(item) + .getHasher() + .hash() + .asBytes(); + } + + static private Map recordFields(Record record) { + final Map result = new TreeMap<>(); + for( Field field : record.getClass().getFields() ) { + if( Modifier.isStatic(field.getModifiers()) || field.isSynthetic() ) + continue; + try { + result.put(field.getName(), field.get(record)); + } + catch( IllegalAccessException e ) { + throw new IllegalStateException("Unable to access record field: " + field.getName(), e); + } + } + return result; } /** diff --git a/modules/nf-commons/src/test/nextflow/util/HashBuilderTest.groovy b/modules/nf-commons/src/test/nextflow/util/HashBuilderTest.groovy index 7b156e9a9c..48dd84627b 100644 --- a/modules/nf-commons/src/test/nextflow/util/HashBuilderTest.groovy +++ b/modules/nf-commons/src/test/nextflow/util/HashBuilderTest.groovy @@ -17,11 +17,13 @@ package nextflow.util import java.nio.file.Files +import java.nio.file.Path import java.nio.file.Paths import com.google.common.hash.Hashing import nextflow.Global import nextflow.Session +import nextflow.script.types.Record import org.apache.commons.codec.digest.DigestUtils import spock.lang.Specification import test.TestHelper @@ -31,6 +33,15 @@ import test.TestHelper */ class HashBuilderTest extends Specification { + static class SampleRecord implements Record { + public String id + public Path path + } + + static class OtherSampleRecord implements Record { + public String id + public Path path + } def testHashContent() { setup: @@ -122,6 +133,19 @@ class HashBuilderTest extends Specification { HashBuilder.hashFileSha256Impl0(file) == DigestUtils.sha256Hex(file.bytes) } + def 'should hash records structurally'() { + given: + def file = TestHelper.createInMemTempFile('foo', 'Hello world') + def record = new SampleRecord(id: 'alpha', path: file) + def sameRecord = new OtherSampleRecord(id: 'alpha', path: file) + def differentRecord = new SampleRecord(id: 'beta', path: file) + + expect: + new HashBuilder().with(record).build() == new HashBuilder().with(sameRecord).build() + new HashBuilder().with(record).build() == new HashBuilder().with(new RecordMap(id: 'alpha', path: file)).build() + new HashBuilder().with(record).build() != new HashBuilder().with(differentRecord).build() + } + def 'should hash dir content with sha256'() { given: def folder = TestHelper.createInMemTempDir() diff --git a/modules/nf-lang/src/main/java/nextflow/script/control/ProcessToGroovyVisitorV2.java b/modules/nf-lang/src/main/java/nextflow/script/control/ProcessToGroovyVisitorV2.java index 9c032dc65b..d7b95dc2ba 100644 --- a/modules/nf-lang/src/main/java/nextflow/script/control/ProcessToGroovyVisitorV2.java +++ b/modules/nf-lang/src/main/java/nextflow/script/control/ProcessToGroovyVisitorV2.java @@ -26,6 +26,7 @@ import nextflow.script.ast.RecordNode; import nextflow.script.ast.ScriptNode; import nextflow.script.ast.TupleParameter; +import nextflow.script.types.Bag; import org.codehaus.groovy.ast.ClassHelper; import org.codehaus.groovy.ast.ClassNode; import org.codehaus.groovy.ast.CodeVisitorSupport; @@ -152,28 +153,49 @@ private void visitProcessInputType(Variable param, Expression target, BlockState var stager = stmt(callThisX("stageAs", args(closureX(stmt(target))))); stagers.addStatement(stager); } - else if( isRecordType(cn) ) { - for( var fn : cn.getFields() ) + else if( cn != null && cn.redirect() instanceof RecordNode rn ) { + for( var fn : rn.getFields() ) visitProcessInputType(fn, propX(target, fn.getName()), stagers); } } private static boolean isPathType(ClassNode cn) { - if( !cn.isResolved() ) + if( cn == null ) return false; - var clazz = cn.getTypeClass(); - if( Path.class.isAssignableFrom(clazz) ) { + + if( cn.isResolved() && Path.class.isAssignableFrom(cn.getTypeClass()) ) return true; + + if( isPathCollectionType(cn) && cn.isUsingGenerics() ) { + var generics = cn.getGenericsTypes(); + if( generics != null && generics.length > 0 ) + return isPathType(generics[0].getType()); } - if( Collection.class.isAssignableFrom(clazz) && cn.isUsingGenerics() ) { - var elementType = cn.getGenericsTypes()[0].getType(); - return Path.class.isAssignableFrom(elementType.getTypeClass()); + + // For unresolved Path references, fall back to the simple or fully qualified name. + if( !cn.isResolved() ) { + var name = cn.getNameWithoutPackage(); + return "Path".equals(name) || Path.class.getName().equals(cn.getName()); } + return false; } - private static boolean isRecordType(ClassNode cn) { - return cn.redirect() instanceof RecordNode; + private static boolean isPathCollectionType(ClassNode cn) { + if( cn == null ) + return false; + + if( cn.isResolved() ) { + var cls = cn.getTypeClass(); + return Collection.class.isAssignableFrom(cls) || Bag.class.isAssignableFrom(cls); + } + + // For unresolved collection-like references, fall back to the simple name. + var name = cn.getNameWithoutPackage(); + return "List".equals(name) + || "Set".equals(name) + || "Collection".equals(name) + || "Bag".equals(name); } private void visitProcessUnstagers(Statement outputs, ProcessUnstageVisitor visitor) { diff --git a/tests/checks/typed-record-staging.nf/.checks b/tests/checks/typed-record-staging.nf/.checks new file mode 100644 index 0000000000..326e654f1a --- /dev/null +++ b/tests/checks/typed-record-staging.nf/.checks @@ -0,0 +1,15 @@ +set -e + +export NXF_SYNTAX_PARSER=v2 + +$NXF_RUN | tee stdout +[[ $(grep INFO .nextflow.log | grep -c 'Submitted process > CHECK_SAMPLE') == 1 ]] +grep 'record input localized' stdout + +TASK_DIR=$($NXF_CMD log last -F "process == 'CHECK_SAMPLE'") +grep "cat 'input.txt' > out.txt" "$TASK_DIR/.command.sh" +test -e "$TASK_DIR/input.txt" + +$NXF_RUN -resume | tee stdout +[[ $(grep INFO .nextflow.log | grep -c 'Cached process > CHECK_SAMPLE') == 1 ]] +grep 'record input localized' stdout \ No newline at end of file diff --git a/tests/typed-record-staging.nf b/tests/typed-record-staging.nf new file mode 100644 index 0000000000..cafef334d0 --- /dev/null +++ b/tests/typed-record-staging.nf @@ -0,0 +1,31 @@ +#!/usr/bin/env nextflow +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +nextflow.enable.types = true + +include { SampleRecord } from './typed-record-staging/records/main.nf' +include { CHECK_SAMPLE } from './typed-record-staging/modules/check/main.nf' + +workflow { + sample = record( + id: 'alpha', + path: file("${projectDir}/typed-record-staging/input.txt") + ) + + result = CHECK_SAMPLE(channel.of(sample)) + result.view { it -> it.text.trim() } +} diff --git a/tests/typed-record-staging/input.txt b/tests/typed-record-staging/input.txt new file mode 100644 index 0000000000..8c0ef1a075 --- /dev/null +++ b/tests/typed-record-staging/input.txt @@ -0,0 +1 @@ +record input localized \ No newline at end of file diff --git a/tests/typed-record-staging/modules/check/main.nf b/tests/typed-record-staging/modules/check/main.nf new file mode 100644 index 0000000000..466e3ef274 --- /dev/null +++ b/tests/typed-record-staging/modules/check/main.nf @@ -0,0 +1,34 @@ +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +nextflow.enable.types = true + +include { SampleRecord } from '../../records/main.nf' + +process CHECK_SAMPLE { + input: + sample: SampleRecord + + output: + file('out.txt') + + script: + """ + test '${sample.path}' = 'input.txt' + test -f '${sample.path}' + cat '${sample.path}' > out.txt + """ +} diff --git a/tests/typed-record-staging/records/main.nf b/tests/typed-record-staging/records/main.nf new file mode 100644 index 0000000000..c3a6e64eac --- /dev/null +++ b/tests/typed-record-staging/records/main.nf @@ -0,0 +1,20 @@ +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +record SampleRecord { + id: String + path: Path +}