Skip to content

Commit ab6670a

Browse files
committed
Transactions: Implement ExtParallelUnstaging
1 parent 2b701c3 commit ab6670a

File tree

8 files changed

+670
-187
lines changed

8 files changed

+670
-187
lines changed

core/meta/features.hxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,5 @@
7171
* couchbase::collection::query_indexes() support
7272
*/
7373
#define COUCHBASE_CXX_CLIENT_HAS_COLLECTION_QUERY_INDEX_MANAGEMENT 1
74+
75+
#define COUCHBASE_CXX_CLIENT_TRANSACTIONS_EXT_PARALLEL_UNSTAGING

core/transactions/attempt_context_impl.hxx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,11 @@ class attempt_context_impl
439439
return is_done_;
440440
}
441441

442+
[[nodiscard]] transaction_context& overall()
443+
{
444+
return overall_;
445+
}
446+
442447
[[nodiscard]] const std::string& transaction_id()
443448
{
444449
return overall_.transaction_id();

core/transactions/forward_compat.hxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ struct forward_compat_supported {
123123
uint32_t protocol_major = 2;
124124
uint32_t protocol_minor = 0;
125125
std::list<std::string> extensions{ "TI", "MO", "BM", "QU", "SD", "BF3787", "BF3705", "BF3838", "RC",
126-
"UA", "CO", "BF3791", "CM", "SI", "QC", "IX", "TS" };
126+
"UA", "CO", "BF3791", "CM", "SI", "QC", "IX", "TS", "PU" };
127127
};
128128

129129
struct forward_compat_requirement {

core/transactions/internal/utils.hxx

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
#include <string>
3131
#include <thread>
3232

33+
#include <asio/steady_timer.hpp>
34+
#include <utility>
35+
3336
namespace couchbase::core::transactions
3437
{
3538
// returns the parsed server time from the result of a lookup_in_spec::get(subdoc::lookup_in_macro::vbucket).xattr() call
@@ -82,6 +85,9 @@ wrap_durable_request(T&& req, const couchbase::transactions::transactions_config
8285
return req;
8386
}
8487

88+
void
89+
validate_operation_result(result& res, bool ignore_subdoc_errors = true);
90+
8591
result
8692
wrap_operation_future(std::future<result>& fut, bool ignore_subdoc_errors = true);
8793

@@ -300,6 +306,81 @@ struct constant_delay {
300306
}
301307
};
302308

309+
struct async_exp_delay {
310+
std::shared_ptr<asio::steady_timer> timer;
311+
std::chrono::microseconds initial_delay;
312+
std::chrono::microseconds max_delay;
313+
std::chrono::microseconds timeout;
314+
mutable std::uint32_t retries;
315+
mutable std::optional<std::chrono::time_point<std::chrono::steady_clock>> end_time;
316+
317+
template<typename R1, typename P1, typename R2, typename P2, typename R3, typename P3>
318+
async_exp_delay(std::shared_ptr<asio::steady_timer> timer,
319+
std::chrono::duration<R1, P1> initial,
320+
std::chrono::duration<R2, P2> max,
321+
std::chrono::duration<R3, P3> limit)
322+
: timer(std::move(timer))
323+
, initial_delay(std::chrono::duration_cast<std::chrono::microseconds>(initial))
324+
, max_delay(std::chrono::duration_cast<std::chrono::microseconds>(max))
325+
, timeout(std::chrono::duration_cast<std::chrono::microseconds>(limit))
326+
, retries(0)
327+
, end_time()
328+
{
329+
}
330+
331+
void operator()(utils::movable_function<void(std::error_code)> callback) const
332+
{
333+
auto now = std::chrono::steady_clock::now();
334+
if (!end_time) {
335+
end_time = std::chrono::steady_clock::now() + timeout;
336+
return;
337+
}
338+
if (now > *end_time) {
339+
throw retry_operation_timeout("timed out");
340+
}
341+
auto delay = std::chrono::duration_cast<std::chrono::microseconds>(initial_delay * (jitter() * pow(2, retries++)));
342+
if (delay > max_delay) {
343+
delay = max_delay;
344+
}
345+
if (now + delay > *end_time) {
346+
timer->expires_after(*end_time - now);
347+
} else {
348+
timer->expires_after(delay);
349+
}
350+
timer->async_wait(callback);
351+
}
352+
};
353+
354+
struct async_constant_delay {
355+
std::shared_ptr<asio::steady_timer> timer;
356+
std::chrono::microseconds delay;
357+
std::size_t max_retries;
358+
std::size_t retries;
359+
360+
template<typename R, typename P>
361+
async_constant_delay(std::shared_ptr<asio::steady_timer> timer, std::chrono::duration<R, P> d, std::size_t max)
362+
: timer(std::move(timer))
363+
, delay(std::chrono::duration_cast<std::chrono::microseconds>(d))
364+
, max_retries(max)
365+
, retries(0)
366+
{
367+
}
368+
369+
explicit async_constant_delay(std::shared_ptr<asio::steady_timer> timer)
370+
: async_constant_delay(std::move(timer), DEFAULT_RETRY_OP_DELAY, DEFAULT_RETRY_OP_MAX_RETRIES)
371+
{
372+
}
373+
374+
void operator()(utils::movable_function<void(std::error_code)> callback)
375+
{
376+
if (retries++ >= max_retries) {
377+
throw retry_operation_retries_exhausted("retries exhausted");
378+
}
379+
timer->expires_after(delay);
380+
timer->async_wait(callback);
381+
}
382+
};
383+
303384
std::list<std::string>
304385
get_and_open_buckets(std::shared_ptr<core::cluster> c);
305386

0 commit comments

Comments
 (0)