
Fix checkpoint data loss: defer execution, fix queue timeout, add interval batching#66

Merged
hampsterx merged 5 commits into master from fix/checkpoint-safety on Mar 17, 2026

Conversation


@hampsterx hampsterx commented Mar 16, 2026

Deferred checkpoint execution: `__CHECKPOINT__` sentinels are stored as pending when dequeued and only committed at the start of the next `__anext__` call, proving the consuming code survived processing. Previously, checkpoints fired immediately on dequeue, before the user had processed subsequent records.
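A minimal sketch of the deferred pattern (class and attribute names here are illustrative, not the library's actual internals):

```python
import asyncio

CHECKPOINT = "__CHECKPOINT__"  # sentinel key, as named in the commit message

class DeferredCheckpointIterator:
    """Commits a dequeued checkpoint only on the *next* __anext__ call:
    reaching the next call proves the caller survived processing."""

    def __init__(self, queue: asyncio.Queue, checkpointer):
        self.queue = queue
        self.checkpointer = checkpointer
        self._deferred = None  # (shard_id, sequence) awaiting commit

    def __aiter__(self):
        return self

    async def __anext__(self):
        # First, commit any checkpoint deferred from the previous iteration.
        if self._deferred is not None:
            shard_id, seq = self._deferred
            self._deferred = None
            await self.checkpointer.checkpoint(shard_id, seq)
        while True:
            item = await self.queue.get()
            if isinstance(item, dict) and CHECKPOINT in item:
                meta = item[CHECKPOINT]
                # Store as pending instead of committing immediately.
                self._deferred = (meta["ShardId"], meta["SequenceNumber"])
                continue
            return item
```
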

Queue put timeout fix: LastSequenceNumber now only advances to the last successfully enqueued record. Records silently dropped by queue timeout no longer advance the checkpoint position.
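The fix can be sketched with a hypothetical helper (the real fetch loop in kinesis/consumer.py does considerably more): advance the tracked sequence only after a `put` actually succeeds.

```python
import asyncio

async def enqueue_records(queue: asyncio.Queue, records, put_timeout=0.5):
    """Enqueue records and return the sequence number of the last record
    that actually made it into the queue. A record dropped by a put timeout
    must not advance the checkpoint position. (Illustrative sketch.)"""
    last_enqueued_sequence = None
    for record in records:
        try:
            await asyncio.wait_for(queue.put(record), timeout=put_timeout)
        except asyncio.TimeoutError:
            # Dropped: stop here so the sequence never moves past this record.
            break
        last_enqueued_sequence = record["SequenceNumber"]
    return last_enqueued_sequence
```
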

Shard deallocation ordering: pending checkpoints are flushed before deallocate() on both shard exhaustion and close(), preventing the race where ownership is released before the final checkpoint persists.
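The required ordering is simple to state in code. This is a sketch under assumed names (`close_shard` and the dict of pending checkpoints are illustrative):

```python
import asyncio

async def close_shard(shard_id, pending_checkpoints, checkpointer):
    """Flush any pending checkpoint for the shard *before* releasing
    ownership, so another worker cannot claim the shard and re-read
    records that were already processed. (Illustrative sketch.)"""
    seq = pending_checkpoints.pop(shard_id, None)
    if seq is not None:
        await checkpointer.checkpoint(shard_id, seq)  # persist progress first...
    await checkpointer.deallocate(shard_id)           # ...then release ownership
```
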

checkpoint_interval parameter: new optional debounce that batches checkpoint writes via a background flusher task, reducing backend write pressure on active streams. Mutually exclusive with auto_checkpoint=False.
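The debounce idea can be sketched as follows. This is not the library's implementation; it is a self-contained illustration of interval batching with a background flusher, guaranteed flush-on-close, and the compare-and-delete guard discussed in the review below (must be used inside a running event loop):

```python
import asyncio

class DebouncedCheckpointer:
    """Batches checkpoint writes: updates land in an in-memory map and a
    background flusher persists the latest sequence per shard at most once
    per interval. (Sketch of the checkpoint_interval idea.)"""

    def __init__(self, backend, interval):
        if interval <= 0:
            raise ValueError("checkpoint_interval must be > 0")
        self.backend = backend
        self.interval = interval
        self._pending = {}
        self._task = None

    def checkpoint(self, shard_id, sequence):
        self._pending[shard_id] = sequence  # overwrite: only the latest matters
        if self._task is None:
            self._task = asyncio.ensure_future(self._flusher())

    async def _flusher(self):
        while self._pending:
            await asyncio.sleep(self.interval)
            await self.flush()
        self._task = None

    async def flush(self):
        for shard_id in list(self._pending):
            seq = self._pending.get(shard_id)
            if seq is None:
                continue
            await self.backend.checkpoint(shard_id, seq)
            # Compare-and-delete: don't drop a newer sequence that arrived
            # during the await above.
            if self._pending.get(shard_id) == seq:
                del self._pending[shard_id]

    async def close(self):
        # Guaranteed flush-on-close.
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        await self.flush()
```

Ten rapid updates for one shard collapse into a single backend write, which is exactly the "reduced write pressure" the parameter is after.
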

CheckPointer protocol docstrings: documents semantics, idempotency guarantees, and the flush-before-deallocate contract.
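To illustrate the kind of contract those docstrings spell out, here is a sketch of such a Protocol. The method names come from the PR summary (`allocate`, `deallocate`, `checkpoint`); the signatures and wording are assumptions, not the library's actual definitions:

```python
from typing import Optional, Protocol, runtime_checkable

@runtime_checkable
class CheckPointer(Protocol):
    """Illustrative contract for checkpoint backends (names assumed)."""

    async def allocate(self, shard_id: str) -> "tuple[bool, Optional[str]]":
        """Claim ownership of a shard; returns success plus any previously
        stored sequence number."""
        ...

    async def checkpoint(self, shard_id: str, sequence: str) -> None:
        """Persist progress for a shard. Must be idempotent: re-writing the
        same sequence after a retry is safe."""
        ...

    async def deallocate(self, shard_id: str) -> None:
        """Release ownership. Callers must flush any pending checkpoints
        first (the flush-before-deallocate contract)."""
        ...
```
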

Test plan

  • 14 new unit tests in test_checkpoint_ordering.py
  • Existing test updated to match deferred semantics
  • Full non-Docker suite green (119 passed, 7 skipped)
  • Integration smoke test with kinesalite + Redis (193 passed)

Summary by CodeRabbit

  • New Features

    • Optional checkpoint_interval for configurable debounced (batched) checkpointing with a background flusher and guaranteed flush-on-close; deferred checkpointing behavior for smoother writes.
  • Documentation

    • Expanded CheckPointer docs and added “Checkpoint Safety” design notes; README updated with checkpoint_interval guidance.
  • Tests

    • Extensive new and updated tests covering checkpoint ordering, deferred behavior, debouncing, queue timeouts, deallocation ordering, and shutdown; improved test utilities and fixtures.
  • Bug Fixes

    • Fixed last-sequence tracking and ensured pending/deferred checkpoints reliably flush on timeouts and shutdown.

@coderabbitai

coderabbitai bot commented Mar 16, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds optional debounced per-shard checkpointing with background flusher and final flush-on-close; defers __CHECKPOINT__ handling to next iteration or flusher; expands CheckPointer Protocol docstrings; updates mocks, tests, and docs to validate ordering, timeout, deallocation, and debouncing semantics.

Changes

| Cohort / File(s) | Summary |
|---|---|
| **Protocol Documentation**<br>`kinesis/checkpointers.py` | Extended docstrings for the `CheckPointer` Protocol (`allocate`, `deallocate`, `checkpoint`, `get_all_checkpoints`, `close`) with detailed behavioral semantics. Documentation-only changes. |
| **Consumer Core**<br>`kinesis/consumer.py` | Added `checkpoint_interval` to `Consumer.__init__`; added debounced checkpointing internals (`_deferred_checkpoints`, `_pending_checkpoints`, `_checkpoint_flusher_task`) and helpers (`_maybe_checkpoint`, `_checkpoint_flusher`, `_flush_pending_checkpoints`). Modified `fetch()`, `__anext__()`, and `close()` to defer, debounce, flush periodically, and ensure flush before shard deallocation. |
| **Mock/Test Consumer**<br>`kinesis/testing.py` | `MockConsumer` now tracks `_deferred_checkpoints`, adds `_flush_deferred_checkpoints()`, defers per-record checkpoints during iteration, and flushes deferred checkpoints on close/exit. |
| **Test Fixtures**<br>`tests/conftest.py` | Added `_make_mock_consumer(**kwargs)` helper and `mock_consumer` async fixture; teardown cancels/awaits consumer background flusher and fetch tasks with error handling. |
| **Checkpoint Ordering Tests**<br>`tests/test_checkpoint_ordering.py` | New comprehensive tests for deferred checkpoint semantics, queue put timeout / `LastSequenceNumber` behavior, shard deallocation ordering, checkpoint-interval debouncing (including quiet-stream/background flush), flush-on-close, and error propagation. Adds test helpers and scaffolding. |
| **Consumer Tests**<br>`tests/test_consumer.py` | Removed internal mock scaffolding; renamed/adjusted tests to assert deferred checkpoint behavior (checkpoint sentinel deferred on first `__anext__` and awaited on subsequent iteration). |
| **Docs**<br>`README.md`, `docs/DESIGN.md` | Documented the `checkpoint_interval` option and added "Checkpoint Safety" design notes describing deferred commit, queue-timeout semantics, shard deallocation ordering, interval debouncing, and final flush behavior. |

Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant Consumer
    participant Flusher as Background Flusher
    participant CheckPointer
    participant Shard

    Client->>Consumer: fetch() / enqueue records
    note over Consumer: record last_enqueued_sequence\nenqueue checkpoint sentinel (deferred)
    Consumer->>Consumer: store sentinel in _deferred_checkpoints / _pending_checkpoints

    Client->>Consumer: __anext__() (consume next)
    Consumer->>Consumer: _maybe_checkpoint(shard)
    alt deferred checkpoint ready or interval elapsed
        Consumer->>CheckPointer: checkpoint(shard, sequence)
        CheckPointer-->>Consumer: success / error
    else checkpoint remains deferred
        Consumer-->>Client: yield record (checkpoint deferred)
    end

    Flusher->>Consumer: periodic _checkpoint_flusher()
    Consumer->>Consumer: _flush_pending_checkpoints()
    Consumer->>CheckPointer: checkpoint(pending...)
    CheckPointer-->>Consumer: responses

    Client->>Consumer: close()
    Consumer->>Consumer: _flush_pending_checkpoints() + _flush_deferred_checkpoints()
    Consumer->>CheckPointer: checkpoint(remaining...)
    Consumer->>Flusher: cancel _checkpoint_flusher_task
    Consumer->>Shard: deallocate()
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 I tuck each checkpoint, nibble on the pause,
Debounced and patient — I follow careful laws,
A flusher hums softly and writes what I keep,
Before shards sleep, I whisper them to sleep,
Hopping tidy home, no lost checkpoints to weep.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 68.00%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The title directly and clearly summarizes the three main functional changes in the changeset: deferred checkpoint execution, queue timeout fix, and interval-based batching. |




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (1)
tests/test_checkpoint_ordering.py (1)

200-233: Add coverage for the final-batch shard-close case.

This test preloads _deferred_checkpoints, but the real failure mode is kinesis/consumer.py:321-334 with Records != [] and NextShardIterator=None, where the last checkpoint is still only in consumer.queue when deallocate() runs. As written, this test won't catch that regression.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_checkpoint_ordering.py` around lines 200 - 233, The test only
covers the empty-Records exhausted-iterator path; add coverage for the
final-batch shard-close case by simulating a fetch result with Records != [] and
NextShardIterator = None and ensuring the pending checkpoint is enqueued (in
consumer.queue) rather than already in consumer._deferred_checkpoints so the
consumer.fetch code path that processes final Records flushes the checkpoint
before calling deallocate; update
test_shard_exhaustion_flushes_checkpoint_before_deallocate to create a fetch
future where fetch_result["Records"] is non-empty, place the pending checkpoint
into consumer.queue (not consumer._deferred_checkpoints), call await
consumer.fetch(), and assert
checkpointer.assert_has_calls([call.checkpoint("shard-0","100"),
call.deallocate("shard-0")]).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@kinesis/consumer.py`:
- Around line 107-119: Reject non-positive checkpoint_interval values in the
constructor: validate checkpoint_interval in __init__ (where
checkpoint_interval, _pending_checkpoints, and _checkpoint_flusher_task are set)
and raise a ValueError if checkpoint_interval is not None and
checkpoint_interval <= 0, with a clear message that checkpoint_interval must be
> 0 to avoid busy-spinning in _checkpoint_flusher; also apply the same
validation where the alternate initialization code path sets checkpoint_interval
(the other block that references checkpointer and auto_checkpoint) so both
construction paths enforce checkpoint_interval > 0.
- Around line 321-334: When enqueuing the final "__CHECKPOINT__" sentinel you
must not call deallocate() until that sentinel has been consumed and its
checkpoint persisted; change the deallocation path (the block that flushes
_deferred_checkpoints/_pending_checkpoints and calls deallocate()) to wait for
the terminal checkpoint acknowledgement instead of immediately releasing
ownership. Concretely: after putting the {"__CHECKPOINT__": {"ShardId": ...,
"SequenceNumber": ...}} into self.queue, record a unique marker or future tied
to that sentinel (or reuse the pending-checkpoint tracking) and have the
deallocation code (the section that currently flushes
_deferred_checkpoints/_pending_checkpoints and then calls deallocate()) await
that marker/confirmation (or queue consumption via an acknowledgment
event/queue.join-like mechanism) before calling deallocate(); keep references to
_deferred_checkpoints/_pending_checkpoints and ensure __anext__()/close()
resolves the marker when it successfully persists the checkpoint so deallocate()
is deferred until persistence completes.

In `@tests/conftest.py`:
- Around line 190-204: The teardown currently swallows all exceptions from
background tasks (_checkpoint_flusher_task and fetch_task); change the cleanup
so that CancelledError is still ignored but any other Exception is logged
(include the exception and traceback via the test logger or
asyncio.get_event_loop().call_exception_handler) and re-raised (or at minimum
fail the fixture) so unexpected failures surface in CI; update the await blocks
around c._checkpoint_flusher_task and c.fetch_task to catch
asyncio.CancelledError separately and for other Exception log.exception(...)
with context referencing the consumer instance and then re-raise the exception.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 8981b669-4625-4247-b701-e048a1d6c7d5

📥 Commits

Reviewing files that changed from the base of the PR and between 66451cc and c33e104.

📒 Files selected for processing (5)
  • kinesis/checkpointers.py
  • kinesis/consumer.py
  • tests/conftest.py
  • tests/test_checkpoint_ordering.py
  • tests/test_consumer.py


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (2)
tests/conftest.py (1)

193-201: ⚠️ Potential issue | 🟡 Minor

Surface background-task failures during fixture teardown.

The not task.done() guard skips tasks that have already finished with an exception, and the current except Exception branch only logs awaited failures. That can keep CI green while _checkpoint_flusher_task or fetch_task is broken.

Teardown hardening

```diff
     for c in consumers:
         for task in [getattr(c, "_checkpoint_flusher_task", None), c.fetch_task]:
-            if task and not task.done():
-                task.cancel()
-                try:
-                    await task
-                except asyncio.CancelledError:
-                    pass
-                except Exception:
-                    log.warning("Unexpected error during mock_consumer teardown", exc_info=True)
+            if not task:
+                continue
+            if not task.done():
+                task.cancel()
+            try:
+                await task
+            except asyncio.CancelledError:
+                pass
+            except Exception:
+                log.exception("Unexpected error during mock_consumer teardown")
+                raise
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/conftest.py` around lines 193 - 201, The teardown loop in conftest.py
can hide background-task failures because it skips tasks that are done; change
the logic around the loop that iterates over getattr(c,
"_checkpoint_flusher_task", None) and c.fetch_task so you always inspect tasks
(remove the not task.done() guard), and if a task is done check task.exception()
and raise that exception (or re-raise) so failures surface; for tasks not yet
done keep cancel() and await with the existing asyncio.CancelledError handling
but after awaiting also check task.exception() and raise if present to surface
background errors during mock_consumer teardown.
kinesis/consumer.py (1)

367-386: ⚠️ Potential issue | 🔴 Critical

Don't deallocate while a checkpoint sentinel for this shard is still only queued.

This block only looks at _deferred_checkpoints and _pending_checkpoints. If a previous non-terminal fetch already queued a {"__CHECKPOINT__": ...} for shard_id but __anext__() has not dequeued it yet, both maps are empty here and deallocate() still runs first. The later flush then happens after ownership was released, which recreates the race this PR is trying to remove. Track queued sentinels per shard, or delay deallocation until the queued sentinel has been acknowledged.
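One way to close that window, sketched with illustrative names (an `asyncio.Event` per queued sentinel that deallocation waits on; this is the reviewer's suggestion in code form, not the library's implementation):

```python
import asyncio

class SentinelTracker:
    """Tracks __CHECKPOINT__ sentinels that are queued but not yet
    persisted, so shard deallocation can wait for them. (Sketch.)"""

    def __init__(self):
        self._acks = {}  # shard_id -> asyncio.Event, set once persisted

    def sentinel_queued(self, shard_id):
        # Called when fetch() enqueues a sentinel for this shard.
        self._acks[shard_id] = asyncio.Event()

    def sentinel_persisted(self, shard_id):
        # Called by __anext__()/close() after the checkpoint is written.
        ev = self._acks.pop(shard_id, None)
        if ev:
            ev.set()

    async def wait_before_deallocate(self, shard_id, timeout=5.0):
        # Deallocation blocks until any queued sentinel is acknowledged.
        ev = self._acks.get(shard_id)
        if ev is not None:
            await asyncio.wait_for(ev.wait(), timeout)
```
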

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@kinesis/consumer.py` around lines 367 - 386, The deallocation path
incorrectly ignores checkpoint sentinels that are enqueued but not yet dequeued
(e.g. "__CHECKPOINT__" queued by a prior fetch and not yet consumed by
__anext__()), so deallocate() can run before the sentinel is flushed; update the
logic around checkpointer.deallocate(shard_id) to also track queued sentinels
per shard (e.g. add or use a per-shard queued-sentinel set/map) or block/delay
deallocation until the queued sentinel for shard_id has been
acknowledged/consumed; ensure you consult and update references to
_deferred_checkpoints, _pending_checkpoints and any code that enqueues/dequeues
the "__CHECKPOINT__" sentinel so the check for safe deallocation only proceeds
when no sentinel remains queued for that shard.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@kinesis/consumer.py`:
- Around line 376-379: The shard_exhaustion path currently reads a value from
self._pending_checkpoints, awaits self.checkpointer.checkpoint, and then
unconditionally removes the dict entry with pop(), which can erase a newer
in-flight update; change that removal to a compare-and-delete: capture the
original value (e.g. old_seq = self._pending_checkpoints.get(shard_id)), await
self.checkpointer.checkpoint(shard_id, old_seq) and then only delete the entry
if self._pending_checkpoints.get(shard_id) == old_seq (use del or pop with the
key guarded by that equality) — mirror the guarded removal logic used in
_flush_pending_checkpoints() so only the exact flushed checkpoint is removed.

In `@kinesis/testing.py`:
- Around line 349-353: _flush_deferred_checkpoints currently commits every
deferred checkpoint immediately via self.checkpointer.checkpoint(...) which
bypasses the Consumer debounce logic and ignores checkpoint_interval; change it
to route writes through the existing debounce path by calling the internal
method that respects checkpoint_interval (e.g. self._maybe_checkpoint(shard_id,
seq) or the appropriate debounce helper used by Consumer) for each shard_id/seq,
then clear self._deferred_checkpoints; update MockConsumer.__init__ handling if
needed to ensure checkpoint_interval is stored so _maybe_checkpoint can read it.
- Line 384: The code currently sets self._deferred_checkpoints[shard.shard_id] =
seq before processor.parse() completes and before outputs in self._buffer are
delivered; move the deferred checkpoint assignment so it happens only after
processor.parse() returns successfully and any outputs for that source record
have been enqueued/drained (ensure _buffer is empty or that the specific outputs
are confirmed enqueued) and do not set the deferred checkpoint on parse
exceptions; also adjust close() and the logic around lines 400-401 to only
persist deferred checkpoints after confirming the buffer has been drained or
outputs acknowledged to avoid checkpointing records whose outputs were never
delivered.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 4833911c-f321-42a3-9905-6fcf7713e762

📥 Commits

Reviewing files that changed from the base of the PR and between c33e104 and fe6a567.

📒 Files selected for processing (5)
  • README.md
  • kinesis/consumer.py
  • kinesis/testing.py
  • tests/conftest.py
  • tests/test_checkpoint_ordering.py
✅ Files skipped from review due to trivial changes (1)
  • README.md


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (3)
tests/conftest.py (1)

208-209: ⚠️ Potential issue | 🟡 Minor

Do not swallow unexpected teardown errors.

Lines 208-209 only log and continue. This can hide real cleanup failures in CI; re-raise after logging.

Suggested fix

```diff
                 except asyncio.CancelledError:
                     pass
                 except Exception:
-                    log.warning("Unexpected error during mock_consumer teardown", exc_info=True)
+                    log.exception("Unexpected error during mock_consumer teardown")
+                    raise
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/conftest.py` around lines 208 - 209, The teardown for the mock_consumer
fixture is currently catching Exception and only logging it, which swallows real
failures; modify the except block in the mock_consumer teardown to capture the
exception (e.g., except Exception as e:) and after calling log.warning(...)
re-raise the exception (raise or raise from e) so CI sees failures while still
logging the error context.
kinesis/consumer.py (2)

381-384: ⚠️ Potential issue | 🔴 Critical

Avoid pre-await pop() in shard-exhaustion checkpoint flush.

On Line 383, popping before await can drop a newer in-flight sequence for the same shard. Use compare-and-delete after the checkpoint call, same as your flusher path.

Suggested fix

```diff
-                            if shard_id in self._pending_checkpoints:
-                                await self.checkpointer.checkpoint(
-                                    shard_id, self._pending_checkpoints.pop(shard_id)
-                                )
+                            if shard_id in self._pending_checkpoints:
+                                seq = self._pending_checkpoints.get(shard_id)
+                                if seq is not None:
+                                    await self.checkpointer.checkpoint(shard_id, seq)
+                                    if self._pending_checkpoints.get(shard_id) == seq:
+                                        del self._pending_checkpoints[shard_id]
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@kinesis/consumer.py` around lines 381 - 384, The current pre-await pop of
self._pending_checkpoints[shard_id] can remove a newer in-flight sequence;
instead, read the pending value into a local (e.g., pending_seq =
self._pending_checkpoints.get(shard_id)), call await
self.checkpointer.checkpoint(shard_id, pending_seq), and after the await perform
a compare-and-delete on self._pending_checkpoints (remove only if value is still
pending_seq) — follow the same compare-and-delete pattern used by the flusher
path to avoid dropping newer checkpoints.

879-881: ⚠️ Potential issue | 🟠 Major

Guard against concurrent key removal in pending flush loop.

Line 880 indexes _pending_checkpoints from a stale key snapshot. If another path removes that key first, this raises KeyError and can kill the flusher task.

Suggested fix

```diff
         for shard_id in list(self._pending_checkpoints):
-            seq = self._pending_checkpoints[shard_id]
+            seq = self._pending_checkpoints.get(shard_id)
+            if seq is None:
+                continue
             await self.checkpointer.checkpoint(shard_id, seq)
             # Only delete if the value hasn't been superseded during the await
             if self._pending_checkpoints.get(shard_id) == seq:
                 del self._pending_checkpoints[shard_id]
```
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@kinesis/consumer.py`:
- Around line 901-906: Make a snapshot of _deferred_checkpoints (e.g.,
list(self._deferred_checkpoints.items())) and iterate that snapshot instead of
iterating the dict directly so awaits in _maybe_checkpoint won't raise
RuntimeError; for each (shard_id, sequence) call await
self._maybe_checkpoint(shard_id, sequence) and after success remove the entry
only if the current value in self._deferred_checkpoints still equals that
sequence (to avoid discarding entries added during the await), and skip
processing if the shard is already deallocated (check your shard registry e.g.
self._shards or self._active_shards) to avoid the stale-sentinel race; do not
call clear() unconditionally.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 409c6db0-22d9-4132-9aba-de1b7803a7a4

📥 Commits

Reviewing files that changed from the base of the PR and between fe6a567 and 630ad78.

📒 Files selected for processing (4)
  • docs/DESIGN.md
  • kinesis/consumer.py
  • tests/conftest.py
  • tests/test_checkpoint_ordering.py


@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
tests/conftest.py (1)

4-6: Import order: statement between imports.

The logger assignment log = logging.getLogger(__name__) is placed between the logging import and the socket import. This breaks PEP 8 import grouping. Move it after all imports for consistency.

🧹 Suggested fix

```diff
 import asyncio
 import logging
-import os
-
-log = logging.getLogger(__name__)
 import socket
 import uuid
+import os
 from unittest.mock import patch
 from urllib.parse import urlparse
+
+log = logging.getLogger(__name__)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/conftest.py` around lines 4 - 6, Move the logger assignment so it sits
after all import statements: remove or relocate the line "log =
logging.getLogger(__name__)" that currently appears between "import logging" and
"import socket" and place it below the import block (after all imports). Update
any references if needed so "log" is still defined at module scope and imported
modules remain grouped per PEP8.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 896e4fac-356f-44bf-addb-59180d147b90

📥 Commits

Reviewing files that changed from the base of the PR and between 630ad78 and 0c32690.

📒 Files selected for processing (2)
  • kinesis/consumer.py
  • tests/conftest.py


@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (3)
tests/conftest.py (1)

4-6: Minor: Import ordering.

The log = logging.getLogger(__name__) assignment at line 5 appears between imports. Consider grouping all imports together at the top, then initializing the logger after.

🧹 Suggested reorder

```diff
 import asyncio
 import logging
 import os
-
-log = logging.getLogger(__name__)
 import socket
 import uuid
 from unittest.mock import patch
 from urllib.parse import urlparse
 
 import pytest
 import pytest_asyncio
 from dotenv import load_dotenv
 
 from kinesis import Consumer, Producer
 
 # Import mock fixtures so they're available even without pip install -e .
 # (When installed, the pytest11 entry point handles this automatically.)
 from kinesis.testing import kinesis_backend, kinesis_consumer, kinesis_producer, kinesis_stream  # noqa: F401
+
+log = logging.getLogger(__name__)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/conftest.py` around lines 4 - 6, Move the logger initialization out of
the import block: group all import statements (including socket and logging) at
the top of the module, then initialize the logger with the existing statement
log = logging.getLogger(__name__) after the imports; locate the current log
variable assignment and relocate it below the import section so imports are
contiguous and the logger is initialized afterward.
kinesis/consumer.py (1)

353-358: Minor: Dead code assignment.

Line 354 assigns last_enqueued_sequence = None in the else branch (no records), but this variable is only read within the if records: branch above, where it is reinitialized for each batch before being read. The reset in the else branch is therefore a no-op.

🧹 Suggested removal
                     else:
-                        last_enqueued_sequence = None
                         log.debug(
                             "Shard {} caught up, sleeping {}s".format(shard["ShardId"], self.sleep_time_no_records)
                         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@kinesis/consumer.py` around lines 353 - 358, Remove the dead assignment of
last_enqueued_sequence = None in the else branch inside the consumer loop (the
block that logs "Shard {} caught up, sleeping {}s" and awaits asyncio.sleep);
last_enqueued_sequence is only read in the if records: branch above (e.g., in
the same loop/function handling shard records), so simply delete that assignment
to avoid misleading/no-op code and ensure no other logic depends on
last_enqueued_sequence being reset there.
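For context on why the reset is a no-op, the pattern at issue can be sketched as below. This is a simplified illustration, not the library's actual code; the helper name enqueue_records and the record shape are hypothetical. The sequence marker is initialized fresh for each batch and only advances past records that were actually enqueued, so a stale reset outside that path never gets read:

```python
import asyncio

async def enqueue_records(queue, records, timeout=0.5):
    # Hypothetical sketch: advance the sequence marker only past records
    # that were actually enqueued. A queue.put timeout stops the loop so
    # the checkpoint position never moves past a dropped record.
    last_enqueued_sequence = None  # reinitialized per batch, so an
    for rec in records:            # external reset to None is redundant
        try:
            await asyncio.wait_for(queue.put(rec), timeout)
        except asyncio.TimeoutError:
            break  # later records would leave a gap; stop here
        last_enqueued_sequence = rec["SequenceNumber"]
    return last_enqueued_sequence

records = [{"SequenceNumber": str(i)} for i in range(3)]
result = asyncio.run(enqueue_records(asyncio.Queue(), records))
```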
tests/test_checkpoint_ordering.py (1)

376-394: Consider reducing timing sensitivity in debounce test.

The test uses checkpoint_interval=0.5 with await asyncio.sleep(0.7), which depends on real wall-clock timing. On slow CI runners or under load, this could become flaky.

The assertion await_count <= 2 provides some tolerance, but consider either:

  • Using a shorter interval (e.g., 0.1s with 0.25s sleep) to reduce test duration
  • Increasing the tolerance further if needed

This is a minor concern given the inherent challenges of testing async timing behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_checkpoint_ordering.py` around lines 376 - 394, The test
test_checkpoint_interval_debounces is timing-sensitive and may flake on slow CI
because it sets checkpoint_interval=0.5 and sleeps 0.7s; reduce wall-clock
dependence by lowering the interval and sleep (e.g., set checkpoint_interval to
0.1 and change await asyncio.sleep to ~0.25) or increase tolerated count, so
modify the test to use a shorter checkpoint_interval and matching sleep (or
relax the assertion on checkpointer.checkpoint.await_count) while keeping the
same flow that enqueues messages, iterates consumer.__anext__(), and calls
consumer.close().
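One way to remove the wall-clock dependence entirely is to wait on the observed condition instead of a fixed sleep. The sketch below is a hypothetical helper (wait_for_awaits is not part of the test suite) that polls an AsyncMock's await_count with a generous safety timeout, so the test completes as soon as the debounced flush has fired rather than after a tuned sleep:

```python
import asyncio
from unittest.mock import AsyncMock

async def wait_for_awaits(mock, count, timeout=2.0):
    # Poll the mock's await_count instead of sleeping a fixed duration;
    # the timeout is only a safety net against a hung test.
    async def _poll():
        while mock.await_count < count:
            await asyncio.sleep(0.01)
    await asyncio.wait_for(_poll(), timeout)

async def demo():
    flush = AsyncMock()

    async def flusher():
        # Stand-in for a background checkpoint flusher with a short debounce.
        for _ in range(2):
            await asyncio.sleep(0.05)
            await flush()

    task = asyncio.create_task(flusher())
    await wait_for_awaits(flush, 2)  # event-driven wait, not a timed sleep
    await task
    return flush.await_count

result = asyncio.run(demo())
```

With this shape the interval can stay small without the assertion racing the clock, since the wait ends exactly when the expected number of flushes has occurred.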

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 8d19ce41-8eb0-4fdf-9f2f-c79cd6000de9

📥 Commits

Reviewing files that changed from the base of the PR and between 0c32690 and b7fcb86.

📒 Files selected for processing (3)
  • kinesis/consumer.py
  • tests/conftest.py
  • tests/test_checkpoint_ordering.py

@hampsterx hampsterx merged commit 1ffd327 into master Mar 17, 2026
8 checks passed
@hampsterx hampsterx deleted the fix/checkpoint-safety branch March 17, 2026 03:18
