Skip to content

Commit da4827b

Browse files
committed
Merge branch 'master' into nf-356-tower-abort-on-error
2 parents f68a439 + 5e8276c commit da4827b

43 files changed

Lines changed: 5060 additions & 1734 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
# NIO Filesystem for Seqera Platform Datasets
2+
3+
- Authors: Jorge Ejarque
4+
- Status: draft
5+
- Date: 2026-03-10
6+
- Tags: nio, filesystem, seqera, datasets, nf-tower
7+
8+
Technical Story: Enable Nextflow pipelines to read Seqera Platform datasets as ordinary file paths using `seqera://` URIs.
9+
10+
## Summary
11+
12+
Add a Java NIO `FileSystemProvider` to the `nf-tower` plugin that registers the `seqera://` scheme, allowing pipelines to reference Seqera Platform datasets (CSV/TSV) as standard file paths without manual download steps. The implementation reuses the existing `TowerClient` for all HTTP communication, inheriting authentication and retry behaviour.
13+
14+
## Problem Statement
15+
16+
Nextflow users managing datasets on the Seqera Platform must currently download dataset files manually or through custom scripts before referencing them in pipelines. There is no native integration between Nextflow's file abstraction and the Seqera Platform dataset API. This creates friction in workflows where datasets are the primary input and forces users to handle authentication, versioning, and file staging outside the pipeline definition.
17+
18+
## Goals or Decision Drivers
19+
20+
- Transparent access to Seqera Platform datasets using standard Nextflow file path syntax
21+
- Reuse of existing nf-tower plugin infrastructure (authentication, HTTP client, retry/backoff)
22+
- Hierarchical path browsing matching the platform's org/workspace/dataset structure
23+
- Extensible architecture that can support future Seqera-managed resource types (e.g. data-links)
24+
- No new plugin or module — feature lives within nf-tower
25+
26+
## Non-goals
27+
28+
- Streaming large datasets — the Platform API does not support streaming; content is fully buffered on download
29+
- Implementing resource types beyond `datasets` — only the extensible architecture is required
30+
- Local caching across pipeline runs — Nextflow's standard task staging handles caching
31+
- Dataset management operations (delete, rename) — the filesystem is read-only in the initial implementation
32+
33+
## Considered Options
34+
35+
### Option 1: Standalone plugin with dedicated HTTP client
36+
37+
A new `nf-seqera-fs` plugin with its own HTTP client configuration and authentication setup.
38+
39+
- Good, because it isolates the filesystem code from the nf-tower plugin
40+
- Bad, because it duplicates authentication configuration and HTTP client setup
41+
- Bad, because two separate HTTP clients sharing a refresh token would corrupt each other's auth state
42+
43+
### Option 2: NIO filesystem within nf-tower using TowerClient delegation
44+
45+
Add the filesystem to nf-tower, delegating all HTTP through the existing `TowerClient` singleton via a typed `SeqeraDatasetClient` wrapper.
46+
47+
- Good, because it shares authentication and token refresh with TowerClient
48+
- Good, because it reuses existing retry/backoff configuration
49+
- Good, because no new dependencies are needed
50+
51+
### Option 3: Direct HxClient usage within nf-tower
52+
53+
Add the filesystem to nf-tower but use `HxClient` directly rather than going through TowerClient.
54+
55+
- Good, because it gives full control over request construction
56+
- Bad, because exposing HxClient internals couples the filesystem to implementation details
57+
- Bad, because token refresh coordination with TowerClient becomes manual
58+
59+
## Solution or decision outcome
60+
61+
Option 2 — NIO filesystem within nf-tower using TowerClient delegation. All HTTP calls go through `TowerClient.sendApiRequest()`, ensuring a single point of authentication and retry logic.
62+
63+
## Rationale & discussion
64+
65+
### Path Hierarchy
66+
67+
The `seqera://` path encodes the Platform's organizational structure directly:
68+
69+
```
70+
seqera:// → ROOT (directory, depth 0)
71+
└── <org>/ → ORGANIZATION (directory, depth 1)
72+
└── <workspace>/ → WORKSPACE (directory, depth 2)
73+
└── datasets/ → RESOURCE TYPE (directory, depth 3)
74+
└── <name>[@<version>] → DATASET (file, depth 4)
75+
```
76+
77+
Each level is a directory except the leaf dataset, which is a file. Version pinning uses an `@version` suffix on the dataset name segment (e.g. `seqera://acme/research/datasets/samples@2`). Without it, the latest non-disabled version is resolved.
78+
79+
### Name-to-ID Resolution
80+
81+
The path uses human-readable names but the Platform API requires numeric IDs. Resolution is built from two API calls at filesystem initialization:
82+
83+
1. `GET /user-info` → obtain `userId`
84+
2. `GET /user/{userId}/workspaces` → returns all accessible org/workspace pairs
85+
86+
This single source provides both directory listing content and name→ID mapping. Results are cached in `SeqeraFileSystem` with invalidation on write operations. `GET /orgs` is intentionally not used as it returns all platform orgs, not scoped to user membership.
87+
88+
### Component Structure
89+
90+
```
91+
plugins/nf-tower/src/main/io/seqera/tower/plugin/
92+
├── fs/ ← NIO layer
93+
│ ├── SeqeraFileSystemProvider ← FileSystemProvider (scheme: "seqera")
94+
│ ├── SeqeraFileSystem ← FileSystem with org/workspace/dataset caches
95+
│ ├── SeqeraPath ← Path implementation (depth 0–4)
96+
│ ├── SeqeraFileAttributes ← BasicFileAttributes
97+
│ ├── SeqeraPathFactory ← PF4J FileSystemPathFactory extension
98+
│ └── DatasetInputStream ← SeekableByteChannel over InputStream
99+
├── dataset/ ← API client layer
100+
│ ├── SeqeraDatasetClient ← Typed HTTP client wrapping TowerClient
101+
│ ├── DatasetDto ← Dataset API response model
102+
│ ├── DatasetVersionDto ← Version API response model
103+
│ ├── OrgAndWorkspaceDto ← Org/workspace list model
104+
│ └── WorkspaceOrgDto ← Workspace/org mapping model
105+
└── resources/META-INF/services/
106+
└── java.nio.file.spi.FileSystemProvider
107+
```
108+
109+
### Key Design Decisions
110+
111+
1. **TowerClient delegation**: `SeqeraDatasetClient` delegates all HTTP through `TowerFactory.client()``TowerClient.sendApiRequest()`. This ensures shared authentication state and avoids the token refresh corruption that would occur with separate HTTP client instances.
112+
113+
2. **One filesystem per JVM**: `SeqeraFileSystemProvider` maintains a single `SeqeraFileSystem` keyed by scheme. This matches the `TowerClient` singleton-per-session pattern.
114+
115+
3. **Read-only initial scope**: The filesystem reports `isReadOnly()=true`. Write support (dataset upload via multipart POST) is deferred to a future iteration.
116+
117+
4. **Download filename constraint**: The Platform API's download endpoint (`GET /datasets/{id}/v/{version}/n/{fileName}`) requires the exact filename from upload time. The implementation always resolves `DatasetVersionDto.fileName` from `GET /datasets/{id}/versions` before constructing the download URL.
118+
119+
5. **Extensible resource types**: The path hierarchy reserves depth 3 for a resource type segment (currently only `datasets`). Adding support for data-links or other resource types requires only a new handler at the directory listing and I/O layers, with no changes to path resolution or authentication.
120+
121+
6. **Thread safety**: `SeqeraFileSystem` cache methods and `SeqeraFileSystemProvider` lifecycle methods are `synchronized`. The filesystem map uses `LinkedHashMap` with external synchronization rather than `ConcurrentHashMap`, matching the low-contention access pattern.
122+
123+
### Limitations
124+
125+
- **No size metadata**: `SeqeraFileAttributes.size()` returns 0 for all paths because the Platform API does not expose content length in dataset metadata.
126+
- **Single endpoint per JVM**: The filesystem key is scheme-only; concurrent access to different Platform endpoints in the same JVM is not supported.
127+
128+
### Streaming Downloads
129+
130+
Dataset downloads use `TowerClient.sendStreamingRequest()` which calls `HxClient.sendAsStream()` — the response body is returned as an `InputStream` streamed directly from the HTTP connection. This avoids the triple-buffering problem (`String``getBytes()``ByteArrayInputStream`) that would otherwise consume ~40 MB heap per 10 MB dataset. The `HxClient.sendAsStream()` method goes through the same `sendWithRetry()` path as `sendAsString()`, so retry logic and token refresh are preserved.
131+
132+
## Links
133+
134+
- [Spec](../specs/260310-seqera-dataset-fs/spec.md)
135+
- [Implementation plan](../specs/260310-seqera-dataset-fs/plan.md)
136+
- [Data model](../specs/260310-seqera-dataset-fs/data-model.md)

