Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 232 additions & 29 deletions pulp_rpm/app/tasks/synchronizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
import re
import shutil
import tempfile
import uuid

Expand Down Expand Up @@ -91,6 +92,7 @@
is_previous_version,
get_sha256,
urlpath_sanitize,
format_nevra,
)
from pulp_rpm.app.rpm_version import RpmVersion

Expand Down Expand Up @@ -565,7 +567,9 @@ def is_subrepo(directory):
namespace=directory,
)

dv = RpmDeclarativeVersion(first_stage=stage, repository=repo, mirror=mirror)
dv = RpmDeclarativeVersion(
first_stage=stage, repository=repo, mirror=mirror, sync_policy=sync_policy
)
repo_version = dv.create() or repo.latest_version()

repo_config["sync_details"]["most_recent_version"] = repo_version.number
Expand Down Expand Up @@ -625,9 +629,37 @@ def __init__(self, *args, **kwargs):

Adding it here, because we call RpmDeclarativeVersion multiple times in sync.
"""
self.sync_policy = kwargs.pop("sync_policy", None)
kwargs["acs"] = True
super().__init__(*args, **kwargs)

def should_sign(self):
"""
Check if the repository should do signing during sync.

Returns:
bool: True, if the repostiory should sign; False, otherwise.

"""
# Get the signing service
signing_service = self.repository.package_signing_service
fingerprint = self.repository.package_signing_fingerprint

# Check if this is a re-sign sync policy
should_sign = (
self.sync_policy in (SYNC_POLICIES.MIRROR_CONTENT_ONLY, SYNC_POLICIES.ADDITIVE)
and signing_service
and fingerprint
)

if should_sign:
log.info(
f"Re-signing RPM packages in {self.repository.name} with signing service "
f"{signing_service.name} with fingerprint {fingerprint}"
)

return should_sign

def pipeline_stages(self, new_version):
"""
Build a list of stages feeding into the ContentUnitAssociation stage.
Expand All @@ -648,9 +680,18 @@ def pipeline_stages(self, new_version):
]
if self.acs:
pipeline.append(ACSArtifactHandler())
pipeline.append(ArtifactDownloader())

if self.should_sign():
pipeline.append(
RpmArtifactSigningStage(
self.repository.package_signing_service,
self.repository.package_signing_fingerprint,
)
)

