Skip to content
92 changes: 90 additions & 2 deletions Utils/Dataflow/091_datasetsRucio/datasets_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import sys
import os
import argparse

base_dir = os.path.abspath(os.path.dirname(__file__))

Expand All @@ -35,6 +36,7 @@
dkb_dir = os.path.join(base_dir, os.pardir)
sys.path.append(dkb_dir)
import pyDKB
from pyDKB import storages
except Exception, err:
sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err)
sys.exit(1)
Expand Down Expand Up @@ -69,9 +71,21 @@ def main(argv):
choices=[INPUT, OUTPUT],
dest='ds_type'
)
stage.add_argument('--es-config', action='store',
type=argparse.FileType('r'),
help=u'Use ES as a backup source for dataset info'
' in order to save information even if it was'
' removed from the original source',
nargs='?',
dest='es'
)
exit_code = 0
try:
stage.parse_args(argv)
if stage.ARGS.es:
cfg = read_es_config(stage.ARGS.es)
stage.ARGS.es.close()
storages.create("ES", storages.storageType.ES, cfg)
if stage.ARGS.ds_type == OUTPUT:
stage.process = process_output_ds
elif stage.ARGS.ds_type == INPUT:
Expand All @@ -89,6 +103,34 @@ def main(argv):
sys.exit(exit_code)


def read_es_config(cfg_file):
    """ Read ES configuration file.

    Lines are expected in ``KEY=VALUE`` form; ``#`` starts a comment
    (whole-line or inline).  Unknown keys are reported to STDERR and
    ignored.

    :param cfg_file: open file descriptor with ES access configuration
    :type cfg_file: file descriptor

    :return: ES storage configuration parameters
    :rtype: dict
    """
    keys = {'ES_HOST': 'host',
            'ES_PORT': 'port',
            'ES_USER': 'user',
            'ES_PASSWORD': '__passwd',
            'ES_INDEX': 'index'
            }
    cfg = {}
    for line in cfg_file:
        # Drop comments (both whole-line and inline)
        line = line.split('#')[0].strip()
        if '=' not in line:
            continue
        # Split on the first '=' only: the value (e.g. a password)
        # may itself contain '='
        key, val = line.split('=', 1)
        key, val = key.strip(), val.strip()
        try:
            cfg[keys[key]] = val
        except KeyError:
            sys.stderr.write("(WARN) Unknown configuration parameter: "
                             "'%s'.\n" % key)
    return cfg


def init_rucio_client():
""" Initialize global variable `rucio_client`. """
global rucio_client
Expand Down Expand Up @@ -125,12 +167,14 @@ def process_output_ds(stage, message):
datasets = [datasets]

for dataset in datasets:
taskid = json_str.get('taskid')
ds = get_output_ds_info(dataset)
ds['taskid'] = json_str.get('taskid')
ds['taskid'] = taskid
if not add_es_index_info(ds):
sys.stderr.write("(WARN) Skip message (not enough info"
" for ES indexing).\n")
return True
fix_ds_info(ds)
del(ds['taskid'])
stage.output(pyDKB.dataflow.messages.JSONMessage(ds))

Expand All @@ -156,8 +200,10 @@ def process_input_ds(stage, message):
except RucioException:
data[mfields['bytes']] = -1
data[mfields['deleted']] = True
stage.output(pyDKB.dataflow.messages.JSONMessage(data))

fix_ds_info(data)

stage.output(pyDKB.dataflow.messages.JSONMessage(data))
return True


Expand Down Expand Up @@ -185,9 +231,41 @@ def get_output_ds_info(dataset):
# the length of file is set to -1
ds_dict[mfields['bytes']] = -1
ds_dict[mfields['deleted']] = True

return ds_dict


