Skip to content

Commit bf18ed4

Browse files
LeoniePhilineDarksonn
authored andcommitted
sync: fix panic in Chan::recv_many when called with non-empty vector on closed channel (#7991)
`Chan::recv_many` intends to assert that no slots have been consumed when exiting with `Ready` via the `rx_closed` code path. Instead of asserting no items were added to the buffer, it asserted buffer emptiness, incorrectly making assumptions about the provided buffer. When `recv_many` was called on an empty channel with idle semaphore after the receiver was closed, the method would panic. The branch coverage had been previously missing. This changeset corrects the assertion and adds tests covering the code path. Fixes #7990.
1 parent f320197 commit bf18ed4

2 files changed

Lines changed: 29 additions & 9 deletions

File tree

tokio/src/sync/mpsc/chan.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -306,13 +306,11 @@ impl<T, S: Semaphore> Rx<T, S> {
306306
return Ready(Some(value));
307307
}
308308
Some(Read::Closed) => {
309-
// TODO: This check may not be required as it most
310-
// likely can only return `true` at this point. A
311-
// channel is closed when all tx handles are
309+
// A channel is closed when all tx handles are
312310
// dropped. Dropping a tx handle releases memory,
313311
// which ensures that if dropping the tx handle is
314312
// visible, then all messages sent are also visible.
315-
assert!(self.inner.semaphore.is_idle());
313+
debug_assert!(self.inner.semaphore.is_idle());
316314
coop.made_progress();
317315
return Ready(None);
318316
}
@@ -380,13 +378,11 @@ impl<T, S: Semaphore> Rx<T, S> {
380378
if number_added > 0 {
381379
self.inner.semaphore.add_permits(number_added);
382380
}
383-
// TODO: This check may not be required as it most
384-
// likely can only return `true` at this point. A
385-
// channel is closed when all tx handles are
381+
// A channel is closed when all tx handles are
386382
// dropped. Dropping a tx handle releases memory,
387383
// which ensures that if dropping the tx handle is
388384
// visible, then all messages sent are also visible.
389-
assert!(self.inner.semaphore.is_idle());
385+
debug_assert!(self.inner.semaphore.is_idle());
390386
coop.made_progress();
391387
return Ready(number_added);
392388
}
@@ -415,7 +411,7 @@ impl<T, S: Semaphore> Rx<T, S> {
415411
try_recv!();
416412

417413
if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
418-
assert!(buffer.is_empty());
414+
debug_assert_eq!(buffer.len(), initial_length);
419415
coop.made_progress();
420416
Ready(0usize)
421417
} else {

tokio/tests/sync_mpsc.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,30 @@ async fn send_recv_many_unbounded_capacity() {
358358
assert_eq!(expected, buffer);
359359
}
360360

361+
#[maybe_tokio_test]
362+
async fn recv_many_with_non_empty_buffer_bounded_rx_closed_and_idle() {
363+
let (_tx, mut rx) = mpsc::channel::<i32>(1);
364+
365+
let mut buffer: Vec<i32> = vec![1];
366+
367+
rx.close();
368+
369+
assert_eq!(0, rx.recv_many(&mut buffer, 1).await);
370+
assert_eq!(vec![1], buffer);
371+
}
372+
373+
#[maybe_tokio_test]
374+
async fn recv_many_with_non_empty_buffer_unbounded_rx_closed_and_idle() {
375+
let (_tx, mut rx) = mpsc::unbounded_channel::<i32>();
376+
377+
let mut buffer: Vec<i32> = vec![1];
378+
379+
rx.close();
380+
381+
assert_eq!(0, rx.recv_many(&mut buffer, 1).await);
382+
assert_eq!(vec![1], buffer);
383+
}
384+
361385
#[tokio::test]
362386
#[cfg(feature = "full")]
363387
async fn async_send_recv_unbounded() {

0 commit comments

Comments
 (0)