Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4784442
refactor: make all the flags for reverse as a compile-time argument
donghao526 Nov 20, 2025
45281c3
Merge branch 'unstable' into refactor/tdigest-rank
PragmaTwice Nov 21, 2025
b421adb
refactor(tdigest): change variable reverse to Reverse
donghao526 Nov 21, 2025
1ced14d
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Nov 21, 2025
70ec5b8
refactor(tdigest): update TDigestRank template parameters for clarity
donghao526 Nov 23, 2025
c42ed44
refactor(tdigest): update DummyCentroids to use template parameter fo…
donghao526 Nov 23, 2025
5d3a885
style: use the init-statement for calling TDigestRank
donghao526 Nov 24, 2025
3bb2fe6
refactor(tdigest): simplify iterator handling in DummyCentroids and i…
donghao526 Nov 24, 2025
f1c559c
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Nov 24, 2025
7fb6b2f
refactor(tdigest): change get_cbegin_iter and get_cend_iter methods t…
donghao526 Nov 24, 2025
0ed408d
refactor(tdigest): rename function names for consistency
donghao526 Nov 24, 2025
d5cce3f
Merge branch 'unstable' into refactor/tdigest-rank
PragmaTwice Nov 27, 2025
843af3d
refactor(tdigest): make iterator methods static and improve clarity i…
donghao526 Dec 2, 2025
3c935d8
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Dec 2, 2025
48c1f47
Merge branch 'unstable' into refactor/tdigest-rank
donghao526 Dec 2, 2025
4bdb79b
refactor(tdigest): rename iterator methods for consistency and clarity
donghao526 Dec 2, 2025
b6074af
refactor(tdigest): refactor the GetCbeginIter and GetCendIter to deta…
donghao526 Dec 2, 2025
c2894a0
Merge branch 'unstable' into refactor/tdigest-rank
donghao526 Dec 4, 2025
7487397
refactor(tdigest): replace template Rank with separate Rank and RevRa…
donghao526 Dec 4, 2025
0aaa788
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Dec 4, 2025
be9a404
refactor(tdigest): move Rank and RevRank methods above Quantile for b…
donghao526 Dec 4, 2025
8f229ce
refactor(tdigest): remove unused RankImpl template function
donghao526 Dec 4, 2025
e73eae4
refactor(tdigest): extract common preparation logic from Rank and Rev…
donghao526 Dec 4, 2025
69bf01a
refactor(tdigest): remove unnecessary comment indicating special retu…
donghao526 Dec 4, 2025
e1ed70a
refactor(tdigest): fix compile error
donghao526 Dec 4, 2025
798a69d
refactor(tdigest): fix clang-format error
donghao526 Dec 4, 2025
1c3ae7d
refactor(tdigest): fix error when get rank on an empty tdigest
donghao526 Dec 5, 2025
4b37eed
refactor(tdigest): fix some compile error
donghao526 Dec 5, 2025
6911242
Merge branch 'unstable' into refactor/tdigest-rank
donghao526 Dec 5, 2025
c097f8c
refactor(tdigest): use the init-statement to declare "s" inside the i…
donghao526 Dec 6, 2025
f4e5488
Merge branch 'refactor/tdigest-rank' of github.com:donghao526/kvrocks…
donghao526 Dec 6, 2025
a06db87
Merge branch 'unstable' into refactor/tdigest-rank
donghao526 Dec 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/commands/cmd_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class CommandTDigestAdd : public Commander {
std::vector<double> values_;
};

