Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion ydb/core/protos/counters_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ enum ESimpleCounters {
COUNTER_DELAYED_PROPOSE_QUEUE_SIZE = 22 [(CounterOpts) = {Name: "DelayedProposeQueueSize"}];
COUNTER_WAITING_TX_QUEUE_SIZE = 23 [(CounterOpts) = {Name: "WaitingTxQueueSize"}];
COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY = 24 [(CounterOpts) = {Name: "ChangeQueueReservedCapacity"}];
COUNTER_VOLATILE_TX_INFLIGHT = 25 [(CounterOpts) = {Name: "VolatileTxInFlight"}];
COUNTER_VOLATILE_TX_WAITING_COUNT = 26 [(CounterOpts) = {Name: "VolatileTxWaitingCount"}];
COUNTER_VOLATILE_TX_COMMITTED_COUNT = 27 [(CounterOpts) = {Name: "VolatileTxCommittedCount"}];
COUNTER_VOLATILE_TX_ABORTING_COUNT = 28 [(CounterOpts) = {Name: "VolatileTxAbortingCount"}];
}

enum ECumulativeCounters {
Expand Down Expand Up @@ -403,7 +407,37 @@ enum EPercentileCounters {

COUNTER_WRITE_EXEC_LATENCY = 21 [(CounterOpts) = {
Name: "WriteExecLatency"
}];
}];

COUNTER_VOLATILE_TX_WAIT_LATENCY_MS = 22 [(CounterOpts) = {
Name: "VolatileTxWaitLatencyMs"
Ranges: { Value: 0 Name: "0"},
Ranges: { Value: 1 Name: "1"},
Ranges: { Value: 2 Name: "2"},
Ranges: { Value: 5 Name: "5"},
Ranges: { Value: 10 Name: "10"},
Ranges: { Value: 25 Name: "25"},
Ranges: { Value: 50 Name: "50"},
Ranges: { Value: 125 Name: "125"},
Ranges: { Value: 250 Name: "250"},
Ranges: { Value: 500 Name: "500"},
Ranges: { Value: 1000 Name: "1000"},
}];

COUNTER_VOLATILE_TX_TOTAL_LATENCY_MS = 23 [(CounterOpts) = {
Name: "VolatileTxTotalLatencyMs"
Ranges: { Value: 0 Name: "0"},
Ranges: { Value: 1 Name: "1"},
Ranges: { Value: 2 Name: "2"},
Ranges: { Value: 5 Name: "5"},
Ranges: { Value: 10 Name: "10"},
Ranges: { Value: 25 Name: "25"},
Ranges: { Value: 50 Name: "50"},
Ranges: { Value: 125 Name: "125"},
Ranges: { Value: 250 Name: "250"},
Ranges: { Value: 500 Name: "500"},
Ranges: { Value: 1000 Name: "1000"},
}];
}

