Skip to content
10 changes: 10 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

package nextflow

import dev.failsafe.RetryPolicy
import io.seqera.util.retry.Retryable
import nextflow.util.IRetryConfig
import nextflow.util.RetryConfig

import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
Expand Down Expand Up @@ -581,6 +586,11 @@ class Session implements ISession {

ScriptBinding getBinding() { binding }

@Override
IRetryConfig getCommonRetryConfig() {
RetryConfig.config(this)
}

@Memoized
ClassLoader getClassLoader() { getClassLoader0() }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import nextflow.Session
@ToString(includePackage = false, includeNames = true)
@EqualsAndHashCode
@CompileStatic
class RetryConfig {
class RetryConfig implements IRetryConfig{

private final static Duration DEFAULT_DELAY = Duration.of('350ms')
private final static Duration DEFAULT_MAX_DELAY = Duration.of('90s')
Expand Down
1 change: 1 addition & 0 deletions modules/nf-commons/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
api 'org.pf4j:pf4j:3.12.0'
api 'org.pf4j:pf4j-update:2.3.0'
api 'dev.failsafe:failsafe:3.1.0'
api 'io.seqera:lib-retry:1.1.0'
// patch gson dependency required by pf4j
api 'com.google.code.gson:gson:2.13.1'

Expand Down
5 changes: 5 additions & 0 deletions modules/nf-commons/src/main/nextflow/ISession.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package nextflow

import nextflow.util.IRetryConfig

import java.nio.file.Path

/**
Expand Down Expand Up @@ -66,4 +68,7 @@ interface ISession {

String getCommitId()

IRetryConfig getCommonRetryConfig()
Comment thread
bentsherman marked this conversation as resolved.
Outdated


}
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
package nextflow.plugin

import com.google.gson.Gson
import dev.failsafe.Failsafe
import dev.failsafe.FailsafeExecutor
import dev.failsafe.Fallback
import dev.failsafe.RetryPolicy
import dev.failsafe.event.EventListener
import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedSupplier
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.BuildInfo
import nextflow.Global
import nextflow.util.HttpRetryableClient
import org.pf4j.PluginRuntimeException
import org.pf4j.update.FileDownloader
import org.pf4j.update.FileVerifier
import org.pf4j.update.PluginInfo
import org.pf4j.update.SimpleFileDownloader
import org.pf4j.update.verifier.CompoundVerifier

import java.net.http.HttpClient
Expand All @@ -35,9 +29,9 @@ import java.net.http.HttpResponse
@Slf4j
@CompileStatic
class HttpPluginRepository implements PrefetchUpdateRepository {
private final HttpClient client = HttpClient.newHttpClient()
private final String id
private final URI url
private final HttpRetryableClient retriableHttpClient

private Map<String, PluginInfo> plugins = new HashMap<>()

Expand All @@ -47,6 +41,7 @@ class HttpPluginRepository implements PrefetchUpdateRepository {
this.url = !url.toString().endsWith("/")
? URI.create(url.toString() + "/")
: url
this.retriableHttpClient = HttpRetryableClient.create(HttpClient.newHttpClient(), Global.session?.getCommonRetryConfig())
}

// NOTE ON PREFETCHING
Expand Down Expand Up @@ -120,53 +115,38 @@ class HttpPluginRepository implements PrefetchUpdateRepository {

private Map<String, PluginInfo> fetchMetadata(Collection<PluginSpec> specs) {
final ordered = specs.sort(false)
final CheckedSupplier<Map<String, PluginInfo>> supplier = () -> fetchMetadata0(ordered)
return retry().get(supplier)
return fetchMetadata0(ordered)
}

private Map<String, PluginInfo> fetchMetadata0(List<PluginSpec> specs) {
final gson = new Gson()

def reqBody = gson.toJson([
'nextflowVersion': BuildInfo.version,
'plugins' : specs
])

def pluginsParam = specs.collect { "${it.id}${it.version ? '@' + it.version : ''}" }.join(',')
def uri = url.resolve("v1/plugins/dependencies?plugins=${URLEncoder.encode(pluginsParam, 'UTF-8')}&nextflowVersion=${URLEncoder.encode(BuildInfo.version, 'UTF-8')}")
def req = HttpRequest.newBuilder()
.uri(url.resolve("plugins/collect"))
.POST(HttpRequest.BodyPublishers.ofString(reqBody))
.uri(uri)
.GET()
.build()
try {
return sendAndParse(req)
} catch( ConnectException e ) {
throw new ConnectException("Failed to download plugins metadata")
} catch( Exception e ) {
throw new PluginRuntimeException("Failed to download plugin metadata: ${e.message}")
}
}

def rep = client.send(req, HttpResponse.BodyHandlers.ofString())
if (rep.statusCode() != 200) throw new PluginRuntimeException(errorMessage(rep, gson))

private Map<String, PluginInfo> sendAndParse(HttpRequest req) {
final gson = new Gson()
def rep = retriableHttpClient.send(req, HttpResponse.BodyHandlers.ofString())
if( rep.statusCode() != 200 ) throw new PluginRuntimeException(errorMessage(rep, gson))
try {
def repBody = gson.fromJson(rep.body(), FetchResponse)
return repBody.plugins.collectEntries { p -> Map.entry(p.id, p) }
} catch (Exception e) {
} catch( Exception e ) {
log.info("Plugin metadata response body: '${rep.body()}'")
throw new PluginRuntimeException("Failed to parse response body", e)
}
}

// create a retry executor using failsafe
private static FailsafeExecutor retry() {
EventListener<ExecutionAttemptedEvent> logAttempt = (ExecutionAttemptedEvent attempt) -> {
log.debug("Retrying download of plugins metadata - attempt ${attempt.attemptCount}, ${attempt.lastFailure.message}", attempt.lastFailure)
}
Fallback fallback = Fallback.ofException { e ->
e.lastFailure instanceof ConnectException
? new ConnectException("Failed to download plugins metadata")
: new PluginRuntimeException("Failed to download plugin metadata: ${e.lastFailure.message}")
}
final policy = RetryPolicy.builder()
.withMaxAttempts(3)
.handle(ConnectException)
.onRetry(logAttempt)
.build()
return Failsafe.with(fallback, policy)
}

private static String errorMessage(HttpResponse<String> rep, Gson gson) {
try {
def err = gson.fromJson(rep.body(), ErrorResponse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@ package nextflow.plugin

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Global
import nextflow.util.HttpRetryableClient
import org.pf4j.update.SimpleFileDownloader

import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.nio.file.Files
import java.nio.file.Path
import java.time.Duration
import java.util.regex.Pattern

/**
Expand All @@ -43,18 +49,47 @@ class OciAwareFileDownloader extends SimpleFileDownloader {
*/
@Override
protected Path downloadFileHttp(URL fileUrl) {
def retriableHttpClient = HttpRetryableClient.create(
HttpClient.newBuilder()
.followRedirects(HttpClient.Redirect.NEVER)
.connectTimeout(Duration.ofSeconds(30))
.build(),
Global.session?.getCommonRetryConfig()
)


Path destination = Files.createTempDirectory("pf4j-update-downloader")
destination.toFile().deleteOnExit()

String path = fileUrl.getPath()
String fileName = path.substring(path.lastIndexOf('/') + 1)
HttpRequest request = HttpRequest.newBuilder()
.uri(fileUrl.toURI())
.timeout(Duration.ofMinutes(5))
.GET()
.build()

HttpResponse<String> response = retriableHttpClient.send(request, HttpResponse.BodyHandlers.ofString())

// Handle redirects manually because of filename is got from path
if (response.statusCode() in [301, 302, 303]) {
String newUrl = response.headers().firstValue("Location").orElse(null)
if (newUrl) {
log.debug("Managing redirection to $newUrl")
fileUrl = URI.create(newUrl).toURL()
path = fileUrl.getPath()
fileName = path.substring(path.lastIndexOf('/') + 1)
request = HttpRequest.newBuilder()
.uri(fileUrl.toURI())
.timeout(Duration.ofMinutes(5))
.GET()
.build()
response = retriableHttpClient.send(request, HttpResponse.BodyHandlers.ofString())
}
}

Path destination = Files.createTempDirectory("pf4j-update-downloader");
destination.toFile().deleteOnExit();

String path = fileUrl.getPath();
String fileName = path.substring(path.lastIndexOf('/') + 1);
Path file = destination.resolve(fileName);
HttpURLConnection conn = (HttpURLConnection) fileUrl.openConnection()
conn.instanceFollowRedirects = true

if (conn.responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
def wwwAuth = conn.getHeaderField("WWW-Authenticate")
if (response.statusCode() == 401) {
def wwwAuth = response.headers().firstValue("WWW-Authenticate").orElse(null)
if (wwwAuth?.contains("Bearer")) {
log.debug("Received 401 — attempting OCI token auth")

Expand All @@ -68,30 +103,41 @@ class OciAwareFileDownloader extends SimpleFileDownloader {
def token = fetchToken(tokenUrl)

// Retry download with Bearer token
def authConn = (HttpURLConnection) fileUrl.openConnection()
authConn.setRequestProperty("Authorization", "Bearer $token")
authConn.instanceFollowRedirects = true

authConn.inputStream.withStream { input ->
file.withOutputStream { out -> out << input }
}

HttpRequest authRequest = HttpRequest.newBuilder()
.uri(fileUrl.toURI())
.header("Authorization", "Bearer $token")
.timeout(Duration.ofMinutes(5))
.GET()
.build()
HttpResponse<byte[]> authResponse = retriableHttpClient.send(authRequest, HttpResponse.BodyHandlers.ofByteArray())
Path file = destination.resolve(fileName)
Files.write(file, authResponse.body())
return file
}
}

// Fallback to default behavior
conn.inputStream.withStream { input ->
file.withOutputStream { out -> out << input }
}
// Fallback to default behavior - download with initial response
HttpResponse<byte[]> downloadResponse = retriableHttpClient.send(request, HttpResponse.BodyHandlers.ofByteArray())
Path file = destination.resolve(fileName)
Files.write(file, downloadResponse.body())
return file
}

private String fetchToken(String tokenUrl) {
def conn = (HttpURLConnection) URI.create(tokenUrl).toURL().openConnection()
conn.setRequestProperty("Accept", "application/json")

def json = conn.inputStream.getText("UTF-8")
def retriableHttpClient = HttpRetryableClient.create(
HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(30))
.build(),
Global.session?.getCommonRetryConfig()
)

HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(tokenUrl))
.header("Accept", "application/json")
.timeout(Duration.ofMinutes(1))
.GET()
.build()
HttpResponse<String> response = retriableHttpClient.send(request, HttpResponse.BodyHandlers.ofString())
def json = response.body()
def matcher = json =~ /"token"\s*:\s*"([^"]+)"/
if (matcher.find()) {
return matcher.group(1)
Expand Down
Loading