Skip to content
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a38c396
feat(zb-experimental): add async write object stream
chandra-siri Nov 13, 2025
aaee2f3
remove unused import and add license info
chandra-siri Nov 13, 2025
4db8bf1
remove unwated test
chandra-siri Nov 13, 2025
8dbd158
feat(zb-experimental): implement "open" for write_object_stream
chandra-siri Nov 14, 2025
03f1fde
remove unused import
chandra-siri Nov 14, 2025
f0d3439
feat(zb-experimental): implement close
chandra-siri Nov 14, 2025
e3b6f9e
feat(zb-experimental): implement send & recv
chandra-siri Nov 14, 2025
b24535f
feat(zb-experimental): Add Async_appendable_object_writer.py
chandra-siri Nov 14, 2025
eae1b36
feat(zb-experimental): implement state_lookup
chandra-siri Nov 14, 2025
788848a
implement tests for state_lookup
chandra-siri Nov 14, 2025
07c9b44
feat(zb-experimental): implement open in writer
chandra-siri Nov 14, 2025
3930823
add type annotation for state_lookup
chandra-siri Nov 14, 2025
222aef2
state_lookup should return persisted_size instead of proto
chandra-siri Nov 14, 2025
3eae403
Merge branch 'bidi-writes-6' of github.com:googleapis/python-storage …
chandra-siri Nov 14, 2025
79835fa
persisted size changes
chandra-siri Nov 14, 2025
cc5e12d
feat(zb-experimental): implement flush, close and finalize
chandra-siri Nov 16, 2025
57aa63e
Update doc strings
chandra-siri Nov 16, 2025
6fa5e11
add doc string for AsyncAppendableObjectWriter
chandra-siri Nov 16, 2025
a8eba4f
Merge branch 'bidi-writes-5' of github.com:googleapis/python-storage …
chandra-siri Nov 16, 2025
d8859ee
Merge branch 'bidi-writes-6' of github.com:googleapis/python-storage …
chandra-siri Nov 16, 2025
c1cbd86
Merge branch 'bidi-writes-7' of github.com:googleapis/python-storage …
chandra-siri Nov 16, 2025
244635f
Merge branch 'main' into bidi-writes-4
chandra-siri Nov 17, 2025
1981418
add missedout test after merge conflict resolution
chandra-siri Nov 17, 2025
ebd5c10
Merge branch 'bidi-writes-4' of github.com:googleapis/python-storage …
chandra-siri Nov 17, 2025
2a22cf7
Merge branch 'bidi-writes-5' of github.com:googleapis/python-storage …
chandra-siri Nov 17, 2025
aee1feb
Merge branch 'bidi-writes-6' of github.com:googleapis/python-storage …
chandra-siri Nov 17, 2025
cdaa25f
Merge branch 'bidi-writes-7' of github.com:googleapis/python-storage …
chandra-siri Nov 17, 2025
10308bc
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Nov 18, 2025
bd54ded
undo changes in bucket.py and test_bucket.py
chandra-siri Nov 18, 2025
7692d4f
Merge branch 'main' into bidi-writes-8
chandra-siri Nov 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
NOTE:
This is an _experimental module for upcoming support for Rapid Storage.
(https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new)

APIs may not work as intended and are not stable yet. This feature is not
GA (Generally Available) yet; please contact your TAM (Technical Account Manager)
if you want to use these Rapid Storage APIs.