template <bool reverse>
template <bool Reverse>
class TDigestRankCommand : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand All @@ -202,7 +202,16 @@ class TDigestRankCommand : public Commander {
TDigest tdigest(srv->storage, conn->GetNamespace());
std::vector<int> result;
result.reserve(origin_inputs_.size());
if (const auto s = tdigest.Rank(ctx, key_name_, unique_inputs_, reverse, result); !s.ok()) {

if (const auto s =
[&]() {
if constexpr (Reverse) {
return tdigest.RevRank(ctx, key_name_, unique_inputs_, result);
} else {
return tdigest.Rank(ctx, key_name_, unique_inputs_, result);
}
}();
!s.ok()) {
if (s.IsNotFound()) {
return {Status::RedisExecErr, errKeyNotFound};
}
Expand Down
166 changes: 80 additions & 86 deletions src/types/redis_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,114 +47,81 @@

namespace redis {

namespace {
// Returns the iterator at which a traversal of `centroids` starts.
//
// Reverse    - when true the traversal runs back-to-front, so the result is the
//              const reverse-begin iterator; otherwise it is the const begin
//              iterator.
// Container  - any range accepted by std::cbegin / std::crbegin.
// centroids  - the sequence to traverse; it is never modified.
//
// The free functions are used rather than the member functions so the helper
// does not require `Container` to expose cbegin()/crbegin() members itself
// (plain arrays work too).
template <bool Reverse, typename Container>
inline decltype(auto) GetCbeginIter(const Container& centroids) {
  if constexpr (Reverse) {
    return std::crbegin(centroids);
  } else {
    return std::cbegin(centroids);
  }
}

// Returns the past-the-end iterator of a traversal of `centroids`.
//
// Reverse    - when true the traversal runs back-to-front, so the result is the
//              const reverse-end iterator; otherwise it is the const end
//              iterator.
// Container  - any range accepted by std::cend / std::crend.
// centroids  - the sequence to traverse; it is never modified.
//
// The free functions are used rather than the member functions so the helper
// does not require `Container` to expose cend()/crend() members itself
// (plain arrays work too).
template <bool Reverse, typename Container>
inline decltype(auto) GetCendIter(const Container& centroids) {
  if constexpr (Reverse) {
    return std::crend(centroids);
  } else {
    return std::cend(centroids);
  }
}
} // namespace

// TODO: This should be replaced by iteration over the rocksdb iterator
template <bool Reverse>
class DummyCentroids {
public:
class BaseIterator {
public:
virtual ~BaseIterator() = default;
virtual bool Next() = 0;
virtual bool Prev() = 0;
virtual bool Valid() const = 0;
virtual std::unique_ptr<BaseIterator> Clone() const = 0;
virtual StatusOr<Centroid> GetCentroid() const = 0;
};

DummyCentroids(const TDigestMetadata& meta_data, const std::vector<Centroid>& centroids)
: meta_data_(meta_data), centroids_(centroids) {}
class Iterator : public BaseIterator {
class Iterator {
public:
Iterator(std::vector<Centroid>::const_iterator&& iter, const std::vector<Centroid>& centroids)
: iter_(iter), centroids_(centroids) {}
std::unique_ptr<BaseIterator> Clone() const override {
if (iter_ != centroids_.cend()) {
return std::make_unique<Iterator>(std::next(centroids_.cbegin(), std::distance(centroids_.cbegin(), iter_)),
centroids_);
using IterType = std::conditional_t<Reverse, std::vector<Centroid>::const_reverse_iterator,
std::vector<Centroid>::const_iterator>;
Iterator(IterType iter, const std::vector<Centroid>& centroids) : iter_(iter), centroids_(centroids) {}
std::unique_ptr<Iterator> Clone() const {
if (iter_ != GetCendIter<Reverse>(centroids_)) {
return std::make_unique<Iterator>(
std::next(GetCbeginIter<Reverse>(centroids_), std::distance(GetCbeginIter<Reverse>(centroids_), iter_)),
centroids_);
}
return std::make_unique<Iterator>(centroids_.cend(), centroids_);
return std::make_unique<Iterator>(GetCendIter<Reverse>(centroids_), centroids_);
}
bool Next() override {
bool Next() {
if (Valid()) {
std::advance(iter_, 1);
}
return iter_ != centroids_.cend();
return iter_ != GetCendIter<Reverse>(centroids_);
}

// Prev may only be called while the iterator is not at cend,
// because the iterator must already be inside the valid range before it is moved.
bool Prev() override {
if (Valid() && iter_ != centroids_.cbegin()) {
bool Prev() {
if (Valid() && iter_ != GetCendIter<Reverse>(centroids_)) {
std::advance(iter_, -1);
}
return Valid();
}
bool Valid() const override { return iter_ != centroids_.cend(); }
StatusOr<Centroid> GetCentroid() const override {
if (iter_ == centroids_.cend()) {
// True while the iterator has not reached the end position of the traversal.
bool Valid() const { return iter_ != GetCendIter<Reverse>(centroids_); }
// Yields a copy of the centroid the iterator currently points at, or a NotOK
// status when the iterator sits at the end position and has nothing to read.
StatusOr<Centroid> GetCentroid() const {
  if (Valid()) {
    return *iter_;
  }
  return {::Status::NotOK, "invalid iterator during decoding tdigest centroid"};
}

private:
std::vector<Centroid>::const_iterator iter_;
IterType iter_;
const std::vector<Centroid>& centroids_;
};

class ReverseIterator final : public BaseIterator {
public:
ReverseIterator(std::vector<Centroid>::const_reverse_iterator&& iter, const std::vector<Centroid>& centroids)
: iter_(iter), centroids_(centroids) {}
std::unique_ptr<BaseIterator> Clone() const override {
if (iter_ != centroids_.crend()) {
return std::make_unique<ReverseIterator>(
std::next(centroids_.crbegin(), std::distance(centroids_.crbegin(), iter_)), centroids_);
}
return std::make_unique<ReverseIterator>(centroids_.crend(), centroids_);
}
bool Next() override {
if (Valid()) {
std::advance(iter_, 1);
}
return iter_ != centroids_.crend();
}

bool Prev() override {
if (Valid() && iter_ != centroids_.crbegin()) {
std::advance(iter_, -1);
}
return Valid();
}
bool Valid() const override { return iter_ != centroids_.crend(); }
StatusOr<Centroid> GetCentroid() const override {
if (iter_ == centroids_.crend()) {
return {::Status::NotOK, "invalid iterator during decoding tdigest centroid"};
}
return *iter_;
}

private:
std::vector<Centroid>::const_reverse_iterator iter_;
const std::vector<Centroid>& centroids_;
};

std::unique_ptr<BaseIterator> Begin(const bool reverse = false) const {
if (reverse) {
return std::make_unique<ReverseIterator>(centroids_.crbegin(), centroids_);
}
return std::make_unique<Iterator>(centroids_.cbegin(), centroids_);
// Returns a heap-allocated Iterator positioned at the start of the traversal,
// i.e. at GetCbeginIter<Reverse> of the underlying centroid vector.
std::unique_ptr<Iterator> Begin() const {
return std::make_unique<Iterator>(GetCbeginIter<Reverse>(centroids_), centroids_);
}
std::unique_ptr<BaseIterator> End(const bool reverse = false) const {
std::unique_ptr<Iterator> End() const {
if (centroids_.empty()) {
if (reverse) {
return std::make_unique<ReverseIterator>(centroids_.crend(), centroids_);
}
return std::make_unique<Iterator>(centroids_.cend(), centroids_);
}
if (reverse) {
return std::make_unique<ReverseIterator>(std::prev(centroids_.crend()), centroids_);
return std::make_unique<Iterator>(GetCendIter<Reverse>(centroids_), centroids_);
}
return std::make_unique<Iterator>(std::prev(centroids_.cend()), centroids_);
return std::make_unique<Iterator>(std::prev(GetCendIter<Reverse>(centroids_)), centroids_);
}
// Total weight recorded in the metadata, converted to double.
double TotalWeight() const { return static_cast<double>(meta_data_.total_weight); }
// Minimum value recorded in the metadata.
double Min() const { return meta_data_.minimum; }
Expand Down Expand Up @@ -273,10 +240,9 @@
return rocksdb::Status::OK();
}

rocksdb::Status TDigest::Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
bool reverse, std::vector<int>& result) {
rocksdb::Status TDigest::prepareRankData(engine::Context& ctx, const Slice& digest_name, TDigestMetadata& metadata,
std::vector<Centroid>& centroids) {
auto ns_key = AppendNamespacePrefix(digest_name);
TDigestMetadata metadata;
{
LockGuard guard(storage_->GetLockManager(), ns_key);

Expand All @@ -285,23 +251,51 @@
}

if (metadata.total_observations == 0) {
result.resize(inputs.size(), -2);
return rocksdb::Status::OK();
}

if (auto status = mergeNodes(ctx, ns_key, &metadata); !status.ok()) {
return status;
}
}
return dumpCentroids(ctx, ns_key, metadata, &centroids);
}

rocksdb::Status TDigest::Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>& result) {
TDigestMetadata metadata;
std::vector<Centroid> centroids;
if (auto status = dumpCentroids(ctx, ns_key, metadata, &centroids); !status.ok()) {
if (auto status = prepareRankData(ctx, digest_name, metadata, centroids); !status.ok()) {
return status;
}

if (metadata.total_observations == 0) {
result.resize(inputs.size(), -2);
return rocksdb::Status::OK();
}

auto dump_centroids = DummyCentroids<false>(metadata, centroids);
if (auto status = TDigestRank<false>(dump_centroids, inputs, result); !status) {
return rocksdb::Status::InvalidArgument(status.Msg());
}
return rocksdb::Status::OK();
}

rocksdb::Status TDigest::RevRank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>& result) {
TDigestMetadata metadata;
std::vector<Centroid> centroids;
if (auto status = prepareRankData(ctx, digest_name, metadata, centroids); !status.ok()) {
return status;
}

auto dump_centroids = DummyCentroids(metadata, centroids);
auto status = TDigestRank(dump_centroids, inputs, reverse, result);
if (!status) {
if (metadata.total_observations == 0) {
result.resize(inputs.size(), -2);
return rocksdb::Status::OK();
}

auto dump_centroids = DummyCentroids<true>(metadata, centroids);
if (auto status = TDigestRank<true>(dump_centroids, inputs, result); !status) {
return rocksdb::Status::InvalidArgument(status.Msg());
}
return rocksdb::Status::OK();
Expand Down Expand Up @@ -332,7 +326,7 @@
return status;
}

auto dump_centroids = DummyCentroids(metadata, centroids);
auto dump_centroids = DummyCentroids<false>(metadata, centroids);

auto quantile_results = std::vector<double>();
quantile_results.reserve(qs.size());
Expand Down
7 changes: 5 additions & 2 deletions src/types/redis_tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ class TDigest : public SubKeyScanner {

rocksdb::Status Merge(engine::Context& ctx, const Slice& dest_digest, const std::vector<std::string>& source_digests,
const TDigestMergeOptions& options);
rocksdb::Status Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs, bool reverse,
rocksdb::Status Rank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>& result);
rocksdb::Status RevRank(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs,
std::vector<int>& result);
rocksdb::Status GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata);

private:
Expand Down Expand Up @@ -130,6 +132,7 @@ class TDigest : public SubKeyScanner {
static std::string internalValueFromCentroid(const Centroid& centroid);
rocksdb::Status decodeCentroidFromKeyValue(const rocksdb::Slice& key, const rocksdb::Slice& value,
Centroid* centroid) const;
rocksdb::Status prepareRankData(engine::Context& ctx, const Slice& digest_name, TDigestMetadata& metadata,
std::vector<Centroid>& centroids);
};

} // namespace redis
15 changes: 3 additions & 12 deletions src/types/tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ struct DoubleComparator {
bool operator()(const double& a, const double& b) const { return DoubleCompare(a, b) == -1; }
};

template <typename TD, bool Reverse>
inline Status TDigestRankImpl(TD&& td, const std::vector<double>& inputs, std::vector<int>& result) {
template <bool Reverse, typename TD>
inline Status TDigestRank(TD&& td, const std::vector<double>& inputs, std::vector<int>& result) {
std::map<double, size_t, DoubleComparator> value_to_index;
for (size_t i = 0; i < inputs.size(); ++i) {
value_to_index[inputs[i]] = i;
Expand Down Expand Up @@ -211,7 +211,7 @@ inline Status TDigestRankImpl(TD&& td, const std::vector<double>& inputs, std::v
}
}

auto iter = td.Begin(Reverse);
auto iter = td.Begin();
double cumulative_weight = 0;
while (iter->Valid() && !is_end()) {
auto centroid = GET_OR_RET(iter->GetCentroid());
Expand Down Expand Up @@ -267,12 +267,3 @@ inline Status TDigestRankImpl(TD&& td, const std::vector<double>& inputs, std::v
}
return Status::OK();
}

template <typename TD>
inline Status TDigestRank(TD&& td, const std::vector<double>& inputs, bool reverse, std::vector<int>& result) {
if (reverse) {
return TDigestRankImpl<TD, true>(std::forward<TD>(td), inputs, result);
} else {
return TDigestRankImpl<TD, false>(std::forward<TD>(td), inputs, result);
}
}
Loading
Loading