Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.

Commit 0717a9c

Browse files
committed
Add input processing pipeline + codegate-version pipeline step
This adds pipeline processing before the completion is run, where the request is either changed or can be short-circuited. This pipeline consists of steps; for now we implement a single step `CodegateVersion` that responds with the codegate version if the verbatim `codegate-version` string is found in the input. The pipeline also passes along a context; for now that is unused, but I thought this would be where we store extracted code snippets etc. To avoid import loops, we also move the `BaseCompletionHandler` class to a new `completion` package. Since the shortcut replies are more or less simple strings, we add yet another package `providers/formatting` whose responsibility is to convert the string returned by the shortcut response to the format expected by the client, meaning either a single reply or a stream of replies in the LLM-specific format. We use the `BaseCompletionHandler` as a way to convert to the LLM-specific format.
1 parent fb68dc1 commit 0717a9c

File tree

22 files changed

+560
-74
lines changed

22 files changed

+560
-74
lines changed

scripts/import_packages.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import json
2-
from utils.embedding_util import generate_embeddings
2+
33
import weaviate
4+
from weaviate.classes.config import DataType, Property
45
from weaviate.embedded import EmbeddedOptions
5-
from weaviate.classes.config import Property, DataType
66

7+
from utils.embedding_util import generate_embeddings
78