enum ETxTypes {
Expand Down
76 changes: 73 additions & 3 deletions ydb/core/tx/datashard/volatile_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ namespace NKikimr::NDataShard {
std::vector<TVolatileTxInfo*> byCommitOrder;
byCommitOrder.reserve(VolatileTxs.size());

auto postProcessTxInfo = [this, &byCommitOrder](TVolatileTxInfo* info) {
auto postProcessTxInfo = [&](TVolatileTxInfo* info) {
switch (info->State) {
case EVolatileTxState::Waiting:
case EVolatileTxState::Committed: {
Expand Down Expand Up @@ -399,6 +399,28 @@ namespace NKikimr::NDataShard {
VolatileTxByCommitOrder.PushBack(info);
}

ui64 numWaiting = 0;
ui64 numCommitted = 0;
ui64 numAborting = 0;
for (auto& pr : VolatileTxs) {
switch (pr.second->State) {
case EVolatileTxState::Waiting:
++numWaiting;
break;
case EVolatileTxState::Committed:
++numCommitted;
break;
case EVolatileTxState::Aborting:
++numAborting;
break;
}
}

Self->SetCounter(COUNTER_VOLATILE_TX_INFLIGHT, VolatileTxs.size());
Self->SetCounter(COUNTER_VOLATILE_TX_WAITING_COUNT, numWaiting);
Self->SetCounter(COUNTER_VOLATILE_TX_COMMITTED_COUNT, numCommitted);
Self->SetCounter(COUNTER_VOLATILE_TX_ABORTING_COUNT, numAborting);

return true;
}

Expand Down Expand Up @@ -554,6 +576,8 @@ namespace NKikimr::NDataShard {
db.Table<Schema::TxVolatileParticipants>().Key(info->TxId, shardId).Update();
}

UpdateCountersAdd(info);

txc.DB.OnRollback([this, txId]() {
RollbackAddVolatileTx(txId);
});
Expand Down Expand Up @@ -593,7 +617,10 @@ namespace NKikimr::NDataShard {

// FIXME: do we need to handle WaitingSnapshotEvents somehow?

// Note: not counting latency (this is a rollback)

// This will also unlink from linked lists
UpdateCountersRemove(info);
VolatileTxs.erase(txId);
}

Expand Down Expand Up @@ -632,6 +659,10 @@ namespace NKikimr::NDataShard {
VolatileTxByCommitTxId.erase(commitTxId);
}
VolatileTxByVersion.erase(info);

Self->IncCounter(COUNTER_VOLATILE_TX_TOTAL_LATENCY_MS, info->LatencyTimer.Passed() * 1000);

UpdateCountersRemove(info);
VolatileTxs.erase(txId);

if (prevUncertain < GetMinUncertainVersion()) {
Expand Down Expand Up @@ -728,7 +759,7 @@ namespace NKikimr::NDataShard {
ui64 txId = info->TxId;

// Move tx to aborting, but don't persist yet, we need a separate transaction for that
info->State = EVolatileTxState::Aborting;
ChangeState(info, EVolatileTxState::Aborting);

// Aborted transactions don't have dependencies
for (ui64 dependencyTxId : info->Dependencies) {
Expand Down Expand Up @@ -842,7 +873,7 @@ namespace NKikimr::NDataShard {
// Move tx to committed.
// Note that we don't need to wait until the new state is committed (it's repeatable),
// but we need to wait until the initial effects are committed and persisted.
info->State = EVolatileTxState::Committed;
ChangeState(info, EVolatileTxState::Committed);
db.Table<Schema::TxVolatileDetails>().Key(txId).Update(
NIceDb::TUpdate<Schema::TxVolatileDetails::State>(info->State));

Expand Down Expand Up @@ -1030,4 +1061,43 @@ namespace NKikimr::NDataShard {
return false;
}

// Accounts for a volatile transaction that is now being tracked:
// bumps the total in-flight counter and the counter matching the
// transaction's current state.
//
// info - the transaction being accounted for; only info->State is read.
void TVolatileTxManager::UpdateCountersAdd(TVolatileTxInfo* info) {
    // Every tracked transaction counts as in-flight, regardless of state.
    Self->IncCounter(COUNTER_VOLATILE_TX_INFLIGHT);

    const auto txState = info->State;
    if (txState == EVolatileTxState::Waiting) {
        Self->IncCounter(COUNTER_VOLATILE_TX_WAITING_COUNT);
    } else if (txState == EVolatileTxState::Committed) {
        Self->IncCounter(COUNTER_VOLATILE_TX_COMMITTED_COUNT);
    } else if (txState == EVolatileTxState::Aborting) {
        Self->IncCounter(COUNTER_VOLATILE_TX_ABORTING_COUNT);
    }
    // Any other state value touches only the in-flight counter.
}

// Reverses the accounting done for a tracked volatile transaction:
// decrements the total in-flight counter and the counter matching the
// transaction's current state.
//
// info - the transaction being removed from the counters; only
//        info->State is read, so it must still hold the state that
//        was counted.
void TVolatileTxManager::UpdateCountersRemove(TVolatileTxInfo* info) {
    // The in-flight total drops no matter which state the tx is in.
    Self->DecCounter(COUNTER_VOLATILE_TX_INFLIGHT);

    const auto txState = info->State;
    if (txState == EVolatileTxState::Waiting) {
        Self->DecCounter(COUNTER_VOLATILE_TX_WAITING_COUNT);
    } else if (txState == EVolatileTxState::Committed) {
        Self->DecCounter(COUNTER_VOLATILE_TX_COMMITTED_COUNT);
    } else if (txState == EVolatileTxState::Aborting) {
        Self->DecCounter(COUNTER_VOLATILE_TX_ABORTING_COUNT);
    }
    // Any other state value touches only the in-flight counter.
}

// Moves a volatile transaction to a new state while keeping the
// per-state counters consistent.
//
// info  - the transaction whose state is changed (info->State is overwritten).
// state - the state to switch to.
//
// When the transaction is currently Waiting, the value of
// info->LatencyTimer.Passed() scaled by 1000 is reported to
// COUNTER_VOLATILE_TX_WAIT_LATENCY_MS before the switch.
// NOTE(review): presumably Passed() is in seconds, making this milliseconds - confirm.
void TVolatileTxManager::ChangeState(TVolatileTxInfo* info, EVolatileTxState state) {
    const bool wasWaiting = (info->State == EVolatileTxState::Waiting);
    if (wasWaiting) {
        Self->IncCounter(
            COUNTER_VOLATILE_TX_WAIT_LATENCY_MS,
            info->LatencyTimer.Passed() * 1000);
    }

    // Un-count under the old state, switch, then re-count under the new one.
    UpdateCountersRemove(info);
    info->State = state;
    UpdateCountersAdd(info);
}

} // namespace NKikimr::NDataShard
8 changes: 8 additions & 0 deletions ydb/core/tx/datashard/volatile_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <util/generic/hash.h>
#include <util/generic/intrlist.h>
#include <util/system/hp_timer.h>

namespace NKikimr::NTabletFlatExecutor {

Expand Down Expand Up @@ -75,6 +76,9 @@ namespace NKikimr::NDataShard {
// DECISION_ABORT on abort.
std::vector<ui64> ArbiterReadSets;

// Calculates Waiting and Total latency
THPTimer LatencyTimer;

template<class TTag>
bool IsInList() const {
using TItem = TIntrusiveListItem<TVolatileTxInfo, TTag>;
Expand Down Expand Up @@ -276,6 +280,10 @@ namespace NKikimr::NDataShard {
void RemoveFromCommitOrder(TVolatileTxInfo* info);
bool ReadyToDbCommit(TVolatileTxInfo* info) const;

void UpdateCountersAdd(TVolatileTxInfo* info);
void UpdateCountersRemove(TVolatileTxInfo* info);
void ChangeState(TVolatileTxInfo* info, EVolatileTxState state);

private:
TDataShard* const Self;
absl::flat_hash_map<ui64, std::unique_ptr<TVolatileTxInfo>> VolatileTxs; // TxId -> Info
Expand Down