Skip to content

Commit 729f40e

Browse files
committed
pyDKB/storages: add ES implementation for queries.
Added concept of "raw" queries: it is useful when query must be stored, yet can not be converted to the format acceptable by storage client. Here it is JSON with %(parameter)s values: it may fail to be parsed as proper JSON if there is something like `"taskname": %(name)s`.
1 parent b63063e commit 729f40e

3 files changed

Lines changed: 161 additions & 6 deletions

File tree

Utils/Dataflow/pyDKB/storages/Storage.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,21 +86,42 @@ def read_query(self, fname, qname=None):
8686
"""
8787
raise NotImplementedError
8888

89-
def save_query(self, query, qname=None):
89+
def query_is_raw(self, query):
90+
""" Check if given query is not compiled ("raw").
91+
92+
:param query: query body
93+
:type query: obj
94+
95+
:return: True/False
96+
:rtype: bool
97+
"""
98+
raise NotImplementedError
99+
100+
def save_query(self, query, qname=None, raw=False):
90101
""" Save query for further usage.
91102
92103
:param query: query content
93104
:type query: object
94105
:param qname: query name (must not start with '__')
95106
:type qname: str
107+
:param raw: store "raw" (not compiled) version of query
108+
:type raw: bool
96109
"""
97110
if qname and qname.startswith('__'):
98111
raise ValueError("Query name must not start with '__'"
99112
" (reserved for service needs).")
113+
if not raw:
114+
try:
115+
raw = self.query_is_raw(query)
116+
except NotImplementedError:
117+
pass
118+
prefix = ''
100119
if not qname:
101120
qname = '__last'
102-
self.stored_queries[qname] = query
103-
self.stored_queries[__last'] = query
121+
if raw:
122+
prefix = '__raw'
123+
self.stored_queries[prefix+qname] = query
124+
self.stored_queries[prefix+'__last'] = query
104125

105126
def get_query(self, qname):
106127
""" Get query by name.
@@ -119,8 +140,12 @@ def get_query(self, qname):
119140
q = self.stored_queries[qname]
120141
self.stored_queries['__last'] = q
121142
except KeyError:
122-
raise QueryError("Query used before saving: '%s'"
123-
% qname)
143+
# There still may be raw version of the query
144+
try:
145+
q = self.stored_queries['__raw'+qname]
146+
except KeyError:
147+
raise QueryError("Query used before saving: '%s'"
148+
% qname)
124149
self.stored_queries['__last'] = q
125150
return q
126151

Utils/Dataflow/pyDKB/storages/es.py

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22
pyDKB.storages.es
33
"""
44

5+
import json
6+
57
from Storage import Storage
68
from . import storageType
79
from exceptions import (StorageException,
810
NotFound,
9-
InvalidRequest)
11+
InvalidRequest,
12+
MissedParameter)
1013

1114

1215
try:
@@ -29,6 +32,9 @@ class ES(Storage):
2932
# Default index
3033
index = None
3134

35+
# Default datetime format
36+
datetime_fmt = '%d-%m-%Y %H:%M:%S'
37+
3238
type = storageType.ES
3339

3440
def __init__(self, name):
@@ -106,3 +112,104 @@ def get(self, id, fields=None, index=None, doc_type='_all', parent=None):
106112
raise NotFound(self.name, id=id, index=index)
107113
raise InvalidRequest(err)
108114
return r.get('_source', {})
115+
116+
117+
def read_query(self, fname, qname=None):
118+
""" Read query from file and save it.
119+
120+
Raise ``QueryNotFound`` in case of failure.
121+
122+
:param fname: file name
123+
:type fname: str
124+
:param qname: query name (for futher usage)
125+
:type qname: str
126+
"""
127+
raw = False
128+
try:
129+
with open(fname, 'r') as f:
130+
query = f.read()
131+
query = json.loads(query)
132+
except IOError:
133+
raise QueryNotFound(qname, fname)
134+
except ValueError:
135+
# Query with parameters may fail when try to parse as JSON
136+
# In this case we just store it as "raw" version
137+
raw = True
138+
self.save_query(query, qname, raw)
139+
140+
141+
def query_is_raw(self, query):
142+
""" Check if given query is not compiled ("raw").
143+
144+
:param query: query body
145+
:type query: str, dict
146+
147+
:return: True/False
148+
:rtype: bool
149+
"""
150+
return not isinstance(query, dict)
151+
152+
153+
def exec_query(self, qname=None, **kwargs):
154+
""" Execute stored query with given parameters.
155+
156+
:param qname: query name (if None, last used/read
157+
one will be used)
158+
:type qname: str, NoneType
159+
:param kwargs: query parameters (applied with old-style
160+
string formatting operator '%'). Parameter
161+
name, started with '_', is treated as special
162+
one:
163+
* _size -- for ES request "size";
164+
* _type -- for ES request "doc_type";
165+
* _index -- for ES index to use.
166+
:type kwargs: dict
167+
168+
:return: storage response
169+
:rtype: object
170+
"""
171+
query = self.get_query(qname)
172+
raw = self.query_is_raw(query)
173+
params = {}
174+
for key in kwargs:
175+
if key.startswith('_'):
176+
continue
177+
try:
178+
params[key] = json.dumps(kwargs[key])
179+
except TypeError, err:
180+
if 'datetime' in str(err):
181+
val = json.dumps(kwargs[key].strftime(self.datetime_fmt))
182+
params[key] = val
183+
else:
184+
raise
185+
q = {}
186+
q['index'] = kwargs.get('_index', self.index)
187+
q['size'] = kwargs.get('_size')
188+
q['doc_type'] = kwargs.get('_type')
189+
if params:
190+
try:
191+
if not raw:
192+
query = json.dumps(query)
193+
raw = True
194+
query = query % params
195+
except KeyError, err:
196+
raise MissedParameter(qname, str(err))
197+
if raw:
198+
try:
199+
query = json.loads(query)
200+
except ValueError, err:
201+
msg = "Failed to parse query"
202+
if qname:
203+
msg += " (%r)" % qname
204+
msg += ": %s" % err
205+
raise QueryError(msg)
206+
q['body'] = query
207+
try:
208+
r = self.client().search(**q)
209+
except RequestError, err:
210+
msg = "Query failed"
211+
if qname:
212+
msg += ": (%r)" % qname
213+
msg += ": %s" % err
214+
raise QueryError(msg)
215+
return r

Utils/Dataflow/pyDKB/storages/exceptions.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,26 @@ def __init__(self, message, *args, **kwargs):
8080
class QueryError(StorageException):
8181
""" Exception indicating issues with stored queries. """
8282
pass
83+
84+
85+
class MissedParameter(QueryError):
86+
""" Exception indicating that some query parameters are missed. """
87+
88+
def __init__(self, qname=None, param=None):
89+
""" Initialize exception.
90+
91+
:param qname: query name
92+
:type qname: str, NoneType
93+
:param param: parameter name(s)
94+
:type param: str, list(str)
95+
"""
96+
message = 'Missed query parameters'
97+
if param:
98+
if isinstance(param, list):
99+
p = ', '.join(param)
100+
else:
101+
p = param
102+
message += ": %s" % p
103+
if qname:
104+
message += " ('%s')" % qname
105+
super(MissedParameter, self).__init__(message)

0 commit comments

Comments
 (0)