[V1][Core] Remove should_shutdown to simplify core process termination (#11113)

Signed-off-by: Tyler Michael Smith <tyler@neuralmagic.com>
This commit is contained in:
Tyler Michael Smith 2024-12-11 18:34:54 -05:00 committed by GitHub
parent d1e21a979b
commit 452a723bf2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 2 additions and 17 deletions

View File

@ -5,7 +5,6 @@ import signal
import threading import threading
import time import time
from multiprocessing.process import BaseProcess from multiprocessing.process import BaseProcess
from multiprocessing.sharedctypes import Synchronized
from typing import List, Tuple, Type, Union from typing import List, Tuple, Type, Union
import zmq import zmq
@ -133,13 +132,9 @@ class EngineCoreProc(EngineCore):
input_path: str, input_path: str,
output_path: str, output_path: str,
ready_path: str, ready_path: str,
should_shutdown: Synchronized,
): ):
super().__init__(vllm_config, executor_class, usage_context) super().__init__(vllm_config, executor_class, usage_context)
# Signal from main process to shutdown (multiprocessing.Value).
self.should_shutdown = should_shutdown
# Background Threads and Queues for IO. These enable us to # Background Threads and Queues for IO. These enable us to
# overlap ZMQ socket IO with GPU since they release the GIL, # overlap ZMQ socket IO with GPU since they release the GIL,
# and to overlap some serialization/deserialization with the # and to overlap some serialization/deserialization with the
@ -195,7 +190,6 @@ class EngineCoreProc(EngineCore):
input_path: str, input_path: str,
output_path: str, output_path: str,
ready_path: str, ready_path: str,
should_shutdown: Synchronized,
) -> BaseProcess: ) -> BaseProcess:
# The current process might have CUDA context, # The current process might have CUDA context,
# so we need to spawn a new process. # so we need to spawn a new process.
@ -210,7 +204,6 @@ class EngineCoreProc(EngineCore):
"vllm_config": vllm_config, "vllm_config": vllm_config,
"executor_class": executor_class, "executor_class": executor_class,
"usage_context": usage_context, "usage_context": usage_context,
"should_shutdown": should_shutdown
} }
# Run EngineCore busy loop in background process. # Run EngineCore busy loop in background process.
proc = context.Process(target=EngineCoreProc.run_engine_core, proc = context.Process(target=EngineCoreProc.run_engine_core,
@ -260,8 +253,8 @@ class EngineCoreProc(EngineCore):
def run_busy_loop(self): def run_busy_loop(self):
"""Core busy loop of the EngineCore.""" """Core busy loop of the EngineCore."""
# Loop until we get a shutdown signal. # Loop until process is sent a SIGINT or SIGTERM
while not self.should_shutdown: while True:
# 1) Poll the input queue until there is work to do. # 1) Poll the input queue until there is work to do.
if not self.scheduler.has_unfinished_requests(): if not self.scheduler.has_unfinished_requests():
while True: while True:
@ -272,8 +265,6 @@ class EngineCoreProc(EngineCore):
except queue.Empty: except queue.Empty:
self._log_stats() self._log_stats()
logger.debug("EngineCore busy loop waiting.") logger.debug("EngineCore busy loop waiting.")
if self.should_shutdown:
return
except BaseException: except BaseException:
raise raise

View File

@ -1,5 +1,4 @@
import atexit import atexit
import multiprocessing
from typing import List, Union from typing import List, Union
import msgspec import msgspec
@ -149,21 +148,16 @@ class MPClient(EngineCoreClient):
self.input_socket.bind(input_path) self.input_socket.bind(input_path)
# Start EngineCore in background process. # Start EngineCore in background process.
self.should_shutdown = multiprocessing.Value('b', False, lock=False)
self.proc = EngineCoreProc.make_engine_core_process( self.proc = EngineCoreProc.make_engine_core_process(
*args, *args,
input_path=input_path, input_path=input_path,
output_path=output_path, output_path=output_path,
ready_path=ready_path, ready_path=ready_path,
should_shutdown=self.should_shutdown,
**kwargs, **kwargs,
) )
atexit.register(self.shutdown) atexit.register(self.shutdown)
def shutdown(self): def shutdown(self):
# Send shutdown signal to background process.
self.should_shutdown = True
# Shut down the zmq context. # Shut down the zmq context.
self.ctx.destroy(linger=0) self.ctx.destroy(linger=0)