Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions src/materialized/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,18 @@ impl TableFunctionImpl for StaleFilesUdtf {
)?
.aggregate(
vec![
// Trim the final path element
regexp_replace(col("file_path"), lit(r"/[^/]*$"), lit("/"), None)
.alias("existing_target"),
// Strip the file name, along with any trailing "special" partitions, from
// the path before comparing it to the target partition. Special partitions
// are designated by a leading underscore (e.g. `_v=123`) and must be the
// leaf-most directories in the path. They are useful for attaching extra
// information to a file's location without affecting partitioning or
// staleness checks.
regexp_replace(
col("file_path"),
lit(r"(/_[^/=]+=[^/]+)*/[^/]*$"),
lit("/"),
None,
)
.alias("existing_target"),
],
vec![max(col("last_modified")).alias("target_last_modified")],
)?
Expand Down Expand Up @@ -1169,8 +1178,7 @@ mod test {
}

let cases = &[
TestCase {
name: "un-transformed partition column",
TestCase { name: "un-transformed partition column",
query_to_analyze:
"SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1",
table_name: "m1",
Expand Down Expand Up @@ -1202,6 +1210,38 @@ mod test {
"+--------------------------------+----------------------+-----------------------+----------+",
],
},
TestCase { name: "omit 'special' partition columns",
query_to_analyze:
"SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1",
table_name: "m1",
table_path: ListingTableUrl::parse("s3://m1/").unwrap(),
partition_cols: vec!["partition_column"],
file_extension: ".parquet",
expected_output: vec![
"+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
"| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |",
"+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
"| s3://m1/partition_column=2021/ | datafusion | test | t1 | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26 |",
"| s3://m1/partition_column=2022/ | datafusion | test | t1 | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22 |",
"| s3://m1/partition_column=2023/ | datafusion | test | t1 | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44 |",
"+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
],
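// The second file (partition_column=2022) was last modified before its
// source file, so only that target is expected to be reported as stale.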
file_metadata: "
('datafusion', 'test', 'm1', 's3://m1/partition_column=2021/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0),
('datafusion', 'test', 'm1', 's3://m1/partition_column=2022/_v=123/data.01.parquet', '2023-07-10T16:00:00Z', 0),
('datafusion', 'test', 'm1', 's3://m1/partition_column=2023/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0)
",
expected_stale_files_output: vec![
"+--------------------------------+----------------------+-----------------------+----------+",
"| target | target_last_modified | sources_last_modified | is_stale |",
"+--------------------------------+----------------------+-----------------------+----------+",
"| s3://m1/partition_column=2021/ | 2023-07-12T16:00:00 | 2023-07-11T16:29:26 | false |",
"| s3://m1/partition_column=2022/ | 2023-07-10T16:00:00 | 2023-07-11T16:45:22 | true |",
"| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |",
"+--------------------------------+----------------------+-----------------------+----------+",
],
},
TestCase {
name: "transform year/month/day partition into timestamp partition",
query_to_analyze: "
Expand Down
Loading