Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
8dcc997
refactor: extract response evaluation logic from session.py::request()
AndreyNikiforov Sep 16, 2025
18e7009
refactor: call evaluate_response() explicitly at request sites
AndreyNikiforov Sep 16, 2025
9e23ba2
refactor: replace exceptions with ADTs in session.py evaluate_response
AndreyNikiforov Sep 16, 2025
d48e848
refactor: extract throw_on_503() from session.request() into evaluate…
AndreyNikiforov Sep 16, 2025
83c123a
refactor: replace exception handling with ADT pattern in authenticati…
AndreyNikiforov Sep 16, 2025
51302c8
refactor: replace PyiCloudService creation with ADT-returning factory…
AndreyNikiforov Sep 16, 2025
5b25625
refactor: use ADTs for 2FA/2SA methods in authenticator
AndreyNikiforov Sep 16, 2025
3d98fe1
refactor: move exception re-throwing from authenticator to callers
AndreyNikiforov Sep 16, 2025
a1d0c76
refactor: Replace PhotoLibrary constructor checking with factory method
AndreyNikiforov Sep 17, 2025
aa12899
chore: Remove unused logging from pyicloud_ipd photos module
AndreyNikiforov Sep 17, 2025
1f7d099
refactor: change _fetch_libraries() to return ADTs
AndreyNikiforov Sep 17, 2025
6cae0ae
refactor: change _fetch_folders() to return ADTs
AndreyNikiforov Sep 17, 2025
fcbb0e9
refactor: replace PhotoAlbum.__len__() with ADT-returning method
AndreyNikiforov Sep 17, 2025
e2ef0b7
refactor: change download_asset() to return ADTs
AndreyNikiforov Sep 18, 2025
c3a4c7e
refactor: remove PhotoAlbum.__len__() magic method and use explicit A…
AndreyNikiforov Sep 18, 2025
aa2c6e0
refactor: migrate PhotoAlbum iteration to return ADTs
AndreyNikiforov Sep 18, 2025
c9c82b6
refactor: propagate ADT returns into core_single_run()
AndreyNikiforov Sep 18, 2025
49b4261
refactor: eliminate AuthWithTokenFailed wrapper in favor of direct AD…
AndreyNikiforov Sep 18, 2025
3f906d8
refactor: replace exception-wrapping ADTs with unions of response ADTs
AndreyNikiforov Sep 18, 2025
3364fdc
refactor: replace property calls with ADT methods in core_single_run()
AndreyNikiforov Sep 18, 2025
ec357e4
refactor: update download.py to use get_photos_service() ADT method
AndreyNikiforov Sep 18, 2025
548c611
fix: correct import path for TrustedDevice type
AndreyNikiforov Sep 18, 2025
387ec51
refactor: replace trusted_devices property with get_trusted_devices()…
AndreyNikiforov Sep 19, 2025
e6679d0
refactor: replace AuthenticationFailed exception wrapping with specif…
AndreyNikiforov Sep 19, 2025
638a0dc
refactor: add ADT return type to core_single_run (Phase 1)
AndreyNikiforov Sep 19, 2025
9cbe80a
refactor: convert AuthenticatorTwoSAExit to ADT return (Phase 2 - Ste…
AndreyNikiforov Sep 19, 2025
6918282
refactor: convert AuthPasswordNotProvided to ADT return
AndreyNikiforov Sep 19, 2025
3cdde5c
refactor: convert AuthInvalidCredentials to ADT return
AndreyNikiforov Sep 19, 2025
f084307
refactor: convert AuthServiceNotActivated to ADT return
AndreyNikiforov Sep 20, 2025
f3a5419
refactor: convert AuthAPIError to ADT return
AndreyNikiforov Sep 20, 2025
da96ca6
refactor: convert PhotoLibraryNotFinishedIndexing to ADT return
AndreyNikiforov Sep 20, 2025
2942eb5
docs: update refactoring plan with final results
AndreyNikiforov Sep 20, 2025
bebe38c
refactor: simplify exception handling by removing catch-all
AndreyNikiforov Sep 20, 2025
68ee42d
refactor: move authentication outside try-catch in core_single_run
AndreyNikiforov Sep 20, 2025
6f225ba
remove accidental context
AndreyNikiforov Sep 20, 2025
82ab010
refactor: replace first two exceptions with ADT returns in core_singl…
AndreyNikiforov Sep 21, 2025
492f6f1
refactor: replace more exceptions with ADT returns in private librari…
AndreyNikiforov Sep 21, 2025
dfef47e
refactor: replace exceptions with ADT returns in shared libraries check
AndreyNikiforov Sep 21, 2025
e3493f3
refactor: replace photo iteration 2SA exception with ADT return at li…
AndreyNikiforov Sep 21, 2025
5b387d3
refactor: replace photo iteration service not activated exception wit…
AndreyNikiforov Sep 21, 2025
4ad6b1f
refactor: replace get_album_count exceptions with ADT error handling
AndreyNikiforov Sep 21, 2025
0985a0c
refactor: handle photo iteration errors directly without exceptions
AndreyNikiforov Sep 21, 2025
dba8ab7
refactor: handle download result errors directly without exceptions
AndreyNikiforov Sep 21, 2025
8b25e1c
refactor: handle delete photo errors directly without exceptions
AndreyNikiforov Sep 22, 2025
fb23255
refactor: handle autodelete errors directly without exceptions
AndreyNikiforov Sep 22, 2025
77b80f2
refactor: remove exception handling try-catch block from core_single_run
AndreyNikiforov Sep 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 188 additions & 38 deletions src/icloudpd/authentication.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
"""Handles username/password authentication and two-step authentication"""

import logging
import sys
import time
from functools import partial
from typing import Any, Callable, Dict, List, Mapping, Tuple

from icloudpd.mfa_provider import MFAProvider
from icloudpd.status import Status, StatusExchange
from pyicloud_ipd.base import PyiCloudService
from pyicloud_ipd.exceptions import PyiCloudFailedMFAException
from pyicloud_ipd.exceptions import (
PyiCloudConnectionException,
)
from pyicloud_ipd.response_types import (
AuthAPIError,
AuthConnectionError,
AuthDomainMismatchError,
AuthenticationFailed,
AuthenticationSuccessWithService,
AuthenticatorConnectionError,
AuthenticatorMFAError,
AuthenticatorResult,
AuthenticatorSuccess,
AuthenticatorTwoSAExit,
AuthInvalidCredentials,
AuthPasswordNotProvided,
AuthRequires2SAWithService,
AuthServiceNotActivated,
AuthServiceUnavailable,
TwoFactorAuthFailed,
TwoFactorAuthResult,
TwoFactorAuthSuccess,
)


def prompt_int_range(message: str, default: str, min_val: int, max_val: int) -> int:
Expand Down Expand Up @@ -69,7 +90,7 @@ def authenticator(
response_observer: Callable[[Mapping[str, Any]], None] | None = None,
cookie_directory: str | None = None,
client_id: str | None = None,
) -> PyiCloudService:
) -> AuthenticatorResult:
"""Authenticate with iCloud username and password"""
logger.debug("Authenticating...")
valid_password: List[str] = []
Expand All @@ -83,17 +104,43 @@ def password_provider(username: str, valid_password: List[str]) -> str | None:
return password
return None

