Skip to content

Commit 3d2e4c4

Browse files
authored
Fix K8s token refresh by caching K8sClient at executor level (#6925)
1 parent 021c77c commit 3d2e4c4

5 files changed

Lines changed: 85 additions & 48 deletions

File tree

plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy

Lines changed: 0 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -18,11 +18,8 @@ package nextflow.k8s
1818

1919
import nextflow.k8s.client.K8sRetryConfig
2020

21-
import java.util.concurrent.TimeUnit
2221
import javax.annotation.Nullable
2322

24-
import com.google.common.cache.Cache
25-
import com.google.common.cache.CacheBuilder
2623
import groovy.transform.CompileStatic
2724
import groovy.transform.PackageScope
2825
import groovy.util.logging.Slf4j
@@ -57,8 +54,6 @@ class K8sConfig implements ConfigScope {
5754

5855
static final private Map<String,?> DEFAULT_FUSE_PLUGIN = Map.of('nextflow.io/fuse', 1)
5956

60-
private Cache<String, ClientConfig> clientCache
61-
6257
@ConfigOption
6358
@Description("""
6459
Automatically mount host paths into the task pods (default: `false`). Only intended for development purposes when using a single node.
@@ -226,9 +221,6 @@ class K8sConfig implements ConfigScope {
226221
cleanup = opts.cleanup as Boolean
227222
client = opts.client as Map
228223
clientRefreshInterval = opts.clientRefreshInterval as Duration ?: Duration.of('50m')
229-
clientCache = CacheBuilder.newBuilder()
230-
.expireAfterWrite(clientRefreshInterval.toMillis(), TimeUnit.MILLISECONDS)
231-
.build()
232224
computeResourceType = opts.computeResourceType
233225
context = opts.context
234226
cpuLimits = opts.cpuLimits as boolean
@@ -366,10 +358,6 @@ class K8sConfig implements ConfigScope {
366358
}
367359

368360
ClientConfig getClient() {
369-
return clientCache.get('client', this::getClient0)
370-
}
371-
372-
private ClientConfig getClient0() {
373361
final result = client != null
374362
? clientFromNextflow(client, namespace, serviceAccount)
375363
: clientDiscovery(context, namespace, serviceAccount)

plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616

1717
package nextflow.k8s
1818

19+
import java.util.concurrent.TimeUnit
20+
21+
import com.google.common.cache.Cache
22+
import com.google.common.cache.CacheBuilder
1923
import groovy.transform.CompileStatic
2024
import groovy.transform.Memoized
2125
import groovy.util.logging.Slf4j
@@ -41,12 +45,17 @@ import org.pf4j.ExtensionPoint
4145
class K8sExecutor extends Executor implements ExtensionPoint {
4246

4347
/**
44-
* The Kubernetes HTTP client
48+
* Cache for the Kubernetes HTTP client. The client is refreshed periodically
49+
* so that the service account token is re-read when it expires.
4550
*/
46-
private K8sClient client
51+
private Cache<String, K8sClient> clientCache
4752

53+
/**
54+
* @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes
55+
* the client (including the service account token) when the configured interval expires.
56+
*/
4857
protected K8sClient getClient() {
49-
client
58+
clientCache.get('client', () -> new K8sClient(k8sConfig.getClient()))
5059
}
5160

5261
/**
@@ -64,9 +73,12 @@ class K8sExecutor extends Executor implements ExtensionPoint {
6473
protected void register() {
6574
super.register()
6675
final k8sConfig = getK8sConfig()
67-
final clientConfig = k8sConfig.getClient()
68-
this.client = new K8sClient(clientConfig)
69-
log.debug "[K8s] config=$k8sConfig; API client config=$clientConfig"
76+
final refreshInterval = k8sConfig.clientRefreshInterval
77+
this.clientCache = CacheBuilder.newBuilder()
78+
.expireAfterWrite(refreshInterval.toMillis(), TimeUnit.MILLISECONDS)
79+
.build()
80+
final client = getClient()
81+
log.debug "[K8s] config=$k8sConfig; API client config=$client.config"
7082
}
7183

7284
/**

plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
9393
K8sTaskHandler( TaskRun task, K8sExecutor executor ) {
9494
super(task)
9595
this.executor = executor
96-
this.client = executor.client
96+
this.client = executor.getClient()
9797
this.outputFile = task.workDir.resolve(TaskRun.CMD_OUTFILE)
9898
this.errorFile = task.workDir.resolve(TaskRun.CMD_ERRFILE)
9999
this.exitFile = task.workDir.resolve(TaskRun.CMD_EXIT)

plugins/nf-k8s/src/test/nextflow/k8s/K8sConfigTest.groovy

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -484,33 +484,4 @@ class K8sConfigTest extends Specification {
484484
cfg.clientRefreshInterval == Duration.of('1h')
485485
}
486486

487-
def 'should cache client config and refresh after expiration' () {
488-
given:
489-
def CONFIG = [
490-
namespace: 'test-ns',
491-
serviceAccount: 'test-sa',
492-
client: [server: 'http://k8s-server'],
493-
clientRefreshInterval: '100ms'
494-
]
495-
K8sConfig config = Spy(K8sConfig, constructorArgs: [CONFIG])
496-
497-
when: 'first call to getClient'
498-
def client1 = config.getClient()
499-
then: 'client is created via clientFromNextflow'
500-
1 * config.clientFromNextflow(_, _, _) >> new ClientConfig(server: 'http://k8s-server', namespace: 'test-ns')
501-
client1.server == 'http://k8s-server'
502-
503-
when: 'second call within cache interval'
504-
def client2 = config.getClient()
505-
then: 'returns cached client without calling clientFromNextflow again'
506-
0 * config.clientFromNextflow(_, _, _)
507-
client2.is(client1)
508-
509-
when: 'call after cache expiration'
510-
sleep(150) // wait for cache to expire
511-
def client3 = config.getClient()
512-
then: 'client is recreated'
513-
1 * config.clientFromNextflow(_, _, _) >> new ClientConfig(server: 'http://k8s-server', namespace: 'test-ns')
514-
!client3.is(client1)
515-
}
516487
}
plugins/nf-k8s/src/test/nextflow/k8s/K8sExecutorTest.groovy

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2013-2026, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package nextflow.k8s
18+
19+
import java.util.concurrent.TimeUnit
20+
21+
import com.google.common.cache.CacheBuilder
22+
import nextflow.k8s.client.ClientConfig
23+
import nextflow.k8s.client.K8sClient
24+
import spock.lang.Specification
25+
26+
/**
27+
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
28+
*/
29+
class K8sExecutorTest extends Specification {
30+
31+
def 'should cache k8s client and refresh after expiration' () {
32+
given:
33+
def CONFIG = new K8sConfig(
34+
client: [server: 'http://k8s-server'],
35+
namespace: 'test-ns',
36+
serviceAccount: 'test-sa',
37+
clientRefreshInterval: '100ms'
38+
)
39+
and:
40+
def executor = Spy(K8sExecutor)
41+
executor.getK8sConfig() >> CONFIG
42+
// use a short-lived cache for the test
43+
executor.@clientCache = CacheBuilder.newBuilder()
44+
.expireAfterWrite(100, TimeUnit.MILLISECONDS)
45+
.build()
46+
47+
when: 'first call to getClient'
48+
def client1 = executor.getClient()
49+
then: 'a new K8sClient is created'
50+
client1 instanceof K8sClient
51+
client1.config.server == 'http://k8s-server'
52+
53+
when: 'second call within cache interval'
54+
def client2 = executor.getClient()
55+
then: 'returns the same cached instance'
56+
client2.is(client1)
57+
58+
when: 'call after cache expiration'
59+
sleep(150)
60+
def client3 = executor.getClient()
61+
then: 'a new K8sClient instance is created'
62+
client3 instanceof K8sClient
63+
!client3.is(client1)
64+
}
65+
66+
}

0 commit comments

Comments
 (0)