-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathInputStream.py
More file actions
201 lines (171 loc) · 6.56 KB
/
InputStream.py
File metadata and controls
201 lines (171 loc) · 6.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""
pyDKB.dataflow.communication.stream.InputStream
"""
from Stream import Stream
from pyDKB.common.types import logLevel
from . import Message
from pyDKB.common import custom_readline
import os
import sys
class InputStream(Stream):
    """ Implementation of the input stream.

    Iterating over the stream yields, one by one, items read from the
    bound file descriptor: parsed messages, marker names or ``False``
    (see :meth:`next`).
    """

    # Inner iterator over raw (unparsed) messages; created lazily by
    # `_reset_iterator()`.
    __iterator = None

    # Names of markers that the stream knows how to process.
    # Values are taken from config, markers with empty values are ignored.
    marker_names = ['eob']

    def configure(self, config=None):
        """ Configure instance.

        Besides the parent configuration, collects values of the known
        markers (see ``marker_names``) into ``self.markers``.

        :param config: stream configuration; keys read here are 'bsize'
                       and every name listed in ``marker_names``
        :type config: dict, NoneType
        """
        # `None` stands for "no configuration": avoids sharing one mutable
        # default dict between all the calls.
        if config is None:
            config = {}
        super(InputStream, self).configure(config)
        self.markers = {}
        # If batch size is 1, meaning non-batch mode will be used, then
        # markers are unnecessary (and even can be a hindrance by forcing
        # usage of custom_readline() without need).
        if config.get('bsize', 1) > 1:
            for name in self.marker_names:
                value = config.get(name)
                if value:
                    self.markers[name] = value

    def __iter__(self):
        """ Initialize iteration.

        :returns: the stream itself, with the inner iterator reset
        :rtype: InputStream
        """
        self._reset_iterator()
        return self

    def _reset_iterator(self):
        """ Reset inner iterator on a new file descriptor.

        Also rebinds ``is_readable`` (as an instance attribute) to the
        readability test that matches the chosen iterator.
        """
        fd = self.get_fd()
        if self.markers or self.EOM not in ('\n', ''):
            # Custom separator and/or markers: need the generator-based
            # reader.
            self.__iterator = custom_readline(fd, self.EOM, self.markers)
            self.is_readable = self._gi_is_readable
        else:
            # No markers: plain file methods are enough -- line by line
            # for '\n', `fd.read` for the empty EOM.
            reader = fd.readline if self.EOM == '\n' else fd.read
            self.__iterator = iter(reader, "")
            self.is_readable = self._fd_is_readable

    def reset(self, fd, close=True, force=False):
        """ Reset current stream with new file descriptor.

        Overrides parent method to reset __iterator property.

        :param fd: new file descriptor
        :type fd: file
        :param close: if True, close the old file descriptor
        :type close: bool
        :param force: if True, force the reset of iterator
                      (normally, iterator is not reset if the new
                      file descriptor is the same as the old one)
        :type force: bool

        :returns: old file descriptor
        :rtype: file
        """
        old_fd = super(InputStream, self).reset(fd, close)
        # We do not want to reset iterator if `reset()` was called
        # with the same `fd` as before.
        if force or (old_fd and fd != old_fd):
            self._reset_iterator()
        return old_fd

    def is_readable(self):
        """ Check if current input stream is readable.

        This default implementation is replaced (per instance) by
        `_reset_iterator()` as soon as the inner iterator is created.

        :returns: None -- not initialized,
                  False -- empty,
                  True -- not empty
        :rtype: bool, NoneType
        """
        return self._unknown_is_readable()

    def _unknown_is_readable(self):
        """ Placeholder: readability test for not initialized stream.

        This function is needed in case that we need to reset `is_readable`
        and the whole Stream object back to the "undefined" state.

        :returns: None
        :rtype: NoneType
        """
        return None

    def _fd_is_readable(self):
        """ Check if bound file descriptor is readable.

        :returns: None -- not initialized,
                  False -- empty,
                  True -- not empty
        :rtype: bool, NoneType
        """
        fd = self.get_fd()
        if not fd:
            return None
        # An object without the `closed` attribute is treated as closed.
        if getattr(fd, 'closed', True):
            return False
        fileno = fd.fileno()
        # STDIN is always reported as readable.
        if fileno == sys.stdin.fileno():
            return True
        # Otherwise: readable until current position reaches the size
        # reported by fstat.
        return fd.tell() != os.fstat(fileno).st_size

    def _gi_is_readable(self):
        """ Check if the generator iterator can return value on `next()` call.

        :returns: False -- empty,
                  True -- not empty
        :rtype: bool, NoneType
        """
        try:
            return self.__iterator.send(True)
        except StopIteration:
            return False
        except TypeError:
            # If method 'next()' was never called yet,
            # sending anything but None raises TypeError
            return self._fd_is_readable()

    def parse_message(self, message):
        """ Verify and parse input message.

        :param message: message to parse
        :type message: pyDKB.dataflow.communication.messages.AbstractMessage

        :returns: decoded message or False if parsing failed
        :rtype: pyDKB.dataflow.communication.messages.AbstractMessage, bool
        """
        messageClass = Message(self.message_type)
        try:
            msg = messageClass(message)
            msg.decode()
            return msg
        # `as` form is valid for both Python 2.6+ and Python 3.
        except (ValueError, TypeError) as err:
            self.log("Failed to read input message as %s.\n"
                     "Cause: %s\n"
                     "Original message: '%s'"
                     % (messageClass.typeName(), err, message),
                     logLevel.WARN)
            return False

    def get_message(self):
        """ Get next message or marker from the input stream.

        Same as :meth:`next`, but the end of the stream is reported with
        ``None`` instead of ``StopIteration``.

        :returns: parsed next message,
                  next marker,
                  False -- parsing failed,
                  None -- no messages left
        :rtype: pyDKB.dataflow.communication.messages.AbstractMessage,
                bool, NoneType
        """
        try:
            return self.next()
        except StopIteration:
            return None

    def next(self):
        """ Get next message or marker from the input stream.

        :returns: parsed next message,
                  next marker,
                  False -- parsing failed or unexpected end of stream occurred
        :rtype: pyDKB.dataflow.communication.messages.AbstractMessage,
                str, bool

        :raises StopIteration: inner iterator has nothing left
        """
        if not self.__iterator:
            self._reset_iterator()
        msg = next(self.__iterator)
        if msg.endswith(self.EOM):
            # Regular message: strip the trailing EOM (if any) and parse.
            if self.EOM != '':
                msg = msg[:-len(self.EOM)]
            return self.parse_message(msg)
        # Check whether an expected marker was received.
        for key in self.markers:
            if msg == key:
                return key
        # Neither a complete message nor a marker. For logging keep short
        # input as is and only the first and the last 10 characters of
        # a long one (so that no part of the input is ever repeated).
        if len(msg) > 20:
            log_msg = msg[:10] + '<...>' + msg[-10:]
        else:
            log_msg = msg
        log_msg = log_msg.replace('\n', r'\n')
        self.log("Unexpected end of stream, skipping rest of input:\n"
                 "'%s'" % log_msg, logLevel.WARN)
        return False