From 452a723bf2e8410ee9b47f82f90c7ea48aa6d14f Mon Sep 17 00:00:00 2001 From: Tyler Michael Smith Date: Wed, 11 Dec 2024 18:34:54 -0500 Subject: [PATCH] [V1][Core] Remove should_shutdown to simplify core process termination (#11113) Signed-off-by: Tyler Michael Smith --- vllm/v1/engine/core.py | 13 ++----------- vllm/v1/engine/core_client.py | 6 ------ 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 55a5c4df..a26ffe74 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -5,7 +5,6 @@ import signal import threading import time from multiprocessing.process import BaseProcess -from multiprocessing.sharedctypes import Synchronized from typing import List, Tuple, Type, Union import zmq @@ -133,13 +132,9 @@ class EngineCoreProc(EngineCore): input_path: str, output_path: str, ready_path: str, - should_shutdown: Synchronized, ): super().__init__(vllm_config, executor_class, usage_context) - # Signal from main process to shutdown (multiprocessing.Value). - self.should_shutdown = should_shutdown - # Background Threads and Queues for IO. These enable us to # overlap ZMQ socket IO with GPU since they release the GIL, # and to overlap some serialization/deserialization with the @@ -195,7 +190,6 @@ class EngineCoreProc(EngineCore): input_path: str, output_path: str, ready_path: str, - should_shutdown: Synchronized, ) -> BaseProcess: # The current process might have CUDA context, # so we need to spawn a new process. @@ -210,7 +204,6 @@ class EngineCoreProc(EngineCore): "vllm_config": vllm_config, "executor_class": executor_class, "usage_context": usage_context, - "should_shutdown": should_shutdown } # Run EngineCore busy loop in background process. proc = context.Process(target=EngineCoreProc.run_engine_core, @@ -260,8 +253,8 @@ class EngineCoreProc(EngineCore): def run_busy_loop(self): """Core busy loop of the EngineCore.""" - # Loop until we get a shutdown signal. - while not self.should_shutdown: + # Loop until process is sent a SIGINT or SIGTERM + while True: # 1) Poll the input queue until there is work to do. if not self.scheduler.has_unfinished_requests(): while True: @@ -272,8 +265,6 @@ class EngineCoreProc(EngineCore): except queue.Empty: self._log_stats() logger.debug("EngineCore busy loop waiting.") - if self.should_shutdown: - return except BaseException: raise diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 4d96b323..1d5ddf4d 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -1,5 +1,4 @@ import atexit -import multiprocessing from typing import List, Union import msgspec @@ -149,21 +148,16 @@ class MPClient(EngineCoreClient): self.input_socket.bind(input_path) # Start EngineCore in background process. - self.should_shutdown = multiprocessing.Value('b', False, lock=False) self.proc = EngineCoreProc.make_engine_core_process( *args, input_path=input_path, output_path=output_path, ready_path=ready_path, - should_shutdown=self.should_shutdown, **kwargs, ) atexit.register(self.shutdown) def shutdown(self): - # Send shutdown signal to background process. - self.should_shutdown = True - # Shut down the zmq context. self.ctx.destroy(linger=0)