-
-
Notifications
You must be signed in to change notification settings - Fork 91
Expand file tree
/
Copy pathmultitenancy.py
More file actions
236 lines (207 loc) · 9 KB
/
multitenancy.py
File metadata and controls
236 lines (207 loc) · 9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
from django.contrib.auth import get_user_model
from django.db.models import Q
from django.utils.translation import gettext_lazy as _
from swapper import load_model
from openwisp_utils.admin_theme.filters import AutocompleteFilter
from .widgets import SHARED_SYSTEMWIDE_LABEL, OrganizationAutocompleteSelect
# User model class returned by Django's ``get_user_model()`` helper.
User = get_user_model()
# OrganizationUser model resolved through swapper's ``load_model``;
# presumably this honours a swapped-in replacement model — verify
# against the project's swapper settings.
OrganizationUser = load_model("openwisp_users", "OrganizationUser")
class MultitenantAdminMixin:
    """
    Mixin that makes a ModelAdmin class multitenant:
    users will see only the objects related to the organizations
    they are associated with.

    Configuration attributes (set on the concrete admin class):

    * ``multitenant_shared_relations``: names of relation form fields
      whose choices are limited, for non-superusers, to objects of the
      managed organizations or objects with no organization.
    * ``multitenant_parent``: name of the relation through which the
      organization is reached when the model has no ``organization``
      attribute of its own.

    ``request.user.organizations_managed`` is used throughout as the
    collection of organization primary keys the user manages; the
    permission check compares it against ``str(organization_id)``,
    so it presumably holds string PKs (confirm against the user model).
    """

    multitenant_shared_relations = None
    multitenant_parent = None

    def get_sensitive_fields(self, request, obj=None):
        """
        Return the model's ``sensitive_fields`` attribute,
        or an empty list when the model does not define one.
        """
        return getattr(self.model, "sensitive_fields", [])

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        parent = self.multitenant_parent
        # build a per-instance list so the class attribute is never mutated
        shared_relations = list(self.multitenant_shared_relations or [])
        # add multitenant_parent to multitenant_shared_relations if necessary
        if parent and parent not in shared_relations:
            shared_relations.append(parent)
        self.multitenant_shared_relations = shared_relations

    def get_repr(self, obj):
        """Return the string representation of ``obj``."""
        return str(obj)

    get_repr.short_description = _("name")

    def get_fields(self, request, obj=None):
        """
        Return the list of fields to be displayed in the admin.

        When a non-superuser is looking at an existing object whose
        ``organization_id`` (read from the ``multitenant_parent``
        relation, if configured) is ``None``, the fields returned by
        ``get_sensitive_fields`` are removed from the list.
        """
        fields = super().get_fields(request, obj)
        if obj and not request.user.is_superuser:
            if self.multitenant_parent:
                # the organization lives on the parent object
                obj = getattr(obj, self.multitenant_parent)
            if getattr(obj, "organization_id", None) is None:
                sensitive_fields = self.get_sensitive_fields(request, obj)
                return [f for f in fields if f not in sensitive_fields]
        return fields

    @property
    def org_field(self):
        """
        Lookup path to the organization of the model: ``"organization"``
        when the model has that attribute, ``"<parent>__organization"``
        when ``multitenant_parent`` is set, otherwise ``None``.
        """
        if hasattr(self.model, "organization"):
            return "organization"
        if self.multitenant_parent:
            return f"{self.multitenant_parent}__organization"
        return None

    def get_queryset(self, request):
        """
        If current user is not superuser, show only the
        objects associated to organizations he/she is associated with
        (plus objects whose organization is empty).
        """
        if self.model == User:
            # the user admin has its own rules and builds its own
            # queryset, so avoid constructing one here that is discarded
            return self.multitenant_behaviour_for_user_admin(request)
        qs = super().get_queryset(request)
        user = request.user
        if user.is_superuser:
            return qs
        if self.model.__name__ == "Organization":
            return qs.filter(pk__in=user.organizations_managed)
        if not self.org_field:
            # if there is no organization field, return the queryset as is
            return qs
        return qs.filter(
            Q(**{f"{self.org_field}__in": user.organizations_managed})
            | Q(**{self.org_field: None})
        )

    def get_search_results(self, request, queryset, search_term):
        """
        Override to ensure that the search results are filtered by the
        organization of the current user.

        The extra filter is applied only when the request carries a
        ``field_name`` GET parameter, the user is not a superuser,
        no shared relations are configured and ``org_field`` is set.
        """
        if (
            request.GET.get("field_name")
            and not request.user.is_superuser
            and not self.multitenant_shared_relations
            and self.org_field
        ):
            queryset = queryset.filter(
                **{f"{self.org_field}__in": request.user.organizations_managed}
            )
        return super().get_search_results(request, queryset, search_term)

    def _has_org_permission(self, request, obj, perm_func):
        """
        Helper method to check object-level permissions for users
        associated with specific organizations.

        ``perm_func`` is the parent class permission method; its result
        is returned unchanged for superusers, when ``obj`` is empty and
        when the object has no ``organization_id`` attribute. Otherwise
        the user must additionally manage the object's organization, and
        objects without an organization are refused. In that case the
        result is always a real ``bool``.
        """
        perm = perm_func(request, obj)
        if obj and self.multitenant_parent:
            # In case of a multitenant parent, we need to check if the
            # user has permission on the parent object.
            obj = getattr(obj, self.multitenant_parent)
        if not request.user.is_superuser and obj and hasattr(obj, "organization_id"):
            # bool() keeps ``None`` (empty organization_id) from leaking
            # out as the permission result
            perm = bool(
                perm
                and obj.organization_id
                and str(obj.organization_id) in request.user.organizations_managed
            )
        return perm

    def has_change_permission(self, request, obj=None):
        """
        Returns True if the user has permission to change the object.
        Non-superusers cannot change shared objects.
        """
        return self._has_org_permission(request, obj, super().has_change_permission)

    def has_delete_permission(self, request, obj=None):
        """
        Returns True if the user has permission to delete the object.
        Non-superusers cannot delete shared objects.
        """
        return self._has_org_permission(request, obj, super().has_delete_permission)

    def _edit_form(self, request, form):
        """
        Modifies the form querysets as follows;
        if current user is not superuser:
            * show only relevant organizations
            * show only relations associated to relevant organizations
              or shared relations
            * do not allow organization field to be empty (shared org)
        else, when the organization field is optional, label its
        empty choice with SHARED_SYSTEMWIDE_LABEL.

        The changes are made in place on ``form.base_fields``.
        """
        fields = form.base_fields
        user = request.user
        org_field = fields.get("organization")
        if user.is_superuser and org_field and not org_field.required:
            org_field.empty_label = SHARED_SYSTEMWIDE_LABEL
        elif not user.is_superuser:
            orgs_pk = user.organizations_managed
            # organizations relation;
            # may be readonly and not present in field list
            if org_field:
                org_field.queryset = org_field.queryset.filter(pk__in=orgs_pk)
                org_field.empty_label = None
                org_field.required = True
            # other relations
            q = Q(organization__in=orgs_pk) | Q(organization=None)
            for field_name in self.multitenant_shared_relations:
                # each relation may be readonly
                # and not present in field list
                if field_name not in fields:
                    continue
                field = fields[field_name]
                field.queryset = field.queryset.filter(q)

    def get_form(self, request, obj=None, **kwargs):
        """Return the parent's form class after ``_edit_form`` adjusted it."""
        form = super().get_form(request, obj, **kwargs)
        self._edit_form(request, form)
        return form

    def get_formset(self, request, obj=None, **kwargs):
        """
        Return the parent's formset class after ``_edit_form``
        adjusted its form.
        """
        # NOTE(review): ``obj`` is deliberately not forwarded here
        # (``obj=None`` is passed to the parent); confirm whether this is
        # intentional before changing it, since the object-aware hooks of
        # this mixin would then receive the inline's parent object.
        formset = super().get_formset(request, obj=None, **kwargs)
        self._edit_form(request, formset.form)
        return formset

    def multitenant_behaviour_for_user_admin(self, request):
        """
        if operator is logged in - show only users
        from same organization and hide superusers
        if superuser is logged in - show all users
        """
        user = request.user
        qs = super().get_queryset(request)
        if user.is_superuser:
            return qs
        # Instead of querying the User model using the many-to-many relation
        # openwisp_users__organizationuser__organization, a separate query is
        # made to fetch users of organizations managed by the logged-in user.
        # This approach avoids duplicate objects for users that are admin of
        # multiple organizations managed by the logged-in user.
        # See https://github.com/openwisp/openwisp-users/issues/324.
        # We cannot use .distinct() on the User query directly, because
        # it causes issues when performing delete action from the admin.
        user_ids = (
            OrganizationUser.objects.filter(
                organization_id__in=user.organizations_managed
            )
            .values_list("user_id")
            .distinct()
        )
        # hide superusers from organization operators
        # so they can't edit nor delete them
        return qs.filter(id__in=user_ids, is_superuser=False)

    def formfield_for_foreignkey(self, db_field, request, **kwargs):
        """
        Use OrganizationAutocompleteSelect as the widget of the
        ``organization`` foreign key; other fields are left to the parent.
        """
        if db_field.name == "organization":
            kwargs["widget"] = OrganizationAutocompleteSelect(
                db_field, self.admin_site, using=kwargs.get("using")
            )
        return super().formfield_for_foreignkey(db_field, request, **kwargs)
class MultitenantOrgFilter(AutocompleteFilter):
    """
    Organization filter for the admin changelist: the choices it
    offers are limited to the organizations the current user
    is associated with.
    """

    title = _("organization")
    field_name = "organization"
    parameter_name = "organization"
    # lookup applied against the organization primary keys
    org_lookup = "id__in"

    # start from a copy of the parent's attributes so the
    # parent class mapping is left untouched
    widget_attrs = AutocompleteFilter.widget_attrs.copy()
    widget_attrs["data-empty-label"] = SHARED_SYSTEMWIDE_LABEL
    widget_attrs["data-is-filter"] = "true"
class MultitenantRelatedOrgFilter(MultitenantOrgFilter):
    """
    Variant of the organization filter for related objects: it offers
    only objects having a relation with one of the organizations
    the current user is associated with.
    """

    # follow the ``organization`` relation of the filtered model
    org_lookup = "organization__in"