Skip to content

Commit c3233aa

Browse files
authored
Merge branch 'unstable' into fix-3267-sentinel-existing-connections
2 parents bb21e04 + 75df374 commit c3233aa

34 files changed

+747
-292
lines changed

cmake/jemalloc.cmake

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ endif()
3232
include(cmake/utils.cmake)
3333

3434
FetchContent_DeclareGitHubWithMirror(jemalloc
35-
facebook/jemalloc 5.3.0
36-
SHA1=1be8fdba021e9d6ed201e7d6a3c464b2223fc927
35+
jemalloc/jemalloc 5.3.1
36+
MD5=d421650ffe4b9e0ec463a9c26135f657
3737
)
3838

3939
FetchContent_GetProperties(jemalloc)

kvrocks.conf

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,26 @@ repl-namespace-enabled no
7777
#
7878
# proto-max-bulk-len 536870912
7979

80+
# The encoding mode for newly created hash keys.
81+
# - legacy: use the original subkey format (no per-field expire support).
82+
# Existing data written in this mode is always readable regardless of this setting.
83+
# - field-expiration: use the new subkey format that prepends an 8-byte expire
84+
# timestamp to each field value. Required for HEXPIRE/HPERSIST/HPTTL commands.
85+
# NOTE: this only affects newly created keys. Existing keys retain their original mode.
86+
# Default: legacy
87+
# Available options: legacy, field-expiration
88+
hash-encoding-mode legacy
89+
90+
# How HLEN computes the number of fields when some fields have a TTL.
91+
# - accurate: return the exact count. O(1) when no fields have TTL or none have
92+
# expired yet; otherwise performs a scan to remove expired fields and update metadata.
93+
# - approximate: always return the stored size without scanning. The count may include
94+
# fields that have already expired but have not yet been cleaned up.
95+
# This option has no effect when hash-encoding-mode is legacy.
96+
# Default: accurate
97+
# Available options: accurate, approximate
98+
hash-length-mode accurate
99+
80100
# Persist the cluster nodes topology in local file($dir/nodes.conf). This configuration
81101
# takes effect only if the cluster mode was enabled.
82102
#