docs/reference/cli.md

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,11 +1277,10 @@ The `module` command provides a comprehensive system for managing registry-based
12771277

12781278
(cli-module-run)=
12791279

1280-
`run [options] [namespace/name] [--<input_name> <input-value>]`
1280+
`run [options] [namespace/name | path] [--<input_name> <input-value>]`
12811281

1282-
: Execute a module directly from the registry without creating a wrapper workflow.
1283-
: Automatically downloads the module if not already installed. Accepts all standard Nextflow run options.
1284-
: The `module run` command extends the `run` command and accepts all its options, including `-profile`, `-resume`, `-c`, etc. Command-line params (i.e., `--<input_name>`) are inferred from the module's declared inputs.
1282+
: Execute a module directly. Can be a remote module (`namespace/name`) or a local module path (beginning with `./`, `../`, or `/`). Automatically downloads the module if not already installed.
1283+
: Accepts all standard Nextflow run options, including `-profile`, `-resume`, `-c`, etc. Command-line params (i.e., `--<input_name>`) are inferred from the module's declared inputs.
12851284
: The following additional options are available:
12861285

12871286
`-version`
@@ -1290,15 +1289,20 @@ The `module` command provides a comprehensive system for managing registry-based
12901289
: **Examples:**
12911290

12921291
```console
1293-
# Run module with inputs
1294-
$ nextflow module run nf-core/fastqc --input 'data/*.fastq.gz'
1292+
# Run remote module
1293+
$ nextflow module run nf-core/fastqc \
1294+
--input 'data/*.fastq.gz'
12951295

1296-
# Run specific version with Nextflow options
1296+
# Run remote module with specific version and run options
12971297
$ nextflow module run nf-core/fastqc \
1298-
--input 'data/*.fastq.gz' \
12991298
-version 1.0.0 \
1300-
-profile docker \
1299+
--input 'data/*.fastq.gz' \
1300+
-with-conda \
13011301
-resume
1302+
1303+
# Run local module
1304+
$ nextflow module run ./modules/nf-core/fastqc/main.nf \
1305+
--input 'data/*.fastq.gz'
13021306
```
13031307

