-
Notifications
You must be signed in to change notification settings - Fork 358
Expand file tree
/
Copy pathtasks.py
More file actions
432 lines (366 loc) · 15.3 KB
/
tasks.py
File metadata and controls
432 lines (366 loc) · 15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
import requests
import json
from furl import furl
from rest_framework import status as http_status
import celery
from celery.utils.log import get_task_logger
from django.utils import timezone
from datetime import timedelta
from framework.celery_tasks import app as celery_app
from framework.celery_tasks.utils import logged
from framework.exceptions import HTTPError
from framework import sentry
from api.base.utils import waterbutler_api_url_for
from api.waffle.utils import flag_is_active
from website.archiver import (
ARCHIVER_SUCCESS,
ARCHIVER_FAILURE,
ARCHIVER_SIZE_EXCEEDED,
ARCHIVER_NETWORK_ERROR,
ARCHIVER_FILE_NOT_FOUND,
ARCHIVER_UNCAUGHT_ERROR,
NO_ARCHIVE_LIMIT,
AggregateStatResult,
)
from website.archiver import utils
from website.archiver.utils import normalize_unicode_filenames
from website.archiver import signals as archiver_signals
from scripts.check_manual_restart_approval import delayed_manual_restart_approval
from website.project import signals as project_signals
from website import settings
from website.app import init_addons
from osf.models.admin_log_entry import AdminLogEntry, MANUAL_ARCHIVE_RESTART
from osf.models import (
ArchiveJob,
AbstractNode,
Registration,
DraftRegistration,
)
from osf import features
from osf.utils.requests import get_current_request
from osf.external.gravy_valet import request_helpers, translations
def create_app_context():
try:
init_addons(settings)
except AssertionError: # ignore AssertionErrors
pass
logger = get_task_logger(__name__)
class ArchiverSizeExceeded(Exception):
def __init__(self, result):
self.result = result
super().__init__(result)
class ArchiverStateError(Exception):
def __init__(self, info):
self.info = info
super().__init__(info)
class ArchivedFileNotFound(Exception):
def __init__(self, registration, missing_files):
self.draft_registration = DraftRegistration.objects.get(registered_node=registration)
self.missing_files = missing_files
super().__init__(registration, missing_files)
class ArchiverTask(celery.Task):
abstract = True
max_retries = 0
ignore_result = False
def on_failure(self, exc, task_id, args, kwargs, einfo):
job = ArchiveJob.load(kwargs.get('job_pk'))
compact_traceback = utils.compact_traceback(einfo)
if not job:
archiver_state_exc = ArchiverStateError({
'exception': exc,
'args': args,
'kwargs': kwargs,
'einfo': compact_traceback,
})
sentry.log_exception(archiver_state_exc)
raise archiver_state_exc
if job.status == ARCHIVER_FAILURE:
# already captured
return
src, dst, _ = job.info()
errors = []
if isinstance(exc, ArchiverSizeExceeded):
dst.archive_status = ARCHIVER_SIZE_EXCEEDED
errors = exc.result
elif isinstance(exc, HTTPError):
dst.archive_status = ARCHIVER_NETWORK_ERROR
errors = [
each for each in
dst.archive_job.target_info()
if each is not None
]
elif isinstance(exc, ArchivedFileNotFound):
dst.archive_status = ARCHIVER_FILE_NOT_FOUND
errors = {
'missing_files': exc.missing_files,
'draft': exc.draft_registration
}
else:
dst.archive_status = ARCHIVER_UNCAUGHT_ERROR
errors = [f'{exc.__class__.__name__}: {exc}']
if compact_traceback:
errors.append(f'Traceback tail:\n{compact_traceback}')
dst.save()
# Always capture full exception; keep log_message payload compact.
sentry.log_exception(exc)
sentry.log_message(
f'An error occurred while archiving node: {src._id} and registration: {dst._id}',
extra_data={
'source node guid': src._id,
'registration node guid': dst._id,
'task_id': task_id,
'task_name': self.name,
'exception_type': exc.__class__.__name__,
'exception_message': str(exc),
'traceback_tail': compact_traceback,
'errors': errors,
},
)
archiver_signals.archive_fail.send(dst, errors=errors)
def get_addon_from_gv(src_node, addon_name, requesting_user):
addon_data = request_helpers.get_addon(
gv_addon_pk=f'{src_node._id}:{addon_name}',
requested_resource=src_node,
requesting_user=requesting_user,
addon_type='configured-storage-addons'
)
return translations.make_ephemeral_node_settings(
gv_addon_data=addon_data,
requested_resource=src_node,
requesting_user=requesting_user
)
@celery_app.task(base=ArchiverTask, ignore_result=False)
@logged('stat_addon')
def stat_addon(addon_short_name, job_pk):
"""Collect metadata about the file tree of a given addon
:param addon_short_name: AddonConfig.short_name of the addon to be examined
:param job_pk: primary key of archive_job
:return: AggregateStatResult containing file tree metadata
"""
# Dataverse requires special handling for draft and
# published content
addon_name = addon_short_name
version = None
if 'dataverse' in addon_short_name:
addon_name = 'dataverse'
version = 'latest' if addon_short_name.split('-')[-1] == 'draft' else 'latest-published'
create_app_context()
job = ArchiveJob.load(job_pk)
src, dst, user = job.info()
src_addon = None
if addon_name != 'osfstorage' and flag_is_active(get_current_request(), features.ENABLE_GV):
src_addon = get_addon_from_gv(src, addon_name, user)
else:
src_addon = src.get_addon(addon_name)
if hasattr(src_addon, 'configured') and not src_addon.configured:
# Addon enabled but not configured - no file trees, nothing to archive.
return AggregateStatResult(src_addon._id, addon_short_name)
try:
file_tree = src_addon._get_file_tree(user=user, version=version)
except HTTPError as e:
dst.archive_job.update_target(
addon_short_name,
ARCHIVER_NETWORK_ERROR,
errors=[e.data['error']],
)
raise
result = AggregateStatResult(
src_addon._id,
addon_short_name,
targets=[utils.aggregate_file_tree_metadata(addon_short_name, file_tree, user)],
)
return result
@celery_app.task(base=ArchiverTask, ignore_result=False)
@logged('make_copy_request')
def make_copy_request(job_pk, url, data):
"""Make the copy request to the WaterButler API and handle
successful and failed responses
:param job_pk: primary key of ArchiveJob
:param url: URL to send request to
:param data: <dict> of setting to send in POST to WaterButler API
:return: None
"""
create_app_context()
job = ArchiveJob.load(job_pk)
src, dst, user = job.info()
logger.info(f"Sending copy request for addon: {data['provider']} on node: {dst._id}")
cookie = furl(url).query.params.get('cookie')
res = requests.post(url, data=json.dumps(data), cookies={settings.COOKIE_NAME: cookie})
if res.status_code not in (http_status.HTTP_200_OK, http_status.HTTP_201_CREATED, http_status.HTTP_202_ACCEPTED):
raise HTTPError(res.status_code)
def make_waterbutler_payload(dst_id, rename):
return {
'action': 'copy',
'path': '/',
'rename': rename.replace('/', '-'),
'resource': dst_id,
'provider': settings.ARCHIVE_PROVIDER,
}
@celery_app.task(base=ArchiverTask, ignore_result=False)
@logged('archive_addon')
def archive_addon(addon_short_name, job_pk):
"""Archive the contents of an addon by making a copy request to the
WaterButler API
:param addon_short_name: AddonConfig.short_name of the addon to be archived
:param job_pk: primary key of ArchiveJob
:return: None
"""
create_app_context()
job = ArchiveJob.load(job_pk)
src, dst, user = job.info()
logger.info(f'Archiving addon: {addon_short_name} on node: {src._id}')
cookie = user.get_or_create_cookie().decode()
params = {'cookie': cookie}
rename_suffix = ''
# The dataverse API will not differentiate between published and draft files
# unless explicitly asked. We need to create separate folders for published and
# draft in the resulting archive.
#
# Additionally trying to run the archive without this distinction creates a race
# condition that non-deterministically caused archive jobs to fail.
if 'dataverse' in addon_short_name:
params['revision'] = 'latest' if addon_short_name.split('-')[-1] == 'draft' else 'latest-published'
rename_suffix = ' (draft)' if addon_short_name.split('-')[-1] == 'draft' else ' (published)'
addon_short_name = 'dataverse'
src_provider = None
if addon_short_name != 'osfstorage' and flag_is_active(get_current_request(), features.ENABLE_GV):
src_provider = get_addon_from_gv(src, addon_short_name, user)
else:
src_provider = src.get_addon(addon_short_name)
folder_name_nfd, folder_name_nfc = normalize_unicode_filenames(src_provider.archive_folder_name)
rename = f'{folder_name_nfd}{rename_suffix}'
url = waterbutler_api_url_for(src._id, addon_short_name, _internal=True, base_url=src.osfstorage_region.waterbutler_url, **params)
data = make_waterbutler_payload(dst._id, rename)
make_copy_request.delay(job_pk=job_pk, url=url, data=data)
@celery_app.task(base=ArchiverTask, ignore_result=False)
@logged('archive_node')
def archive_node(stat_results, job_pk):
"""First use the results of #stat_node to check disk usage of the
initiated registration, then either fail the registration or
create a celery.group group of subtasks to archive addons
:param results: results from the #stat_addon subtasks spawned in #stat_node
:param job_pk: primary key of ArchiveJob
:return: None
"""
create_app_context()
job = ArchiveJob.load(job_pk)
src, dst, user = job.info()
logger.info(f'Archiving node: {src._id}')
if not isinstance(stat_results, list):
stat_results = [stat_results]
stat_result = AggregateStatResult(
dst._id,
dst.title,
targets=stat_results
)
if (NO_ARCHIVE_LIMIT not in job.initiator.system_tags) and (stat_result.disk_usage > settings.MAX_ARCHIVE_SIZE):
raise ArchiverSizeExceeded(result=stat_result)
else:
if not stat_result.targets:
job.status = ARCHIVER_SUCCESS
job.save()
for result in stat_result.targets:
if not result['num_files']:
job.update_target(result['target_name'], ARCHIVER_SUCCESS)
else:
archive_addon.delay(
addon_short_name=result['target_name'],
job_pk=job_pk
)
project_signals.archive_callback.send(dst)
def archive(job_pk):
"""Starts a celery.chord that runs stat_addon for each
complete addon attached to the Node, then runs
#archive_node with the result
:param job_pk: primary key of ArchiveJob
:return: None
"""
create_app_context()
job = ArchiveJob.load(job_pk)
src, dst, user = job.info()
logger = get_task_logger(__name__)
logger.info(f'Received archive task for Node: {src._id} into Node: {dst._id}')
return celery.chain(
[
celery.group([
stat_addon.si(
addon_short_name=target.name,
job_pk=job_pk,
)
for target in job.target_addons.all()
]),
archive_node.s(
job_pk=job_pk
)
]
)
@celery_app.task(bind=True, base=ArchiverTask, ignore_result=False, max_retries=1, default_retry_delay=60 * 5, acks_late=True)
@logged('archive_success')
def archive_success(self, dst_pk, job_pk):
"""Archiver's final callback. For the time being the use case for this task
is to rewrite references to files selected in a registration schema (the Prereg
Challenge being the first to expose this feature). The created references point
to files on the registered_from Node (needed for previewing schema data), and
must be re-associated with the corresponding files in the newly created registration.
:param str dst_pk: primary key of registration Node
note:: At first glance this task makes redundant calls to utils.get_file_map (which
returns a generator yielding (<sha256>, <file_metadata>) pairs) on the dst Node. Two
notes about utils.get_file_map: 1) this function memoizes previous results to reduce
overhead and 2) this function returns a generator that lazily fetches the file metadata
of child Nodes (it is possible for a selected file to belong to a child Node) using a
non-recursive DFS. Combined this allows for a relatively efficient implementation with
seemingly redundant calls.
"""
create_app_context()
dst = AbstractNode.load(dst_pk)
# Cache registration files count
dst.update_files_count()
# Update file references in the Registration's responses to point to the archived
# file on the Registration instead of the "live" version on the backing project
try:
utils.migrate_file_metadata(dst)
except ArchivedFileNotFound as err:
sentry.log_message(
f'Some files were not found while archiving the node {dst_pk}',
extra_data={
'missing_files': err.missing_files,
},
)
self.retry(exc=err)
job = ArchiveJob.load(job_pk)
if not job.sent:
job.sent = True
job.save()
dst.sanction.ask(dst.get_active_contributors_recursive(unique_users=True))
if was_manually_restarted(dst):
logger.info(f'Registration {dst._id} was manually restarted, scheduling approval check')
delayed_manual_restart_approval.delay(dst._id, delay_minutes=5)
dst.update_search()
def was_manually_restarted(registration):
recent_logs = AdminLogEntry.objects.filter(
object_id=registration.pk,
action_flag=MANUAL_ARCHIVE_RESTART,
action_time__gte=timezone.now() - timedelta(hours=48)
)
return recent_logs.exists()
@celery_app.task(bind=True)
def force_archive(self, registration_id, permissible_addons, allow_unconfigured=False, skip_collisions=False, delete_collisions=False):
from osf.management.commands.force_archive import archive, verify
create_app_context()
try:
registration = AbstractNode.load(registration_id)
if not registration or not isinstance(registration, Registration):
return f'Registration {registration_id} not found'
verify(registration, permissible_addons=set(permissible_addons), raise_error=True)
archive(
registration,
permissible_addons=set(permissible_addons),
allow_unconfigured=allow_unconfigured,
skip_collisions=skip_collisions,
delete_collisions=delete_collisions,
)
return f'Registration {registration_id} archive completed'
except Exception as exc:
sentry.log_message(f'Archive task failed for {registration_id}: {exc}')
sentry.log_exception(exc)
return f'{exc.__class__.__name__}: {str(exc)}'