diff --git a/src/codegate/api/v1_processing.py b/src/codegate/api/v1_processing.py
index 6606a882..cb2c0c9b 100644
--- a/src/codegate/api/v1_processing.py
+++ b/src/codegate/api/v1_processing.py
@@ -16,6 +16,7 @@
     PartialQuestionAnswer,
     PartialQuestions,
     QuestionAnswer,
+    QuestionType,
     TokenUsageAggregate,
     TokenUsageByModel,
 )
@@ -384,8 +385,13 @@ async def match_conversations(
                     selected_partial_qa = partial_qa
                     break
 
-            # check if we have a question and answer, otherwise do not add it
-            if selected_partial_qa and selected_partial_qa.answer is not None:
+            # check if we have a question and answer, otherwise do not add it
+            # if the question is a FIM question, we should add it even if there is no answer
+            # do not add Chat questions without answers
+            if selected_partial_qa and (
+                selected_partial_qa.answer is not None
+                or selected_partial_qa.partial_questions.type == QuestionType.fim
+            ):
                 # if we don't have a first question, set it. We will use it
                 # to set the conversation timestamp and provider
                 first_partial_qa = first_partial_qa or selected_partial_qa
@@ -396,7 +402,7 @@ async def match_conversations(
                 alerts.extend(deduped_alerts)
                 token_usage_agg.add_model_token_usage(selected_partial_qa.model_token_usage)
 
-        # only add conversation if we have some answers
+        # only add the conversation if we have at least one question and answer
         if len(questions_answers) > 0 and first_partial_qa is not None:
             if token_usage_agg.token_usage.input_tokens == 0:
                 token_usage_agg = None
@@ -435,7 +441,6 @@ async def parse_messages_in_conversations(
     Get all the messages from the database and return them as a list of conversations.
     """
     partial_question_answers = await _process_prompt_output_to_partial_qa(prompts_outputs)
-
     conversations, map_q_id_to_conversation = await match_conversations(partial_question_answers)
     return conversations, map_q_id_to_conversation
 
@@ -510,9 +515,6 @@ async def remove_duplicate_alerts(alerts: List[v1_models.Alert]) -> List[v1_models.Alert]:
     for alert in sorted(
         alerts, key=lambda x: x.timestamp, reverse=True
     ):  # Sort alerts by timestamp descending
-        if alert.trigger_type != "codegate-secrets":
-            unique_alerts.append(alert)
-            continue
 
         # Extract trigger string content until "Context"
         trigger_string_content = alert.trigger_string.split("Context")[0]
diff --git a/src/codegate/db/connection.py b/src/codegate/db/connection.py
index 78a2d607..2d56fccd 100644
--- a/src/codegate/db/connection.py
+++ b/src/codegate/db/connection.py
@@ -610,7 +610,7 @@ async def get_prompts_with_output_alerts_usage_by_workspace_id(
                 LEFT JOIN outputs o ON p.id = o.prompt_id
                 LEFT JOIN alerts a ON p.id = a.prompt_id
                 WHERE p.workspace_id = :workspace_id
-                AND a.trigger_category LIKE :trigger_category
+                AND (a.trigger_category = :trigger_category OR a.trigger_category IS NULL)
                 ORDER BY o.timestamp DESC, a.timestamp DESC
                 """  # noqa: E501
         )
@@ -622,7 +622,6 @@ async def get_prompts_with_output_alerts_usage_by_workspace_id(
                 IntermediatePromptWithOutputUsageAlerts, sql, conditions, should_raise=True
             )
         )
-
         prompts_dict: Dict[str, GetPromptWithOutputsRow] = {}
         for row in rows:
             prompt_id = row.prompt_id
diff --git a/src/codegate/providers/base.py b/src/codegate/providers/base.py
index d22afcc0..452fe08b 100644
--- a/src/codegate/providers/base.py
+++ b/src/codegate/providers/base.py
@@ -96,6 +96,24 @@ def _get_base_url(self) -> str:
         config = Config.get_config()
         return config.provider_urls.get(self.provider_route_name) if config else ""
 
+    async def process_stream_no_pipeline(
+        self, stream: AsyncIterator[ModelResponse], context: PipelineContext
+    ) -> AsyncIterator[ModelResponse]:
+        """
+        Process a stream when there is no pipeline.
+        This is needed to record the output stream chunks for FIM.
+        """
+        try:
+            async for chunk in stream:
+                context.add_output(chunk)
+                yield chunk
+        except Exception as e:
+            # Log exception and stop processing
+            logger.error(f"Error processing stream: {e}")
+            raise e
+        finally:
+            await self._db_recorder.record_context(context)
+
     async def _run_output_stream_pipeline(
         self,
         input_context: PipelineContext,
@@ -121,7 +139,7 @@ async def _run_output_stream_pipeline(
             and self.provider_route_name != "anthropic"
         ):
             logger.info("No output pipeline steps configured, passing through")
-            return model_stream
+            return self.process_stream_no_pipeline(model_stream, input_context)
 
         normalized_stream = self._output_normalizer.normalize_streaming(model_stream)
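
Note on the new streaming wrapper: below is a minimal, self-contained sketch
(not part of the diff) of the behavior process_stream_no_pipeline relies on.
FakeContext and fake_stream are hypothetical stand-ins for PipelineContext
and a provider's response stream; the db_recorder call is omitted.

    import asyncio

    class FakeContext:
        # Hypothetical stand-in for PipelineContext: just collects chunks
        def __init__(self):
            self.outputs = []

        def add_output(self, chunk):
            self.outputs.append(chunk)

    async def fake_stream():
        # Stand-in for the provider's AsyncIterator[ModelResponse]
        for chunk in ("fim-", "completion"):
            yield chunk

    async def process_stream_no_pipeline(stream, context):
        # Same shape as the method added in providers/base.py,
        # minus the DB recording in the finally block
        async for chunk in stream:
            context.add_output(chunk)
            yield chunk

    async def main():
        ctx = FakeContext()
        chunks = [c async for c in process_stream_no_pipeline(fake_stream(), ctx)]
        # every chunk yielded to the caller was also recorded on the context
        assert chunks == ctx.outputs == ["fim-", "completion"]

    asyncio.run(main())

Because the wrapper is an async generator, chunks are recorded lazily as the
caller consumes the stream, and the finally block in the real method records
the context to the DB only once the stream is exhausted or raises.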