Skip to content

Commit 6685248

Browse files
authored
Merge pull request galaxyproject#20862 from jmchilton/pulsar_15_10
Various Container Execution Enhancements (including GCP Batch support)
2 parents 286fd6e + 4e21123 commit 6685248

5 files changed

Lines changed: 138 additions & 77 deletions

File tree

lib/galaxy/config/sample/job_conf.sample.yml

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,27 @@ runners:
338338
#app: {}
339339
#pulsar_config: path/to/pulsar/app.yml
340340

341+
# Using Pulsar and co-execution to run jobs against a TES service.
342+
# - https://www.ga4gh.org/product/task-execution-service-tes/
343+
# - https://pulsar.readthedocs.io/en/latest/containers.html#ga4gh-tes
344+
pulsar_tes:
345+
load: galaxy.jobs.runners.pulsar:PulsarTesJobRunner
346+
# RabbitMQ URL from Galaxy server (include credentials).
347+
amqp_url: <amqp_url>
348+
# If Pulsar needs to talk to Galaxy at a particular host and port, set that here.
349+
#galaxy_url: <galaxy_url>
350+
351+
# Using Pulsar and co-execution to run jobs against Google Cloud Platform's Batch service.
352+
# - https://cloud.google.com/batch/docs/get-started
353+
# - https://pulsar.readthedocs.io/en/latest/containers.html#google-cloud-platform-batch
354+
pulsar_gcp:
355+
load: galaxy.jobs.runners.pulsar:PulsarGcpBatchJobRunner
356+
# RabbitMQ URL from Galaxy server (include credentials).
357+
amqp_url: <amqp_url>
358+
# If Pulsar needs to talk to Galaxy at a particular host and port, set that here.
359+
#galaxy_url: <galaxy_url>
360+
361+
341362
# Job handler configuration - for a full discussion of job handlers, see the documentation at:
342363
# https://docs.galaxyproject.org/en/latest/admin/scaling.html
343364
handling:
@@ -916,6 +937,55 @@ execution:
916937
# Path to Kubernetes configuration file (see Kubernetes runner description).
917938
#k8s_config_path: /path/to/kubeconfig
918939

940+
pulsar_tes_environment:
941+
runner: pulsar_tes
942+
# A tes_url is required
943+
tes_url: "<tes_url>"
944+
#basic_auth:
945+
# username: <username>
946+
# password: <password>
947+
#cpu_cores: 1
948+
# Define if the task is allowed to run on preemptible compute instances, for example, AWS Spot. This option may have no effect when utilized on some backends that don't have the concept of preemptible jobs.
949+
#preemptible: false
950+
#ram_gb: 8
951+
#disk_gb: 40
952+
# Request that the task be run in these compute zones. How this string is utilized will be dependent on the backend system. For example, a system based on a cluster queueing system may use this string to define the priority queue to which the job is assigned.
953+
#zones: us-west-1
954+
#backend_parameters: {}
955+
#backend_parameters_strict: false
956+
# Configure the embedded Pulsar app, only message_queue_url is required but
957+
# other options may be useful (unsure).
958+
pulsar_app_config:
959+
# This needs to be the RabbitMQ server, but this should be the host
960+
# and port that your TES nodes would connect to the server via.
961+
message_queue_url: "<amqp_url>"
962+
963+
pulsar_gcp_environment:
964+
runner: pulsar_gcp
965+
# required
966+
project_id: <gcp_project_id>
967+
# Path to GCP service account credentials file. (not sure if ~ would be implicitly respected in this example)
968+
#credentials_file: ~/.config/gcloud/application_default_credentials.json
969+
# GCP region or zone to use (optional)
970+
#region: us-central1
971+
# Max walltime to use in seconds (defaults to 60 * 60 * 24)
972+
#walltime_limit: 216000
973+
# Maximum number of retries for the job. Maps to TaskSpec.max_retry_count.
974+
#retry_count: 2
975+
# Name of the SSD volume to be mounted in the task. Shared among all containers in job.
976+
#ssd_name: pulsar_staging
977+
# Size of the shared local SSD disk in GB (must be a multiple of 375).
978+
#disk_size: 375
979+
# Machine type for the job's VM.
980+
#machine_type: n1-standard-1
981+
#labels: {}
982+
# Configure the embedded Pulsar app, only message_queue_url is required but
983+
# other options may be useful (unsure).
984+
pulsar_app_config:
985+
# This needs to be the RabbitMQ server, but this should be the host
986+
# and port that your GCP compute would connect to the server via.
987+
message_queue_url: "<amqp_url>"
988+
919989
# Example CLI runners.
920990
ssh_torque:
921991
runner: cli

