Skip to content

Commit a349cb6

Browse files
committed
[WIP] pyDKB/storages: make ES client aware of storage scheme.
1 parent 9111b75 commit a349cb6

2 files changed

Lines changed: 155 additions & 0 deletions

File tree

Utils/Dataflow/pyDKB/storages/client/es.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,13 @@
66
from pyDKB.common.misc import try_to_import
77
from pyDKB.common.types import logLevel
88

9+
from ..exceptions import (NotFound, InvalidRequest)
10+
911

1012
_ESClient = try_to_import('elasticsearch', 'Elasticsearch')
13+
_NotFoundError = try_to_import('elasticsearch.exceptions', 'NotFoundError')
14+
_RequestError = try_to_import('elasticsearch.exceptions', 'RequestError')
15+
1116

1217
ParentClientClass = _ESClient if _ESClient else object
1318

@@ -16,6 +21,7 @@ class ESClient(Client, ParentClientClass):
1621
""" Implement common interface for ES client. """
1722

1823
index = None
24+
scheme_info = None
1925

2026
def __init__(self, *args, **kwargs):
2127
""" Initialize instance as parent client class object. """
@@ -71,5 +77,129 @@ def configure(self, cfg):
7177
if cfg.get('index'):
7278
self.index = cfg['index']
7379

80+
if cfg.get('scheme_info'):
81+
self.scheme_info = cfg['scheme_info']
82+
7483
# Re-initialize self as parent client class instance
7584
ParentClientClass.__init__(self, **kwargs)
85+
86+
def set_index(self, index):
87+
""" Set default index name.
88+
89+
:param index: default index name
90+
:type index: str
91+
"""
92+
if index:
93+
self.index = index
94+
95+
def set_scheme(self, scheme):
96+
""" Set default index name.
97+
98+
Schema info should be provided in the following form:
99+
{
100+
'index': {INDEX_NAME: {"scheme": SCHEMA_NAME}, ...},
101+
'scheme': {SCHEMA_NAME: {
102+
"default_type": DEFAULT_DOC_TYPE,
103+
DOC_TYPE_1: {},
104+
DOC_TYPE_2: {"parent_type": DOC_TYPE_N},
105+
...
106+
},
107+
...
108+
}
109+
}
110+
111+
:param scheme: storage scheme
112+
:type scheme: dict
113+
"""
114+
if scheme:
115+
self.scheme_info = scheme
116+
117+
def is_child_type(self, doc_type, index=None, scheme=None):
118+
""" Check if given document type is a child type.
119+
120+
One of additional parameters (`index`, `scheme`) must be specified.
121+
If both specified, `index` is ignored.
122+
123+
:param doc_type: type name
124+
:type doc_type: str
125+
:param index: index name for which given type is checked
126+
:type index: str
127+
:param scheme: scheme name for which given type is checked
128+
:type scheme: str
129+
130+
:return: True if type is a child type, False otherwise (or not enough
131+
information about scheme provided)
132+
:rtype: bool
133+
"""
134+
scheme_info = self.scheme_info
135+
if not (scheme_info and isinstance(scheme_info, dict)):
136+
return False
137+
if not index:
138+
index = self.index
139+
if not scheme and index:
140+
idx_scheme = scheme_info.get('index', {}).get(index, {})\
141+
.get('scheme')
142+
scheme = idx_scheme
143+
if scheme:
144+
type_scheme = scheme_info.get('scheme', {}).get(scheme, {})\
145+
.get(doc_type, {})
146+
if type_scheme.get('parent_type'):
147+
return True
148+
return False
149+
150+
def get_default_type(self, index):
151+
""" Get default document type for given index.
152+
153+
:param index: index name
154+
:type index: str
155+
156+
:return: type name or None if information not available
157+
:rtype: str, NoneType
158+
"""
159+
scheme_info = self.scheme_info
160+
if not (scheme_info and isinstance(scheme_info, dict)):
161+
return False
162+
scheme_name = scheme_info.get('index', {}).get(index, {})\
163+
.get('scheme')
164+
doc_type = scheme_info.get('scheme', {}).get(scheme_name, {})\
165+
.get('default_type')
166+
return doc_type
167+
168+
def get(self, oid, fields=None, **kwargs):
169+
""" Get document from index by ID.
170+
171+
:param oid: document identifier
172+
:type oid: str, int
173+
:param fields: document fields to retrieve. If not specified
174+
or None passed, all available fields will be
175+
retrieved
176+
:type fields: list(str)
177+
:param index: index to get document from. If not specified,
178+
default index will be used (if configured).
179+
:type index: str
180+
:param *kwargs: see ``Elasticsearch.get()`` description
181+
182+
:return: record with given ID
183+
:rtype: dict
184+
"""
185+
# If index is not passed, default one should be used
186+
index = kwargs.pop('index', self.index)
187+
if not index:
188+
raise InvalidRequest("Index not specified.")
189+
doc_type = kwargs.pop('doc_type', self.get_default_type(index))
190+
if not doc_type:
191+
raise InvalidRequest("Document type not specified.")
192+
if fields is not None:
193+
kwargs['_source'] = fields
194+
try:
195+
r = ParentClientClass.get(self, id=oid, index=index, doc_type=doc_type,
196+
**kwargs)
197+
except _NotFoundError, err:
198+
raise NotFound(id=oid, index=index, doc_type=doc_type, **kwargs)
199+
except _RequestError, err:
200+
if self.is_child_type(doc_type, index=index) \
201+
and err.args[1] == 'routing_missing_exception':
202+
self.log('Parent info missed.', logLevel.WARN)
203+
raise NotFound(id=oid, index=index, doc_type=doc_type, **kwargs)
204+
raise InvalidRequest(err)
205+
return r.get('_source', {})

Utils/Dataflow/pyDKB/storages/exceptions.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,28 @@ def __init__(self, **kwargs):
2323
params = ', '.join(params)
2424
message = message + ' (%s)' % params
2525
super(NotFound, self).__init__(message)
26+
27+
28+
class InvalidRequest(StorageException):
29+
""" Exception indicating wrong user request. """
30+
31+
def __init__(self, message, *args, **kwargs):
32+
""" Initialize exception.
33+
34+
Message formatting: old-style ('%' operator) only.
35+
36+
:param message: error message
37+
:type message: str
38+
:param args: message format positional parameters
39+
:type args: list
40+
:param kwargs: message format named parameters
41+
:type kwargs: dict
42+
"""
43+
if args and kwargs:
44+
raise ValueError("Message formatting supports only one type "
45+
"of parameters: positional OR named.")
46+
if args:
47+
message = message % params
48+
elif kwargs:
49+
message = message % kwargs
50+
super(InvalidRequest, self).__init__(message)

0 commit comments

Comments
 (0)