Skip to content

Commit 065753c

Browse files
committed
feat(cmn): implement query_transfer_status()
The delegate logic is implemented and seems sound. It's somewhat funny that after all this back and forth, all we get is a valid start position for the upload.
1 parent 42a76e4 commit 065753c

5 files changed

Lines changed: 238 additions & 73 deletions

File tree

gen/groupsmigration1/src/cmn.rs

Lines changed: 154 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,18 @@
33
use std::marker::MarkerTrait;
44
use std::io::{self, Read, Seek, Cursor, Write, SeekFrom};
55
use std;
6+
use std::fmt::{self, Display};
7+
use std::str::FromStr;
8+
use std::thread::sleep;
69

710
use mime::{Mime, TopLevel, SubLevel, Attr, Value};
8-
use oauth2;
9-
use oauth2::TokenType;
11+
use oauth2::{TokenType, Retry, self};
1012
use hyper;
11-
use hyper::header::{ContentType, ContentLength, Headers, UserAgent, Authorization};
13+
use hyper::header::{ContentType, ContentLength, Headers, UserAgent, Authorization, Header,
14+
HeaderFormat};
1215
use hyper::http::LINE_ENDING;
1316
use hyper::method::Method;
17+
use hyper::status::StatusCode;
1418

1519
use serde;
1620

@@ -108,8 +112,8 @@ pub trait Delegate {
108112
/// Called whenever there is an [HttpError](http://hyperium.github.io/hyper/hyper/error/enum.HttpError.html), usually if there are network problems.
109113
///
110114
/// Return retry information.
111-
fn http_error(&mut self, &hyper::HttpError) -> oauth2::Retry {
112-
oauth2::Retry::Abort
115+
fn http_error(&mut self, &hyper::HttpError) -> Retry {
116+
Retry::Abort
113117
}
114118

115119
/// Called whenever there is the need for your applications API key after
@@ -162,8 +166,8 @@ pub trait Delegate {
162166
/// depends on the used API method.
163167
/// The delegate should check the status, header and decoded json error to decide
164168
/// whether to retry or not. In the latter case, the underlying call will fail.
165-
fn http_failure(&mut self, _: &hyper::client::Response, JsonServerError) -> oauth2::Retry {
166-
oauth2::Retry::Abort
169+
fn http_failure(&mut self, _: &hyper::client::Response, Option<JsonServerError>) -> Retry {
170+
Retry::Abort
167171
}
168172

169173
/// Called prior to sending the main request of the given method. It can be used to time
@@ -363,35 +367,168 @@ impl_header!(XUploadContentType,
363367
"X-Upload-Content-Type",
364368
Mime);
365369

370+
#[derive(Clone, PartialEq, Debug)]
371+
pub struct Chunk {
372+
pub first: u64,
373+
pub last: u64
374+
}
375+
376+
impl fmt::Display for Chunk {
377+
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
378+
write!(fmt, "{}-{}", self.first, self.last).ok();
379+
Ok(())
380+
}
381+
}
382+
383+
impl FromStr for Chunk {
384+
type Err = &'static str;
385+
386+
/// NOTE: only implements `%i-%i`, not `*`
387+
fn from_str(s: &str) -> std::result::Result<Chunk, &'static str> {
388+
let parts: Vec<&str> = s.split('-').collect();
389+
if parts.len() != 2 {
390+
return Err("Expected two parts: %i-%i")
391+
}
392+
Ok(
393+
Chunk {
394+
first: match FromStr::from_str(parts[0]) {
395+
Ok(d) => d,
396+
_ => return Err("Couldn't parse 'first' as digit")
397+
},
398+
last: match FromStr::from_str(parts[1]) {
399+
Ok(d) => d,
400+
_ => return Err("Couldn't parse 'last' as digit")
401+
}
402+
}
403+
)
404+
}
405+
}
406+
407+
/// Implements the Content-Range header, for serialization only
408+
#[derive(Clone, PartialEq, Debug)]
409+
pub struct ContentRange {
410+
pub range: Option<Chunk>,
411+
pub total_length: u64,
412+
}
413+
414+
impl Header for ContentRange {
415+
fn header_name() -> &'static str {
416+
"Content-Range"
417+
}
418+
419+
/// We are not parsable, as parsing is done by the `Range` header
420+
fn parse_header(raw: &[Vec<u8>]) -> Option<ContentRange> {
421+
None
422+
}
423+
}
424+
425+
426+
impl HeaderFormat for ContentRange {
427+
fn fmt_header(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
428+
try!(fmt.write_str("bytes "));
429+
match self.range {
430+
Some(ref c) => try!(c.fmt(fmt)),
431+
None => try!(fmt.write_str("*"))
432+
}
433+
write!(fmt, "/{}", self.total_length).ok();
434+
Ok(())
435+
}
436+
}
437+
438+
#[derive(Clone, PartialEq, Debug)]
439+
pub struct RangeResponseHeader(pub Chunk);
440+
441+
impl Header for RangeResponseHeader {
442+
fn header_name() -> &'static str {
443+
"Range"
444+
}
445+
446+
fn parse_header(raw: &[Vec<u8>]) -> Option<RangeResponseHeader> {
447+
match raw {
448+
[ref v] => {
449+
if let Ok(s) = std::str::from_utf8(v) {
450+
const PREFIX: &'static str = "bytes=";
451+
if s.starts_with(PREFIX) {
452+
let c: Chunk = match FromStr::from_str(&s[PREFIX.len()..]) {
453+
Ok(c) => c,
454+
_ => return None
455+
};
456+
return Some(RangeResponseHeader(c))
457+
}
458+
}
459+
None
460+
},
461+
_ => None
462+
}
463+
}
464+
}
465+
466+
impl HeaderFormat for RangeResponseHeader {
467+
/// No implmentation necessary, we just need to parse
468+
fn fmt_header(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
469+
Err(fmt::Error)
470+
}
471+
}
472+
366473
/// A utility type to perform a resumable upload from start to end.
367474
pub struct ResumableUploadHelper<'a, NC: 'a, A: 'a> {
368475
pub client: &'a mut hyper::client::Client<NC>,
369476
pub delegate: &'a mut Delegate,
477+
pub start_at: Option<u64>,
370478
pub auth: &'a mut A,
371479
pub user_agent: &'a str,
372480
pub auth_header: Authorization<oauth2::Scheme>,
373481
pub url: &'a str,
374482
pub reader: &'a mut ReadSeek,
375483
pub media_type: Mime,
376-
pub content_size: u64
484+
pub content_length: u64
377485
}
378486

