Skip to content

Commit 7b18a4b

Browse files
authored
Merge pull request #209 from TideDra/dev
extract text from arxiv html
2 parents 0db4eb5 + 41dc0b1 commit 7b18a4b

7 files changed

Lines changed: 768 additions & 427 deletions

File tree

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ reranker:
154154
executor:
155155
debug: false # Whether to use debug mode. Example: true
156156
send_empty: false # Whether to send an empty email even if no new papers today. Example: true
157-
max_workers: 4 # Concurrent workers for processing papers. Example: 4
158157
max_paper_num: 100 # The maximum number of the papers presented in the email. Example: 100
159158
source: ??? # The sources of papers to retrieve. Example: ['arxiv','biorxiv','medrxiv']
160159
reranker: local # The reranker to use. Example: 'local' or 'api'

config/base.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ reranker:
4545
executor:
4646
debug: false # Whether to use debug mode. Example: true
4747
send_empty: false # Whether to send an empty email even if no new papers today. Example: true
48-
max_workers: 4 # Concurrent workers for processing papers. Example: 4
4948
max_paper_num: 100 # The maximum number of the papers presented in the email. Example: 100
5049
source: ??? # The sources of papers to retrieve. Example: ['arxiv','biorxiv','medrxiv']
5150
reranker: local # The reranker to use. Example: 'local' or 'api'

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ dependencies = [
2525
"pymupdf-layout>=1.27.1",
2626
"dotenv>=0.9.9",
2727
"peft>=0.18.1",
28+
"trafilatura>=2.0.0",
2829
]
2930

3031
[tool.uv.sources]

src/zotero_arxiv_daily/retriever/arxiv_retriever.py

Lines changed: 136 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,116 @@
44
from ..protocol import Paper
55
from ..utils import extract_markdown_from_pdf, extract_tex_code_from_tar
66
from tempfile import TemporaryDirectory
7-
from concurrent.futures import ThreadPoolExecutor, TimeoutError
87
import feedparser
9-
from urllib.request import urlretrieve
108
from tqdm import tqdm
9+
import multiprocessing
1110
import os
11+
from queue import Empty
12+
from typing import Any, Callable, TypeVar
1213
from loguru import logger
14+
import requests
1315

16+
T = TypeVar("T")
17+
18+
DOWNLOAD_TIMEOUT = (10, 60)
1419
PDF_EXTRACT_TIMEOUT = 180
20+
TAR_EXTRACT_TIMEOUT = 180
21+
22+
23+
def _download_file(url: str, path: str) -> None:
24+
with requests.get(url, stream=True, timeout=DOWNLOAD_TIMEOUT) as response:
25+
response.raise_for_status()
26+
with open(path, "wb") as file:
27+
for chunk in response.iter_content(chunk_size=1024 * 1024):
28+
if chunk:
29+
file.write(chunk)
30+
31+
32+
def _run_in_subprocess(
33+
result_queue: Any,
34+
func: Callable[..., T | None],
35+
args: tuple[Any, ...],
36+
) -> None:
37+
try:
38+
result_queue.put(("ok", func(*args)))
39+
except Exception as exc:
40+
result_queue.put(("error", f"{type(exc).__name__}: {exc}"))
41+
42+
43+
def _run_with_hard_timeout(
44+
func: Callable[..., T | None],
45+
args: tuple[Any, ...],
46+
*,
47+
timeout: float,
48+
operation: str,
49+
paper_title: str,
50+
) -> T | None:
51+
start_methods = multiprocessing.get_all_start_methods()
52+
context = multiprocessing.get_context("fork" if "fork" in start_methods else start_methods[0])
53+
result_queue = context.Queue()
54+
process = context.Process(target=_run_in_subprocess, args=(result_queue, func, args))
55+
process.start()
56+
57+
try:
58+
status, payload = result_queue.get(timeout=timeout)
59+
except Empty:
60+
if process.is_alive():
61+
process.kill()
62+
process.join(5)
63+
result_queue.close()
64+
result_queue.join_thread()
65+
logger.warning(f"{operation} timed out for {paper_title} after {timeout} seconds")
66+
return None
67+
68+
process.join(5)
69+
result_queue.close()
70+
result_queue.join_thread()
71+
72+
if status == "ok":
73+
return payload
74+
75+
logger.warning(f"{operation} failed for {paper_title}: {payload}")
76+
return None
77+
78+
79+
def _extract_text_from_pdf_worker(pdf_url: str) -> str:
80+
with TemporaryDirectory() as temp_dir:
81+
path = os.path.join(temp_dir, "paper.pdf")
82+
_download_file(pdf_url, path)
83+
return extract_markdown_from_pdf(path)
84+
85+
86+
def _extract_text_from_html_worker(html_url: str) -> str | None:
87+
import trafilatura
88+
89+
downloaded = trafilatura.fetch_url(html_url)
90+
if downloaded is None:
91+
raise ValueError(f"Failed to download HTML from {html_url}")
92+
text = trafilatura.extract(downloaded, include_comments=False, include_tables=False)
93+
if not text:
94+
raise ValueError(f"No text extracted from {html_url}")
95+
return text
96+
97+
98+
def _extract_text_from_tar_worker(source_url: str, paper_id: str) -> str | None:
99+
with TemporaryDirectory() as temp_dir:
100+
path = os.path.join(temp_dir, "paper.tar.gz")
101+
_download_file(source_url, path)
102+
file_contents = extract_tex_code_from_tar(path, paper_id)
103+
if not file_contents or "all" not in file_contents:
104+
raise ValueError("Main tex file not found.")
105+
return file_contents["all"]
106+
107+
15108
@register_retriever("arxiv")
16109
class ArxivRetriever(BaseRetriever):
17110
def __init__(self, config):
18111
super().__init__(config)
19112
if self.config.source.arxiv.category is None:
20113
raise ValueError("category must be specified for arxiv.")
114+
21115
def _retrieve_raw_papers(self) -> list[ArxivResult]:
22-
client = arxiv.Client(num_retries=10,delay_seconds=10)
116+
client = arxiv.Client(num_retries=10, delay_seconds=10)
23117
query = '+'.join(self.config.source.arxiv.category)
24118
include_cross_list = self.config.source.arxiv.get("include_cross_list", False)
25119
# Get the latest paper from arxiv rss feed
@@ -38,26 +132,23 @@ def _retrieve_raw_papers(self) -> list[ArxivResult]:
38132

