Skip to content

Commit 644b5c6

Browse files
authored
Revert wrong handling of stream (#1354)
#1300 (comment) Signed-off-by: Spycsh <[email protected]>
1 parent 42f323f commit 644b5c6

File tree

2 files changed

+31
-34
lines changed

2 files changed

+31
-34
lines changed

comps/cores/mega/orchestrator.py

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ def process_outputs(self, prev_nodes: List, result_dict: Dict) -> Dict:
207207
all_outputs.update(result_dict[prev_node])
208208
return all_outputs
209209

210-
async def wrap_iterable(self, aiterable, is_first=True):
210+
def wrap_iterable(self, iterable, is_first=True):
211211

212212
with tracer.start_as_current_span("llm_generate_stream") if ENABLE_OPEA_TELEMETRY else contextlib.nullcontext():
213213
while True:
@@ -217,10 +217,10 @@ async def wrap_iterable(self, aiterable, is_first=True):
217217
else contextlib.nullcontext()
218218
): # else tracer.start_as_current_span(f"llm_generate_stream_next_token")
219219
try:
220-
token = await anext(aiterable)
220+
token = next(iterable)
221221
yield token
222222
is_first = False
223-
except StopAsyncIteration:
223+
except StopIteration:
224224
# Exiting the iterable loop cleanly
225225
break
226226
except Exception as e:
@@ -259,51 +259,49 @@ async def execute(
259259
if ENABLE_OPEA_TELEMETRY
260260
else contextlib.nullcontext()
261261
):
262-
async with aiohttp.ClientSession() as session:
263-
response = await session.post(
264-
url=endpoint,
265-
data=json.dumps(inputs),
266-
headers={"Content-type": "application/json"},
267-
proxy=None,
268-
timeout=aiohttp.ClientTimeout(total=1000),
269-
)
262+
response = requests.post(
263+
url=endpoint,
264+
data=json.dumps(inputs),
265+
headers={"Content-type": "application/json"},
266+
proxies={"http": None},
267+
stream=True,
268+
timeout=1000,
269+
)
270270
downstream = runtime_graph.downstream(cur_node)
271271
if downstream:
272272
assert len(downstream) == 1, "Not supported multiple stream downstreams yet!"
273273
cur_node = downstream[0]
274274
hitted_ends = [".", "?", "!", "。", ",", "!"]
275275
downstream_endpoint = self.services[downstream[0]].endpoint_path
276276

277-
async def generate():
277+
def generate():
278278
token_start = req_start
279279
if response:
280280
# response.elapsed = time until first headers received
281281
buffered_chunk_str = ""
282282
is_first = True
283-
async for chunk in self.wrap_iterable(response.content.iter_chunked(None)):
283+
for chunk in self.wrap_iterable(response.iter_content(chunk_size=None)):
284284
if chunk:
285285
if downstream:
286286
chunk = chunk.decode("utf-8")
287287
buffered_chunk_str += self.extract_chunk_str(chunk)
288288
is_last = chunk.endswith("[DONE]\n\n")
289289
if (buffered_chunk_str and buffered_chunk_str[-1] in hitted_ends) or is_last:
290-
async with aiohttp.ClientSession() as downstream_session:
291-
res = await downstream_session.post(
292-
url=downstream_endpoint,
293-
data=json.dumps({"text": buffered_chunk_str}),
294-
proxy=None,
295-
)
296-
res_json = await res.json()
297-
if "text" in res_json:
298-
res_txt = res_json["text"]
299-
else:
300-
raise Exception("Other response types not supported yet!")
301-
buffered_chunk_str = "" # clear
302-
async for item in self.token_generator(
303-
res_txt, token_start, is_first=is_first, is_last=is_last
304-
):
305-
yield item
306-
token_start = time.time()
290+
res = requests.post(
291+
url=downstream_endpoint,
292+
data=json.dumps({"text": buffered_chunk_str}),
293+
proxies={"http": None},
294+
)
295+
res_json = res.json()
296+
if "text" in res_json:
297+
res_txt = res_json["text"]
298+
else:
299+
raise Exception("Other response types not supported yet!")
300+
buffered_chunk_str = "" # clear
301+
yield from self.token_generator(
302+
res_txt, token_start, is_first=is_first, is_last=is_last
303+
)
304+
token_start = time.time()
307305
else:
308306
token_start = self.metrics.token_update(token_start, is_first)
309307
yield chunk

comps/cores/mega/orchestrator_with_yaml.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
# Copyright (C) 2024 Intel Corporation
22
# SPDX-License-Identifier: Apache-2.0
33

4-
import asyncio
54
import json
65
import re
76
from collections import OrderedDict
@@ -24,10 +23,10 @@ def __init__(self, yaml_file_path: str):
2423
if not is_valid:
2524
raise Exception("Invalid mega graph!")
2625

27-
async def execute(self, cur_node: str, inputs: Dict):
26+
def execute(self, cur_node: str, inputs: Dict):
2827
# send the cur_node request/reply
2928
endpoint = self.docs["opea_micro_services"][cur_node]["endpoint"]
30-
response = await asyncio.to_thread(requests.post, url=endpoint, data=json.dumps(inputs), proxies={"http": None})
29+
response = requests.post(url=endpoint, data=json.dumps(inputs), proxies={"http": None})
3130
print(response)
3231
return response.json()
3332

@@ -49,7 +48,7 @@ async def schedule(self, initial_inputs: Dict):
4948
inputs = initial_inputs
5049
else:
5150
inputs = self.process_outputs(self.predecessors(node))
52-
response = await self.execute(node, inputs)
51+
response = self.execute(node, inputs)
5352
self.result_dict[node] = response
5453

5554
def _load_from_yaml(self):

0 commit comments

Comments
 (0)