Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
694133a
limit `icloud` to keyring management
AndreyNikiforov Sep 3, 2025
e71fa12
remove unused parameter
AndreyNikiforov Sep 3, 2025
fc09c46
move re-auth due to global session error into bigger auth loop
AndreyNikiforov Sep 3, 2025
462d9a8
fix tests for invalid global session during photo iteration
AndreyNikiforov Sep 3, 2025
132f058
fix tests for invalid global session during deletion
AndreyNikiforov Sep 4, 2025
c433a6b
fix tests to support retry on internal error
AndreyNikiforov Sep 4, 2025
1a61826
remove local retries during downloads
AndreyNikiforov Sep 4, 2025
596c04c
always start auth with validation request
AndreyNikiforov Sep 5, 2025
56e78d9
ignore hars
AndreyNikiforov Sep 5, 2025
d48c255
better testing on 404 during downloading
AndreyNikiforov Sep 6, 2025
c82491d
move response processing out of session
AndreyNikiforov Sep 7, 2025
7b6871b
make cli test run with cassette to avoid accidental request
AndreyNikiforov Sep 7, 2025
c8575cd
fix test for missing dir
AndreyNikiforov Sep 7, 2025
b90889a
fix tests to use cassette
AndreyNikiforov Sep 7, 2025
7e7e117
Revert "move response processing out of session"
AndreyNikiforov Sep 12, 2025
b05a4d8
Replace session error test mocks with cassette-based approach
AndreyNikiforov Sep 11, 2025
c2cca05
refactor: replace mock-based session error tests with cassette-based …
AndreyNikiforov Sep 11, 2025
8f85605
refactor: remove unnecessary time.sleep mocks from session error tests
AndreyNikiforov Sep 11, 2025
8c3ec05
refactor: replace mock with VCR cassette for send_verification_code test
AndreyNikiforov Sep 11, 2025
6d083be
fix: update cassette to match Apple's actual API response format
AndreyNikiforov Sep 11, 2025
b18d1e1
refactor: convert test_handle_albums_error to use cassette instead of…
AndreyNikiforov Sep 12, 2025
e6aa964
fix: correct albums error cassette to use 200 status with error payload
AndreyNikiforov Sep 12, 2025
54e8b8d
refactor: convert more error tests to use cassettes instead of mocks
AndreyNikiforov Sep 12, 2025
bdbb122
fix: clarify download error handling with cassettes
AndreyNikiforov Sep 12, 2025
882a975
fix: remove is_streaming_response check for proper error handling
AndreyNikiforov Sep 12, 2025
5d34a86
refactor: convert error simulation tests from mocks to cassettes
AndreyNikiforov Sep 12, 2025
87e4f00
fix: update session error tests to expect successful retry
AndreyNikiforov Sep 12, 2025
fa20431
style: clean up test formatting for session error tests
AndreyNikiforov Sep 12, 2025
3e38874
skip 2sa failed auth test
AndreyNikiforov Sep 12, 2025
ef7b0db
fix: remove incorrect delete operations from session error iteration …
AndreyNikiforov Sep 12, 2025
4883633
refactor: simplify test paths by replacing datetime calculations with…
AndreyNikiforov Sep 12, 2025
203a2f3
refactor: remove mocks from test_delete_after_download_session_error
AndreyNikiforov Sep 12, 2025
b362f87
refactor: remove mocks from test_retry_delete_after_download_internal…
AndreyNikiforov Sep 12, 2025
ba95913
remove unused use case test
AndreyNikiforov Sep 12, 2025
99db5c6
test: simplify EXIF tests to download only one photo
AndreyNikiforov Sep 12, 2025
d217f79
refactor: update test_until_found* to use actual cassette downloads
AndreyNikiforov Sep 13, 2025
acb8b22
refactor: update size fallback tests to use cassette downloads
AndreyNikiforov Sep 13, 2025
630fc6b
refactor: update dedup test for name-id7 to match standard pattern
AndreyNikiforov Sep 13, 2025
729fd63
refactor: update adjusted size fallback test
AndreyNikiforov Sep 13, 2025
67d0a57
fix asset size in cassettes
AndreyNikiforov Sep 13, 2025
92dc2d2
refactor: remove mocks from test_download_two_sizes_with_force_size
AndreyNikiforov Sep 13, 2025
236a38e
refactor: remove mocks from live photo tests
AndreyNikiforov Sep 14, 2025
90bb641
refactor: remove mocks from Chinese live photo tests
AndreyNikiforov Sep 14, 2025
502ac49
refactor: remove mocks from delete-after-download dry-run tests
AndreyNikiforov Sep 14, 2025
94c275e
refactor: replace mocks with cassette in internal error test
AndreyNikiforov Sep 14, 2025
f575e67
refactor: share internal error download cassette between tests
AndreyNikiforov Sep 15, 2025
4be7ef4
refactor: share error handling cassettes between standard and name-id…
AndreyNikiforov Sep 15, 2025
3ba2eba
feat: add strict VCR cassette playback configuration
AndreyNikiforov Sep 15, 2025
096713a
remove error code overriding in tests
AndreyNikiforov Sep 15, 2025
4a9c29d
fix: disable request reuse in cassettes
AndreyNikiforov Sep 15, 2025
43da84b
refactor: remove mocks from test_force_size
AndreyNikiforov Sep 15, 2025
8e22a74
refactor: remove mocks from test_force_size_name_id7
AndreyNikiforov Sep 15, 2025
fa8d29c
remove unused imports
AndreyNikiforov Sep 15, 2025
64d2313
chore: update cassette files with modified content
AndreyNikiforov Sep 15, 2025
13c45e9
refactor: extract response evaluation logic from session.py::request()
AndreyNikiforov Sep 16, 2025
d7839a1
refactor: call evaluate_response() explicitly at request sites
AndreyNikiforov Sep 16, 2025
1757d9d
refactor: replace exceptions with ADTs in session.py evaluate_response
AndreyNikiforov Sep 16, 2025
115bc04
refactor: extract throw_on_503() from session.request() into evaluate…
AndreyNikiforov Sep 16, 2025
a4adbcc
refactor: replace exception handling with ADT pattern in authenticati…
AndreyNikiforov Sep 16, 2025
ab18a0f
refactor: replace PyiCloudService creation with ADT-returning factory…
AndreyNikiforov Sep 16, 2025
b542a95
refactor: use ADTs for 2FA/2SA methods in authenticator
AndreyNikiforov Sep 16, 2025
aaa9149
refactor: move exception re-throwing from authenticator to callers
AndreyNikiforov Sep 16, 2025
49b636a
refactor: Replace PhotoLibrary constructor checking with factory method
AndreyNikiforov Sep 17, 2025
e988e85
chore: Remove unused logging from pyicloud_ipd photos module
AndreyNikiforov Sep 17, 2025
630d49e
refactor: change _fetch_libraries() to return ADTs
AndreyNikiforov Sep 17, 2025
2537186
refactor: change _fetch_folders() to return ADTs
AndreyNikiforov Sep 17, 2025
4cf6bb9
refactor: replace PhotoAlbum.__len__() with ADT-returning method
AndreyNikiforov Sep 17, 2025
ebb5b5f
refactor: change download_asset() to return ADTs
AndreyNikiforov Sep 18, 2025
d119d7f
refactor: remove PhotoAlbum.__len__() magic method and use explicit A…
AndreyNikiforov Sep 18, 2025
5a22b4a
refactor: migrate PhotoAlbum iteration to return ADTs
AndreyNikiforov Sep 18, 2025
0b68bb6
refactor: propagate ADT returns into core_single_run()
AndreyNikiforov Sep 18, 2025
ea99fe8
refactor: eliminate AuthWithTokenFailed wrapper in favor of direct AD…
AndreyNikiforov Sep 18, 2025
1f248f4
refactor: replace exception-wrapping ADTs with unions of response ADTs
AndreyNikiforov Sep 18, 2025
08078c2
refactor: replace property calls with ADT methods in core_single_run()
AndreyNikiforov Sep 18, 2025
0542b82
refactor: update download.py to use get_photos_service() ADT method
AndreyNikiforov Sep 18, 2025
3d8e131
fix: correct import path for TrustedDevice type
AndreyNikiforov Sep 18, 2025
24a2c0a
refactor: replace trusted_devices property with get_trusted_devices()…
AndreyNikiforov Sep 19, 2025
cac635a
refactor: replace AuthenticationFailed exception wrapping with specif…
AndreyNikiforov Sep 19, 2025
b0f95b9
refactor: add ADT return type to core_single_run (Phase 1)
AndreyNikiforov Sep 19, 2025
5acda14
refactor: convert AuthenticatorTwoSAExit to ADT return (Phase 2 - Ste…
AndreyNikiforov Sep 19, 2025
623ec3b
refactor: convert AuthPasswordNotProvided to ADT return
AndreyNikiforov Sep 19, 2025
a5c9d12
refactor: convert AuthInvalidCredentials to ADT return
AndreyNikiforov Sep 19, 2025
48a23ea
refactor: convert AuthServiceNotActivated to ADT return
AndreyNikiforov Sep 20, 2025
9b3d1c7
refactor: convert AuthAPIError to ADT return
AndreyNikiforov Sep 20, 2025
dac14dd
refactor: convert PhotoLibraryNotFinishedIndexing to ADT return
AndreyNikiforov Sep 20, 2025
a57d1b8
docs: update refactoring plan with final results
AndreyNikiforov Sep 20, 2025
5cff247
refactor: simplify exception handling by removing catch-all
AndreyNikiforov Sep 20, 2025
f4952b9
refactor: move authentication outside try-catch in core_single_run
AndreyNikiforov Sep 20, 2025
dc20b1c
remove accidental context
AndreyNikiforov Sep 20, 2025
a35a57f
refactor: replace first two exceptions with ADT returns in core_singl…
AndreyNikiforov Sep 21, 2025
ff08ad4
refactor: replace more exceptions with ADT returns in private librari…
AndreyNikiforov Sep 21, 2025
09d1a09
refactor: replace exceptions with ADT returns in shared libraries check
AndreyNikiforov Sep 21, 2025
21b0afe
refactor: replace photo iteration 2SA exception with ADT return at li…
AndreyNikiforov Sep 21, 2025
fd99c9d
refactor: replace photo iteration service not activated exception wit…
AndreyNikiforov Sep 21, 2025
c713610
refactor: replace get_album_count exceptions with ADT error handling
AndreyNikiforov Sep 21, 2025
6d8a2bb
refactor: handle photo iteration errors directly without exceptions
AndreyNikiforov Sep 21, 2025
3906695
refactor: handle download result errors directly without exceptions
AndreyNikiforov Sep 21, 2025
4ef1463
refactor: handle delete photo errors directly without exceptions
AndreyNikiforov Sep 22, 2025
af8a995
refactor: handle autodelete errors directly without exceptions
AndreyNikiforov Sep 22, 2025
f09784b
refactor: remove exception handling try-catch block from core_single_run
AndreyNikiforov Sep 22, 2025
f6f4edd
Initial metadata_management: add favorite to ratings, processing exis…
spmp Dec 31, 2025
fc1b65c
More complete metadata management implementation. Remove separate exi…
spmp Jan 1, 2026
e3ce489
Clean up some functions and fix tests for metadata management
spmp Jan 1, 2026
f46d2fd
Format and lint metadata management
spmp Jan 1, 2026
f99611c
Metadata management code reduction and test cleanup
spmp Jan 1, 2026
485b443
src/icloudpd/base.py
spmp Jan 1, 2026
faf77b5
Revert change to where metadata management is called, fix failing exi…
spmp Jan 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ node_modules/
.cookies/

.claude/

*.har
226 changes: 188 additions & 38 deletions src/icloudpd/authentication.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
"""Handles username/password authentication and two-step authentication"""

import logging
import sys
import time
from functools import partial
from typing import Any, Callable, Dict, List, Mapping, Tuple

from icloudpd.mfa_provider import MFAProvider
from icloudpd.status import Status, StatusExchange
from pyicloud_ipd.base import PyiCloudService
from pyicloud_ipd.exceptions import PyiCloudFailedMFAException
from pyicloud_ipd.exceptions import (
PyiCloudConnectionException,
)
from pyicloud_ipd.response_types import (
AuthAPIError,
AuthConnectionError,
AuthDomainMismatchError,
AuthenticationFailed,
AuthenticationSuccessWithService,
AuthenticatorConnectionError,
AuthenticatorMFAError,
AuthenticatorResult,
AuthenticatorSuccess,
AuthenticatorTwoSAExit,
AuthInvalidCredentials,
AuthPasswordNotProvided,
AuthRequires2SAWithService,
AuthServiceNotActivated,
AuthServiceUnavailable,
TwoFactorAuthFailed,
TwoFactorAuthResult,
TwoFactorAuthSuccess,
)


def prompt_int_range(message: str, default: str, min_val: int, max_val: int) -> int:
Expand Down Expand Up @@ -69,7 +90,7 @@ def authenticator(
response_observer: Callable[[Mapping[str, Any]], None] | None = None,
cookie_directory: str | None = None,
client_id: str | None = None,
) -> PyiCloudService:
) -> AuthenticatorResult:
"""Authenticate with iCloud username and password"""
logger.debug("Authenticating...")
valid_password: List[str] = []
Expand All @@ -83,17 +104,43 @@ def password_provider(username: str, valid_password: List[str]) -> str | None:
return password
return None