13041308
(cli-module-search)=

modules/nextflow/src/main/groovy/nextflow/cli/module/CmdModuleRun.groovy

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,22 @@
1616

1717
package nextflow.cli.module
1818

19+
import java.nio.file.Path
20+
1921
import com.beust.jcommander.Parameter
2022
import com.beust.jcommander.Parameters
2123
import groovy.transform.CompileStatic
2224
import io.seqera.npr.client.RegistryClient
25+
import nextflow.Const
2326
import nextflow.cli.CmdRun
2427
import nextflow.config.ConfigBuilder
2528
import nextflow.config.RegistryConfig
2629
import nextflow.exception.AbortOperationException
2730
import nextflow.module.ModuleReference
28-
import nextflow.module.RegistryClientFactory
2931
import nextflow.module.ModuleResolver
30-
32+
import nextflow.module.RegistryClientFactory
3133
import nextflow.util.TestOnly
3234

33-
import java.nio.file.Path
34-
import java.nio.file.Paths
35-
3635
/**
3736
* Module run subcommand
3837
*
@@ -58,31 +57,53 @@ class CmdModuleRun extends CmdRun {
5857
@Override
5958
void run() {
6059
if( !args ) {
61-
throw new AbortOperationException("Arguments not provided")
60+
throw new AbortOperationException("Module name/path not provided")
61+
}
62+
63+
final moduleFile = isLocalModule(args[0])
64+
? resolveLocalModule(args[0])
65+
: resolveRemoteModule(args[0], version)
66+
67+
if( moduleFile ) {
68+
args[0] = moduleFile.toAbsolutePath().toString()
69+
super.run()
6270
}
71+
}
72+
73+
private boolean isLocalModule(String str) {
74+
return str.startsWith('/') || str.startsWith('./') || str.startsWith('../')
75+
}
76+
77+
private Path resolveLocalModule(String str) {
78+
final module = Path.of(str).toAbsolutePath().normalize()
79+
final path = module.isDirectory() ? module.resolve(Const.DEFAULT_MAIN_FILE_NAME) : module
80+
if( !path.exists() )
81+
throw new AbortOperationException("Invalid module path: ${str}")
82+
return path
83+
}
6384

85+
private Path resolveRemoteModule(String name, String version) {
6486
// Parse and validate module reference
6587
ModuleReference reference
6688
try {
67-
reference = ModuleReference.parse(args[0])
89+
reference = ModuleReference.parse(name)
6890
} catch( Exception e ) {
69-
throw new AbortOperationException("Invalid module reference: ${args[0]}", e)
91+
throw new AbortOperationException("Invalid module reference: ${name}", e)
7092
}
7193

7294
// Get config
73-
def baseDir = root ?: Paths.get('.').toAbsolutePath().normalize()
74-
def config = new ConfigBuilder()
95+
final baseDir = root ?: Path.of('.').toAbsolutePath().normalize()
96+
final config = new ConfigBuilder()
7597
.setOptions(launcher.options)
7698
.setBaseDir(baseDir)
7799
.build()
78100

79-
def registryConfig = config.navigate('registry') as RegistryConfig ?: new RegistryConfig()
80-
81-
def resolver = new ModuleResolver(baseDir, client ?: RegistryClientFactory.forConfig(registryConfig))
82-
Path moduleFile = resolver.installModule(reference, version)
83-
if( moduleFile ) {
84-
args[0] = moduleFile.toAbsolutePath().toString()
85-
super.run()
101+
final registryConfig = new RegistryConfig(config.registry as Map ?: Collections.emptyMap())
102+
try {
103+
final resolver = new ModuleResolver(baseDir, client ?: RegistryClientFactory.forConfig(registryConfig))
104+
return resolver.installModule(reference, version)
105+
} catch( Exception e ) {
106+
throw new AbortOperationException("Unable to install module: ${name}", e)
86107
}
87108
}
88109
}

modules/nextflow/src/test/groovy/nextflow/cli/module/CmdModuleRunTest.groovy

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,73 @@ class CmdModuleRunTest extends Specification {
195195

196196
}
197197

198+
def 'should run module from local path'() {
199+
given:
200+
def moduleScript = '''
201+
process CREATE_LOCAL_FILE {
202+
output:
203+
path "local_output.txt"
204+
205+
script:
206+
"""
207+
echo "Local module executed" > local_output.txt
208+
"""
209+
}
210+
'''.stripIndent()
211+
212+
and:
213+
// Create a local module directory with main.nf
214+
def moduleDir = tempDir.resolve('local-module')
215+
Files.createDirectories(moduleDir)
216+
moduleDir.resolve('main.nf').text = moduleScript
217+
218+
def escapedPath = Pattern.quote(tempDir.toString())
219+
def pattern = ~/"${escapedPath}\/.+\/local_output\.txt"/
220+
221+
and:
222+
def cmd = new CmdModuleRun()
223+
def opts = new CliOptions()
224+
opts.setQuiet(true)
225+
cmd.launcher = Mock(Launcher) {
226+
getOptions() >> opts
227+
getCliString() >> "nextflow module run ${moduleDir}"
228+
}
229+
cmd.args = [moduleDir.toString()]
230+
cmd.root = tempDir
231+
cmd.workDir = tempDir.toString()
232+
cmd.outputDir = tempDir.resolve('results').toString()
233+
cmd.outputFormat = 'json'
234+
235+
when:
236+
cmd.run()
237+
def stdout = capture
238+
.toString()
239+
.readLines()// remove the log part
240+
.findResults { line -> !line.contains('DEBUG') ? line : null }
241+
.findResults { line -> !line.contains('INFO') ? line : null }.join(" ")
242+
243+
then:
244+
assert (stdout =~ pattern).find()
245+
}
246+
247+
def 'should fail when path and module do not exist'() {
248+
given:
249+
def nonExistentPath = tempDir.resolve('does-not-exist').toString()
250+
def cmd = new CmdModuleRun()
251+
cmd.launcher = Mock(Launcher) {
252+
getOptions() >> null
253+
}
254+
cmd.args = [nonExistentPath]
255+
cmd.root = tempDir
256+
257+
when:
258+
cmd.run()
259+
260+
then:
261+
def e = thrown(AbortOperationException)
262+
e.message.contains('Invalid module path')
263+
}
264+
198265
def 'should fail with no arguments'() {
199266
given:
200267
def cmd = new CmdModuleRun()

modules/nf-commons/src/main/nextflow/file/FileHelper.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ class FileHelper {
362362
return asPath(toPathURI(str))
363363
}
364364

365-
static final private Map<String,String> PLUGINS_MAP = [s3:'nf-amazon', gs:'nf-google', az:'nf-azure']
365+
static final private Map<String,String> PLUGINS_MAP = [s3:'nf-amazon', gs:'nf-google', az:'nf-azure', seqera:'nf-tower']
366366

367367
static final private Map<String,Boolean> SCHEME_CHECKED = new HashMap<>()
368368

@@ -373,6 +373,7 @@ class FileHelper {
373373
// find out the default plugin for the given scheme and try to load it
374374
final pluginId = PLUGINS_MAP.get(scheme)
375375
if( pluginId ) try {
376+
log.debug "Detected required plugin '$pluginId'"
376377
if( Plugins.startIfMissing(pluginId) ) {
377378
log.debug "Started plugin '$pluginId' required to handle file: $str"
378379
// return true to signal a new plugin was loaded

plugins/nf-seqera/src/main/io/seqera/config/ExecutorOpts.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class ExecutorOpts implements ConfigScope {
3636
static final Set<String> VALID_AUTO_LABELS = Collections.unmodifiableSet(new LinkedHashSet<>([
3737
'projectName', 'userName', 'runName', 'sessionId', 'resume',
3838
'revision', 'commitId', 'repository', 'manifestName',
39-
'runtimeVersion', 'workflowId'
39+
'runtimeVersion', 'workflowId', 'workspaceId', 'computeEnvId'
4040
]))
4141

4242
final RetryOpts retryPolicy
@@ -87,7 +87,7 @@ class ExecutorOpts implements ConfigScope {
8787
`['runName', 'projectName']` or `'runName,projectName'`
8888
Valid names: `projectName`, `userName`, `runName`, `sessionId`, `resume`,
8989
`revision`, `commitId`, `repository`, `manifestName`, `runtimeVersion`,
90-
`workflowId`.
90+
`workflowId`, `workspaceId`, `computeEnvId`.
9191
""")
9292
final Set<String> autoLabels
9393

0 commit comments

Comments
 (0)