This repository was archived by the owner on Mar 22, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathdata_format.py
More file actions
204 lines (162 loc) · 6.55 KB
/
data_format.py
File metadata and controls
204 lines (162 loc) · 6.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import json
import logging
import os
import string
import sys
import yaml
class DataFormat:
    """Base type for a data format; concrete formats override name, parse and dump."""

    def __str__(self):
        """Gives the format name converted to upper case."""
        format_name = self.name()
        return format_name.upper()

    def name(self):
        """Gives the name of the format; must be overridden."""
        raise NotImplementedError('Method has not been implemented!')

    def parse(self, data):
        """Gives the parsed form of the provided data; must be overridden."""
        raise NotImplementedError('Method has not been implemented!')

    def dump(self, out_data, out_file=None):
        """
        Serializes out_data in this format; must be overridden.
        With out_file the text is written to that file, otherwise the text is returned.
        """
        raise NotImplementedError('Method has not been implemented!')
class JsonDataFormat(DataFormat):
    """JSON flavour of DataFormat, backed by the standard json module."""

    def name(self):
        """Gives the format name, 'json'."""
        return 'json'

    def parse(self, data):
        """Gives the object decoded from JSON text; any failure becomes a ValueError."""
        try:
            logging.debug(f'parsing input data as json')
            parsed = json.loads(data)
        except Exception:
            raise ValueError('Malformed JSON in input.')
        else:
            return parsed

    def dump(self, out_data, out_file=None):
        """
        Serializes out_data as JSON with a 2-space indent and sorted keys.
        Gives back the text when out_file is falsy, else writes to out_file and gives back None.
        """
        if not out_file:
            return json.dumps(out_data, indent=2, sort_keys=True)
        json.dump(out_data, out_file, indent=2, sort_keys=True)
        return None
class YamlDataFormat(DataFormat):
    """YAML flavour of DataFormat, backed by yaml.safe_load / yaml.safe_dump."""

    def name(self):
        """Gives the format name, 'yaml'."""
        return 'yaml'

    def parse(self, data):
        """Gives the object loaded from YAML text; any failure becomes a ValueError."""
        try:
            logging.debug(f'parsing input data as yaml')
            parsed = yaml.safe_load(data)
        except Exception:
            raise ValueError('Malformed YAML in input.')
        else:
            return parsed

    def dump(self, out_data, out_file=None):
        """
        Serializes out_data as YAML.
        Gives back the text when out_file is falsy, else writes to out_file and gives back None.
        """
        if not out_file:
            return yaml.safe_dump(out_data)
        yaml.safe_dump(out_data, out_file)
        return None
# Module-level instances of the concrete formats, referenced throughout this module.
JSON = JsonDataFormat()
YAML = YamlDataFormat()
class AnySupportedFormat(DataFormat):
    """Input-only pseudo-format that accepts data in any of the supported formats."""

    def name(self):
        """Returns the generic name 'data'."""
        return 'data'

    def parse(self, data):
        """
        Returns the data as parsed by the first supported format that accepts it.
        JSON is attempted before YAML, and the first successful parse wins.
        Raises a ValueError if no format parses the data, or if the winning parse yields None.
        """
        content = None
        for input_format in [JSON, YAML]:
            try:
                logging.debug(f'attempting to parse input data as {input_format}')
                content = input_format.parse(data)
                logging.debug(f'successfully parsed input data as {input_format}')
                # Stop at the first success: falling through to the next format would
                # re-parse the same text and overwrite this result, and a YAML parser
                # may interpret some JSON scalars differently.
                break
            except Exception:
                logging.debug(f'error parsing input data as {input_format}')
        if content is None:
            raise ValueError('Malformed data in input.')
        return content

    def dump(self, out_data, out_file=None):
        """Always raises NotImplementedError: output requires an explicit format."""
        raise NotImplementedError('Data format needs to be specified explicitly!')
# Module-level instance selected by load_data when the 'data' option is set,
# i.e. when the input may be in any supported format.
ANY_FORMAT = AnySupportedFormat()
def validate_options(options):
    """Rejects options that enable the JSON and YAML modes together."""
    requested = [options.get(candidate.name()) for candidate in (JSON, YAML)]
    if all(requested):
        raise Exception(f'JSON and YAML mode cannot be used simultaneously!')
def determine_format(options):
    """
    Picks the format selected by the options after validating them.
    YAML is chosen only when its option is truthy; otherwise JSON is used.
    """
    validate_options(options)
    if options.get(YAML.name()):
        return YAML
    return JSON
