Skip to content

Commit 5ab8103

Browse files
feat(zb-experimental): add async write object stream (#1612)
feat(zb-experimental): add async write object stream --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 92fc2b0 commit 5ab8103

File tree

4 files changed

+244
-0
lines changed

4 files changed

+244
-0
lines changed
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""
15+
NOTE:
16+
This is _experimental module for upcoming support for Rapid Storage.
17+
(https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new)
18+
19+
APIs may not work as intended and are not stable yet. Feature is not
20+
GA(Generally Available) yet, please contact your TAM(Technical Account Manager)
21+
if you want to use these Rapid Storage APIs.
22+
23+
"""
24+
from typing import Optional
25+
from google.cloud import _storage_v2
26+
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
27+
from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import (
28+
_AsyncAbstractObjectStream,
29+
)
30+
from google.api_core.bidi_async import AsyncBidiRpc
31+
32+
33+
class _AsyncWriteObjectStream(_AsyncAbstractObjectStream):
34+
"""Class representing a gRPC bidi-stream for writing data from a GCS
35+
``Appendable Object``.
36+
37+
This class provides a unix socket-like interface to a GCS ``Object``, with
38+
methods like ``open``, ``close``, ``send``, and ``recv``.
39+
40+
:type client: :class:`~google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client`
41+
:param client: async grpc client to use for making API requests.
42+
43+
:type bucket_name: str
44+
:param bucket_name: The name of the GCS ``bucket`` containing the object.
45+
46+
:type object_name: str
47+
:param object_name: The name of the GCS ``Appendable Object`` to be write.
48+
49+
:type generation_number: int
50+
:param generation_number: (Optional) If present, selects a specific revision of
51+
this object. If None, a new object is created.
52+
53+
:type write_handle: bytes
54+
:param write_handle: (Optional) An existing handle for writing the object.
55+
If provided, opening the bidi-gRPC connection will be faster.
56+
"""
57+
58+
def __init__(
59+
self,
60+
client: AsyncGrpcClient.grpc_client,
61+
bucket_name: str,
62+
object_name: str,
63+
generation_number: Optional[int] = None, # None means new object
64+
write_handle: Optional[bytes] = None,
65+
) -> None:
66+
if client is None:
67+
raise ValueError("client must be provided")
68+
if bucket_name is None:
69+
raise ValueError("bucket_name must be provided")
70+
if object_name is None:
71+
raise ValueError("object_name must be provided")
72+
73+
super().__init__(
74+
bucket_name=bucket_name,
75+
object_name=object_name,
76+
generation_number=generation_number,
77+
)
78+
self.client: AsyncGrpcClient.grpc_client = client
79+
self.write_handle: Optional[bytes] = write_handle
80+
81+
self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}"
82+
83+
self.rpc = self.client._client._transport._wrapped_methods[
84+
self.client._client._transport.bidi_write_object
85+
]
86+
87+
self.metadata = (("x-goog-request-params", f"bucket={self._full_bucket_name}"),)
88+
self.socket_like_rpc: Optional[AsyncBidiRpc] = None
89+
self._is_stream_open: bool = False
90+
self.first_bidi_write_req = None
91+
self.persisted_size = 0
92+
self.object_resource: Optional[_storage_v2.Object] = None
93+
94+
async def open(self) -> None:
95+
"""Opening an object for write , should do it's state lookup
96+
to know what's the persisted size is.
97+
"""
98+
raise NotImplementedError(
99+
"open() is not implemented yet in _AsyncWriteObjectStream"
100+
)
101+
102+
async def close(self) -> None:
103+
"""Closes the bidi-gRPC connection."""
104+
raise NotImplementedError(
105+
"close() is not implemented yet in _AsyncWriteObjectStream"
106+
)
107+
108+
async def send(
109+
self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest
110+
) -> None:
111+
"""Sends a request message on the stream.
112+
113+
Args:
114+
bidi_write_object_request (:class:`~google.cloud._storage_v2.types.BidiReadObjectRequest`):
115+
The request message to send. This is typically used to specify
116+
the read offset and limit.
117+
"""
118+
raise NotImplementedError(
119+
"send() is not implemented yet in _AsyncWriteObjectStream"
120+
)
121+
122+
async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
123+
"""Receives a response from the stream.
124+
125+
This method waits for the next message from the server, which could
126+
contain object data or metadata.
127+
128+
Returns:
129+
:class:`~google.cloud._storage_v2.types.BidiWriteObjectResponse`:
130+
The response message from the server.
131+
"""
132+
raise NotImplementedError(
133+
"recv() is not implemented yet in _AsyncWriteObjectStream"
134+
)

google/cloud/storage/client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ def _buckets_page_start(iterator, page, response):
7373
)
7474
page.unreachable = unreachable
7575

76+
7677
class Client(ClientWithProject):
7778
"""Client to bundle configuration needed for API requests.
7879
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import pytest
16+
from unittest import mock
17+
18+
from google.cloud.storage._experimental.asyncio.async_write_object_stream import (
19+
_AsyncWriteObjectStream,
20+
)
21+
from google.cloud import _storage_v2
22+
23+
BUCKET = "my-bucket"
24+
OBJECT = "my-object"
25+
26+
27+
@pytest.fixture
28+
def mock_client():
29+
"""Mock the async gRPC client."""
30+
mock_transport = mock.AsyncMock()
31+
mock_transport.bidi_write_object = mock.sentinel.bidi_write_object
32+
mock_transport._wrapped_methods = {
33+
mock.sentinel.bidi_write_object: mock.sentinel.wrapped_bidi_write_object
34+
}
35+
36+
mock_gapic_client = mock.AsyncMock()
37+
mock_gapic_client._transport = mock_transport
38+
39+
client = mock.AsyncMock()
40+
client._client = mock_gapic_client
41+
return client
42+
43+
44+
def test_async_write_object_stream_init(mock_client):
45+
"""Test the constructor of _AsyncWriteObjectStream."""
46+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
47+
48+
assert stream.client == mock_client
49+
assert stream.bucket_name == BUCKET
50+
assert stream.object_name == OBJECT
51+
assert stream.generation_number is None
52+
assert stream.write_handle is None
53+
assert stream._full_bucket_name == f"projects/_/buckets/{BUCKET}"
54+
assert stream.rpc == mock.sentinel.wrapped_bidi_write_object
55+
assert stream.metadata == (
56+
("x-goog-request-params", f"bucket=projects/_/buckets/{BUCKET}"),
57+
)
58+
assert stream.socket_like_rpc is None
59+
assert not stream._is_stream_open
60+
assert stream.first_bidi_write_req is None
61+
assert stream.persisted_size == 0
62+
assert stream.object_resource is None
63+
64+
65+
def test_async_write_object_stream_init_with_generation_and_handle(mock_client):
66+
"""Test the constructor with optional arguments."""
67+
generation = 12345
68+
write_handle = b"test-handle"
69+
stream = _AsyncWriteObjectStream(
70+
mock_client,
71+
BUCKET,
72+
OBJECT,
73+
generation_number=generation,
74+
write_handle=write_handle,
75+
)
76+
77+
assert stream.generation_number == generation
78+
assert stream.write_handle == write_handle
79+
80+
81+
def test_async_write_object_stream_init_raises_value_error():
82+
"""Test that the constructor raises ValueError for missing arguments."""
83+
with pytest.raises(ValueError, match="client must be provided"):
84+
_AsyncWriteObjectStream(None, BUCKET, OBJECT)
85+
86+
with pytest.raises(ValueError, match="bucket_name must be provided"):
87+
_AsyncWriteObjectStream(mock.Mock(), None, OBJECT)
88+
89+
with pytest.raises(ValueError, match="object_name must be provided"):
90+
_AsyncWriteObjectStream(mock.Mock(), BUCKET, None)
91+
92+
93+
@pytest.mark.asyncio
94+
async def test_unimplemented_methods_raise_error(mock_client):
95+
"""Test that unimplemented methods raise NotImplementedError."""
96+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
97+
98+
with pytest.raises(NotImplementedError):
99+
await stream.open()
100+
101+
with pytest.raises(NotImplementedError):
102+
await stream.close()
103+
104+
with pytest.raises(NotImplementedError):
105+
await stream.send(_storage_v2.BidiWriteObjectRequest())
106+
107+
with pytest.raises(NotImplementedError):
108+
await stream.recv()

tests/unit/test_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3145,6 +3145,7 @@ def test_list_buckets_w_partial_success(self):
31453145
page_start=_buckets_page_start,
31463146
)
31473147

3148+
31483149
class Test__item_to_bucket(unittest.TestCase):
31493150
def _call_fut(self, iterator, item):
31503151
from google.cloud.storage.client import _item_to_bucket

0 commit comments

Comments
 (0)