Skip to content

Commit 8836ef6

Browse files
authored
refactor(observe): make Arc internal to proc and cgroup caches (#593)
1 parent 78a1a8c commit 8836ef6

File tree

7 files changed

+103
-89
lines changed

7 files changed

+103
-89
lines changed

auraed/src/cells/cell_service/cell_service.rs

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,20 +44,21 @@ use std::time::Duration;
4444
use std::{process::ExitStatus, sync::Arc};
4545
use tokio::sync::Mutex;
4646
use tonic::{Code, Request, Response, Status};
47-
use tracing::{info, trace, warn};
47+
use tracing::{info, instrument, trace, warn};
4848

4949
/**
5050
* Macro to perform an operation within a cell.
5151
* It retries the operation with an exponential backoff strategy in case of connection errors.
5252
*/
5353
macro_rules! do_in_cell {
5454
($self:ident, $cell_name:ident, $function:ident, $request:ident) => {{
55-
let mut cells = $self.cells.lock().await;
56-
5755
// Retrieve the client socket for the specified cell
58-
let client_socket = cells
56+
let client_socket = {
57+
let mut cells = $self.cells.lock().await;
58+
cells
5959
.get(&$cell_name, |cell| cell.client_socket())
60-
.map_err(CellsServiceError::CellsError)?;
60+
.map_err(CellsServiceError::CellsError)?
61+
};
6162

6263
// Initialize the exponential backoff strategy for retrying the operation
6364
let mut retry_strategy = backoff::ExponentialBackoffBuilder::new()
@@ -289,22 +290,26 @@ impl CellService {
289290
assert!(cell_name.is_none());
290291
info!("CellService: stop() executable_name={:?}", executable_name,);
291292

292-
let mut executables = self.executables.lock().await;
293-
294-
// Retrieve the process ID (PID) of the executable to be stopped
295-
let pid = executables
296-
.get(&executable_name)
297-
.map_err(CellsServiceError::ExecutablesError)?
298-
.pid()
299-
.map_err(CellsServiceError::Io)?
300-
.expect("pid")
301-
.as_raw();
293+
let pid = {
294+
let mut executables = self.executables.lock().await;
295+
296+
// Retrieve the process ID (PID) of the executable to be stopped
297+
let pid = executables
298+
.get(&executable_name)
299+
.map_err(CellsServiceError::ExecutablesError)?
300+
.pid()
301+
.map_err(CellsServiceError::Io)?
302+
.expect("pid")
303+
.as_raw();
304+
305+
// Stop the executable and handle any errors
306+
let _: ExitStatus = executables
307+
.stop(&executable_name)
308+
.await
309+
.map_err(CellsServiceError::ExecutablesError)?;
302310

303-
// Stop the executable and handle any errors
304-
let _: ExitStatus = executables
305-
.stop(&executable_name)
306-
.await
307-
.map_err(CellsServiceError::ExecutablesError)?;
311+
pid
312+
};
308313

309314
// Remove the executable's logs from the observe service.
310315
if let Err(e) = self
@@ -472,6 +477,7 @@ impl cell_service_server::CellService for CellService {
472477
Ok(Response::new(self.allocate(request).await?))
473478
}
474479

480+
#[instrument(skip(self))]
475481
async fn free(
476482
&self,
477483
request: Request<CellServiceFreeRequest>,
@@ -485,6 +491,7 @@ impl cell_service_server::CellService for CellService {
485491
Ok(Response::new(self.free(request).await?))
486492
}
487493

494+
#[instrument(skip(self))]
488495
async fn start(
489496
&self,
490497
request: Request<CellServiceStartRequest>,
@@ -513,6 +520,7 @@ impl cell_service_server::CellService for CellService {
513520
}
514521
}
515522

523+
#[instrument(skip(self))]
516524
async fn stop(
517525
&self,
518526
request: Request<CellServiceStopRequest>,
@@ -586,7 +594,7 @@ mod tests {
586594

587595
// Create a new instance of CellService for testing
588596
let service = CellService::new(ObserveService::new(
589-
Arc::new(LogChannel::new(String::from("test"))),
597+
LogChannel::new(String::from("test")),
590598
(None, None, None),
591599
));
592600

@@ -685,7 +693,7 @@ mod tests {
685693
#[tokio::test]
686694
async fn start_registers_log_channels_and_returns_uid_gid() {
687695
let observe_service = ObserveService::new(
688-
Arc::new(LogChannel::new(String::from("test"))),
696+
LogChannel::new(String::from("test")),
689697
(None, None, None),
690698
);
691699
let service = CellService::new(observe_service.clone());

auraed/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ use proto::{
8686
vms::vm_service_server::VmServiceServer,
8787
};
8888
use std::path::{Path, PathBuf};
89-
use std::sync::Arc;
9089
use tokio::io::AsyncRead;
9190
use tokio::io::AsyncWrite;
9291
use tokio::task::JoinHandle;
@@ -247,7 +246,7 @@ pub async fn run(
247246
tonic_health::server::health_reporter();
248247

249248
let observe_service = ObserveService::new(
250-
Arc::new(LogChannel::new(String::from("auraed"))),
249+
LogChannel::new(String::from("auraed")),
251250
perf_events,
252251
);
253252
let observe_service_server =

auraed/src/observe/cgroup_cache.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
\* -------------------------------------------------------------------------- */
4444
use std::collections::HashMap;
4545
use std::ffi::OsString;
46+
use std::sync::{Arc, Mutex};
4647
use tracing::warn;
4748
use walkdir::DirEntryExt;
4849
use walkdir::WalkDir;
@@ -53,30 +54,40 @@ use walkdir::WalkDir;
5354
/// we should think if inode wraparound is a potential issue. We could look at
5455
/// how the Linux inode cache is implemented:
5556
/// https://elixir.bootlin.com/linux/latest/source/fs/inode.c
56-
#[derive(Debug)]
57+
#[derive(Debug, Clone)]
5758
pub(crate) struct CgroupCache {
5859
root: OsString,
59-
cache: HashMap<u64, OsString>,
60+
cache: Arc<Mutex<HashMap<u64, OsString>>>,
6061
}
6162

6263
impl CgroupCache {
6364
pub fn new(root: OsString) -> Self {
64-
Self { root, cache: HashMap::new() }
65+
Self { root, cache: Arc::new(Mutex::new(HashMap::new())) }
6566
}
6667

6768
pub fn get(&mut self, ino: u64) -> Option<OsString> {
68-
if let Some(path) = self.cache.get(&ino) {
69-
Some(path.clone())
70-
} else {
71-
self.refresh_cache();
72-
self.cache.get(&ino).cloned()
69+
{
70+
let cache =
71+
self.cache.lock().expect("failed to acquire cgroup lock");
72+
if let Some(path) = cache.get(&ino) {
73+
return Some(path.clone());
74+
}
7375
}
76+
77+
self.refresh_cache();
78+
self.cache
79+
.lock()
80+
.expect("failed to acquire cgroup lock")
81+
.get(&ino)
82+
.cloned()
7483
}
7584

7685
fn refresh_cache(&mut self) {
86+
let mut cache =
87+
self.cache.lock().expect("failed to acquire cgroup lock");
7788
WalkDir::new(&self.root).into_iter().for_each(|res| match res {
7889
Ok(dir_entry) => {
79-
_ = self.cache.insert(dir_entry.ino(), dir_entry.path().into());
90+
_ = cache.insert(dir_entry.ino(), dir_entry.path().into());
8091
}
8192
Err(e) => {
8293
warn!("could not read from {:?}: {}", self.root, e);

auraed/src/observe/observe_service.rs

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,19 @@ use proto::observe::{
3131
LogItem, Signal as PosixSignal, WorkloadType, observe_service_server,
3232
};
3333
use std::collections::HashMap;
34+
use std::sync::Arc;
3435
use std::time::Duration;
35-
use std::{ffi::OsString, sync::Arc};
3636
use tokio::sync::mpsc;
3737
use tokio::sync::{Mutex, broadcast::Receiver};
3838
use tokio_stream::wrappers::ReceiverStream;
3939
use tonic::{Request, Response, Status};
40-
use tracing::info;
40+
use tracing::{info, instrument};
4141

4242
#[derive(Debug, Clone)]
4343
pub struct ObserveService {
44-
aurae_logger: Arc<LogChannel>,
45-
cgroup_cache: Arc<Mutex<CgroupCache>>,
46-
proc_cache: Option<Arc<Mutex<ProcCache>>>,
44+
aurae_logger: LogChannel,
45+
cgroup_cache: CgroupCache,
46+
proc_cache: Option<ProcCache>,
4747
posix_signals: Option<PerfEventBroadcast<Signal>>,
4848
sub_process_consumer_list:
4949
Arc<Mutex<HashMap<i32, HashMap<LogChannelType, LogChannel>>>>,
@@ -56,24 +56,20 @@ type PerfEvents = (
5656
);
5757

5858
impl ObserveService {
59-
pub fn new(aurae_logger: Arc<LogChannel>, perf_events: PerfEvents) -> Self {
59+
pub fn new(aurae_logger: LogChannel, perf_events: PerfEvents) -> Self {
6060
let proc_cache = match perf_events {
61-
(Some(f), Some(e), _) => {
62-
Some(Arc::new(Mutex::new(ProcCache::new(
63-
Duration::from_secs(60),
64-
Duration::from_secs(60),
65-
f,
66-
e,
67-
ProcfsProcessInfo {},
68-
))))
69-
}
61+
(Some(f), Some(e), _) => Some(ProcCache::new(
62+
Duration::from_secs(60),
63+
Duration::from_secs(60),
64+
f,
65+
e,
66+
ProcfsProcessInfo {},
67+
)),
7068
_ => None,
7169
};
7270
Self {
7371
aurae_logger,
74-
cgroup_cache: Arc::new(Mutex::new(CgroupCache::new(
75-
OsString::from("/sys/fs/cgroup"),
76-
))),
72+
cgroup_cache: CgroupCache::new("/sys/fs/cgroup".into()),
7773
proc_cache,
7874
posix_signals: perf_events.2,
7975
sub_process_consumer_list: Arc::new(Mutex::new(HashMap::new())),
@@ -133,7 +129,8 @@ impl ObserveService {
133129
self.aurae_logger.subscribe()
134130
}
135131

136-
async fn get_posix_signals_stream(
132+
#[instrument(skip(self))]
133+
fn get_posix_signals_stream(
137134
&self,
138135
filter: Option<(WorkloadType, String)>,
139136
) -> ReceiverStream<Result<GetPosixSignalsStreamResponse, Status>> {
@@ -212,11 +209,11 @@ impl observe_service_server::ObserveService for ObserveService {
212209
&self,
213210
request: Request<GetSubProcessStreamRequest>,
214211
) -> Result<Response<Self::GetSubProcessStreamStream>, Status> {
215-
let channel = LogChannelType::try_from(request.get_ref().channel_type)
216-
.map_err(|_| ObserveServiceError::InvalidLogChannelType {
217-
channel_type: request.get_ref().channel_type,
218-
})?;
219-
let pid: i32 = request.get_ref().process_id;
212+
let GetSubProcessStreamRequest { process_id: pid, channel_type } =
213+
request.into_inner();
214+
let channel = LogChannelType::try_from(channel_type).map_err(|_| {
215+
ObserveServiceError::InvalidLogChannelType { channel_type }
216+
})?;
220217

221218
println!("Requested Channel {channel:?}");
222219
println!("Requested Process ID {pid}");
@@ -269,15 +266,9 @@ impl observe_service_server::ObserveService for ObserveService {
269266
));
270267
}
271268

272-
Ok(Response::new(
273-
self.get_posix_signals_stream(
274-
request
275-
.into_inner()
276-
.workload
277-
.map(|w| (w.workload_type(), w.id)),
278-
)
279-
.await,
280-
))
269+
Ok(Response::new(self.get_posix_signals_stream(
270+
request.into_inner().workload.map(|w| (w.workload_type(), w.id)),
271+
)))
281272
}
282273
}
283274

@@ -286,12 +277,11 @@ mod tests {
286277
use super::ObserveService;
287278
use crate::logging::log_channel::LogChannel;
288279
use proto::observe::LogChannelType;
289-
use std::sync::Arc;
290280

291281
#[tokio::test]
292282
async fn test_register_sub_process_channel_success() {
293283
let svc = ObserveService::new(
294-
Arc::new(LogChannel::new(String::from("auraed"))),
284+
LogChannel::new(String::from("auraed")),
295285
(None, None, None),
296286
);
297287
assert!(
@@ -310,7 +300,7 @@ mod tests {
310300
#[tokio::test]
311301
async fn test_register_sub_process_channel_duplicate_error() {
312302
let svc = ObserveService::new(
313-
Arc::new(LogChannel::new(String::from("auraed"))),
303+
LogChannel::new(String::from("auraed")),
314304
(None, None, None),
315305
);
316306
assert!(
@@ -338,7 +328,7 @@ mod tests {
338328
#[tokio::test]
339329
async fn test_unregister_sub_process_channel_success() {
340330
let svc = ObserveService::new(
341-
Arc::new(LogChannel::new(String::from("auraed"))),
331+
LogChannel::new(String::from("auraed")),
342332
(None, None, None),
343333
);
344334
assert!(
@@ -362,7 +352,7 @@ mod tests {
362352
#[tokio::test]
363353
async fn test_unregister_sub_process_channel_no_pid_error() {
364354
let svc = ObserveService::new(
365-
Arc::new(LogChannel::new(String::from("auraed"))),
355+
LogChannel::new(String::from("auraed")),
366356
(None, None, None),
367357
);
368358
assert!(
@@ -377,7 +367,7 @@ mod tests {
377367
#[tokio::test]
378368
async fn test_unregister_sub_process_channel_no_channel_type_error() {
379369
let svc = ObserveService::new(
380-
Arc::new(LogChannel::new(String::from("auraed"))),
370+
LogChannel::new(String::from("auraed")),
381371
(None, None, None),
382372
);
383373
assert!(

0 commit comments

Comments
 (0)