-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathtopo.rs
More file actions
274 lines (252 loc) · 9.78 KB
/
topo.rs
File metadata and controls
274 lines (252 loc) · 9.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
use crate::select::Distance;
use crate::{Endpoint, Endpoints, Topology};
use discovery::TopologyWrite;
use protocol::memcache::Binary;
use protocol::{Protocol, Request, Resource::Memcache};
use sharding::hash::{Hash, HashKey, Hasher, HashGrouper};
use super::config::Flag;
use crate::shards::Shards;
use crate::PerformanceTuning;
use protocol::Bit;
#[derive(Clone)]
pub struct CacheService<E, P> {
// 一共有n组,每组1个连接。
// 排列顺序: master, master l1, slave, slave l1
streams: Distance<Shards<E>>,
// streams里面的前r_num个数据是提供读的(这个长度不包含slave l1, slave)。
hasher: Hasher,
parser: P,
exp_sec: u32,
// TODO 线上稳定后再清理,预计2024.2之后
// 1. 去掉force_write_all,其设计的本意是set失败后,是否更新其他layer;
// 当前的设计原则已经改为可用性优先,只要有layer可用,就应该对外提供服务,所以force_write_all都应该为true,也就失去了存在的价值了;
//
// 兼容已有业务逻辑,set master失败后,是否更新其他layer
// force_write_all: bool,
// 保留本设置,非必要场景,减少一次slave访问
backend_no_storage: bool, // true:mc后面没有存储
update_master_l1: bool, // false:不更新master_L1,issue#834
writer_idx: Vec<usize>, // 写操作对象在streams里的索引
}
impl<E, P> From<P> for CacheService<E, P> {
#[inline]
fn from(parser: P) -> Self {
Self {
parser,
streams: Distance::new(),
exp_sec: 0,
// force_write_all: false, // 兼容考虑默认为false,set master失败后,不更新其他layers,新业务推荐用true
hasher: Default::default(),
backend_no_storage: false,
update_master_l1: true,
writer_idx: Default::default(),
}
}
}
impl<E, P> discovery::Inited for CacheService<E, P>
where
E: discovery::Inited,
{
#[inline]
fn inited(&self) -> bool {
self.streams.len() > 0
&& self
.streams
.iter()
.fold(true, |inited, e| inited && e.inited())
}
}
impl<E, P> Hash for CacheService<E, P>
where
E: Endpoint,
P: Protocol,
{
#[inline]
fn hash<K: HashKey>(&self, k: &K) -> i64 {
self.hasher.hash(k)
}
}
impl<E, P> HashGrouper for CacheService<E, P>
where
E: Endpoint,
P: Protocol,
{
}
impl<E, Req, P> Topology for CacheService<E, P>
where
E: Endpoint<Item = Req>,
Req: Request,
P: Protocol,
{
#[inline]
fn exp_sec(&self) -> u32 {
self.exp_sec
}
}
impl<E, Req, P> Endpoint for CacheService<E, P>
where
E: Endpoint<Item = Req>,
Req: Request,
P: Protocol,
{
type Item = Req;
#[inline]
fn send(&self, mut req: Self::Item) {
debug_assert!(self.streams.local_len() > 0);
// let mut idx: usize = 0; // master
let mut ctx = super::Context::from(*req.mut_context());
// gets及store类指令,都需要先请求master,然后再考虑masterL1
let (idx, try_next, write_back) = if req.operation().is_store() {
self.context_store(&mut ctx)
} else {
if !ctx.inited() {
// ctx未初始化, 是第一次读请求;仅第一次请求记录时间,原因如下:
// 第一次读一般访问L1,miss之后再读master;
// 读quota的更新根据第一次的请求时间更合理
if let Some(quota) = self.streams.quota() {
req.quota(quota);
}
}
self.context_get(&mut ctx, &req)
};
req.try_next(try_next);
req.write_back(write_back);
// TODO 有点怪异,先实现,晚点调整,这个属性直接从request获取更佳? fishermen
req.retry_on_rsp_notok(req.can_retry_on_rsp_notok());
*req.mut_context() = ctx.ctx;
log::debug!("+++ request sent prepared:{} - {} {}", idx, req, self);
assert!(idx < self.streams.len(), "{} {} => {:?}", idx, self, req);
unsafe { self.streams.get_unchecked(idx).send(req) };
}
}
impl<E, Req: Request, P: Protocol> CacheService<E, P>
where
E: Endpoint<Item = Req>,
{
#[inline]
fn context_store(&self, ctx: &mut super::Context) -> (usize, bool, bool) {
let (idx, try_next, write_back);
ctx.check_and_inited(true);
if ctx.is_write() {
// 写指令,总是从master开始
let seq = ctx.take_write_idx() as usize; // 第几次写
write_back = seq + 1 < self.writer_idx.len();
// topo控制try_next,只要还有layers,topo都支持try next
try_next = seq + 1 < self.writer_idx.len();
assert!(seq < self.writer_idx.len());
idx = self.writer_idx[seq];
} else {
// 是读触发的回种的写请求
idx = ctx.take_read_idx() as usize;
write_back = false; // 只尝试回种一次。
try_next = false; // 不再需要错误重试
};
(idx, try_next, write_back)
}
// 第一次访问到L1,下一次访问M
// 第一次访问到M,下一次访问L1
// 最多访问两次
// 对于mc做存储场景,也最多访问两次
// 若有L1,则两次访问分布在M、L1
// 若无L1,则两次访问分布在M、S;#654
#[inline]
fn context_get(&self, ctx: &mut super::Context, req: &Req) -> (usize, bool, bool) {
let (idx, try_next, write_back);
if !ctx.check_and_inited(false) {
// 第一个retrieve,如果需要master-first,则直接访问master
idx = match req.operation().master_first() {
true => 0,
false => self.streams.select_idx(),
};
// 第一次访问,没有取到master,则下一次一定可以取到master
// 如果取到了master,有slave也可以继续访问
// 后端无storage且后端资源不止一组,可以多访问一次
try_next = (self.streams.local_len() > 1)
|| self.backend_no_storage && (self.streams.len() > 1);
write_back = false;
} else {
let last_idx = ctx.index();
try_next = false;
// 不是第一次访问,获取上一次访问的index
// 上一次是主,则有从取从,上一次不是主,则取主。
if last_idx != 0 {
idx = 0;
} else {
// 满足#654场景,如果没有MasterL1,这里idx需要选到Slave
// 同时,为对于gets成功,请求路径按照固定顺序进行 fishermen
idx = match req.operation().master_first() {
true => 1,
false => self.streams.select_next_idx(0, 1),
};
}
write_back = true;
}
// 把当前访问过的idx记录到ctx中,方便回写时使用。
ctx.write_back_idx(idx as u16);
(idx, try_next, write_back)
}
}
impl<E, P> TopologyWrite for CacheService<E, P>
where
P: Protocol,
E: Endpoint,
{
#[inline]
fn update(&mut self, namespace: &str, cfg: &str) {
if let Some(ns) = super::config::Namespace::try_from(cfg, namespace) {
self.hasher = Hasher::from(&ns.hash);
self.exp_sec = (ns.exptime / 1000) as u32; // 转换成秒
// self.force_write_all = ns.flag.get(Flag::ForceWriteAll as u8);
self.backend_no_storage = ns.flag.get(Flag::BackendNoStorage as u8);
use crate::cacheservice::UpdateMasterL1;
self.update_master_l1 = namespace.update_master_l1();
let dist = &ns.distribution.clone();
// 把所有的endpoints cache下来
let mut endpoints: Endpoints<'_, P, E> =
Endpoints::new(namespace, &self.parser, Memcache);
self.streams.take().into_iter().for_each(|shard| {
endpoints.cache(shard.into());
});
let mto = crate::TO_MC_M.to(ns.timeout_ms_master);
let rto = crate::TO_MC_S.to(ns.timeout_ms_slave);
//use discovery::distance::{Balance, ByDistance};
//let master = ns.master.clone();
let is_performance = ns.flag.get(Flag::LocalAffinity as u8).tuning_mode();
let (local_len, backends, writer_idx) = ns.take_backends(self.update_master_l1);
self.writer_idx = writer_idx;
let mut new = Vec::with_capacity(backends.len());
for (i, group) in backends.into_iter().enumerate() {
// 第一组是master
let to = if i == 0 { mto } else { rto };
let backends = endpoints.take_or_build(&group, to);
let shard = Shards::from_dist(dist, backends);
new.push(shard);
}
self.streams.update(new, local_len, is_performance);
}
// old 会被dopped
}
// 不同的业务共用一个配置。把不同的业务配置给拆分开
#[inline]
fn disgroup<'a>(&self, _path: &'a str, cfg: &'a str) -> Vec<(&'a str, &'a str)> {
let mut v = Vec::with_capacity(16);
use std::str;
for item in super::config::Config::new(cfg.as_bytes()) {
let namespace = str::from_utf8(item.0).expect("not valid utf8");
let val = str::from_utf8(item.1).expect("not valid utf8");
v.push((namespace, val));
}
v
}
}
use std::fmt::{self, Display, Formatter};
impl<E, P> Display for CacheService<E, P> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"shards:{} local-shards:{}",
self.streams.len(),
self.streams.local_len(),
)
}
}