-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdata_model.py
More file actions
278 lines (212 loc) · 8.27 KB
/
data_model.py
File metadata and controls
278 lines (212 loc) · 8.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# SPDX-FileCopyrightText: 2025 Helmholtz-Zentrum Dresden - Rossendorf (HZDR)
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileContributor: David Pape
import operator
from dataclasses import dataclass
from enum import Enum
from functools import reduce
from pathlib import Path
from types import NoneType
from typing import Any, Dict, List, Tuple
from pyshacl import validate
from rdflib import BNode, Graph, Literal, Node
from rdflib.collection import Collection
from rdflib.namespace import RDF, RDFS, SH, XSD
from rdflib.term import URIRef
from software_card_policies.config import Config
from software_card_policies.namespaces import PREFIXES, SC
from software_card_policies.rdf_helpers import get_language_tagged_literal
#################################### Software CaRD ####################################
@dataclass
class Parameter:
    """Model of a `sc:Parameter`.

    All attributes except ``uri`` are looked up in the graph by :meth:`from_graph` and
    are ``None`` when the corresponding triple is absent.
    """

    # Node identifying the parameter itself.
    uri: URIRef
    # Object of `sc:parameterOuterType`.
    outer_type: URIRef | None
    # Object of `sc:parameterInnerType`.
    inner_type: URIRef | None
    # Object of `sc:parameterDefaultValue`; either the scalar value itself or the head
    # node of an RDF list, depending on the outer type.
    default_value: Node | None
    # String form of the object of `sc:parameterConfigPath`.
    config_path: str | None

    @classmethod
    def from_graph(cls, reference: URIRef, graph: Graph):
        """Build a ``Parameter`` from the triples about ``reference`` in ``graph``.

        :param reference: Node of the `sc:Parameter` to read.
        :param graph: Graph that holds the parameter's description.
        :return: The populated ``Parameter``.
        """
        config_path = graph.value(reference, SC.parameterConfigPath, None)
        return cls(
            uri=reference,
            outer_type=graph.value(reference, SC.parameterOuterType, None),
            inner_type=graph.value(reference, SC.parameterInnerType, None),
            default_value=graph.value(reference, SC.parameterDefaultValue, None),
            # Keep a missing path as None: str(None) would produce the bogus lookup
            # key "None".
            config_path=None if config_path is None else str(config_path),
        )
# TODO: This only works for constraints of type NodeShape. Is this enough?
@dataclass
class Policy:
    """Model of a Software CaRD Policy.

    Holds the `sh:name` and `sh:description` of the shape that defines the policy.
    """

    name: str
    description: str

    @classmethod
    def from_graph(cls, reference: URIRef, graph: Graph):
        """Build a ``Policy`` from the shape ``reference`` found in ``graph``.

        Both fields are obtained through ``get_language_tagged_literal``.
        """
        policy_name = get_language_tagged_literal(graph, reference, SH.name)
        policy_description = get_language_tagged_literal(
            graph, reference, SH.description
        )
        return cls(name=policy_name, description=policy_description)
######################################## SHACL ########################################
class SeverityLevel(Enum):
    """Model for severity levels.

    The three SHACL severities map to ``INFO``, ``WARNING`` and ``VIOLATION``; any
    other node is classified as ``OTHER``.
    """

    INFO = 1
    WARNING = 2
    VIOLATION = 3
    OTHER = 4

    def __str__(self):
        """Return the member name in title case, e.g. ``"Warning"``."""
        return self.name.title()

    @classmethod
    def from_graph(cls, reference: URIRef, graph: Graph):
        """Map the severity node ``reference`` to a level.

        ``graph`` is accepted for signature parity with the other models but is not
        consulted.
        """
        known_levels = (
            (SH.Info, cls.INFO),
            (SH.Warning, cls.WARNING),
            (SH.Violation, cls.VIOLATION),
        )
        return next(
            (level for term, level in known_levels if reference == term),
            cls.OTHER,
        )
@dataclass
class Severity:
    """Model of `sh:Severity`.

    Carries the `rdfs:label` and `rdfs:comment` of the severity node together with
    the :class:`SeverityLevel` derived from it.
    """

    label: str
    comment: str
    level: SeverityLevel

    def __str__(self):
        """Return the label, falling back to the level's string form if it is empty."""
        return self.label or str(self.level)

    @classmethod
    def from_graph(cls, reference: URIRef, graph: Graph):
        """Build a ``Severity`` from the severity node ``reference`` in ``graph``."""
        severity_label = get_language_tagged_literal(graph, reference, RDFS.label)
        severity_comment = get_language_tagged_literal(graph, reference, RDFS.comment)
        severity_level = SeverityLevel.from_graph(reference, graph)
        return cls(
            label=severity_label,
            comment=severity_comment,
            level=severity_level,
        )
