Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4784442
refactor: make all the flags for reverse as a compile-time argument
donghao526 Nov 20, 2025
45281c3
Merge branch 'unstable' into refactor/tdigest-rank
PragmaTwice Nov 21, 2025
b421adb
refactor(tdigest): change variable reverse to Reverse
donghao526 Nov 21, 2025
1ced14d
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Nov 21, 2025
70ec5b8
refactor(tdigest): update TDigestRank template parameters for clarity
donghao526 Nov 23, 2025
c42ed44
refactor(tdigest): update DummyCentroids to use template parameter fo…
donghao526 Nov 23, 2025
5d3a885
style: use the init-statement for calling TDigestRank
donghao526 Nov 24, 2025
3bb2fe6
refactor(tdigest): simplify iterator handling in DummyCentroids and i…
donghao526 Nov 24, 2025
f1c559c
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Nov 24, 2025
7fb6b2f
refactor(tdigest): change get_cbegin_iter and get_cend_iter methods t…
donghao526 Nov 24, 2025
0ed408d
refactor(tdigest): rename function names for consistency
donghao526 Nov 24, 2025
d5cce3f
Merge branch 'unstable' into refactor/tdigest-rank
PragmaTwice Nov 27, 2025
843af3d
refactor(tdigest): make iterator methods static and improve clarity i…
donghao526 Dec 2, 2025
3c935d8
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Dec 2, 2025
48c1f47
Merge branch 'unstable' into refactor/tdigest-rank
donghao526 Dec 2, 2025
4bdb79b
refactor(tdigest): rename iterator methods for consistency and clarity
donghao526 Dec 2, 2025
b6074af
refactor(tdigest): refactor the GetCbeginIter and GetCendIter to deta…
donghao526 Dec 2, 2025
c2894a0
Merge branch 'unstable' into refactor/tdigest-rank
donghao526 Dec 4, 2025
7487397
refactor(tdigest): replace template Rank with separate Rank and RevRa…
donghao526 Dec 4, 2025
0aaa788
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Dec 4, 2025
be9a404
refactor(tdigest): move Rank and RevRank methods above Quantile for b…
donghao526 Dec 4, 2025
8f229ce
refactor(tdigest): remove unused RankImpl template function
donghao526 Dec 4, 2025
e73eae4
refactor(tdigest): extract common preparation logic from Rank and Rev…
donghao526 Dec 4, 2025
69bf01a
refactor(tdigest): remove unnecessary comment indicating special retu…
donghao526 Dec 4, 2025
e1ed70a
refactor(tdigest): fix compile error
donghao526 Dec 4, 2025
798a69d
refactor(tdigest): fix clang-format error
donghao526 Dec 4, 2025
1c3ae7d
refactor(tdigest): fix error when get rank on an empty tdigest
donghao526 Dec 5, 2025
4b37eed
refactor(tdigest): fix some compile error
donghao526 Dec 5, 2025
6911242
Merge branch 'unstable' into refactor/tdigest-rank
donghao526 Dec 5, 2025
c097f8c
refactor(tdigest): use the init-statement to declare "s" inside the i…
donghao526 Dec 6, 2025
f4e5488
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Dec 6, 2025
a06db87
Merge branch 'unstable' into refactor/tdigest-rank
donghao526 Dec 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/commands/cmd_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class CommandTDigestAdd : public Commander {
std::vector<double> values_;
};

