forked from CenterForOpenScience/osf.io
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmonthly_reporters_go.py
More file actions
130 lines (111 loc) · 4.09 KB
/
monthly_reporters_go.py
File metadata and controls
130 lines (111 loc) · 4.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import datetime
import logging
from django.core.management.base import BaseCommand
from django.db import OperationalError as DjangoOperationalError
from elasticsearch6.exceptions import ConnectionError as ElasticConnectionError
from psycopg2 import OperationalError as PostgresOperationalError
from framework.celery_tasks import app as celery_app
import framework.sentry
from osf.metrics.reporters import AllMonthlyReporters
from osf.metrics.utils import YearMonth
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Exception types that `schedule_monthly_reporter` catches while it iterates
# report kwargs: instead of failing, it reports the error and (when at least
# one report task was already scheduled in that run) queues a continuation task.
# The same three types also appear in `monthly_reporter_do`'s `autoretry_for`.
_CONTINUE_AFTER_ERRORS = (
DjangoOperationalError,
ElasticConnectionError,
PostgresOperationalError,
)
@celery_app.task(name='management.commands.monthly_reporters_go')
def monthly_reporters_go(yearmonth: str = '', reporter_key: str = ''):
    """Queue one `schedule_monthly_reporter` task per selected reporter.

    Args:
        yearmonth: month to report on, in the string form accepted by
            `YearMonth.from_str`; when empty, the month prior to today's
            month is used.
        reporter_key: name of a single `AllMonthlyReporters` member to run;
            when empty, every member is scheduled.
    """
    if yearmonth:
        target_month = YearMonth.from_str(yearmonth)
    else:
        # nothing requested explicitly: default to last month
        target_month = YearMonth.from_date(datetime.date.today()).prior()

    if reporter_key:
        selected_keys = [reporter_key]
    else:
        selected_keys = _enum_names(AllMonthlyReporters)

    for selected_key in selected_keys:
        task_kwargs = {
            'yearmonth': str(target_month),
            'reporter_key': selected_key,
        }
        schedule_monthly_reporter.apply_async(kwargs=task_kwargs)
@celery_app.task(name='management.commands.schedule_monthly_reporter')
def schedule_monthly_reporter(
    yearmonth: str,
    reporter_key: str,
    continue_after: dict | None = None,
):
    """Queue a `monthly_reporter_do` task for each kwargs set of one reporter.

    The reporter's `iter_report_kwargs(continue_after=...)` is iterated and
    each yielded value is passed on as `report_kwargs` of a new
    `monthly_reporter_do` task.

    If one of `_CONTINUE_AFTER_ERRORS` is raised while iterating or queueing,
    the exception is handed to `framework.sentry.log_exception` and this task
    returns normally. When at least one report task was queued during this
    run, a new `schedule_monthly_reporter` task is queued with
    `continue_after` set to the last kwargs that were queued.

    NOTE(review): if the error happens before anything was queued in this
    run, no continuation is scheduled -- presumably to avoid rescheduling
    without progress; confirm that this is intended.

    Args:
        yearmonth: month string understood by `_get_reporter`.
        reporter_key: name of the `AllMonthlyReporters` member to use.
        continue_after: value forwarded to `iter_report_kwargs`; `None` on
            the first scheduling pass.
    """
    reporter = _get_reporter(reporter_key, yearmonth)
    # kwargs shared by every task queued from here
    shared_kwargs = {
        'yearmonth': yearmonth,
        'reporter_key': reporter_key,
    }
    last_scheduled = None
    try:
        for report_kwargs in reporter.iter_report_kwargs(continue_after=continue_after):
            monthly_reporter_do.apply_async(
                kwargs={**shared_kwargs, 'report_kwargs': report_kwargs},
            )
            last_scheduled = report_kwargs
    except _CONTINUE_AFTER_ERRORS as caught:
        # let the celery task succeed but log the error
        framework.sentry.log_exception(caught)
        if last_scheduled is None:
            return
        # schedule another task to continue scheduling where this one stopped
        schedule_monthly_reporter.apply_async(
            kwargs={**shared_kwargs, 'continue_after': last_scheduled},
        )
@celery_app.task(
    name='management.commands.monthly_reporter_do',
    autoretry_for=(
        DjangoOperationalError,
        ElasticConnectionError,
        PostgresOperationalError,
    ),
    max_retries=5,
    retry_backoff=True,
)
def monthly_reporter_do(reporter_key: str, yearmonth: str, report_kwargs: dict):
    """Build and save the reports for one reporter and one set of kwargs.

    Each report returned by `reporter.report(**report_kwargs)` gets its
    `report_yearmonth` set to the reporter's `yearmonth` and is saved; if the
    reporter's `followup_task(report)` returns something other than `None`,
    that task is queued with `apply_async()`.

    A `KeyError` raised by `_get_reporter` is passed to
    `framework.sentry.log_exception` and the task returns without doing
    anything else.

    Args:
        reporter_key: name of the `AllMonthlyReporters` member to use.
        yearmonth: month string understood by `_get_reporter`.
        report_kwargs: keyword arguments forwarded to `reporter.report`.
    """
    try:
        reporter = _get_reporter(reporter_key, yearmonth)
    except KeyError as lookup_error:
        framework.sentry.log_exception(lookup_error)
        return
    for report in reporter.report(**report_kwargs):
        report.report_yearmonth = reporter.yearmonth
        report.save()
        followup = reporter.followup_task(report)
        if followup is None:
            continue
        followup.apply_async()
class Command(BaseCommand):
    """Management command that schedules monthly reporter tasks for a month."""

    def add_arguments(self, parser):
        """Register the positional `yearmonth` and the optional `--reporter`."""
        parser.add_argument(
            'yearmonth',
            type=str,
            help='year and month (YYYY-MM)',
        )
        parser.add_argument(
            '-r', '--reporter',
            type=str,
            # a sorted list (not a set) keeps argparse's help text and
            # "invalid choice" message in a stable order between runs
            choices=sorted(
                _name.lower() for _name in _enum_names(AllMonthlyReporters)
            ),
            default='',
            help='name of the reporter to run (default all)',
        )

    def handle(self, *args, **kwargs):
        """Call `monthly_reporters_go` with the parsed options, then report.

        The reporter name is upper-cased before being passed as
        `reporter_key`; an empty name selects all reporters.
        """
        # bound to a local so the f-string below does not reuse its own quote
        # character inside the replacement field (a SyntaxError before 3.12)
        _reporter_name = kwargs['reporter']
        # the task function is called directly here, not via `apply_async`
        monthly_reporters_go(
            yearmonth=kwargs['yearmonth'],
            reporter_key=_reporter_name.upper(),
        )
        self.stdout.write(self.style.SUCCESS(
            f'scheduling tasks for monthly reporter "{_reporter_name}"...'
            if _reporter_name
            else 'scheduling tasks for all monthly reporters...'
        ))
def _get_reporter(reporter_key: str, yearmonth: str):
    """Instantiate the reporter registered under `reporter_key` for a month.

    Looks up `AllMonthlyReporters[reporter_key]` (raising `KeyError` for an
    unknown name), then calls the member's value with
    `YearMonth.from_str(yearmonth)` and returns the result.
    """
    reporter_member = AllMonthlyReporters[reporter_key]
    return reporter_member.value(YearMonth.from_str(yearmonth))
def _enum_names(enum_cls) -> list[str]:
    """Return the keys of `enum_cls.__members__` as a new list, in order."""
    return [member_name for member_name in enum_cls.__members__]