Skip to content

Commit 078444b

Browse files
authored
Merge pull request #1784 from j5ik2o/remote-reliable-deathwatch
feat(remote): add reliable remote DeathWatch delivery
2 parents 2147f7d + c10f4d2 commit 078444b

43 files changed

Lines changed: 3158 additions & 202 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docs/gap-analysis/remote-gap-analysis.md

Lines changed: 19 additions & 32 deletions
Large diffs are not rendered by default.

modules/actor-core-kernel/src/system/remote/noop_remote_watch_hook.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,8 @@ impl RemoteWatchHook for NoopRemoteWatchHook {
1616
fn handle_unwatch(&mut self, _target: Pid, _watcher: Pid) -> bool {
1717
false
1818
}
19+
20+
fn handle_deathwatch_notification(&mut self, _watcher: Pid, _terminated: Pid) -> bool {
21+
false
22+
}
1923
}

modules/actor-core-kernel/src/system/remote/remote_watch_hook.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,8 @@ pub trait RemoteWatchHook: Send + 'static {
1212

1313
/// Handles an unwatch request. Returns `true` when the provider consumed the message.
1414
fn handle_unwatch(&mut self, target: Pid, watcher: Pid) -> bool;
15+
16+
/// Handles a remote-bound termination notification. Returns `true` when the provider consumed the
17+
/// message.
18+
fn handle_deathwatch_notification(&mut self, watcher: Pid, terminated: Pid) -> bool;
1519
}

modules/actor-core-kernel/src/system/remote/remote_watch_hook_dyn_shared.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ impl RemoteWatchHookDynShared {
5555
pub(crate) fn handle_unwatch(&self, target: Pid, watcher: Pid) -> bool {
5656
self.with_write(|inner| inner.handle_unwatch(target, watcher))
5757
}
58+
59+
/// Handles a remote-bound termination notification by delegating to the inner hook.
60+
pub(crate) fn handle_deathwatch_notification(&self, watcher: Pid, terminated: Pid) -> bool {
61+
self.with_write(|inner| inner.handle_deathwatch_notification(watcher, terminated))
62+
}
5863
}
5964

6065
impl SharedAccess<Box<dyn RemoteWatchHook>> for RemoteWatchHookDynShared {

modules/actor-core-kernel/src/system/state/system_state_shared.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,12 @@ impl SystemStateShared {
753753
}
754754
Ok(())
755755
},
756-
| SystemMessage::DeathWatchNotification(_) => Ok(()),
756+
| SystemMessage::DeathWatchNotification(terminated) => {
757+
if self.remote_watch_hook.handle_deathwatch_notification(pid, terminated) {
758+
return Ok(());
759+
}
760+
Ok(())
761+
},
757762
| SystemMessage::PipeTask(_) => Ok(()),
758763
| other => Err(SendError::closed(AnyMessage::new(other))),
759764
}

modules/actor-core-kernel/src/system/state/system_state_test.rs

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,29 @@ fn remote_watch_hook_consumes_unwatch_is_invoked() {
687687
assert_eq!(calls.last_unwatch, Some((target_pid, watcher_pid)));
688688
}
689689

690+
#[test]
691+
fn remote_watch_hook_consumes_deathwatch_notification_is_invoked() {
692+
let state = build_shared_state();
693+
let watcher_pid = state.allocate_pid();
694+
let target_pid = state.allocate_pid();
695+
696+
let calls = ArcShared::new(SpinSyncMutex::new(RemoteWatchHookCalls::default()));
697+
state.register_remote_watch_hook(Box::new(RecordingRemoteWatchHook::with_notification(
698+
calls.clone(),
699+
false,
700+
false,
701+
true,
702+
)));
703+
704+
state
705+
.send_system_message(watcher_pid, SystemMessage::DeathWatchNotification(target_pid))
706+
.expect("death-watch notification send ok");
707+
708+
let calls = calls.lock();
709+
assert_eq!(calls.notification_calls, 1);
710+
assert_eq!(calls.last_notification, Some((watcher_pid, target_pid)));
711+
}
712+
690713
#[test]
691714
fn remote_watch_hook_replaces_previous_registration() {
692715
let state = build_shared_state_with_noop_dispatcher();
@@ -759,21 +782,33 @@ struct RestartProbeActor;
759782

760783
#[derive(Default)]
761784
struct RemoteWatchHookCalls {
762-
watch_calls: usize,
763-
unwatch_calls: usize,
764-
last_watch: Option<(Pid, Pid)>,
765-
last_unwatch: Option<(Pid, Pid)>,
785+
watch_calls: usize,
786+
unwatch_calls: usize,
787+
notification_calls: usize,
788+
last_watch: Option<(Pid, Pid)>,
789+
last_unwatch: Option<(Pid, Pid)>,
790+
last_notification: Option<(Pid, Pid)>,
766791
}
767792

768793
struct RecordingRemoteWatchHook {
769-
calls: ArcShared<SpinSyncMutex<RemoteWatchHookCalls>>,
770-
consume_watch: bool,
771-
consume_unwatch: bool,
794+
calls: ArcShared<SpinSyncMutex<RemoteWatchHookCalls>>,
795+
consume_watch: bool,
796+
consume_unwatch: bool,
797+
consume_notification: bool,
772798
}
773799

774800
impl RecordingRemoteWatchHook {
775801
fn new(calls: ArcShared<SpinSyncMutex<RemoteWatchHookCalls>>, consume_watch: bool, consume_unwatch: bool) -> Self {
776-
Self { calls, consume_watch, consume_unwatch }
802+
Self::with_notification(calls, consume_watch, consume_unwatch, false)
803+
}
804+
805+
fn with_notification(
806+
calls: ArcShared<SpinSyncMutex<RemoteWatchHookCalls>>,
807+
consume_watch: bool,
808+
consume_unwatch: bool,
809+
consume_notification: bool,
810+
) -> Self {
811+
Self { calls, consume_watch, consume_unwatch, consume_notification }
777812
}
778813
}
779814

@@ -791,6 +826,13 @@ impl crate::system::remote::RemoteWatchHook for RecordingRemoteWatchHook {
791826
calls.last_unwatch = Some((target, watcher));
792827
self.consume_unwatch
793828
}
829+
830+
fn handle_deathwatch_notification(&mut self, watcher: Pid, terminated: Pid) -> bool {
831+
let mut calls = self.calls.lock();
832+
calls.notification_calls += 1;
833+
calls.last_notification = Some((watcher, terminated));
834+
self.consume_notification
835+
}
794836
}
795837

796838
struct RemoteEventRecorder {

0 commit comments

Comments
 (0)