Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- fix: smtp & script notifications terminate the program [#898](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/898)

## 1.29.0 (2025-07-19)

- fix: connection errors reported with stack trace [#1187](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/1187)
Expand Down
15 changes: 3 additions & 12 deletions src/icloudpd/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@
from pyicloud_ipd.raw_policy import RawTreatmentPolicy


class TwoStepAuthRequiredError(Exception):
"""
Raised when 2SA is required. base.py catches this exception
and sends an email notification.
"""


def authenticator(
logger: logging.Logger,
domain: str,
Expand All @@ -33,8 +26,8 @@ def authenticator(
mfa_provider: MFAProvider,
status_exchange: StatusExchange,
username: str,
notificator: Callable[[], None],
cookie_directory: str | None = None,
raise_error_on_2sa: bool = False,
client_id: str | None = None,
) -> PyiCloudService:
"""Authenticate with iCloud username and password"""
Expand Down Expand Up @@ -72,18 +65,16 @@ def password_provider(username: str, valid_password: List[str]) -> str | None:
writer(username, valid_password[0])

if icloud.requires_2fa:
if raise_error_on_2sa:
raise TwoStepAuthRequiredError("Two-factor authentication is required")
logger.info("Two-factor authentication is required (2fa)")
notificator()
if mfa_provider == MFAProvider.WEBUI:
request_2fa_web(icloud, logger, status_exchange)
else:
request_2fa(icloud, logger)

elif icloud.requires_2sa:
if raise_error_on_2sa:
raise TwoStepAuthRequiredError("Two-step authentication is required")
logger.info("Two-step authentication is required (2sa)")
notificator()
request_2sa(icloud, logger)

return icloud
Expand Down
97 changes: 54 additions & 43 deletions src/icloudpd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from tzlocal import get_localzone

from icloudpd import constants, download, exif_datetime
from icloudpd.authentication import TwoStepAuthRequiredError, authenticator
from icloudpd.authentication import authenticator
from icloudpd.autodelete import autodelete_photos
from icloudpd.config import Config
from icloudpd.counter import Counter
Expand Down Expand Up @@ -837,9 +837,23 @@ def main(
if directory is not None
else (lambda _s, _c, _p: False)
)
notificator = partial(
notificator_builder,
logger,
username,
smtp_username,
smtp_password,
smtp_host,
smtp_port,
smtp_no_tls,
notification_email,
notification_email_from,
notification_script,
)
result = core(
passer,
downloader,
notificator,
directory,
username,
auth_only,
Expand All @@ -855,15 +869,7 @@ def main(
auto_delete,
only_print_filenames,
folder_structure,
smtp_username,
smtp_password,
smtp_host,
smtp_port,
smtp_no_tls,
notification_email,
notification_email_from,
no_progress_bar,
notification_script,
delete_after_download,
keep_icloud_recent_days,
domain,
Expand All @@ -881,6 +887,43 @@ def main(
sys.exit(result)


def notificator_builder(
logger: logging.Logger,
username: str,
smtp_username: str | None,
smtp_password: str | None,
smtp_host: str,
smtp_port: int,
smtp_no_tls: bool,
notification_email: str | None,
notification_email_from: str | None,
notification_script: str | None,
) -> None:
try:
if notification_script is not None:
logger.debug("Executing notification script...")
subprocess.call([notification_script])
else:
pass
if smtp_username is not None or notification_email is not None:
send_2sa_notification(
logger,
username,
smtp_username,
smtp_password,
smtp_host,
smtp_port,
smtp_no_tls,
notification_email,
notification_email_from,
)
else:
pass
except Exception as error:
logger.error("Notification of the required MFA failed")
logger.debug(error)


def where_builder(
logger: logging.Logger,
skip_videos: bool,
Expand Down Expand Up @@ -1239,6 +1282,7 @@ def composed(ex: Exception, retries: int) -> None:
def core(
passer: Callable[[PhotoAsset], bool],
downloader: Callable[[PyiCloudService, Counter, PhotoAsset], bool],
notificator: Callable[[], None],
directory: str | None,
username: str,
auth_only: bool,
Expand All @@ -1254,15 +1298,7 @@ def core(
auto_delete: bool,
only_print_filenames: bool,
folder_structure: str,
smtp_username: str | None,
smtp_password: str | None,
smtp_host: str,
smtp_port: int,
smtp_no_tls: bool,
notification_email: str | None,
notification_email_from: str | None,
no_progress_bar: bool,
notification_script: str | None,
delete_after_download: bool,
keep_icloud_recent_days: int | None,
domain: str,
Expand All @@ -1282,11 +1318,6 @@ def core(
skip_bar = not os.environ.get("FORCE_TQDM") and (
only_print_filenames or no_progress_bar or not sys.stdout.isatty()
)
raise_error_on_2sa = (
smtp_username is not None
or notification_email is not None
or notification_script is not None
)
while True: # watch with interval
try:
icloud = authenticator(
Expand All @@ -1300,8 +1331,8 @@ def core(
mfa_provider,
status_exchange,
username,
notificator,
cookie_directory,
raise_error_on_2sa,
os.environ.get("CLIENT_ID"),
)

Expand Down Expand Up @@ -1518,26 +1549,6 @@ def should_break(counter: Counter) -> bool:
return 1
else:
pass
except TwoStepAuthRequiredError:
if notification_script is not None:
subprocess.call([notification_script])
else:
pass
if smtp_username is not None or notification_email is not None:
send_2sa_notification(
logger,
username,
smtp_username,
smtp_password,
smtp_host,
smtp_port,
smtp_no_tls,
notification_email,
notification_email_from,
)
else:
pass
return 1
if watch_interval: # pragma: no cover
logger.info(f"Waiting for {watch_interval} sec...")
interval: Sequence[int] = range(1, watch_interval)
Expand Down
68 changes: 3 additions & 65 deletions tests/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# import vcr
import pyicloud_ipd
from foundation.core import constant, identity
from icloudpd.authentication import TwoStepAuthRequiredError, authenticator
from icloudpd.authentication import authenticator
from icloudpd.base import dummy_password_writter, lp_filename_concatinator, main
from icloudpd.logger import setup_logger
from icloudpd.mfa_provider import MFAProvider
Expand Down Expand Up @@ -56,8 +56,8 @@ def test_failed_auth(self) -> None:
MFAProvider.CONSOLE,
StatusExchange(),
"bad_username",
lambda: None,
cookie_dir,
False,
"EC5646DE-9423-11E8-BF21-14109FE0B321",
)

Expand Down Expand Up @@ -96,68 +96,6 @@ def test_fallback_raw_password(self) -> None:
self.assertIn("INFO Authentication completed successfully", self._caplog.text)
assert result.exit_code == 0

def test_2sa_required(self) -> None:
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")

for dir in [base_dir, cookie_dir]:
recreate_path(dir)

with vcr.use_cassette(os.path.join(self.vcr_path, "2sa_flow_valid_code.yml")):
with self.assertRaises(TwoStepAuthRequiredError) as context:
# To re-record this HTTP request,
# delete ./tests/vcr_cassettes/auth_requires_2sa.yml,
# put your actual credentials in here, run the test,
# and then replace with dummy credentials.
authenticator(
setup_logger(),
"com",
identity,
lp_filename_concatinator,
RawTreatmentPolicy.AS_IS,
FileMatchPolicy.NAME_SIZE_DEDUP_WITH_SUFFIX,
{"test": (constant("dummy"), dummy_password_writter)},
MFAProvider.CONSOLE,
StatusExchange(),
"jdoe@gmail.com",
cookie_dir,
True,
"DE309E26-942E-11E8-92F5-14109FE0B321",
)

self.assertTrue("Two-step authentication is required" in str(context.exception))

def test_2fa_required(self) -> None:
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")

for dir in [base_dir, cookie_dir]:
recreate_path(dir)

with vcr.use_cassette(os.path.join(self.vcr_path, "auth_requires_2fa.yml")):
with self.assertRaises(TwoStepAuthRequiredError) as context:
# To re-record this HTTP request,
# delete ./tests/vcr_cassettes/auth_requires_2fa.yml,
# put your actual credentials in here, run the test,
# and then replace with dummy credentials.
authenticator(
setup_logger(),
"com",
identity,
lp_filename_concatinator,
RawTreatmentPolicy.AS_IS,
FileMatchPolicy.NAME_SIZE_DEDUP_WITH_SUFFIX,
{"test": (constant("dummy"), dummy_password_writter)},
MFAProvider.CONSOLE,
StatusExchange(),
"jdoe@gmail.com",
cookie_dir,
True,
"EC5646DE-9423-11E8-BF21-14109FE0B321",
)

self.assertTrue("Two-factor authentication is required" in str(context.exception))

def test_successful_token_validation(self) -> None:
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
Expand Down Expand Up @@ -338,8 +276,8 @@ def test_non_2fa(self) -> None:
MFAProvider.CONSOLE,
StatusExchange(),
"jdoe@gmail.com",
lambda: None,
cookie_dir,
True,
"EC5646DE-9423-11E8-BF21-14109FE0B321",
)

Expand Down
Loading