Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {

private volatile long timestamp

/**
* Flag set once the zone lookup from the status events has been attempted.
* It is raised whether or not a zone was actually found (including when the
* lookup failed), so that the lookup is not repeated on later calls.
*/
private volatile boolean zoneUpdated

/**
* A flag to indicate that the job has failed without launching any tasks
*/
Expand Down Expand Up @@ -769,10 +774,57 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
}
}

/**
* Regex pattern to extract zone from StatusEvent description.
* Matches the fragment {@code zones/<zone-name>/instances/}; capture group 1
* holds the zone name. The instance id that follows in the description is
* not part of the match.
*/
private static final Pattern ZONE_PATTERN = ~/zones\/([^\/]+)\/instances\//

/**
 * Return the machine info associated with this task.
 *
 * When a machine info is available, the zone has not been looked up yet and
 * the task is completed, the zone is first refreshed from the task status events.
 *
 * @return the (possibly zone-refreshed) machine info, or {@code null} when none is set
 */
protected CloudMachineInfo getMachineInfo() {
    // nothing to refresh: either no machine info or the lookup was already attempted
    if( machineInfo == null || zoneUpdated ) {
        return machineInfo
    }
    // the completion check is only evaluated when a refresh is still pending
    if( isCompleted() ) {
        updateZoneFromEvents()
    }
    return machineInfo
}

/**
* Parse the actual execution zone from StatusEvent descriptions and update
* the machineInfo zone field accordingly.
*
* Google Batch status events include zone info in the description field with
* the format: {@code zones/<zone-name>/instances/<instance-id>}
*/
private void updateZoneFromEvents() {
    try {
        // fetch the task status and look for a zone in its event descriptions
        final status = client.getTaskStatus(jobId, taskId)
        final resolved = resolveZoneFromEvents(status?.statusEventsList)
        if( !resolved ) {
            // no zone found: keep the current machine info (the finally block still runs)
            return
        }
        // replace the machine info with a copy that only differs by the zone
        machineInfo = new CloudMachineInfo(
            type: machineInfo.type,
            zone: resolved,
            priceModel: machineInfo.priceModel
        )
    }
    catch( Exception e ) {
        // best-effort: a failed lookup leaves the machine info untouched
        log.debug "[GOOGLE BATCH] Unable to resolve zone from events for task: `${task.lazyName()}` - ${e.message}"
    }
    finally {
        // mark the lookup as attempted regardless of the outcome
        zoneUpdated = true
    }
}

/**
 * Find the zone name in a list of status events.
 *
 * The events are scanned in order and the zone captured from the first
 * description matching {@code ZONE_PATTERN} is returned.
 *
 * @param events the status events to scan; may be {@code null} or empty
 * @return the zone name of the first matching event, or {@code null} when
 *         there are no events or none of the descriptions matches
 */
@PackageScope
static String resolveZoneFromEvents(List<StatusEvent> events) {
    if( events == null || events.isEmpty() ) {
        return null
    }
    // findResult stops at the first event yielding a non-null value
    return (String) events.findResult { StatusEvent ev ->
        final text = ev.description ?: ''
        final m = ZONE_PATTERN.matcher(text)
        m.find() ? m.group(1) : null
    }
}

