dynamodb checkpointer #54

Merged: hampsterx merged 3 commits into master from dynamodb_checkpointer on Jun 22, 2025
hampsterx (Owner) commented Jun 21, 2025

Summary by CodeRabbit

  • New Features
    • Added support for DynamoDB-based checkpointing for Kinesis consumer applications, enabling distributed shard coordination without managing external infrastructure.
  • Documentation
    • Updated documentation with a dedicated comprehensive guide for DynamoDB checkpointing, including configuration, migration tips, cost optimization, and best practices.
  • Tests
    • Introduced comprehensive asynchronous tests for the DynamoDB checkpointer covering initialization, allocation, checkpointing, deallocation, heartbeat, manual checkpointing, and error scenarios.
  • Chores
    • Added optional DynamoDB dependency to installation instructions and setup configuration.
    • Updated test configuration to exclude DynamoDB tests by default and added a pytest marker for DynamoDB-related tests.

coderabbitai bot commented Jun 21, 2025

Walkthrough

This change introduces DynamoDB as a new backend for distributed checkpointing in Kinesis consumer applications. It adds a new DynamoDBCheckPointer implementation, updates documentation and installation instructions, and provides comprehensive tests. The setup script now supports an optional DynamoDB dependency, and all relevant documentation is updated to reflect these enhancements.

Changes

| File(s) | Change Summary |
|---|---|
| README.md, docs/dynamodb-checkpointing.md | Added and updated documentation for DynamoDB-based checkpointing, including installation, usage, and migration info |
| kinesis/dynamodb.py | New module implementing the asynchronous DynamoDBCheckPointer class for distributed checkpointing |
| kinesis/__init__.py | Added optional import of DynamoDBCheckPointer with fallback to None if unavailable |
| setup.py | Added "dynamodb" extra requiring aioboto3>=11.0.0 |
| tests/test_dynamodb_checkpointer.py | New asynchronous test suite for DynamoDBCheckPointer with extensive coverage using mocks and fixtures |
| docker-compose.yaml | Updated test service command to exclude tests marked with dynamodb |
| pytest.ini | Added dynamodb marker for tests requiring DynamoDB or aioboto3 dependencies |

Sequence Diagram(s)

sequenceDiagram
    participant Consumer
    participant DynamoDBCheckPointer
    participant DynamoDB

    Consumer->>DynamoDBCheckPointer: allocate(shard_id)
    DynamoDBCheckPointer->>DynamoDB: PutItem/UpdateItem (conditional)
    DynamoDB-->>DynamoDBCheckPointer: Success/Failure

    loop Heartbeat
        Consumer->>DynamoDBCheckPointer: do_heartbeat(shard_id)
        DynamoDBCheckPointer->>DynamoDB: UpdateItem (conditional)
        DynamoDB-->>DynamoDBCheckPointer: Success/Failure
    end

    Consumer->>DynamoDBCheckPointer: checkpoint(shard_id, sequence)
    DynamoDBCheckPointer->>DynamoDB: UpdateItem (conditional)
    DynamoDB-->>DynamoDBCheckPointer: Success/Failure

    Consumer->>DynamoDBCheckPointer: deallocate(shard_id)
    DynamoDBCheckPointer->>DynamoDB: UpdateItem (clear owner)
    DynamoDB-->>DynamoDBCheckPointer: Success
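
The flow in the diagram relies on conditional writes: each update applies only if the stored owner matches the caller, and allocation succeeds only when no live owner is recorded. A minimal in-memory sketch of that idea follows. It is not the PR's DynamoDBCheckPointer and makes no DynamoDB calls; the `FakeTable` class, its method names, the item fields, and the 60-second timeout are all assumptions made for illustration.

```python
import time
from typing import Dict, Optional


class FakeTable:
    """Hypothetical in-memory stand-in for a table with conditional writes."""

    def __init__(self, heartbeat_timeout: float = 60.0) -> None:
        self.items: Dict[str, dict] = {}
        self.heartbeat_timeout = heartbeat_timeout

    def allocate(self, shard_id: str, owner: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        item = self.items.get(shard_id)
        # Condition: refuse while another owner has a fresh heartbeat.
        if item and item["owner"] and now - item["heartbeat"] < self.heartbeat_timeout:
            return False
        # Keep the last checkpointed sequence so the new owner can resume.
        sequence = item["sequence"] if item else None
        self.items[shard_id] = {"owner": owner, "heartbeat": now, "sequence": sequence}
        return True

    def _update_if_owner(self, shard_id: str, owner: str, changes: dict) -> bool:
        item = self.items.get(shard_id)
        # Condition: the write applies only when the caller still owns the shard.
        if not item or item["owner"] != owner:
            return False
        item.update(changes)
        return True

    def heartbeat(self, shard_id: str, owner: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        return self._update_if_owner(shard_id, owner, {"heartbeat": now})

    def checkpoint(self, shard_id: str, owner: str, sequence: str) -> bool:
        return self._update_if_owner(shard_id, owner, {"sequence": sequence})

    def deallocate(self, shard_id: str, owner: str) -> bool:
        return self._update_if_owner(shard_id, owner, {"owner": None})


table = FakeTable(heartbeat_timeout=60.0)
first = table.allocate("shard-0", "consumer-a", now=0.0)
second = table.allocate("shard-0", "consumer-b", now=10.0)  # owner is still live
accepted = table.checkpoint("shard-0", "consumer-a", "seq-42")
rejected = table.checkpoint("shard-0", "consumer-b", "seq-99")  # not the owner
takeover = table.allocate("shard-0", "consumer-b", now=100.0)  # heartbeat is stale
```

In this sketch a rejected write returns False; with a real conditional write the equivalent signal would be a failed condition check that the caller has to handle.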

Poem

In the fields of code, a new path we see,
DynamoDB joins our checkpoint spree!
With tables that scale and heartbeats that chime,
Shards are now managed in record time.
No servers to tend, just clouds in the sky—
The rabbit hops forward, reaching new high!
🐇✨



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (7)
tests/test_dynamodb_checkpointer.py (2)

4-4: Remove unused import.

The os module is imported but never used in this file.

-import os

307-308: Combine nested with statements for cleaner code.

The nested with statements can be combined into a single statement.

-        with patch("kinesis.dynamodb.HAS_DYNAMODB", False):
-            with pytest.raises(ImportError, match="DynamoDB support requires aioboto3"):
+        with patch("kinesis.dynamodb.HAS_DYNAMODB", False), pytest.raises(ImportError, match="DynamoDB support requires aioboto3"):
                DynamoDBCheckPointer("test-app")
docs/dynamodb-checkpointing.md (1)

185-201: Fix markdown formatting for better readability.

The error examples should use proper headings and specify language for code blocks.

-**1. Conditional Check Failed**
+### 1. Conditional Check Failed
-```
+```text
 ConditionalCheckFailedException

 This is normal - it means another consumer owns the shard. The checkpointer handles this automatically.

-**2. Table Not Found**
+### 2. Table Not Found
-```
+```text
 ResourceNotFoundException

 Set create_table=True or create the table manually.

-**3. Throttling**
+### 3. Throttling
-```
+```text
 ProvisionedThroughputExceededException
kinesis/dynamodb.py (4)

116-116: Use explicit exception chaining.

When raising a new exception within an except block, use raise from None to indicate that you're intentionally replacing the exception.

-raise Exception(f"DynamoDB table {self.table_name} does not exist")
+raise Exception(f"DynamoDB table {self.table_name} does not exist") from None

226-226: Remove unused variable assignment.

The response variable is assigned but never used.

-response = await table.update_item(
+await table.update_item(

248-253: Simplify exception handling.

Remove unnecessary else block and use explicit exception chaining.

-except ClientError as e:
-    if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
-        raise Exception(f"{self.get_ref()} tried to checkpoint {shard_id} but does not own it")
-    else:
-        raise
+except ClientError as e:
+    if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
+        raise Exception(f"{self.get_ref()} tried to checkpoint {shard_id} but does not own it") from e
+    raise

362-368: Remove unnecessary else block.

The else block is unnecessary after the return statement.

-except ClientError as e:
-    if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
-        log.info(f"Someone else beat us to allocating {shard_id}")
-        return False, None
-    else:
-        raise
+except ClientError as e:
+    if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
+        log.info(f"Someone else beat us to allocating {shard_id}")
+        return False, None
+    raise
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e68eb4b and f4f715c.

📒 Files selected for processing (6)
  • README.md (3 hunks)
  • docs/dynamodb-checkpointing.md (1 hunks)
  • kinesis/__init__.py (1 hunks)
  • kinesis/dynamodb.py (1 hunks)
  • setup.py (1 hunks)
  • tests/test_dynamodb_checkpointer.py (1 hunks)
🧰 Additional context used
🪛 LanguageTool
docs/dynamodb-checkpointing.md

[uncategorized] ~17-~17: Although a hyphen is possible, it is not necessary in a compound modifier in which the first word is an adverb that ends in ‘ly’.
Context: ...oDB checkpointer provides a serverless, fully-managed solution for coordinating multiple Kine...

(HYPHENATED_LY_ADVERB_ADJECTIVE)

🪛 markdownlint-cli2 (0.17.2)
docs/dynamodb-checkpointing.md

185-185: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


186-186: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


191-191: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


192-192: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


197-197: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


198-198: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🪛 Ruff (0.11.9)
tests/test_dynamodb_checkpointer.py

4-4: os imported but unused

Remove unused import: os

(F401)


307-308: Use a single with statement with multiple contexts instead of nested with statements

(SIM117)

kinesis/dynamodb.py

8-8: json imported but unused

Remove unused import: json

(F401)


116-116: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


226-226: Local variable response is assigned to but never used

Remove assignment to unused variable response

(F841)


250-250: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🪛 Pylint (3.3.7)
tests/test_dynamodb_checkpointer.py

[error] 283-283: Instance of 'DynamoDBCheckPointer' has no 'manual_checkpoint' member

(E1101)

kinesis/dynamodb.py

[refactor] 29-29: Too many instance attributes (9/7)

(R0902)


[refactor] 53-53: Too many arguments (11/5)

(R0913)


[refactor] 53-53: Too many positional arguments (11/5)

(R0917)


[refactor] 249-252: Unnecessary "else" after "raise", remove the "else" and de-indent the code inside it

(R1720)


[refactor] 363-367: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it

(R1705)

⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: test (3.12)
  • GitHub Check: test (3.9)
  • GitHub Check: test (3.11)
  • GitHub Check: test (3.10)
  • GitHub Check: test-docker
🔇 Additional comments (13)
setup.py (1)

39-39: Verify the aioboto3 version constraint for security and compatibility.

The dependency addition follows the correct pattern. Please verify that aioboto3>=11.0.0 is the appropriate version constraint and check for any security advisories.

#!/bin/bash
# Description: Check aioboto3 latest versions and security advisories

# Check latest version on PyPI
curl -s https://pypi.org/pypi/aioboto3/json | jq '.info.version'

# Check for security advisories
gh api graphql -f query='
{
  securityVulnerabilities(first: 5, ecosystem: PIP, package: "aioboto3") {
    nodes {
      advisory {
        summary
        severity
        publishedAt
      }
      vulnerableVersionRange
      firstPatchedVersion {
        identifier
      }
    }
  }
}'
kinesis/__init__.py (1)

23-27: LGTM! Consistent optional import pattern.

The implementation correctly follows the same pattern used for PrometheusMetricsCollector, ensuring consistency across optional dependencies.
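
The optional-import pattern mentioned here can be sketched as follows. The review describes the fallback only in prose, so the helper below is a hypothetical illustration rather than the code in kinesis/__init__.py; `optional_attr` and the module name used for the missing case are assumptions.

```python
import importlib
from typing import Any, Optional


def optional_attr(module_name: str, attr: str) -> Optional[Any]:
    """Return module.attr, or None when the module cannot be imported."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        # The optional dependency is not installed: expose None instead.
        return None
    return getattr(module, attr, None)


# An always-available stdlib module resolves to the real class.
JSONDecoder = optional_attr("json", "JSONDecoder")
# A package that is not installed falls back to None.
Missing = optional_attr("definitely_not_installed_pkg", "Thing")
```

Callers can then check for None before use and raise a helpful ImportError, which is the behaviour the tests in this PR appear to exercise through the HAS_DYNAMODB flag.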

README.md (3)

81-81: Good addition of DynamoDB installation option.

The installation documentation correctly follows the pattern established for other optional dependencies.


369-422: Excellent comprehensive documentation for DynamoDB checkpointer.

The documentation is well-structured and covers all essential aspects:

  • Clear usage examples
  • Complete configuration reference
  • Benefits and comparison with Redis
  • Reference to detailed guide

The content follows the established documentation patterns and provides users with sufficient information to get started.


576-576: Good cross-reference to detailed documentation.

The link to the comprehensive DynamoDB checkpointing guide enhances the documentation structure.

tests/test_dynamodb_checkpointer.py (2)

1-321: Excellent comprehensive test coverage for DynamoDB checkpointer.

The test suite thoroughly covers all major functionality:

  • Initialization scenarios (existing/new table)
  • Shard allocation and ownership
  • Checkpointing and error handling
  • Heartbeat functionality
  • Configuration options
  • Cleanup and error scenarios

The use of proper async patterns and comprehensive mocking ensures reliable testing of the DynamoDB checkpointer functionality.


283-283:

#!/bin/bash
# Display BaseHeartbeatCheckPointer class definition to find inherited methods
rg -n -C5 "class BaseHeartbeatCheckPointer" -g "kinesis/checkpointers.py"

# Show lines around the manual_checkpoint implementation
sed -n '170,200p' kinesis/checkpointers.py
docs/dynamodb-checkpointing.md (1)

1-357: Outstanding comprehensive documentation for DynamoDB checkpointing.

This documentation guide is exceptionally well-written and covers all essential aspects:

  • Clear installation and setup instructions
  • Comprehensive configuration options with explanations
  • Practical examples for different use cases
  • Detailed monitoring and debugging guidance
  • Cost optimization strategies with real calculations
  • Migration path from Redis with comparison
  • Production best practices
  • Testing guidance with LocalStack

The depth and quality of this documentation will significantly help users adopt and effectively use the DynamoDB checkpointing feature.

kinesis/dynamodb.py (5)

29-90: Well-structured checkpointer initialization.

The class is properly documented with comprehensive configuration options. The ImportError message helpfully guides users to install the required dependency.


122-156: Excellent table setup with TTL.

The table creation properly uses on-demand billing and sets up TTL for automatic cleanup of old records. The error handling for TTL setup is appropriate.


169-204: Robust heartbeat implementation with ownership verification.

The conditional update ensures that only the current owner can update the heartbeat, preventing race conditions. The TTL update keeps active records from expiring.


285-322: Well-designed shard allocation with race condition handling.

The two-phase allocation (create new or take existing) properly handles race conditions and the recursive retry elegantly handles edge cases.


369-375: Proper resource cleanup.

The close method correctly cleans up the DynamoDB session after calling the parent's close method.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
tests/test_dynamodb_checkpointer.py (3)

4-4: Remove unused import.

The os module is imported but never used in the test file.

-import os

24-32: Simplify nested with statements.

Multiple nested with statements can be combined into a single statement for better readability.

-with patch.dict(
-    sys.modules,
-    {
-        "aioboto3": mock_aioboto3,
-        "botocore.exceptions": mock_botocore,
-    },
-):
-    # Mock the HAS_DYNAMODB flag
-    with patch("kinesis.dynamodb.HAS_DYNAMODB", True):
-        with patch("kinesis.dynamodb.aioboto3", mock_aioboto3):
-            with patch("kinesis.dynamodb.ClientError", ClientError):
-                from kinesis.dynamodb import DynamoDBCheckPointer
+with (
+    patch.dict(
+        sys.modules,
+        {
+            "aioboto3": mock_aioboto3,
+            "botocore.exceptions": mock_botocore,
+        },
+    ),
+    patch("kinesis.dynamodb.HAS_DYNAMODB", True),
+    patch("kinesis.dynamodb.aioboto3", mock_aioboto3),
+    patch("kinesis.dynamodb.ClientError", ClientError),
+):
+    from kinesis.dynamodb import DynamoDBCheckPointer

318-320: Simplify nested with statements.

Another instance of nested with statements that can be combined.

-        with patch("kinesis.dynamodb.HAS_DYNAMODB", False):
-            with pytest.raises(ImportError, match="DynamoDB support requires aioboto3"):
-                DynamoDBCheckPointer("test-app")
+        with (
+            patch("kinesis.dynamodb.HAS_DYNAMODB", False),
+            pytest.raises(ImportError, match="DynamoDB support requires aioboto3"),
+        ):
+            DynamoDBCheckPointer("test-app")
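
One caveat worth checking before applying the parenthesized form: parenthesized context managers are documented as new syntax in Python 3.10, while the checks listed for this PR include Python 3.9. A minimal sketch of two version-agnostic alternatives follows, assuming nothing about the real test module; `read_flags` and the environment variable names are hypothetical.

```python
import os
from contextlib import ExitStack
from unittest.mock import patch


def read_flags():
    """Return the two hypothetical flags this demo patches."""
    return os.environ.get("FLAG_A"), os.environ.get("FLAG_B")


# Comma-separated form: one with statement, valid on every Python 3 version.
with patch.dict(os.environ, {"FLAG_A": "1"}), patch.dict(os.environ, {"FLAG_B": "2"}):
    combined = read_flags()

# ExitStack form: handy when the list of patches is long or built dynamically.
with ExitStack() as stack:
    stack.enter_context(patch.dict(os.environ, {"FLAG_A": "x"}))
    stack.enter_context(patch.dict(os.environ, {"FLAG_B": "y"}))
    stacked = read_flags()
```

Both forms undo the patches in reverse order on exit, matching the behaviour of the nested statements they replace.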
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f4f715c and a9cc109.

📒 Files selected for processing (3)
  • docker-compose.yaml (1 hunks)
  • pytest.ini (1 hunks)
  • tests/test_dynamodb_checkpointer.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • pytest.ini
🧰 Additional context used
🪛 Ruff (0.11.9)
tests/test_dynamodb_checkpointer.py

4-4: os imported but unused

Remove unused import: os

(F401)


24-32: Use a single with statement with multiple contexts instead of nested with statements

(SIM117)


318-319: Use a single with statement with multiple contexts instead of nested with statements

(SIM117)

🪛 Pylint (3.3.7)
tests/test_dynamodb_checkpointer.py

[error] 294-294: Instance of 'DynamoDBCheckPointer' has no 'manual_checkpoint' member

(E1101)

⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: test (3.10)
  • GitHub Check: test (3.11)
  • GitHub Check: test (3.9)
  • GitHub Check: test (3.12)
  • GitHub Check: test-docker
🔇 Additional comments (14)
docker-compose.yaml (1)

17-17: Good practice for selective test execution.

Excluding DynamoDB tests from the default containerized test environment is appropriate since these tests require additional dependencies and may not be available in all environments.

tests/test_dynamodb_checkpointer.py (13)

43-77: Well-structured test fixture.

The mock DynamoDB fixture is comprehensive and properly configured for async testing. The setup covers all necessary DynamoDB operations with appropriate async context managers.


78-88: Good test coverage for initialization scenarios.

The test properly verifies that existing tables are loaded correctly during initialization.


89-105: Comprehensive table creation test.

This test validates both the error handling for missing tables and the table creation logic with correct parameters.


107-126: Thorough shard allocation testing.

The test correctly verifies new shard allocation behavior, including internal state updates and DynamoDB interactions.


128-152: Good coverage of shard reallocation scenario.

This test properly handles the complex case of allocating an expired shard, including timestamp checking and sequence number retrieval.


154-175: Important edge case coverage.

Testing the busy shard scenario ensures the checkpointer respects existing active allocations.


177-198: Solid checkpointing test.

The test validates both the internal state updates and the DynamoDB update operations for successful checkpointing.


200-212: Critical security test.

Testing that checkpointing fails when not owning a shard is essential for preventing data corruption in distributed systems.


236-255: Excellent async heartbeat testing.

The test properly validates the heartbeat functionality with appropriate timing and async operations. The use of asyncio.sleep with a short duration is appropriate for testing.
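
The timing pattern described here (start a background heartbeat, wait briefly, then stop it) can be sketched with plain asyncio. This is an assumption-laden illustration, not the test from the PR: `heartbeat_loop`, `demo`, and the intervals are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, List


async def heartbeat_loop(beat: Callable[[], Awaitable[None]], interval: float) -> None:
    """Call beat() repeatedly until the task is cancelled."""
    while True:
        await beat()
        await asyncio.sleep(interval)


async def demo() -> List[int]:
    beats: List[int] = []

    async def beat() -> None:
        beats.append(len(beats))

    task = asyncio.create_task(heartbeat_loop(beat, interval=0.01))
    # A short sleep gives the background task time to run at least once.
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return beats


beats = asyncio.run(demo())
```

Asserting on "at least one beat" rather than an exact count keeps a test like this stable on slow machines.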


282-297: Comprehensive manual checkpointing test.

This test validates the manual checkpointing mode behavior, ensuring updates are deferred until explicitly triggered. Good separation of concerns between automatic and manual modes.


299-315: Important cleanup test.

Testing the close() method ensures proper resource cleanup and shard deallocation, which is critical for preventing resource leaks in production.


1-333: Outstanding test coverage and quality.

This test suite demonstrates excellent practices:

  • Comprehensive mocking to isolate external dependencies
  • Coverage of both happy path and error scenarios
  • Proper async/await testing patterns
  • Edge case handling (busy shards, missing tables, etc.)
  • Resource cleanup testing

The tests provide confidence in the DynamoDB checkpointer implementation.


294-296: Verify the manual_checkpoint method exists.

The test calls manual_checkpoint() method but static analysis suggests this method may not exist in the implementation.

#!/bin/bash
# Description: Verify the manual_checkpoint method exists in DynamoDBCheckPointer
# Expected: Find the method definition in the kinesis/dynamodb.py file

ast-grep --pattern $'class DynamoDBCheckPointer {
  $$$
  def manual_checkpoint($$$) {
    $$$
  }
  $$$
}'

# Also search for any manual_checkpoint method
rg -A 5 "def manual_checkpoint"


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (2)
tests/test_dynamodb_checkpointer.py (1)

25-37: Consider using a single with statement for cleaner code.

While the current nested structure works, you can make the code cleaner by combining the context managers.

-with patch.dict(
-    sys.modules,
-    {
-        "aioboto3": mock_aioboto3,
-        "botocore.exceptions": mock_botocore.exceptions,
-    },
-):
-    # Mock the HAS_DYNAMODB flag
-    with patch("kinesis.dynamodb.HAS_DYNAMODB", True):
-        with patch("kinesis.dynamodb.aioboto3", mock_aioboto3):
-            with patch("kinesis.dynamodb.ClientError", ClientError):
-                from kinesis.dynamodb import DynamoDBCheckPointer
+with (
+    patch.dict(
+        sys.modules,
+        {
+            "aioboto3": mock_aioboto3,
+            "botocore.exceptions": mock_botocore.exceptions,
+        },
+    ),
+    patch("kinesis.dynamodb.HAS_DYNAMODB", True),
+    patch("kinesis.dynamodb.aioboto3", mock_aioboto3),
+    patch("kinesis.dynamodb.ClientError", ClientError),
+):
+    from kinesis.dynamodb import DynamoDBCheckPointer
kinesis/dynamodb.py (1)

215-215: Simplify list creation from dictionary items.

Use list() directly instead of a comprehension for better readability.

-items = [(k, v) for k, v in self._manual_checkpoints.items()]
+items = list(self._manual_checkpoints.items())
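
Both spellings build the same list; one common reason to take such a snapshot is that it lets the loop remove entries from the dictionary while iterating. A minimal sketch of a deferred-checkpoint buffer using that idea follows. Only the `_manual_checkpoints` name comes from the diff above; the `ManualBuffer` class, `flush`, and `written` are hypothetical.

```python
from typing import Dict, List, Tuple


class ManualBuffer:
    """Hypothetical buffer: checkpoints are held until flush() is called."""

    def __init__(self) -> None:
        self._manual_checkpoints: Dict[str, str] = {}
        self.written: List[Tuple[str, str]] = []

    def checkpoint(self, shard_id: str, sequence: str) -> None:
        # Deferred: only the latest sequence per shard is remembered.
        self._manual_checkpoints[shard_id] = sequence

    def flush(self) -> None:
        # Snapshot first so entries can be deleted while looping.
        items = list(self._manual_checkpoints.items())
        for shard_id, sequence in items:
            self.written.append((shard_id, sequence))
            del self._manual_checkpoints[shard_id]


buffer = ManualBuffer()
buffer.checkpoint("shard-0", "seq-1")
buffer.checkpoint("shard-0", "seq-2")
buffer.checkpoint("shard-1", "seq-5")
pending_before = dict(buffer._manual_checkpoints)
buffer.flush()
```

Iterating over `self._manual_checkpoints.items()` directly while deleting keys would raise a RuntimeError, which is why the snapshot matters in this sketch.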
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a9cc109 and 0cfdfd3.

📒 Files selected for processing (2)
  • kinesis/dynamodb.py (1 hunks)
  • tests/test_dynamodb_checkpointer.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.11.9)
tests/test_dynamodb_checkpointer.py

4-4: os imported but unused

Remove unused import: os

(F401)


25-33: Use a single with statement with multiple contexts instead of nested with statements

(SIM117)


47-48: Use a single with statement with multiple contexts instead of nested with statements

(SIM117)


326-327: Use a single with statement with multiple contexts instead of nested with statements

(SIM117)

kinesis/dynamodb.py

115-115: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


258-258: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🪛 Pylint (3.3.7)
kinesis/dynamodb.py

[refactor] 28-28: Too many instance attributes (10/7)

(R0902)


[refactor] 52-52: Too many arguments (11/5)

(R0913)


[refactor] 52-52: Too many positional arguments (11/5)

(R0917)


[refactor] 215-215: Unnecessary use of a comprehension, use list(self._manual_checkpoints.items()) instead.

(R1721)


[refactor] 257-260: Unnecessary "else" after "raise", remove the "else" and de-indent the code inside it

(R1720)


[refactor] 371-375: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it

(R1705)

⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: test (3.10)
  • GitHub Check: test (3.11)
  • GitHub Check: test (3.9)
  • GitHub Check: test (3.12)
  • GitHub Check: test-docker
🔇 Additional comments (1)
kinesis/dynamodb.py (1)

28-84: Well-designed checkpointer implementation.

The class design properly extends BaseHeartbeatCheckPointer and provides comprehensive DynamoDB-specific functionality. While static analysis flags the number of instance attributes and parameters, they are all justified for the configurability needed in a distributed checkpointing system.

Comment on lines +325 to +330
# Record exists, check if we can take it
response = await table.get_item(Key={"shard_id": key})

if "Item" not in response:
    # Race condition - someone deleted it, try again
    return await self.allocate(shard_id)

🛠️ Refactor suggestion

⚠️ Potential issue

Avoid unbounded recursion in allocation retry.

The recursive call on line 330 could lead to stack overflow in extreme race condition scenarios. Consider using a retry loop with a limit instead.

Consider refactoring the allocation logic to use a retry loop:

async def allocate(self, shard_id: str, max_retries: int = 3) -> Tuple[bool, Optional[str]]:
    """Try to allocate a shard for processing."""
    if not self._initialized:
        await self._initialize()

    for attempt in range(max_retries):
        # ... existing allocation logic ...
        
        # At line 330, instead of recursion:
        if "Item" not in response:
            # Race condition - someone deleted it, try again
            if attempt < max_retries - 1:
                continue
            else:
                log.warning(f"Failed to allocate {shard_id} after {max_retries} attempts")
                return False, None
🤖 Prompt for AI Agents
In kinesis/dynamodb.py around lines 325 to 330, the recursive call to
self.allocate when "Item" is not in response can cause unbounded recursion and
potential stack overflow. Refactor this by replacing the recursion with a retry
loop that attempts allocation a limited number of times (e.g., 3). On each
retry, repeat the allocation logic, and if the maximum retries are reached
without success, log a warning and return failure instead of recursing.
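
A runnable version of the bounded-retry idea might look like the sketch below. It deliberately leaves the real allocation logic out: `allocate_with_retries`, the `try_once` callback, and the convention that None means "lost a race, try again" are all assumptions for illustration, not part of kinesis/dynamodb.py.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Tuple

Result = Tuple[bool, Optional[str]]


async def allocate_with_retries(
    try_once: Callable[[], Awaitable[Optional[Result]]],
    max_retries: int = 3,
) -> Result:
    """Call try_once until it yields a result or the attempts run out.

    try_once returns None to signal a lost race that is worth retrying.
    """
    for _ in range(max_retries):
        result = await try_once()
        if result is not None:
            return result
    # Attempts exhausted: report failure instead of recursing further.
    return False, None


async def demo() -> Tuple[Result, int]:
    calls = 0

    async def flaky() -> Optional[Result]:
        nonlocal calls
        calls += 1
        # The first two attempts simulate the record vanishing mid-allocation.
        return None if calls < 3 else (True, "seq-7")

    result = await allocate_with_retries(flaky, max_retries=3)
    return result, calls


outcome, attempts = asyncio.run(demo())
```

Because the loop has a fixed upper bound, the worst case is `max_retries` calls followed by a clean failure, regardless of how often the race recurs.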

Comment on lines +256 to +260
except ClientError as e:
    if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
        raise Exception(f"{self.get_ref()} tried to checkpoint {shard_id} but does not own it")
    else:
        raise

⚠️ Potential issue

Preserve exception context when re-raising.

Use from e to preserve the original ClientError context.

 except ClientError as e:
     if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
-        raise Exception(f"{self.get_ref()} tried to checkpoint {shard_id} but does not own it")
+        raise Exception(f"{self.get_ref()} tried to checkpoint {shard_id} but does not own it") from e
     else:
         raise
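
The difference between `from e` and `from None` is visible on the raised exception's attributes. The sketch below uses a stand-in error class rather than botocore's ClientError; `FakeClientError`, the two functions, and the `capture` helper are hypothetical names for illustration.

```python
class FakeClientError(Exception):
    """Stand-in exposing the same response shape used in the diff above."""

    def __init__(self, code: str) -> None:
        super().__init__(code)
        self.response = {"Error": {"Code": code}}


def checkpoint_chained() -> None:
    try:
        raise FakeClientError("ConditionalCheckFailedException")
    except FakeClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            # "from e" records the original error as the explicit cause.
            raise RuntimeError("does not own shard") from e
        raise


def checkpoint_suppressed() -> None:
    try:
        raise FakeClientError("ConditionalCheckFailedException")
    except FakeClientError:
        # "from None" hides the original error in the printed traceback.
        raise RuntimeError("does not own shard") from None


def capture(func):
    """Run func and return the RuntimeError it raises, if any."""
    try:
        func()
    except RuntimeError as err:
        return err
    return None


chained = capture(checkpoint_chained)
suppressed = capture(checkpoint_suppressed)
```

With `from e` the traceback shows the original error as the direct cause, which is usually the more useful choice when debugging a failed conditional write.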
🧰 Tools
🪛 Ruff (0.11.9)

258-258: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🪛 Pylint (3.3.7)

[refactor] 257-260: Unnecessary "else" after "raise", remove the "else" and de-indent the code inside it

(R1720)

🤖 Prompt for AI Agents
In kinesis/dynamodb.py around lines 256 to 260, when re-raising the exception
after checking the error code, modify the raise statement to use 'raise
Exception(...) from e' instead of just 'raise Exception(...)'. This change
preserves the original ClientError context for better debugging.

Comment on lines +110 to +117
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceNotFoundException":
        if self.create_table:
            await self._create_table(dynamodb)
        else:
            raise Exception(f"DynamoDB table {self.table_name} does not exist")
    else:
        raise

⚠️ Potential issue

Preserve exception context when re-raising.

When raising a new exception within an except block, use from e to preserve the original exception context for better debugging.

 except ClientError as e:
     if e.response["Error"]["Code"] == "ResourceNotFoundException":
         if self.create_table:
             await self._create_table(dynamodb)
         else:
-            raise Exception(f"DynamoDB table {self.table_name} does not exist")
+            raise Exception(f"DynamoDB table {self.table_name} does not exist") from e
     else:
         raise
🧰 Tools
🪛 Ruff (0.11.9)

115-115: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
In kinesis/dynamodb.py around lines 110 to 117, when raising a new Exception
inside the except block, modify the raise statement to use 'raise Exception(...)
from e' to preserve the original exception context. This involves appending
'from e' to the raise line that throws the new Exception about the missing
DynamoDB table.

@hampsterx hampsterx merged commit 446df29 into master Jun 22, 2025
7 checks passed
@hampsterx hampsterx deleted the dynamodb_checkpointer branch June 22, 2025 20:10