def pull_through_add_content(self, content_artifact):
    """
    Dispatch a task that adds pull-through content to this repository.

    By default the content unit referenced by the passed in content_artifact is added.
    Plugins should override this method if more complex behavior is necessary, i.e. adding
    multiple associated content units in the same task.

    Args:
        content_artifact (pulpcore.app.models.ContentArtifact): the content artifact whose
            associated content should be added

    Returns:
        The dispatched task, or None if nothing was done: either the content artifact has no
        content, or a RepositoryContent row without a ``version_removed`` already links that
        content to this repository.
    """
    content_pk = content_artifact.content_id
    if not content_pk:
        # Nothing to add when the content artifact is not tied to a content unit.
        return None

    current_membership = RepositoryContent.objects.filter(
        content__pk=content_pk, repository=self, version_removed__isnull=True
    )
    if current_membership.exists():
        return None

    # Function-scope import, as in the original.
    # NOTE(review): presumably this avoids a circular import at module load -- confirm.
    from pulpcore.plugin.tasking import dispatch, add_and_remove

    return dispatch(
        add_and_remove,
        kwargs={
            "repository_pk": self.pk,
            "add_content_units": [content_pk],
            "remove_content_units": [],
        },
        exclusive_resources=[self],
        immediate=True,
    )
@middleware
async def guid(request, handler):
    """Generate a new django_guid and set it before handing the request to the next handler."""
    request_guid = generate_guid()
    set_guid(request_guid)
    response = await handler(request)
    return response
+ A dictionary of created ContentArtifact objects by relative path. """ content_artifact = remote_artifact.content_artifact remote = remote_artifact.remote artifact = Artifact(**download_result.artifact_attributes, file=download_result.path) + cas = [] with transaction.atomic(): try: with transaction.atomic(): @@ -1040,7 +1046,6 @@ def _save_artifact(self, download_result, remote_artifact, request=None): c_type = remote.get_remote_artifact_content_type(rel_path) artifacts = {rel_path: artifact} content = c_type.init_from_artifact_and_relative_path(artifact, rel_path) - cas = [] if isinstance(content, tuple): content, artifacts = content try: @@ -1070,21 +1075,21 @@ def _save_artifact(self, download_result, remote_artifact, request=None): # Now try to save RemoteArtifacts for each ContentArtifact for ca in cas: if url := remote.get_remote_artifact_url(ca.relative_path, request=request): - remote_artifact = RemoteArtifact( - remote=remote, content_artifact=ca, url=url - ) + ra = RemoteArtifact(remote=remote, content_artifact=ca, url=url) try: with transaction.atomic(): - remote_artifact.save() + ra.save() except IntegrityError: # Remote artifact must have already been saved during a parallel request log.info(f"RemoteArtifact for {url} already exists.") - else: # Normal on-demand downloading, update CA to point to new saved Artifact content_artifact.artifact = artifact content_artifact.save() - return artifact + ret = {content_artifact.relative_path: content_artifact} + if cas: + ret.update({ca.relative_path: ca for ca in cas}) + return ret async def _serve_content_artifact(self, content_artifact, headers, request): """ @@ -1165,7 +1170,9 @@ def _build_url(**kwargs): else: raise NotImplementedError() - async def _stream_remote_artifact(self, request, response, remote_artifact, save_artifact=True): + async def _stream_remote_artifact( + self, request, response, remote_artifact, save_artifact=True, repository=None + ): """ Stream and save a RemoteArtifact. 
def test_save_artifact(c1, ra1, download_result_mock):
    """Saving a download result returns ContentArtifacts keyed by relative path."""
    saved = Handler()._save_artifact(download_result_mock, ra1)
    refreshed_content = Content.objects.get(pk=c1.pk)

    assert saved is not None
    rel_path = ra1.content_artifact.relative_path
    assert rel_path in saved
    # The artifact attached to the returned ContentArtifact must be the content's artifact.
    saved_artifact = saved[rel_path].artifact
    assert refreshed_content._artifacts.get().pk == saved_artifact.pk
async def create_distribution(remote, repository=None):
    """Create a Distribution whose name and base_path are the same random UUID string."""
    unique = str(uuid.uuid4())
    fields = {
        "name": unique,
        "base_path": unique,
        "remote": remote,
        "repository": repository,
    }
    return await Distribution.objects.acreate(**fields)
@pytest.mark.asyncio
@pytest.mark.django_db
async def test_pull_through_repository_add(request123, monkeypatch):
    """Check pull_through_add_content is only invoked when PULL_THROUGH_SUPPORTED is True."""
    stream_mock = AsyncMock()
    handler = Handler()
    handler._stream_content_artifact = stream_mock

    content = await create_content()
    content_artifact = await create_content_artifact(content)
    remote = await create_remote()
    await create_remote_artifact(remote, content_artifact)
    repo = await create_repository()
    monkeypatch.setattr(Remote, "get_remote_artifact_content_type", Mock(return_value=Content))
    add_mock = Mock()
    monkeypatch.setattr(Repository, "pull_through_add_content", add_mock)
    distro = await create_distribution(remote, repository=repo)

    async def request_and_check_streamed():
        # One pull-through request; the existing ContentArtifact must be streamed exactly once.
        await handler._match_and_stream(f"{distro.base_path}/c123", request123)
        stream_mock.assert_called_once()
        assert content_artifact in stream_mock.call_args[0]

    try:
        # With Repository.PULL_THROUGH_SUPPORTED=False the method isn't called
        await request_and_check_streamed()
        add_mock.assert_not_called()

        # With PULL_THROUGH_SUPPORTED=True the method is called with the ContentArtifact
        monkeypatch.setattr(Repository, "PULL_THROUGH_SUPPORTED", True)
        stream_mock.reset_mock()
        await request_and_check_streamed()
        add_mock.assert_called_once()
        assert content_artifact in add_mock.call_args[0]
    finally:
        # Same cleanup order as the original test.
        for created in (content, repo, remote, distro):
            await created.adelete()