# Source listing metadata: 1131 lines, 43 KiB, Python
# SPDX-License-Identifier: Apache-2.0
|
|
|
|
import asyncio
|
|
import atexit
|
|
import gc
|
|
import importlib
|
|
import inspect
|
|
import multiprocessing
|
|
import os
|
|
import re
|
|
import signal
|
|
import socket
|
|
import tempfile
|
|
import uuid
|
|
from argparse import Namespace
|
|
from collections.abc import AsyncIterator
|
|
from contextlib import asynccontextmanager
|
|
from functools import partial
|
|
from http import HTTPStatus
|
|
from typing import Annotated, Optional, Union
|
|
|
|
import uvloop
|
|
from fastapi import APIRouter, Depends, FastAPI, Form, HTTPException, Request
|
|
from fastapi.exceptions import RequestValidationError
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import JSONResponse, Response, StreamingResponse
|
|
from starlette.concurrency import iterate_in_threadpool
|
|
from starlette.datastructures import State
|
|
from starlette.routing import Mount
|
|
from typing_extensions import assert_never
|
|
|
|
import vllm.envs as envs
|
|
from vllm.config import VllmConfig
|
|
from vllm.engine.arg_utils import AsyncEngineArgs
|
|
from vllm.engine.async_llm_engine import AsyncLLMEngine # type: ignore
|
|
from vllm.engine.multiprocessing.client import MQLLMEngineClient
|
|
from vllm.engine.multiprocessing.engine import run_mp_engine
|
|
from vllm.engine.protocol import EngineClient
|
|
from vllm.entrypoints.chat_utils import (load_chat_template,
|
|
resolve_hf_chat_template,
|
|
resolve_mistral_chat_template)
|
|
from vllm.entrypoints.launcher import serve_http
|
|
from vllm.entrypoints.logger import RequestLogger
|
|
from vllm.entrypoints.openai.cli_args import (make_arg_parser,
|
|
validate_parsed_serve_args)
|
|
# yapf conflicts with isort for this block
|
|
# yapf: disable
|
|
from vllm.entrypoints.openai.protocol import (ChatCompletionRequest,
|
|
ChatCompletionResponse,
|
|
CompletionRequest,
|
|
CompletionResponse,
|
|
DetokenizeRequest,
|
|
DetokenizeResponse,
|
|
EmbeddingChatRequest,
|
|
EmbeddingCompletionRequest,
|
|
EmbeddingRequest,
|
|
EmbeddingResponse,
|
|
EmbeddingResponseData,
|
|
ErrorResponse,
|
|
LoadLoRAAdapterRequest,
|
|
PoolingChatRequest,
|
|
PoolingCompletionRequest,
|
|
PoolingRequest, PoolingResponse,
|
|
RerankRequest, RerankResponse,
|
|
ScoreRequest, ScoreResponse,
|
|
TokenizeRequest,
|
|
TokenizeResponse,
|
|
TranscriptionRequest,
|
|
TranscriptionResponse,
|
|
UnloadLoRAAdapterRequest)
|
|
# yapf: enable
|
|
from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
|
|
from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
|
|
from vllm.entrypoints.openai.serving_embedding import OpenAIServingEmbedding
|
|
from vllm.entrypoints.openai.serving_engine import OpenAIServing
|
|
from vllm.entrypoints.openai.serving_models import (BaseModelPath,
|
|
OpenAIServingModels)
|
|
from vllm.entrypoints.openai.serving_pooling import OpenAIServingPooling
|
|
from vllm.entrypoints.openai.serving_score import ServingScores
|
|
from vllm.entrypoints.openai.serving_tokenization import (
|
|
OpenAIServingTokenization)
|
|
from vllm.entrypoints.openai.serving_transcription import (
|
|
OpenAIServingTranscription)
|
|
from vllm.entrypoints.openai.tool_parsers import ToolParserManager
|
|
from vllm.entrypoints.utils import (cli_env_setup, load_aware_call,
|
|
with_cancellation)
|
|
from vllm.logger import init_logger
|
|
from vllm.reasoning import ReasoningParserManager
|
|
from vllm.transformers_utils.config import (
|
|
maybe_register_config_serialize_by_value)
|
|
from vllm.transformers_utils.tokenizer import MistralTokenizer
|
|
from vllm.usage.usage_lib import UsageContext
|
|
from vllm.utils import (Device, FlexibleArgumentParser, get_open_zmq_ipc_path,
|
|
is_valid_ipv6_address, set_ulimit)
|
|
from vllm.version import __version__ as VLLM_VERSION
|
|
|
|
# Keep-alive timeout, in seconds. NOTE(review): presumably handed to the HTTP
# server at startup; the consumer is outside this part of the file.
TIMEOUT_KEEP_ALIVE = 5 # seconds
|
|
|
|
# Declared here (without a value) so the temporary directory created in
# build_async_engine_client_from_engine_args can be stored at module scope
# and therefore lives until interpreter exit.
prometheus_multiproc_dir: tempfile.TemporaryDirectory
|
|
|
|
# Cannot use __name__ (https://github.com/vllm-project/vllm/pull/4765)
|
|
# Module logger; the name is spelled out literally instead of using __name__.
logger = init_logger('vllm.entrypoints.openai.api_server')
|
|
|
|
# Strong references to background tasks started in lifespan(); each task
# removes itself from this set through a done-callback.
_running_tasks: set[asyncio.Task] = set()
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook.

    On startup, optionally starts a background task that calls
    `engine_client.do_log_stats()` every 10 seconds (when
    `app.state.log_stats` is truthy), then collects and freezes the GC heap.
    On shutdown, cancels that task and deletes `app.state`.
    """
    try:
        stats_task = None
        if app.state.log_stats:
            engine_client: EngineClient = app.state.engine_client

            async def _force_log():
                while True:
                    await asyncio.sleep(10.)
                    await engine_client.do_log_stats()

            stats_task = asyncio.create_task(_force_log())
            # Keep a reference so the task is not garbage collected early.
            _running_tasks.add(stats_task)
            stats_task.add_done_callback(_running_tasks.remove)

        # Mark the startup heap as static so that it's ignored by GC.
        # Reduces pause times of oldest generation collections.
        gc.collect()
        gc.freeze()
        try:
            yield
        finally:
            if stats_task is not None:
                stats_task.cancel()
    finally:
        # Ensure app state including engine ref is gc'd
        del app.state
|
|
|
|
|
|
@asynccontextmanager
async def build_async_engine_client(
        args: Namespace) -> AsyncIterator[EngineClient]:
    """Yield an EngineClient built from parsed CLI arguments.

    Converts `args` into AsyncEngineArgs and delegates to
    build_async_engine_client_from_engine_args, whose context manager
    handles shutdown and cleanup on error/exit.
    """
    parsed_engine_args = AsyncEngineArgs.from_cli_args(args)
    skip_frontend_mp = args.disable_frontend_multiprocessing

    client_context = build_async_engine_client_from_engine_args(
        parsed_engine_args, skip_frontend_mp)
    async with client_context as client:
        yield client
|
|
|
|
|
|
@asynccontextmanager
async def build_async_engine_client_from_engine_args(
    engine_args: AsyncEngineArgs,
    disable_frontend_multiprocessing: bool = False,
) -> AsyncIterator[EngineClient]:
    """
    Build an EngineClient from `engine_args` and yield it.

    Exactly one backend is selected:
        - V1 `AsyncLLM` when `envs.VLLM_USE_V1` is set
        - in-process `AsyncLLMEngine` when the config is unsupported by
          `MQLLMEngineClient` or frontend multiprocessing is disabled
        - otherwise `MQLLMEngineClient`, talking over IPC to an engine that
          runs in a separately spawned process

    When the context exits the client is shut down / closed and, for the
    multiprocess backend, the engine process is terminated.
    """
    global prometheus_multiproc_dir

    # Create the EngineConfig (determines if we can use V1).
    usage_context = UsageContext.OPENAI_API_SERVER
    vllm_config = engine_args.create_engine_config(usage_context=usage_context)

    # ---- V1 AsyncLLM ----
    if envs.VLLM_USE_V1:
        if disable_frontend_multiprocessing:
            logger.warning(
                "V1 is enabled, but got --disable-frontend-multiprocessing. "
                "To disable frontend multiprocessing, set VLLM_USE_V1=0.")

        from vllm.v1.engine.async_llm import AsyncLLM
        async_llm: Optional[AsyncLLM] = None
        try:
            async_llm = AsyncLLM.from_vllm_config(
                vllm_config=vllm_config,
                usage_context=usage_context,
                disable_log_requests=engine_args.disable_log_requests,
                disable_log_stats=engine_args.disable_log_stats)
            yield async_llm
        finally:
            if async_llm:
                async_llm.shutdown()
        return

    # ---- V0 AsyncLLMEngine (in-process) ----
    if (MQLLMEngineClient.is_unsupported_config(vllm_config)
            or disable_frontend_multiprocessing):
        engine_client: Optional[EngineClient] = None
        try:
            engine_client = AsyncLLMEngine.from_vllm_config(
                vllm_config=vllm_config,
                usage_context=usage_context,
                disable_log_requests=engine_args.disable_log_requests,
                disable_log_stats=engine_args.disable_log_stats)
            yield engine_client
        finally:
            if engine_client and hasattr(engine_client, "shutdown"):
                engine_client.shutdown()
        return

    # ---- V0 MQLLMEngine (separate engine process) ----
    if "PROMETHEUS_MULTIPROC_DIR" in os.environ:
        logger.warning(
            "Found PROMETHEUS_MULTIPROC_DIR was set by user. "
            "This directory must be wiped between vLLM runs or "
            "you will find inaccurate metrics. Unset the variable "
            "and vLLM will properly handle cleanup.")
    else:
        # Make TemporaryDirectory for prometheus multiprocessing.
        # Note: global TemporaryDirectory will be automatically
        # cleaned up upon exit.
        prometheus_multiproc_dir = tempfile.TemporaryDirectory()
        os.environ["PROMETHEUS_MULTIPROC_DIR"] = prometheus_multiproc_dir.name

    # Select random path for IPC.
    ipc_path = get_open_zmq_ipc_path()
    logger.debug("Multiprocessing frontend to use %s for IPC Path.",
                 ipc_path)

    # Start RPCServer in separate process (holds the LLMEngine).
    # The current process might have a CUDA context, so we need to spawn
    # a new process.
    context = multiprocessing.get_context("spawn")

    # Ensure we can serialize transformer config before spawning
    maybe_register_config_serialize_by_value()

    # The Process can raise an exception during startup, which may
    # not actually result in an exitcode being reported. As a result
    # we use a shared variable to communicate the information.
    engine_alive = multiprocessing.Value('b', True, lock=False)
    process_args = (vllm_config, usage_context, ipc_path,
                    engine_args.disable_log_stats,
                    engine_args.disable_log_requests, engine_alive)
    engine_process = context.Process(target=run_mp_engine, args=process_args)
    engine_process.start()
    engine_pid = engine_process.pid
    assert engine_pid is not None, "Engine process failed to start."
    logger.info("Started engine process with PID %d", engine_pid)

    def _cleanup_ipc_path():
        socket_path = ipc_path.replace("ipc://", "")
        if os.path.exists(socket_path):
            os.remove(socket_path)

    # Ensure we clean up the local IPC socket file on exit.
    atexit.register(_cleanup_ipc_path)

    # Build RPCClient, which conforms to EngineClient Protocol.
    # Construction happens in the default executor, off the event loop.
    loop = asyncio.get_running_loop()
    mq_engine_client = await loop.run_in_executor(
        None, MQLLMEngineClient, ipc_path, vllm_config, engine_pid)
    try:
        # Retry setup until it succeeds, or until the engine is found dead.
        while True:
            try:
                await mq_engine_client.setup()
                break
            except TimeoutError:
                if not (engine_process.is_alive() and engine_alive.value):
                    raise RuntimeError(
                        "Engine process failed to start. See stack "
                        "trace for the root cause.") from None

        yield mq_engine_client  # type: ignore[misc]
    finally:
        # Ensure rpc server process was terminated
        engine_process.terminate()

        # Close all open connections to the backend
        mq_engine_client.close()

        # Wait up to 4 seconds for the engine process to join, then kill it
        # if it still has not exited.
        engine_process.join(4)
        if engine_process.exitcode is None:
            engine_process.kill()

        # Lazy import for prometheus multiprocessing.
        # We need to set PROMETHEUS_MULTIPROC_DIR environment variable
        # before prometheus_client is imported.
        # See https://prometheus.github.io/client_python/multiprocess/
        from prometheus_client import multiprocess
        multiprocess.mark_process_dead(engine_process.pid)
|
|
|
|
|
|
async def validate_json_request(raw_request: Request):
    """Reject requests whose Content-Type is not `application/json`.

    Only the media type is compared; parameters after `;` (for example
    `charset=utf-8`) are ignored, as is letter case and whitespace around
    the media type.

    Raises:
        HTTPException: 415 Unsupported Media Type when the media type is
            missing or is anything other than `application/json`.
    """
    content_type = raw_request.headers.get("content-type", "").lower()
    # Whitespace is permitted around the ";" separator, so strip it before
    # comparing ("application/json ; charset=utf-8" must be accepted).
    media_type = content_type.split(";", maxsplit=1)[0].strip()
    if media_type != "application/json":
        raise HTTPException(
            status_code=HTTPStatus.UNSUPPORTED_MEDIA_TYPE,
            detail="Unsupported Media Type: Only 'application/json' is allowed"
        )
|
|
|
|
|
|
# Router on which the endpoints below are registered; build_app() attaches
# it to the FastAPI application via include_router().
router = APIRouter()
|
|
|
|
|
|
def mount_metrics(app: FastAPI):
    """Instrument `app` with Prometheus metrics and mount `/metrics`.

    Uses the default registry unless PROMETHEUS_MULTIPROC_DIR is set, in
    which case a fresh CollectorRegistry with a MultiProcessCollector is
    used instead.
    """
    # Lazy import for prometheus multiprocessing.
    # We need to set PROMETHEUS_MULTIPROC_DIR environment variable
    # before prometheus_client is imported.
    # See https://prometheus.github.io/client_python/multiprocess/
    from prometheus_client import (REGISTRY, CollectorRegistry, make_asgi_app,
                                   multiprocess)
    from prometheus_fastapi_instrumentator import Instrumentator

    multiproc_dir = os.environ.get("PROMETHEUS_MULTIPROC_DIR")
    if multiproc_dir is None:
        registry = REGISTRY
    else:
        logger.debug("vLLM to use %s as PROMETHEUS_MULTIPROC_DIR",
                     multiproc_dir)
        registry = CollectorRegistry()
        multiprocess.MultiProcessCollector(registry)

    # Endpoints that should not show up in the HTTP instrumentation.
    uninstrumented = [
        "/metrics",
        "/health",
        "/load",
        "/ping",
        "/version",
        "/server_info",
    ]
    instrumentator = Instrumentator(excluded_handlers=uninstrumented,
                                    registry=registry)
    instrumentator.add().instrument(app).expose(app)

    # Add prometheus asgi middleware to route /metrics requests
    metrics_route = Mount("/metrics", make_asgi_app(registry=registry))

    # Workaround for 307 Redirect for /metrics
    metrics_route.path_regex = re.compile("^/metrics(?P<path>.*)$")
    app.routes.append(metrics_route)
|
|
|
|
|
|
def base(request: Request) -> OpenAIServing:
    """Return a generic serving handler (the tokenization handler is reused)."""
    shared_handler = tokenization(request)
    return shared_handler
|
|
|
|
|
|
def models(request: Request) -> OpenAIServingModels:
    """Return the models handler stored on the application state."""
    app_state = request.app.state
    return app_state.openai_serving_models
|
|
|
|
|
|
def chat(request: Request) -> Optional[OpenAIServingChat]:
    """Return the chat-completions handler from the application state.

    May be None; endpoints treat None as "not supported by the model".
    """
    app_state = request.app.state
    return app_state.openai_serving_chat
|
|
|
|
|
|
def completion(request: Request) -> Optional[OpenAIServingCompletion]:
    """Return the completions handler from the application state.

    May be None; endpoints treat None as "not supported by the model".
    """
    app_state = request.app.state
    return app_state.openai_serving_completion
|
|
|
|
|
|
def pooling(request: Request) -> Optional[OpenAIServingPooling]:
    """Return the pooling handler from the application state.

    May be None; endpoints treat None as "not supported by the model".
    """
    app_state = request.app.state
    return app_state.openai_serving_pooling
|
|
|
|
|
|
def embedding(request: Request) -> Optional[OpenAIServingEmbedding]:
    """Return the embeddings handler from the application state.

    May be None; the embeddings endpoint then falls back to pooling.
    """
    app_state = request.app.state
    return app_state.openai_serving_embedding
|
|
|
|
|
|
def score(request: Request) -> Optional[ServingScores]:
    """Return the scoring handler from the application state.

    May be None; endpoints treat None as "not supported by the model".
    """
    app_state = request.app.state
    return app_state.openai_serving_scores
|
|
|
|
|
|
def rerank(request: Request) -> Optional[ServingScores]:
    """Return the rerank handler, which is the same object as the scoring
    handler stored on the application state (may be None).
    """
    app_state = request.app.state
    return app_state.openai_serving_scores
|
|
|
|
|
|
def tokenization(request: Request) -> OpenAIServingTokenization:
    """Return the tokenization handler stored on the application state."""
    app_state = request.app.state
    return app_state.openai_serving_tokenization
|
|
|
|
|
|
def transcription(request: Request) -> OpenAIServingTranscription:
    """Return the transcription handler stored on the application state.

    NOTE(review): the transcriptions endpoint checks this value for None,
    so the annotation may need to be Optional - confirm.
    """
    app_state = request.app.state
    return app_state.openai_serving_transcription
|
|
|
|
|
|
def engine_client(request: Request) -> EngineClient:
    """Return the engine client stored on the application state."""
    app_state = request.app.state
    return app_state.engine_client
|
|
|
|
|
|
@router.get("/health")
async def health(raw_request: Request) -> Response:
    """Health check."""
    # Any exception raised by check_health() propagates to the caller;
    # an empty 200 response is sent only when the check completes.
    client = engine_client(raw_request)
    await client.check_health()
    return Response(status_code=200)
|
|
|
|
|
|
@router.get("/load")
async def get_server_load_metrics(request: Request):
    # This endpoint returns the current server load metrics.
    # It tracks requests utilizing the GPU from the following routes:
    # - /v1/chat/completions
    # - /v1/completions
    # - /v1/audio/transcriptions
    # - /v1/embeddings
    # - /pooling
    # - /score
    # - /v1/score
    # - /rerank
    # - /v1/rerank
    # - /v2/rerank
    current_load = request.app.state.server_load_metrics
    payload = {'server_load': current_load}
    return JSONResponse(content=payload)
|
|
|
|
|
|
@router.api_route("/ping", methods=["GET", "POST"])
async def ping(raw_request: Request) -> Response:
    """Ping check. Endpoint required for SageMaker"""
    # Same behaviour as /health, exposed for both GET and POST.
    health_response = await health(raw_request)
    return health_response
|
|
|
|
|
|
@router.post("/tokenize", dependencies=[Depends(validate_json_request)])
@with_cancellation
async def tokenize(request: TokenizeRequest, raw_request: Request):
    # Delegate to the tokenization handler and wrap its result as JSON;
    # an ErrorResponse carries its own HTTP status code.
    result = await tokenization(raw_request).create_tokenize(
        request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, TokenizeResponse):
        return JSONResponse(content=result.model_dump())

    assert_never(result)
|
|
|
|
|
|
@router.post("/detokenize", dependencies=[Depends(validate_json_request)])
@with_cancellation
async def detokenize(request: DetokenizeRequest, raw_request: Request):
    # Delegate to the tokenization handler and wrap its result as JSON;
    # an ErrorResponse carries its own HTTP status code.
    result = await tokenization(raw_request).create_detokenize(
        request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, DetokenizeResponse):
        return JSONResponse(content=result.model_dump())

    assert_never(result)
|
|
|
|
|
|
@router.get("/v1/models")
async def show_available_models(raw_request: Request):
    # Return the model list produced by the models handler as JSON.
    model_list = await models(raw_request).show_available_models()
    return JSONResponse(content=model_list.model_dump())
|
|
|
|
|
|
@router.get("/version")
async def show_version():
    # Report the installed vLLM version.
    return JSONResponse(content={"version": VLLM_VERSION})
|
|
|
|
|
|
@router.post("/v1/chat/completions",
             dependencies=[Depends(validate_json_request)])
@with_cancellation
@load_aware_call
async def create_chat_completion(request: ChatCompletionRequest,
                                 raw_request: Request):
    chat_handler = chat(raw_request)
    if chat_handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Chat Completions API")

    result = await chat_handler.create_chat_completion(request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, ChatCompletionResponse):
        return JSONResponse(content=result.model_dump())

    # Anything else is streamed to the client as server-sent events.
    return StreamingResponse(content=result, media_type="text/event-stream")
|
|
|
|
|
|
@router.post("/v1/completions", dependencies=[Depends(validate_json_request)])
@with_cancellation
@load_aware_call
async def create_completion(request: CompletionRequest, raw_request: Request):
    completion_handler = completion(raw_request)
    if completion_handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Completions API")

    result = await completion_handler.create_completion(request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, CompletionResponse):
        return JSONResponse(content=result.model_dump())

    # Anything else is streamed to the client as server-sent events.
    return StreamingResponse(content=result, media_type="text/event-stream")
|
|
|
|
|
|
@router.post("/v1/embeddings", dependencies=[Depends(validate_json_request)])
@with_cancellation
@load_aware_call
async def create_embedding(request: EmbeddingRequest, raw_request: Request):
    generator: Union[ErrorResponse, EmbeddingResponse]

    embedding_handler = embedding(raw_request)
    if embedding_handler is not None:
        generator = await embedding_handler.create_embedding(
            request, raw_request)
    else:
        # No embedding handler: fall back to the pooling handler and
        # repackage its output in the embeddings response format.
        fallback_handler = pooling(raw_request)
        if fallback_handler is None:
            return base(raw_request).create_error_response(
                message="The model does not support Embeddings API")

        logger.warning(
            "Embeddings API will become exclusive to embedding models "
            "in a future release. To return the hidden states directly, "
            "use the Pooling API (`/pooling`) instead.")

        pooled = await fallback_handler.create_pooling(request, raw_request)

        if not isinstance(pooled, PoolingResponse):
            generator = pooled
        else:
            converted = [
                EmbeddingResponseData(
                    index=item.index,
                    embedding=item.data,  # type: ignore
                ) for item in pooled.data
            ]
            generator = EmbeddingResponse(
                id=pooled.id,
                object=pooled.object,
                created=pooled.created,
                model=pooled.model,
                data=converted,
                usage=pooled.usage,
            )

    if isinstance(generator, ErrorResponse):
        return JSONResponse(content=generator.model_dump(),
                            status_code=generator.code)
    if isinstance(generator, EmbeddingResponse):
        return JSONResponse(content=generator.model_dump())

    assert_never(generator)
|
|
|
|
|
|
@router.post("/pooling", dependencies=[Depends(validate_json_request)])
@with_cancellation
@load_aware_call
async def create_pooling(request: PoolingRequest, raw_request: Request):
    pooling_handler = pooling(raw_request)
    if pooling_handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Pooling API")

    result = await pooling_handler.create_pooling(request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, PoolingResponse):
        return JSONResponse(content=result.model_dump())

    assert_never(result)
|
|
|
|
|
|
@router.post("/score", dependencies=[Depends(validate_json_request)])
@with_cancellation
@load_aware_call
async def create_score(request: ScoreRequest, raw_request: Request):
    score_handler = score(raw_request)
    if score_handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Score API")

    result = await score_handler.create_score(request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, ScoreResponse):
        return JSONResponse(content=result.model_dump())

    assert_never(result)
|
|
|
|
|
|
@router.post("/v1/score", dependencies=[Depends(validate_json_request)])
@with_cancellation
@load_aware_call
async def create_score_v1(request: ScoreRequest, raw_request: Request):
    # Legacy alias for /score: warn on every call, then forward.
    logger.warning(
        "To indicate that Score API is not part of standard OpenAI API, we "
        "have moved it to `/score`. Please update your client accordingly.")

    forwarded = await create_score(request, raw_request)
    return forwarded
|
|
|
|
|
|
@router.post("/v1/audio/transcriptions")
@with_cancellation
@load_aware_call
async def create_transcriptions(request: Annotated[TranscriptionRequest,
                                                   Form()],
                                raw_request: Request):
    transcription_handler = transcription(raw_request)
    if transcription_handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Transcriptions API")

    # The uploaded file is read fully before being handed to the handler.
    audio_bytes = await request.file.read()
    result = await transcription_handler.create_transcription(
        audio_bytes, request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, TranscriptionResponse):
        return JSONResponse(content=result.model_dump())

    # Anything else is streamed to the client as server-sent events.
    return StreamingResponse(content=result, media_type="text/event-stream")
|
|
|
|
|
|
@router.post("/rerank", dependencies=[Depends(validate_json_request)])
@with_cancellation
@load_aware_call
async def do_rerank(request: RerankRequest, raw_request: Request):
    rerank_handler = rerank(raw_request)
    if rerank_handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Rerank (Score) API")

    result = await rerank_handler.do_rerank(request, raw_request)

    if isinstance(result, ErrorResponse):
        return JSONResponse(content=result.model_dump(),
                            status_code=result.code)
    if isinstance(result, RerankResponse):
        return JSONResponse(content=result.model_dump())

    assert_never(result)
|
|
|
|
|
|
@router.post("/v1/rerank", dependencies=[Depends(validate_json_request)])
@with_cancellation
async def do_rerank_v1(request: RerankRequest, raw_request: Request):
    # Alias for /rerank: emit the relocation notice, then forward.
    logger.warning_once(
        "To indicate that the rerank API is not part of the standard OpenAI"
        " API, we have located it at `/rerank`. Please update your client "
        "accordingly. (Note: Conforms to JinaAI rerank API)")

    forwarded = await do_rerank(request, raw_request)
    return forwarded
|
|
|
|
|
|
@router.post("/v2/rerank", dependencies=[Depends(validate_json_request)])
@with_cancellation
async def do_rerank_v2(request: RerankRequest, raw_request: Request):
    # Alias for /rerank; forwards without logging a notice.
    forwarded = await do_rerank(request, raw_request)
    return forwarded
|
|
|
|
|
|
# Dispatch table used by /invocations: model task -> request flavour ->
# (request model class, endpoint coroutine). "messages" is chosen when the
# request body has a "messages" key, "default" otherwise.
TASK_HANDLERS: dict[str, dict[str, tuple]] = dict(
    generate=dict(
        messages=(ChatCompletionRequest, create_chat_completion),
        default=(CompletionRequest, create_completion),
    ),
    embed=dict(
        messages=(EmbeddingChatRequest, create_embedding),
        default=(EmbeddingCompletionRequest, create_embedding),
    ),
    score=dict(default=(RerankRequest, do_rerank)),
    rerank=dict(default=(RerankRequest, do_rerank)),
    reward=dict(
        messages=(PoolingChatRequest, create_pooling),
        default=(PoolingCompletionRequest, create_pooling),
    ),
    classify=dict(
        messages=(PoolingChatRequest, create_pooling),
        default=(PoolingCompletionRequest, create_pooling),
    ),
)
|
|
|
|
# Development-only endpoints; registered only when VLLM_SERVER_DEV_MODE is set.
if envs.VLLM_SERVER_DEV_MODE:

    @router.get("/server_info")
    async def show_server_info(raw_request: Request):
        server_info = {"vllm_config": str(raw_request.app.state.vllm_config)}
        return JSONResponse(content=server_info)

    @router.post("/reset_prefix_cache")
    async def reset_prefix_cache(raw_request: Request):
        """
        Reset the prefix cache. Note that we currently do not check if the
        prefix cache is successfully reset in the API server.
        """
        device = None
        device_str = raw_request.query_params.get("device")
        if device_str is not None:
            try:
                device = Device[device_str.upper()]
            except KeyError:
                # An unknown device name is a client error, not a server
                # fault: answer 400 instead of letting KeyError become 500.
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid device: '{device_str}'") from None
        logger.info("Resetting prefix cache with specific %s...", str(device))
        await engine_client(raw_request).reset_prefix_cache(device)
        return Response(status_code=200)

    @router.post("/sleep")
    async def sleep(raw_request: Request):
        # get POST params
        level_str = raw_request.query_params.get("level", "1")
        try:
            level = int(level_str)
        except ValueError:
            # A non-integer level is a client error: answer 400, not 500.
            raise HTTPException(
                status_code=400,
                detail=f"Invalid sleep level: '{level_str}'. "
                "Expected an integer.") from None
        await engine_client(raw_request).sleep(level)
        # FIXME: in v0 with frontend multiprocessing, the sleep command
        # is sent but does not finish yet when we return a response.
        return Response(status_code=200)

    @router.post("/wake_up")
    async def wake_up(raw_request: Request):
        tags = raw_request.query_params.getlist("tags")
        if tags == []:
            # set to None to wake up all tags if no tags are provided
            tags = None
        logger.info("wake up the engine with tags: %s", tags)
        await engine_client(raw_request).wake_up(tags)
        # FIXME: in v0 with frontend multiprocessing, the wake-up command
        # is sent but does not finish yet when we return a response.
        return Response(status_code=200)

    @router.get("/is_sleeping")
    async def is_sleeping(raw_request: Request):
        logger.info("check whether the engine is sleeping")
        # Local renamed so it no longer shadows this handler's own name.
        sleeping = await engine_client(raw_request).is_sleeping()
        return JSONResponse(content={"is_sleeping": sleeping})
|
|
|
|
|
|
@router.post("/invocations", dependencies=[Depends(validate_json_request)])
async def invocations(raw_request: Request):
    """
    For SageMaker, routes requests to other handlers based on model `task`.
    """
    try:
        body = await raw_request.json()
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; a malformed body is a client error.
        raise HTTPException(status_code=400,
                            detail=f"JSON decode error: {e}") from e

    if not isinstance(body, dict):
        # Membership tests and model validation below expect a JSON object.
        raise HTTPException(status_code=400,
                            detail="Request body must be a JSON object")

    task = raw_request.app.state.task

    if task not in TASK_HANDLERS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported task: '{task}' for '/invocations'. "
            f"Expected one of {set(TASK_HANDLERS.keys())}")

    handler_config = TASK_HANDLERS[task]
    # Some tasks (score, rerank) define only a "default" handler, so use
    # the "messages" handler only when the task actually provides one.
    if "messages" in body and "messages" in handler_config:
        request_model, handler = handler_config["messages"]
    else:
        request_model, handler = handler_config["default"]

    # this is required since we lose the FastAPI automatic casting
    request = request_model.model_validate(body)
    return await handler(request, raw_request)
|
|
|
|
|
|
# Profiler endpoints; registered only when VLLM_TORCH_PROFILER_DIR is set.
if envs.VLLM_TORCH_PROFILER_DIR:
    logger.warning(
        "Torch Profiler is enabled in the API server. This should ONLY be "
        "used for local development!")

    @router.post("/start_profile")
    async def start_profile(raw_request: Request):
        logger.info("Starting profiler...")
        client = engine_client(raw_request)
        await client.start_profile()
        logger.info("Profiler started.")
        return Response(status_code=200)

    @router.post("/stop_profile")
    async def stop_profile(raw_request: Request):
        logger.info("Stopping profiler...")
        client = engine_client(raw_request)
        await client.stop_profile()
        logger.info("Profiler stopped.")
        return Response(status_code=200)
|
|
|
|
|
|
# LoRA load/unload endpoints; registered only when
# VLLM_ALLOW_RUNTIME_LORA_UPDATING is set.
if envs.VLLM_ALLOW_RUNTIME_LORA_UPDATING:
    logger.warning(
        "LoRA dynamic loading & unloading is enabled in the API server. "
        "This should ONLY be used for local development!")

    @router.post("/v1/load_lora_adapter",
                 dependencies=[Depends(validate_json_request)])
    async def load_lora_adapter(request: LoadLoRAAdapterRequest,
                                raw_request: Request):
        outcome = await models(raw_request).load_lora_adapter(request)
        if not isinstance(outcome, ErrorResponse):
            return Response(status_code=200, content=outcome)

        return JSONResponse(content=outcome.model_dump(),
                            status_code=outcome.code)

    @router.post("/v1/unload_lora_adapter",
                 dependencies=[Depends(validate_json_request)])
    async def unload_lora_adapter(request: UnloadLoRAAdapterRequest,
                                  raw_request: Request):
        outcome = await models(raw_request).unload_lora_adapter(request)
        if not isinstance(outcome, ErrorResponse):
            return Response(status_code=200, content=outcome)

        return JSONResponse(content=outcome.model_dump(),
                            status_code=outcome.code)
|
|
|
|
|
|
def build_app(args: Namespace) -> FastAPI:
    """Create and configure the FastAPI application.

    Registers the module router, Prometheus metrics, CORS, a validation
    error handler, and the optional middlewares: bearer-token
    authentication, X-Request-Id echo, response logging, and any
    user-supplied middleware listed in `args.middleware`.

    Raises:
        ValueError: if an entry of `args.middleware` resolves to something
            that is neither a class nor a coroutine function.
    """
    if args.disable_fastapi_docs:
        app = FastAPI(openapi_url=None,
                      docs_url=None,
                      redoc_url=None,
                      lifespan=lifespan)
    else:
        app = FastAPI(lifespan=lifespan)
    app.include_router(router)
    app.root_path = args.root_path

    mount_metrics(app)

    app.add_middleware(
        CORSMiddleware,
        allow_origins=args.allowed_origins,
        allow_credentials=args.allow_credentials,
        allow_methods=args.allowed_methods,
        allow_headers=args.allowed_headers,
    )

    @app.exception_handler(RequestValidationError)
    async def validation_exception_handler(_, exc):
        err = ErrorResponse(message=str(exc),
                            type="BadRequestError",
                            code=HTTPStatus.BAD_REQUEST)
        return JSONResponse(err.model_dump(),
                            status_code=HTTPStatus.BAD_REQUEST)

    # Ensure --api-key option from CLI takes precedence over VLLM_API_KEY
    if token := args.api_key or envs.VLLM_API_KEY:
        import secrets

        # Pre-encode the expected header value once. Bytes are compared
        # because secrets.compare_digest rejects non-ASCII str arguments.
        expected_auth = ("Bearer " + token).encode("utf-8", "surrogatepass")

        @app.middleware("http")
        async def authentication(request: Request, call_next):
            if request.method == "OPTIONS":
                return await call_next(request)
            url_path = request.url.path
            if app.root_path and url_path.startswith(app.root_path):
                url_path = url_path[len(app.root_path):]
            # Only paths under /v1 require the API key.
            if not url_path.startswith("/v1"):
                return await call_next(request)
            supplied_auth = (request.headers.get("Authorization")
                             or "").encode("utf-8", "surrogatepass")
            # Constant-time comparison, so response timing does not reveal
            # how much of the supplied token matched.
            if not secrets.compare_digest(supplied_auth, expected_auth):
                return JSONResponse(content={"error": "Unauthorized"},
                                    status_code=401)
            return await call_next(request)

    if args.enable_request_id_headers:
        logger.warning(
            "CAUTION: Enabling X-Request-Id headers in the API Server. "
            "This can harm performance at high QPS.")

        @app.middleware("http")
        async def add_request_id(request: Request, call_next):
            # Echo the caller's id, or generate one when none was sent.
            request_id = request.headers.get(
                "X-Request-Id") or uuid.uuid4().hex
            response = await call_next(request)
            response.headers["X-Request-Id"] = request_id
            return response

    if envs.VLLM_DEBUG_LOG_API_SERVER_RESPONSE:
        logger.warning("CAUTION: Enabling log response in the API Server. "
                       "This can include sensitive information and should be "
                       "avoided in production.")

        @app.middleware("http")
        async def log_response(request: Request, call_next):
            response = await call_next(request)
            # Drain the body so it can be logged, then put it back as a
            # fresh iterator for the real response.
            response_body = [
                section async for section in response.body_iterator
            ]
            response.body_iterator = iterate_in_threadpool(iter(response_body))
            # Only the first body section is logged.
            logger.info("response_body={%s}",
                        response_body[0].decode() if response_body else None)
            return response

    # User-supplied middleware, given as dotted "module.object" paths.
    for middleware in args.middleware:
        module_path, object_name = middleware.rsplit(".", 1)
        imported = getattr(importlib.import_module(module_path), object_name)
        if inspect.isclass(imported):
            app.add_middleware(imported)  # type: ignore[arg-type]
        elif inspect.iscoroutinefunction(imported):
            app.middleware("http")(imported)
        else:
            raise ValueError(f"Invalid middleware {middleware}. "
                             f"Must be a function or a class.")

    return app
|
|
|
|
|
|
async def init_app_state(
    engine_client: EngineClient,
    vllm_config: VllmConfig,
    state: State,
    args: Namespace,
) -> None:
    """Fill ``state`` with everything the API routes read at request time.

    Stores the engine client and vLLM config on ``state``, resolves the
    chat template given on the command line, and builds one serving handler
    per endpoint family. A handler whose runner type / task condition does
    not match the loaded model is stored as ``None``.

    Args:
        engine_client: Client used to talk to the engine.
        vllm_config: Engine configuration; its ``model_config`` decides
            which handlers get built.
        state: Application state object that receives the attributes.
        args: Parsed server command-line arguments.
    """
    # Names the model is served under; fall back to the model path itself.
    model_names = (args.served_model_name
                   if args.served_model_name is not None else [args.model])

    request_logger = (None if args.disable_log_requests else RequestLogger(
        max_log_len=args.max_log_len))

    # Every served name points at the same underlying model path.
    base_model_paths = [
        BaseModelPath(name=model_name, model_path=args.model)
        for model_name in model_names
    ]

    state.engine_client = engine_client
    state.log_stats = not args.disable_log_stats
    state.vllm_config = vllm_config
    model_config = vllm_config.model_config

    chat_template = load_chat_template(args.chat_template)
    if chat_template is not None:
        # The tokenizer is needed to compare against the official template.
        tokenizer = await engine_client.get_tokenizer()

        if isinstance(tokenizer, MistralTokenizer):
            # resolve_mistral_chat_template logs its own warning.
            chat_template = resolve_mistral_chat_template(
                chat_template=chat_template)
        else:
            official_template = resolve_hf_chat_template(
                tokenizer,
                chat_template=None,
                tools=None,
                trust_remote_code=model_config.trust_remote_code)

            if official_template != chat_template:
                logger.warning(
                    "Using supplied chat template: %s\n"
                    "It is different from official chat template '%s'. "
                    "This discrepancy may lead to performance degradation.",
                    chat_template, args.model)

    # The model registry is shared by every handler created below.
    serving_models = OpenAIServingModels(
        engine_client=engine_client,
        model_config=model_config,
        base_model_paths=base_model_paths,
        lora_modules=args.lora_modules,
        prompt_adapters=args.prompt_adapters,
    )
    state.openai_serving_models = serving_models
    await serving_models.init_static_loras()

    if model_config.runner_type == "generate":
        state.openai_serving_chat = OpenAIServingChat(
            engine_client,
            model_config,
            serving_models,
            args.response_role,
            request_logger=request_logger,
            chat_template=chat_template,
            chat_template_content_format=args.chat_template_content_format,
            return_tokens_as_token_ids=args.return_tokens_as_token_ids,
            enable_auto_tools=args.enable_auto_tool_choice,
            tool_parser=args.tool_call_parser,
            enable_reasoning=args.enable_reasoning,
            reasoning_parser=args.reasoning_parser,
            enable_prompt_tokens_details=args.enable_prompt_tokens_details,
        )
    else:
        state.openai_serving_chat = None

    if model_config.runner_type == "generate":
        state.openai_serving_completion = OpenAIServingCompletion(
            engine_client,
            model_config,
            serving_models,
            request_logger=request_logger,
            return_tokens_as_token_ids=args.return_tokens_as_token_ids,
        )
    else:
        state.openai_serving_completion = None

    if model_config.runner_type == "pooling":
        state.openai_serving_pooling = OpenAIServingPooling(
            engine_client,
            model_config,
            serving_models,
            request_logger=request_logger,
            chat_template=chat_template,
            chat_template_content_format=args.chat_template_content_format,
        )
    else:
        state.openai_serving_pooling = None

    if model_config.task == "embed":
        state.openai_serving_embedding = OpenAIServingEmbedding(
            engine_client,
            model_config,
            serving_models,
            request_logger=request_logger,
            chat_template=chat_template,
            chat_template_content_format=args.chat_template_content_format,
        )
    else:
        state.openai_serving_embedding = None

    if model_config.task in ("score", "embed", "pooling"):
        state.openai_serving_scores = ServingScores(
            engine_client,
            model_config,
            serving_models,
            request_logger=request_logger)
    else:
        state.openai_serving_scores = None

    if model_config.task == "score":
        state.jinaai_serving_reranking = ServingScores(
            engine_client,
            model_config,
            serving_models,
            request_logger=request_logger)
    else:
        state.jinaai_serving_reranking = None

    # Tokenization is built unconditionally, whatever the runner type.
    state.openai_serving_tokenization = OpenAIServingTokenization(
        engine_client,
        model_config,
        serving_models,
        request_logger=request_logger,
        chat_template=chat_template,
        chat_template_content_format=args.chat_template_content_format,
    )

    if model_config.runner_type == "transcription":
        state.openai_serving_transcription = OpenAIServingTranscription(
            engine_client,
            model_config,
            serving_models,
            request_logger=request_logger,
        )
    else:
        state.openai_serving_transcription = None

    state.task = model_config.task

    state.enable_server_load_tracking = args.enable_server_load_tracking
    state.server_load_metrics = 0
|
|
|
|
|
|
def create_server_socket(addr: tuple[str, int]) -> socket.socket:
    """Create a TCP socket and bind it to ``addr``.

    The socket family is ``AF_INET6`` when the host part of ``addr`` is a
    valid IPv6 address and ``AF_INET`` otherwise. ``SO_REUSEADDR`` and
    ``SO_REUSEPORT`` are both enabled before binding. The socket is bound
    but ``listen()`` is not called here.

    Args:
        addr: ``(host, port)`` pair to bind to.

    Returns:
        The bound socket. The caller owns it and must close it.

    Raises:
        OSError: If setting a socket option or binding fails (for example
            when the port is already in use). The socket is closed before
            the error propagates.
    """
    family = (socket.AF_INET6
              if is_valid_ipv6_address(addr[0]) else socket.AF_INET)

    sock = socket.socket(family=family, type=socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        sock.bind(addr)
    except BaseException:
        # Do not leak the file descriptor when configuration or bind fails;
        # the caller never receives the socket, so it cannot close it.
        sock.close()
        raise

    return sock
|
|
|
|
|
|
async def run_server(args, **uvicorn_kwargs) -> None:
    """Validate the arguments, start the engine and serve the HTTP API.

    The listening socket is bound before the engine is built, then the
    engine client, FastAPI app and app state are set up and the app is
    handed to ``serve_http``. The coroutine returns once the server's
    shutdown task has completed.

    Args:
        args: Parsed server command-line arguments.
        **uvicorn_kwargs: Extra keyword arguments forwarded to
            ``serve_http``.

    Raises:
        KeyError: If auto tool choice is enabled with an unknown tool call
            parser, or reasoning is enabled with an unknown reasoning
            parser.
        KeyboardInterrupt: If SIGTERM arrives while the SIGTERM handler
            installed here is active.
    """
    logger.info("vLLM API server version %s", VLLM_VERSION)
    logger.info("args: %s", args)

    # NOTE(review): the length threshold presumably filters out trivially
    # short plugin paths -- confirm the intent.
    if args.tool_parser_plugin and len(args.tool_parser_plugin) > 3:
        ToolParserManager.import_tool_parser(args.tool_parser_plugin)

    valid_tool_parses = ToolParserManager.tool_parsers.keys()
    if args.enable_auto_tool_choice \
            and args.tool_call_parser not in valid_tool_parses:
        raise KeyError(f"invalid tool call parser: {args.tool_call_parser} "
                       f"(chose from {{ {','.join(valid_tool_parses)} }})")

    valid_reasoning_parses = ReasoningParserManager.reasoning_parsers.keys()
    if args.enable_reasoning \
            and args.reasoning_parser not in valid_reasoning_parses:
        raise KeyError(
            f"invalid reasoning parser: {args.reasoning_parser} "
            f"(chose from {{ {','.join(valid_reasoning_parses)} }})")

    # workaround to make sure that we bind the port before the engine is set up.
    # This avoids race conditions with ray.
    # see https://github.com/vllm-project/vllm/issues/8204
    sock_addr = (args.host or "", args.port)
    sock = create_server_socket(sock_addr)

    # From here on the socket is ours to release. A single try/finally makes
    # sure it is closed even if engine start-up, app construction or
    # serve_http fails, or SIGTERM interrupts initialisation.
    try:
        # workaround to avoid footguns where uvicorn drops requests with too
        # many concurrent requests active
        set_ulimit()

        def signal_handler(*_) -> None:
            # Interrupt server on sigterm while initializing
            raise KeyboardInterrupt("terminated")

        signal.signal(signal.SIGTERM, signal_handler)

        async with build_async_engine_client(args) as engine_client:
            app = build_app(args)

            vllm_config = await engine_client.get_vllm_config()
            await init_app_state(engine_client, vllm_config, app.state, args)

            def _listen_addr(a: str) -> str:
                # Bracket IPv6 literals so the logged URL is well-formed.
                if is_valid_ipv6_address(a):
                    return '[' + a + ']'
                return a or "0.0.0.0"

            is_ssl = args.ssl_keyfile and args.ssl_certfile
            logger.info("Starting vLLM API server on http%s://%s:%d",
                        "s" if is_ssl else "", _listen_addr(sock_addr[0]),
                        sock_addr[1])

            shutdown_task = await serve_http(
                app,
                sock=sock,
                enable_ssl_refresh=args.enable_ssl_refresh,
                host=args.host,
                port=args.port,
                log_level=args.uvicorn_log_level,
                # NOTE: When the 'disable_uvicorn_access_log' value is True,
                # no access log will be output.
                access_log=not args.disable_uvicorn_access_log,
                timeout_keep_alive=TIMEOUT_KEEP_ALIVE,
                ssl_keyfile=args.ssl_keyfile,
                ssl_certfile=args.ssl_certfile,
                ssl_ca_certs=args.ssl_ca_certs,
                ssl_cert_reqs=args.ssl_cert_reqs,
                **uvicorn_kwargs,
            )

        # NB: Await server shutdown only after the backend context is exited
        await shutdown_task
    finally:
        sock.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # NOTE(simon):
    # Keep this section in sync with the CLI entrypoints in
    # vllm/entrypoints/cli/main.py.
    cli_env_setup()

    # Build the server argument parser, then parse and validate the
    # command line before starting the event loop.
    parser = make_arg_parser(
        FlexibleArgumentParser(
            description="vLLM OpenAI-Compatible RESTful API server."))
    args = parser.parse_args()
    validate_parsed_serve_args(args)

    uvloop.run(run_server(args))
|