Skip to content

Commit b4d8777

Browse files
pditommaso and claude committed
Add versioned task hasher strategy
Refactor TaskHasher into a configurable strategy pattern with versioned implementations (V1, V2) selectable via the NXF_TASK_HASH_VER environment variable. V2 (current behavior) is the default.

- TaskHasher: now an interface defining compute(), getTaskGlobalVars(), getTaskBinEntries()
- TaskHasherV1: original hashing (Maps by values, CacheFunnel after SerializableMarker)
- TaskHasherV2: current hashing (Maps by unordered entrySet, CacheFunnel before Map)
- TaskHasherFactory: Version enum + create() factory method
- HashBuilder: version-aware with branching for Map/CacheFunnel handling
- Session: caches resolved hash strategy version at startup

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
1 parent 021c77c commit b4d8777

10 files changed

Lines changed: 438 additions & 239 deletions

File tree

modules/nextflow/src/main/groovy/nextflow/Session.groovy

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ import nextflow.plugin.Plugins
5959
import nextflow.processor.ErrorStrategy
6060
import nextflow.processor.TaskFault
6161
import nextflow.processor.TaskHandler
62+
import nextflow.processor.TaskHasherFactory
6263
import nextflow.processor.TaskProcessor
6364
import nextflow.script.BaseScript
6465
import nextflow.script.ProcessFactory
@@ -179,6 +180,11 @@ class Session implements ISession {
179180
*/
180181
String runName
181182

183+
/**
184+
* The task hash strategy version
185+
*/
186+
TaskHasherFactory.Version hashStrategy
187+
182188
/**
183189
* Enable stub run mode
184190
*/
@@ -385,6 +391,9 @@ class Session implements ISession {
385391
this.runName = config.runName ?: NameGenerator.next()
386392
log.debug "Run name: $runName"
387393

394+
// -- hash strategy
395+
this.hashStrategy = TaskHasherFactory.Version.DEFAULT()
396+
388397
// -- dry run
389398
this.stubRun = config.stubRun
390399

modules/nextflow/src/main/groovy/nextflow/processor/TaskHasher.groovy

Lines changed: 5 additions & 227 deletions
Original file line numberDiff line numberDiff line change
@@ -18,240 +18,18 @@ package nextflow.processor
1818
import java.nio.file.Path
1919

2020
import com.google.common.hash.HashCode
21-
import groovy.json.JsonOutput
2221
import groovy.transform.CompileStatic
23-
import groovy.transform.Memoized
24-
import groovy.util.logging.Slf4j
25-
import nextflow.Session
26-
import nextflow.exception.UnexpectedException
27-
import nextflow.util.CacheHelper
2822
/**
29-
* Implement task hash computation
23+
* Define the interface for task hash computation
3024
*
3125
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
3226
*/
33-
@Slf4j
3427
@CompileStatic
35-
class TaskHasher {
28+
interface TaskHasher {
3629

37-
private TaskRun task
30+
HashCode compute()
3831

39-
private TaskProcessor processor
32+
Map<String,Object> getTaskGlobalVars()
4033

41-
private Session session
42-
43-
public TaskHasher(TaskRun task) {
44-
this.task = task
45-
this.processor = task.processor
46-
this.session = task.processor.session
47-
}
48-
49-
public HashCode compute() {
50-
51-
final keys = new ArrayList<Object>()
52-
53-
// add session UUID
54-
keys << session.uniqueId
55-
56-
// add fully-qualified process name
57-
keys << task.processor.name
58-
59-
// add source code of `script:` or `exec:` block
60-
//
61-
// - this allows task script to reference directives like `task.cpus`
62-
// without invalidating the cache
63-
//
64-
// - references to local variables, global variables, and `task.ext`
65-
// are included separately
66-
keys << task.source
67-
68-
// add container fingerprint if present
69-
if( task.isContainerEnabled() )
70-
keys << task.getContainerFingerprint()
71-
72-
// add the name and value of each task input
73-
for( final entry : task.inputs ) {
74-
keys.add( entry.key.name )
75-
keys.add( entry.value )
76-
}
77-
78-
// add eval output commands
79-
final outEvals = task.getOutputEvals()
80-
if( outEvals ) {
81-
keys.add("eval_outputs")
82-
keys.add(computeEvalOutputCommands(outEvals))
83-
}
84-
85-
// add variables referenced in the task script but not declared as input/output
86-
def vars = getTaskGlobalVars()
87-
if( vars ) {
88-
log.trace "Task: ${task.processor.name} > Adding script vars hash code: ${vars}"
89-
keys.add(vars.entrySet())
90-
}
91-
92-
// add bin scripts referenced in the task script
93-
final binEntries = getTaskBinEntries(task.source)
94-
if( binEntries ) {
95-
log.trace "Task: ${task.processor.name} > Adding scripts on project bin path: ${-> binEntries.join('; ')}"
96-
keys.addAll(binEntries)
97-
}
98-
99-
// add environment modules (`module` directive)
100-
final modules = task.getConfig().getModule()
101-
if( modules ) {
102-
keys.addAll(modules)
103-
}
104-
105-
// add conda packages (`conda` directive)
106-
final conda = task.getCondaEnv()
107-
if( conda ) {
108-
keys.add(conda)
109-
}
110-
111-
// add spack packages (`spack` and `arch` directives)
112-
final spack = task.getSpackEnv()
113-
final arch = task.getConfig().getArchitecture()
114-
115-
if( spack ) {
116-
keys.add(spack)
117-
118-
if( arch ) {
119-
keys.add(arch)
120-
}
121-
}
122-
123-
// add stub run marker if enabled
124-
if( session.stubRun && task.config.getStubBlock() ) {
125-
keys.add('stub-run')
126-
}
127-
128-
// compute task hash
129-
final mode = task.processor.getConfig().getHashMode()
130-
final hash = computeHash(keys, mode)
131-
132-
// log task hash entries if enabled
133-
if( session.dumpHashes ) {
134-
session.dumpHashes == 'json'
135-
? dumpHashEntriesJson(task, keys, mode, hash)
136-
: dumpHashEntriesLegacy(task, keys, mode, hash)
137-
}
138-
139-
return hash
140-
}
141-
142-
/**
143-
* Compute a deterministic string representation of eval output commands for cache hashing.
144-
* This method creates a consistent hash key based on the semantic names and command values
145-
* of eval outputs, ensuring cache invalidation when eval outputs change.
146-
*
147-
* @param outEvals Map of eval parameter names to their command strings
148-
* @return A concatenated string of "name=command" pairs, sorted for deterministic hashing
149-
*/
150-
protected static String computeEvalOutputCommands(Map<String, String> outEvals) {
151-
// Assert precondition that outEvals should not be null or empty when this method is called
152-
assert outEvals != null && !outEvals.isEmpty(), "Eval outputs should not be null or empty"
153-
154-
final result = new StringBuilder()
155-
156-
// Sort entries by key for deterministic ordering. This ensures that the same set of
157-
// eval outputs always produces the same hash regardless of map iteration order,
158-
// which is critical for cache consistency across different JVM runs.
159-
// Without sorting, HashMap iteration order can vary between executions, leading to
160-
// different cache keys for identical eval output configurations and causing
161-
// unnecessary cache misses and task re-execution
162-
final sortedEntries = outEvals.entrySet().sort { a, b -> a.key.compareTo(b.key) }
163-
164-
// Build content using for loop to concatenate "name=command" pairs.
165-
// This creates a symmetric pattern with input parameter hashing where both
166-
// the parameter name and its value contribute to the cache key
167-
for( final entry : sortedEntries ) {
168-
// Add newline separator between entries for readability in debug scenarios
169-
if( result.length() > 0 ) {
170-
result.append('\n')
171-
}
172-
// Format: "semantic_name=bash_command" - both name and command value are
173-
// included because changing either should invalidate the task cache
174-
result.append(entry.key).append('=').append(entry.value)
175-
}
176-
177-
return result.toString()
178-
}
179-
180-
/**
181-
* Get the mapping of global variables that were referenced by
182-
* the task script, excluding references to `task.ext`.
183-
*/
184-
Map<String,Object> getTaskGlobalVars() {
185-
final result = task.getGlobalVars(task.processor.getOwnerScript().getBinding())
186-
final directives = getTaskExtensionDirectiveVars()
187-
result.putAll(directives)
188-
return result
189-
}
190-
191-
protected Map<String,Object> getTaskExtensionDirectiveVars() {
192-
final variableNames = task.getVariableNames()
193-
final result = new HashMap(variableNames.size())
194-
final taskConfig = task.config
195-
for( final key : variableNames ) {
196-
if( !key.startsWith('task.ext.') )
197-
continue
198-
final value = taskConfig.eval(key.substring(5))
199-
result.put(key, value)
200-
}
201-
202-
return result
203-
}
204-
205-
/**
206-
* This method scans the task command string looking for invocations of scripts
207-
* defined in the project bin folder.
208-
*
209-
* @param script The task command string
210-
* @return The list of paths of scripts in the project bin folder referenced in the task command
211-
*/
212-
@Memoized
213-
List<Path> getTaskBinEntries(String script) {
214-
List<Path> result = []
215-
final tokenizer = new StringTokenizer(script, " \t\n\r\f()[]{};&|<>`")
216-
while( tokenizer.hasMoreTokens() ) {
217-
final token = tokenizer.nextToken()
218-
final path = session.binEntries.get(token)
219-
if( path )
220-
result.add(path)
221-
}
222-
return result
223-
}
224-
225-
private String safeTaskName(TaskRun task) {
226-
return task != null ? task.lazyName() : task.processor.name
227-
}
228-
229-
private HashCode computeHash(List keys, CacheHelper.HashMode mode) {
230-
try {
231-
return CacheHelper.hasher(keys, mode).hash()
232-
}
233-
catch (Throwable e) {
234-
final msg = "Something went wrong while creating task hash for process '${task.processor.name}' -- Offending keys: ${ keys.collect { k -> "\n - type=${k.getClass().getName()} value=$k" } }"
235-
throw new UnexpectedException(msg,e)
236-
}
237-
}
238-
239-
private void dumpHashEntriesJson(TaskRun task, List entries, CacheHelper.HashMode mode, hash) {
240-
final collector = (item) -> [
241-
hash: CacheHelper.hasher(item, mode).hash().toString(),
242-
type: item?.getClass()?.getName(),
243-
value: item?.toString()
244-
]
245-
final json = JsonOutput.toJson(entries.collect(collector))
246-
log.info "[${safeTaskName(task)}] cache hash: ${hash}; mode: ${mode}; entries: ${JsonOutput.prettyPrint(json)}"
247-
}
248-
249-
private void dumpHashEntriesLegacy(TaskRun task, List entries, CacheHelper.HashMode mode, hash) {
250-
final buffer = new StringBuilder()
251-
buffer.append("[${safeTaskName(task)}] cache hash: ${hash}; mode: $mode; entries: \n")
252-
for( final entry : entries ) {
253-
buffer.append( " ${CacheHelper.hasher(entry, mode).hash()} [${entry?.getClass()?.getName()}] $entry \n")
254-
}
255-
log.info(buffer.toString())
256-
}
34+
List<Path> getTaskBinEntries(String script)
25735
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2013-2026, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package nextflow.processor
17+
18+
import groovy.transform.CompileStatic
19+
import nextflow.SysEnv
20+
/**
21+
* Factory for creating versioned {@link TaskHasher} instances.
22+
*
23+
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
24+
*/
25+
@CompileStatic
class TaskHasherFactory {

    /**
     * The task hashing strategy versions that can be selected.
     */
    enum Version {
        V1,
        V2

        /**
         * Resolve the hashing version from the {@code NXF_TASK_HASH_VER}
         * environment variable, falling back to {@link #V2} when the variable
         * is not set or is blank.
         *
         * The value is matched case-insensitively and surrounding whitespace
         * is ignored, so {@code v1}, {@code V1} and {@code " v1 "} all select
         * {@link #V1}.
         *
         * @return the selected {@link Version}
         * @throws IllegalArgumentException when the variable holds a value that
         *      does not match any {@link Version} constant
         */
        static Version DEFAULT() {
            final String val = SysEnv.get('NXF_TASK_HASH_VER')?.trim()
            if( !val )
                return V2
            try {
                // Locale.ROOT keeps the upper-casing independent from the JVM default locale
                return valueOf(val.toUpperCase(Locale.ROOT))
            }
            catch( IllegalArgumentException e ) {
                // the plain `valueOf` error does not mention the variable nor the accepted values
                final String msg = "Invalid value for environment variable NXF_TASK_HASH_VER: '${val}' -- supported values are: ${values().join(', ')}"
                throw new IllegalArgumentException(msg, e)
            }
        }
    }

    /**
     * Create the {@link TaskHasher} implementation matching the hash strategy
     * version held by the session of the task processor.
     *
     * @param task the task for which the hasher is created
     * @return a {@link TaskHasherV1} or {@link TaskHasherV2} instance
     * @throws IllegalArgumentException when the session hash strategy does not
     *      match any known version
     */
    static TaskHasher create(TaskRun task) {
        final version = task.processor.session.hashStrategy
        switch( version ) {
            case Version.V1:
                return new TaskHasherV1(task)
            case Version.V2:
                return new TaskHasherV2(task)
            default:
                throw new IllegalArgumentException("Unknown task hasher version: ${version}")
        }
    }
}

0 commit comments

Comments
 (0)