diff --git a/Utils/Dataflow/data4es/batch_stage/inp b/Utils/Dataflow/data4es/batch_stage/inp new file mode 100644 index 000000000..c67599f38 --- /dev/null +++ b/Utils/Dataflow/data4es/batch_stage/inp @@ -0,0 +1,10 @@ +{} +{"!@#$": true} +{"df": 1} +{"df": "abc"} +{"df": "abc1"} +{"df": "absdfg"} +{"df": "Some text here."} +{"df": "abc", "fd": true} +{"d": "abc"} +{"df": "abc"} diff --git a/Utils/Dataflow/data4es/batch_stage/run.sh b/Utils/Dataflow/data4es/batch_stage/run.sh new file mode 100755 index 000000000..cb41340ff --- /dev/null +++ b/Utils/Dataflow/data4es/batch_stage/run.sh @@ -0,0 +1,15 @@ +# $1 has to be used as a workaround to pass empty string as a value for +# -E, since "... -E ''" will treat the single quotes literally due to double +# quotes around them. +cmd="./stage.py -m s -E $1" +cmd_batch2="./stage.py -b 2 -m s -E $1" +cmd_batch100="./stage.py -b 100 -m s -E $1" + +# Various tests that should produce the same results. + +# Stage chains. +cat inp | $cmd "" | $cmd "" > outp1 +cat inp | $cmd_batch2 "" | $cmd_batch2 "" > outp2 +cat inp | $cmd_batch100 "" | $cmd_batch100 "" > outp100 +cat inp | $cmd "" | $cmd_batch2 "" > outp12 +cat inp | $cmd_batch2 "" | $cmd "" > outp21 diff --git a/Utils/Dataflow/data4es/batch_stage/stage.py b/Utils/Dataflow/data4es/batch_stage/stage.py new file mode 100755 index 000000000..45aca1489 --- /dev/null +++ b/Utils/Dataflow/data4es/batch_stage/stage.py @@ -0,0 +1,102 @@ +#!/bin/env python +""" +DKB Dataflow stage XXX (StageName). + +Stage short description + +Authors: + Author Name (author@cern.ch) +""" + +import os +import sys +import traceback + +try: + base_dir = os.path.dirname(__file__) + dkb_dir = os.path.join(base_dir, os.pardir) + sys.path.append(dkb_dir) + import pyDKB + from pyDKB.dataflow.stage import ProcessorStage + from pyDKB.dataflow.communication.messages import JSONMessage + from pyDKB.dataflow.exceptions import DataflowException + from pyDKB.dataflow import messageType + from pyDKB.common.types import logLevel +except Exception, err: + sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err) + sys.exit(1) + + +def process(stage, messages): + """ Single or batch message processing. + + This form of batch processing is pretty pointless in terms of efficiency: + using it will replace, for example, ProcessorStage cycling over 100 + messages with it cycling over 10 batches, and this stage cycling + over 10 messages in each batch. But for testing and illustrative purposes + it will do. + """ + if not isinstance(messages, list): + messages = [messages] + for message in messages: + data = message.content() + if not isinstance(data, dict): + stage.log("Cannot process non-dict data: %s." % data, + logLevel.WARN) + continue + # Processing machinery + if 'df' in data and isinstance(data['df'], (str, unicode)): + data['df'] = 'processed ' + data['df'] + else: + stage.log("Failed to process data %s, required field 'df' not" + " found or contains non-str value." % data, + logLevel.WARN) + out_message = JSONMessage(data) + stage.output(out_message) + return True + + +def main(args): + """ Program body. """ + stage = ProcessorStage() + stage.set_input_message_type(messageType.JSON) + stage.set_output_message_type(messageType.JSON) + stage.set_default_arguments(bnc='') + + # Accept batch size from command line. + # This is cheating because batch size is supposed to be set by + # stage developer, not received from command line (so, + # from supervisor). However, this is done in this illustrative + # stage to simplify a process of comparing the results of + # normal mode and batch mode with different batch sizes. + stage.add_argument('-b', action='store', type=int, help='Batch size.', + default=1, dest='bsize') + + stage.process = process + + exit_code = 0 + exc_info = None + try: + stage.configure(args) + stage.set_batch_size(stage.ARGS.bsize) + stage.run() + except (DataflowException, RuntimeError), err: + if str(err): + sys.stderr.write("(ERROR) %s\n" % err) + exit_code = 2 + except Exception: + exc_info = sys.exc_info() + exit_code = 3 + finally: + stage.stop() + + if exc_info: + trace = traceback.format_exception(*exc_info) + for line in trace: + sys.stderr.write("(ERROR) %s" % line) + + exit(exit_code) + + +if __name__ == '__main__': + main(sys.argv[1:]) diff --git a/Utils/Dataflow/pyDKB/common/custom_readline.py b/Utils/Dataflow/pyDKB/common/custom_readline.py index 7443fabb5..1a4737ca6 100644 --- a/Utils/Dataflow/pyDKB/common/custom_readline.py +++ b/Utils/Dataflow/pyDKB/common/custom_readline.py @@ -10,12 +10,20 @@ import fcntl -def custom_readline(f, newline): +def custom_readline(f, newline, markers={}): """ Read lines with custom line separator. Construct generator with readline-like functionality: with every call of ``next()`` method it will read data from ``f`` - untill the ``newline`` separator is found; then yields what was read. + until the ``newline`` separator is found; then yields what was read. + If ``markers`` are supplied, then check for their presence: markers + are special strings that can occur: + - At the beginning of ``f``. + - Immediately after a ``newline``. + - Immediately after another marker. + If a marker's value is found in such place, its name is yielded instead + of another chunk of text and the value is removed. Markers in other + places are ignored. .. warning:: the last line can be incomplete, if the input data flow is interrupted in the middle of data writing. @@ -28,6 +36,8 @@ def custom_readline(f, newline): :type f: file :param newline: delimeter to be used instead of ``\\n`` :type newline: str + :param markers: markers to look for, {name:value} + :type markers: dict :return: iterable object :rtype: generator @@ -52,6 +62,19 @@ def custom_readline(f, newline): chunk = f.read() if not chunk: if buf: + if markers: + # Look for markers after last newline. + look_for_markers = True + while look_for_markers: + look_for_markers = False + for name, value in markers.iteritems(): + if buf.startswith(value): + buf = buf[len(value):] + look_for_markers = True + while send_not_next: + send_not_next = yield True + send_not_next = yield name + break while send_not_next: # If we are here, the source is not empty for sure: # we have another message to yield @@ -64,6 +87,20 @@ def custom_readline(f, newline): # and (in theory) may provide another message sooner or later send_not_next = yield True while newline in buf: + if markers: + # Look for markers before each yielded "line". + # This includes start of f. + look_for_markers = True + while look_for_markers: + look_for_markers = False + for name, value in markers.iteritems(): + if buf.startswith(value): + buf = buf[len(value):] + look_for_markers = True + while send_not_next: + send_not_next = yield True + send_not_next = yield name + break pos = buf.index(newline) + len(newline) while send_not_next: # If we are here, the source is not empty for sure: diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py index 20ef4bdd7..cfa595e5d 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py @@ -100,10 +100,11 @@ def get_source_info(self): raise NotImplementedError def get_message(self): - """ Get new message from current source. + """ Get new message or marker from current source. Return values: Message object + marker (str) False (failed to parse message) None (all input sources are empty) """ diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py b/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py index d953cf070..7bd361d97 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py @@ -102,6 +102,10 @@ def eop(self): """ Write EOP marker to the current dest. """ self.get_stream().eop() + def bnc(self): + """ Write BNC marker to the current dest. """ + self.get_stream().bnc() + def flush(self): """ Flush buffered messages to the current dest. """ self.get_stream().flush() diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py b/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py index 086ff678f..efbed0257 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py @@ -16,6 +16,23 @@ class InputStream(Stream): __iterator = None + # Names of markers that the stream knows how to process. + # Values are taken from config, markers with empty values are ignored. + marker_names = ['eob'] + + def configure(self, config={}): + """ Configure instance. """ + super(InputStream, self).configure(config) + self.markers = {} + # If batch size is 1, meaning non-batch mode will be used, then + # markers are unnecessary (and even can be a hindrance by forcing + # usage of custom_readline() without need). + if config.get('bsize', 1) > 1: + for name in self.marker_names: + value = config.get(name) + if value: + self.markers[name] = value + def __iter__(self): """ Initialize iteration. """ self._reset_iterator() @@ -24,14 +41,14 @@ def __iter__(self): def _reset_iterator(self): """ Reset inner iterator on a new file descriptor. """ fd = self.get_fd() - if self.EOM == '\n': + if self.EOM == '\n' and not self.markers: self.__iterator = iter(fd.readline, "") self.is_readable = self._fd_is_readable - elif self.EOM == '': + elif self.EOM == '' and not self.markers: self.__iterator = iter(fd.read, "") self.is_readable = self._fd_is_readable else: - self.__iterator = custom_readline(fd, self.EOM) + self.__iterator = custom_readline(fd, self.EOM, self.markers) self.is_readable = self._gi_is_readable def reset(self, fd, close=True, force=False): @@ -139,9 +156,10 @@ def parse_message(self, message): return False def get_message(self): - """ Get next message from the input stream. + """ Get next message or marker from the input stream. :returns: parsed next message, + next marker, False -- parsing failed, None -- no messages left :rtype: pyDKB.dataflow.communication.messages.AbstractMessage, @@ -154,16 +172,22 @@ def get_message(self): return result def next(self): - """ Get next message from the input stream. + """ Get next message or marker from the input stream. :returns: parsed next message, + next marker, False -- parsing failed or unexpected end of stream occurred - :rtype: pyDKB.dataflow.communication.messages.AbstractMessage, bool + :rtype: pyDKB.dataflow.communication.messages.AbstractMessage, + str, bool """ if not self.__iterator: self._reset_iterator() msg = self.__iterator.next() if not msg.endswith(self.EOM): + # Check whether an expected marker was received. + for key in self.markers: + if msg == key: + return key log_msg = msg[:10] + '<...>' * (len(msg) > 20) log_msg += msg[-min(len(msg) - 10, 10):] log_msg = log_msg.replace('\n', r'\n') diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/stream/OutputStream.py b/Utils/Dataflow/pyDKB/dataflow/communication/stream/OutputStream.py index c0b6ee56d..642c721ac 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/stream/OutputStream.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/stream/OutputStream.py @@ -15,6 +15,7 @@ def configure(self, config={}): """ Configure instance. """ super(OutputStream, self).configure(config) self.EOP = config.get('eop', '') + self.BNC = config.get('bnc', '') def write(self, message): """ Add message to the buffer. """ @@ -40,6 +41,10 @@ def eop(self): """ Signalize Supervisor about end of process. """ self.get_fd().write(self.EOP) + def bnc(self): + """ Signalize Supervisor about batch being incomplete. """ + self.get_fd().write(self.BNC) + def drop(self): """ Drop buffer without sending messages anywhere. """ self.msg_buffer = [] diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py index 1a90c2adf..a5b088a91 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py @@ -132,6 +132,18 @@ def defaultArguments(self): default=None, dest='eop' ) + self.add_argument('--batch-not-complete', action='store', type=str, + help=u'custom batch-not-complete marker\n' + 'DEFAULT: \'BNC\'', + default=None, + dest='bnc' + ) + self.add_argument('--end-of-batch', action='store', type=str, + help=u'custom end-of-batch marker\n' + 'DEFAULT: \'EOB\'', + default=None, + dest='eob' + ) def _is_flag_option(self, **kwargs): """ Check if added argument is a flag option. """ @@ -224,6 +236,12 @@ def parse_args(self, args): "Case: %s" % (err), logLevel.ERROR) sys.exit(1) + if self.ARGS.bnc is None: + self.ARGS.bnc = 'BNC' + + if self.ARGS.eob is None: + self.ARGS.eob = 'EOB' + if self.ARGS.mode == 'm': if 'f' in (self.ARGS.source, self.ARGS.dest): self.log("File source/destination is not allowed " diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py index 688721b96..e4dc398ab 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py @@ -67,6 +67,9 @@ class ProcessorStage(AbstractStage): * List of objects to be "stopped" __stoppable + + * The stage will try to process messages in batches of this size. + _batch_size """ __input_message_type = None @@ -92,6 +95,7 @@ def __init__(self, description="DKB Dataflow data processing stage."): * ... """ self.__stoppable = [] + self._batch_size = 1 super(ProcessorStage, self).__init__(description) def set_input_message_type(self, Type=None): @@ -202,6 +206,23 @@ def set_default_arguments(self, ignore_on_skip=False, **kwargs): if ignore_on_skip: self._reset_on_skip += kwargs.keys() + def set_batch_size(self, size): + """ Set batch size. + + :param size: size + :type size: int + """ + if type(size) != int: + self.log("Cannot set batch size to %s: non-integer value." % size, + logLevel.WARN) + return False + if size < 1: + self.log("Cannot set batch size to %d: value must" + " be positive." % size, + logLevel.WARN) + return False + self._batch_size = size + def configure(self, args=None): """ Configure stage according to the config parameters. @@ -352,8 +373,36 @@ def input(self): Returns iterable object. Every iteration returns single input message to be processed. """ - for r in self.__input: - yield r + if self._batch_size == 1 or self.ARGS.skip_process: + for r in self.__input: + if type(r) == str: + # Normal processing mode expects no markers. + raise DataflowException("Unexpected marker" + " received: %s." % r) + yield r + else: + batch = [] + for r in self.__input: + if type(r) != str: + # Message was received. + if r: + batch.append(r) + if len(batch) == self._batch_size: + yield batch + batch = [] + else: + self.bnc() + else: + # Marker was received. + if r == 'eob': + yield batch + batch = [] + else: + raise DataflowException("Unexpected marker" + " received: %s." % r) + if batch: + # There is no more input, but there is an unfinished batch. + yield batch def output(self, message): """ Put the (list of) message(s) to the output buffer. """ @@ -363,6 +412,10 @@ def forward(self): """ Send EOPMarker to the output stream. """ self.__output.eop() + def bnc(self): + """ Send BNCMarker to the output stream. """ + self.__output.bnc() + def flush_buffer(self): """ Flush message buffer to the output. """ self.__output.flush()