@@ -76,7 +76,7 @@ def __init__(self, client: RequestQueueClient) -> None:
         Preferably use the `RequestQueue.open` constructor to create a new instance.
 
         Args:
-            client: An instance of a key-value store client.
+            client: An instance of a request queue client.
         """
         self._client = client
 
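As the updated docstring says, `RequestQueue` instances are meant to be created through `RequestQueue.open` rather than by passing a client to `__init__` directly. A minimal usage sketch, assuming the public `crawlee` API; the queue name `'my-queue'` is illustrative:

```python
import asyncio

from crawlee import Request
from crawlee.storages import RequestQueue


async def main() -> None:
    # Preferred entry point: open (or create) a named queue instead of
    # instantiating RequestQueue with a storage client by hand.
    rq = await RequestQueue.open(name='my-queue')
    await rq.add_request(Request.from_url('https://example.com'))


asyncio.run(main())
```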
@@ -111,7 +111,7 @@ async def open(
         if id and name:
             raise ValueError('Only one of "id" or "name" can be specified, not both.')
 
-        # Check if key value store is already cached by id or name
+        # Check if request queue is already cached by id or name
         if id and id in cls._cache_by_id:
            return cls._cache_by_id[id]
         if name and name in cls._cache_by_name:
@@ -126,7 +126,15 @@ async def open(
             configuration=configuration,
         )
 
-        return cls(client)
+        rq = cls(client)
+
+        # Cache the request queue by id and name if available
+        if rq.id:
+            cls._cache_by_id[rq.id] = rq
+        if rq.name:
+            cls._cache_by_name[rq.name] = rq
+
+        return rq
 
     @override
     async def drop(self) -> None:
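With the cache population added above, repeated `open` calls for the same storage should now resolve to the same instance instead of constructing a fresh wrapper each time. A minimal sketch of the intended behavior (queue name illustrative):

```python
import asyncio

from crawlee.storages import RequestQueue


async def main() -> None:
    rq_first = await RequestQueue.open(name='my-queue')
    rq_second = await RequestQueue.open(name='my-queue')

    # The second call hits _cache_by_name, so both names
    # should refer to the very same object.
    assert rq_first is rq_second


asyncio.run(main())
```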
@@ -163,27 +171,32 @@ async def add_requests(
         transformed_requests = self._transform_requests(requests)
         wait_time_secs = wait_time_between_batches.total_seconds()
 
-        async def _process_batch(batch: Sequence[Request]) -> None:
-            request_count = len(batch)
-            response = await self._client.add_batch_of_requests(batch, forefront=forefront)
-            logger.debug(f'Added {request_count} requests to the queue, response: {response}')
-
         # Wait for the first batch to be added
         first_batch = transformed_requests[:batch_size]
         if first_batch:
-            await _process_batch(first_batch)
+            await self._process_batch(
+                first_batch,
+                base_retry_wait=wait_time_between_batches,
+                forefront=forefront,
+            )
 
         async def _process_remaining_batches() -> None:
             for i in range(batch_size, len(transformed_requests), batch_size):
                 batch = transformed_requests[i : i + batch_size]
-                await _process_batch(batch)
+                await self._process_batch(
+                    batch,
+                    base_retry_wait=wait_time_between_batches,
+                    forefront=forefront,
+                )
                 if i + batch_size < len(transformed_requests):
                     await asyncio.sleep(wait_time_secs)
 
         # Create and start the task to process remaining batches in the background
         remaining_batches_task = asyncio.create_task(
-            _process_remaining_batches(), name='request_queue_process_remaining_batches_task'
+            _process_remaining_batches(),
+            name='request_queue_process_remaining_batches_task',
         )
+
         self._add_requests_tasks.append(remaining_batches_task)
         remaining_batches_task.add_done_callback(lambda _: self._add_requests_tasks.remove(remaining_batches_task))
@@ -195,69 +208,6 @@ async def _process_remaining_batches() -> None:
                 timeout=wait_for_all_requests_to_be_added_timeout,
             )
 
-        # Wait for the first batch to be added
-        first_batch = transformed_requests[:batch_size]
-        if first_batch:
-            await self._process_batch(first_batch, base_retry_wait=wait_time_between_batches)
-
-        async def _process_remaining_batches() -> None:
-            for i in range(batch_size, len(transformed_requests), batch_size):
-                batch = transformed_requests[i : i + batch_size]
-                await self._process_batch(batch, base_retry_wait=wait_time_between_batches)
-                if i + batch_size < len(transformed_requests):
-                    await asyncio.sleep(wait_time_secs)
-
-        # Create and start the task to process remaining batches in the background
-        remaining_batches_task = asyncio.create_task(
-            _process_remaining_batches(), name='request_queue_process_remaining_batches_task'
-        )
-        self._tasks.append(remaining_batches_task)
-        remaining_batches_task.add_done_callback(lambda _: self._tasks.remove(remaining_batches_task))
-
-        # Wait for all tasks to finish if requested
-        if wait_for_all_requests_to_be_added:
-            await wait_for_all_tasks_for_finish(
-                (remaining_batches_task,),
-                logger=logger,
-                timeout=wait_for_all_requests_to_be_added_timeout,
-            )
-
-    async def _process_batch(self, batch: Sequence[Request], base_retry_wait: timedelta, attempt: int = 1) -> None:
-        max_attempts = 5
-        response = await self._resource_client.batch_add_requests(batch)
-
-        if response.unprocessed_requests:
-            logger.debug(f'Following requests were not processed: {response.unprocessed_requests}.')
-            if attempt > max_attempts:
-                logger.warning(
-                    f'Following requests were not processed even after {max_attempts} attempts:\n'
-                    f'{response.unprocessed_requests}'
-                )
-            else:
-                logger.debug('Retry to add requests.')
-                unprocessed_requests_unique_keys = {request.unique_key for request in response.unprocessed_requests}
-                retry_batch = [request for request in batch if request.unique_key in unprocessed_requests_unique_keys]
-                await asyncio.sleep((base_retry_wait * attempt).total_seconds())
-                await self._process_batch(retry_batch, base_retry_wait=base_retry_wait, attempt=attempt + 1)
-
-        request_count = len(batch) - len(response.unprocessed_requests)
-        self._assumed_total_count += request_count
-        if request_count:
-            logger.debug(
-                f'Added {request_count} requests to the queue. Processed requests: {response.processed_requests}'
-            )
-
-    async def get_request(self, request_id: str) -> Request | None:
-        """Retrieve a request from the queue.
-
-        Args:
-            request_id: ID of the request to retrieve.
-
-        Returns:
-            The retrieved request, or `None`, if it does not exist.
-        """
-        # TODO: implement
-
     async def fetch_next_request(self) -> Request | None:
         """Return the next request in the queue to be processed.
 
@@ -346,3 +296,35 @@ async def is_finished(self) -> bool:
             return True
 
         return False
+
+    async def _process_batch(
+        self,
+        batch: Sequence[Request],
+        *,
+        base_retry_wait: timedelta,
+        attempt: int = 1,
+        forefront: bool = False,
+    ) -> None:
+        max_attempts = 5
+        response = await self._client.add_batch_of_requests(batch, forefront=forefront)
+
+        if response.unprocessed_requests:
+            logger.debug(f'Following requests were not processed: {response.unprocessed_requests}.')
+            if attempt > max_attempts:
+                logger.warning(
+                    f'Following requests were not processed even after {max_attempts} attempts:\n'
+                    f'{response.unprocessed_requests}'
+                )
+            else:
+                logger.debug('Retry to add requests.')
+                unprocessed_requests_unique_keys = {request.unique_key for request in response.unprocessed_requests}
+                retry_batch = [request for request in batch if request.unique_key in unprocessed_requests_unique_keys]
+                await asyncio.sleep((base_retry_wait * attempt).total_seconds())
+                await self._process_batch(retry_batch, base_retry_wait=base_retry_wait, attempt=attempt + 1)
+
+        request_count = len(batch) - len(response.unprocessed_requests)
+
+        if request_count:
+            logger.debug(
+                f'Added {request_count} requests to the queue. Processed requests: {response.processed_requests}'
+            )
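The relocated `_process_batch` retries unprocessed requests with a linear backoff: before each retry it sleeps `base_retry_wait * attempt`, and it gives up with a warning once `attempt` exceeds `max_attempts`. A self-contained sketch of that wait schedule, using only the stdlib:

```python
from datetime import timedelta

base_retry_wait = timedelta(seconds=1)
max_attempts = 5

# Mirrors the sleep in _process_batch: the wait grows linearly with the
# attempt number, so retries back off as 1 s, 2 s, 3 s, 4 s, 5 s.
for attempt in range(1, max_attempts + 1):
    wait = (base_retry_wait * attempt).total_seconds()
    print(f'attempt {attempt}: sleep {wait:.0f} s before retrying')
```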