Skip to content

Commit c9566ec

Browse files
committed
refactor: centralize infallible available_parallelism fn. Use better channel size limit in arrow read
1 parent bb8b9f5 commit c9566ec

File tree

4 files changed

+50
-9
lines changed

4 files changed

+50
-9
lines changed

crates/iceberg/src/arrow/reader.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use crate::io::{FileIO, FileMetadata, FileRead};
4646
use crate::runtime::spawn;
4747
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
4848
use crate::spec::{Datum, Schema};
49+
use crate::utils::available_parallelism;
4950
use crate::{Error, ErrorKind};
5051

5152
/// Builder to create ArrowReader
@@ -58,9 +59,7 @@ pub struct ArrowReaderBuilder {
5859
impl ArrowReaderBuilder {
5960
/// Create a new ArrowReaderBuilder
6061
pub(crate) fn new(file_io: FileIO) -> Self {
61-
let num_cpus = std::thread::available_parallelism()
62-
.expect("failed to get number of CPUs")
63-
.get();
62+
let num_cpus = available_parallelism().get();
6463

6564
ArrowReaderBuilder {
6665
batch_size: None,
@@ -109,16 +108,16 @@ impl ArrowReader {
109108
pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
110109
let file_io = self.file_io.clone();
111110
let batch_size = self.batch_size;
112-
let max_concurrent_fetching_datafiles = self.concurrency_limit_data_files;
111+
let concurrency_limit_data_files = self.concurrency_limit_data_files;
113112

114-
let (tx, rx) = channel(10);
113+
let (tx, rx) = channel(concurrency_limit_data_files);
115114
let mut channel_for_error = tx.clone();
116115

117116
spawn(async move {
118117
let result = tasks
119118
.map(|task| Ok((task, file_io.clone(), tx.clone())))
120119
.try_for_each_concurrent(
121-
max_concurrent_fetching_datafiles,
120+
concurrency_limit_data_files,
122121
|(file_scan_task, file_io, tx)| async move {
123122
match file_scan_task {
124123
Ok(task) => {

crates/iceberg/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,5 @@ pub mod transform;
4747
mod runtime;
4848

4949
pub mod arrow;
50+
mod utils;
5051
pub mod writer;

crates/iceberg/src/scan.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use crate::spec::{
3939
SchemaRef, SnapshotRef, TableMetadataRef,
4040
};
4141
use crate::table::Table;
42+
use crate::utils::available_parallelism;
4243
use crate::{Error, ErrorKind, Result};
4344

4445
/// A stream of [`FileScanTask`].
@@ -62,9 +63,7 @@ pub struct TableScanBuilder<'a> {
6263

6364
impl<'a> TableScanBuilder<'a> {
6465
pub(crate) fn new(table: &'a Table) -> Self {
65-
let num_cpus = std::thread::available_parallelism()
66-
.expect("failed to get number of CPUs")
67-
.get();
66+
let num_cpus = available_parallelism().get();
6867

6968
Self {
7069
table,

crates/iceberg/src/utils.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::num::NonZero;
19+
20+
// Use a default value of 1 as the safest option.
21+
// See https://doc.rust-lang.org/std/thread/fn.available_parallelism.html#limitations
22+
// for more details.
23+
const DEFAULT_PARALLELISM: usize = 1;
24+
25+
/// Uses [`std::thread::available_parallelism`] in order to
26+
/// retrieve an estimate of the default amount of parallelism
27+
/// that should be used. Note that [`std::thread::available_parallelism`]
28+
/// returns a `Result` as it can fail, so here we use
29+
/// a default value instead.
30+
/// Note: we don't use a OnceCell or LazyCell here as there
31+
/// are circumstances where the level of available
32+
/// parallelism can change during the lifetime of an executing
33+
/// process, but this should not be called in a hot loop.
34+
pub(crate) fn available_parallelism() -> NonZero<usize> {
35+
std::thread::available_parallelism().unwrap_or_else(|_err| {
36+
// Failed to get the level of parallelism.
37+
// TODO: log/trace when this fallback occurs.
38+
39+
// Using a default value.
40+
NonZero::new(DEFAULT_PARALLELISM).unwrap()
41+
})
42+
}

0 commit comments

Comments
 (0)