|
22 | 22 |
|
23 | 23 | #include <optional> |
24 | 24 | #include <string> |
| 25 | +#include <type_traits> |
25 | 26 | #include <utility> |
26 | 27 | #include <variant> |
27 | 28 | #include <vector> |
28 | 29 |
|
29 | 30 | #include "cluster/cluster_defs.h" |
| 31 | +#include "common/db_util.h" |
30 | 32 | #include "redis_metadata.h" |
31 | 33 | #include "storage.h" |
32 | 34 |
|
@@ -182,6 +184,68 @@ class SubKeyScanner : public redis::Database { |
182 | 184 | rocksdb::Status Scan(engine::Context &ctx, RedisType type, const Slice &user_key, const std::string &cursor, |
183 | 185 | uint64_t limit, const std::string &subkey_prefix, std::vector<std::string> *keys, |
184 | 186 | std::vector<std::string> *values = nullptr); |
| 187 | + |
| 188 | + protected: |
| 189 | + struct RawSubkeyValueAdapter { |
| 190 | + template <typename MetadataT> |
| 191 | + rocksdb::Status operator()(const MetadataT &, Slice *) const { |
| 192 | + return rocksdb::Status::OK(); |
| 193 | + } |
| 194 | + }; |
| 195 | + |
| 196 | + template <typename MetadataT> |
| 197 | + static MetadataT createScanMetadata(RedisType type) { |
| 198 | + if constexpr (std::is_same_v<MetadataT, Metadata>) { |
| 199 | + return Metadata(type, false); |
| 200 | + } else { |
| 201 | + return MetadataT(false); |
| 202 | + } |
| 203 | + } |
| 204 | + |
| 205 | + template <typename MetadataT, typename ValueAdapter = RawSubkeyValueAdapter> |
| 206 | + rocksdb::Status scanSubkeys(engine::Context &ctx, RedisType type, const Slice &user_key, const std::string &cursor, |
| 207 | + uint64_t limit, const std::string &subkey_prefix, std::vector<std::string> *keys, |
| 208 | + std::vector<std::string> *values = nullptr, ValueAdapter value_adapter = {}) { |
| 209 | + uint64_t cnt = 0; |
| 210 | + std::string ns_key = AppendNamespacePrefix(user_key); |
| 211 | + auto metadata = createScanMetadata<MetadataT>(type); |
| 212 | + rocksdb::Status s = GetMetadata(ctx, {type}, ns_key, &metadata); |
| 213 | + if (!s.ok()) return s; |
| 214 | + |
| 215 | + auto iter = util::UniqueIterator(ctx, ctx.DefaultScanOptions()); |
| 216 | + std::string match_prefix_key = |
| 217 | + InternalKey(ns_key, subkey_prefix, metadata.version, storage_->IsSlotIdEncoded()).Encode(); |
| 218 | + |
| 219 | + std::string start_key; |
| 220 | + if (!cursor.empty()) { |
| 221 | + start_key = InternalKey(ns_key, cursor, metadata.version, storage_->IsSlotIdEncoded()).Encode(); |
| 222 | + } else { |
| 223 | + start_key = match_prefix_key; |
| 224 | + } |
| 225 | + for (iter->Seek(start_key); iter->Valid(); iter->Next()) { |
| 226 | + if (!cursor.empty() && iter->key() == start_key) { |
| 227 | + // if cursor is not empty, then we need to skip start_key |
| 228 | + // because we already return that key in the last scan |
| 229 | + continue; |
| 230 | + } |
| 231 | + if (!iter->key().starts_with(match_prefix_key)) { |
| 232 | + break; |
| 233 | + } |
| 234 | + InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded()); |
| 235 | + keys->emplace_back(ikey.GetSubKey().ToString()); |
| 236 | + if (values != nullptr) { |
| 237 | + Slice value(iter->value()); |
| 238 | + s = value_adapter(metadata, &value); |
| 239 | + if (!s.ok()) return s; |
| 240 | + values->emplace_back(value.data(), value.size()); |
| 241 | + } |
| 242 | + cnt++; |
| 243 | + if (limit > 0 && cnt >= limit) { |
| 244 | + break; |
| 245 | + } |
| 246 | + } |
| 247 | + return iter->status(); |
| 248 | + } |
185 | 249 | }; |
186 | 250 |
|
187 | 251 | class WriteBatchLogData { |
|
0 commit comments