Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ PyYAML
bioblend>=0.10.0
Jinja2
galaxy-lib>=18.5.7

futures ; python_version < '3'
139 changes: 89 additions & 50 deletions src/ephemeris/shed_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
"""
import datetime as dt
import json
import os
import re
import time
from collections import namedtuple
from concurrent.futures import thread, ThreadPoolExecutor

import yaml
from bioblend.galaxy.client import ConnectionError
Expand Down Expand Up @@ -211,7 +213,8 @@ def test_tools(self,
repositories=None,
log=None,
test_user_api_key=None,
test_user="ephemeris@galaxyproject.org"
test_user="ephemeris@galaxyproject.org",
parallel_tests=1,
):
"""Run tool tests for all tools in each repository in supplied tool list or ``self.installed_repositories()``.
"""
Expand All @@ -232,75 +235,107 @@ def test_tools(self,
installed_tools.extend(repo_tools)

all_test_results = []
galaxy_interactor = self._get_interactor(test_user, test_user_api_key)
test_history = galaxy_interactor.new_history()

for tool in installed_tools:
results = self._test_tool(tool, test_user, test_user_api_key)
all_test_results.extend(results.tool_test_results)
tests_passed.extend(results.tests_passed)
test_exceptions.extend(results.test_exceptions)

report_obj = {
'version': '0.1',
'tests': all_test_results,
}
with open(test_json, "w") as f:
json.dump(report_obj, f)
if log:
log.info("Passed tool tests ({0}): {1}".format(
len(tests_passed),
[t for t in tests_passed])
)
log.info("Failed tool tests ({0}): {1}".format(
len(test_exceptions),
[t[0] for t in test_exceptions])
)
log.info("Total tool test time: {0}".format(dt.datetime.now() - tool_test_start))

def _test_tool(self, tool, test_user, test_user_api_key):

with ThreadPoolExecutor(max_workers=parallel_tests) as executor:
try:
for tool in installed_tools:
self._test_tool(executor=executor,
tool=tool,
galaxy_interactor=galaxy_interactor,
test_history=test_history,
log=log,
tool_test_results=all_test_results,
tests_passed=tests_passed,
test_exceptions=test_exceptions,
)
finally:
# Always write report, even if test was cancelled.
try:
executor.shutdown(wait=True)
except KeyboardInterrupt:
executor._threads.clear()
thread._threads_queues.clear()
report_obj = {
'version': '0.1',
'tests': sorted(all_test_results, key=lambda el: el['id']),
}
with open(test_json, "w") as f:
json.dump(report_obj, f)
if log:
log.info("Report written to '%s'", os.path.abspath(test_json))
log.info("Passed tool tests ({0}): {1}".format(
len(tests_passed),
[t for t in tests_passed])
)
log.info("Failed tool tests ({0}): {1}".format(
len(test_exceptions),
[t[0] for t in test_exceptions])
)
log.info("Total tool test time: {0}".format(dt.datetime.now() - tool_test_start))

def _get_interactor(self, test_user, test_user_api_key):
if test_user_api_key is None:
whoami = self.gi.make_get_request(self.gi.url + "/whoami").json()
if whoami is not None:
test_user_api_key = self.gi.key
galaxy_interactor_kwds = {
"galaxy_url": re.sub('/api', '', self.gi.url),
"master_api_key": self.gi.key,
"api_key": None, # TODO
"api_key": test_user_api_key, # TODO
"keep_outputs_dir": '',
}
if test_user_api_key is None:
galaxy_interactor_kwds["test_user"] = test_user
galaxy_interactor = GalaxyInteractorApi(**galaxy_interactor_kwds)
return galaxy_interactor

@staticmethod
def _test_tool(executor,
tool,
galaxy_interactor,
tool_test_results,
tests_passed,
test_exceptions,
log,
test_history=None,
):
if test_history is None:
test_history = galaxy_interactor.new_history()
tool_id = tool["id"]
tool_version = tool["version"]
tool_test_dicts = galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version)
test_indices = list(range(len(tool_test_dicts)))
tool_test_results = []
tests_passed = []
test_exceptions = []

for test_index in test_indices:
test_id = tool_id + "-" + str(test_index)

def register(job_data):
tool_test_results.append({
'id': test_id,
'has_data': True,
'data': job_data,
})
def run_test(index, test_id):

try:
verify_tool(
tool_id, galaxy_interactor, test_index=test_index, tool_version=tool_version,
register_job_data=register, quiet=True
)
tests_passed.append(test_id)
except Exception as e:
test_exceptions.append((test_id, e))
Results = namedtuple("Results", ["tool_test_results", "tests_passed", "test_exceptions"])
return Results(tool_test_results=tool_test_results,
tests_passed=tests_passed,
test_exceptions=test_exceptions)
def register(job_data):
tool_test_results.append({
'id': test_id,
'has_data': True,
'data': job_data,
})

try:
if log:
log.info("Executing test '%s'", test_id)
verify_tool(
tool_id, galaxy_interactor, test_index=index, tool_version=tool_version,
register_job_data=register, quiet=True, test_history=test_history,
)
tests_passed.append(test_id)
if log:
log.info("Test '%s' passed", test_id)
except Exception as e:
if log:
log.warning("Test '%s' failed", test_id)
test_exceptions.append((test_id, e))

executor.submit(run_test, test_index, test_id)

def install_repository_revision(self, repository, log):
default_err_msg = ('All repositories that you are attempting to install '
Expand Down Expand Up @@ -506,7 +541,9 @@ def main():
repositories=repos,
log=log,
test_user_api_key=args.test_user_api_key,
test_user=args.test_user)
test_user=args.test_user,
parallel_tests=args.parallel_tests,
)
else:
raise NotImplementedError("This point in the code should not be reached. Please contact the developers.")

Expand All @@ -521,7 +558,9 @@ def main():
repositories=to_be_tested_repositories,
log=log,
test_user_api_key=args.test_user_api_key,
test_user=args.test_user)
test_user=args.test_user,
parallel_tests=args.parallel_tests,
)


if __name__ == "__main__":
Expand Down
15 changes: 15 additions & 0 deletions src/ephemeris/shed_tools_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def parser():
test_user="ephemeris@galaxyproject.org",
test_json="tool_test_output.json",
test_existing=False,
max_parallel_tests=1,
)

# SUBPARSERS
Expand Down Expand Up @@ -168,6 +169,13 @@ def parser():
"--test_user_api_key isn't specified, this user email will be used. This "
"user will be created if needed."
)
command_parser.add_argument(
"--parallel_tests",
dest="parallel_tests",
default=1,
type=int,
help="Specify the maximum number of tests that will be run in parallel."
)

# OPTIONS UNIQUE TO INSTALL

Expand Down Expand Up @@ -217,5 +225,12 @@ def parser():
"--test_user_api_key isn't specified, this user email will be used. This "
"user will be created if needed."
)
test_command_parser.add_argument(
"--parallel_tests",
dest="parallel_tests",
default=1,
type=int,
help="Specify the maximum number of tests that will be run in parallel."
)

return shed_parser
22 changes: 22 additions & 0 deletions tests/test_shed_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
# for pytest to work.
# pylint: disable=no-self-use,unused-import

import json
import logging
import os
import tempfile

import pytest
from docker_for_galaxy import start_container # noqa: F401 prevent unused error

from ephemeris.shed_tools import InstallRepositoryManager
Expand All @@ -27,3 +31,21 @@ def test_invalid_keys_in_repo_list(self, caplog, start_container): # noqa: F811
sesame_ouvre_toi="Invalid key")
], log=logging.getLogger())
assert "'sesame_ouvre_toi' not a valid key. Will be skipped during parsing" in caplog.text

@pytest.mark.parametrize("parallel_tests", [1, 2])
def test_tool_tests(self, caplog, start_container, parallel_tests): # noqa: F811
container = start_container
irm = InstallRepositoryManager(container.gi)
caplog.set_level(logging.WARNING)
repos = [{'name': 'collection_element_identifiers', 'owner': 'iuc', 'tool_panel_section_label': "NGS: Alignment"}]
log = logging.getLogger()
irm.install_repositories(
repositories=repos,
log=log
)
fd, test_result_file = tempfile.mkstemp()
os.close(fd)
irm.test_tools(test_json=test_result_file, repositories=repos, log=log, parallel_tests=parallel_tests)
with open(test_result_file) as test_result:
result = json.load(test_result)
assert 'tests' in result