diff --git a/ci/requirements-2.7-0.19.2.pip b/ci/requirements-2.7-0.19.2.pip
index 852dc153..cd94478a 100644
--- a/ci/requirements-2.7-0.19.2.pip
+++ b/ci/requirements-2.7-0.19.2.pip
@@ -1,7 +1,5 @@
-google-api-python-client
 google-auth
-google-auth-httplib2
 google-auth-oauthlib
 PyCrypto
-python-gflags
 mock
+google-cloud-bigquery
diff --git a/ci/requirements-3.5-0.18.1.pip b/ci/requirements-3.5-0.18.1.pip
index 6fb8a03d..18369345 100644
--- a/ci/requirements-3.5-0.18.1.pip
+++ b/ci/requirements-3.5-0.18.1.pip
@@ -1,5 +1,4 @@
-google-api-python-client==1.6.0
 google-auth==1.0.0
-google-auth-httplib2==0.0.1
 google-auth-oauthlib==0.0.1
 mock
+google-cloud-bigquery==0.28.0
diff --git a/ci/requirements-3.6-0.20.1.conda b/ci/requirements-3.6-0.20.1.conda
index a1608720..b52f2aeb 100644
--- a/ci/requirements-3.6-0.20.1.conda
+++ b/ci/requirements-3.6-0.20.1.conda
@@ -1,5 +1,4 @@
-google-api-python-client
 google-auth
-google-auth-httplib2
 google-auth-oauthlib
 mock
+google-cloud-bigquery
diff --git a/ci/requirements-3.6-MASTER.pip b/ci/requirements-3.6-MASTER.pip
index a1608720..b52f2aeb 100644
--- a/ci/requirements-3.6-MASTER.pip
+++ b/ci/requirements-3.6-MASTER.pip
@@ -1,5 +1,4 @@
-google-api-python-client
 google-auth
-google-auth-httplib2
 google-auth-oauthlib
 mock
+google-cloud-bigquery
diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst
index e9998a59..b6684582 100644
--- a/docs/source/changelog.rst
+++ b/docs/source/changelog.rst
@@ -1,6 +1,13 @@
 Changelog
 =========
 
+0.3.0 / 2017-??-??
+------------------
+
+- Use the `google-cloud-bigquery `__ library for API calls. The ``google-cloud-bigquery`` package is a new dependency, and dependencies on ``google-api-python-client`` and ``httplib2`` are removed. See the `installation guide `__ for more details. (:issue:`93`)
+- :func:`to_gbq` now uses a load job instead of the streaming API. (:issue:`75`)
+- Remove ``StreamingInsertError`` class, as it is no longer used by :func:`to_gbq`. (:issue:`75`)
+
 0.2.1 / 2017-11-27
 ------------------
diff --git a/docs/source/install.rst b/docs/source/install.rst
index 2b701fd2..c64c7939 100644
--- a/docs/source/install.rst
+++ b/docs/source/install.rst
@@ -37,8 +37,17 @@ Dependencies
 
 This module requires following additional dependencies:
 
-- `httplib2 `__: HTTP client
-- `google-api-python-client `__: Google's API client
 - `google-auth `__: authentication and authorization for Google's API
 - `google-auth-oauthlib `__: integration with `oauthlib `__ for end-user authentication
-- `google-auth-httplib2 `__: adapter to use ``httplib2`` HTTP client with ``google-auth``
+- `google-cloud-bigquery `__: Google Cloud client library for BigQuery
+
+.. note::
+
+    The dependency on `google-cloud-bigquery `__ is new in version 0.3.0 of ``pandas-gbq``.
+ Versions less than 0.3.0 required the following dependencies: + + - `httplib2 `__: HTTP client (no longer required) + - `google-api-python-client `__: Google's API client (no longer required, replaced by `google-cloud-bigquery `__:) + - `google-auth `__: authentication and authorization for Google's API + - `google-auth-oauthlib `__: integration with `oauthlib `__ for end-user authentication + - `google-auth-httplib2 `__: adapter to use ``httplib2`` HTTP client with ``google-auth`` (no longer required) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index da7dd21d..46a246e5 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -1,17 +1,16 @@ import warnings from datetime import datetime import json -from time import sleep -import uuid import time +from time import sleep import sys import os import numpy as np from distutils.version import StrictVersion -from pandas import compat, DataFrame, concat -from pandas.compat import lzip, bytes_to_str +from pandas import compat, DataFrame +from pandas.compat import lzip def _check_google_client_version(): @@ -22,31 +21,24 @@ def _check_google_client_version(): except ImportError: raise ImportError('Could not import pkg_resources (setuptools).') - # Version 1.6.0 is the first version to support google-auth. - # https://github.com/google/google-api-python-client/blob/master/CHANGELOG - google_api_minimum_version = '1.6.0' + # Version 0.28.0 includes many changes compared to previous versions + # https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/bigquery/CHANGELOG.md + bigquery_client_minimum_version = '0.28.0' - _GOOGLE_API_CLIENT_VERSION = pkg_resources.get_distribution( - 'google-api-python-client').version + _BIGQUERY_CLIENT_VERSION = pkg_resources.get_distribution( + 'google-cloud-bigquery').version - if (StrictVersion(_GOOGLE_API_CLIENT_VERSION) < - StrictVersion(google_api_minimum_version)): - raise ImportError('pandas requires google-api-python-client >= {0} ' + if (StrictVersion(_BIGQUERY_CLIENT_VERSION) < + StrictVersion(bigquery_client_minimum_version)): + raise ImportError('pandas requires google-cloud-bigquery >= {0} ' 'for Google BigQuery support, ' 'current version {1}' - .format(google_api_minimum_version, - _GOOGLE_API_CLIENT_VERSION)) + .format(bigquery_client_minimum_version, + _BIGQUERY_CLIENT_VERSION)) def _test_google_api_imports(): - try: - import httplib2 # noqa - except ImportError as ex: - raise ImportError( - 'pandas requires httplib2 for Google BigQuery support: ' - '{0}'.format(ex)) - try: from google_auth_oauthlib.flow import InstalledAppFlow # noqa except ImportError as ex: @@ -55,50 +47,35 @@ def _test_google_api_imports(): 'support: {0}'.format(ex)) try: - from google_auth_httplib2 import AuthorizedHttp # noqa - from google_auth_httplib2 import Request # noqa - except ImportError as ex: - raise ImportError( - 'pandas requires google-auth-httplib2 for Google BigQuery ' - 'support: {0}'.format(ex)) - - try: - from googleapiclient.discovery import build # noqa - from googleapiclient.errors import HttpError # noqa + import google.auth # noqa except ImportError as ex: raise ImportError( - "pandas requires google-api-python-client for Google BigQuery " - "support: {0}".format(ex)) + "pandas requires google-auth for Google BigQuery support: " + "{0}".format(ex)) try: - import google.auth # noqa + from google.cloud import bigquery # noqa except ImportError as ex: raise ImportError( - "pandas requires google-auth for Google BigQuery support: " + "pandas 
requires google-cloud-python for Google BigQuery support: " "{0}".format(ex)) _check_google_client_version() def _try_credentials(project_id, credentials): - import httplib2 - from googleapiclient.discovery import build - import googleapiclient.errors - from google_auth_httplib2 import AuthorizedHttp + from google.cloud import bigquery + import google.api_core.exceptions if credentials is None: return None - http = httplib2.Http() try: - authed_http = AuthorizedHttp(credentials, http=http) - bigquery_service = build('bigquery', 'v2', http=authed_http) + client = bigquery.Client(project=project_id, credentials=credentials) # Check if the application has rights to the BigQuery project - jobs = bigquery_service.jobs() - job_data = {'configuration': {'query': {'query': 'SELECT 1'}}} - jobs.insert(projectId=project_id, body=job_data).execute() + client.query('SELECT 1').result() return credentials - except googleapiclient.errors.Error: + except google.api_core.exceptions.GoogleAPIError: return None @@ -181,14 +158,6 @@ class QueryTimeout(ValueError): pass -class StreamingInsertError(ValueError): - """ - Raised when BigQuery reports a streaming insert error. - For more information see `Streaming Data Into BigQuery - `__ - """ - - class TableCreationError(ValueError): """ Raised when the create table method fails @@ -202,6 +171,9 @@ class GbqConnector(object): def __init__(self, project_id, reauth=False, verbose=False, private_key=None, auth_local_webserver=False, dialect='legacy'): + from google.api_core.exceptions import GoogleAPIError + from google.api_core.exceptions import ClientError + self.http_error = (ClientError, GoogleAPIError) self.project_id = project_id self.reauth = reauth self.verbose = verbose @@ -210,7 +182,7 @@ def __init__(self, project_id, reauth=False, verbose=False, self.dialect = dialect self.credentials_path = _get_credentials_file() self.credentials = self.get_credentials() - self.service = self.get_service() + self.client = self.get_client() # BQ Queries costs $5 per TB. First 1 TB per month is free # see here for more: https://cloud.google.com/bigquery/pricing @@ -276,8 +248,7 @@ def load_user_account_credentials(self): credentials do not have access to the project (self.project_id) on BigQuery. """ - import httplib2 - from google_auth_httplib2 import Request + import google.auth.transport.requests from google.oauth2.credentials import Credentials # Use the default credentials location under ~/.config and the @@ -309,8 +280,7 @@ def load_user_account_credentials(self): scopes=credentials_json.get('scopes')) # Refresh the token before trying to use it. - http = httplib2.Http() - request = Request(http) + request = google.auth.transport.requests.Request() credentials.refresh(request) return _try_credentials(self.project_id, credentials) @@ -411,8 +381,7 @@ def get_user_account_credentials(self): return credentials def get_service_account_credentials(self): - import httplib2 - from google_auth_httplib2 import Request + import google.auth.transport.requests from google.oauth2.service_account import Credentials from os.path import isfile @@ -435,8 +404,7 @@ def get_service_account_credentials(self): credentials = credentials.with_scopes([self.scope]) # Refresh the token before trying to use it. 
- http = httplib2.Http() - request = Request(http) + request = google.auth.transport.requests.Request() credentials.refresh(request) return credentials @@ -475,70 +443,25 @@ def sizeof_fmt(num, suffix='B'): num /= 1024.0 return fmt % (num, 'Y', suffix) - def get_service(self): - import httplib2 - from google_auth_httplib2 import AuthorizedHttp - from googleapiclient.discovery import build - - http = httplib2.Http() - authed_http = AuthorizedHttp( - self.credentials, http=http) - bigquery_service = build('bigquery', 'v2', http=authed_http) - - return bigquery_service + def get_client(self): + from google.cloud import bigquery + return bigquery.Client( + project=self.project_id, credentials=self.credentials) @staticmethod def process_http_error(ex): # See `BigQuery Troubleshooting Errors # `__ - status = json.loads(bytes_to_str(ex.content))['error'] - errors = status.get('errors', None) - - if errors: - for error in errors: - reason = error['reason'] - message = error['message'] - - raise GenericGBQException( - "Reason: {0}, Message: {1}".format(reason, message)) - - raise GenericGBQException(errors) - - def process_insert_errors(self, insert_errors): - for insert_error in insert_errors: - row = insert_error['index'] - errors = insert_error.get('errors', None) - for error in errors: - reason = error['reason'] - message = error['message'] - location = error['location'] - error_message = ('Error at Row: {0}, Reason: {1}, ' - 'Location: {2}, Message: {3}' - .format(row, reason, location, message)) - - # Report all error messages if verbose is set - if self.verbose: - self._print(error_message) - else: - raise StreamingInsertError(error_message + - '\nEnable verbose logging to ' - 'see all errors') - - raise StreamingInsertError + raise GenericGBQException("Reason: {0}".format(ex)) def run_query(self, query, **kwargs): - try: - from googleapiclient.errors import HttpError - except ImportError: - from apiclient.errors import HttpError from google.auth.exceptions import RefreshError - - job_collection = self.service.jobs() + from google.cloud.bigquery import QueryJobConfig + from concurrent.futures import TimeoutError job_config = { 'query': { - 'query': query, 'useLegacySql': self.dialect == 'legacy' # 'allowLargeResults', 'createDisposition', # 'preserveNulls', destinationTable, useQueryCache @@ -550,24 +473,24 @@ def run_query(self, query, **kwargs): raise ValueError("Only one job type must be specified, but " "given {}".format(','.join(config.keys()))) if 'query' in config: - if 'query' in config['query'] and query is not None: - raise ValueError("Query statement can't be specified " - "inside config while it is specified " - "as parameter") + if 'query' in config['query']: + if query is not None: + raise ValueError("Query statement can't be specified " + "inside config while it is specified " + "as parameter") + query = config['query']['query'] + del config['query']['query'] job_config['query'].update(config['query']) else: raise ValueError("Only 'query' job type is supported") - job_data = { - 'configuration': job_config - } - self._start_timer() try: self._print('Requesting query... 
', end="") - query_reply = job_collection.insert( - projectId=self.project_id, body=job_data).execute() + query_reply = self.client.query( + query, + job_config=QueryJobConfig.from_api_repr(job_config['query'])) self._print('ok.') except (RefreshError, ValueError): if self.private_key: @@ -577,96 +500,71 @@ def run_query(self, query, **kwargs): raise AccessDenied( "The credentials have been revoked or expired, " "please re-run the application to re-authorize") - except HttpError as ex: + except self.http_error as ex: self.process_http_error(ex) - job_reference = query_reply['jobReference'] - job_id = job_reference['jobId'] + job_id = query_reply.job_id self._print('Job ID: %s\nQuery running...' % job_id) - while not query_reply.get('jobComplete', False): + while query_reply.state != 'DONE': self.print_elapsed_seconds(' Elapsed', 's. Waiting...') timeout_ms = job_config['query'].get('timeoutMs') if timeout_ms and timeout_ms < self.get_elapsed_seconds() * 1000: raise QueryTimeout('Query timeout: {} ms'.format(timeout_ms)) + timeout_sec = 1.0 + if timeout_ms: + # Wait at most 1 second so we can show progress bar + timeout_sec = min(1.0, timeout_ms / 1000.0) + try: - query_reply = job_collection.getQueryResults( - projectId=job_reference['projectId'], - jobId=job_id).execute() - except HttpError as ex: + query_reply.result(timeout=timeout_sec) + except TimeoutError: + # Use our own timeout logic + pass + except self.http_error as ex: self.process_http_error(ex) if self.verbose: - if query_reply['cacheHit']: + if query_reply.cache_hit: self._print('Query done.\nCache hit.\n') else: - bytes_processed = int(query_reply.get( - 'totalBytesProcessed', '0')) - self._print('Query done.\nProcessed: {}'.format( - self.sizeof_fmt(bytes_processed))) + bytes_processed = query_reply.total_bytes_processed or 0 + bytes_billed = query_reply.total_bytes_billed or 0 + self._print('Query done.\nProcessed: {} Billed: {}'.format( + self.sizeof_fmt(bytes_processed), + self.sizeof_fmt(bytes_billed))) self._print('Standard price: ${:,.2f} USD\n'.format( - bytes_processed * self.query_price_for_TB)) + bytes_billed * self.query_price_for_TB)) self._print('Retrieving results...') - total_rows = int(query_reply['totalRows']) - result_pages = list() - seen_page_tokens = list() - current_row = 0 - # Only read schema on first page - schema = query_reply['schema'] - - # Loop through each page of data - while 'rows' in query_reply and current_row < total_rows: - page = query_reply['rows'] - result_pages.append(page) - current_row += len(page) - - self.print_elapsed_seconds( - ' Got page: {}; {}% done. Elapsed'.format( - len(result_pages), - round(100.0 * current_row / total_rows))) - - if current_row == total_rows: - break - - page_token = query_reply.get('pageToken', None) - - if not page_token and current_row < total_rows: - raise InvalidPageToken("Required pageToken was missing. 
" - "Received {0} of {1} rows" - .format(current_row, total_rows)) - - elif page_token in seen_page_tokens: - raise InvalidPageToken("A duplicate pageToken was returned") - - seen_page_tokens.append(page_token) - - try: - query_reply = job_collection.getQueryResults( - projectId=job_reference['projectId'], - jobId=job_id, - pageToken=page_token).execute() - except HttpError as ex: - self.process_http_error(ex) - - if current_row < total_rows: - raise InvalidPageToken() + try: + rows_iter = query_reply.result() + except self.http_error as ex: + self.process_http_error(ex) + result_rows = list(rows_iter) + total_rows = rows_iter.total_rows + schema = { + 'fields': [ + field.to_api_repr() + for field in rows_iter.schema], + } # print basic query stats self._print('Got {} rows.\n'.format(total_rows)) - return schema, result_pages + return schema, result_rows def load_data(self, dataframe, dataset_id, table_id, chunksize): - try: - from googleapiclient.errors import HttpError - except ImportError: - from apiclient.errors import HttpError + from google.cloud.bigquery import LoadJobConfig + from six import StringIO - job_id = uuid.uuid4().hex + destination_table = self.client.dataset(dataset_id).table(table_id) + job_config = LoadJobConfig() + job_config.write_disposition = 'WRITE_APPEND' + job_config.source_format = 'NEWLINE_DELIMITED_JSON' rows = [] remaining_rows = len(dataframe) @@ -674,44 +572,25 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize): self._print("\n\n") for index, row in dataframe.reset_index(drop=True).iterrows(): - row_dict = dict() - row_dict['json'] = json.loads(row.to_json(force_ascii=False, - date_unit='s', - date_format='iso')) - row_dict['insertId'] = job_id + str(index) - rows.append(row_dict) + row_json = row.to_json( + force_ascii=False, date_unit='s', date_format='iso') + rows.append(row_json) remaining_rows -= 1 if (len(rows) % chunksize == 0) or (remaining_rows == 0): - self._print("\rStreaming Insert is {0}% Complete".format( + self._print("\rLoad is {0}% Complete".format( ((total_rows - remaining_rows) * 100) / total_rows)) - body = {'rows': rows} + body = StringIO('{}\n'.format('\n'.join(rows))) try: - response = self.service.tabledata().insertAll( - projectId=self.project_id, - datasetId=dataset_id, - tableId=table_id, - body=body).execute() - except HttpError as ex: + self.client.load_table_from_file( + body, + destination_table, + job_config=job_config).result() + except self.http_error as ex: self.process_http_error(ex) - # For streaming inserts, even if you receive a success HTTP - # response code, you'll need to check the insertErrors property - # of the response to determine if the row insertions were - # successful, because it's possible that BigQuery was only - # partially successful at inserting the rows. 
See the `Success - # HTTP Response Codes - # `__ - # section - - insert_errors = response.get('insertErrors', None) - if insert_errors: - self.process_insert_errors(insert_errors) - - sleep(1) # Maintains the inserts "per second" rate per API rows = [] self._print("\n") @@ -734,24 +613,20 @@ def schema(self, dataset_id, table_id): list of dicts Fields representing the schema """ + table_ref = self.client.dataset(dataset_id).table(table_id) try: - from googleapiclient.errors import HttpError - except ImportError: - from apiclient.errors import HttpError + table = self.client.get_table(table_ref) + remote_schema = table.schema - try: - remote_schema = self.service.tables().get( - projectId=self.project_id, - datasetId=dataset_id, - tableId=table_id).execute()['schema'] - - remote_fields = [{'name': field_remote['name'], - 'type': field_remote['type']} - for field_remote in remote_schema['fields']] + remote_fields = [ + field_remote.to_api_repr() for field_remote in remote_schema] + for field in remote_fields: + field['type'] = field['type'].upper() + field['mode'] = field['mode'].upper() return remote_fields - except HttpError as ex: + except self.http_error as ex: self.process_http_error(ex) def verify_schema(self, dataset_id, table_id, schema): @@ -781,6 +656,14 @@ def verify_schema(self, dataset_id, table_id, schema): key=lambda x: x['name']) fields_local = sorted(schema['fields'], key=lambda x: x['name']) + # Ignore mode when comparing schemas. + for field in fields_local: + if 'mode' in field: + del field['mode'] + for field in fields_remote: + if 'mode' in field: + del field['mode'] + return fields_remote == fields_local def schema_is_subset(self, dataset_id, table_id, schema): @@ -809,6 +692,14 @@ def schema_is_subset(self, dataset_id, table_id, schema): fields_remote = self.schema(dataset_id, table_id) fields_local = schema['fields'] + # Ignore mode when comparing schemas. 
+ for field in fields_local: + if 'mode' in field: + del field['mode'] + for field in fields_remote: + if 'mode' in field: + del field['mode'] + return all(field in fields_remote for field in fields_local) def delete_and_recreate_table(self, dataset_id, table_id, table_schema): @@ -847,44 +738,30 @@ def _parse_data(schema, rows): fields = schema['fields'] col_types = [field['type'] for field in fields] col_names = [str(field['name']) for field in fields] - col_dtypes = [dtype_map.get(field['type'], object) for field in fields] + col_dtypes = [ + dtype_map.get(field['type'].upper(), object) + for field in fields + ] page_array = np.zeros((len(rows),), dtype=lzip(col_names, col_dtypes)) - for row_num, raw_row in enumerate(rows): - entries = raw_row.get('f', []) - for col_num, field_type in enumerate(col_types): - field_value = _parse_entry(entries[col_num].get('v', ''), - field_type) + for row_num, entries in enumerate(rows): + for col_num in range(len(col_types)): + field_value = entries[col_num] page_array[row_num][col_num] = field_value return DataFrame(page_array, columns=col_names) -def _parse_entry(field_value, field_type): - if field_value is None or field_value == 'null': - return None - if field_type == 'INTEGER': - return int(field_value) - elif field_type == 'FLOAT': - return float(field_value) - elif field_type == 'TIMESTAMP': - timestamp = datetime.utcfromtimestamp(float(field_value)) - return np.datetime64(timestamp) - elif field_type == 'BOOLEAN': - return field_value == 'true' - return field_value - - def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False, verbose=True, private_key=None, auth_local_webserver=False, dialect='legacy', **kwargs): - r"""Load data from Google BigQuery. + r"""Load data from Google BigQuery using google-cloud-python The main method a user calls to execute a Query in Google BigQuery and read results into a pandas DataFrame. - Google BigQuery API Client Library v2 for Python is used. + The Google Cloud library is used. Documentation is available `here - `__ + `__ Authentication to the Google BigQuery service is via OAuth 2.0. 
@@ -967,16 +844,8 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, connector = GbqConnector( project_id, reauth=reauth, verbose=verbose, private_key=private_key, dialect=dialect, auth_local_webserver=auth_local_webserver) - schema, pages = connector.run_query(query, **kwargs) - dataframe_list = [] - while len(pages) > 0: - page = pages.pop() - dataframe_list.append(_parse_data(schema, page)) - - if len(dataframe_list) > 0: - final_df = concat(dataframe_list, ignore_index=True) - else: - final_df = _parse_data(schema, []) + schema, rows = connector.run_query(query, **kwargs) + final_df = _parse_data(schema, rows) # Reindex the DataFrame on the provided column if index_col is not None: @@ -1001,10 +870,10 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, # if they dont have any nulls type_map = {'BOOLEAN': bool, 'INTEGER': int} for field in schema['fields']: - if field['type'] in type_map and \ + if field['type'].upper() in type_map and \ final_df[field['name']].notnull().all(): final_df[field['name']] = \ - final_df[field['name']].astype(type_map[field['type']]) + final_df[field['name']].astype(type_map[field['type'].upper()]) connector.print_elapsed_seconds( 'Total time taken', @@ -1160,11 +1029,6 @@ class _Table(GbqConnector): def __init__(self, project_id, dataset_id, reauth=False, verbose=False, private_key=None): - try: - from googleapiclient.errors import HttpError - except ImportError: - from apiclient.errors import HttpError - self.http_error = HttpError self.dataset_id = dataset_id super(_Table, self).__init__(project_id, reauth, verbose, private_key) @@ -1181,18 +1045,16 @@ def exists(self, table_id): boolean true if table exists, otherwise false """ + from google.api_core.exceptions import NotFound + table_ref = self.client.dataset(self.dataset_id).table(table_id) try: - self.service.tables().get( - projectId=self.project_id, - datasetId=self.dataset_id, - tableId=table_id).execute() + self.client.get_table(table_ref) return True + except NotFound: + return False except self.http_error as ex: - if ex.resp.status == 404: - return False - else: - self.process_http_error(ex) + self.process_http_error(ex) def create(self, table_id, schema): """ Create a table in Google BigQuery given a table and schema @@ -1205,6 +1067,8 @@ def create(self, table_id, schema): Use the generate_bq_schema to generate your table schema from a dataframe. 
""" + from google.cloud.bigquery import SchemaField + from google.cloud.bigquery import Table if self.exists(table_id): raise TableCreationError("Table {0} already " @@ -1215,20 +1079,20 @@ def create(self, table_id, schema): _Dataset(self.project_id, private_key=self.private_key).create(self.dataset_id) - body = { - 'schema': schema, - 'tableReference': { - 'tableId': table_id, - 'projectId': self.project_id, - 'datasetId': self.dataset_id - } - } + table_ref = self.client.dataset(self.dataset_id).table(table_id) + table = Table(table_ref) + + for field in schema['fields']: + if 'mode' not in field: + field['mode'] = 'NULLABLE' + + table.schema = [ + SchemaField.from_api_repr(field) + for field in schema['fields'] + ] try: - self.service.tables().insert( - projectId=self.project_id, - datasetId=self.dataset_id, - body=body).execute() + self.client.create_table(table) except self.http_error as ex: self.process_http_error(ex) @@ -1240,30 +1104,25 @@ def delete(self, table_id): table : str Name of table to be deleted """ + from google.api_core.exceptions import NotFound if not self.exists(table_id): raise NotFoundException("Table does not exist") + table_ref = self.client.dataset(self.dataset_id).table(table_id) try: - self.service.tables().delete( - datasetId=self.dataset_id, - projectId=self.project_id, - tableId=table_id).execute() - except self.http_error as ex: + self.client.delete_table(table_ref) + except NotFound: # Ignore 404 error which may occur if table already deleted - if ex.resp.status != 404: - self.process_http_error(ex) + pass + except self.http_error as ex: + self.process_http_error(ex) class _Dataset(GbqConnector): def __init__(self, project_id, reauth=False, verbose=False, private_key=None): - try: - from googleapiclient.errors import HttpError - except ImportError: - from apiclient.errors import HttpError - self.http_error = HttpError super(_Dataset, self).__init__(project_id, reauth, verbose, private_key) @@ -1280,17 +1139,15 @@ def exists(self, dataset_id): boolean true if dataset exists, otherwise false """ + from google.api_core.exceptions import NotFound try: - self.service.datasets().get( - projectId=self.project_id, - datasetId=dataset_id).execute() + self.client.get_dataset(self.client.dataset(dataset_id)) return True + except NotFound: + return False except self.http_error as ex: - if ex.resp.status == 404: - return False - else: - self.process_http_error(ex) + self.process_http_error(ex) def datasets(self): """ Return a list of datasets in Google BigQuery @@ -1306,32 +1163,15 @@ def datasets(self): """ dataset_list = [] - next_page_token = None - first_query = True - - while first_query or next_page_token: - first_query = False - try: - list_dataset_response = self.service.datasets().list( - projectId=self.project_id, - pageToken=next_page_token).execute() - - dataset_response = list_dataset_response.get('datasets') - if dataset_response is None: - dataset_response = [] - - next_page_token = list_dataset_response.get('nextPageToken') - - if dataset_response is None: - dataset_response = [] + try: + dataset_response = self.client.list_datasets() - for row_num, raw_row in enumerate(dataset_response): - dataset_list.append( - raw_row['datasetReference']['datasetId']) + for row in dataset_response: + dataset_list.append(row.dataset_id) - except self.http_error as ex: - self.process_http_error(ex) + except self.http_error as ex: + self.process_http_error(ex) return dataset_list @@ -1343,22 +1183,16 @@ def create(self, dataset_id): dataset : str Name of dataset to 
be written """ + from google.cloud.bigquery import Dataset if self.exists(dataset_id): raise DatasetCreationError("Dataset {0} already " "exists".format(dataset_id)) - body = { - 'datasetReference': { - 'projectId': self.project_id, - 'datasetId': dataset_id - } - } + dataset = Dataset(self.client.dataset(dataset_id)) try: - self.service.datasets().insert( - projectId=self.project_id, - body=body).execute() + self.client.create_dataset(dataset) except self.http_error as ex: self.process_http_error(ex) @@ -1370,20 +1204,20 @@ def delete(self, dataset_id): dataset : str Name of dataset to be deleted """ + from google.api_core.exceptions import NotFound if not self.exists(dataset_id): raise NotFoundException( "Dataset {0} does not exist".format(dataset_id)) try: - self.service.datasets().delete( - datasetId=dataset_id, - projectId=self.project_id).execute() + self.client.delete_dataset(self.client.dataset(dataset_id)) - except self.http_error as ex: + except NotFound: # Ignore 404 error which may occur if dataset already deleted - if ex.resp.status != 404: - self.process_http_error(ex) + pass + except self.http_error as ex: + self.process_http_error(ex) def tables(self, dataset_id): """ List tables in the specific dataset in Google BigQuery @@ -1400,28 +1234,15 @@ def tables(self, dataset_id): """ table_list = [] - next_page_token = None - first_query = True - - while first_query or next_page_token: - first_query = False - - try: - list_table_response = self.service.tables().list( - projectId=self.project_id, - datasetId=dataset_id, - pageToken=next_page_token).execute() - table_response = list_table_response.get('tables') - next_page_token = list_table_response.get('nextPageToken') - - if not table_response: - return table_list + try: + table_response = self.client.list_dataset_tables( + self.client.dataset(dataset_id)) - for row_num, raw_row in enumerate(table_response): - table_list.append(raw_row['tableReference']['tableId']) + for row in table_response: + table_list.append(row.table_id) - except self.http_error as ex: - self.process_http_error(ex) + except self.http_error as ex: + self.process_http_error(ex) return table_list diff --git a/pandas_gbq/tests/test_gbq.py b/pandas_gbq/tests/test_gbq.py index 62b72dbc..6a2b8480 100644 --- a/pandas_gbq/tests/test_gbq.py +++ b/pandas_gbq/tests/test_gbq.py @@ -193,9 +193,9 @@ def test_should_be_able_to_get_valid_credentials(self): credentials = self.sut.get_credentials() assert credentials.valid - def test_should_be_able_to_get_a_bigquery_service(self): - bigquery_service = self.sut.get_service() - assert bigquery_service is not None + def test_should_be_able_to_get_a_bigquery_client(self): + bigquery_client = self.sut.get_client() + assert bigquery_client is not None def test_should_be_able_to_get_schema_from_query(self): schema, pages = self.sut.run_query('SELECT 1') @@ -256,9 +256,9 @@ def test_should_be_able_to_get_valid_credentials(self): credentials = self.sut.get_credentials() assert credentials.valid - def test_should_be_able_to_get_a_bigquery_service(self): - bigquery_service = self.sut.get_service() - assert bigquery_service is not None + def test_should_be_able_to_get_a_bigquery_client(self): + bigquery_client = self.sut.get_client() + assert bigquery_client is not None def test_should_be_able_to_get_schema_from_query(self): schema, pages = self.sut.run_query('SELECT 1') @@ -287,9 +287,9 @@ def test_should_be_able_to_get_valid_credentials(self): credentials = self.sut.get_credentials() assert credentials.valid - def 
test_should_be_able_to_get_a_bigquery_service(self): - bigquery_service = self.sut.get_service() - assert bigquery_service is not None + def test_should_be_able_to_get_a_bigquery_client(self): + bigquery_client = self.sut.get_client() + assert bigquery_client is not None def test_should_be_able_to_get_schema_from_query(self): schema, pages = self.sut.run_query('SELECT 1') @@ -977,8 +977,6 @@ def test_upload_data(self): gbq.to_gbq(df, self.destination_table + test_id, _get_project_id(), chunksize=10000, private_key=_get_private_key_path()) - sleep(30) # <- Curses Google!!! - result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" .format(self.destination_table + test_id), project_id=_get_project_id(), @@ -1015,8 +1013,6 @@ def test_upload_data_if_table_exists_append(self): gbq.to_gbq(df, self.destination_table + test_id, _get_project_id(), if_exists='append', private_key=_get_private_key_path()) - sleep(30) # <- Curses Google!!! - result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" .format(self.destination_table + test_id), project_id=_get_project_id(), @@ -1046,8 +1042,6 @@ def test_upload_subset_columns_if_table_exists_append(self): self.destination_table + test_id, _get_project_id(), if_exists='append', private_key=_get_private_key_path()) - sleep(30) # <- Curses Google!!! - result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" .format(self.destination_table + test_id), project_id=_get_project_id(), @@ -1080,8 +1074,6 @@ def test_upload_data_if_table_exists_replace(self): _get_project_id(), if_exists='replace', private_key=_get_private_key_path()) - sleep(30) # <- Curses Google!!! - result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" .format(self.destination_table + test_id), project_id=_get_project_id(), @@ -1255,10 +1247,14 @@ def test_verify_schema_ignores_field_mode(self): def test_retrieve_schema(self): # Issue #24 schema function returns the schema in biquery test_id = "15" - test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'}, - {'name': 'B', 'type': 'FLOAT'}, - {'name': 'C', 'type': 'STRING'}, - {'name': 'D', 'type': 'TIMESTAMP'}]} + test_schema = { + 'fields': [ + {'name': 'A', 'type': 'FLOAT', 'mode': 'NULLABLE'}, + {'name': 'B', 'type': 'FLOAT', 'mode': 'NULLABLE'}, + {'name': 'C', 'type': 'STRING', 'mode': 'NULLABLE'}, + {'name': 'D', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'} + ] + } self.table.create(TABLE_ID + test_id, test_schema) actual = self.sut.schema(self.dataset_prefix + "1", TABLE_ID + test_id) @@ -1415,8 +1411,6 @@ def test_upload_data(self): gbq.to_gbq(df, self.destination_table + test_id, _get_project_id(), chunksize=10000) - sleep(30) # <- Curses Google!!! - result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}".format( self.destination_table + test_id), project_id=_get_project_id()) @@ -1473,8 +1467,6 @@ def test_upload_data(self): gbq.to_gbq(df, self.destination_table + test_id, _get_project_id(), chunksize=10000, private_key=_get_private_key_contents()) - sleep(30) # <- Curses Google!!! 
-
         result = gbq.read_gbq("SELECT COUNT(*) as num_rows FROM {0}".format(
             self.destination_table + test_id),
             project_id=_get_project_id(),
diff --git a/requirements.txt b/requirements.txt
index c72b5a5a..88cf967a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,4 @@
 pandas
-httplib2
-google-api-python-client
 google-auth
-google-auth-httplib2
 google-auth-oauthlib
+google-cloud-bigquery
diff --git a/setup.py b/setup.py
index df3cd85d..86a40c5e 100644
--- a/setup.py
+++ b/setup.py
@@ -19,11 +19,9 @@ def readme():
 
 INSTALL_REQUIRES = [
     'pandas',
-    'httplib2>=0.9.2',
-    'google-api-python-client>=1.6.0',
     'google-auth>=1.0.0',
-    'google-auth-httplib2>=0.0.1',
     'google-auth-oauthlib>=0.0.1',
+    'google-cloud-bigquery>=0.28.0',
 ]
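
For readers following the new query path: a minimal sketch of how ``GbqConnector.run_query`` now drives ``google-cloud-bigquery`` (assuming version >= 0.28.0 as pinned above; the project id and SQL below are illustrative placeholders, not values from this change)::

    # Minimal sketch of the new query flow (google-cloud-bigquery >= 0.28.0).
    # 'my-project' and the SQL are placeholders.
    from google.cloud import bigquery
    from google.cloud.bigquery import QueryJobConfig

    client = bigquery.Client(project='my-project')

    # run_query builds a query-config dict and converts it with from_api_repr().
    job_config = QueryJobConfig.from_api_repr({'useLegacySql': False})
    query_job = client.query('SELECT 1 AS x', job_config=job_config)

    rows_iter = query_job.result()  # blocks until the job state is DONE
    rows = list(rows_iter)          # rows support positional indexing
    schema = {'fields': [field.to_api_repr() for field in rows_iter.schema]}

    print(query_job.cache_hit, query_job.total_bytes_processed,
          query_job.total_bytes_billed)
    print(rows_iter.total_rows, schema)

Pagination is handled inside the returned row iterator, which is why the manual ``pageToken`` loop could be dropped.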
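
The write path reads the same way: ``load_data`` now serializes each chunk to newline-delimited JSON and submits a load job instead of streaming rows with ``tabledata().insertAll``. A sketch under the same assumptions (placeholder project, dataset and table names, toy DataFrame)::

    # Sketch of the load-job path that replaces streaming inserts.
    from google.cloud import bigquery
    from google.cloud.bigquery import LoadJobConfig
    from pandas import DataFrame
    from six import StringIO

    client = bigquery.Client(project='my-project')
    destination = client.dataset('my_dataset').table('my_table')

    job_config = LoadJobConfig()
    job_config.write_disposition = 'WRITE_APPEND'
    job_config.source_format = 'NEWLINE_DELIMITED_JSON'

    df = DataFrame({'a': [1, 2], 'b': ['x', 'y']})
    json_rows = [
        row.to_json(force_ascii=False, date_unit='s', date_format='iso')
        for _, row in df.reset_index(drop=True).iterrows()
    ]
    body = StringIO('{}\n'.format('\n'.join(json_rows)))

    # result() waits for the load job to finish and raises on failure, so no
    # insertErrors post-processing (or sleep between chunks) is needed.
    client.load_table_from_file(
        body, destination, job_config=job_config).result()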
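
Error handling also changes shape: missing tables and datasets now surface as ``google.api_core.exceptions.NotFound`` rather than an ``HttpError`` with ``resp.status == 404``, which is what the ``_Table.exists`` and ``_Dataset.exists`` helpers rely on. A sketch with placeholder names::

    # Existence check in the style of _Table.exists / _Dataset.exists.
    from google.api_core.exceptions import GoogleAPIError, NotFound
    from google.cloud import bigquery

    client = bigquery.Client(project='my-project')
    table_ref = client.dataset('my_dataset').table('my_table')

    try:
        client.get_table(table_ref)
        exists = True
    except NotFound:
        exists = False
    except GoogleAPIError:
        # pandas-gbq wraps other API failures in GenericGBQException;
        # here the error is simply re-raised.
        raise
    print(exists)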