@dataclass
class ValidationResult:
    """Model of a `sh:ValidationResult`.

    Combines the result's severity, its message and the policy (source shape) that
    produced it.
    """

    severity: Severity
    message: str
    source_policy: Policy

    @classmethod
    def from_graph(cls, reference: URIRef, graph: Graph):
        """Build a ``ValidationResult`` from the result node ``reference``.

        The nodes behind `sh:resultSeverity` and `sh:sourceShape` are resolved first
        and then turned into their respective models.
        """
        severity_ref = graph.value(reference, SH.resultSeverity, None)
        shape_ref = graph.value(reference, SH.sourceShape, None)

        result_severity = Severity.from_graph(severity_ref, graph)
        result_message = get_language_tagged_literal(
            graph, reference, SH.resultMessage
        )
        result_policy = Policy.from_graph(shape_ref, graph)
        return cls(
            severity=result_severity,
            message=result_message,
            source_policy=result_policy,
        )
@dataclass
class ValidationReport:
    """Model of a `sh:ValidationReport`.

    ``conforms`` tells whether the report states `sh:conforms true`; ``results``
    holds one :class:`ValidationResult` per `sh:result` of the report.
    """

    conforms: bool
    results: List[ValidationResult]

    @classmethod
    def from_graph(cls, reference: URIRef, graph: Graph):
        """Build a ``ValidationReport`` from the report node ``reference``."""
        conforms_triple = (reference, SH.conforms, Literal(True))
        is_conformant = conforms_triple in graph

        parsed_results = []
        for result_ref in graph.objects(reference, SH.result):
            parsed_results.append(ValidationResult.from_graph(result_ref, graph))

        return cls(conforms=is_conformant, results=parsed_results)
################################## Graph Interaction ##################################
# TODO: Add debug messages to all asserts.
# Values of `sc:parameterOuterType` that `parameterize_graph` accepts; anything else
# is rejected with a `ValueError`.
_ALLOWED_OUTER_TYPES = (
    SC.Scalar,
    RDF.List,
)
# Values of `sc:parameterInnerType` that `parameterize_graph` accepts; anything else
# is rejected with a `ValueError`.
_ALLOWED_INNER_TYPES = (
    XSD.string,
    XSD.boolean,
    XSD.integer,
    XSD.int,
    XSD.decimal,
    XSD.float,
    XSD.double,
    XSD.anyURI,
    RDFS.Resource,
)
def read_rdf_resource(source: Path | str) -> Graph:
    """Parse ``source`` into a fresh graph and bind the entries of ``PREFIXES`` to it.

    Existing bindings for the same prefixes are replaced.
    """
    resource_graph = Graph()
    resource_graph.parse(source)

    for name, namespace in PREFIXES.items():
        resource_graph.bind(name, namespace, replace=True)

    return resource_graph
def _create_list_parameter(
    parameter: Parameter, graph: Graph, config_parameter: Any
) -> Node:
    """Return the RDF list node that replaces a list-typed ``parameter``.

    If no value is configured (``config_parameter`` is ``None``), the parameter's
    default list is reused. Otherwise a new RDF list of literals is created in
    ``graph`` from ``config_parameter``; an empty list maps to ``rdf:nil``.

    :param parameter: The parameter to replace; its outer type must be `rdf:List`.
    :param graph: Graph that holds the default list and receives a newly built one.
    :param config_parameter: Configured value, a ``list`` or ``None``.
    :return: Head node of the list to use in place of the parameter.
    :raises AssertionError: If the outer type is not `rdf:List`, the default value is
        not a list node with `rdf:first` and `rdf:rest`, or ``config_parameter`` is
        neither ``None`` nor a ``list``.
    """
    assert parameter.outer_type == RDF.List, (
        f"Parameter '{parameter.uri}' has outer type '{parameter.outer_type}', "
        f"expected '{RDF.List}'"
    )
    assert (parameter.default_value, RDF.first, None) in graph, (
        f"Default value of parameter '{parameter.uri}' has no rdf:first"
    )
    assert (parameter.default_value, RDF.rest, None) in graph, (
        f"Default value of parameter '{parameter.uri}' has no rdf:rest"
    )

    if config_parameter is None:
        return Collection(graph, parameter.default_value).uri

    assert isinstance(config_parameter, list), (
        f"Configured value for parameter '{parameter.uri}' must be a list, got "
        f"{type(config_parameter).__name__}"
    )

    if not config_parameter:
        # The empty RDF list is rdf:nil. Building a Collection from an empty sequence
        # would instead hand back a fresh blank node that does not denote rdf:nil.
        return RDF.nil

    return Collection(
        graph, BNode(), seq=[Literal(value) for value in config_parameter]
    ).uri
