11#include " kqp_write_actor.h"
22
33#include " kqp_write_table.h"
4+ #include " kqp_write_actor_settings.h"
45
56#include < util/generic/singleton.h>
67#include < ydb/core/actorlib_impl/long_timer.h>
2324
2425
2526namespace {
26- constexpr i64 kInFlightMemoryLimitPerActor = 64_MB;
27- constexpr i64 kMemoryLimitPerMessage = 64_MB;
28- constexpr i64 kMaxBatchesPerMessage = 8 ;
29-
30- struct TWriteActorBackoffSettings {
31- TDuration StartRetryDelay = TDuration::MilliSeconds(250 );
32- TDuration MaxRetryDelay = TDuration::Seconds(5 );
33- double UnsertaintyRatio = 0.5 ;
34- double Multiplier = 2.0 ;
35-
36- ui64 MaxWriteAttempts = 32 ;
37- ui64 MaxResolveAttempts = 5 ;
38- };
39-
40- const TWriteActorBackoffSettings* BackoffSettings () {
41- return Singleton<TWriteActorBackoffSettings>();
42- }
43-
44- TDuration CalculateNextAttemptDelay (ui64 attempt) {
45- auto delay = BackoffSettings ()->StartRetryDelay ;
27+ TDuration CalculateNextAttemptDelay (const NKikimr::NKqp::TWriteActorSettings& settings, ui64 attempt) {
28+ auto delay = settings.StartRetryDelay ;
4629 for (ui64 index = 0 ; index < attempt; ++index) {
47- delay *= BackoffSettings ()-> Multiplier ;
30+ delay *= settings. Multiplier ;
4831 }
4932
50- delay *= 1 + BackoffSettings ()-> UnsertaintyRatio * (1 - 2 * RandomNumber<double >());
51- delay = Min (delay, BackoffSettings ()-> MaxRetryDelay );
33+ delay *= 1 + settings. UnsertaintyRatio * (1 - 2 * RandomNumber<double >());
34+ delay = Min (delay, settings. MaxRetryDelay );
5235
5336 return delay;
5437 }
@@ -133,6 +116,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
133116 TIntrusivePtr<TKqpCounters> counters)
134117 : LogPrefix(TStringBuilder() << "TxId: " << args.TxId << ", task: " << args.TaskId << ". ")
135118 , Settings(std::move(settings))
119+ , MessageSettings(GetWriteActorSettings())
136120 , OutputIndex(args.OutputIndex)
137121 , Callbacks(args.Callback)
138122 , Counters(counters)
@@ -149,6 +133,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
149133 Settings.GetImmediateTx())
150134 , InconsistentTx(
151135 Settings.GetInconsistentTx())
136+ , MemoryLimit(MessageSettings.InFlightMemoryLimitPerActorBytes)
152137 {
153138 YQL_ENSURE (std::holds_alternative<ui64>(TxId));
154139 YQL_ENSURE (!ImmediateTx);
@@ -248,9 +233,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
248233 }
249234
250235 void PlanResolveTable () {
251- CA_LOG_D (" Plan resolve with delay " << CalculateNextAttemptDelay (ResolveAttempts));
236+ CA_LOG_D (" Plan resolve with delay " << CalculateNextAttemptDelay (MessageSettings, ResolveAttempts));
252237 TlsActivationContext->Schedule (
253- CalculateNextAttemptDelay (ResolveAttempts),
238+ CalculateNextAttemptDelay (MessageSettings, ResolveAttempts),
254239 new IEventHandle (SelfId (), SelfId (), new TEvPrivate::TEvResolveRequestPlanned{}, 0 , 0 ));
255240 }
256241
@@ -262,7 +247,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
262247 SchemeEntry.reset ();
263248 SchemeRequest.reset ();
264249
265- if (ResolveAttempts++ >= BackoffSettings ()-> MaxResolveAttempts ) {
250+ if (ResolveAttempts++ >= MessageSettings. MaxResolveAttempts ) {
266251 CA_LOG_E (TStringBuilder ()
267252 << " Too many table resolve attempts for table " << TableId << " ." );
268253 RuntimeError (
@@ -596,7 +581,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
596581 void SendDataToShard (const ui64 shardId) {
597582 const auto metadata = ShardedWriteController->GetMessageMetadata (shardId);
598583 YQL_ENSURE (metadata);
599- if (metadata->SendAttempts >= BackoffSettings ()-> MaxWriteAttempts ) {
584+ if (metadata->SendAttempts >= MessageSettings. MaxWriteAttempts ) {
600585 CA_LOG_E (" ShardId=" << shardId
601586 << " for table '" << Settings.GetTable ().GetPath ()
602587 << " ': retry limit exceeded."
@@ -666,7 +651,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
666651
667652 if (InconsistentTx) {
668653 TlsActivationContext->Schedule (
669- CalculateNextAttemptDelay (metadata->SendAttempts ),
654+ CalculateNextAttemptDelay (MessageSettings, metadata->SendAttempts ),
670655 new IEventHandle (
671656 SelfId (),
672657 SelfId (),
@@ -760,11 +745,11 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
760745 try {
761746 ShardedWriteController = CreateShardedWriteController (
762747 TShardedWriteControllerSettings {
763- .MemoryLimitTotal = kInFlightMemoryLimitPerActor ,
764- .MemoryLimitPerMessage = kMemoryLimitPerMessage ,
748+ .MemoryLimitTotal = MessageSettings. InFlightMemoryLimitPerActorBytes ,
749+ .MemoryLimitPerMessage = MessageSettings. MemoryLimitPerMessageBytes ,
765750 .MaxBatchesPerMessage = (SchemeEntry->Kind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable
766751 ? 1
767- : kMaxBatchesPerMessage ),
752+ : MessageSettings. MaxBatchesPerMessage ),
768753 },
769754 std::move (columnsMetadata),
770755 TypeEnv,
@@ -800,6 +785,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
800785
801786 TString LogPrefix;
802787 const NKikimrKqp::TKqpTableSinkSettings Settings;
788+ TWriteActorSettings MessageSettings;
803789 const ui64 OutputIndex;
804790 NYql::NDq::TDqAsyncStats EgressStats;
805791 NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks * Callbacks = nullptr ;
@@ -820,7 +806,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
820806 THashMap<ui64, TLockInfo> LocksInfo;
821807 bool Finished = false ;
822808
823- const i64 MemoryLimit = kInFlightMemoryLimitPerActor ;
809+ const i64 MemoryLimit;
824810
825811 IShardedWriteControllerPtr ShardedWriteController = nullptr ;
826812};
0 commit comments