@@ -125,7 +125,6 @@ class Stream(object):
125125 NAME = None # type: str # The name of the stream
126126 # The type of the row. Used by the default impl of parse_row.
127127 ROW_TYPE = None # type: Any
128- _LIMITED = True # Whether the update function takes a limit
129128
130129 @classmethod
131130 def parse_row (cls , row ):
@@ -187,21 +186,18 @@ async def get_updates_since(self, from_token):
187186 if from_token == current_token :
188187 return [], current_token
189188
190- if self ._LIMITED :
191- rows = await self .update_function (
192- from_token , current_token , limit = MAX_EVENTS_BEHIND + 1
193- )
189+ rows = await self .update_function (
190+ from_token , current_token , limit = MAX_EVENTS_BEHIND + 1
191+ )
194192
195- # never turn more than MAX_EVENTS_BEHIND + 1 into updates.
196- rows = itertools .islice (rows , MAX_EVENTS_BEHIND + 1 )
197- else :
198- rows = await self .update_function (from_token , current_token )
193+ # never turn more than MAX_EVENTS_BEHIND + 1 into updates.
194+ rows = itertools .islice (rows , MAX_EVENTS_BEHIND + 1 )
199195
200196 updates = [(row [0 ], row [1 :]) for row in rows ]
201197
202198 # check we didn't get more rows than the limit.
203199 # doing it like this allows the update_function to be a generator.
204- if self . _LIMITED and len (updates ) >= MAX_EVENTS_BEHIND :
200+ if len (updates ) >= MAX_EVENTS_BEHIND :
205201 raise Exception ("stream %s has fallen behind" % (self .NAME ))
206202
207203 return updates , current_token
@@ -215,9 +211,8 @@ def current_token(self):
215211 """
216212 raise NotImplementedError ()
217213
218- def update_function (self , from_token , current_token , limit = None ):
219- """Get updates between from_token and to_token. If Stream._LIMITED is
220- True then limit is provided, otherwise it's not.
214+ def update_function (self , from_token , current_token , limit ):
215+ """Get updates between from_token and to_token.
221216
222217 Returns:
223218 Deferred(list(tuple)): the first entry in the tuple is the token for
0 commit comments