Skip to content

Commit 02b7c01

Browse files
dump auth traffic on failures #1176 (#1208)
1 parent da3781f commit 02b7c01

File tree

12 files changed

+616
-33
lines changed

12 files changed

+616
-33
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## Unreleased
44

5+
- debug: dump auth traffic in case of a failure [#1176](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/1176)
56
- fix: transfer chunk issues terminate with error [#1202](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/1202)
67

78
## 1.29.2 (2025-07-21)

src/foundation/__init__.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
import datetime
2-
from typing import Callable, NamedTuple, TypeVar
2+
from functools import partial
3+
from operator import is_, not_
4+
from typing import Callable, Iterable, Mapping, NamedTuple, Tuple, TypeVar
35

46
import pytz
57
from tzlocal import get_localzone
68

9+
from foundation.core import compose, fst, snd
10+
711

812
class VersionInfo(NamedTuple):
913
version: str
@@ -50,3 +54,31 @@ def _internal(input: T_in) -> T_out:
5054
raise ValueError(f"Invalid Input ({caption}): {input!r}") from err
5155

5256
return _internal
57+
58+
59+
# def is_none(input: T_in | None) -> bool:
60+
# return input is None
61+
62+
is_none = partial(is_, None)
63+
64+
65+
is_not_none = compose(not_, is_none)
66+
67+
empty_pairs: Callable[[Iterable[Tuple[T_in, T_out]]], Iterable[Tuple[T_in, T_out]]] = partial(
68+
filter, compose(is_none, snd)
69+
)
70+
keys_from_pairs: Callable[[Iterable[Tuple[T_in, T_out]]], Iterable[T_in]] = partial(map, fst)
71+
keys_for_empty_values: Callable[[Iterable[Tuple[T_in, T_out]]], Iterable[T_in]] = compose(
72+
keys_from_pairs, empty_pairs
73+
)
74+
75+
non_empty_pairs: Callable[[Iterable[Tuple[T_in, T_out | None]]], Iterable[Tuple[T_in, T_out]]] = (
76+
partial(filter, compose(is_not_none, snd))
77+
)
78+
79+
80+
def flat_dict(input: Iterable[Mapping[T_in, T_out]]) -> Mapping[T_in, T_out]:
81+
flattened_dict: dict[T_in, T_out] = {}
82+
for d in input:
83+
flattened_dict.update(d)
84+
return flattened_dict

src/foundation/http.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import json
2+
from functools import partial
3+
from http.cookiejar import Cookie
4+
from http.cookies import SimpleCookie
5+
from operator import eq, not_
6+
from typing import Any, Callable, Iterable, Mapping, Tuple
7+
8+
from requests import PreparedRequest, Response
9+
10+
from foundation import flat_dict, non_empty_pairs
11+
from foundation.core import compose, fst, snd
12+
13+
14+
def cookie_to_pair(cookie: Cookie) -> Tuple[str, str | None]:
15+
return (
16+
cookie.name,
17+
cookie.value,
18+
)
19+
20+
21+
jar_to_pairs: Callable[[Iterable[Cookie]], Iterable[Tuple[str, str]]] = compose(
22+
non_empty_pairs, partial(map, cookie_to_pair)
23+
)
24+
25+
26+
def cookie_str_to_dict(cookie_header: str) -> Mapping[str, str]:
27+
"""replace cookie header with dict object"""
28+
simple_cookie = SimpleCookie()
29+
simple_cookie.load(cookie_header)
30+
cookies = {k: v.value for k, v in simple_cookie.items()}
31+
return cookies
32+
33+
34+
def response_body(response: Response) -> Any:
35+
try:
36+
return response.json()
37+
except Exception:
38+
return response.text
39+
40+
41+
def request_body(request: PreparedRequest) -> Any:
42+
if request.body is not None:
43+
try:
44+
return json.loads(request.body) # ignore: type
45+
except Exception:
46+
pass
47+
return request.body
48+
49+
50+
def response_to_har_entry(response: Response) -> Mapping[str, Any]:
51+
# headers
52+
# converted_pairs = map(xxx)
53+
54+
# cookies from request header
55+
is_request_cookie = partial(eq, "Cookie")
56+
is_not_request_cookie = compose(not_, partial(eq, "Cookie"))
57+
cookie_headers: Callable[[Iterable[Tuple[str, str]]], Iterable[Tuple[str, str]]] = partial(
58+
filter, compose(is_request_cookie, fst)
59+
)
60+
not_request_cookie_headers: Callable[[Iterable[Tuple[str, str]]], Iterable[Tuple[str, str]]] = (
61+
partial(filter, compose(is_not_request_cookie, fst))
62+
)
63+
cookie_header_strings: Callable[[Iterable[Tuple[str, str]]], Iterable[str]] = compose(
64+
partial(map, snd), cookie_headers
65+
)
66+
cookie_maps = compose(partial(map, cookie_str_to_dict), cookie_header_strings)
67+
extract_response_request_cookies = compose(flat_dict, cookie_maps)
68+
69+
is_not_response_cookie = compose(not_, partial(eq, "Set-Cookie"))
70+
not_response_cookie_headers: Callable[
71+
[Iterable[Tuple[str, str]]], Iterable[Tuple[str, str]]
72+
] = partial(filter, compose(is_not_response_cookie, fst))
73+
74+
return {
75+
"request": {
76+
"method": response.request.method,
77+
"url": response.request.url,
78+
"headers": dict(not_request_cookie_headers(response.request.headers.items())),
79+
"cookies": extract_response_request_cookies(response.request.headers.items()),
80+
"content": request_body(response.request),
81+
},
82+
"response": {
83+
"status_code": response.status_code,
84+
"headers": dict(not_response_cookie_headers(response.headers.items())),
85+
"cookies": dict(jar_to_pairs(response.cookies)),
86+
"content": response_body(response),
87+
},
88+
}

src/foundation/json.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import re
2+
from functools import partial, singledispatch
3+
from operator import is_, not_
4+
from typing import Any, Callable, Iterable, Mapping, Sequence, Tuple, TypeVar
5+
6+
from foundation import non_empty_pairs
7+
from foundation.core import compose, flip, fst
8+
9+
T1 = TypeVar("T1")
10+
T2 = TypeVar("T2")
11+
12+
# def extract_context(context: Context[T1], pair: Tuple[T1, T2]) -> Tuple[Context[T1], T2]:
13+
# return (context + [pair[0]], pair[1])
14+
15+
Context = str
16+
17+
18+
def extract_context(context: Context, pair: Tuple[str, T1]) -> Tuple[Context, T1]:
19+
new_context = context + ("." if context and not context.endswith(".") else "") + pair[0]
20+
return (new_context, pair[1])
21+
22+
23+
Rule = Tuple[re.Pattern[str], Callable[[str], str | None]]
24+
25+
26+
def first(input: Iterable[T1]) -> T1 | StopIteration:
27+
for item in input:
28+
return item
29+
return StopIteration()
30+
31+
32+
def first_or_default(input: Iterable[T1], default: T2) -> T1 | T2:
33+
for item in input:
34+
return item
35+
return default
36+
37+
38+
first_or_none: Callable[[Iterable[T1]], T1 | None] = partial(flip(first_or_default), None)
39+
40+
is_none = partial(is_, None)
41+
42+
is_not_none = compose(not_, is_none)
43+
44+
45+
def first_matching_rule(context: Context, rules: Sequence[Rule]) -> Rule | None:
46+
match_with_context = partial(flip(re.search), context)
47+
match_on_key_of_pair = compose(match_with_context, fst)
48+
is_matching_pair = compose(is_not_none, match_on_key_of_pair)
49+
filter_pairs: Callable[[Iterable[Rule]], Iterable[Rule]] = partial(filter, is_matching_pair)
50+
first_matching = compose(first_or_none, filter_pairs)
51+
first_found = first_matching(rules)
52+
return first_found
53+
54+
55+
@singledispatch
56+
def _apply_rules_internal(input: Any, context: Context, rules: Sequence[Rule]) -> Any:
57+
# print(f"ANY {context} {input}")
58+
return input
59+
60+
61+
@_apply_rules_internal.register(str)
62+
def _(input: str, context: Context, rules: Sequence[Rule]) -> str | None:
63+
# print(f"STR {context} {input}")
64+
first_found_rule = first_matching_rule(context, rules)
65+
if first_found_rule is None:
66+
# no pattern matched - return unchanged
67+
return input
68+
else:
69+
return first_found_rule[1](input)
70+
71+
72+
@_apply_rules_internal.register(tuple)
73+
def _(input: Tuple[str, Any], context: Context, rules: Sequence[Rule]) -> Any:
74+
# print(f"TUPLE {context} {input}")
75+
first_found_rule = first_matching_rule(context, rules)
76+
if first_found_rule is None:
77+
# no pattern matched - continue recursive
78+
new_context, new_value = extract_context(context, input)
79+
return (input[0], _apply_rules_internal(new_value, new_context, rules))
80+
else:
81+
# this is to allow overriding the whole object with string
82+
return (input[0], first_found_rule[1]("tuple"))
83+
84+
85+
filter_not_none: Callable[[Iterable[T1 | None]], Iterable[T1]] = partial(filter, is_not_none)
86+
87+
88+
def apply_rules(context: Context, rules: Sequence[Rule], input: Any) -> Any:
89+
return _apply_rules_internal(input, context, rules)
90+
91+
92+
@_apply_rules_internal.register(list)
93+
def _(input: Sequence[Any], context: Context, rules: Sequence[Rule]) -> Any:
94+
# print(f"LIST {context} {input}")
95+
first_found_rule = first_matching_rule(context, rules)
96+
if first_found_rule is None:
97+
# no pattern matched - continue recursive
98+
apply_context_rules = partial(apply_rules, context + ".", rules)
99+
apply_rule_iter = partial(map, apply_context_rules)
100+
apply_and_filter: Callable[[Iterable[Any]], Iterable[Any]] = compose(
101+
filter_not_none, apply_rule_iter
102+
)
103+
materialized_apply_and_filter: Callable[[Iterable[Any]], Iterable[Any]] = compose(
104+
list, apply_and_filter
105+
)
106+
return materialized_apply_and_filter(input)
107+
else:
108+
# this is to allow overriding the whole list with string
109+
return first_found_rule[1]("list")
110+
111+
112+
@_apply_rules_internal.register(dict)
113+
def _(input: Mapping[str, Any], context: Context, rules: Sequence[Rule]) -> Any:
114+
# print(f"DICT {context} {input}")
115+
first_found_rule = first_matching_rule(context, rules)
116+
if first_found_rule is None:
117+
# no pattern matched - continue recursive
118+
apply_context_rules = partial(apply_rules, context, rules)
119+
apply_rule_iter = partial(map, apply_context_rules)
120+
apply_and_filter: Callable[[Iterable[Any]], Iterable[Any]] = compose(
121+
non_empty_pairs, apply_rule_iter
122+
)
123+
materialized_apply_and_filter: Callable[[Iterable[Any]], Iterable[Any]] = compose(
124+
dict, apply_and_filter
125+
)
126+
return materialized_apply_and_filter(input.items())
127+
else:
128+
# this is to allow overriding the whole list with string
129+
return first_found_rule[1]("dict")
130+
131+
132+
def re_compile_flag(flags: re.RegexFlag, input: str) -> re.Pattern[str]:
133+
return re.compile(input, flags)
134+
135+
136+
re_compile_ignorecase: Callable[[str], re.Pattern[str]] = partial(
137+
re_compile_flag, re.RegexFlag.IGNORECASE
138+
)
139+
140+
compile_patterns: Callable[[Iterable[str]], Iterable[re.Pattern[str]]] = partial(
141+
map, re_compile_ignorecase
142+
)

src/foundation/string.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import binascii
2+
from functools import singledispatch
3+
from typing import Any
4+
5+
6+
@singledispatch
7+
def obfuscate(_input: Any) -> str:
8+
raise NotImplementedError()
9+
10+
11+
@obfuscate.register(str)
12+
def _(input: str) -> str:
13+
return f"OBFUSCATED-{binascii.crc32(input.encode('utf-8'))}"

src/icloudpd/authentication.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import sys
55
import time
66
from functools import partial
7-
from typing import Callable, Dict, List, Tuple
7+
from typing import Any, Callable, Dict, List, Mapping, Tuple
88

99
import click
1010

@@ -27,6 +27,7 @@ def authenticator(
2727
status_exchange: StatusExchange,
2828
username: str,
2929
notificator: Callable[[], None],
30+
response_observer: Callable[[Mapping[str, Any]], None] | None = None,
3031
cookie_directory: str | None = None,
3132
client_id: str | None = None,
3233
) -> PyiCloudService:
@@ -51,6 +52,7 @@ def password_provider(username: str, valid_password: List[str]) -> str | None:
5152
file_match_policy,
5253
username,
5354
partial(password_provider, username, valid_password),
55+
response_observer,
5456
cookie_directory=cookie_directory,
5557
client_id=client_id,
5658
)

0 commit comments

Comments
 (0)