Skip to content

Commit 6139677

Browse files
committed
feat(ts) Support IGNORE option when creating TimeSeries
1 parent 94e4c8e commit 6139677

File tree

6 files changed

+113
-6
lines changed

6 files changed

+113
-6
lines changed

src/commands/cmd_timeseries.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ constexpr const char *errBadRetention = "Couldn't parse RETENTION";
2929
constexpr const char *errBadChunkSize = "invalid CHUNK_SIZE";
3030
constexpr const char *errBadEncoding = "unknown ENCODING parameter";
3131
constexpr const char *errDuplicatePolicy = "Unknown DUPLICATE_POLICY";
32+
constexpr const char *errBadIgnore = "Couldn't parse IGNORE";
3233
constexpr const char *errInvalidTimestamp = "invalid timestamp";
3334
constexpr const char *errInvalidValue = "invalid value";
3435
constexpr const char *errOldTimestamp = "Timestamp is older than retention";
@@ -252,6 +253,9 @@ class CommandTSCreateBase : public KeywordCommandBase {
252253
registerHandler("DUPLICATE_POLICY", [this](TSOptionsParser &parser) {
253254
return handleDuplicatePolicy(parser, create_option_.duplicate_policy);
254255
});
256+
registerHandler("IGNORE", [this](TSOptionsParser &parser) {
257+
return handleIgnore(parser, create_option_.ignore_max_time_diff, create_option_.ignore_max_val_diff);
258+
});
255259
registerHandler("LABELS", [this](TSOptionsParser &parser) { return handleLabels(parser, create_option_.labels); });
256260
}
257261

@@ -315,6 +319,17 @@ class CommandTSCreateBase : public KeywordCommandBase {
315319
return Status::OK();
316320
}
317321

322+
static Status handleIgnore(TSOptionsParser &parser, uint64_t &ignore_max_time_diff, double &ignore_max_val_diff) {
323+
auto parse_time_diff = parser.TakeInt<uint64_t>();
324+
auto parse_val_diff = parser.TakeFloat<double>();
325+
if (!parse_time_diff.IsOK() || !parse_val_diff.IsOK() || parse_val_diff.GetValue() < 0) {
326+
return {Status::RedisParseErr, errBadIgnore};
327+
}
328+
ignore_max_time_diff = parse_time_diff.GetValue();
329+
ignore_max_val_diff = parse_val_diff.GetValue();
330+
return Status::OK();
331+
}
332+
318333
TSCreateOption create_option_;
319334
};
320335

src/storage/redis_metadata.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -548,13 +548,15 @@ void TimeSeriesMetadata::Encode(std::string *dst) const {
548548
PutFixed8(dst, static_cast<uint8_t>(duplicate_policy));
549549
PutSizedString(dst, source_key);
550550
PutFixed64(dst, last_timestamp);
551+
PutFixed64(dst, ignore_max_time_diff);
552+
PutDouble(dst, ignore_max_val_diff);
551553
}
552554

