|
| 1 | +import os |
| 2 | +import tempfile |
| 3 | + |
1 | 4 | import pytest
|
2 | 5 | from litellm import ModelResponse
|
3 | 6 | from litellm.types.utils import Delta, StreamingChoices
|
4 | 7 |
|
5 | 8 | from codegate.pipeline.base import PipelineContext, PipelineSensitiveData
|
6 | 9 | from codegate.pipeline.output import OutputPipelineContext
|
7 | 10 | from codegate.pipeline.secrets.manager import SecretsManager
|
8 |
| -from codegate.pipeline.secrets.secrets import SecretUnredactionStep |
| 11 | +from codegate.pipeline.secrets.secrets import ( |
| 12 | + SecretsEncryptor, |
| 13 | + SecretsObfuscator, |
| 14 | + SecretUnredactionStep, |
| 15 | +) |
| 16 | +from codegate.pipeline.secrets.signatures import CodegateSignatures, Match |
| 17 | + |
| 18 | + |
| 19 | +class TestSecretsModifier: |
| 20 | + def test_get_absolute_position(self): |
| 21 | + modifier = SecretsObfuscator() # Using concrete implementation for testing |
| 22 | + text = "line1\nline2\nline3" |
| 23 | + |
| 24 | + # Test various positions |
| 25 | + assert modifier._get_absolute_position(1, 0, text) == 0 # Start of first line |
| 26 | + assert modifier._get_absolute_position(2, 0, text) == 6 # Start of second line |
| 27 | + assert modifier._get_absolute_position(1, 4, text) == 4 # Middle of first line |
| 28 | + |
| 29 | + def test_extend_match_boundaries(self): |
| 30 | + modifier = SecretsObfuscator() |
| 31 | + |
| 32 | + # Test extension with quotes |
| 33 | + text = 'config = "secret_value" # comment' |
| 34 | + secret = "secret_value" |
| 35 | + start = text.index(secret) |
| 36 | + end = start + len(secret) |
| 37 | + |
| 38 | + start, end = modifier._extend_match_boundaries(text, start, end) |
| 39 | + assert text[start:end] == secret |
| 40 | + |
| 41 | + # Test extension without quotes spaces |
| 42 | + text = "config = secret_value # comment" |
| 43 | + secret = "secret_value" |
| 44 | + start = text.index(secret) |
| 45 | + end = start + len(secret) |
| 46 | + |
| 47 | + start, end = modifier._extend_match_boundaries(text, start, end) |
| 48 | + assert text[start:end] == secret |
| 49 | + |
| 50 | + |
| 51 | +@pytest.fixture |
| 52 | +def valid_yaml_content(): |
| 53 | + return """ |
| 54 | +- AWS: |
| 55 | + - Access Key: '[A-Z0-9]{20}' |
| 56 | +""" |
| 57 | + |
| 58 | + |
| 59 | +@pytest.fixture |
| 60 | +def temp_yaml_file(valid_yaml_content): |
| 61 | + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".yaml") as f: |
| 62 | + f.write(valid_yaml_content) |
| 63 | + yield f.name |
| 64 | + os.unlink(f.name) |
| 65 | + |
| 66 | + |
| 67 | +class TestSecretsEncryptor: |
| 68 | + @pytest.fixture(autouse=True) |
| 69 | + def setup(self, temp_yaml_file): |
| 70 | + CodegateSignatures.initialize(temp_yaml_file) |
| 71 | + self.context = PipelineContext() |
| 72 | + self.secrets_manager = SecretsManager() |
| 73 | + self.session_id = "test_session" |
| 74 | + self.encryptor = SecretsEncryptor(self.secrets_manager, self.context, self.session_id) |
| 75 | + |
| 76 | + def test_hide_secret(self): |
| 77 | + # Create a test match |
| 78 | + match = Match( |
| 79 | + service="AWS", |
| 80 | + type="Access Key", |
| 81 | + value="AKIAIOSFODNN7EXAMPLE", |
| 82 | + line_number=1, |
| 83 | + start_index=0, |
| 84 | + end_index=9, |
| 85 | + ) |
| 86 | + |
| 87 | + # Test secret hiding |
| 88 | + hidden = self.encryptor._hide_secret(match) |
| 89 | + assert hidden.startswith("REDACTED<$") |
| 90 | + assert hidden.endswith(">") |
| 91 | + |
| 92 | + # Verify the secret was stored |
| 93 | + encrypted_value = hidden[len("REDACTED<$") : -1] |
| 94 | + original = self.secrets_manager.get_original_value(encrypted_value, self.session_id) |
| 95 | + assert original == "AKIAIOSFODNN7EXAMPLE" |
| 96 | + |
| 97 | + def test_obfuscate(self): |
| 98 | + # Test text with a secret |
| 99 | + text = "API_KEY=AKIAIOSFODNN7EXAMPLE\nOther text" |
| 100 | + protected, count = self.encryptor.obfuscate(text) |
| 101 | + |
| 102 | + assert count == 1 |
| 103 | + assert "REDACTED<$" in protected |
| 104 | + assert "AKIAIOSFODNN7EXAMPLE" not in protected |
| 105 | + assert "Other text" in protected |
| 106 | + |
| 107 | + |
| 108 | +class TestSecretsObfuscator: |
| 109 | + @pytest.fixture(autouse=True) |
| 110 | + def setup(self, temp_yaml_file): |
| 111 | + CodegateSignatures.initialize(temp_yaml_file) |
| 112 | + self.obfuscator = SecretsObfuscator() |
| 113 | + |
| 114 | + def test_hide_secret(self): |
| 115 | + match = Match( |
| 116 | + service="AWS", |
| 117 | + type="Access Key", |
| 118 | + value="AKIAIOSFODNN7EXAMPLE", |
| 119 | + line_number=1, |
| 120 | + start_index=0, |
| 121 | + end_index=15, |
| 122 | + ) |
| 123 | + |
| 124 | + hidden = self.obfuscator._hide_secret(match) |
| 125 | + assert hidden == "*" * 32 |
| 126 | + assert len(hidden) == 32 # Consistent length regardless of input |
| 127 | + |
| 128 | + def test_obfuscate(self): |
| 129 | + # Test text with multiple secrets |
| 130 | + text = "API_KEY=AKIAIOSFODNN7EXAMPLE\nPASSWORD=AKIAIOSFODNN7EXAMPLE" |
| 131 | + protected, count = self.obfuscator.obfuscate(text) |
| 132 | + |
| 133 | + assert count == 2 |
| 134 | + assert "AKIAIOSFODNN7EXAMPLE" not in protected |
| 135 | + assert "*" * 32 in protected |
| 136 | + |
| 137 | + # Verify format |
| 138 | + expected_pattern = f"API_KEY={'*' * 32}\nPASSWORD={'*' * 32}" |
| 139 | + assert protected == expected_pattern |
| 140 | + |
| 141 | + def test_obfuscate_no_secrets(self): |
| 142 | + text = "Regular text without secrets" |
| 143 | + protected, count = self.obfuscator.obfuscate(text) |
| 144 | + |
| 145 | + assert count == 0 |
| 146 | + assert protected == text |
9 | 147 |
|
10 | 148 |
|
11 | 149 | def create_model_response(content: str) -> ModelResponse:
|
|
0 commit comments