Skip to content

Commit f1330df

Browse files
committed
feat/auto-schema-align:
### Commit Message Enhance error handling in `pending_rows_batcher.rs` - Updated `collect_unique_table_schemas` to return a `Result` type, enabling error handling for duplicate table names. - Modified the function to return an error when duplicate table names are found in `table_rows`. - Adjusted test cases to handle the new `Result` return type in `collect_unique_table_schemas`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
1 parent 4d66d9e commit f1330df

File tree

1 file changed

+15
-4
lines changed

1 file changed

+15
-4
lines changed

src/servers/src/pending_rows_batcher.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ impl PendingRowsBatcher {
322322
return Ok((Vec::new(), 0));
323323
}
324324

325-
let unique_tables = Self::collect_unique_table_schemas(&table_rows);
325+
let unique_tables = Self::collect_unique_table_schemas(&table_rows)?;
326326
let mut plan = self
327327
.plan_table_resolution(&catalog, &schema, ctx, &unique_tables)
328328
.await?;
@@ -367,17 +367,28 @@ impl PendingRowsBatcher {
367367

368368
/// Returns unique `(table_name, proto_schema)` pairs while keeping the
369369
/// first-seen schema for duplicate table names.
370-
fn collect_unique_table_schemas(table_rows: &[(String, Rows)]) -> Vec<(&str, &[ColumnSchema])> {
370+
fn collect_unique_table_schemas(
371+
table_rows: &[(String, Rows)],
372+
) -> Result<Vec<(&str, &[ColumnSchema])>> {
371373
let mut unique_tables: Vec<(&str, &[ColumnSchema])> = Vec::with_capacity(table_rows.len());
372374
let mut seen = HashSet::new();
373375

374376
for (table_name, rows) in table_rows {
375377
if seen.insert(table_name.as_str()) {
376378
unique_tables.push((table_name.as_str(), &rows.schema));
379+
} else {
380+
// table_rows should group rows by table name.
381+
return error::InvalidPromRemoteRequestSnafu {
382+
msg: format!(
383+
"Found duplicated table name in RowInsertRequest: {}",
384+
table_name
385+
),
386+
}
387+
.fail();
377388
}
378389
}
379390

380-
unique_tables
391+
Ok(unique_tables)
381392
}
382393

383394
/// Resolves table metadata and classifies each table into existing,
@@ -1521,7 +1532,7 @@ mod tests {
15211532
("cpu".to_string(), mock_rows(1, "instance")),
15221533
];
15231534

1524-
let unique = PendingRowsBatcher::collect_unique_table_schemas(&table_rows);
1535+
let unique = PendingRowsBatcher::collect_unique_table_schemas(&table_rows).unwrap();
15251536

15261537
assert_eq!(2, unique.len());
15271538
assert_eq!("cpu", unique[0].0);

0 commit comments

Comments
 (0)