Skip to content

Commit 9e6fefd

Browse files
authored
fix: Add simple flush for optimized writes (#1633)
fix: Add simple flush for optimized writes
1 parent fcda233 commit 9e6fefd

File tree

2 files changed

+59
-9
lines changed

2 files changed

+59
-9
lines changed

google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,10 +199,28 @@ async def append(self, data: bytes) -> None:
199199
self.offset += chunk_size
200200
bytes_to_flush += chunk_size
201201
if bytes_to_flush >= _MAX_BUFFER_SIZE_BYTES:
202-
await self.flush()
202+
await self.simple_flush()
203203
bytes_to_flush = 0
204204
start_idx = end_idx
205205

206+
async def simple_flush(self) -> None:
207+
"""Flushes the data to the server.
208+
Please note: Unlike `flush` it does not do `state_lookup`
209+
210+
:rtype: None
211+
212+
:raises ValueError: If the stream is not open (i.e., `open()` has not
213+
been called).
214+
"""
215+
if not self._is_stream_open:
216+
raise ValueError("Stream is not open. Call open() before simple_flush().")
217+
218+
await self.write_obj_stream.send(
219+
_storage_v2.BidiWriteObjectRequest(
220+
flush=True,
221+
)
222+
)
223+
206224
async def flush(self) -> int:
207225
"""Flushes the data to the server.
208226

tests/unit/asyncio/test_async_appendable_object_writer.py

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,38 @@ async def test_flush_without_open_raises_value_error(mock_client):
224224
await writer.flush()
225225

226226

227+
@pytest.mark.asyncio
228+
@mock.patch(
229+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
230+
)
231+
async def test_simple_flush(mock_write_object_stream, mock_client):
232+
"""Test that flush sends the correct request and updates state."""
233+
# Arrange
234+
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
235+
writer._is_stream_open = True
236+
mock_stream = mock_write_object_stream.return_value
237+
mock_stream.send = mock.AsyncMock()
238+
239+
# Act
240+
await writer.simple_flush()
241+
242+
# Assert
243+
mock_stream.send.assert_awaited_once_with(
244+
_storage_v2.BidiWriteObjectRequest(flush=True)
245+
)
246+
247+
248+
@pytest.mark.asyncio
249+
async def test_simple_flush_without_open_raises_value_error(mock_client):
250+
"""Test that flush raises an error if the stream is not open."""
251+
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
252+
with pytest.raises(
253+
ValueError,
254+
match="Stream is not open. Call open\\(\\) before simple_flush\\(\\).",
255+
):
256+
await writer.simple_flush()
257+
258+
227259
@pytest.mark.asyncio
228260
@mock.patch(
229261
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
@@ -369,7 +401,7 @@ async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client
369401
writer.persisted_size = 100
370402
mock_stream = mock_write_object_stream.return_value
371403
mock_stream.send = mock.AsyncMock()
372-
writer.flush = mock.AsyncMock()
404+
writer.simple_flush = mock.AsyncMock()
373405

374406
data = b"a" * (_MAX_CHUNK_SIZE_BYTES + 1)
375407
await writer.append(data)
@@ -387,7 +419,7 @@ async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client
387419
assert len(second_call[0][0].checksummed_data.content) == 1
388420

389421
assert writer.offset == 100 + len(data)
390-
writer.flush.assert_not_awaited()
422+
writer.simple_flush.assert_not_awaited()
391423

392424

393425
@pytest.mark.asyncio
@@ -407,12 +439,12 @@ async def test_append_flushes_when_buffer_is_full(
407439
writer.persisted_size = 0
408440
mock_stream = mock_write_object_stream.return_value
409441
mock_stream.send = mock.AsyncMock()
410-
writer.flush = mock.AsyncMock()
442+
writer.simple_flush = mock.AsyncMock()
411443

412444
data = b"a" * _MAX_BUFFER_SIZE_BYTES
413445
await writer.append(data)
414446

415-
writer.flush.assert_awaited_once()
447+
writer.simple_flush.assert_awaited_once()
416448

417449

418450
@pytest.mark.asyncio
@@ -430,12 +462,12 @@ async def test_append_handles_large_data(mock_write_object_stream, mock_client):
430462
writer.persisted_size = 0
431463
mock_stream = mock_write_object_stream.return_value
432464
mock_stream.send = mock.AsyncMock()
433-
writer.flush = mock.AsyncMock()
465+
writer.simple_flush = mock.AsyncMock()
434466

435467
data = b"a" * (_MAX_BUFFER_SIZE_BYTES * 2 + 1)
436468
await writer.append(data)
437469

438-
assert writer.flush.await_count == 2
470+
assert writer.simple_flush.await_count == 2
439471

440472

441473
@pytest.mark.asyncio
@@ -453,7 +485,7 @@ async def test_append_data_two_times(mock_write_object_stream, mock_client):
453485
writer.persisted_size = 0
454486
mock_stream = mock_write_object_stream.return_value
455487
mock_stream.send = mock.AsyncMock()
456-
writer.flush = mock.AsyncMock()
488+
writer.simple_flush = mock.AsyncMock()
457489

458490
data1 = b"a" * (_MAX_CHUNK_SIZE_BYTES + 10)
459491
await writer.append(data1)
@@ -463,4 +495,4 @@ async def test_append_data_two_times(mock_write_object_stream, mock_client):
463495

464496
total_data_length = len(data1) + len(data2)
465497
assert writer.offset == total_data_length
466-
assert writer.flush.await_count == 0
498+
assert writer.simple_flush.await_count == 0

0 commit comments

Comments
 (0)