Skip to content

Commit ce25993

Browse files
committed
Fix K8s token refresh by caching K8sClient at executor level
Move the Guava cache from K8sConfig to K8sExecutor so that the K8sClient object itself is cached (not just the ClientConfig). This avoids re-creating K8sClient (including SSL setup) on every invocation while still refreshing the service account token when the configured interval expires. Fixes #6918 Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
1 parent 021c77c commit ce25993

5 files changed

Lines changed: 85 additions & 48 deletions

File tree

plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,8 @@ package nextflow.k8s
1818

1919
import nextflow.k8s.client.K8sRetryConfig
2020

21-
import java.util.concurrent.TimeUnit
2221
import javax.annotation.Nullable
2322

24-
import com.google.common.cache.Cache
25-
import com.google.common.cache.CacheBuilder
2623
import groovy.transform.CompileStatic
2724
import groovy.transform.PackageScope
2825
import groovy.util.logging.Slf4j
@@ -57,8 +54,6 @@ class K8sConfig implements ConfigScope {
5754

5855
static final private Map<String,?> DEFAULT_FUSE_PLUGIN = Map.of('nextflow.io/fuse', 1)
5956

60-
private Cache<String, ClientConfig> clientCache
61-
6257
@ConfigOption
6358
@Description("""
6459
Automatically mount host paths into the task pods (default: `false`). Only intended for development purposes when using a single node.
@@ -226,9 +221,6 @@ class K8sConfig implements ConfigScope {
226221
cleanup = opts.cleanup as Boolean
227222
client = opts.client as Map
228223
clientRefreshInterval = opts.clientRefreshInterval as Duration ?: Duration.of('50m')
229-
clientCache = CacheBuilder.newBuilder()
230-
.expireAfterWrite(clientRefreshInterval.toMillis(), TimeUnit.MILLISECONDS)
231-
.build()
232224
computeResourceType = opts.computeResourceType
233225
context = opts.context
234226
cpuLimits = opts.cpuLimits as boolean
@@ -366,10 +358,6 @@ class K8sConfig implements ConfigScope {
366358
}
367359

368360
ClientConfig getClient() {
369-
return clientCache.get('client', this::getClient0)
370-
}
371-
372-
private ClientConfig getClient0() {
373361
final result = client != null
374362
? clientFromNextflow(client, namespace, serviceAccount)
375363
: clientDiscovery(context, namespace, serviceAccount)

plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616

1717
package nextflow.k8s
1818

19+
import java.util.concurrent.TimeUnit
20+
21+
import com.google.common.cache.Cache
22+
import com.google.common.cache.CacheBuilder
1923
import groovy.transform.CompileStatic
2024
import groovy.transform.Memoized
2125
import groovy.util.logging.Slf4j
@@ -41,12 +45,17 @@ import org.pf4j.ExtensionPoint
4145
class K8sExecutor extends Executor implements ExtensionPoint {
4246

4347
/**
44-
* The Kubernetes HTTP client
48+
* Cache for the Kubernetes HTTP client. The client is refreshed periodically
49+
* so that the service account token is re-read when it expires.
4550
*/
46-
private K8sClient client
51+
private Cache<String, K8sClient> clientCache
4752

53+
/**
54+
* @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes
55+
* the client (including the service account token) when the configured interval expires.
56+
*/
4857
protected K8sClient getClient() {
49-
client
58+
clientCache.get('client', () -> new K8sClient(k8sConfig.getClient()))
5059
}
5160

5261
/**
@@ -64,9 +73,12 @@ class K8sExecutor extends Executor implements ExtensionPoint {
6473
protected void register() {
6574
super.register()
6675
final k8sConfig = getK8sConfig()
67-
final clientConfig = k8sConfig.getClient()
68-
this.client = new K8sClient(clientConfig)
69-
log.debug "[K8s] config=$k8sConfig; API client config=$clientConfig"
76+
final refreshInterval = k8sConfig.clientRefreshInterval
77+
this.clientCache = CacheBuilder.newBuilder()
78+
.expireAfterWrite(refreshInterval.toMillis(), TimeUnit.MILLISECONDS)
79+
.build()
80+
final client = getClient()
81+
log.debug "[K8s] config=$k8sConfig; API client config=$client.config"
7082
}
7183

7284
/**

plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
9393
K8sTaskHandler( TaskRun task, K8sExecutor executor ) {
9494
super(task)
9595
this.executor = executor
96-
this.client = executor.client
96+
this.client = executor.getClient()
9797
this.outputFile = task.workDir.resolve(TaskRun.CMD_OUTFILE)
9898
this.errorFile = task.workDir.resolve(TaskRun.CMD_ERRFILE)
9999
this.exitFile = task.workDir.resolve(TaskRun.CMD_EXIT)

plugins/nf-k8s/src/test/nextflow/k8s/K8sConfigTest.groovy

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -484,33 +484,4 @@ class K8sConfigTest extends Specification {
484484
cfg.clientRefreshInterval == Duration.of('1h')
485485
}
486486

487-
def 'should cache client config and refresh after expiration' () {
488-
given:
489-
def CONFIG = [
490-
namespace: 'test-ns',
491-
serviceAccount: 'test-sa',
492-
client: [server: 'http://k8s-server'],
493-
clientRefreshInterval: '100ms'
494-
]
495-
K8sConfig config = Spy(K8sConfig, constructorArgs: [CONFIG])
496-
497-
when: 'first call to getClient'
498-
def client1 = config.getClient()
499-
then: 'client is created via clientFromNextflow'
500-
1 * config.clientFromNextflow(_, _, _) >> new ClientConfig(server: 'http://k8s-server', namespace: 'test-ns')
501-
client1.server == 'http://k8s-server'
502-
503-
when: 'second call within cache interval'
504-
def client2 = config.getClient()
505-
then: 'returns cached client without calling clientFromNextflow again'
506-
0 * config.clientFromNextflow(_, _, _)
507-
client2.is(client1)
508-
509-
when: 'call after cache expiration'
510-
sleep(150) // wait for cache to expire
511-
def client3 = config.getClient()
512-
then: 'client is recreated'
513-
1 * config.clientFromNextflow(_, _, _) >> new ClientConfig(server: 'http://k8s-server', namespace: 'test-ns')
514-
!client3.is(client1)
515-
}
516487
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2013-2026, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package nextflow.k8s
18+
19+
import java.util.concurrent.TimeUnit
20+
21+
import com.google.common.cache.CacheBuilder
22+
import nextflow.k8s.client.ClientConfig
23+
import nextflow.k8s.client.K8sClient
24+
import spock.lang.Specification
25+
26+
/**
27+
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
28+
*/
29+
class K8sExecutorTest extends Specification {
30+
31+
def 'should cache k8s client and refresh after expiration' () {
32+
given:
33+
def CONFIG = new K8sConfig(
34+
client: [server: 'http://k8s-server'],
35+
namespace: 'test-ns',
36+
serviceAccount: 'test-sa',
37+
clientRefreshInterval: '100ms'
38+
)
39+
and:
40+
def executor = Spy(K8sExecutor)
41+
executor.getK8sConfig() >> CONFIG
42+
// use a short-lived cache for the test
43+
executor.@clientCache = CacheBuilder.newBuilder()
44+
.expireAfterWrite(100, TimeUnit.MILLISECONDS)
45+
.build()
46+
47+
when: 'first call to getClient'
48+
def client1 = executor.getClient()
49+
then: 'a new K8sClient is created'
50+
client1 instanceof K8sClient
51+
client1.config.server == 'http://k8s-server'
52+
53+
when: 'second call within cache interval'
54+
def client2 = executor.getClient()
55+
then: 'returns the same cached instance'
56+
client2.is(client1)
57+
58+
when: 'call after cache expiration'
59+
sleep(150)
60+
def client3 = executor.getClient()
61+
then: 'a new K8sClient instance is created'
62+
client3 instanceof K8sClient
63+
!client3.is(client1)
64+
}
65+
66+
}

0 commit comments

Comments
 (0)