lib/galaxy/jobs/runners/__init__.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,13 @@ def register_cleanup_file_attribute(self, attribute):
818818
if attribute not in self.cleanup_file_attributes:
819819
self.cleanup_file_attributes.append(attribute)
820820

821+
def init_job_stream_files(self):
822+
"""For runners that don't create explicit job scripts - create job stream files."""
823+
with open(self.output_file, "w"):
824+
pass
825+
with open(self.error_file, "w"):
826+
pass
827+
821828

822829
class AsynchronousJobRunner(BaseJobRunner, Monitors):
823830
"""Parent class for any job runner that runs jobs asynchronously (e.g. via
@@ -907,17 +914,7 @@ def check_watched_items(self):
907914
def check_watched_item(self, job_state: AsynchronousJobState) -> Union[AsynchronousJobState, None]:
908915
raise NotImplementedError()
909916

910-
def finish_job(self, job_state: AsynchronousJobState):
911-
"""
912-
Get the output/error for a finished job, pass to `job_wrapper.finish`
913-
and cleanup all the job's temporary files.
914-
"""
915-
galaxy_id_tag = job_state.job_wrapper.get_id_tag()
916-
external_job_id = job_state.job_id
917-
918-
# To ensure that files below are readable, ownership must be reclaimed first
919-
job_state.job_wrapper.reclaim_ownership()
920-
917+
def _collect_job_output(self, job_id: int, external_job_id: Optional[str], job_state: JobState):
921918
# wait for the files to appear
922919
which_try = 0
923920
collect_output_success = True
@@ -931,11 +928,25 @@ def finish_job(self, job_state: AsynchronousJobState):
931928
if which_try == self.app.config.retry_job_output_collection:
932929
stdout = ""
933930
stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
934-
log.error("(%s/%s) %s: %s", galaxy_id_tag, external_job_id, stderr, unicodify(e))
931+
log.error("(%s/%s) %s: %s", job_id, external_job_id, stderr, unicodify(e))
935932
collect_output_success = False
936933
else:
937934
time.sleep(1)
938935
which_try += 1
936+
return collect_output_success, stdout, stderr
937+
938+
def finish_job(self, job_state: AsynchronousJobState):
939+
"""
940+
Get the output/error for a finished job, pass to `job_wrapper.finish`
941+
and cleanup all the job's temporary files.
942+
"""
943+
galaxy_id_tag = job_state.job_wrapper.get_id_tag()
944+
external_job_id = job_state.job_id
945+
946+
# To ensure that files below are readable, ownership must be reclaimed first
947+
job_state.job_wrapper.reclaim_ownership()
948+
949+
collect_output_success, stdout, stderr = self._collect_job_output(galaxy_id_tag, external_job_id, job_state)
939950

940951
if not collect_output_success:
941952
job_state.fail_message = stderr

lib/galaxy/jobs/runners/kubernetes.py

Lines changed: 3 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import math
88
import os
99
import re
10-
import time
1110
from dataclasses import dataclass
1211
from datetime import datetime
1312
from typing import Union
@@ -50,7 +49,6 @@
5049
Service,
5150
service_object_dict,
5251
)
53-
from galaxy.util import unicodify
5452
from galaxy.util.bytesize import ByteSize
5553

5654
log = logging.getLogger(__name__)
@@ -175,12 +173,8 @@ def queue_job(self, job_wrapper):
175173
job_wrapper=job_wrapper,
176174
job_destination=job_wrapper.job_destination,
177175
)
178-
# Kubernetes doesn't really produce meaningful "job stdout", but file needs to be present
179-
with open(ajs.output_file, "w"):
180-
pass
181-
with open(ajs.error_file, "w"):
182-
pass
183-
176+
# Kubernetes doesn't really produce a "job script", but job stream files needs to be present
177+
ajs.init_job_stream_files()
184178
if not self.prepare_job(
185179
job_wrapper,
186180
include_metadata=False,
@@ -680,26 +674,6 @@ def __transform_memory_value(self, mem_value):
680674
"""
681675
return ByteSize(mem_value).value
682676

