Skip to content

Commit 168f2cb

Browse files
authored
Merge pull request #203 from TideDra/fix-issue-202-bufferedreader
Fix non-pickleable retriever worker failures
2 parents c2330b9 + 8c2d48e commit 168f2cb

3 files changed

Lines changed: 89 additions & 8 deletions

File tree

src/zotero_arxiv_daily/retriever/arxiv_retriever.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,11 @@ def extract_text_from_pdf(paper: ArxivResult) -> str | None:
7676
if paper.pdf_url is None:
7777
logger.warning(f"No PDF URL available for {paper.title}")
7878
return None
79-
urlretrieve(paper.pdf_url, path)
79+
try:
80+
urlretrieve(paper.pdf_url, path)
81+
except Exception as e:
82+
logger.warning(f"Failed to download pdf for {paper.title}: {type(e).__name__}: {e}")
83+
return None
8084
try:
8185
full_text = extract_markdown_from_pdf(path)
8286
except Exception as e:
@@ -91,7 +95,11 @@ def extract_text_from_tar(paper: ArxivResult) -> str | None:
9195
if source_url is None:
9296
logger.warning(f"No source URL available for {paper.title}")
9397
return None
94-
urlretrieve(source_url, path)
98+
try:
99+
urlretrieve(source_url, path)
100+
except Exception as e:
101+
logger.warning(f"Failed to download source for {paper.title}: {type(e).__name__}: {e}")
102+
return None
95103
try:
96104
file_contents = extract_tex_code_from_tar(path, paper.entry_id)
97105
if "all" not in file_contents:
@@ -101,4 +109,4 @@ def extract_text_from_tar(paper: ArxivResult) -> str | None:
101109
except Exception as e:
102110
logger.warning(f"Failed to extract full text of {paper.title} from tar: {e}")
103111
full_text = None
104-
return full_text
112+
return full_text

src/zotero_arxiv_daily/retriever/base.py

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,30 @@
55
from tqdm import tqdm
66
from typing import Type
77
from loguru import logger
8+
9+
10+
def _describe_raw_paper(raw_paper: RawPaperItem) -> str:
11+
title = getattr(raw_paper, "title", None)
12+
if title:
13+
return str(title)
14+
if isinstance(raw_paper, dict):
15+
for key in ("title", "entry_id", "id", "doi"):
16+
value = raw_paper.get(key)
17+
if value:
18+
return str(value)
19+
return repr(raw_paper)
20+
21+
22+
def _convert_to_paper_safe(retriever: "BaseRetriever", raw_paper: RawPaperItem) -> Paper | None:
23+
try:
24+
return retriever.convert_to_paper(raw_paper)
25+
except Exception as exc:
26+
logger.warning(
27+
f"Skipping paper {_describe_raw_paper(raw_paper)}: {type(exc).__name__}: {exc}"
28+
)
29+
return None
30+
31+
832
class BaseRetriever(ABC):
933
name: str
1034
def __init__(self, config:DictConfig):
@@ -24,10 +48,17 @@ def retrieve_papers(self) -> list[Paper]:
2448
papers = []
2549
logger.info("Processing papers...")
2650
with ProcessPoolExecutor(max_workers=self.config.executor.max_workers) as exec_pool:
27-
futures = {exec_pool.submit(self.convert_to_paper, rp): i for i, rp in enumerate(raw_papers)}
51+
futures = {exec_pool.submit(_convert_to_paper_safe, self, rp): i for i, rp in enumerate(raw_papers)}
2852
papers = [None] * len(raw_papers)
2953
for future in tqdm(as_completed(futures), total=len(raw_papers), desc="Converting papers"):
30-
papers[futures[future]] = future.result()
54+
try:
55+
papers[futures[future]] = future.result()
56+
except Exception as exc:
57+
raw_paper = raw_papers[futures[future]]
58+
logger.warning(
59+
f"Skipping paper {_describe_raw_paper(raw_paper)} after worker failure: "
60+
f"{type(exc).__name__}: {exc}"
61+
)
3162
return [p for p in papers if p is not None]
3263

3364
registered_retrievers = {}
@@ -42,4 +73,4 @@ def decorator(cls):
4273
def get_retriever_cls(name:str) -> Type[BaseRetriever]:
4374
if name not in registered_retrievers:
4475
raise ValueError(f"Retriever {name} not found")
45-
return registered_retrievers[name]
76+
return registered_retrievers[name]

tests/retriever/test_arxiv_retriever.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
from zotero_arxiv_daily.retriever.arxiv_retriever import ArxivRetriever
2+
from zotero_arxiv_daily.retriever.base import BaseRetriever, register_retriever
3+
from zotero_arxiv_daily.protocol import Paper
24
import feedparser
3-
import pickle
5+
import io
6+
from omegaconf import open_dict
7+
from urllib.error import HTTPError
48

59
def test_arxiv_retriever(config, monkeypatch):
610

@@ -18,4 +22,42 @@ def mock_feedparser_parse(url):
1822
assert len(papers) == len(parsed_results)
1923
paper_titles = [i.title for i in papers]
2024
parsed_titles = [i.title for i in parsed_results]
21-
assert set(paper_titles) == set(parsed_titles)
25+
assert set(paper_titles) == set(parsed_titles)
26+
27+
28+
@register_retriever("failing_test")
29+
class FailingTestRetriever(BaseRetriever):
30+
def _retrieve_raw_papers(self) -> list[dict[str, str]]:
31+
return [
32+
{"title": "good paper", "mode": "ok"},
33+
{"title": "bad paper", "mode": "fail"},
34+
]
35+
36+
def convert_to_paper(self, raw_paper: dict[str, str]) -> Paper | None:
37+
if raw_paper["mode"] == "fail":
38+
raise HTTPError(
39+
url="https://example.com/paper.pdf",
40+
code=404,
41+
msg="not found",
42+
hdrs=None,
43+
fp=io.BufferedReader(io.BytesIO(b"missing")),
44+
)
45+
return Paper(
46+
source=self.name,
47+
title=raw_paper["title"],
48+
authors=[],
49+
abstract="",
50+
url=f"https://example.com/{raw_paper['mode']}",
51+
)
52+
53+
54+
def test_retrieve_papers_skips_non_pickleable_worker_errors(config):
55+
with open_dict(config.source):
56+
config.source.failing_test = {}
57+
config.executor.max_workers = 2
58+
59+
retriever = FailingTestRetriever(config)
60+
61+
papers = retriever.retrieve_papers()
62+
63+
assert [paper.title for paper in papers] == ["good paper"]

0 commit comments

Comments
 (0)