@@ -329,7 +329,7 @@ def _register_to_bootstrap(self):
329
329
"role" : "Prefill" ,
330
330
"rank_ip" : get_local_ip_by_remote (),
331
331
"rank_port" : self .rank_port ,
332
- "bootstrap_key " : f" { bootstrap_server_url } _ { self .kv_args .engine_rank } " ,
332
+ "engine_rank " : self .kv_args .engine_rank ,
333
333
}
334
334
335
335
try :
@@ -400,28 +400,29 @@ def __init__(
400
400
self .session_id = self .kv_mgr .get_session_id ()
401
401
self .kv_mgr .update_status (bootstrap_room , KVPoll .Bootstrapping )
402
402
403
- self .bootstrap_key = f"{ self .bootstrap_addr } _{ self .kv_mgr .kv_args .engine_rank } "
403
+ # NOTE: the connection-pool key is distinguished by bootstrap_addr and engine_rank
404
+ bootstrap_key = f"{ self .bootstrap_addr } _{ self .kv_mgr .kv_args .engine_rank } "
404
405
405
- if self . bootstrap_key not in self .kv_mgr .connection_pool :
406
+ if bootstrap_key not in self .kv_mgr .connection_pool :
406
407
self .bootstrap_info = self ._get_bootstrap_info_from_server (
407
- self .bootstrap_key
408
+ self .kv_mgr . kv_args . engine_rank
408
409
)
409
410
if self .bootstrap_info is None :
410
411
logger .error (
411
412
f"Could not fetch bootstrap info for engine rank: { self .kv_mgr .kv_args .engine_rank } "
412
413
)
413
414
else :
414
- self .kv_mgr .connection_pool [self . bootstrap_key ] = self .bootstrap_info
415
+ self .kv_mgr .connection_pool [bootstrap_key ] = self .bootstrap_info
415
416
else :
416
- self .bootstrap_info = self .kv_mgr .connection_pool [self . bootstrap_key ]
417
+ self .bootstrap_info = self .kv_mgr .connection_pool [bootstrap_key ]
417
418
418
419
assert self .bootstrap_info is not None
419
420
self .kv_mgr .update_status (bootstrap_room , KVPoll .WaitingForInput )
420
421
421
- def _get_bootstrap_info_from_server (self , bootstrap_key : str ):
422
+ def _get_bootstrap_info_from_server (self , engine_rank ):
422
423
"""Fetch the bootstrap info from the bootstrap server."""
423
424
try :
424
- url = f"http://{ self .bootstrap_addr } /route?bootstrap_key= { bootstrap_key } "
425
+ url = f"http://{ self .bootstrap_addr } /route?engine_rank= { engine_rank } "
425
426
response = requests .get (url )
426
427
if response .status_code == 200 :
427
428
bootstrap_info = response .json ()
@@ -556,28 +557,28 @@ async def _handle_route_put(self, request: web.Request):
556
557
role = data ["role" ]
557
558
rank_ip = data ["rank_ip" ]
558
559
rank_port = int (data ["rank_port" ])
559
- bootstrap_key = data ["bootstrap_key" ]
560
+ engine_rank = int ( data ["engine_rank" ])
560
561
561
562
# Add lock to make sure thread-safe
562
563
if role == "Prefill" :
563
- self .prefill_port_table [bootstrap_key ] = {
564
+ self .prefill_port_table [engine_rank ] = {
564
565
"rank_ip" : rank_ip ,
565
566
"rank_port" : rank_port ,
566
567
}
567
568
logger .debug (
568
- f"Registered Prefill bootstrap_key : { bootstrap_key } with rank_ip: { rank_ip } and rank_port: { rank_port } "
569
+ f"Registered Prefill boostrap : { engine_rank } with rank_ip: { rank_ip } and rank_port: { rank_port } "
569
570
)
570
571
571
572
return web .Response (text = "OK" , status = 200 )
572
573
573
574
async def _handle_route_get (self , request : web .Request ):
574
- bootstrap_key = request .query .get ("bootstrap_key " )
575
- if not bootstrap_key :
576
- return web .Response (text = "Missing bootstrap_key " , status = 400 )
575
+ engine_rank = request .query .get ("engine_rank " )
576
+ if not engine_rank :
577
+ return web .Response (text = "Missing rank " , status = 400 )
577
578
578
579
# Find corresponding prefill info
579
580
async with self .lock :
580
- bootstrap_info = self .prefill_port_table .get (bootstrap_key )
581
+ bootstrap_info = self .prefill_port_table .get (int ( engine_rank ) )
581
582
582
583
if bootstrap_info is not None :
583
584
return web .json_response (bootstrap_info , status = 200 )
0 commit comments