From c5711ef98519de25d1f51121f7848a13f2891fc1 Mon Sep 17 00:00:00 2001
From: Antoni Baum
Date: Fri, 17 May 2024 10:52:11 -0700
Subject: [PATCH] [Doc] Update Ray Data distributed offline inference example
 (#4871)

---
 examples/offline_inference_distributed.py | 48 ++++++++++++++++++++---
 1 file changed, 42 insertions(+), 6 deletions(-)

diff --git a/examples/offline_inference_distributed.py b/examples/offline_inference_distributed.py
index e4f085fa..1e59e895 100644
--- a/examples/offline_inference_distributed.py
+++ b/examples/offline_inference_distributed.py
@@ -9,19 +9,31 @@ from typing import Dict
 
 import numpy as np
 import ray
+from packaging.version import Version
+from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
 
 from vllm import LLM, SamplingParams
 
+assert Version(ray.__version__) >= Version(
+    "2.22.0"), "Ray version must be at least 2.22.0"
+
 # Create a sampling params object.
 sampling_params = SamplingParams(temperature=0.8, top_p=0.95)
 
+# Set tensor parallelism per instance.
+tensor_parallel_size = 1
+
+# Set number of instances. Each instance will use tensor_parallel_size GPUs.
+num_instances = 1
+
 
 # Create a class to do batch inference.
 class LLMPredictor:
 
     def __init__(self):
         # Create an LLM.
-        self.llm = LLM(model="meta-llama/Llama-2-7b-chat-hf")
+        self.llm = LLM(model="meta-llama/Llama-2-7b-chat-hf",
+                       tensor_parallel_size=tensor_parallel_size)
 
     def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
         # Generate texts from the prompts.
@@ -43,17 +55,41 @@ class LLMPredictor:
 # from cloud storage (such as JSONL, Parquet, CSV, binary format).
 ds = ray.data.read_text("s3://anonymous@air-example-data/prompts.txt")
 
+
+# For tensor_parallel_size > 1, we need to create placement groups for vLLM
+# to use. Every actor has to have its own placement group.
+def scheduling_strategy_fn():
+    # One bundle per tensor parallel worker
+    pg = ray.util.placement_group(
+        [{
+            "GPU": 1,
+            "CPU": 1
+        }] * tensor_parallel_size,
+        strategy="STRICT_PACK",
+    )
+    return dict(scheduling_strategy=PlacementGroupSchedulingStrategy(
+        pg, placement_group_capture_child_tasks=True))
+
+
+resources_kwarg = {}
+if tensor_parallel_size == 1:
+    # For tensor_parallel_size == 1, we simply set num_gpus=1.
+    resources_kwarg["num_gpus"] = 1
+else:
+    # Otherwise, we have to set num_gpus=0 and provide
+    # a function that will create a placement group for
+    # each instance.
+    resources_kwarg["num_gpus"] = 0
+    resources_kwarg["ray_remote_args_fn"] = scheduling_strategy_fn
+
 # Apply batch inference for all input data.
 ds = ds.map_batches(
     LLMPredictor,
     # Set the concurrency to the number of LLM instances.
-    concurrency=10,
-    # Specify the number of GPUs required per LLM instance.
-    # NOTE: Do NOT set `num_gpus` when using vLLM with tensor-parallelism
-    # (i.e., `tensor_parallel_size`).
-    num_gpus=1,
+    concurrency=num_instances,
     # Specify the batch size for inference.
     batch_size=32,
+    **resources_kwarg,
 )
 
 # Peek first 10 results.
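
Usage note (not part of the patch above): a minimal sketch of how the dataset returned by the updated ds.map_batches(...) call might be consumed, assuming the "prompt" and "generated_text" columns that LLMPredictor returns; the S3 output bucket below is a hypothetical placeholder.

    # Peek at a few rows locally; suitable for testing and debugging only.
    outputs = ds.take(limit=10)
    for output in outputs:
        prompt = output["prompt"]
        generated_text = output["generated_text"]
        print(f"Prompt: {prompt!r}, Generated text: {generated_text!r}")

    # For full-scale runs, write results out instead of pulling rows to the driver.
    # The bucket name is a hypothetical placeholder.
    # ds.write_parquet("s3://<your-output-bucket>")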