294 changes: 176 additions & 118 deletions ephemeris/run_data_managers.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
'''Run-data-managers is a tool for provisioning data on a galaxy instance.
"""Run-data-managers is a tool for provisioning data on a galaxy instance.

Run-data-managers has the ability to run multiple data managers that are interdependent.
When a reference genome is needed for bwa-mem for example, Run-data-managers
@@ -21,7 +21,7 @@
the "value" column in the data table already has this entry.
Value takes precedence over sequence_id which takes precedence over dbkey.
* If none of the above input variables are specified the data manager will always run.
'''
"""
import argparse
import json
import logging
@@ -37,11 +37,12 @@


DEFAULT_URL = "http://localhost"
DEFAULT_SOURCE_TABLES = ["all_fasta"]


def wait(gi, job_list):
def wait(gi, job_list, log):
"""
Waits until a data_manager is finished or failed.
Waits until all jobs in a list are finished or failed.
It will check the state of the created datasets every 30s.
It will return a tuple: ( finished_jobs, failed_jobs )
"""
@@ -76,94 +77,58 @@ def wait(gi, job_list):
return successful_jobs, failed_jobs


def data_table_entry_exists(tool_data_client, data_table_name, entry, column='value'):
'''Checks whether an entry exists in a specified column in the data_table.'''
try:
data_table_content = tool_data_client.show_data_table(data_table_name)
except Exception:
raise Exception('Table "%s" does not exist' % (data_table_name))

try:
column_index = data_table_content.get('columns').index(column)
except ValueError:
raise ValueError('Column "%s" does not exist in %s' % (column, data_table_name))

for field in data_table_content.get('fields'):
if field[column_index] == entry:
return True
return False


def get_name_from_inputs(input_dict):
'''Returns the value that will most likely be recorded in the "name" column of the datatable. Or returns False'''
possible_keys = ['name', 'sequence_name'] # In order of importance!
for key in possible_keys:
if key in input_dict:
return input_dict.get(key)
return False


def get_value_from_inputs(input_dict):
'''Returns the value that will most likely be recorded in the "value" column of the datatable. Or returns False'''
possible_keys = ['value', 'sequence_id', 'dbkey'] # In order of importance!
for key in possible_keys:
def get_first_valid_entry(input_dict, key_list):
"""Iterates over key_list and returns the value of the first key that exists in the dictionary. Or returns None"""
for key in key_list:
if key in input_dict:
return input_dict.get(key)
return False


def input_entries_exist_in_data_tables(tool_data_client, data_tables, input_dict):
'''Checks whether name and value entries from the input are already present in the data tables.
If an entry is missing in one of the tables, this function returns False'''
value_entry = get_value_from_inputs(input_dict)
name_entry = get_name_from_inputs(input_dict)

# Return False if name and value entries are both False
if not value_entry and not name_entry:
return False
return None


class DataManagers:
def __init__(self, galaxy_instance, configuration):
"""
:param galaxy_instance: A GalaxyInstance object (import from bioblend.galaxy)
:param configuration: A dictionary. Examples in the ephemeris documentation.
"""
self.gi = galaxy_instance
self.config = configuration
self.tool_data_client = ToolDataClient(self.gi)
self.possible_name_keys = ['name', 'sequence_name'] # In order of importance!
self.possible_value_keys = ['value', 'sequence_id', 'dbkey'] # In order of importance!
self.data_managers = self.config.get('data_managers')
self.genomes = self.config.get('genomes', '')
self.source_tables = DEFAULT_SOURCE_TABLES
self.fetch_jobs = []
self.skipped_fetch_jobs = []
self.index_jobs = []
self.skipped_index_jobs = []

def initiate_job_lists(self):
"""
Determines which data managers should be run to populate the data tables.
Distinguishes between fetch jobs (download files) and index jobs.
:return: populate self.fetch_jobs, self.skipped_fetch_jobs, self.index_jobs and self.skipped_index_jobs
"""
self.fetch_jobs = []
self.skipped_fetch_jobs = []
self.index_jobs = []
self.skipped_index_jobs = []
for dm in self.data_managers:
jobs, skipped_jobs = self.get_dm_jobs(dm)
if self.dm_is_fetcher(dm):
self.fetch_jobs.extend(jobs)
self.skipped_fetch_jobs.extend(skipped_jobs)
else:
self.index_jobs.extend(jobs)
self.skipped_index_jobs.extend(skipped_jobs)

# Check every data table for existence of name and value
# Return False as soon as entry is not present
for data_table in data_tables:
if value_entry:
if not data_table_entry_exists(tool_data_client, data_table, value_entry, column='value'):
return False
if name_entry:
if not data_table_entry_exists(tool_data_client, data_table, name_entry, column='name'):
return False
# If all checks are passed the entries are present in the database tables.
return True


def parse_items(items, genomes):
if bool(genomes):
items_template = Template(json.dumps(items))
rendered_items = items_template.render(genomes=json.dumps(genomes))
# Remove trailing " if present
rendered_items = rendered_items.strip('"')
items = json.loads(rendered_items)
return items


def run_dm(args):
args.galaxy = args.galaxy or DEFAULT_URL
conf = load_yaml_file(args.config)
gi = get_galaxy_connection(args, log=log, file=args.config, login_required=True)
# should test valid connection
# The following should throw a ConnectionError when invalid API key or password
genomes = gi.genomes.get_genomes() # Does not get genomes but preconfigured dbkeys
log.info('Number of possible dbkeys: %s' % str(len(genomes)))

tool_data_client = ToolDataClient(gi)

number_skipped_jobs = 0
all_failed_jobs = []
all_successful_jobs = []
genomes = conf.get('genomes', '')

for dm in conf.get('data_managers'):
items = parse_items(dm.get('items', ['']), genomes)
def get_dm_jobs(self, dm):
"""Gets the job entries for a single dm. Puts entries that already present in skipped_job_list.
:returns job_list, skipped_job_list"""
job_list = []
skipped_job_list = []
items = self.parse_items(dm.get('items', ['']))
for item in items:
dm_id = dm['id']
params = dm['params']
@@ -176,38 +141,135 @@ def run_dm(args):
value = value_template.render(item=item)
inputs.update({key: value})

job = dict(tool_id=dm_id, inputs=inputs)

data_tables = dm.get('data_table_reload', [])
# Only run if not run before.
if input_entries_exist_in_data_tables(tool_data_client, data_tables, inputs) and not args.overwrite:
log.info('%s already run for %s' % (dm_id, inputs))
number_skipped_jobs += 1
if self.input_entries_exist_in_data_tables(data_tables, inputs):
skipped_job_list.append(job)
else:
# run the DM-job
job = gi.tools.run_tool(history_id=None, tool_id=dm_id, tool_inputs=inputs)
log.info('Dispatched job %i. Running DM: "%s" with parameters: %s' % (job['outputs'][0]['hid'], dm_id, inputs))
job_list.append(job)
successful_jobs, failed_jobs = wait(gi, job_list)
if failed_jobs:
if not args.ignore_errors:
raise Exception('Not all jobs successful! aborting...')
else:
log.error('Not all jobs successful! ignoring...')
all_successful_jobs += successful_jobs
all_failed_jobs += failed_jobs
job_summary = dict()
job_summary['successful_jobs'] = len(all_successful_jobs)
job_summary['failed_jobs'] = len(all_failed_jobs)
job_summary['skipped_jobs'] = number_skipped_jobs
return job_summary
return job_list, skipped_job_list

def dm_is_fetcher(self, dm):
"""Checks whether the data manager fetches a sequence instead of indexing.
This is based on the source table.
:returns True if dm is a fetcher. False if it is not."""
data_tables = dm.get('data_table_reload', [])
for data_table in data_tables:
if data_table in self.source_tables:
return True
return False

def data_table_entry_exists(self, data_table_name, entry, column='value'):
"""Checks whether an entry exists in the a specified column in the data_table."""
try:
data_table_content = self.tool_data_client.show_data_table(data_table_name)
except Exception:
raise Exception('Table "%s" does not exist' % data_table_name)

try:
column_index = data_table_content.get('columns').index(column)
except ValueError:
raise ValueError('Column "%s" does not exist in %s' % (column, data_table_name))

for field in data_table_content.get('fields'):
if field[column_index] == entry:
return True
return False

def input_entries_exist_in_data_tables(self, data_tables, input_dict):
"""Checks whether name and value entries from the input are already present in the data tables.
If an entry is missing in one of the tables, this function returns False"""
value_entry = get_first_valid_entry(input_dict, self.possible_value_keys)
name_entry = get_first_valid_entry(input_dict, self.possible_name_keys)

# Return False if name and value entries are both None
if not value_entry and not name_entry:
return False

# Check every data table for existence of name and value
# Return False as soon as entry is not present
for data_table in data_tables:
if value_entry:
if not self.data_table_entry_exists(data_table, value_entry, column='value'):
return False
if name_entry:
if not self.data_table_entry_exists(data_table, name_entry, column='name'):
return False
# If all checks are passed the entries are present in the database tables.
return True

def parse_items(self, items):
"""
Parses items with jinja2.
:param items: the items to be parsed
:return: the parsed items
"""
if bool(self.genomes):
items_template = Template(json.dumps(items))
rendered_items = items_template.render(genomes=json.dumps(self.genomes))
# Remove trailing " if present
rendered_items = rendered_items.strip('"')
items = json.loads(rendered_items)
return items

def run(self, log, ignore_errors=False, overwrite=False):
"""
Runs the data managers.
:param log: The log to be used.
:param ignore_errors: Ignore erroring data_managers. Continue regardless.
:param overwrite: Overwrite existing entries in data tables
"""
self.initiate_job_lists()
all_successful_jobs = []
all_failed_jobs = []
all_skipped_jobs = []

def run_jobs(jobs, skipped_jobs):
job_list = []
for skipped_job in skipped_jobs:
if overwrite:
log.info('%s already run for %s. Entry will be overwritten.' %
(skipped_job["tool_id"], skipped_job["inputs"]))
jobs.append(skipped_job)
else:
log.info('%s already run for %s. Skipping.' % (skipped_job["tool_id"], skipped_job["inputs"]))
all_skipped_jobs.append(skipped_job)
for job in jobs:
started_job = self.gi.tools.run_tool(history_id=None, tool_id=job["tool_id"], tool_inputs=job["inputs"])
log.info('Dispatched job %i. Running DM: "%s" with parameters: %s' %
(started_job['outputs'][0]['hid'], job["tool_id"], job["inputs"]))
job_list.append(started_job)

successful_jobs, failed_jobs = wait(self.gi, job_list, log)
if failed_jobs:
if not ignore_errors:
log.error('Not all jobs successful! aborting...')
raise Exception('Not all jobs successful! aborting...')
else:
log.warning('Not all jobs successful! ignoring...')
all_successful_jobs.extend(successful_jobs)
all_failed_jobs.extend(failed_jobs)

log.info("Running data managers that populate the following source data tables: %s" % self.source_tables)
run_jobs(self.fetch_jobs, self.skipped_fetch_jobs)
log.info("Running data managers that index sequences.")
run_jobs(self.index_jobs, self.skipped_index_jobs)

log.info('Finished running data managers. Results:')
log.info('Successful jobs: %i ' % len(all_successful_jobs))
log.info('Skipped jobs: %i ' % len(all_skipped_jobs))
log.info('Failed jobs: %i ' % len(all_failed_jobs))


def _parser():
'''returns the parser object.'''
"""returns the parser object."""
parent = get_common_args()

parser = argparse.ArgumentParser(
parents=[parent],
description='Running Galaxy data managers in a defined order with defined parameters.')
description='Running Galaxy data managers in a defined order with defined parameters. '
"'watch_tool_data_dir' in galaxy config should be set to true.")
parser.add_argument("--config", required=True,
help="Path to the YAML config file with the list of data managers and data to install.")
parser.add_argument("--overwrite", action="store_true",
@@ -218,22 +280,18 @@ def _parser():


def main():
global log
disable_external_library_logging()
log = setup_global_logger(name=__name__, log_file='/tmp/galaxy_data_manager_install.log')
parser = _parser()
args = parser.parse_args()
if args.verbose:
log.setLevel(logging.DEBUG)
else:
log.setLevel(logging.INFO)
log.info("Running data managers...")
job_summary = run_dm(args)
log.info('Finished running data managers. Results:')
log.info('Successful jobs: %i ' % job_summary['successful_jobs'])
log.info('Skipped jobs: %i ' % job_summary['skipped_jobs'])
log.info('Failed jobs: %i ' % job_summary['failed_jobs'])
gi = get_galaxy_connection(args, file=args.config, log=log, login_required=True)
config = load_yaml_file(args.config)
data_managers = DataManagers(gi, config)
data_managers.run(log, args.ignore_errors, args.overwrite)


if __name__ == '__main__':
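
For context, a minimal sketch of driving the refactored DataManagers class directly rather than through the CLI entry point; the Galaxy URL, API key and config path below are illustrative placeholders (main() above wires these up from the parsed command-line arguments instead):

import logging

import yaml
from bioblend.galaxy import GalaxyInstance

from ephemeris.run_data_managers import DataManagers

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# Placeholder connection details; any admin API key on a running Galaxy works.
gi = GalaxyInstance("http://localhost:8080", key="<admin-api-key>")

with open("tests/run_data_managers.yaml.test") as handle:
    config = yaml.safe_load(handle)

data_managers = DataManagers(gi, config)
# overwrite=False makes entries already present in the data tables land in
# the skipped-job lists instead of being rerun.
data_managers.run(log, ignore_errors=False, overwrite=False)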
10 changes: 10 additions & 0 deletions tests/data_manager_list.yaml
@@ -0,0 +1,10 @@
# This is just a sample file. For a fully documented version of this file, see
# https://github.com/galaxyproject/ansible-galaxy-tools/blob/master/files/tool_list.yaml.sample
tools:
- name: data_manager_fetch_genome_dbkeys_all_fasta
owner: devteam
- name: data_manager_sam_fasta_index_builder
owner: devteam
- name: data_manager_bwa_mem_index_builder
owner: devteam

11 changes: 10 additions & 1 deletion tests/run_data_managers.yaml.test
@@ -11,7 +11,7 @@ genomes:
id: NoroVirCHA7A011

data_managers:
- id: toolshed.g2.bx.psu.edu/repos/devteam/data_manager_fetch_genome_dbkeys_all_fasta/data_manager_fetch_genome_all_fasta_dbkey/0.0.2
- id: data_manager_fetch_genome_all_fasta_dbkey
params:
- 'dbkey_source|dbkey_source_selector': 'new'
- 'dbkey_source|dbkey': '{{ item.id }}'
@@ -33,3 +33,12 @@ data_managers:
items: "{{ genomes }}"
data_table_reload:
- fasta_indexes

- id: bwa_mem_index_builder_data_manager
params:
- 'all_fasta_source': '{{ item.id }}'
- 'sequence_name': '{{ item.name }}'
- 'sequence_id': '{{ item.id }}'
items: "{{ genomes }}"
data_table_reload:
- bwa_mem_indexes
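
To make the templating in this config concrete, a small sketch of the jinja2 expansion that parse_items() in the diff above applies to an items: "{{ genomes }}" entry; the two-genome list is a made-up stand-in for the genomes section of the config:

import json

from jinja2 import Template

genomes = [{"id": "hg38", "name": "Human"}, {"id": "mm10", "name": "Mouse"}]
items = "{{ genomes }}"

# parse_items() serialises the items, renders the template, strips the
# surrounding quotes that json.dumps added, and parses the result back.
rendered = Template(json.dumps(items)).render(genomes=json.dumps(genomes))
items = json.loads(rendered.strip('"'))

for item in items:
    # Each item can now fill per-genome params such as '{{ item.id }}'.
    print(Template("dbkey_source|dbkey: {{ item.id }}").render(item=item))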