diff --git a/CHANGELOG.md b/CHANGELOG.md index 15b4d06eb..0bd0a3e0b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ - fix: iCloud clean up with `--keep-icloud-recent-days` does not respect `--skip-*` params [#1180](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/1180) - chore: replace build & test platform from retired windows-2019 to windows-2025 - Service Temporary Unavailable responses are less ambiguous [#1078](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/1078) +- feat: re-authenticate on errors when using `--watch-with-interval` [#1078](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/1078) +- feat: use stored cookies before attempting to authenticate with credentials ## 1.28.1 (2025-06-08) diff --git a/src/icloudpd/authentication.py b/src/icloudpd/authentication.py index 0a3869982..43741ad77 100644 --- a/src/icloudpd/authentication.py +++ b/src/icloudpd/authentication.py @@ -3,7 +3,8 @@ import logging import sys import time -from typing import Callable, Dict, Tuple +from functools import partial +from typing import Callable, Dict, List, Tuple import click @@ -38,34 +39,37 @@ def authenticator( ) -> PyiCloudService: """Authenticate with iCloud username and password""" logger.debug("Authenticating...") - icloud: PyiCloudService | None = None - _valid_password: str | None = None - for _, _pair in password_providers.items(): - _reader, _ = _pair - _password = _reader(username) - if _password: - icloud = PyiCloudService( - filename_cleaner, - lp_filename_generator, - domain, - raw_policy, - file_match_policy, - username, - _password, - cookie_directory=cookie_directory, - client_id=client_id, - ) - _valid_password = _password - break + valid_password: List[str] = [] + + def password_provider(username: str, valid_password: List[str]) -> str | None: + for _, _pair in password_providers.items(): + reader, _ = _pair + password = reader(username) + if password: + valid_password.append(password) + return password + return None + + icloud = PyiCloudService( + filename_cleaner, + lp_filename_generator, + domain, + raw_policy, + file_match_policy, + username, + partial(password_provider, username, valid_password), + cookie_directory=cookie_directory, + client_id=client_id, + ) if not icloud: raise NotImplementedError("None of providers gave password") - if _valid_password: + if valid_password: # save valid password to all providers for _, _pair in password_providers.items(): - _, _writer = _pair - _writer(username, _valid_password) + _, writer = _pair + writer(username, valid_password[0]) if icloud.requires_2fa: if raise_error_on_2sa: diff --git a/src/icloudpd/base.py b/src/icloudpd/base.py index 5b72d2722..7da32adf3 100644 --- a/src/icloudpd/base.py +++ b/src/icloudpd/base.py @@ -687,9 +687,11 @@ def main( ) sys.exit(2) - if watch_with_interval and (list_albums or only_print_filenames): # pragma: no cover + if watch_with_interval and ( + list_albums or only_print_filenames or auth_only or list_libraries + ): # pragma: no cover print( - "--watch_with_interval is not compatible with --list_albums, --only_print_filenames" + "--watch-with-interval is not compatible with --list-albums, --list-libraries, --only-print-filenames, --auth-only" ) sys.exit(2) @@ -783,7 +785,7 @@ def main( ) # start web server - if mfa_provider == MFAProvider.WEBUI: + if mfa_provider == MFAProvider.WEBUI or "webui" in password_providers: server_thread = Thread(target=serve_app, daemon=True, args=[logger, status_exchange]) server_thread.start() @@ -1237,264 +1239,275 @@ def core( ) -> int: """Download all iCloud photos to a local directory""" + skip_bar = not os.environ.get("FORCE_TQDM") and ( + only_print_filenames or no_progress_bar or not sys.stdout.isatty() + ) raise_error_on_2sa = ( smtp_username is not None or notification_email is not None or notification_script is not None ) - try: - icloud = authenticator( - logger, - domain, - filename_cleaner, - lp_filename_generator, - raw_policy, - file_match_policy, - password_providers, - mfa_provider, - status_exchange, - username, - cookie_directory, - raise_error_on_2sa, - os.environ.get("CLIENT_ID"), - ) - except TwoStepAuthRequiredError: - if notification_script is not None: - subprocess.call([notification_script]) - if smtp_username is not None or notification_email is not None: - send_2sa_notification( + while True: # watch with interval + try: + icloud = authenticator( logger, + domain, + filename_cleaner, + lp_filename_generator, + raw_policy, + file_match_policy, + password_providers, + mfa_provider, + status_exchange, username, - smtp_username, - smtp_password, - smtp_host, - smtp_port, - smtp_no_tls, - notification_email, - notification_email_from, + cookie_directory, + raise_error_on_2sa, + os.environ.get("CLIENT_ID"), ) - return 1 - - if auth_only: - logger.info("Authentication completed successfully") - return 0 + except PyiCloudAPIResponseException as error: + # for 503 with watching, we just continue waiting + if error.code == "503" and watch_interval: + icloud = None + else: + raise + except TwoStepAuthRequiredError: + if notification_script is not None: + subprocess.call([notification_script]) + if smtp_username is not None or notification_email is not None: + send_2sa_notification( + logger, + username, + smtp_username, + smtp_password, + smtp_host, + smtp_port, + smtp_no_tls, + notification_email, + notification_email_from, + ) + return 1 - if list_libraries: - library_names = ( - icloud.photos.private_libraries.keys() | icloud.photos.shared_libraries.keys() - ) - print(*library_names, sep="\n") + if auth_only: + logger.info("Authentication completed successfully") + return 0 - else: - download_photo = partial(downloader, icloud) + if icloud: # it can be None for 503 errors + if list_libraries: + library_names = ( + icloud.photos.private_libraries.keys() | icloud.photos.shared_libraries.keys() + ) + print(*library_names, sep="\n") + return 0 - # After 6 or 7 runs within 1h Apple blocks the API for some time. In that - # case exit. - try: - # Access to the selected library. Defaults to the primary photos object. - library_object: PhotoLibrary = icloud.photos - if library: - if library in icloud.photos.private_libraries: - library_object = icloud.photos.private_libraries[library] - elif library in icloud.photos.shared_libraries: - library_object = icloud.photos.shared_libraries[library] - else: - logger.error("Unknown library: %s", library) + else: + # After 6 or 7 runs within 1h Apple blocks the API for some time. In that + # case exit. + try: + # Access to the selected library. Defaults to the primary photos object. + library_object: PhotoLibrary = icloud.photos + if library: + if library in icloud.photos.private_libraries: + library_object = icloud.photos.private_libraries[library] + elif library in icloud.photos.shared_libraries: + library_object = icloud.photos.shared_libraries[library] + else: + logger.error("Unknown library: %s", library) + return 1 + except PyiCloudAPIResponseException as err: + # For later: come up with a nicer message to the user. For now take the + # exception text + logger.error("error?? %s", err) return 1 - except PyiCloudAPIResponseException as err: - # For later: come up with a nicer message to the user. For now take the - # exception text - logger.error("error?? %s", err) - return 1 - while True: - photos = library_object.albums[album] if album else library_object.all - - if list_albums: - print("Albums:") - albums_dict = library_object.albums - albums = albums_dict.values() # pragma: no cover - album_titles = [str(a) for a in albums] - print(*album_titles, sep="\n") - return 0 - # casting is okay since we checked for list_albums and directory compatibily upstream - # would be better to have that in types though - directory = os.path.normpath(cast(str, directory)) + photos = library_object.albums[album] if album else library_object.all - videos_phrase = "" if skip_videos else " and videos" - album_phrase = f" from album {album}" if album else "" - logger.debug(f"Looking up all photos{videos_phrase}{album_phrase}...") + if list_albums: + print("Albums:") + albums_dict = library_object.albums + albums = albums_dict.values() # pragma: no cover + album_titles = [str(a) for a in albums] + print(*album_titles, sep="\n") + return 0 + # casting is okay since we checked for list_albums and directory compatibily upstream + # would be better to have that in types though + directory = os.path.normpath(cast(str, directory)) - session_exception_handler = session_error_handle_builder(logger, icloud) - internal_error_handler = internal_error_handle_builder(logger) + videos_phrase = "" if skip_videos else " and videos" + album_phrase = f" from album {album}" if album else "" + logger.debug(f"Looking up all photos{videos_phrase}{album_phrase}...") - error_handler = compose_handlers([session_exception_handler, internal_error_handler]) + session_exception_handler = session_error_handle_builder(logger, icloud) + internal_error_handler = internal_error_handle_builder(logger) - photos.exception_handler = error_handler + error_handler = compose_handlers( + [session_exception_handler, internal_error_handler] + ) - photos_count: int | None = len(photos) + photos.exception_handler = error_handler - photos_enumerator: Iterable[PhotoAsset] = photos + photos_count: int | None = len(photos) - # Optional: Only download the x most recent photos. - if recent is not None: - photos_count = recent - photos_enumerator = itertools.islice(photos_enumerator, recent) + photos_enumerator: Iterable[PhotoAsset] = photos - if until_found is not None: - photos_count = None - # ensure photos iterator doesn't have a known length - photos_enumerator = (p for p in photos_enumerator) + # Optional: Only download the x most recent photos. + if recent is not None: + photos_count = recent + photos_enumerator = itertools.islice(photos_enumerator, recent) - # Skip the one-line progress bar if we're only printing the filenames, - # or if the progress bar is explicitly disabled, - # or if this is not a terminal (e.g. cron or piping output to file) - skip_bar = not os.environ.get("FORCE_TQDM") and ( - only_print_filenames or no_progress_bar or not sys.stdout.isatty() - ) - if skip_bar: - photos_enumerator = photos_enumerator - # logger.set_tqdm(None) - else: - photos_enumerator = tqdm( - iterable=photos_enumerator, - total=photos_count, - leave=False, - dynamic_ncols=True, - ascii=True, - ) - # logger.set_tqdm(photos_enumerator) + if until_found is not None: + photos_count = None + # ensure photos iterator doesn't have a known length + photos_enumerator = (p for p in photos_enumerator) - if photos_count is not None: - plural_suffix = "" if photos_count == 1 else "s" - video_suffix = "" - photos_count_str = "the first" if photos_count == 1 else photos_count + # Skip the one-line progress bar if we're only printing the filenames, + # or if the progress bar is explicitly disabled, + # or if this is not a terminal (e.g. cron or piping output to file) + if skip_bar: + photos_enumerator = photos_enumerator + # logger.set_tqdm(None) + else: + photos_enumerator = tqdm( + iterable=photos_enumerator, + total=photos_count, + leave=False, + dynamic_ncols=True, + ascii=True, + ) + # logger.set_tqdm(photos_enumerator) - if not skip_videos: - video_suffix = " or video" if photos_count == 1 else " and videos" - else: - photos_count_str = "???" - plural_suffix = "s" - video_suffix = " and videos" if not skip_videos else "" - logger.info( - ("Downloading %s %s" + " photo%s%s to %s ..."), - photos_count_str, - ",".join([_s.value for _s in primary_sizes]), - plural_suffix, - video_suffix, - directory, - ) + if photos_count is not None: + plural_suffix = "" if photos_count == 1 else "s" + video_suffix = "" + photos_count_str = "the first" if photos_count == 1 else photos_count - consecutive_files_found = Counter(0) + if not skip_videos: + video_suffix = " or video" if photos_count == 1 else " and videos" + else: + photos_count_str = "???" + plural_suffix = "s" + video_suffix = " and videos" if not skip_videos else "" + logger.info( + ("Downloading %s %s" + " photo%s%s to %s ..."), + photos_count_str, + ",".join([_s.value for _s in primary_sizes]), + plural_suffix, + video_suffix, + directory, + ) - def should_break(counter: Counter) -> bool: - """Exit if until_found condition is reached""" - return until_found is not None and counter.value() >= until_found + consecutive_files_found = Counter(0) - status_exchange.get_progress().photos_count = ( - 0 if photos_count is None else photos_count - ) - photos_counter = 0 + def should_break(counter: Counter) -> bool: + """Exit if until_found condition is reached""" + return until_found is not None and counter.value() >= until_found - now = datetime.datetime.now(get_localzone()) - photos_iterator = iter(photos_enumerator) - while True: - try: - if should_break(consecutive_files_found): - logger.info( - "Found %s consecutive previously downloaded photos. Exiting", - until_found, - ) - break - item = next(photos_iterator) - should_delete = False + status_exchange.get_progress().photos_count = ( + 0 if photos_count is None else photos_count + ) + photos_counter = 0 - passer_result = passer(item) - download_result = passer_result and download_photo( - consecutive_files_found, item - ) - if download_result and delete_after_download: - should_delete = True - - if passer_result and keep_icloud_recent_days is not None: - created_date = item.created.astimezone(get_localzone()) - age_days = (now - created_date).days - logger.debug(f"Created date: {created_date}") - logger.debug(f"Keep iCloud recent days: {keep_icloud_recent_days}") - logger.debug(f"Age days: {age_days}") - if age_days < keep_icloud_recent_days: - logger.debug( - "Skipping deletion of %s as it is within the keep_icloud_recent_days period (%d days old)", - item.filename, - age_days, + now = datetime.datetime.now(get_localzone()) + photos_iterator = iter(photos_enumerator) + + download_photo = partial(downloader, icloud) + + while True: + try: + if should_break(consecutive_files_found): + logger.info( + "Found %s consecutive previously downloaded photos. Exiting", + until_found, ) - else: - should_delete = True + break + item = next(photos_iterator) + should_delete = False - if should_delete: - delete_local = partial( - delete_photo_dry_run if dry_run else delete_photo, - logger, - icloud.photos, - library_object, - item, + passer_result = passer(item) + download_result = passer_result and download_photo( + consecutive_files_found, item ) + if download_result and delete_after_download: + should_delete = True - retrier(delete_local, error_handler) - if photos.direction != "DESCENDING": - photos.increment_offset(-1) + if passer_result and keep_icloud_recent_days is not None: + created_date = item.created.astimezone(get_localzone()) + age_days = (now - created_date).days + logger.debug(f"Created date: {created_date}") + logger.debug(f"Keep iCloud recent days: {keep_icloud_recent_days}") + logger.debug(f"Age days: {age_days}") + if age_days < keep_icloud_recent_days: + logger.debug( + "Skipping deletion of %s as it is within the keep_icloud_recent_days period (%d days old)", + item.filename, + age_days, + ) + else: + should_delete = True + + if should_delete: + delete_local = partial( + delete_photo_dry_run if dry_run else delete_photo, + logger, + icloud.photos, + library_object, + item, + ) - photos_counter += 1 - status_exchange.get_progress().photos_counter = photos_counter + retrier(delete_local, error_handler) + if photos.direction != "DESCENDING": + photos.increment_offset(-1) - if status_exchange.get_progress().cancel: - break + photos_counter += 1 + status_exchange.get_progress().photos_counter = photos_counter - except StopIteration: - break + if status_exchange.get_progress().cancel: + break - if only_print_filenames: - return 0 + except StopIteration: + break - if status_exchange.get_progress().cancel: - logger.info("Iteration was cancelled") - status_exchange.get_progress().photos_last_message = "Iteration was cancelled" - else: - logger.info("All photos have been downloaded") - status_exchange.get_progress().photos_last_message = ( - "All photos have been downloaded" - ) - status_exchange.get_progress().reset() + if only_print_filenames: + return 0 - if auto_delete: - autodelete_photos( - logger, dry_run, library_object, folder_structure, directory, primary_sizes - ) + if status_exchange.get_progress().cancel: + logger.info("Iteration was cancelled") + status_exchange.get_progress().photos_last_message = "Iteration was cancelled" + else: + logger.info("All photos have been downloaded") + status_exchange.get_progress().photos_last_message = ( + "All photos have been downloaded" + ) + status_exchange.get_progress().reset() - if watch_interval: # pragma: no cover - logger.info(f"Waiting for {watch_interval} sec...") - interval: Sequence[int] = range(1, watch_interval) - iterable: Sequence[int] = ( - interval - if skip_bar - else typing.cast( - Sequence[int], - tqdm( - iterable=interval, - desc="Waiting...", - ascii=True, - leave=False, - dynamic_ncols=True, - ), + if auto_delete: + autodelete_photos( + logger, dry_run, library_object, folder_structure, directory, primary_sizes ) + + if watch_interval: # pragma: no cover + logger.info(f"Waiting for {watch_interval} sec...") + interval: Sequence[int] = range(1, watch_interval) + iterable: Sequence[int] = ( + interval + if skip_bar + else typing.cast( + Sequence[int], + tqdm( + iterable=interval, + desc="Waiting...", + ascii=True, + leave=False, + dynamic_ncols=True, + ), ) - for counter in iterable: - status_exchange.get_progress().waiting = watch_interval - counter - if status_exchange.get_progress().resume: - status_exchange.get_progress().reset() - break - time.sleep(1) - else: - break # pragma: no cover + ) + for counter in iterable: + status_exchange.get_progress().waiting = watch_interval - counter + if status_exchange.get_progress().resume: + status_exchange.get_progress().reset() + break + time.sleep(1) + else: + break # pragma: no cover return 0 diff --git a/src/pyicloud_ipd/base.py b/src/pyicloud_ipd/base.py index 75333dd17..88c78cd6a 100644 --- a/src/pyicloud_ipd/base.py +++ b/src/pyicloud_ipd/base.py @@ -71,22 +71,28 @@ def __init__( domain:str, raw_policy: RawTreatmentPolicy, file_match_policy: FileMatchPolicy, - apple_id: str, password:str, cookie_directory:Optional[str]=None, verify:bool=True, - client_id:Optional[str]=None, with_family:bool=True, http_timeout:float=30.0 + apple_id: str, + password_provider: Callable[[], str | None], + cookie_directory:Optional[str]=None, + verify:bool=True, + client_id:Optional[str]=None, + with_family:bool=True, + http_timeout:float=30.0 ): self.filename_cleaner = filename_cleaner self.lp_filename_generator = lp_filename_generator self.raw_policy = raw_policy self.file_match_policy = file_match_policy - self.user: Dict[str, Any] = {"accountName": apple_id, "password": password} + self.apple_id = apple_id + self.password_provider: Callable[[], str|None] = password_provider self.data: Dict[str, Any] = {} self.params: Dict[str, Any] = {} self.client_id: str = client_id or ("auth-%s" % str(uuid1()).lower()) self.with_family = with_family self.http_timeout = http_timeout - self.password_filter = PyiCloudPasswordFilter(password) - LOGGER.addFilter(self.password_filter) + # set it when we get password + self.password_filter: PyiCloudPasswordFilter|None = None if (domain == 'com'): self.AUTH_ENDPOINT = "https://idmsa.apple.com/appleauth/auth" @@ -162,7 +168,7 @@ def __init__( self._photos: Optional[PhotosService] = None - def authenticate(self, force_refresh:bool=False, service:Optional[Any]=None) -> None: + def authenticate(self, force_refresh:bool=False) -> None: """ Handles authentication, and persists cookies so that subsequent logins will not cause additional e-mails from Apple. @@ -177,28 +183,22 @@ def authenticate(self, force_refresh:bool=False, service:Optional[Any]=None) -> except PyiCloudAPIResponseException: LOGGER.debug("Invalid authentication token, will log in from scratch.") - if not login_successful and service is not None: - app = self.data["apps"][service] - if "canLaunchWithOneFactor" in app and app["canLaunchWithOneFactor"]: - LOGGER.debug( - "Authenticating as %s for %s", self.user["accountName"], service - ) - try: - self._authenticate_with_credentials_service(service) - login_successful = True - except Exception: - LOGGER.debug( - "Could not log into service. Attempting brand new login." - ) - if not login_successful: - LOGGER.debug("Authenticating as %s", self.user["accountName"]) + password = self.password_provider() + if not password: + LOGGER.debug("Password Provider did not give any data") + return None + # set logging filter + self.password_filter = PyiCloudPasswordFilter(password) + LOGGER.addFilter(self.password_filter) + + LOGGER.debug(f"Authenticating as {self.apple_id}") try: - self._authenticate_srp() + self._authenticate_srp(password) except PyiCloudFailedLoginException as error: LOGGER.error("Failed to login with srp, falling back to old raw password authentication. Error: %s", error) try: - self._authenticate_raw_password() + self._authenticate_raw_password(password) except PyiCloudFailedLoginException as error: LOGGER.error("Failed to login with raw password. Error: %s", error) raise error @@ -237,25 +237,7 @@ def _authenticate_with_token(self) -> None: msg = f'Apple insists on using {domain_to_use} for your request. Please use --domain parameter' raise PyiCloudConnectionException(msg) - def _authenticate_with_credentials_service(self, service: str) -> None: - """Authenticate to a specific service using credentials.""" - data = { - "appName": service, - "apple_id": self.user["accountName"], - "password": self.user["password"], - } - - try: - self.session.post( - "%s/accountLogin" % self.SETUP_ENDPOINT, data=json.dumps(data) - ) - - self.data = self._validate_token() - except PyiCloudAPIResponseException as error: - msg = "Invalid email/password combination." - raise PyiCloudFailedLoginException(msg, error) from error - - def _authenticate_srp(self) -> None: + def _authenticate_srp(self, password:str) -> None: class SrpPassword(): # srp uses the encoded password at process_challenge(), thus set_encrypt_info() should be called before that def __init__(self, password: str): @@ -273,10 +255,10 @@ def encode(self) -> bytes: return hashlib.pbkdf2_hmac('sha256', password_digest, self.salt, self.iterations, key_length) # Step 1: client generates private key a (stored in srp.User) and public key A, sends to server - srp_password = SrpPassword(self.user["password"]) + srp_password = SrpPassword(password) srp.rfc5054_enable() srp.no_username_in_x() - usr = srp.User(self.user["accountName"], srp_password, hash_alg=srp.SHA256, ng_type=srp.NG_2048) + usr = srp.User(self.apple_id, srp_password, hash_alg=srp.SHA256, ng_type=srp.NG_2048) uname, A = usr.start_authentication() data = { 'a': base64.b64encode(A).decode(), @@ -290,7 +272,7 @@ def encode(self) -> bytes: if response.status_code == 401: raise PyiCloudAPIResponseException(response.text, str(response.status_code)) except PyiCloudAPIResponseException as error: - if response.status_code == 503: + if error.code == "503": raise msg = "Failed to initiate srp authentication." raise PyiCloudFailedLoginException(msg, error) from error @@ -344,11 +326,12 @@ def encode(self) -> bytes: msg = "Invalid email/password combination." raise PyiCloudFailedLoginException(msg, error) from error - def _authenticate_raw_password(self) -> None: - data = dict(self.user) - - data["rememberMe"] = True - data["trustTokens"] = [] + def _authenticate_raw_password(self, password: str) -> None: + data = { + "accountName": self.apple_id, "password": password, + "rememberMe": True, + "trustTokens": [], + } if self.session_data.get("trust_token"): data["trustTokens"] = [self.session_data.get("trust_token")] @@ -406,7 +389,7 @@ def cookiejar_path(self) -> str: """Get path for cookiejar file.""" return path.join( self._cookie_directory, - "".join([c for c in self.user.get("accountName") if match(r"\w", c)]), # type: ignore[union-attr] + "".join([c for c in self.apple_id if match(r"\w", c)]), ) @property @@ -414,7 +397,7 @@ def session_path(self) -> str: """Get path for session data file.""" return path.join( self._cookie_directory, - "".join([c for c in self.user.get("accountName") if match(r"\w", c)]) # type: ignore[union-attr] + "".join([c for c in self.apple_id if match(r"\w", c)]) + ".session", ) @@ -659,7 +642,7 @@ def reminders(self): # type: ignore return RemindersService(service_root, self.session, self.params)# type: ignore def __unicode__(self) -> str: - return 'iCloud API: %s' % self.user.get('accountName') + return 'iCloud API: %s' % self.apple_id def __str__(self) -> str: as_unicode = self.__unicode__() diff --git a/src/pyicloud_ipd/cmdline.py b/src/pyicloud_ipd/cmdline.py index e28bc7761..d0b7dce57 100644 --- a/src/pyicloud_ipd/cmdline.py +++ b/src/pyicloud_ipd/cmdline.py @@ -262,7 +262,7 @@ def main_aux(args:Optional[Sequence[str]]=None) -> NoReturn: RawTreatmentPolicy.AS_IS, FileMatchPolicy.NAME_SIZE_DEDUP_WITH_SUFFIX, username, - password, + lambda : password, ) if ( not got_from_keyring diff --git a/src/pyicloud_ipd/session.py b/src/pyicloud_ipd/session.py index 6057e856e..0d1de5c13 100644 --- a/src/pyicloud_ipd/session.py +++ b/src/pyicloud_ipd/session.py @@ -54,7 +54,7 @@ def request(self, method: str, url, **kwargs): # type: ignore callee = inspect.stack()[2] module = inspect.getmodule(callee[0]) request_logger = logging.getLogger(module.__name__).getChild("http") #type: ignore[union-attr] - if self.service.password_filter not in request_logger.filters: + if self.service.password_filter and self.service.password_filter not in request_logger.filters: request_logger.addFilter(self.service.password_filter) request_logger.debug("%s %s %s", method, url, kwargs.get("data", "")) diff --git a/tests/test_authentication.py b/tests/test_authentication.py index 845769cea..92bc15267 100644 --- a/tests/test_authentication.py +++ b/tests/test_authentication.py @@ -8,6 +8,7 @@ from click.testing import CliRunner from vcr import VCR +# import vcr import pyicloud_ipd from foundation.core import constant, identity from icloudpd.authentication import TwoStepAuthRequiredError, authenticator @@ -370,6 +371,44 @@ def test_failed_auth_503(self) -> None: self.assertIn("ERROR Service Temporary Unavailable (503)", self._caplog.text) assert result.exit_code == 1 + def test_failed_auth_503_watch(self) -> None: + base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) + cookie_dir = os.path.join(base_dir, "cookie") + + for dir in [base_dir, cookie_dir]: + recreate_path(dir) + + with vcr.use_cassette(os.path.join(self.vcr_path, "failed_auth_503.yml")): # noqa: SIM117 + # errors.CannotOverwriteExistingCassetteException + runner = CliRunner(env={"CLIENT_ID": "EC5646DE-9423-11E8-BF21-14109FE0B321"}) + result = runner.invoke( + main, + [ + "--username", + "jdoe@gmail.com", + "--password", + "password1", + "--no-progress-bar", + "--directory", + base_dir, + "--cookie-directory", + cookie_dir, + "--watch-with-interval", + "1", + ], + ) + self.assertNotIn( + "ERROR Failed to login with srp, falling back to old raw password authentication.", + self._caplog.text, + ) + self.assertEqual( + 2, self._caplog.text.count("ERROR Service Temporary Unavailable (503)") + ) + self.assertEqual(2, self._caplog.text.count("INFO Waiting for 1 sec...")) + # self.assertTrue("Can't overwrite existing cassette" in str(context.exception)) + assert result.exit_code == 1 # should error for vcr + + class _TrustedDevice(NamedTuple): id: int obfuscated_number: str diff --git a/tests/vcr_cassettes/failed_auth_503.yml b/tests/vcr_cassettes/failed_auth_503.yml index a7a9467c1..99b5bcfd5 100644 --- a/tests/vcr_cassettes/failed_auth_503.yml +++ b/tests/vcr_cassettes/failed_auth_503.yml @@ -1,4 +1,50 @@ interactions: +- request: + body: !!python/unicode '{"accountName": "whatever_username", "protocols": ["s2k", "s2k_fo"]}' + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: ['keep-alive'] + Content-Length: ['98'] + Content-Type: ['application/json'] + Origin: ['https://www.icloud.com'] + Referer: ['https://www.icloud.com/'] + User-Agent: ['Opera/9.52 (X11; Linux i686; U; en)'] + X-Apple-OAuth-Client-Id: ['d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d'] + X-Apple-OAuth-Client-Type: ['firstPartyAuth'] + X-Apple-OAuth-Redirect-URI: ['https://www.icloud.com'] + X-Apple-OAuth-Require-Grant-Code: ['true'] + X-Apple-OAuth-Response-Mode: ['web_message'] + X-Apple-OAuth-Response-Type: ['code'] + X-Apple-OAuth-State: ['EC5646DE-9423-11E8-BF21-14109FE0B321'] + X-Apple-Widget-Key: ['d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d'] + method: POST + uri: https://idmsa.apple.com/appleauth/auth/signin/init + response: + body: {string: 'Service Temporary Unavailable'} + headers: + Cache-Control: + - 'no-cache' + - 'no-store' + Connection: ['keep-alive'] + Content-Type: ['text/html;charset=UTF-8'] + Date: ['Fri, 15 Dec 2023 17:28:03 GMT'] + Pragma: ['no-cache'] + Referrer-Policy: ['origin'] + Server: ['Apple'] + Strict-Transport-Security: ['max-age=31536000; includeSubDomains; preload'] + Transfer-Encoding: ['chunked'] + X-Apple-I-Request-ID: ['12345678-1234-1234-1234-123456789012'] + X-BuildVersion: ['R4_1'] + X-Content-Type-Options: ['nosniff'] + X-FRAME-OPTIONS: ['DENY'] + X-XSS-Protection: ['1; mode=block'] + content-length: ['29'] + scnt: ['scnt-1234567890'] + vary: ['accept-encoding'] + status: + code: 503 + message: 'Service Temporary Unavailable' - request: body: !!python/unicode '{"accountName": "whatever_username", "protocols": ["s2k", "s2k_fo"]}' headers: