From a38c3965487da32903870b48a6dfc92eebc85a1f Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 13 Nov 2025 18:47:05 +0000 Subject: [PATCH 01/19] feat(zb-experimental): add async write object stream --- .../asyncio/async_write_object_stream.py | 113 +++++++++++++++++ .../asyncio/test_async_write_object_stream.py | 117 ++++++++++++++++++ 2 files changed, 230 insertions(+) create mode 100644 google/cloud/storage/_experimental/asyncio/async_write_object_stream.py create mode 100644 tests/unit/asyncio/test_async_write_object_stream.py diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py new file mode 100644 index 000000000..ef19463e1 --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py @@ -0,0 +1,113 @@ +# experimental poc - chandrasiri +import asyncio +from typing import Optional +from google.cloud import _storage_v2 +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( + _AsyncAbstractObjectStream, +) +from google.api_core.bidi_async import AsyncBidiRpc + + +class _AsyncWriteObjectStream(_AsyncAbstractObjectStream): + """Class representing a gRPC bidi-stream for writing data from a GCS + ``Appendable Object``. + + This class provides a unix socket-like interface to a GCS ``Object``, with + methods like ``open``, ``close``, ``send``, and ``recv``. + + :type client: :class:`~google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client` + :param client: async grpc client to use for making API requests. + + :type bucket_name: str + :param bucket_name: The name of the GCS ``bucket`` containing the object. + + :type object_name: str + :param object_name: The name of the GCS ``Appendable Object`` to be write. + + :type generation_number: int + :param generation_number: (Optional) If present, selects a specific revision of + this object. If None, a new object is created. + + :type write_handle: bytes + :param write_handle: (Optional) An existing handle for writing the object. + If provided, opening the bidi-gRPC connection will be faster. + """ + + def __init__( + self, + client: AsyncGrpcClient.grpc_client, + bucket_name: str, + object_name: str, + generation_number: Optional[int] = None, # None means new object + write_handle: Optional[bytes] = None, + ) -> None: + if client is None: + raise ValueError("client must be provided") + if bucket_name is None: + raise ValueError("bucket_name must be provided") + if object_name is None: + raise ValueError("object_name must be provided") + + super().__init__( + bucket_name=bucket_name, + object_name=object_name, + generation_number=generation_number, + ) + self.client: AsyncGrpcClient.grpc_client = client + self.write_handle: Optional[bytes] = write_handle + + self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}" + + self.rpc = self.client._client._transport._wrapped_methods[ + self.client._client._transport.bidi_write_object + ] + + self.metadata = (("x-goog-request-params", f"bucket={self._full_bucket_name}"),) + self.socket_like_rpc: Optional[AsyncBidiRpc] = None + self._is_stream_open: bool = False + self.first_bidi_write_req = None + self.persisted_size = 0 + self.object_resource: Optional[_storage_v2.Object] = None + + async def open(self) -> None: + """Opening an object for write , should do it's state lookup + to know what's the persisted size is. + """ + raise NotImplementedError( + "open() is not implemented yet in _AsyncWriteObjectStream" + ) + + async def close(self) -> None: + """Closes the bidi-gRPC connection.""" + raise NotImplementedError( + "close() is not implemented yet in _AsyncWriteObjectStream" + ) + + async def send( + self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest + ) -> None: + """Sends a request message on the stream. + + Args: + bidi_write_object_request (:class:`~google.cloud._storage_v2.types.BidiReadObjectRequest`): + The request message to send. This is typically used to specify + the read offset and limit. + """ + raise NotImplementedError( + "send() is not implemented yet in _AsyncWriteObjectStream" + ) + + async def recv(self) -> _storage_v2.BidiWriteObjectResponse: + """Receives a response from the stream. + + This method waits for the next message from the server, which could + contain object data or metadata. + + Returns: + :class:`~google.cloud._storage_v2.types.BidiWriteObjectResponse`: + The response message from the server. + """ + raise NotImplementedError( + "recv() is not implemented yet in _AsyncWriteObjectStream" + ) diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py new file mode 100644 index 000000000..3e7c25d36 --- /dev/null +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -0,0 +1,117 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from unittest import mock + +from google.cloud.storage._experimental.asyncio.async_write_object_stream import ( + _AsyncWriteObjectStream, +) +from google.cloud import _storage_v2 + +BUCKET = "my-bucket" +OBJECT = "my-object" + + +@pytest.fixture +def mock_client(): + """Mock the async gRPC client.""" + mock_transport = mock.AsyncMock() + mock_transport.bidi_write_object = mock.sentinel.bidi_write_object + mock_transport._wrapped_methods = { + mock.sentinel.bidi_write_object: mock.sentinel.wrapped_bidi_write_object + } + + mock_gapic_client = mock.AsyncMock() + mock_gapic_client._transport = mock_transport + + client = mock.AsyncMock() + client._client = mock_gapic_client + return client + + +def test_async_write_object_stream_init(mock_client): + """Test the constructor of _AsyncWriteObjectStream.""" + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + + assert stream.client == mock_client + assert stream.bucket_name == BUCKET + assert stream.object_name == OBJECT + assert stream.generation_number is None + assert stream.write_handle is None + assert stream._full_bucket_name == f"projects/_/buckets/{BUCKET}" + assert stream.rpc == mock.sentinel.wrapped_bidi_write_object + assert stream.metadata == ( + ("x-goog-request-params", f"bucket=projects/_/buckets/{BUCKET}"), + ) + assert stream.socket_like_rpc is None + assert not stream._is_stream_open + assert stream.first_bidi_write_req is None + assert stream.persisted_size == 0 + assert stream.object_resource is None + + +def test_async_write_object_stream_init_with_generation_and_handle(mock_client): + """Test the constructor with optional arguments.""" + generation = 12345 + write_handle = b"test-handle" + stream = _AsyncWriteObjectStream( + mock_client, + BUCKET, + OBJECT, + generation_number=generation, + write_handle=write_handle, + ) + + assert stream.generation_number == generation + assert stream.write_handle == write_handle + + +def test_async_write_object_stream_init_raises_value_error(): + """Test that the constructor raises ValueError for missing arguments.""" + with pytest.raises(ValueError, match="client must be provided"): + _AsyncWriteObjectStream(None, BUCKET, OBJECT) + + with pytest.raises(ValueError, match="bucket_name must be provided"): + _AsyncWriteObjectStream(mock.Mock(), None, OBJECT) + + with pytest.raises(ValueError, match="object_name must be provided"): + _AsyncWriteObjectStream(mock.Mock(), BUCKET, None) + + +@pytest.mark.asyncio +async def test_unimplemented_methods_raise_error(mock_client): + """Test that unimplemented methods raise NotImplementedError.""" + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + + with pytest.raises(NotImplementedError): + await stream.open() + + with pytest.raises(NotImplementedError): + await stream.close() + + with pytest.raises(NotImplementedError): + await stream.send(_storage_v2.BidiWriteObjectRequest()) + + with pytest.raises(NotImplementedError): + await stream.recv() + + +def test_add_to_sources_txt(): + """ + This is a dummy test to ensure that the new test file is included in the + package's source file list. This is necessary for some build and packaging + tools to recognize the file. + """ + pass From aaee2f331f051e6577a4240d1bc9662874a7689b Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 13 Nov 2025 19:15:26 +0000 Subject: [PATCH 02/19] remove unused import and add license info --- .../asyncio/async_write_object_stream.py | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py index ef19463e1..07263ddd8 100644 --- a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py @@ -1,5 +1,26 @@ -# experimental poc - chandrasiri -import asyncio +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +NOTE: +This is _experimental module for upcoming support for Rapid Storage. +(https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new) + +APIs may not work as intended and are not stable yet. Feature is not +GA(Generally Available) yet, please contact your TAM(Technical Account Manager) +if you want to use these Rapid Storage APIs. + +""" from typing import Optional from google.cloud import _storage_v2 from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient From 4db8bf1e0b47f763ad5e95234d7f4735c775dc84 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 13 Nov 2025 19:16:55 +0000 Subject: [PATCH 03/19] remove unwated test --- tests/unit/asyncio/test_async_write_object_stream.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index 3e7c25d36..9834b79c9 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -106,12 +106,3 @@ async def test_unimplemented_methods_raise_error(mock_client): with pytest.raises(NotImplementedError): await stream.recv() - - -def test_add_to_sources_txt(): - """ - This is a dummy test to ensure that the new test file is included in the - package's source file list. This is necessary for some build and packaging - tools to recognize the file. - """ - pass From 8dbd158f8d75a66330521938d8938453f2e97bdf Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 14 Nov 2025 05:13:06 +0000 Subject: [PATCH 04/19] feat(zb-experimental): implement "open" for write_object_stream --- .../asyncio/async_write_object_stream.py | 48 ++++++- .../asyncio/test_async_write_object_stream.py | 131 +++++++++++++++++- 2 files changed, 174 insertions(+), 5 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py index 07263ddd8..7386747eb 100644 --- a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py @@ -95,10 +95,50 @@ async def open(self) -> None: """Opening an object for write , should do it's state lookup to know what's the persisted size is. """ - raise NotImplementedError( - "open() is not implemented yet in _AsyncWriteObjectStream" + if self._is_stream_open: + raise ValueError("Stream is already open") + + # Create a new object or overwrite existing one if generation_number + # is None. This makes it consistent with GCS JSON API behavior. + # Created object type would be Appendable Object. + if self.generation_number is None: + self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest( + write_object_spec=_storage_v2.WriteObjectSpec( + resource=_storage_v2.Object( + name=self.object_name, bucket=self._full_bucket_name + ), + appendable=True, + ), + ) + else: + self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest( + append_object_spec=_storage_v2.AppendObjectSpec( + bucket=self._full_bucket_name, + object=self.object_name, + generation=self.generation_number, + ), + state_lookup=True, + ) + + self.socket_like_rpc = AsyncBidiRpc( + self.rpc, initial_request=self.first_bidi_write_req, metadata=self.metadata ) + await self.socket_like_rpc.open() # this is actually 1 send + response = await self.socket_like_rpc.recv() + self._is_stream_open = True + + if not response.resource and not response.resource.generation: + raise ValueError( + "Failed to obtain object generation after opening the stream" + ) + self.generation_number = response.resource.generation + + if not response.write_handle: + raise ValueError("Failed to obtain write_handle after opening the stream") + + self.write_handle = response.write_handle + async def close(self) -> None: """Closes the bidi-gRPC connection.""" raise NotImplementedError( @@ -132,3 +172,7 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse: raise NotImplementedError( "recv() is not implemented yet in _AsyncWriteObjectStream" ) + + @property + def is_stream_open(self) -> bool: + return self._is_stream_open diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index 9834b79c9..60d47cbcf 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import pytest from unittest import mock @@ -22,6 +23,8 @@ BUCKET = "my-bucket" OBJECT = "my-object" +GENERATION = 12345 +WRITE_HANDLE = b"test-handle" @pytest.fixture @@ -91,13 +94,135 @@ def test_async_write_object_stream_init_raises_value_error(): @pytest.mark.asyncio -async def test_unimplemented_methods_raise_error(mock_client): - """Test that unimplemented methods raise NotImplementedError.""" +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_for_new_object(mock_async_bidi_rpc, mock_client): + """Test opening a stream for a new object.""" + # Arrange + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.open = mock.AsyncMock() + + mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) + mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) + mock_response.resource.generation = GENERATION + mock_response.write_handle = WRITE_HANDLE + socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) - with pytest.raises(NotImplementedError): + # Act + await stream.open() + + # Assert + assert stream._is_stream_open + socket_like_rpc.open.assert_called_once() + socket_like_rpc.recv.assert_called_once() + assert stream.generation_number == GENERATION + assert stream.write_handle == WRITE_HANDLE + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client): + """Test opening a stream for an existing object.""" + # Arrange + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.open = mock.AsyncMock() + + mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) + mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) + mock_response.resource.generation = GENERATION + mock_response.write_handle = WRITE_HANDLE + socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) + + stream = _AsyncWriteObjectStream( + mock_client, BUCKET, OBJECT, generation_number=GENERATION + ) + + # Act + await stream.open() + + # Assert + assert stream._is_stream_open + socket_like_rpc.open.assert_called_once() + socket_like_rpc.recv.assert_called_once() + assert stream.generation_number == GENERATION + assert stream.write_handle == WRITE_HANDLE + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_when_already_open_raises_error(mock_async_bidi_rpc, mock_client): + """Test that opening an already open stream raises a ValueError.""" + # Arrange + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.open = mock.AsyncMock() + + mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) + mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) + mock_response.resource.generation = GENERATION + mock_response.write_handle = WRITE_HANDLE + socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) + + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + await stream.open() + + # Act & Assert + with pytest.raises(ValueError, match="Stream is already open"): await stream.open() + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_raises_error_on_missing_generation( + mock_async_bidi_rpc, mock_client +): + """Test that open raises ValueError if generation is not in the response.""" + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse(resource=None) + ) + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + with pytest.raises(ValueError, match="Failed to obtain object generation"): + await stream.open() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_raises_error_on_missing_write_handle( + mock_async_bidi_rpc, mock_client +): + """Test that open raises ValueError if write_handle is not in the response.""" + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse( + resource=_storage_v2.Object(generation=GENERATION), write_handle=None + ) + ) + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + with pytest.raises(ValueError, match="Failed to obtain write_handle"): + await stream.open() + + +@pytest.mark.asyncio +async def test_unimplemented_methods_raise_error(mock_client): + """Test that unimplemented methods raise NotImplementedError.""" + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + with pytest.raises(NotImplementedError): await stream.close() From 03f1fdea29adb9ac801ddcf50d71b84bd534c079 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 14 Nov 2025 06:00:05 +0000 Subject: [PATCH 05/19] remove unused import --- tests/unit/asyncio/test_async_write_object_stream.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index 60d47cbcf..310b45519 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import asyncio import pytest from unittest import mock From f0d3439291dab008a086e0980c2f4556ed0d4395 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 14 Nov 2025 07:41:46 +0000 Subject: [PATCH 06/19] feat(zb-experimental): implement close --- .../asyncio/async_write_object_stream.py | 7 +- .../asyncio/test_async_write_object_stream.py | 68 +++++++++++++++++-- 2 files changed, 67 insertions(+), 8 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py index 7386747eb..463e6796a 100644 --- a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py @@ -141,9 +141,10 @@ async def open(self) -> None: async def close(self) -> None: """Closes the bidi-gRPC connection.""" - raise NotImplementedError( - "close() is not implemented yet in _AsyncWriteObjectStream" - ) + if not self._is_stream_open: + raise ValueError("Stream is not open") + await self.socket_like_rpc.close() + self._is_stream_open = False async def send( self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index 310b45519..6c403683f 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -15,6 +15,7 @@ import pytest from unittest import mock +from unittest.mock import AsyncMock from google.cloud.storage._experimental.asyncio.async_write_object_stream import ( _AsyncWriteObjectStream, ) @@ -43,6 +44,27 @@ def mock_client(): return client +async def instantiate_write_obj_stream(mock_client, mock_cls_async_bidi_rpc, open=True): + """Helper to create an instance of _AsyncWriteObjectStream and open it by default.""" + socket_like_rpc = AsyncMock() + mock_cls_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.open = AsyncMock() + socket_like_rpc.close = AsyncMock() + + mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) + mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) + mock_response.resource.generation = GENERATION + mock_response.write_handle = WRITE_HANDLE + socket_like_rpc.recv = AsyncMock(return_value=mock_response) + + write_obj_stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + + if open: + await write_obj_stream.open() + + return write_obj_stream + + def test_async_write_object_stream_init(mock_client): """Test the constructor of _AsyncWriteObjectStream.""" stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) @@ -218,13 +240,49 @@ async def test_open_raises_error_on_missing_write_handle( @pytest.mark.asyncio -async def test_unimplemented_methods_raise_error(mock_client): - """Test that unimplemented methods raise NotImplementedError.""" - stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_close(mock_cls_async_bidi_rpc, mock_client): + """Test that close successfully closes the stream.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=True + ) - with pytest.raises(NotImplementedError): - await stream.close() + # Act + await write_obj_stream.close() + # Assert + write_obj_stream.socket_like_rpc.close.assert_called_once() + assert not write_obj_stream.is_stream_open + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_close_without_open_should_raise_error( + mock_cls_async_bidi_rpc, mock_client +): + """Test that closing a stream that is not open raises a ValueError.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=False + ) + + # Act & Assert + with pytest.raises(ValueError, match="Stream is not open"): + await write_obj_stream.close() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_unimplemented_methods_raise_error(mock_async_bidi_rpc, mock_client): + """Test that unimplemented methods (send, recv) raise NotImplementedError.""" + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) with pytest.raises(NotImplementedError): await stream.send(_storage_v2.BidiWriteObjectRequest()) From e3b6f9e1cd90c0e8be4c76b9ace032d671e9af3d Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 14 Nov 2025 08:03:46 +0000 Subject: [PATCH 07/19] feat(zb-experimental): implement send & recv --- .../asyncio/async_write_object_stream.py | 12 +-- .../asyncio/test_async_write_object_stream.py | 81 +++++++++++++++++-- 2 files changed, 80 insertions(+), 13 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py index 463e6796a..fb3957a15 100644 --- a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py @@ -156,9 +156,9 @@ async def send( The request message to send. This is typically used to specify the read offset and limit. """ - raise NotImplementedError( - "send() is not implemented yet in _AsyncWriteObjectStream" - ) + if not self._is_stream_open: + raise ValueError("Stream is not open") + await self.socket_like_rpc.send(bidi_write_object_request) async def recv(self) -> _storage_v2.BidiWriteObjectResponse: """Receives a response from the stream. @@ -170,9 +170,9 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse: :class:`~google.cloud._storage_v2.types.BidiWriteObjectResponse`: The response message from the server. """ - raise NotImplementedError( - "recv() is not implemented yet in _AsyncWriteObjectStream" - ) + if not self._is_stream_open: + raise ValueError("Stream is not open") + return await self.socket_like_rpc.recv() @property def is_stream_open(self) -> bool: diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index 6c403683f..1f6aff2de 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -49,6 +49,7 @@ async def instantiate_write_obj_stream(mock_client, mock_cls_async_bidi_rpc, ope socket_like_rpc = AsyncMock() mock_cls_async_bidi_rpc.return_value = socket_like_rpc socket_like_rpc.open = AsyncMock() + socket_like_rpc.send = AsyncMock() socket_like_rpc.close = AsyncMock() mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) @@ -280,11 +281,77 @@ async def test_close_without_open_should_raise_error( @mock.patch( "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" ) -async def test_unimplemented_methods_raise_error(mock_async_bidi_rpc, mock_client): - """Test that unimplemented methods (send, recv) raise NotImplementedError.""" - stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) - with pytest.raises(NotImplementedError): - await stream.send(_storage_v2.BidiWriteObjectRequest()) +async def test_send(mock_cls_async_bidi_rpc, mock_client): + """Test that send calls the underlying rpc's send method.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=True + ) + + # Act + bidi_write_object_request = _storage_v2.BidiWriteObjectRequest() + await write_obj_stream.send(bidi_write_object_request) + + # Assert + write_obj_stream.socket_like_rpc.send.assert_called_once_with( + bidi_write_object_request + ) + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_send_without_open_should_raise_error( + mock_cls_async_bidi_rpc, mock_client +): + """Test that sending on a stream that is not open raises a ValueError.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=False + ) + + # Act & Assert + with pytest.raises(ValueError, match="Stream is not open"): + await write_obj_stream.send(_storage_v2.BidiWriteObjectRequest()) + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_recv(mock_cls_async_bidi_rpc, mock_client): + """Test that recv calls the underlying rpc's recv method.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=True + ) + bidi_write_object_response = _storage_v2.BidiWriteObjectResponse() + write_obj_stream.socket_like_rpc.recv = AsyncMock( + return_value=bidi_write_object_response + ) + + # Act + response = await write_obj_stream.recv() + + # Assert + write_obj_stream.socket_like_rpc.recv.assert_called_once() + assert response == bidi_write_object_response + - with pytest.raises(NotImplementedError): - await stream.recv() +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_recv_without_open_should_raise_error( + mock_cls_async_bidi_rpc, mock_client +): + """Test that receiving on a stream that is not open raises a ValueError.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=False + ) + + # Act & Assert + with pytest.raises(ValueError, match="Stream is not open"): + await write_obj_stream.recv() From b24535fa416bbbfcf10b072728c062450cadaaa3 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 14 Nov 2025 09:14:56 +0000 Subject: [PATCH 08/19] feat(zb-experimental): Add Async_appendable_object_writer.py --- .../asyncio/async_appendable_object_writer.py | 79 ++++++++++++ .../test_async_appendable_object_writer.py | 115 ++++++++++++++++++ 2 files changed, 194 insertions(+) create mode 100644 google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py create mode 100644 tests/unit/asyncio/test_async_appendable_object_writer.py diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py new file mode 100644 index 000000000..d78c6d3fe --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -0,0 +1,79 @@ +from typing import Optional +from google.cloud.storage._experimental.asyncio.async_grpc_client import ( + AsyncGrpcClient, +) +from google.cloud.storage._experimental.asyncio.async_write_object_stream import ( + _AsyncWriteObjectStream, +) + + +class AsyncAppendableObjectWriter: + def __init__( + self, + client: AsyncGrpcClient.grpc_client, + bucket_name: str, + object_name: str, + generation=None, + write_handle=None, + ): + self.client = client + self.bucket_name = bucket_name + self.object_name = object_name + self.write_handle = write_handle + self.generation = generation + + self.write_obj_stream = _AsyncWriteObjectStream( + client=self.client, + bucket_name=self.bucket_name, + object_name=self.object_name, + generation_number=self.generation, + write_handle=self.write_handle, + ) + self._is_stream_open: bool = False + self.offset: Optional[int] = None + self.persisted_size: Optional[int] = None + + async def state_lookup(self): + """Returns the persisted_size.""" + raise NotImplementedError("state_lookup is not implemented yet.") + + async def open(self) -> None: + """Opens the underlying bidi-gRPC stream.""" + raise NotImplementedError("open is not implemented yet.") + + async def append(self, data: bytes): + raise NotImplementedError("append is not implemented yet.") + + async def flush(self) -> int: + """Returns persisted_size""" + raise NotImplementedError("flush is not implemented yet.") + + async def close(self, finalize_on_close=False) -> int: + """Returns persisted_size""" + raise NotImplementedError("close is not implemented yet.") + + async def finalize(self) -> int: + """Returns persisted_size + Note: Once finalized no more data can be appended. + """ + raise NotImplementedError("finalize is not implemented yet.") + + # helper methods. + async def append_from_string(self, data: str): + """ + str data will be encoded to bytes using utf-8 encoding calling + + self.append(data.encode("utf-8")) + """ + raise NotImplementedError("append_from_string is not implemented yet.") + + async def append_from_stream(self, stream_obj): + """ + At a time read a chunk of data (16MiB) from `stream_obj` + and call self.append(chunk) + """ + raise NotImplementedError("append_from_stream is not implemented yet.") + + async def append_from_file(self, file_path: str): + """Create a file object from `file_path` and call append_from_stream(file_obj)""" + raise NotImplementedError("append_from_file is not implemented yet.") diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py new file mode 100644 index 000000000..cf838ac05 --- /dev/null +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -0,0 +1,115 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from unittest import mock + +from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + AsyncAppendableObjectWriter, +) + +BUCKET = "test-bucket" +OBJECT = "test-object" +GENERATION = 123 +WRITE_HANDLE = b"test-write-handle" + + +@pytest.fixture +def mock_client(): + """Mock the async gRPC client.""" + return mock.AsyncMock() + + +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +def test_init(mock_write_object_stream, mock_client): + """Test the constructor of AsyncAppendableObjectWriter.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + + assert writer.client == mock_client + assert writer.bucket_name == BUCKET + assert writer.object_name == OBJECT + assert writer.generation is None + assert writer.write_handle is None + assert not writer._is_stream_open + assert writer.offset is None + assert writer.persisted_size is None + + mock_write_object_stream.assert_called_once_with( + client=mock_client, + bucket_name=BUCKET, + object_name=OBJECT, + generation_number=None, + write_handle=None, + ) + assert writer.write_obj_stream == mock_write_object_stream.return_value + + +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +def test_init_with_optional_args(mock_write_object_stream, mock_client): + """Test the constructor with optional arguments.""" + writer = AsyncAppendableObjectWriter( + mock_client, + BUCKET, + OBJECT, + generation=GENERATION, + write_handle=WRITE_HANDLE, + ) + + assert writer.generation == GENERATION + assert writer.write_handle == WRITE_HANDLE + + mock_write_object_stream.assert_called_once_with( + client=mock_client, + bucket_name=BUCKET, + object_name=OBJECT, + generation_number=GENERATION, + write_handle=WRITE_HANDLE, + ) + + +@pytest.mark.asyncio +async def test_unimplemented_methods_raise_error(mock_client): + """Test that all currently unimplemented methods raise NotImplementedError.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + + with pytest.raises(NotImplementedError): + await writer.state_lookup() + + with pytest.raises(NotImplementedError): + await writer.open() + + with pytest.raises(NotImplementedError): + await writer.append(b"data") + + with pytest.raises(NotImplementedError): + await writer.flush() + + with pytest.raises(NotImplementedError): + await writer.close() + + with pytest.raises(NotImplementedError): + await writer.finalize() + + with pytest.raises(NotImplementedError): + await writer.append_from_string("data") + + with pytest.raises(NotImplementedError): + await writer.append_from_stream(mock.Mock()) + + with pytest.raises(NotImplementedError): + await writer.append_from_file("file.txt") From eae1b36d7fcc0249bdc85c20f45e1c5f856e4e2d Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 14 Nov 2025 09:24:41 +0000 Subject: [PATCH 09/19] feat(zb-experimental): implement state_lookup --- .../asyncio/async_appendable_object_writer.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index d78c6d3fe..2bf93bcc8 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -1,4 +1,5 @@ from typing import Optional +from google.cloud import _storage_v2 from google.cloud.storage._experimental.asyncio.async_grpc_client import ( AsyncGrpcClient, ) @@ -35,7 +36,12 @@ def __init__( async def state_lookup(self): """Returns the persisted_size.""" - raise NotImplementedError("state_lookup is not implemented yet.") + await self.write_obj_stream.send( + _storage_v2.BidiWriteObjectRequest( + state_lookup=True, + ) + ) + return await self.write_obj_stream.recv() async def open(self) -> None: """Opens the underlying bidi-gRPC stream.""" From 788848a4cc51c8df11a4ee91968b5574dca57b4a Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 14 Nov 2025 09:26:10 +0000 Subject: [PATCH 10/19] implement tests for state_lookup --- .../test_async_appendable_object_writer.py | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index cf838ac05..99e77f769 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -18,6 +18,8 @@ from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( AsyncAppendableObjectWriter, ) +from google.cloud import _storage_v2 + BUCKET = "test-bucket" OBJECT = "test-object" @@ -82,14 +84,34 @@ def test_init_with_optional_args(mock_write_object_stream, mock_client): ) +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_state_lookup(mock_write_object_stream, mock_client): + """Test state_lookup method.""" + # Arrange + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + mock_stream.recv = mock.AsyncMock(return_value=mock.sentinel.response) + + expected_request = _storage_v2.BidiWriteObjectRequest(state_lookup=True) + + # Act + response = await writer.state_lookup() + + # Assert + mock_stream.send.assert_awaited_once_with(expected_request) + mock_stream.recv.assert_awaited_once() + assert response == mock.sentinel.response + + @pytest.mark.asyncio async def test_unimplemented_methods_raise_error(mock_client): """Test that all currently unimplemented methods raise NotImplementedError.""" writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - with pytest.raises(NotImplementedError): - await writer.state_lookup() - with pytest.raises(NotImplementedError): await writer.open() From 07c9b441dbf10caf24146683c67d8b53f43778c0 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 14 Nov 2025 09:40:27 +0000 Subject: [PATCH 11/19] feat(zb-experimental): implement open in writer --- .../asyncio/async_appendable_object_writer.py | 13 ++++- .../test_async_appendable_object_writer.py | 55 ++++++++++++++++++- 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index 2bf93bcc8..21b65d097 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -45,7 +45,18 @@ async def state_lookup(self): async def open(self) -> None: """Opens the underlying bidi-gRPC stream.""" - raise NotImplementedError("open is not implemented yet.") + if self._is_stream_open: + raise ValueError("Underlying bidi-gRPC stream is already open") + + await self.write_obj_stream.open() + self._is_stream_open = True + if self.generation is None: + self.generation = self.write_obj_stream.generation_number + self.write_handle = self.write_obj_stream.write_handle + + # send another message on stream to get persisted_size. + state_loop_up_resp = await self.state_lookup() + self.persisted_size = state_loop_up_resp.persisted_size async def append(self, data: bytes): raise NotImplementedError("append is not implemented yet.") diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index 99e77f769..a1029bcc9 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -108,13 +108,62 @@ async def test_state_lookup(mock_write_object_stream, mock_client): @pytest.mark.asyncio -async def test_unimplemented_methods_raise_error(mock_client): - """Test that all currently unimplemented methods raise NotImplementedError.""" +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_open_appendable_object_writer(mock_write_object_stream, mock_client): + """Test the open method.""" + # Arrange writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + mock_stream = mock_write_object_stream.return_value + mock_stream.open = mock.AsyncMock() + mock_stream.send = mock.AsyncMock() + mock_stream.recv = mock.AsyncMock() - with pytest.raises(NotImplementedError): + mock_state_response = mock.MagicMock() + mock_state_response.persisted_size = 1024 + mock_stream.recv.return_value = mock_state_response + + mock_stream.generation_number = GENERATION + mock_stream.write_handle = WRITE_HANDLE + + # Act + await writer.open() + + # Assert + mock_stream.open.assert_awaited_once() + assert writer._is_stream_open + assert writer.generation == GENERATION + assert writer.write_handle == WRITE_HANDLE + + expected_request = _storage_v2.BidiWriteObjectRequest(state_lookup=True) + mock_stream.send.assert_awaited_once_with(expected_request) + mock_stream.recv.assert_awaited_once() + assert writer.persisted_size == 1024 + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_open_when_already_open_raises_error( + mock_write_object_stream, mock_client +): + """Test that opening an already open writer raises a ValueError.""" + # Arrange + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True # Manually set to open + + # Act & Assert + with pytest.raises(ValueError, match="Underlying bidi-gRPC stream is already open"): await writer.open() + +@pytest.mark.asyncio +async def test_unimplemented_methods_raise_error(mock_client): + """Test that all currently unimplemented methods raise NotImplementedError.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + with pytest.raises(NotImplementedError): await writer.append(b"data") From 3930823e0e11b9f9d166acc4f7215b23005bfe19 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 14 Nov 2025 09:42:43 +0000 Subject: [PATCH 12/19] add type annotation for state_lookup --- .../_experimental/asyncio/async_appendable_object_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index 2bf93bcc8..ceba7b4ce 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -34,7 +34,7 @@ def __init__( self.offset: Optional[int] = None self.persisted_size: Optional[int] = None - async def state_lookup(self): + async def state_lookup(self) -> _storage_v2.BidiWriteObjectResponse: """Returns the persisted_size.""" await self.write_obj_stream.send( _storage_v2.BidiWriteObjectRequest( From 222aef236cec8e24739a3056f831638f9b796842 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 14 Nov 2025 13:12:02 +0000 Subject: [PATCH 13/19] state_lookup should return persisted_size instead of proto --- .../asyncio/async_appendable_object_writer.py | 6 ++++-- tests/unit/asyncio/test_async_appendable_object_writer.py | 8 ++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index ceba7b4ce..3a1815655 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -34,14 +34,16 @@ def __init__( self.offset: Optional[int] = None self.persisted_size: Optional[int] = None - async def state_lookup(self) -> _storage_v2.BidiWriteObjectResponse: + async def state_lookup(self) -> int: """Returns the persisted_size.""" await self.write_obj_stream.send( _storage_v2.BidiWriteObjectRequest( state_lookup=True, ) ) - return await self.write_obj_stream.recv() + response = await self.write_obj_stream.recv() + self.persisted_size = response.persisted_size + return self.persisted_size async def open(self) -> None: """Opens the underlying bidi-gRPC stream.""" diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index 99e77f769..d3a6d3830 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -25,6 +25,7 @@ OBJECT = "test-object" GENERATION = 123 WRITE_HANDLE = b"test-write-handle" +PERSISTED_SIZE = 456 @pytest.fixture @@ -94,7 +95,9 @@ async def test_state_lookup(mock_write_object_stream, mock_client): writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) mock_stream = mock_write_object_stream.return_value mock_stream.send = mock.AsyncMock() - mock_stream.recv = mock.AsyncMock(return_value=mock.sentinel.response) + mock_stream.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse(persisted_size=PERSISTED_SIZE) + ) expected_request = _storage_v2.BidiWriteObjectRequest(state_lookup=True) @@ -104,7 +107,8 @@ async def test_state_lookup(mock_write_object_stream, mock_client): # Assert mock_stream.send.assert_awaited_once_with(expected_request) mock_stream.recv.assert_awaited_once() - assert response == mock.sentinel.response + assert writer.persisted_size == PERSISTED_SIZE + assert response == PERSISTED_SIZE @pytest.mark.asyncio From 79835fa0e16f5e27cafaaa66f813a9bf7691a3d8 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 14 Nov 2025 13:15:06 +0000 Subject: [PATCH 14/19] persisted size changes --- .../_experimental/asyncio/async_appendable_object_writer.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index 8f5c67d56..d490f14a2 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -56,9 +56,8 @@ async def open(self) -> None: self.generation = self.write_obj_stream.generation_number self.write_handle = self.write_obj_stream.write_handle - # send another message on stream to get persisted_size. - state_loop_up_resp = await self.state_lookup() - self.persisted_size = state_loop_up_resp.persisted_size + # Update self.persisted_size + _ = await self.state_lookup() async def append(self, data: bytes): raise NotImplementedError("append is not implemented yet.") From cc5e12de75dfb36cccb9166cc9c4b8cf1a6d2cd0 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sun, 16 Nov 2025 13:01:29 +0000 Subject: [PATCH 15/19] feat(zb-experimental): implement flush, close and finalize --- .../asyncio/async_appendable_object_writer.py | 34 +++++-- .../test_async_appendable_object_writer.py | 97 +++++++++++++++++-- 2 files changed, 116 insertions(+), 15 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index d490f14a2..888d154a7 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -63,18 +63,40 @@ async def append(self, data: bytes): raise NotImplementedError("append is not implemented yet.") async def flush(self) -> int: - """Returns persisted_size""" - raise NotImplementedError("flush is not implemented yet.") + """Flushes the data to the server. + + :rtype: int + :returns: The persisted size after flush. + """ + await self.write_obj_stream.send( + _storage_v2.BidiWriteObjectRequest( + flush=True, + state_lookup=True, + ) + ) + response = await self.write_obj_stream.recv() + self.persisted_size = response.persisted_size + self.offset = self.persisted_size + return self.persisted_size async def close(self, finalize_on_close=False) -> int: """Returns persisted_size""" - raise NotImplementedError("close is not implemented yet.") + if finalize_on_close: + await self.finalize() + + await self.write_obj_stream.close() + self._is_stream_open = False + self.offset = None - async def finalize(self) -> int: - """Returns persisted_size + async def finalize(self) -> _storage_v2.Object: + """Returns object_resource Note: Once finalized no more data can be appended. """ - raise NotImplementedError("finalize is not implemented yet.") + await self.write_obj_stream.send( + _storage_v2.BidiWriteObjectRequest(finish_write=True) + ) + response = await self.write_obj_stream.recv() + self.object_resource = response.resource # helper methods. async def append_from_string(self, data: str): diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index 67a074a11..18f7a8826 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -171,15 +171,6 @@ async def test_unimplemented_methods_raise_error(mock_client): with pytest.raises(NotImplementedError): await writer.append(b"data") - with pytest.raises(NotImplementedError): - await writer.flush() - - with pytest.raises(NotImplementedError): - await writer.close() - - with pytest.raises(NotImplementedError): - await writer.finalize() - with pytest.raises(NotImplementedError): await writer.append_from_string("data") @@ -188,3 +179,91 @@ async def test_unimplemented_methods_raise_error(mock_client): with pytest.raises(NotImplementedError): await writer.append_from_file("file.txt") + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_flush(mock_write_object_stream, mock_client): + """Test that flush sends the correct request and updates state.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + mock_stream.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse(persisted_size=1024) + ) + + persisted_size = await writer.flush() + + expected_request = _storage_v2.BidiWriteObjectRequest(flush=True, state_lookup=True) + mock_stream.send.assert_awaited_once_with(expected_request) + mock_stream.recv.assert_awaited_once() + assert writer.persisted_size == 1024 + assert writer.offset == 1024 + assert persisted_size == 1024 + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_close_without_finalize(mock_write_object_stream, mock_client): + """Test close without finalizing.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + writer.offset = 1024 + mock_stream = mock_write_object_stream.return_value + mock_stream.close = mock.AsyncMock() + writer.finalize = mock.AsyncMock() + + await writer.close(finalize_on_close=False) + + writer.finalize.assert_not_awaited() + mock_stream.close.assert_awaited_once() + assert not writer._is_stream_open + assert writer.offset is None + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_close_with_finalize(mock_write_object_stream, mock_client): + """Test close with finalizing.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + writer.offset = 1024 + mock_stream = mock_write_object_stream.return_value + mock_stream.close = mock.AsyncMock() + writer.finalize = mock.AsyncMock() + + await writer.close(finalize_on_close=True) + + writer.finalize.assert_awaited_once() + mock_stream.close.assert_awaited_once() + assert not writer._is_stream_open + assert writer.offset is None + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_finalize(mock_write_object_stream, mock_client): + """Test that finalize sends the correct request and updates state.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + mock_resource = _storage_v2.Object(name=OBJECT, bucket=BUCKET) + mock_stream.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource) + ) + + await writer.finalize() + + mock_stream.send.assert_awaited_once_with( + _storage_v2.BidiWriteObjectRequest(finish_write=True) + ) + mock_stream.recv.assert_awaited_once() + assert writer.object_resource == mock_resource From 57aa63e78e23d3b5de47ad5c6a69d6690ea4e584 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sun, 16 Nov 2025 13:20:29 +0000 Subject: [PATCH 16/19] Update doc strings --- .../asyncio/async_appendable_object_writer.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index 888d154a7..18010b0f5 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -35,7 +35,11 @@ def __init__( self.persisted_size: Optional[int] = None async def state_lookup(self) -> int: - """Returns the persisted_size.""" + """Returns the persisted_size + + :rtype: int + :returns: persisted size. + """ await self.write_obj_stream.send( _storage_v2.BidiWriteObjectRequest( state_lookup=True, @@ -89,8 +93,12 @@ async def close(self, finalize_on_close=False) -> int: self.offset = None async def finalize(self) -> _storage_v2.Object: - """Returns object_resource + """Finalizes the Appendable Object. + Note: Once finalized no more data can be appended. + + rtype: google.cloud.storage_v2.types.Object + returns: The finalized object resource. """ await self.write_obj_stream.send( _storage_v2.BidiWriteObjectRequest(finish_write=True) From 6fa5e1197eb2eb4b71744e59a06a255ae7bbeab2 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sun, 16 Nov 2025 14:43:21 +0000 Subject: [PATCH 17/19] add doc string for AsyncAppendableObjectWriter --- .../asyncio/async_appendable_object_writer.py | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index d78c6d3fe..ca03c2b6d 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -1,3 +1,26 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +NOTE: +This is _experimental module for upcoming support for Rapid Storage. +(https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new) + +APIs may not work as intended and are not stable yet. Feature is not +GA(Generally Available) yet, please contact your TAM (Technical Account Manager) +if you want to use these Rapid Storage APIs. + +""" from typing import Optional from google.cloud.storage._experimental.asyncio.async_grpc_client import ( AsyncGrpcClient, @@ -8,6 +31,8 @@ class AsyncAppendableObjectWriter: + """Class for appending data to a GCS Appendable Object asynchronously.""" + def __init__( self, client: AsyncGrpcClient.grpc_client, @@ -16,6 +41,60 @@ def __init__( generation=None, write_handle=None, ): + """ + Class for appending data to a GCS Appendable Object. + + Example usage: + + ``` + + from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient + from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import AsyncAppendableObjectWriter + import asyncio + + client = AsyncGrpcClient().grpc_client + bucket_name = "my-bucket" + object_name = "my-appendable-object" + + # instantiate the writer + writer = AsyncAppendableObjectWriter(client, bucket_name, object_name) + # open the writer, (underlying gRPC bidi-stream will be opened) + await writer.open() + + # append data, it can be called multiple times. + await writer.append(b"hello world") + await writer.append(b"some more data") + + # optionally flush data to persist. + await writer.flush() + + # close the gRPC stream. + # Please note closing the program will also close the stream, + # however it's recommended to close the stream if no more data to append + # to clean up gRPC connection (which means CPU/memory/network resources) + await writer.close() + ``` + + :type client: :class:`~google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client` + :param client: async grpc client to use for making API requests. + + :type bucket_name: str + :param bucket_name: The name of the GCS bucket containing the object. + + :type object_name: str + :param object_name: The name of the GCS Appendable Object to be written. + + :type generation: int + :param generation: (Optional) If present, selects a specific revision of + that object. + If None, a new object is created. + If None and Object already exists then it'll will be + overwritten. + + :type write_handle: bytes + :param write_handle: (Optional) An existing handle for writing the object. + If provided, opening the bidi-gRPC connection will be faster. + """ self.client = client self.bucket_name = bucket_name self.object_name = object_name From 1981418acacea728171b9f3181e18a8eb427f0e2 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Mon, 17 Nov 2025 14:33:13 +0000 Subject: [PATCH 18/19] add missedout test after merge conflict resolution --- .../asyncio/test_async_write_object_stream.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index 1d8b3c9dd..7fa2123c5 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -202,6 +202,34 @@ async def test_open_when_already_open_raises_error(mock_async_bidi_rpc, mock_cli await stream.open() +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_raises_error_on_missing_object_resource( + mock_async_bidi_rpc, mock_client +): + """Test that open raises ValueError if object_resource is not in the response.""" + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + + mock_reponse = mock.AsyncMock() + type(mock_reponse).resource = mock.PropertyMock(return_value=None) + socket_like_rpc.recv.return_value = mock_reponse + + # Note: Don't use below code as unittest library automatically assigns an + # `AsyncMock` object to an attribute, if not set. + # socket_like_rpc.recv.return_value = mock.AsyncMock( + # return_value=_storage_v2.BidiWriteObjectResponse(resource=None) + # ) + + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + with pytest.raises( + ValueError, match="Failed to obtain object resource after opening the stream" + ): + await stream.open() + + @pytest.mark.asyncio @mock.patch( "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" From bd54ded98a7d4ad1dd1dd56f9db5367bdc665852 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Tue, 18 Nov 2025 15:19:44 +0000 Subject: [PATCH 19/19] undo changes in bucket.py and test_bucket.py --- google/cloud/storage/bucket.py | 6 +++++- tests/unit/test_bucket.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/bucket.py b/google/cloud/storage/bucket.py index 0d1f9192b..1621f879e 100644 --- a/google/cloud/storage/bucket.py +++ b/google/cloud/storage/bucket.py @@ -41,6 +41,7 @@ from google.cloud.storage._opentelemetry_tracing import create_trace_span from google.cloud.storage.acl import BucketACL from google.cloud.storage.acl import DefaultObjectACL +from google.cloud.storage.blob import _quote from google.cloud.storage.blob import Blob from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.constants import ARCHIVE_STORAGE_CLASS @@ -2360,7 +2361,10 @@ def move_blob( ) new_blob = Blob(bucket=self, name=new_name) - api_path = blob.path + "/moveTo/o/" + new_blob.name + api_path = "{blob_path}/moveTo/o/{new_name}".format( + blob_path=blob.path, new_name=_quote(new_blob.name) + ) + move_result = client._post_resource( api_path, None, diff --git a/tests/unit/test_bucket.py b/tests/unit/test_bucket.py index 809b572e0..850e89d04 100644 --- a/tests/unit/test_bucket.py +++ b/tests/unit/test_bucket.py @@ -18,6 +18,7 @@ import mock import pytest +from google.cloud.storage.blob import _quote from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED @@ -2320,6 +2321,37 @@ def test_move_blob_w_no_retry_timeout_and_generation_match(self): _target_object=new_blob, ) + def test_move_blob_needs_url_encoding(self): + source_name = "source" + blob_name = "blob-name" + new_name = "new/name" + api_response = {} + client = mock.Mock(spec=["_post_resource"]) + client._post_resource.return_value = api_response + source = self._make_one(client=client, name=source_name) + blob = self._make_blob(source_name, blob_name) + + new_blob = source.move_blob( + blob, new_name, if_generation_match=0, retry=None, timeout=30 + ) + + self.assertIs(new_blob.bucket, source) + self.assertEqual(new_blob.name, new_name) + + expected_path = "/b/{}/o/{}/moveTo/o/{}".format( + source_name, blob_name, _quote(new_name) + ) + expected_data = None + expected_query_params = {"ifGenerationMatch": 0} + client._post_resource.assert_called_once_with( + expected_path, + expected_data, + query_params=expected_query_params, + timeout=30, + retry=None, + _target_object=new_blob, + ) + def test_move_blob_w_user_project(self): source_name = "source" blob_name = "blob-name"