@@ -15,6 +15,7 @@
 from __future__ import annotations
 
 import math
+import os
 import threading
 from typing import Literal, Mapping, Optional, Sequence, Tuple
 import warnings
@@ -58,7 +59,7 @@
 MAX_SUBTREE_FACTORINGS = 5
 _MAX_CLUSTER_COLUMNS = 4
 MAX_SMALL_RESULT_BYTES = 10 * 1024 * 1024 * 1024  # 10G
-
+_MAX_READ_STREAMS = os.cpu_count()
 
 SourceIdMapping = Mapping[str, str]
 
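Note on the new constant: os.cpu_count() may return None when the core count cannot be determined, in which case max_stream_count would fall back to the library's default behavior rather than a fixed cap. A minimal sketch of a variant that always yields an int; the `or 1` fallback is an assumption, not part of this change:

import os

# Cap BigQuery Storage read streams at the local core count;
# fall back to a single stream if the CPU count is unknown (assumed fallback).
_MAX_READ_STREAMS = os.cpu_count() or 1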
@@ -323,7 +324,10 @@ def _export_gbq(
         self.bqclient.update_table(table, ["schema"])
 
         return executor.ExecuteResult(
-            row_iter.to_arrow_iterable(),
+            row_iter.to_arrow_iterable(
+                bqstorage_client=self.bqstoragereadclient,
+                max_stream_count=_MAX_READ_STREAMS,
+            ),
             array_value.schema,
             query_job,
             total_bytes_processed=row_iter.total_bytes_processed,
@@ -668,7 +672,8 @@ def _execute_plan_gbq(
 
         return executor.ExecuteResult(
             _arrow_batches=iterator.to_arrow_iterable(
-                bqstorage_client=self.bqstoragereadclient
+                bqstorage_client=self.bqstoragereadclient,
+                max_stream_count=_MAX_READ_STREAMS,
             ),
             schema=og_schema,
             query_job=query_job,
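Both call sites now hand the row iterator a BigQuery Storage read client and cap the stream fan-out at _MAX_READ_STREAMS. A minimal, self-contained sketch of the same pattern outside bigframes; the query, the client setup, and the `or 1` fallback are illustrative assumptions, and it presumes a google-cloud-bigquery release whose RowIterator.to_arrow_iterable accepts max_stream_count (as used in this diff) plus google-cloud-bigquery-storage installed:

import os

from google.cloud import bigquery, bigquery_storage_v1

bqclient = bigquery.Client()
read_client = bigquery_storage_v1.BigQueryReadClient()

# Illustrative query against a public dataset; any query result works.
row_iter = bqclient.query(
    "SELECT name, number FROM `bigquery-public-data.usa_names.usa_1910_2013`"
).result()

# Mirror _MAX_READ_STREAMS from the diff above, with an explicit int fallback.
max_streams = os.cpu_count() or 1

for batch in row_iter.to_arrow_iterable(
    bqstorage_client=read_client,
    max_stream_count=max_streams,
):
    # to_arrow_iterable yields pyarrow.RecordBatch objects as streams are read.
    print(batch.num_rows)

Tying the cap to the local core count presumably keeps download parallelism roughly in line with what the consuming process can put to work.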