Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4784442
refactor: make all the flags for reverse as a compile-time argument
donghao526 Nov 20, 2025
45281c3
Merge branch 'unstable' into refactor/tdigest-rank
PragmaTwice Nov 21, 2025
b421adb
refactor(tdigest): change variable reverse to Reverse
donghao526 Nov 21, 2025
1ced14d
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Nov 21, 2025
70ec5b8
refactor(tdigest): update TDigestRank template parameters for clarity
donghao526 Nov 23, 2025
c42ed44
refactor(tdigest): update DummyCentroids to use template parameter fo…
donghao526 Nov 23, 2025
5d3a885
style: use the init-statement for calling TDigestRank
donghao526 Nov 24, 2025
3bb2fe6
refactor(tdigest): simplify iterator handling in DummyCentroids and i…
donghao526 Nov 24, 2025
f1c559c
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Nov 24, 2025
7fb6b2f
refactor(tdigest): change get_cbegin_iter and get_cend_iter methods t…
donghao526 Nov 24, 2025
0ed408d
refactor(tdigest): rename function names for consistency
donghao526 Nov 24, 2025
d5cce3f
Merge branch 'unstable' into refactor/tdigest-rank
PragmaTwice Nov 27, 2025
843af3d
refactor(tdigest): make iterator methods static and improve clarity i…
donghao526 Dec 2, 2025
3c935d8
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Dec 2, 2025
48c1f47
Merge branch 'unstable' into refactor/tdigest-rank
donghao526 Dec 2, 2025
4bdb79b
refactor(tdigest): rename iterator methods for consistency and clarity
donghao526 Dec 2, 2025
b6074af
refactor(tdigest): refactor the GetCbeginIter and GetCendIter to deta…
donghao526 Dec 2, 2025
c2894a0
Merge branch 'unstable' into refactor/tdigest-rank
donghao526 Dec 4, 2025
7487397
refactor(tdigest): replace template Rank with separate Rank and RevRa…
donghao526 Dec 4, 2025
0aaa788
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Dec 4, 2025
be9a404
refactor(tdigest): move Rank and RevRank methods above Quantile for b…
donghao526 Dec 4, 2025
8f229ce
refactor(tdigest): remove unused RankImpl template function
donghao526 Dec 4, 2025
e73eae4
refactor(tdigest): extract common preparation logic from Rank and Rev…
donghao526 Dec 4, 2025
69bf01a
refactor(tdigest): remove unnecessary comment indicating special retu…
donghao526 Dec 4, 2025
e1ed70a
refactor(tdigest): fix compile error
donghao526 Dec 4, 2025
798a69d
refactor(tdigest): fix clang-format error
donghao526 Dec 4, 2025
1c3ae7d
refactor(tdigest): fix error when get rank on an empty tdigest
donghao526 Dec 5, 2025
4b37eed
refactor(tdigest): fix some compile error
donghao526 Dec 5, 2025
6911242
Merge branch 'unstable' into refactor/tdigest-rank
donghao526 Dec 5, 2025
c097f8c
refactor(tdigest): use the init-statement to declare "s" inside the i…
donghao526 Dec 6, 2025
f4e5488
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Dec 6, 2025
a06db87
Merge branch 'unstable' into refactor/tdigest-rank
donghao526 Dec 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/commands/cmd_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class CommandTDigestAdd : public Commander {
std::vector<double> values_;
};