icloud = PyiCloudService(
domain,
username,
partial(password_provider, username, valid_password),
response_observer,
auth_result = PyiCloudService.create_pyicloud_service_adt(
domain=domain,
apple_id=username,
password_provider=partial(password_provider, username, valid_password),
response_observer=response_observer,
cookie_directory=cookie_directory,
client_id=client_id,
)

if not icloud:
raise NotImplementedError("None of providers gave password")
# Handle authentication result and extract service
match auth_result:
case AuthenticationSuccessWithService(service):
icloud = service
case AuthenticationFailed(error):
# Keep for backward compatibility
return AuthenticatorConnectionError(error)
case AuthPasswordNotProvided():
return AuthPasswordNotProvided()
case AuthInvalidCredentials():
return AuthInvalidCredentials()
case AuthServiceNotActivated(reason, code):
return AuthServiceNotActivated(reason, code)
case AuthServiceUnavailable(reason):
return AuthServiceUnavailable(reason)
case AuthAPIError(reason, code):
return AuthAPIError(reason, code)
case AuthConnectionError(error_message):
return AuthConnectionError(error_message)
case AuthRequires2SAWithService(service, _):
# 2SA is handled below, service is available
icloud = service
case AuthDomainMismatchError(domain_to_use):
msg = f"Apple insists on using {domain_to_use} for your request. Please use --domain parameter"
return AuthenticatorConnectionError(PyiCloudConnectionException(msg))
case _:
# This should never happen - let it crash with a clear error
raise ValueError(f"Unexpected auth result type: {type(auth_result)}")

if valid_password:
# save valid password to all providers
Expand All @@ -105,21 +152,53 @@ def password_provider(username: str, valid_password: List[str]) -> str | None:
logger.info("Two-factor authentication is required (2fa)")
notificator()
if mfa_provider == MFAProvider.WEBUI:
request_2fa_web(icloud, logger, status_exchange)
result = request_2fa_web(icloud, logger, status_exchange)
else:
request_2fa(icloud, logger)
result = request_2fa(icloud, logger)

match result:
case TwoFactorAuthSuccess():
pass # Success, continue
case TwoFactorAuthFailed(error_msg):
return AuthenticatorMFAError(error_msg)

elif icloud.requires_2sa:
logger.info("Two-step authentication is required (2sa)")
notificator()
request_2sa(icloud, logger)
result = request_2sa(icloud, logger)

match result:
case TwoFactorAuthSuccess():
pass # Success, continue
case TwoFactorAuthFailed(_):
# For 2SA, need to exit with code 1 for backward compatibility
return AuthenticatorTwoSAExit()

return icloud
return AuthenticatorSuccess(icloud)


def request_2sa(icloud: PyiCloudService, logger: logging.Logger) -> None:
def request_2sa(icloud: PyiCloudService, logger: logging.Logger) -> TwoFactorAuthResult:
"""Request two-step authentication. Prompts for SMS or device"""
devices = icloud.trusted_devices
from pyicloud_ipd.response_types import (
Response2SARequired,
ResponseAPIError,
ResponseServiceNotActivated,
ResponseServiceUnavailable,
TrustedDevicesSuccess,
)

devices_result = icloud.get_trusted_devices()
match devices_result:
case TrustedDevicesSuccess(devices):
pass # Continue with devices
case (
Response2SARequired(_)
| ResponseServiceNotActivated(_, _)
| ResponseAPIError(_, _)
| ResponseServiceUnavailable(_)
):
return TwoFactorAuthFailed("Failed to get trusted devices")

devices_count = len(devices)
device_index: int = 0
if devices_count > 0:
Expand All @@ -132,31 +211,68 @@ def request_2sa(icloud: PyiCloudService, logger: logging.Logger) -> None:
device_index = prompt_int_range("Please choose an option:", "0", 0, devices_count - 1)

device = devices[device_index]
if not icloud.send_verification_code(device):
logger.error("Failed to send two-step authentication code")
sys.exit(1)
from pyicloud_ipd.response_types import (
SendVerificationCodeSuccess,
ValidateVerificationCodeSuccess,
)

send_result = icloud.send_verification_code(device)
match send_result:
case SendVerificationCodeSuccess(success):
if not success:
logger.error("Failed to send two-step authentication code")
return TwoFactorAuthFailed("Failed to send two-step authentication code")
case _:
logger.error("Failed to send two-step authentication code")
return TwoFactorAuthFailed("Failed to send two-step authentication code")

code = prompt_string("Please enter two-step authentication code")
if not icloud.validate_verification_code(device, code):
logger.error("Failed to verify two-step authentication code")
sys.exit(1)
validate_result = icloud.validate_verification_code(device, code)
match validate_result:
case ValidateVerificationCodeSuccess(success):
if not success:
logger.error("Failed to verify two-step authentication code")
return TwoFactorAuthFailed("Failed to verify two-step authentication code")
case _:
logger.error("Failed to verify two-step authentication code")
return TwoFactorAuthFailed("Failed to verify two-step authentication code")
logger.info(
"Great, you're all set up. The script can now be run without "
"user interaction until 2SA expires.\n"
"You can set up email notifications for when "
"the two-step authentication expires.\n"
"(Use --help to view information about SMTP options.)"
)
return TwoFactorAuthSuccess()


def request_2fa(icloud: PyiCloudService, logger: logging.Logger) -> None:
def request_2fa(icloud: PyiCloudService, logger: logging.Logger) -> TwoFactorAuthResult:
"""Request two-factor authentication."""
devices = icloud.get_trusted_phone_numbers()
from pyicloud_ipd.response_types import (
Response2SARequired,
ResponseAPIError,
ResponseServiceNotActivated,
ResponseServiceUnavailable,
TrustedPhoneNumbersSuccess,
)

devices_result = icloud.get_trusted_phone_numbers()
match devices_result:
case TrustedPhoneNumbersSuccess(devices):
pass # Continue with devices
case (
Response2SARequired(_)
| ResponseServiceNotActivated(_, _)
| ResponseAPIError(_, _)
| ResponseServiceUnavailable(_)
):
return TwoFactorAuthFailed("Failed to get trusted phone numbers")

devices_count = len(devices)
device_index_alphabet = "abcdefghijklmnopqrstuvwxyz"
if devices_count > 0:
if devices_count > len(device_index_alphabet):
raise PyiCloudFailedMFAException("Too many trusted devices for authentication")
return TwoFactorAuthFailed("Too many trusted devices for authentication")

for i, device in enumerate(devices):
echo(f" {device_index_alphabet[i]}: {device.obfuscated_number}")
Expand Down Expand Up @@ -199,8 +315,15 @@ def request_2fa(icloud: PyiCloudService, logger: logging.Logger) -> None:
# need to send code
device_index = device_index_alphabet.index(index_or_code)
device = devices[device_index]
if not icloud.send_2fa_code_sms(device.id):
raise PyiCloudFailedMFAException("Failed to send two-factor authentication code")
from pyicloud_ipd.response_types import Send2FACodeSMSSuccess

send_result = icloud.send_2fa_code_sms(device.id)
match send_result:
case Send2FACodeSMSSuccess(success):
if not success:
return TwoFactorAuthFailed("Failed to send two-factor authentication code")
case _:
return TwoFactorAuthFailed("Failed to send two-factor authentication code")
while True:
from foundation.string_utils import strip

Expand All @@ -213,11 +336,29 @@ def request_2fa(icloud: PyiCloudService, logger: logging.Logger) -> None:
break
echo("Invalid code, should be six digits. Try again")

if not icloud.validate_2fa_code_sms(device.id, code):
raise PyiCloudFailedMFAException("Failed to verify two-factor authentication code")
from pyicloud_ipd.response_types import Validate2FACodeSMSSuccess

validate_result = icloud.validate_2fa_code_sms(device.id, code)
match validate_result:
case Validate2FACodeSMSSuccess(success):
if not success:
return TwoFactorAuthFailed(
"Failed to verify two-factor authentication code"
)
case _:
return TwoFactorAuthFailed("Failed to verify two-factor authentication code")
else:
if not icloud.validate_2fa_code(index_or_code):
raise PyiCloudFailedMFAException("Failed to verify two-factor authentication code")
from pyicloud_ipd.response_types import Validate2FACodeSuccess

validate_2fa_result = icloud.validate_2fa_code(index_or_code)
match validate_2fa_result:
case Validate2FACodeSuccess(success):
if not success:
return TwoFactorAuthFailed(
"Failed to verify two-factor authentication code"
)
case _:
return TwoFactorAuthFailed("Failed to verify two-factor authentication code")
else:
while True:
from foundation.string_utils import strip
Expand All @@ -226,23 +367,31 @@ def request_2fa(icloud: PyiCloudService, logger: logging.Logger) -> None:
if len(code) == 6 and code.isdigit():
break
echo("Invalid code, should be six digits. Try again")
if not icloud.validate_2fa_code(code):
raise PyiCloudFailedMFAException("Failed to verify two-factor authentication code")
from pyicloud_ipd.response_types import Validate2FACodeSuccess

validate_2fa_result = icloud.validate_2fa_code(code)
match validate_2fa_result:
case Validate2FACodeSuccess(success):
if not success:
return TwoFactorAuthFailed("Failed to verify two-factor authentication code")
case _:
return TwoFactorAuthFailed("Failed to verify two-factor authentication code")
logger.info(
"Great, you're all set up. The script can now be run without "
"user interaction until 2FA expires.\n"
"You can set up email notifications for when "
"the two-factor authentication expires.\n"
"(Use --help to view information about SMTP options.)"
)
return TwoFactorAuthSuccess()


def request_2fa_web(
icloud: PyiCloudService, logger: logging.Logger, status_exchange: StatusExchange
) -> None:
) -> TwoFactorAuthResult:
"""Request two-factor authentication through Webui."""
if not status_exchange.replace_status(Status.NO_INPUT_NEEDED, Status.NEED_MFA):
raise PyiCloudFailedMFAException(
return TwoFactorAuthFailed(
f"Expected NO_INPUT_NEEDED, but got {status_exchange.get_status()}"
)

Expand All @@ -258,7 +407,7 @@ def request_2fa_web(
if status_exchange.replace_status(Status.SUPPLIED_MFA, Status.CHECKING_MFA):
code = status_exchange.get_payload()
if not code:
raise PyiCloudFailedMFAException(
return TwoFactorAuthFailed(
"Internal error: did not get code for SUPPLIED_MFA status"
)

Expand All @@ -268,7 +417,7 @@ def request_2fa_web(
# TODO give user an option to restart auth in case they missed code
continue
else:
raise PyiCloudFailedMFAException("Failed to chage status of invalid code")
return TwoFactorAuthFailed("Failed to change status of invalid code")
else:
status_exchange.replace_status(Status.CHECKING_MFA, Status.NO_INPUT_NEEDED) # done

Expand All @@ -279,5 +428,6 @@ def request_2fa_web(
"the two-factor authentication expires.\n"
"(Use --help to view information about SMTP options.)"
)
return TwoFactorAuthSuccess()
else:
raise PyiCloudFailedMFAException("Failed to change status")
return TwoFactorAuthFailed("Failed to change status")
Loading