Merged
4 changes: 3 additions & 1 deletion vllm/benchmarks/lib/endpoint_request_func.py
@@ -2,6 +2,7 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""The request function for API endpoints."""

import codecs
import io
import json
import os
@@ -25,11 +26,12 @@ class StreamedResponseHandler:

def __init__(self):
self.buffer = ""
self._decoder = codecs.getincrementaldecoder("utf-8")()

def add_chunk(self, chunk_bytes: bytes) -> list[str]:
"""Add a chunk of bytes to the buffer and return any complete
messages."""
chunk_str = chunk_bytes.decode("utf-8")
chunk_str = self._decoder.decode(chunk_bytes)
Contributor
critical

The IncrementalDecoder can buffer incomplete byte sequences. If the stream ends with such an incomplete sequence, it will remain in the decoder's buffer and will be lost because there is no final call to flush the decoder. This can lead to data loss.

To fix this, you should add a mechanism to finalize the decoding process after the last chunk has been processed. This typically involves calling self._decoder.decode(b'', final=True) to flush any buffered data.

This would likely require adding a new method to StreamedResponseHandler, for example finalize(), and calling it from the request handling functions (e.g., async_request_openai_completions) after the streaming loop is complete.

Example of a finalize method:

def finalize(self) -> list[str]:
    """Flushes the decoder and processes any remaining buffered data."""
    final_chunk_str = self._decoder.decode(b'', final=True)
    if not final_chunk_str:
        return []
    
    self.buffer += final_chunk_str
    # It's best to refactor the message processing logic from add_chunk
    # into a private helper method to be reused here.
    messages = self._process_buffer()
    
    if self.buffer:
        # Handle or log any remaining incomplete message in the buffer
        # after final processing.
        pass

    return messages

The call sites in async_request_openai_completions, async_request_openai_chat_completions, and async_request_openai_audio would need to be updated to call this finalize method.
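The buffering behavior the reviewer describes is easy to reproduce in isolation. The following standalone sketch (not part of the PR) shows an incremental UTF-8 decoder holding back a split multi-byte sequence, and how flushing with `final=True` surfaces a truncated stream instead of silently dropping the buffered bytes:

```python
import codecs

decoder = codecs.getincrementaldecoder("utf-8")()

# The euro sign is three bytes; split it across two chunks.
euro = "€".encode("utf-8")        # b'\xe2\x82\xac'
part1 = decoder.decode(euro[:2])  # incomplete sequence: buffered, returns ''
part2 = decoder.decode(euro[2:])  # completes the character: returns '€'
print(repr(part1), repr(part2))   # '' '€'

# If the stream ends mid-sequence and the decoder is never flushed, the
# buffered bytes are simply lost. Flushing with final=True makes the
# truncation visible instead:
decoder2 = codecs.getincrementaldecoder("utf-8")()
decoder2.decode(euro[:2])         # incomplete bytes stay buffered
try:
    decoder2.decode(b"", final=True)
except UnicodeDecodeError:
    print("truncated sequence detected at flush")
```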

Member
Actually @he-yufeng this looks like a valid point. Should we flush the incremental decoder?

self.buffer += chunk_str

messages = []
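Putting the reviewers' suggestion together, here is a minimal self-contained sketch of a handler with a `finalize()` flush. The `"\n\n"` message separator, the `_process_buffer()` helper, and the end-of-stream handling are assumptions for illustration; the actual vLLM implementation is not fully shown in this diff:

```python
import codecs


class StreamedResponseHandler:
    def __init__(self):
        self.buffer = ""
        self._decoder = codecs.getincrementaldecoder("utf-8")()

    def _process_buffer(self) -> list[str]:
        # Split off complete messages; keep any trailing partial in the buffer.
        *complete, self.buffer = self.buffer.split("\n\n")
        return [m for m in complete if m]

    def add_chunk(self, chunk_bytes: bytes) -> list[str]:
        self.buffer += self._decoder.decode(chunk_bytes)
        return self._process_buffer()

    def finalize(self) -> list[str]:
        # Flush any bytes still buffered inside the incremental decoder.
        self.buffer += self._decoder.decode(b"", final=True)
        messages = self._process_buffer()
        if self.buffer:
            # Treat a trailing unterminated message as complete at end of stream.
            messages.append(self.buffer)
            self.buffer = ""
        return messages


handler = StreamedResponseHandler()
msgs = handler.add_chunk("data: a\n\ndata: ".encode() + "€".encode()[:2])
msgs += handler.add_chunk("€".encode()[2:] + b"b")
msgs += handler.finalize()
print(msgs)  # ['data: a', 'data: €b']
```

Without the `finalize()` call, the second message here would lose its first two bytes and the trailing message would never be emitted, which is exactly the data-loss scenario the review flags.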