From d22fa5833cbd6789988233827ed804314b8d51a7 Mon Sep 17 00:00:00 2001 From: Xpirix Date: Thu, 12 Mar 2026 15:55:15 +0300 Subject: [PATCH 1/7] Make the Security Scanning as a blocking validator for critical bandit and secrects issues --- qgis-app/plugins/api.py | 14 +- .../0020_pluginversion_validation_status.py | 18 + ...er_pluginversionsecurityscan_scanned_on.py | 20 + qgis-app/plugins/models.py | 42 +- qgis-app/plugins/security_utils.py | 3 + qgis-app/plugins/tasks/__init__.py | 1 + qgis-app/plugins/tasks/run_security_scan.py | 328 +++++++++ .../templates/plugins/plugin_detail.html | 19 +- .../templates/plugins/version_detail.html | 101 ++- .../plugins/tests/test_plugin_create_empty.py | 2 +- qgis-app/plugins/tests/test_plugin_upload.py | 1 + .../plugins/tests/test_security_scanner.py | 2 +- .../plugins/tests/test_security_validator.py | 649 ++++++++++++++++++ qgis-app/plugins/urls.py | 6 + qgis-app/plugins/views.py | 282 +++++--- .../flatpages/docs_security_scanning.html | 255 ++++--- 16 files changed, 1526 insertions(+), 217 deletions(-) create mode 100644 qgis-app/plugins/migrations/0020_pluginversion_validation_status.py create mode 100644 qgis-app/plugins/migrations/0021_alter_pluginversionsecurityscan_scanned_on.py create mode 100644 qgis-app/plugins/tasks/run_security_scan.py create mode 100644 qgis-app/plugins/tests/test_security_validator.py diff --git a/qgis-app/plugins/api.py b/qgis-app/plugins/api.py index 65e8e63e..75034bb5 100644 --- a/qgis-app/plugins/api.py +++ b/qgis-app/plugins/api.py @@ -15,7 +15,7 @@ from django.utils.translation import gettext_lazy as _ from plugins.models import * from plugins.validator import validator -from plugins.views import plugin_notify +from plugins.views import plugin_notify, send_upload_confirmation_email from rpc4django import rpcmethod from taggit.models import Tag @@ -120,7 +120,10 @@ def plugin_upload(package, **kwargs): package.len, "UTF-8", ), - "approved": request.user.has_perm("plugins.can_approve") or plugin.approved, + # Always start unapproved; async security checks will auto-approve + # trusted users after validation completes. + "approved": False, + "validation_status": VALIDATION_STATUS_VALIDATING, } # Optional version metadata @@ -133,6 +136,13 @@ def plugin_upload(package, **kwargs): new_version = PluginVersion(**version_data) new_version.clean() new_version.save() + + # Send Stage 1 upload confirmation email + send_upload_confirmation_email(new_version) + + # Queue async security scan task + from plugins.tasks.run_security_scan import run_security_scan_task + run_security_scan_task.delay(new_version.pk) except IntegrityError as e: # Avoids error: current transaction is aborted, commands ignored until # end of transaction block diff --git a/qgis-app/plugins/migrations/0020_pluginversion_validation_status.py b/qgis-app/plugins/migrations/0020_pluginversion_validation_status.py new file mode 100644 index 00000000..86192b2f --- /dev/null +++ b/qgis-app/plugins/migrations/0020_pluginversion_validation_status.py @@ -0,0 +1,18 @@ +# Generated by Django 4.2.28 on 2026-03-12 02:16 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('plugins', '0019_remove_pluginversion_supports_qt6'), + ] + + operations = [ + migrations.AddField( + model_name='pluginversion', + name='validation_status', + field=models.CharField(choices=[('pending', 'Pending'), ('validating', 'Validating'), ('validated', 'Validated'), ('blocked', 'Blocked')], db_index=True, default='pending', help_text="Validation status based on security and quality checks. New uploads start as 'validating' until checks complete. 'blocked' means critical issues were found and the version is not available for approval or download.", max_length=20, verbose_name='Validation status'), + ), + ] diff --git a/qgis-app/plugins/migrations/0021_alter_pluginversionsecurityscan_scanned_on.py b/qgis-app/plugins/migrations/0021_alter_pluginversionsecurityscan_scanned_on.py new file mode 100644 index 00000000..a5fe0e75 --- /dev/null +++ b/qgis-app/plugins/migrations/0021_alter_pluginversionsecurityscan_scanned_on.py @@ -0,0 +1,20 @@ +import django.utils.timezone +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("plugins", "0020_pluginversion_validation_status"), + ] + + operations = [ + migrations.AlterField( + model_name="pluginversionsecurityscan", + name="scanned_on", + field=models.DateTimeField( + default=django.utils.timezone.now, + verbose_name="Scanned on", + ), + ), + ] diff --git a/qgis-app/plugins/models.py b/qgis-app/plugins/models.py index 08c0fe5d..e323280f 100644 --- a/qgis-app/plugins/models.py +++ b/qgis-app/plugins/models.py @@ -786,6 +786,19 @@ def get_queryset(self): ) +VALIDATION_STATUS_PENDING = "pending" +VALIDATION_STATUS_VALIDATING = "validating" +VALIDATION_STATUS_VALIDATED = "validated" +VALIDATION_STATUS_BLOCKED = "blocked" + +VALIDATION_STATUS_CHOICES = [ + (VALIDATION_STATUS_PENDING, _("Pending")), + (VALIDATION_STATUS_VALIDATING, _("Validating")), + (VALIDATION_STATUS_VALIDATED, _("Validated")), + (VALIDATION_STATUS_BLOCKED, _("Blocked")), +] + + def vjust(str, level=3, delim=".", bitsize=3, fillchar=" ", force_zero=False): """ Normalize a dotted version string. @@ -934,6 +947,22 @@ class PluginVersion(models.Model): null=True, ) is_from_token = models.BooleanField(_("Is uploaded using token"), default=False) + + # Validation status for security and QA checks + validation_status = models.CharField( + _("Validation status"), + max_length=20, + choices=VALIDATION_STATUS_CHOICES, + default=VALIDATION_STATUS_PENDING, + db_index=True, + help_text=_( + "Validation status based on security and quality checks. " + "New uploads start as 'validating' until checks complete. " + "'blocked' means critical issues were found and the version " + "is not available for approval or download." + ), + ) + # Link to the token if upload is using token token = models.ForeignKey( PluginOutstandingToken, @@ -949,6 +978,17 @@ class PluginVersion(models.Model): stable_objects = StablePluginVersions() experimental_objects = ExperimentalPluginVersions() + @property + def is_available(self): + """ + Returns True if the version is available for download/approval + (not blocked by security checks). + """ + return self.validation_status not in [ + VALIDATION_STATUS_VALIDATING, + VALIDATION_STATUS_BLOCKED, + ] + @property def file_name(self): return os.path.basename(self.package.file.name) @@ -1185,7 +1225,7 @@ class PluginVersionSecurityScan(models.Model): PluginVersion, on_delete=models.CASCADE, related_name="security_scan" ) scanned_on = models.DateTimeField( - _("Scanned on"), auto_now_add=True, editable=False + _("Scanned on"), default=timezone.now, editable=False ) # Summary statistics diff --git a/qgis-app/plugins/security_utils.py b/qgis-app/plugins/security_utils.py index 66502ac3..a52fa9c2 100644 --- a/qgis-app/plugins/security_utils.py +++ b/qgis-app/plugins/security_utils.py @@ -21,6 +21,8 @@ def run_security_scan(plugin_version): PluginVersionSecurityScan instance or None if scan fails """ try: + from django.utils import timezone + # Get the package file path package_path = plugin_version.package.path @@ -32,6 +34,7 @@ def run_security_scan(plugin_version): security_scan, created = PluginVersionSecurityScan.objects.update_or_create( plugin_version=plugin_version, defaults={ + "scanned_on": timezone.now(), "total_checks": report["summary"]["total_checks"], "passed_checks": report["summary"]["passed"], "warning_count": report["summary"]["warnings"], diff --git a/qgis-app/plugins/tasks/__init__.py b/qgis-app/plugins/tasks/__init__.py index a3a8ec25..1c950337 100644 --- a/qgis-app/plugins/tasks/__init__.py +++ b/qgis-app/plugins/tasks/__init__.py @@ -2,3 +2,4 @@ from plugins.tasks.update_qgis_versions import * # noqa from plugins.tasks.rebuild_search_index import * # noqa from plugins.tasks.get_sustaining_members import * # noqa +from plugins.tasks.run_security_scan import * # noqa diff --git a/qgis-app/plugins/tasks/run_security_scan.py b/qgis-app/plugins/tasks/run_security_scan.py new file mode 100644 index 00000000..446b6661 --- /dev/null +++ b/qgis-app/plugins/tasks/run_security_scan.py @@ -0,0 +1,328 @@ +""" +Asynchronous Celery task for running security and QA checks on plugin versions. + +This task is queued immediately after a plugin is uploaded and processes all +security checks (Bandit, Secrets Detection, Code Quality, File Permissions, +Suspicious Files) in the background. + +Upload flow: +1. Plugin uploaded → validation_status='validating', approved=False +2. This task is queued +3. All checks run asynchronously +4. Results stored, validation_status updated: + - 'validated': no critical issues → available for approval/auto-approval + - 'blocked': critical issues found → unavailable until re-upload +5. Email sent to maintainer(s) with results +""" + +import logging + +from celery import shared_task +from celery.utils.log import get_task_logger +from django.conf import settings +from django.contrib.sites.models import Site +from django.utils.translation import gettext_lazy as _ + +logger = get_task_logger(__name__) + + +@shared_task(bind=True, max_retries=3, default_retry_delay=60) +def run_security_scan_task(self, plugin_version_pk, is_manual=False): + """ + Run security scan on a plugin version asynchronously. + + Args: + plugin_version_pk: Primary key of the PluginVersion to scan + is_manual: If True, this is a manual re-scan on an existing version. + Manual scans are informational only and do not change + the plugin's approval or validation status. + """ + from plugins.models import ( + VALIDATION_STATUS_BLOCKED, + VALIDATION_STATUS_VALIDATED, + PluginVersion, + ) + from plugins.security_utils import run_security_scan + + try: + plugin_version = PluginVersion.objects.select_related( + "plugin", "created_by" + ).get(pk=plugin_version_pk) + except PluginVersion.DoesNotExist: + logger.error(f"PluginVersion {plugin_version_pk} not found, aborting scan") + return + + plugin = plugin_version.plugin + logger.info( + f"Starting security scan for {plugin.package_name} v{plugin_version.version} " + f"(manual={is_manual})" + ) + + # Run the security scan (synchronous scan running in the worker) + security_scan = run_security_scan(plugin_version) + + if security_scan is None: + # Scan failed to run — treat as validated to avoid blocking on tool errors + logger.warning( + f"Security scan failed for {plugin.package_name} v{plugin_version.version}, " + "treating as validated" + ) + if not is_manual: + plugin_version.validation_status = VALIDATION_STATUS_VALIDATED + _maybe_auto_approve(plugin_version) + plugin_version.save() + _send_validation_results_email(plugin_version, security_scan=None) + if not plugin_version.approved: + _notify_staff_for_review(plugin_version) + return + + # Determine blocking status: Bandit and Secrets Detection are blocking + has_critical_issues = security_scan.critical_count > 0 + + if is_manual: + # Manual scans are informational only — never change status or approval + logger.info( + f"Manual scan complete for {plugin.package_name} v{plugin_version.version}: " + f"critical={security_scan.critical_count}" + ) + return + + if has_critical_issues: + plugin_version.validation_status = VALIDATION_STATUS_BLOCKED + logger.warning( + f"Plugin {plugin.package_name} v{plugin_version.version} BLOCKED: " + f"{security_scan.critical_count} critical issue(s) found" + ) + else: + plugin_version.validation_status = VALIDATION_STATUS_VALIDATED + _maybe_auto_approve(plugin_version) + logger.info( + f"Plugin {plugin.package_name} v{plugin_version.version} validated successfully" + ) + + plugin_version.save() + + # Send validation results email to maintainer(s) + _send_validation_results_email(plugin_version, security_scan) + + # Notify staff approvers when a non-trusted plugin is ready for review + if ( + plugin_version.validation_status == VALIDATION_STATUS_VALIDATED + and not plugin_version.approved + ): + _notify_staff_for_review(plugin_version) + + +def _maybe_auto_approve(plugin_version): + """ + Auto-approve the version if the uploader is trusted or the plugin + already has at least one approved version. + """ + created_by = plugin_version.created_by + plugin = plugin_version.plugin + + if created_by and ( + created_by.has_perm("plugins.can_approve") or plugin.approved + ): + plugin_version.approved = True + logger.info( + f"Auto-approving {plugin.package_name} v{plugin_version.version} " + f"(trusted={created_by.has_perm('plugins.can_approve')}, " + f"plugin_approved={plugin.approved})" + ) + + +def _send_validation_results_email(plugin_version, security_scan): + """ + Send Stage 2 email: validation results to the plugin maintainer(s). + """ + if getattr(settings, "DEBUG", False): + logger.debug("Validation results email not sent (DEBUG=True)") + return + + from django.core.mail import send_mail + + plugin = plugin_version.plugin + recipients = [u.email for u in plugin.editors if u.email] + if not recipients: + logger.warning( + f"No recipients found for validation results email for {plugin.package_name}" + ) + return + + domain = Site.objects.get_current().domain + mail_from = settings.DEFAULT_FROM_EMAIL + version_url = f"https://{domain}{plugin_version.get_absolute_url()}" + security_url = f"{version_url}#security-tab" + docs_url = f"https://{domain}/docs/security-scanning" + + if security_scan is None: + subject = _("Plugin Validation Results: %(name)s v%(version)s") % { + "name": plugin.name, + "version": plugin_version.version, + } + message = _( + "Plugin validation for %(name)s v%(version)s completed.\n\n" + "The security scan could not be completed due to a tool error, " + "but your plugin has been made available for approval.\n\n" + "Plugin details: %(url)s\n" + ) % { + "name": plugin.name, + "version": plugin_version.version, + "url": version_url, + } + elif plugin_version.validation_status == "validated": + auto_approved = plugin_version.approved + subject = _("Plugin Validation Results: %(name)s v%(version)s - All Checks Passed") % { + "name": plugin.name, + "version": plugin_version.version, + } + if auto_approved: + status_msg = _( + "Your plugin has been automatically approved and is now available for download." + ) + else: + status_msg = _( + "Your plugin is ready for review by an approver." + ) + message = _( + "Good news! All security and quality checks passed for your plugin.\n\n" + "Plugin: %(name)s\n" + "Version: %(version)s\n" + "Status: %(status)s\n\n" + "Summary:\n" + " - Checks passed: %(passed)s / %(total)s\n" + " - Files scanned: %(files)s\n\n" + "View detailed results: %(security_url)s\n" + ) % { + "name": plugin.name, + "version": plugin_version.version, + "status": status_msg, + "passed": security_scan.passed_checks, + "total": security_scan.total_checks, + "files": security_scan.files_scanned, + "security_url": security_url, + } + else: + # blocked + subject = _("Plugin Validation Results: %(name)s v%(version)s - Critical Issues Found") % { + "name": plugin.name, + "version": plugin_version.version, + } + # Build critical issues details + critical_details = _build_critical_issues_text(security_scan) + message = _( + "Critical security issues were found in your plugin and it has been BLOCKED.\n\n" + "Plugin: %(name)s\n" + "Version: %(version)s\n" + "Status: BLOCKED - Not available for approval or download until issues are resolved\n\n" + "Critical issues found:\n" + "%(critical_details)s\n" + "To resolve this:\n" + "1. Fix the critical issues listed above\n" + "2. Upload a new version of your plugin\n" + "3. The new version will be automatically re-scanned\n\n" + "View detailed scan results: %(security_url)s\n" + "Security best practices: %(docs_url)s\n" + ) % { + "name": plugin.name, + "version": plugin_version.version, + "critical_details": critical_details, + "security_url": security_url, + "docs_url": docs_url, + } + + try: + send_mail( + str(subject), + str(message), + mail_from, + recipients, + fail_silently=True, + ) + logger.info( + f"Validation results email sent for {plugin.package_name} v{plugin_version.version} " + f"to {recipients}" + ) + except Exception as e: + logger.error(f"Failed to send validation results email: {e}") + + +def _build_critical_issues_text(security_scan): + """Build a text summary of critical issues from the scan report.""" + lines = [] + for check in security_scan.scan_report.get("checks", []): + if not check.get("passed") and check.get("severity") == "critical": + lines.append(f"\n[{check['name']}] - {check['issues_found']} issue(s)") + for detail in check.get("details", [])[:5]: + file_info = detail.get("file", "N/A") + line_info = detail.get("line", "") + msg = detail.get("message", "") + if line_info: + lines.append(f" - {file_info}:{line_info}: {msg}") + else: + lines.append(f" - {file_info}: {msg}") + return "\n".join(lines) if lines else _("No details available.") + + +def _notify_staff_for_review(plugin_version): + """ + Notify staff approvers that a plugin is ready for review + (equivalent to version_notify but triggered from the task). + """ + if getattr(settings, "DEBUG", False): + return + + from django.core.mail import send_mail + + plugin = plugin_version.plugin + domain = Site.objects.get_current().domain + mail_from = settings.DEFAULT_FROM_EMAIL + + notification_group = getattr( + settings, "NOTIFICATION_RECIPIENTS_GROUP_NAME", "Plugin Notification Recipients" + ) + from django.contrib.auth.models import User + + recipients = [ + u.email + for u in User.objects.filter( + groups__name=notification_group, + is_staff=True, + email__isnull=False, + ).exclude(email="") + ] + + if not recipients: + logger.warning( + f"No staff recipients for review notification of {plugin.package_name}" + ) + return + + try: + send_mail( + str( + _("A new plugin version is ready for review: %(plugin)s v%(version)s") + % {"plugin": plugin.name, "version": plugin_version.version} + ), + str( + _( + "Plugin %(name)s version %(version)s has passed all security checks " + "and is ready for approval.\n\n" + "Uploaded by: %(user)s\n" + "Link: http://%(domain)s%(url)s\n" + ) + % { + "name": plugin.name, + "version": plugin_version.version, + "user": plugin_version.created_by, + "domain": domain, + "url": plugin_version.get_absolute_url(), + } + ), + mail_from, + recipients, + fail_silently=True, + ) + except Exception as e: + logger.error(f"Failed to send staff review notification: {e}") diff --git a/qgis-app/plugins/templates/plugins/plugin_detail.html b/qgis-app/plugins/templates/plugins/plugin_detail.html index 3dd8feb4..ea3fcf9a 100644 --- a/qgis-app/plugins/templates/plugins/plugin_detail.html +++ b/qgis-app/plugins/templates/plugins/plugin_detail.html @@ -581,9 +581,13 @@

