diff --git a/Utils/Dataflow/000_kafka/ru/kiae/dkb/kafka/common/external/ExternalProcessLogger.java b/Utils/Dataflow/000_kafka/ru/kiae/dkb/kafka/common/external/ExternalProcessLogger.java index 009a15819..bdd119e1c 100644 --- a/Utils/Dataflow/000_kafka/ru/kiae/dkb/kafka/common/external/ExternalProcessLogger.java +++ b/Utils/Dataflow/000_kafka/ru/kiae/dkb/kafka/common/external/ExternalProcessLogger.java @@ -27,9 +27,10 @@ public class ExternalProcessLogger implements Runnable { private BufferedReader STDERR; + private Pattern lmt_p = Pattern.compile("\\(?(TRACE|DEBUG|INFO|" + + "WARN(?:ING)?|ERROR|==)\\)?"); - private Pattern lmt_p = Pattern.compile("\\(?(TRACE|DEBUG|INFO|" - + "WARN(?:ING)?|ERROR)\\)?"); + private String prev_type = ""; public ExternalProcessLogger(Process process, String command) { this.process = process; @@ -52,9 +53,12 @@ private void external_log(String line) { String type = "TRACE"; if (m.lookingAt()) { type = m.group(1); - line = line.replaceFirst("\\(?" + type + "\\)?", ""); + line = line.replaceFirst("^(.*)?\\(?" + type + "\\)?", ""); } line = "(" + this.command + ")" + line; + if (type == "==") { + type = prev_type; + } switch (type) { case "TRACE": log.trace(line); @@ -75,6 +79,6 @@ private void external_log(String line) { default: log.trace(line); } - + prev_type = type; } } diff --git a/Utils/Dataflow/pyDKB/__init__.py b/Utils/Dataflow/pyDKB/__init__.py index a821c1740..e9d89225c 100644 --- a/Utils/Dataflow/pyDKB/__init__.py +++ b/Utils/Dataflow/pyDKB/__init__.py @@ -2,9 +2,27 @@ Common library for Data Knowledge Base development. """ -import dataflow -import common +import sys __version__ = "0.3-SNAPSHOT" __all__ = ["dataflow"] + +try: + import common + logMsgFormat = '%(asctime)s: (%(levelname)s) %(message)s' \ + ' (%(name)s)' +# Or, in case of multithreading: +# ' (%(name)s) (%(threadName)s)' + common.logging.configureRootLogger(msg_format=logMsgFormat, + level=common.logging.WARN) + logger = common.logging.getLogger(__name__) +except (SyntaxError, ImportError), err: + raise ImportError("%s (in submodule 'common')" % err) + +try: + import dataflow +except (SyntaxError, ImportError), err: + logger.error("Failed to import submodule 'dataflow'") + logger.traceback() + raise ImportError("%s (in submodule 'dataflow')" % err) diff --git a/Utils/Dataflow/pyDKB/common/__init__.py b/Utils/Dataflow/pyDKB/common/__init__.py index 024e2871f..b0b0fe9b7 100644 --- a/Utils/Dataflow/pyDKB/common/__init__.py +++ b/Utils/Dataflow/pyDKB/common/__init__.py @@ -3,6 +3,7 @@ """ from exceptions import * +import _logging as logging import hdfs import json_utils as json from custom_readline import custom_readline diff --git a/Utils/Dataflow/pyDKB/common/_logging.py b/Utils/Dataflow/pyDKB/common/_logging.py new file mode 100644 index 000000000..240fce093 --- /dev/null +++ b/Utils/Dataflow/pyDKB/common/_logging.py @@ -0,0 +1,318 @@ +""" +pyDKB.common._logging + +A wrapper for standard 'logging' module. +""" + +import logging +from logging import CRITICAL, FATAL, ERROR, WARNING, WARN, INFO, DEBUG, NOTSET +import sys + +__logging_version = int(logging.__version__.split('.')[1]) + +# --------------------------------------------------- +# Module variables +# --------------------------------------------------- +# +# Some variables are initialized in 'init()' function. +# +# Logger instance to be used in the module +logger = None +# Root logger for the whole library +_rootLogger = None +# Additional log level +TRACE = DEBUG / 2 +logging.addLevelName(TRACE, 'TRACE') + + +# ------------------------------------------- +# Custom implementation for 'logging' classes +# ------------------------------------------- + + +class Logger(logging.Logger, object): + """ Logger implementation, aware of 'TRACE' log level. + + New methods: + * trace() -- log with TRACE level; + * traceback() -- log traceback with DEBUG level. + """ + + def trace(self, msg, *args, **kwargs): + """ Log 'msg % args' with severity 'TRACE'. + + To pass exception information, use the keyword argument exc_info with + a true value, e.g. + + logger.trace("Houston, we have a %s", "interesting problem", + exc_info=1) + """ + if self.isEnabledFor(TRACE): + self._log(TRACE, msg, args, **kwargs) + + def traceback(self, **kwargs): + """ Log traceback without additionat messages with severity 'DEBUG'. + + logger.traceback() + """ + if self.isEnabledFor(DEBUG): + if not (kwargs.get('exc_info')): + kwargs['exc_info'] = 1 + self.debug('Traceback info:', **kwargs) + + +class RootLogger(Logger): + """ Same as Logger, but must must have `Level` and be the only one. """ + def __init__(self, level, name='root'): + """ Initialize new root logger. """ + Logger.__init__(self, name, level) + + +class MultilineFormatter(logging.Formatter, object): + """ Formatter for multiline messages. + + Every extra line (directly in the message body, or the traceback) + is: + * prefixed with ' (==)'; + * suffixed with the part of format string which goes after + '%(message)s' part. + """ + + _suffix = None + + def __init__(self, *args): + """ Split format string into message format and suffix. """ + new_args = list(args) + if len(args): + fmt = args[0] + new_args[0], self._suffix = self.splitFormat(fmt) + super(MultilineFormatter, self).__init__(*new_args) + + def splitFormat(self, fmt): + """ Split format string into msg format and suffix. + + Suffix is everything that goes after the message body. + """ + format, suffix = ('', '') + splitted = fmt.split("%(message)s") + if len(splitted) > 1: + format = "%(message)s".join(splitted[:-1]) + "%(message)s" + suffix = splitted[-1] + else: + format = fmt + return format, suffix + + def formatSuffix(self, record): + """ Return formatted suffix. """ + return self._suffix % record.__dict__ + + def formatExtra(self, lines, prefix=" (==) ", suffix='', align=False): + """ Format extra lines of the log message (traceback, ...). + + Parameter 'align' shows whether the suffix should be aligned + to the right (by the longest line), or to the left (as for normal + log messages). + """ + if isinstance(lines, list) and len(lines): + max_len = len(max(lines, key=len)) + # Need to add prefix len (in case that the longest line is + # among those that will be prefixed). + max_len += len(prefix) + if suffix and align: + line_fmt = "%%(line)-%ds" % max_len + else: + line_fmt = "%(line)s" + extra = line_fmt % {'line': lines[0]} + suffix + for line in lines[1:]: + extra += "\n" + line_fmt % {'line': prefix + line} + suffix + else: + extra = "" + return extra + + def format(self, record): + """ Format multiline message. + + Second and further lines from initial message are formatted + 'extra' lines. + """ + formatted = super(MultilineFormatter, self).format(record) + lines = formatted.splitlines() + suffix = self.formatSuffix(record) + result = self.formatExtra(lines, suffix=suffix, align=True) + return result + + +# -------------------------- +# Module top-level functions +# -------------------------- + + +def isInitialized(): + """ Checks if the module (namely, root logger) is initialized. """ + return isinstance(_rootLogger, Logger) and _rootLogger.handlers + + +def getRootLogger(**kwargs): + """ Reconfigure and return library root logger. """ + if kwargs or not isInitialized(): + configureRootLogger(**kwargs) + return _rootLogger + + +def init(**kwargs): + """ Initialize module. + + If already initialized, do nothing. + """ + global logger + if not isInitialized(): + initRootLogger(**kwargs) + logger = getLogger(__name__) + + +def initRootLogger(**kwargs): + """ Initialize root logger. + + If already initialized, do nothing. + """ + global _rootLogger + + if isInitialized(): + return _rootLogger + + name = kwargs.get('name') + if not name: + name = __name__.split('.')[0] + # Create new root logger + root = RootLogger(WARNING, name) + # Set Logger class 'root' to the new root + Logger.root = root + # Create new manager (object, responsible for new loggers creation) + manager = logging.Manager(root) + if __logging_version < 5: + logging.setLoggerClass(Logger) + else: + manager.setLoggerClass(Logger) + Logger.manager = manager + root.manager = Logger.manager + # 3) reset root logger for our class. + Logger.root = root + + filename = kwargs.get('filename') + if filename: + mode = kwargs.get('mode', 'a') + hdlr_name = "file_%s_%s" % (filename, mode) + # Open file without delay, as presence of 'filename' + # keyword supposes that function was called during + # intended module initialization, not as a side effect + # of calling 'getLogger()' before initializtion. + hdlr = FileHandler(filename, mode) + else: + stream = kwargs.get('stream', sys.stderr) + hdlr_name = "stream_%s" % stream.fileno() + hdlr = logging.StreamHandler(stream) + + if __logging_version < 5: + hdlr._name = hdlr_name + else: + hdlr.set_name(hdlr_name) + + fs = kwargs.get('msg_format', logging.BASIC_FORMAT) + dfs = kwargs.get('datefmt', None) + fmt = MultilineFormatter(fs, dfs) + hdlr.setFormatter(fmt) + root.addHandler(hdlr) + + level = kwargs.get('level') + if level is not None: + root.setLevel(level) + _rootLogger = root + + return _rootLogger + + +def configureRootLogger(**kwargs): + """ (Re)configure root logger. """ + if not isInitialized(): + return init(**kwargs) + + root = _rootLogger + name = kwargs.get('name') + if name and name != root.name: + # Renaming is not allowed, + # as logger name is the hierarchy-forming parameter + logger.warning("Root logger ('%s') can not be renamed to '%s'.", + root.name, name) + + # Root logger is supposed to have exactly one handler, + # so we count on this fact here + old_hdlr = root.handlers[0] + old_fmt = old_hdlr.formatter + + filename = kwargs.get('filename') + stream = kwargs.get('stream', sys.stderr) + if filename: + mode = kwargs.get('mode', 'a') + hdlr_name = "file_%s_%s" % (filename, mode) + # Delay allows us not to open file right now, + # but only when it is actually required + hdlr = FileHandler(filename, mode, delay=True) + elif stream: + hdlr_name = "stream_%s" % stream.fileno() + hdlr = logging.StreamHandler(stream) + else: + hdlr = old_hdlr + + if __logging_version < 5: + if not getattr(hdlr, '_name', None): + hdlr._name = hdlr_name + else: + if not hdlr.get_name(): + hdlr.set_name(hdlr_name) + + # Remove old handler or go on with it instead of the newly created one + # (if it is configured just the same way). + if old_hdlr != hdlr: + if __logging_version < 5 \ + and (getattr(old_hdlr, '_name', None) + != getattr(hdlr, '_name', '')) \ + or __logging_version >= 5 \ + and old_hdlr.get_name() != hdlr.get_name(): + old_hdlr.close() + root.removeHandler(old_hdlr) + root.addHandler(hdlr) + else: + hdlr.close() + hdlr = old_hdlr + + fs = kwargs.get("msg_format") + dfs = kwargs.get("datefmt") + + # Check if handler formatter was configured earlier + # and use old values if no new specified + if isinstance(old_fmt, logging.Formatter): + if not fs: + fs = old_fmt._fmt + if not dfs: + dfs = old_fmt.datefmt + elif not fs: + fs = logging.BASIC_FORMAT + + fmt = MultilineFormatter(fs, dfs) + hdlr.setFormatter(fmt) + + level = kwargs.get("level") + if level is not None: + root.setLevel(level) + + return root + + +def getLogger(name): + """ Return logger with given name. """ + if not isInitialized(): + init() + root = _rootLogger + if name == root.name: + return root + return root.manager.getLogger(name) diff --git a/Utils/Dataflow/pyDKB/common/hdfs.py b/Utils/Dataflow/pyDKB/common/hdfs.py index f8aa18eb2..74c8c4607 100644 --- a/Utils/Dataflow/pyDKB/common/hdfs.py +++ b/Utils/Dataflow/pyDKB/common/hdfs.py @@ -10,10 +10,13 @@ import tempfile from . import HDFSException +from . import logging DEVNULL = open(os.path.devnull, "w") DKB_HOME = "/user/DKB/" +logger = logging.getLogger(__name__) + def check_stderr(proc, timeout=None, max_lines=1): """ Wait till the end of the subprocess and send its STDERR to STDERR. @@ -33,7 +36,7 @@ def check_stderr(proc, timeout=None, max_lines=1): if err: n_lines += 1 if max_lines is None or n_lines <= max_lines: - sys.stderr.write("(INFO) (proc) %s\n" % err) + logger.info("(proc) %s" % err) if proc.poll(): raise subprocess.CalledProcessError(proc.returncode, None) return proc.poll() @@ -78,8 +81,8 @@ def movefile(fname, dest): try: os.remove(fname) except OSError, err: - sys.stderr.write("(WARN) Failed to remove local copy of HDFS file" - " (%s): %s" % (fname, err)) + logger.warn("Failed to remove local copy of HDFS file" + " (%s): %s", fname, err) def getfile(fname): diff --git a/Utils/Dataflow/pyDKB/dataflow/cds.py b/Utils/Dataflow/pyDKB/dataflow/cds.py index 342eae719..2611db73a 100644 --- a/Utils/Dataflow/pyDKB/dataflow/cds.py +++ b/Utils/Dataflow/pyDKB/dataflow/cds.py @@ -6,6 +6,9 @@ import signal import os +from pyDKB.common import logging + +logger = logging.getLogger(__name__) __all__ = ["CDSInvenioConnector", "KerberizedCDSInvenioConnector"] @@ -19,7 +22,7 @@ from invenio_client.contrib import cds import splinter except ImportError, e: - sys.stderr.write("(WARN) %s failed (%s)\n" % (__name__, e)) + logger.warn("%s failed (%s)", __name__, e) __all__ = [] else: @@ -80,9 +83,9 @@ def __init__(self, login="user", password="password"): try: kerberos except NameError: - sys.stderr.write("(ERROR) Kerberos Python package is not" - " installed. Can't proceed with Kerberos" - " authorization.\n") + logger.error("Kerberos Python package is not" + " installed. Can't proceed with Kerberos" + " authorization.") sys.exit(4) super(KerberizedCDSInvenioConnector, self).__init__("user", @@ -105,5 +108,5 @@ def _init_browser(self): self.browser.find_link_by_partial_text("Sign in").click() except kerberos.GSSError, e: - sys.stderr.write("(ERROR) %s\n" % str(e)) + logger.error(str(e)) sys.exit(3) diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/__init__.py b/Utils/Dataflow/pyDKB/dataflow/communication/__init__.py index 014963cdc..ef2908394 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/__init__.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/__init__.py @@ -4,7 +4,6 @@ from .. import messageType from .. import codeType -from .. import logLevel from .. import DataflowException from messages import Message diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py index 5abd3dcee..84c3bec21 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py @@ -6,12 +6,13 @@ from collections import defaultdict from . import messageType -from . import logLevel from . import DataflowException from .. import Message from ..stream import StreamBuilder +from pyDKB.common import logging + class ConsumerException(DataflowException): """ Dataflow Consumer exception. """ @@ -21,6 +22,8 @@ class ConsumerException(DataflowException): class Consumer(object): """ Data consumer implementation. """ + logger = logging.getLogger(__name__) + config = None message_type = None @@ -32,23 +35,9 @@ def __init__(self, config={}): self.config = config self.reconfigure() - def log(self, message, level=logLevel.INFO): + def log(self, message, level=logging.INFO): """ Output log message with given log level. """ - if not logLevel.hasMember(level): - self.log("Unknown log level: %s" % level, logLevel.WARN) - level = logLevel.INFO - if type(message) == list: - lines = message - else: - lines = message.splitlines() - if lines: - out_message = "(%s) (%s) %s" % (logLevel.memberName(level), - self.__class__.__name__, - lines[0]) - for l in lines[1:]: - out_message += "\n(==) %s" % l - out_message += "\n" - sys.stderr.write(out_message) + self.logger.log(level, message) def __iter__(self): """ Initialize iteration. """ diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/FileConsumer.py b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/FileConsumer.py index d014a9679..919205d1b 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/FileConsumer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/FileConsumer.py @@ -15,13 +15,16 @@ import Consumer from . import DataflowException -from . import logLevel from .. import Message +from pyDKB.common import logging + class FileConsumer(Consumer.Consumer): """ Data consumer implementation for HDFS data source. """ + logger = logging.getLogger(__name__) + # Current file current_file = None @@ -93,7 +96,7 @@ def _filenames(self): files = self._filenames_from_dir(self.config['input_dir']) else: self.log("No input files configured; reading filenames from" - " STDIN.", logLevel.WARN) + " STDIN.", logging.WARN) files = self._filenames_from_stdin() return files diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/HDFSConsumer.py b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/HDFSConsumer.py index f5e049d48..cb45fa490 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/HDFSConsumer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/HDFSConsumer.py @@ -7,14 +7,17 @@ import FileConsumer import Consumer from . import DataflowException + from pyDKB.common import hdfs from pyDKB.common import HDFSException -from . import logLevel +from pyDKB.common import logging class HDFSConsumer(FileConsumer.FileConsumer): """ Data consumer implementation for HDFS data source. """ + logger = logging.getLogger(__name__) + # Override def reconfigure(self, config={}): """ Configure HDFS Consumer according to the config parameters. """ diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/StreamConsumer.py b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/StreamConsumer.py index 752f576c2..c927021bf 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/StreamConsumer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/StreamConsumer.py @@ -12,12 +12,15 @@ import Consumer from . import DataflowException -from . import logLevel + +from pyDKB.common import logging class StreamConsumer(Consumer.Consumer): """ Data consumer implementation for Stream data source. """ + logger = logging.getLogger(__name__) + fd = None # Override diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/__init__.py b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/__init__.py index 4a19d219f..aa79c3851 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/__init__.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/__init__.py @@ -3,7 +3,6 @@ """ from .. import messageType -from .. import logLevel from .. import DataflowException from Consumer import Consumer @@ -11,12 +10,16 @@ from HDFSConsumer import HDFSConsumer from StreamConsumer import StreamConsumer +from pyDKB.common import logging + __all__ = ['ConsumerBuilder'] class ConsumerBuilder(object): """ Constructor for Consumer instance. """ + logger = logging.getLogger(__name__) + consumerClass = None def __init__(self, config={}): diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/messages.py b/Utils/Dataflow/pyDKB/dataflow/communication/messages.py index 860a94709..e27b80a1e 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/messages.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/messages.py @@ -7,9 +7,13 @@ from . import messageType from . import codeType +from pyDKB.common import logging + import json import sys +logger = logging.getLogger(__name__) + __message_class = {} @@ -35,9 +39,9 @@ def Message(msg_type): raise ValueError("Message type must be a member of messageType") cls = __message_class.get(msg_type) if not cls: - sys.stderr.write( - "(WARN) Message class for type %s is not implemented. " - "Using AbstractMessage instead.") + logger.warn("Message class for type %s is not implemented. " + "Using AbstractMessage instead." + % messageType.memberName(msg_type)) cls = AbstractMessage return cls @@ -46,6 +50,8 @@ def Message(msg_type): class AbstractMessage(object): """ Abstract message """ + logger = logging.getLogger("%s.AbstractMessage" % __name__) + msg_type = None native_types = [] diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/producer/FileProducer.py b/Utils/Dataflow/pyDKB/dataflow/communication/producer/FileProducer.py index c82d049c1..16a262b0f 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/producer/FileProducer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/producer/FileProducer.py @@ -13,12 +13,15 @@ import time from Producer import Producer, ProducerException -from . import logLevel + +from pyDKB.common import logging class FileProducer(Producer): """ Data producer implementation for local file data dest. """ + logger = logging.getLogger(__name__) + _dir = None _default_dir = None current_file = None @@ -119,7 +122,7 @@ def ensure_dir(self): os.makedirs(path, 0770) except OSError, err: self.log("Failed to create output directory\n" - "Error message: %s\n" % err, logLevel.ERROR) + "Error message: %s\n" % err, logging.ERROR) raise ProducerException return path diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/producer/HDFSProducer.py b/Utils/Dataflow/pyDKB/dataflow/communication/producer/HDFSProducer.py index f0e7085e2..388d61214 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/producer/HDFSProducer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/producer/HDFSProducer.py @@ -13,13 +13,16 @@ from Producer import ProducerException from FileProducer import FileProducer -from . import logLevel + from pyDKB.common import hdfs +from pyDKB.common import logging class HDFSProducer(FileProducer): """ Data producer implementation for HDFS data dest. """ + logger = logging.getLogger(__name__) + def config_dir(self, config={}): """ Configure output directory. """ if not config: diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py b/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py index bd1927a62..3bf951726 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py @@ -5,12 +5,13 @@ import sys from . import messageType -from . import logLevel from . import DataflowException from .. import Message from ..stream import StreamBuilder +from pyDKB.common import logging + class ProducerException(DataflowException): """ Dataflow Producer exception. """ @@ -20,6 +21,8 @@ class ProducerException(DataflowException): class Producer(object): """ Data producer implementation. """ + logger = logging.getLogger(__name__) + config = None message_type = None @@ -31,22 +34,9 @@ def __init__(self, config={}): self.config = config self.reconfigure() - def log(self, message, level=logLevel.INFO): + def log(self, message, level=logging.INFO): """ Output log message with given log level. """ - if not logLevel.hasMember(level): - self.log("Unknown log level: %s" % level, logLevel.WARN) - level = logLevel.INFO - if type(message) == list: - lines = message - else: - lines = message.splitlines() - if lines: - out_message = "(%s) (%s) %s" % (logLevel.memberName(level), - self.__class__.__name__, lines[0]) - for l in lines[1:]: - out_message += "\n(==) %s" % l - out_message += "\n" - sys.stderr.write(out_message) + self.logger.log(level, message) def reconfigure(self, config={}): """ (Re)initialize producer with stage config arguments. """ diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/producer/StreamProducer.py b/Utils/Dataflow/pyDKB/dataflow/communication/producer/StreamProducer.py index dfb02baf5..e889a5c45 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/producer/StreamProducer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/producer/StreamProducer.py @@ -11,12 +11,15 @@ from Producer import Producer from . import DataflowException -from . import logLevel + +from pyDKB.common import logging class StreamProducer(Producer): """ Data producer implementation for Stream data dest. """ + logger = logging.getLogger(__name__) + fd = None # Override diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/producer/__init__.py b/Utils/Dataflow/pyDKB/dataflow/communication/producer/__init__.py index ae404d333..a7d212063 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/producer/__init__.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/producer/__init__.py @@ -3,7 +3,6 @@ """ from .. import messageType -from .. import logLevel from .. import DataflowException from Producer import Producer @@ -11,12 +10,16 @@ from HDFSProducer import HDFSProducer from StreamProducer import StreamProducer +from pyDKB.common import logging + __all__ = ['ProducerBuilder'] class ProducerBuilder(object): """ Constructor for Producer instance. """ + logger = logging.getLogger(__name__) + producerClass = None message_type = None src_info = None diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py b/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py index f6e4ee0bb..4912f199a 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py @@ -4,14 +4,17 @@ from Stream import Stream from . import messageType -from . import logLevel from . import Message + from pyDKB.common import custom_readline +from pyDKB.common import logging class InputStream(Stream): """ Implementation of the input stream. """ + logger = logging.getLogger(__name__) + __iterator = None def __iter__(self): @@ -58,7 +61,7 @@ def parse_message(self, message): "Cause: %s\n" "Original message: '%s'" % (messageClass.typeName(), err, message), - logLevel.WARN) + logging.WARN) return False def get_message(self): diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/stream/OutputStream.py b/Utils/Dataflow/pyDKB/dataflow/communication/stream/OutputStream.py index 8411a5a39..03ab8ec21 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/stream/OutputStream.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/stream/OutputStream.py @@ -4,14 +4,17 @@ from Stream import Stream from . import messageType -from . import logLevel from . import Message + from pyDKB.common import custom_readline +from pyDKB.common import logging class OutputStream(Stream): """ Implementation of the output stream. """ + logger = logging.getLogger(__name__) + msg_buffer = [] def configure(self, config={}): diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/stream/Stream.py b/Utils/Dataflow/pyDKB/dataflow/communication/stream/Stream.py index 4ca501104..e6b94e19c 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/stream/Stream.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/stream/Stream.py @@ -5,13 +5,16 @@ import sys from . import messageType -from . import logLevel from . import Message +from pyDKB.common import logging + class Stream(object): """ Abstract class for input/output streams. """ + logger = logging.getLogger(__name__) + message_type = None _fd = None @@ -20,23 +23,9 @@ def __init__(self, fd=None, config={}): self.reset(fd) self.configure(config) - def log(self, message, level=logLevel.INFO): + def log(self, message, level=logging.INFO): """ Output log message with given log level. """ - if not logLevel.hasMember(level): - self.log("Unknown log level: %s" % level, logLevel.WARN) - level = logLevel.INFO - if type(message) == list: - lines = message - else: - lines = message.splitlines() - if lines: - out_message = "(%s) (%s) %s" % (logLevel.memberName(level), - self.__class__.__name__, - lines[0]) - for l in lines[1:]: - out_message += "\n(==) %s" % l - out_message += "\n" - sys.stderr.write(out_message) + self.logger.log(level, message) def configure(self, config): """ Stream configuration. """ diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/stream/__init__.py b/Utils/Dataflow/pyDKB/dataflow/communication/stream/__init__.py index f648d3184..2ef5ef1ce 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/stream/__init__.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/stream/__init__.py @@ -4,7 +4,6 @@ from .. import messageType from .. import codeType -from .. import logLevel from .. import DataflowException from .. import Message from InputStream import InputStream @@ -13,6 +12,8 @@ from InputStream import InputStream from OutputStream import OutputStream +from pyDKB.common import logging + __all__ = ['StreamBuilder', 'StreamException', 'Stream', 'InputStream', 'OutputStream'] @@ -25,6 +26,8 @@ class StreamException(DataflowException): class StreamBuilder(object): """ Constructor for Stream object. """ + logger = logging.getLogger(__name__) + message_type = None streamClass = None diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py index 9815fd6e1..2a299ceda 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py @@ -7,14 +7,9 @@ import ConfigParser from collections import defaultdict import textwrap +import argparse -from . import logLevel - -try: - import argparse -except ImportError, e: - sys.stderr.write("(ERROR) argparse package is not installed.\n") - raise e +from pyDKB.common import logging class AbstractStage(object): @@ -33,6 +28,8 @@ class AbstractStage(object): CONFIG """ + logger = logging.getLogger(__name__) + def __init__(self, description="DKB Dataflow stage"): """ Initialize the stage @@ -70,23 +67,9 @@ def __init__(self, description="DKB Dataflow stage"): self._error = None - def log(self, message, level=logLevel.INFO): + def log(self, message, level=logging.INFO): """ Output log message with given log level. """ - if not logLevel.hasMember(level): - self.log("Unknown log level: %s" % level, logLevel.WARN) - level = logLevel.INFO - if type(message) == list: - lines = message - else: - lines = message.splitlines() - if lines: - out_message = "(%s) (%s) %s" % (logLevel.memberName(level), - self.__class__.__name__, - lines[0]) - for l in lines[1:]: - out_message += "\n(==) %s" % l - out_message += "\n" - sys.stderr.write(out_message) + self.logger.log(level, message) def defaultArguments(self): """ Config argument parser with parameters common for all stages. """ @@ -137,7 +120,7 @@ def parse_args(self, args): if self.ARGS.eom is None: self.ARGS.eom = '\n' elif self.ARGS.eom == '': - self.log("Empty EOM marker specified!", logLevel.WARN) + self.log("Empty EOM marker specified!", logging.WARN) else: try: self.ARGS.eom = self.ARGS.eom.decode('string_escape') @@ -204,7 +187,7 @@ def print_usage(self, fd=sys.stderr): self.__parser.print_usage(fd) def set_error(self, err_type, err_val, err_trace): - """ Set object `_err` variable from the last error info. """ + """ Set `_error` attribute from the passed error info. """ self._error = {'etype': err_type, 'exception': err_val, 'trace': err_trace} @@ -218,13 +201,12 @@ def output_error(self, message=None, exc_info=None): if not message and exc_info: message = str(exc_info[1]) if message: - self.log(message, logLevel.ERROR) + self.log(message, logging.ERROR) if exc_info: if exc_info[0] == KeyboardInterrupt: self.log("Interrupted by user.") else: - trace = traceback.format_exception(*exc_info) - self.log(''.join(trace), logLevel.DEBUG) + self.logger.traceback(exc_info=exc_info) def stop(self): """ Stop running processes and output error information. """ diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py index bdd10c2e4..9c4086ce0 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py @@ -44,10 +44,10 @@ from . import AbstractStage from . import messageType -from . import logLevel from pyDKB.dataflow import DataflowException from pyDKB.common import hdfs from pyDKB.common import custom_readline +from pyDKB.common import logging from pyDKB.dataflow import communication from pyDKB.dataflow.communication import stream from pyDKB.dataflow.communication import consumer @@ -72,6 +72,8 @@ class ProcessorStage(AbstractStage): __stoppable """ + logger = logging.getLogger(__name__) + __input_message_type = None __output_message_type = None @@ -204,7 +206,7 @@ def get_out_stream(self): try: fd = self.__output.next() except DataflowException, err: - self.log(str(err), logLevel.ERROR) + self.log(str(err), logging.ERROR) raise DataflowException("Failed to configure output stream.") if not self._out_stream: self._out_stream = stream.StreamBuilder(fd, vars(self.ARGS)) \ @@ -257,13 +259,13 @@ def stop(self): p.close() except AttributeError, e: self.log("Close method is not defined for %s." % p, - logLevel.WARN) + logging.WARN) except Exception, e: failures.append((p, sys.exc_info())) if failures: for f in failures: self.log("Failed to stop %s: %s" % (f[0], f[1][1]), - logLevel.ERROR) + logging.ERROR) self.output_error(exc_info=f[1]) @staticmethod diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/__init__.py b/Utils/Dataflow/pyDKB/dataflow/stage/__init__.py index 611ef0cd5..5c5c799f6 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/__init__.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/__init__.py @@ -3,7 +3,6 @@ """ from .. import messageType -from .. import logLevel from AbstractStage import AbstractStage from ProcessorStage import ProcessorStage diff --git a/Utils/Dataflow/pyDKB/dataflow/types.py b/Utils/Dataflow/pyDKB/dataflow/types.py index ecc1f790a..b21e0db9a 100644 --- a/Utils/Dataflow/pyDKB/dataflow/types.py +++ b/Utils/Dataflow/pyDKB/dataflow/types.py @@ -4,9 +4,8 @@ from ..common import Type -__all__ = ["dataType", "messageType", "codeType", "logLevel"] +__all__ = ["dataType", "messageType", "codeType"] dataType = Type("DOCUMENT", "AUTHOR", "DATASET") messageType = Type("STRING", "JSON", "TTL") codeType = Type("STRING") -logLevel = Type("TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL")