Skip to content

Record output from non-streaming responses in DB #1078

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions src/codegate/providers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,23 @@ async def _run_output_stream_pipeline(
denormalized_stream = self._output_normalizer.denormalize_streaming(pipeline_output_stream)
return denormalized_stream

def _run_output_pipeline(
async def _run_output_pipeline(
self,
normalized_response: ModelResponse,
input_context: PipelineContext,
model_response: Any,
) -> ModelResponse:
# we don't have a pipeline for non-streamed output yet
return normalized_response
"""
Run the output pipeline for a single response.

For the moment we don't have a pipeline for non-streamed output, so we
just normalize the response and record the context. It is done here to match
the behaviour of the streaming pipeline.
"""
normalized_response = self._output_normalizer.normalize(model_response)
input_context.add_output(normalized_response)
await self._db_recorder.record_context(input_context)
output_result = self._output_normalizer.denormalize(normalized_response)
return output_result

async def _run_input_pipeline(
self,
Expand Down Expand Up @@ -263,10 +274,7 @@ async def complete(
is_fim_request=is_fim_request,
)
if not streaming:
normalized_response = self._output_normalizer.normalize(model_response)
pipeline_output = self._run_output_pipeline(normalized_response)
await self._db_recorder.record_context(input_pipeline_result.context)
return self._output_normalizer.denormalize(pipeline_output)
return await self._run_output_pipeline(input_pipeline_result.context, model_response)

pipeline_output_stream = await self._run_output_stream_pipeline(
input_pipeline_result.context, model_response, is_fim_request=is_fim_request # type: ignore
Expand Down