template <bool reverse>
template <bool Reverse>
class TDigestRankCommand : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand All @@ -202,7 +202,7 @@ class TDigestRankCommand : public Commander {
TDigest tdigest(srv->storage, conn->GetNamespace());
std::vector<int> result;
result.reserve(origin_inputs_.size());
if (const auto s = tdigest.Rank(ctx, key_name_, unique_inputs_, reverse, result); !s.ok()) {
if (const auto s = tdigest.Rank<Reverse>(ctx, key_name_, unique_inputs_, result); !s.ok()) {
if (s.IsNotFound()) {
return {Status::RedisExecErr, errKeyNotFound};
}
Expand Down
155 changes: 1 addition & 154 deletions src/types/redis_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,125 +47,6 @@

namespace redis {

// TODO: It should be replaced by an iteration of the rocksdb iterator
class DummyCentroids {
public:
class BaseIterator {
public:
virtual ~BaseIterator() = default;
virtual bool Next() = 0;
virtual bool Prev() = 0;
virtual bool Valid() const = 0;
virtual std::unique_ptr<BaseIterator> Clone() const = 0;
virtual StatusOr<Centroid> GetCentroid() const = 0;
};

DummyCentroids(const TDigestMetadata& meta_data, const std::vector<Centroid>& centroids)
: meta_data_(meta_data), centroids_(centroids) {}
class Iterator : public BaseIterator {
public:
Iterator(std::vector<Centroid>::const_iterator&& iter, const std::vector<Centroid>& centroids)
: iter_(iter), centroids_(centroids) {}
std::unique_ptr<BaseIterator> Clone() const override {
if (iter_ != centroids_.cend()) {
return std::make_unique<Iterator>(std::next(centroids_.cbegin(), std::distance(centroids_.cbegin(), iter_)),
centroids_);
}
return std::make_unique<Iterator>(centroids_.cend(), centroids_);
}
bool Next() override {
if (Valid()) {
std::advance(iter_, 1);
}
return iter_ != centroids_.cend();
}

// The Prev function can only be called for item is not cend,
// because we must guarantee the iterator to be inside the valid range before iteration.
bool Prev() override {
if (Valid() && iter_ != centroids_.cbegin()) {
std::advance(iter_, -1);
}
return Valid();
}
bool Valid() const override { return iter_ != centroids_.cend(); }
StatusOr<Centroid> GetCentroid() const override {
if (iter_ == centroids_.cend()) {
return {::Status::NotOK, "invalid iterator during decoding tdigest centroid"};
}
return *iter_;
}

private:
std::vector<Centroid>::const_iterator iter_;
const std::vector<Centroid>& centroids_;
};

class ReverseIterator final : public BaseIterator {
public:
ReverseIterator(std::vector<Centroid>::const_reverse_iterator&& iter, const std::vector<Centroid>& centroids)
: iter_(iter), centroids_(centroids) {}
std::unique_ptr<BaseIterator> Clone() const override {
if (iter_ != centroids_.crend()) {
return std::make_unique<ReverseIterator>(
std::next(centroids_.crbegin(), std::distance(centroids_.crbegin(), iter_)), centroids_);
}
return std::make_unique<ReverseIterator>(centroids_.crend(), centroids_);
}
bool Next() override {
if (Valid()) {
std::advance(iter_, 1);
}
return iter_ != centroids_.crend();
}

bool Prev() override {
if (Valid() && iter_ != centroids_.crbegin()) {
std::advance(iter_, -1);
}
return Valid();
}
bool Valid() const override { return iter_ != centroids_.crend(); }
StatusOr<Centroid> GetCentroid() const override {
if (iter_ == centroids_.crend()) {
return {::Status::NotOK, "invalid iterator during decoding tdigest centroid"};
}
return *iter_;
}

private:
std::vector<Centroid>::const_reverse_iterator iter_;
const std::vector<Centroid>& centroids_;
};

std::unique_ptr<BaseIterator> Begin(const bool reverse = false) const {
if (reverse) {
return std::make_unique<ReverseIterator>(centroids_.crbegin(), centroids_);
}
return std::make_unique<Iterator>(centroids_.cbegin(), centroids_);
}
std::unique_ptr<BaseIterator> End(const bool reverse = false) const {
if (centroids_.empty()) {
if (reverse) {
return std::make_unique<ReverseIterator>(centroids_.crend(), centroids_);
}
return std::make_unique<Iterator>(centroids_.cend(), centroids_);
}
if (reverse) {
return std::make_unique<ReverseIterator>(std::prev(centroids_.crend()), centroids_);
}
return std::make_unique<Iterator>(std::prev(centroids_.cend()), centroids_);
}
double TotalWeight() const { return static_cast<double>(meta_data_.total_weight); }
double Min() const { return meta_data_.minimum; }
double Max() const { return meta_data_.maximum; }
uint64_t Size() const { return meta_data_.merged_nodes; }

private:
const TDigestMetadata& meta_data_;
const std::vector<Centroid>& centroids_;
};

uint32_t constexpr kMaxElements = 1 * 1024; // 1k doubles

rocksdb::Status TDigest::Create(engine::Context& ctx, const Slice& digest_name, const TDigestCreateOptions& options,
Expand Down Expand Up @@ -273,40 +154,6 @@ rocksdb::Status TDigest::mergeNodes(engine::Context& ctx, const std::string& ns_
return rocksdb::Status::OK();
}

// Computes the rank of every value in `inputs` against the digest stored under
// `digest_name` and writes the ranks into `result`. `reverse` is forwarded to
// TDigestRank to select the reverse ranking.
//
// Metadata is loaded and pending nodes are merged under the key lock; the
// centroids are dumped and ranked after the lock has been released.
// An empty digest (no observations) fills `result` with -2 for every input.
// Storage errors are returned unchanged; a ranking failure is reported as
// InvalidArgument carrying the ranking error message.
rocksdb::Status TDigest::Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
                              bool reverse, std::vector<int>& result) {
  auto ns_key = AppendNamespacePrefix(digest_name);
  TDigestMetadata metadata;
  {
    LockGuard guard(storage_->GetLockManager(), ns_key);

    auto meta_status = getMetaDataByNsKey(ctx, ns_key, &metadata);
    if (!meta_status.ok()) {
      return meta_status;
    }

    // Nothing has been observed yet: every input gets the sentinel rank.
    if (metadata.total_observations == 0) {
      result.resize(inputs.size(), -2);
      return rocksdb::Status::OK();
    }

    auto merge_status = mergeNodes(ctx, ns_key, &metadata);
    if (!merge_status.ok()) {
      return merge_status;
    }
  }

  std::vector<Centroid> centroids;
  auto dump_status = dumpCentroids(ctx, ns_key, metadata, &centroids);
  if (!dump_status.ok()) {
    return dump_status;
  }

  auto centroid_view = DummyCentroids(metadata, centroids);
  if (auto rank_status = TDigestRank(centroid_view, inputs, reverse, result); !rank_status) {
    return rocksdb::Status::InvalidArgument(rank_status.Msg());
  }
  return rocksdb::Status::OK();
}

rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& qs,
TDigestQuantitleResult* result) {
auto ns_key = AppendNamespacePrefix(digest_name);
Expand All @@ -332,7 +179,7 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name
return status;
}

auto dump_centroids = DummyCentroids(metadata, centroids);
auto dump_centroids = DummyCentroids<false>(metadata, centroids);

auto quantile_results = std::vector<double>();
quantile_results.reserve(qs.size());
Expand Down
132 changes: 131 additions & 1 deletion src/types/redis_tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,101 @@
#include "tdigest.h"

namespace redis {

// TODO: It should be replaced by an iteration of the rocksdb iterator

Check warning on line 37 in src/types/redis_tdigest.h

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Complete the task associated to this "TODO" comment.

See more on https://sonarcloud.io/project/issues?id=apache_kvrocks&issues=AZquNwderlS43wKvYAG1&open=AZquNwderlS43wKvYAG1&pullRequest=3268
template <bool Reverse>
class DummyCentroids {
public:
DummyCentroids(const TDigestMetadata& meta_data, const std::vector<Centroid>& centroids)
: meta_data_(meta_data), centroids_(centroids) {}
class Iterator {
public:
using IterType = std::conditional_t<Reverse, std::vector<Centroid>::const_reverse_iterator,
std::vector<Centroid>::const_iterator>;
Iterator(IterType iter, const std::vector<Centroid>& centroids) : iter_(iter), centroids_(centroids) {}
// Returns a newly allocated iterator positioned exactly where this one is
// (including the end position) and bound to the same centroid vector.
// Copying `iter_` directly is equivalent to re-deriving it via
// begin + distance(begin, iter_), and needs no special case for the end position.
std::unique_ptr<Iterator> Clone() const {
  return std::make_unique<Iterator>(iter_, centroids_);
}
// Moves one element forward in the traversal direction when the iterator is
// currently valid. Returns true if it still points at an element afterwards.
bool Next() {
  if (!Valid()) {
    return false;
  }
  ++iter_;
  return Valid();
}

// The Prev function can only be called for item is not cend,
// because we must guarantee the iterator to be inside the valid range before iteration.
bool Prev() {
if (Valid() && iter_ != getCendIter(centroids_)) {
std::advance(iter_, -1);
}
return Valid();
}
// True while the iterator has not reached the end position of its traversal direction.
bool Valid() const { return iter_ != getCendIter(centroids_); }
// Returns a copy of the centroid under the iterator, or a NotOK status when
// the iterator is at the end position.
StatusOr<Centroid> GetCentroid() const {
  if (Valid()) {
    return *iter_;
  }
  return {::Status::NotOK, "invalid iterator during decoding tdigest centroid"};
}

private:
IterType iter_;
const std::vector<Centroid>& centroids_;
template <typename Container>

Check warning on line 81 in src/types/redis_tdigest.h

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Prefer free functions over member functions when handling objects of generic type "Container".

See more on https://sonarcloud.io/project/issues?id=apache_kvrocks&issues=AZq_HL075jIPpto-0u7a&open=AZq_HL075jIPpto-0u7a&pullRequest=3268
// First position of the traversal: cbegin() for the forward instantiation,
// crbegin() when the enclosing template is instantiated with Reverse == true.
decltype(auto) getCbeginIter(const Container& centroids) const {
  if constexpr (!Reverse) {
    return centroids.cbegin();
  } else {
    return centroids.crbegin();
  }
}

template <typename Container>

Check warning on line 90 in src/types/redis_tdigest.h

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Prefer free functions over member functions when handling objects of generic type "Container".

See more on https://sonarcloud.io/project/issues?id=apache_kvrocks&issues=AZq_HL075jIPpto-0u7b&open=AZq_HL075jIPpto-0u7b&pullRequest=3268
// Past-the-end position of the traversal: cend() for the forward instantiation,
// crend() when the enclosing template is instantiated with Reverse == true.
decltype(auto) getCendIter(const Container& centroids) const {
  if constexpr (!Reverse) {
    return centroids.cend();
  } else {
    return centroids.crend();
  }
}
};

std::unique_ptr<Iterator> Begin() const {
if constexpr (Reverse) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if constexpr (Reverse) {
return std::make_unique<Iterator>(getCbeginIter(centroids_), centroids_);

Maybe here could also use the util.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the Begin and End functions are outside the Iterator class, I made getCbeginIter and getCendIter static. Is this a good solution?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the getbegin and getend are both stateless, static is ok.
maybe they could be a free inline function protected by a detail private namespace if you prefer this way.

return std::make_unique<Iterator>(centroids_.crbegin(), centroids_);
} else {
return std::make_unique<Iterator>(centroids_.cbegin(), centroids_);
}
}
std::unique_ptr<Iterator> End() const {
if (centroids_.empty()) {
if constexpr (Reverse) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

return std::make_unique<Iterator>(centroids_.crend(), centroids_);
} else {
return std::make_unique<Iterator>(centroids_.cend(), centroids_);
}
}
if constexpr (Reverse) {
return std::make_unique<Iterator>(std::prev(centroids_.crend()), centroids_);
} else {
return std::make_unique<Iterator>(std::prev(centroids_.cend()), centroids_);
}
}
// Read-only accessors over the referenced digest metadata.
// Total weight from the metadata, converted to double.
double TotalWeight() const { return static_cast<double>(meta_data_.total_weight); }
// Minimum value stored in the metadata.
double Min() const { return meta_data_.minimum; }
// Maximum value stored in the metadata.
double Max() const { return meta_data_.maximum; }
// Merged-node count stored in the metadata.
uint64_t Size() const { return meta_data_.merged_nodes; }

private:
const TDigestMetadata& meta_data_;
const std::vector<Centroid>& centroids_;
};

inline constexpr uint32_t kTDigestMaxCompression = 1000; // limit the compression to 1k

struct CentroidWithKey {
Expand Down Expand Up @@ -77,7 +172,8 @@

rocksdb::Status Merge(engine::Context& ctx, const Slice& dest_digest, const std::vector<std::string>& source_digests,
const TDigestMergeOptions& options);
rocksdb::Status Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs, bool reverse,
template <bool Reverse>
rocksdb::Status Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>& result);
rocksdb::Status GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata);

Expand Down Expand Up @@ -132,4 +228,38 @@
Centroid* centroid) const;
};

