Skip to content

Commit 4aa2ea9

Browse files
authored
GH-3223: Implement Variant parquet writer (#3221)
1 parent 46595e2 commit 4aa2ea9

File tree

7 files changed

+1168
-33
lines changed

7 files changed

+1168
-33
lines changed

parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,12 @@
4242
import org.apache.parquet.io.api.Binary;
4343
import org.apache.parquet.io.api.RecordConsumer;
4444
import org.apache.parquet.schema.GroupType;
45+
import org.apache.parquet.schema.LogicalTypeAnnotation;
4546
import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
4647
import org.apache.parquet.schema.MessageType;
4748
import org.apache.parquet.schema.Type;
49+
import org.apache.parquet.variant.Variant;
50+
import org.apache.parquet.variant.VariantValueWriter;
4851
import org.slf4j.Logger;
4952
import org.slf4j.LoggerFactory;
5053

@@ -181,9 +184,79 @@ public void write(T record) {
181184
}
182185

183186
private void writeRecord(GroupType schema, Schema avroSchema, Object record) {
184-
recordConsumer.startGroup();
185-
writeRecordFields(schema, avroSchema, record);
186-
recordConsumer.endGroup();
187+
if (schema.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) {
188+
writeVariantFields(schema, avroSchema, record);
189+
} else {
190+
recordConsumer.startGroup();
191+
writeRecordFields(schema, avroSchema, record);
192+
recordConsumer.endGroup();
193+
}
194+
}
195+
196+
// Return true if schema and avroSchema have the same field names, in the same order.
197+
private static boolean schemaMatches(GroupType schema, Schema avroSchema) {
198+
List<Schema.Field> avroFields = avroSchema.getFields();
199+
if (schema.getFieldCount() != avroFields.size()) {
200+
return false;
201+
}
202+
203+
for (int i = 0; i < avroFields.size(); i += 1) {
204+
if (!avroFields.get(i).name().equals(schema.getFieldName(i))) {
205+
return false;
206+
}
207+
}
208+
209+
return true;
210+
}
211+
212+
private void writeVariantFields(GroupType schema, Schema avroSchema, Object record) {
213+
List<Type> fields = schema.getFields();
214+
List<Schema.Field> avroFields = avroSchema.getFields();
215+
216+
if (schemaMatches(schema, avroSchema)) {
217+
// If the Avro schema matches the Parquet schema, the shredding matches and writeRecordFields can be used.
218+
// writeRecordFields will validate that the field types match.
219+
recordConsumer.startGroup();
220+
writeRecordFields(schema, avroSchema, record);
221+
recordConsumer.endGroup();
222+
return;
223+
}
224+
225+
boolean binarySchema = true;
226+
ByteBuffer metadata = null;
227+
ByteBuffer value = null;
228+
// Extract the value and metadata binary.
229+
for (int index = 0; index < avroFields.size(); index++) {
230+
Schema.Field avroField = avroFields.get(index);
231+
Schema fieldSchema = AvroSchemaConverter.getNonNull(avroField.schema());
232+
if (!fieldSchema.getType().equals(Schema.Type.BYTES)) {
233+
binarySchema = false;
234+
break;
235+
}
236+
Type fieldType = fields.get(index);
237+
if (fieldType.getName().equals("value")) {
238+
Object valueObj = model.getField(record, avroField.name(), index);
239+
Preconditions.checkArgument(
240+
valueObj instanceof ByteBuffer,
241+
"Expected ByteBuffer for value, but got " + valueObj.getClass());
242+
value = (ByteBuffer) valueObj;
243+
} else if (fieldType.getName().equals("metadata")) {
244+
Object metadataObj = model.getField(record, avroField.name(), index);
245+
Preconditions.checkArgument(
246+
metadataObj instanceof ByteBuffer,
247+
"Expected metadata to be a ByteBuffer, but got " + metadataObj.getClass());
248+
metadata = (ByteBuffer) metadataObj;
249+
} else {
250+
binarySchema = false;
251+
break;
252+
}
253+
}
254+
255+
if (binarySchema) {
256+
VariantValueWriter.write(recordConsumer, schema, new Variant(value, metadata));
257+
} else {
258+
throw new RuntimeException("Invalid Avro schema for Variant logical type: " + schema.getName());
259+
}
187260
}
188261

189262
private void writeRecordFields(GroupType schema, Schema avroSchema, Object record) {

parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.parquet.hadoop.ParquetReader;
3434
import org.apache.parquet.hadoop.ParquetWriter;
3535
import org.apache.parquet.hadoop.util.HadoopInputFile;
36+
import org.apache.parquet.variant.Variant;
3637
import org.junit.Assert;
3738
import org.junit.rules.TemporaryFolder;
3839

@@ -129,4 +130,36 @@ public static Configuration conf(String name, boolean value) {
129130
conf.setBoolean(name, value);
130131
return conf;
131132
}
133+
134+
/**
135+
* Assert that to Variant values are logically equivalent.
136+
* E.g. values in an object may be ordered differently in the binary.
137+
*/
138+
static void assertEquivalent(Variant expected, Variant actual) {
139+
Assert.assertEquals(expected.getType(), actual.getType());
140+
switch (expected.getType()) {
141+
case STRING:
142+
// Short strings may use the compact or extended representation.
143+
Assert.assertEquals(expected.getString(), actual.getString());
144+
break;
145+
case ARRAY:
146+
Assert.assertEquals(expected.numArrayElements(), actual.numArrayElements());
147+
for (int i = 0; i < expected.numArrayElements(); ++i) {
148+
assertEquivalent(expected.getElementAtIndex(i), actual.getElementAtIndex(i));
149+
}
150+
break;
151+
case OBJECT:
152+
Assert.assertEquals(expected.numObjectElements(), actual.numObjectElements());
153+
for (int i = 0; i < expected.numObjectElements(); ++i) {
154+
Variant.ObjectField expectedField = expected.getFieldAtIndex(i);
155+
Variant.ObjectField actualField = actual.getFieldAtIndex(i);
156+
Assert.assertEquals(expectedField.key, actualField.key);
157+
assertEquivalent(expectedField.value, actualField.value);
158+
}
159+
break;
160+
default:
161+
// All other types have a single representation, and must be bit-for-bit identical.
162+
Assert.assertEquals(expected.getValueBuffer(), actual.getValueBuffer());
163+
}
164+
}
132165
}

parquet-avro/src/test/java/org/apache/parquet/avro/TestReadVariant.java

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2111,36 +2111,8 @@ void assertThrows(Callable callable, Class<? extends Exception> exception, Strin
21112111
void assertEquivalent(ByteBuffer expectedMetadata, ByteBuffer expectedValue, GenericRecord actual) {
21122112
assertEquals(expectedMetadata, (ByteBuffer) actual.get("metadata"));
21132113
assertEquals(expectedMetadata, (ByteBuffer) actual.get("metadata"));
2114-
assertEquivalent(
2114+
AvroTestUtil.assertEquivalent(
21152115
new Variant(expectedValue, expectedMetadata),
21162116
new Variant(((ByteBuffer) actual.get("value")), expectedMetadata));
21172117
}
2118-
2119-
void assertEquivalent(Variant expected, Variant actual) {
2120-
assertEquals(expected.getType(), actual.getType());
2121-
switch (expected.getType()) {
2122-
case STRING:
2123-
// Short strings may use the compact or extended representation.
2124-
assertEquals(expected.getString(), actual.getString());
2125-
break;
2126-
case ARRAY:
2127-
assertEquals(expected.numArrayElements(), actual.numArrayElements());
2128-
for (int i = 0; i < expected.numArrayElements(); ++i) {
2129-
assertEquivalent(expected.getElementAtIndex(i), actual.getElementAtIndex(i));
2130-
}
2131-
break;
2132-
case OBJECT:
2133-
assertEquals(expected.numObjectElements(), actual.numObjectElements());
2134-
for (int i = 0; i < expected.numObjectElements(); ++i) {
2135-
Variant.ObjectField expectedField = expected.getFieldAtIndex(i);
2136-
Variant.ObjectField actualField = actual.getFieldAtIndex(i);
2137-
assertEquals(expectedField.key, actualField.key);
2138-
assertEquivalent(expectedField.value, actualField.value);
2139-
}
2140-
break;
2141-
default:
2142-
// All other types have a single representation, and must be bit-for-bit identical.
2143-
assertEquals(expected.getValueBuffer(), actual.getValueBuffer());
2144-
}
2145-
}
21462118
}

0 commit comments

Comments
 (0)