diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index f54bf6043..796f1c654 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -130,6 +130,11 @@ def __init__( if project is _marker: project = None + # Save the initial value of client_info and client_options before they + # are passed along, for use in __reduce__ defined elsewhere. + self._initial_client_info = client_info + self._initial_client_options = client_options + kw_args = {"client_info": client_info} # `api_endpoint` should be only set by the user via `client_options`, diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index e87f0cc76..8de9c6c7b 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -16,10 +16,16 @@ import concurrent.futures +import io +import inspect import os import warnings +import pickle +import copyreg from google.api_core import exceptions +from google.cloud.storage import Client +from google.cloud.storage import Blob warnings.warn( "The module `transfer_manager` is a preview feature. Functionality and API " @@ -27,16 +33,54 @@ ) -DEFAULT_CHUNK_SIZE = 200 * 1024 * 1024 +TM_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024 +DEFAULT_MAX_WORKERS = 8 +# Constants to be passed in as `worker_type`. +PROCESS = "process" +THREAD = "thread" + + +_cached_clients = {} + + +def _deprecate_threads_param(func): + def convert_threads_or_raise(*args, **kwargs): + binding = inspect.signature(func).bind(*args, **kwargs) + threads = binding.arguments.get("threads") + if threads: + worker_type = binding.arguments.get("worker_type") + max_workers = binding.arguments.get("max_workers") + if worker_type or max_workers: # Parameter conflict + raise ValueError( + "The `threads` parameter is deprecated and conflicts with its replacement parameters, `worker_type` and `max_workers`." + ) + # No conflict, so issue a warning and set worker_type and max_workers. + warnings.warn( + "The `threads` parameter is deprecated. Please use `worker_type` and `max_workers` parameters instead." + ) + args = binding.args + kwargs = binding.kwargs + kwargs["worker_type"] = THREAD + kwargs["max_workers"] = threads + return func(*args, **kwargs) + else: + return func(*args, **kwargs) + + return convert_threads_or_raise + + +@_deprecate_threads_param def upload_many( file_blob_pairs, skip_if_exists=False, upload_kwargs=None, - threads=4, + threads=None, deadline=None, raise_exception=False, + worker_type=PROCESS, + max_workers=DEFAULT_MAX_WORKERS, ): """Upload many files concurrently via a worker pool. @@ -48,6 +92,9 @@ def upload_many( uploaded to the corresponding blob by using blob.upload_from_file() or blob.upload_from_filename() as appropriate. + File handlers are only supported if worker_type is set to THREAD. + If worker_type is set to PROCESS, please use filenames only. + :type skip_if_exists: bool :param skip_if_exists: If True, blobs that already have a live version will not be overwritten. @@ -65,14 +112,10 @@ def upload_many( :type threads: int :param threads: - The number of threads to use in the worker pool. This is passed to - `concurrent.futures.ThreadPoolExecutor` as the `max_worker`; refer - to standard library documentation for details. - - The performance impact of this value depends on the use case, but - generally, smaller files benefit from more threads and larger files - don't benefit from more threads. 
Too many threads can slow operations, - especially with large files, due to contention over the Python GIL. + ***DEPRECATED*** Sets `worker_type` to THREAD and `max_workers` to the + number specified. If `worker_type` or `max_workers` are set explicitly, + this parameter should be set to None. Please use `worker_type` and + `max_workers` instead of this parameter. :type deadline: int :param deadline: @@ -92,6 +135,40 @@ def upload_many( If skip_if_exists is True, 412 Precondition Failed responses are considered part of normal operation and are not raised as an exception. + :type worker_type: str + :param worker_type: + The worker type to use; one of google.cloud.storage.transfer_manager.PROCESS + or google.cloud.storage.transfer_manager.THREAD. + + Although the exact performance impact depends on the use case, in most + situations the PROCESS worker type will use more system resources (both + memory and CPU) and result in faster operations than THREAD workers. + + Because the subprocesses of the PROCESS worker type can't access memory + from the main process, Client objects have to be serialized and then + recreated in each subprocess. The serialization of the Client object + for use in subprocesses is an approximation and may not capture every + detail of the Client object, especially if the Client was modified after + its initial creation or if `Client._http` was modified in any way. + + THREAD worker types are observed to be relatively efficient for + operations with many small files, but not for operations with large + files. PROCESS workers are recommended for large file operations. + + PROCESS workers do not support writing to file handlers. Please refer + to files by filename only when using PROCESS workers. + + :type max_workers: int + :param max_workers: + The maximum number of workers to create to handle the workload. + + With PROCESS workers, a larger number of workers will consume more + system resources (memory and CPU) at once. + + How many workers is optimal depends heavily on the specific use case, + and the default is a conservative number that should work okay in most + cases without consuming excessive resources. + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. :rtype: list @@ -103,21 +180,37 @@ def upload_many( if upload_kwargs is None: upload_kwargs = {} if skip_if_exists: + upload_kwargs = upload_kwargs.copy() upload_kwargs["if_generation_match"] = 0 - with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: + pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type) + + with pool_class(max_workers=max_workers) as executor: futures = [] for path_or_file, blob in file_blob_pairs: - method = ( - blob.upload_from_filename - if isinstance(path_or_file, str) - else blob.upload_from_file + # File objects are only supported by the THREAD worker because they can't + # be pickled. + if needs_pickling and not isinstance(path_or_file, str): + raise ValueError( + "Passing in a file object is only supported by the THREAD worker type. Please either select THREAD workers, or pass in filenames only." 
+ ) + + futures.append( + executor.submit( + _call_method_on_maybe_pickled_blob, + _pickle_blob(blob) if needs_pickling else blob, + "upload_from_filename" + if isinstance(path_or_file, str) + else "upload_from_file", + path_or_file, + **upload_kwargs, + ) ) - futures.append(executor.submit(method, path_or_file, **upload_kwargs)) + concurrent.futures.wait( + futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED + ) + results = [] - concurrent.futures.wait( - futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED - ) for future in futures: exp = future.exception() @@ -134,12 +227,15 @@ def upload_many( return results +@_deprecate_threads_param def download_many( blob_file_pairs, download_kwargs=None, - threads=4, + threads=None, deadline=None, raise_exception=False, + worker_type=PROCESS, + max_workers=DEFAULT_MAX_WORKERS, ): """Download many blobs concurrently via a worker pool. @@ -154,6 +250,9 @@ def download_many( Note that blob.download_to_filename() does not delete the destination file if the download fails. + File handlers are only supported if worker_type is set to THREAD. + If worker_type is set to PROCESS, please use filenames only. + :type download_kwargs: dict :param download_kwargs: A dictionary of keyword arguments to pass to the download method. Refer @@ -163,14 +262,10 @@ def download_many( :type threads: int :param threads: - The number of threads to use in the worker pool. This is passed to - `concurrent.futures.ThreadPoolExecutor` as the `max_worker`; refer - to standard library documentation for details. - - The performance impact of this value depends on the use case, but - generally, smaller files benefit from more threads and larger files - don't benefit from more threads. Too many threads can slow operations, - especially with large files, due to contention over the Python GIL. + ***DEPRECATED*** Sets `worker_type` to THREAD and `max_workers` to the + number specified. If `worker_type` or `max_workers` are set explicitly, + this parameter should be set to None. Please use `worker_type` and + `max_workers` instead of this parameter. :type deadline: int :param deadline: @@ -187,6 +282,40 @@ def download_many( are only processed and potentially raised after all operations are complete in success or failure. + :type worker_type: str + :param worker_type: + The worker type to use; one of google.cloud.storage.transfer_manager.PROCESS + or google.cloud.storage.transfer_manager.THREAD. + + Although the exact performance impact depends on the use case, in most + situations the PROCESS worker type will use more system resources (both + memory and CPU) and result in faster operations than THREAD workers. + + Because the subprocesses of the PROCESS worker type can't access memory + from the main process, Client objects have to be serialized and then + recreated in each subprocess. The serialization of the Client object + for use in subprocesses is an approximation and may not capture every + detail of the Client object, especially if the Client was modified after + its initial creation or if `Client._http` was modified in any way. + + THREAD worker types are observed to be relatively efficient for + operations with many small files, but not for operations with large + files. PROCESS workers are recommended for large file operations. + + PROCESS workers do not support writing to file handlers. Please refer + to files by filename only when using PROCESS workers. 
+ + :type max_workers: int + :param max_workers: + The maximum number of workers to create to handle the workload. + + With PROCESS workers, a larger number of workers will consume more + system resources (memory and CPU) at once. + + How many workers is optimal depends heavily on the specific use case, + and the default is a conservative number that should work okay in most + cases without consuming excessive resources. + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. :rtype: list @@ -198,29 +327,48 @@ def download_many( if download_kwargs is None: download_kwargs = {} - with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: + + pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type) + + with pool_class(max_workers=max_workers) as executor: futures = [] for blob, path_or_file in blob_file_pairs: - method = ( - blob.download_to_filename - if isinstance(path_or_file, str) - else blob.download_to_file + # File objects are only supported by the THREAD worker because they can't + # be pickled. + if needs_pickling and not isinstance(path_or_file, str): + raise ValueError( + "Passing in a file object is only supported by the THREAD worker type. Please either select THREAD workers, or pass in filenames only." + ) + + futures.append( + executor.submit( + _call_method_on_maybe_pickled_blob, + _pickle_blob(blob) if needs_pickling else blob, + "download_to_filename" + if isinstance(path_or_file, str) + else "download_to_file", + path_or_file, + **download_kwargs, + ) ) - futures.append(executor.submit(method, path_or_file, **download_kwargs)) + concurrent.futures.wait( + futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED + ) + results = [] - concurrent.futures.wait( - futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED - ) for future in futures: + # If raise_exception is False, don't call future.result() if not raise_exception: exp = future.exception() if exp: results.append(exp) continue + # Get the real result. If there was an exception, this will raise it. results.append(future.result()) return results +@_deprecate_threads_param def upload_many_from_filenames( bucket, filenames, @@ -229,9 +377,11 @@ def upload_many_from_filenames( skip_if_exists=False, blob_constructor_kwargs=None, upload_kwargs=None, - threads=4, + threads=None, deadline=None, raise_exception=False, + worker_type=PROCESS, + max_workers=DEFAULT_MAX_WORKERS, ): """Upload many files concurrently by their filenames. @@ -309,14 +459,10 @@ def upload_many_from_filenames( :type threads: int :param threads: - The number of threads to use in the worker pool. This is passed to - `concurrent.futures.ThreadPoolExecutor` as the `max_worker`; refer - to standard library documentation for details. - - The performance impact of this value depends on the use case, but - generally, smaller files benefit from more threads and larger files - don't benefit from more threads. Too many threads can slow operations, - especially with large files, due to contention over the Python GIL. + ***DEPRECATED*** Sets `worker_type` to THREAD and `max_workers` to the + number specified. If `worker_type` or `max_workers` are set explicitly, + this parameter should be set to None. Please use `worker_type` and + `max_workers` instead of this parameter. 
:type deadline: int :param deadline: @@ -336,6 +482,37 @@ def upload_many_from_filenames( If skip_if_exists is True, 412 Precondition Failed responses are considered part of normal operation and are not raised as an exception. + :type worker_type: str + :param worker_type: + The worker type to use; one of google.cloud.storage.transfer_manager.PROCESS + or google.cloud.storage.transfer_manager.THREAD. + + Although the exact performance impact depends on the use case, in most + situations the PROCESS worker type will use more system resources (both + memory and CPU) and result in faster operations than THREAD workers. + + Because the subprocesses of the PROCESS worker type can't access memory + from the main process, Client objects have to be serialized and then + recreated in each subprocess. The serialization of the Client object + for use in subprocesses is an approximation and may not capture every + detail of the Client object, especially if the Client was modified after + its initial creation or if `Client._http` was modified in any way. + + THREAD worker types are observed to be relatively efficient for + operations with many small files, but not for operations with large + files. PROCESS workers are recommended for large file operations. + + :type max_workers: int + :param max_workers: + The maximum number of workers to create to handle the workload. + + With PROCESS workers, a larger number of workers will consume more + system resources (memory and CPU) at once. + + How many workers is optimal depends heavily on the specific use case, + and the default is a conservative number that should work okay in most + cases without consuming excessive resources. + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. :rtype: list @@ -359,22 +536,26 @@ def upload_many_from_filenames( file_blob_pairs, skip_if_exists=skip_if_exists, upload_kwargs=upload_kwargs, - threads=threads, deadline=deadline, raise_exception=raise_exception, + worker_type=worker_type, + max_workers=max_workers, ) +@_deprecate_threads_param def download_many_to_path( bucket, blob_names, destination_directory="", blob_name_prefix="", download_kwargs=None, - threads=4, + threads=None, deadline=None, create_directories=True, raise_exception=False, + worker_type=PROCESS, + max_workers=DEFAULT_MAX_WORKERS, ): """Download many files concurrently by their blob names. @@ -442,14 +623,10 @@ def download_many_to_path( :type threads: int :param threads: - The number of threads to use in the worker pool. This is passed to - `concurrent.futures.ThreadPoolExecutor` as the `max_worker` param; refer - to standard library documentation for details. - - The performance impact of this value depends on the use case, but - generally, smaller files benefit from more threads and larger files - don't benefit from more threads. Too many threads can slow operations, - especially with large files, due to contention over the Python GIL. + ***DEPRECATED*** Sets `worker_type` to THREAD and `max_workers` to the + number specified. If `worker_type` or `max_workers` are set explicitly, + this parameter should be set to None. Please use `worker_type` and + `max_workers` instead of this parameter. :type deadline: int :param deadline: @@ -474,6 +651,37 @@ def download_many_to_path( Precondition Failed responses are considered part of normal operation and are not raised as an exception. 
+ :type worker_type: str + :param worker_type: + The worker type to use; one of google.cloud.storage.transfer_manager.PROCESS + or google.cloud.storage.transfer_manager.THREAD. + + Although the exact performance impact depends on the use case, in most + situations the PROCESS worker type will use more system resources (both + memory and CPU) and result in faster operations than THREAD workers. + + Because the subprocesses of the PROCESS worker type can't access memory + from the main process, Client objects have to be serialized and then + recreated in each subprocess. The serialization of the Client object + for use in subprocesses is an approximation and may not capture every + detail of the Client object, especially if the Client was modified after + its initial creation or if `Client._http` was modified in any way. + + THREAD worker types are observed to be relatively efficient for + operations with many small files, but not for operations with large + files. PROCESS workers are recommended for large file operations. + + :type max_workers: int + :param max_workers: + The maximum number of workers to create to handle the workload. + + With PROCESS workers, a larger number of workers will consume more + system resources (memory and CPU) at once. + + How many workers is optimal depends heavily on the specific use case, + and the default is a conservative number that should work okay in most + cases without consuming excessive resources. + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. :rtype: list @@ -495,7 +703,237 @@ def download_many_to_path( return download_many( blob_file_pairs, download_kwargs=download_kwargs, - threads=threads, deadline=deadline, raise_exception=raise_exception, + worker_type=worker_type, + max_workers=max_workers, ) + + +def download_chunks_concurrently( + blob, + filename, + chunk_size=TM_DEFAULT_CHUNK_SIZE, + download_kwargs=None, + deadline=None, + worker_type=PROCESS, + max_workers=DEFAULT_MAX_WORKERS, +): + """Download a single file in chunks, concurrently. + + This function is a PREVIEW FEATURE: the API may change in a future version. + + In some environments, using this feature with multiple processes will result + in faster downloads of large files. + + Using this feature with multiple threads is unlikely to improve download + performance under normal circumstances due to Python interpreter threading + behavior. The default is therefore to use processes instead of threads. + + Checksumming (md5 or crc32c) is not supported for chunked operations. Any + `checksum` parameter passed in to download_kwargs will be ignored. + + :type blob: `google.cloud.storage.Blob` + :param blob: + The blob to be downloaded. + + :type filename: str + :param filename: + The destination filename or path. + + :type download_kwargs: dict + :param download_kwargs: + A dictionary of keyword arguments to pass to the download method. Refer + to the documentation for blob.download_to_file() or + blob.download_to_filename() for more information. The dict is directly + passed into the download methods and is not validated by this function. + + Keyword arguments "start" and "end" are not supported and will + cause a ValueError if present. + + :type deadline: int + :param deadline: + The number of seconds to wait for all threads to resolve.
If the + deadline is reached, all threads will be terminated regardless of their + progress and concurrent.futures.TimeoutError will be raised. This can be + left as the default of None (no deadline) for most use cases. + + :type worker_type: str + :param worker_type: + The worker type to use; one of google.cloud.storage.transfer_manager.PROCESS + or google.cloud.storage.transfer_manager.THREAD. + + Although the exact performance impact depends on the use case, in most + situations the PROCESS worker type will use more system resources (both + memory and CPU) and result in faster operations than THREAD workers. + + Because the subprocesses of the PROCESS worker type can't access memory + from the main process, Client objects have to be serialized and then + recreated in each subprocess. The serialization of the Client object + for use in subprocesses is an approximation and may not capture every + detail of the Client object, especially if the Client was modified after + its initial creation or if `Client._http` was modified in any way. + + THREAD worker types are observed to be relatively efficient for + operations with many small files, but not for operations with large + files. PROCESS workers are recommended for large file operations. + + :type max_workers: int + :param max_workers: + The maximum number of workers to create to handle the workload. + + With PROCESS workers, a larger number of workers will consume more + system resources (memory and CPU) at once. + + How many workers is optimal depends heavily on the specific use case, + and the default is a conservative number that should work okay in most + cases without consuming excessive resources. + + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. + """ + + if download_kwargs is None: + download_kwargs = {} + if "start" in download_kwargs or "end" in download_kwargs: + raise ValueError( + "Download arguments 'start' and 'end' are not supported by download_chunks_concurrently." + ) + + # We must know the size and the generation of the blob. + if not blob.size or not blob.generation: + blob.reload() + + pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type) + # Pickle the blob ahead of time (just once, not once per chunk) if needed. + maybe_pickled_blob = _pickle_blob(blob) if needs_pickling else blob + + futures = [] + + # Create and/or truncate the destination file to prepare for sparse writing. + with open(filename, "wb") as _: + pass + + with pool_class(max_workers=max_workers) as executor: + cursor = 0 + end = blob.size + while cursor < end: + start = cursor + cursor = min(cursor + chunk_size, end) + futures.append( + executor.submit( + _download_and_write_chunk_in_place, + maybe_pickled_blob, + filename, + start=start, + end=cursor - 1, + download_kwargs=download_kwargs, + ) + ) + + concurrent.futures.wait( + futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED + ) + + # Raise any exceptions. Successful results can be ignored. 
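+ # Calling future.result() re-raises, in the calling process, any exception raised by a worker.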
+ for future in futures: + future.result() + return None + + +def _download_and_write_chunk_in_place( + maybe_pickled_blob, filename, start, end, download_kwargs +): + if isinstance(maybe_pickled_blob, Blob): + blob = maybe_pickled_blob + else: + blob = pickle.loads(maybe_pickled_blob) + with open( + filename, "rb+" + ) as f: # Open in mixed read/write mode to avoid truncating or appending + f.seek(start) + return blob.download_to_file(f, start=start, end=end, **download_kwargs) + + +def _call_method_on_maybe_pickled_blob( + maybe_pickled_blob, method_name, *args, **kwargs +): + """Helper function that runs inside a thread or subprocess. + + `maybe_pickled_blob` is either a blob (for threads) or a specially pickled + blob (for processes) because the default pickling mangles clients which are + attached to blobs.""" + + if isinstance(maybe_pickled_blob, Blob): + blob = maybe_pickled_blob + else: + blob = pickle.loads(maybe_pickled_blob) + return getattr(blob, method_name)(*args, **kwargs) + + +def _reduce_client(cl): + """Replicate a Client by constructing a new one with the same params.""" + + client_object_id = id(cl) + project = cl.project + credentials = cl._credentials + _http = None # Can't carry this over + client_info = cl._initial_client_info + client_options = cl._initial_client_options + + return _LazyClient, ( + client_object_id, + project, + credentials, + _http, + client_info, + client_options, + ) + + +def _pickle_blob(blob): + """Pickle a Blob (and its Bucket and Client) and return a bytestring.""" + + # We need a custom pickler to process Client objects, which are attached to + # Buckets (and therefore to Blobs in turn). Unfortunately, the Python + # multiprocessing library doesn't seem to have a good way to use a custom + # pickler, and using copyreg will mutate global state and affect code + # outside of the client library. Instead, we'll pre-pickle the object and + # pass the bytestring in. + f = io.BytesIO() + p = pickle.Pickler(f) + p.dispatch_table = copyreg.dispatch_table.copy() + p.dispatch_table[Client] = _reduce_client + p.dump(blob) + return f.getvalue() + + +def _get_pool_class_and_requirements(worker_type): + """Returns the pool class, and whether the pool requires pickled Blobs.""" + + if worker_type == PROCESS: + # Use processes. Pickle blobs with custom logic to handle the client. + return (concurrent.futures.ProcessPoolExecutor, True) + elif worker_type == THREAD: + # Use threads. Pass blobs through unpickled. + return (concurrent.futures.ThreadPoolExecutor, False) + else: + raise ValueError( + "The worker_type must be google.cloud.storage.transfer_manager.PROCESS or google.cloud.storage.transfer_manager.THREAD" + ) + + +class _LazyClient: + """An object that will transform into either a cached or a new Client""" + + def __new__(cls, id, *args, **kwargs): + cached_client = _cached_clients.get(id) + if cached_client: + return cached_client + else: + cached_client = Client(*args, **kwargs) + _cached_clients[id] = cached_client + return cached_client diff --git a/noxfile.py b/noxfile.py index 522f826e9..3b67a5712 100644 --- a/noxfile.py +++ b/noxfile.py @@ -137,9 +137,8 @@ def system(session): session.skip("System tests were not found") # Use pre-release gRPC for system tests. - # TODO: Revert #845 once grpc issue fix is released. - # Pending grpc/grpc#30642 and grpc/grpc#30651. - session.install("--pre", "grpcio!=1.49.0rc1") + # TODO: Remove ban of 1.52.0rc1 once grpc/grpc#31885 is resolved. 
+ session.install("--pre", "grpcio!=1.52.0rc1") # Install all test dependencies, then install this package into the # virtualenv's dist-packages. diff --git a/tests/system/test_transfer_manager.py b/tests/system/test_transfer_manager.py index 0b639170d..bc7e0d31e 100644 --- a/tests/system/test_transfer_manager.py +++ b/tests/system/test_transfer_manager.py @@ -14,11 +14,15 @@ # limitations under the License. import tempfile +import os from google.cloud.storage import transfer_manager +from google.cloud.storage._helpers import _base64_md5hash from google.api_core import exceptions +DEADLINE = 30 + def test_upload_many(shared_bucket, file_data, blobs_to_delete): FILE_BLOB_PAIRS = [ @@ -26,7 +30,11 @@ def test_upload_many(shared_bucket, file_data, blobs_to_delete): (file_data["simple"]["path"], shared_bucket.blob("simple2")), ] - results = transfer_manager.upload_many(FILE_BLOB_PAIRS) + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, + worker_type=transfer_manager.PROCESS, + deadline=DEADLINE, + ) assert results == [None, None] blobs = shared_bucket.list_blobs() @@ -36,13 +44,19 @@ def test_upload_many(shared_bucket, file_data, blobs_to_delete): assert len(blobs_to_delete) == 2 -def test_upload_many_with_file_objs(shared_bucket, file_data, blobs_to_delete): +def test_upload_many_with_threads_and_file_objs( + shared_bucket, file_data, blobs_to_delete +): FILE_BLOB_PAIRS = [ (open(file_data["simple"]["path"], "rb"), shared_bucket.blob("simple1")), (open(file_data["simple"]["path"], "rb"), shared_bucket.blob("simple2")), ] - results = transfer_manager.upload_many(FILE_BLOB_PAIRS) + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, + worker_type=transfer_manager.THREAD, + deadline=DEADLINE, + ) assert results == [None, None] blobs = shared_bucket.list_blobs() @@ -61,7 +75,10 @@ def test_upload_many_skip_if_exists( ] results = transfer_manager.upload_many( - FILE_BLOB_PAIRS, skip_if_exists=True, raise_exception=True + FILE_BLOB_PAIRS, + skip_if_exists=True, + raise_exception=True, + deadline=DEADLINE, ) assert isinstance(results[0], exceptions.PreconditionFailed) assert results[1] is None @@ -75,10 +92,82 @@ def test_upload_many_skip_if_exists( def test_download_many(listable_bucket): blobs = list(listable_bucket.list_blobs()) - tempfiles = [tempfile.TemporaryFile(), tempfile.TemporaryFile()] - BLOB_FILE_PAIRS = zip(blobs[:2], tempfiles) - - results = transfer_manager.download_many(BLOB_FILE_PAIRS) - assert results == [None, None] - for fp in tempfiles: - assert fp.tell() != 0 + with tempfile.TemporaryDirectory() as tempdir: + filenames = [ + os.path.join(tempdir, "file_a.txt"), + os.path.join(tempdir, "file_b.txt"), + ] + BLOB_FILE_PAIRS = zip(blobs[:2], filenames) + + results = transfer_manager.download_many( + BLOB_FILE_PAIRS, + worker_type=transfer_manager.PROCESS, + deadline=DEADLINE, + ) + assert results == [None, None] + for count, filename in enumerate(filenames): + with open(filename, "rb") as fp: + assert len(fp.read()) == blobs[count].size + + +def test_download_many_with_threads_and_file_objs(listable_bucket): + blobs = list(listable_bucket.list_blobs()) + with tempfile.TemporaryFile() as file_a, tempfile.TemporaryFile() as file_b: + tempfiles = [file_a, file_b] + BLOB_FILE_PAIRS = zip(blobs[:2], tempfiles) + + results = transfer_manager.download_many( + BLOB_FILE_PAIRS, + worker_type=transfer_manager.THREAD, + deadline=DEADLINE, + ) + assert results == [None, None] + for fp in tempfiles: + assert fp.tell() != 0 + + +def 
test_download_chunks_concurrently(shared_bucket, file_data): + # Upload a big file + source_file = file_data["big"] + upload_blob = shared_bucket.blob("chunky_file") + upload_blob.upload_from_filename(source_file["path"]) + upload_blob.reload() + size = upload_blob.size + chunk_size = size // 32 + + # Get a fresh blob obj w/o metadata for testing purposes + download_blob = shared_bucket.blob("chunky_file") + + with tempfile.TemporaryDirectory() as tempdir: + full_filename = os.path.join(tempdir, "chunky_file_1") + transfer_manager.download_chunks_concurrently( + download_blob, + full_filename, + chunk_size=chunk_size, + deadline=DEADLINE, + ) + with open(full_filename, "rb") as file_obj: + assert _base64_md5hash(file_obj) == source_file["hash"] + + # Now test for case where last chunk is exactly 1 byte. + trailing_chunk_filename = os.path.join(tempdir, "chunky_file_2") + transfer_manager.download_chunks_concurrently( + download_blob, + trailing_chunk_filename, + chunk_size=size - 1, + deadline=DEADLINE, + ) + with open(trailing_chunk_filename, "rb") as file_obj: + assert _base64_md5hash(file_obj) == source_file["hash"] + + # Also test threaded mode. + threaded_filename = os.path.join(tempdir, "chunky_file_3") + transfer_manager.download_chunks_concurrently( + download_blob, + threaded_filename, + chunk_size=chunk_size, + deadline=DEADLINE, + worker_type=transfer_manager.THREAD, + ) + with open(threaded_filename, "rb") as file_obj: + assert _base64_md5hash(file_obj) == source_file["hash"] diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index f52d5471b..bdfd236b5 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -17,258 +17,483 @@ with pytest.warns(UserWarning): from google.cloud.storage import transfer_manager +from google.cloud.storage import Blob + from google.api_core import exceptions import os import tempfile -import unittest import mock - - -class Test_Transfer_Manager(unittest.TestCase): - def test_upload_many_with_filenames(self): - FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] - FAKE_CONTENT_TYPE = "text/fake" - UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} - EXPECTED_UPLOAD_KWARGS = {"if_generation_match": 0, **UPLOAD_KWARGS} - FAKE_RESULT = "nothing to see here" - - for _, blob_mock in FILE_BLOB_PAIRS: - blob_mock.upload_from_filename.return_value = FAKE_RESULT - - results = transfer_manager.upload_many( - FILE_BLOB_PAIRS, skip_if_exists=True, upload_kwargs=UPLOAD_KWARGS +import pickle + +BLOB_TOKEN_STRING = "blob token" +FAKE_CONTENT_TYPE = "text/fake" +UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} +FAKE_RESULT = "nothing to see here" +FAKE_ENCODING = "fake_gzip" +DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} +CHUNK_SIZE = 8 + + +# Used in subprocesses only, so excluded from coverage +def _validate_blob_token_in_subprocess( + maybe_pickled_blob, method_name, path_or_file, **kwargs +): # pragma: NO COVER + assert pickle.loads(maybe_pickled_blob) == BLOB_TOKEN_STRING + assert method_name.endswith("filename") + assert path_or_file.startswith("file") + assert kwargs == UPLOAD_KWARGS or kwargs == DOWNLOAD_KWARGS + return FAKE_RESULT + + +def test_upload_many_with_filenames(): + FILE_BLOB_PAIRS = [ + ("file_a.txt", mock.Mock(spec=Blob)), + ("file_b.txt", mock.Mock(spec=Blob)), + ] + EXPECTED_UPLOAD_KWARGS = {"if_generation_match": 0, **UPLOAD_KWARGS} + + for _, blob_mock in FILE_BLOB_PAIRS: + blob_mock.upload_from_filename.return_value = FAKE_RESULT + + 
results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, + skip_if_exists=True, + upload_kwargs=UPLOAD_KWARGS, + worker_type=transfer_manager.THREAD, + ) + for (filename, mock_blob) in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.assert_any_call( + filename, **EXPECTED_UPLOAD_KWARGS ) - for (filename, mock_blob) in FILE_BLOB_PAIRS: - mock_blob.upload_from_filename.assert_any_call( - filename, **EXPECTED_UPLOAD_KWARGS - ) - for result in results: - self.assertEqual(result, FAKE_RESULT) - - def test_upload_many_with_file_objs(self): - FILE_BLOB_PAIRS = [ - (tempfile.TemporaryFile(), mock.Mock()), - (tempfile.TemporaryFile(), mock.Mock()), - ] - FAKE_CONTENT_TYPE = "text/fake" - UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} - EXPECTED_UPLOAD_KWARGS = {"if_generation_match": 0, **UPLOAD_KWARGS} - FAKE_RESULT = "nothing to see here" - - for _, blob_mock in FILE_BLOB_PAIRS: - blob_mock.upload_from_file.return_value = FAKE_RESULT - - results = transfer_manager.upload_many( - FILE_BLOB_PAIRS, skip_if_exists=True, upload_kwargs=UPLOAD_KWARGS + for result in results: + assert result == FAKE_RESULT + + +def test_upload_many_with_file_objs(): + FILE_BLOB_PAIRS = [ + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + ] + EXPECTED_UPLOAD_KWARGS = {"if_generation_match": 0, **UPLOAD_KWARGS} + + for _, blob_mock in FILE_BLOB_PAIRS: + blob_mock.upload_from_file.return_value = FAKE_RESULT + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, + skip_if_exists=True, + upload_kwargs=UPLOAD_KWARGS, + worker_type=transfer_manager.THREAD, + ) + for (file, mock_blob) in FILE_BLOB_PAIRS: + mock_blob.upload_from_file.assert_any_call(file, **EXPECTED_UPLOAD_KWARGS) + for result in results: + assert result == FAKE_RESULT + + +def test_upload_many_passes_concurrency_options(): + FILE_BLOB_PAIRS = [ + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + ] + MAX_WORKERS = 7 + DEADLINE = 10 + with mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch( + "concurrent.futures.wait" + ) as wait_patch: + transfer_manager.upload_many( + FILE_BLOB_PAIRS, + deadline=DEADLINE, + worker_type=transfer_manager.THREAD, + max_workers=MAX_WORKERS, ) - for (file, mock_blob) in FILE_BLOB_PAIRS: - mock_blob.upload_from_file.assert_any_call(file, **EXPECTED_UPLOAD_KWARGS) - for result in results: - self.assertEqual(result, FAKE_RESULT) - - def test_upload_many_passes_concurrency_options(self): - FILE_BLOB_PAIRS = [ - (tempfile.TemporaryFile(), mock.Mock()), - (tempfile.TemporaryFile(), mock.Mock()), - ] - MAX_WORKERS = 7 - DEADLINE = 10 - with mock.patch( - "concurrent.futures.ThreadPoolExecutor" - ) as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: + pool_patch.assert_called_with(max_workers=MAX_WORKERS) + wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) + + +def test_threads_deprecation_with_upload(): + FILE_BLOB_PAIRS = [ + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + ] + MAX_WORKERS = 7 + DEADLINE = 10 + with mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch( + "concurrent.futures.wait" + ) as wait_patch: + with pytest.warns(): transfer_manager.upload_many( - FILE_BLOB_PAIRS, threads=MAX_WORKERS, deadline=DEADLINE - ) - pool_patch.assert_called_with(max_workers=MAX_WORKERS) - wait_patch.assert_called_with( - mock.ANY, timeout=DEADLINE, return_when=mock.ANY 
+ FILE_BLOB_PAIRS, deadline=DEADLINE, threads=MAX_WORKERS ) + pool_patch.assert_called_with(max_workers=MAX_WORKERS) + wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) + + +def test_threads_deprecation_conflict_with_upload(): + FILE_BLOB_PAIRS = [ + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + ] + MAX_WORKERS = 7 + DEADLINE = 10 + with pytest.raises(ValueError): + transfer_manager.upload_many( + FILE_BLOB_PAIRS, + deadline=DEADLINE, + threads=5, + worker_type=transfer_manager.THREAD, + max_workers=MAX_WORKERS, + ) - def test_upload_many_suppresses_exceptions(self): - FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] - for _, mock_blob in FILE_BLOB_PAIRS: - mock_blob.upload_from_filename.side_effect = ConnectionError() - - results = transfer_manager.upload_many(FILE_BLOB_PAIRS) - for result in results: - self.assertEqual(type(result), ConnectionError) - - def test_upload_many_raises_exceptions(self): - FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] - for _, mock_blob in FILE_BLOB_PAIRS: - mock_blob.upload_from_filename.side_effect = ConnectionError() - with self.assertRaises(ConnectionError): - transfer_manager.upload_many(FILE_BLOB_PAIRS, raise_exception=True) +def test_upload_many_suppresses_exceptions(): + FILE_BLOB_PAIRS = [ + ("file_a.txt", mock.Mock(spec=Blob)), + ("file_b.txt", mock.Mock(spec=Blob)), + ] + for _, mock_blob in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.side_effect = ConnectionError() + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, worker_type=transfer_manager.THREAD + ) + for result in results: + assert isinstance(result, ConnectionError) + + +def test_upload_many_raises_exceptions(): + FILE_BLOB_PAIRS = [ + ("file_a.txt", mock.Mock(spec=Blob)), + ("file_b.txt", mock.Mock(spec=Blob)), + ] + for _, mock_blob in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.side_effect = ConnectionError() + + with pytest.raises(ConnectionError): + transfer_manager.upload_many( + FILE_BLOB_PAIRS, raise_exception=True, worker_type=transfer_manager.THREAD + ) - def test_upload_many_suppresses_412_with_skip_if_exists(self): - FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] - for _, mock_blob in FILE_BLOB_PAIRS: - mock_blob.upload_from_filename.side_effect = exceptions.PreconditionFailed( - "412" - ) +def test_upload_many_suppresses_412_with_skip_if_exists(): + FILE_BLOB_PAIRS = [ + ("file_a.txt", mock.Mock(spec=Blob)), + ("file_b.txt", mock.Mock(spec=Blob)), + ] + for _, mock_blob in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.side_effect = exceptions.PreconditionFailed( + "412" + ) + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, + skip_if_exists=True, + raise_exception=True, + worker_type=transfer_manager.THREAD, + ) + for result in results: + assert type(result) == exceptions.PreconditionFailed + + +def test_upload_many_with_processes(): + # Mocks are not pickleable, so we send token strings over the wire. 
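+ # _validate_blob_token_in_subprocess, patched in below in place of _call_method_on_maybe_pickled_blob, checks that the pickled token and kwargs arrive intact in the worker and returns FAKE_RESULT.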
+ FILE_BLOB_PAIRS = [ + ("file_a.txt", BLOB_TOKEN_STRING), + ("file_b.txt", BLOB_TOKEN_STRING), + ] + + with mock.patch( + "google.cloud.storage.transfer_manager._call_method_on_maybe_pickled_blob", + new=_validate_blob_token_in_subprocess, + ): results = transfer_manager.upload_many( - FILE_BLOB_PAIRS, skip_if_exists=True, raise_exception=True + FILE_BLOB_PAIRS, + upload_kwargs=UPLOAD_KWARGS, + worker_type=transfer_manager.PROCESS, + raise_exception=True, ) - for result in results: - self.assertEqual(type(result), exceptions.PreconditionFailed) - - def test_download_many_with_filenames(self): - BLOB_FILE_PAIRS = [(mock.Mock(), "file_a.txt"), (mock.Mock(), "file_b.txt")] - FAKE_ENCODING = "fake_gzip" - DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} - FAKE_RESULT = "nothing to see here" + for result in results: + assert result == FAKE_RESULT + + +def test_upload_many_with_processes_rejects_file_obj(): + # Mocks are not pickleable, so we send token strings over the wire. + FILE_BLOB_PAIRS = [ + ("file_a.txt", BLOB_TOKEN_STRING), + (tempfile.TemporaryFile(), BLOB_TOKEN_STRING), + ] + + with mock.patch( + "google.cloud.storage.transfer_manager._call_method_on_maybe_pickled_blob", + new=_validate_blob_token_in_subprocess, + ): + with pytest.raises(ValueError): + transfer_manager.upload_many( + FILE_BLOB_PAIRS, + upload_kwargs=UPLOAD_KWARGS, + worker_type=transfer_manager.PROCESS, + ) - for blob_mock, _ in BLOB_FILE_PAIRS: - blob_mock.download_to_filename.return_value = FAKE_RESULT - results = transfer_manager.download_many( - BLOB_FILE_PAIRS, download_kwargs=DOWNLOAD_KWARGS +def test_download_many_with_filenames(): + BLOB_FILE_PAIRS = [ + (mock.Mock(spec=Blob), "file_a.txt"), + (mock.Mock(spec=Blob), "file_b.txt"), + ] + + for blob_mock, _ in BLOB_FILE_PAIRS: + blob_mock.download_to_filename.return_value = FAKE_RESULT + + results = transfer_manager.download_many( + BLOB_FILE_PAIRS, + download_kwargs=DOWNLOAD_KWARGS, + worker_type=transfer_manager.THREAD, + ) + for (mock_blob, file) in BLOB_FILE_PAIRS: + mock_blob.download_to_filename.assert_any_call(file, **DOWNLOAD_KWARGS) + for result in results: + assert result == FAKE_RESULT + + +def test_download_many_with_file_objs(): + BLOB_FILE_PAIRS = [ + (mock.Mock(spec=Blob), tempfile.TemporaryFile()), + (mock.Mock(spec=Blob), tempfile.TemporaryFile()), + ] + + for blob_mock, _ in BLOB_FILE_PAIRS: + blob_mock.download_to_file.return_value = FAKE_RESULT + + results = transfer_manager.download_many( + BLOB_FILE_PAIRS, + download_kwargs=DOWNLOAD_KWARGS, + worker_type=transfer_manager.THREAD, + ) + for (mock_blob, file) in BLOB_FILE_PAIRS: + mock_blob.download_to_file.assert_any_call(file, **DOWNLOAD_KWARGS) + for result in results: + assert result == FAKE_RESULT + + +def test_download_many_passes_concurrency_options(): + BLOB_FILE_PAIRS = [ + (mock.Mock(spec=Blob), tempfile.TemporaryFile()), + (mock.Mock(spec=Blob), tempfile.TemporaryFile()), + ] + MAX_WORKERS = 7 + DEADLINE = 10 + with mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch( + "concurrent.futures.wait" + ) as wait_patch: + transfer_manager.download_many( + BLOB_FILE_PAIRS, + deadline=DEADLINE, + worker_type=transfer_manager.THREAD, + max_workers=MAX_WORKERS, ) - for (mock_blob, file) in BLOB_FILE_PAIRS: - mock_blob.download_to_filename.assert_any_call(file, **DOWNLOAD_KWARGS) - for result in results: - self.assertEqual(result, FAKE_RESULT) - - def test_download_many_with_file_objs(self): - BLOB_FILE_PAIRS = [ - (mock.Mock(), tempfile.TemporaryFile()), - 
(mock.Mock(), tempfile.TemporaryFile()), - ] - FAKE_ENCODING = "fake_gzip" - DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} - FAKE_RESULT = "nothing to see here" + pool_patch.assert_called_with(max_workers=MAX_WORKERS) + wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) + + +def test_download_many_suppresses_exceptions(): + BLOB_FILE_PAIRS = [ + (mock.Mock(spec=Blob), "file_a.txt"), + (mock.Mock(spec=Blob), "file_b.txt"), + ] + for mock_blob, _ in BLOB_FILE_PAIRS: + mock_blob.download_to_filename.side_effect = ConnectionError() + + results = transfer_manager.download_many( + BLOB_FILE_PAIRS, worker_type=transfer_manager.THREAD + ) + for result in results: + assert isinstance(result, ConnectionError) + + +def test_download_many_raises_exceptions(): + BLOB_FILE_PAIRS = [ + (mock.Mock(spec=Blob), "file_a.txt"), + (mock.Mock(spec=Blob), "file_b.txt"), + ] + for mock_blob, _ in BLOB_FILE_PAIRS: + mock_blob.download_to_filename.side_effect = ConnectionError() + + with pytest.raises(ConnectionError): + transfer_manager.download_many( + BLOB_FILE_PAIRS, raise_exception=True, worker_type=transfer_manager.THREAD + ) + - for blob_mock, _ in BLOB_FILE_PAIRS: - blob_mock.download_to_file.return_value = FAKE_RESULT +def test_download_many_with_processes(): + # Mocks are not pickleable, so we send token strings over the wire. + BLOB_FILE_PAIRS = [ + (BLOB_TOKEN_STRING, "file_a.txt"), + (BLOB_TOKEN_STRING, "file_b.txt"), + ] + with mock.patch( + "google.cloud.storage.transfer_manager._call_method_on_maybe_pickled_blob", + new=_validate_blob_token_in_subprocess, + ): results = transfer_manager.download_many( - BLOB_FILE_PAIRS, download_kwargs=DOWNLOAD_KWARGS + BLOB_FILE_PAIRS, + download_kwargs=DOWNLOAD_KWARGS, + worker_type=transfer_manager.PROCESS, ) - for (mock_blob, file) in BLOB_FILE_PAIRS: - mock_blob.download_to_file.assert_any_call(file, **DOWNLOAD_KWARGS) - for result in results: - self.assertEqual(result, FAKE_RESULT) - - def test_download_many_passes_concurrency_options(self): - BLOB_FILE_PAIRS = [ - (mock.Mock(), tempfile.TemporaryFile()), - (mock.Mock(), tempfile.TemporaryFile()), - ] - MAX_WORKERS = 7 - DEADLINE = 10 - with mock.patch( - "concurrent.futures.ThreadPoolExecutor" - ) as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: + for result in results: + assert result == FAKE_RESULT + + +def test_download_many_with_processes_rejects_file_obj(): + # Mocks are not pickleable, so we send token strings over the wire. 
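+ # The TemporaryFile in the second pair should make download_many raise ValueError, since PROCESS workers accept filenames only.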
+ BLOB_FILE_PAIRS = [ + (BLOB_TOKEN_STRING, "file_a.txt"), + (BLOB_TOKEN_STRING, tempfile.TemporaryFile()), + ] + + with mock.patch( + "google.cloud.storage.transfer_manager._call_method_on_maybe_pickled_blob", + new=_validate_blob_token_in_subprocess, + ): + with pytest.raises(ValueError): transfer_manager.download_many( - BLOB_FILE_PAIRS, threads=MAX_WORKERS, deadline=DEADLINE - ) - pool_patch.assert_called_with(max_workers=MAX_WORKERS) - wait_patch.assert_called_with( - mock.ANY, timeout=DEADLINE, return_when=mock.ANY + BLOB_FILE_PAIRS, + download_kwargs=DOWNLOAD_KWARGS, + worker_type=transfer_manager.PROCESS, ) - def test_download_many_suppresses_exceptions(self): - BLOB_FILE_PAIRS = [(mock.Mock(), "file_a.txt"), (mock.Mock(), "file_b.txt")] - for mock_blob, _ in BLOB_FILE_PAIRS: - mock_blob.download_to_filename.side_effect = ConnectionError() - - results = transfer_manager.download_many(BLOB_FILE_PAIRS) - for result in results: - self.assertEqual(type(result), ConnectionError) - - def test_download_many_raises_exceptions(self): - BLOB_FILE_PAIRS = [(mock.Mock(), "file_a.txt"), (mock.Mock(), "file_b.txt")] - for mock_blob, _ in BLOB_FILE_PAIRS: - mock_blob.download_to_filename.side_effect = ConnectionError() - - transfer_manager.download_many(BLOB_FILE_PAIRS) - with self.assertRaises(ConnectionError): - transfer_manager.download_many(BLOB_FILE_PAIRS, raise_exception=True) - - def test_upload_many_from_filenames(self): - bucket = mock.Mock() - - FILENAMES = ["file_a.txt", "file_b.txt"] - ROOT = "mypath/" - PREFIX = "myprefix/" - KEY_NAME = "keyname" - BLOB_CONSTRUCTOR_KWARGS = {"kms_key_name": KEY_NAME} - UPLOAD_KWARGS = {"content-type": "text/fake"} - MAX_WORKERS = 7 - DEADLINE = 10 - - EXPECTED_FILE_BLOB_PAIRS = [ - (os.path.join(ROOT, filename), mock.ANY) for filename in FILENAMES - ] - - with mock.patch( - "google.cloud.storage.transfer_manager.upload_many" - ) as mock_upload_many: - transfer_manager.upload_many_from_filenames( - bucket, - FILENAMES, - source_directory=ROOT, - blob_name_prefix=PREFIX, - skip_if_exists=True, - blob_constructor_kwargs=BLOB_CONSTRUCTOR_KWARGS, - upload_kwargs=UPLOAD_KWARGS, - threads=MAX_WORKERS, - deadline=DEADLINE, - raise_exception=True, - ) - mock_upload_many.assert_called_once_with( - EXPECTED_FILE_BLOB_PAIRS, +def test_upload_many_from_filenames(): + bucket = mock.Mock() + + FILENAMES = ["file_a.txt", "file_b.txt"] + ROOT = "mypath/" + PREFIX = "myprefix/" + KEY_NAME = "keyname" + BLOB_CONSTRUCTOR_KWARGS = {"kms_key_name": KEY_NAME} + UPLOAD_KWARGS = {"content-type": "text/fake"} + MAX_WORKERS = 7 + DEADLINE = 10 + WORKER_TYPE = transfer_manager.THREAD + + EXPECTED_FILE_BLOB_PAIRS = [ + (os.path.join(ROOT, filename), mock.ANY) for filename in FILENAMES + ] + + with mock.patch( + "google.cloud.storage.transfer_manager.upload_many" + ) as mock_upload_many: + transfer_manager.upload_many_from_filenames( + bucket, + FILENAMES, + source_directory=ROOT, + blob_name_prefix=PREFIX, skip_if_exists=True, + blob_constructor_kwargs=BLOB_CONSTRUCTOR_KWARGS, upload_kwargs=UPLOAD_KWARGS, - threads=MAX_WORKERS, deadline=DEADLINE, raise_exception=True, + worker_type=WORKER_TYPE, + max_workers=MAX_WORKERS, ) - bucket.blob.assert_any_call(PREFIX + FILENAMES[0], **BLOB_CONSTRUCTOR_KWARGS) - bucket.blob.assert_any_call(PREFIX + FILENAMES[1], **BLOB_CONSTRUCTOR_KWARGS) - def test_upload_many_from_filenames_minimal_args(self): - bucket = mock.Mock() + mock_upload_many.assert_called_once_with( + EXPECTED_FILE_BLOB_PAIRS, + skip_if_exists=True, + 
upload_kwargs=UPLOAD_KWARGS, + deadline=DEADLINE, + raise_exception=True, + worker_type=WORKER_TYPE, + max_workers=MAX_WORKERS, + ) + bucket.blob.assert_any_call(PREFIX + FILENAMES[0], **BLOB_CONSTRUCTOR_KWARGS) + bucket.blob.assert_any_call(PREFIX + FILENAMES[1], **BLOB_CONSTRUCTOR_KWARGS) - FILENAMES = ["file_a.txt", "file_b.txt"] - EXPECTED_FILE_BLOB_PAIRS = [(filename, mock.ANY) for filename in FILENAMES] +def test_upload_many_from_filenames_minimal_args(): + bucket = mock.Mock() - with mock.patch( - "google.cloud.storage.transfer_manager.upload_many" - ) as mock_upload_many: - transfer_manager.upload_many_from_filenames( - bucket, - FILENAMES, - ) + FILENAMES = ["file_a.txt", "file_b.txt"] - mock_upload_many.assert_called_once_with( - EXPECTED_FILE_BLOB_PAIRS, - skip_if_exists=False, - upload_kwargs=None, - threads=4, - deadline=None, - raise_exception=False, + EXPECTED_FILE_BLOB_PAIRS = [(filename, mock.ANY) for filename in FILENAMES] + + with mock.patch( + "google.cloud.storage.transfer_manager.upload_many" + ) as mock_upload_many: + transfer_manager.upload_many_from_filenames( + bucket, + FILENAMES, ) - bucket.blob.assert_any_call(FILENAMES[0]) - bucket.blob.assert_any_call(FILENAMES[1]) - def test_download_many_to_path(self): - bucket = mock.Mock() + mock_upload_many.assert_called_once_with( + EXPECTED_FILE_BLOB_PAIRS, + skip_if_exists=False, + upload_kwargs=None, + deadline=None, + raise_exception=False, + worker_type=transfer_manager.PROCESS, + max_workers=8, + ) + bucket.blob.assert_any_call(FILENAMES[0]) + bucket.blob.assert_any_call(FILENAMES[1]) + + +def test_download_many_to_path(): + bucket = mock.Mock() + + BLOBNAMES = ["file_a.txt", "file_b.txt", "dir_a/file_c.txt"] + PATH_ROOT = "mypath/" + BLOB_NAME_PREFIX = "myprefix/" + DOWNLOAD_KWARGS = {"accept-encoding": "fake-gzip"} + MAX_WORKERS = 7 + DEADLINE = 10 + WORKER_TYPE = transfer_manager.THREAD + + EXPECTED_BLOB_FILE_PAIRS = [ + (mock.ANY, os.path.join(PATH_ROOT, blobname)) for blobname in BLOBNAMES + ] + + with mock.patch( + "google.cloud.storage.transfer_manager.download_many" + ) as mock_download_many: + transfer_manager.download_many_to_path( + bucket, + BLOBNAMES, + destination_directory=PATH_ROOT, + blob_name_prefix=BLOB_NAME_PREFIX, + download_kwargs=DOWNLOAD_KWARGS, + deadline=DEADLINE, + create_directories=False, + raise_exception=True, + max_workers=MAX_WORKERS, + worker_type=WORKER_TYPE, + ) - BLOBNAMES = ["file_a.txt", "file_b.txt", "dir_a/file_c.txt"] - PATH_ROOT = "mypath/" - BLOB_NAME_PREFIX = "myprefix/" - DOWNLOAD_KWARGS = {"accept-encoding": "fake-gzip"} - MAX_WORKERS = 7 - DEADLINE = 10 + mock_download_many.assert_called_once_with( + EXPECTED_BLOB_FILE_PAIRS, + download_kwargs=DOWNLOAD_KWARGS, + deadline=DEADLINE, + raise_exception=True, + max_workers=MAX_WORKERS, + worker_type=WORKER_TYPE, + ) + for blobname in BLOBNAMES: + bucket.blob.assert_any_call(BLOB_NAME_PREFIX + blobname) + + +def test_download_many_to_path_creates_directories(): + bucket = mock.Mock() + + with tempfile.TemporaryDirectory() as tempdir: + DIR_NAME = "dir_a/dir_b" + BLOBNAMES = [ + "file_a.txt", + "file_b.txt", + os.path.join(DIR_NAME, "file_c.txt"), + ] EXPECTED_BLOB_FILE_PAIRS = [ - (mock.ANY, os.path.join(PATH_ROOT, blobname)) for blobname in BLOBNAMES + (mock.ANY, os.path.join(tempdir, blobname)) for blobname in BLOBNAMES ] with mock.patch( @@ -277,59 +502,214 @@ def test_download_many_to_path(self): transfer_manager.download_many_to_path( bucket, BLOBNAMES, - destination_directory=PATH_ROOT, - 
blob_name_prefix=BLOB_NAME_PREFIX, - download_kwargs=DOWNLOAD_KWARGS, - threads=MAX_WORKERS, - deadline=DEADLINE, - create_directories=False, + destination_directory=tempdir, + create_directories=True, raise_exception=True, ) mock_download_many.assert_called_once_with( EXPECTED_BLOB_FILE_PAIRS, - download_kwargs=DOWNLOAD_KWARGS, - threads=MAX_WORKERS, - deadline=DEADLINE, + download_kwargs=None, + deadline=None, raise_exception=True, + worker_type=transfer_manager.PROCESS, + max_workers=8, ) for blobname in BLOBNAMES: - bucket.blob.assert_any_call(BLOB_NAME_PREFIX + blobname) - - def test_download_many_to_path_creates_directories(self): - bucket = mock.Mock() - - with tempfile.TemporaryDirectory() as tempdir: - DIR_NAME = "dir_a/dir_b" - BLOBNAMES = [ - "file_a.txt", - "file_b.txt", - os.path.join(DIR_NAME, "file_c.txt"), - ] - - EXPECTED_BLOB_FILE_PAIRS = [ - (mock.ANY, os.path.join(tempdir, blobname)) for blobname in BLOBNAMES - ] - - with mock.patch( - "google.cloud.storage.transfer_manager.download_many" - ) as mock_download_many: - transfer_manager.download_many_to_path( - bucket, - BLOBNAMES, - destination_directory=tempdir, - create_directories=True, - raise_exception=True, - ) - - mock_download_many.assert_called_once_with( - EXPECTED_BLOB_FILE_PAIRS, - download_kwargs=None, - threads=4, - deadline=None, - raise_exception=True, + bucket.blob.assert_any_call(blobname) + + assert os.path.isdir(os.path.join(tempdir, DIR_NAME)) + + +def test_download_chunks_concurrently(): + blob_mock = mock.Mock(spec=Blob) + FILENAME = "file_a.txt" + MULTIPLE = 4 + blob_mock.size = CHUNK_SIZE * MULTIPLE + + blob_mock.download_to_filename.return_value = FAKE_RESULT + + with mock.patch("__main__.open", mock.mock_open()): + result = transfer_manager.download_chunks_concurrently( + blob_mock, + FILENAME, + chunk_size=CHUNK_SIZE, + download_kwargs=DOWNLOAD_KWARGS, + worker_type=transfer_manager.THREAD, + ) + for x in range(MULTIPLE): + blob_mock.download_to_file.assert_any_call( + mock.ANY, + **DOWNLOAD_KWARGS, + start=x * CHUNK_SIZE, + end=((x + 1) * CHUNK_SIZE) - 1 + ) + assert blob_mock.download_to_file.call_count == 4 + assert result is None + + +def test_download_chunks_concurrently_raises_on_start_and_end(): + blob_mock = mock.Mock(spec=Blob) + FILENAME = "file_a.txt" + MULTIPLE = 4 + blob_mock.size = CHUNK_SIZE * MULTIPLE + + with mock.patch("__main__.open", mock.mock_open()): + with pytest.raises(ValueError): + transfer_manager.download_chunks_concurrently( + blob_mock, + FILENAME, + chunk_size=CHUNK_SIZE, + worker_type=transfer_manager.THREAD, + download_kwargs={ + "start": CHUNK_SIZE, + }, ) - for blobname in BLOBNAMES: - bucket.blob.assert_any_call(blobname) + with pytest.raises(ValueError): + transfer_manager.download_chunks_concurrently( + blob_mock, + FILENAME, + chunk_size=CHUNK_SIZE, + worker_type=transfer_manager.THREAD, + download_kwargs={ + "end": (CHUNK_SIZE * (MULTIPLE - 1)) - 1, + }, + ) + + +def test_download_chunks_concurrently_passes_concurrency_options(): + blob_mock = mock.Mock(spec=Blob) + FILENAME = "file_a.txt" + MAX_WORKERS = 7 + DEADLINE = 10 + MULTIPLE = 4 + blob_mock.size = CHUNK_SIZE * MULTIPLE + + with mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch( + "concurrent.futures.wait" + ) as wait_patch, mock.patch("__main__.open", mock.mock_open()): + transfer_manager.download_chunks_concurrently( + blob_mock, + FILENAME, + chunk_size=CHUNK_SIZE, + deadline=DEADLINE, + worker_type=transfer_manager.THREAD, + max_workers=MAX_WORKERS, + ) + 
pool_patch.assert_called_with(max_workers=MAX_WORKERS) + wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) + + +class _PickleableMockBlob: + def __init__( + self, + name="", + size=None, + generation=None, + size_after_reload=None, + generation_after_reload=None, + ): + self.name = name + self.size = size + self.generation = generation + self._size_after_reload = size_after_reload + self._generation_after_reload = generation_after_reload + + def reload(self): + self.size = self._size_after_reload + self.generation = self._generation_after_reload + + def download_to_file(self, *args, **kwargs): + return "SUCCESS" + + +# Used in subprocesses only, so excluded from coverage +def _validate_blob_token_in_subprocess_for_chunk( + maybe_pickled_blob, filename, **kwargs +): # pragma: NO COVER + blob = pickle.loads(maybe_pickled_blob) + assert isinstance(blob, _PickleableMockBlob) + assert filename.startswith("file") + return FAKE_RESULT + + +def test_download_chunks_concurrently_with_processes(): + blob = _PickleableMockBlob( + "file_a_blob", size_after_reload=24, generation_after_reload=100 + ) + FILENAME = "file_a.txt" + + with mock.patch( + "google.cloud.storage.transfer_manager._download_and_write_chunk_in_place", + new=_validate_blob_token_in_subprocess_for_chunk, + ), mock.patch("__main__.open", mock.mock_open()): + result = transfer_manager.download_chunks_concurrently( + blob, + FILENAME, + chunk_size=CHUNK_SIZE, + download_kwargs=DOWNLOAD_KWARGS, + worker_type=transfer_manager.PROCESS, + ) + assert result is None + + +def test__LazyClient(): + fake_cache = {} + MOCK_ID = 9999 + with mock.patch( + "google.cloud.storage.transfer_manager._cached_clients", new=fake_cache + ), mock.patch("google.cloud.storage.transfer_manager.Client"): + lazyclient = transfer_manager._LazyClient(MOCK_ID) + lazyclient_cached = transfer_manager._LazyClient(MOCK_ID) + assert lazyclient is lazyclient_cached + assert len(fake_cache) == 1 + + +def test__pickle_blob(): + # This test nominally has coverage, but doesn't assert that the essential + # copyreg behavior in _pickle_blob works. Unfortunately there doesn't seem + # to be a good way to check that without actually creating a Client, which + # will spin up HTTP connections undesirably. This is more fully checked in + # the system tests, though. 
+ pkl = transfer_manager._pickle_blob(FAKE_RESULT) + assert pickle.loads(pkl) == FAKE_RESULT + + +def test__download_and_write_chunk_in_place(): + pickled_mock = pickle.dumps(_PickleableMockBlob()) + FILENAME = "file_a.txt" + with mock.patch("__main__.open", mock.mock_open()): + result = transfer_manager._download_and_write_chunk_in_place( + pickled_mock, FILENAME, 0, 8, {} + ) + assert result == "SUCCESS" + + +def test__get_pool_class_and_requirements_error(): + with pytest.raises(ValueError): + transfer_manager._get_pool_class_and_requirements("garbage") + + +def test__reduce_client(): + fake_cache = {} + client = mock.Mock() + + with mock.patch( + "google.cloud.storage.transfer_manager._cached_clients", new=fake_cache + ), mock.patch("google.cloud.storage.transfer_manager.Client"): + transfer_manager._reduce_client(client) + + +def test__call_method_on_maybe_pickled_blob(): + blob = mock.Mock(spec=Blob) + blob.download_to_file.return_value = "SUCCESS" + result = transfer_manager._call_method_on_maybe_pickled_blob( + blob, "download_to_file" + ) + assert result == "SUCCESS" - assert os.path.isdir(os.path.join(tempdir, DIR_NAME)) + pickled_blob = pickle.dumps(_PickleableMockBlob()) + result = transfer_manager._call_method_on_maybe_pickled_blob( + pickled_blob, "download_to_file" + ) + assert result == "SUCCESS"
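For context, a minimal usage sketch of the API added above; this is not part of the patch, and the bucket name, object names, and local paths are placeholders.

    from google.cloud.storage import Client, transfer_manager

    client = Client()
    bucket = client.bucket("my-bucket")

    # Bulk upload with the new worker controls; PROCESS is the default worker_type.
    # `results` is a list with one entry per file: None on success, or the
    # exception if raise_exception is left at its default of False.
    results = transfer_manager.upload_many_from_filenames(
        bucket,
        ["logs/a.txt", "logs/b.txt"],
        worker_type=transfer_manager.PROCESS,
        max_workers=8,
    )

    # Chunked, parallel download of a single large object.
    blob = bucket.blob("large-object.bin")
    transfer_manager.download_chunks_concurrently(
        blob, "/tmp/large-object.bin", chunk_size=32 * 1024 * 1024
    )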