Skip to content

Commit 6aa4e31

Browse files
cjhopman authored and meta-codesync[bot] committed
Add store_data + page_out_item to PagableStorage trait
Summary: Adds two methods to the `PagableStorage` trait so that callers can use `Arc<dyn PagableStorage>` rather than the concrete sled type:
- `store_data(data)`: backend-specific primitive — writes one content-addressable blob and returns its `DataKey`, which is derived from the data itself.
- `page_out_item(item_data, item_arcs, finished, session_context)`: default trait method that walks the arc tree, recursively serializes nested arcs, and stores them via `store_data`.

The orchestration logic (previously inherent on `SledBackedPagableStorage`) is moved into the default trait method, so adding a new backend only requires implementing `store_data`. `SledBackedPagableStorage`, `InMemoryPagableStorageHandle`, and the test `EmptyPagableStorage` each gain a `store_data` impl. `SerializerForPaging::new` is also bumped from `pub(crate)` to `pub` so the default trait method is callable from outside the crate. No behavior change beyond what's described above.

Reviewed By: jtbraun
Differential Revision: D101759762
fbshipit-source-id: 6193bb529dd17d1030cb3eb444d72413a73bbb48
1 parent 6753370 commit 6aa4e31

5 files changed

Lines changed: 128 additions & 64 deletions

File tree

pagable/src/storage/in_memory.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -294,4 +294,10 @@ impl PagableStorage for InMemoryPagableStorageHandle {
294294
fn session_context(&self) -> &Mutex<SessionContext> {
295295
&self.session_context
296296
}
297+
298+
fn store_data(&self, data: PagableData) -> DataKey {
299+
let key = data.compute_key();
300+
self.cache.insert_data(key, std::sync::Arc::new(data));
301+
key
302+
}
297303
}

pagable/src/storage/sled.rs

Lines changed: 27 additions & 63 deletions
Original file line number | Diff line number | Diff line change
@@ -75,33 +75,6 @@ impl SledBackedPagableStorage {
7575
lock.pending.len()
7676
}
7777

78-
/// Serialize `PagableData` into the on-disk byte format and insert into sled.
79-
/// Returns the content-addressable `DataKey`.
80-
fn store_data(&self, data: PagableData) -> DataKey {
81-
let key = data.compute_key();
82-
let db_key = bytemuck::bytes_of(&key);
83-
if self
84-
.db
85-
.contains_key(db_key)
86-
.expect("sled contains_key failed")
87-
{
88-
return key;
89-
}
90-
91-
let bytes_size = 8 + 8 + data.data.len() + data.arcs.len() * 16;
92-
let mut bytes = Vec::with_capacity(bytes_size);
93-
bytes.extend_from_slice(&(data.data.len() as u64).to_le_bytes());
94-
bytes.extend_from_slice(&(data.arcs.len() as u64).to_le_bytes());
95-
bytes.extend_from_slice(&data.data);
96-
bytes.extend_from_slice(bytemuck::cast_slice(&data.arcs));
97-
assert_eq!(bytes.len(), bytes_size);
98-
99-
self.db
100-
.insert(db_key, sled::IVec::from(bytes))
101-
.expect("sled insert failed");
102-
key
103-
}
104-
10578
/// Recursively serialize a set of arcs and their dependencies, storing each
10679
/// into sled. Updates `finished` with the identity -> DataKey mapping for
10780
/// each processed arc.
@@ -194,42 +167,6 @@ impl SledBackedPagableStorage {
194167
}
195168
}
196169

197-
pub fn page_out_item_cache(&self) -> HashMap<usize, DataKey> {
198-
HashMap::new()
199-
}
200-
201-
pub fn serializer_for_page_out_item<'a>(
202-
&self,
203-
session_context: &'a mut SessionContext,
204-
) -> SerializerForPaging<'a> {
205-
SerializerForPaging::new(session_context)
206-
}
207-
208-
pub fn page_out_item(
209-
&self,
210-
item_data: Vec<u8>,
211-
item_arcs: Vec<Box<dyn ArcEraseDyn>>,
212-
finished: &mut HashMap<usize, DataKey>,
213-
session_context: &mut SessionContext,
214-
) -> DataKey {
215-
let roots = item_arcs.iter().map(|arc| arc.clone_dyn()).collect();
216-
self.serialize_arcs(roots, finished, session_context);
217-
218-
let arcs = item_arcs
219-
.iter()
220-
.map(|arc| {
221-
*finished
222-
.get(&arc.identity())
223-
.expect("nested arc should have been serialized first")
224-
})
225-
.collect();
226-
227-
self.store_data(PagableData {
228-
data: item_data,
229-
arcs,
230-
})
231-
}
232-
233170
pub fn write_bytes<T: bytemuck::Pod>(&self, key: &str, data: T) {
234171
self.db
235172
.insert(key.as_bytes(), sled::IVec::from(bytemuck::bytes_of(&data)))
@@ -352,4 +289,31 @@ impl PagableStorage for SledBackedPagableStorage {
352289
fn session_context(&self) -> &Mutex<SessionContext> {
353290
&self.session_context
354291
}
292+
293+
/// Serialize `PagableData` into the on-disk byte format and insert into sled.
294+
/// Returns the content-addressable `DataKey`.
295+
fn store_data(&self, data: PagableData) -> DataKey {
296+
let key = data.compute_key();
297+
let db_key = bytemuck::bytes_of(&key);
298+
if self
299+
.db
300+
.contains_key(db_key)
301+
.expect("sled contains_key failed")
302+
{
303+
return key;
304+
}
305+
306+
let bytes_size = 8 + 8 + data.data.len() + data.arcs.len() * 16;
307+
let mut bytes = Vec::with_capacity(bytes_size);
308+
bytes.extend_from_slice(&(data.data.len() as u64).to_le_bytes());
309+
bytes.extend_from_slice(&(data.arcs.len() as u64).to_le_bytes());
310+
bytes.extend_from_slice(&data.data);
311+
bytes.extend_from_slice(bytemuck::cast_slice(&data.arcs));
312+
assert_eq!(bytes.len(), bytes_size);
313+
314+
self.db
315+
.insert(db_key, sled::IVec::from(bytes))
316+
.expect("sled insert failed");
317+
key
318+
}
355319
}

pagable/src/storage/support.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ pub struct SerializerForPaging<'a> {
2727
}
2828

