@@ -85,8 +85,7 @@ pub struct RecordingSession {
8585
8686 // statistic
8787 start_time : Instant ,
88- fps_counter : SimpleFpsCounter ,
89- total_frame_count : u64 ,
88+ total_frame_count : Arc < AtomicU64 > ,
9089 loss_frame_count : Arc < AtomicU64 > ,
9190
9291 frame_sender : Option < Arc < Sender < Frame > > > ,
@@ -181,8 +180,7 @@ impl RecordingSession {
181180 config,
182181
183182 start_time : std:: time:: Instant :: now ( ) ,
184- fps_counter : Default :: default ( ) ,
185- total_frame_count : 0 ,
183+ total_frame_count : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
186184 loss_frame_count : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
187185
188186 frame_sender : Some ( Arc :: new ( frame_sender) ) ,
@@ -319,19 +317,19 @@ impl RecordingSession {
319317 /// }
320318 /// ```
321319 pub fn wait (
322- mut self ,
320+ self ,
323321 combine_progress_cb : impl FnMut ( f32 ) ,
324322 ) -> Result < ProgressState , RecorderError > {
325- let ( resize_sender, resize_receiver) =
326- bounded :: < ResizeChannelData > ( self . config . max_queue_size ) ;
327323 let ( encoder_sender, encoder_receiver) = bounded ( self . config . max_queue_size * 2 ) ;
328324
329325 let resize_handles = Self :: resize_workers (
330- resize_receiver ,
326+ self . frame_receiver . clone ( ) ,
331327 encoder_sender,
328+ self . frame_sender_user . clone ( ) ,
332329 ( self . capture_workers . len ( ) / 2 ) . max ( 2 ) ,
333330 self . start_time ,
334331 self . config . resolution . clone ( ) ,
332+ self . total_frame_count . clone ( ) ,
335333 self . loss_frame_count . clone ( ) ,
336334 self . config . enable_preview_mode ,
337335 ) ;
@@ -350,86 +348,53 @@ impl RecordingSession {
350348 } ;
351349
352350 // Create encoder in main thread since x264 is not thread-safe
353- let mut encoder = VideoEncoder :: new ( width, height, self . config . fps ) ?;
351+ let mut video_encoder = VideoEncoder :: new ( width, height, self . config . fps ) ?;
354352
355353 // Write encoder headers first if we have a writer
356354 if let Some ( ref writer) = h264_writer {
357- let headers = encoder . headers ( ) ?;
355+ let headers = video_encoder . headers ( ) ?;
358356 let headers_data = headers. entirety ( ) . to_vec ( ) ;
359357 writer. write_frame ( EncodedFrame :: Frame ( ( 0 , headers_data) ) ) ;
360358 }
361359
362360 loop {
363- // Use select to handle both frame receiving and encoding
364- crossbeam:: select! {
365- recv( self . frame_receiver) -> frame_result => {
366- match frame_result {
367- Ok ( frame) => {
368- self . total_frame_count += 1 ;
369- let index = frame. thread_id as usize ;
370-
371- if let Some ( ref sender) = self . frame_sender_user {
372- let frame_user = FrameUser {
373- stats: StatsUser {
374- fps: self . fps_counter. add_frame( frame. timestamp) ,
375- total_frames: self . total_frame_count,
376- loss_frames: self . loss_frame_count. load( Ordering :: Relaxed ) ,
377- } ,
378- frame: frame. clone( ) ,
379- } ;
380- if let Err ( e) = sender. try_send( frame_user) {
381- log:: warn!( "try send frame failed: {e}" ) ;
382- }
383- }
361+ match encoder_receiver. recv ( ) {
362+ Ok ( ( total_frame_index, img) ) => {
363+ let now = std:: time:: Instant :: now ( ) ;
364+ match video_encoder. encode_frame ( img. into ( ) ) {
365+ Ok ( EncodedFrame :: Frame ( ( _, encoded_frame) ) ) => {
366+ log:: debug!(
367+ "total encoded frame[{total_frame_index}] {} bytes" ,
368+ encoded_frame. len( )
369+ ) ;
384370
385- if let Err ( e) = resize_sender. try_send( ( self . total_frame_count, frame) ) {
386- self . loss_frame_count. fetch_add( 1 , Ordering :: Relaxed ) ;
387- log:: warn!( "try send to handle_worker[{index}] failed: {e}" ) ;
371+ if let Some ( ref writer) = h264_writer {
372+ writer. write_frame ( EncodedFrame :: Frame ( (
373+ total_frame_index,
374+ encoded_frame,
375+ ) ) ) ;
388376 }
389377 }
390- _ => {
391- self . stop( ) ;
392- self . wait_stop(
393- resize_sender,
394- resize_handles,
395- None , // No encoder handle since we're encoding in main thread
396- h264_writer,
397- Some ( encoder) , // Pass encoder for flushing
398- combine_progress_cb,
399- ) ?;
400- break ;
401- }
378+ Err ( e) => log:: warn!( "encode frame failed: {e}" ) ,
379+ _ => unreachable ! ( "invalid EncodedFrame" ) ,
402380 }
403- }
404- recv( encoder_receiver) -> encoded_result => {
405- match encoded_result {
406- Ok ( ( total_frame_index, img) ) => {
407- let now = std:: time:: Instant :: now( ) ;
408-
409- match encoder. encode_frame( img. into( ) ) {
410- Ok ( EncodedFrame :: Frame ( ( _, encoded_frame) ) ) => {
411- log:: debug!(
412- "total encoded frame[{total_frame_index}] {} bytes" ,
413- encoded_frame. len( )
414- ) ;
415-
416- if let Some ( ref writer) = h264_writer {
417- writer. write_frame( EncodedFrame :: Frame ( (
418- total_frame_index,
419- encoded_frame,
420- ) ) ) ;
421- }
422- }
423- Err ( e) => log:: warn!( "encode frame failed: {e}" ) ,
424- _ => unreachable!( "invalid EncodedFrame" ) ,
425- }
426381
427- log:: debug!( "frame encoding time: {:.2?}\n " , now. elapsed( ) ) ;
428- }
429- _ => {
430- // Encoder channel closed, continue with frame receiving
431- }
432- }
382+ log:: debug!(
383+ "frame encoding time: {:.2?}. encoder receiver channel remained: {}\n " ,
384+ now. elapsed( ) ,
385+ encoder_receiver. capacity( ) . unwrap_or_default( ) - encoder_receiver. len( ) ,
386+ ) ;
387+ }
388+ _ => {
389+ log:: info!( "exit encoder receiver channel" ) ;
390+ self . stop ( ) ;
391+ self . wait_stop (
392+ resize_handles,
393+ h264_writer,
394+ Some ( video_encoder) ,
395+ combine_progress_cb,
396+ ) ?;
397+ break ;
433398 }
434399 }
435400 }
@@ -509,16 +474,18 @@ impl RecordingSession {
509474 }
510475
511476 fn resize_workers (
512- receiver : Receiver < ResizeChannelData > ,
477+ capture_receiver : Receiver < Frame > ,
513478 encoder_sender : Sender < EncoderChannelData > ,
479+ frame_sender_user : Option < Sender < FrameUser > > ,
514480 thread_counts : usize ,
515481 start_time : Instant ,
516482 resolution : Resolution ,
483+ total_frame_count : Arc < AtomicU64 > ,
517484 loss_frame_count : Arc < AtomicU64 > ,
518485 enable_previwe_model : bool ,
519486 ) -> Vec < JoinHandle < ( ) > > {
520487 let mut thread_handles = vec ! [ ] ;
521- let channel_size = receiver . capacity ( ) . unwrap_or ( 256 ) ;
488+ let channel_size = capture_receiver . capacity ( ) . unwrap_or ( 256 ) ;
522489 let ( collect_sender, collect_receiver) = bounded ( channel_size) ;
523490
524491 let ( handles, resize_senders) = Self :: resize_workers_main (
@@ -539,9 +506,11 @@ impl RecordingSession {
539506 ) ) ;
540507
541508 thread_handles. push ( Self :: resize_forward_worker (
542- receiver ,
509+ capture_receiver ,
543510 resize_senders,
511+ frame_sender_user,
544512 start_time,
513+ total_frame_count,
545514 loss_frame_count,
546515 ) ) ;
547516
@@ -696,37 +665,56 @@ impl RecordingSession {
696665 }
697666
698667 fn resize_forward_worker (
699- receiver : Receiver < ResizeChannelData > ,
668+ capture_receiver : Receiver < Frame > ,
700669 resize_senders : Vec < Sender < ResizeChannelData > > ,
670+ frame_sender_user : Option < Sender < FrameUser > > ,
701671 start_time : Instant ,
672+ total_frame_count : Arc < AtomicU64 > ,
702673 loss_frame_count : Arc < AtomicU64 > ,
703674 ) -> JoinHandle < ( ) > {
704- // forward frame to specified reiszed thread
705- let loss_frame_count_clone = loss_frame_count . clone ( ) ;
675+ let mut fps_counter = SimpleFpsCounter :: new ( ) ;
676+
706677 let handle = thread:: spawn ( move || {
707678 loop {
708- match receiver. recv ( ) {
709- Ok ( ( total_frame_index, frame) ) => {
710- let frame_timestamp = frame. timestamp . duration_since ( start_time) ;
679+ match capture_receiver. recv ( ) {
680+ Ok ( frame) => {
681+ let total_frame_count =
682+ total_frame_count. fetch_add ( 1 , Ordering :: Relaxed ) + 1 ;
683+
711684 log:: debug!(
712- "total frame[{}] thread[{}] thread_frame[{}] capture time: {:.2?}. thread_fps: {:.2}. timestamp: {:.2?}. chanenel remained: {}" ,
713- total_frame_index ,
685+ "total frame[{}] thread[{}] thread_frame[{}] capture time: {:.2?}. thread_fps: {:.2}. timestamp: {:.2?}. capture receiver channel remained: {}" ,
686+ total_frame_count ,
714687 frame. thread_id,
715688 frame. cb_data. frame_index,
716689 frame. cb_data. capture_time,
717690 ( frame. cb_data. frame_index + 1 ) as f64
718691 / frame. cb_data. elapse. as_secs_f64( ) ,
719- frame_timestamp,
720- receiver. capacity( ) . unwrap_or_default( ) - receiver. len( )
692+ frame. timestamp. duration_since( start_time) ,
693+ capture_receiver. capacity( ) . unwrap_or_default( )
694+ - capture_receiver. len( )
721695 ) ;
722696
697+ if let Some ( ref sender) = frame_sender_user {
698+ let frame_user = FrameUser {
699+ stats : StatsUser {
700+ fps : fps_counter. add_frame ( frame. timestamp ) ,
701+ total_frames : total_frame_count,
702+ loss_frames : loss_frame_count. load ( Ordering :: Relaxed ) ,
703+ } ,
704+ frame : frame. clone ( ) ,
705+ } ;
706+ if let Err ( e) = sender. try_send ( frame_user) {
707+ log:: warn!( "try send frame failed: {e}" ) ;
708+ }
709+ }
710+
723711 let sender = resize_senders
724712 . iter ( )
725713 . min_by ( |a, b| a. len ( ) . cmp ( & b. len ( ) ) )
726714 . unwrap ( ) ;
727715
728- if let Err ( e) = sender. try_send ( ( total_frame_index , frame) ) {
729- loss_frame_count_clone . fetch_add ( 1 , Ordering :: Relaxed ) ;
716+ if let Err ( e) = sender. try_send ( ( total_frame_count , frame) ) {
717+ loss_frame_count . fetch_add ( 1 , Ordering :: Relaxed ) ;
730718 log:: warn!( "resize worker try send failed: {e}" ) ;
731719 }
732720 }
@@ -743,9 +731,7 @@ impl RecordingSession {
743731
744732 fn wait_stop (
745733 mut self ,
746- resize_sender : Sender < ResizeChannelData > ,
747734 resize_handles : Vec < JoinHandle < ( ) > > ,
748- encoder_handle : Option < JoinHandle < ( ) > > ,
749735 mut h264_writer : Option < H264Writer > ,
750736 encoder : Option < VideoEncoder > ,
751737 combine_progress_cb : impl FnMut ( f32 ) ,
@@ -778,20 +764,12 @@ impl RecordingSession {
778764 }
779765 }
780766
781- drop ( resize_sender) ;
782-
783767 for ( index, resize_handle) in resize_handles. into_iter ( ) . enumerate ( ) {
784768 if let Err ( e) = resize_handle. join ( ) {
785769 log:: warn!( "join resize thread[{index}] failed: {:?}" , e) ;
786770 }
787771 }
788772
789- if let Some ( encoder_handle) = encoder_handle {
790- if let Err ( e) = encoder_handle. join ( ) {
791- log:: warn!( "join encoder thread failed: {:?}" , e) ;
792- }
793- }
794-
795773 // Flush encoder if provided to process any delayed frames
796774 if let Some ( encoder) = encoder
797775 && let Some ( ref writer) = h264_writer
@@ -891,12 +869,11 @@ impl RecordingSession {
891869 }
892870
893871 log:: info!(
894- "Total frame: {}. loss frame: {} ({:.2}%). fps: {:.2} " ,
895- self . total_frame_count,
872+ "Total frame: {}. loss frame: {} ({:.2}%)" ,
873+ self . total_frame_count. load ( Ordering :: Relaxed ) ,
896874 self . loss_frame_count. load( Ordering :: Relaxed ) ,
897875 self . loss_frame_count. load( Ordering :: Relaxed ) as f64 * 100.0
898- / self . total_frame_count. max( 1 ) as f64 ,
899- self . fps_counter. fps,
876+ / self . total_frame_count. load( Ordering :: Relaxed ) . max( 1 ) as f64 ,
900877 ) ;
901878
902879 Ok ( ( ) )
0 commit comments