@@ -3,17 +3,25 @@ use std::{
33 fs:: File ,
44 io:: BufReader ,
55 path:: Path ,
6+ sync:: {
7+ Arc ,
8+ atomic:: { AtomicI32 , Ordering } ,
9+ } ,
610 time:: { Duration , Instant } ,
711} ;
812use tokio:: sync:: broadcast:: { self , Sender } ;
913use webrtc:: media:: io:: h264_reader:: H264Reader ;
10- use wrtc:: { Event , PacketData , session:: WebRTCServerSessionConfig , webrtc:: WebRTCServer } ;
14+ use webrtc:: media:: io:: ogg_reader:: OggReader ;
15+ use wrtc:: {
16+ Event , OPUS_SAMPLE_RATE , PacketData , session:: WebRTCServerSessionConfig , webrtc:: WebRTCServer ,
17+ } ;
1118
1219#[ tokio:: main]
1320async fn main ( ) -> Result < ( ) > {
1421 env_logger:: init ( ) ;
1522
1623 let video_path = "./data/test.h264" . to_string ( ) ;
24+ let audio_path = "./data/test.ogg" . to_string ( ) ;
1725 let config = WebRTCServerSessionConfig :: default ( ) ;
1826 let ( packet_sender, _) = broadcast:: channel ( 128 ) ;
1927 let ( event_sender, mut event_receiver) = broadcast:: channel ( 16 ) ;
@@ -22,34 +30,42 @@ async fn main() -> Result<()> {
2230 bail ! ( "video file: '{video_path}' not exist" ) ;
2331 }
2432
33+ if !Path :: new ( & audio_path) . exists ( ) {
34+ bail ! ( "audio file: '{audio_path}' not exist" ) ;
35+ }
36+
2537 let packet_sender_clone = packet_sender. clone ( ) ;
2638 tokio:: spawn ( async move {
27- let mut connections = 0 ;
39+ let connections = Arc :: new ( AtomicI32 :: new ( 0 ) ) ;
40+
2841 loop {
2942 tokio:: select! {
3043 ev = event_receiver. recv( ) => {
3144 match ev {
3245 Ok ( Event :: PeerConnected ( _) ) => {
33- if connections == 0 {
34- h264_streaming_thread( packet_sender_clone. clone( ) , video_path. clone( ) ) ;
46+ if connections. load( Ordering :: Relaxed ) == 0 {
47+ h264_streaming_thread( packet_sender_clone. clone( ) , video_path. clone( ) , connections. clone( ) ) ;
48+ ogg_stream_thread( packet_sender_clone. clone( ) , audio_path. clone( ) , connections. clone( ) ) ;
3549 }
3650
37- connections += 1 ;
38- log:: info!( "connections count: {connections}" ) ;
51+ let count = connections . fetch_add ( 1 , Ordering :: Relaxed ) ;
52+ log:: info!( "connections count: {}" , count + 1 ) ;
3953 }
4054 Ok ( Event :: LocalClosed ( addr) ) => {
41- if connections > 0 {
42- connections -= 1 ;
55+ if connections. fetch_sub ( 1 , Ordering :: Relaxed ) == 0 {
56+ connections. store ( 0 , Ordering :: Relaxed ) ;
4357 }
58+
4459 log:: info!( "LocalClosed({addr})" ) ;
45- log:: info!( "connections count: {connections}" ) ;
60+ log:: info!( "connections count: {}" , connections . load ( Ordering :: Relaxed ) ) ;
4661 }
4762 Ok ( Event :: PeerClosed ( addr) ) => {
48- if connections > 0 {
49- connections -= 1 ;
63+ if connections. fetch_sub ( 1 , Ordering :: Relaxed ) == 0 {
64+ connections. store ( 0 , Ordering :: Relaxed ) ;
5065 }
66+
5167 log:: info!( "PeerClosed({addr})" ) ;
52- log:: info!( "connections count: {connections}" ) ;
68+ log:: info!( "connections count: {}" , connections . load ( Ordering :: Relaxed ) ) ;
5369 }
5470 _ => ( ) ,
5571 }
@@ -75,9 +91,13 @@ async fn main() -> Result<()> {
7591 Ok ( ( ) )
7692}
7793
78- fn h264_streaming_thread ( packet_sender : Sender < PacketData > , video_file : String ) {
94+ fn h264_streaming_thread (
95+ packet_sender : Sender < PacketData > ,
96+ video_file : String ,
97+ connections : Arc < AtomicI32 > ,
98+ ) {
7999 tokio:: spawn ( async move {
80- loop {
100+ ' out : loop {
81101 let file = File :: open ( & video_file. clone ( ) ) . unwrap ( ) ;
82102 let reader = BufReader :: new ( file) ;
83103 let mut h264 = H264Reader :: new ( reader, 1_048_576 ) ;
@@ -106,6 +126,58 @@ fn h264_streaming_thread(packet_sender: Sender<PacketData>, video_file: String)
106126 } ;
107127
108128 _ = ticker. tick ( ) . await ;
129+
130+ if connections. load ( Ordering :: Relaxed ) <= 0 {
131+ break ' out;
132+ }
133+ }
134+
135+ if connections. load ( Ordering :: Relaxed ) <= 0 {
136+ break ;
137+ }
138+ }
139+
140+ log:: info!( "h264_streaming_thread exit..." ) ;
141+ } ) ;
142+ }
143+
144+ fn ogg_stream_thread (
145+ packet_sender : Sender < PacketData > ,
146+ audio_file : String ,
147+ connections : Arc < AtomicI32 > ,
148+ ) {
149+ tokio:: spawn ( async move {
150+ ' out: loop {
151+ let file = File :: open ( & audio_file) . unwrap ( ) ;
152+ let reader = BufReader :: new ( file) ;
153+ let ( mut ogg, _) = OggReader :: new ( reader, true ) . unwrap ( ) ;
154+ let page_duration = Duration :: from_millis ( 20 ) ;
155+
156+ let mut last_granule = 0 ;
157+ let mut ticker = tokio:: time:: interval ( page_duration) ;
158+
159+ while let Ok ( ( page_data, page_header) ) = ogg. parse_next_page ( ) {
160+ let sample_count = page_header. granule_position - last_granule;
161+ last_granule = page_header. granule_position ;
162+ let sample_duration = Duration :: from_millis ( sample_count * 1000 / OPUS_SAMPLE_RATE ) ;
163+
164+ if let Err ( e) = packet_sender. send ( PacketData :: Audio {
165+ timestamp : Instant :: now ( ) ,
166+ duration : sample_duration,
167+ data : page_data. freeze ( ) . into ( ) ,
168+ } ) {
169+ log:: warn!( "send audio data failed: {e}" ) ;
170+ }
171+
172+ _ = ticker. tick ( ) . await ;
173+
174+ if connections. load ( Ordering :: Relaxed ) <= 0 {
175+ break ' out;
176+ }
177+ }
178+
179+ if connections. load ( Ordering :: Relaxed ) <= 0 {
180+ break ;
109181 }
110182 }
111183 } ) ;
0 commit comments