146 changes: 116 additions & 30 deletions api/share/utils.py
@@ -4,8 +4,10 @@
"""
from http import HTTPStatus
import logging
from rdflib import Graph

from django.apps import apps
from django.db.models import Q
from celery.utils.time import get_exponential_backoff_interval
import requests

@@ -14,6 +16,7 @@
from framework.encryption import ensure_bytes
from framework.sentry import log_exception
from osf.external.gravy_valet.exceptions import GVException
from osf.metadata.rdfutils import OSF
from osf.metadata.osf_gathering import (
OsfmapPartition,
pls_get_magic_metadata_basket,
@@ -64,6 +67,86 @@ def _enqueue_update_share(osfresource):
enqueue_task(task__update_share.s(_osfguid_value))


def retry_shtrove_request(self_celery_task, _response):
try:
_response.raise_for_status()
except Exception as e:
log_exception(e)
if _response.status_code == HTTPStatus.TOO_MANY_REQUESTS:
retry_after = _response.headers.get('Retry-After')
try:
countdown = int(retry_after)
except (TypeError, ValueError):
retries = getattr(self_celery_task.request, 'retries', 0)
countdown = get_exponential_backoff_interval(
factor=4,
retries=retries,
maximum=2 * 60,
full_jitter=True,
)
raise self_celery_task.retry(exc=e, countdown=countdown)

raise self_celery_task.retry(exc=e)
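
For reference, a minimal illustration (not part of this diff) of the fallback countdown computed above when a 429 response carries no usable Retry-After header, assuming celery's get_exponential_backoff_interval behaves as documented:

from celery.utils.time import get_exponential_backoff_interval

# with factor=4 and a 120-second cap, successive retries back off as roughly
# 4, 8, 16, 32, 64, 120 seconds; full_jitter=True instead picks a random
# value between 0 and each of those bounds
for retries in range(6):
    print(get_exponential_backoff_interval(factor=4, retries=retries, maximum=2 * 60, full_jitter=False))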


def cedar_record_to_turtle(referent, cedar_record):
graph = Graph()
iri = referent.get_semantic_iri()
full_metadata = {
'@id': iri,
OSF.hasCedarRecord: cedar_record.metadata,
}
graph.parse(data=full_metadata, format='json-ld')

return graph.serialize(format='turtle')


@celery_app.task(bind=True)
def share_update_cedar_metadata_record(self, referent_id, cedar_record_pk):
from osf.models import Guid, CedarMetadataRecord

guid = Guid.load(referent_id)
referent = guid.referent
cedar_record = CedarMetadataRecord.objects.filter(pk=cedar_record_pk).first()
if not cedar_record:
return

serialized_data = cedar_record_to_turtle(referent, cedar_record)
response = requests.post(
shtrove_ingest_url(),
params={
'focus_iri': referent.get_semantic_iri(),
'record_identifier': _shtrove_cedar_record_identifier(cedar_record._id, cedar_record.template.cedar_id),
'is_supplementary': True,
},
headers={
'Content-Type': 'text/turtle; charset=utf-8',
**_shtrove_auth_headers(referent),
},
data=ensure_bytes(serialized_data),
)
retry_shtrove_request(self, response)


@celery_app.task(bind=True)
def share_delete_cedar_metadata_record(
self,
cedar_referent___id,
cedar_record___id,
cedar_template_cedar_id,
):
from osf.models import Guid
referent = Guid.load(cedar_referent___id).referent
response = requests.delete(
shtrove_ingest_url(),
params={
'record_identifier': _shtrove_cedar_record_identifier(cedar_record___id, cedar_template_cedar_id),
},
headers=_shtrove_auth_headers(referent),
)
retry_shtrove_request(self, response)


@celery_app.task(
bind=True,
acks_late=True,
@@ -94,36 +177,35 @@ def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name
log_exception(e)
raise self.retry(exc=e)

try:
_response.raise_for_status()
except Exception as e:
log_exception(e)
if _response.status_code == HTTPStatus.TOO_MANY_REQUESTS:
retry_after = _response.headers.get('Retry-After')
try:
countdown = int(retry_after)
except (TypeError, ValueError):
retries = getattr(self.request, 'retries', 0)
countdown = get_exponential_backoff_interval(
factor=4,
retries=retries,
maximum=2 * 60,
full_jitter=True,
)
raise self.retry(exc=e, countdown=countdown)

if HTTPStatus(_response.status_code).is_server_error:
raise self.retry(exc=e)
else: # success response
if not _is_deletion:
# enqueue followup task for supplementary metadata
_next_partition = _next_osfmap_partition(_osfmap_partition)
if _next_partition is not None:
task__update_share.delay(
guid,
is_backfill=is_backfill,
osfmap_partition_name=_next_partition.name,
)
retry_shtrove_request(self, _response)
# success response
if _is_deletion:
return

# enqueue followup task for supplementary metadata
_next_partition = _next_osfmap_partition(_osfmap_partition)
if _next_partition is not None:
task__update_share.delay(
guid,
is_backfill=is_backfill,
osfmap_partition_name=_next_partition.name,
)
for cedar_record in _osfid_instance.cedar_metadata_records.filter(
is_published=True,
template__should_index_for_search=True,
):
enqueue_task(share_update_cedar_metadata_record.s(_osfid_instance._id, cedar_record.pk))

for cedar_record in _osfid_instance.cedar_metadata_records.filter(
Q(is_published=False) | Q(template__should_index_for_search=False),
):
enqueue_task(
share_delete_cedar_metadata_record.s(
cedar_record.guid._id,
cedar_record._id,
cedar_record.template.cedar_id,
),
)
Comment on lines +193 to +208
these'll be run multiple times, for each osfmap partition -- probably want them in an else block, run when _next_partition is None (after the last osfmap partition has been sent)

also, for cleanliness' sake, this would make sense as a separate function

_next_partition = _next_osfmap_partition(_osfmap_partition)
if _next_partition is not None:
   ...
else:  # schedule non-osfmap supplements
   _schedule_cedar_record_updates(_osfid_instance)
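
A rough sketch of that suggestion, assuming the helper is named _schedule_cedar_record_updates as in the comment above and lives in api/share/utils.py (where Q, enqueue_task, and the cedar tasks from this diff are in scope); the real implementation may differ:

def _schedule_cedar_record_updates(osfid_instance):
    # enqueue supplementary-metadata updates for published records whose template is indexed for search
    for cedar_record in osfid_instance.cedar_metadata_records.filter(
        is_published=True,
        template__should_index_for_search=True,
    ):
        enqueue_task(share_update_cedar_metadata_record.s(osfid_instance._id, cedar_record.pk))

    # enqueue deletions for records that are unpublished or whose template opted out of search indexing
    for cedar_record in osfid_instance.cedar_metadata_records.filter(
        Q(is_published=False) | Q(template__should_index_for_search=False),
    ):
        enqueue_task(
            share_delete_cedar_metadata_record.s(
                cedar_record.guid._id,
                cedar_record._id,
                cedar_record.template.cedar_id,
            ),
        )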



def pls_send_trove_record(osf_item, *, is_backfill: bool, osfmap_partition: OsfmapPartition):
Expand Down Expand Up @@ -179,6 +261,10 @@ def _shtrove_record_identifier(osf_item, osfmap_partition: OsfmapPartition):
)


def _shtrove_cedar_record_identifier(cedar_record___id, template_cedar_id) -> str:
return f'{cedar_record___id}/CedarMetadataRecord:{template_cedar_id}'


def _shtrove_auth_headers(osf_item):
_nonfile_item = (
osf_item.target
12 changes: 10 additions & 2 deletions api_tests/share/test_share_preprint.py
@@ -134,12 +134,20 @@ def test_call_async_update_on_500_failure(self, mock_share_responses, preprint,
with expect_preprint_ingest_request(mock_share_responses, preprint, count=5):
preprint.update_search()

def test_no_call_async_update_on_400_failure(self, mock_share_responses, preprint, auth):
@mock.patch('api.share.utils.task__update_share.delay')
def test_no_call_async_update_on_400_failure(self, share_delay, mock_share_responses, preprint, auth):
with capture_notifications():
mock_share_responses.replace(responses.POST, shtrove_ingest_url(), status=400)
preprint.set_published(True, auth=auth, save=True)
with expect_preprint_ingest_request(mock_share_responses, preprint, count=1, error_response=True):
preprint.update_search()
try:
preprint.update_search()
except Exception as err:
share_delay.assert_not_called()
assert str(err).startswith("Retry in 180s: HTTPError('400 Client Error:")
assert len(mock_share_responses.calls) == 1
else:
pytest.fail('Expected Retry(HTTPError) to be raised')

def test_delete_from_share(self, mock_share_responses):
preprint = PreprintFactory()
17 changes: 0 additions & 17 deletions osf/models/provider.py
@@ -167,23 +167,6 @@ def update_or_create_from_json(cls, provider_data, user):
related_name='required_by_providers',
)

def validate_required_metadata(self, obj):
"""
Raises ValidationError if obj does not have a CedarMetadataRecord for
this provider's required_metadata_template.
Does nothing when required_metadata_template is not set.
"""
if not self.required_metadata_template_id:
return
guid = obj.guids.first()
if guid is None or not guid.cedar_metadata_records.filter(
template_id=self.required_metadata_template_id
).exists():
raise ValidationError(
f'Submitted object must have a CEDAR metadata record for template '
f'"{self.required_metadata_template.schema_name}" to be submitted to this collection.'
)

def __repr__(self):
return ('(name={self.name!r}, default_license={self.default_license!r}, '
'allow_submissions={self.allow_submissions!r}) with id {self.id!r}').format(self=self)