template <bool reverse>
template <bool Reverse>
class TDigestRankCommand : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand All @@ -202,7 +202,7 @@ class TDigestRankCommand : public Commander {
TDigest tdigest(srv->storage, conn->GetNamespace());
std::vector<int> result;
result.reserve(origin_inputs_.size());
if (const auto s = tdigest.Rank(ctx, key_name_, unique_inputs_, reverse, result); !s.ok()) {
if (const auto s = tdigest.Rank<Reverse>(ctx, key_name_, unique_inputs_, result); !s.ok()) {
if (s.IsNotFound()) {
return {Status::RedisExecErr, errKeyNotFound};
}
Expand Down
153 changes: 0 additions & 153 deletions src/types/redis_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,125 +47,6 @@

namespace redis {

// TODO: This should be replaced by iterating directly with the rocksdb iterator
class DummyCentroids {
public:
class BaseIterator {
public:
virtual ~BaseIterator() = default;
virtual bool Next() = 0;
virtual bool Prev() = 0;
virtual bool Valid() const = 0;
virtual std::unique_ptr<BaseIterator> Clone() const = 0;
virtual StatusOr<Centroid> GetCentroid() const = 0;
};

DummyCentroids(const TDigestMetadata& meta_data, const std::vector<Centroid>& centroids)
: meta_data_(meta_data), centroids_(centroids) {}
class Iterator : public BaseIterator {
public:
Iterator(std::vector<Centroid>::const_iterator&& iter, const std::vector<Centroid>& centroids)
: iter_(iter), centroids_(centroids) {}
std::unique_ptr<BaseIterator> Clone() const override {
if (iter_ != centroids_.cend()) {
return std::make_unique<Iterator>(std::next(centroids_.cbegin(), std::distance(centroids_.cbegin(), iter_)),
centroids_);
}
return std::make_unique<Iterator>(centroids_.cend(), centroids_);
}
bool Next() override {
if (Valid()) {
std::advance(iter_, 1);
}
return iter_ != centroids_.cend();
}

// The Prev function can only be called for item is not cend,
// because we must guarantee the iterator to be inside the valid range before iteration.
bool Prev() override {
if (Valid() && iter_ != centroids_.cbegin()) {
std::advance(iter_, -1);
}
return Valid();
}
bool Valid() const override { return iter_ != centroids_.cend(); }
StatusOr<Centroid> GetCentroid() const override {
if (iter_ == centroids_.cend()) {
return {::Status::NotOK, "invalid iterator during decoding tdigest centroid"};
}
return *iter_;
}

private:
std::vector<Centroid>::const_iterator iter_;
const std::vector<Centroid>& centroids_;
};

class ReverseIterator final : public BaseIterator {
public:
ReverseIterator(std::vector<Centroid>::const_reverse_iterator&& iter, const std::vector<Centroid>& centroids)
: iter_(iter), centroids_(centroids) {}
std::unique_ptr<BaseIterator> Clone() const override {
if (iter_ != centroids_.crend()) {
return std::make_unique<ReverseIterator>(
std::next(centroids_.crbegin(), std::distance(centroids_.crbegin(), iter_)), centroids_);
}
return std::make_unique<ReverseIterator>(centroids_.crend(), centroids_);
}
bool Next() override {
if (Valid()) {
std::advance(iter_, 1);
}
return iter_ != centroids_.crend();
}

bool Prev() override {
if (Valid() && iter_ != centroids_.crbegin()) {
std::advance(iter_, -1);
}
return Valid();
}
bool Valid() const override { return iter_ != centroids_.crend(); }
StatusOr<Centroid> GetCentroid() const override {
if (iter_ == centroids_.crend()) {
return {::Status::NotOK, "invalid iterator during decoding tdigest centroid"};
}
return *iter_;
}

private:
std::vector<Centroid>::const_reverse_iterator iter_;
const std::vector<Centroid>& centroids_;
};

std::unique_ptr<BaseIterator> Begin(const bool reverse = false) const {
if (reverse) {
return std::make_unique<ReverseIterator>(centroids_.crbegin(), centroids_);
}
return std::make_unique<Iterator>(centroids_.cbegin(), centroids_);
}
std::unique_ptr<BaseIterator> End(const bool reverse = false) const {
if (centroids_.empty()) {
if (reverse) {
return std::make_unique<ReverseIterator>(centroids_.crend(), centroids_);
}
return std::make_unique<Iterator>(centroids_.cend(), centroids_);
}
if (reverse) {
return std::make_unique<ReverseIterator>(std::prev(centroids_.crend()), centroids_);
}
return std::make_unique<Iterator>(std::prev(centroids_.cend()), centroids_);
}
double TotalWeight() const { return static_cast<double>(meta_data_.total_weight); }
double Min() const { return meta_data_.minimum; }
double Max() const { return meta_data_.maximum; }
uint64_t Size() const { return meta_data_.merged_nodes; }

private:
const TDigestMetadata& meta_data_;
const std::vector<Centroid>& centroids_;
};

uint32_t constexpr kMaxElements = 1 * 1024; // 1k doubles

rocksdb::Status TDigest::Create(engine::Context& ctx, const Slice& digest_name, const TDigestCreateOptions& options,
Expand Down Expand Up @@ -273,40 +154,6 @@ rocksdb::Status TDigest::mergeNodes(engine::Context& ctx, const std::string& ns_
return rocksdb::Status::OK();
}

// Ranks every value of `inputs` against the digest stored under `digest_name`
// and writes the outcome into `result`; `reverse` is forwarded to TDigestRank.
//
// Returns the failing status of the metadata lookup, the node merge or the
// centroid dump unchanged. A TDigestRank failure is reported as
// InvalidArgument carrying its message. When the digest has no observations,
// `result` is resized to inputs.size() (padding with -2) and OK is returned.
rocksdb::Status TDigest::Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
                              bool reverse, std::vector<int>& result) {
  auto ns_key = AppendNamespacePrefix(digest_name);
  TDigestMetadata metadata;
  {
    // The guard covers the metadata read and the node merge only; it is
    // released before the centroids are dumped.
    LockGuard guard(storage_->GetLockManager(), ns_key);

    auto meta_status = getMetaDataByNsKey(ctx, ns_key, &metadata);
    if (!meta_status.ok()) {
      return meta_status;
    }

    const bool digest_is_empty = (metadata.total_observations == 0);
    if (digest_is_empty) {
      result.resize(inputs.size(), -2);
      return rocksdb::Status::OK();
    }

    auto merge_status = mergeNodes(ctx, ns_key, &metadata);
    if (!merge_status.ok()) {
      return merge_status;
    }
  }

  std::vector<Centroid> centroids;
  auto dump_status = dumpCentroids(ctx, ns_key, metadata, &centroids);
  if (!dump_status.ok()) {
    return dump_status;
  }

  auto centroid_view = DummyCentroids(metadata, centroids);
  if (auto rank_status = TDigestRank(centroid_view, inputs, reverse, result); !rank_status) {
    return rocksdb::Status::InvalidArgument(rank_status.Msg());
  }
  return rocksdb::Status::OK();
}

rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& qs,
TDigestQuantitleResult* result) {
auto ns_key = AppendNamespacePrefix(digest_name);
Expand Down
158 changes: 157 additions & 1 deletion src/types/redis_tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,126 @@
#include "tdigest.h"

namespace redis {

// TODO: This should be replaced by iterating directly with the rocksdb iterator
class DummyCentroids {
public:
// Abstract cursor over centroids. The concrete Iterator and ReverseIterator
// classes in this file implement it for forward and backward traversal.
class BaseIterator {
public:
virtual ~BaseIterator() = default;
// Moves the cursor one step in iteration order.
virtual bool Next() = 0;
// Moves the cursor one step against iteration order.
virtual bool Prev() = 0;
// Tells whether the cursor currently refers to a centroid.
virtual bool Valid() const = 0;
// Creates a separately owned cursor at the same position.
virtual std::unique_ptr<BaseIterator> Clone() const = 0;
// Yields the centroid under the cursor, or an error status.
virtual StatusOr<Centroid> GetCentroid() const = 0;
};

// Binds to the caller's metadata and centroid vector. Both are kept as
// references (meta_data_ and centroids_ are reference members), not copied,
// so they must outlive this object.
DummyCentroids(const TDigestMetadata& meta_data, const std::vector<Centroid>& centroids)
: meta_data_(meta_data), centroids_(centroids) {}
// Walks the centroid vector from front to back.
class Iterator : public BaseIterator {
 public:
  Iterator(std::vector<Centroid>::const_iterator&& iter, const std::vector<Centroid>& centroids)
      : iter_(iter), centroids_(centroids) {}

  // A vector iterator is just a position, so a plain copy already refers to
  // the same element (or to cend()) of the same vector.
  std::unique_ptr<BaseIterator> Clone() const override {
    auto position = iter_;
    return std::make_unique<Iterator>(std::move(position), centroids_);
  }

  // Steps forward unless already exhausted; the result tells whether the
  // cursor still refers to a centroid afterwards.
  bool Next() override {
    if (Valid()) {
      ++iter_;
    }
    return centroids_.cend() != iter_;
  }

  // Prev is only meaningful while the cursor is not at cend(): the iterator
  // has to be inside the valid range before stepping. A cursor on the first
  // centroid stays where it is.
  bool Prev() override {
    const bool can_step_back = Valid() && iter_ != centroids_.cbegin();
    if (can_step_back) {
      --iter_;
    }
    return Valid();
  }

  bool Valid() const override { return !(iter_ == centroids_.cend()); }

  StatusOr<Centroid> GetCentroid() const override {
    const bool exhausted = (iter_ == centroids_.cend());
    if (exhausted) {
      return {::Status::NotOK, "invalid iterator during decoding tdigest centroid"};
    }
    return *iter_;
  }

 private:
  std::vector<Centroid>::const_iterator iter_;
  const std::vector<Centroid>& centroids_;
};

class ReverseIterator final : public BaseIterator {
Copy link
Copy Markdown
Member

@PragmaTwice PragmaTwice Nov 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have something like `template <bool Reverse> class Iterator` instead of virtual functions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, I have refactored the code.

public:
// Starts the cursor at `iter`, a reverse position inside `centroids`.
// The iterator is copied into iter_; the vector is only referenced
// (centroids_ is a reference member), so it must outlive this cursor.
ReverseIterator(std::vector<Centroid>::const_reverse_iterator&& iter, const std::vector<Centroid>& centroids)
: iter_(iter), centroids_(centroids) {}
// Produces a separately owned cursor at the same reverse position. A reverse
// iterator is just a position, so copying it preserves the element it refers
// to, including the crend() case.
std::unique_ptr<BaseIterator> Clone() const override {
  auto position = iter_;
  return std::make_unique<ReverseIterator>(std::move(position), centroids_);
}
// Steps towards crend() unless already exhausted; the result tells whether
// the cursor still refers to a centroid afterwards.
bool Next() override {
  if (Valid()) {
    ++iter_;
  }
  return centroids_.crend() != iter_;
}

// Steps back towards crbegin(). An exhausted cursor stays exhausted and a
// cursor already on crbegin() does not move; the result is Valid() afterwards.
bool Prev() override {
  const bool can_step_back = Valid() && iter_ != centroids_.crbegin();
  if (can_step_back) {
    --iter_;
  }
  return Valid();
}
// True while the cursor has not reached crend(), i.e. it refers to a centroid.
bool Valid() const override { return iter_ != centroids_.crend(); }
// Returns a copy of the centroid under the cursor, or a NotOK status when the
// cursor has run past the last element in reverse order.
StatusOr<Centroid> GetCentroid() const override {
  const bool exhausted = (iter_ == centroids_.crend());
  if (exhausted) {
    return {::Status::NotOK, "invalid iterator during decoding tdigest centroid"};
  }
  return *iter_;
}

private:
std::vector<Centroid>::const_reverse_iterator iter_;
const std::vector<Centroid>& centroids_;
};

// Returns a cursor on the first centroid in iteration order: the front of the
// vector by default, or its back when `reverse` is set.
std::unique_ptr<BaseIterator> Begin(const bool reverse = false) const {
  if (!reverse) {
    return std::make_unique<Iterator>(centroids_.cbegin(), centroids_);
  }
  return std::make_unique<ReverseIterator>(centroids_.crbegin(), centroids_);
}
std::unique_ptr<BaseIterator> End(const bool reverse = false) const {
if (centroids_.empty()) {
if (reverse) {
return std::make_unique<ReverseIterator>(centroids_.crend(), centroids_);
}
return std::make_unique<Iterator>(centroids_.cend(), centroids_);
}
if (reverse) {
return std::make_unique<ReverseIterator>(std::prev(centroids_.crend()), centroids_);
}
return std::make_unique<Iterator>(std::prev(centroids_.cend()), centroids_);
}
// The metadata's total_weight field, converted to double.
double TotalWeight() const { return static_cast<double>(meta_data_.total_weight); }
// The metadata's minimum field.
double Min() const { return meta_data_.minimum; }
// The metadata's maximum field.
double Max() const { return meta_data_.maximum; }
// The metadata's merged_nodes count (read from the metadata, not from the size of the centroid vector).
uint64_t Size() const { return meta_data_.merged_nodes; }

private:
const TDigestMetadata& meta_data_;
const std::vector<Centroid>& centroids_;
};

inline constexpr uint32_t kTDigestMaxCompression = 1000; // limit the compression to 1k

struct CentroidWithKey {
Expand Down Expand Up @@ -77,7 +197,8 @@ class TDigest : public SubKeyScanner {

rocksdb::Status Merge(engine::Context& ctx, const Slice& dest_digest, const std::vector<std::string>& source_digests,
const TDigestMergeOptions& options);
rocksdb::Status Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs, bool reverse,
template <bool reverse>
rocksdb::Status Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>& result);
rocksdb::Status GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata);

Expand Down Expand Up @@ -132,4 +253,39 @@ class TDigest : public SubKeyScanner {
Centroid* centroid) const;
};

template <bool reverse>
Comment thread
PragmaTwice marked this conversation as resolved.
Outdated
rocksdb::Status TDigest::Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>& result) {
auto ns_key = AppendNamespacePrefix(digest_name);
TDigestMetadata metadata;
{
LockGuard guard(storage_->GetLockManager(), ns_key);

if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); !status.ok()) {
return status;
}

if (metadata.total_observations == 0) {
result.resize(inputs.size(), -2);
return rocksdb::Status::OK();
}

if (auto status = mergeNodes(ctx, ns_key, &metadata); !status.ok()) {
return status;
}
}

std::vector<Centroid> centroids;
if (auto status = dumpCentroids(ctx, ns_key, metadata, &centroids); !status.ok()) {
return status;
}

auto dump_centroids = DummyCentroids(metadata, centroids);
auto status = TDigestRank<DummyCentroids&, reverse>(dump_centroids, inputs, result);
Comment thread
PragmaTwice marked this conversation as resolved.
Outdated
if (!status) {
return rocksdb::Status::InvalidArgument(status.Msg());
}
return rocksdb::Status::OK();
}

} // namespace redis
13 changes: 2 additions & 11 deletions src/types/tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ struct DoubleComparator {
};

template <typename TD, bool Reverse>
Comment thread
PragmaTwice marked this conversation as resolved.
Outdated
inline Status TDigestRankImpl(TD&& td, const std::vector<double>& inputs, std::vector<int>& result) {
inline Status TDigestRank(TD&& td, const std::vector<double>& inputs, std::vector<int>& result) {
std::map<double, size_t, DoubleComparator> value_to_index;
for (size_t i = 0; i < inputs.size(); ++i) {
value_to_index[inputs[i]] = i;
Expand Down Expand Up @@ -266,13 +266,4 @@ inline Status TDigestRankImpl(TD&& td, const std::vector<double>& inputs, std::v
}
}
return Status::OK();
}

template <typename TD>
inline Status TDigestRank(TD&& td, const std::vector<double>& inputs, bool reverse, std::vector<int>& result) {
if (reverse) {
return TDigestRankImpl<TD, true>(std::forward<TD>(td), inputs, result);
} else {
return TDigestRankImpl<TD, false>(std::forward<TD>(td), inputs, result);
}
}
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an endline for this file.

Loading