diff --git a/src/codegate/pipeline/extract_snippets/output.py b/src/codegate/pipeline/extract_snippets/output.py index a103e342..2923a2e4 100644 --- a/src/codegate/pipeline/extract_snippets/output.py +++ b/src/codegate/pipeline/extract_snippets/output.py @@ -8,6 +8,7 @@ from codegate.pipeline.base import CodeSnippet, PipelineContext from codegate.pipeline.extract_snippets.extract_snippets import extract_snippets from codegate.pipeline.output import OutputPipelineContext, OutputPipelineStep +from codegate.pipeline.secrets.secrets import SecretsObfuscator from codegate.storage import StorageEngine logger = structlog.get_logger("codegate") @@ -41,8 +42,12 @@ def _create_chunk(self, original_chunk: ModelResponse, content: str) -> ModelRes async def _snippet_comment(self, snippet: CodeSnippet, context: PipelineContext) -> str: """Create a comment for a snippet""" + # make sure we don't accidentally leak a secret in the output snippet + obfuscator = SecretsObfuscator() + obfuscated_code, _ = obfuscator.obfuscate(snippet.code) + snippet.libraries = await PackageExtractor.extract_packages( - content=snippet.code, + content=obfuscated_code, provider=context.sensitive.provider if context.sensitive else None, model=context.sensitive.model if context.sensitive else None, api_key=context.sensitive.api_key if context.sensitive else None, diff --git a/src/codegate/pipeline/secrets/secrets.py b/src/codegate/pipeline/secrets/secrets.py index 0f3d61c2..fe601284 100644 --- a/src/codegate/pipeline/secrets/secrets.py +++ b/src/codegate/pipeline/secrets/secrets.py @@ -1,4 +1,5 @@ import re +from abc import abstractmethod from typing import Optional import structlog @@ -14,14 +15,19 @@ ) from codegate.pipeline.output import OutputPipelineContext, OutputPipelineStep from codegate.pipeline.secrets.manager import SecretsManager -from codegate.pipeline.secrets.signatures import CodegateSignatures +from codegate.pipeline.secrets.signatures import CodegateSignatures, Match from codegate.pipeline.systemmsg import add_or_update_system_message logger = structlog.get_logger("codegate") -class CodegateSecrets(PipelineStep): - """Pipeline step that handles secret information requests.""" +class SecretsModifier: + """ + A class that helps obfuscate text by piping it through the secrets manager + that finds the secrets and then calling hide_secret to modify them. + + What modifications are done is up to the user who subclasses SecretsModifier + """ def __init__(self): """Initialize the CodegateSecrets pipeline step.""" @@ -29,15 +35,23 @@ def __init__(self): # Initialize and load signatures immediately CodegateSignatures.initialize("signatures.yaml") - @property - def name(self) -> str: + @abstractmethod + def _hide_secret(self, match: Match) -> str: """ - Returns the name of this pipeline step. + User-defined callable to hide a secret match to either obfuscate + it or reversibly encrypt + """ + pass - Returns: - str: The identifier 'codegate-secrets'. + @abstractmethod + def _notify_secret(self, secret): """ - return "codegate-secrets" + Notify about a found secret + TODO: We should probably not notify about a secret value but rather + an obfuscated string. It might be nice to report the context as well + (e.g. the file or a couple of lines before and after) + """ + pass def _get_absolute_position(self, line_number: int, line_offset: int, text: str) -> int: """ @@ -78,21 +92,7 @@ def _extend_match_boundaries(self, text: str, start: int, end: int) -> tuple[int return start, end - def _redact_text( - self, text: str, secrets_manager: SecretsManager, session_id: str, context: PipelineContext - ) -> tuple[str, int]: - """ - Find and encrypt secrets in the given text. - - Args: - text: The text to protect - secrets_manager: .. - session_id: .. - context: The pipeline context to be able to log alerts - Returns: - Tuple containing protected text with encrypted values and the count of redacted secrets - """ - # Find secrets in the text + def obfuscate(self, text: str) -> tuple[str, int]: matches = CodegateSignatures.find_in_string(text) if not matches: return text, 0 @@ -123,48 +123,116 @@ def _redact_text( # Replace each match with its encrypted value for start, end, match in absolute_matches: - # Encrypt and store the value - encrypted_value = secrets_manager.store_secret( - match.value, - match.service, - match.type, - session_id, - ) - - # Create the replacement string - replacement = f"REDACTED<${encrypted_value}>" - # Store the protected text in DB. - context.add_alert( - self.name, trigger_string=replacement, severity_category=AlertSeverity.CRITICAL - ) + hidden_secret = self._hide_secret(match) + self._notify_secret(hidden_secret) # Replace the secret in the text - protected_text[start:end] = replacement + protected_text[start:end] = hidden_secret # Store for logging found_secrets.append( { "service": match.service, "type": match.type, "original": match.value, - "encrypted": encrypted_value, + "encrypted": hidden_secret, } ) - # Convert back to string - protected_string = "".join(protected_text) - # Log the findings logger.info("\nFound secrets:") - for secret in found_secrets: logger.info(f"\nService: {secret['service']}") logger.info(f"Type: {secret['type']}") logger.info(f"Original: {secret['original']}") - logger.info(f"Encrypted: REDACTED<${secret['encrypted']}>") + logger.info(f"Encrypted: {secret['encrypted']}") + # Convert back to string + protected_string = "".join(protected_text) print(f"\nProtected text:\n{protected_string}") return protected_string, len(found_secrets) + +class SecretsEncryptor(SecretsModifier): + def __init__( + self, + secrets_manager: SecretsManager, + context: PipelineContext, + session_id: str, + ): + self._secrets_manager = secrets_manager + self._session_id = session_id + self._context = context + self._name = "codegate-secrets" + super().__init__() + + def _hide_secret(self, match: Match) -> str: + # Encrypt and store the value + encrypted_value = self._secrets_manager.store_secret( + match.value, + match.service, + match.type, + self._session_id, + ) + return f"REDACTED<${encrypted_value}>" + + def _notify_secret(self, notify_string): + self._context.add_alert( + self._name, trigger_string=notify_string, severity_category=AlertSeverity.CRITICAL + ) + + +class SecretsObfuscator(SecretsModifier): + def __init__( + self, + ): + super().__init__() + + def _hide_secret(self, match: Match) -> str: + """ + Obfuscate the secret value. We use a hardcoded number of asterisks + to not leak the length of the secret. + """ + return "*" * 32 + + def _notify_secret(self, secret): + pass + + +class CodegateSecrets(PipelineStep): + """Pipeline step that handles secret information requests.""" + + def __init__(self): + """Initialize the CodegateSecrets pipeline step.""" + super().__init__() + + @property + def name(self) -> str: + """ + Returns the name of this pipeline step. + + Returns: + str: The identifier 'codegate-secrets'. + """ + return "codegate-secrets" + + def _redact_text( + self, text: str, secrets_manager: SecretsManager, session_id: str, context: PipelineContext + ) -> tuple[str, int]: + """ + Find and encrypt secrets in the given text. + + Args: + text: The text to protect + secrets_manager: .. + session_id: .. + context: The pipeline context to be able to log alerts + Returns: + Tuple containing protected text with encrypted values and the count of redacted secrets + """ + # Find secrets in the text + text_encryptor = SecretsEncryptor(secrets_manager, context, session_id) + return text_encryptor.obfuscate(text) + async def process( self, request: ChatCompletionRequest, context: PipelineContext ) -> PipelineResult: diff --git a/src/codegate/pipeline/secrets/signatures.py b/src/codegate/pipeline/secrets/signatures.py index 95d306e1..8d2a0c3d 100644 --- a/src/codegate/pipeline/secrets/signatures.py +++ b/src/codegate/pipeline/secrets/signatures.py @@ -155,6 +155,11 @@ def _sanitize_pattern(cls, pattern: str) -> str: @classmethod def _add_signature_group(cls, name: str, patterns: Dict[str, str]) -> None: """Add a new signature group and compile its regex patterns.""" + # Check if this group already exists + if any(group.name == name for group in cls._signature_groups): + logger.debug(f"Signature group {name} already exists, skipping") + return + signature_group = SignatureGroup(name, patterns) for pattern_name, pattern in patterns.items(): diff --git a/src/codegate/providers/copilot/pipeline.py b/src/codegate/providers/copilot/pipeline.py index 16de57f3..34b4b1da 100644 --- a/src/codegate/providers/copilot/pipeline.py +++ b/src/codegate/providers/copilot/pipeline.py @@ -93,7 +93,7 @@ async def process_body(self, headers: list[str], body: bytes) -> Tuple[bytes, Pi # in the original LLM format body = self.normalizer.denormalize(result.request) logger.debug(f"Pipeline processed request: {body}") - + return body, result.context except Exception as e: logger.error(f"Pipeline processing error: {e}") diff --git a/tests/pipeline/secrets/test_secrets.py b/tests/pipeline/secrets/test_secrets.py index be8e0963..71b43f8f 100644 --- a/tests/pipeline/secrets/test_secrets.py +++ b/tests/pipeline/secrets/test_secrets.py @@ -1,3 +1,6 @@ +import os +import tempfile + import pytest from litellm import ModelResponse from litellm.types.utils import Delta, StreamingChoices @@ -5,7 +8,142 @@ from codegate.pipeline.base import PipelineContext, PipelineSensitiveData from codegate.pipeline.output import OutputPipelineContext from codegate.pipeline.secrets.manager import SecretsManager -from codegate.pipeline.secrets.secrets import SecretUnredactionStep +from codegate.pipeline.secrets.secrets import ( + SecretsEncryptor, + SecretsObfuscator, + SecretUnredactionStep, +) +from codegate.pipeline.secrets.signatures import CodegateSignatures, Match + + +class TestSecretsModifier: + def test_get_absolute_position(self): + modifier = SecretsObfuscator() # Using concrete implementation for testing + text = "line1\nline2\nline3" + + # Test various positions + assert modifier._get_absolute_position(1, 0, text) == 0 # Start of first line + assert modifier._get_absolute_position(2, 0, text) == 6 # Start of second line + assert modifier._get_absolute_position(1, 4, text) == 4 # Middle of first line + + def test_extend_match_boundaries(self): + modifier = SecretsObfuscator() + + # Test extension with quotes + text = 'config = "secret_value" # comment' + secret = "secret_value" + start = text.index(secret) + end = start + len(secret) + + start, end = modifier._extend_match_boundaries(text, start, end) + assert text[start:end] == secret + + # Test extension without quotes spaces + text = "config = secret_value # comment" + secret = "secret_value" + start = text.index(secret) + end = start + len(secret) + + start, end = modifier._extend_match_boundaries(text, start, end) + assert text[start:end] == secret + + +@pytest.fixture +def valid_yaml_content(): + return """ +- AWS: + - Access Key: '[A-Z0-9]{20}' +""" + + +@pytest.fixture +def temp_yaml_file(valid_yaml_content): + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".yaml") as f: + f.write(valid_yaml_content) + yield f.name + os.unlink(f.name) + + +class TestSecretsEncryptor: + @pytest.fixture(autouse=True) + def setup(self, temp_yaml_file): + CodegateSignatures.initialize(temp_yaml_file) + self.context = PipelineContext() + self.secrets_manager = SecretsManager() + self.session_id = "test_session" + self.encryptor = SecretsEncryptor(self.secrets_manager, self.context, self.session_id) + + def test_hide_secret(self): + # Create a test match + match = Match( + service="AWS", + type="Access Key", + value="AKIAIOSFODNN7EXAMPLE", + line_number=1, + start_index=0, + end_index=9, + ) + + # Test secret hiding + hidden = self.encryptor._hide_secret(match) + assert hidden.startswith("REDACTED<$") + assert hidden.endswith(">") + + # Verify the secret was stored + encrypted_value = hidden[len("REDACTED<$") : -1] + original = self.secrets_manager.get_original_value(encrypted_value, self.session_id) + assert original == "AKIAIOSFODNN7EXAMPLE" + + def test_obfuscate(self): + # Test text with a secret + text = "API_KEY=AKIAIOSFODNN7EXAMPLE\nOther text" + protected, count = self.encryptor.obfuscate(text) + + assert count == 1 + assert "REDACTED<$" in protected + assert "AKIAIOSFODNN7EXAMPLE" not in protected + assert "Other text" in protected + + +class TestSecretsObfuscator: + @pytest.fixture(autouse=True) + def setup(self, temp_yaml_file): + CodegateSignatures.initialize(temp_yaml_file) + self.obfuscator = SecretsObfuscator() + + def test_hide_secret(self): + match = Match( + service="AWS", + type="Access Key", + value="AKIAIOSFODNN7EXAMPLE", + line_number=1, + start_index=0, + end_index=15, + ) + + hidden = self.obfuscator._hide_secret(match) + assert hidden == "*" * 32 + assert len(hidden) == 32 # Consistent length regardless of input + + def test_obfuscate(self): + # Test text with multiple secrets + text = "API_KEY=AKIAIOSFODNN7EXAMPLE\nPASSWORD=AKIAIOSFODNN7EXAMPLE" + protected, count = self.obfuscator.obfuscate(text) + + assert count == 2 + assert "AKIAIOSFODNN7EXAMPLE" not in protected + assert "*" * 32 in protected + + # Verify format + expected_pattern = f"API_KEY={'*' * 32}\nPASSWORD={'*' * 32}" + assert protected == expected_pattern + + def test_obfuscate_no_secrets(self): + text = "Regular text without secrets" + protected, count = self.obfuscator.obfuscate(text) + + assert count == 0 + assert protected == text def create_model_response(content: str) -> ModelResponse: diff --git a/tests/pipeline/secrets/test_signatures.py b/tests/pipeline/secrets/test_signatures.py index 42f6470f..11a6c5c8 100644 --- a/tests/pipeline/secrets/test_signatures.py +++ b/tests/pipeline/secrets/test_signatures.py @@ -167,3 +167,26 @@ def test_duplicate_patterns(): assert match.value == "AKIAIOSFODNN7EXAMPLE" finally: os.unlink(f.name) + + +def test_no_duplicate_signature_groups(temp_yaml_file): + """Test that signature groups aren't added multiple times""" + CodegateSignatures.initialize(temp_yaml_file) + + # First load + CodegateSignatures._signatures_loaded = False + CodegateSignatures._load_signatures() + initial_group_count = len(CodegateSignatures._signature_groups) + initial_regex_count = len(CodegateSignatures._compiled_regexes) + + # Second load + CodegateSignatures._signatures_loaded = False + CodegateSignatures._load_signatures() + + # Verify counts haven't changed + assert len(CodegateSignatures._signature_groups) == initial_group_count + assert len(CodegateSignatures._compiled_regexes) == initial_regex_count + + # Verify GitHub group appears only once + github_groups = [g for g in CodegateSignatures._signature_groups if g.name == "GitHub"] + assert len(github_groups) == 1