379487
impl<'a, NC, A> ResumableUploadHelper<'a, NC, A>
380488
where NC: hyper::net::NetworkConnector,
381489
A: oauth2::GetToken {
382490

383-
fn query_transfer_status(&'a mut self) -> (u64, hyper::HttpResult<hyper::client::Response>) {
384-
self.client.post(self.url)
385-
.header(UserAgent(self.user_agent.to_string()))
386-
.header(self.auth_header.clone());
387-
(0, Err(hyper::error::HttpError::HttpStatusError))
491+
fn query_transfer_status(&'a mut self) -> (Option<u64>, hyper::HttpResult<hyper::client::Response>) {
492+
loop {
493+
match self.client.post(self.url)
494+
.header(UserAgent(self.user_agent.to_string()))
495+
.header(ContentRange { range: None, total_length: self.content_length })
496+
.header(self.auth_header.clone())
497+
.send() {
498+
Ok(r) => {
499+
// 308 = resume-incomplete == PermanentRedirect
500+
let headers = r.headers.clone();
501+
let h: &RangeResponseHeader = match headers.get() {
502+
Some(hh) if r.status == StatusCode::PermanentRedirect => hh,
503+
None|Some(_) => {
504+
if let Retry::After(d) = self.delegate.http_failure(&r, None) {
505+
sleep(d);
506+
continue;
507+
}
508+
return (None, Ok(r))
509+
}
510+
};
511+
return (Some(h.0.last), Ok(r))
512+
}
513+
Err(err) => {
514+
if let Retry::After(d) = self.delegate.http_error(&err) {
515+
sleep(d);
516+
continue;
517+
}
518+
return (None, Err(err))
519+
}
520+
}
521+
}
388522
}
389523

390524
pub fn upload(&'a mut self) -> hyper::HttpResult<hyper::client::Response> {
391-
let (start, result) = self.query_transfer_status();
392-
if let Err(_) = result {
393-
return result
394-
}
525+
let start = match self.start_at {
526+
Some(s) => s,
527+
None => match self.query_transfer_status() {
528+
(Some(s), _) => s,
529+
(_, result) => return result
530+
}
531+
};
395532
Err(hyper::error::HttpError::HttpStatusError)
396533
}
397534
}

gen/groupsmigration1/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ impl<'a, C, NC, A> ArchiveInsertCall<'a, C, NC, A> where NC: hyper::net::Network
585585
let mut json_err = String::new();
586586
res.read_to_string(&mut json_err).unwrap();
587587
let error_info: cmn::JsonServerError = json::from_str(&json_err).unwrap();
588-
if let oauth2::Retry::After(d) = dlg.http_failure(&res, error_info) {
588+
if let oauth2::Retry::After(d) = dlg.http_failure(&res, Some(error_info)) {
589589
sleep(d);
590590
continue;
591591
}
@@ -608,13 +608,14 @@ impl<'a, C, NC, A> ArchiveInsertCall<'a, C, NC, A> where NC: hyper::net::Network
608608
cmn::ResumableUploadHelper {
609609
client: &mut client.borrow_mut(),
610610
delegate: dlg,
611+
start_at: if upload_url_from_server { Some(0) } else { None },
611612
auth: &mut *self.hub.auth.borrow_mut(),
612613
user_agent: &self.hub._user_agent,
613614
auth_header: auth_header.clone(),
614615
url: url,
615616
reader: &mut reader,
616617
media_type: reader_mime_type.clone(),
617-
content_size: size
618+
content_length: size
618619
}.upload()
619620
};
620621
match upload_result {

src/mako/lib/mbuild.mako

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,7 @@ else {
765765
let mut json_err = String::new();
766766
res.read_to_string(&mut json_err).unwrap();
767767
let error_info: cmn::JsonServerError = json::from_str(&json_err).unwrap();
768-
if let oauth2::Retry::After(d) = dlg.http_failure(&res, error_info) {
768+
if let oauth2::Retry::After(d) = dlg.http_failure(&res, Some(error_info)) {
769769
sleep(d);
770770
continue;
771771
}
@@ -785,6 +785,7 @@ else {
785785
cmn::ResumableUploadHelper {
786786
client: &mut client.borrow_mut(),
787787
delegate: dlg,
788+
start_at: if upload_url_from_server { Some(0) } else { None },
788789
auth: &mut *self.hub.auth.borrow_mut(),
789790
user_agent: &self.hub._user_agent,
790791
auth_header: auth_header.clone(),

0 commit comments

Comments
 (0)