@@ -23,20 +23,44 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
2323 }
2424 };
2525
26+ class TTableKey {
27+ public:
28+ ui64 PathId;
29+ ui64 Step;
30+ ui64 TxId;
31+ ui64 Version;
32+
33+ public:
34+ TTableKey (ui64 pathId, ui64 step, ui64 txId, ui64 version)
35+ : PathId(pathId)
36+ , Step(step)
37+ , TxId(txId)
38+ , Version(version)
39+ {
40+ }
41+ };
42+
2643 std::vector<TKey> VersionsToRemove;
44+ std::vector<TTableKey> TableVersionsToRemove;
2745
2846public:
29- TNormalizerResult (std::vector<TKey>&& versions)
47+ TNormalizerResult (std::vector<TKey>&& versions, std::vector<TTableKey>&& tableVersions )
3048 : VersionsToRemove(versions)
49+ , TableVersionsToRemove(tableVersions)
3150 {
3251 }
3352
3453 bool ApplyOnExecute (NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */ ) const override {
3554 using namespace NColumnShard ;
3655 NIceDb::TNiceDb db (txc.DB );
3756 for (auto & key: VersionsToRemove) {
57+ LOG_S_DEBUG (" Removing schema version in TSchemaVersionNormalizer " << key.Version );
3858 db.Table <Schema::SchemaPresetVersionInfo>().Key (key.Id , key.Step , key.TxId ).Delete ();
3959 }
60+ for (auto & key: TableVersionsToRemove) {
61+ LOG_S_DEBUG (" Removing table version in TSchemaVersionNormalizer " << key.Version << " pathId " << key.PathId );
62+ db.Table <Schema::TableVersionInfo>().Key (key.PathId , key.Step , key.TxId ).Delete ();
63+ }
4064 return true ;
4165 }
4266
@@ -78,6 +102,7 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
78102 }
79103
80104 std::vector<TKey> unusedSchemaIds;
105+ std::vector<TTableKey> unusedTableSchemaIds;
81106 std::optional<ui64> maxVersion;
82107 std::vector<INormalizerChanges::TPtr> changes;
83108
@@ -107,18 +132,57 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
107132 }
108133 }
109134
135+ {
136+ auto rowset = db.Table <Schema::TableVersionInfo>().Select ();
137+ if (!rowset.IsReady ()) {
138+ return std::nullopt ;
139+ }
140+
141+ while (!rowset.EndOfSet ()) {
142+ const ui64 pathId = rowset.GetValue <Schema::TableVersionInfo::PathId>();
143+
144+ NKikimrTxColumnShard::TTableVersionInfo versionInfo;
145+ Y_ABORT_UNLESS (versionInfo.ParseFromString (rowset.GetValue <Schema::TableVersionInfo::InfoProto>()));
146+ if (versionInfo.HasSchema ()) {
147+ ui64 version = versionInfo.GetSchema ().GetVersion ();
148+ if (!usedSchemaVersions.contains (version)) {
149+ unusedTableSchemaIds.emplace_back (pathId, rowset.GetValue <Schema::TableVersionInfo::SinceStep>(), rowset.GetValue <Schema::TableVersionInfo::SinceTxId>(), version);
150+ }
151+ }
152+
153+ if (!rowset.Next ()) {
154+ return std::nullopt ;
155+ }
156+ }
157+ }
158+
159+ std::vector<TTableKey> tablePortion;
110160 std::vector<TKey> portion;
161+ tablePortion.reserve (10000 );
111162 portion.reserve (10000 );
163+ auto addPortion = [&]() {
164+ if (portion.size () + tablePortion.size () >= 10000 ) {
165+ changes.emplace_back (std::make_shared<TNormalizerResult>(std::move (portion), std::move (tablePortion)));
166+ portion = std::vector<TKey>();
167+ tablePortion = std::vector<TTableKey>();
168+ }
169+ };
112170 for (const auto & id: unusedSchemaIds) {
113171 if (!maxVersion.has_value () || (id.Version != *maxVersion)) {
114172 portion.push_back (id);
115- if (portion.size () >= 10000 ) {
116- changes.emplace_back (std::make_shared<TNormalizerResult>(std::move (portion)));
117- }
173+ addPortion ();
174+ }
175+ }
176+
177+ for (const auto & id: unusedTableSchemaIds) {
178+ if (!maxVersion.has_value () || (id.Version != *maxVersion)) {
179+ tablePortion.push_back (id);
180+ addPortion ();
118181 }
119182 }
120- if (portion.size () > 0 ) {
121- changes.emplace_back (std::make_shared<TNormalizerResult>(std::move (portion)));
183+
184+ if (portion.size () + tablePortion.size () > 0 ) {
185+ changes.emplace_back (std::make_shared<TNormalizerResult>(std::move (portion), std::move (tablePortion)));
122186 }
123187 return changes;
124188 }
0 commit comments