template <bool Reverse>
rocksdb::Status TDigest::Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think maybe we can just expose two non-template functions, TDigest::Rank and TDigest::RevRank, so that we can put the definition of this function into the .cc file instead of the header?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary in this PR.

std::vector<int>& result) {
auto ns_key = AppendNamespacePrefix(digest_name);
TDigestMetadata metadata;
{
LockGuard guard(storage_->GetLockManager(), ns_key);

if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); !status.ok()) {
return status;
}

if (metadata.total_observations == 0) {
result.resize(inputs.size(), -2);
return rocksdb::Status::OK();
}

if (auto status = mergeNodes(ctx, ns_key, &metadata); !status.ok()) {
return status;
}
}

std::vector<Centroid> centroids;
if (auto status = dumpCentroids(ctx, ns_key, metadata, &centroids); !status.ok()) {
return status;
}

auto dump_centroids = DummyCentroids<Reverse>(metadata, centroids);
if (auto status = TDigestRank<Reverse>(dump_centroids, inputs, result); !status) {
return rocksdb::Status::InvalidArgument(status.Msg());
}
return rocksdb::Status::OK();
}

} // namespace redis
15 changes: 3 additions & 12 deletions src/types/tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ struct DoubleComparator {
bool operator()(const double& a, const double& b) const { return DoubleCompare(a, b) == -1; }
};

