[Misc] Optimize ray worker initialization time (#11275)

Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Co-authored-by: Cody Yu <hao.yu.cody@gmail.com>
Authored by Rui Qiao on 2024-12-18 23:38:02 -08:00, committed via GitHub
parent 8936316d58
commit f26c4aeecb

@@ -123,6 +123,7 @@ class RayGPUExecutor(DistributedGPUExecutor):
         # Create the workers.
         driver_ip = get_ip()
+        workers = []
         for bundle_id, bundle in enumerate(placement_group.bundle_specs):
             if not bundle.get("GPU", 0):
                 continue
@@ -138,20 +139,30 @@ class RayGPUExecutor(DistributedGPUExecutor):
                 scheduling_strategy=scheduling_strategy,
                 **ray_remote_kwargs,
             )(RayWorkerWrapper).remote(vllm_config=self.vllm_config)
+            workers.append(worker)
 
-            if self.use_ray_spmd_worker:
-                self.workers.append(worker)
-            else:
-                worker_ip = ray.get(worker.get_node_ip.remote())
-                if worker_ip == driver_ip and self.driver_dummy_worker is None:
+        worker_ip_refs = [
+            worker.get_node_ip.remote()  # type: ignore[attr-defined]
+            for worker in workers
+        ]
+        worker_ips = ray.get(worker_ip_refs)
+
+        if not self.use_ray_spmd_worker:
+            for i in range(len(workers)):
+                worker = workers[i]
+                worker_ip = worker_ips[i]
+                if self.driver_dummy_worker is None and worker_ip == driver_ip:
                     # If the worker is on the same node as the driver, we use it
                     # as the resource holder for the driver process.
                     self.driver_dummy_worker = worker
                     self.driver_worker = RayWorkerWrapper(
                         vllm_config=self.vllm_config)
-                else:
-                    # Else, added to the list of workers.
-                    self.workers.append(worker)
+                    workers.pop(i)
+                    worker_ips.pop(i)
+                    self.workers = workers
+                    break
+        else:
+            self.workers = workers
 
         logger.debug("workers: %s", self.workers)
         logger.debug("driver_dummy_worker: %s", self.driver_dummy_worker)
@@ -161,14 +172,12 @@ class RayGPUExecutor(DistributedGPUExecutor):
                 "adjusting the Ray placement group or running the driver on a "
                 "GPU node.")
 
-        worker_ips = [
-            ray.get(worker.get_node_ip.remote())  # type: ignore[attr-defined]
-            for worker in self.workers
-        ]
         ip_counts: Dict[str, int] = {}
         for ip in worker_ips:
             ip_counts[ip] = ip_counts.get(ip, 0) + 1
 
+        worker_to_ip = dict(zip(self.workers, worker_ips))
+
         def sort_by_driver_then_worker_ip(worker):
             """
             Sort the workers based on 3 properties:
@@ -179,7 +188,7 @@ class RayGPUExecutor(DistributedGPUExecutor):
             3. Finally, if the work is on a node with smaller IP address, it
             should be placed first.
             """
-            ip = ray.get(worker.get_node_ip.remote())
+            ip = worker_to_ip[worker]
             return (ip != driver_ip, ip_counts[ip], ip)
 
         # After sorting, the workers on the same node will be
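
The last two hunks reuse the already-fetched IPs for sorting: `sort_by_driver_then_worker_ip` previously issued a fresh `ray.get(worker.get_node_ip.remote())` each time the key function ran (once per worker during the sort), and now reads from the precomputed `worker_to_ip` dict instead. A hedged sketch of the same fetch-once, look-up-many idea, with made-up IPs and plain strings standing in for worker handles:

```python
from typing import Dict, List

# Made-up stand-ins for state the executor already holds.
driver_ip = "10.0.0.1"
workers: List[str] = ["w0", "w1", "w2", "w3"]  # opaque worker handles
worker_ips = ["10.0.0.2", "10.0.0.1", "10.0.0.2", "10.0.0.3"]

# Count workers per node, as in the original code.
ip_counts: Dict[str, int] = {}
for ip in worker_ips:
    ip_counts[ip] = ip_counts.get(ip, 0) + 1

# Precompute the worker -> IP map once, so the sort key below is a
# pure dict lookup instead of a remote call.
worker_to_ip = dict(zip(workers, worker_ips))


def sort_by_driver_then_worker_ip(worker: str):
    ip = worker_to_ip[worker]
    # Driver-node workers first, then nodes with fewer workers, then by IP.
    return (ip != driver_ip, ip_counts[ip], ip)


workers.sort(key=sort_by_driver_then_worker_ip)
print(workers)  # ['w1', 'w3', 'w0', 'w2']
```

Since Python's `sort` evaluates the key once per element, the old code cost one blocking remote call per worker here; the dict lookup makes the sort free of network traffic.
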