-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathgroup_chat_repository.rs
More file actions
131 lines (119 loc) · 4.32 KB
/
group_chat_repository.rs
File metadata and controls
131 lines (119 loc) · 4.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
use event_store_adapter_rs::types::{Aggregate, Event, EventStore};
use std::collections::{HashMap, VecDeque};
use command_domain::group_chat::GroupChatEvent;
use command_domain::group_chat::{GroupChat, GroupChatId};
use command_interface_adaptor_if::{GroupChatRepository, GroupChatRepositoryError};
/// In-memory [`GroupChatRepository`] that keeps events and snapshots in hash maps
/// keyed by [`GroupChatId`].
#[derive(Debug, Clone, Default)]
pub struct MockGroupChatRepository {
    /// Every event passed to `store`, per aggregate id, in insertion order.
    events: HashMap<GroupChatId, VecDeque<GroupChatEvent>>,
    /// The snapshot most recently passed to `store`, per aggregate id.
    snapshot: HashMap<GroupChatId, Option<GroupChat>>,
}

impl MockGroupChatRepository {
    /// Creates an empty repository holding no events and no snapshots.
    ///
    /// Equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}
#[async_trait::async_trait]
impl GroupChatRepository for MockGroupChatRepository {
    /// Appends `event` to the in-memory log of its aggregate and replaces the
    /// aggregate's stored snapshot with a copy of `snapshot`.
    ///
    /// # Errors
    /// Never fails; always returns `Ok(())`.
    async fn store(&mut self, event: &GroupChatEvent, snapshot: &GroupChat) -> Result<(), GroupChatRepositoryError> {
        let id = event.aggregate_id().clone();
        self.events.entry(id.clone()).or_default().push_back(event.clone());
        // A plain insert overwrites any previous snapshot, so the snapshot is cloned only once.
        self.snapshot.insert(id, Some(snapshot.clone()));
        Ok(())
    }

    /// Rebuilds the group chat identified by `id` from the stored snapshot and events.
    ///
    /// # Returns
    /// `Ok(None)` when nothing has been stored for `id`, otherwise the result of
    /// `GroupChat::replay` wrapped in `Ok(Some(..))`.
    async fn find_by_id(&self, id: &GroupChatId) -> Result<Option<GroupChat>, GroupChatRepositoryError> {
        // An unknown id is reported as "not found" instead of panicking.
        let snapshot = match self.snapshot.get(id) {
            Some(Some(snapshot)) => snapshot.clone(),
            _ => return Ok(None),
        };
        let events = self.events.get(id).cloned().unwrap_or_default();
        // NOTE(review): the whole event log is replayed on top of the latest stored
        // snapshot — confirm that `GroupChat::replay` copes with events the snapshot
        // may already reflect.
        Ok(Some(GroupChat::replay(events.into(), snapshot)))
    }
}
/// [`GroupChatRepository`] implementation that delegates persistence to an
/// [`EventStore`] for `GroupChat` aggregates.
#[derive(Debug, Clone)]
pub struct GroupChatRepositoryImpl<ES>
where
    ES: EventStore<AID = GroupChatId, AG = GroupChat, EV = GroupChatEvent>,
{
    /// Event store used to write and read events and snapshots.
    event_store: ES,
    /// Interval, in sequence numbers, used when deciding whether to persist a snapshot.
    snapshot_interval: usize,
}
// SAFETY: NOTE(review) — this asserts `Sync` for every `ES`, regardless of whether
// `ES` itself is `Sync`. It is only sound if the event store can be shared between
// threads; confirm against the `EventStore` trait's own bounds.
unsafe impl<ES> Sync for GroupChatRepositoryImpl<ES>
where
    ES: EventStore<AID = GroupChatId, AG = GroupChat, EV = GroupChatEvent>,
{
}
// SAFETY: NOTE(review) — this asserts `Send` for every `ES`, regardless of whether
// `ES` itself is `Send`. It is only sound if the event store can be moved to another
// thread; confirm against the `EventStore` trait's own bounds.
unsafe impl<ES> Send for GroupChatRepositoryImpl<ES>
where
    ES: EventStore<AID = GroupChatId, AG = GroupChat, EV = GroupChatEvent>,
{
}
impl<ES: EventStore<AID = GroupChatId, AG = GroupChat, EV = GroupChatEvent>> GroupChatRepositoryImpl<ES> {
    /// Constructor.
    ///
    /// # Arguments
    /// - `event_store` - event store used to persist and load events and snapshots
    /// - `snapshot_interval` - interval, in sequence numbers, at which snapshots are
    ///   persisted; `0` disables interval-based snapshots
    pub fn new(event_store: ES, snapshot_interval: usize) -> Self {
        Self {
            event_store,
            snapshot_interval,
        }
    }

    /// Decides whether the snapshot should be persisted.
    ///
    /// # Arguments
    /// - `snapshot_interval` - interval at which snapshots are persisted
    /// - `created` - whether the group chat has just been created
    /// - `group_chat` - the group chat
    ///
    /// # Returns
    /// `Some` if the snapshot should be persisted, `None` otherwise.
    fn resolve_snapshot(snapshot_interval: usize, created: bool, group_chat: &GroupChat) -> Option<&GroupChat> {
        // Guard the modulo: an interval of zero would otherwise panic with a division by zero.
        if created || (snapshot_interval != 0 && group_chat.seq_nr() % snapshot_interval == 0) {
            Some(group_chat)
        } else {
            None
        }
    }
}
#[async_trait::async_trait]
impl<ES: EventStore<AID = GroupChatId, AG = GroupChat, EV = GroupChatEvent>> GroupChatRepository
for GroupChatRepositoryImpl<ES>
{
async fn store(&mut self, event: &GroupChatEvent, snapshot: &GroupChat) -> Result<(), GroupChatRepositoryError> {
let result = match Self::resolve_snapshot(self.snapshot_interval, event.is_created(), snapshot) {
Some(snapshot) => self.event_store.persist_event_and_snapshot(event, snapshot).await,
None => self.event_store.persist_event(event, snapshot.version()).await,
};
match result {
Ok(_) => Ok(()),
Err(error) => Err(GroupChatRepositoryError::StoreError(snapshot.clone(), error)),
}
}
async fn find_by_id(&self, id: &GroupChatId) -> Result<Option<GroupChat>, GroupChatRepositoryError> {
let snapshot_opt = self.event_store.get_latest_snapshot_by_id(id).await;
match snapshot_opt {
Ok(None) => Ok(None),
Ok(Some(snapshot)) => {
let events = self
.event_store
.get_events_by_id_since_seq_nr(id, snapshot.seq_nr())
.await;
match events {
Ok(events) => {
let result = GroupChat::replay(events, snapshot.clone());
Ok(Some(result))
}
Err(error) => Err(GroupChatRepositoryError::FindByIdError(id.clone(), error)),
}
}
Err(error) => Err(GroupChatRepositoryError::FindByIdError(id.clone(), error)),
}
}
}