src/config/config.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,16 @@ const std::vector<ConfigEnum<BlockCacheType>> cache_types{[] {
9393
const std::vector<ConfigEnum<MigrationType>> migration_types{{"redis-command", MigrationType::kRedisCommand},
9494
{"raw-key-value", MigrationType::kRawKeyValue}};
9595

96+
const std::vector<ConfigEnum<HashSubkeyEncodingMode>> hash_subkey_encoding_modes{
97+
{"legacy", HashSubkeyEncodingMode::kLegacy},
98+
{"field-expiration", HashSubkeyEncodingMode::kFieldExpiration},
99+
};
100+
101+
const std::vector<ConfigEnum<HashLengthMode>> hash_length_modes{
102+
{"accurate", HashLengthMode::kAccurate},
103+
{"approximate", HashLengthMode::kApproximate},
104+
};
105+
96106
std::string TrimRocksDbPrefix(std::string s) {
97107
constexpr std::string_view prefix = "rocksdb.";
98108
if (!util::StartsWithICase(s, prefix)) return s;
@@ -237,6 +247,11 @@ Config::Config() {
237247
{"redis-cursor-compatible", false, new YesNoField(&redis_cursor_compatible, true)},
238248
{"redis-databases", true, new IntField(&redis_databases, 0, 0, INT_MAX)},
239249
{"resp3-enabled", false, new YesNoField(&resp3_enabled, true)},
250+
{"hash-encoding-mode", false,
251+
new EnumField<HashSubkeyEncodingMode>(&hash_encoding_mode, hash_subkey_encoding_modes,
252+
HashSubkeyEncodingMode::kLegacy)},
253+
{"hash-length-mode", false,
254+
new EnumField<HashLengthMode>(&hash_length_mode, hash_length_modes, HashLengthMode::kAccurate)},
240255
{"repl-namespace-enabled", false, new YesNoField(&repl_namespace_enabled, false)},
241256
{"proto-max-bulk-len", false,
242257
new IntWithUnitField<uint64_t>(&proto_max_bulk_len, std::to_string(512 * MiB), 1 * MiB,

src/config/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ struct Config {
180180
int redis_databases = 0;
181181
bool resp3_enabled = false;
182182
int log_retention_days;
183+
HashSubkeyEncodingMode hash_encoding_mode = HashSubkeyEncodingMode::kLegacy;
184+
HashLengthMode hash_length_mode = HashLengthMode::kAccurate;
183185

184186
// load_tokens is used to buffer the tokens when loading,
185187
// don't use it to authenticate or rewrite the configuration file.

src/search/indexer.cc

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ StatusOr<FieldValueRetriever> FieldValueRetriever::Create(IndexOnDataType type,
4343
std::string ns_key = db.AppendNamespacePrefix(key);
4444
HashMetadata metadata(false);
4545

46-
auto s = db.GetMetadata(ctx, ns_key, &metadata);
46+
auto s = db.getMetadata(ctx, ns_key, &metadata);
4747
if (!s.ok()) return {s.IsNotFound() ? Status::NotFound : Status::NotOK, s.ToString()};
4848
return FieldValueRetriever(db, metadata, key);
4949
} else if (type == IndexOnDataType::JSON) {
@@ -98,7 +98,7 @@ StatusOr<kqir::Value> FieldValueRetriever::ParseFromJson(const jsoncons::json &v
9898
}
9999
}
100100

101-
StatusOr<kqir::Value> FieldValueRetriever::ParseFromHash(const std::string &value,
101+
StatusOr<kqir::Value> FieldValueRetriever::ParseFromHash(std::string_view value,
102102
const redis::IndexFieldMetadata *type) {
103103
if (auto numeric [[maybe_unused]] = dynamic_cast<const redis::NumericFieldMetadata *>(type)) {
104104
auto num = GET_OR_RET(ParseFloat(value));
@@ -137,7 +137,11 @@ StatusOr<kqir::Value> FieldValueRetriever::Retrieve(engine::Context &ctx, std::s
137137
if (s.IsNotFound()) return {Status::NotFound, s.ToString()};
138138
if (!s.ok()) return {Status::NotOK, s.ToString()};
139139

140-
return ParseFromHash(value, type);
140+
Slice decoded_value(value);
141+
s = metadata.DecodeSubkeyValue(&decoded_value);
142+
if (!s.ok()) return {Status::NotOK, s.ToString()};
143+
144+
return ParseFromHash(decoded_value.ToStringView(), type);
141145
} else if (std::holds_alternative<JsonData>(db)) {
142146
auto &value = std::get<JsonData>(db);
143147

src/search/indexer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ struct FieldValueRetriever {
6767
StatusOr<kqir::Value> Retrieve(engine::Context &ctx, std::string_view field, const redis::IndexFieldMetadata *type);
6868

6969
static StatusOr<kqir::Value> ParseFromJson(const jsoncons::json &value, const redis::IndexFieldMetadata *type);
70-
static StatusOr<kqir::Value> ParseFromHash(const std::string &value, const redis::IndexFieldMetadata *type);
70+
static StatusOr<kqir::Value> ParseFromHash(std::string_view value, const redis::IndexFieldMetadata *type);
7171
};
7272

7373
struct IndexUpdater {

src/storage/redis_db.cc

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -558,42 +558,7 @@ rocksdb::Status Database::KeyExist(engine::Context &ctx, const std::string &key)
558558
rocksdb::Status SubKeyScanner::Scan(engine::Context &ctx, RedisType type, const Slice &user_key,
559559
const std::string &cursor, uint64_t limit, const std::string &subkey_prefix,
560560
std::vector<std::string> *keys, std::vector<std::string> *values) {
561-
uint64_t cnt = 0;
562-
std::string ns_key = AppendNamespacePrefix(user_key);
563-
Metadata metadata(type, false);
564-
rocksdb::Status s = GetMetadata(ctx, {type}, ns_key, &metadata);
565-
if (!s.ok()) return s;
566-
567-
auto iter = util::UniqueIterator(ctx, ctx.DefaultScanOptions());
568-
std::string match_prefix_key =
569-
InternalKey(ns_key, subkey_prefix, metadata.version, storage_->IsSlotIdEncoded()).Encode();
570-
571-
std::string start_key;
572-
if (!cursor.empty()) {
573-
start_key = InternalKey(ns_key, cursor, metadata.version, storage_->IsSlotIdEncoded()).Encode();
574-
} else {
575-
start_key = match_prefix_key;
576-
}
577-
for (iter->Seek(start_key); iter->Valid(); iter->Next()) {
578-
if (!cursor.empty() && iter->key() == start_key) {
579-
// if cursor is not empty, then we need to skip start_key
580-
// because we already return that key in the last scan
581-
continue;
582-
}
583-
if (!iter->key().starts_with(match_prefix_key)) {
584-
break;
585-
}
586-
InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
587-
keys->emplace_back(ikey.GetSubKey().ToString());
588-
if (values != nullptr) {
589-
values->emplace_back(iter->value().ToString());
590-
}
591-
cnt++;
592-
if (limit > 0 && cnt >= limit) {
593-
break;
594-
}
595-
}
596-
return iter->status();
561+
return scanSubkeys<Metadata>(ctx, type, user_key, cursor, limit, subkey_prefix, keys, values);
597562
}
598563

599564
RedisType WriteBatchLogData::GetRedisType() const { return type_; }

src/storage/redis_db.h

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222

2323
#include <optional>
2424
#include <string>
25+
#include <type_traits>
2526
#include <utility>
2627
#include <variant>
2728
#include <vector>
2829

2930
#include "cluster/cluster_defs.h"
31+
#include "common/db_util.h"
3032
#include "redis_metadata.h"
3133
#include "storage.h"
3234

@@ -182,6 +184,68 @@ class SubKeyScanner : public redis::Database {
182184
rocksdb::Status Scan(engine::Context &ctx, RedisType type, const Slice &user_key, const std::string &cursor,
183185
uint64_t limit, const std::string &subkey_prefix, std::vector<std::string> *keys,
184186
std::vector<std::string> *values = nullptr);
187+
188+
protected:
189+
struct RawSubkeyValueAdapter {
190+
template <typename MetadataT>
191+
rocksdb::Status operator()(const MetadataT &, Slice *) const {
192+
return rocksdb::Status::OK();
193+
}
194+
};
195+
196+
template <typename MetadataT>
197+
static MetadataT createScanMetadata(RedisType type) {
198+
if constexpr (std::is_same_v<MetadataT, Metadata>) {
199+
return Metadata(type, false);
200+
} else {
201+
return MetadataT(false);
202+
}
203+
}
204+
205+
template <typename MetadataT, typename ValueAdapter = RawSubkeyValueAdapter>
206+
rocksdb::Status scanSubkeys(engine::Context &ctx, RedisType type, const Slice &user_key, const std::string &cursor,
207+
uint64_t limit, const std::string &subkey_prefix, std::vector<std::string> *keys,
208+
std::vector<std::string> *values = nullptr, ValueAdapter value_adapter = {}) {
209+
uint64_t cnt = 0;
210+
std::string ns_key = AppendNamespacePrefix(user_key);
211+
auto metadata = createScanMetadata<MetadataT>(type);
212+
rocksdb::Status s = GetMetadata(ctx, {type}, ns_key, &metadata);
213+
if (!s.ok()) return s;
214+
215+
auto iter = util::UniqueIterator(ctx, ctx.DefaultScanOptions());
216+
std::string match_prefix_key =
217+
InternalKey(ns_key, subkey_prefix, metadata.version, storage_->IsSlotIdEncoded()).Encode();
218+
219+
std::string start_key;
220+
if (!cursor.empty()) {
221+
start_key = InternalKey(ns_key, cursor, metadata.version, storage_->IsSlotIdEncoded()).Encode();
222+
} else {
223+
start_key = match_prefix_key;
224+
}
225+
for (iter->Seek(start_key); iter->Valid(); iter->Next()) {
226+
if (!cursor.empty() && iter->key() == start_key) {
227+
// if cursor is not empty, then we need to skip start_key
228+
// because we already return that key in the last scan
229+
continue;
230+
}
231+
if (!iter->key().starts_with(match_prefix_key)) {
232+
break;
233+
}
234+
InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
235+
keys->emplace_back(ikey.GetSubKey().ToString());
236+
if (values != nullptr) {
237+
Slice value(iter->value());
238+
s = value_adapter(metadata, &value);
239+
if (!s.ok()) return s;
240+
values->emplace_back(value.data(), value.size());
241+
}
242+
cnt++;
243+
if (limit > 0 && cnt >= limit) {
244+
break;
245+
}
246+
}
247+
return iter->status();
248+
}
185249
};
186250

187251
class WriteBatchLogData {

src/storage/redis_metadata.cc

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ const int VersionCounterBits = 11;
3838
static std::atomic<uint64_t> version_counter_ = 0;
3939

4040
constexpr const char *kErrMetadataTooShort = "metadata is too short";
41+
constexpr const char *kErrHashSubkeyValueTooShort = "hash subkey value is too short";
4142

4243
InternalKey::InternalKey(Slice input, bool slot_id_encoded) : slot_id_encoded_(slot_id_encoded) {
4344
uint32_t key_size = 0;
@@ -339,6 +340,80 @@ bool Metadata::IsEmptyableType() const {
339340

340341
bool Metadata::Expired() const { return ExpireAt(util::GetTimeStampMS()); }
341342

343+
void HashMetadata::Encode(std::string *dst) const {
344+
Metadata::Encode(dst);
345+
if (IsLegacySubkeyEncoding()) {
346+
return;
347+
}
348+
349+
PutFixed8(dst, static_cast<uint8_t>(mode));
350+
PutFixed64(dst, expsz);
351+
PutFixed64(dst, lower);
352+
PutFixed64(dst, upper);
353+
}
354+
355+
rocksdb::Status HashMetadata::Decode(Slice *input) {
356+
if (auto s = Metadata::Decode(input); !s.ok()) {
357+
return s;
358+
}
359+
360+
if (input->empty()) {
361+
mode = HashSubkeyEncodingMode::kLegacy;
362+
expsz = 0;
363+
lower = 0;
364+
upper = 0;
365+
return rocksdb::Status::OK();
366+
}
367+
368+
if (input->size() < 1 + 8 + 8 + 8) {
369+
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
370+
}
371+
372+
uint8_t encoded_mode = 0;
373+
GetFixed8(input, &encoded_mode);
374+
if (encoded_mode > static_cast<uint8_t>(HashSubkeyEncodingMode::kFieldExpiration)) {
375+
return rocksdb::Status::InvalidArgument("invalid hash subkey encoding mode");
376+
}
377+
378+
mode = static_cast<HashSubkeyEncodingMode>(encoded_mode);
379+
GetFixed64(input, &expsz);
380+
GetFixed64(input, &lower);
381+
GetFixed64(input, &upper);
382+
return rocksdb::Status::OK();
383+
}
384+
385+
std::string HashMetadata::EncodeSubkeyValue(Slice value, uint64_t expire) const {
386+
if (IsLegacySubkeyEncoding()) {
387+
return value.ToString();
388+
}
389+
390+
std::string encoded;
391+
encoded.reserve(kFieldExpirationPrefixSize + value.size());
392+
PutFixed64(&encoded, expire);
393+
encoded.append(value.data(), value.size());
394+
return encoded;
395+
}
396+
397+
rocksdb::Status HashMetadata::DecodeSubkeyValue(Slice *value, uint64_t *expire) const {
398+
if (IsLegacySubkeyEncoding()) {
399+
if (expire != nullptr) {
400+
*expire = 0;
401+
}
402+
return rocksdb::Status::OK();
403+
}
404+
405+
if (value->size() < kFieldExpirationPrefixSize) {
406+
return rocksdb::Status::InvalidArgument(kErrHashSubkeyValueTooShort);
407+
}
408+
409+
uint64_t encoded_expire = 0;
410+
GetFixed64(value, &encoded_expire);
411+
if (expire != nullptr) {
412+
*expire = encoded_expire;
413+
}
414+
return rocksdb::Status::OK();
415+
}
416+
342417
ListMetadata::ListMetadata(bool generate_version)
343418
: Metadata(kRedisList, generate_version), head(UINT64_MAX / 2), tail(head) {}
344419

src/storage/redis_metadata.h

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,16 @@ enum RedisCommand {
103103
constexpr const char *kErrMsgWrongType = "WRONGTYPE Operation against a key holding the wrong kind of value";
104104
constexpr const char *kErrMsgKeyExpired = "the key was expired";
105105

106+
enum class HashSubkeyEncodingMode : uint8_t {
107+
kLegacy = 0,
108+
kFieldExpiration = 1,
109+
};
110+
111+
enum class HashLengthMode : uint8_t {
112+
kAccurate = 0,
113+
kApproximate = 1,
114+
};
115+
106116
using rocksdb::Slice;
107117

108118
struct KeyNumStats {
@@ -211,7 +221,25 @@ class Metadata {
211221

212222
class HashMetadata : public Metadata {
213223
public:
214-
explicit HashMetadata(bool generate_version = true) : Metadata(kRedisHash, generate_version) {}
224+
static constexpr size_t kFieldExpirationPrefixSize = sizeof(uint64_t);
225+
226+
HashSubkeyEncodingMode mode = HashSubkeyEncodingMode::kLegacy;
227+
uint64_t expsz = 0;
228+
uint64_t lower = 0;
229+
uint64_t upper = 0;
230+
231+
explicit HashMetadata(bool generate_version = true, HashSubkeyEncodingMode mode = HashSubkeyEncodingMode::kLegacy)
232+
: Metadata(kRedisHash, generate_version), mode(mode) {}
233+
234+
bool IsLegacySubkeyEncoding() const { return mode == HashSubkeyEncodingMode::kLegacy; }
235+
bool IsFieldExpirationEncoding() const { return mode == HashSubkeyEncodingMode::kFieldExpiration; }
236+
237+
[[nodiscard]] std::string EncodeSubkeyValue(Slice value, uint64_t expire = 0) const;
238+
[[nodiscard]] rocksdb::Status DecodeSubkeyValue(Slice *value, uint64_t *expire = nullptr) const;
239+
240+
void Encode(std::string *dst) const override;
241+
using Metadata::Decode;
242+
rocksdb::Status Decode(Slice *input) override;
215243
};
216244

217245
class SetMetadata : public Metadata {

0 commit comments

Comments
 (0)