|
3 | 3 |
|
4 | 4 | import yaml |
5 | 5 | from selenium.webdriver.common.by import By |
| 6 | +from typing_extensions import Literal |
6 | 7 |
|
7 | 8 | from galaxy_test.base import rules_test_data |
8 | 9 | from galaxy_test.base.workflow_fixtures import ( |
|
17 | 18 | WORKFLOW_WITH_CUSTOM_REPORT_1_TEST_DATA, |
18 | 19 | WORKFLOW_WITH_DATA_TAG_FILTER, |
19 | 20 | WORKFLOW_WITH_DYNAMIC_OUTPUT_COLLECTION, |
| 21 | + WORKFLOW_WITH_MAPPED_OUTPUT_COLLECTION, |
20 | 22 | WORKFLOW_WITH_OLD_TOOL_VERSION, |
21 | 23 | WORKFLOW_WITH_RULES_1, |
22 | 24 | ) |
@@ -389,16 +391,78 @@ def test_workflow_run_tag_filter(self): |
389 | 391 | self.dataset_populator.tag_dataset(history_id, dataset["id"], tags=["genomescope_model"]) |
390 | 392 | # Add another possible input that should not be selected |
391 | 393 | self.dataset_populator.new_dataset(history_id, wait=True) |
392 | | - wf = json.loads(WORKFLOW_WITH_DATA_TAG_FILTER) |
393 | | - wf["name"] = str(uuid4()) |
394 | | - workflow_id = self.workflow_populator.create_workflow(wf) |
395 | | - self.workflow_run_with_name(wf["name"]) |
| 394 | + workflow_id, workflow_name = self._create_workflow_with_unique_name(WORKFLOW_WITH_DATA_TAG_FILTER, "ga") |
| 395 | + self.workflow_run_with_name(workflow_name) |
396 | 396 | self.workflow_run_submit() |
397 | 397 | self.sleep_for(self.wait_types.HISTORY_POLL) |
398 | 398 | invocations = self.workflow_populator.workflow_invocations(workflow_id=workflow_id) |
399 | 399 | invocation = self.workflow_populator.get_invocation(invocations[-1]["id"]) |
400 | 400 | assert invocation["inputs"]["0"]["id"] == dataset["id"] |
401 | 401 |
|
| 402 | + @selenium_test |
| 403 | + @managed_history |
| 404 | + def test_upload_dataset_from_workflow_simple(self): |
| 405 | + history_id = self.current_history_id() |
| 406 | + self._create_and_run_workflow_with_unique_name(WORKFLOW_SIMPLE_CAT_TWICE) |
| 407 | + workflow_run = self.components.workflow_run |
| 408 | + input = workflow_run.input._(label="input1") |
| 409 | + input.upload.wait_for_and_click() |
| 410 | + self._upload_hello_world_for_input(input) |
| 411 | + self.workflow_run_submit() |
| 412 | + self.history_panel_wait_for_hid_ok(2) |
| 413 | + content = self.dataset_populator.get_history_dataset_content(history_id, hid=2) |
| 414 | + assert content.strip() == "hello world\nhello world" |
| 415 | + |
| 416 | + @selenium_test |
| 417 | + @managed_history |
| 418 | + def test_upload_list_from_workflow_simple(self): |
| 419 | + self._create_and_run_workflow_with_unique_name(WORKFLOW_WITH_MAPPED_OUTPUT_COLLECTION) |
| 420 | + workflow_run = self.components.workflow_run |
| 421 | + input = workflow_run.input._(label="input1") |
| 422 | + input.upload.wait_for_and_click() |
| 423 | + input.collection_tab_upload_link.wait_for_and_click() |
| 424 | + builder = workflow_run.input.collection_builder._(label="input1") |
| 425 | + self._upload_hello_world_for_input(builder, count=2) |
| 426 | + input.collection_tab_build_link.wait_for_and_click() |
| 427 | + builder.create.wait_for_and_click() |
| 428 | + self.workflow_run_submit() |
| 429 | + self.history_panel_wait_for_hid_ok(6) |
| 430 | + |
| 431 | + def _upload_hello_world_for_input(self, workflow_input, count=1, from_hid=1): |
| 432 | + # assumes fresh history... |
| 433 | + workflow_input.create_button.wait_for_and_click() |
| 434 | + url = self.dataset_populator.base64_url_for_string("hello world") |
| 435 | + content = "" |
| 436 | + for _ in range(count): |
| 437 | + content += url + "\n" |
| 438 | + workflow_input.paste_content(n=0).wait_for_and_send_keys(content) |
| 439 | + workflow_input.embedded_start_button.wait_for_and_click() |
| 440 | + workflow_input.status.wait_for_present() |
| 441 | + for i in range(from_hid, from_hid + count): |
| 442 | + self.history_panel_wait_for_hid_ok(i) |
| 443 | + workflow_input.use_button.wait_for_and_click() |
| 444 | + |
| 445 | + def _create_and_run_workflow_with_unique_name( |
| 446 | + self, workflow_contents: str, format: Literal["ga", "gxformat2"] = "gxformat2" |
| 447 | + ): |
| 448 | + workflow_id, workflow_name = self._create_workflow_with_unique_name(workflow_contents, format) |
| 449 | + self.workflow_run_with_name(workflow_name) |
| 450 | + return workflow_id, workflow_name |
| 451 | + |
| 452 | + def _create_workflow_with_unique_name( |
| 453 | + self, workflow_contents: str, format: Literal["ga", "gxformat2"] = "gxformat2" |
| 454 | + ): |
| 455 | + workflow_name = str(uuid4()) |
| 456 | + if format == "gxformat2": |
| 457 | + wf = yaml.safe_load(workflow_contents) |
| 458 | + wf["name"] = workflow_name |
| 459 | + workflow_id = self.workflow_populator.upload_yaml_workflow(wf) |
| 460 | + else: |
| 461 | + wf = json.loads(workflow_contents) |
| 462 | + wf["name"] = workflow_name |
| 463 | + workflow_id = self.workflow_populator.create_workflow(wf) |
| 464 | + return (workflow_id, workflow_name) |
| 465 | + |
402 | 466 | def _assert_has_3_lines_after_run(self, hid): |
403 | 467 | self.workflow_run_wait_for_ok(hid=hid) |
404 | 468 | history_id = self.current_history_id() |
|
0 commit comments