Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ package nextflow.k8s

import nextflow.k8s.client.K8sRetryConfig

import java.util.concurrent.TimeUnit
import javax.annotation.Nullable

import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
Expand Down Expand Up @@ -57,8 +54,6 @@ class K8sConfig implements ConfigScope {

static final private Map<String,?> DEFAULT_FUSE_PLUGIN = Map.of('nextflow.io/fuse', 1)

private Cache<String, ClientConfig> clientCache

@ConfigOption
@Description("""
Automatically mount host paths into the task pods (default: `false`). Only intended for development purposes when using a single node.
Expand Down Expand Up @@ -226,9 +221,6 @@ class K8sConfig implements ConfigScope {
cleanup = opts.cleanup as Boolean
client = opts.client as Map
clientRefreshInterval = opts.clientRefreshInterval as Duration ?: Duration.of('50m')
clientCache = CacheBuilder.newBuilder()
.expireAfterWrite(clientRefreshInterval.toMillis(), TimeUnit.MILLISECONDS)
.build()
computeResourceType = opts.computeResourceType
context = opts.context
cpuLimits = opts.cpuLimits as boolean
Expand Down Expand Up @@ -366,10 +358,6 @@ class K8sConfig implements ConfigScope {
}

ClientConfig getClient() {
return clientCache.get('client', this::getClient0)
}

private ClientConfig getClient0() {
final result = client != null
? clientFromNextflow(client, namespace, serviceAccount)
: clientDiscovery(context, namespace, serviceAccount)
Expand Down
24 changes: 18 additions & 6 deletions plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package nextflow.k8s

import java.util.concurrent.TimeUnit

import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.util.logging.Slf4j
Expand All @@ -41,12 +45,17 @@ import org.pf4j.ExtensionPoint
class K8sExecutor extends Executor implements ExtensionPoint {

/**
* The Kubernetes HTTP client
* Cache for the Kubernetes HTTP client. The client is refreshed periodically
* so that the service account token is re-read when it expires.
*/
private K8sClient client
private Cache<String, K8sClient> clientCache

/**
* @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes
* the client (including the service account token) when the configured interval expires.
*/
protected K8sClient getClient() {
client
clientCache.get('client', () -> new K8sClient(k8sConfig.getClient()))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took me a sec to get this, so I'll leave a comment here for future readers. The client config (including token) is cached, but the k8s client itself is recreated on every submit task. This means on a new client is created every time but it's very lightweight. While building a client, it will use the config which is refreshed every 50 minutes (default value).

This is all handled by getClient() reaches into the Guava cache instead of getting a fresh client. Guava cache handles use-or-get logic.

}

/**
Expand All @@ -64,9 +73,12 @@ class K8sExecutor extends Executor implements ExtensionPoint {
protected void register() {
super.register()
final k8sConfig = getK8sConfig()
final clientConfig = k8sConfig.getClient()
this.client = new K8sClient(clientConfig)
log.debug "[K8s] config=$k8sConfig; API client config=$clientConfig"
final refreshInterval = k8sConfig.clientRefreshInterval
this.clientCache = CacheBuilder.newBuilder()
.expireAfterWrite(refreshInterval.toMillis(), TimeUnit.MILLISECONDS)
.build()
final client = getClient()
log.debug "[K8s] config=$k8sConfig; API client config=$client.config"
}

/**
Expand Down
2 changes: 1 addition & 1 deletion plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
K8sTaskHandler( TaskRun task, K8sExecutor executor ) {
super(task)
this.executor = executor
this.client = executor.client
this.client = executor.getClient()
this.outputFile = task.workDir.resolve(TaskRun.CMD_OUTFILE)
this.errorFile = task.workDir.resolve(TaskRun.CMD_ERRFILE)
this.exitFile = task.workDir.resolve(TaskRun.CMD_EXIT)
Expand Down
29 changes: 0 additions & 29 deletions plugins/nf-k8s/src/test/nextflow/k8s/K8sConfigTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -484,33 +484,4 @@ class K8sConfigTest extends Specification {
cfg.clientRefreshInterval == Duration.of('1h')
}

def 'should cache client config and refresh after expiration' () {
given:
def CONFIG = [
namespace: 'test-ns',
serviceAccount: 'test-sa',
client: [server: 'http://k8s-server'],
clientRefreshInterval: '100ms'
]
K8sConfig config = Spy(K8sConfig, constructorArgs: [CONFIG])

when: 'first call to getClient'
def client1 = config.getClient()
then: 'client is created via clientFromNextflow'
1 * config.clientFromNextflow(_, _, _) >> new ClientConfig(server: 'http://k8s-server', namespace: 'test-ns')
client1.server == 'http://k8s-server'

when: 'second call within cache interval'
def client2 = config.getClient()
then: 'returns cached client without calling clientFromNextflow again'
0 * config.clientFromNextflow(_, _, _)
client2.is(client1)

when: 'call after cache expiration'
sleep(150) // wait for cache to expire
def client3 = config.getClient()
then: 'client is recreated'
1 * config.clientFromNextflow(_, _, _) >> new ClientConfig(server: 'http://k8s-server', namespace: 'test-ns')
!client3.is(client1)
}
}
66 changes: 66 additions & 0 deletions plugins/nf-k8s/src/test/nextflow/k8s/K8sExecutorTest.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.k8s

import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder
import nextflow.k8s.client.ClientConfig
import nextflow.k8s.client.K8sClient
import spock.lang.Specification

/**
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
class K8sExecutorTest extends Specification {

def 'should cache k8s client and refresh after expiration' () {
given:
def CONFIG = new K8sConfig(
client: [server: 'http://k8s-server'],
namespace: 'test-ns',
serviceAccount: 'test-sa',
clientRefreshInterval: '100ms'
)
and:
def executor = Spy(K8sExecutor)
executor.getK8sConfig() >> CONFIG
// use a short-lived cache for the test
executor.@clientCache = CacheBuilder.newBuilder()
.expireAfterWrite(100, TimeUnit.MILLISECONDS)
.build()

when: 'first call to getClient'
def client1 = executor.getClient()
then: 'a new K8sClient is created'
client1 instanceof K8sClient
client1.config.server == 'http://k8s-server'

when: 'second call within cache interval'
def client2 = executor.getClient()
then: 'returns the same cached instance'
client2.is(client1)

when: 'call after cache expiration'
sleep(150)
def client3 = executor.getClient()
then: 'a new K8sClient instance is created'
client3 instanceof K8sClient
!client3.is(client1)
}

}
Loading