From 57194aa7280ec4125916e29417e90f4400ebd7b3 Mon Sep 17 00:00:00 2001 From: Jakub Hrozek Date: Sat, 14 Dec 2024 14:01:22 +0100 Subject: [PATCH 1/3] Load signatures only once This was manifesting in tests - if we kept loading signatures, they were being added to the list of signatures, which was causing double matches --- src/codegate/pipeline/secrets/signatures.py | 5 +++++ tests/pipeline/secrets/test_signatures.py | 23 +++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/src/codegate/pipeline/secrets/signatures.py b/src/codegate/pipeline/secrets/signatures.py index 95d306e1..8d2a0c3d 100644 --- a/src/codegate/pipeline/secrets/signatures.py +++ b/src/codegate/pipeline/secrets/signatures.py @@ -155,6 +155,11 @@ def _sanitize_pattern(cls, pattern: str) -> str: @classmethod def _add_signature_group(cls, name: str, patterns: Dict[str, str]) -> None: """Add a new signature group and compile its regex patterns.""" + # Check if this group already exists + if any(group.name == name for group in cls._signature_groups): + logger.debug(f"Signature group {name} already exists, skipping") + return + signature_group = SignatureGroup(name, patterns) for pattern_name, pattern in patterns.items(): diff --git a/tests/pipeline/secrets/test_signatures.py b/tests/pipeline/secrets/test_signatures.py index 42f6470f..11a6c5c8 100644 --- a/tests/pipeline/secrets/test_signatures.py +++ b/tests/pipeline/secrets/test_signatures.py @@ -167,3 +167,26 @@ def test_duplicate_patterns(): assert match.value == "AKIAIOSFODNN7EXAMPLE" finally: os.unlink(f.name) + + +def test_no_duplicate_signature_groups(temp_yaml_file): + """Test that signature groups aren't added multiple times""" + CodegateSignatures.initialize(temp_yaml_file) + + # First load + CodegateSignatures._signatures_loaded = False + CodegateSignatures._load_signatures() + initial_group_count = len(CodegateSignatures._signature_groups) + initial_regex_count = len(CodegateSignatures._compiled_regexes) + + # Second load + CodegateSignatures._signatures_loaded = False + CodegateSignatures._load_signatures() + + # Verify counts haven't changed + assert len(CodegateSignatures._signature_groups) == initial_group_count + assert len(CodegateSignatures._compiled_regexes) == initial_regex_count + + # Verify GitHub group appears only once + github_groups = [g for g in CodegateSignatures._signature_groups if g.name == "GitHub"] + assert len(github_groups) == 1 From 19af521a89dcca70cc345d1d2651bf92035a5cb0 Mon Sep 17 00:00:00 2001 From: Jakub Hrozek Date: Fri, 13 Dec 2024 11:26:08 +0000 Subject: [PATCH 2/3] Split out secret obfuscation into reusable classes Instead of coding up the secret encryption directly in the Pipeline step, let's split it out into a class of its own based on its own. The actual method that changes the secret is pluggable, for encryption where we need to get the secret value back we use the method we had used in the pipeline step. For things like extracting packages from a code snippet where we don't need to retrieve the original value we just replace the secret with a fixed number of asterisks. --- src/codegate/pipeline/secrets/secrets.py | 158 ++++++++++++++++------- 1 file changed, 113 insertions(+), 45 deletions(-) diff --git a/src/codegate/pipeline/secrets/secrets.py b/src/codegate/pipeline/secrets/secrets.py index 0f3d61c2..fe601284 100644 --- a/src/codegate/pipeline/secrets/secrets.py +++ b/src/codegate/pipeline/secrets/secrets.py @@ -1,4 +1,5 @@ import re +from abc import abstractmethod from typing import Optional import structlog @@ -14,14 +15,19 @@ ) from codegate.pipeline.output import OutputPipelineContext, OutputPipelineStep from codegate.pipeline.secrets.manager import SecretsManager -from codegate.pipeline.secrets.signatures import CodegateSignatures +from codegate.pipeline.secrets.signatures import CodegateSignatures, Match from codegate.pipeline.systemmsg import add_or_update_system_message logger = structlog.get_logger("codegate") -class CodegateSecrets(PipelineStep): - """Pipeline step that handles secret information requests.""" +class SecretsModifier: + """ + A class that helps obfuscate text by piping it through the secrets manager + that finds the secrets and then calling hide_secret to modify them. + + What modifications are done is up to the user who subclasses SecretsModifier + """ def __init__(self): """Initialize the CodegateSecrets pipeline step.""" @@ -29,15 +35,23 @@ def __init__(self): # Initialize and load signatures immediately CodegateSignatures.initialize("signatures.yaml") - @property - def name(self) -> str: + @abstractmethod + def _hide_secret(self, match: Match) -> str: """ - Returns the name of this pipeline step. + User-defined callable to hide a secret match to either obfuscate + it or reversibly encrypt + """ + pass - Returns: - str: The identifier 'codegate-secrets'. + @abstractmethod + def _notify_secret(self, secret): """ - return "codegate-secrets" + Notify about a found secret + TODO: We should probably not notify about a secret value but rather + an obfuscated string. It might be nice to report the context as well + (e.g. the file or a couple of lines before and after) + """ + pass def _get_absolute_position(self, line_number: int, line_offset: int, text: str) -> int: """ @@ -78,21 +92,7 @@ def _extend_match_boundaries(self, text: str, start: int, end: int) -> tuple[int return start, end - def _redact_text( - self, text: str, secrets_manager: SecretsManager, session_id: str, context: PipelineContext - ) -> tuple[str, int]: - """ - Find and encrypt secrets in the given text. - - Args: - text: The text to protect - secrets_manager: .. - session_id: .. - context: The pipeline context to be able to log alerts - Returns: - Tuple containing protected text with encrypted values and the count of redacted secrets - """ - # Find secrets in the text + def obfuscate(self, text: str) -> tuple[str, int]: matches = CodegateSignatures.find_in_string(text) if not matches: return text, 0 @@ -123,48 +123,116 @@ def _redact_text( # Replace each match with its encrypted value for start, end, match in absolute_matches: - # Encrypt and store the value - encrypted_value = secrets_manager.store_secret( - match.value, - match.service, - match.type, - session_id, - ) - - # Create the replacement string - replacement = f"REDACTED<${encrypted_value}>" - # Store the protected text in DB. - context.add_alert( - self.name, trigger_string=replacement, severity_category=AlertSeverity.CRITICAL - ) + hidden_secret = self._hide_secret(match) + self._notify_secret(hidden_secret) # Replace the secret in the text - protected_text[start:end] = replacement + protected_text[start:end] = hidden_secret # Store for logging found_secrets.append( { "service": match.service, "type": match.type, "original": match.value, - "encrypted": encrypted_value, + "encrypted": hidden_secret, } ) - # Convert back to string - protected_string = "".join(protected_text) - # Log the findings logger.info("\nFound secrets:") - for secret in found_secrets: logger.info(f"\nService: {secret['service']}") logger.info(f"Type: {secret['type']}") logger.info(f"Original: {secret['original']}") - logger.info(f"Encrypted: REDACTED<${secret['encrypted']}>") + logger.info(f"Encrypted: {secret['encrypted']}") + # Convert back to string + protected_string = "".join(protected_text) print(f"\nProtected text:\n{protected_string}") return protected_string, len(found_secrets) + +class SecretsEncryptor(SecretsModifier): + def __init__( + self, + secrets_manager: SecretsManager, + context: PipelineContext, + session_id: str, + ): + self._secrets_manager = secrets_manager + self._session_id = session_id + self._context = context + self._name = "codegate-secrets" + super().__init__() + + def _hide_secret(self, match: Match) -> str: + # Encrypt and store the value + encrypted_value = self._secrets_manager.store_secret( + match.value, + match.service, + match.type, + self._session_id, + ) + return f"REDACTED<${encrypted_value}>" + + def _notify_secret(self, notify_string): + self._context.add_alert( + self._name, trigger_string=notify_string, severity_category=AlertSeverity.CRITICAL + ) + + +class SecretsObfuscator(SecretsModifier): + def __init__( + self, + ): + super().__init__() + + def _hide_secret(self, match: Match) -> str: + """ + Obfuscate the secret value. We use a hardcoded number of asterisks + to not leak the length of the secret. + """ + return "*" * 32 + + def _notify_secret(self, secret): + pass + + +class CodegateSecrets(PipelineStep): + """Pipeline step that handles secret information requests.""" + + def __init__(self): + """Initialize the CodegateSecrets pipeline step.""" + super().__init__() + + @property + def name(self) -> str: + """ + Returns the name of this pipeline step. + + Returns: + str: The identifier 'codegate-secrets'. + """ + return "codegate-secrets" + + def _redact_text( + self, text: str, secrets_manager: SecretsManager, session_id: str, context: PipelineContext + ) -> tuple[str, int]: + """ + Find and encrypt secrets in the given text. + + Args: + text: The text to protect + secrets_manager: .. + session_id: .. + context: The pipeline context to be able to log alerts + Returns: + Tuple containing protected text with encrypted values and the count of redacted secrets + """ + # Find secrets in the text + text_encryptor = SecretsEncryptor(secrets_manager, context, session_id) + return text_encryptor.obfuscate(text) + async def process( self, request: ChatCompletionRequest, context: PipelineContext ) -> PipelineResult: From 45159c83e64db6d6ced3ef195e011d2065d8174d Mon Sep 17 00:00:00 2001 From: Jakub Hrozek Date: Fri, 13 Dec 2024 11:38:36 +0000 Subject: [PATCH 3/3] Obfuscate secrets in code snippet before the code extraction step We use the previously added SecretsObfuscator to hide the secrets before passing them to an LLM. --- .../pipeline/extract_snippets/output.py | 7 +- src/codegate/providers/copilot/pipeline.py | 2 +- tests/pipeline/secrets/test_secrets.py | 140 +++++++++++++++++- 3 files changed, 146 insertions(+), 3 deletions(-) diff --git a/src/codegate/pipeline/extract_snippets/output.py b/src/codegate/pipeline/extract_snippets/output.py index a103e342..2923a2e4 100644 --- a/src/codegate/pipeline/extract_snippets/output.py +++ b/src/codegate/pipeline/extract_snippets/output.py @@ -8,6 +8,7 @@ from codegate.pipeline.base import CodeSnippet, PipelineContext from codegate.pipeline.extract_snippets.extract_snippets import extract_snippets from codegate.pipeline.output import OutputPipelineContext, OutputPipelineStep +from codegate.pipeline.secrets.secrets import SecretsObfuscator from codegate.storage import StorageEngine logger = structlog.get_logger("codegate") @@ -41,8 +42,12 @@ def _create_chunk(self, original_chunk: ModelResponse, content: str) -> ModelRes async def _snippet_comment(self, snippet: CodeSnippet, context: PipelineContext) -> str: """Create a comment for a snippet""" + # make sure we don't accidentally leak a secret in the output snippet + obfuscator = SecretsObfuscator() + obfuscated_code, _ = obfuscator.obfuscate(snippet.code) + snippet.libraries = await PackageExtractor.extract_packages( - content=snippet.code, + content=obfuscated_code, provider=context.sensitive.provider if context.sensitive else None, model=context.sensitive.model if context.sensitive else None, api_key=context.sensitive.api_key if context.sensitive else None, diff --git a/src/codegate/providers/copilot/pipeline.py b/src/codegate/providers/copilot/pipeline.py index 16de57f3..34b4b1da 100644 --- a/src/codegate/providers/copilot/pipeline.py +++ b/src/codegate/providers/copilot/pipeline.py @@ -93,7 +93,7 @@ async def process_body(self, headers: list[str], body: bytes) -> Tuple[bytes, Pi # in the original LLM format body = self.normalizer.denormalize(result.request) logger.debug(f"Pipeline processed request: {body}") - + return body, result.context except Exception as e: logger.error(f"Pipeline processing error: {e}") diff --git a/tests/pipeline/secrets/test_secrets.py b/tests/pipeline/secrets/test_secrets.py index be8e0963..71b43f8f 100644 --- a/tests/pipeline/secrets/test_secrets.py +++ b/tests/pipeline/secrets/test_secrets.py @@ -1,3 +1,6 @@ +import os +import tempfile + import pytest from litellm import ModelResponse from litellm.types.utils import Delta, StreamingChoices @@ -5,7 +8,142 @@ from codegate.pipeline.base import PipelineContext, PipelineSensitiveData from codegate.pipeline.output import OutputPipelineContext from codegate.pipeline.secrets.manager import SecretsManager -from codegate.pipeline.secrets.secrets import SecretUnredactionStep +from codegate.pipeline.secrets.secrets import ( + SecretsEncryptor, + SecretsObfuscator, + SecretUnredactionStep, +) +from codegate.pipeline.secrets.signatures import CodegateSignatures, Match + + +class TestSecretsModifier: + def test_get_absolute_position(self): + modifier = SecretsObfuscator() # Using concrete implementation for testing + text = "line1\nline2\nline3" + + # Test various positions + assert modifier._get_absolute_position(1, 0, text) == 0 # Start of first line + assert modifier._get_absolute_position(2, 0, text) == 6 # Start of second line + assert modifier._get_absolute_position(1, 4, text) == 4 # Middle of first line + + def test_extend_match_boundaries(self): + modifier = SecretsObfuscator() + + # Test extension with quotes + text = 'config = "secret_value" # comment' + secret = "secret_value" + start = text.index(secret) + end = start + len(secret) + + start, end = modifier._extend_match_boundaries(text, start, end) + assert text[start:end] == secret + + # Test extension without quotes spaces + text = "config = secret_value # comment" + secret = "secret_value" + start = text.index(secret) + end = start + len(secret) + + start, end = modifier._extend_match_boundaries(text, start, end) + assert text[start:end] == secret + + +@pytest.fixture +def valid_yaml_content(): + return """ +- AWS: + - Access Key: '[A-Z0-9]{20}' +""" + + +@pytest.fixture +def temp_yaml_file(valid_yaml_content): + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".yaml") as f: + f.write(valid_yaml_content) + yield f.name + os.unlink(f.name) + + +class TestSecretsEncryptor: + @pytest.fixture(autouse=True) + def setup(self, temp_yaml_file): + CodegateSignatures.initialize(temp_yaml_file) + self.context = PipelineContext() + self.secrets_manager = SecretsManager() + self.session_id = "test_session" + self.encryptor = SecretsEncryptor(self.secrets_manager, self.context, self.session_id) + + def test_hide_secret(self): + # Create a test match + match = Match( + service="AWS", + type="Access Key", + value="AKIAIOSFODNN7EXAMPLE", + line_number=1, + start_index=0, + end_index=9, + ) + + # Test secret hiding + hidden = self.encryptor._hide_secret(match) + assert hidden.startswith("REDACTED<$") + assert hidden.endswith(">") + + # Verify the secret was stored + encrypted_value = hidden[len("REDACTED<$") : -1] + original = self.secrets_manager.get_original_value(encrypted_value, self.session_id) + assert original == "AKIAIOSFODNN7EXAMPLE" + + def test_obfuscate(self): + # Test text with a secret + text = "API_KEY=AKIAIOSFODNN7EXAMPLE\nOther text" + protected, count = self.encryptor.obfuscate(text) + + assert count == 1 + assert "REDACTED<$" in protected + assert "AKIAIOSFODNN7EXAMPLE" not in protected + assert "Other text" in protected + + +class TestSecretsObfuscator: + @pytest.fixture(autouse=True) + def setup(self, temp_yaml_file): + CodegateSignatures.initialize(temp_yaml_file) + self.obfuscator = SecretsObfuscator() + + def test_hide_secret(self): + match = Match( + service="AWS", + type="Access Key", + value="AKIAIOSFODNN7EXAMPLE", + line_number=1, + start_index=0, + end_index=15, + ) + + hidden = self.obfuscator._hide_secret(match) + assert hidden == "*" * 32 + assert len(hidden) == 32 # Consistent length regardless of input + + def test_obfuscate(self): + # Test text with multiple secrets + text = "API_KEY=AKIAIOSFODNN7EXAMPLE\nPASSWORD=AKIAIOSFODNN7EXAMPLE" + protected, count = self.obfuscator.obfuscate(text) + + assert count == 2 + assert "AKIAIOSFODNN7EXAMPLE" not in protected + assert "*" * 32 in protected + + # Verify format + expected_pattern = f"API_KEY={'*' * 32}\nPASSWORD={'*' * 32}" + assert protected == expected_pattern + + def test_obfuscate_no_secrets(self): + text = "Regular text without secrets" + protected, count = self.obfuscator.obfuscate(text) + + assert count == 0 + assert protected == text def create_model_response(content: str) -> ModelResponse: