Skip to content

Commit 475c1c1

Browse files
committed
fix: forward tower auth for dataset http reads
Signed-off-by: Edmund Miller <edmund.miller@seqera.io>
1 parent caf25e6 commit 475c1c1

4 files changed

Lines changed: 204 additions & 8 deletions

File tree

plugins/nf-tower/src/main/io/seqera/tower/plugin/dataset/DatasetFileSystemProvider.java

Lines changed: 132 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.IOException;
2020
import java.io.InputStream;
2121
import java.net.URI;
22+
import java.net.URLConnection;
2223
import java.nio.channels.SeekableByteChannel;
2324
import java.nio.file.AccessMode;
2425
import java.nio.file.CopyOption;
@@ -36,6 +37,10 @@
3637
import java.nio.file.spi.FileSystemProvider;
3738
import java.util.Map;
3839
import java.util.Set;
40+
import java.util.regex.Pattern;
41+
42+
import nextflow.file.http.XAuthProvider;
43+
import nextflow.file.http.XAuthRegistry;
3944

4045
import org.slf4j.Logger;
4146
import org.slf4j.LoggerFactory;
@@ -55,6 +60,7 @@
5560
public class DatasetFileSystemProvider extends FileSystemProvider {
5661

5762
private static final Logger log = LoggerFactory.getLogger(DatasetFileSystemProvider.class);
63+
private static final Pattern PLATFORM_DATASET_PATH = Pattern.compile(".*?/workspaces/[^/]+/datasets/[^/]+/v/[^/]+/n/.+");
5864

5965
private volatile DatasetFileSystem fileSystem;
6066

@@ -99,13 +105,13 @@ public Path getPath(URI uri) {
99105
public InputStream newInputStream(Path path, OpenOption... options) throws IOException {
100106
final Path resolved = resolvedPath(path);
101107
log.debug("newInputStream for dataset path {} -> {}", path, resolved);
102-
return resolved.getFileSystem().provider().newInputStream(resolved, options);
108+
return withPlatformDatasetAuth(resolved, () -> resolved.getFileSystem().provider().newInputStream(resolved, options));
103109
}
104110

105111
@Override
106112
public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
107113
final Path resolved = resolvedPath(path);
108-
return resolved.getFileSystem().provider().newByteChannel(resolved, options, attrs);
114+
return withPlatformDatasetAuth(resolved, () -> resolved.getFileSystem().provider().newByteChannel(resolved, options, attrs));
109115
}
110116

111117
@Override
@@ -117,20 +123,20 @@ public DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter
117123
@SuppressWarnings("unchecked")
118124
public <A extends BasicFileAttributes> A readAttributes(Path path, Class<A> type, LinkOption... options) throws IOException {
119125
final Path resolved = resolvedPath(path);
120-
return resolved.getFileSystem().provider().readAttributes(resolved, type, options);
126+
return withPlatformDatasetAuth(resolved, () -> resolved.getFileSystem().provider().readAttributes(resolved, type, options));
121127
}
122128

123129
@Override
124130
public Map<String, Object> readAttributes(Path path, String attributes, LinkOption... options) throws IOException {
125131
final Path resolved = resolvedPath(path);
126-
return resolved.getFileSystem().provider().readAttributes(resolved, attributes, options);
132+
return withPlatformDatasetAuth(resolved, () -> resolved.getFileSystem().provider().readAttributes(resolved, attributes, options));
127133
}
128134

129135
@Override
130136
public <V extends FileAttributeView> V getFileAttributeView(Path path, Class<V> type, LinkOption... options) {
131137
try {
132138
final Path resolved = resolvedPath(path);
133-
return resolved.getFileSystem().provider().getFileAttributeView(resolved, type, options);
139+
return withPlatformDatasetAuth(resolved, () -> resolved.getFileSystem().provider().getFileAttributeView(resolved, type, options));
134140
}
135141
catch (IOException e) {
136142
throw new RuntimeException("Failed to resolve dataset path: " + path, e);
@@ -145,7 +151,10 @@ public void checkAccess(Path path, AccessMode... modes) throws IOException {
145151
}
146152
}
147153
final Path resolved = resolvedPath(path);
148-
resolved.getFileSystem().provider().checkAccess(resolved, modes);
154+
withPlatformDatasetAuth(resolved, () -> {
155+
resolved.getFileSystem().provider().checkAccess(resolved, modes);
156+
return null;
157+
});
149158
}
150159

151160
// -- write operations: read-only in phase 1 --
@@ -206,6 +215,86 @@ private DatasetFileSystem getOrCreateFileSystem() {
206215
return fileSystem;
207216
}
208217

218+
private <T> T withPlatformDatasetAuth(Path resolved, IoCallable<T> action) throws IOException {
219+
final XAuthProvider authProvider = authProviderFor(resolved);
220+
if (authProvider == null) {
221+
return action.call();
222+
}
223+
224+
final XAuthRegistry registry = XAuthRegistry.getInstance();
225+
registry.register(authProvider);
226+
try {
227+
return action.call();
228+
}
229+
finally {
230+
registry.unregister(authProvider);
231+
}
232+
}
233+
234+
private XAuthProvider authProviderFor(Path resolved) {
235+
final URI targetUri = resolved.toUri();
236+
if (targetUri == null || !isHttpScheme(targetUri.getScheme())) {
237+
return null;
238+
}
239+
if (!isPlatformDatasetDownloadPath(targetUri.getPath())) {
240+
return null;
241+
}
242+
243+
final String endpoint = DatasetResolver.towerEndpoint();
244+
final String accessToken = DatasetResolver.towerAccessToken();
245+
if (isBlank(endpoint) || isBlank(accessToken)) {
246+
return null;
247+
}
248+
249+
final URI endpointUri = URI.create(endpoint);
250+
if (!isHttpScheme(endpointUri.getScheme()) || !isSameOrigin(endpointUri, targetUri)) {
251+
return null;
252+
}
253+
254+
return new DatasetBearerAuthProvider(endpointUri, accessToken);
255+
}
256+
257+
private boolean isHttpScheme(String scheme) {
258+
return "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme);
259+
}
260+
261+
private boolean isPlatformDatasetDownloadPath(String path) {
262+
return path != null && PLATFORM_DATASET_PATH.matcher(path).matches();
263+
}
264+
265+
private static boolean isSameOrigin(URI left, URI right) {
266+
if (left.getScheme() == null || right.getScheme() == null) {
267+
return false;
268+
}
269+
if (!left.getScheme().equalsIgnoreCase(right.getScheme())) {
270+
return false;
271+
}
272+
if (left.getHost() == null || right.getHost() == null) {
273+
return false;
274+
}
275+
if (!left.getHost().equalsIgnoreCase(right.getHost())) {
276+
return false;
277+
}
278+
return defaultPort(left) == defaultPort(right);
279+
}
280+
281+
private static int defaultPort(URI uri) {
282+
if (uri.getPort() != -1) {
283+
return uri.getPort();
284+
}
285+
if ("https".equalsIgnoreCase(uri.getScheme())) {
286+
return 443;
287+
}
288+
if ("http".equalsIgnoreCase(uri.getScheme())) {
289+
return 80;
290+
}
291+
return -1;
292+
}
293+
294+
private boolean isBlank(String value) {
295+
return value == null || value.trim().isEmpty();
296+
}
297+
209298
/**
210299
* Resolve a dataset path to its backing cloud storage path.
211300
* Caches the resolved path on the DatasetPath instance.
@@ -216,4 +305,41 @@ private Path resolvedPath(Path path) throws IOException {
216305
}
217306
return ((DatasetPath) path).getResolvedPath();
218307
}
308+
309+
@FunctionalInterface
310+
private interface IoCallable<T> {
311+
T call() throws IOException;
312+
}
313+
314+
private static final class DatasetBearerAuthProvider implements XAuthProvider {
315+
private final URI endpoint;
316+
private final String accessToken;
317+
318+
private DatasetBearerAuthProvider(URI endpoint, String accessToken) {
319+
this.endpoint = endpoint;
320+
this.accessToken = accessToken;
321+
}
322+
323+
@Override
324+
public boolean authorize(URLConnection connection) {
325+
final URI target = URI.create(connection.getURL().toString());
326+
if (!isSameOrigin(endpoint, target)) {
327+
return false;
328+
}
329+
if (!PLATFORM_DATASET_PATH.matcher(target.getPath()).matches()) {
330+
return false;
331+
}
332+
if (connection.getRequestProperty("Authorization") != null) {
333+
return false;
334+
}
335+
336+
connection.setRequestProperty("Authorization", "Bearer " + accessToken);
337+
return true;
338+
}
339+
340+
@Override
341+
public boolean refreshToken(URLConnection connection) {
342+
return false;
343+
}
344+
}
219345
}

plugins/nf-tower/src/main/io/seqera/tower/plugin/dataset/DatasetResolver.groovy

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ import nextflow.platform.PlatformHelper
4444
@CompileStatic
4545
class DatasetResolver {
4646

47+
static String towerEndpoint() {
48+
return getEndpoint()
49+
}
50+
51+
static String towerAccessToken() {
52+
return getAccessToken()
53+
}
54+
4755
/**
4856
* Resolve a dataset name (and optional version) to the backing cloud storage Path.
4957
*

plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetIntegrationTest.groovy

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import java.nio.file.Files
2020
import java.nio.file.LinkOption
2121
import java.nio.file.attribute.BasicFileAttributes
2222

23+
import com.github.tomakehurst.wiremock.WireMockServer
2324
import spock.lang.TempDir
2425
import spock.lang.Unroll
2526

@@ -114,6 +115,69 @@ class DatasetIntegrationTest extends DatasetWireMockSpec {
114115
'readAttributes' | 'hello' | { DatasetFileSystemProvider providerRef, dsPath -> providerRef.readAttributes(dsPath, BasicFileAttributes, new LinkOption[0]) } | { attrs -> attrs.size() == 5 && !attrs.isDirectory() && attrs.isRegularFile() }
115116
}
116117

118+
def 'should forward bearer auth when reading platform dataset download URLs'() {
119+
given:
120+
mockSession(workspaceId: '100')
121+
stubDatasets([[id: '7', name: 'secure-ds']], '100')
122+
123+
and:
124+
def downloadPath = '/workspaces/100/datasets/7/v/1/n/data.csv'
125+
def downloadUrl = "http://localhost:${wireMock.port()}${downloadPath}"
126+
stubDatasetVersions('7', [[version: 1, url: downloadUrl, fileName: 'data.csv']], '100')
127+
wireMock.stubFor(get(urlPathEqualTo(downloadPath))
128+
.withHeader('Authorization', equalTo('Bearer test-token'))
129+
.willReturn(ok('secure,data\na,b\n')))
130+
131+
and:
132+
def provider = new DatasetFileSystemProvider()
133+
def path = provider.getPath(new URI('dataset://secure-ds'))
134+
135+
when:
136+
def content = provider.newInputStream(path).text
137+
138+
then:
139+
content == 'secure,data\na,b\n'
140+
141+
and:
142+
wireMock.verify(1, getRequestedFor(urlPathEqualTo(downloadPath))
143+
.withHeader('Authorization', equalTo('Bearer test-token')))
144+
}
145+
146+
def 'should not forward bearer auth to non-platform hosts'() {
147+
given:
148+
def externalHost = new WireMockServer(0)
149+
externalHost.start()
150+
151+
and:
152+
mockSession(workspaceId: '100')
153+
stubDatasets([[id: '7', name: 'external-ds']], '100')
154+
155+
and:
156+
def downloadPath = '/workspaces/100/datasets/7/v/1/n/data.csv'
157+
def downloadUrl = "http://localhost:${externalHost.port()}${downloadPath}"
158+
stubDatasetVersions('7', [[version: 1, url: downloadUrl, fileName: 'data.csv']], '100')
159+
externalHost.stubFor(get(urlPathEqualTo(downloadPath))
160+
.withHeader('Authorization', matching('.+'))
161+
.atPriority(1)
162+
.willReturn(unauthorized()))
163+
externalHost.stubFor(get(urlPathEqualTo(downloadPath))
164+
.atPriority(10)
165+
.willReturn(ok('public,data\nx,y\n')))
166+
167+
and:
168+
def provider = new DatasetFileSystemProvider()
169+
def path = provider.getPath(new URI('dataset://external-ds'))
170+
171+
when:
172+
def content = provider.newInputStream(path).text
173+
174+
then:
175+
content == 'public,data\nx,y\n'
176+
177+
cleanup:
178+
externalHost.stop()
179+
}
180+
117181
def 'should cache resolved path across multiple reads'() {
118182
given:
119183
def dataFile = makeFile('data.csv', 'cached')

plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetLiveAuthRegressionTest.groovy

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package io.seqera.tower.plugin.dataset
1818

1919
import nextflow.Global
2020
import nextflow.Session
21-
import spock.lang.PendingFeature
2221
import spock.lang.Requires
2322
import spock.lang.Specification
2423
import spock.lang.Unroll
@@ -38,7 +37,6 @@ class DatasetLiveAuthRegressionTest extends Specification {
3837
Global.session = null
3938
}
4039

41-
@PendingFeature(reason = 'Dataset provider does not yet forward Tower bearer auth to resolved HTTP dataset URLs')
4240
@Unroll
4341
def 'should read live dataset via provider using bearer auth - #datasetName'() {
4442
given:

0 commit comments

Comments
 (0)