Skip to content

Commit 9390cb6

Browse files
committed
implement flush, close and finalize
1 parent 79835fa commit 9390cb6

File tree

2 files changed

+116
-15
lines changed

2 files changed

+116
-15
lines changed

google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,40 @@ async def append(self, data: bytes):
6363
raise NotImplementedError("append is not implemented yet.")
6464

6565
async def flush(self) -> int:
66-
"""Returns persisted_size"""
67-
raise NotImplementedError("flush is not implemented yet.")
66+
"""Flushes the data to the server.
67+
68+
:rtype: int
69+
:returns: The persisted size after flush.
70+
"""
71+
await self.write_obj_stream.send(
72+
_storage_v2.BidiWriteObjectRequest(
73+
flush=True,
74+
state_lookup=True,
75+
)
76+
)
77+
response = await self.write_obj_stream.recv()
78+
self.persisted_size = response.persisted_size
79+
self.offset = self.persisted_size
80+
return self.persisted_size
6881

6982
async def close(self, finalize_on_close=False) -> int:
7083
"""Returns persisted_size"""
71-
raise NotImplementedError("close is not implemented yet.")
84+
if finalize_on_close:
85+
await self.finalize()
86+
87+
await self.write_obj_stream.close()
88+
self._is_stream_open = False
89+
self.offset = None
7290

73-
async def finalize(self) -> int:
74-
"""Returns persisted_size
91+
async def finalize(self) -> _storage_v2.Object:
92+
"""Returns object_resource
7593
Note: Once finalized no more data can be appended.
7694
"""
77-
raise NotImplementedError("finalize is not implemented yet.")
95+
await self.write_obj_stream.send(
96+
_storage_v2.BidiWriteObjectRequest(finish_write=True)
97+
)
98+
response = await self.write_obj_stream.recv()
99+
self.object_resource = response.resource
78100

79101
# helper methods.
80102
async def append_from_string(self, data: str):

tests/unit/asyncio/test_async_appendable_object_writer.py

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -171,15 +171,6 @@ async def test_unimplemented_methods_raise_error(mock_client):
171171
with pytest.raises(NotImplementedError):
172172
await writer.append(b"data")
173173

174-
with pytest.raises(NotImplementedError):
175-
await writer.flush()
176-
177-
with pytest.raises(NotImplementedError):
178-
await writer.close()
179-
180-
with pytest.raises(NotImplementedError):
181-
await writer.finalize()
182-
183174
with pytest.raises(NotImplementedError):
184175
await writer.append_from_string("data")
185176

@@ -188,3 +179,91 @@ async def test_unimplemented_methods_raise_error(mock_client):
188179

189180
with pytest.raises(NotImplementedError):
190181
await writer.append_from_file("file.txt")
182+
183+
184+
@pytest.mark.asyncio
185+
@mock.patch(
186+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
187+
)
188+
async def test_flush(mock_write_object_stream, mock_client):
189+
"""Test that flush sends the correct request and updates state."""
190+
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
191+
mock_stream = mock_write_object_stream.return_value
192+
mock_stream.send = mock.AsyncMock()
193+
mock_stream.recv = mock.AsyncMock(
194+
return_value=_storage_v2.BidiWriteObjectResponse(persisted_size=1024)
195+
)
196+
197+
persisted_size = await writer.flush()
198+
199+
expected_request = _storage_v2.BidiWriteObjectRequest(flush=True, state_lookup=True)
200+
mock_stream.send.assert_awaited_once_with(expected_request)
201+
mock_stream.recv.assert_awaited_once()
202+
assert writer.persisted_size == 1024
203+
assert writer.offset == 1024
204+
assert persisted_size == 1024
205+
206+
207+
@pytest.mark.asyncio
208+
@mock.patch(
209+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
210+
)
211+
async def test_close_without_finalize(mock_write_object_stream, mock_client):
212+
"""Test close without finalizing."""
213+
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
214+
writer._is_stream_open = True
215+
writer.offset = 1024
216+
mock_stream = mock_write_object_stream.return_value
217+
mock_stream.close = mock.AsyncMock()
218+
writer.finalize = mock.AsyncMock()
219+
220+
await writer.close(finalize_on_close=False)
221+
222+
writer.finalize.assert_not_awaited()
223+
mock_stream.close.assert_awaited_once()
224+
assert not writer._is_stream_open
225+
assert writer.offset is None
226+
227+
228+
@pytest.mark.asyncio
229+
@mock.patch(
230+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
231+
)
232+
async def test_close_with_finalize(mock_write_object_stream, mock_client):
233+
"""Test close with finalizing."""
234+
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
235+
writer._is_stream_open = True
236+
writer.offset = 1024
237+
mock_stream = mock_write_object_stream.return_value
238+
mock_stream.close = mock.AsyncMock()
239+
writer.finalize = mock.AsyncMock()
240+
241+
await writer.close(finalize_on_close=True)
242+
243+
writer.finalize.assert_awaited_once()
244+
mock_stream.close.assert_awaited_once()
245+
assert not writer._is_stream_open
246+
assert writer.offset is None
247+
248+
249+
@pytest.mark.asyncio
250+
@mock.patch(
251+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
252+
)
253+
async def test_finalize(mock_write_object_stream, mock_client):
254+
"""Test that finalize sends the correct request and updates state."""
255+
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
256+
mock_stream = mock_write_object_stream.return_value
257+
mock_stream.send = mock.AsyncMock()
258+
mock_resource = _storage_v2.Object(name=OBJECT, bucket=BUCKET)
259+
mock_stream.recv = mock.AsyncMock(
260+
return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource)
261+
)
262+
263+
await writer.finalize()
264+
265+
mock_stream.send.assert_awaited_once_with(
266+
_storage_v2.BidiWriteObjectRequest(finish_write=True)
267+
)
268+
mock_stream.recv.assert_awaited_once()
269+
assert writer.object_resource == mock_resource

0 commit comments

Comments
 (0)