Skip to content

feat(sdk): add audit_actor and audit_stamp methods to DataHubClient #13676

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from

Conversation

sgomezvillamor
Copy link
Contributor

Summary

  • Add audit_actor method to extract actor from JWT tokens with fallback support
  • Add audit_stamp method to create AuditStampClass with actor and timestamp
  • Use PyJWT library for secure JWT token parsing instead of manual base64 decoding
  • Comprehensive test coverage with parameterized tests using freezegun for deterministic time testing

Key Features

  • JWT Token Parsing: Extracts actor ID from JWT tokens with proper error handling
  • Fallback Support: Uses fallback actors or DEFAULT_ACTOR_URN when JWT parsing fails
  • Audit Stamp Creation: Generates standardized audit stamps for metadata operations
  • Type Safety: Full mypy compliance with proper type annotations

TODO

Making use of these new methods across the codebase to replace hardcoded actor usage is a follow-up task.

Test Plan

  • All existing tests pass
  • New parameterized tests cover all JWT scenarios (valid, invalid, missing fields)
  • Time-dependent tests use freezegun for deterministic behavior
  • Code formatting and type checking pass

🤖 Generated with Claude Code

- Add audit_actor method to extract actor from JWT tokens with fallback support
- Add audit_stamp method to create AuditStampClass with actor and timestamp
- Use PyJWT library for secure JWT token parsing
- Comprehensive test coverage with parameterized tests using freezegun
- Add TODO comment to migrate from DEFAULT_ACTOR_URN to new audit_actor method

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
@github-actions github-actions bot added the ingestion PR or Issue related to the ingestion of metadata label Jun 3, 2025
@sgomezvillamor sgomezvillamor requested a review from hsheth2 June 3, 2025 15:26
Copy link

codecov bot commented Jun 3, 2025

❌ 3 Tests Failed:

Tests completed Failed Passed Skipped
2517 3 2514 33
View the top 3 failed test(s) by shortest run time
::tests.integration.test_great_expectations
Stack Traces | 0s run time
ImportError while importing test module '.../tests/integration/test_great_expectations.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
venv/lib/python3.11.../site-packages/_pytest/python.py:497: in importtestmodule
    mod = import_path(
venv/lib/python3.11.../site-packages/_pytest/pathlib.py:587: in import_path
    importlib.import_module(module_name)
.../hostedtoolcache/Python/3.11.12.../x64/lib/python3.11/importlib/__init__.py:126: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
<frozen importlib._bootstrap>:1204: in _gcd_import
    ???
<frozen importlib._bootstrap>:1176: in _find_and_load
    ???
<frozen importlib._bootstrap>:1147: in _find_and_load_unlocked
    ???
<frozen importlib._bootstrap>:690: in _load_unlocked
    ???
venv/lib/python3.11.../_pytest/assertion/rewrite.py:186: in exec_module
    exec(co, module.__dict__)
tests/integration/test_great_expectations.py:12: in <module>
    from datahub.testing.compare_metadata_json import assert_metadata_files_equal
../...../datahub/testing/compare_metadata_json.py:17: in <module>
    from datahub.ingestion.source.file import read_metadata_file
../...../ingestion/source/file.py:20: in <module>
    from datahub.ingestion.api.decorators import (
../...../ingestion/api/decorators.py:6: in <module>
    from datahub.ingestion.api.source import (
../...../ingestion/api/source.py:41: in <module>
    from datahub.ingestion.api.source_helpers import (
../...../ingestion/api/source_helpers.py:43: in <module>
    from datahub.sdk.entity import Entity
../...../datahub/sdk/__init__.py:25: in <module>
    from datahub.sdk.main_client import DataHubClient
../...../datahub/sdk/main_client.py:7: in <module>
    import jwt
E   ModuleNotFoundError: No module named 'jwt'
tests.unit.sdk_v2.test_client_v2::test_audit_stamp[jwt_overrides_fallback_with_custom_time]
Stack Traces | 0.085s run time
token = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY3RvcklkIjoiand0X3VzZXIiLCJ0eXBlIjoiUEVSU09OQUwifQ.XtcuQdF7KenAYbC-bA-m8kZFhW9EV-ovhMfFqpC-BNc'
fallback_actor = CorpGroupUrn(urn:li:corpGroup:group_fallback)
fallback_timestamp = datetime.datetime(2023, 12, 25, 18, 45, 30)
expected_actor = CorpUserUrn(urn:li:corpuser:jwt_user)
expected_time_ms = 1703526330000

    @freeze_time(FROZEN_TIME)
    @pytest.mark.parametrize(
        "token,fallback_actor,fallback_timestamp,expected_actor,expected_time_ms",
        [
            pytest.param(
                create_jwt_token({"actorId": "stamp_user", "type": "PERSONAL"}),
                None,
                None,
                CorpUserUrn.create_from_id("stamp_user"),
                1705321845000,  # FROZEN_TIME in milliseconds (UTC)
                id="valid_jwt_current_time",
            ),
            pytest.param(
                None,
                CorpUserUrn.create_from_id("fallback_user"),
                None,
                CorpUserUrn.create_from_id("fallback_user"),
                1705321845000,  # FROZEN_TIME in milliseconds (UTC)
                id="fallback_actor_current_time",
            ),
            pytest.param(
                None,
                None,
                datetime(2023, 6, 15, 10, 30, 0),
                DEFAULT_ACTOR_URN,
                1686817800000,  # Custom timestamp in milliseconds
                id="custom_timestamp",
            ),
            pytest.param(
                create_jwt_token({"actorId": "jwt_user", "type": "PERSONAL"}),
                CorpGroupUrn.create_from_id("group_fallback"),
                datetime(2023, 12, 25, 18, 45, 30),
                CorpUserUrn.create_from_id("jwt_user"),
                1703526330000,  # Custom timestamp in milliseconds
                id="jwt_overrides_fallback_with_custom_time",
            ),
        ],
    )
    def test_audit_stamp(
        token, fallback_actor, fallback_timestamp, expected_actor, expected_time_ms
    ):
        """Test audit_stamp method with various scenarios."""
        mock_graph = Mock(spec=DataHubGraph)
        mock_config = Mock()
        mock_config.token = token
        mock_graph.config = mock_config
    
        client = DataHubClient(graph=mock_graph)
    
        result = client.audit_stamp(
            fallback_actor=fallback_actor, fallback_timestamp=fallback_timestamp
        )
    
        # Verify the result is an AuditStampClass
        assert isinstance(result, AuditStampClass)
    
        # Verify the actor is correct
        assert result.actor == str(expected_actor)
    
        # Verify the timestamp is exactly what we expect
>       assert result.time == expected_time_ms
E       AssertionError: assert 1703529930000 == 1703526330000
E        +  where 1703529930000 = AuditStampClass({'time': 1703529930000, 'actor': 'urn:li:corpuser:jwt_user', 'impersonator': None, 'message': None}).time

.../unit/sdk_v2/test_client_v2.py:204: AssertionError
tests.unit.sdk_v2.test_client_v2::test_audit_stamp[custom_timestamp]
Stack Traces | 0.086s run time
token = None, fallback_actor = None
fallback_timestamp = datetime.datetime(2023, 6, 15, 10, 30)
expected_actor = CorpUserUrn(urn:li:corpuser:__ingestion)
expected_time_ms = 1686817800000

    @freeze_time(FROZEN_TIME)
    @pytest.mark.parametrize(
        "token,fallback_actor,fallback_timestamp,expected_actor,expected_time_ms",
        [
            pytest.param(
                create_jwt_token({"actorId": "stamp_user", "type": "PERSONAL"}),
                None,
                None,
                CorpUserUrn.create_from_id("stamp_user"),
                1705321845000,  # FROZEN_TIME in milliseconds (UTC)
                id="valid_jwt_current_time",
            ),
            pytest.param(
                None,
                CorpUserUrn.create_from_id("fallback_user"),
                None,
                CorpUserUrn.create_from_id("fallback_user"),
                1705321845000,  # FROZEN_TIME in milliseconds (UTC)
                id="fallback_actor_current_time",
            ),
            pytest.param(
                None,
                None,
                datetime(2023, 6, 15, 10, 30, 0),
                DEFAULT_ACTOR_URN,
                1686817800000,  # Custom timestamp in milliseconds
                id="custom_timestamp",
            ),
            pytest.param(
                create_jwt_token({"actorId": "jwt_user", "type": "PERSONAL"}),
                CorpGroupUrn.create_from_id("group_fallback"),
                datetime(2023, 12, 25, 18, 45, 30),
                CorpUserUrn.create_from_id("jwt_user"),
                1703526330000,  # Custom timestamp in milliseconds
                id="jwt_overrides_fallback_with_custom_time",
            ),
        ],
    )
    def test_audit_stamp(
        token, fallback_actor, fallback_timestamp, expected_actor, expected_time_ms
    ):
        """Test audit_stamp method with various scenarios."""
        mock_graph = Mock(spec=DataHubGraph)
        mock_config = Mock()
        mock_config.token = token
        mock_graph.config = mock_config
    
        client = DataHubClient(graph=mock_graph)
    
        result = client.audit_stamp(
            fallback_actor=fallback_actor, fallback_timestamp=fallback_timestamp
        )
    
        # Verify the result is an AuditStampClass
        assert isinstance(result, AuditStampClass)
    
        # Verify the actor is correct
        assert result.actor == str(expected_actor)
    
        # Verify the timestamp is exactly what we expect
>       assert result.time == expected_time_ms
E       AssertionError: assert 1686825000000 == 1686817800000
E        +  where 1686825000000 = AuditStampClass({'time': 1686825000000, 'actor': 'urn:li:corpuser:__ingestion', 'impersonator': None, 'message': None}).time

.../unit/sdk_v2/test_client_v2.py:204: AssertionError

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@datahub-cyborg datahub-cyborg bot added the needs-review Label for PRs that need review from a maintainer. label Jun 3, 2025
fallback_timestamp: Optional[datetime] = None,
) -> AuditStampClass:
"""Get an AuditStampClass for auditing purposes.
It uses the actor obtained from the audit_actor method and the current timestamp, unless fallback_timestamp is given.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use google-style docstrings - this formatting looks a bit odd

@@ -118,3 +129,55 @@ def assertions(self) -> AssertionsClient: # type: ignore[return-value] # Type
"AssertionsClient is not installed, please install it with `pip install acryl-datahub-cloud`"
)
return AssertionsClient(self)

def audit_actor(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do you anticipate this to actually be used? it's not clear to me because this is a client method, but it would need to be used from classes that don't always have access to a client type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point!

you are thinking of ingestion pipelines, right?

I could:

  • move most of the code to some utils
  • keep audit_actor and audit_stamp in the DataHubClient... so it is accessible to SDK users
  • add similar methods to Pipeline ... so it is accessible to ingestion, even if DataHub(Graph|Client) is not available

is there any other place where we use the SDK not having DataHub(Graph|Client)?

any other proposal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another option would be to push down this functionality, directly to the AuditStampClass

class AuditStampClass(DictWrapper):

    @staticmethod
    def from_token(
        auth_token: Optional[str] = None,
        fallback_actor: Optional[Union[CorpUserUrn, CorpGroupUrn]] = None,
        fallback_timestamp: Optional[datetime] = None,
    ) -> AuditStampClass

WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR based on adding the feature directly to AuditStampClass #13710
CC: @hsheth2

from datetime import datetime
from typing import Optional, Union, overload

import jwt
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like an additional dep?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was my guess too and was expecting CI builds to fail because of that... and they did, so yes, dep to be added

@datahub-cyborg datahub-cyborg bot added pending-submitter-response Issue/request has been reviewed but requires a response from the submitter and removed needs-review Label for PRs that need review from a maintainer. labels Jun 3, 2025
@sgomezvillamor
Copy link
Contributor Author

Superseded by #13710

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ingestion PR or Issue related to the ingestion of metadata pending-submitter-response Issue/request has been reviewed but requires a response from the submitter
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants