Skip to content

Commit 7491e35

Browse files
committed
Implement collection references in tool request API.
Based on work in #20004.
1 parent ccd36f2 commit 7491e35

6 files changed

Lines changed: 125 additions & 14 deletions

File tree

lib/galaxy/managers/jobs.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,9 @@
9191
StructuredApp,
9292
)
9393
from galaxy.tool_util.parameters import (
94+
DataRequestCollectionUri,
9495
DataRequestInternalHda,
96+
DataRequestInternalHdca,
9597
DataRequestUri,
9698
dereference,
9799
RequestInternalDereferencedToolState,
@@ -1978,21 +1980,44 @@ def dereference(
19781980
) -> Tuple[RequestInternalDereferencedToolState, list[DereferencedDatasetPair]]:
19791981
new_hdas: list[DereferencedDatasetPair] = []
19801982

1983+
def dereference_collection_callback(data_request: DataRequestCollectionUri) -> DataRequestInternalHdca:
1984+
# a deferred dataset collection corresponding to the request
1985+
history = tool_request.history
1986+
if not history:
1987+
raise InconsistentDatabase("Tool request has no history associated")
1988+
1989+
hdca = dereference_input(trans, data_request, history)
1990+
assert isinstance(hdca, model.HistoryDatasetCollectionAssociation)
1991+
1992+
# we need the HDCA to have an ID - so we force a commit here - for
1993+
# consistency it would be great if this happened in the dereference_input
1994+
# since the HDA is committed in the other branch.
1995+
history.add_pending_items()
1996+
trans.sa_session.commit()
1997+
1998+
def find_new_hdas(collection: model.DatasetCollection, request_elements) -> None:
1999+
for dce, dce_request in zip(collection.elements, request_elements):
2000+
if dce.is_collection:
2001+
find_new_hdas(dce.child_collection, dce_request.elements)
2002+
else:
2003+
new_hdas.append(DereferencedDatasetPair(dce.hda, dce_request))
2004+
2005+
find_new_hdas(hdca.collection, data_request.elements)
2006+
return DataRequestInternalHdca(id=hdca.id, src="hdca")
2007+
19812008
def dereference_callback(data_request: DataRequestUri) -> DataRequestInternalHda:
19822009
# a deferred dataset corresponding to request
19832010
history = tool_request.history
19842011
if not history:
19852012
raise InconsistentDatabase("Tool request has no history associated")
19862013

19872014
hda = dereference_input(trans, data_request, history)
1988-
if not isinstance(hda, model.HistoryDatasetAssociation):
1989-
raise RequestParameterInvalidException("Input dataset is not a history dataset association")
1990-
2015+
assert isinstance(hda, model.HistoryDatasetAssociation)
19912016
new_hdas.append(DereferencedDatasetPair(hda, data_request))
19922017
return DataRequestInternalHda(id=hda.id, src="hda")
19932018

19942019
tool_state = RequestInternalToolState(tool_request.request)
1995-
return dereference(tool_state, tool, dereference_callback), new_hdas
2020+
return dereference(tool_state, tool, dereference_callback, dereference_collection_callback), new_hdas
19962021

19972022
def queue_jobs(self, tool: Tool, request: QueueJobs) -> None:
19982023
tool_request: ToolRequest = self._tool_request(request.tool_request_id)

lib/galaxy/tool_util/parameters/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
DataCollectionRequest,
1717
DataParameterModel,
1818
DataRequest,
19+
DataRequestCollectionUri,
1920
DataRequestHda,
2021
DataRequestInternalHda,
22+
DataRequestInternalHdca,
2123
DataRequestUri,
2224
FloatParameterModel,
2325
GalaxyParameterT,
@@ -105,7 +107,9 @@
105107
"ToolParameterBundle",
106108
"ToolParameterBundleModel",
107109
"DataRequest",
110+
"DataRequestCollectionUri",
108111
"DataRequestInternalHda",
112+
"DataRequestInternalHdca",
109113
"DataRequestHda",
110114
"DataRequestUri",
111115
"DataCollectionRequest",

lib/galaxy/tool_util/parameters/convert.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
DataCollectionRequest,
2121
DataColumnParameterModel,
2222
DataParameterModel,
23+
DataRequestCollectionUri,
2324
DataRequestHda,
2425
DataRequestInternalHda,
26+
DataRequestInternalHdca,
2527
DataRequestUri,
2628
DiscriminatorType,
2729
DrillDownParameterModel,
@@ -63,6 +65,7 @@
6365
DecodeFunctionT = Callable[[str], int]
6466
EncodeFunctionT = Callable[[int], str]
6567
DereferenceCallable = Callable[[DataRequestUri], DataRequestInternalHda]
68+
DereferenceCollectionCallable = Callable[[DataRequestCollectionUri], DataRequestInternalHdca]
6669
# interfaces for adapting test data dictionaries to tool request dictionaries
6770
# e.g. {class: File, path: foo.bed} => {src: hda, id: ab1235cdfea3}
6871
AdaptDatasets = Callable[[JsonTestDatasetDefDict], DataRequestHda]
@@ -213,15 +216,25 @@ def _strictify_parameters(tool_state: Dict[str, Any], input_models: ToolParamete
213216

214217

215218
def dereference(
216-
internal_state: RequestInternalToolState, input_models: ToolParameterBundle, dereference: DereferenceCallable
219+
internal_state: RequestInternalToolState,
220+
input_models: ToolParameterBundle,
221+
dereference: DereferenceCallable,
222+
dereference_collection: DereferenceCollectionCallable,
217223
) -> RequestInternalDereferencedToolState:
218224

219225
def dereference_dict(src_dict: dict):
220226
src = src_dict.get("src")
227+
clazz = src_dict.get("class")
221228
if src == "url":
222229
data_request_uri: DataRequestUri = DataRequestUri.model_validate(src_dict)
223230
data_request_hda: DataRequestInternalHda = dereference(data_request_uri)
224231
return data_request_hda.model_dump()
232+
elif clazz == "Collection":
233+
data_request_collection_from_uri: DataRequestCollectionUri = DataRequestCollectionUri.model_validate(
234+
src_dict
235+
)
236+
data_request_hdca: DataRequestInternalHdca = dereference_collection(data_request_collection_from_uri)
237+
return data_request_hdca.model_dump()
225238
else:
226239
return src_dict
227240

@@ -234,6 +247,11 @@ def dereference_callback(parameter: ToolParameterT, value: Any):
234247
else:
235248
assert isinstance(value, dict), str(value)
236249
return dereference_dict(value)
250+
elif isinstance(parameter, DataCollectionParameterModel):
251+
if value is None:
252+
return VISITOR_NO_REPLACEMENT
253+
assert isinstance(value, dict), str(value)
254+
return dereference_dict(value)
237255
else:
238256
return VISITOR_NO_REPLACEMENT
239257

lib/galaxy_test/api/test_tool_execution.py

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,10 @@ def test_execution_with_deferred_src_urls(self):
124124
assert input_dataset_details["state"] == "deferred", input_dataset_details
125125

126126
@skip_without_tool("gx_data_collection_list")
127-
def test_execution_with_deferred_collection(self):
127+
def test_execution_with_deferred_list(self):
128128
with self.dataset_populator.test_history() as history_id:
129-
input_b64_1 = base64.b64encode(b"1 2 3").decode("utf-8")
129+
input_b64_1 = base64.b64encode(b"Hello World!\n").decode("utf-8")
130+
input_b64_2 = base64.b64encode(b"It is me - a collection!\n").decode("utf-8")
130131
response = self._run(
131132
"gx_data_collection_list",
132133
history_id,
@@ -141,7 +142,14 @@ def test_execution_with_deferred_collection(self):
141142
"url": f"base64://{input_b64_1}",
142143
"ext": "txt",
143144
"deferred": True,
144-
}
145+
},
146+
{
147+
"class": "File",
148+
"identifier": "mycoolelement2",
149+
"url": f"base64://{input_b64_2}",
150+
"ext": "txt",
151+
"deferred": False,
152+
},
145153
],
146154
},
147155
},
@@ -164,11 +172,67 @@ def test_execution_with_deferred_collection(self):
164172
job_output = job_outputs[0]
165173
assert job_output["name"] == "output"
166174
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=job_output["dataset"])
167-
assert content == "Hello World!"
175+
assert content == "Hello World!\nIt is me - a collection!\n"
168176

