Skip to content

Commit b1ad3f7

Browse files
authored
Abort execution when platform telemetry error (#6827)
1 parent 5e8276c commit b1ad3f7

6 files changed

Lines changed: 118 additions & 57 deletions

File tree

modules/nextflow/src/main/groovy/nextflow/Session.groovy

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import nextflow.container.ShifterConfig
4747
import nextflow.container.SingularityConfig
4848
import nextflow.dag.DAG
4949
import nextflow.exception.AbortOperationException
50+
import nextflow.exception.AbortRunException
5051
import nextflow.exception.AbortSignalException
5152
import nextflow.exception.IllegalConfigException
5253
import nextflow.exception.MissingLibraryException
@@ -1103,6 +1104,11 @@ class Session implements ISession {
11031104
try {
11041105
action.accept(observer)
11051106
}
1107+
catch (AbortRunException e) {
1108+
// An AbortRunException is rethrown so that it produces an error in the execution
1109+
log.error("Abort exception produced when notifying an event - $e.message")
1110+
throw e
1111+
}
11061112
catch ( Throwable e ) {
11071113
log.debug(e.getMessage(), e)
11081114
}

plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import io.seqera.tower.plugin.exception.UnauthorizedException
3232
import io.seqera.util.trace.TraceUtils
3333
import nextflow.BuildInfo
3434
import nextflow.SysEnv
35-
import nextflow.exception.AbortOperationException
35+
import nextflow.exception.AbortRunException
3636
import nextflow.util.Duration
3737
import nextflow.util.TestOnly
3838
/**
@@ -122,7 +122,7 @@ class TowerClient {
122122
}
123123

124124
Map traceBegin(Map req, String workspaceId, String workflowId){
125-
return sendAndProcessRequest( getUrlTraceBegin(workspaceId, workflowId), req, 'POST')
125+
return sendAndProcessRequest( getUrlTraceBegin(workspaceId, workflowId), req, 'PUT')
126126
}
127127

128128
void traceComplete(Map req, String workspaceId, String workflowId) {
@@ -140,19 +140,27 @@ class TowerClient {
140140
void traceProgress(Map req, String workspaceId, String workflowId) {
141141
final url = getUrlTraceProgress( workspaceId, workflowId )
142142
final resp = sendHttpMessage(url, req, 'PUT')
143-
logHttpResponse(url, resp)
143+
if( resp.error ) {
144+
final message = """\
145+
Unexpected HTTP response
146+
- endpoint : $url
147+
- status code : $resp.code
148+
- response msg: $resp.message
149+
""".stripIndent(true)
150+
throw new AbortRunException(message)
151+
}
144152
}
145153

146154
protected Map sendAndProcessRequest(String url, Map req, String method){
147155
final resp = sendHttpMessage(url, req, method)
148156
if( resp.error ) {
149-
log.debug """\
157+
final message = """\
150158
Unexpected HTTP response
151159
- endpoint : $url
152160
- status code : $resp.code
153-
- response msg: $resp.cause
161+
- response msg: $resp.message
154162
""".stripIndent(true)
155-
throw new AbortOperationException(resp.message)
163+
throw new AbortRunException(message)
156164
}
157165
return parseTowerResponse(resp)
158166
}
@@ -241,7 +249,7 @@ class TowerClient {
241249

242250
String getAccessToken() {
243251
if( !accessToken )
244-
throw new AbortOperationException("Missing Seqera Platform access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment")
252+
throw new AbortRunException("Missing Seqera Platform access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment")
245253
return accessToken
246254
}
247255

@@ -377,7 +385,7 @@ class TowerClient {
377385
""".stripIndent(true)
378386
// append separately, otherwise the formatting gets broken
379387
msg += "- error cause : ${cause ?: '-'}"
380-
throw new Exception(msg)
388+
throw new AbortRunException(msg)
381389
}
382390

383391
protected String parseCause(String cause) {

plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerObserver.groovy

Lines changed: 43 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import groovy.transform.ToString
2121
import groovy.util.logging.Slf4j
2222
import nextflow.Session
2323
import nextflow.container.resolver.ContainerMeta
24-
import nextflow.exception.AbortOperationException
24+
import nextflow.exception.AbortRunException
2525
import nextflow.processor.TaskHandler
2626
import nextflow.processor.TaskId
2727
import nextflow.processor.TaskProcessor
@@ -156,7 +156,7 @@ class TowerObserver implements TraceObserverV2 {
156156
final ret = client.traceCreate(makeCreateReq(session), workspaceId)
157157
this.workflowId = ret.workflowId
158158
if( !workflowId )
159-
throw new AbortOperationException("Invalid Seqera Platform API response - Missing workflow Id")
159+
throw new AbortRunException("Invalid Seqera Platform API response - Missing workflow Id")
160160
log.debug "Platform workflow id: $workflowId; workflow url: ${ret.watchUrl}"
161161
session.workflowMetadata.platform.workflowId = workflowId
162162
// note: `watchUrl` in the create response requires Platform 26.01 or later
@@ -530,46 +530,53 @@ class TowerObserver implements TraceObserverV2 {
530530

531531

532532
protected void sendTasks0(dummy) {
533-
final tasks = new HashMap<TaskId, TraceRecord>(TASKS_PER_REQUEST)
534-
boolean complete = false
535-
long previous = System.currentTimeMillis()
536-
final long period = requestInterval.millis
537-
final long delay = period / 10 as long
538-
539-
while( !complete ) {
540-
final ProcessEvent ev = events.poll(delay, TimeUnit.MILLISECONDS)
541-
// reconcile task events ie. send out only the last event
542-
if( ev ) {
543-
log.trace "Tower event=$ev"
544-
if( ev.trace )
545-
tasks[ev.trace.taskId] = ev.trace
546-
if( ev.completed )
547-
complete = true
548-
}
549-
550-
// check if there's something to send
551-
final now = System.currentTimeMillis()
552-
final delta = now -previous
533+
try {
534+
final tasks = new HashMap<TaskId, TraceRecord>(TASKS_PER_REQUEST)
535+
boolean complete = false
536+
long previous = System.currentTimeMillis()
537+
final long period = requestInterval.millis
538+
final long delay = period / 10 as long
539+
540+
while( !complete ) {
541+
final ProcessEvent ev = events.poll(delay, TimeUnit.MILLISECONDS)
542+
// reconcile task events ie. send out only the last event
543+
if( ev ) {
544+
log.trace "Tower event=$ev"
545+
if( ev.trace )
546+
tasks[ev.trace.taskId] = ev.trace
547+
if( ev.completed )
548+
complete = true
549+
}
553550

554-
if( !tasks ) {
555-
if( delta > aliveInterval.millis ) {
556-
final req = makeHeartbeatReq()
557-
client.traceHeartbeat(req, workspaceId, workflowId)
558-
previous = now
551+
// check if there's something to send
552+
final now = System.currentTimeMillis()
553+
final delta = now - previous
554+
555+
if( !tasks ) {
556+
if( delta > aliveInterval.millis ) {
557+
final req = makeHeartbeatReq()
558+
client.traceHeartbeat(req, workspaceId, workflowId)
559+
previous = now
560+
}
561+
continue
559562
}
560-
continue
561-
}
562563

563-
if( delta > period || tasks.size() >= TASKS_PER_REQUEST || complete ) {
564-
// send
565-
final req = makeTasksReq(tasks.values())
566-
client.traceProgress(req, workspaceId, workflowId)
564+
if( delta > period || tasks.size() >= TASKS_PER_REQUEST || complete ) {
565+
// send
566+
final req = makeTasksReq(tasks.values())
567+
client.traceProgress(req, workspaceId, workflowId)
567568

568-
// clean up for next iteration
569-
previous = now
570-
tasks.clear()
569+
// clean up for next iteration
570+
previous = now
571+
tasks.clear()
572+
}
571573
}
572574
}
575+
catch( Exception e ) {
576+
this.sender = null
577+
log.error("Aborting session due to Seqera Platform telemetry exception - $e.message", e)
578+
session.abort(e)
579+
}
573580
}
574581

575582
}

plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerClientTest.groovy

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.time.Instant
2222
import com.github.tomakehurst.wiremock.WireMockServer
2323
import com.github.tomakehurst.wiremock.client.WireMock
2424
import io.seqera.http.HxClient
25-
import nextflow.exception.AbortOperationException
25+
import nextflow.exception.AbortRunException
2626
import nextflow.util.Duration
2727
import spock.lang.Specification
2828
/**
@@ -113,7 +113,7 @@ class TowerClientTest extends Specification {
113113
def c = new TowerClient()
114114
c.getAccessToken()
115115
then:
116-
thrown(AbortOperationException)
116+
thrown(AbortRunException)
117117
}
118118

119119
def 'should set the auth token' () {
@@ -276,4 +276,35 @@ class TowerClientTest extends Specification {
276276
url.contains('name=test+workflow')
277277
}
278278

279+
def 'should send AbortRunException in selected client calls'() {
280+
given:
281+
def client = Spy(new TowerClient(new TowerConfig([:], [TOWER_ACCESS_TOKEN: 'token']))){
282+
sendHttpMessage(_,_,_) >> new TowerClient.Response(401)
283+
}
284+
285+
when:
286+
client.traceCreate([:], '1234')
287+
then:
288+
thrown(AbortRunException)
289+
290+
when:
291+
client.traceBegin([:], '1234', '5678')
292+
then:
293+
thrown(AbortRunException)
294+
295+
when:
296+
client.traceProgress([:], '1234', '5678')
297+
then:
298+
thrown(AbortRunException)
299+
300+
when:
301+
client.traceComplete([:], '1234', '5678')
302+
then:
303+
notThrown(AbortRunException)
304+
305+
when:
306+
client.traceHeartbeat([:], '1234', '5678')
307+
then:
308+
notThrown(AbortRunException)
309+
}
279310
}

plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package io.seqera.tower.plugin
1818

19-
import nextflow.exception.AbortOperationException
20-
2119
import static com.github.tomakehurst.wiremock.client.WireMock.*
2220

2321
import java.time.Instant
@@ -31,6 +29,7 @@ import io.seqera.tower.plugin.exception.UnauthorizedException
3129
import nextflow.Global
3230
import nextflow.Session
3331
import nextflow.SysEnv
32+
import nextflow.exception.AbortRunException
3433
import nextflow.script.WorkflowMetadata
3534
import nextflow.serde.gson.InstantAdapter
3635
import spock.lang.Shared
@@ -258,7 +257,7 @@ class TowerFusionEnvTest extends Specification {
258257
def provider = new TowerFusionToken()
259258

260259
then: 'the access token has the expected value'
261-
def e = thrown(AbortOperationException)
260+
def e = thrown(AbortRunException)
262261
e.message.contains("Missing Seqera Platform access token")
263262

264263
cleanup:

plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerObserverTest.groovy

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,24 @@
1616

1717
package io.seqera.tower.plugin
1818

19-
import nextflow.script.PlatformMetadata
20-
21-
import java.net.http.HttpResponse
2219
import java.nio.file.Files
2320
import java.time.Instant
2421
import java.time.OffsetDateTime
2522
import java.time.ZoneId
2623

27-
import com.github.tomakehurst.wiremock.WireMockServer
28-
import com.github.tomakehurst.wiremock.client.WireMock
29-
import io.seqera.http.HxClient
3024
import nextflow.Session
3125
import nextflow.SysEnv
3226
import nextflow.cloud.types.CloudMachineInfo
3327
import nextflow.cloud.types.PriceModel
3428
import nextflow.container.DockerConfig
3529
import nextflow.container.resolver.ContainerMeta
36-
import nextflow.exception.AbortOperationException
30+
import nextflow.exception.AbortRunException
3731
import nextflow.script.PlatformMetadata
3832
import nextflow.script.ScriptBinding
3933
import nextflow.script.WorkflowMetadata
4034
import nextflow.trace.TraceRecord
4135
import nextflow.trace.WorkflowStats
4236
import nextflow.trace.WorkflowStatsObserver
43-
import nextflow.util.Duration
4437
import nextflow.util.ProcessHelper
4538
import spock.lang.Specification
4639
/**
@@ -577,5 +570,22 @@ class TowerObserverTest extends Specification {
577570
req.tasks[0].gpuMetrics.peak == 100
578571
}
579572

573+
def 'should throw AbortRunException if workflow id is not found'() {
574+
given:
575+
def session = Mock(Session){
576+
getUniqueId() >> UUID.randomUUID()
577+
getWorkflowMetadata() >> Mock(WorkflowMetadata)
578+
}
579+
def client = Mock(TowerClient){
580+
traceCreate(_,_) >> [:]
581+
}
582+
def observer = new TowerObserver(session, client, null, [:])
583+
584+
when:
585+
observer.onFlowCreate(session)
586+
then:
587+
thrown(AbortRunException)
588+
}
589+
580590

581591
}

0 commit comments

Comments
 (0)