@@ -57,6 +57,7 @@ unstaging_state::notify_unstage_error()
5757{
5858 std::lock_guard lock (mutex_);
5959 abort_ = true ;
60+ in_flight_count_--;
6061 cv_.notify_all ();
6162}
6263
@@ -205,36 +206,48 @@ staged_mutation_queue::commit(attempt_context_impl* ctx)
205206 }
206207
207208 auto barrier = std::make_shared<std::promise<void >>();
208- futures.push_back (barrier->get_future ());
209-
210- auto timer = std::make_shared<asio::steady_timer>(ctx->cluster_ref ()->io_context ());
211- async_constant_delay delay (timer);
209+ auto future = barrier->get_future ();
212210
213- switch (item.type ()) {
214- case staged_mutation_type::REMOVE:
215- remove_doc (ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
216- if (exc) {
217- state.notify_unstage_error ();
218- barrier->set_exception (exc);
219- } else {
220- state.notify_unstage_complete ();
221- barrier->set_value ();
222- }
223- });
224- break ;
225- case staged_mutation_type::INSERT:
226- case staged_mutation_type::REPLACE:
227- commit_doc (ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
228- if (exc) {
229- state.notify_unstage_error ();
230- barrier->set_exception (exc);
231- } else {
232- state.notify_unstage_complete ();
233- barrier->set_value ();
234- }
235- });
236- break ;
211+ try {
212+ auto timer = std::make_shared<asio::steady_timer>(ctx->cluster_ref ()->io_context ());
213+ async_constant_delay delay (timer);
214+
215+ switch (item.type ()) {
216+ case staged_mutation_type::REMOVE:
217+ remove_doc (ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
218+ if (exc) {
219+ state.notify_unstage_error ();
220+ barrier->set_exception (exc);
221+ } else {
222+ state.notify_unstage_complete ();
223+ barrier->set_value ();
224+ }
225+ });
226+ break ;
227+ case staged_mutation_type::INSERT:
228+ case staged_mutation_type::REPLACE:
229+ commit_doc (ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
230+ if (exc) {
231+ state.notify_unstage_error ();
232+ barrier->set_exception (exc);
233+ } else {
234+ state.notify_unstage_complete ();
235+ barrier->set_value ();
236+ }
237+ });
238+ break ;
239+ }
240+ } catch (...) {
241+ // This should not happen, but catching it to ensure that we wait for in-flight operations
242+ CB_ATTEMPT_CTX_LOG_ERROR (ctx,
243+ " caught exception while trying to initiate commit for {}. Aborting rest of commit and waiting for "
244+ " in-flight rollback operations to finish" ,
245+ item.doc ().id ());
246+ aborted = true ;
247+ break ;
237248 }
249+
250+ futures.push_back (std::move (future));
238251 }
239252
240253 std::exception_ptr exc{};
@@ -276,36 +289,48 @@ staged_mutation_queue::rollback(attempt_context_impl* ctx)
276289 }
277290
278291 auto barrier = std::make_shared<std::promise<void >>();
279- futures. push_back ( barrier->get_future () );
292+ auto future = barrier->get_future ();
280293
281- auto timer = std::make_shared<asio::steady_timer>(ctx->cluster_ref ()->io_context ());
282- async_exp_delay delay (timer);
283-
284- switch (item.type ()) {
285- case staged_mutation_type::INSERT:
286- rollback_insert (ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
287- if (exc) {
288- state.notify_unstage_error ();
289- barrier->set_exception (exc);
290- } else {
291- state.notify_unstage_complete ();
292- barrier->set_value ();
293- }
294- });
295- break ;
296- case staged_mutation_type::REMOVE:
297- case staged_mutation_type::REPLACE:
298- rollback_remove_or_replace (ctx, item, delay, [&state, barrier](std::exception_ptr exc) {
299- if (exc) {
300- state.notify_unstage_error ();
301- barrier->set_exception (exc);
302- } else {
303- state.notify_unstage_complete ();
304- barrier->set_value ();
305- }
306- });
307- break ;
294+ try {
295+ auto timer = std::make_shared<asio::steady_timer>(ctx->cluster_ref ()->io_context ());
296+ async_exp_delay delay (timer);
297+
298+ switch (item.type ()) {
299+ case staged_mutation_type::INSERT:
300+ rollback_insert (ctx, item, delay, [&state, barrier](const std::exception_ptr& exc) {
301+ if (exc) {
302+ state.notify_unstage_error ();
303+ barrier->set_exception (exc);
304+ } else {
305+ state.notify_unstage_complete ();
306+ barrier->set_value ();
307+ }
308+ });
309+ break ;
310+ case staged_mutation_type::REMOVE:
311+ case staged_mutation_type::REPLACE:
312+ rollback_remove_or_replace (ctx, item, delay, [&state, barrier](std::exception_ptr exc) {
313+ if (exc) {
314+ state.notify_unstage_error ();
315+ barrier->set_exception (exc);
316+ } else {
317+ state.notify_unstage_complete ();
318+ barrier->set_value ();
319+ }
320+ });
321+ break ;
322+ }
323+ } catch (...) {
324+ // This should not happen, but catching it to ensure that we wait for in-flight operations
325+ CB_ATTEMPT_CTX_LOG_ERROR (ctx,
326+ " caught exception while trying to initiate rollback for {}. Aborting rollback and waiting for "
327+ " in-flight rollback operations to finish" ,
328+ item.doc ().id ());
329+ aborted = true ;
330+ break ;
308331 }
332+
333+ futures.push_back (std::move (future));
309334 }
310335
311336 std::exception_ptr exc{};
0 commit comments