Skip to content

Commit 5a9f104

Browse files
committed
fix(xtask): stream fixture downloads to disk with HTTP-Range resume
The upstream-fixture download read the whole asset into memory via .bytes() and, on any mid-stream error, re-downloaded the entire asset from scratch (no resume). For large assets (ledger-rules ~875MB), a transient blip surfaced as 'error decoding response body' and failed all 3 retries, since each retry restarted from byte 0 — exactly the failure hit during this session (worked around with a manual gh release download). Both download-upstream-fixtures implementations (the cargo-xtask mod and the standalone bin) now stream the body to a temp file and retry with 'Range: bytes={written}-', resuming from the last byte written (up to 5 retries after the initial attempt, restarting cleanly if the server answers 200 instead of 206). This mirrors the Mithril ranged-download retry fix (135f34a) and also avoids the multi-hundred-MB in-memory buffer.
1 parent 135f34a commit 5a9f104

2 files changed

Lines changed: 155 additions & 36 deletions

File tree

xtask/src/bin/download-upstream-fixtures.rs

Lines changed: 76 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -144,33 +144,52 @@ fn download_and_extract(url: &str, target_dir: &Path, token: Option<&str>) -> Re
144144
fs::create_dir_all(target_dir)
145145
.with_context(|| format!("Cannot create {}", target_dir.display()))?;
146146

147-
let bytes = download_with_retry(url, token, 3)?;
148-
149-
let cursor = std::io::Cursor::new(bytes);
150-
let gz = flate2::read::GzDecoder::new(cursor);
147+
// Stream to a temp file (not an in-memory Vec) with HTTP-Range resume —
148+
// some assets are hundreds of MB (ledger-rules ~875MB) and a whole-body
149+
// `.bytes()` read aborts on any mid-stream blip with "error decoding
150+
// response body". Up to 5 retries, resuming from the last byte written.
151+
let tmp = download_to_temp_with_resume(url, token, 5)?;
152+
let file = fs::File::open(&tmp).with_context(|| format!("open {}", tmp.display()))?;
153+
let gz = flate2::read::GzDecoder::new(std::io::BufReader::new(file));
151154
let mut archive = tar::Archive::new(gz);
152155
archive.set_overwrite(true);
153-
archive
156+
let unpack = archive
154157
.unpack(target_dir)
155-
.with_context(|| format!("Cannot extract to {}", target_dir.display()))?;
158+
.with_context(|| format!("Cannot extract to {}", target_dir.display()));
159+
let _ = fs::remove_file(&tmp);
160+
unpack?;
156161

157162
let count = count_files(target_dir);
158163
eprintln!(" extracted {count} files");
159164
Ok(())
160165
}
161166