683-
def __assemble_k8s_container_image_name(self, job_wrapper):
684-
"""Assembles the container image name as repo/owner/image:tag, where repo, owner and tag are optional"""
685-
job_destination = job_wrapper.job_destination
686-
687-
# Determine the job's Kubernetes destination (context, namespace) and options from the job destination
688-
# definition
689-
repo = ""
690-
owner = ""
691-
if "repo" in job_destination.params:
692-
repo = f"{job_destination.params['repo']}/"
693-
if "owner" in job_destination.params:
694-
owner = f"{job_destination.params['owner']}/"
695-
696-
k8s_cont_image = repo + owner + job_destination.params["image"]
697-
698-
if "tag" in job_destination.params:
699-
k8s_cont_image += f":{job_destination.params['tag']}"
700-
701-
return k8s_cont_image
702-
703677
def __get_k8s_container_name(self, job_wrapper):
704678
# These must follow a specific regex for Kubernetes.
705679
raw_id = job_wrapper.job_destination.id
@@ -1115,22 +1089,7 @@ def fail_job(self, job_state: "JobState", exception=False, message="Job failed",
11151089
# To ensure that files below are readable, ownership must be reclaimed first
11161090
job_state.job_wrapper.reclaim_ownership()
11171091

1118-
# wait for the files to appear
1119-
which_try = 0
1120-
while which_try < self.app.config.retry_job_output_collection + 1:
1121-
try:
1122-
with open(job_state.output_file, "rb") as stdout_file, open(job_state.error_file, "rb") as stderr_file:
1123-
job_stdout = self._job_io_for_db(stdout_file)
1124-
job_stderr = self._job_io_for_db(stderr_file)
1125-
break
1126-
except Exception as e:
1127-
if which_try == self.app.config.retry_job_output_collection:
1128-
job_stdout = ""
1129-
job_stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
1130-
log.error(f"{gxy_job.id}/{gxy_job.job_runner_external_id} {job_stderr}: {unicodify(e)}")
1131-
else:
1132-
time.sleep(1)
1133-
which_try += 1
1092+
_, job_stdout, job_stderr = self._collect_job_output(gxy_job.id, gxy_job.job_runner_external_id, job_state)
11341093

11351094
# get stderr and stdout to database
11361095
outputs_directory = os.path.join(job_state.job_wrapper.working_directory, "outputs")

lib/galaxy/jobs/runners/pulsar.py

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ class PulsarJobRunner(AsynchronousJobRunner):
210210
default_build_pulsar_app = False
211211
use_mq = False
212212
poll = True
213+
client_manager_kwargs: dict[str, Any] = {}
213214

214215
def __init__(self, app, nworkers, **kwds):
215216
"""Start the job runner."""
@@ -240,19 +241,27 @@ def _init_client_manager(self):
240241
pulsar_conf_file = self.runner_params.get("pulsar_config", None)
241242
self.__init_pulsar_app(pulsar_conf, pulsar_conf_file)
242243

243-
client_manager_kwargs = {}
244-
for kwd in "manager", "cache", "transport", "persistence_directory":
245-
client_manager_kwargs[kwd] = self.runner_params[kwd]
244+
client_manager_kwargs = self._pulsar_client_manager_args()
246245
if self.pulsar_app is not None:
247246
client_manager_kwargs["pulsar_app"] = self.pulsar_app
248-
# TODO: Hack remove this following line pulsar lib update
249-
# that includes https://github.com/galaxyproject/pulsar/commit/ce0636a5b64fae52d165bcad77b2caa3f0e9c232
250-
client_manager_kwargs["file_cache"] = None
247+
self.client_manager = build_client_manager(**client_manager_kwargs)
248+
249+
def _pulsar_client_manager_args(self):
250+
"""Most connection parameters can be specified as environment parameters, but...
251+
252+
... global parameters about message queues, what Pulsar client to use, etc... must
253+
be specified as runner parameters. This method returns a configuration based
254+
on the runner parameters that is ready for Pulsar's build_client_manager.
255+
"""
256+
client_manager_kwargs = self.client_manager_kwargs.copy()
257+
for kwd in "manager", "cache", "transport", "persistence_directory":
258+
client_manager_kwargs[kwd] = self.runner_params[kwd]
251259

252260
for kwd in self.runner_params.keys():
253261
if kwd.startswith("amqp_") or kwd.startswith("transport_"):
254262
client_manager_kwargs[kwd] = self.runner_params[kwd]
255-
self.client_manager = build_client_manager(**client_manager_kwargs)
263+
264+
return client_manager_kwargs
256265

257266
def __init_pulsar_app(self, conf, pulsar_conf_path):
258267
if conf is None and pulsar_conf_path is None and not self.default_build_pulsar_app:
@@ -1050,45 +1059,57 @@ class PulsarMQJobRunner(PulsarJobRunner):
10501059
COEXECUTION_DESTINATION_DEFAULTS = {
10511060
"default_file_action": "remote_transfer",
10521061
"rewrite_parameters": "true",
1053-
"jobs_directory": "/pulsar_staging",
10541062
"pulsar_container_image": DEFAULT_PULSAR_CONTAINER,
10551063
"remote_container_handling": True,
10561064
"url": PARAMETER_SPECIFICATION_IGNORED,
10571065
"private_token": PARAMETER_SPECIFICATION_IGNORED,
10581066
}
10591067

10601068

1061-
class PulsarCoexecutionJobRunner(PulsarMQJobRunner):
1069+
class PulsarCoexecutionJobRunner(PulsarJobRunner):
10621070
destination_defaults = COEXECUTION_DESTINATION_DEFAULTS
10631071

1064-
def _populate_parameter_defaults(self, job_destination):
1065-
super()._populate_parameter_defaults(job_destination)
1066-
params = job_destination.params
1067-
# Set some sensible defaults for Pulsar application that runs in staging container.
1068-
if "pulsar_app_config" not in params:
1069-
params["pulsar_app_config"] = {}
1070-
pulsar_app_config = params["pulsar_app_config"]
1071-
if "staging_directory" not in pulsar_app_config:
1072-
# coexecution always uses a fixed path for staging directory
1073-
pulsar_app_config["staging_directory"] = params.get("jobs_directory")
1074-
10751072

10761073
KUBERNETES_DESTINATION_DEFAULTS: dict[str, Any] = {"k8s_enabled": True, **COEXECUTION_DESTINATION_DEFAULTS}
10771074

1075+
KUBERNETES_CLIENT_MANAGER_KWARGS = {"k8s_enabled": True}
1076+
10781077

10791078
class PulsarKubernetesJobRunner(PulsarCoexecutionJobRunner):
10801079
destination_defaults = KUBERNETES_DESTINATION_DEFAULTS
1080+
use_mq = True
10811081
poll = True # Poll so we can check API for pod IP for ITs.
1082+
client_manager_kwargs = KUBERNETES_CLIENT_MANAGER_KWARGS
10821083

10831084

10841085
TES_DESTINATION_DEFAULTS: dict[str, Any] = {
10851086
"tes_url": PARAMETER_SPECIFICATION_REQUIRED,
10861087
**COEXECUTION_DESTINATION_DEFAULTS,
10871088
}
10881089

1090+
TES_CLIENT_MANAGER_KWARGS = {"tes_enabled": True}
1091+
10891092

10901093
class PulsarTesJobRunner(PulsarCoexecutionJobRunner):
10911094
destination_defaults = TES_DESTINATION_DEFAULTS
1095+
client_manager_kwargs = TES_CLIENT_MANAGER_KWARGS
1096+
use_mq = True
1097+
poll = False
1098+
1099+
1100+
GCP_DESTINATION_DEFAULTS: dict[str, Any] = {
1101+
"project_id": PARAMETER_SPECIFICATION_REQUIRED,
1102+
**COEXECUTION_DESTINATION_DEFAULTS,
1103+
}
1104+
GCP_BATCH_CLIENT_MANAGER_KWARGS = {"gcp_batch_enabled": True}
1105+
1106+
1107+
class PulsarGcpBatchJobRunner(PulsarCoexecutionJobRunner):
1108+
use_mq = True
1109+
poll = False
1110+
1111+
client_manager_kwargs = GCP_BATCH_CLIENT_MANAGER_KWARGS
1112+
destination_defaults = GCP_DESTINATION_DEFAULTS
10921113

10931114

10941115
class PulsarRESTJobRunner(PulsarJobRunner):

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ dependencies = [
7070
"pebble",
7171
"pillow",
7272
"psutil",
73-
"pulsar-galaxy-lib>=0.15.0.dev0",
73+
"pulsar-galaxy-lib>=0.15.10",
7474
"pycryptodome",
7575
"pydantic[email]>=2.7.4", # https://github.com/pydantic/pydantic/pull/9639
7676
"PyJWT",

0 commit comments

Comments
 (0)