def _create_scalar_parameter(
    parameter: Parameter, graph: Graph, config_parameter: Any
) -> Node:
    """Return the node that replaces a scalar ``parameter``.

    A configured value is wrapped in a ``Literal``. Only ``None`` counts as "not
    configured", so falsy values such as ``0``, ``False`` or ``""`` are used as given
    instead of being replaced by the default.

    :param parameter: The parameter to replace; its outer type must be `sc:Scalar`.
    :param graph: Graph from which the default value is read.
    :param config_parameter: Configured value (``str``, ``int``, ``float``) or
        ``None``.
    :return: Literal for the configured value, or the object of
        `sc:parameterDefaultValue` if nothing is configured.
    :raises AssertionError: If the outer type is not `sc:Scalar` or the configured
        value has an unsupported type.
    """
    assert parameter.outer_type == SC.Scalar, (
        f"Parameter '{parameter.uri}' has outer type '{parameter.outer_type}', "
        f"expected '{SC.Scalar}'"
    )
    assert isinstance(config_parameter, (str, int, float, NoneType)), (
        f"Configured value for parameter '{parameter.uri}' must be str, int, float "
        f"or None, got {type(config_parameter).__name__}"
    )

    # Compare against None explicitly: a truthiness test would discard 0, False and "".
    if config_parameter is not None:
        return Literal(config_parameter)
    return graph.value(parameter.uri, SC.parameterDefaultValue, None)
def parameterize_graph(graph: Graph, config_parameters: Dict[str, Any]) -> Graph:
    """Parameterize the ``graph`` using the ``config_parameters``.

    Every node typed `sc:Parameter` is replaced, wherever it occurs as an object, by
    its configured value or, if none is configured, by its default value. All triples
    about the parameter itself are removed afterwards. The graph is modified in place.

    :param graph: Graph containing the parameters and their occurrences.
    :param config_parameters: Configured values, keyed by the parameters' config path.
    :return: The same ``graph`` object, after modification.
    :raises ValueError: If a parameter's outer or inner type is not one of the allowed
        types.
    """
    # Extract references into a list so we don't have to iterate over the graph we want
    # to modify!
    parameter_refs = list(graph.subjects(RDF.type, SC.Parameter))

    for parameter_ref in parameter_refs:
        parameter = Parameter.from_graph(parameter_ref, graph)
        config_parameter = config_parameters.get(parameter.config_path)

        if parameter.outer_type not in _ALLOWED_OUTER_TYPES:
            raise ValueError(
                f"Parameter '{parameter.uri}' has unknown outer type "
                f"'{parameter.outer_type}'"
            )
        if parameter.inner_type not in _ALLOWED_INNER_TYPES:
            raise ValueError(
                f"Parameter '{parameter.uri}' has unknown inner type "
                f"'{parameter.inner_type}'"
            )

        if parameter.outer_type == SC.Scalar:
            o = _create_scalar_parameter(parameter, graph, config_parameter)
        else:
            o = _create_list_parameter(parameter, graph, config_parameter)

        # Add replacements for all occurrences of the parameter. Again, extract
        # occurrences before modifying the graph!
        occurrences = list(graph.subject_predicates(parameter.uri))
        for s, p in occurrences:
            graph.add((s, p, o))

        # Remove all references to the parameter from the graph.
        graph.remove((parameter.uri, None, None))
        graph.remove((None, None, parameter.uri))

        # The default list is orphaned once a configured list took its place, so its
        # cells can go. It must survive when it was itself used as the replacement;
        # clearing it then would leave the occurrences pointing at an emptied node.
        if parameter.outer_type != SC.Scalar and o != parameter.default_value:
            Collection(graph, parameter.default_value).clear()

    return graph
def validate_graph(data_graph: Graph, shacl_graph: Graph) -> Tuple[bool, Graph]:
    """Validate ``data_graph`` against ``shacl_graph`` using pyshacl.

    :return: A tuple of the conformance flag and the validation report graph. The
        report graph is additionally enriched with the SHACL vocabulary and has the
        entries of ``PREFIXES`` bound.
    """
    validation_options = {
        "shacl_graph": shacl_graph,
        "advanced": True,
        "inference": "rdfs",
        "meta_shacl": True,
        "do_owl_imports": True,
        "js": True,
    }
    conforms, report_graph, _report_text = validate(data_graph, **validation_options)

    # Pull the SHACL vocabulary into the report so that labels and descriptions of its
    # terms are available to consumers of the report.
    report_graph.parse(SH._NS)

    # The prefixes _should_ already be bound; rebinding just guarantees readable
    # output.
    for name, namespace in PREFIXES.items():
        report_graph.bind(name, namespace, replace=True)

    return conforms, report_graph
def make_shacl_graph(config: Config) -> Graph:
    """Build the combined SHACL graph for all policies in ``config``.

    Each policy's source is read and parameterized with that policy's parameters; the
    resulting graphs are then merged with ``+``.

    :param config: Configuration whose ``policies`` mapping lists the policies.
    :return: The merged SHACL graph, or an empty graph if no policies are configured.
    """
    # TODO: We're only using the values. Make use of the keys which contain the config
    # names of the policies.
    shacl_graphs = [
        parameterize_graph(read_rdf_resource(policy.source), policy.parameters)
        for policy in config.policies.values()
    ]

    # `reduce` without an initial value raises TypeError on an empty sequence.
    if not shacl_graphs:
        return Graph()
    return reduce(operator.add, shacl_graphs)