@@ -575,12 +575,13 @@ def _ensure_output_processor(self) -> None:
575
575
# Already initialized, no need to reinitialize
576
576
return
577
577
578
- # this is a hotfix - we shortcut before selecting the output pipeline for FIM
579
- # because our FIM output pipeline is actually empty as of now. We should fix this
580
- # but don't have any immediate need.
581
- is_fim = self .proxy .context_tracking .metadata .get ("is_fim" , False )
582
- if is_fim :
583
- return
578
+ # # this is a hotfix - we shortcut before selecting the output pipeline for FIM
579
+ # # because our FIM output pipeline is actually empty as of now. We should fix this
580
+ # # but don't have any immediate need.
581
+ # is_fim = self.proxy.context_tracking.metadata.get("is_fim", False)
582
+ # if is_fim:
583
+ # return
584
+ #
584
585
585
586
logger .debug ("Tracking context for pipeline processing" )
586
587
self .sse_processor = SSEProcessor ()
@@ -601,16 +602,25 @@ async def _process_stream(self):
601
602
async def stream_iterator ():
602
603
while True :
603
604
incoming_record = await self .stream_queue .get ()
605
+ if incoming_record .get ("type" ) == "done" :
606
+ break
607
+
604
608
record_content = incoming_record .get ("content" , {})
605
609
606
610
streaming_choices = []
607
611
for choice in record_content .get ("choices" , []):
612
+ is_fim = self .proxy .context_tracking .metadata .get ("is_fim" , False )
613
+ if is_fim :
614
+ content = choice .get ("text" , "" )
615
+ else :
616
+ content = choice .get ("delta" , {}).get ("content" )
617
+
608
618
streaming_choices .append (
609
619
StreamingChoices (
610
620
finish_reason = choice .get ("finish_reason" , None ),
611
621
index = 0 ,
612
622
delta = Delta (
613
- content = choice . get ( "delta" , {}). get ( " content" ) , role = "assistant"
623
+ content = content , role = "assistant"
614
624
),
615
625
logprobs = None ,
616
626
)
@@ -624,12 +634,16 @@ async def stream_iterator():
624
634
model = record_content .get ("model" , "" ),
625
635
object = "chat.completion.chunk" ,
626
636
)
637
+ print ("---> YIELDING" , mr )
627
638
yield mr
628
639
629
640
async for record in self .output_pipeline_instance .process_stream (stream_iterator ()):
641
+ print ("----> RECEIVED RECORD" , record )
630
642
chunk = record .model_dump_json (exclude_none = True , exclude_unset = True )
643
+ # TODO: for FIM requests, put the content into the "text" field
631
644
sse_data = f"data:{ chunk } \n \n " .encode ("utf-8" )
632
645
chunk_size = hex (len (sse_data ))[2 :] + "\r \n "
646
+ print ("WRITING CHUNK: " , chunk )
633
647
self ._proxy_transport_write (chunk_size .encode ())
634
648
self ._proxy_transport_write (sse_data )
635
649
self ._proxy_transport_write (b"\r \n " )
@@ -648,6 +662,7 @@ async def stream_iterator():
648
662
649
663
def _process_chunk (self , chunk : bytes ):
650
664
records = self .sse_processor .process_chunk (chunk )
665
+ print ("RECEIVED RECORDS" , records )
651
666
652
667
for record in records :
653
668
if self .stream_queue is None :
@@ -658,6 +673,9 @@ def _process_chunk(self, chunk: bytes):
658
673
self .stream_queue .put_nowait (record )
659
674
660
675
def _proxy_transport_write(self, data: bytes):
    """Write ``data`` to the proxy's transport if it is still usable.

    The write is skipped (and a debug message is logged) when
    ``self.proxy.transport`` is falsy or reports ``is_closing()``.

    Args:
        data: Raw bytes to hand to ``self.proxy.transport.write``.
    """
    transport = self.proxy.transport
    if not transport or transport.is_closing():
        # Report through the module logger rather than print() so the
        # message follows the same logging configuration as the rest of
        # the file instead of going straight to stdout.
        logger.debug("Tried to write to a closed transport")
        return
    transport.write(data)
662
680
663
681
def data_received (self , data : bytes ) -> None :
@@ -682,6 +700,7 @@ def data_received(self, data: bytes) -> None:
682
700
683
701
data = data [header_end + 4 :]
684
702
703
+ print ("PROCESSING CHUNK: " , data )
685
704
self ._process_chunk (data )
686
705
687
706
def connection_lost (self , exc : Optional [Exception ]) -> None :
0 commit comments