Skip to content

Commit 1a179ba

Browse files
committed
fix: deduplicate eviction loop, fix logical-limit recheck, and improve release accuracy
Extract reserveResourceInternalImpl to eliminate duplicated eviction logic between reserveResourceInternal and reserveResourceInternalWithTracker. Fix a bug where logical_limit_exceeded was unconditionally cleared after eviction instead of being rechecked. Return actual unscaled release size from ReleaseLoadingResource for accurate metrics tracking. Also improve exception safety in CacheSlot constructor and optimize LoadingOverheadTracker::Unregister from O(n) to O(1). Signed-off-by: Shawn Wang <shawn.wang@zilliz.com>
1 parent ce29823 commit 1a179ba

File tree

4 files changed

+77
-95
lines changed

4 files changed

+77
-95
lines changed

include/cachinglayer/CacheSlot.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,17 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
6868
storage_usage_tracking_enabled_(storage_usage_tracking_enabled),
6969
loading_timeout_(loading_timeout),
7070
warmup_loading_timeout_(warmup_loading_timeout) {
71-
if (auto& lo = translator_->meta()->loading_overhead) {
72-
overhead_handle_ = dlist_->RegisterLoadingOverhead(lo->group, lo->upper_bound);
73-
}
7471
cells_.reserve(translator_->num_cells());
7572
for (cid_t i = 0; i < static_cast<cid_t>(translator_->num_cells()); ++i) {
7673
cells_.push_back(std::make_unique<CacheCell>(this, i));
7774
}
7875
monitor::cache_slot_count(cell_data_type_, storage_type_).Increment();
7976
monitor::cache_cell_count(cell_data_type_, storage_type_).Increment(translator_->num_cells());
77+
// Register after all potentially-throwing operations, so that if the constructor
78+
// fails, we don't leak a ref_count (destructor won't run for incomplete objects).
79+
if (auto& lo = translator_->meta()->loading_overhead) {
80+
overhead_handle_ = dlist_->RegisterLoadingOverhead(lo->group, lo->upper_bound);
81+
}
8082
}
8183

8284
CacheSlot(const CacheSlot&) = delete;
@@ -485,13 +487,14 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
485487
return;
486488
}
487489
try {
488-
dlist_->ReleaseLoadingResource(loaded_resource, loading_overhead, overhead_handle_);
490+
auto released_resource =
491+
dlist_->ReleaseLoadingResource(loaded_resource, loading_overhead, overhead_handle_);
489492
if (metrics_tracked) {
490493
monitor::cache_cell_loading_count(cell_data_type_, storage_type_).Decrement(loading_cids_count);
491494
monitor::cache_loading_bytes(cell_data_type_, StorageType::MEMORY)
492-
.Decrement(actual_dlist_reserve.memory_bytes);
495+
.Decrement(released_resource.memory_bytes);
493496
monitor::cache_loading_bytes(cell_data_type_, StorageType::DISK)
494-
.Decrement(actual_dlist_reserve.file_bytes);
497+
.Decrement(released_resource.file_bytes);
495498
}
496499
} catch (...) {
497500
auto ew = folly::exception_wrapper(std::current_exception());

include/cachinglayer/LoadingOverheadTracker.h

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,12 @@ class LoadingOverheadTracker {
5858
it->second, state.ref_count, upper_bound.ToString());
5959
} else if (state.upper_bound.memory_bytes < upper_bound.memory_bytes ||
6060
state.upper_bound.file_bytes < upper_bound.file_bytes) {
61+
LOG_WARN(
62+
"[MCL] LoadingOverheadTracker UB mismatch for group '{}' (handle {}): existing={}, new={}. "
63+
"Taking max per dimension.",
64+
group, it->second, state.upper_bound.ToString(), upper_bound.ToString());
6165
state.upper_bound.memory_bytes = std::max(state.upper_bound.memory_bytes, upper_bound.memory_bytes);
6266
state.upper_bound.file_bytes = std::max(state.upper_bound.file_bytes, upper_bound.file_bytes);
63-
LOG_INFO("[MCL] LoadingOverheadTracker widened UB for group '{}' (handle {}, refs={}): {}", group,
64-
it->second, state.ref_count, state.upper_bound.ToString());
6567
} else {
6668
LOG_DEBUG("[MCL] LoadingOverheadTracker re-registered group '{}' (handle {}, refs={}), UB unchanged",
6769
group, it->second, state.ref_count);
@@ -70,7 +72,7 @@ class LoadingOverheadTracker {
7072
}
7173
auto handle = next_handle_++;
7274
name_to_handle_[group] = handle;
73-
handle_state_[handle] = GroupState{upper_bound, {}, {}, 1};
75+
handle_state_[handle] = GroupState{upper_bound, {}, {}, 1, group};
7476
LOG_INFO("[MCL] LoadingOverheadTracker registered group '{}' (handle {}, refs=1): UB={}", group, handle,
7577
upper_bound.ToString());
7678
return handle;
@@ -166,13 +168,8 @@ class LoadingOverheadTracker {
166168
"sum_of_overhead={}, overhead_reserved={}. Cleaning up anyway to avoid leak.",
167169
handle, state.sum_of_overhead.ToString(), state.overhead_reserved.ToString());
168170
}
169-
for (auto nit = name_to_handle_.begin(); nit != name_to_handle_.end(); ++nit) {
170-
if (nit->second == handle) {
171-
LOG_INFO("[MCL] LoadingOverheadTracker unregistered group '{}' (handle {})", nit->first, handle);
172-
name_to_handle_.erase(nit);
173-
break;
174-
}
175-
}
171+
LOG_INFO("[MCL] LoadingOverheadTracker unregistered group '{}' (handle {})", state.group_name, handle);
172+
name_to_handle_.erase(state.group_name);
176173
handle_state_.erase(it);
177174
}
178175

