
don't fully enforce matching schema #210


Merged

Conversation

nicklan
Collaborator

@nicklan nicklan commented May 18, 2024

When we read from a checkpoint, there can be disagreement between the schema we think we've read and what was actually in the parquet. This can cause issues when we try to interact with engine data via expressions. For example here, where we use the "correct" schema and do not mark DVs as nullable. Also, in our arrow_conversions we make assumptions about the names of the fields that mark map keys and values (see here), which also causes issues when the actual materialized names differ.

An example error trying to work with a checkpoint file:

Arrow(
    InvalidArgumentError(
        "Incorrect datatype for StructArray field \"deletionVector\", expected Struct([Field { name: \"storageType\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"pathOrInlineDv\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"offset\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"sizeInBytes\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"cardinality\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]) got Struct([Field { name: \"storageType\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"pathOrInlineDv\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"offset\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"sizeInBytes\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"cardinality\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"maxRowIndex\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])",
    ),
)

This arises for two reasons:

  1. To avoid "parquet corruption", Spark writes a nullable struct with all of its fields marked nullable. For example, since DVs are nullable, all fields of the DV struct are marked nullable. This means the nullability of fields will not match the specified read schema.
  2. We can't guarantee exactly what names the "meta" fields for maps/lists will have in the raw schema.

This PR does as much validation as possible, but doesn't check things we can't control. So for each named field in the output schema it ensures the types are the same, and does so recursively into structs, maps, and lists.

Then it simply uses the schema that the parquet/json reader had already associated with the data.
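To make the shape of that check concrete, here is a minimal sketch of a recursive type-compatibility check over Arrow types. It is a hypothetical illustration, not the PR's actual ensure_data_types (which compares the kernel schema against the Arrow schema); it ignores nullability and the materialized names of list/map entry fields, which is exactly what the real validation has to tolerate:

use arrow_schema::{DataType, Fields};

/// Hypothetical sketch: recursively check that the type we asked for and the type
/// the reader actually produced agree, ignoring nullability and the materialized
/// names of list/map entry fields (which parquet writers are free to choose).
fn types_compatible(requested: &DataType, actual: &DataType) -> bool {
    match (requested, actual) {
        (DataType::Struct(req), DataType::Struct(act)) => struct_compatible(req, act),
        // List element fields may be named "item", "element", etc.; compare types only.
        (DataType::List(req), DataType::List(act)) => {
            types_compatible(req.data_type(), act.data_type())
        }
        // Map entries are a two-field struct of (key, value); names may differ.
        (DataType::Map(req, _), DataType::Map(act, _)) => {
            match (req.data_type(), act.data_type()) {
                (DataType::Struct(rf), DataType::Struct(af)) if rf.len() == 2 && af.len() == 2 => {
                    types_compatible(rf[0].data_type(), af[0].data_type())
                        && types_compatible(rf[1].data_type(), af[1].data_type())
                }
                _ => false,
            }
        }
        // Primitives (and anything not handled above) must match exactly.
        (req, act) => req == act,
    }
}

fn struct_compatible(requested: &Fields, actual: &Fields) -> bool {
    requested.len() == actual.len()
        && requested.iter().zip(actual.iter()).all(|(r, a)| {
            // Field names must line up, but nullability is deliberately not compared.
            r.name() == a.name() && types_compatible(r.data_type(), a.data_type())
        })
}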

This "works". Major issues:

  1. The schema you ask for might not be exactly the schema you get, although differences in nullability are the only "observable" difference from the kernel's perspective.
  2. A bigger issue is that this complicates the writing of expression evaluators for engines, as we'll have to carefully document and explain potential schema mismatches.

After some discussion, we will go with this for the time being.

@scovich
Collaborator

scovich commented May 20, 2024

FYI Spark does document this behavior:

> When reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons.

@roeap
Collaborator

roeap commented May 20, 2024

> FYI Spark does document this behavior:
>
> > When reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons.

After reading this, I am not completely sure how specifically it is meant. When it says columns, does that imply all leaf columns, just top-level columns, or everything recursively? Just applying it to all leaves might yield incorrect data (e.g. map keys), so my guess would be just the roots?

@scovich
Collaborator

scovich commented May 20, 2024

> FYI Spark does document this behavior:
>
> > When reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons.
>
> After reading this, I am not completely sure how specifically it is meant. When it says columns, does that imply all leaf columns, just top-level columns, or everything recursively? Just applying it to all leaves might yield incorrect data (e.g. map keys), so my guess would be just the roots?

My understanding is: any time we have a struct containing non-null fields which is itself a nullable field (whether in a parent struct or at the top level of the schema), parquet cannot express that situation. Writers have to compensate by forcing all non-null fields with a nullable ancestor to be nullable. It is not necessary to make the entire schema nullable, but I'm guessing many writers do so because it's simpler?
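A small sketch of the writer behavior described above, assuming arrow-schema types; relax_nullability is a hypothetical helper for illustration, not code from this PR (it only recurses into structs, for brevity):

use std::sync::Arc;
use arrow_schema::{DataType, Field, FieldRef};

/// Per the behavior described above: a non-null field with a nullable ancestor
/// loses its non-null guarantee when the struct is written out.
fn relax_nullability(field: &Field, ancestor_nullable: bool) -> FieldRef {
    let nullable = field.is_nullable() || ancestor_nullable;
    let data_type = match field.data_type() {
        DataType::Struct(children) => DataType::Struct(
            children.iter().map(|c| relax_nullability(c, nullable)).collect(),
        ),
        other => other.clone(),
    };
    Arc::new(Field::new(field.name(), data_type, nullable))
}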

Comment on lines 250 to 251
// use a `map` here so rustc doesn't have to infer the error type
output_data_type.map(|output_type| {
Collaborator

This is avoiding the double-? issue we hit before?

(but if ensure_data_types is really just a validator rather than a mapper... a lot of this code can probably go away?)

Collaborator Author

It's to fix an error like:

error[E0282]: type annotations needed
   --> kernel/src/engine/arrow_expression.rs:256:21
    |
256 |                     Ok(ArrowField::new(input_field.name(), array.data_type().clone(), array.is_nullable()))
    |                     ^^ cannot infer type of the type parameter `E` declared on the enum `Result`
    |
help: consider specifying the generic arguments
    |
256 |                     Ok::<arrow_schema::Field, E>(ArrowField::new(input_field.name(), array.data_type().clone(), array.is_nullable()))
    |                       ++++++++++++++++++++++++++

I've fixed it slightly differently to avoid having to use the turbofish.
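For context, a self-contained toy illustration of the ways to pin down the error type in this kind of closure. DeltaResult here is a stand-in alias, and none of this is code from the PR:

use std::num::ParseIntError;

// Stand-in for the kernel's error alias, purely for illustration.
type DeltaResult<T> = Result<T, ParseIntError>;

// Map over an existing Result: the error type comes from the receiver, so nothing
// inside the closure needs an annotation (the `map` trick in the snippet above).
fn widen(input: DeltaResult<i32>) -> DeltaResult<i64> {
    input.map(|n| i64::from(n) + 1)
}

// When the closure builds a fresh Result, the error type has to be named somewhere:
fn parse_all(inputs: &[&str]) -> Vec<DeltaResult<i64>> {
    inputs
        .iter()
        .map(|s| {
            // annotate an intermediate binding, as this PR does ...
            let res: DeltaResult<i64> = Ok(i64::from(s.parse::<i32>()?));
            res
            // ... or use the turbofish: Ok::<i64, ParseIntError>(...)
            // ... or annotate the closure: |s| -> DeltaResult<i64> { ... }
        })
        .collect()
}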

@nicklan nicklan force-pushed the align-arrow-and-kernel-schema-in-expressions branch from 2c66a85 to 98c2803 on May 20, 2024 19:54
@nicklan nicklan requested a review from samansmink May 20, 2024 19:55
Comment on lines 192 to 206
if kernel_fields.fields.len() == arrow_fields.len() {
    for (kernel_field, arrow_field) in kernel_fields.fields().zip(arrow_fields.iter()) {
        ensure_data_types(&kernel_field.data_type, arrow_field.data_type())?;
    }
    Ok(())
} else {
    Err(make_arrow_error(format!(
        "Struct types have different numbers of fields. Expected {}, got {}",
        kernel_fields.fields.len(),
        arrow_fields.len()
    )))
Collaborator

nit: Seems like a good place for require! followed by the for-loop?
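For reference, roughly what that restructuring would look like, reusing the identifiers from the snippet above and assuming a require!(condition, error) macro that early-returns Err(error):

require!(
    kernel_fields.fields.len() == arrow_fields.len(),
    make_arrow_error(format!(
        "Struct types have different numbers of fields. Expected {}, got {}",
        kernel_fields.fields.len(),
        arrow_fields.len()
    ))
);
for (kernel_field, arrow_field) in kernel_fields.fields().zip(arrow_fields.iter()) {
    ensure_data_types(&kernel_field.data_type, arrow_field.data_type())?;
}
Ok(())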

})
ensure_data_types(input_field.data_type(), array.data_type())?;
// need to help type inference a bit so it knows what the error type is
let res: DeltaResult<ArrowField> = Ok(ArrowField::new(
Collaborator

nit: I suspect this would also work:

.map(|(array, input_field)| -> DeltaResult<_> {
    ensure_data_types(...)?;
    Ok(ArrowField::new(...))
})

@nicklan nicklan force-pushed the align-arrow-and-kernel-schema-in-expressions branch from 98c2803 to 26749e0 on May 24, 2024 19:52
@nicklan nicklan marked this pull request as ready for review May 24, 2024 19:56
Collaborator

@roeap roeap left a comment

👍

@nicklan nicklan merged commit 3641f77 into delta-io:main May 27, 2024