 from codegate.config import Config
 from codegate.inference.inference_engine import LlamaCppInferenceEngine
 from codegate.providers.base import BaseCompletionHandler
+from codegate.types.openai import (
+    stream_generator,
+    LegacyCompletion,
+    StreamingChatCompletion,
+)
+
+
+# async def llamacpp_stream_generator(
+#     stream: AsyncIterator[CreateChatCompletionStreamResponse],
+# ) -> AsyncIterator[str]:
+#     """OpenAI-style SSE format"""
+#     try:
+#         async for chunk in stream:
+#             chunk = json.dumps(chunk)
+#             try:
+#                 yield f"data:{chunk}\n\n"
+#             except Exception as e:
+#                 yield f"data:{str(e)}\n\n"
+#     except Exception as e:
+#         yield f"data: {str(e)}\n\n"
+#     finally:
+#         yield "data: [DONE]\n\n"
 
 
-async def llamacpp_stream_generator(
-    stream: AsyncIterator[CreateChatCompletionStreamResponse],
-) -> AsyncIterator[str]:
-    """OpenAI-style SSE format"""
-    try:
-        async for chunk in stream:
-            chunk = json.dumps(chunk)
-            try:
-                yield f"data:{chunk}\n\n"
-            except Exception as e:
-                yield f"data:{str(e)}\n\n"
-    except Exception as e:
-        yield f"data: {str(e)}\n\n"
-    finally:
-        yield "data: [DONE]\n\n"
-
-
-async def convert_to_async_iterator(
-    sync_iterator: Iterator[CreateChatCompletionStreamResponse],
-) -> AsyncIterator[CreateChatCompletionStreamResponse]:
+async def completion_to_async_iterator(
+    sync_iterator: Iterator[dict],
+) -> AsyncIterator[LegacyCompletion]:
     """
     Convert a synchronous iterator to an asynchronous iterator. This makes the logic easier
     because both the pipeline and the completion handler can use async iterators.
     """
     for item in sync_iterator:
-        yield item
-        await asyncio.sleep(0)
+        yield LegacyCompletion(**item)
+
+
+async def chat_to_async_iterator(
+    sync_iterator: Iterator[dict],
+) -> AsyncIterator[StreamingChatCompletion]:
+    for item in sync_iterator:
+        yield StreamingChatCompletion(**item)
 
 
 class LlamaCppCompletionHandler(BaseCompletionHandler):
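The generic `convert_to_async_iterator` helper is split into two typed converters: each raw dictionary chunk yielded by llama-cpp-python is wrapped in either `LegacyCompletion` or `StreamingChatCompletion` before the pipeline sees it. A minimal usage sketch, assuming the codegate types mirror the OpenAI chunk schema (the `fake_chunks` helper and its field values are illustrative only):

import asyncio
from typing import Iterator

def fake_chunks() -> Iterator[dict]:
    # Stand-in for the dicts llama-cpp-python yields while streaming a chat
    # completion; shaped like an OpenAI chat.completion.chunk payload.
    yield {
        "id": "chatcmpl-0",
        "object": "chat.completion.chunk",
        "created": 0,
        "model": "local-model",
        "choices": [{"index": 0, "delta": {"content": "Hello"}, "finish_reason": None}],
    }

async def main() -> None:
    # chat_to_async_iterator (defined in the diff above) wraps each dict in a
    # StreamingChatCompletion so downstream code receives validated objects.
    async for chunk in chat_to_async_iterator(fake_chunks()):
        print(type(chunk).__name__)

asyncio.run(main())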
@@ -57,33 +68,60 @@ async def execute_completion(
         """
         Execute the completion request with inference engine API
         """
-        model_path = f"{request['base_url']}/{request['model']}.gguf"
+        model_path = f"{base_url}/{request.get_model()}.gguf"
 
         # Create a copy of the request dict and remove stream_options
         # Reason - Request error as JSON:
         # {'error': "Llama.create_completion() got an unexpected keyword argument 'stream_options'"}
-        request_dict = dict(request)
-        request_dict.pop("stream_options", None)
-        # Remove base_url from the request dict. We use this field as a standard across
-        # all providers to specify the base URL of the model.
-        request_dict.pop("base_url", None)
-
         if is_fim_request:
+            request_dict = request.dict(exclude={
+                "best_of",
+                "frequency_penalty",
+                "n",
+                "stream_options",
+                "user",
+            })
+
             response = await self.inference_engine.complete(
                 model_path,
                 Config.get_config().chat_model_n_ctx,
                 Config.get_config().chat_model_n_gpu_layers,
                 **request_dict,
             )
+
+            if stream:
+                return completion_to_async_iterator(response)
+            return LegacyCompletion(**response)
         else:
+            request_dict = request.dict(exclude={
+                "audio",
+                "frequency_penalty",
+                "include_reasoning",
+                "metadata",
+                "max_completion_tokens",
+                "modalities",
+                "n",
+                "parallel_tool_calls",
+                "prediction",
+                "prompt",
+                "reasoning_effort",
+                "service_tier",
+                "store",
+                "stream_options",
+                "user",
+            })
+
             response = await self.inference_engine.chat(
                 model_path,
                 Config.get_config().chat_model_n_ctx,
                 Config.get_config().chat_model_n_gpu_layers,
                 **request_dict,
             )
 
-        return convert_to_async_iterator(response) if stream else response
+        if stream:
+            return chat_to_async_iterator(response)
+        else:
+            return StreamingChatCompletion(**response)
 
     def _create_streaming_response(
         self,
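The per-branch `request.dict(exclude={...})` calls take over the cleanup the removed `pop()` lines used to do: OpenAI-only fields that the llama.cpp calls would reject are dropped before the remaining keys are splatted into the inference call. A toy sketch of that exclude-then-splat pattern, using a made-up Pydantic model rather than the real codegate request types:

from typing import Optional
from pydantic import BaseModel

class ToyRequest(BaseModel):
    # Hypothetical, trimmed-down request model for illustration only.
    model: str
    prompt: str
    temperature: float = 0.7
    stream_options: Optional[dict] = None  # not accepted by the local engine
    user: Optional[str] = None             # likewise dropped

req = ToyRequest(model="demo.gguf", prompt="hello", stream_options={"include_usage": True})

# Pydantic v1-style .dict(exclude=...) as used in the diff: only keys the
# engine understands survive and are passed through **kwargs.
kwargs = req.dict(exclude={"stream_options", "user"})
print(kwargs)  # {'model': 'demo.gguf', 'prompt': 'hello', 'temperature': 0.7}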
@@ -95,7 +133,7 @@ def _create_streaming_response(
         is the format that FastAPI expects for streaming responses.
         """
         return StreamingResponse(
-            llamacpp_stream_generator(stream),
+            stream_generator(stream),
             headers={
                 "Cache-Control": "no-cache",
                 "Connection": "keep-alive",