162-
fn download_with_retry(url: &str, token: Option<&str>, max_retries: u32) -> Result<Vec<u8>> {
167+
/// Download `url` to a temp file, retrying with an HTTP-Range resume from the
168+
/// last durably-written byte on transient stream errors. Returns the temp file
169+
/// path. Falls back to a fresh restart if the server ignores Range (200 vs 206).
170+
fn download_to_temp_with_resume(
171+
url: &str,
172+
token: Option<&str>,
173+
max_retries: u32,
174+
) -> Result<PathBuf> {
175+
use std::io::{Read, Write};
176+
163177
let client = reqwest::blocking::Client::builder()
164178
.user_agent("dugite-xtask/1.0")
165179
.build()
166180
.context("Cannot build HTTP client")?;
167181

168-
let mut last_err = None;
182+
let asset = url.rsplit('/').next().unwrap_or("fixture");
183+
let tmp = std::env::temp_dir().join(format!("dugite-xtask-{asset}.download.tmp"));
184+
let _ = fs::remove_file(&tmp);
185+
186+
let mut written: u64 = 0;
187+
let mut last_err: Option<anyhow::Error> = None;
169188
for attempt in 0..=max_retries {
170189
if attempt > 0 {
171-
let delay = Duration::from_secs(1u64 << (attempt - 1));
190+
let delay = Duration::from_secs(1u64 << (attempt - 1).min(4));
172191
eprintln!(
173-
" retry {attempt}/{max_retries} after {}s…",
192+
" retry {attempt}/{max_retries} (resume from {written} bytes) after {}s…",
174193
delay.as_secs()
175194
);
176195
thread::sleep(delay);
@@ -179,21 +198,60 @@ fn download_with_retry(url: &str, token: Option<&str>, max_retries: u32) -> Resu
179198
if let Some(tok) = token {
180199
req = req.header("Authorization", format!("Bearer {tok}"));
181200
}
182-
match req
183-
.send()
184-
.and_then(|r| r.error_for_status())
185-
.and_then(|r| r.bytes())
186-
{
187-
Ok(bytes) => return Ok(bytes.to_vec()),
201+
if written > 0 {
202+
req = req.header(reqwest::header::RANGE, format!("bytes={written}-"));
203+
}
204+
let resp = match req.send().and_then(|r| r.error_for_status()) {
205+
Ok(r) => r,
206+
Err(e) => {
207+
eprintln!(" attempt {attempt} request failed: {e}");
208+
last_err = Some(e.into());
209+
continue;
210+
}
211+
};
212+
let resuming = written > 0 && resp.status() == reqwest::StatusCode::PARTIAL_CONTENT;
213+
let mut file = if resuming {
214+
match fs::OpenOptions::new().append(true).open(&tmp) {
215+
Ok(f) => f,
216+
Err(e) => {
217+
last_err = Some(e.into());
218+
written = 0;
219+
continue;
220+
}
221+
}
222+
} else {
223+
written = 0;
224+
fs::File::create(&tmp).with_context(|| format!("create {}", tmp.display()))?
225+
};
226+
227+
let mut buf = vec![0u8; 1 << 20];
228+
let stream_result: Result<()> = (|| {
229+
let mut r = resp;
230+
loop {
231+
let n = r.read(&mut buf).context("error reading response body")?;
232+
if n == 0 {
233+
break;
234+
}
235+
file.write_all(&buf[..n]).context("write to temp file")?;
236+
written += n as u64;
237+
}
238+
file.flush().context("flush temp file")?;
239+
Ok(())
240+
})();
241+
match stream_result {
242+
Ok(()) => return Ok(tmp),
188243
Err(e) => {
189-
eprintln!(" attempt {attempt} failed: {e}");
244+
eprintln!(" attempt {attempt} stream failed at {written} bytes: {e}");
190245
last_err = Some(e);
191246
}
192247
}
193248
}
249+
let _ = fs::remove_file(&tmp);
194250
bail!(
195251
"Download failed after {max_retries} retries: {}",
196-
last_err.unwrap()
252+
last_err
253+
.map(|e| e.to_string())
254+
.unwrap_or_else(|| "unknown error".into())
197255
);
198256
}
199257

xtask/src/bin/xtask.rs

Lines changed: 79 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -150,33 +150,53 @@ mod download_upstream_fixtures {
150150
fs::create_dir_all(target_dir)
151151
.with_context(|| format!("Cannot create {}", target_dir.display()))?;
152152

153-
let bytes = download_with_retry(url, token, 3)?;
154-
155-
let cursor = std::io::Cursor::new(bytes);
156-
let gz = flate2::read::GzDecoder::new(cursor);
153+
// Stream to a temp file (not an in-memory Vec) with HTTP-Range resume —
154+
// some assets are hundreds of MB (ledger-rules ~875MB) and a whole-body
155+
// `.bytes()` read aborts on any mid-stream blip with "error decoding
156+
// response body". Up to 5 retries, resuming from the last byte written.
157+
let tmp = download_to_temp_with_resume(url, token, 5)?;
158+
let file = fs::File::open(&tmp).with_context(|| format!("open {}", tmp.display()))?;
159+
let gz = flate2::read::GzDecoder::new(std::io::BufReader::new(file));
157160
let mut archive = tar::Archive::new(gz);
158161
archive.set_overwrite(true);
159-
archive
162+
let unpack = archive
160163
.unpack(target_dir)
161-
.with_context(|| format!("Cannot extract to {}", target_dir.display()))?;
164+
.with_context(|| format!("Cannot extract to {}", target_dir.display()));
165+
let _ = fs::remove_file(&tmp);
166+
unpack?;
162167

163168
let count = count_files(target_dir);
164169
eprintln!(" extracted {count} files");
165170
Ok(())
166171
}
167172

168-
fn download_with_retry(url: &str, token: Option<&str>, max_retries: u32) -> Result<Vec<u8>> {
173+
/// Download `url` to a temp file, retrying with an HTTP-Range resume from the
174+
/// last durably-written byte on transient stream errors. Returns the temp
175+
/// file path. Falls back to a fresh restart if the server ignores Range
176+
/// (responds 200 instead of 206).
177+
fn download_to_temp_with_resume(
178+
url: &str,
179+
token: Option<&str>,
180+
max_retries: u32,
181+
) -> Result<PathBuf> {
182+
use std::io::{Read, Write};
183+
169184
let client = reqwest::blocking::Client::builder()
170185
.user_agent("dugite-xtask/1.0")
171186
.build()
172187
.context("Cannot build HTTP client")?;
173188

174-
let mut last_err = None;
189+
let asset = url.rsplit('/').next().unwrap_or("fixture");
190+
let tmp = std::env::temp_dir().join(format!("dugite-xtask-{asset}.download.tmp"));
191+
let _ = fs::remove_file(&tmp);
192+
193+
let mut written: u64 = 0;
194+
let mut last_err: Option<anyhow::Error> = None;
175195
for attempt in 0..=max_retries {
176196
if attempt > 0 {
177-
let delay = Duration::from_secs(1u64 << (attempt - 1));
197+
let delay = Duration::from_secs(1u64 << (attempt - 1).min(4));
178198
eprintln!(
179-
" retry {attempt}/{max_retries} after {}s…",
199+
" retry {attempt}/{max_retries} (resume from {written} bytes) after {}s…",
180200
delay.as_secs()
181201
);
182202
thread::sleep(delay);
@@ -185,21 +205,62 @@ mod download_upstream_fixtures {
185205
if let Some(tok) = token {
186206
req = req.header("Authorization", format!("Bearer {tok}"));
187207
}
188-
match req
189-
.send()
190-
.and_then(|r| r.error_for_status())
191-
.and_then(|r| r.bytes())
192-
{
193-
Ok(bytes) => return Ok(bytes.to_vec()),
208+
if written > 0 {
209+
req = req.header(reqwest::header::RANGE, format!("bytes={written}-"));
210+
}
211+
let resp = match req.send().and_then(|r| r.error_for_status()) {
212+
Ok(r) => r,
213+
Err(e) => {
214+
eprintln!(" attempt {attempt} request failed: {e}");
215+
last_err = Some(e.into());
216+
continue;
217+
}
218+
};
219+
// 206 = the server honoured Range → append; otherwise (200) it sent
220+
// the whole body → restart the file from scratch.
221+
let resuming = written > 0 && resp.status() == reqwest::StatusCode::PARTIAL_CONTENT;
222+
let mut file = if resuming {
223+
match fs::OpenOptions::new().append(true).open(&tmp) {
224+
Ok(f) => f,
225+
Err(e) => {
226+
last_err = Some(e.into());
227+
written = 0;
228+
continue;
229+
}
230+
}
231+
} else {
232+
written = 0;
233+
fs::File::create(&tmp).with_context(|| format!("create {}", tmp.display()))?
234+
};
235+
236+
let mut buf = vec![0u8; 1 << 20];
237+
let stream_result: Result<()> = (|| {
238+
let mut r = resp;
239+
loop {
240+
let n = r.read(&mut buf).context("error reading response body")?;
241+
if n == 0 {
242+
break;
243+
}
244+
file.write_all(&buf[..n]).context("write to temp file")?;
245+
written += n as u64;
246+
}
247+
file.flush().context("flush temp file")?;
248+
Ok(())
249+
})();
250+
match stream_result {
251+
Ok(()) => return Ok(tmp),
194252
Err(e) => {
195-
eprintln!(" attempt {attempt} failed: {e}");
253+
eprintln!(" attempt {attempt} stream failed at {written} bytes: {e}");
196254
last_err = Some(e);
197255
}
198256
}
199257
}
258+
let _ = fs::remove_file(&tmp);
200259
bail!(
201260
"Download failed after {max_retries} retries: {}",
202-
last_err.unwrap()
261+
last_err
262+
.map(|e| e.to_string())
263+
.unwrap_or_else(|| "unknown error".into())
203264
);
204265
}
205266

0 commit comments

Comments
 (0)