From 8b5e80b030e0905219e5c21e627ff99600de5584 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 15 Mar 2019 16:53:00 -0700 Subject: [PATCH 1/3] ENH: Add use_bqstorage_api option to read_gbq The BigQuery Storage API provides a way to read query results quickly (and using multiple threads). It only works with large query results (~125 MB), but as of 1.11.1, the google-cloud-bigquery library can fallback to the BigQuery API to download results when a request to the BigQuery Storage API fails. As this API can increase costs (and may not be enabled on the user's project), this option is disabled by default. --- pandas_gbq/gbq.py | 74 +++++++++++++++++++++++++++++++++++++++- tests/system/test_gbq.py | 28 ++++++++++++++- 2 files changed, 100 insertions(+), 2 deletions(-) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index b9978887..8b901870 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -5,6 +5,13 @@ import numpy as np +try: + # The BigQuery Storage API client is an optional dependency. It is only + # required when use_bqstorage_api=True. + from google.cloud import bigquery_storage_v1beta1 +except ImportError: # pragma: NO COVER + bigquery_storage_v1beta1 = None + from pandas_gbq.exceptions import AccessDenied logger = logging.getLogger(__name__) @@ -302,6 +309,7 @@ def __init__( dialect="standard", location=None, credentials=None, + use_bqstorage_api=False, ): global context from google.api_core.exceptions import GoogleAPIError @@ -352,6 +360,9 @@ def __init__( context.project = self.project_id self.client = self.get_client() + self.bqstorage_client = _make_bqstorage_client( + use_bqstorage_api, self.credentials + ) # BQ Queries costs $5 per TB. First 1 TB per month is free # see here for more: https://cloud.google.com/bigquery/pricing @@ -489,7 +500,9 @@ def run_query(self, query, **kwargs): schema_fields = [field.to_api_repr() for field in rows_iter.schema] nullsafe_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields) - df = rows_iter.to_dataframe(dtypes=nullsafe_dtypes) + df = rows_iter.to_dataframe( + dtypes=nullsafe_dtypes, bqstorage_client=self.bqstorage_client + ) if df.empty: df = _cast_empty_df_dtypes(schema_fields, df) @@ -727,6 +740,21 @@ def _localize_df(schema_fields, df): return df +def _make_bqstorage_client(use_bqstorage_api, credentials): + if not use_bqstorage_api: + return None + + if bigquery_storage_v1beta1 is None: + raise ImportError( + "Install the google-cloud-bigquery-storage and fastavro packages " + "to use the BigQuery Storage API." + ) + + return bigquery_storage_v1beta1.BigQueryStorageClient( + credentials=credentials + ) + + def read_gbq( query, project_id=None, @@ -738,6 +766,7 @@ def read_gbq( location=None, configuration=None, credentials=None, + use_bqstorage_api=False, verbose=None, private_key=None, ): @@ -815,6 +844,27 @@ def read_gbq( :class:`google.oauth2.service_account.Credentials` directly. .. versionadded:: 0.8.0 + use_bqstorage_api : bool, default False + Use the `BigQuery Storage API + `__ to + download query results quickly, but at an increased cost. To use this + API, first `enable it in the Cloud Console + `__. + You must also have the `bigquery.readsessions.create + `__ + permission on the project you are billing queries to. + + **Note:** Due to a `known issue in the ``google-cloud-bigquery`` + package + `__ + (fixed in version 1.11.0), you must write your query results to a + destination table. To do this with ``read_gbq``, supply a + ``configuration`` dictionary. 
+ + This feature requires the ``google-cloud-bigquery-storage`` and + ``fastavro`` packages. + + .. versionadded:: 0.10.0 verbose : None, deprecated Deprecated in Pandas-GBQ 0.4.0. Use the `logging module to adjust verbosity instead @@ -835,6 +885,27 @@ def read_gbq( ------- df: DataFrame DataFrame representing results of query. + + Examples + -------- + + Use the BigQuery Storage API to fetch results quickly, but at an addition + cost. Due to a known issue in the BigQuery Storage API, you must write + your query results to a destination table. + + >>> pandas_gbq.read_gbq( + ... query_string, + ... configuration={ + ... 'query': { + ... 'destinationTable': { + ... 'projectId': 'your-project', + ... 'datasetId': 'destination_dataset', + ... 'tableId': 'new_table_name', + ... } + ... } + ... }, + ... use_bqstorage_api=True, + ... ) """ global context @@ -871,6 +942,7 @@ def read_gbq( location=location, credentials=credentials, private_key=private_key, + use_bqstorage_api=use_bqstorage_api, ) final_df = connector.run_query(query, configuration=configuration) diff --git a/tests/system/test_gbq.py b/tests/system/test_gbq.py index 6c876068..5c1e6db2 100644 --- a/tests/system/test_gbq.py +++ b/tests/system/test_gbq.py @@ -895,10 +895,36 @@ def test_tokyo(self, tokyo_dataset, tokyo_table, private_key_path): location="asia-northeast1", private_key=private_key_path, ) - print(df) assert df["max_year"][0] >= 2000 +@pytest.mark.skip(reason="large query for BQ Storage API tests") +def test_read_gbq_w_bqstorage_api(credentials): + df = gbq.read_gbq( + """ + SELECT + dependency_name, + dependency_platform, + project_name, + project_id, + version_number, + version_id, + dependency_kind, + optional_dependency, + dependency_requirements, + dependency_project_id + FROM + `bigquery-public-data.libraries_io.dependencies` + WHERE + LOWER(dependency_platform) = 'npm' + LIMIT 2500000 + """, + use_bqstorage_api=True, + credentials=credentials, + ) + assert len(df) == 2500000 + + class TestToGBQIntegration(object): @pytest.fixture(autouse=True, scope="function") def setup(self, project, credentials, random_dataset_id): From 5b34b28b7038ce3f21e5db632526a94e0efea18a Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 4 Apr 2019 17:44:59 -0500 Subject: [PATCH 2/3] Add to changelog. Remove comment about destination tables. --- docs/source/changelog.rst | 8 ++++++++ docs/source/reading.rst | 7 +++++++ pandas_gbq/gbq.py | 21 --------------------- 3 files changed, 15 insertions(+), 21 deletions(-) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index d710b37f..2f467ec8 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -39,8 +39,16 @@ Enhancements (contributed by @johnpaton) - Read ``project_id`` in :func:`to_gbq` from provided ``credentials`` if available (contributed by @daureg) +<<<<<<< Updated upstream - ``read_gbq`` uses the timezone-aware ``DatetimeTZDtype(unit='ns', tz='UTC')`` dtype for BigQuery ``TIMESTAMP`` columns. (:issue:`269`) +||||||| merged common ancestors +======= +- Add ``use_bqstorage_api`` to :func:`read_gbq`. The BigQuery Storage API can + be used to download large query results (>125 MB) more quickly. If the BQ + Storage API can't be used, the BigQuery API is used instead. (:issue:`133`, + :issue:`270`) +>>>>>>> Stashed changes .. 
_changelog-0.9.0: diff --git a/docs/source/reading.rst b/docs/source/reading.rst index 4a7b9d66..0668de3c 100644 --- a/docs/source/reading.rst +++ b/docs/source/reading.rst @@ -84,3 +84,10 @@ DATETIME datetime64[ns] TIME datetime64[ns] DATE datetime64[ns] ================== ========================= + +.. _reading-bqstorage-api: + +Using the BigQuery Storage API +------------------------------ + +The BigQuery Storage API diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 8b901870..9c02538d 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -885,27 +885,6 @@ def read_gbq( ------- df: DataFrame DataFrame representing results of query. - - Examples - -------- - - Use the BigQuery Storage API to fetch results quickly, but at an addition - cost. Due to a known issue in the BigQuery Storage API, you must write - your query results to a destination table. - - >>> pandas_gbq.read_gbq( - ... query_string, - ... configuration={ - ... 'query': { - ... 'destinationTable': { - ... 'projectId': 'your-project', - ... 'datasetId': 'destination_dataset', - ... 'tableId': 'new_table_name', - ... } - ... } - ... }, - ... use_bqstorage_api=True, - ... ) """ global context From 4d2a7d705202add3a035824dffd0e6f1d10bd97d Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 5 Apr 2019 08:02:37 -0500 Subject: [PATCH 3/3] Add docs for using the BigQuery Storage API. --- docs/source/changelog.rst | 4 ---- docs/source/reading.rst | 33 ++++++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 2f467ec8..f18b20d9 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -39,16 +39,12 @@ Enhancements (contributed by @johnpaton) - Read ``project_id`` in :func:`to_gbq` from provided ``credentials`` if available (contributed by @daureg) -<<<<<<< Updated upstream - ``read_gbq`` uses the timezone-aware ``DatetimeTZDtype(unit='ns', tz='UTC')`` dtype for BigQuery ``TIMESTAMP`` columns. (:issue:`269`) -||||||| merged common ancestors -======= - Add ``use_bqstorage_api`` to :func:`read_gbq`. The BigQuery Storage API can be used to download large query results (>125 MB) more quickly. If the BQ Storage API can't be used, the BigQuery API is used instead. (:issue:`133`, :issue:`270`) ->>>>>>> Stashed changes .. _changelog-0.9.0: diff --git a/docs/source/reading.rst b/docs/source/reading.rst index 0668de3c..e10f533c 100644 --- a/docs/source/reading.rst +++ b/docs/source/reading.rst @@ -90,4 +90,35 @@ DATE datetime64[ns] Using the BigQuery Storage API ------------------------------ -The BigQuery Storage API +Use the BigQuery Storage API to download large (>125 MB) query results more +quickly (but at an `increased cost +`__) by setting +``use_bqstorage_api`` to ``True``. + +1. Enable the BigQuery Storage API on the project you are using to run + queries. + + `Enable the API + `__. +2. Ensure you have the `*bigquery.readsessions.create permission* + `__. to + create BigQuery Storage API read sessions. This permission is provided by + the `*bigquery.user* role + `__. +4. Install the ``google-cloud-bigquery-storage``, ``fastavro``, and + ``python-snappy`` packages. + + With pip: + + ..code-block:: sh + + pip install --upgrade google-cloud-bigquery-storage fastavro python-snappy + + With conda: + + conda install -c conda-forge google-cloud-bigquery-storage fastavro python-snappy +4. Set ``use_bqstorage_api`` to ``True`` when calling the + :func:`~pandas_gbq.read_gbq` function. 
As of the ``google-cloud-bigquery``
   package, version 1.11.1 or later, the function will fall back to the
   BigQuery API if the BigQuery Storage API cannot be used, such as with
   small query results.
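
A minimal usage sketch of the new ``use_bqstorage_api`` option (the project ID
and query below are illustrative placeholders, not values taken from these
patches)::

    import pandas_gbq

    # With use_bqstorage_api=True, pandas-gbq asks google-cloud-bigquery to
    # download the query results through the BigQuery Storage API. When a
    # Storage API request fails (for example, on small result sets),
    # google-cloud-bigquery 1.11.1 or later falls back to the regular
    # BigQuery API.
    df = pandas_gbq.read_gbq(
        """
        SELECT name, SUM(number) AS total
        FROM `bigquery-public-data.usa_names.usa_1910_2013`
        GROUP BY name
        """,
        project_id="your-project-id",  # placeholder project
        dialect="standard",
        use_bqstorage_api=True,
    )
    print(df.head())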