553555
rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {
554556
if (auto s = Metadata::Decode(input); !s.ok()) {
555557
return s;
556558
}
557-
if (input->size() < sizeof(uint64_t) * 2 + sizeof(uint8_t) * 2 + sizeof(uint32_t)) {
559+
if (input->size() < sizeof(uint64_t) * 3 + sizeof(uint8_t) * 2 + sizeof(uint32_t) + sizeof(double)) {
558560
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
559561
}
560562

@@ -566,6 +568,8 @@ rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {
566568
GetSizedString(input, &source_key_slice);
567569
source_key = source_key_slice.ToString();
568570
GetFixed64(input, &last_timestamp);
571+
GetFixed64(input, &ignore_max_time_diff);
572+
GetDouble(input, &ignore_max_val_diff);
569573

570574
return rocksdb::Status::OK();
571575
}

src/storage/redis_metadata.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,8 @@ class TimeSeriesMetadata : public Metadata {
387387
uint64_t chunk_size;
388388
ChunkType chunk_type;
389389
DuplicatePolicy duplicate_policy;
390+
uint64_t ignore_max_time_diff;
391+
double ignore_max_val_diff;
390392
std::string source_key;
391393
uint64_t last_timestamp = 0; // Approximate last timestamp, used for compaction filter
392394

@@ -395,14 +397,18 @@ class TimeSeriesMetadata : public Metadata {
395397
retention_time(0),
396398
chunk_size(0),
397399
chunk_type(ChunkType::UNCOMPRESSED),
398-
duplicate_policy(DuplicatePolicy::BLOCK) {}
400+
duplicate_policy(DuplicatePolicy::BLOCK),
401+
ignore_max_time_diff(0),
402+
ignore_max_val_diff(0.0) {}
399403
TimeSeriesMetadata(uint64_t retention_time, uint64_t chunk_size, ChunkType chunk_type,
400404
DuplicatePolicy duplicate_policy, bool generate_version = true)
401405
: Metadata(kRedisTimeSeries, generate_version),
402406
retention_time(retention_time),
403407
chunk_size(chunk_size),
404408
chunk_type(chunk_type),
405-
duplicate_policy(duplicate_policy) {}
409+
duplicate_policy(duplicate_policy),
410+
ignore_max_time_diff(0),
411+
ignore_max_val_diff(0.0) {}
406412

407413
void SetSourceKey(Slice key);
408414

src/types/redis_timeseries.cc

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include "redis_timeseries.h"
2222

23+
#include <cmath>
2324
#include <queue>
2425

2526
#include "commands/error_constants.h"
@@ -515,7 +516,9 @@ TSCreateOption::TSCreateOption()
515516
: retention_time(kDefaultRetentionTime),
516517
chunk_size(kDefaultChunkSize),
517518
chunk_type(kDefaultChunkType),
518-
duplicate_policy(kDefaultDuplicatePolicy) {}
519+
duplicate_policy(kDefaultDuplicatePolicy),
520+
ignore_max_time_diff(0),
521+
ignore_max_val_diff(0.0) {}
519522

520523
Status TSMQueryFilterParser::Parse(std::string_view expr) {
521524
if (expr.empty()) return Status::OK();
@@ -678,6 +681,8 @@ TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option) {
678681
metadata.chunk_size = option.chunk_size;
679682
metadata.chunk_type = option.chunk_type;
680683
metadata.duplicate_policy = option.duplicate_policy;
684+
metadata.ignore_max_time_diff = option.ignore_max_time_diff;
685+
metadata.ignore_max_val_diff = option.ignore_max_val_diff;
681686
metadata.SetSourceKey(option.source_key);
682687

683688
return metadata;
@@ -851,6 +856,43 @@ rocksdb::Status TimeSeries::getOrCreateTimeSeries(engine::Context &ctx, const Sl
851856
return createTimeSeries(ctx, ns_key, metadata_out, option);
852857
}
853858

859+
rocksdb::Status TimeSeries::filterSamplesByIgnorePolicy(engine::Context &ctx, const Slice &ns_key,
860+
const TimeSeriesMetadata &metadata, SampleBatch *sample_batch) {
861+
if (!metadata.source_key.empty() || metadata.duplicate_policy != DuplicatePolicy::LAST) {
862+
return rocksdb::Status::OK();
863+
}
864+
865+
std::vector<TSSample> latest_samples;
866+
auto s = getCommon(ctx, ns_key, metadata, true, &latest_samples);
867+
if (!s.ok() || latest_samples.empty()) {
868+
return s;
869+
}
870+
871+
auto latest_sample = latest_samples.back();
872+
auto all_samples = sample_batch->AsSlice();
873+
auto samples = all_samples.GetSampleSpan();
874+
auto add_results = all_samples.GetAddResultSpan();
875+
876+
for (size_t i = 0; i < samples.size(); i++) {
877+
if (add_results[i].type != TSChunk::AddResultType::kNone) {
878+
continue;
879+
}
880+
881+
const auto &sample = samples[i];
882+
if (sample.ts >= latest_sample.ts && sample.ts - latest_sample.ts <= metadata.ignore_max_time_diff &&
883+
std::abs(sample.v - latest_sample.v) <= metadata.ignore_max_val_diff) {
884+
add_results[i].type = TSChunk::AddResultType::kSkip;
885+
continue;
886+
}
887+
888+
if (sample.ts >= latest_sample.ts) {
889+
latest_sample = sample;
890+
}
891+
}
892+
893+
return rocksdb::Status::OK();
894+
}
895+
854896
rocksdb::Status TimeSeries::upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
855897
SampleBatch &sample_batch, DownstreamUpsertArgs *ds_args) {
856898
auto batch = storage_->GetWriteBatchBase();
@@ -1930,6 +1972,8 @@ rocksdb::Status TimeSeries::Add(engine::Context &ctx, const Slice &user_key, TSS
19301972
rocksdb::Status s = getOrCreateTimeSeries(ctx, ns_key, &metadata, &option);
19311973
if (!s.ok()) return s;
19321974
auto sample_batch = SampleBatch({sample}, on_dup_policy ? *on_dup_policy : metadata.duplicate_policy);
1975+
s = filterSamplesByIgnorePolicy(ctx, ns_key, metadata, &sample_batch);
1976+
if (!s.ok()) return s;
19331977

19341978
DownstreamUpsertArgs ds_args;
19351979
s = upsertCommon(ctx, ns_key, metadata, sample_batch, &ds_args);
@@ -1950,6 +1994,8 @@ rocksdb::Status TimeSeries::MAdd(engine::Context &ctx, const Slice &user_key, st
19501994
return s;
19511995
}
19521996
auto sample_batch = SampleBatch(std::move(samples), metadata.duplicate_policy);
1997+
s = filterSamplesByIgnorePolicy(ctx, ns_key, metadata, &sample_batch);
1998+
if (!s.ok()) return s;
19531999
DownstreamUpsertArgs ds_args;
19542000
s = upsertCommon(ctx, ns_key, metadata, sample_batch, &ds_args);
19552001
if (!s.ok()) return s;

src/types/redis_timeseries.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ struct TSCreateOption {
151151
uint64_t chunk_size;
152152
TimeSeriesMetadata::ChunkType chunk_type;
153153
TimeSeriesMetadata::DuplicatePolicy duplicate_policy;
154+
uint64_t ignore_max_time_diff;
155+
double ignore_max_val_diff;
154156
std::string source_key;
155157
LabelKVList labels;
156158

@@ -257,8 +259,7 @@ enum class TSAlterMode : uint8_t {
257259
RETENTION = 1,
258260
CHUNK_SIZE = 1 << 1,
259261
DUPLICATE_POLICY = 1 << 2,
260-
IGNORE = 1 << 3,
261-
LABELS = 1 << 4,
262+
LABELS = 1 << 3,
262263
};
263264

264265
std::vector<TSSample> GroupSamplesAndReduce(const std::vector<std::vector<TSSample>> &all_samples,
@@ -317,6 +318,8 @@ class TimeSeries : public SubKeyScanner {
317318
const TSCreateOption *options);
318319
rocksdb::Status getOrCreateTimeSeries(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata *metadata_out,
319320
const TSCreateOption *option = nullptr);
321+
rocksdb::Status filterSamplesByIgnorePolicy(engine::Context &ctx, const Slice &ns_key,
322+
const TimeSeriesMetadata &metadata, SampleBatch *sample_batch);
320323
rocksdb::Status getLabelKVList(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata,
321324
LabelKVList *labels);
322325
rocksdb::Status upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,

tests/gocase/unit/type/timeseries/timeseries_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,24 @@ func testTimeSeries(t *testing.T, configs util.KvrocksServerConfigs) {
202202
require.ErrorContains(t, rdb.Do(ctx, "ts.add", key, "1000", "13.4").Err(), "update is not supported when DUPLICATE_POLICY is set to BLOCK mode")
203203
})
204204

205+
t.Run("TS.ADD Ignore Option", func(t *testing.T) {
206+
ignoreKey := "test_add_ignore_key"
207+
require.NoError(t, rdb.Del(ctx, ignoreKey).Err())
208+
require.NoError(t, rdb.Do(ctx, "ts.create", ignoreKey, "duplicate_policy", "last", "ignore", "5", "2").Err())
209+
210+
require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", ignoreKey, "1000", "10").Val())
211+
require.Equal(t, int64(1003), rdb.Do(ctx, "ts.add", ignoreKey, "1003", "11").Val())
212+
213+
res := rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
214+
require.Equal(t, 1, len(res))
215+
assert.Equal(t, []interface{}{int64(1000), float64(10)}, res[0])
216+
217+
require.Equal(t, int64(1008), rdb.Do(ctx, "ts.add", ignoreKey, "1008", "20").Val())
218+
res = rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
219+
require.Equal(t, 2, len(res))
220+
assert.Equal(t, []interface{}{int64(1008), float64(20)}, res[1])
221+
})
222+
205223
t.Run("TS.ADD With Retention", func(t *testing.T) {
206224
require.NoError(t, rdb.Del(ctx, key).Err())
207225
require.NoError(t, rdb.Do(ctx, "ts.create", key, "retention", "1000").Err())
@@ -232,6 +250,21 @@ func testTimeSeries(t *testing.T, configs util.KvrocksServerConfigs) {
232250
assert.Contains(t, res[1], "update is not supported when DUPLICATE_POLICY is set to BLOCK mode")
233251
})
234252

253+
t.Run("TS.MADD Ignore Option", func(t *testing.T) {
254+
ignoreKey := "test_madd_ignore_key"
255+
require.NoError(t, rdb.Del(ctx, ignoreKey).Err())
256+
require.NoError(t, rdb.Do(ctx, "ts.create", ignoreKey, "duplicate_policy", "last", "ignore", "5", "2").Err())
257+
require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", ignoreKey, "1000", "10").Val())
258+
259+
res := rdb.Do(ctx, "ts.madd", ignoreKey, "1003", "11", ignoreKey, "1004", "13", ignoreKey, "1007", "14").Val().([]interface{})
260+
assert.Equal(t, []interface{}{int64(1003), int64(1004), int64(1007)}, res)
261+
262+
samples := rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{})
263+
require.Equal(t, 2, len(samples))
264+
assert.Equal(t, []interface{}{int64(1000), float64(10)}, samples[0])
265+
assert.Equal(t, []interface{}{int64(1004), float64(13)}, samples[1])
266+
})
267+
235268
t.Run("TS.MADD Nonexistent Key", func(t *testing.T) {
236269
require.NoError(t, rdb.Del(ctx, "nonexistent").Err())
237270
require.NoError(t, rdb.Del(ctx, "existent").Err())

0 commit comments

Comments
 (0)