|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | | -import builtins |
4 | 3 | import email.feedparser |
5 | 4 | import email.header |
6 | 5 | import email.message |
7 | 6 | import email.parser |
8 | 7 | import email.policy |
9 | 8 | import pathlib |
| 9 | +import sys |
10 | 10 | import typing |
11 | 11 | from typing import ( |
12 | 12 | Any, |
|
24 | 24 | T = typing.TypeVar("T") |
25 | 25 |
|
26 | 26 |
|
27 | | -if "ExceptionGroup" in builtins.__dict__: # pragma: no cover |
| 27 | +if sys.version_info >= (3, 11): # pragma: no cover |
28 | 28 | ExceptionGroup = ExceptionGroup |
29 | 29 | else: # pragma: no cover |
30 | 30 |
|
@@ -222,12 +222,14 @@ def _get_payload(msg: email.message.Message, source: bytes | str) -> str: |
222 | 222 | # If our source is a str, then our caller has managed encodings for us, |
223 | 223 | # and we don't need to deal with it. |
224 | 224 | if isinstance(source, str): |
225 | | - payload: str = msg.get_payload() |
| 225 | + payload = msg.get_payload() |
| 226 | + assert isinstance(payload, str) |
226 | 227 | return payload |
227 | 228 | # If our source is a bytes, then we're managing the encoding and we need |
228 | 229 | # to deal with it. |
229 | 230 | else: |
230 | | - bpayload: bytes = msg.get_payload(decode=True) |
| 231 | + bpayload = msg.get_payload(decode=True) |
| 232 | + assert isinstance(bpayload, bytes) |
231 | 233 | try: |
232 | 234 | return bpayload.decode("utf8", "strict") |
233 | 235 | except UnicodeDecodeError as exc: |
@@ -434,7 +436,7 @@ def parse_email(data: bytes | str) -> tuple[RawMetadata, dict[str, list[str]]]: |
434 | 436 | payload = _get_payload(parsed, data) |
435 | 437 | except ValueError: |
436 | 438 | unparsed.setdefault("description", []).append( |
437 | | - parsed.get_payload(decode=isinstance(data, bytes)) |
| 439 | + parsed.get_payload(decode=isinstance(data, bytes)) # type: ignore[call-overload] |
438 | 440 | ) |
439 | 441 | else: |
440 | 442 | if payload: |
|
0 commit comments