diff --git a/api/base/settings/defaults.py b/api/base/settings/defaults.py index 72e169c25a1..8f3683b6115 100644 --- a/api/base/settings/defaults.py +++ b/api/base/settings/defaults.py @@ -325,6 +325,7 @@ }, 'osfmetrics_es8': { 'elasticsearch_metrics.imps.elastic8': { + # passthru kwargs to elasticsearch8 connection constructor 'hosts': osf_settings.ELASTIC8_URI, 'ca_certs': osf_settings.ELASTIC8_CERT_PATH, 'basic_auth': ( @@ -332,6 +333,8 @@ if osf_settings.ELASTIC8_SECRET is not None else None ), + # djelme-specific kwargs + 'djelme_default_index_name_prefix': osf_settings.SHARE_PROVIDER_PREPEND, }, }, } diff --git a/conftest.py b/conftest.py index 198316f1cc4..e80c4e5c566 100644 --- a/conftest.py +++ b/conftest.py @@ -43,6 +43,8 @@ def pytest_configure(config): 'transitions.core', 'MARKDOWN', 'elasticsearch', + 'elastic_transport', + 'elasticsearch_metrics', ] for logger_name in SILENT_LOGGERS: logging.getLogger(logger_name).setLevel(logging.CRITICAL) diff --git a/docker-compose.yml b/docker-compose.yml index 83e8fd27483..04d64c51fda 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -72,10 +72,17 @@ services: # Temporary: Remove when we've upgraded to ES6 elasticsearch6: image: docker.elastic.co/elasticsearch/elasticsearch:6.3.1 + environment: + - ES_JAVA_OPTS=-Xms512m -Xmx512m # reduce memory usage ports: - 9201:9200 volumes: - elasticsearch6_data_vol:/usr/share/elasticsearch/data + healthcheck: + start_period: 15s + test: curl -s http://localhost:9200/_cluster/health | grep -vq '"status":"red"' + interval: 10s + retries: 30 stdin_open: true elasticsearch8: @@ -91,10 +98,9 @@ services: - elasticsearch8_data_vol:/usr/share/elasticsearch/data healthcheck: start_period: 15s - test: ["CMD", "curl", "-sf", "http://localhost:9200/_cluster/health?wait_for_status=yellow&timeout=30s"] + test: curl -s http://localhost:9200/_cluster/health | grep -vq '"status":"red"' interval: 10s - timeout: 30s - retries: 5 + retries: 30 stdin_open: true postgres: diff --git a/framework/celery_tasks/routers.py b/framework/celery_tasks/routers.py index c33238780e8..d9d6e335286 100644 --- a/framework/celery_tasks/routers.py +++ b/framework/celery_tasks/routers.py @@ -11,6 +11,8 @@ def match_by_module(task_path): return CeleryConfig.task_med_queue if task_subpath in CeleryConfig.high_pri_modules: return CeleryConfig.task_high_queue + if task_subpath in CeleryConfig.background_migration_modules: + return CeleryConfig.task_background_migration_queue if task_subpath in CeleryConfig.remote_computing_modules: return CeleryConfig.task_remote_computing_queue if task_subpath in CeleryConfig.account_status_changes_modules: diff --git a/osf/management/commands/fake_metrics_reports.py b/osf/management/commands/fake_metrics_reports.py index 765d6e475c1..53e13472e74 100644 --- a/osf/management/commands/fake_metrics_reports.py +++ b/osf/management/commands/fake_metrics_reports.py @@ -8,6 +8,8 @@ UserSummaryReport, PreprintSummaryReport, ) +from osf.metrics.reports import PublicItemUsageReport +from osf.metrics.utils import YearMonth from osf.models import PreprintProvider @@ -53,10 +55,27 @@ def fake_preprint_counts(days_back): ).save() +def fake_usage_reports(osfid: str, count: int): + _ym = YearMonth.from_date(date.today()).prior() + for _months in range(count): + PublicItemUsageReport.record( + item_osfid=osfid, + report_yearmonth=_ym, + view_count=(_vc := randint(0, 500)), + view_session_count=randint(0, _vc), + download_count=(_dc := randint(0, 300)), + download_session_count=randint(0, _dc), + ) + _ym = 
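A note on the fake-usage loop above: each iteration records one month's fake report, then steps back a calendar month via `YearMonth.prior()`, starting from the month before today. A minimal sketch of that walk (assuming `YearMonth` exposes `year`/`month` attributes, which this diff doesn't show directly):

```python
from datetime import date
from osf.metrics.utils import YearMonth

_ym = YearMonth.from_date(date(2026, 3, 15))
for _ in range(3):
    print(_ym.year, _ym.month)  # 2026 3, then 2026 2, then 2026 1
    _ym = _ym.prior()
```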
_ym.prior() + + class Command(BaseCommand): def handle(self, *args, **kwargs): if not settings.DEBUG: raise NotImplementedError('fake_reports requires DEBUG mode') fake_user_counts(1000) fake_preprint_counts(1000) + fake_usage_reports('blarg', 100) + fake_usage_reports('blerg', 50) + fake_usage_reports('bleg', 50) # TODO: more reports diff --git a/osf/management/commands/migrate_osfmetrics_6to8.py b/osf/management/commands/migrate_osfmetrics_6to8.py new file mode 100644 index 00000000000..92b01e913c3 --- /dev/null +++ b/osf/management/commands/migrate_osfmetrics_6to8.py @@ -0,0 +1,766 @@ +import collections +import datetime +import functools +import logging +import uuid + +from django.core.management import call_command +from django.core.management.base import BaseCommand +from django.db import OperationalError as DjangoOperationalError +from elasticsearch6.exceptions import ConnectionError as Elastic6ConnectionError +from elasticsearch6 import helpers as es6_helpers +from elasticsearch6_dsl.connections import connections as es6_connections +from elasticsearch8.exceptions import ConnectionError as Elastic8ConnectionError +from elasticsearch_metrics.registry import djelme_registry +from elasticsearch_metrics.imps import elastic8 as djel8me +from psycopg2 import OperationalError as PostgresOperationalError + +from framework.celery_tasks import app as celery_app +from osf.metrics.preprint_metrics import ( + PreprintView, + PreprintDownload, +) +from osf.metrics.counted_usage import CountedAuthUsage as CountedUsageEs6 +from osf.metrics import reports as es6_reports +from osf.metrics import es8_metrics, RegistriesModerationMetrics +from osf.metrics.reporters.public_item_usage import _iter_composite_bucket_keys +from osf.metrics.utils import YearMonth +from osf import models as osfdb +from website import settings as website_settings + + +_logger = logging.getLogger(__name__) + +### +# constants + +_USAGE_DAYS_BACK = 99 + +_MAX_CARDINALITY_PRECISION = 40000 # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html#_precision_control + +_UNCHANGED_RECORDTYPES = { + # reports + es6_reports.StorageAddonUsage: es8_metrics.StorageAddonUsageEs8, + es6_reports.DownloadCountReport: es8_metrics.DownloadCountReportEs8, + es6_reports.InstitutionSummaryReport: es8_metrics.InstitutionSummaryReportEs8, + es6_reports.NewUserDomainReport: es8_metrics.NewUserDomainReportEs8, + es6_reports.NodeSummaryReport: es8_metrics.NodeSummaryReportEs8, + es6_reports.OsfstorageFileCountReport: es8_metrics.OsfstorageFileCountReportEs8, + es6_reports.PreprintSummaryReport: es8_metrics.PreprintSummaryReportEs8, + es6_reports.UserSummaryReport: es8_metrics.UserSummaryReportEs8, + es6_reports.SpamSummaryReport: es8_metrics.SpamSummaryReportEs8, + es6_reports.InstitutionalUserReport: es8_metrics.InstitutionalUserReportEs8, + es6_reports.InstitutionMonthlySummaryReport: es8_metrics.InstitutionMonthlySummaryReportEs8, + es6_reports.PrivateSpamMetricsReport: es8_metrics.PrivateSpamMetricsReportEs8, + # events + RegistriesModerationMetrics: es8_metrics.RegistriesModerationMetricsEs8, +} + +_TASK_KWARGS = dict( + autoretry_for=( + DjangoOperationalError, + Elastic6ConnectionError, + Elastic8ConnectionError, + PostgresOperationalError, + ), + retry_backoff=True, # exponential backoff, with jitter + max_retries=20, +) + +### +# celery tasks + + +@celery_app.task(**_TASK_KWARGS) +def migrate_unchanged_recordtype(es6_recordtype_name: str, until_when: str): + _es6_recordtype = 
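The shared `_TASK_KWARGS` above give every migration task declarative retry behavior via standard Celery options. For readers unfamiliar with them, a standalone sketch of the same pattern (the task body here is illustrative):

```python
from celery import Celery

app = Celery('sketch')

@app.task(
    autoretry_for=(ConnectionError,),  # retry automatically when these escape the task
    retry_backoff=True,  # exponential backoff; Celery adds jitter by default (retry_jitter=True)
    max_retries=20,
)
def flaky_task():
    ...  # transient ConnectionErrors raised here are retried, not surfaced to the worker
```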
djelme_registry.get_recordtype('osf', es6_recordtype_name) + _es8_recordtype = _UNCHANGED_RECORDTYPES[_es6_recordtype] + _convert_kwargs = ( + _convert_unchanged_cyclicrecord_kwargs + if issubclass(_es8_recordtype, djel8me.CyclicRecord) + else (lambda _kw: _kw) # no conversion needed for event record + ) + _each_new = ( + _es8_recordtype(**_convert_kwargs(_hit['_source'])) + for _hit in _es6_scan_range(_es6_recordtype, until_when=until_when) + ) + return _es8_bulk_save(_es8_recordtype, _each_new) + + +@celery_app.task(**_TASK_KWARGS) +def migrate_counted_usages(from_when: str, until_when: str): + # CountedAuthUsage => OsfCountedUsageRecord + _each_new = ( + _convert_counted_usage(_hit['_source']) + for _hit in _es6_scan_range( + CountedUsageEs6, + from_when=from_when, + until_when=until_when, + addl_filter={'exists': {'field': 'item_guid'}}, + ) + ) + return _es8_bulk_save(es8_metrics.OsfCountedUsageRecord, _each_new) + + +@celery_app.task(**_TASK_KWARGS) +def migrate_preprint_views(from_when: str, until_when: str): + # PreprintView => OsfCountedUsageRecord + _action_labels = ['view', 'web'] + _each_new = ( + _convert_preprint_metric(_hit['_source'], _action_labels) + for _hit in _es6_scan_range( + PreprintView, from_when=from_when, until_when=until_when + ) + ) + return _es8_bulk_save(es8_metrics.OsfCountedUsageRecord, _each_new) + + +@celery_app.task(**_TASK_KWARGS) +def migrate_preprint_downloads(from_when: str, until_when: str): + # PreprintDownload => OsfCountedUsageRecord + _action_labels = ['download'] + _each_new = ( + _convert_preprint_metric(_hit['_source'], _action_labels) + for _hit in _es6_scan_range( + PreprintDownload, from_when=from_when, until_when=until_when + ) + ) + return _es8_bulk_save(es8_metrics.OsfCountedUsageRecord, _each_new) + + +@celery_app.task(**_TASK_KWARGS) +def migrate_usage_reports(osfid: str, until_when: str): + # from PublicItemUsageReport to PublicItemUsageReportEs8 + def _each_new(): + # go in sorted order to build cumulative counts + # (only a few dozen of these per item; should be fine to sort and load all at once) + _each_hit = _es6_scan_range( + es6_reports.PublicItemUsageReport, + until_when=until_when, + addl_filter={'term': {'item_osfid': osfid}}, + sort='report_yearmonth', + ) + _prior_report = None + for _hit in list(_each_hit): + yield ( + _prior_report := _convert_public_usage_report( + _hit['_source'], _prior_report + ) + ) + + return _es8_bulk_save(es8_metrics.PublicItemUsageReportEs8, _each_new()) + + +### +# various helper functions + + +def _es6_connection(): + return es6_connections.get_connection('osfmetrics_es6') + + +def _es8_bulk_save(es8_recordtype, each_new_record): + _success_count, _fail_count = es8_recordtype.bulk( + each_new_record, + stats_only=True, + ) + return _success_count + + +def _date_range( + range_start: datetime.date, + range_end: datetime.date, + step: datetime.timedelta = datetime.timedelta(days=1), +) -> collections.abc.Iterator[tuple[datetime.date, datetime.date]]: + _from_date = range_start + _until_date = range_start + step + while _from_date < range_end: + yield (_from_date, _until_date) + (_from_date, _until_date) = (_until_date, _until_date + step) + + +def _es6_scan_range( + es6_recordtype, + *, + from_when: str = '', + until_when: str, + addl_filter=None, + sort=None, +): + _timestamp_range = {'lt': until_when} + if from_when: + _timestamp_range['gte'] = from_when + _filters = [ + {'range': {'timestamp': _timestamp_range}}, + ] + if addl_filter: + _filters.append(addl_filter) + _query_body = {'query': 
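`_date_range` above yields half-open `[start, end)` day windows, so consecutive windows tile the range with no gaps or overlaps (run inside this module):

```python
import datetime

_windows = list(_date_range(datetime.date(2026, 1, 1), datetime.date(2026, 1, 4)))
assert _windows == [
    (datetime.date(2026, 1, 1), datetime.date(2026, 1, 2)),
    (datetime.date(2026, 1, 2), datetime.date(2026, 1, 3)),
    (datetime.date(2026, 1, 3), datetime.date(2026, 1, 4)),
]
```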
{'bool': {'filter': _filters}}} + if sort: + _query_body['sort'] = sort + return es6_helpers.scan( + _es6_connection(), + index=es6_recordtype._template_pattern, + query=_query_body, + ) + + +def _es6_usage_report_counts() -> tuple[int, int]: + _search = es6_reports.PublicItemUsageReport.search() + _search.aggs.metric( + 'agg_item_count', + 'cardinality', + field='item_osfid', + precision_threshold=_MAX_CARDINALITY_PRECISION, + ) + _response = _search.execute() + _total_count = _response.hits.total + _item_count = ( + _response.aggregations.agg_item_count.value + if 'agg_item_count' in _response.aggregations + else 0 + ) + return (_total_count, _item_count) + + +def _es8_usage_report_counts() -> tuple[int, int]: + _search = es8_metrics.PublicItemUsageReportEs8.search() + _search.aggs.metric( + 'agg_item_count', + 'cardinality', + field='item_osfid', + precision_threshold=_MAX_CARDINALITY_PRECISION, + ) + _response = _search.execute() + _total_count = _response.hits.total.value + _item_count = ( + _response.aggregations.agg_item_count.value + if 'agg_item_count' in _response.aggregations + else 0 + ) + return (_total_count, _item_count) + + +def _get_es6_field_names(es6_recordtype): + ''' + adapted from DocumentBase._get_field_names in elasticsearch8.dsl + ''' + for _field_name in es6_recordtype._doc_type.mapping: + _field = es6_recordtype._doc_type.mapping[_field_name] + if hasattr(_field, '_doc_class'): + for _sub_field in _get_es6_field_names(_field._doc_class): + yield f'{_field_name}.{_sub_field}' + else: + yield _field_name + + +def _assert_field_unchangedness(es6_recordtype, es8_recordtype): + _es6_fields = set(_get_es6_field_names(es6_recordtype)) + _es8_fields = set(es8_recordtype._get_field_names()) + + # remove fields intentionally removed in migration + if issubclass(es6_recordtype, es6_reports.DailyReport): + assert issubclass(es8_recordtype, djel8me.CyclicRecord) + _es6_fields.remove('timestamp') + _es6_fields.remove('report_date') + elif issubclass(es6_recordtype, es6_reports.MonthlyReport): + assert issubclass(es8_recordtype, djel8me.CyclicRecord) + _es6_fields.remove('timestamp') + _es6_fields.remove('report_yearmonth') + else: + assert issubclass(es8_recordtype, djel8me.EventRecord) + + # remove fields intentionally added in migration + _es8_fields.remove('timeseries_timeparts') + if issubclass(es8_recordtype, djel8me.CyclicRecord): + _es8_fields.remove('created') + _es8_fields.remove('cycle_coverage') + + # all remaining fields should match + assert _es6_fields == _es8_fields + + +def _semverish_from_yearmonth(given_yearmonth: str): + _ym = YearMonth.from_str(given_yearmonth) + return f'{_ym.year}.{_ym.month}' + + +def _semverish_from_date(given_date: str): + _d = datetime.date.fromisoformat(given_date) + return f'{_d.year}.{_d.month}.{_d.day}' + + +def _convert_unchanged_cyclicrecord_kwargs(es6_source: dict) -> dict: + def _each_kwarg(): + for _key, _val in es6_source.items(): + if _key == 'report_yearmonth': + # report_yearmonth converts to cycle_coverage Y.M + yield ('cycle_coverage', _semverish_from_yearmonth(_val)) + elif _key == 'report_date': + # report_date converts to cycle_coverage Y.M.D + yield ('cycle_coverage', _semverish_from_date(_val)) + elif _key != 'timestamp': + # skipping timestamp; on daily/monthly reports just copied from yearmonth/date + yield (_key, _val) + + return dict(_each_kwarg()) + + +def _convert_counted_usage(source: dict) -> es8_metrics.OsfCountedUsageRecord: + _item_iri = _iri_from_osfid(source['item_guid']) + return 
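One subtlety worth flagging in `_es6_scan_range` above: elasticsearch-py's `helpers.scan` replaces any requested sort with `sort: _doc` unless `preserve_order=True`, so the `sort='report_yearmonth'` path (which `migrate_usage_reports` relies on to build cumulative counts in order) likely needs the flag threaded through, along the lines of:

```python
return es6_helpers.scan(
    _es6_connection(),
    index=es6_recordtype._template_pattern,
    query=_query_body,
    # without this, scan() overrides the requested sort with '_doc'
    preserve_order=bool(sort),
)
```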
es8_metrics.OsfCountedUsageRecord( + # fields from djelme.CountedUsageRecord: + timestamp=source['timestamp'], + sessionhour_id=source['session_id'], + platform_iri=source.get('platform_iri') or website_settings.DOMAIN, + database_iri=_convert_database_iri( + source.get('provider_id'), source.get('item_type') + ), + item_iri=_item_iri, + within_iris=[ + _iri_from_osfid(_within_osfid) + for _within_osfid in source.get('surrounding_guids', ()) + ], + # fields from OsfCountedUsageRecord: + item_osfid=source['item_guid'], + item_type=source.get('item_type', 'osf:Object'), + item_public=source.get('item_public'), + provider_id=source.get('provider_id'), + user_is_authenticated=source.get('user_is_authenticated'), + action_labels=source.get('action_labels'), + pageview_info=source.get('pageview_info'), + ) + + +def _convert_preprint_metric( + source: dict, action_labels: list[str] +) -> es8_metrics.OsfCountedUsageRecord: + _preprint_iri = _iri_from_osfid(source['preprint_id']) + return es8_metrics.OsfCountedUsageRecord.record( + using=False, # don't save yet; will save in bulk + # fields used to compute a sessionhour_id: + timestamp=datetime.datetime.fromisoformat(source['timestamp']), + user_id=source.get('user_id'), + client_session_id=str(uuid.uuid4()), + # fields from djelme.CountedUsageRecord: + platform_iri=website_settings.DOMAIN, + database_iri=_convert_database_iri(source.get('provider_id'), 'preprint'), + item_iri=_preprint_iri, + within_iris=[_preprint_iri], + # fields from OsfCountedUsageRecord: + item_osfid=source['preprint_id'], + item_type='preprint', + item_public=True, + provider_id=source.get('provider_id'), + user_is_authenticated=bool(source.get('user_id')), + action_labels=action_labels, + ) + + +def _convert_public_usage_report( + source: dict, + prior_report: es8_metrics.PublicItemUsageReportEs8 | None, +) -> es8_metrics.PublicItemUsageReportEs8: + if prior_report is None: + _c_views, _c_view_sess, _c_downloads, _c_download_sess = _get_cumulative_usage( + osfid=source['item_osfid'], + until_when=YearMonth.from_str(source['report_yearmonth']).month_end(), + item_type=source.get('item_type'), + ) + else: + _c_views = prior_report.cumulative_view_count + source.get('view_count', 0) + _c_view_sess = prior_report.cumulative_view_session_count + source.get( + 'view_session_count', 0 + ) + _c_downloads = prior_report.cumulative_download_count + source.get( + 'download_count', 0 + ) + _c_download_sess = prior_report.cumulative_download_session_count + source.get( + 'download_session_count', 0 + ) + return es8_metrics.PublicItemUsageReportEs8( + cycle_coverage=_semverish_from_yearmonth(source['report_yearmonth']), + item_osfid=source['item_osfid'], + item_type=source.get('item_type'), + provider_id=source.get('provider_id'), + platform_iri=source.get('platform_iri') or website_settings.DOMAIN, + view_count=source.get('view_count'), + view_session_count=source.get('view_session_count'), + cumulative_view_count=_c_views, + cumulative_view_session_count=_c_view_sess, + download_count=source.get('download_count'), + download_session_count=source.get('download_session_count'), + cumulative_download_count=_c_downloads, + cumulative_download_session_count=_c_download_sess, + ) + + +def _get_cumulative_usage(osfid: str, until_when, item_type: str | None): + if item_type == 'preprint': + _views = _cumulative_preprint_count(PreprintView, osfid, until_when) + _downloads = _cumulative_preprint_count(PreprintDownload, osfid, until_when) + _view_sess, _download_sess = 0, 0 # no session info on 
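In `_convert_public_usage_report` above, only the oldest report per item pays for a full aggregation over raw usage events (via `_get_cumulative_usage`); every later month just folds its counts onto the prior converted report's cumulative values. The recurrence in isolation:

```python
def _fold_cumulative(first_cumulative, later_monthly_counts):
    # cumulative[m] = cumulative[m - 1] + monthly[m] for each month after the oldest
    _total = first_cumulative
    yield _total
    for _count in later_monthly_counts:
        _total += _count
        yield _total

assert list(_fold_cumulative(5, [0, 5])) == [5, 5, 10]
```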
preprints (yet) + else: + _views, _view_sess = _cumulative_countedusage_views(osfid, until_when) + _downloads, _download_sess = _cumulative_countedusage_downloads( + osfid, until_when + ) + return (_views, _view_sess, _downloads, _download_sess) + + +def _cumulative_countedusage_views(osfid: str, until_when: str) -> tuple[int, int]: + '''compute view_session_count separately to avoid double-counting + + (the same session may be represented in both the composite agg on `item_guid` + and that on `surrounding_guids`) + ''' + # copied/adapted from osf.metrics.reporters.public_item_usage + _search = ( + CountedUsageEs6.search() + .filter('term', item_public=True) + .filter('range', timestamp={'lt': until_when}) + .filter('term', action_labels='view') + .filter( + 'bool', + should=[ + {'term': {'item_guid': osfid}}, + {'term': {'surrounding_guids': osfid}}, + ], + minimum_should_match=1, + ) + .extra(size=0) # only aggregations, no hits + ) + _search.aggs.metric( + 'agg_session_count', + 'cardinality', + field='session_id', + precision_threshold=_MAX_CARDINALITY_PRECISION, + ) + _response = _search.execute() + _view_count = _response.hits.total + _view_session_count = ( + _response.aggregations.agg_session_count.value + if 'agg_session_count' in _response.aggregations + else 0 + ) + return (_view_count, _view_session_count) + + +def _cumulative_countedusage_downloads(osfid, until_when) -> tuple[int, int]: + '''aggregate downloads on each osfid (not including components/files)''' + # copied/adapted from osf.metrics.reporters.public_item_usage + _search = ( + CountedUsageEs6.search() + .filter('term', item_public=True) + .filter('range', timestamp={'lt': until_when}) + .filter('term', action_labels='download') + .filter('term', item_guid=osfid) + ) + _search.aggs.metric( + 'agg_session_count', + 'cardinality', + field='session_id', + precision_threshold=_MAX_CARDINALITY_PRECISION, + ) + _response = _search.execute() + _download_count = _response.hits.total + _download_session_count = ( + _response.aggregations.agg_session_count.value + if 'agg_session_count' in _response.aggregations + else 0 + ) + return (_download_count, _download_session_count) + + +def _cumulative_preprint_count(preprint_metric_cls, osfid: str, until_when: str) -> int: + '''aggregate views on each preprint''' + # copied/adapted from osf.metrics.preprint_metrics + _search = ( + preprint_metric_cls.search() + .filter('term', preprint_id=osfid) + .filter('range', timestamp={'lt': until_when}) + .extra(size=0) # no hits; only aggs + ) + _search.aggs.metric('agg_count', 'sum', field='count') + _response = _search.execute() + _view_count = ( + int(_response.aggregations.agg_count.value) + if hasattr(_response.aggregations, 'agg_count') + else 0 + ) + return _view_count + + +def _iri_from_osfid(osfid: str) -> str: + return f'{website_settings.DOMAIN}{osfid}' + + +@functools.lru_cache +def _convert_database_iri(provider_id: str | None, item_type: str) -> str: + if not provider_id: + return website_settings.DOMAIN # osf is a provider, sure why not + + def _fallback_iri(): + return f'urn:osf.io:{provider_id}' + + match item_type: # lower-cased osf.models class names + case 'node' | 'osfuser': + # implicit 'osf' provider + return website_settings.DOMAIN + case 'preprint': + try: + _provider = osfdb.PreprintProvider.objects.get(_id=provider_id) + except osfdb.PreprintProvider.DoesNotExist: + _logger.error(f'unknown preprint provider {provider_id!r}') + return _fallback_iri() + else: + return _provider.get_semantic_iri() + case 
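The split between `*_count` and `*_session_count` in the helpers above is the usual COUNTER distinction: hits count usage events, while the `cardinality` aggregation on `session_id` counts distinct sessions, so repeat views within one session inflate the former but not the latter. In miniature:

```python
# (session_id, action) pairs standing in for counted-usage hits
_events = [('sess-a', 'view'), ('sess-a', 'view'), ('sess-b', 'view')]
_view_count = len(_events)  # 3: every hit counts
_view_session_count = len({_session for _session, _ in _events})  # 2: distinct sessions
```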
'registration': + try: + _provider = osfdb.RegistrationProvider.objects.get(_id=provider_id) + except osfdb.RegistrationProvider.DoesNotExist: + _logger.error(f'unknown registration provider {provider_id!r}') + return _fallback_iri() + else: + return _provider.get_semantic_iri() + case _ if 'file' in item_type: + # file providers are a different thing that don't really have an iri, just an id + return f'urn:files.osf.io:{provider_id}' + case _: # give up gracefully + _logger.error( + f'unknown item type {item_type!r} with provider {provider_id!r}' + ) + return _fallback_iri() + + +def _each_usage_report_osfid(until_when, after_osfid=None): + _search = ( + es6_reports.PublicItemUsageReport.search() + .filter('range', timestamp={'lt': until_when}) + .extra(size=0) + ) + _search.aggs.bucket( + 'agg_osfid', + 'composite', + sources=[{'osfid': {'terms': {'field': 'item_osfid'}}}], + size=500, + ) + return _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid) + + +### +# the command itself + + +class Command(BaseCommand): + def add_arguments(self, parser): + parser.add_argument( + '--no-setup', + action='store_true', + ) + parser.add_argument( + '--no-counts', + action='store_true', + ) + parser.add_argument( + '--clear-state', + action='store_true', + ) + parser.add_argument( + '--start', + action='store_true', + ) + parser.add_argument( + '--unchanged', + action='store_true', + ) + parser.add_argument( + '--usage-events', + action='store_true', + ) + parser.add_argument( + '--usage-reports', + action='store_true', + ) + + @functools.cached_property + def _migration_started_at(self): + return es8_metrics.Elastic6To8State.get_started_at() + + def handle( + self, + *, + no_setup, + no_counts, + clear_state, + start, + unchanged, + usage_events, + usage_reports, + **kwargs, + ): + self._quiet_chatty_loggers() + if not no_setup: + call_command('djelme_backend_setup') + if clear_state: + self._clear_state() + self._check_started_at(start_now=start) + _default_all = not any((unchanged, usage_events, usage_reports)) + if unchanged or _default_all: + self._handle_unchanged(start=start, no_counts=no_counts) + if usage_events or _default_all: + self._handle_usage_events(start=start, no_counts=no_counts) + if usage_reports or _default_all: + self._handle_usage_reports(start=start, no_counts=no_counts) + if not no_counts: + self.stdout.write('(counts may be approximate)') + + def _handle_unchanged(self, *, start: bool, no_counts: bool): + # for each (unchanged) report/event: + for _es6_cls, _es8_cls in _UNCHANGED_RECORDTYPES.items(): + _assert_field_unchangedness(_es6_cls, _es8_cls) + if not no_counts: + # display counts + _es6_count = _es6_cls.search().count() + _es8_count = _es8_cls.search().count() + self._write_tabbed('es6', _es6_cls, _es6_count) + self._write_tabbed( + 'es8', + _es8_cls, + _es8_count, + style=self._eq_style(_es8_count, _es6_count), + ) + if start: # schedule task + self.stdout.write( + f'starting {_es6_cls.__name__} => {_es8_cls.__name__}' + ) + migrate_unchanged_recordtype.delay( + _es6_cls.__name__, self._migration_started_at.isoformat() + ) + + def _handle_usage_events(self, *, start: bool, no_counts: bool): + # for counted-usage events: + _started = self._migration_started_at or datetime.datetime.now() + _range_start = (_started - datetime.timedelta(days=_USAGE_DAYS_BACK)).date() + _range_end = _started.date() + datetime.timedelta(days=1) + if not no_counts: + # display counts for each view/download event type + _range_q = { + 'range': { + 'timestamp': { + 
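Putting `_convert_database_iri`'s branches together (the provider ids below are hypothetical; real ones come from postgres):

```python
_convert_database_iri(None, 'preprint')  # -> website_settings.DOMAIN (no provider recorded)
_convert_database_iri('whatever', 'node')  # -> website_settings.DOMAIN (implicit 'osf' provider)
_convert_database_iri('osfstorage', 'osfstoragefile')  # -> 'urn:files.osf.io:osfstorage'
_convert_database_iri('nope', 'mystery')  # -> 'urn:osf.io:nope', with an error logged
```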
'gte': _range_start.isoformat(), + 'lt': _range_end.isoformat(), + } + } + } + _es6_pview_count = PreprintView.search().filter(_range_q).count() + _es6_pdownload_count = PreprintDownload.search().filter(_range_q).count() + _es6_usage_event_count = CountedUsageEs6.search().filter(_range_q).count() + _es6_count = ( + _es6_pview_count + _es6_pdownload_count + _es6_usage_event_count + ) + _es8_count = es8_metrics.OsfCountedUsageRecord.search().count() + self._write_tabbed('es6', PreprintView, _es6_pview_count) + self._write_tabbed('es6', PreprintDownload, _es6_pdownload_count) + self._write_tabbed('es6', CountedUsageEs6, _es6_usage_event_count) + self._write_tabbed( + 'es6', f'(total between {_range_start} and {_range_end})', _es6_count + ) + self._write_tabbed( + 'es8', + es8_metrics.OsfCountedUsageRecord, + _es8_count, + style=self._eq_style(_es8_count, _es6_count), + ) + if start: # schedule (per-day?) tasks (if --start) + self.stdout.write( + f'starting usages => {es8_metrics.OsfCountedUsageRecord.__name__}' + ) + for _from_date, _until_date in _date_range(_range_start, _range_end): + _from_str = _from_date.isoformat() + _until_str = _until_date.isoformat() + migrate_counted_usages.delay(_from_str, _until_str) + migrate_preprint_views.delay(_from_str, _until_str) + migrate_preprint_downloads.delay(_from_str, _until_str) + + def _handle_usage_reports(self, *, start: bool, no_counts: bool): + if not no_counts: + # display counts of reports and distinct items + _es6_count, _es6_item_count = _es6_usage_report_counts() + _es8_count, _es8_item_count = _es8_usage_report_counts() + self._write_tabbed('es6', es6_reports.PublicItemUsageReport, _es6_count) + self._write_tabbed( + 'es8', + es8_metrics.PublicItemUsageReportEs8, + _es8_count, + style=self._eq_style(_es8_count, _es6_count), + ) + self._write_tabbed( + 'es6', + es6_reports.PublicItemUsageReport, + 'osfid count:', + _es6_item_count, + ) + self._write_tabbed( + 'es8', + es8_metrics.PublicItemUsageReportEs8, + '(items)', + _es8_item_count, + style=self._eq_style(_es8_item_count, _es6_item_count), + ) + # (if --start) schedule task per item (by composite agg on es6 public usage reports) + # each item-task iter thru reports oldest to newest, adding cumulative counts + if start: + self.stdout.write( + f'starting per-item {es6_reports.PublicItemUsageReport.__name__} => {es8_metrics.PublicItemUsageReportEs8.__name__}' + ) + for _osfid in _each_usage_report_osfid( + until_when=self._migration_started_at + ): + migrate_usage_reports.delay( + _osfid, self._migration_started_at.isoformat() + ) + + def _check_started_at(self, start_now): + _started_at = self._migration_started_at + if _started_at: + self.stdout.write( + f'osf.metrics 6->8 migration started previously, at {_started_at.isoformat()}' + ) + elif start_now: + _started_at = es8_metrics.Elastic6To8State.set_started_at_now() + del self._migration_started_at # clear cache + self.stdout.write( + f'osf.metrics 6->8 migration starting now, at {_started_at.isoformat()}' + ) + else: + self.stdout.write( + 'osf.metrics 6->8 migration not started nor starting (run with `--start` to start)' + ) + + def _clear_state(self): + self.stdout.write( + 'clearing all migration state (start time, etc)', self.style.NOTICE + ) + es8_metrics.Elastic6To8State.search().query({'match_all': {}}).delete() + es8_metrics.Elastic6To8State.refresh() + + def _eq_style(self, num: int, should_be: int): + return self.style.SUCCESS if (num == should_be) else self.style.WARNING + + def _write_tabbed(self, *strables, style=None): 
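For operators: the flags above compose, so a typical flow is a dry look at counts first, then `--start` (which, with no section flags, schedules all three migrations). Via Django's `call_command`, equivalent to the `manage.py` invocations:

```python
from django.core.management import call_command

call_command('migrate_osfmetrics_6to8')  # setup + counts only; nothing scheduled
call_command('migrate_osfmetrics_6to8', start=True)  # record start time, enqueue everything
call_command('migrate_osfmetrics_6to8', no_setup=True, usage_reports=True)  # re-check one slice
```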
+ def _to_str(strable): + if isinstance(strable, type): + return strable.__name__ + return str(strable) + + self.stdout.write('\t'.join(map(_to_str, strables)), style) + + def _quiet_chatty_loggers(self): + _chatty_loggers = [ + 'elasticsearch', + 'elastic_transport', + 'elasticsearch_metrics', + ] + for logger_name in _chatty_loggers: + logging.getLogger(logger_name).setLevel(logging.ERROR) diff --git a/osf/management/commands/osf_shell.py b/osf/management/commands/osf_shell.py index 851895623ac..69443d004be 100644 --- a/osf/management/commands/osf_shell.py +++ b/osf/management/commands/osf_shell.py @@ -32,7 +32,7 @@ def get_user_imports(): from django.db.models import Model from django_extensions.management.commands import shell_plus from django_extensions.management.utils import signalcommand -from elasticsearch_metrics.registry import registry as metrics_registry +from elasticsearch_metrics.registry import djelme_registry def header(text): @@ -160,7 +160,7 @@ def get_osf_imports(self): def get_metrics(self): return { each.__name__: each - for each in metrics_registry.get_metrics() + for each in djelme_registry.each_recordtype() } def get_grouped_imports(self, options): diff --git a/osf/management/commands/sync_databases.py b/osf/management/commands/sync_databases.py index c31d63ea16e..b5030b4bba7 100644 --- a/osf/management/commands/sync_databases.py +++ b/osf/management/commands/sync_databases.py @@ -20,7 +20,7 @@ def handle(self, *args, **options): ['migrate'], ] if waffle.switch_is_active(features.ELASTICSEARCH_METRICS): - COMMANDS.append(['sync_metrics']) + COMMANDS.append(['djelme_backend_setup']) for check in COMMANDS: call_command(*check) diff --git a/osf/metrics/es8_metrics.py b/osf/metrics/es8_metrics.py index 436a1c62d46..4c46710748c 100644 --- a/osf/metrics/es8_metrics.py +++ b/osf/metrics/es8_metrics.py @@ -3,7 +3,7 @@ from urllib.parse import urlsplit import elasticsearch8.dsl as esdsl -from elasticsearch_metrics import DAILY, MONTHLY +from elasticsearch_metrics import DAILY, MONTHLY, YEARLY import elasticsearch_metrics.imps.elastic8 as djelme from osf.metrics.utils import YearMonth @@ -57,19 +57,19 @@ class PageviewInfo(esdsl.InnerDoc): """ # fields that should be provided - referer_url: str - page_url: str - page_title: str - route_name: str = esdsl.mapped_field(esdsl.Keyword( + referer_url: str | None + page_url: str | None + page_title: str | None + route_name: str | None = esdsl.mapped_field(esdsl.Keyword( fields={ 'by_prefix': esdsl.Text(analyzer=route_prefix_analyzer), }, )) # fields auto-filled - page_path: str - referer_domain: str - hour_of_day: int + page_path: str | None + referer_domain: str | None + hour_of_day: int | None ### @@ -77,23 +77,30 @@ class PageviewInfo(esdsl.InnerDoc): class OsfCountedUsageRecord(djelme.CountedUsageRecord): ''' - - inherited fields: - platform_iri: str - database_iri: str - item_iri: str - sessionhour_id: str - within_iris: list[str] + Aim to support a COUNTER-style reporting api + https://cop5.projectcounter.org/en/5.1/appendices/a-glossary-of-terms.html + https://coprd.countermetrics.org/en/1.0.1/appendices/a-glossary.html ''' - # osf-specific fields + + # inherited fields: + # timestamp: datetime.datetime + # platform_iri: str + # database_iri: str + # item_iri: str + # sessionhour_id: str + # within_iris: list[str] + + # osf-specific fields: item_osfid: str item_type: str item_public: bool + provider_id: str | None user_is_authenticated: bool action_labels: list[str] - pageview_info: PageviewInfo + pageview_info: PageviewInfo 
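The registry rename (`metrics_registry.get_metrics()` becoming `djelme_registry.each_recordtype()`) shows both directions of lookup this PR uses: iterate everything for the shell, resolve by name for the migration tasks. A sketch using only the registry calls that appear in this diff (exact djelme signatures are assumed from that usage):

```python
from elasticsearch_metrics.registry import djelme_registry

# as in osf_shell: every registered recordtype, keyed by class name
_by_name = {_cls.__name__: _cls for _cls in djelme_registry.each_recordtype()}

# as in migrate_unchanged_recordtype: resolve app label + class name back to a class
_recordtype = djelme_registry.get_recordtype('osf', 'PreprintSummaryReport')
```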
| None - def save(self, *args, **kwargs): + def clean(self): + super().clean() # autofill pageview_info fields if self.pageview_info: self.pageview_info.hour_of_day = self.timestamp.hour @@ -103,7 +110,43 @@ def save(self, *args, **kwargs): _ref_url = self.pageview_info.referer_url if _ref_url: self.pageview_info.referer_domain = urlsplit(_ref_url).netloc - super().save(*args, **kwargs) + # ensure inclusive "within" + if not self.within_iris: + self.within_iris = [self.item_iri] + elif self.item_iri not in self.within_iris: + self.within_iris = [self.item_iri, *self.within_iris] + + def _get_unique_together_values(self): + """get "unique together" values for "ON CONFLICT UPDATE" behavior + + override djelme.BaseDjelmeRecord._get_unique_together_values + for more complex logic than UNIQUE_TOGETHER_FIELDS + to slightly better approximate `counter:Double-Click Filtering` + """ + # note: copied from osf.metrics.counted_usage._fill_document_id + target_identifier = ( + self.pageview_info.page_url + if self.pageview_info is not None and self.pageview_info.page_url is not None + else self.item_osfid + ) + # slice the day into an array of 30-second windows, + # find this timestamp's windowslice index + day_start = datetime.datetime( + self.timestamp.year, + self.timestamp.month, + self.timestamp.day, + tzinfo=self.timestamp.tzinfo, + ) + time_in_seconds = (self.timestamp - day_start).total_seconds() + time_window = int(time_in_seconds / 30) # 30-second windows + return ( # unique-together values: + self.platform_iri, + target_identifier, + self.sessionhour_id, + self.timestamp.date(), + time_window, + ','.join(sorted(self.action_labels)), + ) class ActionLabel(enum.Enum): @@ -121,7 +164,7 @@ class RegistriesModerationMetricsEs8(djelme.EventRecord): from_state: str to_state: str user_id: str - comment: str + comment: str | None class Index: settings = { @@ -130,6 +173,9 @@ class Index: 'refresh_interval': '1s', } + class Meta: + timeseries_recordtype_name = 'RegistriesModerationMetrics' + ### # Reusable inner objects for reports @@ -192,12 +238,20 @@ class StorageAddonUsageEs8(djelme.CyclicRecord): usage_by_addon: list[UsageByStorageAddon] + class Meta: + timeseries_index_timedepth = YEARLY + timeseries_recordtype_name = 'StorageAddonUsage' + class DownloadCountReportEs8(djelme.CyclicRecord): CYCLE_TIMEDEPTH = DAILY daily_file_downloads: int + class Meta: + timeseries_index_timedepth = YEARLY + timeseries_recordtype_name = 'DownloadCountReport' + class InstitutionSummaryReportEs8(djelme.CyclicRecord): CYCLE_TIMEDEPTH = DAILY @@ -211,6 +265,10 @@ class InstitutionSummaryReportEs8(djelme.CyclicRecord): registered_nodes: RegistrationRunningTotals registered_projects: RegistrationRunningTotals + class Meta: + timeseries_index_timedepth = MONTHLY + timeseries_recordtype_name = 'InstitutionSummaryReport' + class NewUserDomainReportEs8(djelme.CyclicRecord): CYCLE_TIMEDEPTH = DAILY @@ -219,6 +277,10 @@ class NewUserDomainReportEs8(djelme.CyclicRecord): domain_name: str new_user_count: int + class Meta: + timeseries_index_timedepth = MONTHLY + timeseries_recordtype_name = 'NewUserDomainReport' + class NodeSummaryReportEs8(djelme.CyclicRecord): CYCLE_TIMEDEPTH = DAILY @@ -228,12 +290,20 @@ class NodeSummaryReportEs8(djelme.CyclicRecord): registered_nodes: RegistrationRunningTotals registered_projects: RegistrationRunningTotals + class Meta: + timeseries_index_timedepth = YEARLY + timeseries_recordtype_name = 'NodeSummaryReport' + class OsfstorageFileCountReportEs8(djelme.CyclicRecord): CYCLE_TIMEDEPTH = 
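`_get_unique_together_values` above approximates `counter:Double-Click Filtering` by bucketing each event into one of the day's 2880 thirty-second windows: identical events in the same window collapse into one document (it's windowing, not a true 30-second-gap rule, hence "slightly better approximate"). The window index by itself:

```python
import datetime

def _window_index(ts: datetime.datetime) -> int:
    # which 30-second slice of its day this timestamp falls in (0..2879)
    _day_start = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return int((ts - _day_start).total_seconds() / 30)

_a = datetime.datetime(2024, 1, 1, 15, 0, 10, tzinfo=datetime.UTC)
_b = datetime.datetime(2024, 1, 1, 15, 0, 25, tzinfo=datetime.UTC)
assert _window_index(_a) == _window_index(_b)  # same window -> same dedup key
```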
DAILY files: FileRunningTotals + class Meta: + timeseries_index_timedepth = YEARLY + timeseries_recordtype_name = 'OsfstorageFileCountReport' + class PreprintSummaryReportEs8(djelme.CyclicRecord): CYCLE_TIMEDEPTH = DAILY @@ -242,6 +312,10 @@ class PreprintSummaryReportEs8(djelme.CyclicRecord): provider_key: str preprint_count: int + class Meta: + timeseries_index_timedepth = MONTHLY + timeseries_recordtype_name = 'PreprintSummaryReport' + class UserSummaryReportEs8(djelme.CyclicRecord): CYCLE_TIMEDEPTH = DAILY @@ -253,6 +327,10 @@ class UserSummaryReportEs8(djelme.CyclicRecord): new_users_with_institution_daily: int unconfirmed: int + class Meta: + timeseries_index_timedepth = YEARLY + timeseries_recordtype_name = 'UserSummaryReport' + class SpamSummaryReportEs8(djelme.CyclicRecord): CYCLE_TIMEDEPTH = MONTHLY @@ -269,6 +347,10 @@ class SpamSummaryReportEs8(djelme.CyclicRecord): user_marked_as_spam: int user_marked_as_ham: int + class Meta: + timeseries_index_timedepth = YEARLY + timeseries_recordtype_name = 'SpamSummaryReport' + class InstitutionalUserReportEs8(djelme.CyclicRecord): CYCLE_TIMEDEPTH = MONTHLY @@ -282,7 +364,7 @@ class InstitutionalUserReportEs8(djelme.CyclicRecord): month_last_login = YearmonthField() month_last_active = YearmonthField() account_creation_date = YearmonthField() - orcid_id: str + orcid_id: str | None # counts: public_project_count: int private_project_count: int @@ -292,6 +374,10 @@ class InstitutionalUserReportEs8(djelme.CyclicRecord): public_file_count: int = esdsl.mapped_field(esdsl.Long()) storage_byte_count: int = esdsl.mapped_field(esdsl.Long()) + class Meta: + timeseries_index_timedepth = MONTHLY + timeseries_recordtype_name = 'InstitutionalUserReport' + class InstitutionMonthlySummaryReportEs8(djelme.CyclicRecord): CYCLE_TIMEDEPTH = MONTHLY @@ -309,6 +395,10 @@ class InstitutionMonthlySummaryReportEs8(djelme.CyclicRecord): monthly_logged_in_user_count: int = esdsl.mapped_field(esdsl.Long()) monthly_active_user_count: int = esdsl.mapped_field(esdsl.Long()) + class Meta: + timeseries_index_timedepth = YEARLY + timeseries_recordtype_name = 'InstitutionMonthlySummaryReport' + class PublicItemUsageReportEs8(djelme.CyclicRecord): CYCLE_TIMEDEPTH = MONTHLY @@ -334,6 +424,10 @@ class PublicItemUsageReportEs8(djelme.CyclicRecord): cumulative_download_count: int = esdsl.mapped_field(esdsl.Long()) cumulative_download_session_count: int = esdsl.mapped_field(esdsl.Long()) + class Meta: + timeseries_index_timedepth = MONTHLY + timeseries_recordtype_name = 'PublicItemUsageReport' + class PrivateSpamMetricsReportEs8(djelme.CyclicRecord): CYCLE_TIMEDEPTH = MONTHLY @@ -346,3 +440,40 @@ class PrivateSpamMetricsReportEs8(djelme.CyclicRecord): preprint_oopspam_hammed: int preprint_akismet_flagged: int preprint_akismet_hammed: int + + class Meta: + timeseries_index_timedepth = YEARLY + timeseries_recordtype_name = 'PrivateSpamMetricsReport' + + +### +# data migration state + +class Elastic6To8State(djelme.SimpleRecord): + """index for storing values helpful for keeping track of the elastic 6->8 data migration""" + UNIQUE_TOGETHER_FIELDS = ('key',) + key: str + value: str | None + timestamp: datetime.datetime = esdsl.mapped_field( + default_factory=lambda: datetime.datetime.now(datetime.UTC), + ) + + @classmethod + def get_by_key(cls, key: str): + _response = cls.search().query({'term': {'key': key}})[0].execute() + return _response[0] if _response else None + + @classmethod + def get_timestamp(cls, key: str) -> datetime.datetime | None: + _record = cls.get_by_key(key) + 
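A pattern to note in the new Meta blocks: `CYCLE_TIMEDEPTH` is the report's cadence, while `timeseries_index_timedepth` sets how the backing indexes are partitioned, and the two deliberately differ. Presumably the intent is index-size management:

```python
# cadence vs. index partitioning, as declared above:
#   UserSummaryReportEs8:     CYCLE_TIMEDEPTH = DAILY,   timeseries_index_timedepth = YEARLY
#   PublicItemUsageReportEs8: CYCLE_TIMEDEPTH = MONTHLY, timeseries_index_timedepth = MONTHLY
# one summary doc per day fits comfortably in a yearly index, while per-item
# monthly usage reports are numerous enough to warrant monthly indexes.
```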
return _record.timestamp if _record else None + + @classmethod + def get_started_at(cls): + return cls.get_timestamp('started_at') + + @classmethod + def set_started_at_now(cls): + _record = cls.record(key='started_at') + cls.refresh() + return _record.timestamp diff --git a/osf_tests/metrics/test_es8_metrics.py b/osf_tests/metrics/test_es8_metrics.py index e93579628dc..a871054e96b 100644 --- a/osf_tests/metrics/test_es8_metrics.py +++ b/osf_tests/metrics/test_es8_metrics.py @@ -1,4 +1,4 @@ -from datetime import datetime +import datetime from elasticsearch_metrics.tests.util import djelme_test_backends import pytest @@ -20,7 +20,7 @@ def _real_elastic(self): def test_nested_pageview_autofill(self): usage = OsfCountedUsageRecord.record( - timestamp=datetime(2024, 1, 1, 15, 0), + timestamp=datetime.datetime(2024, 1, 1, 15, 0, tzinfo=datetime.UTC), sessionhour_id='blah', database_iri='https://osf.example/provider', item_iri='https://osf.example/itemm', @@ -39,13 +39,52 @@ def test_nested_pageview_autofill(self): assert usage.pageview_info.page_path == '/path/test' assert usage.pageview_info.referer_domain == 'google.com' assert usage.pageview_info.hour_of_day == 15 + assert usage.item_iri in usage.within_iris + + def test_nested_pageview_autofill_dict(self): + usage = OsfCountedUsageRecord.record( + timestamp=datetime.datetime(2024, 1, 1, 15, 0, tzinfo=datetime.UTC), + sessionhour_id='blah', + database_iri='https://osf.example/provider', + item_iri='https://osf.example/itemm', + item_osfid='itemm', + item_public=True, + item_type='https://osf.example/Preprint', + platform_iri='https://osf.example', + user_is_authenticated=False, + pageview_info={ + 'page_url': 'https://example.com/path/test', + 'referer_url': 'https://google.com', + 'route_name': 'foo.bar', + 'page_title': 'title title', + }, + ) + assert usage.pageview_info.page_path == '/path/test' + assert usage.pageview_info.referer_domain == 'google.com' + assert usage.pageview_info.hour_of_day == 15 + assert usage.item_iri in usage.within_iris + + def test_none_pageview_nested_autofill(self): + usage = OsfCountedUsageRecord.record( + timestamp=datetime.datetime(2024, 1, 1, 15, 0, tzinfo=datetime.UTC), + sessionhour_id='blah', + database_iri='https://osf.example/provider', + item_iri='https://osf.example/itemm', + item_osfid='itemm', + item_public=True, + item_type='https://osf.example/Preprint', + platform_iri='https://osf.example', + user_is_authenticated=False, + ) + assert not usage.pageview_info + assert usage.item_iri in usage.within_iris def test_save_report(self): _saved = DownloadCountReportEs8.record( cycle_coverage='2026.1.1', daily_file_downloads=17, ) - DownloadCountReportEs8.refresh_timeseries_indexes() + DownloadCountReportEs8.refresh() _response = DownloadCountReportEs8.search().execute() (_fetched,) = _response assert _fetched.meta.id == _saved.meta.id diff --git a/poetry.lock b/poetry.lock index 90665bce81f..c16b7d021e0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1085,7 +1085,7 @@ Django = ">=2.0" [[package]] name = "django-elasticsearch-metrics" -version = "2026.0.3" +version = "2026.0.4" description = "Django app for storing time-series metrics in Elasticsearch." 
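`Elastic6To8State` gives the migration an idempotent start marker that every range query keys off. The intended call pattern, as `_check_started_at` in the management command exercises it:

```python
# first run with --start:
Elastic6To8State.get_started_at()  # -> None; no 'started_at' record yet
Elastic6To8State.set_started_at_now()  # writes {'key': 'started_at'}, returns its timestamp

# any later run (even with --start again):
Elastic6To8State.get_started_at()  # -> the original start time, reused as the range upper bound
```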
optional = false python-versions = ">=3.10,<4" @@ -1101,8 +1101,8 @@ elastic8 = ["elasticsearch8 (>=8.0.0,<9.0.0)"] [package.source] type = "git" url = "https://github.com/CenterForOpenScience/django-elasticsearch-metrics.git" -reference = "8025d58e23b4e0c562e1d59c98b10ec936eb56e6" -resolved_reference = "8025d58e23b4e0c562e1d59c98b10ec936eb56e6" +reference = "4e833670178beb682bb0d64e4f33db012cf8f014" +resolved_reference = "4e833670178beb682bb0d64e4f33db012cf8f014" [[package]] name = "django-extensions" @@ -4711,4 +4711,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"] [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "ef1d6d327f5557e43482793b276ccb6c5fd07989f27367af3a3736a8547b4d1a" +content-hash = "d08b71fd886f9c6bd3d8d6cb1eda9f08431b7e84398b107e25f0371a4111266b" diff --git a/pyproject.toml b/pyproject.toml index 013df3f448d..fcc0decc86d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -91,7 +91,7 @@ datacite = "1.1.3" rdflib = "7.0.0" colorlog = "6.8.2" # Metrics -django-elasticsearch-metrics = {git ="https://github.com/CenterForOpenScience/django-elasticsearch-metrics.git", rev = "8025d58e23b4e0c562e1d59c98b10ec936eb56e6"} +django-elasticsearch-metrics = {git ="https://github.com/CenterForOpenScience/django-elasticsearch-metrics.git", rev = "4e833670178beb682bb0d64e4f33db012cf8f014"} # Impact Metrics CSV Export djangorestframework-csv = "3.0.2" gevent = "24.2.1" diff --git a/website/settings/defaults.py b/website/settings/defaults.py index 1e8032cc95c..69f82d2d2a7 100644 --- a/website/settings/defaults.py +++ b/website/settings/defaults.py @@ -421,6 +421,7 @@ class CeleryConfig: task_account_status_changes_queue = 'account_status_changes' task_external_high_queue = 'external_high' task_external_low_queue = 'external_low' + task_background_migration_queue = 'background_migration' external_high_modules = { 'osf.tasks.log_gv_addon', @@ -487,6 +488,10 @@ class CeleryConfig: 'api.share.utils', } + background_migration_modules = { + 'osf.management.commands.migrate_osfmetrics_6to8', + } + try: from kombu import Queue, Exchange except ImportError: @@ -540,12 +545,19 @@ class CeleryConfig: routing_key=task_external_low_queue, consumer_arguments={'x-priority': -2}, ), + Queue( + task_background_migration_queue, + Exchange(task_background_migration_queue), + routing_key=task_background_migration_queue, + consumer_arguments={'x-priority': -1}, + ), ) task_default_exchange_type = 'direct' task_routes = ('framework.celery_tasks.routers.CeleryRouter', ) task_ignore_result = True task_store_errors_even_if_ignored = True + result_extended = True broker_url = os.environ.get('BROKER_URL', f'amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VHOST}') broker_use_ssl = False @@ -596,6 +608,7 @@ class CeleryConfig: 'scripts.remove_after_use.merge_notification_subscription_provider_ct', 'scripts.disable_removed_beat_tasks', 'osf.management.commands.delete_withdrawn_or_failed_registration_files', + 'osf.management.commands.migrate_osfmetrics_6to8', ) # Modules that need metrics and release requirements
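End-to-end, queue routing for the new tasks works like this: `match_by_module` (see the `framework/celery_tasks/routers.py` hunk above) checks a task's module path against `CeleryConfig.background_migration_modules`, so everything defined in `migrate_osfmetrics_6to8.py` lands on the `background_migration` queue, whose consumers run at x-priority -1 (below the default queues, above `external_low` at -2). A condensed sketch of the lookup, assuming `task_subpath` is the task's module path as the surrounding cases suggest:

```python
background_migration_modules = {'osf.management.commands.migrate_osfmetrics_6to8'}

def route_for(task_path: str) -> str | None:
    _module_path = task_path.rsplit('.', 1)[0]  # strip the task's own name
    if _module_path in background_migration_modules:
        return 'background_migration'
    return None  # fall through to the other queue checks / default queue

assert route_for(
    'osf.management.commands.migrate_osfmetrics_6to8.migrate_counted_usages'
) == 'background_migration'
```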