Skip to content
This repository was archived by the owner on Jan 14, 2022. It is now read-only.

Commit f8a82ca

Browse files
committed
Support passing access tokens via Sec-WebSocket-Protocol header
Previously, the access token needed to be passed via the query string; with this commit, the token can be passed *either* through the query string or the Sec-WebSocket-Protocol header. This was done to correspond to the changes made to the streaming.js version in [Improve streaming server security](mastodon/mastodon#10818). However, I am not sure that it *does* increase security; as explained at <https://support.ably.io/support/solutions/articles/3000075120-is-it-secure-to-send-the-access-token-as-part-of-the-websocket-url-query-params->, there is generally no security advantage to passing sensitive information via websocket headers instead of the query string—the entire connection is encrypted and is not stored in the browser history, so the typical reasons to keep sensitive info out of the query string don't apply. I would welcome any corrections on this/reasons this change improves security.
1 parent 280cc60 commit f8a82ca

4 files changed

Lines changed: 46 additions & 60 deletions

File tree

src/main.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use receiver::Receiver;
4040
use std::env;
4141
use std::net::SocketAddr;
4242
use stream::StreamManager;
43-
use user::{Method, Scope, User};
43+
use user::{Scope, User};
4444
use warp::path;
4545
use warp::Filter as WarpFilter;
4646

@@ -96,7 +96,7 @@ fn main() {
9696

9797
//let redis_updates_ws = StreamManager::new(Receiver::new());
9898
let websocket = path!("api" / "v1" / "streaming")
99-
.and(Scope::Public.get_access_token(Method::WS))
99+
.and(Scope::Public.get_access_token())
100100
.and_then(|token| User::from_access_token(token, Scope::Public))
101101
.and(warp::query())
102102
.and(query::Media::to_filter())
@@ -136,18 +136,16 @@ fn main() {
136136
// Other endpoints don't exist:
137137
_ => return Err(warp::reject::custom("Error: Nonexistent WebSocket query")),
138138
};
139+
let token = user.access_token.clone();
139140
let stream = redis_updates_ws.configure_copy(&timeline, user);
140141

141-
Ok(ws.on_upgrade(move |socket| ws::send_replies(socket, stream)))
142+
Ok((
143+
ws.on_upgrade(move |socket| ws::send_replies(socket, stream)),
144+
token,
145+
))
142146
},
143147
)
144-
.map(|reply| {
145-
warp::reply::with_header(
146-
reply,
147-
"sec-websocket-protocol",
148-
"LhbVOxKckgqyMg3nDLaEu5vgqY6Yzc9Pk1w8_yKQwS8",
149-
)
150-
});
148+
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));
151149

152150
let address: SocketAddr = env::var("SERVER_ADDR")
153151
.unwrap_or("127.0.0.1:4000".to_owned())

