-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathOutputStream.py
More file actions
50 lines (40 loc) · 1.49 KB
/
OutputStream.py
File metadata and controls
50 lines (40 loc) · 1.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
"""
pyDKB.dataflow.communication.stream.OutputStream
"""
from Stream import Stream
from . import Message
class OutputStream(Stream):
""" Implementation of the output stream. """
msg_buffer = []
def configure(self, config={}):
""" Configure instance. """
super(OutputStream, self).configure(config)
self.EOP = config.get('eop', '')
self.BNC = config.get('bnc', '')
def write(self, message):
""" Add message to the buffer. """
messageClass = Message(self.message_type)
if isinstance(message, messageClass):
self.msg_buffer.append(message)
elif type(message) == list:
for m in message:
self.write(m)
else:
raise TypeError("OutputStream.write() expects parameter to be of"
" type '%s' or 'list' (got '%s')"
% (messageClass.__name__, type(message).__name__))
def flush(self):
""" Flush buffer to the output stream. """
for msg in self.msg_buffer:
self.get_fd().write(msg.encode())
self.get_fd().write(self.EOM)
self.drop()
def eop(self):
""" Signalize Supervisor about end of process. """
self.get_fd().write(self.EOP)
def bnc(self):
""" Signalize Supervisor about batch being incomplete. """
self.get_fd().write(self.BNC)
def drop(self):
""" Drop buffer without sending messages anywhere. """
self.msg_buffer = []