Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nextflow.processor
import nextflow.NF
import nextflow.script.ScriptMeta

import java.lang.reflect.Modifier
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.Files
Expand All @@ -33,7 +34,9 @@ import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Global
import nextflow.exception.ProcessException
import nextflow.script.ScriptBinding
import nextflow.script.types.Record
import nextflow.util.KryoHelper
import nextflow.util.RecordMap
import nextflow.util.TestOnly
/**
* Map used to delegate variable resolution to script scope
Expand Down Expand Up @@ -196,7 +199,7 @@ class TaskContext implements Map<String,Object>, Cloneable {
map.remove(TaskProcessor.TASK_CONTEXT_PROPERTY_NAME)
}

return KryoHelper.serialize(map)
return KryoHelper.serialize(sanitizeValue(map))
}
catch( Exception e ) {
log.warn "Cannot serialize context map. Cause: ${e.cause} -- Resume will not work on this process"
Expand All @@ -210,6 +213,25 @@ class TaskContext implements Map<String,Object>, Cloneable {
new TaskContext(processor, map)
}

/**
 * Recursively prepare a task context value for Kryo serialization.
 *
 * Maps and collections are traversed element-wise; typed {@link Record}
 * objects are converted into {@link RecordMap} instances built from their
 * public instance fields, sorted by field name for a deterministic order.
 * NOTE(review): presumably this is so resume does not depend on the
 * record's concrete class being loadable at deserialization time — confirm.
 *
 * @param value the value to sanitize; may be any object, including nested
 *        maps, collections and records
 * @return the sanitized value, with records replaced by record maps
 */
private static Object sanitizeValue(Object value) {
    if( value instanceof Map ) {
        // rebuild with sanitized values; preserve the RecordMap type when present
        final normalized = value.collectEntries { k, v -> [k, sanitizeValue(v)] }
        return value instanceof RecordMap ? new RecordMap(normalized) : normalized
    }
    if( value instanceof Collection ) {
        return value.collect { item -> sanitizeValue(item) }
    }
    if( value instanceof Record ) {
        // the record contents are its public, non-static, non-synthetic fields
        final fields = value.getClass().getFields()
            .findAll { field -> !Modifier.isStatic(field.modifiers) && !field.synthetic }
            .sort { it.name }
        return new RecordMap(fields.collectEntries { field ->
            [field.name, sanitizeValue(field.get(value))]
        })
    }
    // scalars and all other types are passed through unchanged
    return value
}


@PackageScope
static String dumpMap( Map map ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package nextflow.processor

import java.lang.reflect.Modifier
import java.nio.file.Path
import java.util.regex.Matcher
import java.util.regex.Pattern
Expand All @@ -32,6 +33,7 @@ import nextflow.file.LogicalDataPath
import nextflow.script.ScriptType
import nextflow.script.params.FileInParam
import nextflow.script.params.v2.ProcessFileInput
import nextflow.script.types.Record
import nextflow.util.ArrayBag
import nextflow.util.BlankSeparatedList
import nextflow.util.RecordMap
Expand Down Expand Up @@ -111,14 +113,47 @@ class TaskInputResolver {
return value.collect { el -> normalizeValue(el, holders) }
}

if( value instanceof RecordMap ) {
final normalized = new LinkedHashMap<String,Object>()
for( final entry : value.entrySet() )
normalized.put(entry.key, normalizeValue(entry.value, holders))
return new RecordMap(normalized)
}

if( value instanceof Map ) {
final normalized = value.collectEntries { k, v -> [k, normalizeValue(v, holders)] }
return value instanceof RecordMap ? new RecordMap(normalized as Map<String,?>) : normalized
return normalized
}

if( value instanceof Record ) {
return normalizeRecord(value, holders)
}

return value
}

/**
 * Normalize a typed record by applying {@code normalizeValue} to each of its
 * public instance fields.
 *
 * The method first tries to rebuild the record as an instance of its original
 * concrete class via the no-arg constructor; if that fails for any reason
 * (e.g. no accessible no-arg constructor or a non-settable field) it falls
 * back to a {@link RecordMap} holding the normalized field values.
 *
 * @param value the record to normalize
 * @param holders the mapping of source paths to their staged file holders
 * @return a new record of the same type, or a {@link RecordMap} fallback,
 *         with all field values normalized
 */
private Object normalizeRecord(Record value, Map<Path,FileHolder> holders) {
    final normalized = new LinkedHashMap<String,Object>()
    // public, non-static, non-synthetic fields, sorted by name for determinism
    final fields = value.getClass().getFields()
        .findAll { field -> !Modifier.isStatic(field.modifiers) && !field.synthetic }
        .sort { it.name }

    for( final field : fields )
        normalized.put(field.name, normalizeValue(field.get(value), holders))

    try {
        // re-create the record with its original concrete type so downstream
        // code can keep accessing the declared fields
        final result = value.getClass().getDeclaredConstructor().newInstance()
        for( final field : fields ) {
            field.set(result, normalized[field.name])
        }
        return result
    }
    catch( Exception e ) {
        log.debug "Unable to normalize record as ${value.getClass().name}; using RecordMap instead", e
        return new RecordMap(normalized)
    }
}

