[V1][Metrics] Handle preemptions (#13169)

commit cd711c48b2
parent 378b3ef6f8
@@ -227,6 +227,7 @@ EXPECTED_METRICS_V1 = [
     "vllm:gpu_cache_usage_perc",
     "vllm:gpu_prefix_cache_queries",
     "vllm:gpu_prefix_cache_hits",
+    "vllm:num_preemptions_total",
     "vllm:prompt_tokens_total",
     "vllm:generation_tokens_total",
     "vllm:iteration_tokens_total",
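The hunk above only asserts that the new counter name shows up in the exported metric listing. A quick manual check is to scrape the Prometheus endpoint directly; a minimal sketch, assuming a locally running OpenAI-compatible server on `localhost:8000` (the host and port are assumptions, not part of this change):

```python
# Minimal sketch: confirm the new preemption counter is exported.
# Assumes a vLLM OpenAI-compatible server is listening on localhost:8000.
import requests

metrics_text = requests.get("http://localhost:8000/metrics", timeout=5).text
assert "vllm:num_preemptions_total" in metrics_text

# Print only the preemption samples for inspection.
for line in metrics_text.splitlines():
    if line.startswith("vllm:num_preemptions_total"):
        print(line)
```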
@@ -164,6 +164,7 @@ class Scheduler:
                     self.kv_cache_manager.free(preempted_req)
                     preempted_req.status = RequestStatus.PREEMPTED
                     preempted_req.num_computed_tokens = 0
+                    self.request_preempted(preempted_req, scheduled_timestamp)
 
                     self.waiting.appendleft(preempted_req)
                     preempted_reqs.append(preempted_req)
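These lines sit in the scheduler's preemption path: the preempted request's progress is reset, the new PREEMPTED event is recorded, and the request goes back onto the front of the waiting deque so it is retried first. The sketch below imitates that bookkeeping with simplified stand-ins (`Request` and `RequestStatus` here are not vLLM's classes):

```python
# Simplified sketch of the requeue-on-preemption bookkeeping shown above.
# Request and RequestStatus are minimal stand-ins, not vLLM's real classes.
import enum
from collections import deque
from dataclasses import dataclass


class RequestStatus(enum.Enum):
    WAITING = enum.auto()
    RUNNING = enum.auto()
    PREEMPTED = enum.auto()


@dataclass
class Request:
    request_id: str
    status: RequestStatus = RequestStatus.WAITING
    num_computed_tokens: int = 0


waiting = deque([Request("r2"), Request("r3")])
preempted_req = Request("r1", RequestStatus.RUNNING, num_computed_tokens=57)

# Mirror the scheduler's steps: mark the victim, throw away its progress,
# then requeue it at the *front* so it is rescheduled before other waiters.
preempted_req.status = RequestStatus.PREEMPTED
preempted_req.num_computed_tokens = 0
waiting.appendleft(preempted_req)

print([r.request_id for r in waiting])  # ['r1', 'r2', 'r3']
```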
@@ -281,9 +282,9 @@ class Scheduler:
                 self.waiting.popleft()
                 self.running.append(request)
                 self.scheduled_req_ids.add(request.request_id)
+                self.request_scheduled(request, scheduled_timestamp)
                 if request.status == RequestStatus.WAITING:
                     scheduled_new_reqs.append(request)
-                    self.request_scheduled(request, scheduled_timestamp)
                 elif request.status == RequestStatus.PREEMPTED:
                     scheduled_resumed_reqs.append(request)
                 else:
@@ -675,6 +676,13 @@ class Scheduler:
             EngineCoreEvent.new_event(EngineCoreEventType.SCHEDULED,
                                       timestamp))
 
+    def request_preempted(self, request: Request, timestamp: float):
+        if not self.log_stats:
+            return
+        request.events.append(
+            EngineCoreEvent.new_event(EngineCoreEventType.PREEMPTED,
+                                      timestamp))
+
     def make_stats(self) -> Optional[SchedulerStats]:
         if not self.log_stats:
             return None
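The new `request_preempted` helper mirrors `request_scheduled` (which is now also called for resumed requests, per the hunk above it): do nothing when stats logging is disabled, otherwise append a typed, timestamped event to the request. A self-contained sketch of that guarded pattern, with illustrative stand-ins rather than vLLM's real classes:

```python
# Sketch of the log_stats-guarded event recording pattern used above.
# MiniScheduler, Event and the request object are illustrative stand-ins.
import enum
import time
from dataclasses import dataclass
from types import SimpleNamespace


class EventType(enum.IntEnum):
    QUEUED = 1
    SCHEDULED = 2
    PREEMPTED = 3


@dataclass
class Event:
    type: EventType
    timestamp: float


class MiniScheduler:
    def __init__(self, log_stats: bool):
        self.log_stats = log_stats

    def request_preempted(self, request, timestamp: float):
        # No event objects are built at all when stats logging is off.
        if not self.log_stats:
            return
        request.events.append(Event(EventType.PREEMPTED, timestamp))


req = SimpleNamespace(events=[])
MiniScheduler(log_stats=True).request_preempted(req, time.monotonic())
MiniScheduler(log_stats=False).request_preempted(req, time.monotonic())
print(len(req.events))  # 1: only the logging-enabled scheduler recorded an event
```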
@@ -65,6 +65,7 @@ class EngineCoreEventType(enum.IntEnum):
     """The type of engine core request event."""
     QUEUED = 1
     SCHEDULED = 2
+    PREEMPTED = 3
 
 
 class EngineCoreEvent(msgspec.Struct):
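`EngineCoreEventType` is an `IntEnum` carried inside `msgspec.Struct` messages, so the new member only adds the integer value 3 on the wire and leaves existing values untouched. A small round-trip sketch; the `Event` struct below is illustrative, not the real `EngineCoreEvent` schema:

```python
# Round-trip sketch: the new enum member is just the integer 3 on the wire.
# The Event struct here is illustrative, not vLLM's EngineCoreEvent schema.
import enum

import msgspec


class EngineCoreEventType(enum.IntEnum):
    QUEUED = 1
    SCHEDULED = 2
    PREEMPTED = 3


class Event(msgspec.Struct):
    type: EngineCoreEventType
    timestamp: float


buf = msgspec.msgpack.encode(Event(EngineCoreEventType.PREEMPTED, 123.45))
decoded = msgspec.msgpack.decode(buf, type=Event)
print(decoded.type, int(decoded.type))  # EngineCoreEventType.PREEMPTED 3
```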
@@ -132,6 +132,11 @@ class PrometheusStatLogger(StatLoggerBase):
             "GPU prefix cache hits, in terms of number of cached blocks.",
             labelnames=labelnames).labels(*labelvalues)
 
+        self.counter_num_preempted_reqs = prometheus_client.Counter(
+            name="vllm:num_preemptions_total",
+            documentation="Cumulative number of preemption from the engine.",
+            labelnames=labelnames).labels(*labelvalues)
+
         self.counter_prompt_tokens = prometheus_client.Counter(
             name="vllm:prompt_tokens_total",
             documentation="Number of prefill tokens processed.",
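The counter is registered with the same labelled-metric pattern as its neighbours: construct the `Counter` once with `labelnames`, then bind the concrete label values with `.labels(...)` so each later increment is a single call. A standalone sketch of that `prometheus_client` pattern; the metric and label names below are illustrative:

```python
# Standalone sketch of the labelled-Counter pattern; names are illustrative.
import prometheus_client

counter_preemptions = prometheus_client.Counter(
    name="demo:num_preemptions_total",
    documentation="Cumulative number of preemptions from the engine.",
    labelnames=["model_name"]).labels("my-model")

counter_preemptions.inc(2)  # e.g. two preemptions in one scheduler iteration

print(prometheus_client.generate_latest().decode())
```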
@@ -282,6 +287,7 @@ class PrometheusStatLogger(StatLoggerBase):
         self.counter_gpu_prefix_cache_hits.inc(
             scheduler_stats.prefix_cache_stats.hits)
 
+        self.counter_num_preempted_reqs.inc(iteration_stats.num_preempted_reqs)
         self.counter_prompt_tokens.inc(iteration_stats.num_prompt_tokens)
         self.counter_generation_tokens.inc(
             iteration_stats.num_generation_tokens)
@@ -289,10 +295,19 @@ class PrometheusStatLogger(StatLoggerBase):
             iteration_stats.num_prompt_tokens + \
             iteration_stats.num_generation_tokens)
 
+        for ttft in iteration_stats.time_to_first_tokens_iter:
+            self.histogram_time_to_first_token.observe(ttft)
+        for tpot in iteration_stats.time_per_output_tokens_iter:
+            self.histogram_time_per_output_token.observe(tpot)
+
         for finished_request in iteration_stats.finished_requests:
             self.counter_request_success[finished_request.finish_reason].inc()
             self.histogram_e2e_time_request.observe(
                 finished_request.e2e_latency)
+            self.histogram_queue_time_request.observe(
+                finished_request.queued_time)
+            self.histogram_prefill_time_request.observe(
+                finished_request.prefill_time)
             self.histogram_inference_time_request.observe(
                 finished_request.inference_time)
             self.histogram_decode_time_request.observe(
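With queue and prefill times now carried on each `FinishedRequestStats`, they are observed into the request-level histograms alongside the other per-request latencies instead of being drained from per-iteration lists (removed in the next hunk). A brief sketch of that `Histogram.observe` pattern in isolation; the metric name and bucket boundaries are illustrative:

```python
# Sketch: observing per-finished-request latencies into a Prometheus histogram.
# The metric name and bucket boundaries are illustrative.
import prometheus_client

histogram_queue_time = prometheus_client.Histogram(
    name="demo:request_queue_time_seconds",
    documentation="Time spent in the waiting queue per finished request.",
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0])

for queued_time in [0.02, 0.4, 1.3]:  # seconds, one value per finished request
    histogram_queue_time.observe(queued_time)
```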
@@ -302,15 +317,6 @@ class PrometheusStatLogger(StatLoggerBase):
             self.histogram_num_generation_tokens_request.observe(
                 finished_request.num_generation_tokens)
 
-        for ttft in iteration_stats.time_to_first_tokens_iter:
-            self.histogram_time_to_first_token.observe(ttft)
-        for tpot in iteration_stats.time_per_output_tokens_iter:
-            self.histogram_time_per_output_token.observe(tpot)
-        for queue_time in iteration_stats.queue_times_iter:
-            self.histogram_queue_time_request.observe(queue_time)
-        for prefill_time in iteration_stats.prefill_times_iter:
-            self.histogram_prefill_time_request.observe(prefill_time)
-
         if self.gauge_lora_info is not None:
             running_lora_adapters = \
                 ",".join(iteration_stats.running_lora_adapters.keys())
@@ -67,6 +67,8 @@ class FinishedRequestStats:
     e2e_latency: float = 0.0
     num_prompt_tokens: int = 0
     num_generation_tokens: int = 0
+    queued_time: float = 0.0
+    prefill_time: float = 0.0
     inference_time: float = 0.0
     decode_time: float = 0.0
 
@@ -78,11 +80,10 @@ class IterationStats:
         self.iteration_timestamp = time.time()
         self.num_generation_tokens = 0
         self.num_prompt_tokens = 0
+        self.num_preempted_reqs = 0
         self.finished_requests: List[FinishedRequestStats] = []
         self.time_to_first_tokens_iter: List[float] = []
         self.time_per_output_tokens_iter: List[float] = []
-        self.queue_times_iter: List[float] = []
-        self.prefill_times_iter: List[float] = []
         self.waiting_lora_adapters: Dict[str, int] = {}
         self.running_lora_adapters: Dict[str, int] = {}
 
@@ -122,9 +123,6 @@ class IterationStats:
         if is_prefilling:
             # TODO: re-enable no-output-for-partial-prefills invariant as above
             if num_new_generation_tokens > 0:
-                prefill_interval = \
-                    engine_core_timestamp - req_stats.scheduled_ts
-                self.prefill_times_iter.append(prefill_interval)
                 req_stats.first_token_ts = engine_core_timestamp
         else:
             tpot = engine_core_timestamp - req_stats.last_token_ts
@@ -145,24 +143,39 @@
                 if lora_stats is not None:
                     lora_stats.waiting_requests.add(req_id)
             elif event.type == EngineCoreEventType.SCHEDULED:
-                queued_interval = event.timestamp - req_stats.queued_ts
-                self.queue_times_iter.append(queued_interval)
-                req_stats.scheduled_ts = event.timestamp
+                if req_stats.scheduled_ts == 0.0:  # ignore preemptions
+                    req_stats.scheduled_ts = event.timestamp
                 LoRARequestStates.scheduled_request(lora_stats, req_id)
+            elif event.type == EngineCoreEventType.PREEMPTED:
+                self.num_preempted_reqs += 1
 
     def update_from_finished_request(self, finish_reason: "FinishReason",
                                      request_output: "RequestOutput",
                                      req_stats: RequestStateStats):
         e2e_latency = self._time_since(req_stats.arrival_time)
 
-        inference_time = req_stats.last_token_ts - req_stats.scheduled_ts
+        # Queued interval is from first QUEUED event to first SCHEDULED
+        queued_time = req_stats.scheduled_ts - req_stats.queued_ts
+
+        # Prefill interval is from first SCHEDULED to first NEW_TOKEN
+        # Any preemptions during prefill is included in the interval
+        prefill_time = req_stats.first_token_ts - req_stats.scheduled_ts
+
+        # Decode interval is from first NEW_TOKEN to last NEW_TOKEN
+        # Any preemptions during decode are included
         decode_time = req_stats.last_token_ts - req_stats.first_token_ts
+
+        # Inference interval is from first SCHEDULED to last NEW_TOKEN
+        # Any preemptions during prefill or decode are included
+        inference_time = req_stats.last_token_ts - req_stats.scheduled_ts
 
         finished_req = \
             FinishedRequestStats(finish_reason=finish_reason,
                                  e2e_latency=e2e_latency,
                                  num_prompt_tokens=len(request_output.prompt_token_ids),
                                  num_generation_tokens=req_stats.num_generation_tokens,
+                                 queued_time=queued_time,
+                                 prefill_time=prefill_time,
                                  inference_time=inference_time,
                                  decode_time=decode_time)
         self.finished_requests.append(finished_req)
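The interval definitions above can be sanity-checked with a small worked example. The timestamps are made up; the point is that `queued_time` depends only on the first SCHEDULED event (the `scheduled_ts == 0.0` guard means re-schedules after a preemption do not reset it), while the prefill, decode, and inference intervals absorb any time lost to preemptions.

```python
# Worked example of the interval arithmetic above (timestamps are made up).
queued_ts = 10.0       # first QUEUED event
scheduled_ts = 10.2    # first SCHEDULED event; preemptions do not reset it
first_token_ts = 11.0  # first NEW_TOKEN (any prefill-time preemption included)
last_token_ts = 14.5   # last NEW_TOKEN

queued_time = scheduled_ts - queued_ts         # 0.2 s in the waiting queue
prefill_time = first_token_ts - scheduled_ts   # 0.8 s
decode_time = last_token_ts - first_token_ts   # 3.5 s
inference_time = last_token_ts - scheduled_ts  # 4.3 s = prefill + decode

print(queued_time, prefill_time, decode_time, inference_time)
```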