src/stream.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ impl Stream for StreamManager {
5151
type Error = Error;
5252

5353
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
54-
let mut receiver = self.receiver.lock().expect("No other thread panic");
54+
let mut receiver = self
55+
.receiver
56+
.lock()
57+
.expect("StreamManager: No other thread panic");
5558
receiver.update(self.id, &self.target_timeline.clone());
5659
match receiver.poll() {
5760
Ok(Async::Ready(Some(value))) => {
@@ -61,19 +64,19 @@ impl Stream for StreamManager {
6164
.expect("Previously set current user");
6265

6366
let user_langs = user.langs.clone();
64-
let copy = value.clone();
65-
let event = copy["event"].as_str().expect("Redis string");
66-
let copy = value.clone();
67-
let payload = copy["payload"].to_string();
68-
let copy = value.clone();
69-
let toot_lang = copy["payload"]["language"]
70-
.as_str()
71-
.expect("redis str")
72-
.to_string();
67+
let event = value["event"].as_str().expect("Redis string");
68+
let payload = value["payload"].to_string();
7369

7470
match (&user.filter, user_langs) {
7571
(Filter::Notification, _) if event != "notification" => Ok(Async::NotReady),
76-
(Filter::Language, Some(ref langs)) if !langs.contains(&toot_lang) => {
72+
(Filter::Language, Some(ref user_langs))
73+
if !user_langs.contains(
74+
&value["payload"]["language"]
75+
.as_str()
76+
.expect("Redis str")
77+
.to_string(),
78+
) =>
79+
{
7780
Ok(Async::NotReady)
7881
}
7982
_ => Ok(Async::Ready(Some(json!(

src/timeline.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Filters for all the endpoints accessible for Server Sent Event updates
22
use crate::query;
3-
use crate::user::{Method, Scope, User};
3+
use crate::user::{Scope, User};
44
use warp::filters::BoxedFilter;
55
use warp::{path, Filter};
66

@@ -14,7 +14,7 @@ type TimelineUser = ((String, User),);
1414
pub fn user() -> BoxedFilter<TimelineUser> {
1515
path!("api" / "v1" / "streaming" / "user")
1616
.and(path::end())
17-
.and(Scope::Private.get_access_token(Method::HttpPush))
17+
.and(Scope::Private.get_access_token())
1818
.and_then(|token| User::from_access_token(token, Scope::Private))
1919
.map(|user: User| (user.id.to_string(), user))
2020
.boxed()
@@ -30,7 +30,7 @@ pub fn user() -> BoxedFilter<TimelineUser> {
3030
pub fn user_notifications() -> BoxedFilter<TimelineUser> {
3131
path!("api" / "v1" / "streaming" / "user" / "notification")
3232
.and(path::end())
33-
.and(Scope::Private.get_access_token(Method::HttpPush))
33+
.and(Scope::Private.get_access_token())
3434
.and_then(|token| User::from_access_token(token, Scope::Private))
3535
.map(|user: User| (user.id.to_string(), user.with_notification_filter()))
3636
.boxed()
@@ -43,7 +43,7 @@ pub fn user_notifications() -> BoxedFilter<TimelineUser> {
4343
pub fn public() -> BoxedFilter<TimelineUser> {
4444
path!("api" / "v1" / "streaming" / "public")
4545
.and(path::end())
46-
.and(Scope::Public.get_access_token(Method::HttpPush))
46+
.and(Scope::Public.get_access_token())
4747
.and_then(|token| User::from_access_token(token, Scope::Public))
4848
.map(|user: User| ("public".to_owned(), user.with_language_filter()))
4949
.boxed()
@@ -56,7 +56,7 @@ pub fn public() -> BoxedFilter<TimelineUser> {
5656
pub fn public_media() -> BoxedFilter<TimelineUser> {
5757
path!("api" / "v1" / "streaming" / "public")
5858
.and(path::end())
59-
.and(Scope::Public.get_access_token(Method::HttpPush))
59+
.and(Scope::Public.get_access_token())
6060
.and_then(|token| User::from_access_token(token, Scope::Public))
6161
.and(warp::query())
6262
.map(|user: User, q: query::Media| match q.only_media.as_ref() {
@@ -73,7 +73,7 @@ pub fn public_media() -> BoxedFilter<TimelineUser> {
7373
pub fn public_local() -> BoxedFilter<TimelineUser> {
7474
path!("api" / "v1" / "streaming" / "public" / "local")
7575
.and(path::end())
76-
.and(Scope::Public.get_access_token(Method::HttpPush))
76+
.and(Scope::Public.get_access_token())
7777
.and_then(|token| User::from_access_token(token, Scope::Public))
7878
.map(|user: User| ("public:local".to_owned(), user.with_language_filter()))
7979
.boxed()
@@ -85,7 +85,7 @@ pub fn public_local() -> BoxedFilter<TimelineUser> {
8585
/// **public**. Filter: `Language`
8686
pub fn public_local_media() -> BoxedFilter<TimelineUser> {
8787
path!("api" / "v1" / "streaming" / "public" / "local")
88-
.and(Scope::Public.get_access_token(Method::HttpPush))
88+
.and(Scope::Public.get_access_token())
8989
.and_then(|token| User::from_access_token(token, Scope::Public))
9090
.and(warp::query())
9191
.and(path::end())
@@ -103,7 +103,7 @@ pub fn public_local_media() -> BoxedFilter<TimelineUser> {
103103
pub fn direct() -> BoxedFilter<TimelineUser> {
104104
path!("api" / "v1" / "streaming" / "direct")
105105
.and(path::end())
106-
.and(Scope::Private.get_access_token(Method::HttpPush))
106+
.and(Scope::Private.get_access_token())
107107
.and_then(|token| User::from_access_token(token, Scope::Private))
108108
.map(|user: User| (format!("direct:{}", user.id), user.with_no_filter()))
109109
.boxed()
@@ -139,7 +139,7 @@ pub fn hashtag_local() -> BoxedFilter<TimelineUser> {
139139
/// **private**. Filter: `None`
140140
pub fn list() -> BoxedFilter<TimelineUser> {
141141
path!("api" / "v1" / "streaming" / "list")
142-
.and(Scope::Private.get_access_token(Method::HttpPush))
142+
.and(Scope::Private.get_access_token())
143143
.and_then(|token| User::from_access_token(token, Scope::Private))
144144
.and(warp::query())
145145
.and_then(|user: User, q: query::List| (user.authorized_for_list(q.list), Ok(user)))

src/user.rs

Lines changed: 15 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ LIMIT 1",
5050
&[&token],
5151
)
5252
.expect("Hard-coded query will return Some([0 or more rows])");
53-
dbg!(&result);
5453
if !result.is_empty() {
5554
let only_row = result.get(0);
5655
let id: i64 = only_row.get(1);
@@ -133,41 +132,27 @@ pub enum Scope {
133132
Public,
134133
Private,
135134
}
136-
pub enum Method {
137-
WS,
138-
HttpPush,
139-
}
140135
impl Scope {
141-
pub fn get_access_token(self, method: Method) -> warp::filters::BoxedFilter<(String,)> {
142-
let token_from_header_http_push =
143-
warp::header::header::<String>("authorization").map(|auth: String| {
144-
dbg!(auth.split(' ').nth(1).unwrap_or("invalid").to_string());
145-
auth.split(' ').nth(1).unwrap_or("invalid").to_string()
146-
});
136+
pub fn get_access_token(self) -> warp::filters::BoxedFilter<(String,)> {
137+
let token_from_header_http_push = warp::header::header::<String>("authorization")
138+
.map(|auth: String| auth.split(' ').nth(1).unwrap_or("invalid").to_string());
147139
let token_from_header_ws =
148-
warp::header::header::<String>("Sec-WebSocket-Protocol").map(|auth: String| {
149-
dbg!(&auth);
150-
auth
151-
});
152-
let token_from_query = warp::query().map(|q: query::Auth| {
153-
dbg!(&q.access_token);
154-
q.access_token
155-
});
140+
warp::header::header::<String>("Sec-WebSocket-Protocol").map(|auth: String| auth);
141+
let token_from_query = warp::query().map(|q: query::Auth| q.access_token);
142+
143+
let private_scopes = any_of!(
144+
token_from_header_http_push,
145+
token_from_header_ws,
146+
token_from_query
147+
);
148+
156149
let public = warp::any().map(|| "no access token".to_string());
157150

158-
match (self, method) {
151+
match self {
159152
// if they're trying to access a private scope without an access token, reject the request
160-
(Scope::Private, Method::HttpPush) => {
161-
any_of!(token_from_query, token_from_header_http_push).boxed()
162-
}
163-
(Scope::Private, Method::WS) => any_of!(token_from_query, token_from_header_ws).boxed(),
153+
Scope::Private => private_scopes.boxed(),
164154
// if they're trying to access a public scope without an access token, proceed
165-
(Scope::Public, Method::HttpPush) => {
166-
any_of!(token_from_query, token_from_header_http_push, public).boxed()
167-
}
168-
(Scope::Public, Method::WS) => {
169-
any_of!(token_from_query, token_from_header_ws, public).boxed()
170-
}
155+
Scope::Public => any_of!(private_scopes, public).boxed(),
171156
}
172157
}
173158
}

0 commit comments

Comments
 (0)