-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
fix(machinery): limit allowed URLs #18684
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,8 +15,9 @@ | |
| from html import escape, unescape | ||
| from itertools import chain | ||
| from typing import TYPE_CHECKING, ClassVar | ||
| from urllib.parse import quote | ||
| from urllib.parse import quote, urlparse | ||
|
|
||
| from django.conf import settings | ||
| from django.core.cache import cache | ||
| from django.core.exceptions import ValidationError | ||
| from django.utils.functional import cached_property | ||
|
|
@@ -28,8 +29,10 @@ | |
| from weblate.machinery.forms import BaseMachineryForm | ||
| from weblate.utils.docs import DocVersionsMixin | ||
| from weblate.utils.errors import report_error | ||
| from weblate.utils.forms import WeblateServiceURLField | ||
| from weblate.utils.hash import calculate_dict_hash, calculate_hash, hash_to_checksum | ||
| from weblate.utils.requests import http_request | ||
| from weblate.utils.outbound import is_allowlisted_hostname | ||
| from weblate.utils.requests import http_request, validate_request_url | ||
| from weblate.utils.similarity import Comparer | ||
| from weblate.utils.site import get_site_url | ||
|
|
||
|
|
@@ -108,20 +111,21 @@ | |
|
|
||
| validate_source_language = "en" | ||
| validate_target_language = "de" | ||
| trusted_error_hosts: ClassVar[set[str]] = set() | ||
|
|
||
| @classmethod | ||
| def get_rank(cls): | ||
| return cls.max_score + cls.rank_boost | ||
|
|
||
| def __init__(self, settings: SettingsDict) -> None: | ||
| def __init__(self, configuration: SettingsDict) -> None: | ||
| """Create new machine translation object.""" | ||
| self.mtid = self.get_identifier() | ||
| self.rate_limit_cache = f"{self.mtid}-rate-limit" | ||
| self.languages_cache = f"{self.mtid}-languages" | ||
| self.comparer = Comparer() | ||
| self.supported_languages_error: Exception | None = None | ||
| self.supported_languages_error_age: float = 0 | ||
| self.settings = settings | ||
| self.settings = configuration | ||
|
|
||
| def delete_cache(self) -> None: | ||
| cache.delete_many([self.rate_limit_cache, self.languages_cache]) | ||
|
|
@@ -187,29 +191,117 @@ | |
| try: | ||
| response.raise_for_status() | ||
| except HTTPError as error: | ||
| detail = response.text | ||
| try: | ||
| payload = response.json() | ||
| except JSONDecodeError: | ||
| pass | ||
| else: | ||
| if isinstance(payload, dict) and payload: | ||
| if detail_error := payload.get("error"): | ||
| if isinstance(detail_error, str): | ||
| detail = detail_error | ||
| elif isinstance(detail_error, dict): | ||
| if "message" in detail_error: | ||
| detail = detail_error["message"] | ||
| else: | ||
| detail = str(detail_error) | ||
| else: | ||
| detail = str(payload) | ||
|
|
||
| if detail: | ||
| if detail := self.get_error_detail(response): | ||
| message = f"{error.args[0]}: {detail[:200]}" | ||
| raise HTTPError(message, response=response) from error | ||
| raise | ||
|
|
||
| @property | ||
| def allow_private_targets(self) -> bool: | ||
| return "_project" not in self.settings | ||
|
|
||
| def validate_runtime_url(self, url: str) -> None: | ||
| validate_request_url( | ||
| url, | ||
| allow_private_targets=self.allow_private_targets, | ||
| allowed_domains=settings.ALLOWED_MACHINERY_DOMAINS, | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def get_host_from_setting(value: object) -> str | None: | ||
| if not isinstance(value, str): | ||
| return None | ||
| if "://" in value: | ||
| return urlparse(value).hostname | ||
| return value or None | ||
|
|
||
| def get_trusted_error_hosts(self) -> set[str]: | ||
| hosts = set(settings.ALLOWED_MACHINERY_DOMAINS) | ||
| hosts.update(self.trusted_error_hosts) | ||
| if self.allow_private_targets or self.settings_form is None: | ||
| return hosts | ||
|
|
||
| form = self.settings_form(self.__class__) | ||
| for field_name, field in form.fields.items(): | ||
| is_endpoint_field = isinstance(field, WeblateServiceURLField) or ( | ||
| field_name in form.network_host_fields | ||
| ) | ||
| if not is_endpoint_field: | ||
| continue | ||
|
|
||
| values: set[str] = set() | ||
| if initial := getattr(field, "initial", None): | ||
| values.add(initial) | ||
| values.update(value for value, _label in getattr(field, "choices", ())) | ||
|
|
||
| current_value = self.settings.get(field_name) | ||
| if current_value in values and ( | ||
| host := self.get_host_from_setting(current_value) | ||
| ): | ||
| hosts.add(host) | ||
|
|
||
|
nijel marked this conversation as resolved.
|
||
| for value in values: | ||
| if host := self.get_host_from_setting(value): | ||
| hosts.add(host) | ||
| return hosts | ||
|
|
||
| @classmethod | ||
| def has_configurable_outbound_target(cls) -> bool: | ||
| if cls.settings_form is None: | ||
| return False | ||
|
|
||
| form = cls.settings_form(cls) | ||
| for field_name, field in form.fields.items(): | ||
| if isinstance(field, WeblateServiceURLField): | ||
| return True | ||
| if field_name in form.network_host_fields: | ||
| return True | ||
| return False | ||
|
|
||
| def can_display_error_detail(self, response: Response) -> bool: | ||
| if self.allow_private_targets: | ||
| return True | ||
| return self.is_trusted_error_host(response) | ||
|
|
||
| def is_trusted_error_host(self, response: Response) -> bool: | ||
| if ( | ||
| self.settings_form is not None | ||
| and not self.has_configurable_outbound_target() | ||
| ): | ||
| return True | ||
| hostname = urlparse(response.url).hostname or "" | ||
| return is_allowlisted_hostname(hostname, list(self.get_trusted_error_hosts())) | ||
|
|
||
| def get_error_detail(self, response: Response) -> str | None: | ||
| if not self.can_display_error_detail(response): | ||
| return None | ||
| trusted_host = self.is_trusted_error_host(response) | ||
|
|
||
| try: | ||
| payload = response.json() | ||
| except JSONDecodeError: | ||
| if trusted_host: | ||
| return response.text or None | ||
| return None | ||
|
|
||
|
nijel marked this conversation as resolved.
|
||
| if isinstance(payload, dict): | ||
| if (message := payload.get("message")) and isinstance(message, str): | ||
| return message | ||
| if (detail := payload.get("detail")) and isinstance(detail, str): | ||
| return detail | ||
| if error := payload.get("error"): | ||
| if isinstance(error, str): | ||
| return error | ||
| if isinstance(error, dict): | ||
| detail_message = error.get("message") | ||
| if isinstance(detail_message, str): | ||
| return detail_message | ||
| elif isinstance(payload, str) and trusted_host: | ||
| return payload | ||
| if trusted_host: | ||
| return response.text or None | ||
| return None | ||
|
|
||
|
nijel marked this conversation as resolved.
|
||
| def request(self, method, url, skip_auth=False, **kwargs): | ||
| """Perform JSON request.""" | ||
| # Create custom headers | ||
|
|
@@ -231,6 +323,9 @@ | |
| timeout=self.request_timeout, | ||
| auth=self.get_auth(), | ||
| raise_for_status=False, | ||
| validate_url=not self.allow_private_targets, | ||
| allow_private_targets=self.allow_private_targets, | ||
| allowed_domains=settings.ALLOWED_MACHINERY_DOMAINS, | ||
| **kwargs, | ||
| ) | ||
|
|
||
|
|
@@ -638,9 +733,12 @@ | |
| return output | ||
|
|
||
| def get_error_message(self, exc: Exception) -> str: | ||
| if isinstance(exc, RequestException) and exc.response and exc.response.text: | ||
| return f"{exc.__class__.__name__}: {exc}: {exc.response.text}" | ||
| return f"{exc.__class__.__name__}: {exc}" | ||
| message = f"{exc.__class__.__name__}: {exc}" | ||
| if isinstance(exc, RequestException) and exc.response: | ||
| detail = self.get_error_detail(exc.response) | ||
| if detail and detail not in str(exc): | ||
| return f"{message}: {detail}" | ||
|
Comment on lines
735
to
+740
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The new sanitization here only helps machinery classes that use the base implementation, but project-scoped backends can bypass it by overriding Useful? React with 👍 / 👎. |
||
| return message | ||
|
|
||
| def signed_salt(self, appid, secret, text): | ||
| """Generate salt and sign as used by Chinese services.""" | ||
|
|
@@ -697,7 +795,7 @@ | |
| continue | ||
| translation_lists = [translations[text] for text in unit.plural_map] | ||
| plural_count = len(translation_lists) | ||
| translation = result.setdefault("translation", [""] * plural_count) | ||
| quality = result.setdefault("quality", [0] * plural_count) | ||
| origin = result.setdefault("origin", [None] * plural_count) | ||
| for plural, possible_translations in enumerate(translation_lists): | ||
|
|
@@ -705,7 +803,7 @@ | |
| if quality[plural] > item["quality"]: | ||
| continue | ||
| quality[plural] = item["quality"] | ||
| translation[plural] = item["text"] | ||
| origin[plural] = self | ||
|
|
||
| @cached_property | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.