"""
from typing import Optional
from google.cloud import _storage_v2
from google.cloud.storage._experimental.asyncio.async_grpc_client import (
AsyncGrpcClient,
)
from google.cloud.storage._experimental.asyncio.async_write_object_stream import (
_AsyncWriteObjectStream,
)


class AsyncAppendableObjectWriter:
    """Class for appending data to a GCS Appendable Object asynchronously."""

    def __init__(
        self,
        client: AsyncGrpcClient.grpc_client,
        bucket_name: str,
        object_name: str,
        generation=None,
        write_handle=None,
    ):
        """
        Class for appending data to a GCS Appendable Object.

        Example usage:

        ```

        from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
        from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import AsyncAppendableObjectWriter
        import asyncio

        client = AsyncGrpcClient().grpc_client
        bucket_name = "my-bucket"
        object_name = "my-appendable-object"

        # instantiate the writer
        writer = AsyncAppendableObjectWriter(client, bucket_name, object_name)
        # open the writer, (underlying gRPC bidi-stream will be opened)
        await writer.open()

        # append data, it can be called multiple times.
        await writer.append(b"hello world")
        await writer.append(b"some more data")

        # optionally flush data to persist.
        await writer.flush()

        # close the gRPC stream.
        # Please note closing the program will also close the stream,
        # however it's recommended to close the stream if no more data to append
        # to clean up gRPC connection (which means CPU/memory/network resources)
        await writer.close()
        ```

        :type client: :class:`~google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client`
        :param client: async grpc client to use for making API requests.

        :type bucket_name: str
        :param bucket_name: The name of the GCS bucket containing the object.

        :type object_name: str
        :param object_name: The name of the GCS Appendable Object to be written.

        :type generation: int
        :param generation: (Optional) If present, selects a specific revision of
                           that object.
                           If None, a new object is created.
                           If None and the object already exists then it will
                           be overwritten.

        :type write_handle: bytes
        :param write_handle: (Optional) An existing handle for writing the object.
                            If provided, opening the bidi-gRPC connection will be faster.
        """
        self.client = client
        self.bucket_name = bucket_name
        self.object_name = object_name
        self.write_handle = write_handle
        self.generation = generation

        self.write_obj_stream = _AsyncWriteObjectStream(
            client=self.client,
            bucket_name=self.bucket_name,
            object_name=self.object_name,
            generation_number=self.generation,
            write_handle=self.write_handle,
        )
        self._is_stream_open: bool = False
        self.offset: Optional[int] = None
        self.persisted_size: Optional[int] = None
        # Populated by ``finalize()``. Initialised here so that reading it
        # before finalization yields None instead of raising AttributeError.
        self.object_resource: Optional[_storage_v2.Object] = None

    async def state_lookup(self) -> int:
        """Queries the server for the persisted size and caches it.

        Sends a ``state_lookup`` request on the write stream, reads one
        response and stores its ``persisted_size`` on ``self.persisted_size``.

        :rtype: int
        :returns: persisted size.
        """
        await self.write_obj_stream.send(
            _storage_v2.BidiWriteObjectRequest(
                state_lookup=True,
            )
        )
        response = await self.write_obj_stream.recv()
        self.persisted_size = response.persisted_size
        return self.persisted_size

    async def open(self) -> None:
        """Opens the underlying bidi-gRPC stream.

        After the stream is opened, ``generation`` (only if it was not
        supplied) and ``write_handle`` are copied from the stream, and
        ``persisted_size`` is refreshed through ``state_lookup()``.

        :raises ValueError: If the stream is already open.
        """
        if self._is_stream_open:
            raise ValueError("Underlying bidi-gRPC stream is already open")

        await self.write_obj_stream.open()
        self._is_stream_open = True
        if self.generation is None:
            self.generation = self.write_obj_stream.generation_number
        self.write_handle = self.write_obj_stream.write_handle

        # Update self.persisted_size
        await self.state_lookup()

    async def append(self, data: bytes):
        """Appends ``data`` to the object. Not implemented yet.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError("append is not implemented yet.")

    async def flush(self) -> int:
        """Flushes the data to the server.

        Sends a request with both ``flush`` and ``state_lookup`` set, reads
        one response and stores its ``persisted_size`` on both
        ``self.persisted_size`` and ``self.offset``.

        :rtype: int
        :returns: The persisted size after flush.
        """
        await self.write_obj_stream.send(
            _storage_v2.BidiWriteObjectRequest(
                flush=True,
                state_lookup=True,
            )
        )
        response = await self.write_obj_stream.recv()
        self.persisted_size = response.persisted_size
        self.offset = self.persisted_size
        return self.persisted_size

    async def close(self, finalize_on_close=False) -> int:
        """Closes the underlying bidi-gRPC stream.

        :type finalize_on_close: bool
        :param finalize_on_close: (Optional) If True, ``finalize()`` is called
                                  before the stream is closed.

        :rtype: int
        :returns: The persisted size last recorded by ``state_lookup()`` or
                  ``flush()``. This method does not refresh it.
        """
        if finalize_on_close:
            await self.finalize()

        await self.write_obj_stream.close()
        self._is_stream_open = False
        self.offset = None
        # The signature and contract promise the persisted size; previously
        # the method fell off the end and returned None.
        return self.persisted_size

    async def finalize(self) -> _storage_v2.Object:
        """Finalizes the Appendable Object.

        Note: Once finalized no more data can be appended.

        Sends a ``finish_write`` request, reads one response and stores its
        ``resource`` on ``self.object_resource``.

        :rtype: google.cloud.storage_v2.types.Object
        :returns: The finalized object resource.
        """
        await self.write_obj_stream.send(
            _storage_v2.BidiWriteObjectRequest(finish_write=True)
        )
        response = await self.write_obj_stream.recv()
        self.object_resource = response.resource
        # Return the resource as annotated; previously None was returned.
        return self.object_resource

    # helper methods.
    async def append_from_string(self, data: str):
        """
        str data will be encoded to bytes using utf-8 encoding calling

        self.append(data.encode("utf-8"))

        Not implemented yet; always raises NotImplementedError.
        """
        raise NotImplementedError("append_from_string is not implemented yet.")

    async def append_from_stream(self, stream_obj):
        """
        At a time read a chunk of data (16MiB) from `stream_obj`
        and call self.append(chunk)

        Not implemented yet; always raises NotImplementedError.
        """
        raise NotImplementedError("append_from_stream is not implemented yet.")

    async def append_from_file(self, file_path: str):
        """Create a file object from `file_path` and call append_from_stream(file_obj)

        Not implemented yet; always raises NotImplementedError.
        """
        raise NotImplementedError("append_from_file is not implemented yet.")
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ async def send(
The request message to send. This is typically used to specify
the read offset and limit.
"""
raise NotImplementedError(
"send() is not implemented yet in _AsyncWriteObjectStream"
)
if not self._is_stream_open:
raise ValueError("Stream is not open")
await self.socket_like_rpc.send(bidi_write_object_request)

async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
"""Receives a response from the stream.
Expand All @@ -174,9 +174,9 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
:class:`~google.cloud._storage_v2.types.BidiWriteObjectResponse`:
The response message from the server.
"""
raise NotImplementedError(
"recv() is not implemented yet in _AsyncWriteObjectStream"
)
if not self._is_stream_open:
raise ValueError("Stream is not open")
return await self.socket_like_rpc.recv()

@property
def is_stream_open(self) -> bool:
Expand Down
Loading