diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f14cfa86..a88d99f76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Unreleased +- debug: dump auth traffic in case of a failure [#1176](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/1176) - fix: transfer chunk issues terminate with error [#1202](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/1202) ## 1.29.2 (2025-07-21) diff --git a/src/foundation/__init__.py b/src/foundation/__init__.py index 5c9f5ef0a..eeae92e0b 100644 --- a/src/foundation/__init__.py +++ b/src/foundation/__init__.py @@ -1,9 +1,13 @@ import datetime -from typing import Callable, NamedTuple, TypeVar +from functools import partial +from operator import is_, not_ +from typing import Callable, Iterable, Mapping, NamedTuple, Tuple, TypeVar import pytz from tzlocal import get_localzone +from foundation.core import compose, fst, snd + class VersionInfo(NamedTuple): version: str @@ -50,3 +54,31 @@ def _internal(input: T_in) -> T_out: raise ValueError(f"Invalid Input ({caption}): {input!r}") from err return _internal + + +# def is_none(input: T_in | None) -> bool: +# return input is None + +is_none = partial(is_, None) + + +is_not_none = compose(not_, is_none) + +empty_pairs: Callable[[Iterable[Tuple[T_in, T_out]]], Iterable[Tuple[T_in, T_out]]] = partial( + filter, compose(is_none, snd) +) +keys_from_pairs: Callable[[Iterable[Tuple[T_in, T_out]]], Iterable[T_in]] = partial(map, fst) +keys_for_empty_values: Callable[[Iterable[Tuple[T_in, T_out]]], Iterable[T_in]] = compose( + keys_from_pairs, empty_pairs +) + +non_empty_pairs: Callable[[Iterable[Tuple[T_in, T_out | None]]], Iterable[Tuple[T_in, T_out]]] = ( + partial(filter, compose(is_not_none, snd)) +) + + +def flat_dict(input: Iterable[Mapping[T_in, T_out]]) -> Mapping[T_in, T_out]: + flattened_dict: dict[T_in, T_out] = {} + for d in input: + flattened_dict.update(d) + return flattened_dict diff --git a/src/foundation/http.py b/src/foundation/http.py new file mode 100644 index 000000000..48523b1e0 --- /dev/null +++ b/src/foundation/http.py @@ -0,0 +1,88 @@ +import json +from functools import partial +from http.cookiejar import Cookie +from http.cookies import SimpleCookie +from operator import eq, not_ +from typing import Any, Callable, Iterable, Mapping, Tuple + +from requests import PreparedRequest, Response + +from foundation import flat_dict, non_empty_pairs +from foundation.core import compose, fst, snd + + +def cookie_to_pair(cookie: Cookie) -> Tuple[str, str | None]: + return ( + cookie.name, + cookie.value, + ) + + +jar_to_pairs: Callable[[Iterable[Cookie]], Iterable[Tuple[str, str]]] = compose( + non_empty_pairs, partial(map, cookie_to_pair) +) + + +def cookie_str_to_dict(cookie_header: str) -> Mapping[str, str]: + """replace cookie header with dict object""" + simple_cookie = SimpleCookie() + simple_cookie.load(cookie_header) + cookies = {k: v.value for k, v in simple_cookie.items()} + return cookies + + +def response_body(response: Response) -> Any: + try: + return response.json() + except Exception: + return response.text + + +def request_body(request: PreparedRequest) -> Any: + if request.body is not None: + try: + return json.loads(request.body) # ignore: type + except Exception: + pass + return request.body + + +def response_to_har_entry(response: Response) -> Mapping[str, Any]: + # headers + # converted_pairs = map(xxx) + + # cookies from request header + is_request_cookie = partial(eq, "Cookie") + is_not_request_cookie = compose(not_, partial(eq, "Cookie")) + cookie_headers: Callable[[Iterable[Tuple[str, str]]], Iterable[Tuple[str, str]]] = partial( + filter, compose(is_request_cookie, fst) + ) + not_request_cookie_headers: Callable[[Iterable[Tuple[str, str]]], Iterable[Tuple[str, str]]] = ( + partial(filter, compose(is_not_request_cookie, fst)) + ) + cookie_header_strings: Callable[[Iterable[Tuple[str, str]]], Iterable[str]] = compose( + partial(map, snd), cookie_headers + ) + cookie_maps = compose(partial(map, cookie_str_to_dict), cookie_header_strings) + extract_response_request_cookies = compose(flat_dict, cookie_maps) + + is_not_response_cookie = compose(not_, partial(eq, "Set-Cookie")) + not_response_cookie_headers: Callable[ + [Iterable[Tuple[str, str]]], Iterable[Tuple[str, str]] + ] = partial(filter, compose(is_not_response_cookie, fst)) + + return { + "request": { + "method": response.request.method, + "url": response.request.url, + "headers": dict(not_request_cookie_headers(response.request.headers.items())), + "cookies": extract_response_request_cookies(response.request.headers.items()), + "content": request_body(response.request), + }, + "response": { + "status_code": response.status_code, + "headers": dict(not_response_cookie_headers(response.headers.items())), + "cookies": dict(jar_to_pairs(response.cookies)), + "content": response_body(response), + }, + } diff --git a/src/foundation/json.py b/src/foundation/json.py new file mode 100644 index 000000000..5ce859873 --- /dev/null +++ b/src/foundation/json.py @@ -0,0 +1,142 @@ +import re +from functools import partial, singledispatch +from operator import is_, not_ +from typing import Any, Callable, Iterable, Mapping, Sequence, Tuple, TypeVar + +from foundation import non_empty_pairs +from foundation.core import compose, flip, fst + +T1 = TypeVar("T1") +T2 = TypeVar("T2") + +# def extract_context(context: Context[T1], pair: Tuple[T1, T2]) -> Tuple[Context[T1], T2]: +# return (context + [pair[0]], pair[1]) + +Context = str + + +def extract_context(context: Context, pair: Tuple[str, T1]) -> Tuple[Context, T1]: + new_context = context + ("." if context and not context.endswith(".") else "") + pair[0] + return (new_context, pair[1]) + + +Rule = Tuple[re.Pattern[str], Callable[[str], str | None]] + + +def first(input: Iterable[T1]) -> T1 | StopIteration: + for item in input: + return item + return StopIteration() + + +def first_or_default(input: Iterable[T1], default: T2) -> T1 | T2: + for item in input: + return item + return default + + +first_or_none: Callable[[Iterable[T1]], T1 | None] = partial(flip(first_or_default), None) + +is_none = partial(is_, None) + +is_not_none = compose(not_, is_none) + + +def first_matching_rule(context: Context, rules: Sequence[Rule]) -> Rule | None: + match_with_context = partial(flip(re.search), context) + match_on_key_of_pair = compose(match_with_context, fst) + is_matching_pair = compose(is_not_none, match_on_key_of_pair) + filter_pairs: Callable[[Iterable[Rule]], Iterable[Rule]] = partial(filter, is_matching_pair) + first_matching = compose(first_or_none, filter_pairs) + first_found = first_matching(rules) + return first_found + + +@singledispatch +def _apply_rules_internal(input: Any, context: Context, rules: Sequence[Rule]) -> Any: + # print(f"ANY {context} {input}") + return input + + +@_apply_rules_internal.register(str) +def _(input: str, context: Context, rules: Sequence[Rule]) -> str | None: + # print(f"STR {context} {input}") + first_found_rule = first_matching_rule(context, rules) + if first_found_rule is None: + # no pattern matched - return unchanged + return input + else: + return first_found_rule[1](input) + + +@_apply_rules_internal.register(tuple) +def _(input: Tuple[str, Any], context: Context, rules: Sequence[Rule]) -> Any: + # print(f"TUPLE {context} {input}") + first_found_rule = first_matching_rule(context, rules) + if first_found_rule is None: + # no pattern matched - continue recursive + new_context, new_value = extract_context(context, input) + return (input[0], _apply_rules_internal(new_value, new_context, rules)) + else: + # this is to allow overriding the whole object with string + return (input[0], first_found_rule[1]("tuple")) + + +filter_not_none: Callable[[Iterable[T1 | None]], Iterable[T1]] = partial(filter, is_not_none) + + +def apply_rules(context: Context, rules: Sequence[Rule], input: Any) -> Any: + return _apply_rules_internal(input, context, rules) + + +@_apply_rules_internal.register(list) +def _(input: Sequence[Any], context: Context, rules: Sequence[Rule]) -> Any: + # print(f"LIST {context} {input}") + first_found_rule = first_matching_rule(context, rules) + if first_found_rule is None: + # no pattern matched - continue recursive + apply_context_rules = partial(apply_rules, context + ".", rules) + apply_rule_iter = partial(map, apply_context_rules) + apply_and_filter: Callable[[Iterable[Any]], Iterable[Any]] = compose( + filter_not_none, apply_rule_iter + ) + materialized_apply_and_filter: Callable[[Iterable[Any]], Iterable[Any]] = compose( + list, apply_and_filter + ) + return materialized_apply_and_filter(input) + else: + # this is to allow overriding the whole list with string + return first_found_rule[1]("list") + + +@_apply_rules_internal.register(dict) +def _(input: Mapping[str, Any], context: Context, rules: Sequence[Rule]) -> Any: + # print(f"DICT {context} {input}") + first_found_rule = first_matching_rule(context, rules) + if first_found_rule is None: + # no pattern matched - continue recursive + apply_context_rules = partial(apply_rules, context, rules) + apply_rule_iter = partial(map, apply_context_rules) + apply_and_filter: Callable[[Iterable[Any]], Iterable[Any]] = compose( + non_empty_pairs, apply_rule_iter + ) + materialized_apply_and_filter: Callable[[Iterable[Any]], Iterable[Any]] = compose( + dict, apply_and_filter + ) + return materialized_apply_and_filter(input.items()) + else: + # this is to allow overriding the whole list with string + return first_found_rule[1]("dict") + + +def re_compile_flag(flags: re.RegexFlag, input: str) -> re.Pattern[str]: + return re.compile(input, flags) + + +re_compile_ignorecase: Callable[[str], re.Pattern[str]] = partial( + re_compile_flag, re.RegexFlag.IGNORECASE +) + +compile_patterns: Callable[[Iterable[str]], Iterable[re.Pattern[str]]] = partial( + map, re_compile_ignorecase +) diff --git a/src/foundation/string.py b/src/foundation/string.py new file mode 100644 index 000000000..1f9d3416e --- /dev/null +++ b/src/foundation/string.py @@ -0,0 +1,13 @@ +import binascii +from functools import singledispatch +from typing import Any + + +@singledispatch +def obfuscate(_input: Any) -> str: + raise NotImplementedError() + + +@obfuscate.register(str) +def _(input: str) -> str: + return f"OBFUSCATED-{binascii.crc32(input.encode('utf-8'))}" diff --git a/src/icloudpd/authentication.py b/src/icloudpd/authentication.py index bf48829f2..9b9460aa1 100644 --- a/src/icloudpd/authentication.py +++ b/src/icloudpd/authentication.py @@ -4,7 +4,7 @@ import sys import time from functools import partial -from typing import Callable, Dict, List, Tuple +from typing import Any, Callable, Dict, List, Mapping, Tuple import click @@ -27,6 +27,7 @@ def authenticator( status_exchange: StatusExchange, username: str, notificator: Callable[[], None], + response_observer: Callable[[Mapping[str, Any]], None] | None = None, cookie_directory: str | None = None, client_id: str | None = None, ) -> PyiCloudService: @@ -51,6 +52,7 @@ def password_provider(username: str, valid_password: List[str]) -> str | None: file_match_policy, username, partial(password_provider, username, valid_password), + response_observer, cookie_directory=cookie_directory, client_id=client_id, ) diff --git a/src/icloudpd/base.py b/src/icloudpd/base.py index 3bff36ccf..c8f490250 100644 --- a/src/icloudpd/base.py +++ b/src/icloudpd/base.py @@ -34,9 +34,12 @@ from logging import Logger from threading import Thread from typing import ( + Any, Callable, Dict, Iterable, + List, + Mapping, NoReturn, Sequence, Tuple, @@ -1278,6 +1281,13 @@ def composed(ex: Exception, retries: int) -> None: return composed +def dump_responses(dumper: Callable[[Any], None], responses: List[Mapping[str, Any]]) -> None: + # dump captured responses + for entry in responses: + # compose(logger.debug, compose(json.dumps, response_to_har))(response) + dumper(json.dumps(entry, indent=2)) + + def core( passer: Callable[[PhotoAsset], bool], downloader: Callable[[PyiCloudService, Counter, PhotoAsset], bool], @@ -1318,6 +1328,11 @@ def core( only_print_filenames or no_progress_bar or not sys.stdout.isatty() ) while True: # watch with interval & retry + captured_responses: List[Mapping[str, Any]] = [] + + def append_response(captured: List[Mapping[str, Any]], response: Mapping[str, Any]) -> None: + captured.append(response) + try: icloud = authenticator( logger, @@ -1331,10 +1346,17 @@ def core( status_exchange, username, notificator, + partial(append_response, captured_responses), cookie_directory, os.environ.get("CLIENT_ID"), ) + # dump captured responses for debugging + # dump_responses(logger.debug, captured_responses) + + # turn off response capture + icloud.response_observer = None + if auth_only: logger.info("Authentication completed successfully") return 0 @@ -1535,6 +1557,7 @@ def should_break(counter: Counter) -> bool: pass except (PyiCloudServiceNotActivatedException, PyiCloudServiceUnavailableException) as error: logger.info(error) + dump_responses(logger.debug, captured_responses) # it not watching then return error if not watch_interval: return 1 @@ -1542,6 +1565,7 @@ def should_break(counter: Counter) -> bool: pass except (ConnectionError, TimeoutError, Timeout, NewConnectionError) as _error: logger.info("Cannot connect to Apple iCloud service") + dump_responses(logger.debug, captured_responses) # logger.debug(error) # it not watching then return error if not watch_interval: @@ -1558,6 +1582,10 @@ def should_break(counter: Counter) -> bool: logger.debug("Retrying...") # these errors we can safely retry continue + except Exception: + dump_responses(logger.debug, captured_responses) + raise + if watch_interval: # pragma: no cover logger.info(f"Waiting for {watch_interval} sec...") interval: Sequence[int] = range(1, watch_interval) diff --git a/src/pyicloud_ipd/base.py b/src/pyicloud_ipd/base.py index c95e8167f..cf4479130 100644 --- a/src/pyicloud_ipd/base.py +++ b/src/pyicloud_ipd/base.py @@ -1,12 +1,15 @@ +from contextlib import contextmanager +from functools import partial +from itertools import chain import sys -from typing import Any, Callable, Dict, NamedTuple, Optional, Sequence +from typing import Any, Callable, Dict, Generator, List, Mapping, NamedTuple, Optional, Sequence import typing from uuid import uuid1 import json import logging from tempfile import gettempdir from os import path, mkdir -from re import match +from re import Pattern, match import http.cookiejar as cookielib import getpass import srp @@ -14,7 +17,10 @@ import hashlib from requests import PreparedRequest, Request, Response +from foundation.core import compose, constant, identity +from foundation.json import Rule, _apply_rules_internal, apply_rules, compile_patterns, re_compile_ignorecase +from foundation.string import obfuscate from pyicloud_ipd.exceptions import ( PyiCloudConnectionException, PyiCloudFailedLoginException, @@ -53,6 +59,7 @@ class TrustedPhoneContextProvider(NamedTuple): + class PyiCloudService: """ A base authentication class for the iCloud service. Handles the @@ -73,6 +80,7 @@ def __init__( file_match_policy: FileMatchPolicy, apple_id: str, password_provider: Callable[[], str | None], + response_observer: Callable[[Mapping[str, Any]], None] | None=None, cookie_directory:Optional[str]=None, verify:bool=True, client_id:Optional[str]=None, @@ -90,6 +98,8 @@ def __init__( self.client_id: str = client_id or ("auth-%s" % str(uuid1()).lower()) self.with_family = with_family self.http_timeout = http_timeout + self.response_observer = response_observer + self.observer_rules: Sequence[Rule] = [] # set it when we get password self.password_filter: PyiCloudPasswordFilter|None = None @@ -135,7 +145,11 @@ def __init__( else: self.session_data.update({"client_id": self.client_id}) - self.session:PyiCloudSession = PyiCloudSession(self) + def apply_rules_and_observe(response: Mapping[str, Any]) -> None: + if self.response_observer: + self.response_observer(apply_rules("", self.observer_rules, response)) + + self.session:PyiCloudSession = PyiCloudSession(self, apply_rules_and_observe if self.response_observer else None) self.session.verify = verify self.session.headers.update({ 'Origin': self.HOME_ENDPOINT, @@ -162,10 +176,92 @@ def __init__( 'clientId': self.client_id, } + # set observer rules + obfuscate_rule: Callable[[Pattern[str]], Rule] = lambda r: (r, obfuscate) + obfuscate_rules_from_pattern: Callable[[Sequence[str]], List[Rule]] = compose(list, partial(map, compose(obfuscate_rule, re_compile_ignorecase))) + self.cookie_obfuscate_rules = obfuscate_rules_from_pattern([r"X_APPLE_.*", r"DES.*", r"acn01", r"aasp"]) + + self.header_obfuscate_rules = obfuscate_rules_from_pattern([r"X-APPLE-.*", r"scnt"]) + + pass_rule: Callable[[Pattern[str]], Rule] = lambda r: (r, identity) + pass_rules_from_pattern: Callable[[Sequence[str]], List[Rule]] = compose(list, partial(map, compose(pass_rule, re_compile_ignorecase))) + self.header_pass_rules = pass_rules_from_pattern([r"^(request|response)\.headers\.(Origin|Referer|Content-Type|Location)$"]) + + drop_rule: Callable[[Pattern[str]], Rule] = lambda r: (r, constant(None)) + drop_rules_from_pattern: Callable[[Sequence[str]], List[Rule]] = compose(list, partial(map, compose(drop_rule, re_compile_ignorecase))) + self.header_drop_rules: List[Rule] = drop_rules_from_pattern([r"^(request|response)\.headers\..+"]) + + self.validate_response_body_obfuscate_rules = obfuscate_rules_from_pattern([ + r"^response\.content\.dsInfo\.appleId$", + r"^response\.content\.dsInfo\.appleIdAlias$", + r"^response\.content\.dsInfo\.iCloudAppleIdAlias$", + r"^response\.content\.dsInfo\.appleIdEntries\.value$", + r"^response\.content\.dsInfo\.fullName", + r"^response\.content\.dsInfo\.firstName", + r"^response\.content\.dsInfo\.lastName", + r"^response\.content\.dsInfo\.dsid", + r"^response\.content\.dsInfo\.notificationId", + r"^response\.content\.dsInfo\.aDsID", + r"^response\.content\.dsInfo\.primaryEmail", + ]) + self.validate_response_body_drop_rules = drop_rules_from_pattern([ + r"^response\.content\.webservices\.", + r"^response\.content\.configBag\.urls\.", + r"^response\.content\.apps.*", + r"^response\.content\.dsInfo\.mailFlags", + ]) + self.auth_srp_init_body_obfuscate_rules = obfuscate_rules_from_pattern([ + r"^request\.content\.accountName", + r"^request\.content\.a$", + r"^response\.content\.salt", + r"^response\.content\.b", + r"^response\.content\.c", + ]) + self.auth_srp_init_body_drop_rules = drop_rules_from_pattern([ + ]) + self.auth_srp_complete_body_obfuscate_rules = obfuscate_rules_from_pattern([ + r"^request\.content\.accountName", + r"^request\.content\.c", + r"^request\.content\.m1", + r"^request\.content\.m2", + r"^request\.content\.trustTokens\.", + ]) + self.auth_srp_complete_body_drop_rules = drop_rules_from_pattern([ + ]) + self.auth_srp_repair_complete_body_obfuscate_rules = obfuscate_rules_from_pattern([ + ]) + self.auth_srp_repair_complete_body_drop_rules = drop_rules_from_pattern([ + ]) + self.auth_raw_body_obfuscate_rules = obfuscate_rules_from_pattern([ + r"^request\.content\.accountName", + r"^request\.content\.password", + r"^request\.content\.trustTokens\.", + ]) + self.auth_raw_body_drop_rules = drop_rules_from_pattern([ + ]) + self.auth_token_body_obfuscate_rules = list( + chain( + obfuscate_rules_from_pattern([ + r"^request\.content\.dsWebAuthToken", + r"^request\.content\.trustToken", + ]), + self.validate_response_body_obfuscate_rules + )) + self.auth_token_body_drop_rules = self.validate_response_body_drop_rules + self.authenticate() self._photos: Optional[PhotosService] = None + @contextmanager + def use_rules(self, rules: Sequence[Rule]) -> Generator[Sequence[Rule], Any, None]: + temp_rules = self.observer_rules + try: + self.observer_rules = rules + yield temp_rules + finally: + self.observer_rules = temp_rules + def authenticate(self, force_refresh:bool=False) -> None: """ Handles authentication, and persists cookies so that @@ -221,9 +317,25 @@ def _authenticate_with_token(self) -> None: } try: - req = self.session.post( - "%s/accountLogin" % self.SETUP_ENDPOINT, data=json.dumps(data) - ) + # set observer with obfuscator + if self.response_observer: + + rules = list( + chain( + self.cookie_obfuscate_rules, + self.header_obfuscate_rules, + self.header_pass_rules, + self.header_drop_rules, + self.auth_token_body_obfuscate_rules, + self.auth_token_body_drop_rules, + )) + else: + rules = [] + + with self.use_rules(rules): + req = self.session.post( + "%s/accountLogin" % self.SETUP_ENDPOINT, data=json.dumps(data) + ) self.data = req.json() except PyiCloudAPIResponseException as error: msg = "Invalid authentication token." @@ -266,7 +378,21 @@ def encode(self) -> bytes: headers = self._get_auth_headers() try: - response = self.session.post("%s/signin/init" % self.AUTH_ENDPOINT, data=json.dumps(data), headers=headers) + if self.response_observer: + + rules = list(chain( + self.cookie_obfuscate_rules, + self.header_obfuscate_rules, + self.header_pass_rules, + self.header_drop_rules, + self.auth_srp_init_body_obfuscate_rules, + self.auth_srp_init_body_drop_rules, + )) + else: + rules = [] + + with self.use_rules(rules): + response = self.session.post("%s/signin/init" % self.AUTH_ENDPOINT, data=json.dumps(data), headers=headers) if response.status_code == 401: raise PyiCloudAPIResponseException(response.text, str(response.status_code)) except PyiCloudAPIResponseException as error: @@ -299,23 +425,53 @@ def encode(self) -> bytes: data["trustTokens"] = [self.session_data.get("trust_token")] try: - response = self.session.post( - "%s/signin/complete" % self.AUTH_ENDPOINT, - params={"isRememberMeEnabled": "true"}, - data=json.dumps(data), - headers=headers, - ) + # set observer with obfuscator + if self.response_observer: + + rules = list(chain( + self.cookie_obfuscate_rules, + self.header_obfuscate_rules, + self.header_pass_rules, + self.header_drop_rules, + self.auth_srp_complete_body_obfuscate_rules, + self.auth_srp_complete_body_drop_rules, + )) + else: + rules = [] + + with self.use_rules(rules): + response = self.session.post( + "%s/signin/complete" % self.AUTH_ENDPOINT, + params={"isRememberMeEnabled": "true"}, + data=json.dumps(data), + headers=headers, + ) if response.status_code == 409: # requires 2FA pass elif response.status_code == 412: # non 2FA account returns 412 "precondition no met" headers = self._get_auth_headers() - response = self.session.post( - "%s/repair/complete" % self.AUTH_ENDPOINT, - data=json.dumps({}), - headers=headers, - ) + # set observer with obfuscator + if self.response_observer: + + rules = list(chain( + self.cookie_obfuscate_rules, + self.header_obfuscate_rules, + self.header_pass_rules, + self.header_drop_rules, + self.auth_srp_repair_complete_body_obfuscate_rules, + self.auth_srp_repair_complete_body_drop_rules, + )) + else: + rules = [] + + with self.use_rules(rules): + response = self.session.post( + "%s/repair/complete" % self.AUTH_ENDPOINT, + data=json.dumps({}), + headers=headers, + ) elif response.status_code >= 400 and response.status_code < 600: raise PyiCloudAPIResponseException(response.text, str(response.status_code)) except PyiCloudAPIResponseException as error: @@ -333,12 +489,27 @@ def _authenticate_raw_password(self, password: str) -> None: headers = self._get_auth_headers() try: - self.session.post( - "%s/signin" % self.AUTH_ENDPOINT, - params={"isRememberMeEnabled": "true"}, - data=json.dumps(data), - headers=headers, - ) + # set observer with obfuscator + if self.response_observer: + + rules = list(chain( + self.cookie_obfuscate_rules, + self.header_obfuscate_rules, + self.header_pass_rules, + self.header_drop_rules, + self.auth_raw_body_obfuscate_rules, + self.auth_raw_body_drop_rules, + )) + else: + rules = [] + + with self.use_rules(rules): + self.session.post( + "%s/signin" % self.AUTH_ENDPOINT, + params={"isRememberMeEnabled": "true"}, + data=json.dumps(data), + headers=headers, + ) except PyiCloudAPIResponseException as error: msg = "Invalid email/password combination." raise PyiCloudFailedLoginException(msg, error) from error @@ -347,9 +518,24 @@ def _validate_token(self) -> Dict[str, Any]: """Checks if the current access token is still valid.""" LOGGER.debug("Checking session token validity") try: - req = self.session.post("%s/validate" % self.SETUP_ENDPOINT, data="null") + # set observer with obfuscator + if self.response_observer: + + rules = list(chain( + self.cookie_obfuscate_rules, + self.header_obfuscate_rules, + self.header_pass_rules, + self.header_drop_rules, + self.validate_response_body_obfuscate_rules, + self.validate_response_body_drop_rules, + )) + else: + rules = [] + + with self.use_rules(rules): + response = self.session.post("%s/validate" % self.SETUP_ENDPOINT, data="null") LOGGER.debug("Session token is still valid") - result: Dict[str, Any] = req.json() + result: Dict[str, Any] = response.json() return result except PyiCloudAPIResponseException as err: LOGGER.debug("Invalid authentication token") diff --git a/src/pyicloud_ipd/cmdline.py b/src/pyicloud_ipd/cmdline.py index d0b7dce57..da5a9bd80 100644 --- a/src/pyicloud_ipd/cmdline.py +++ b/src/pyicloud_ipd/cmdline.py @@ -263,6 +263,7 @@ def main_aux(args:Optional[Sequence[str]]=None) -> NoReturn: FileMatchPolicy.NAME_SIZE_DEDUP_WITH_SUFFIX, username, lambda : password, + lambda _: None, ) if ( not got_from_keyring diff --git a/src/pyicloud_ipd/session.py b/src/pyicloud_ipd/session.py index 620271194..311f1892c 100644 --- a/src/pyicloud_ipd/session.py +++ b/src/pyicloud_ipd/session.py @@ -1,11 +1,12 @@ -from typing import Any, Dict, NoReturn, Optional, Sequence +from typing import Any, Callable, Dict, Mapping, NoReturn, Optional, Sequence from typing_extensions import override import typing import inspect import json import logging -from requests import Session +from requests import Response, Session +from foundation.http import response_to_har_entry from pyicloud_ipd.exceptions import ( PyiCloudAPIResponseException, PyiCloud2SARequiredException, @@ -44,10 +45,16 @@ def filter(self, record: logging.LogRecord) -> bool: class PyiCloudSession(Session): """iCloud session.""" - def __init__(self, service: Any): + def __init__(self, service: Any, response_observer:Callable[[Mapping[str, Any]], None] | None = None): self.service = service + self.response_observer = response_observer super().__init__() + def observe(self, response: Response) -> Response: + if self.response_observer: + self.response_observer(response_to_har_entry(response)) + return response + @override def request(self, method: str, url, **kwargs): # type: ignore @@ -59,10 +66,10 @@ def request(self, method: str, url, **kwargs): # type: ignore request_logger.addFilter(self.service.password_filter) request_logger.debug("%s %s %s", method, url, kwargs.get("data", "")) - + if "timeout" not in kwargs and self.service.http_timeout is not None: kwargs["timeout"] = self.service.http_timeout - response = throw_on_503(super().request(method, url, **kwargs)) + response = throw_on_503(self.observe(super().request(method, url, **kwargs))) content_type = response.headers.get("Content-Type", "").split(";")[0] json_mimetypes = ["application/json", "text/json"] diff --git a/tests/test_authentication.py b/tests/test_authentication.py index 49cbe243d..2faa81149 100644 --- a/tests/test_authentication.py +++ b/tests/test_authentication.py @@ -57,6 +57,7 @@ def test_failed_auth(self) -> None: StatusExchange(), "bad_username", lambda: None, + None, cookie_dir, "EC5646DE-9423-11E8-BF21-14109FE0B321", ) @@ -277,6 +278,7 @@ def test_non_2fa(self) -> None: StatusExchange(), "jdoe@gmail.com", lambda: None, + None, cookie_dir, "EC5646DE-9423-11E8-BF21-14109FE0B321", ) diff --git a/tests/test_json_rules.py b/tests/test_json_rules.py new file mode 100644 index 000000000..d990f0a1c --- /dev/null +++ b/tests/test_json_rules.py @@ -0,0 +1,81 @@ +from unittest import TestCase + +from foundation.core import constant +from foundation.json import apply_rules, compile_patterns + + +class AppleRuleTestCase(TestCase): + def test_str_match(self) -> None: + input = "abc" + context = "def" + rules = list(map(lambda p: (p, constant("hij")), compile_patterns([r"de.*"]))) + result = apply_rules(context, rules, input) + self.assertEqual("hij", result, "match") + + def test_str_not_match(self) -> None: + input = "abc" + context = "def" + rules = list(map(lambda p: (p, constant("hij")), compile_patterns([r"ae.*"]))) + result = apply_rules(context, rules, input) + self.assertEqual("abc", result, "not match") + + def test_str_match_for_none(self) -> None: + input = "abc" + context = "def" + rules = list(map(lambda p: (p, constant(None)), compile_patterns([r"de.*"]))) + result = apply_rules(context, rules, input) + self.assertIsNone(result, "match for None") + + def test_tuple_match(self) -> None: + input = ("abc", "def") + rules = list(map(lambda p: (p, constant("lmn")), compile_patterns([r"abc.*"]))) + result = apply_rules("", rules, input) + self.assertEqual(("abc", "lmn"), result, "tuple match") + + def test_list_match(self) -> None: + input = ["abc", "def"] + rules = list(map(lambda p: (p, constant("lmn")), compile_patterns([r".*"]))) + result = apply_rules("", rules, input) + self.assertEqual("lmn", result, "list match") + + def test_list_match_each(self) -> None: + input = ["abc", "def"] + rules = list(map(lambda p: (p, constant("lmn")), compile_patterns([r"\..*"]))) + result = apply_rules("", rules, input) + self.assertEqual(["lmn", "lmn"], result, "list match") + + def test_json_match(self) -> None: + input = {"abc": {"def": "hij"}} + rules = list(map(lambda p: (p, constant("lmn")), compile_patterns([r"abc\.def.*"]))) + result = apply_rules("", rules, input) + self.assertDictEqual({"abc": {"def": "lmn"}}, result, "json match") + + def test_json_match_by_leaf(self) -> None: + input = {"abc": {"def": "hij"}} + rules = list(map(lambda p: (p, constant("lmn")), compile_patterns([r"def.*"]))) + result = apply_rules("", rules, input) + self.assertDictEqual({"abc": {"def": "lmn"}}, result, "json match") + + def test_json_update_object_in_list(self) -> None: + input = {"abc": [{"def": "hij"}]} + rules = list(map(lambda p: (p, constant("lmn")), compile_patterns([r"^abc\.def$"]))) + result = apply_rules("", rules, input) + self.assertDictEqual({"abc": [{"def": "lmn"}]}, result, "json match") + + def test_json_drop_object_in_list(self) -> None: + input = {"abc": [{"def": "hij"}]} + rules = list(map(lambda p: (p, constant(None)), compile_patterns([r"^abc\.$"]))) + result = apply_rules("", rules, input) + self.assertDictEqual({"abc": []}, result, "json match") + + def test_json_dict_drop(self) -> None: + input = {"abc": {"def": "hij"}} + rules = list(map(lambda p: (p, constant(None)), compile_patterns([r"abc\.def.*"]))) + result = apply_rules("", rules, input) + self.assertDictEqual({"abc": {}}, result, "json dict drop") + + def test_json_list_drop(self) -> None: + input = {"abc": {"def": ["hij"]}} + rules = list(map(lambda p: (p, constant(None)), compile_patterns([r"abc\.def\..*"]))) + result = apply_rules("", rules, input) + self.assertDictEqual({"abc": {"def": []}}, result, "json list drop")