Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Unreleased

- debug: dump auth traffic in case of a failure [#1176](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/1176)
- fix: transfer chunk issues terminate with error [#1202](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/1202)

## 1.29.2 (2025-07-21)
Expand Down
34 changes: 33 additions & 1 deletion src/foundation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import datetime
from typing import Callable, NamedTuple, TypeVar
from functools import partial
from operator import is_, not_
from typing import Callable, Iterable, Mapping, NamedTuple, Tuple, TypeVar

import pytz
from tzlocal import get_localzone

from foundation.core import compose, fst, snd


class VersionInfo(NamedTuple):
version: str
Expand Down Expand Up @@ -50,3 +54,31 @@ def _internal(input: T_in) -> T_out:
raise ValueError(f"Invalid Input ({caption}): {input!r}") from err

return _internal


# def is_none(input: T_in | None) -> bool:
# return input is None

is_none = partial(is_, None)


is_not_none = compose(not_, is_none)

empty_pairs: Callable[[Iterable[Tuple[T_in, T_out]]], Iterable[Tuple[T_in, T_out]]] = partial(
filter, compose(is_none, snd)
)
keys_from_pairs: Callable[[Iterable[Tuple[T_in, T_out]]], Iterable[T_in]] = partial(map, fst)
keys_for_empty_values: Callable[[Iterable[Tuple[T_in, T_out]]], Iterable[T_in]] = compose(
keys_from_pairs, empty_pairs
)

non_empty_pairs: Callable[[Iterable[Tuple[T_in, T_out | None]]], Iterable[Tuple[T_in, T_out]]] = (
partial(filter, compose(is_not_none, snd))
)


def flat_dict(input: Iterable[Mapping[T_in, T_out]]) -> Mapping[T_in, T_out]:
flattened_dict: dict[T_in, T_out] = {}
for d in input:
flattened_dict.update(d)
return flattened_dict
88 changes: 88 additions & 0 deletions src/foundation/http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import json
from functools import partial
from http.cookiejar import Cookie
from http.cookies import SimpleCookie
from operator import eq, not_
from typing import Any, Callable, Iterable, Mapping, Tuple

from requests import PreparedRequest, Response

from foundation import flat_dict, non_empty_pairs
from foundation.core import compose, fst, snd


def cookie_to_pair(cookie: Cookie) -> Tuple[str, str | None]:
return (
cookie.name,
cookie.value,
)


jar_to_pairs: Callable[[Iterable[Cookie]], Iterable[Tuple[str, str]]] = compose(
non_empty_pairs, partial(map, cookie_to_pair)
)


def cookie_str_to_dict(cookie_header: str) -> Mapping[str, str]:
"""replace cookie header with dict object"""
simple_cookie = SimpleCookie()
simple_cookie.load(cookie_header)
cookies = {k: v.value for k, v in simple_cookie.items()}
return cookies


def response_body(response: Response) -> Any:
try:
return response.json()
except Exception:
return response.text


def request_body(request: PreparedRequest) -> Any:
if request.body is not None:
try:
return json.loads(request.body) # ignore: type
except Exception:
pass
return request.body


def response_to_har_entry(response: Response) -> Mapping[str, Any]:
# headers
# converted_pairs = map(xxx)

# cookies from request header
is_request_cookie = partial(eq, "Cookie")
is_not_request_cookie = compose(not_, partial(eq, "Cookie"))
cookie_headers: Callable[[Iterable[Tuple[str, str]]], Iterable[Tuple[str, str]]] = partial(
filter, compose(is_request_cookie, fst)
)
not_request_cookie_headers: Callable[[Iterable[Tuple[str, str]]], Iterable[Tuple[str, str]]] = (
partial(filter, compose(is_not_request_cookie, fst))
)
cookie_header_strings: Callable[[Iterable[Tuple[str, str]]], Iterable[str]] = compose(
partial(map, snd), cookie_headers
)
cookie_maps = compose(partial(map, cookie_str_to_dict), cookie_header_strings)
extract_response_request_cookies = compose(flat_dict, cookie_maps)

is_not_response_cookie = compose(not_, partial(eq, "Set-Cookie"))
not_response_cookie_headers: Callable[
[Iterable[Tuple[str, str]]], Iterable[Tuple[str, str]]
] = partial(filter, compose(is_not_response_cookie, fst))

return {
"request": {
"method": response.request.method,
"url": response.request.url,
"headers": dict(not_request_cookie_headers(response.request.headers.items())),
"cookies": extract_response_request_cookies(response.request.headers.items()),
"content": request_body(response.request),
},
"response": {
"status_code": response.status_code,
"headers": dict(not_response_cookie_headers(response.headers.items())),
"cookies": dict(jar_to_pairs(response.cookies)),
"content": response_body(response),
},
}
142 changes: 142 additions & 0 deletions src/foundation/json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import re
from functools import partial, singledispatch
from operator import is_, not_
from typing import Any, Callable, Iterable, Mapping, Sequence, Tuple, TypeVar

from foundation import non_empty_pairs
from foundation.core import compose, flip, fst

T1 = TypeVar("T1")
T2 = TypeVar("T2")

# def extract_context(context: Context[T1], pair: Tuple[T1, T2]) -> Tuple[Context[T1], T2]:
# return (context + [pair[0]], pair[1])

Context = str


def extract_context(context: Context, pair: Tuple[str, T1]) -> Tuple[Context, T1]:
new_context = context + ("." if context and not context.endswith(".") else "") + pair[0]
return (new_context, pair[1])


Rule = Tuple[re.Pattern[str], Callable[[str], str | None]]


def first(input: Iterable[T1]) -> T1 | StopIteration:
for item in input:
return item
return StopIteration()


def first_or_default(input: Iterable[T1], default: T2) -> T1 | T2:
for item in input:
return item
return default


first_or_none: Callable[[Iterable[T1]], T1 | None] = partial(flip(first_or_default), None)

is_none = partial(is_, None)

is_not_none = compose(not_, is_none)


def first_matching_rule(context: Context, rules: Sequence[Rule]) -> Rule | None:
match_with_context = partial(flip(re.search), context)
match_on_key_of_pair = compose(match_with_context, fst)
is_matching_pair = compose(is_not_none, match_on_key_of_pair)
filter_pairs: Callable[[Iterable[Rule]], Iterable[Rule]] = partial(filter, is_matching_pair)
first_matching = compose(first_or_none, filter_pairs)
first_found = first_matching(rules)
return first_found


@singledispatch
def _apply_rules_internal(input: Any, context: Context, rules: Sequence[Rule]) -> Any:
# print(f"ANY {context} {input}")
return input


@_apply_rules_internal.register(str)
def _(input: str, context: Context, rules: Sequence[Rule]) -> str | None:
# print(f"STR {context} {input}")
first_found_rule = first_matching_rule(context, rules)
if first_found_rule is None:
# no pattern matched - return unchanged
return input
else:
return first_found_rule[1](input)


@_apply_rules_internal.register(tuple)
def _(input: Tuple[str, Any], context: Context, rules: Sequence[Rule]) -> Any:
# print(f"TUPLE {context} {input}")
first_found_rule = first_matching_rule(context, rules)
if first_found_rule is None:
# no pattern matched - continue recursive
new_context, new_value = extract_context(context, input)
return (input[0], _apply_rules_internal(new_value, new_context, rules))
else:
# this is to allow overriding the whole object with string
return (input[0], first_found_rule[1]("tuple"))


filter_not_none: Callable[[Iterable[T1 | None]], Iterable[T1]] = partial(filter, is_not_none)


def apply_rules(context: Context, rules: Sequence[Rule], input: Any) -> Any:
return _apply_rules_internal(input, context, rules)


@_apply_rules_internal.register(list)
def _(input: Sequence[Any], context: Context, rules: Sequence[Rule]) -> Any:
# print(f"LIST {context} {input}")
first_found_rule = first_matching_rule(context, rules)
if first_found_rule is None:
# no pattern matched - continue recursive
apply_context_rules = partial(apply_rules, context + ".", rules)
apply_rule_iter = partial(map, apply_context_rules)
apply_and_filter: Callable[[Iterable[Any]], Iterable[Any]] = compose(
filter_not_none, apply_rule_iter
)
materialized_apply_and_filter: Callable[[Iterable[Any]], Iterable[Any]] = compose(
list, apply_and_filter
)
return materialized_apply_and_filter(input)
else:
# this is to allow overriding the whole list with string
return first_found_rule[1]("list")


@_apply_rules_internal.register(dict)
def _(input: Mapping[str, Any], context: Context, rules: Sequence[Rule]) -> Any:
# print(f"DICT {context} {input}")
first_found_rule = first_matching_rule(context, rules)
if first_found_rule is None:
# no pattern matched - continue recursive
apply_context_rules = partial(apply_rules, context, rules)
apply_rule_iter = partial(map, apply_context_rules)
apply_and_filter: Callable[[Iterable[Any]], Iterable[Any]] = compose(
non_empty_pairs, apply_rule_iter
)
materialized_apply_and_filter: Callable[[Iterable[Any]], Iterable[Any]] = compose(
dict, apply_and_filter
)
return materialized_apply_and_filter(input.items())
else:
# this is to allow overriding the whole list with string
return first_found_rule[1]("dict")


def re_compile_flag(flags: re.RegexFlag, input: str) -> re.Pattern[str]:
return re.compile(input, flags)


re_compile_ignorecase: Callable[[str], re.Pattern[str]] = partial(
re_compile_flag, re.RegexFlag.IGNORECASE
)

compile_patterns: Callable[[Iterable[str]], Iterable[re.Pattern[str]]] = partial(
map, re_compile_ignorecase
)
13 changes: 13 additions & 0 deletions src/foundation/string.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import binascii
from functools import singledispatch
from typing import Any


@singledispatch
def obfuscate(_input: Any) -> str:
raise NotImplementedError()


@obfuscate.register(str)
def _(input: str) -> str:
return f"OBFUSCATED-{binascii.crc32(input.encode('utf-8'))}"
4 changes: 3 additions & 1 deletion src/icloudpd/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
import time
from functools import partial
from typing import Callable, Dict, List, Tuple
from typing import Any, Callable, Dict, List, Mapping, Tuple

import click

Expand All @@ -27,6 +27,7 @@ def authenticator(
status_exchange: StatusExchange,
username: str,
notificator: Callable[[], None],
response_observer: Callable[[Mapping[str, Any]], None] | None = None,
cookie_directory: str | None = None,
client_id: str | None = None,
) -> PyiCloudService:
Expand All @@ -51,6 +52,7 @@ def password_provider(username: str, valid_password: List[str]) -> str | None:
file_match_policy,
username,
partial(password_provider, username, valid_password),
response_observer,
cookie_directory=cookie_directory,
client_id=client_id,
)
Expand Down
Loading
Loading