/**
* Count the number of spot instance reclamations for this task by examining
* the task status events and checking for preemption exit codes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import nextflow.Session
import nextflow.SysEnv
import nextflow.cloud.google.batch.client.BatchClient
import nextflow.cloud.google.batch.client.BatchConfig
import nextflow.cloud.types.CloudMachineInfo
import nextflow.cloud.types.PriceModel
import nextflow.executor.Executor
import nextflow.executor.ExecutorConfig
Expand Down Expand Up @@ -1113,6 +1114,116 @@ class GoogleBatchTaskHandlerTest extends Specification {
result.getEnvironment().getVariablesMap() == [VAR1: 'value1']
}

// Data-driven check that getMachineInfo() on a completed task reports the zone
// found in the task status events, and keeps the ORIGINAL zone when the events
// carry no zone.
def 'should resolve zone from status events: #DESCRIPTION' () {
given:
// handler spy with job/task ids and a machine info carrying the ORIGINAL zone
def handler = Spy(GoogleBatchTaskHandler)
handler.@jobId = 'job-123'
handler.@taskId = '0'
handler.@machineInfo = new CloudMachineInfo(type: 'n2-standard-4', zone: ORIGINAL, priceModel: PriceModel.spot)
and:
// task status holding a single event with EVENT_DESC, or no event at all when EVENT_DESC is null
def builder = TaskStatus.newBuilder()
if( EVENT_DESC )
builder.addStatusEvents(StatusEvent.newBuilder().setDescription(EVENT_DESC).build())
def status = builder.build()

when:
def info = handler.getMachineInfo()
then:
// stubbed collaborators: the task reports completed and the client returns the prepared status
handler.isCompleted() >> true
handler.@client = Mock(BatchClient) {
getTaskStatus('job-123', '0') >> status
}
and:
info.zone == EXPECTED

where:
DESCRIPTION | ORIGINAL | EVENT_DESC | EXPECTED
'succeeded event' | 'europe-west2' | 'Task state is updated from RUNNING to SUCCEEDED on zones/europe-west2-a/instances/i-abc123' | 'europe-west2-a'
'preemption event' | 'us-central1' | 'Task state is updated from RUNNING to FAILED on zones/us-central1-f/instances/i-xyz789 due to Spot VM preemption with exit code 50001.' | 'us-central1-f'
'no zone in event' | 'europe-west2' | 'Task state is updated from PENDING to RUNNING' | 'europe-west2'
'empty events' | 'europe-west2' | null | 'europe-west2'
}

// Checks that getMachineInfo() returns the stored machine info unchanged when
// no zone lookup is expected. No client is configured on the handler here.
// Note: both data rows use COMPLETED = false.
def 'should not resolve zone when task is not completed or machineInfo is null' () {
given:
def handler = Spy(GoogleBatchTaskHandler)
handler.@machineInfo = MACHINE_INFO

when:
def info = handler.getMachineInfo()
then:
handler.isCompleted() >> COMPLETED
and:
// safe navigation covers the row where the machine info is null
info?.zone == EXPECTED

where:
MACHINE_INFO | COMPLETED | EXPECTED
new CloudMachineInfo(type: 'n2-standard-4', zone: 'europe-west2', priceModel: PriceModel.spot) | false | 'europe-west2'
null | false | null
}

// Checks that two consecutive getMachineInfo() calls on a completed task query
// the Batch client exactly once and both return the resolved zone.
def 'should only resolve zone once across multiple getMachineInfo calls' () {
given:
def mockClient = Mock(BatchClient)
def handler = Spy(GoogleBatchTaskHandler)
handler.@jobId = 'job-123'
handler.@taskId = '0'
handler.@client = mockClient
handler.@machineInfo = new CloudMachineInfo(type: 'n2-standard-4', zone: 'europe-west2', priceModel: PriceModel.spot)
and:
// task status with a single event whose description names zone 'europe-west2-a'
def status = TaskStatus.newBuilder()
.addStatusEvents(StatusEvent.newBuilder()
.setDescription('Task state is updated from RUNNING to SUCCEEDED on zones/europe-west2-a/instances/i-abc123')
.build())
.build()

when:
handler.isCompleted() >> true
def info1 = handler.getMachineInfo()
def info2 = handler.getMachineInfo()

then:
// exactly one status request is expected across both calls
1 * mockClient.getTaskStatus('job-123', '0') >> status
and:
info1.zone == 'europe-west2-a'
info2.zone == 'europe-west2-a'
}

// Checks that a client failure during the zone lookup does not propagate and
// that the original zone is kept.
def 'should handle API error gracefully when resolving zone' () {
given:
def handler = Spy(GoogleBatchTaskHandler)
handler.@jobId = 'job-123'
handler.@taskId = '0'
handler.@machineInfo = new CloudMachineInfo(type: 'n2-standard-4', zone: 'europe-west2', priceModel: PriceModel.spot)
// the handler's debug message on failure calls task.lazyName(), so a task mock is provided
handler.task = Mock(TaskRun) { lazyName() >> 'foo (1)' }

when:
def info = handler.getMachineInfo()
then:
handler.isCompleted() >> true
// the client throws when the task status is requested
handler.@client = Mock(BatchClient) {
getTaskStatus('job-123', '0') >> { throw new IOException('API error') }
}
and:
info.zone == 'europe-west2'
}

// Data-driven check of the static resolveZoneFromEvents() helper: matching
// descriptions yield the zone name, while null/empty inputs and non-matching
// descriptions yield null. The last row expects the zone from the first event
// whose description matches, skipping an earlier event without a zone.
def 'should resolve zone from events: #DESCRIPTION' () {
expect:
GoogleBatchTaskHandler.resolveZoneFromEvents(EVENTS) == EXPECTED

where:
DESCRIPTION | EVENTS | EXPECTED
'succeeded event' | [StatusEvent.newBuilder().setDescription('Task state is updated from RUNNING to SUCCEEDED on zones/europe-west2-a/instances/i-abc').build()] | 'europe-west2-a'
'preemption event' | [StatusEvent.newBuilder().setDescription('Task state is updated from RUNNING to FAILED on zones/us-central1-f/instances/i-xyz due to Spot VM preemption').build()] | 'us-central1-f'
'no zone pattern' | [StatusEvent.newBuilder().setDescription('Task state is updated from PENDING to RUNNING').build()] | null
'empty description' | [StatusEvent.newBuilder().setDescription('').build()] | null
'null description' | [StatusEvent.newBuilder().build()] | null
'null events' | null | null
'empty events' | [] | null
'picks first zone match' | [StatusEvent.newBuilder().setDescription('no zone here').build(), StatusEvent.newBuilder().setDescription('on zones/us-east1-b/instances/i-1').build()] | 'us-east1-b'
}

def 'should build container runnable with fusion privileged' () {
given:
def WORK_DIR = CloudStorageFileSystem.forBucket('foo').getPath('/scratch')
Expand Down
Loading