Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.

Obfuscate secrets before sending a snippet out for analysis #332

Merged
merged 3 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/codegate/pipeline/extract_snippets/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from codegate.pipeline.base import CodeSnippet, PipelineContext
from codegate.pipeline.extract_snippets.extract_snippets import extract_snippets
from codegate.pipeline.output import OutputPipelineContext, OutputPipelineStep
from codegate.pipeline.secrets.secrets import SecretsObfuscator
from codegate.storage import StorageEngine

logger = structlog.get_logger("codegate")
Expand Down Expand Up @@ -41,8 +42,12 @@ def _create_chunk(self, original_chunk: ModelResponse, content: str) -> ModelRes

async def _snippet_comment(self, snippet: CodeSnippet, context: PipelineContext) -> str:
"""Create a comment for a snippet"""
# make sure we don't accidentally leak a secret in the output snippet
obfuscator = SecretsObfuscator()
obfuscated_code, _ = obfuscator.obfuscate(snippet.code)

snippet.libraries = await PackageExtractor.extract_packages(
content=snippet.code,
content=obfuscated_code,
provider=context.sensitive.provider if context.sensitive else None,
model=context.sensitive.model if context.sensitive else None,
api_key=context.sensitive.api_key if context.sensitive else None,
Expand Down
158 changes: 113 additions & 45 deletions src/codegate/pipeline/secrets/secrets.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
from abc import abstractmethod
from typing import Optional

import structlog
Expand All @@ -14,30 +15,43 @@
)
from codegate.pipeline.output import OutputPipelineContext, OutputPipelineStep
from codegate.pipeline.secrets.manager import SecretsManager
from codegate.pipeline.secrets.signatures import CodegateSignatures
from codegate.pipeline.secrets.signatures import CodegateSignatures, Match
from codegate.pipeline.systemmsg import add_or_update_system_message

logger = structlog.get_logger("codegate")


class CodegateSecrets(PipelineStep):
"""Pipeline step that handles secret information requests."""
class SecretsModifier:
"""
A class that helps obfuscate text by piping it through the secrets manager
that finds the secrets and then calling hide_secret to modify them.

What modifications are done is up to the user who subclasses SecretsModifier
"""

def __init__(self):
"""Initialize the CodegateSecrets pipeline step."""
super().__init__()
# Initialize and load signatures immediately
CodegateSignatures.initialize("signatures.yaml")

@property
def name(self) -> str:
@abstractmethod
def _hide_secret(self, match: Match) -> str:
"""
Returns the name of this pipeline step.
User-defined callable to hide a secret match to either obfuscate
it or reversibly encrypt
"""
pass

Returns:
str: The identifier 'codegate-secrets'.
@abstractmethod
def _notify_secret(self, secret):
"""
return "codegate-secrets"
Notify about a found secret
TODO: We should probably not notify about a secret value but rather
an obfuscated string. It might be nice to report the context as well
(e.g. the file or a couple of lines before and after)
"""
pass

def _get_absolute_position(self, line_number: int, line_offset: int, text: str) -> int:
"""
Expand Down Expand Up @@ -78,21 +92,7 @@ def _extend_match_boundaries(self, text: str, start: int, end: int) -> tuple[int

return start, end

def _redact_text(
self, text: str, secrets_manager: SecretsManager, session_id: str, context: PipelineContext
) -> tuple[str, int]:
"""
Find and encrypt secrets in the given text.

