Skip to content

Commit 616b955

Browse files
authored
Merge pull request #137 from salute-developers/feature/DPNLPF_2142_stream_responses
DPNLPF-2142: stream responses
2 parents a5a2ac2 + a1be701 commit 616b955

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+817
-541
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,4 @@ htmlcov
3737
tests/TestPass.csv
3838
tests/parsed_log.csv
3939
_trial_marker
40+
/test.py

core/basic_models/actions/basic_actions.py

Lines changed: 30 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
# coding: utf-8
2-
import asyncio
32
import random
4-
from typing import Union, Dict, List, Any, Optional
3+
from typing import Union, Dict, List, Any, Optional, AsyncGenerator
54

65
import core.logging.logger_constants as log_const
76
from core.basic_models.actions.command import Command
@@ -33,8 +32,9 @@ def __init__(self, items: Optional[Dict[str, Any]] = None, id: Optional[str] = N
3332
self.version = items.get("version", -1)
3433

3534
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
36-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> Optional[List[Command]]:
35+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
3736
raise NotImplementedError
37+
yield
3838

3939
def on_run_error(self, text_preprocessing_result: BaseTextPreprocessingResult, user: BaseUser):
4040
log("exc_handler: Action failed to run. Return None. MESSAGE: %(masked_message)s.", user,
@@ -72,11 +72,8 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
7272
self.nodes = items.get("nodes") or {}
7373

7474
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
75-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
76-
commands = []
77-
commands.append(Command(self.command, self.nodes, self.id, request_type=self.request_type,
78-
request_data=self.request_data))
79-
return commands
75+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
76+
yield Command(self.command, self.nodes, self.id, request_type=self.request_type, request_data=self.request_data)
8077

8178

8279
class RequirementAction(Action):
@@ -105,11 +102,10 @@ def build_internal_item(self) -> str:
105102
return self._item
106103

107104
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
108-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
109-
commands = []
105+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
110106
if self.requirement.check(text_preprocessing_result, user, params):
111-
commands.extend(await self.internal_item.run(user, text_preprocessing_result, params) or [])
112-
return commands
107+
async for command in self.internal_item.run(user, text_preprocessing_result, params):
108+
yield command
113109

114110

115111
class ChoiceAction(Action):
@@ -141,18 +137,18 @@ def build_else_item(self) -> Optional[str]:
141137
return self._else_item
142138

143139
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
144-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
145-
commands = []
140+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
146141
choice_is_made = False
147142
for item in self.items:
148143
checked = item.requirement.check(text_preprocessing_result, user, params)
149144
if checked:
150-
commands.extend(await item.internal_item.run(user, text_preprocessing_result, params) or [])
145+
async for command in item.internal_item.run(user, text_preprocessing_result, params):
146+
yield command
151147
choice_is_made = True
152148
break
153149
if not choice_is_made and self._else_item:
154-
commands.extend(await self.else_item.run(user, text_preprocessing_result, params) or [])
155-
return commands
150+
async for command in self.else_item.run(user, text_preprocessing_result, params):
151+
yield command
156152

157153

158154
class ElseAction(Action):
@@ -189,14 +185,16 @@ def build_item(self) -> str:
189185
def build_else_item(self) -> Optional[str]:
190186
return self._else_item
191187

192-
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
193-
params: Optional[Optional[Dict[str, Union[str, float, int]]]] = None) -> List[Command]:
194-
commands = []
188+
async def run(
189+
self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
190+
params: Optional[Optional[Dict[str, Union[str, float, int]]]] = None
191+
) -> AsyncGenerator[Command, None]:
195192
if self.requirement.check(text_preprocessing_result, user, params):
196-
commands.extend(await self.item.run(user, text_preprocessing_result, params) or [])
193+
async for command in self.item.run(user, text_preprocessing_result, params):
194+
yield command
197195
elif self._else_item:
198-
commands.extend(await self.else_item.run(user, text_preprocessing_result, params) or [])
199-
return commands
196+
async for command in self.else_item.run(user, text_preprocessing_result, params):
197+
yield command
200198

201199

202200
class ActionOfActions(Action):
@@ -215,11 +213,10 @@ def build_actions(self) -> List[Action]:
215213

216214
class CompositeAction(ActionOfActions):
217215
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
218-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
219-
commands = []
216+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
220217
for action in self.actions:
221-
commands.extend(await action.run(user, text_preprocessing_result, params) or [])
222-
return commands
218+
async for command in action.run(user, text_preprocessing_result, params):
219+
yield command
223220

224221

225222
class NonRepeatingAction(ActionOfActions):
@@ -231,8 +228,7 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
231228
self._last_action_ids_storage = items["last_action_ids_storage"]
232229

233230
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
234-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
235-
commands = []
231+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
236232
last_ids = user.last_action_ids[self._last_action_ids_storage]
237233
all_indexes = list(range(self._actions_count))
238234
max_last_ids_count = self._actions_count - 1
@@ -242,8 +238,8 @@ async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreproces
242238
action_index = random.choice(available_indexes)
243239
action = self.actions[action_index]
244240
last_ids.add(action_index)
245-
commands.extend(await action.run(user, text_preprocessing_result, params) or [])
246-
return commands
241+
async for command in action.run(user, text_preprocessing_result, params):
242+
yield command
247243

248244

249245
class RandomAction(Action):
@@ -259,9 +255,8 @@ def build_actions(self) -> List[Action]:
259255
return self._raw_actions
260256

261257
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
262-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
263-
commands = []
258+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
264259
pos = random.randint(0, len(self._raw_actions) - 1)
265260
action = self.actions[pos]
266-
commands.extend(await action.run(user, text_preprocessing_result, params=params) or [])
267-
return commands
261+
async for command in action.run(user, text_preprocessing_result, params=params):
262+
yield command

core/basic_models/actions/client_profile_actions.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Dict, Any, Optional, Union, List
1+
from typing import Dict, Any, Optional, Union, AsyncGenerator
22

33
from core.basic_models.actions.command import Command
44
from core.basic_models.actions.string_actions import StringAction
@@ -73,15 +73,15 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
7373
self.request_data[KAFKA_REPLY_TOPIC] = config["template_settings"]["consumer_topic"]
7474

7575
async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
76-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> Optional[List[Command]]:
76+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
7777
if self.behavior:
7878
callback_id = user.message.generate_new_callback_id()
7979
scenario_id = user.last_scenarios.last_scenario_name if hasattr(user, 'last_scenarios') else None
8080
user.behaviors.add(callback_id, self.behavior, scenario_id,
8181
text_preprocessing_result.raw, pickle_deepcopy(params))
8282

83-
commands = await super().run(user, text_preprocessing_result, params)
84-
return commands
83+
async for command in super().run(user, text_preprocessing_result, params):
84+
yield command
8585

8686

8787
class RememberThisAction(StringAction):
@@ -157,7 +157,7 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
157157
})
158158

