1
+ import functools
2
+ import time
3
+ from concurrent .futures import Future , ThreadPoolExecutor
1
4
from datetime import datetime
2
- from typing import Optional , Tuple
5
+ from typing import List , Optional , Tuple
3
6
4
7
import boto3
5
8
import requests
6
9
from mypy_boto3_lambda import LambdaClient
7
10
from mypy_boto3_lambda .type_defs import InvocationResponseTypeDef
11
+ from pydantic import BaseModel
8
12
from requests import Request , Response
9
13
from requests .exceptions import RequestException
10
14
from retry import retry
11
15
16
# (invocation response, approximate invocation start time) pair, as returned
# by get_lambda_response; kept as an alias so callers and the parallel helper
# share one definition.
GetLambdaResponse = Tuple[InvocationResponseTypeDef, datetime]
19
class GetLambdaResponseOptions(BaseModel):
    """Options bundle mirroring the parameters of ``get_lambda_response``.

    A list of these can be fanned out concurrently via
    ``get_lambda_response_in_parallel``.
    """

    # ARN (or name) of the Lambda function to invoke.
    lambda_arn: str
    # String payload for the invocation; downstream falls back to "" when None.
    payload: Optional[str] = None
    # Pre-built boto3 Lambda client; downstream creates one when None.
    client: Optional[LambdaClient] = None
    # When True, an "Unhandled" FunctionError in the response raises RuntimeError.
    raise_on_error: bool = True

    # Maintenance: Pydantic v2 deprecated it; we should update in v3
    class Config:
        # LambdaClient is not a pydantic-validatable type, so allow it as-is.
        arbitrary_types_allowed = True
12
29
13
30
def get_lambda_response (
14
31
lambda_arn : str ,
15
32
payload : Optional [str ] = None ,
16
33
client : Optional [LambdaClient ] = None ,
17
34
raise_on_error : bool = True ,
18
- ) -> Tuple [ InvocationResponseTypeDef , datetime ] :
35
+ ) -> GetLambdaResponse :
19
36
"""Invoke function synchronously
20
37
21
38
Parameters
@@ -42,18 +59,18 @@ def get_lambda_response(
42
59
client = client or boto3 .client ("lambda" )
43
60
payload = payload or ""
44
61
execution_time = datetime .utcnow ()
45
-
46
62
response : InvocationResponseTypeDef = client .invoke (
47
63
FunctionName = lambda_arn ,
48
64
InvocationType = "RequestResponse" ,
49
65
Payload = payload ,
50
66
)
67
+
51
68
has_error = response .get ("FunctionError" , "" ) == "Unhandled"
52
69
if has_error and raise_on_error :
53
70
error_payload = response ["Payload" ].read ().decode ()
54
71
raise RuntimeError (f"Function failed invocation: { error_payload } " )
55
72
56
- return client . invoke ( FunctionName = lambda_arn , InvocationType = "RequestResponse" , Payload = payload ) , execution_time
73
+ return response , execution_time
57
74
58
75
59
76
@retry (RequestException , delay = 2 , jitter = 1.5 , tries = 5 )
@@ -62,3 +79,39 @@ def get_http_response(request: Request) -> Response:
62
79
result = session .send (request .prepare ())
63
80
result .raise_for_status ()
64
81
return result
82
+
83
+
84
def get_lambda_response_in_parallel(
    get_lambda_response_options: List[GetLambdaResponseOptions],
) -> List[GetLambdaResponse]:
    """Invoke functions in parallel

    Submissions are staggered (0, 0.5, 1.0, ... seconds) so the invocations
    overlap in time yet start in the same order the options were passed in,
    letting callers assert on ordered output.

    Parameters
    ----------
    get_lambda_response_options : List[GetLambdaResponseOptions]
        List of options to call get_lambda_response with

    Returns
    -------
    List[GetLambdaResponse]
        Function responses and approximate execution time, in input order
    """
    with ThreadPoolExecutor() as executor:
        running_tasks: List[Future] = []
        for index, options in enumerate(get_lambda_response_options):
            # Sleep 0, 0.5, 1.0, ... seconds before each submission. This way
            # we can guarantee that lambdas are executed in parallel, but they
            # are called in the same "order" as they are passed in, thus
            # guaranteeing that we can assert on the correct output.
            time.sleep(0.5 * index)

            callback = functools.partial(get_lambda_response, **options.dict())
            running_tasks.append(executor.submit(callback))

        # No explicit executor.shutdown(wait=True) needed: Future.result()
        # blocks until each task completes (re-raising worker exceptions),
        # and the context manager shuts the pool down on exit.
        return [task.result() for task in running_tasks]
0 commit comments