@@ -218,31 +218,35 @@ impl ASGIHTTPProtocol {
218218 more,
219219 self . response_chunked . load ( atomic:: Ordering :: Relaxed ) ,
220220 ) {
221- ( true , false , false ) => {
222- let ( status, headers) = self . response_intent . lock ( ) . unwrap ( ) . take ( ) . unwrap ( ) ;
223- self . send_response (
224- status,
225- headers,
226- http_body_util:: Full :: new ( body:: Bytes :: from ( body) )
227- . map_err ( std:: convert:: Into :: into)
228- . boxed ( ) ,
229- ) ;
230- self . flow_tx_waiter . notify_one ( ) ;
231- empty_future_into_py ( py)
232- }
233- ( true , true , false ) => {
234- self . response_chunked . store ( true , atomic:: Ordering :: Relaxed ) ;
235- let ( status, headers) = self . response_intent . lock ( ) . unwrap ( ) . take ( ) . unwrap ( ) ;
236- let ( body_tx, body_rx) = mpsc:: unbounded_channel :: < body:: Bytes > ( ) ;
237- let body_stream = http_body_util:: StreamBody :: new (
238- tokio_stream:: wrappers:: UnboundedReceiverStream :: new ( body_rx)
239- . map ( body:: Frame :: data)
240- . map ( Result :: Ok ) ,
241- ) ;
242- * self . body_tx . lock ( ) . unwrap ( ) = Some ( body_tx. clone ( ) ) ;
243- self . send_response ( status, headers, BodyExt :: boxed ( body_stream) ) ;
244- self . send_body ( py, & body_tx, body, false )
245- }
221+ ( true , false , false ) => match self . response_intent . lock ( ) . unwrap ( ) . take ( ) {
222+ Some ( ( status, headers) ) => {
223+ self . send_response (
224+ status,
225+ headers,
226+ http_body_util:: Full :: new ( body:: Bytes :: from ( body) )
227+ . map_err ( std:: convert:: Into :: into)
228+ . boxed ( ) ,
229+ ) ;
230+ self . flow_tx_waiter . notify_one ( ) ;
231+ empty_future_into_py ( py)
232+ }
233+ _ => error_flow ! ( "Response already finished" ) ,
234+ } ,
235+ ( true , true , false ) => match self . response_intent . lock ( ) . unwrap ( ) . take ( ) {
236+ Some ( ( status, headers) ) => {
237+ self . response_chunked . store ( true , atomic:: Ordering :: Relaxed ) ;
238+ let ( body_tx, body_rx) = mpsc:: unbounded_channel :: < body:: Bytes > ( ) ;
239+ let body_stream = http_body_util:: StreamBody :: new (
240+ tokio_stream:: wrappers:: UnboundedReceiverStream :: new ( body_rx)
241+ . map ( body:: Frame :: data)
242+ . map ( Result :: Ok ) ,
243+ ) ;
244+ * self . body_tx . lock ( ) . unwrap ( ) = Some ( body_tx. clone ( ) ) ;
245+ self . send_response ( status, headers, BodyExt :: boxed ( body_stream) ) ;
246+ self . send_body ( py, & body_tx, body, false )
247+ }
248+ _ => error_flow ! ( "Response already finished" ) ,
249+ } ,
246250 ( true , true , true ) => match & * self . body_tx . lock ( ) . unwrap ( ) {
247251 Some ( tx) => self . send_body ( py, tx, body, false ) ,
248252 _ => error_flow ! ( "Transport not initialized or closed" ) ,
0 commit comments