diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy index ae118fb4a6..40eed88698 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy @@ -118,6 +118,11 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { private volatile long timestamp + /** + * Flag to indicate that zone resolution from the status events has been attempted; it is set even when no zone is found or the lookup fails, so the lookup is not repeated + */ + private volatile boolean zoneUpdated + /** * A flag to indicate that the job has failed without launching any tasks */ @@ -769,10 +774,57 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { } } + /** + * Regex pattern to extract zone from StatusEvent description. + * Matches the pattern: {@code zones/<zone>/instances/<instance>}; group 1 captures the zone + */ + private static final Pattern ZONE_PATTERN = ~/zones\/([^\/]+)\/instances\// protected CloudMachineInfo getMachineInfo() { + if( machineInfo!=null && !zoneUpdated && isCompleted() ) + updateZoneFromEvents() return machineInfo } + /** + * Parse the actual execution zone from StatusEvent descriptions and update + * the machineInfo zone field accordingly. 
+ * + * Google Batch status events include zone info in the description field with + * the format: {@code zones/<zone>/instances/<instance>} + */ + private void updateZoneFromEvents() { + try { + final events = client.getTaskStatus(jobId, taskId)?.statusEventsList + final zone = resolveZoneFromEvents(events) + if( zone ) { + machineInfo = new CloudMachineInfo( + type: machineInfo.type, + zone: zone, + priceModel: machineInfo.priceModel + ) + } + } + catch( Exception e ) { + log.debug "[GOOGLE BATCH] Unable to resolve zone from events for task: `${task.lazyName()}` - ${e.message}" + } + finally { + zoneUpdated = true + } + } + + @PackageScope + static String resolveZoneFromEvents(List events) { + if( !events ) + return null + for( def event : events ) { + final matcher = ZONE_PATTERN.matcher(event.description ?: '') + if( matcher.find() ) + return matcher.group(1) + } + return null + } + /** * Count the number of spot instance reclamations for this task by examining * the task status events and checking for preemption exit codes diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy index 9054a347d2..cea6777594 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy @@ -39,6 +39,7 @@ import nextflow.Session import nextflow.SysEnv import nextflow.cloud.google.batch.client.BatchClient import nextflow.cloud.google.batch.client.BatchConfig +import nextflow.cloud.types.CloudMachineInfo import nextflow.cloud.types.PriceModel import nextflow.executor.Executor import nextflow.executor.ExecutorConfig @@ -1113,6 +1114,116 @@ class GoogleBatchTaskHandlerTest extends Specification { result.getEnvironment().getVariablesMap() == [VAR1: 'value1'] } + def 'should resolve zone from status events: #DESCRIPTION' () { + given: 
+ def handler = Spy(GoogleBatchTaskHandler) + handler.@jobId = 'job-123' + handler.@taskId = '0' + handler.@machineInfo = new CloudMachineInfo(type: 'n2-standard-4', zone: ORIGINAL, priceModel: PriceModel.spot) + and: + def builder = TaskStatus.newBuilder() + if( EVENT_DESC ) + builder.addStatusEvents(StatusEvent.newBuilder().setDescription(EVENT_DESC).build()) + def status = builder.build() + + when: + def info = handler.getMachineInfo() + then: + handler.isCompleted() >> true + handler.@client = Mock(BatchClient) { + getTaskStatus('job-123', '0') >> status + } + and: + info.zone == EXPECTED + + where: + DESCRIPTION | ORIGINAL | EVENT_DESC | EXPECTED + 'succeeded event' | 'europe-west2' | 'Task state is updated from RUNNING to SUCCEEDED on zones/europe-west2-a/instances/i-abc123' | 'europe-west2-a' + 'preemption event' | 'us-central1' | 'Task state is updated from RUNNING to FAILED on zones/us-central1-f/instances/i-xyz789 due to Spot VM preemption with exit code 50001.' | 'us-central1-f' + 'no zone in event' | 'europe-west2' | 'Task state is updated from PENDING to RUNNING' | 'europe-west2' + 'empty events' | 'europe-west2' | null | 'europe-west2' + } + + def 'should not resolve zone when task is not completed or machineInfo is null' () { + given: + def handler = Spy(GoogleBatchTaskHandler) + handler.@machineInfo = MACHINE_INFO + + when: + def info = handler.getMachineInfo() + then: + handler.isCompleted() >> COMPLETED + and: + info?.zone == EXPECTED + + where: + MACHINE_INFO | COMPLETED | EXPECTED + new CloudMachineInfo(type: 'n2-standard-4', zone: 'europe-west2', priceModel: PriceModel.spot) | false | 'europe-west2' + null | false | null + } + + def 'should only resolve zone once across multiple getMachineInfo calls' () { + given: + def mockClient = Mock(BatchClient) + def handler = Spy(GoogleBatchTaskHandler) + handler.@jobId = 'job-123' + handler.@taskId = '0' + handler.@client = mockClient + handler.@machineInfo = new CloudMachineInfo(type: 'n2-standard-4', 
zone: 'europe-west2', priceModel: PriceModel.spot) + and: + def status = TaskStatus.newBuilder() + .addStatusEvents(StatusEvent.newBuilder() + .setDescription('Task state is updated from RUNNING to SUCCEEDED on zones/europe-west2-a/instances/i-abc123') + .build()) + .build() + + when: + handler.isCompleted() >> true + def info1 = handler.getMachineInfo() + def info2 = handler.getMachineInfo() + + then: + 1 * mockClient.getTaskStatus('job-123', '0') >> status + and: + info1.zone == 'europe-west2-a' + info2.zone == 'europe-west2-a' + } + + def 'should handle API error gracefully when resolving zone' () { + given: + def handler = Spy(GoogleBatchTaskHandler) + handler.@jobId = 'job-123' + handler.@taskId = '0' + handler.@machineInfo = new CloudMachineInfo(type: 'n2-standard-4', zone: 'europe-west2', priceModel: PriceModel.spot) + handler.task = Mock(TaskRun) { lazyName() >> 'foo (1)' } + + when: + def info = handler.getMachineInfo() + then: + handler.isCompleted() >> true + handler.@client = Mock(BatchClient) { + getTaskStatus('job-123', '0') >> { throw new IOException('API error') } + } + and: + info.zone == 'europe-west2' + } + + def 'should resolve zone from events: #DESCRIPTION' () { + expect: + GoogleBatchTaskHandler.resolveZoneFromEvents(EVENTS) == EXPECTED + + where: + DESCRIPTION | EVENTS | EXPECTED + 'succeeded event' | [StatusEvent.newBuilder().setDescription('Task state is updated from RUNNING to SUCCEEDED on zones/europe-west2-a/instances/i-abc').build()] | 'europe-west2-a' + 'preemption event' | [StatusEvent.newBuilder().setDescription('Task state is updated from RUNNING to FAILED on zones/us-central1-f/instances/i-xyz due to Spot VM preemption').build()] | 'us-central1-f' + 'no zone pattern' | [StatusEvent.newBuilder().setDescription('Task state is updated from PENDING to RUNNING').build()] | null + 'empty description' | [StatusEvent.newBuilder().setDescription('').build()] | null + 'null description' | [StatusEvent.newBuilder().build()] | null + 'null 
events' | null | null + 'empty events' | [] | null + 'picks first zone match' | [StatusEvent.newBuilder().setDescription('no zone here').build(), StatusEvent.newBuilder().setDescription('on zones/us-east1-b/instances/i-1').build()] | 'us-east1-b' + } + def 'should build container runnable with fusion privileged' () { given: def WORK_DIR = CloudStorageFileSystem.forBucket('foo').getPath('/scratch')