39133
# Get full information of each paper from arxiv api
40134
bar = tqdm(total=len(all_paper_ids))
41-
for i in range(0,len(all_paper_ids),20):
42-
search = arxiv.Search(id_list=all_paper_ids[i:i+20])
135+
for i in range(0, len(all_paper_ids), 20):
136+
search = arxiv.Search(id_list=all_paper_ids[i:i + 20])
43137
batch = list(client.results(search))
44138
bar.update(len(batch))
45139
raw_papers.extend(batch)
46140
bar.close()
47141

48142
return raw_papers
49143

50-
def convert_to_paper(self, raw_paper:ArxivResult) -> Paper:
144+
def convert_to_paper(self, raw_paper: ArxivResult) -> Paper:
51145
title = raw_paper.title
52146
authors = [a.name for a in raw_paper.authors]
53147
abstract = raw_paper.summary
54148
pdf_url = raw_paper.pdf_url
55-
try:
56-
with ThreadPoolExecutor(max_workers=1) as pool:
57-
full_text = pool.submit(extract_text_from_pdf, raw_paper).result(timeout=PDF_EXTRACT_TIMEOUT)
58-
except TimeoutError:
59-
logger.warning(f"PDF extraction timed out for {raw_paper.title}")
60-
full_text = None
149+
full_text = extract_text_from_html(raw_paper)
150+
if full_text is None:
151+
full_text = extract_text_from_pdf(raw_paper)
61152
if full_text is None:
62153
full_text = extract_text_from_tar(raw_paper)
63154
return Paper(
@@ -67,46 +158,41 @@ def convert_to_paper(self, raw_paper:ArxivResult) -> Paper:
67158
abstract=abstract,
68159
url=raw_paper.entry_id,
69160
pdf_url=pdf_url,
70-
full_text=full_text
161+
full_text=full_text,
71162
)
72163

164+
165+
def extract_text_from_html(paper: ArxivResult) -> str | None:
166+
html_url = paper.entry_id.replace("/abs/", "/html/")
167+
try:
168+
return _extract_text_from_html_worker(html_url)
169+
except Exception as exc:
170+
logger.warning(f"HTML extraction failed for {paper.title}: {exc}")
171+
return None
172+
173+
73174
def extract_text_from_pdf(paper: ArxivResult) -> str | None:
74-
with TemporaryDirectory() as temp_dir:
75-
path = os.path.join(temp_dir, "paper.pdf")
76-
if paper.pdf_url is None:
77-
logger.warning(f"No PDF URL available for {paper.title}")
78-
return None
79-
try:
80-
urlretrieve(paper.pdf_url, path)
81-
except Exception as e:
82-
logger.warning(f"Failed to download pdf for {paper.title}: {type(e).__name__}: {e}")
83-
return None
84-
try:
85-
full_text = extract_markdown_from_pdf(path)
86-
except Exception as e:
87-
logger.warning(f"Failed to extract full text of {paper.title} from pdf: {e}")
88-
full_text = None
89-
return full_text
175+
if paper.pdf_url is None:
176+
logger.warning(f"No PDF URL available for {paper.title}")
177+
return None
178+
return _run_with_hard_timeout(
179+
_extract_text_from_pdf_worker,
180+
(paper.pdf_url,),
181+
timeout=PDF_EXTRACT_TIMEOUT,
182+
operation="PDF extraction",
183+
paper_title=paper.title,
184+
)
185+
90186

