Skip to content

Commit a77e512

Browse files
committed
[*] fix: webrtc whep_server_demos
1 parent 11fc4d9 commit a77e512

File tree

5 files changed

+55
-75
lines changed

5 files changed

+55
-75
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/wrtc/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ reqwest = { workspace = true, features = ["json"] }
2929
[dev-dependencies]
3030
image.workspace = true
3131
hound.workspace = true
32+
once_cell.workspace = true
3233
stunclient.workspace = true
3334
env_logger.workspace = true
3435
serde_json.workspace = true

lib/wrtc/examples/whep_server2_demo.rs

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
use anyhow::{Result, bail};
22
use hound::WavReader;
33
use image::{ImageBuffer, Rgb};
4+
use once_cell::sync::Lazy;
45
use std::{
6+
collections::HashSet,
57
fs::File,
68
path::Path,
79
sync::{
8-
Arc,
10+
Arc, Mutex,
911
atomic::{AtomicI32, Ordering},
1012
},
1113
time::{Duration, Instant},
@@ -19,6 +21,7 @@ use wrtc::{
1921

2022
const IMG_WIDTH: u32 = 1920;
2123
const IMG_HEIGHT: u32 = 1080;
24+
static CONNECTIONS: Lazy<Mutex<HashSet<String>>> = Lazy::new(|| Mutex::new(HashSet::default()));
2225

2326
#[tokio::main]
2427
async fn main() -> Result<()> {
@@ -35,35 +38,32 @@ async fn main() -> Result<()> {
3538

3639
let packet_sender_clone = packet_sender.clone();
3740
tokio::spawn(async move {
38-
let connections = Arc::new(AtomicI32::new(0));
3941
loop {
4042
tokio::select! {
4143
ev = event_receiver.recv() => {
4244
match ev {
43-
Ok(Event::PeerConnected(_)) => {
44-
if connections.load(Ordering::Relaxed) == 0 {
45-
h264_streaming_thread(packet_sender_clone.clone(), connections.clone());
46-
wav_stream_thread(packet_sender_clone.clone(), audio_path.clone(), connections.clone());
45+
Ok(Event::PeerConnected(addr)) => {
46+
let mut connections = CONNECTIONS.lock().unwrap();
47+
if connections.is_empty(){
48+
h264_streaming_thread(packet_sender_clone.clone());
49+
wav_stream_thread(packet_sender_clone.clone(), audio_path.clone());
4750
}
4851

49-
let count = connections.fetch_add(1, Ordering::Relaxed);
50-
log::info!("connections count: {}", count + 1);
52+
connections.insert(addr);
53+
log::info!("connections count: {}", connections.len());
5154
}
5255
Ok(Event::LocalClosed(addr)) => {
53-
if connections.fetch_sub(1, Ordering::Relaxed) == 0 {
54-
connections.store(0, Ordering::Relaxed);
55-
}
56-
5756
log::info!("LocalClosed({addr})");
58-
log::info!("connections count: {}", connections.load(Ordering::Relaxed));
57+
58+
let mut connections = CONNECTIONS.lock().unwrap();
59+
connections.remove(&addr);
60+
log::info!("connections count: {}", connections.len());
5961
}
6062
Ok(Event::PeerClosed(addr)) => {
61-
if connections.fetch_sub(1, Ordering::Relaxed) == 0 {
62-
connections.store(0, Ordering::Relaxed);
63-
}
64-
6563
log::info!("PeerClosed({addr})");
66-
log::info!("connections count: {}", connections.load(Ordering::Relaxed));
64+
let mut connections = CONNECTIONS.lock().unwrap();
65+
connections.remove(&addr);
66+
log::info!("connections count: {}", connections.len());
6767
}
6868
_ => (),
6969
}
@@ -89,7 +89,7 @@ async fn main() -> Result<()> {
8989
Ok(())
9090
}
9191

92-
fn h264_streaming_thread(packet_sender: Sender<PacketData>, connections: Arc<AtomicI32>) {
92+
fn h264_streaming_thread(packet_sender: Sender<PacketData>) {
9393
std::thread::spawn(move || {
9494
let fps = 25;
9595

@@ -133,7 +133,7 @@ fn h264_streaming_thread(packet_sender: Sender<PacketData>, connections: Arc<Ato
133133

134134
std::thread::sleep(Duration::from_secs_f64(1 as f64 / fps as f64));
135135

136-
if connections.load(Ordering::Relaxed) <= 0 {
136+
if CONNECTIONS.lock().unwrap().is_empty() {
137137
break 'out;
138138
}
139139
}
@@ -149,7 +149,7 @@ fn h264_streaming_thread(packet_sender: Sender<PacketData>, connections: Arc<Ato
149149
log::warn!("Failed to flush encoder frame: {:?}", e);
150150
}
151151

152-
if connections.load(Ordering::Relaxed) <= 0 {
152+
if CONNECTIONS.lock().unwrap().is_empty() {
153153
break;
154154
}
155155
}
@@ -158,11 +158,7 @@ fn h264_streaming_thread(packet_sender: Sender<PacketData>, connections: Arc<Ato
158158
});
159159
}
160160

161-
fn wav_stream_thread(
162-
packet_sender: Sender<PacketData>,
163-
audio_file: String,
164-
connections: Arc<AtomicI32>,
165-
) {
161+
fn wav_stream_thread(packet_sender: Sender<PacketData>, audio_file: String) {
166162
tokio::spawn(async move {
167163
'out: loop {
168164
let file = File::open(&audio_file).unwrap();
@@ -236,12 +232,12 @@ fn wav_stream_thread(
236232

237233
ticker.tick().await;
238234

239-
if connections.load(Ordering::Relaxed) <= 0 {
235+
if CONNECTIONS.lock().unwrap().is_empty() {
240236
break 'out;
241237
}
242238
}
243239

244-
if connections.load(Ordering::Relaxed) <= 0 {
240+
if CONNECTIONS.lock().unwrap().is_empty() {
245241
break;
246242
}
247243
}

lib/wrtc/examples/whep_server_demo.rs

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
use anyhow::{Result, bail};
2+
use once_cell::sync::Lazy;
23
use std::{
4+
collections::HashSet,
35
fs::File,
46
io::BufReader,
57
path::Path,
6-
sync::{
7-
Arc,
8-
atomic::{AtomicI32, Ordering},
9-
},
8+
sync::Mutex,
109
time::{Duration, Instant},
1110
};
1211
use tokio::sync::broadcast::{self, Sender};
13-
use webrtc::media::io::h264_reader::H264Reader;
14-
use webrtc::media::io::ogg_reader::OggReader;
12+
use webrtc::media::io::{h264_reader::H264Reader, ogg_reader::OggReader};
1513
use wrtc::{
1614
Event, PacketData, opus::OPUS_SAMPLE_RATE, session::WebRTCServerSessionConfig,
1715
webrtc::WebRTCServer,
1816
};
1917

18+
static CONNECTIONS: Lazy<Mutex<HashSet<String>>> = Lazy::new(|| Mutex::new(HashSet::default()));
19+
2020
#[tokio::main]
2121
async fn main() -> Result<()> {
2222
env_logger::init();
@@ -37,36 +37,32 @@ async fn main() -> Result<()> {
3737

3838
let packet_sender_clone = packet_sender.clone();
3939
tokio::spawn(async move {
40-
let connections = Arc::new(AtomicI32::new(0));
41-
4240
loop {
4341
tokio::select! {
4442
ev = event_receiver.recv() => {
4543
match ev {
46-
Ok(Event::PeerConnected(_)) => {
47-
if connections.load(Ordering::Relaxed) == 0 {
48-
h264_streaming_thread(packet_sender_clone.clone(), video_path.clone(), connections.clone());
49-
ogg_stream_thread(packet_sender_clone.clone(), audio_path.clone(), connections.clone());
44+
Ok(Event::PeerConnected(addr)) => {
45+
let mut connections = CONNECTIONS.lock().unwrap();
46+
if connections.is_empty(){
47+
h264_streaming_thread(packet_sender_clone.clone(), video_path.clone());
48+
ogg_stream_thread(packet_sender_clone.clone(), audio_path.clone());
5049
}
5150

52-
let count = connections.fetch_add(1, Ordering::Relaxed);
53-
log::info!("connections count: {}", count + 1);
51+
connections.insert(addr);
52+
log::info!("connections count: {}", connections.len());
5453
}
5554
Ok(Event::LocalClosed(addr)) => {
56-
if connections.fetch_sub(1, Ordering::Relaxed) == 0 {
57-
connections.store(0, Ordering::Relaxed);
58-
}
59-
6055
log::info!("LocalClosed({addr})");
61-
log::info!("connections count: {}", connections.load(Ordering::Relaxed));
56+
let mut connections = CONNECTIONS.lock().unwrap();
57+
connections.remove(&addr);
58+
log::info!("connections count: {}", connections.len());
6259
}
6360
Ok(Event::PeerClosed(addr)) => {
64-
if connections.fetch_sub(1, Ordering::Relaxed) == 0 {
65-
connections.store(0, Ordering::Relaxed);
66-
}
67-
6861
log::info!("PeerClosed({addr})");
69-
log::info!("connections count: {}", connections.load(Ordering::Relaxed));
62+
let mut connections = CONNECTIONS.lock().unwrap();
63+
connections.remove(&addr);
64+
log::info!("connections count: {}", connections.len());
65+
7066
}
7167
_ => (),
7268
}
@@ -92,11 +88,7 @@ async fn main() -> Result<()> {
9288
Ok(())
9389
}
9490

95-
fn h264_streaming_thread(
96-
packet_sender: Sender<PacketData>,
97-
video_file: String,
98-
connections: Arc<AtomicI32>,
99-
) {
91+
fn h264_streaming_thread(packet_sender: Sender<PacketData>, video_file: String) {
10092
tokio::spawn(async move {
10193
'out: loop {
10294
let file = File::open(&video_file.clone()).unwrap();
@@ -128,12 +120,12 @@ fn h264_streaming_thread(
128120

129121
_ = ticker.tick().await;
130122

131-
if connections.load(Ordering::Relaxed) <= 0 {
123+
if CONNECTIONS.lock().unwrap().is_empty() {
132124
break 'out;
133125
}
134126
}
135127

136-
if connections.load(Ordering::Relaxed) <= 0 {
128+
if CONNECTIONS.lock().unwrap().is_empty() {
137129
break;
138130
}
139131
}
@@ -142,11 +134,7 @@ fn h264_streaming_thread(
142134
});
143135
}
144136

145-
fn ogg_stream_thread(
146-
packet_sender: Sender<PacketData>,
147-
audio_file: String,
148-
connections: Arc<AtomicI32>,
149-
) {
137+
fn ogg_stream_thread(packet_sender: Sender<PacketData>, audio_file: String) {
150138
tokio::spawn(async move {
151139
'out: loop {
152140
let file = File::open(&audio_file).unwrap();
@@ -172,12 +160,12 @@ fn ogg_stream_thread(
172160

173161
_ = ticker.tick().await;
174162

175-
if connections.load(Ordering::Relaxed) <= 0 {
163+
if CONNECTIONS.lock().unwrap().is_empty() {
176164
break 'out;
177165
}
178166
}
179167

180-
if connections.load(Ordering::Relaxed) <= 0 {
168+
if CONNECTIONS.lock().unwrap().is_empty() {
181169
break;
182170
}
183171
}

lib/wrtc/src/session.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,7 @@ impl WebRTCServerSession {
148148
let ty = eles[1];
149149

150150
if eles.len() < 2 {
151-
log::warn!(
152-
"WebRTCServerSession::run the http path is not correct: {}",
153-
http_request.uri.path
154-
);
151+
log::warn!("the http path is not correct: {}", http_request.uri.path);
155152

156153
return Err(SessionError::HttpRequestPathError);
157154
}
@@ -247,10 +244,7 @@ impl WebRTCServerSession {
247244
}
248245
http_method_name::PATCH => (),
249246
_ => {
250-
log::warn!(
251-
"WebRTCServerSession::unsupported method name: {}",
252-
http_request.method
253-
);
247+
log::warn!("unsupported method name: {}", http_request.method);
254248
}
255249
}
256250

@@ -271,7 +265,7 @@ impl WebRTCServerSession {
271265

272266
sessions_unlock.remove(uuid);
273267
} else {
274-
log::warn!("the session :{uuid} is not exited.");
268+
log::warn!("the session: [{uuid}] is not exited.");
275269
}
276270
}
277271

0 commit comments

Comments
 (0)