169-
# verify input was left deferred and infer must have been materialized just for the job
170-
input_dataset_details = self.dataset_populator.get_history_dataset_details(history_id, hid=1)
171-
assert input_dataset_details["state"] == "deferred", input_dataset_details
177+
@skip_without_tool("gx_data_collection")
178+
def test_execution_with_deferred_nested_list(self):
179+
with self.dataset_populator.test_history() as history_id:
180+
input_b64_1 = base64.b64encode(b"Hello World!\n").decode("utf-8")
181+
input_b64_2 = base64.b64encode(b"It is me - a collection!\n").decode("utf-8")
182+
response = self._run(
183+
"gx_data_collection",
184+
history_id,
185+
{
186+
"parameter": {
187+
"class": "Collection",
188+
"collection_type": "list:list",
189+
"elements": [
190+
{
191+
# TODO: confirm whether this "class" key is required for nested
192+
# collection elements; Planemo does not appear to require it.
193+
"class": "Collection",
194+
"identifier": "outer_element",
195+
"collection_type": "list",
196+
"elements": [
197+
{
198+
"class": "File",
199+
"identifier": "mycoolelement",
200+
"url": f"base64://{input_b64_1}",
201+
"ext": "txt",
202+
"deferred": True,
203+
},
204+
{
205+
"class": "File",
206+
"identifier": "mycoolelement2",
207+
"url": f"base64://{input_b64_2}",
208+
"ext": "txt",
209+
"deferred": False,
210+
},
211+
],
212+
}
213+
],
214+
},
215+
},
216+
)
217+
assert_status_code_is_ok(response)
218+
response_json = response.json()
219+
tool_request_id = response_json.get("tool_request_id")
220+
task_result = response_json["task_result"]
221+
self.dataset_populator.wait_on_task_object(task_result)
222+
state = self.dataset_populator.wait_on_tool_request(tool_request_id)
223+
assert state, str(self.dataset_populator.get_tool_request(tool_request_id))
224+
jobs = self.galaxy_interactor.jobs_for_tool_request(tool_request_id)
225+
self.dataset_populator.wait_for_jobs(jobs, assert_ok=True)
226+
if len(jobs) != 1:
227+
raise Exception(f"Found incorrect number of jobs for tool request - was expecting a single job {jobs}")
228+
assert len(jobs) == 1, jobs
229+
job_id = jobs[0]["id"]
230+
job_outputs = self.galaxy_interactor.job_outputs(job_id)
231+
assert len(job_outputs) == 1
232+
job_output = job_outputs[0]
233+
assert job_output["name"] == "output"
234+
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=job_output["dataset"])
235+
assert content == "Hello World!\nIt is me - a collection!\n"
172236

173237
def _assert_request_validates(self, tool_id: str, history_id: str, inputs: Dict[str, Any]):
174238
response = self._run(tool_id, history_id, inputs)

test/functional/tools/parameters/gx_data_collection.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<tool id="gx_data_collection" name="gx_data_collection" version="1.0.0">
22
<command><![CDATA[
3-
cat '$parameter' >> '$output'
3+
cat #for $f in $parameter# #if $f.is_collection# #for $inner in $f# '${inner}' #end for# #else# '$f' # #end if# #end for# >> '$output'
44
]]></command>
55
<inputs>
66
<param name="parameter" type="data_collection" ext="data" />

test/functional/tools/parameters/gx_data_collection_list.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<tool id="gx_data_collection_list" name="gx_data_collection_list" version="1.0.0">
22
<command><![CDATA[
3-
cat '$parameter' >> '$output'
3+
cat #for $q in $parameter# '$q' #end for# > '$output'
44
]]></command>
55
<inputs>
66
<param name="parameter" type="data_collection" collection_type="list" ext="data" />

0 commit comments

Comments
 (0)