2929
impl<'a> SerializerForPaging<'a> {
30-
pub(crate) fn new(session_context: &'a mut SessionContext) -> Self {
30+
pub fn new(session_context: &'a mut SessionContext) -> Self {
3131
Self {
3232
serde: postcard::Serializer {
3333
output: crate::flavors::PagableVecFlavor::new(),

pagable/src/storage/traits.rs

Lines changed: 89 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -9,13 +9,15 @@
99
*/
1010

1111
use std::any::TypeId;
12+
use std::collections::HashMap;
1213
use std::sync::Mutex;
1314

1415
use either::Either;
1516

1617
use crate::arc_erase::ArcEraseDyn;
1718
use crate::storage::data::DataKey;
1819
use crate::storage::data::PagableData;
20+
use crate::storage::support::SerializerForPaging;
1921
use crate::traits::SessionContext;
2022

2123
/// Trait for storage backends that can persist and retrieve paged-out data.
@@ -68,6 +70,93 @@ pub trait PagableStorage: Send + Sync + 'static {
6870
/// Access the session context for storing/retrieving layer-specific state
6971
/// during serialization and deserialization.
7072
fn session_context(&self) -> &Mutex<SessionContext>;
73+
74+
/// Stores a single content-addressable [`PagableData`] blob and returns its
75+
/// [`DataKey`]. The key is derived from the data via
76+
/// [`PagableData::compute_key`]; if the same data is stored twice the second
77+
/// write is expected to be idempotent (or skipped).
78+
fn store_data(&self, data: PagableData) -> DataKey;
79+
80+
/// Stores a previously-serialized item (and its transitively reachable arcs)
81+
/// to storage and returns its content-addressable [`DataKey`].
82+
///
83+
/// The caller is responsible for the initial serialization: lock
84+
/// `session_context()`, build a [`SerializerForPaging`], serialize the value,
85+
/// `.finish()` to obtain `(item_data, item_arcs)`, then pass them in here
86+
/// along with the still-locked `&mut SessionContext` (this method uses it to
87+
/// recursively serialize nested arcs).
88+
///
89+
/// `finished` is a cache of arc identity → `DataKey` that the caller may share
90+
/// across multiple `page_out_item` invocations to avoid re-serializing arcs
91+
/// that were already paged out earlier in the same batch.
92+
fn page_out_item(
93+
&self,
94+
item_data: Vec<u8>,
95+
item_arcs: Vec<Box<dyn ArcEraseDyn>>,
96+
finished: &mut HashMap<usize, DataKey>,
97+
session_context: &mut SessionContext,
98+
) -> DataKey {
99+
enum Task {
100+
Start(Box<dyn ArcEraseDyn>),
101+
Finish((Box<dyn ArcEraseDyn>, Vec<u8>, Vec<Box<dyn ArcEraseDyn>>)),
102+
}
103+
104+
let mut tasks: Vec<Task> = item_arcs
105+
.iter()
106+
.map(|arc| Task::Start(arc.clone_dyn()))
107+
.collect();
108+
109+
while let Some(task) = tasks.pop() {
110+
match task {
111+
Task::Start(v) => {
112+
if finished.contains_key(&v.identity()) {
113+
continue;
114+
}
115+
116+
let mut serializer = SerializerForPaging::new(session_context);
117+
v.serialize(&mut serializer).unwrap();
118+
let (data, arcs) = serializer.finish();
119+
120+
let subtasks: Vec<_> = arcs
121+
.iter()
122+
.filter(|arc| !finished.contains_key(&arc.identity()))
123+
.map(|arc| Task::Start(arc.clone_dyn()))
124+
.collect();
125+
126+
tasks.push(Task::Finish((v, data, arcs)));
127+
tasks.extend(subtasks);
128+
}
129+
Task::Finish((arc, data, serialized_arcs)) => {
130+
let arcs: Vec<DataKey> = serialized_arcs
131+
.iter()
132+
.map(|arc| {
133+
*finished
134+
.get(&arc.identity())
135+
.expect("nested arc should have been serialized first")
136+
})
137+
.collect();
138+
139+
let key = self.store_data(PagableData { data, arcs });
140+
finished.insert(arc.identity(), key);
141+
arc.set_data_key(key);
142+
}
143+
}
144+
}
145+
146+
let arcs: Vec<DataKey> = item_arcs
147+
.iter()
148+
.map(|arc| {
149+
*finished
150+
.get(&arc.identity())
151+
.expect("nested arc should have been serialized first")
152+
})
153+
.collect();
154+
155+
self.store_data(PagableData {
156+
data: item_data,
157+
arcs,
158+
})
159+
}
71160
}
72161

73162
static_assertions::assert_obj_safe!(PagableStorage);

pagable/src/testing.rs

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -257,4 +257,9 @@ impl PagableStorage for EmptyPagableStorage {
257257
fn session_context(&self) -> &Mutex<SessionContext> {
258258
&self.session_context
259259
}
260+
261+
fn store_data(&self, data: PagableData) -> DataKey {
262+
// no-op storage; just compute the key
263+
data.compute_key()
264+
}
260265
}

0 commit comments

Comments
 (0)