private Path normalizePath(Path value, Map<Path,FileHolder> holders) {
return holders.containsKey(value)
? new TaskPath(holders[value])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class KryoHelper {
}
finally {
if( prev ) {
kryo.setClassLoader(loader)
kryo.setClassLoader(prev)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package nextflow.processor

import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths

import groovy.runtime.metaclass.ExtensionProvider
Expand All @@ -32,13 +33,19 @@ import nextflow.script.ScriptMeta
import nextflow.util.BlankSeparatedList
import nextflow.util.Duration
import nextflow.util.MemoryUnit
import nextflow.util.RecordMap
import spock.lang.Specification
/**
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
class TaskContextTest extends Specification {

// minimal Record implementation used to exercise record serialization below
static class SampleRecord implements nextflow.script.types.Record {
    public String id
    public Path path
}

def setupSpec() {
NF.init()
}
Expand Down Expand Up @@ -80,6 +87,27 @@ class TaskContextTest extends Specification {

}

// verifies that typed Record values — both standalone and inside a list —
// survive a serialize/deserialize round-trip as RecordMap instances
def 'should serialize typed records as record maps' () {
    setup:
    def processor = Mock(TaskProcessor) {
        getTaskBody() >> new BodyDef(null,'source')
    }
    def path = Paths.get('/some/input.txt')
    def map = new TaskContext(processor, [:])
    map.sample = new SampleRecord(id: 'alpha', path: path)
    map.samples = [new SampleRecord(id: 'beta', path: path)]

    when:
    def buffer = map.serialize()
    def result = TaskContext.deserialize(processor, buffer)

    then:
    // records come back as RecordMap with the same field name/value pairs
    result.sample instanceof RecordMap
    result.sample == new RecordMap(id: 'alpha', path: path)
    result.samples[0] instanceof RecordMap
    result.samples[0] == new RecordMap(id: 'beta', path: path)
}

def 'should dehydrate rehydrate'() {

setup:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ import spock.lang.Unroll
*/
class TaskInputResolverTest extends Specification {

// minimal Record implementation used to exercise input normalization below
static class SampleRecord implements nextflow.script.types.Record {
    public String id
    public Path path
    public List<Path> paths
}

def holdersMap(List<FileHolder> holders) {
final result = [:]
for( final holder : holders )
Expand Down Expand Up @@ -172,6 +178,26 @@ class TaskInputResolverTest extends Specification {
task.context.input.bam.toString() == 'input.bam'
}

// verifies that normalizeValue rebuilds a typed record as a NEW instance of
// the same class, with staged input paths replaced by TaskPath instances —
// both for a direct Path field and for paths nested inside a list field
def 'should normalize typed record fields' () {
    given:
    def resolver = new TaskInputResolver(Mock(TaskRun), Mock(FilePorter.Batch), Mock(Executor))
    def source = Path.of('/some/input.txt')
    def holder = FileHolder.get(source, 'input.txt')
    def record = new SampleRecord(id: 'alpha', path: source, paths: [source])

    when:
    def result = resolver.normalizeValue(record, holdersMap([holder]))

    then:
    // same concrete type, but a distinct instance (original is not mutated)
    result instanceof SampleRecord
    !result.is(record)
    result.id == 'alpha'
    result.path instanceof TaskPath
    result.path.toString() == 'input.txt'
    result.paths[0] instanceof TaskPath
    result.paths[0].toString() == 'input.txt'
}

def 'should return single item or collection'() {

setup:
Expand Down
54 changes: 43 additions & 11 deletions modules/nf-commons/src/main/nextflow/util/HashBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -29,6 +31,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

Expand All @@ -47,6 +50,7 @@
import nextflow.extension.FilesEx;
import nextflow.io.SerializableMarker;
import nextflow.script.types.Bag;
import nextflow.script.types.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static nextflow.Const.DEFAULT_ROOT;
Expand Down Expand Up @@ -149,20 +153,19 @@ else if( value instanceof Object[]) {
else if( value instanceof CacheFunnel )
((CacheFunnel)value).funnel(hasher, mode);

else if( value instanceof Map )
hashUnorderedCollection(hasher, ((Map) value).entrySet(), mode);
else if( value instanceof Map<?,?> map )
hashUnorderedCollection(hasher, map.entrySet(), mode, basePath);

else if( value instanceof Map.Entry ) {
Map.Entry entry = (Map.Entry)value;
else if( value instanceof Map.Entry<?,?> entry ) {
with(entry.getKey());
with(entry.getValue());
}

else if( value instanceof Bag || value instanceof Set )
hashUnorderedCollection(hasher, (Collection) value, mode);
else if( value instanceof Bag<?> || value instanceof Set<?> )
hashUnorderedCollection(hasher, (Collection<?>) value, mode, basePath);

else if( value instanceof Collection)
for( Object item : ((Collection)value) )
else if( value instanceof Collection<?> collection)
for( Object item : collection )
with(item);

else if( value instanceof Path )
Expand All @@ -179,6 +182,9 @@ else if( value instanceof UUID ) {
else if( value instanceof VersionNumber )
hasher.putInt( value.hashCode() );

else if( value instanceof Record )
hashUnorderedCollection(hasher, recordFields((Record)value).entrySet(), mode, basePath);

else if( value instanceof SerializableMarker)
hasher.putInt( value.hashCode() );

Expand Down Expand Up @@ -461,11 +467,11 @@ static HashCode hashContent( Path file, HashFunction function ) {
return hashFileContent(hasher, file).hash();
}

static private Hasher hashUnorderedCollection(Hasher hasher, Collection collection, HashMode mode) {
static private Hasher hashUnorderedCollection(Hasher hasher, Collection<?> collection, HashMode mode, Path basePath) {
byte[] resultBytes = new byte[HASH_BYTES];
for (Object item : collection) {
// hash ghe collection item
byte[] nextBytes = hashBytes(item, mode);
byte[] nextBytes = hashBytes(item, mode, basePath);
// sum the hash bytes to the "resultBytes" accumulator
// since the sum is a commutative operation the order does not matter
sumBytes(resultBytes, nextBytes);
Expand All @@ -475,7 +481,33 @@ static private Hasher hashUnorderedCollection(Hasher hasher, Collection collecti
}

static private byte[] hashBytes(Object item, HashMode mode) {
return hasher(defaultHasher(), item, mode).hash().asBytes();
return hashBytes(item, mode, null);
}

/**
 * Hash a single value into a standalone byte array, using a dedicated
 * {@link HashBuilder} configured with the given mode and base path.
 *
 * @param item the value to be hashed
 * @param mode the cache mode to apply while hashing
 * @param basePath the base path for relativizing paths, or {@code null}
 * @return the resulting hash as a byte array
 */
static private byte[] hashBytes(Object item, HashMode mode, Path basePath) {
    final HashBuilder builder = new HashBuilder()
            .withHasher(defaultHasher())
            .withMode(mode)
            .withBasePath(basePath);
    builder.with(item);
    return builder.getHasher().hash().asBytes();
}

/**
 * Collect the public, non-static, non-synthetic fields of a {@link Record}
 * into a map keyed by field name. A {@link TreeMap} is used so that entries
 * iterate in name order, making the result deterministic for hashing.
 *
 * @param record the record whose fields are collected
 * @return a name-sorted map of field name to field value
 * @throws IllegalStateException if a field cannot be read via reflection
 */
static private Map<String,Object> recordFields(Record record) {
    final Map<String,Object> result = new TreeMap<>();
    for( Field field : record.getClass().getFields() ) {
        // skip constants and compiler-generated members
        if( Modifier.isStatic(field.getModifiers()) || field.isSynthetic() )
            continue;
        try {
            result.put(field.getName(), field.get(record));
        }
        catch( IllegalAccessException e ) {
            throw new IllegalStateException("Unable to access record field: " + field.getName(), e);
        }
    }
    return result;
}

/**
Expand Down
24 changes: 24 additions & 0 deletions modules/nf-commons/src/test/nextflow/util/HashBuilderTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package nextflow.util

import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths

import com.google.common.hash.Hashing
import nextflow.Global
import nextflow.Session
import nextflow.script.types.Record
import org.apache.commons.codec.digest.DigestUtils
import spock.lang.Specification
import test.TestHelper
Expand All @@ -31,6 +33,15 @@ import test.TestHelper
*/
class HashBuilderTest extends Specification {

// two DISTINCT Record types with identical fields — used to verify that
// record hashing is structural (by field name/value), not by class identity
static class SampleRecord implements Record {
    public String id
    public Path path
}

static class OtherSampleRecord implements Record {
    public String id
    public Path path
}

def testHashContent() {
setup:
Expand Down Expand Up @@ -122,6 +133,19 @@ class HashBuilderTest extends Specification {
HashBuilder.hashFileSha256Impl0(file) == DigestUtils.sha256Hex(file.bytes)
}

// records with equal field names/values hash identically regardless of their
// concrete class, and also match the equivalent RecordMap; records with
// different field values must produce a different hash
def 'should hash records structurally'() {
    given:
    def file = TestHelper.createInMemTempFile('foo', 'Hello world')
    def record = new SampleRecord(id: 'alpha', path: file)
    def sameRecord = new OtherSampleRecord(id: 'alpha', path: file)
    def differentRecord = new SampleRecord(id: 'beta', path: file)

    expect:
    new HashBuilder().with(record).build() == new HashBuilder().with(sameRecord).build()
    new HashBuilder().with(record).build() == new HashBuilder().with(new RecordMap(id: 'alpha', path: file)).build()
    new HashBuilder().with(record).build() != new HashBuilder().with(differentRecord).build()
}

def 'should hash dir content with sha256'() {
given:
def folder = TestHelper.createInMemTempDir()
Expand Down
Loading
Loading