-
Notifications
You must be signed in to change notification settings - Fork 0
Closed
Milestone
Description
async def executeQueriesAsync(self, queries):
    """Execute multiple StackQL queries concurrently using the current StackQL instance.

    Each query is handed to ``self._sync_query`` on a worker thread from a
    ``ThreadPoolExecutor``, and all of them are awaited together with
    ``asyncio.gather``. The per-query results are then combined:

    - in ``'dict'`` output mode, each query's list of rows is flattened into a
      single list;
    - in ``'pandas'`` output mode, the per-query DataFrames are concatenated
      into one DataFrame with a fresh index.

    ``asyncio.gather`` returns results in the order the awaitables were passed,
    so rows from earlier queries precede rows from later ones, even though the
    queries themselves may finish in any order.

    :param queries: StackQL query strings to execute concurrently.
    :type queries: list[str], required
    :return: All rows from all queries, as a flat list of dicts (``'dict'``
        mode) or a single DataFrame (``'pandas'`` mode). An empty ``queries``
        list yields ``[]`` or an empty DataFrame respectively.
    :rtype: list[dict] or pd.DataFrame
    :raises ValueError: If the instance is running in ``server_mode``, which
        this method does not support.
    :raises ValueError: If the output mode is anything other than ``'dict'``
        or ``'pandas'``.

    Example:
        >>> import asyncio
        >>> from pystackql import StackQL
        >>> stackql = StackQL()
        >>> regions = ["ap-southeast-2", "us-east-1"]
        >>> queries = [
        ...     f"SELECT '{region}' AS region, instanceType, COUNT(*) AS num_instances "
        ...     f"FROM aws.ec2.instances WHERE region = '{region}' "
        ...     "GROUP BY instanceType"
        ...     for region in regions
        ... ]
        >>> result = asyncio.run(stackql.executeQueriesAsync(queries))

    Note:
        This method is a coroutine: ``await`` it from async code (e.g. a
        notebook cell), or drive it with ``asyncio.run`` from synchronous code.
    """
    if self.server_mode:
        raise ValueError("executeQueriesAsync is not supported in server_mode.")
    if self.output not in ['dict', 'pandas']:
        raise ValueError("executeQueriesAsync supports only 'dict' or 'pandas' output modes.")

    if not queries:
        # pd.concat([]) raises "No objects to concatenate", so return an empty
        # result of the right type instead of spinning up a thread pool.
        return pd.DataFrame() if self.output == 'pandas' else []

    # server_mode was rejected above, so this flag is always falsy here; it is
    # forwarded unchanged to _sync_query as its new-connection argument.
    new_connection = self.server_mode

    # Inside a coroutine, get_running_loop() is the preferred way to obtain the
    # loop (get_event_loop() has more complex, policy-dependent behavior).
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        futures = [
            loop.run_in_executor(executor, self._sync_query, query, new_connection)
            for query in queries
        ]
        # gather preserves the order of `futures`, i.e. the order of `queries`.
        results = await asyncio.gather(*futures)

    if self.output == 'pandas':
        return pd.concat(results, ignore_index=True)
    return [row for query_rows in results for row in query_rows]
Metadata
Assignees
Labels
No labels