2 changes: 1 addition & 1 deletion planemo_ext/galaxy/jobs/metrics/formatting.py
@@ -7,7 +7,7 @@ def format( self, key, value ):
         return ( str( key ), str( value ) )
 
 
-## Formatting utilities
+# Formatting utilities
 
 def seconds_to_str( value ):
     if value < 60:
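The hunk above is cut off inside seconds_to_str. Purely as an illustration of the shape such a formatter usually takes, here is a hypothetical completion (thresholds and wording assumed, not taken from the file):

def seconds_to_str( value ):
    # Hypothetical sketch: report the duration in the largest sensible unit.
    if value < 60:
        return "%s seconds" % value
    elif value < 3600:
        return "%s minutes" % ( value / 60 )
    else:
        return "%s hours" % ( value / 3600 )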
11 changes: 7 additions & 4 deletions planemo_ext/galaxy/jobs/metrics/instrumenters/collectl.py
@@ -13,10 +13,12 @@
 import logging
 log = logging.getLogger( __name__ )
 
-DEFAULT_PROCFILT_ON = "username"  # By default, only grab statistics for user
-                                  # processes (as identifiers by username).
+# By default, only grab statistics for user processes (as identified by
+# username).
+DEFAULT_PROCFILT_ON = "username"
 DEFAULT_SUBSYSTEMS = "process"
-DEFAULT_FLUSH_INTERVAL = "0"  # Set to zero to flush every collection.
+# Set to zero to flush every collection.
+DEFAULT_FLUSH_INTERVAL = "0"
 
 FORMATTED_RESOURCE_TITLES = {
     "PCT": "Percent CPU Usage",
@@ -58,8 +60,9 @@ class CollectlPlugin( InstrumentPlugin ):
     def __init__( self, **kwargs ):
         self.__configure_paths( kwargs )
         self.__configure_subsystems( kwargs )
-        saved_logs_path = kwargs.get( "saved_logs_path", None )
+        saved_logs_path = kwargs.get( "saved_logs_path", "" )
         if "app" in kwargs:
+            log.debug("Found path for saved logs: %s" % saved_logs_path)
             saved_logs_path = kwargs[ "app" ].config.resolve_path( saved_logs_path )
         self.saved_logs_path = saved_logs_path
         self.__configure_collectl_recorder_args( kwargs )
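One effect of the new empty-string default is that resolve_path is never handed None. A minimal sketch of the pattern (this AppConfig class is a hypothetical stand-in for kwargs[ "app" ].config):

import os

class AppConfig(object):
    # Hypothetical stand-in for the real application config object.
    root = "/srv/galaxy"

    def resolve_path(self, path):
        # Resolve a (possibly empty) relative path against the root.
        return os.path.join(self.root, path)

config = AppConfig()
print(config.resolve_path(""))  # /srv/galaxy/ -- a harmless no-op
# resolve_path(None) would raise an exception, which the "" default avoids.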
436 changes: 270 additions & 166 deletions planemo_ext/galaxy/objectstore/__init__.py

Large diffs are not rendered by default.

12 changes: 4 additions & 8 deletions planemo_ext/galaxy/objectstore/rods.py
@@ -15,17 +15,14 @@
 from galaxy.exceptions import ObjectNotFound
 from ..objectstore import DiskObjectStore, ObjectStore, local_extra_dirs
 
-try:
-    import galaxy.eggs
-    galaxy.eggs.require( 'PyRods' )
-except Exception:
-    pass
 try:
     import irods
 except ImportError:
     irods = None
 
-NO_PYRODS_ERROR_MESSAGE = "IRODS object store configured, but no PyRods dependency available. Please install and properly configure PyRods or modify object store configuration."
+IRODS_IMPORT_MESSAGE = ('The Python irods package is required to use this '
+                        'feature, please install it')
 
 log = logging.getLogger( __name__ )

@@ -35,9 +32,8 @@ class IRODSObjectStore( DiskObjectStore, ObjectStore ):
     Galaxy object store based on iRODS
     """
     def __init__( self, config, file_path=None, extra_dirs=None ):
-        if irods is None:
-            raise Exception(NO_PYRODS_ERROR_MESSAGE)
         super( IRODSObjectStore, self ).__init__( config, file_path=file_path, extra_dirs=extra_dirs )
+        assert irods is not None, IRODS_IMPORT_MESSAGE
         self.cache_path = config.object_store_cache_path
         self.default_resource = config.irods_default_resource or None
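The two hunks together form a standard optional-dependency guard: the import is attempted at module load, and the failure is reported only when the store is actually constructed. A minimal self-contained sketch of the same pattern (the class name here is hypothetical):

IRODS_IMPORT_MESSAGE = ('The Python irods package is required to use this '
                        'feature, please install it')

try:
    import irods
except ImportError:
    # Defer the failure: the module stays importable without PyRods.
    irods = None

class IRODSStoreSketch(object):
    def __init__(self):
        # Fail fast with an actionable message only when the store is used.
        assert irods is not None, IRODS_IMPORT_MESSAGE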
49 changes: 46 additions & 3 deletions planemo_ext/galaxy/objectstore/s3.py
@@ -44,8 +44,7 @@ class S3ObjectStore(ObjectStore):
     def __init__(self, config, config_xml):
         if boto is None:
             raise Exception(NO_BOTO_ERROR_MESSAGE)
-        super(S3ObjectStore, self).__init__(config, config_xml)
-        self.config = config
+        super(S3ObjectStore, self).__init__(config)
         self.staging_path = self.config.file_path
         self.transfer_progress = 0
         self._parse_config_xml(config_xml)
@@ -92,6 +91,14 @@ def _parse_config_xml(self, config_xml):
             self.conn_path = cn_xml.get('conn_path', '/')
             c_xml = config_xml.findall('cache')[0]
             self.cache_size = float(c_xml.get('size', -1))
+            self.staging_path = c_xml.get('path', self.config.object_store_cache_path)
+
+            for d_xml in config_xml.findall('extra_dir'):
+                self.extra_dirs[d_xml.get('type')] = d_xml.get('path')
+
+            log.debug("Object cache dir:    %s", self.staging_path)
+            log.debug("       job work dir: %s", self.extra_dirs['job_work'])
+
             # for multipart upload
             self.s3server = {'access_key': self.access_key,
                              'secret_key': self.secret_key,
@@ -200,15 +207,24 @@ def _fix_permissions(self, rel_path):
                 continue
             umask_fix_perms( path, self.config.umask, 0o666, self.config.gid )
 
-    def _construct_path(self, obj, dir_only=None, extra_dir=None, extra_dir_at_root=False, alt_name=None, **kwargs):
+    def _construct_path(self, obj, base_dir=None, dir_only=None, extra_dir=None, extra_dir_at_root=False, alt_name=None, obj_dir=False, **kwargs):
         rel_path = os.path.join(*directory_hash_id(obj.id))
         if extra_dir is not None:
             if extra_dir_at_root:
                 rel_path = os.path.join(extra_dir, rel_path)
             else:
                 rel_path = os.path.join(rel_path, extra_dir)
+
+        # for JOB_WORK directory
+        if obj_dir:
+            rel_path = os.path.join(rel_path, str(obj.id))
+        if base_dir:
+            base = self.extra_dirs.get(base_dir)
+            return os.path.join(base, rel_path)
+
         # S3 folders are marked by having trailing '/' so add it now
         rel_path = '%s/' % rel_path
+
         if not dir_only:
             rel_path = os.path.join(rel_path, alt_name if alt_name else "dataset_%s.dat" % obj.id)
         return rel_path
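To make the new path logic concrete, here is a hedged sketch of how the kwargs compose (the hash helper below is an illustrative stand-in for Galaxy's directory_hash_id, and all values are hypothetical):

import os

def fake_directory_hash_id(obj_id):
    # Illustrative stand-in: bucket ids into three-digit directories.
    return ["%03d" % (obj_id // 1000)]

def construct_path(obj_id, base_dir=None, dir_only=False, obj_dir=False,
                   extra_dirs=None):
    rel_path = os.path.join(*fake_directory_hash_id(obj_id))
    if obj_dir:
        # Give each job its own subdirectory under the hashed prefix.
        rel_path = os.path.join(rel_path, str(obj_id))
    if base_dir:
        # JOB_WORK-style paths resolve against a configured local dir.
        return os.path.join(extra_dirs[base_dir], rel_path)
    rel_path = '%s/' % rel_path  # S3 "folders" need a trailing slash
    if not dir_only:
        rel_path = os.path.join(rel_path, "dataset_%s.dat" % obj_id)
    return rel_path

print(construct_path(12345))
# -> 012/dataset_12345.dat (an S3 key)
print(construct_path(12345, base_dir='job_work', dir_only=True, obj_dir=True,
                     extra_dirs={'job_work': '/tmp/jobs'}))
# -> /tmp/jobs/012/12345 (a local directory)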
@@ -374,6 +390,7 @@ def file_ready(self, obj, **kwargs):
     def exists(self, obj, **kwargs):
         in_cache = in_s3 = False
         rel_path = self._construct_path(obj, **kwargs)
+
         # Check cache
         if self._in_cache(rel_path):
             in_cache = True
@@ -382,11 +399,18 @@ def exists(self, obj, **kwargs):
         # log.debug("~~~~~~ File '%s' exists in cache: %s; in s3: %s" % (rel_path, in_cache, in_s3))
         # dir_only does not get synced so shortcut the decision
         dir_only = kwargs.get('dir_only', False)
+        base_dir = kwargs.get('base_dir', None)
         if dir_only:
             if in_cache or in_s3:
                 return True
+            # for JOB_WORK directory
+            elif base_dir:
+                if not os.path.exists(rel_path):
+                    os.makedirs(rel_path)
+                return True
             else:
                 return False
+
         # TODO: Sync should probably not be done here. Add this to an async upload stack?
         if in_cache and not in_s3:
             self._push_to_os(rel_path, source_file=self._get_cache_path(rel_path))
@@ -398,11 +422,13 @@
 
     def create(self, obj, **kwargs):
         if not self.exists(obj, **kwargs):
+
             # Pull out locally used fields
             extra_dir = kwargs.get('extra_dir', None)
             extra_dir_at_root = kwargs.get('extra_dir_at_root', False)
             dir_only = kwargs.get('dir_only', False)
             alt_name = kwargs.get('alt_name', None)
+
             # Construct hashed path
             rel_path = os.path.join(*directory_hash_id(obj.id))
 
@@ -412,10 +438,12 @@
                     rel_path = os.path.join(extra_dir, rel_path)
                 else:
                     rel_path = os.path.join(rel_path, extra_dir)
+
             # Create given directory in cache
             cache_dir = os.path.join(self.staging_path, rel_path)
             if not os.path.exists(cache_dir):
                 os.makedirs(cache_dir)
+
             # Although not really necessary to create S3 folders (because S3 has
             # flat namespace), do so for consistency with the regular file system
             # S3 folders are marked by having trailing '/' so add it now
@@ -449,7 +477,15 @@ def size(self, obj, **kwargs):
     def delete(self, obj, entire_dir=False, **kwargs):
         rel_path = self._construct_path(obj, **kwargs)
         extra_dir = kwargs.get('extra_dir', None)
+        base_dir = kwargs.get('base_dir', None)
+        dir_only = kwargs.get('dir_only', False)
+        obj_dir = kwargs.get('obj_dir', False)
         try:
+            # Remove temparory data in JOB_WORK directory
+            if base_dir and dir_only and obj_dir:
+                shutil.rmtree(os.path.abspath(rel_path))
+                return True
+
             # For the case of extra_files, because we don't have a reference to
             # individual files/keys we need to remove the entire directory structure
             # with all the files in it. This is easy for the local file system,
@@ -489,8 +525,15 @@ def get_data(self, obj, start=0, count=-1, **kwargs):
             return content
 
     def get_filename(self, obj, **kwargs):
+        base_dir = kwargs.get('base_dir', None)
+        dir_only = kwargs.get('dir_only', False)
+        obj_dir = kwargs.get('obj_dir', False)
         rel_path = self._construct_path(obj, **kwargs)
 
+        # for JOB_WORK directory
+        if base_dir and dir_only and obj_dir:
+            return os.path.abspath(rel_path)
+
         cache_path = self._get_cache_path(rel_path)
         # S3 does not recognize directories as files so cannot check if those exist.
         # So, if checking dir only, ensure given dir exists in cache and return
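Taken together, exists(), get_filename(), and delete() now give jobs a purely local working directory alongside the S3-backed datasets. A hedged walk-through of that life cycle outside the class (all paths hypothetical):

import os
import shutil
import tempfile

# Hypothetical JOB_WORK life cycle mirroring the hunks above.
extra_dirs = {'job_work': tempfile.mkdtemp()}
rel_path = os.path.join(extra_dirs['job_work'], '012', '12345')

# exists(..., base_dir='job_work', dir_only=True): created on demand.
if not os.path.exists(rel_path):
    os.makedirs(rel_path)

# get_filename(..., base_dir='job_work', dir_only=True, obj_dir=True):
# the job receives an absolute local path, never an S3 key.
job_work_dir = os.path.abspath(rel_path)

# delete(..., base_dir='job_work', dir_only=True, obj_dir=True):
# the whole per-job tree is removed when the job finishes.
shutil.rmtree(job_work_dir)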
20 changes: 7 additions & 13 deletions planemo_ext/galaxy/objectstore/s3_multipart_upload.py
@@ -14,12 +14,6 @@
 
 from multiprocessing.pool import IMapIterator
 
-try:
-    from galaxy import eggs
-    eggs.require('boto')
-except ImportError:
-    pass
-
 try:
     import boto
     from boto.s3.connection import S3Connection
@@ -42,12 +36,12 @@ def mp_from_ids(s3server, mp_id, mp_keyname, mp_bucketname):
     """
     if s3server['host']:
         conn = boto.connect_s3(aws_access_key_id=s3server['access_key'],
-                       aws_secret_access_key=s3server['secret_key'],
-                       is_secure=s3server['is_secure'],
-                       host=s3server['host'],
-                       port=s3server['port'],
-                       calling_format=boto.s3.connection.OrdinaryCallingFormat(),
-                       path=s3server['conn_path'])
+                               aws_secret_access_key=s3server['secret_key'],
+                               is_secure=s3server['is_secure'],
+                               host=s3server['host'],
+                               port=s3server['port'],
+                               calling_format=boto.s3.connection.OrdinaryCallingFormat(),
+                               path=s3server['conn_path'])
     else:
         conn = S3Connection(s3server['access_key'], s3server['secret_key'])
 
@@ -75,7 +69,7 @@ def multipart_upload(s3server, bucket, s3_key_name, tarball, mb_size):
 
 def split_file(in_file, mb_size, split_num=5):
     prefix = os.path.join(os.path.dirname(in_file),
-                "%sS3PART" % (os.path.basename(s3_key_name)))
+                          "%sS3PART" % (os.path.basename(s3_key_name)))
     max_chunk = s3server['max_chunk_size']
     # Split chunks so they are 5MB < chunk < 250MB(max_chunk_size)
     split_size = int(max(min(mb_size / (split_num * 2.0), max_chunk), 5))
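As a quick sanity check on that clamping formula, a worked example with hypothetical sizes:

# Hypothetical values: a 1200 MB tarball, the default split_num=5,
# and a 250 MB max_chunk_size.
mb_size = 1200.0
split_num = 5
max_chunk = 250

# Clamp mb_size / (split_num * 2) into the [5, max_chunk] MB window.
split_size = int(max(min(mb_size / (split_num * 2.0), max_chunk), 5))
print(split_size)  # 120 -> the tarball is uploaded in 120 MB parts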
5 changes: 4 additions & 1 deletion planemo_ext/galaxy/tools/deps/__init__.py
@@ -90,7 +90,10 @@ def find_dep( self, name, version=None, type='package', **kwds ):
         return INDETERMINATE_DEPENDENCY
 
     def __build_dependency_resolvers( self, conf_file ):
-        if not conf_file or not os.path.exists( conf_file ):
+        if not conf_file:
+            return self.__default_dependency_resolvers()
+        if not os.path.exists( conf_file ):
+            log.debug( "Unable to find config file '%s'", conf_file)
             return self.__default_dependency_resolvers()
         plugin_source = plugin_config.plugin_source_from_path( conf_file )
         return self.__parse_resolver_conf_xml( plugin_source )
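The split distinguishes "no config file requested" from "a config file was requested but is missing", and only the latter case is logged. A minimal sketch of that control flow (function and resolver names here are hypothetical):

import logging
import os

log = logging.getLogger(__name__)

def build_dependency_resolvers(conf_file):
    if not conf_file:
        # No file configured: quietly fall back to the defaults.
        return default_dependency_resolvers()
    if not os.path.exists(conf_file):
        # A file was configured but is missing: fall back, and say so.
        log.debug("Unable to find config file '%s'", conf_file)
        return default_dependency_resolvers()
    return parse_resolver_conf_xml(conf_file)

def default_dependency_resolvers():
    # Illustrative resolver names only.
    return ["tool_shed_packages", "galaxy_packages"]

def parse_resolver_conf_xml(conf_file):
    raise NotImplementedError("XML parsing elided in this sketch")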
2 changes: 1 addition & 1 deletion planemo_ext/galaxy/tools/deps/brew_exts.py
@@ -438,7 +438,7 @@ def execute(cmds, env=None):
     if env:
         subprocess_kwds["env"] = env
     p = subprocess.Popen(cmds, **subprocess_kwds)
-    #log = p.stdout.read()
+    # log = p.stdout.read()
     global VERBOSE
     stdout, stderr = p.communicate()
     if p.returncode != 0:
1 change: 0 additions & 1 deletion planemo_ext/galaxy/tools/deps/brew_util.py
@@ -37,4 +37,3 @@ def to_recipe_context(homebrew_recipe):
         brew_context
     )
     return map(to_recipe_context, requirements_to_recipes(requirements))
-
2 changes: 2 additions & 0 deletions planemo_ext/galaxy/tools/deps/docker_util.py
@@ -75,6 +75,7 @@ def build_save_image_command(
     build_command_parts.extend(["save", "-o", destination, image])
     return build_command_parts
 
+
 def build_pull_command(
     tag,
     **kwds
@@ -83,6 +84,7 @@
     build_command_parts.extend(["pull", tag])
     return build_command_parts
 
+
 def build_docker_cache_command(
     image,
     **kwds
10 changes: 5 additions & 5 deletions planemo_ext/galaxy/tools/deps/resolvers/galaxy_packages.py
@@ -11,11 +11,11 @@ class GalaxyPackageDependencyResolver(DependencyResolver, UsesToolDependencyDirM
     resolver_type = "galaxy_packages"
 
     def __init__(self, dependency_manager, **kwds):
-        ## Galaxy tool shed requires explicit versions on XML elements,
-        ## this in inconvient for testing or Galaxy instances not utilizing
-        ## the tool shed so allow a fallback version of the Galaxy package
-        ## resolver that will just grab 'default' version of exact version
-        ## unavailable.
+        # Galaxy tool shed requires explicit versions on XML elements,
+        # this in inconvient for testing or Galaxy instances not utilizing
+        # the tool shed so allow a fallback version of the Galaxy package
+        # resolver that will just grab 'default' version of exact version
+        # unavailable.
         self.versionless = str(kwds.get('versionless', "false")).lower() == "true"
         self._init_base_path( dependency_manager, **kwds )
1 change: 0 additions & 1 deletion planemo_ext/galaxy/tools/deps/resolvers/modules.py
@@ -70,7 +70,6 @@ def __init__(self, module_dependency_resolver, modulepath, prefetch):
         self.directories = modulepath.split(pathsep)
         if prefetch:
             log.warn("Created module dependency resolver with prefetch enabled, but directory module checker does not support this.")
-            pass
 
     def has_module(self, module, version):
         has_module = False
@@ -24,9 +24,7 @@ def _find_dep_default( self, name, type='package', **kwds ):
         installed_tool_dependency = self._get_installed_dependency( name, type, version=None, **kwds )
         if installed_tool_dependency:
             dependency = self._get_set_environment_installed_dependency_script_path( installed_tool_dependency, name )
-            is_galaxy_dep = isinstance(dependency, GalaxyPackageDependency)
-            has_script_dep = is_galaxy_dep and dependency.script and dependency.path
-            if has_script_dep:
+            if dependency.script and dependency.path:
                 # Environment settings do not use versions.
                 return GalaxyPackageDependency(dependency.script, dependency.path, None)
         return INDETERMINATE_DEPENDENCY