91187
def extract_text_from_tar(paper: ArxivResult) -> str | None:
92-
with TemporaryDirectory() as temp_dir:
93-
path = os.path.join(temp_dir, "paper.tar.gz")
94-
source_url = paper.source_url()
95-
if source_url is None:
96-
logger.warning(f"No source URL available for {paper.title}")
97-
return None
98-
try:
99-
urlretrieve(source_url, path)
100-
except Exception as e:
101-
logger.warning(f"Failed to download source for {paper.title}: {type(e).__name__}: {e}")
102-
return None
103-
try:
104-
file_contents = extract_tex_code_from_tar(path, paper.entry_id)
105-
if "all" not in file_contents:
106-
logger.warning(f"Failed to extract full text of {paper.title} from tar: Main tex file not found.")
107-
return None
108-
full_text = file_contents["all"]
109-
except Exception as e:
110-
logger.warning(f"Failed to extract full text of {paper.title} from tar: {e}")
111-
full_text = None
112-
return full_text
188+
source_url = paper.source_url()
189+
if source_url is None:
190+
logger.warning(f"No source URL available for {paper.title}")
191+
return None
192+
return _run_with_hard_timeout(
193+
_extract_text_from_tar_worker,
194+
(source_url, paper.entry_id),
195+
timeout=TAR_EXTRACT_TIMEOUT,
196+
operation="Tar extraction",
197+
paper_title=paper.title,
198+
)

src/zotero_arxiv_daily/retriever/base.py

Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,12 @@
11
from abc import ABC, abstractmethod
22
from omegaconf import DictConfig
33
from ..protocol import Paper, RawPaperItem
4-
from concurrent.futures import ProcessPoolExecutor, as_completed
54
from tqdm import tqdm
65
from typing import Type
6+
from time import sleep
77
from loguru import logger
88

99

10-
def _describe_raw_paper(raw_paper: RawPaperItem) -> str:
11-
title = getattr(raw_paper, "title", None)
12-
if title:
13-
return str(title)
14-
if isinstance(raw_paper, dict):
15-
for key in ("title", "entry_id", "id", "doi"):
16-
value = raw_paper.get(key)
17-
if value:
18-
return str(value)
19-
return repr(raw_paper)
20-
21-
22-
def _convert_to_paper_safe(retriever: "BaseRetriever", raw_paper: RawPaperItem) -> Paper | None:
23-
try:
24-
return retriever.convert_to_paper(raw_paper)
25-
except Exception as exc:
26-
logger.warning(
27-
f"Skipping paper {_describe_raw_paper(raw_paper)}: {type(exc).__name__}: {exc}"
28-
)
29-
return None
30-
31-
3210
class BaseRetriever(ABC):
3311
name: str
3412
def __init__(self, config:DictConfig):
@@ -45,21 +23,18 @@ def convert_to_paper(self, raw_paper:RawPaperItem) -> Paper | None:
4523

4624
def retrieve_papers(self) -> list[Paper]:
4725
raw_papers = self._retrieve_raw_papers()
48-
papers = []
4926
logger.info("Processing papers...")
50-
with ProcessPoolExecutor(max_workers=self.config.executor.max_workers) as exec_pool:
51-
futures = {exec_pool.submit(_convert_to_paper_safe, self, rp): i for i, rp in enumerate(raw_papers)}
52-
papers = [None] * len(raw_papers)
53-
for future in tqdm(as_completed(futures), total=len(raw_papers), desc="Converting papers"):
54-
try:
55-
papers[futures[future]] = future.result()
56-
except Exception as exc:
57-
raw_paper = raw_papers[futures[future]]
58-
logger.warning(
59-
f"Skipping paper {_describe_raw_paper(raw_paper)} after worker failure: "
60-
f"{type(exc).__name__}: {exc}"
61-
)
62-
return [p for p in papers if p is not None]
27+
papers = []
28+
for raw_paper in tqdm(raw_papers, total=len(raw_papers), desc="Converting papers"):
29+
try:
30+
paper = self.convert_to_paper(raw_paper)
31+
except Exception as exc:
32+
logger.warning(f"Skipping paper {getattr(raw_paper, 'title', raw_paper)}: {exc}")
33+
continue
34+
if paper is not None:
35+
papers.append(paper)
36+
sleep(1)
37+
return papers
6338

6439
registered_retrievers = {}
6540

0 commit comments

Comments
 (0)