|
10 | 10 | #include <ydb/core/tx/columnshard/splitter/batch_slice.h> |
11 | 11 |
|
12 | 12 | #include <ydb/library/formats/arrow/simple_arrays_cache.h> |
| 13 | +#include <ydb/core/base/appdata_fwd.h> |
| 14 | +#include <ydb/core/protos/config.pb.h> |
13 | 15 |
|
14 | 16 | #include <util/string/join.h> |
15 | 17 |
|
@@ -99,36 +101,31 @@ TConclusion<NArrow::TContainerWithIndexes<arrow::RecordBatch>> ISnapshotSchema:: |
99 | 101 | if (targetIdx == -1) { |
100 | 102 | return TConclusionStatus::Success(); |
101 | 103 | } |
| 104 | + const auto hasNull = NArrow::HasNulls(incomingBatch->column(incomingIdx)); |
102 | 105 | const std::optional<i32> pkFieldIdx = GetIndexInfo().GetPKColumnIndexByIndexVerified(targetIdx); |
103 | | - if (!NArrow::HasNulls(incomingBatch->column(incomingIdx))) { |
104 | | - if (pkFieldIdx) { |
105 | | - AFL_VERIFY(*pkFieldIdx < (i32)pkColumns.size()); |
106 | | - AFL_VERIFY(!pkColumns[*pkFieldIdx]); |
107 | | - pkColumns[*pkFieldIdx] = incomingBatch->column(incomingIdx); |
108 | | - ++pkColumnsCount; |
109 | | - } |
110 | | - return TConclusionStatus::Success(); |
111 | | - } |
112 | | - if (pkFieldIdx) { |
| 106 | + if (pkFieldIdx && hasNull && !AppData()->ColumnShardConfig.GetAllowNullableColumnsInPK()) { |
113 | 107 | return TConclusionStatus::Fail("null data for pk column is impossible for '" + dstSchema.field(targetIdx)->name() + "'"); |
114 | 108 | } |
115 | | - switch (mType) { |
116 | | - case NEvWrite::EModificationType::Replace: |
117 | | - case NEvWrite::EModificationType::Insert: |
118 | | - case NEvWrite::EModificationType::Upsert: { |
119 | | - if (GetIndexInfo().IsNullableVerifiedByIndex(targetIdx)) { |
120 | | - return TConclusionStatus::Success(); |
121 | | - } |
122 | | - if (GetIndexInfo().GetColumnExternalDefaultValueByIndexVerified(targetIdx)) { |
123 | | - return TConclusionStatus::Success(); |
124 | | - } else { |
125 | | - return TConclusionStatus::Fail("empty field for non-default column: '" + dstSchema.field(targetIdx)->name() + "'"); |
126 | | - } |
| 109 | + if (hasNull) { |
| 110 | + switch (mType) { |
| 111 | + case NEvWrite::EModificationType::Replace: |
| 112 | + case NEvWrite::EModificationType::Insert: |
| 113 | + case NEvWrite::EModificationType::Upsert: { |
| 114 | + if (!GetIndexInfo().IsNullableVerifiedByIndex(targetIdx) && !GetIndexInfo().GetColumnExternalDefaultValueByIndexVerified(targetIdx)) |
| 115 | + return TConclusionStatus::Fail("empty field for non-default column: '" + dstSchema.field(targetIdx)->name() + "'"); |
| 116 | + } |
| 117 | + case NEvWrite::EModificationType::Delete: |
| 118 | + case NEvWrite::EModificationType::Update: |
| 119 | + break; |
127 | 120 | } |
128 | | - case NEvWrite::EModificationType::Delete: |
129 | | - case NEvWrite::EModificationType::Update: |
130 | | - return TConclusionStatus::Success(); |
131 | 121 | } |
| 122 | + if (pkFieldIdx) { |
| 123 | + AFL_VERIFY(*pkFieldIdx < (i32)pkColumns.size()); |
| 124 | + AFL_VERIFY(!pkColumns[*pkFieldIdx]); |
| 125 | + pkColumns[*pkFieldIdx] = incomingBatch->column(incomingIdx); |
| 126 | + ++pkColumnsCount; |
| 127 | + } |
| 128 | + return TConclusionStatus::Success(); |
132 | 129 | }; |
133 | 130 | const auto nameResolver = [&](const std::string& fieldName) -> i32 { |
134 | 131 | return GetIndexInfo().GetColumnIndexOptional(fieldName).value_or(-1); |
|
0 commit comments