icloud = PyiCloudService(
domain,
username,
partial(password_provider, username, valid_password),
response_observer,
auth_result = PyiCloudService.create_pyicloud_service_adt(
domain=domain,
apple_id=username,
password_provider=partial(password_provider, username, valid_password),
response_observer=response_observer,
cookie_directory=cookie_directory,
client_id=client_id,
)

if not icloud:
raise NotImplementedError("None of providers gave password")
# Handle authentication result and extract service
match auth_result:
case AuthenticationSuccessWithService(service):
icloud = service
case AuthenticationFailed(error):
# Keep for backward compatibility
return AuthenticatorConnectionError(error)
case AuthPasswordNotProvided():
return AuthPasswordNotProvided()
case AuthInvalidCredentials():
return AuthInvalidCredentials()
case AuthServiceNotActivated(reason, code):
return AuthServiceNotActivated(reason, code)
case AuthServiceUnavailable(reason):
return AuthServiceUnavailable(reason)
case AuthAPIError(reason, code):
return AuthAPIError(reason, code)
case AuthConnectionError(error_message):
return AuthConnectionError(error_message)
case AuthRequires2SAWithService(service, _):
# 2SA is handled below, service is available
icloud = service
case AuthDomainMismatchError(domain_to_use):
msg = f"Apple insists on using {domain_to_use} for your request. Please use --domain parameter"
return AuthenticatorConnectionError(PyiCloudConnectionException(msg))
case _:
# This should never happen - let it crash with a clear error
raise ValueError(f"Unexpected auth result type: {type(auth_result)}")

if valid_password:
# save valid password to all providers
Expand All @@ -105,21 +152,53 @@ def password_provider(username: str, valid_password: List[str]) -> str | None:
logger.info("Two-factor authentication is required (2fa)")
notificator()
if mfa_provider == MFAProvider.WEBUI:
request_2fa_web(icloud, logger, status_exchange)
result = request_2fa_web(icloud, logger, status_exchange)
else:
request_2fa(icloud, logger)
result = request_2fa(icloud, logger)

match result:
case TwoFactorAuthSuccess():
pass # Success, continue
case TwoFactorAuthFailed(error_msg):
return AuthenticatorMFAError(error_msg)

elif icloud.requires_2sa:
logger.info("Two-step authentication is required (2sa)")
notificator()
request_2sa(icloud, logger)
result = request_2sa(icloud, logger)

match result:
case TwoFactorAuthSuccess():
pass # Success, continue
case TwoFactorAuthFailed(_):
# For 2SA, need to exit with code 1 for backward compatibility
return AuthenticatorTwoSAExit()

return icloud
return AuthenticatorSuccess(icloud)


def request_2sa(icloud: PyiCloudService, logger: logging.Logger) -> None:
def request_2sa(icloud: PyiCloudService, logger: logging.Logger) -> TwoFactorAuthResult:
"""Request two-step authentication. Prompts for SMS or device"""
devices = icloud.trusted_devices
from pyicloud_ipd.response_types import (
Response2SARequired,
ResponseAPIError,
ResponseServiceNotActivated,
ResponseServiceUnavailable,
TrustedDevicesSuccess,
)

devices_result = icloud.get_trusted_devices()
match devices_result:
case TrustedDevicesSuccess(devices):
pass # Continue with devices
case (
Response2SARequired(_)
| ResponseServiceNotActivated(_, _)
| ResponseAPIError(_, _)
| ResponseServiceUnavailable(_)
):
return TwoFactorAuthFailed("Failed to get trusted devices")

devices_count = len(devices)
device_index: int = 0
if devices_count > 0:
Expand All @@ -132,31 +211,68 @@ def request_2sa(icloud: PyiCloudService, logger: logging.Logger) -> None:
device_index = prompt_int_range("Please choose an option:", "0", 0, devices_count - 1)

device = devices[device_index]
if not icloud.send_verification_code(device):
logger.error("Failed to send two-step authentication code")
sys.exit(1)
from pyicloud_ipd.response_types import (
SendVerificationCodeSuccess,
ValidateVerificationCodeSuccess,
)

send_result = icloud.send_verification_code(device)
match send_result:
case SendVerificationCodeSuccess(success):
if not success:
logger.error("Failed to send two-step authentication code")
return TwoFactorAuthFailed("Failed to send two-step authentication code")
case _:
logger.error("Failed to send two-step authentication code")
return TwoFactorAuthFailed("Failed to send two-step authentication code")

code = prompt_string("Please enter two-step authentication code")
if not icloud.validate_verification_code(device, code):
logger.error("Failed to verify two-step authentication code")
sys.exit(1)
validate_result = icloud.validate_verification_code(device, code)
match validate_result:
case ValidateVerificationCodeSuccess(success):
if not success:
logger.error("Failed to verify two-step authentication code")
return TwoFactorAuthFailed("Failed to verify two-step authentication code")
case _:
logger.error("Failed to verify two-step authentication code")
return TwoFactorAuthFailed("Failed to verify two-step authentication code")
logger.info(
"Great, you're all set up. The script can now be run without "
"user interaction until 2SA expires.\n"
"You can set up email notifications for when "
"the two-step authentication expires.\n"
"(Use --help to view information about SMTP options.)"
)
return TwoFactorAuthSuccess()


def request_2fa(icloud: PyiCloudService, logger: logging.Logger) -> None:
def request_2fa(icloud: PyiCloudService, logger: logging.Logger) -> TwoFactorAuthResult:
"""Request two-factor authentication."""
devices = icloud.get_trusted_phone_numbers()
from pyicloud_ipd.response_types import (
Response2SARequired,
ResponseAPIError,
ResponseServiceNotActivated,
ResponseServiceUnavailable,
TrustedPhoneNumbersSuccess,
)

devices_result = icloud.get_trusted_phone_numbers()
match devices_result:
case TrustedPhoneNumbersSuccess(devices):
pass # Continue with devices
case (
Response2SARequired(_)
| ResponseServiceNotActivated(_, _)
| ResponseAPIError(_, _)
| ResponseServiceUnavailable(_)
):
return TwoFactorAuthFailed("Failed to get trusted phone numbers")

devices_count = len(devices)
device_index_alphabet = "abcdefghijklmnopqrstuvwxyz"
if devices_count > 0:
if devices_count > len(device_index_alphabet):
raise PyiCloudFailedMFAException("Too many trusted devices for authentication")
return TwoFactorAuthFailed("Too many trusted devices for authentication")

for i, device in enumerate(devices):
echo(f" {device_index_alphabet[i]}: {device.obfuscated_number}")
Expand Down Expand Up @@ -199,8 +315,15 @@ def request_2fa(icloud: PyiCloudService, logger: logging.Logger) -> None:
# need to send code
device_index = device_index_alphabet.index(index_or_code)
device = devices[device_index]
if not icloud.send_2fa_code_sms(device.id):
raise PyiCloudFailedMFAException("Failed to send two-factor authentication code")
from pyicloud_ipd.response_types import Send2FACodeSMSSuccess

send_result = icloud.send_2fa_code_sms(device.id)
match send_result:
case Send2FACodeSMSSuccess(success):
if not success:
return TwoFactorAuthFailed("Failed to send two-factor authentication code")
case _:
return TwoFactorAuthFailed("Failed to send two-factor authentication code")
while True:
from foundation.string_utils import strip

Expand All @@ -213,11 +336,29 @@ def request_2fa(icloud: PyiCloudService, logger: logging.Logger) -> None:
break
echo("Invalid code, should be six digits. Try again")

if not icloud.validate_2fa_code_sms(device.id, code):
raise PyiCloudFailedMFAException("Failed to verify two-factor authentication code")
from pyicloud_ipd.response_types import Validate2FACodeSMSSuccess

validate_result = icloud.validate_2fa_code_sms(device.id, code)
match validate_result:
case Validate2FACodeSMSSuccess(success):
if not success:
return TwoFactorAuthFailed(
"Failed to verify two-factor authentication code"
)
case _:
return TwoFactorAuthFailed("Failed to verify two-factor authentication code")
else:
if not icloud.validate_2fa_code(index_or_code):
raise PyiCloudFailedMFAException("Failed to verify two-factor authentication code")
from pyicloud_ipd.response_types import Validate2FACodeSuccess

validate_2fa_result = icloud.validate_2fa_code(index_or_code)
match validate_2fa_result:
case Validate2FACodeSuccess(success):
if not success:
return TwoFactorAuthFailed(
"Failed to verify two-factor authentication code"
)
case _:
return TwoFactorAuthFailed("Failed to verify two-factor authentication code")
else:
while True:
from foundation.string_utils import strip
Expand All @@ -226,23 +367,31 @@ def request_2fa(icloud: PyiCloudService, logger: logging.Logger) -> None:
if len(code) == 6 and code.isdigit():
break
echo("Invalid code, should be six digits. Try again")
if not icloud.validate_2fa_code(code):
raise PyiCloudFailedMFAException("Failed to verify two-factor authentication code")
from pyicloud_ipd.response_types import Validate2FACodeSuccess

validate_2fa_result = icloud.validate_2fa_code(code)
match validate_2fa_result:
case Validate2FACodeSuccess(success):
if not success:
return TwoFactorAuthFailed("Failed to verify two-factor authentication code")
case _:
return TwoFactorAuthFailed("Failed to verify two-factor authentication code")
logger.info(
"Great, you're all set up. The script can now be run without "
"user interaction until 2FA expires.\n"
"You can set up email notifications for when "
"the two-factor authentication expires.\n"
"(Use --help to view information about SMTP options.)"
)
return TwoFactorAuthSuccess()


def request_2fa_web(
icloud: PyiCloudService, logger: logging.Logger, status_exchange: StatusExchange
) -> None:
) -> TwoFactorAuthResult:
"""Request two-factor authentication through Webui."""
if not status_exchange.replace_status(Status.NO_INPUT_NEEDED, Status.NEED_MFA):
raise PyiCloudFailedMFAException(
return TwoFactorAuthFailed(
f"Expected NO_INPUT_NEEDED, but got {status_exchange.get_status()}"
)

Expand All @@ -258,7 +407,7 @@ def request_2fa_web(
if status_exchange.replace_status(Status.SUPPLIED_MFA, Status.CHECKING_MFA):
code = status_exchange.get_payload()
if not code:
raise PyiCloudFailedMFAException(
return TwoFactorAuthFailed(
"Internal error: did not get code for SUPPLIED_MFA status"
)

Expand All @@ -268,7 +417,7 @@ def request_2fa_web(
# TODO give user an option to restart auth in case they missed code
continue
else:
raise PyiCloudFailedMFAException("Failed to chage status of invalid code")
return TwoFactorAuthFailed("Failed to change status of invalid code")
else:
status_exchange.replace_status(Status.CHECKING_MFA, Status.NO_INPUT_NEEDED) # done

Expand All @@ -279,5 +428,6 @@ def request_2fa_web(
"the two-factor authentication expires.\n"
"(Use --help to view information about SMTP options.)"
)
return TwoFactorAuthSuccess()
else:
raise PyiCloudFailedMFAException("Failed to change status")
return TwoFactorAuthFailed("Failed to change status")
Loading
Loading