Skip to content

Commit e381259

Browse files
fix(cache): validate cache_index schema collisions in worker datasource (#3216)
* fix(data-cache): validate cache_index schema collisions in worker datasource

  Signed-off-by: Hitanshi Goklani <hitanshigoklani33@gmail.com>

* Update pkg/data_cache/src/worker/worker_datasource.rs

  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
  Signed-off-by: Hitanshi7556 <hitanshigoklani33@gmail.com>

---------

Signed-off-by: Hitanshi Goklani <hitanshigoklani33@gmail.com>
Signed-off-by: Hitanshi7556 <hitanshigoklani33@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent fa2e7cf commit e381259

1 file changed

Lines changed: 50 additions & 1 deletion

File tree

pkg/data_cache/src/worker/worker_datasource.rs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,18 @@ impl WorkerDataSource {
111111

112112
let fields = schema.fields().clone();
113113
let mut builder = SchemaBuilder::from(&fields);
114-
builder.push(Field::new(CACHE_INDEX_COLUMN, DataType::UInt64, false)); // TODO:// validate name collision
114+
115+
// Validate that source schema does not already contain cache_index column
116+
if schema.field_with_name(CACHE_INDEX_COLUMN).is_ok() {
117+
return Err(format!(
118+
"Source schema already contains a column named '{}'. This conflicts with the cache index column added by the data cache worker.",
119+
CACHE_INDEX_COLUMN
120+
)
121+
.into());
122+
}
123+
124+
builder.push(Field::new(CACHE_INDEX_COLUMN, DataType::UInt64, false));
125+
115126
let output_schema = Arc::new(Schema::new(builder.finish().fields));
116127
Ok(Self {
117128
file_urls,
@@ -513,3 +524,41 @@ async fn filter_and_create_stream(
513524
))))),
514525
}
515526
}
527+
528+
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};

    /// Reports whether `schema` already declares a field named
    /// `CACHE_INDEX_COLUMN`.
    ///
    /// This is the same lookup the guard in `impl WorkerDataSource` performs
    /// before appending the cache index field.
    ///
    /// NOTE(review): the tests below exercise only this lookup; they do not
    /// call the `WorkerDataSource` constructor, so they would keep passing if
    /// the guard itself were removed. Consider adding a test that builds the
    /// data source and asserts on the returned `Err`.
    fn declares_cache_index(schema: &Schema) -> bool {
        schema.field_with_name(CACHE_INDEX_COLUMN).is_ok()
    }

    #[test]
    fn fails_when_source_schema_contains_cache_index() {
        // Source schema that already carries the reserved column name.
        let with_reserved_column = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(CACHE_INDEX_COLUMN, DataType::UInt64, false),
        ]);

        assert!(
            declares_cache_index(&with_reserved_column),
            "Should detect cache_index collision in source schema"
        );
    }

    #[test]
    fn passes_when_source_schema_has_no_cache_index() {
        // Source schema made up of ordinary columns only.
        let without_reserved_column = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]);

        assert!(
            !declares_cache_index(&without_reserved_column),
            "Should not detect cache_index collision when column is absent"
        );
    }
}

0 commit comments

Comments
 (0)