89
json_files = [
910
"data/archived.jsonl",

src/codegate/inference/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
from .inference_engine import LlamaCppInferenceEngine
22

3-
__all__ = [LlamaCppInferenceEngine]
3+
__all__ = [LlamaCppInferenceEngine]

src/codegate/pipeline/__init__.py

Whitespace-only changes.

src/codegate/pipeline/base.py

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
from abc import ABC, abstractmethod
2+
from dataclasses import dataclass, field
3+
from typing import Any, Dict, List, Optional
4+
5+
from litellm import ChatCompletionRequest
6+
7+
8+
@dataclass
class CodeSnippet:
    """
    A code fragment paired with its programming language.

    Args:
        language: Programming language identifier (e.g. 'python', 'javascript')
        code: The snippet's source text
    """

    language: str
    code: str

    def __post_init__(self):
        # Reject missing or whitespace-only values for either field.
        for attr_name in ("language", "code"):
            value = getattr(self, attr_name)
            if not value or not value.strip():
                raise ValueError(f"{attr_name.capitalize()} must not be empty")
        # Normalize the language so later comparisons are case-insensitive.
        self.language = self.language.strip().lower()
26+
27+
@dataclass
class PipelineContext:
    """Mutable state shared by all steps while a request moves through the pipeline."""

    # Snippets extracted from the conversation so far.
    code_snippets: List[CodeSnippet] = field(default_factory=list)
    # Free-form scratch space for steps to share data.
    metadata: Dict[str, Any] = field(default_factory=dict)

    def add_code_snippet(self, snippet: CodeSnippet):
        """Record one extracted snippet on the context."""
        self.code_snippets.append(snippet)

    def get_snippets_by_language(self, language: str) -> List[CodeSnippet]:
        """Return all stored snippets matching *language*, case-insensitively."""
        wanted = language.lower()
        return [snip for snip in self.code_snippets if snip.language.lower() == wanted]
37+
38+
@dataclass
class PipelineResponse:
    """A reply produced directly by a pipeline step, bypassing the LLM."""

    # Text to send back to the client.
    content: str
    # Name of the pipeline step that produced this response.
    step_name: str
    # Copied from the original request's model field.
    model: str
44+
45+
@dataclass
class PipelineResult:
    """
    Outcome of a pipeline operation.

    Carries either a (possibly modified) request that should keep moving
    through the pipeline, or a response/error that ends processing.
    """

    request: Optional[ChatCompletionRequest] = None
    response: Optional[PipelineResponse] = None
    error_message: Optional[str] = None

    def shortcuts_processing(self) -> bool:
        """Returns True if this result should end pipeline processing"""
        # Either a ready-made response or an error terminates the pipeline.
        return not (self.response is None and self.error_message is None)

    @property
    def success(self) -> bool:
        """Returns True if the pipeline step completed without errors"""
        return self.error_message is None
64+
65+
66+
class PipelineStep(ABC):
    """Base class for all pipeline steps in the processing chain."""

    @property
    @abstractmethod
    def name(self) -> str:
        """
        Returns the name of the pipeline step.

        Returns:
            str: A unique identifier for this pipeline step
        """
        pass

    @staticmethod
    def get_last_user_message(
        request: ChatCompletionRequest,
    ) -> Optional[tuple[str, int]]:
        """
        Get the last user message and its index from the request.

        Args:
            request (ChatCompletionRequest): The chat completion request to process

        Returns:
            Optional[tuple[str, int]]: A tuple containing the message content and
            its index, or None if no user message is found
        """
        messages = request.get("messages")
        if messages is None:
            return None

        # Walk backwards so the most recent user message wins.
        for idx in range(len(messages) - 1, -1, -1):
            message = messages[idx]
            if message["role"] != "user":
                continue
            content = message["content"]

            # LiteLLM quirk: depending on the provider, content is either a
            # plain string or a list of typed parts (one of which is a
            # ChatCompletionTextObject). We'll handle this better by either
            # dropping litellm or converting to a saner format in our own
            # adapter.
            if isinstance(content, str):
                return content, idx

            if isinstance(content, (list, tuple)):
                # Use the first text part we find.
                for part in content:
                    if isinstance(part, dict) and part.get("type") == "text":
                        return part["text"], idx
                # Newest user message carried no text content at all.
                return None

        return None

    @abstractmethod
    async def process(
        self,
        request: ChatCompletionRequest,
        context: PipelineContext
    ) -> PipelineResult:
        """Process a request and return either modified request or response stream"""
        pass
131+
132+
133+
class PipelineProcessor:
    """Runs a request through an ordered list of pipeline steps."""

    def __init__(self, pipeline_steps: List[PipelineStep]):
        self.pipeline_steps = pipeline_steps

    async def process_request(
        self,
        request: ChatCompletionRequest,
    ) -> PipelineResult:
        """
        Process a request through all pipeline steps

        Args:
            request: The chat completion request to process

        Returns:
            PipelineResult containing either a modified request or response structure
        """
        context = PipelineContext()
        current_request = request

        for step in self.pipeline_steps:
            step_result = await step.process(current_request, context)

            # A step may opt out entirely by returning nothing.
            if step_result is None:
                continue

            # A ready response or an error short-circuits the remaining steps.
            if step_result.shortcuts_processing():
                return step_result

            # Otherwise carry forward any request modification the step made.
            if step_result.request is not None:
                current_request = step_result.request

        return PipelineResult(request=current_request)

src/codegate/pipeline/version/__init__.py

Whitespace-only changes.
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from litellm import ChatCompletionRequest
2+
3+
from codegate import __version__
4+
from codegate.pipeline.base import (
5+
PipelineContext,
6+
PipelineResponse,
7+
PipelineResult,
8+
PipelineStep,
9+
)
10+
11+
12+
class CodegateVersion(PipelineStep):
    """Pipeline step that handles version information requests."""

    @property
    def name(self) -> str:
        """
        Returns the name of this pipeline step.

        Returns:
            str: The identifier 'codegate-version'
        """
        return "codegate-version"

    async def process(
        self,
        request: ChatCompletionRequest,
        context: PipelineContext
    ) -> PipelineResult:
        """
        Checks if the last user message contains "codegate-version" and
        responds with the current version.
        This short-circuits the pipeline if the message is found.

        Args:
            request (ChatCompletionRequest): The chat completion request to process
            context (PipelineContext): The current pipeline context

        Returns:
            PipelineResult: Contains version response if triggered, otherwise continues
            pipeline
        """
        last_user_message = self.get_last_user_message(request)

        if last_user_message is not None:
            # BUGFIX: get_last_user_message returns a (content, index) tuple.
            # `"codegate-version" in last_user_message` was tuple membership —
            # it only matched when the whole message *equaled* the trigger.
            # Unpack and do a substring search on the message text instead.
            content, _ = last_user_message
            if "codegate-version" in content:
                return PipelineResult(
                    response=PipelineResponse(
                        step_name=self.name,
                        content="Codegate version: {}".format(__version__),
                        model=request["model"],
                    ),
                )

        # Fall through
        return PipelineResult(request=request)

src/codegate/providers/anthropic/provider.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88

99

1010
class AnthropicProvider(BaseProvider):
11-
def __init__(self, pipeline_processor=None):
    """Wire up the Anthropic adapter, its completion shim, and the optional pipeline."""
    # LiteLLmShim uses the adapter to translate between our internal format
    # and Anthropic's request/response shapes.
    adapter = AnthropicAdapter()
    handler = LiteLLmShim(adapter)
    super().__init__(handler, pipeline_processor)
1515

1616
def _setup_routes(self):
1717
"""

src/codegate/providers/base.py

Lines changed: 53 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,77 @@
11
from abc import ABC, abstractmethod
2-
from typing import Any, AsyncIterator, Callable, Dict
2+
from typing import Any, AsyncIterator, Callable, Dict, Optional, Union
33

44
from fastapi import APIRouter
5-
from fastapi.responses import StreamingResponse
5+
from litellm import ModelResponse
66

7-
StreamGenerator = Callable[[AsyncIterator[Any]], AsyncIterator[str]]
7+
from codegate.providers.completion.base import BaseCompletionHandler
8+
from codegate.providers.formatting.input_pipeline import PipelineResponseFormatter
89

10+
from ..pipeline.base import PipelineProcessor
911

10-
class BaseCompletionHandler(ABC):
11-
"""
12-
The completion handler is responsible for executing the completion request
13-
and creating the streaming response.
14-
"""
15-
16-
@abstractmethod
17-
async def complete(self, data: Dict, api_key: str) -> AsyncIterator[Any]:
18-
pass
19-
20-
@abstractmethod
21-
def create_streaming_response(
22-
self, stream: AsyncIterator[Any]
23-
) -> StreamingResponse:
24-
pass
25-
12+
StreamGenerator = Callable[[AsyncIterator[Any]], AsyncIterator[str]]
2613

2714
class BaseProvider(ABC):
2815
"""
2916
The provider class is responsible for defining the API routes and
3017
calling the completion method using the completion handler.
3118
"""
3219

33-
def __init__(
    self,
    completion_handler: BaseCompletionHandler,
    pipeline_processor: Optional[PipelineProcessor] = None
):
    """Set up routing, the completion handler, and the optional input pipeline."""
    self.router = APIRouter()
    self._completion_handler = completion_handler
    self._pipeline_processor = pipeline_processor
    # Converts shortcut pipeline replies into the client-expected format,
    # reusing the completion handler for the LLM-specific shape.
    self._pipeline_response_formatter = PipelineResponseFormatter(completion_handler)
    self._setup_routes()
3731

3832
@abstractmethod
3933
def _setup_routes(self) -> None:
4034
pass
4135

42-
async def complete(
    self, data: Dict, api_key: str,
) -> Union[ModelResponse, AsyncIterator[ModelResponse]]:
    """
    Main completion flow with pipeline integration

    The flow has three main steps:
    - Translate the request to the OpenAI API format used internally
    - Process the request with the pipeline processor. This can modify the request
      or yield a response. The response can either be returned or streamed back to
      the client
    - Execute the completion and translate the response back to the
      provider-specific format
    """
    completion_request = self._completion_handler.translate_request(data, api_key)
    is_streaming = data.get("stream", False)

    if self._pipeline_processor is not None:
        pipeline_result = await self._pipeline_processor.process_request(completion_request)

        if pipeline_result.error_message:
            raise Exception(pipeline_result.error_message)

        if pipeline_result.response:
            # A step short-circuited: format its reply for the client instead
            # of calling the LLM at all.
            return self._pipeline_response_formatter.handle_pipeline_response(
                pipeline_result.response, is_streaming)

        completion_request = pipeline_result.request

    # Execute the completion and translate the response. This yields either a
    # single response or a stream of responses based on the streaming flag.
    raw_response = await self._completion_handler.execute_completion(
        completion_request,
        stream=is_streaming
    )
    if not is_streaming:
        return self._completion_handler.translate_response(raw_response)
    return self._completion_handler.translate_streaming_response(raw_response)
4475

4576
def get_routes(self) -> APIRouter:
4677
return self.router

src/codegate/providers/completion/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)