def fix_ds_info(data):
    """ Fix dataset metadata with data from ES, if needed and possible.

    For datasets marked as deleted in Rucio, metadata fields (all but the
    'deleted' flag itself) are refilled with values stored in ES, so the
    information is preserved even if the dataset was removed from the
    original source.

    :param data: dataset metadata (updated in place)
    :type data: dict

    :return: None if ES storage is not configured for the stage,
             True otherwise
    :rtype: NoneType, bool
    """
    try:
        es = storages.get("ES")
    except storages.StorageNotConfigured:
        # ES configuration was not passed to the stage
        return None
    if data.get('_type') == 'output_dataset':
        mfields = META_FIELDS[OUTPUT]
        oid = data.get('_id')
    else:
        mfields = META_FIELDS[INPUT]
        oid = data.get('taskid')

    if data.get(mfields['deleted']) and es:
        # Every metadata field except the 'deleted' flag itself
        # is subject to update from ES.
        es_fields = list(mfields.values())
        es_fields.remove(mfields['deleted'])
        es_mdata = get_es_metadata(oid, es_fields, data.get('_parent'))
        for f in es_fields:
            if es_mdata.get(f) and data.get(f) != es_mdata.get(f):
                sys.stderr.write("(DEBUG) Update Rucio info with data from ES:"
                                 " %s = '%s' (was: '%s')\n" % (f, es_mdata[f],
                                                               data.get(f)))
                data[f] = es_mdata[f]
    return True


def extract_scope(dsn):
""" Extract the first field from the dataset name

Expand Down Expand Up @@ -235,6 +313,16 @@ def get_metadata(dsn, attributes=None):
return result


def get_es_metadata(oid, fields=[], parent=None):
es = storages.get("ES")
r = {}
try:
r = es.get(oid, fields, parent=parent)
except storages.exceptions.NotFound, err:
sys.stderr.write("(WARN) %s\n" % err)
return r


def adjust_metadata(mdata):
""" Update metadata taken from Rucio with values required to proceed. """
if not mdata:
Expand Down
22 changes: 22 additions & 0 deletions Utils/Dataflow/pyDKB/storages/Storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@ def log(self, level, message):
self.__class__.__name__,
message))

def log_cfg(self, cfg, defaults=None):
    """ Log storage configuration.

    Parameters are logged in sorted order (for reproducible output);
    service parameters ('__'-prefixed, e.g. passwords) are not logged.

    :param cfg: configuration to be logged
    :type cfg: dict
    :param defaults: default parameter values, if any (will be logged only
                     if the parameter is missed in ``cfg``)
    :type defaults: dict
    """
    # Avoid a mutable default argument
    if defaults is None:
        defaults = {}
    fname = ''
    if cfg.get('__file'):
        fname = ' (%s)' % cfg['__file']
    self.log("INFO", "'%s' storage configuration%s:" % (self.name, fname))
    # Exclude '__'-prefixed (service) parameters both from the output
    # and from the column width calculation.
    keys = sorted(k for k in set(list(cfg) + list(defaults))
                  if not k.startswith('__'))
    if not keys:
        # Nothing to log (and max() below would fail on an empty sequence)
        return
    key_len = max(len(k) for k in keys)
    pattern = "%%-%ds : '%%s'" % key_len
    self.log("INFO", "---")
    for key in keys:
        self.log("INFO", pattern % (key, cfg.get(key, defaults.get(key))))
    self.log("INFO", "---")

def configure(self, cfg):
""" Apply storage configuration (initialize client).

Expand Down
13 changes: 7 additions & 6 deletions Utils/Dataflow/pyDKB/storages/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,21 @@ def configure(self, cfg):
""" Configure ES client.

Configuration parameters:
host (str: '127.0.0.1')
port (str: '9200')
index (str)
user (str)
passwd (str)
host (str: '127.0.0.1')
port (str: '9200')
index (str)
user (str)
__passwd (str)

:param cfg: configuration parameters
:type cfg: dict
"""
self.log_cfg(cfg, DEFAULT_CFG)
hosts = [{'host': cfg.get('host', DEFAULT_CFG['host']),
'port': cfg.get('port', DEFAULT_CFG['port'])}]
kwargs = {}
if cfg.get('user'):
kwargs['http_auth'] = '%(user)s:%(passwd)s' % cfg
kwargs['http_auth'] = '%(user)s:%(__passwd)s' % cfg
if cfg.get('index'):
self.index = cfg['index']
self.c = elasticsearch.Elasticsearch(hosts, **kwargs)
Expand Down