diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py
index 20e303bf..f7b0fc1b 100644
--- a/pandas_gbq/gbq.py
+++ b/pandas_gbq/gbq.py
@@ -2,11 +2,15 @@
 import logging
 import os
 import time
+import uuid
 import warnings
 from collections import OrderedDict
 from datetime import datetime
+from io import BytesIO

 import numpy as np
+import pandas as pd
+from google.cloud.bigquery.job import ExtractJobConfig
 from pandas import DataFrame

 from pandas_gbq.exceptions import AccessDenied
@@ -308,6 +312,105 @@ def run_query(self, query, **kwargs):

         return schema, result_rows

+    def export_table_to_gcs(
+        self,
+        dataset,
+        table,
+        timeout_in_seconds=None,
+        bucket=None,
+        blob=None,
+        zipped=True,
+    ):
+        """
+        Export a table to GCS. Returns a tuple of (bucket, blob).
+        """
+        client = self.client
+        table_ref = client.dataset(dataset).table(table)
+        job_config = ExtractJobConfig()
+        job_config.compression = 'GZIP' if zipped else 'NONE'
+        bucket = bucket or '{}-temp'.format(client.project)
+        blob = blob or '{}/{}.csv'.format(dataset, table)
+        if zipped and not blob.endswith('.gz'):
+            blob += '.gz'
+        if not isinstance(bucket, str):
+            bucket = bucket.name
+        destination_uri = 'gs://{}/{}'.format(bucket, blob)
+        extract_job = client.extract_table(
+            table_ref, destination_uri, job_config=job_config)
+        wait_for_job(extract_job, timeout_in_seconds=timeout_in_seconds)
+        logger.info('Exported {}.{} -> {}'.format(
+            dataset,
+            table,
+            destination_uri)
+        )
+        return bucket, blob
+
+    def read_gbq_table(
+        self,
+        dataset,
+        table,
+        bucket,
+        timeout_in_seconds=600,
+    ):
+        """
+        Read an entire BigQuery table into a DataFrame via a GCS export.
+        """
+        from google.cloud import storage
+
+        storage_client = storage.Client(self.project_id, self.credentials)
+        prefix = 'gbq-exports/{}/{}/'.format(dataset, table)
+        bucket = storage_client.get_bucket(bucket)
+
+        for old_blob in bucket.list_blobs(prefix=prefix):
+            old_blob.delete()
+            logger.info('Old Blob Deleted: {}'.format(old_blob.name))
+
+        self.export_table_to_gcs(
+            dataset=dataset,
+            table=table,
+            timeout_in_seconds=timeout_in_seconds,
+            bucket=bucket,
+            blob='{}*.csv.gz'.format(prefix),
+        )
+
+        frames = []
+
+        downloads = tqdm(list(bucket.list_blobs(prefix=prefix)), unit='file')
+        for blob in downloads:
+            downloads.set_description('Processing {}, {:.1f}MB'.format(
+                blob.name, blob.size / 2**20))
+            s = BytesIO(blob.download_as_string())
+            frames.append(pd.read_csv(s, compression='gzip'))
+            blob.delete()
+
+        return pd.concat(frames, ignore_index=True)
+
+    def read_gbq_bulk(
+        self,
+        query,
+        project=None,
+        bucket=None,
+        dataset='pandas_bulk',
+    ):
+
+        table_name = uuid.uuid4().hex[:6]
+        create_job = self.create_table_from_query(
+            query=query,
+            dataset=dataset,
+            table=table_name,
+            block=True,
+        )
+
+        df = self.read_gbq_table(
+            dataset=dataset,
+            table=table_name,
+            bucket=bucket,
+        )
+
+        self.client.delete_table(create_job.destination)
+
+        return df
+
     def load_data(
             self, dataframe, dataset_id, table_id, chunksize=None, schema=None,
             progress_bar=True):
@@ -774,6 +876,30 @@ def _generate_bq_schema(df, default_type='STRING'):
     return schema.generate_bq_schema(df, default_type=default_type)


+def wait_for_job(job, timeout_in_seconds=None):
+    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/bigquery/cloud-client/snippets.py
+    if timeout_in_seconds:
+        start = time.time()
+        timeout = start + timeout_in_seconds
+
+    with tqdm(
+        bar_format='Waiting for {desc} Elapsed: {elapsed}',
+        total=10000,
+    ) as progress:
+        while True:
+            job.reload()  # Refreshes the state via a GET request.
+            progress.set_description(str(job))
+            if job.state == 'DONE':
+                if job.error_result:
+                    raise RuntimeError(job.errors)
+                progress.bar_format = 'Completed {desc}. Elapsed: {elapsed}'
+                return
+            if timeout_in_seconds:
+                if time.time() > timeout:
+                    raise TimeoutError('Timed out waiting for job: {}'.format(job))
+            time.sleep(1)
+
+
 class _Table(GbqConnector):

     def __init__(self, project_id, dataset_id, reauth=False, private_key=None):
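
A minimal usage sketch of the bulk-export path added above, assuming default application credentials, an existing GCS bucket, an existing BigQuery dataset for the temporary table, and the create_table_from_query helper defined elsewhere on this branch; the project, bucket, and dataset names below are placeholders, not part of the patch:

    # Illustrative only: 'my-project' and 'my-project-temp' stand in for a real
    # project and an existing GCS bucket; the 'pandas_bulk' dataset is assumed
    # to already exist.
    from pandas_gbq.gbq import GbqConnector

    connector = GbqConnector('my-project')

    # read_gbq_bulk materialises the query into a temporary table, exports it
    # to GCS as gzipped CSV shards, downloads and concatenates the shards,
    # then drops the temporary table.
    df = connector.read_gbq_bulk(
        'SELECT 1 AS x',
        bucket='my-project-temp',
        dataset='pandas_bulk',
    )
    print(df.shape)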