Skip to content

Commit b7fcb86

Browse files
committed
.
1 parent 0c32690 commit b7fcb86

File tree

3 files changed

+9
-13
lines changed

3 files changed

+9
-13
lines changed

kinesis/consumer.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -928,9 +928,7 @@ async def __anext__(self):
928928
log.debug("Skipping checkpoint for deallocated shard %s", cp_shard)
929929
elif self.checkpointer:
930930
# Don't execute now — defer to next __anext__ call
931-
self._deferred_checkpoints[cp_shard] = item[
932-
"__CHECKPOINT__"
933-
]["SequenceNumber"]
931+
self._deferred_checkpoints[cp_shard] = item["__CHECKPOINT__"]["SequenceNumber"]
934932
checkpoint_count += 1
935933
if checkpoint_count >= max_checkpoints:
936934
log.warning(f"Processed {max_checkpoints} checkpoints, stopping iteration")

tests/conftest.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,7 @@ def _factory(**kwargs):
199199
# Don't re-raise: the test itself is responsible for observing
200200
# expected task failures (e.g. test_wait_ready_fetch_task_crash).
201201
if not task.cancelled() and task.exception():
202-
log.warning(
203-
"Task finished with error during test: %s", task.exception(), exc_info=task.exception()
204-
)
202+
log.warning("Task finished with error during test: %s", task.exception(), exc_info=task.exception())
205203
else:
206204
task.cancel()
207205
try:

tests/test_checkpoint_ordering.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -278,10 +278,12 @@ async def test_shard_exhaustion_flushes_checkpoint_before_deallocate(self, mock_
278278
await consumer.fetch()
279279

280280
# Checkpoint must have been called before deallocate
281-
checkpointer.assert_has_calls([
282-
call.checkpoint("shard-0", "100"),
283-
call.deallocate("shard-0"),
284-
])
281+
checkpointer.assert_has_calls(
282+
[
283+
call.checkpoint("shard-0", "100"),
284+
call.deallocate("shard-0"),
285+
]
286+
)
285287

286288
@pytest.mark.asyncio
287289
async def test_shard_exhaustion_with_records_no_sentinel_enqueued(self, mock_consumer):
@@ -380,9 +382,7 @@ async def test_checkpoint_interval_debounces(self, mock_consumer):
380382

381383
for i in range(5):
382384
await consumer.queue.put({"msg": f"r{i}"})
383-
await consumer.queue.put(
384-
{"__CHECKPOINT__": {"ShardId": "shard-0", "SequenceNumber": str((i + 1) * 100)}}
385-
)
385+
await consumer.queue.put({"__CHECKPOINT__": {"ShardId": "shard-0", "SequenceNumber": str((i + 1) * 100)}})
386386

387387
for _ in range(5):
388388
await consumer.__anext__()

0 commit comments

Comments
 (0)