template <typename TD, bool Reverse>
inline Status TDigestRankImpl(TD&& td, const std::vector<double>& inputs, std::vector<int>& result) {
template <bool Reverse, typename TD>
inline Status TDigestRank(TD&& td, const std::vector<double>& inputs, std::vector<int>& result) {
std::map<double, size_t, DoubleComparator> value_to_index;
for (size_t i = 0; i < inputs.size(); ++i) {
value_to_index[inputs[i]] = i;
Expand Down Expand Up @@ -211,7 +211,7 @@ inline Status TDigestRankImpl(TD&& td, const std::vector<double>& inputs, std::v
}
}

auto iter = td.Begin(Reverse);
auto iter = td.Begin();
double cumulative_weight = 0;
while (iter->Valid() && !is_end()) {
auto centroid = GET_OR_RET(iter->GetCentroid());
Expand Down Expand Up @@ -267,12 +267,3 @@ inline Status TDigestRankImpl(TD&& td, const std::vector<double>& inputs, std::v
}
return Status::OK();
}

// Runtime entry point for ranking: maps the `reverse` flag onto the matching
// compile-time instantiation of TDigestRankImpl and forwards all arguments.
template <typename TD>
inline Status TDigestRank(TD&& td, const std::vector<double>& inputs, bool reverse, std::vector<int>& result) {
  return reverse ? TDigestRankImpl<TD, true>(std::forward<TD>(td), inputs, result)
                 : TDigestRankImpl<TD, false>(std::forward<TD>(td), inputs, result);
}
Loading
Loading