pipeline.extend(
[
ArtifactDownloader(),
ArtifactSaver(),
QueryExistingContents(),
RpmContentSaver(),
Expand Down Expand Up @@ -1211,6 +1252,49 @@ async def parse_packages(self, primary_xml, filelists_xml, other_xml, modulemd_l
# we might want to pick the latest based on the build time.
latest_build_time_by_nevra = {}

# Cache packages already in the repository to later check for duplicates
# This needs to be @sync_to_async because self.repository.latest_version() is synchronous
@sync_to_async
def get_existing_package_nevras():
contents = self.repository.latest_version().content.filter(
pulp_type=Package.get_pulp_type()
)
nevra_to_declarative = {}

for content in contents:
# Get package object from content
pkg = content.rpm_package
nevra = format_nevra(pkg.name, pkg.epoch, pkg.version, pkg.release, pkg.arch)

# Get the associated ContentArtifact for this package
content_artifacts = ContentArtifact.objects.filter(content=content)
d_artifacts = []

base_url = pkg.location_base
url = urlpath_sanitize(base_url, pkg.location_href)

for ca in content_artifacts:
if ca.artifact:
# Create DeclarativeArtifact with the existing artifact
da = DeclarativeArtifact(
artifact=ca.artifact,
url=url, # No URL needed for existing artifacts
relative_path=ca.relative_path,
remote=None, # No remote needed for existing artifacts
extra_data={"already_in_repo": True},
deferred_download=False,
)
d_artifacts.append(da)

if len(d_artifacts) > 0:
# Create a DeclarativeContent object for the existing package with artifacts
dc = DeclarativeContent(content=pkg, d_artifacts=d_artifacts)
nevra_to_declarative[nevra] = dc

return nevra_to_declarative

existing_package_nevras = await get_existing_package_nevras()

# Perform various checks and potentially filter out unwanted packages
# We parse all of primary.xml first and fail fast if something is wrong.
# Collect a list of any package nevras() we don't want to include.
Expand Down Expand Up @@ -1329,33 +1413,48 @@ def verification_and_skip_callback(pkg):
# entries with the same NEVRA, pick the one with the larger build time
elif pkg.time_build != latest_build_time_by_nevra[pkg_nevra]:
continue
# Implicit: There can be multiple package entries that are completely identical
# (same NEVRA, same build time, same checksum / pkgid) and the same or different
# location_href. We're not explicitly handling this, the pipeline will deduplicate.
package = Package(**Package.createrepo_to_dict(pkg))
base_url = pkg.location_base or self.remote_url
url = urlpath_sanitize(base_url, package.location_href)
del pkg # delete it as soon as we're done with it

# Location_href is not a property of the Package in isolation [0], and Pulp has
# a well defined way of generating the layout/locations on publication time.
# We only need to use the original location_href for metadata mirroring
# [0] https://github.com/pulp/pulp_rpm/issues/2580
original_location_href = package.location_href
package.location_href = package.filename
store_package_for_mirroring(self.repository, package.pkgId, original_location_href)

artifact = Artifact(size=package.size_package)
checksum_type = getattr(CHECKSUM_TYPES, package.checksum_type.upper())
setattr(artifact, checksum_type, package.pkgId)
da = DeclarativeArtifact(
artifact=artifact,
url=url,
relative_path=package.location_href,
remote=self.remote,
deferred_download=self.deferred_download,
)
dc = DeclarativeContent(content=package, d_artifacts=[da])

# Check for identical NEVRA to what's in the repo right now. For these, we don't
# want to "skip" them, but we also don't want to re-download them, so let's just
# make the declarative content point to the existing package with no artifact.
if pkg_nevra in existing_package_nevras:
log.debug(
f"Substituting already existing package {pkg_nevra} into repo to avoid "
f"re-download"
)
dc = existing_package_nevras[pkg_nevra]
else:
# Implicit: There can be multiple package entries that are completely identical
# (same NEVRA, same build time, same checksum / pkgid) and the same or different
# location_href. We're not explicitly handling this, the pipeline will
# deduplicate.
package = Package(**Package.createrepo_to_dict(pkg))
base_url = pkg.location_base or self.remote_url
url = urlpath_sanitize(base_url, package.location_href)
del pkg # delete it as soon as we're done with it

# Location_href is not a property of the Package in isolation [0], and Pulp has
# a well defined way of generating the layout/locations on publication time.
# We only need to use the original location_href for metadata mirroring
# [0] https://github.com/pulp/pulp_rpm/issues/2580
original_location_href = package.location_href
package.location_href = package.filename
store_package_for_mirroring(
self.repository, package.pkgId, original_location_href
)

artifact = Artifact(size=package.size_package)
checksum_type = getattr(CHECKSUM_TYPES, package.checksum_type.upper())
setattr(artifact, checksum_type, package.pkgId)
da = DeclarativeArtifact(
artifact=artifact,
url=url,
relative_path=package.location_href,
remote=self.remote,
deferred_download=self.deferred_download,
)
dc = DeclarativeContent(content=package, d_artifacts=[da])

dc.extra_data = defaultdict(list)

# find if a package relates to a modulemd
Expand Down Expand Up @@ -1588,3 +1687,107 @@ def _handle_distribution_tree(declarative_content):

if update_references_to_save:
UpdateReference.objects.bulk_create(update_references_to_save, ignore_conflicts=True)


class RpmArtifactSigningStage(Stage):
"""
Stage for signing RPM artifacts during sync when repository has signing service configured.

This stage runs after ArtifactDownloader, allowing us to sign the synchronized rpms
"""

def __init__(self, signing_service, fingerprint):
"""
Initialize the signing stage.

Args:
signing_service: RpmPackageSigningService instance
fingerprint: GPG fingerprint to use for signing
"""
super().__init__()
self.signing_service = signing_service
self.fingerprint = fingerprint

async def run(self):
"""
Process DeclarativeContent items and sign RPM packages.
"""

async for batch in self.batches():
for d_content in batch:
if isinstance(d_content.content, Package) and len(d_content.d_artifacts) > 0:
await self._sign_rpm_content(d_content)

await self.put(d_content)
Comment thread
jdieter marked this conversation as resolved.

async def _sign_rpm_content(self, d_content):
"""
Sign an RPM content and create a new content with updated metadata.

Args:
d_content: DeclarativeContent containing an RPM package
signing_service: RpmPackageSigningService instance
fingerprint: GPG fingerprint to use for signing
"""

if len(d_content.d_artifacts) != 1:
raise ValueError("Expected exactly one artifact for signing")

d_artifact = d_content.d_artifacts[0]

# If the artifact is already in the repo, it should already be signed, so let's not sign it
# again.
if "already_in_repo" in d_artifact.extra_data and d_artifact.extra_data["already_in_repo"]:
return

if (
not hasattr(d_artifact, "artifact")
or not d_artifact.artifact
or not hasattr(d_artifact.artifact, "file")
or not d_artifact.artifact.file
):
raise ValueError("No file found in d_artifact.artifact")

# Extract the full path for the artifact file
temp_file_path = str(d_artifact.artifact.file)

# There are situations where deduplication before now has caused the artifact to point to
# a file that's already in the database, but still needs to be signed. In this case we
# need to copy the file to a temporary location for signing.
if not os.path.exists(temp_file_path):

@sync_to_async
def copy_artifact_to_temp_file(artifact):
"""Copy artifact from storage to a temporary file for signing."""
filename = f"{artifact.pulp_id}.rpm"
artifact_file = artifact.pulp_domain.get_storage().open(artifact.file.name)
temp_file = tempfile.NamedTemporaryFile(
"wb", dir=".", suffix=filename, delete=False
)
try:
shutil.copyfileobj(artifact_file, temp_file)
temp_file.flush()
return temp_file.name
finally:
artifact_file.close()
temp_file.close()

temp_file_path = await copy_artifact_to_temp_file(d_artifact.artifact)

# Verify file exists and is accessible
if not os.path.exists(temp_file_path):
raise FileNotFoundError(f"Downloaded file does not exist: {temp_file_path}")

# Sign the temporary file in-place
log.info(
f"Calling signing service with file {temp_file_path} and fingerprint {self.fingerprint}"
)
self.signing_service.sign(temp_file_path, pubkey_fingerprint=self.fingerprint)

# Create a new artifact from the signed file (this handles all checksums automatically)
new_artifact = Artifact.init_and_validate(temp_file_path)

# Update the DeclarativeArtifact to use the new artifact
d_artifact.artifact = new_artifact
d_content.content.size_package = new_artifact.size
d_content.content.pkgId = getattr(new_artifact, d_content.content.checksum_type)
13 changes: 13 additions & 0 deletions pulp_rpm/app/viewsets/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,19 @@ def sync(self, request, pk):
if not sync_policy:
sync_policy = SYNC_POLICIES.ADDITIVE if not mirror else SYNC_POLICIES.MIRROR_COMPLETE

# Check for incompatible combinations of signing service and remote or sync policies
if repository.package_signing_service and repository.package_signing_fingerprint:
if remote.policy == remote.ON_DEMAND:
raise DRFValidationError(
"Cannot use 'on_demand' remote policy when repository has a package signing "
"service."
)
if sync_policy == SYNC_POLICIES.MIRROR_COMPLETE:
raise DRFValidationError(
"Cannot use 'mirror_complete' sync policy when repository has a package "
"signing service."
)

# validate some invariants that involve repository-wide settings.
if sync_policy in (SYNC_POLICIES.MIRROR_COMPLETE, SYNC_POLICIES.MIRROR_CONTENT_ONLY):
err_msg = (
Expand Down
Loading