Commit b9fe0e1
[Kernel][Defaults] Handle legacy map types in Parquet files (#3097)
## Description

Currently, Kernel's Parquet reader explicitly looks for a repeated group named `key_value` under the Parquet map type, but older Parquet writers could give the repeated group any name. Instead of looking for an element explicitly named `key_value`, fetch the first (and only) element in the group. See [here](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps) for more details.

## How was this patch tested?

The [test](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetThriftCompatibilitySuite.scala#L29) and the sample file written by legacy writers are taken from Apache Spark™. Some columns in the test file (arrays with 2-level encoding, another legacy format) are currently not supported; I will follow up with a separate PR, as it involves a bit of refactoring in `ArrayColumnReader`.
1 parent b1b84d5 commit b9fe0e1
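To make the fix concrete, below is a minimal sketch of the two map layouts using parquet-mr's `MessageTypeParser`. This is illustrative code, not part of the commit; the message and column names are hypothetical. A legacy writer may give the map's repeated group any name, so resolving it by position rather than by the literal name `key_value` handles both layouts.

```java
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class LegacyMapSchemaDemo {
  public static void main(String[] args) {
    // Legacy layout (e.g. parquet-thrift): the repeated group is named "map", not "key_value".
    MessageType legacy = MessageTypeParser.parseMessageType(
        "message spark_schema {\n" +
        "  optional group intToStringColumn (MAP) {\n" +
        "    repeated group map (MAP_KEY_VALUE) {\n" +
        "      required int32 key;\n" +
        "      optional binary value (UTF8);\n" +
        "    }\n" +
        "  }\n" +
        "}");

    GroupType mapType = legacy.getType("intToStringColumn").asGroupType();

    // Name-based lookup fails on legacy files: there is no field called "key_value".
    // mapType.getType("key_value"); // throws InvalidRecordException

    // Position-based lookup works for both layouts: the MAP annotation implies
    // exactly one repeated group, which this commit guards with a checkArgument.
    GroupType keyValue = mapType.getType(0).asGroupType();
    System.out.println(keyValue.getName());        // map
    System.out.println(keyValue.getType("key"));   // required int32 key
    System.out.println(keyValue.getType("value")); // optional binary value (UTF8)
  }
}
```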

File tree

5 files changed: +89 -23 lines changed

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayColumnReader.java

Lines changed: 2 additions & 2 deletions

```diff
@@ -77,8 +77,8 @@ private static Converter createElementConverter(
             ArrayType typeFromClient,
             GroupType typeFromFile) {
 
-        checkArgument(
-            typeFromFile.getFieldCount() == 1, "Expected exactly one field in the array type");
+        checkArgument(typeFromFile.getFieldCount() == 1,
+            "Expected exactly one field in the array type, but got: " + typeFromFile);
         GroupType repeatedGroup = typeFromFile.getType(0).asGroupType();
 
         // TODO: handle the legacy 2-level list physical format
```
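For context on that TODO, here is an illustrative sketch (hypothetical column names, not code from this commit) of why legacy 2-level lists need separate handling: the repeated field is the element itself rather than a wrapper group, so the `typeFromFile.getType(0).asGroupType()` call above would fail on a primitive element.

```java
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class LegacyListSchemaDemo {
  public static void main(String[] args) {
    // Modern 3-level layout: LIST group -> repeated "list" group -> "element" field.
    MessageType modern = MessageTypeParser.parseMessageType(
        "message m {\n" +
        "  optional group intsColumn (LIST) {\n" +
        "    repeated group list {\n" +
        "      optional int32 element;\n" +
        "    }\n" +
        "  }\n" +
        "}");

    // Legacy 2-level layout (e.g. parquet-thrift): the repeated field is the
    // element itself, so calling asGroupType() on it throws ClassCastException.
    MessageType legacy = MessageTypeParser.parseMessageType(
        "message m {\n" +
        "  optional group intsColumn (LIST) {\n" +
        "    repeated int32 intsColumn_tuple;\n" +
        "  }\n" +
        "}");

    // false: the repeated field is a group wrapping the element
    System.out.println(modern.getType("intsColumn").asGroupType().getType(0).isPrimitive());
    // true: the repeated field is the element itself
    System.out.println(legacy.getType("intsColumn").asGroupType().getType(0).isPrimitive());
  }
}
```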

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapColumnReader.java

Lines changed: 12 additions & 4 deletions

```diff
@@ -24,6 +24,8 @@
 import io.delta.kernel.data.ColumnVector;
 import io.delta.kernel.types.MapType;
 
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+
 import io.delta.kernel.defaults.internal.data.vector.DefaultMapVector;
 import static io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.createConverter;
 
@@ -57,10 +59,16 @@ public ColumnVector getDataColumnVector(int batchSize) {
     }
 
     private static Converter[] createElementConverters(
-        int initialBatchSize,
-        MapType typeFromClient,
-        GroupType typeFromFile) {
-        final GroupType innerMapType = (GroupType) typeFromFile.getType("key_value");
+            int initialBatchSize,
+            MapType typeFromClient,
+            GroupType typeFromFile) {
+        // Repeated element can be any name. Latest Parquet versions use "key_value" as the name,
+        // but legacy versions can use any arbitrary name for the repeated group.
+        // See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for details
+        checkArgument(typeFromFile.getFieldCount() == 1,
+            "Expected exactly one repeated field in the map type, but got: " + typeFromFile);
+
+        GroupType innerMapType = typeFromFile.getType(0).asGroupType();
         Converter[] elemConverters = new Converter[2];
         elemConverters[0] = createConverter(
             initialBatchSize,
```

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala

Lines changed: 57 additions & 1 deletion

```diff
@@ -135,10 +135,66 @@ class ParquetFileReaderSuite extends AnyFunSuite
     checkAnswer(actResult1, expResult1)
 
     // File with multiple row-groups [0, 20000) where rowIndex = id
-    val filePath = getTestResourceFilePath("parquet/")
+    val filePath = getTestResourceFilePath("parquet/row_index_multiple_row_groups.parquet")
     val actResult2 = readParquetFilesUsingKernel(filePath, readSchema)
     val expResult2 = (0L until 20000L).map(i => TestRow(i, i))
 
     checkAnswer(actResult2, expResult2)
   }
+
+  /////////////////////////////////////////////////////////////////////////////
+  // Test compatibility with Parquet legacy format files                     //
+  /////////////////////////////////////////////////////////////////////////////
+
+  // Test and the test file are copied from Spark's `ParquetThriftCompatibilitySuite`
+  test("read parquet file generated by parquet-thrift") {
+    val parquetFilePath = getTestResourceFilePath("parquet/parquet-thrift-compat.snappy.parquet")
+
+    val readSchema = new StructType()
+      .add("boolColumn", BooleanType.BOOLEAN)
+      .add("byteColumn", ByteType.BYTE)
+      .add("shortColumn", ShortType.SHORT)
+      .add("intColumn", IntegerType.INTEGER)
+      .add("longColumn", LongType.LONG)
+      .add("doubleColumn", DoubleType.DOUBLE)
+      // Thrift `BINARY` values are actually unencoded `STRING` values, and thus are always
+      // treated as `BINARY (UTF8)` in parquet-thrift, since parquet-thrift always assume
+      // Thrift `STRING`s are encoded using UTF-8.
+      .add("binaryColumn", StringType.STRING)
+      .add("stringColumn", StringType.STRING)
+      .add("enumColumn", StringType.STRING)
+      // maybe indicates nullable columns, above ones are non-nullable
+      .add("maybeBoolColumn", BooleanType.BOOLEAN)
+      .add("maybeByteColumn", ByteType.BYTE)
+      .add("maybeShortColumn", ShortType.SHORT)
+      .add("maybeIntColumn", IntegerType.INTEGER)
+      .add("maybeLongColumn", LongType.LONG)
+      .add("maybeDoubleColumn", DoubleType.DOUBLE)
+      // Thrift `BINARY` values are actually unencoded `STRING` values, and thus are always
+      // treated as `BINARY (UTF8)` in parquet-thrift, since parquet-thrift always assume
+      // Thrift `STRING`s are encoded using UTF-8.
+      .add("maybeBinaryColumn", StringType.STRING)
+      .add("maybeStringColumn", StringType.STRING)
+      .add("maybeEnumColumn", StringType.STRING)
+      // TODO: not working - separate PR to handle 2-level legacy lists
+      // .add("stringsColumn", new ArrayType(StringType.STRING, true /* containsNull */))
+      // .add("intSetColumn", new ArrayType(IntegerType.INTEGER, true /* containsNull */))
+      .add("intToStringColumn",
+        new MapType(IntegerType.INTEGER, StringType.STRING, true /* valueContainsNull */))
+      // TODO: not working - separate PR to handle 2-level legacy lists
+      // .add("complexColumn", new MapType(
+      //   IntegerType.INTEGER,
+      //   new ArrayType(
+      //     new StructType()
+      //       .add("nestedIntsColumn", new ArrayType(IntegerType.INTEGER, true /* containsNull */))
+      //       .add("nestedStringColumn", StringType.STRING)
+      //       .add("stringColumn", StringType.STRING),
+      //     true /* containsNull */),
+      //   true /* valueContainsNull */))
+
+    assert(parquetFileRowCount(parquetFilePath) === 10)
+    checkAnswer(
+      readParquetFilesUsingKernel(parquetFilePath, readSchema), /* actual */
+      readParquetFilesUsingSpark(parquetFilePath, readSchema) /* expected */)
+  }
 }
```

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala

Lines changed: 18 additions & 16 deletions

```diff
@@ -207,27 +207,24 @@ trait ParquetSuiteBase extends TestUtils {
   }
 
   def readParquetUsingKernelAsColumnarBatches(
-      actualFileDir: String,
+      inputFileOrDir: String,
       readSchema: StructType,
       predicate: Optional[Predicate] = Optional.empty()): Seq[ColumnarBatch] = {
-    val parquetFiles = Files.list(Paths.get(actualFileDir))
-      .iterator().asScala
-      .map(_.toString)
-      .filter(path => path.endsWith(".parquet"))
-      .map(path => FileStatus.of(path, 0L, 0L))
+    val parquetFileList = parquetFiles(inputFileOrDir)
+      .map(FileStatus.of(_, 0, 0))
 
     val data = defaultEngine.getParquetHandler.readParquetFiles(
-      toCloseableIterator(parquetFiles.asJava),
+      toCloseableIterator(parquetFileList.asJava.iterator()),
       readSchema,
       predicate)
 
     data.asScala.toSeq
   }
 
-  def parquetFileCount(path: String): Long = parquetFiles(path).size
+  def parquetFileCount(fileOrDir: String): Long = parquetFiles(fileOrDir).size
 
-  def parquetFileRowCount(path: String): Long = {
-    val files = parquetFiles(path)
+  def parquetFileRowCount(fileOrDir: String): Long = {
+    val files = parquetFiles(fileOrDir)
 
     var rowCount = 0L
     files.foreach { file =>
@@ -238,12 +235,17 @@
     rowCount
   }
 
-  def parquetFiles(path: String): Seq[String] = {
-    Files.list(Paths.get(path))
-      .iterator().asScala
-      .map(_.toString)
-      .filter(path => path.endsWith(".parquet"))
-      .toSeq
+  def parquetFiles(fileOrDir: String): Seq[String] = {
+    val fileOrDirPath = Paths.get(fileOrDir)
+    if (Files.isDirectory(fileOrDirPath)) {
+      Files.list(fileOrDirPath)
+        .iterator().asScala
+        .map(_.toString)
+        .filter(path => path.endsWith(".parquet"))
+        .toSeq
+    } else {
+      Seq(fileOrDir)
+    }
   }
 
   def footer(path: String): ParquetMetadata = {
```
