Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/icloudpd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,10 @@ def should_break(counter: Counter) -> bool:
PyiCloudConnectionErrorException,
) as error:
logger.info(error)
if isinstance(error, PyiCloudAPIResponseException) and "Invalid global session" in str(
error
):
continue
dump_responses(logger.debug, captured_responses)
# webui will display error and wait for password again
if (
Expand All @@ -1234,6 +1238,10 @@ def should_break(counter: Counter) -> bool:
logger.debug("Retrying...")
# these errors we can safely retry
continue
except OSError as error:
logger.error("IOError during file operations: %s", error)
dump_responses(logger.debug, captured_responses)
return 1
except Exception:
dump_responses(logger.debug, captured_responses)
raise
Expand Down
95 changes: 23 additions & 72 deletions src/icloudpd/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
from tzlocal import get_localzone

# Import the constants object so that we can mock WAIT_SECONDS in tests
from icloudpd import constants
from pyicloud_ipd.asset_version import AssetVersion, calculate_version_filename
from pyicloud_ipd.base import PyiCloudService
from pyicloud_ipd.exceptions import PyiCloudAPIResponseException
from pyicloud_ipd.services.photos import PhotoAsset
from pyicloud_ipd.version_size import VersionSize

Expand Down Expand Up @@ -128,76 +126,29 @@ def download_media(
partial(download_response_to_path_dry_run, logger) if dry_run else download_response_to_path
)

retries = 0
while True:
try:
append_mode = os.path.exists(temp_download_path)
current_size = os.path.getsize(temp_download_path) if append_mode else 0
if append_mode:
logger.debug(f"Resuming downloading of {download_path} from {current_size}")

photo_response = photo.download(icloud.photos.session, version.url, current_size)
if photo_response.ok:
return download_local(
photo_response, temp_download_path, append_mode, download_path, photo.created
)
else:
# Use the standard original filename generator for error logging
from icloudpd.base import lp_filename_original as simple_lp_filename_generator

# Get the proper filename using filename_builder
base_filename = filename_builder(photo)
version_filename = calculate_version_filename(
base_filename, version, size, simple_lp_filename_generator, photo.item_type
)
logger.error(
"Could not find URL to download %s for size %s",
version_filename,
size.value,
)
break

except PyiCloudAPIResponseException as ex:
if "Invalid global session" in str(ex):
logger.error("Session error, re-authenticating...")
if retries > 0:
# If the first re-authentication attempt failed,
# start waiting a few seconds before retrying in case
# there are some issues with the Apple servers
time.sleep(constants.WAIT_SECONDS)

icloud.authenticate()
else:
# short circuiting 0 retries
if retries == constants.MAX_RETRIES:
break
# you end up here when p.e. throttling by Apple happens
wait_time = (retries + 1) * constants.WAIT_SECONDS
# Get the proper filename for error messages
error_filename = filename_builder(photo)
logger.error(
"Error downloading %s, retrying after %s seconds...", error_filename, wait_time
)
time.sleep(wait_time)

except OSError:
logger.error(
"IOError while writing file to %s. "
+ "You might have run out of disk space, or the file "
+ "might be too large for your OS. "
+ "Skipping this file...",
download_path,
)
break
retries = retries + 1
if retries >= constants.MAX_RETRIES:
break
if retries >= constants.MAX_RETRIES:
# Get the proper filename for error messages
error_filename = filename_builder(photo)
append_mode = os.path.exists(temp_download_path)
current_size = os.path.getsize(temp_download_path) if append_mode else 0
if append_mode:
logger.debug(f"Resuming downloading of {download_path} from {current_size}")

photo_response = photo.download(icloud.photos.session, version.url, current_size)
if photo_response.ok:
return download_local(
photo_response, temp_download_path, append_mode, download_path, photo.created
)
else:
# Use the standard original filename generator for error logging
from icloudpd.base import lp_filename_original as simple_lp_filename_generator

# Get the proper filename using filename_builder
base_filename = filename_builder(photo)
version_filename = calculate_version_filename(
base_filename, version, size, simple_lp_filename_generator, photo.item_type
)
logger.error(
"Could not download %s. Please try again later.",
error_filename,
"Could not find URL to download %s for size %s",
version_filename,
size.value,
)

return False
return False
4 changes: 2 additions & 2 deletions src/pyicloud_ipd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,14 @@ def use_rules(self, rules: Sequence[Rule]) -> Generator[Sequence[Rule], Any, Non
finally:
self.observer_rules = temp_rules

def authenticate(self, force_refresh: bool = False) -> None:
def authenticate(self) -> None:
"""
Handles authentication, and persists cookies so that
subsequent logins will not cause additional e-mails from Apple.
"""

login_successful = False
if self.session_data.get("session_token") and not force_refresh:
if self.session_data.get("session_token"):
LOGGER.debug("Checking session token validity")
try:
self.data = self._validate_token()
Expand Down
182 changes: 39 additions & 143 deletions src/pyicloud_ipd/cmdline.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#! /usr/bin/env python
"""
A Command Line Wrapper to allow easy use of pyicloud for
command line scripts, and related.
A Command Line Tool for managing iCloud keyring credentials.
"""

import argparse
Expand All @@ -10,60 +9,32 @@
from typing import NoReturn, Sequence

from foundation import version_info_formatted
from pyicloud_ipd.base import PyiCloudService
from pyicloud_ipd.exceptions import PyiCloudFailedLoginException
from foundation.predicates import in_pred
from foundation.string_utils import strip_and_lower

from . import utils


def main(args: Sequence[str] | None = None) -> NoReturn:
"""Main commandline entrypoint."""
"""Main commandline entrypoint for iCloud keyring management."""
if args is None:
args = sys.argv[1:]

parser = argparse.ArgumentParser(description="Find My iPhone CommandLine Tool")
parser = argparse.ArgumentParser(description="iCloud Keyring Management Tool")

parser.add_argument(
"--username",
action="store",
dest="username",
default="",
help="Apple ID to Use",
)
parser.add_argument(
"--password",
action="store",
dest="password",
default="",
help=(
"Apple ID Password to Use; if unspecified, password will be "
"fetched from the system keyring."
),
)
parser.add_argument(
"-n",
"--non-interactive",
action="store_false",
dest="interactive",
default=True,
help="Disable interactive prompts.",
help="Apple ID username",
)
parser.add_argument(
"--delete-from-keyring",
action="store_true",
dest="delete_from_keyring",
default=False,
help="Delete stored password in system keyring for this username.",
help="Delete stored password from system keyring for this username",
)

parser.add_argument(
"--domain",
action="store",
dest="domain",
default="com",
help="Root domain for requests to iCloud. com or cn",
)

parser.add_argument(
"--version",
action="store_true",
Expand All @@ -77,117 +48,42 @@ def main(args: Sequence[str] | None = None) -> NoReturn:
print(version_info_formatted())
sys.exit(0)

from foundation.string_utils import strip

username: str | None = strip(command_line.username) or None
password: str | None = strip(command_line.password) or None
domain = command_line.domain
username: str | None = strip_and_lower(command_line.username) if command_line.username else None

if username is not None and command_line.delete_from_keyring:
utils.delete_password_in_keyring(username)
print("Password delete from keyring")
if username is None:
parser.error("--username is required")

failure_count = 0
while True:
# Which password we use is determined by your username, so we
# do need to check for this first and separately.
if username is None:
parser.error("No username supplied")

got_from_keyring = False

if password is None:
password = utils.get_password_from_keyring(username)
got_from_keyring = password is not None

if password is None:
password = strip(getpass.getpass(f"Enter iCloud password for {username}: ")) or None

if password is None:
parser.error("No password supplied")

try:

def password_provider(pwd: str | None = password) -> str | None:
return pwd

api = PyiCloudService(
domain,
username,
password_provider,
lambda _: None,
if command_line.delete_from_keyring:
# Delete password from keyring
if utils.password_exists_in_keyring(username):
user_response = strip_and_lower(
input(f"Delete password for {username} from keyring? [y/N] ")
)
from foundation.predicates import in_pred
from foundation.string_utils import strip_and_lower

if not got_from_keyring and command_line.interactive:
user_response = strip_and_lower(input("Save password in keyring? [y/N] "))
is_affirmative = in_pred(["y", "yes"])

if is_affirmative(user_response):
utils.store_password_in_keyring(username, password)

if api.requires_2fa:
# fmt: off
print(
"\nTwo-step authentication required.",
"\nPlease enter validation code"
)
# fmt: on

code = input("(string) --> ")
if not api.validate_2fa_code(code):
print("Failed to verify verification code")
sys.exit(1)

print("")

elif api.requires_2sa:
# fmt: off
print(
"\nTwo-step authentication required.",
"\nYour trusted devices are:"
)
# fmt: on

devices = api.trusted_devices
for i, device in enumerate(devices):
print(
" {}: {}".format(
i,
device.get("deviceName", "SMS to {}".format(device.get("phoneNumber"))),
)
)

print("\nWhich device would you like to use?")
device_index = int(input("(number) --> "))
device = devices[device_index]
if not api.send_verification_code(device):
print("Failed to send verification code")
sys.exit(1)

print("\nPlease enter validation code")
code = input("(string) --> ")
if not api.validate_verification_code(device, code):
print("Failed to verify verification code")
sys.exit(1)

print("")
break
except PyiCloudFailedLoginException as err:
# If they have a stored password; we just used it and
# it did not work; let's delete it if there is one.
if utils.password_exists_in_keyring(username):
utils.delete_password_in_keyring(username)

message = f"Bad username or password for {username}"
password = None
is_affirmative = in_pred(["y", "yes"])

failure_count += 1
if failure_count >= 3:
raise RuntimeError(message) from err

print(message, file=sys.stderr)
if is_affirmative(user_response):
utils.delete_password_in_keyring(username)
print(f"Password for {username} deleted from keyring")
else:
print("Operation cancelled")
else:
print(f"No password found in keyring for {username}")
else:
# Store password in keyring
password = getpass.getpass(f"Enter iCloud password for {username}: ")

if not password:
print("No password provided", file=sys.stderr)
sys.exit(1)

user_response = strip_and_lower(input(f"Save password for {username} to keyring? [y/N] "))
is_affirmative = in_pred(["y", "yes"])

if is_affirmative(user_response):
utils.store_password_in_keyring(username, password)
print(f"Password for {username} saved to keyring")
else:
print("Password not saved")

sys.exit(0)

Expand Down
Loading
Loading