def read_from_standard_input(input_format):
    """Prints a prompt to stderr, then returns everything read from stdin."""
    logging.debug(f'expecting {input_format} input from stdin')
    prompt = f'Enter the raw {input_format} (press Ctrl+D on a blank line to submit)'
    # The prompt goes to stderr rather than stdout.
    print(prompt, file=sys.stderr)
    return sys.stdin.read()
def load_file(path):
    """
    Returns the text content of the file at path, or None if it cannot be loaded.

    None is returned (and the reason logged) when path is None, when path does not
    refer to a regular file, or when opening or reading the file fails.
    """
    # os.path.isfile raises a TypeError for None, so treat a missing path as "not a file".
    if path is None or not os.path.isfile(path):
        logging.info(f'{path} is not a file')
        return None
    try:
        logging.debug(f'attempting to load content from {path}')
        # open() sits inside the try so that failures to open (e.g. permissions) are
        # handled in the same best-effort way as failures to read.
        with open(path) as data_file:
            return data_file.read()
    except Exception:
        logging.exception(f'encountered exception when loading content from {path}')
        return None
def _read_input_content(input_format, input_file):
    """Returns the raw input text read from stdin (when input_file is '-') or from the file."""
    if input_file is None:
        # Without this guard a missing option surfaces as an opaque TypeError from the path check.
        raise Exception(f'No {input_format} input was specified.')
    if input_file == '-':
        logging.debug(f'reading {input_format} input from stdin')
        content = read_from_standard_input(input_format)
        if not content:
            raise Exception(f'Unable to load {input_format} from stdin.')
        return content
    logging.debug(f'reading {input_format} input from {input_file}')
    content = load_file(input_file)
    if not content:
        raise Exception(f'Unable to load {input_format} from {input_file}.')
    return content


def _collect_template_context(options):
    """Returns (context_dict, context_provided) built from the context file and the overrides."""
    context_dict = {}
    context_provided = False
    context_file = options.get('context_file')
    if context_file:
        context_provided = True
        logging.debug(f'reading context from {context_file}')
        context_content = load_file(context_file)
        if not context_content:
            raise Exception(f'Unable to load context from {context_file}.')
        context_file_obj = YAML.parse(context_content)
        if not isinstance(context_file_obj, dict):
            raise Exception(f'Provided context file must evaluate to a dictionary, instead it is {context_file_obj}')
        context_dict.update(context_file_obj)
    context_overrides = options.get('context_overrides')
    if context_overrides:
        context_provided = True
        logging.debug(f'merging additional context {context_overrides}')
        # Overrides are merged last so they win over values from the context file.
        context_dict.update(context_overrides)
    return context_dict, context_provided


def _apply_template(content, context_dict):
    """Returns content with its string.Template placeholders substituted from context_dict."""
    try:
        logging.debug(f'applying string templating to input using context {context_dict}')
        return string.Template(content).substitute(context_dict)
    except Exception as ex:
        # str() of a KeyError is just the quoted key, so give it a descriptive prefix.
        message = f'missing variable {ex}' if isinstance(ex, KeyError) else str(ex)
        raise Exception(f'Error when processing template: {message}') from ex


def load_data(options):
    """
    Loads, optionally templates, and decodes JSON/YAML formatted input.

    The input format is "any supported format" when the 'data' option is set, otherwise
    it is determined from the json/yaml options. The option named after the format holds
    the input location: '-' reads from stdin, anything else is treated as a file path.
    If the 'context_file' and/or 'context_overrides' options are set, the raw input is
    processed as a string.Template using that context before it is parsed.

    Data must be a dict of attributes.
    Throws a ValueError if there is a problem parsing the data or it is not a dictionary.
    Throws an Exception if the input or context cannot be loaded, or if templating fails.
    """
    input_format = ANY_FORMAT if options.get('data') else determine_format(options)
    input_file = options.get(input_format.name())
    content = _read_input_content(input_format, input_file)

    context_dict, context_provided = _collect_template_context(options)
    # Templating is applied whenever a context was provided, even if it turned out empty.
    if context_provided:
        content = _apply_template(content, context_dict)

    content = input_format.parse(content)
    if not isinstance(content, dict):
        raise ValueError(f'Input {input_format} must be a dictionary of attributes.')
    return content
def display_data(options, data):
    """Prints data to standard output, serialized in the format selected by the options."""
    output_format = determine_format(options)
    rendered = output_format.dump(data)
    # Nothing is printed when the serialized result is falsy.
    if not rendered:
        return
    print(rendered)