@@ -182,6 +179,7 @@ class LoadingOverheadTracker {
182179
ResourceUsage sum_of_overhead;
183180
ResourceUsage overhead_reserved;
184181
uint64_t ref_count{0};
182+
std::string group_name;
185183
};
186184

187185
static ResourceUsage

include/cachinglayer/lrucache/DList.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ class DList : public std::enable_shared_from_this<DList> {
160160
OpContext* ctx = nullptr);
161161

162162
// Release with loading overhead tracker integration.
163-
void
163+
// Returns the actual unscaled size released (loaded + tracker_delta).
164+
ResourceUsage
164165
ReleaseLoadingResource(const ResourceUsage& loaded, const ResourceUsage& overhead, uint64_t overhead_handle);
165166

166167
// Release resource used for loading, called after loading a cell.
@@ -273,6 +274,11 @@ class DList : public std::enable_shared_from_this<DList> {
273274
bool
274275
reserveResourceInternal(const ResourceUsage& size);
275276

277+
// Common implementation for resource reservation with eviction.
278+
// Returns {success, scaled_size_reserved}.
279+
std::pair<bool, ResourceUsage>
280+
reserveResourceInternalImpl(const ResourceUsage& size, std::function<void()> rollback);
281+
276282
// Reserve with tracker under lock. Space check uses loaded + overhead,
277283
// actual reservation uses loaded + tracker delta. Returns actual reserved (zero = failed).
278284
// Returns {success, unscaled_reserved}. Scaled amount is added to total_loading_size_ internally.

src/cachinglayer/lrucache/DList.cpp

Lines changed: 53 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,42 @@ DList::ReserveLoadingResourceWithTimeout(const ResourceUsage& original_size, std
219219

220220
bool
221221
DList::reserveResourceInternal(const ResourceUsage& size) {
222+
auto rollback = []() {};
223+
auto [success, _] = reserveResourceInternalImpl(size, rollback);
224+
return success;
225+
}
226+
227+
std::pair<bool, ResourceUsage>
228+
DList::reserveResourceInternalWithTracker(const ResourceUsage& loaded, const ResourceUsage& overhead,
229+
uint64_t overhead_handle, LoadingOverheadTracker* tracker) {
230+
// Compute tracker delta first so space check uses the actual amount, not the uncapped overhead.
231+
// This avoids rejecting loads that would fit after tracker capping (e.g., delta=0 when group is saturated).
232+
auto delta = overhead;
233+
if (overhead_handle != LoadingOverheadTracker::kInvalidHandle && tracker != nullptr) {
234+
delta = tracker->Reserve(overhead_handle, overhead);
235+
}
236+
auto actual_size = (loaded + delta) * eviction_config_.loading_resource_factor;
237+
238+
auto rollback = [overhead_handle, overhead, tracker]() {
239+
if (overhead_handle != LoadingOverheadTracker::kInvalidHandle && tracker != nullptr) {
240+
tracker->Release(overhead_handle, overhead);
241+
}
242+
};
243+
244+
auto [success, _] = reserveResourceInternalImpl(actual_size, rollback);
245+
if (!success) {
246+
return {false, {}};
247+
}
248+
249+
auto unscaled = loaded + delta;
250+
LOG_TRACE("[MCL] reserve with tracker: loaded={}, overhead={}, delta={}, unscaled={}, scaled={}, total_loading={}",
251+
loaded.ToString(), overhead.ToString(), delta.ToString(), unscaled.ToString(), actual_size.ToString(),
252+
total_loading_size_.load().ToString());
253+
return {true, unscaled};
254+
}
255+
256+
std::pair<bool, ResourceUsage>
257+
DList::reserveResourceInternalImpl(const ResourceUsage& size, std::function<void()> rollback) {
222258
auto using_resources = total_loaded_size_.load() + total_loading_size_.load();
223259

224260
// Combined logical and physical memory limit check
@@ -268,10 +304,13 @@ DList::reserveResourceInternal(const ResourceUsage& size) {
268304
"[MCL] reserve resource with size={} failed due to all zero evicted_size, "
269305
"eviction_target={}, min_eviction={}",
270306
size.ToString(), eviction_target.ToString(), min_eviction.ToString());
271-
return false;
307+
rollback();
308+
return {false, {}};
272309
}
310+
311+
using_resources -= evicted_size;
273312
// logical limit is accurate, thus we can guarantee after one successful eviction, logical limit is satisfied.
274-
logical_limit_exceeded = false;
313+
logical_limit_exceeded = !max_resource_limit_.load().CanHold(using_resources + size);
275314

276315
if (!physical_eviction_needed.AnyGTZero()) {
277316
// we only need to evict for logical limit and we have succeeded.
@@ -290,79 +329,10 @@ DList::reserveResourceInternal(const ResourceUsage& size) {
290329
}
291330

292331
total_loading_size_ += size;
293-
LOG_TRACE("[MCL] reserve resource with size={} success, total_loading_size={}, total_loaded_size={}",
294-
size.ToString(), total_loading_size_.load().ToString(), total_loaded_size_.load().ToString());
295-
296-
return true;
297-
}
298-
299-
std::pair<bool, ResourceUsage>
300-
DList::reserveResourceInternalWithTracker(const ResourceUsage& loaded, const ResourceUsage& overhead,
301-
uint64_t overhead_handle, LoadingOverheadTracker* tracker) {
302-
// Compute tracker delta first so space check uses the actual amount, not the uncapped overhead.
303-
// This avoids rejecting loads that would fit after tracker capping (e.g., delta=0 when group is saturated).
304-
auto delta = overhead;
305-
if (overhead_handle != LoadingOverheadTracker::kInvalidHandle && tracker != nullptr) {
306-
delta = tracker->Reserve(overhead_handle, overhead);
307-
}
308-
auto actual_size = (loaded + delta) * eviction_config_.loading_resource_factor;
309-
auto using_resources = total_loaded_size_.load() + total_loading_size_.load();
332+
LOG_TRACE("[MCL] reserve resource success, size={}, total_loading_size={}, total_loaded_size={}", size.ToString(),
333+
total_loading_size_.load().ToString(), total_loaded_size_.load().ToString());
310334

311-
bool logical_limit_exceeded = !max_resource_limit_.load().CanHold(using_resources + actual_size);
312-
auto physical_eviction_needed = checkPhysicalMemoryLimit(actual_size);
313-
314-
while (logical_limit_exceeded || physical_eviction_needed.AnyGTZero()) {
315-
ResourceUsage eviction_target;
316-
ResourceUsage min_eviction;
317-
318-
if (logical_limit_exceeded) {
319-
eviction_target = using_resources + actual_size - low_watermark_;
320-
min_eviction = using_resources + actual_size - max_resource_limit_.load();
321-
if (eviction_target.memory_bytes < 0)
322-
eviction_target.memory_bytes = 0;
323-
if (eviction_target.file_bytes < 0)
324-
eviction_target.file_bytes = 0;
325-
if (min_eviction.memory_bytes < 0)
326-
min_eviction.memory_bytes = 0;
327-
if (min_eviction.file_bytes < 0)
328-
min_eviction.file_bytes = 0;
329-
}
330-
331-
if (physical_eviction_needed.AnyGTZero()) {
332-
eviction_target.memory_bytes =
333-
std::max(eviction_target.memory_bytes, physical_eviction_needed.memory_bytes);
334-
eviction_target.file_bytes = std::max(eviction_target.file_bytes, physical_eviction_needed.file_bytes);
335-
min_eviction.memory_bytes = std::max(min_eviction.memory_bytes, physical_eviction_needed.memory_bytes);
336-
min_eviction.file_bytes = std::max(min_eviction.file_bytes, physical_eviction_needed.file_bytes);
337-
}
338-
339-
ResourceUsage evicted_size = tryEvict(eviction_target, min_eviction);
340-
if (!evicted_size.AnyGTZero()) {
341-
LOG_WARN(
342-
"[MCL] reserve with tracker failed: loaded={}, overhead={}, delta={}, eviction_target={}, "
343-
"min_eviction={}",
344-
loaded.ToString(), overhead.ToString(), delta.ToString(), eviction_target.ToString(),
345-
min_eviction.ToString());
346-
// Rollback tracker state on failure
347-
if (overhead_handle != LoadingOverheadTracker::kInvalidHandle && tracker != nullptr) {
348-
tracker->Release(overhead_handle, overhead);
349-
}
350-
return {false, {}};
351-
}
352-
logical_limit_exceeded = false;
353-
354-
if (!physical_eviction_needed.AnyGTZero())
355-
break;
356-
if (physical_eviction_needed = checkPhysicalMemoryLimit(actual_size); !physical_eviction_needed.AnyGTZero())
357-
break;
358-
}
359-
360-
total_loading_size_ += actual_size;
361-
auto unscaled = loaded + delta;
362-
LOG_TRACE("[MCL] reserve with tracker: loaded={}, overhead={}, delta={}, unscaled={}, scaled={}, total_loading={}",
363-
loaded.ToString(), overhead.ToString(), delta.ToString(), unscaled.ToString(), actual_size.ToString(),
364-
total_loading_size_.load().ToString());
365-
return {true, unscaled};
335+
return {true, size};
366336
}
367337

368338
void
@@ -643,16 +613,18 @@ DList::UpdateHighWatermark(const ResourceUsage& new_high_watermark) {
643613
cachinglayer::monitor::cache_high_watermark_bytes(StorageType::DISK).Set(high_watermark_.load().file_bytes);
644614
}
645615

646-
void
616+
ResourceUsage
647617
DList::ReleaseLoadingResource(const ResourceUsage& loaded, const ResourceUsage& overhead, uint64_t overhead_handle) {
648618
std::vector<std::unique_ptr<WaitingRequest>> to_destroy;
619+
ResourceUsage unscaled{};
649620
{
650621
std::unique_lock<std::mutex> lock(list_mtx_);
651622
auto delta = overhead;
652623
if (overhead_handle != LoadingOverheadTracker::kInvalidHandle && loading_overhead_tracker_) {
653624
delta = loading_overhead_tracker_->Release(overhead_handle, overhead);
654625
}
655-
auto actual = (loaded + delta) * eviction_config_.loading_resource_factor;
626+
unscaled = loaded + delta;
627+
auto actual = unscaled * eviction_config_.loading_resource_factor;
656628
total_loading_size_ -= actual;
657629
ClampNonNegative(total_loading_size_, [&](const ResourceUsage& curr) {
658630
LOG_ERROR(
@@ -663,6 +635,7 @@ DList::ReleaseLoadingResource(const ResourceUsage& loaded, const ResourceUsage&
663635
to_destroy = handleWaitingRequests();
664636
}
665637
// Destroy requests outside lock to avoid deadlock with cancel callbacks
638+
return unscaled;
666639
}
667640

668641
void
@@ -884,14 +857,16 @@ DList::handleWaitingRequests() {
884857
waiting_queue_.pop();
885858
} else {
886859
// Check if this request is permanently impossible (required size exceeds capacity).
887-
// If so, fail it immediately instead of blocking the entire queue until timeout.
860+
// Use required_size for both paths: (loaded + overhead) * factor for tracker-aware,
861+
// which is the upper bound of what the request could need.
888862
if (!max_resource_limit_.load().CanHold(request_ptr_ref->required_size)) {
889863
auto request = std::move(request_ptr_ref);
890864
if (waiting_requests_map_.erase(request->request_id) > 0) {
891865
LOG_WARN(
892866
"[MCL] Request {} is permanently impossible (required_size={} > capacity={}), "
893867
"failing immediately.",
894-
request->request_id, request->required_size.ToString(), max_resource_limit_.load().ToString());
868+
request->request_id, request->required_size.ToString(),
869+
max_resource_limit_.load().ToString());
895870
request->setValue(false);
896871
}
897872
requests_to_destroy.push_back(std::move(request));

0 commit comments

Comments
 (0)