-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathtopo.rs
More file actions
175 lines (161 loc) · 5.12 KB
/
topo.rs
File metadata and controls
175 lines (161 loc) · 5.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
use crate::{
dns::{DnsConfig, DnsLookup},
select::Distance,
Endpoint, Endpoints, Topology,
};
use discovery::{Inited, TopologyWrite};
use protocol::{Protocol, Request, Resource::Phantom};
use sharding::{
distribution::Range,
hash::{Crc32, Hash, HashKey, HashGrouper},
};
use super::config::PhantomNamespace;
#[derive(Clone)]
pub struct PhantomService<E, P> {
// 一般有2组,相互做HA,每组是一个域名列表,域名下只有一个ip,但会变化
streams: Vec<Distance<E>>,
hasher: Crc32,
distribution: Range,
parser: P,
cfg: Box<DnsConfig<PhantomNamespace>>,
}
impl<E, P> From<P> for PhantomService<E, P> {
fn from(parser: P) -> Self {
Self {
parser,
streams: Default::default(),
hasher: Default::default(),
distribution: Default::default(),
cfg: Default::default(),
}
}
}
impl<E, P> Hash for PhantomService<E, P>
where
E: Endpoint,
P: Protocol,
{
#[inline]
fn hash<K: HashKey>(&self, k: &K) -> i64 {
self.hasher.hash(k)
}
}
impl<E, P> HashGrouper for PhantomService<E, P>
where
E: Endpoint,
P: Protocol,
{
}
impl<E, Req, P> Topology for PhantomService<E, P>
where
E: Endpoint<Item = Req>,
Req: Request,
P: Protocol,
{
}
impl<E, Req, P> Endpoint for PhantomService<E, P>
where
E: Endpoint<Item = Req>,
Req: Request,
P: Protocol,
{
type Item = Req;
#[inline]
fn send(&self, mut req: Self::Item) {
debug_assert_ne!(self.streams.len(), 0);
// 确认分片idx
let s_idx = self.distribution.index(req.hash());
debug_assert!(s_idx < self.streams.len(), "{} {:?} {:?}", s_idx, self, req);
let shard = unsafe { self.streams.get_unchecked(s_idx) };
let mut ctx = super::Context::from(*req.context_mut());
let idx = ctx.fetch_add_idx(); // 按顺序轮询
// 写操作,写所有实例
req.write_back(req.operation().is_store() && ctx.index() < shard.len());
// 读操作,只重试一次
req.try_next(idx == 0);
//ctx.update_idx(idx);
assert!(idx < shard.len(), "{} {:?} {:?}", idx, self, req);
let e = unsafe { shard.get_unchecked(idx) };
//ctx.check_inited();
*req.context_mut() = ctx.ctx;
e.send(req)
}
}
impl<E, P> TopologyWrite for PhantomService<E, P>
where
P: Protocol,
E: Endpoint,
{
#[inline]
fn update(&mut self, namespace: &str, cfg: &str) {
if let Some(ns) = PhantomNamespace::try_from(cfg) {
log::info!("topo updating {:?} => {:?}", self, ns);
// phantome 只会使用crc32
//self.hasher = Hasher::from(&ns.basic.hash);
let dist = &ns.basic.distribution;
let num = dist
.find('-')
.map(|idx| dist[idx + 1..].parse::<u64>().ok())
.flatten();
self.distribution = Range::from(num, ns.backends.len());
self.cfg.update(namespace, ns);
}
}
// 更新条件:
// 1. 最近存在dns解析失败;
// 2. 近期有dns更新;
#[inline]
fn need_load(&self) -> bool {
self.streams.len() != self.cfg.shards_url.len() || self.cfg.need_load()
}
#[inline]
fn load(&mut self) -> bool {
// 先改通知状态,再load,如果失败改一个通用状态,确保下次重试,同时避免变更过程中新的并发变更,待讨论 fishermen
self.cfg
.load_guard()
.check_load(|| self.load_inner().is_some())
}
}
impl<E, P> PhantomService<E, P>
where
P: Protocol,
E: Endpoint,
{
#[inline]
fn load_inner(&mut self) -> Option<()> {
let addrs = self.cfg.shards_url.lookup()?;
assert_eq!(addrs.len(), self.cfg.shards_url.len());
let mut endpoints: Endpoints<'_, P, E> =
Endpoints::new(&self.cfg.service, &self.parser, Phantom);
// 把老的stream缓存起来
self.streams.split_off(0).into_iter().for_each(|shard| {
endpoints.cache(shard.into_inner());
});
addrs.iter().for_each(|shard| {
assert!(!shard.is_empty());
let backends = endpoints.take_or_build(&*shard, self.cfg.timeout());
self.streams.push(Distance::from(backends));
});
// endpoints中如果还有stream,会被drop掉
Some(())
}
}
impl<E: Inited, P> Inited for PhantomService<E, P> {
// 每一个域名都有对应的endpoint,并且都初始化完成。
#[inline]
fn inited(&self) -> bool {
self.streams.len() > 0
&& self.streams.len() == self.cfg.shards_url.len()
&& self.streams.iter().fold(true, |inited, shard| {
inited && {
// 每个shard都有对应的endpoint,并且都初始化完成。
shard.iter().fold(true, |inited, e| inited && e.inited())
}
})
}
}
impl<E, P> std::fmt::Debug for PhantomService<E, P> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.cfg)
}
}