159159
async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
160-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> Optional[List[Command]]:
160+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
161161
self._nodes.update({
162162
"consumer": {
163163
"projectId": user.settings["template_settings"]["project_id"]
@@ -174,5 +174,5 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
174174
if REPLY_TOPIC_KEY not in self.request_data and KAFKA_REPLY_TOPIC not in self.request_data:
175175
self.request_data[KAFKA_REPLY_TOPIC] = user.settings["template_settings"]["consumer_topic"]
176176

177-
commands = await super().run(user, text_preprocessing_result, params)
178-
return commands
177+
async for command in super().run(user, text_preprocessing_result, params):
178+
yield command

core/basic_models/actions/counter_actions.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# coding: utf-8
2-
from typing import Union, Dict, Any, Optional, List
2+
from typing import Union, Dict, Any, Optional, AsyncGenerator
33

44
from core.basic_models.actions.basic_actions import Action
55
from core.basic_models.actions.command import Command
@@ -22,26 +22,26 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
2222

2323
class CounterIncrementAction(CounterAction):
2424
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
25-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
26-
commands = []
25+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
2726
user.counters[self.key].inc(self.value, self.lifetime)
28-
return commands
27+
return
28+
yield
2929

3030

3131
class CounterDecrementAction(CounterAction):
3232
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
33-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
34-
commands = []
33+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
3534
user.counters[self.key].dec(-self.value, self.lifetime)
36-
return commands
35+
return
36+
yield
3737

3838

3939
class CounterClearAction(CounterAction):
4040
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
41-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
42-
commands = []
41+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
4342
user.counters.clear(self.key)
44-
return commands
43+
return
44+
yield
4545

4646

4747
class CounterSetAction(CounterAction):
@@ -58,10 +58,10 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
5858
self.time_shift = items.get("time_shift", 0)
5959

6060
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
61-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
62-
commands = []
61+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
6362
user.counters[self.key].set(self.value, self.reset_time, self.time_shift)
64-
return commands
63+
return
64+
yield
6565

6666

6767
class CounterCopyAction(Action):
@@ -73,8 +73,8 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
7373
self.time_shift = items.get("time_shift", 0)
7474

7575
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
76-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
77-
commands = []
76+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
7877
value = user.counters[self.src].value
7978
user.counters[self.dst].set(value, self.reset_time, self.time_shift)
80-
return commands
79+
return
80+
yield

core/basic_models/actions/external_actions.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Optional, Dict, Any, Union, List
1+
from typing import Optional, Dict, Any, Union, AsyncGenerator
22

33
from core.basic_models.actions.basic_actions import CommandAction, Action
44
from core.basic_models.actions.basic_actions import action_factory
@@ -21,7 +21,7 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
2121
self._action_key: str = items["action"]
2222

2323
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
24-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
24+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
2525
action: Action = user.descriptions["external_actions"][self._action_key]
26-
commands = await action.run(user, text_preprocessing_result, params)
27-
return commands
26+
async for command in action.run(user, text_preprocessing_result, params):
27+
yield command

core/basic_models/actions/push_action.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# coding: utf-8
22
import base64
33
import uuid
4-
from typing import Union, Dict, List, Any, Optional
4+
from typing import Union, Dict, Any, Optional, AsyncGenerator
55

66
from core.basic_models.actions.command import Command
77
from core.basic_models.actions.string_actions import StringAction
@@ -69,7 +69,7 @@ def _render_request_data(self, action_params):
6969
return request_data
7070

7171
async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
72-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
72+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
7373
params = params or {}
7474
command_params = {
7575
"projectId": user.settings["template_settings"]["project_id"],
@@ -78,9 +78,8 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
7878
"content": self._generate_command_context(user, text_preprocessing_result, params),
7979
}
8080
requests_data = self._render_request_data(params)
81-
commands = [Command(self.command, command_params, self.id, request_type=self.request_type,
82-
request_data=requests_data, need_payload_wrap=False, need_message_name=False)]
83-
return commands
81+
yield Command(self.command, command_params, self.id, request_type=self.request_type,
82+
request_data=requests_data, need_payload_wrap=False, need_message_name=False)
8483

8584

8685
class PushAuthenticationActionHttp(PushAction):
@@ -133,11 +132,12 @@ def _create_authorization_token(self, items: Dict[str, Any]) -> str:
133132
return authorization_token
134133

135134
async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
136-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
135+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
137136
params = params or {}
138137
collected = user.parametrizer.collect(text_preprocessing_result, filter_params={"command": self.command})
139138
params.update(collected)
140-
return await self.http_request_action.run(user, text_preprocessing_result, params)
139+
async for command in self.http_request_action.run(user, text_preprocessing_result, params):
140+
yield command
141141

142142

143143
class GetRuntimePermissionsAction(PushAction):
@@ -167,7 +167,7 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
167167
self.command = GET_RUNTIME_PERMISSIONS
168168

169169
async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
170-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
170+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
171171
params = params or {}
172172
scenario_id = user.last_scenarios.last_scenario_name
173173
user.behaviors.add(user.message.generate_new_callback_id(), self.behavior, scenario_id,
@@ -183,9 +183,8 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
183183
}
184184
}
185185
command_params = self._generate_command_context(user, text_preprocessing_result, params)
186-
commands = [Command(self.command, command_params, self.id, request_type=self.request_type,
187-
request_data=self.request_data, need_payload_wrap=False, need_message_name=False)]
188-
return commands
186+
yield Command(self.command, command_params, self.id, request_type=self.request_type,
187+
request_data=self.request_data, need_payload_wrap=False, need_message_name=False)
189188

190189

191190
class PushActionHttp(PushAction):
@@ -310,7 +309,7 @@ def _create_instance_of_http_request_action(self, items: Dict[str, Any], id: Opt
310309
self.http_request_action = HTTPRequestAction(items, id)
311310

312311
async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
313-
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
312+
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
314313
params = params or {}
315314
collected = user.parametrizer.collect(text_preprocessing_result, filter_params={"command": self.command})
316315
params.update(collected)
@@ -331,4 +330,5 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
331330
"payload": self.payload
332331
}
333332
self.http_request_action.method_params["json"] = request_body_parameters
334-
return await self.http_request_action.run(user, text_preprocessing_result, params)
333+
async for command in self.http_request_action.run(user, text_preprocessing_result, params):
334+
yield command

0 commit comments

Comments
 (0)