Skip to content

Commit bb396fd

Browse files
pditommasoclaude
andcommitted
Fix K8s 401 after token expiry (#6918)
Three caching layers caused the K8s client to hold an expired service-account token after the kubelet rotated it on disk, leading to 401 Unauthorized after ~60 minutes on clusters with short-lived projected tokens (e.g. AKS default 3607s). This change closes the remaining failure modes left open after the fixes in #6742 and #6925: - K8sTaskHandler no longer caches the K8sClient at construction; every access now delegates to the executor's Guava-cached client, so handlers pick up refreshes automatically. - ClientConfig retains the token file path (in-cluster discovery and kubeconfig/Nextflow-config tokenFile entries) so the token can be re-read from disk later. - K8sClient.apply() retries on HTTP 401 when tokenPath is set: the onRetry hook re-reads the token file (kubelet writes the rotated token to the same mount path) and updates the in-memory config before retrying. 401s with no tokenPath still propagate immediately as before. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
1 parent d30e48d commit bb396fd

8 files changed

Lines changed: 214 additions & 27 deletions

File tree

plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,6 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
7070

7171
private ResourceType resourceType = ResourceType.Pod
7272

73-
private K8sClient client
74-
7573
private String podName
7674

7775
private BashWrapperBuilder builder
@@ -93,7 +91,6 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
9391
K8sTaskHandler( TaskRun task, K8sExecutor executor ) {
9492
super(task)
9593
this.executor = executor
96-
this.client = executor.getClient()
9794
this.outputFile = task.workDir.resolve(TaskRun.CMD_OUTFILE)
9895
this.errorFile = task.workDir.resolve(TaskRun.CMD_ERRFILE)
9996
this.exitFile = task.workDir.resolve(TaskRun.CMD_EXIT)
@@ -116,6 +113,8 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
116113

117114
protected K8sConfig getK8sConfig() { executor.getK8sConfig() }
118115

116+
protected K8sClient getClient() { executor.getClient() }
117+
119118
protected boolean useJobResource() { resourceType==ResourceType.Job }
120119

121120
protected List<String> getContainerMounts() {

plugins/nf-k8s/src/main/nextflow/k8s/client/ClientConfig.groovy

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ class ClientConfig {
4949

5050
String token
5151

52+
/**
53+
* Filesystem path of the token, when the token was loaded from a file.
54+
* Used to re-read the token after expiry — kubelet rotates projected
55+
* service-account tokens in place by overwriting the mounted file.
56+
*/
57+
Path tokenPath
58+
5259
byte[] sslCert
5360

5461
byte[] clientCert
@@ -108,8 +115,10 @@ class ClientConfig {
108115

109116
if( opts.token )
110117
result.token = opts.token
111-
else if( opts.tokenFile )
112-
result.token = Paths.get(opts.tokenFile.toString()).getText('UTF-8')
118+
else if( opts.tokenFile ) {
119+
result.tokenPath = Paths.get(opts.tokenFile.toString())
120+
result.token = result.tokenPath.getText('UTF-8')
121+
}
113122

114123
result.namespace = namespace ?: opts.namespace ?: 'default'
115124

@@ -143,7 +152,8 @@ class ClientConfig {
143152
result.token = user.token
144153

145154
else if( user.tokenFile ) {
146-
result.token = Paths.get(user.tokenFile.toString()).getText('UTF-8')
155+
result.tokenPath = Paths.get(user.tokenFile.toString())
156+
result.token = result.tokenPath.getText('UTF-8')
147157
}
148158

149159
if( user."client-certificate" )

plugins/nf-k8s/src/main/nextflow/k8s/client/ConfigDiscovery.groovy

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,13 @@ class ConfigDiscovery {
8484
final server = formatHostName(host, port)
8585

8686
final cert = path('/var/run/secrets/kubernetes.io/serviceaccount/ca.crt').bytes
87-
final token = path('/var/run/secrets/kubernetes.io/serviceaccount/token').text
87+
final tokenFile = path('/var/run/secrets/kubernetes.io/serviceaccount/token')
8888
final namespace = path('/var/run/secrets/kubernetes.io/serviceaccount/namespace').text
8989

9090
return new ClientConfig(
9191
server: server,
92-
token: token,
92+
token: tokenFile.text,
93+
tokenPath: tokenFile,
9394
namespace: cfgNamespace ?: namespace,
9495
serviceAccount: serviceAccount,
9596
sslCert: cert,

plugins/nf-k8s/src/main/nextflow/k8s/client/K8sClient.groovy

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,9 @@ class K8sClient {
741741
@Override
742742
void accept(ExecutionAttemptedEvent<T> event) throws Throwable {
743743
log.debug("K8s response error - attempt: ${event.attemptCount}; reason: ${event.lastFailure.message}")
744+
final t = event.lastFailure
745+
if( t instanceof K8sResponseException && t.response.code == 401 )
746+
refreshToken()
744747
}
745748
}
746749
return RetryPolicy.<T>builder()
@@ -752,6 +755,25 @@ class K8sClient {
752755
.build()
753756
}
754757

758+
/**
759+
* Reload the service-account token from {@link ClientConfig#tokenPath} so that
760+
* a request retried after a 401 picks up a token rotated in place by kubelet.
761+
*/
762+
protected void refreshToken() {
763+
if( !config.tokenPath )
764+
return
765+
try {
766+
final newToken = config.tokenPath.getText('UTF-8')
767+
if( newToken && newToken != config.token ) {
768+
log.debug "[K8s] Refreshing service-account token from ${config.tokenPath}"
769+
config.token = newToken
770+
}
771+
}
772+
catch( Exception e ) {
773+
log.warn "[K8s] Unable to refresh service-account token from ${config.tokenPath} - cause: ${e.message}"
774+
}
775+
}
776+
755777
final private static List<Integer> RETRY_CODES = List.of(408, 429, 500, 502, 503, 504)
756778

757779
/**
@@ -767,6 +789,9 @@ class K8sClient {
767789
boolean test(Throwable t) {
768790
if ( t instanceof K8sResponseException && t.response.code in RETRY_CODES )
769791
return true
792+
// 401 is retried only when the token was loaded from a file and can be re-read from disk
793+
if ( t instanceof K8sResponseException && t.response.code == 401 && config.tokenPath )
794+
return true
770795
if( t instanceof SocketException || t.cause instanceof SocketException )
771796
return true
772797
if( t instanceof SocketTimeoutException || t.cause instanceof SocketTimeoutException )

plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ class K8sTaskHandlerTest extends Specification {
6262
def task = Mock(TaskRun)
6363
def client = Mock(K8sClient)
6464
def builder = Mock(K8sWrapperBuilder)
65-
def handler = Spy(new K8sTaskHandler(builder:builder, client: client))
65+
def handler = Spy(new K8sTaskHandler(builder:builder))
66+
handler.getClient() >> client
6667
Map result
6768

6869
when:
@@ -162,7 +163,8 @@ class K8sTaskHandlerTest extends Specification {
162163
def task = Mock(TaskRun)
163164
def client = Mock(K8sClient)
164165
def builder = Mock(K8sWrapperBuilder)
165-
def handler = Spy(new K8sTaskHandler(builder: builder, client:client))
166+
def handler = Spy(new K8sTaskHandler(builder: builder))
167+
handler.getClient() >> client
166168
Map result
167169

168170
when:
@@ -198,7 +200,8 @@ class K8sTaskHandlerTest extends Specification {
198200
def client = Mock(K8sClient)
199201
def builder = Mock(K8sWrapperBuilder)
200202
def config = Mock(ClientConfig)
201-
def handler = Spy(new K8sTaskHandler(builder: builder, client: client))
203+
def handler = Spy(new K8sTaskHandler(builder: builder))
204+
handler.getClient() >> client
202205
Map result
203206

204207
when:
@@ -233,7 +236,8 @@ class K8sTaskHandlerTest extends Specification {
233236
def client = Mock(K8sClient)
234237
def builder = Mock(K8sWrapperBuilder)
235238
def config = Mock(TaskConfig)
236-
def handler = Spy(new K8sTaskHandler(builder:builder, client:client))
239+
def handler = Spy(new K8sTaskHandler(builder:builder))
240+
handler.getClient() >> client
237241
def podOptions = Mock(PodOptions)
238242
and:
239243
Map result
@@ -281,7 +285,8 @@ class K8sTaskHandlerTest extends Specification {
281285
def task = Mock(TaskRun)
282286
def client = Mock(K8sClient)
283287
def builder = Mock(K8sWrapperBuilder)
284-
def handler = Spy(new K8sTaskHandler(builder:builder, client:client))
288+
def handler = Spy(new K8sTaskHandler(builder:builder))
289+
handler.getClient() >> client
285290
def podOptions = Mock(PodOptions)
286291
and:
287292
Map result
@@ -354,7 +359,8 @@ class K8sTaskHandlerTest extends Specification {
354359
def task = Mock(TaskRun)
355360
def client = Mock(K8sClient)
356361
def builder = Mock(K8sWrapperBuilder)
357-
def handler = Spy(new K8sTaskHandler(client: client, task:task))
362+
def handler = Spy(new K8sTaskHandler(task:task))
363+
handler.getClient() >> client
358364

359365
def POD_NAME = 'new-pod-id'
360366
def REQUEST = [foo: 'bar']
@@ -391,7 +397,8 @@ class K8sTaskHandlerTest extends Specification {
391397
def builder = Mock(K8sWrapperBuilder)
392398
def config = Mock(TaskConfig)
393399
def executor = Mock(K8sExecutor)
394-
def handler = Spy(new K8sTaskHandler(builder: builder, client: client, executor: executor))
400+
def handler = Spy(new K8sTaskHandler(builder: builder, executor: executor))
401+
handler.getClient() >> client
395402
def podOptions = Mock(PodOptions)
396403
and:
397404
Map result
@@ -440,7 +447,8 @@ class K8sTaskHandlerTest extends Specification {
440447
given:
441448
def POD_NAME = 'pod-xyz'
442449
def client = Mock(K8sClient)
443-
def handler = Spy(new K8sTaskHandler(client: client, podName: POD_NAME, status: TaskStatus.SUBMITTED))
450+
def handler = Spy(new K8sTaskHandler(podName: POD_NAME, status: TaskStatus.SUBMITTED))
451+
handler.getClient() >> client
444452

445453
when:
446454
def result = handler.checkIfRunning()
@@ -479,7 +487,8 @@ class K8sTaskHandlerTest extends Specification {
479487
finishedAt: "2018-01-13T10:19:36Z" ]
480488
def noExitCodeState = [terminated: noExitCodeTermState]
481489
and:
482-
def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))
490+
def handler = Spy(new K8sTaskHandler(task: task, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))
491+
handler.getClient() >> client
483492

484493
when:
485494
def result = handler.checkIfCompleted()
@@ -539,7 +548,8 @@ class K8sTaskHandlerTest extends Specification {
539548
finishedAt: "2018-01-13T10:19:36Z",
540549
exitCode: 137 ]
541550
def task = new TaskRun()
542-
def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))
551+
def handler = Spy(new K8sTaskHandler(task: task, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))
552+
handler.getClient() >> client
543553

544554
when:
545555
def result = handler.checkIfCompleted()
@@ -558,7 +568,8 @@ class K8sTaskHandlerTest extends Specification {
558568
given:
559569
def POD_NAME = 'pod-xyz'
560570
def client = Mock(K8sClient)
561-
def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))
571+
def handler = Spy(new K8sTaskHandler(podName: POD_NAME))
572+
handler.getClient() >> client
562573

563574
when:
564575
handler.killTask()
@@ -577,7 +588,8 @@ class K8sTaskHandlerTest extends Specification {
577588
given:
578589
def POD_NAME = 'pod-xyz'
579590
def client = Mock(K8sClient)
580-
def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))
591+
def handler = Spy(new K8sTaskHandler(podName: POD_NAME))
592+
handler.getClient() >> client
581593
and:
582594
Map STATE1 = [status:'pending']
583595
Map STATE2 = [status:'running']
@@ -646,7 +658,8 @@ class K8sTaskHandlerTest extends Specification {
646658
given:
647659
def POD_NAME = 'pod-xyz'
648660
def client = Mock(K8sClient)
649-
def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))
661+
def handler = Spy(new K8sTaskHandler(podName: POD_NAME))
662+
handler.getClient() >> client
650663

651664
when:
652665
def state = handler.getState()
@@ -664,7 +677,8 @@ class K8sTaskHandlerTest extends Specification {
664677
given:
665678
def POD_NAME = 'pod-xyz'
666679
def client = Mock(K8sClient)
667-
def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))
680+
def handler = Spy(new K8sTaskHandler(podName: POD_NAME))
681+
handler.getClient() >> client
668682

669683
when:
670684
def state = handler.getState()
@@ -782,7 +796,8 @@ class K8sTaskHandlerTest extends Specification {
782796
def POD_NAME = 'the-pod-name'
783797
def executor = Mock(K8sExecutor)
784798
def client = Mock(K8sClient)
785-
def handler = Spy(new K8sTaskHandler(podName: POD_NAME, executor:executor, client:client))
799+
def handler = Spy(new K8sTaskHandler(podName: POD_NAME, executor:executor))
800+
handler.getClient() >> client
786801
handler.useJobResource() >> false
787802
and:
788803
def TASK_OK = Mock(TaskRun); TASK_OK.isSuccess() >> true
@@ -814,7 +829,8 @@ class K8sTaskHandlerTest extends Specification {
814829
def POD_NAME = 'the-job-name'
815830
def executor = Mock(K8sExecutor)
816831
def client = Mock(K8sClient)
817-
def handler = Spy(new K8sTaskHandler(podName: POD_NAME, executor:executor, client:client))
832+
def handler = Spy(new K8sTaskHandler(podName: POD_NAME, executor:executor))
833+
handler.getClient() >> client
818834
handler.useJobResource() >> true
819835
and:
820836
def TASK_OK = Mock(TaskRun); TASK_OK.isSuccess() >> true
@@ -846,7 +862,8 @@ class K8sTaskHandlerTest extends Specification {
846862
def executor = Mock(K8sExecutor)
847863
def client = Mock(K8sClient)
848864
and:
849-
def handler = Spy(new K8sTaskHandler(executor: executor, client: client, podName: POD_NAME))
865+
def handler = Spy(new K8sTaskHandler(executor: executor, podName: POD_NAME))
866+
handler.getClient() >> client
850867

851868
when:
852869
handler.saveJobLogOnError(task)
@@ -976,7 +993,8 @@ class K8sTaskHandlerTest extends Specification {
976993
def client = Mock(K8sClient)
977994
def builder = Mock(K8sWrapperBuilder)
978995
def launcher = Mock(FusionScriptLauncher)
979-
def handler = Spy(new K8sTaskHandler(builder:builder, client: client))
996+
def handler = Spy(new K8sTaskHandler(builder:builder))
997+
handler.getClient() >> client
980998
Map result
981999

9821000
when:
@@ -1022,7 +1040,8 @@ class K8sTaskHandlerTest extends Specification {
10221040
def launcher = Mock(FusionScriptLauncher)
10231041
def k8sConfig = Spy(K8sConfig)
10241042
def exec = Mock(K8sExecutor) { getK8sConfig()>>k8sConfig }
1025-
def handler = Spy(new K8sTaskHandler(builder:builder, client: client, executor: exec))
1043+
def handler = Spy(new K8sTaskHandler(builder:builder, executor: exec))
1044+
handler.getClient() >> client
10261045
Map result
10271046

10281047
when:

plugins/nf-k8s/src/test/nextflow/k8s/client/ClientConfigTest.groovy

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,60 @@ class ClientConfigTest extends Specification {
117117
folder?.deleteDir()
118118
}
119119

120+
def 'should preserve token file path when reading token from tokenFile in nextflow config' () {
121+
122+
given:
123+
def folder = Files.createTempDirectory('test')
124+
def tokenFile = folder.resolve('token')
125+
tokenFile.text = 'file-token'
126+
127+
def MAP = [
128+
server: 'foo.com',
129+
tokenFile: tokenFile ]
130+
131+
when:
132+
def result = ClientConfig.fromNextflowConfig(MAP, null, null)
133+
134+
then:
135+
result.token == 'file-token'
136+
result.tokenPath == tokenFile
137+
138+
cleanup:
139+
folder?.deleteDir()
140+
}
141+
142+
def 'should preserve token file path when reading token from tokenFile in kubeconfig' () {
143+
144+
given:
145+
def folder = Files.createTempDirectory('test')
146+
def tokenFile = folder.resolve('token')
147+
tokenFile.text = 'file-token'
148+
149+
def user = [ tokenFile: tokenFile.toString() ]
150+
def cluster = [ server: 'https://foo:6443' ]
151+
152+
when:
153+
def result = ClientConfig.fromUserAndCluster(user, cluster, folder)
154+
155+
then:
156+
result.token == 'file-token'
157+
result.tokenPath == tokenFile
158+
159+
cleanup:
160+
folder?.deleteDir()
161+
}
162+
163+
def 'should not set token path when token is provided inline' () {
164+
165+
given:
166+
def MAP = [ server: 'foo.com', token: 'inline-token' ]
167+
168+
when:
169+
def result = ClientConfig.fromNextflowConfig(MAP, null, null)
170+
171+
then:
172+
result.token == 'inline-token'
173+
result.tokenPath == null
174+
}
175+
120176
}

plugins/nf-k8s/src/test/nextflow/k8s/client/ConfigDiscoveryTest.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,7 @@ class ConfigDiscoveryTest extends Specification {
387387
config.server == 'foo.com:4343'
388388
config.namespace == 'foo-namespace'
389389
config.token == 'my-token'
390+
config.tokenPath == TOKEN_FILE
390391
config.sslCert == CERT_FILE.text.bytes
391392
config.isFromCluster
392393

0 commit comments

Comments
 (0)