Args:
text: The text to protect
secrets_manager: ..
session_id: ..
context: The pipeline context to be able to log alerts
Returns:
Tuple containing protected text with encrypted values and the count of redacted secrets
"""
# Find secrets in the text
def obfuscate(self, text: str) -> tuple[str, int]:
matches = CodegateSignatures.find_in_string(text)
if not matches:
return text, 0
Expand Down Expand Up @@ -123,48 +123,116 @@ def _redact_text(

# Replace each match with its encrypted value
for start, end, match in absolute_matches:
# Encrypt and store the value
encrypted_value = secrets_manager.store_secret(
match.value,
match.service,
match.type,
session_id,
)

# Create the replacement string
replacement = f"REDACTED<${encrypted_value}>"
# Store the protected text in DB.
context.add_alert(
self.name, trigger_string=replacement, severity_category=AlertSeverity.CRITICAL
)
hidden_secret = self._hide_secret(match)
self._notify_secret(hidden_secret)

# Replace the secret in the text
protected_text[start:end] = replacement
protected_text[start:end] = hidden_secret
# Store for logging
found_secrets.append(
{
"service": match.service,
"type": match.type,
"original": match.value,
"encrypted": encrypted_value,
"encrypted": hidden_secret,
}
)

# Convert back to string
protected_string = "".join(protected_text)

# Log the findings
logger.info("\nFound secrets:")

for secret in found_secrets:
logger.info(f"\nService: {secret['service']}")
logger.info(f"Type: {secret['type']}")
logger.info(f"Original: {secret['original']}")
logger.info(f"Encrypted: REDACTED<${secret['encrypted']}>")
logger.info(f"Encrypted: {secret['encrypted']}")

# Convert back to string
protected_string = "".join(protected_text)
print(f"\nProtected text:\n{protected_string}")
return protected_string, len(found_secrets)


class SecretsEncryptor(SecretsModifier):
    """Secrets modifier that replaces each match with a stored-secret marker.

    Every matched secret is handed to the secrets manager through
    ``store_secret``; the value that call returns is embedded in a
    ``REDACTED<$...>`` marker. Each marker is also recorded on the pipeline
    context as a critical alert.
    """

    def __init__(
        self,
        secrets_manager: SecretsManager,
        context: PipelineContext,
        session_id: str,
    ):
        # Name passed as the first argument of every alert raised by this class.
        self._name = "codegate-secrets"
        self._context = context
        self._session_id = session_id
        self._secrets_manager = secrets_manager
        super().__init__()

    def _hide_secret(self, match: Match) -> str:
        """Store the matched secret and return its ``REDACTED<$...>`` marker."""
        stored_token = self._secrets_manager.store_secret(
            match.value, match.service, match.type, self._session_id
        )
        return f"REDACTED<${stored_token}>"

    def _notify_secret(self, notify_string):
        """Add a critical alert to the context with ``notify_string`` as trigger."""
        severity = AlertSeverity.CRITICAL
        self._context.add_alert(
            self._name, trigger_string=notify_string, severity_category=severity
        )


class SecretsObfuscator(SecretsModifier):
    """Secrets modifier that irreversibly masks every match with asterisks."""

    def __init__(self):
        super().__init__()

    def _hide_secret(self, match: Match) -> str:
        """
        Return the mask that replaces a secret.

        The mask has a fixed width so that the length of the secret
        is not leaked through the replacement.
        """
        mask_width = 32
        return "*" * mask_width

    def _notify_secret(self, secret):
        """Do nothing: masking a secret raises no notification."""
        return None


class CodegateSecrets(PipelineStep):
"""Pipeline step that handles secret information requests."""

def __init__(self) -> None:
    """Initialize the CodegateSecrets pipeline step.

    Only delegates to the parent class initializer; no state is set here.
    """
    super().__init__()

@property
def name(self) -> str:
    """
    Name of this pipeline step.

    Returns:
        str: Always the identifier 'codegate-secrets'.
    """
    step_name = "codegate-secrets"
    return step_name

def _redact_text(
    self, text: str, secrets_manager: SecretsManager, session_id: str, context: PipelineContext
) -> tuple[str, int]:
    """
    Replace the secrets found in ``text`` with encrypted markers.

    The work is delegated to a ``SecretsEncryptor`` built from the given
    manager, context and session id.

    Args:
        text: The text to protect
        secrets_manager: Manager the encryptor uses to store each secret
        session_id: Session identifier passed along when storing a secret
        context: The pipeline context on which the encryptor records alerts
    Returns:
        Tuple containing the protected text and the count of redacted secrets
    """
    encryptor = SecretsEncryptor(secrets_manager, context, session_id)
    redacted = encryptor.obfuscate(text)
    return redacted

async def process(
self, request: ChatCompletionRequest, context: PipelineContext
) -> PipelineResult:
Expand Down
5 changes: 5 additions & 0 deletions src/codegate/pipeline/secrets/signatures.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ def _sanitize_pattern(cls, pattern: str) -> str:
@classmethod
def _add_signature_group(cls, name: str, patterns: Dict[str, str]) -> None:
"""Add a new signature group and compile its regex patterns."""
# Check if this group already exists
if any(group.name == name for group in cls._signature_groups):
logger.debug(f"Signature group {name} already exists, skipping")
return

signature_group = SignatureGroup(name, patterns)

for pattern_name, pattern in patterns.items():
Expand Down
2 changes: 1 addition & 1 deletion src/codegate/providers/copilot/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ async def process_body(self, headers: list[str], body: bytes) -> Tuple[bytes, Pi
# in the original LLM format
body = self.normalizer.denormalize(result.request)
logger.debug(f"Pipeline processed request: {body}")

return body, result.context
except Exception as e:
logger.error(f"Pipeline processing error: {e}")
Expand Down
Loading