{% if not user.is_anonymous %} {% if version.approved %} - + + {% elif version.validation_status == 'validating' %} + + {% elif version.validation_status == 'blocked' %} + {% else %} - + {% endif %} {% endif %} @@ -609,6 +613,16 @@

{% if user.is_staff or user in version.plugin.approvers %} {% if not version.approved %} + {% if not version.is_available %} + + {% else %}
{% csrf_token %}
+ {% endif %} {% else %}
{% csrf_token %} diff --git a/qgis-app/plugins/templates/plugins/version_detail.html b/qgis-app/plugins/templates/plugins/version_detail.html index 94a5b479..8bf27003 100644 --- a/qgis-app/plugins/templates/plugins/version_detail.html +++ b/qgis-app/plugins/templates/plugins/version_detail.html @@ -52,12 +52,47 @@

{% trans "Version" %}: {{ version }}

+ {% if not version.approved and version.validation_status == 'validating' %} + + {% elif not version.approved and version.validation_status == 'blocked' %} + + {% else %} {% trans "Download" %} + {% endif %}
+ + {% if not version.approved and version.validation_status == 'validating' %} +
+

{% trans "Validation in progress" %}

+

{% trans "Security and quality checks are running asynchronously. This plugin version is not yet available for approval or download. You will receive an email with the results shortly." %}

+

{% trans "Learn more about security scanning" %}

+
+ {% elif not version.approved and version.validation_status == 'blocked' %} +
+

{% trans "Plugin Blocked" %}

+

{% trans "This plugin version has been blocked because critical security issues were found during validation. It is not available for approval or download until the issues are resolved." %}

+ {% if user.is_staff or user in version.plugin.editors %} +

{% trans "To resolve this:" %}

+
    +
  1. {% trans "Fix the critical issues listed in the Security Scan tab below" %}
  2. +
  3. {% trans "Upload a new version of your plugin" %}
  4. +
  5. {% trans "The new version will be automatically re-scanned" %}
  6. +
+

{% trans "Learn more about fixing security issues" %}

+ {% endif %} +
+ {% endif %} + {% if not version.created_by.is_active and not version.is_from_token %}
{% trans "The plugin author has been blocked." %} @@ -70,7 +105,11 @@

{% trans "Version" %}: {{ version }}

{% if user.is_staff or user in version.plugin.editors %}
  • {% trans "Security Scan" %} - {% if security_scan %} + {% if version.validation_status == 'validating' %} + {% trans "Validating" %} + {% elif version.validation_status == 'blocked' %} + {% trans "Blocked" %} + {% elif security_scan %} {% if security_scan.overall_status == 'passed' %}✓{% elif security_scan.overall_status == 'critical' %}!{% else %}ℹ{% endif %} @@ -162,7 +201,13 @@

    {% trans "Version" %}: {{ version }}

    {% if user.is_staff or user in version.plugin.editors %}
    - {% if security_scan %} + {% if version.validation_status == 'validating' %} +
    +

    {% trans "Security validation in progress" %}

    +

    {% trans "Security and quality checks are currently running. This tab will show the results once complete. Refresh the page in a few moments." %}

    +
    + + {% elif security_scan %}
    + + + {% csrf_token %} + + +

    + {% trans "Manual scans on existing versions are informational only and will not change the plugin's approval status." %} +

    + {% endif %} + + {% if security_scan and version.validation_status not in 'validating' %} + +
    +
    + {% csrf_token %} + +
    +

    + {% trans "Re-running a scan on an existing version is informational only and will not change the plugin's status." %} +

    +
    {% endif %} +
    {% endif %} @@ -384,10 +461,24 @@

    {% trans "Version management"%}

    {% trans "Version approval"%}

    {% csrf_token %} {% if not version.approved %} + {% if not version.is_available %} + +

    + {% if version.validation_status == 'blocked' %} + {% trans "Blocked: critical security issues must be resolved first" %} + {% else %} + {% trans "Validation in progress: approval not available yet" %} + {% endif %} +

    + {% else %} + {% endif %} {% else %}