# SPDX-License-Identifier: Apache-2.0
# usage: VLLM_USE_V1=1 python examples/offline_inference/data_parallel.py
# we need to have a launcher to create multiple data parallel
# ranks. And each rank will create a vLLM instance to process its own prompts.
import os

from vllm import LLM, SamplingParams
from vllm.utils import get_open_port


def main(dp_size, dp_rank, dp_master_ip, dp_master_port, GPUs_per_dp_rank):
    """Run one data-parallel (DP) rank end to end.

    Configures the DP process group via environment variables, pins this
    rank to its own slice of GPUs, selects this rank's share of the
    prompts, and runs generation with a rank-local vLLM instance.

    Args:
        dp_size: total number of data-parallel ranks.
        dp_rank: index of this rank in ``[0, dp_size)``.
        dp_master_ip: IP address of the DP master (rank 0) for rendezvous.
        dp_master_port: free TCP port on the master used for rendezvous.
        GPUs_per_dp_rank: number of GPUs owned by each DP rank; also used
            as the tensor-parallel size of the rank-local engine.
    """
    # vLLM reads the DP topology from these environment variables.
    os.environ["VLLM_DP_RANK"] = str(dp_rank)
    os.environ["VLLM_DP_SIZE"] = str(dp_size)
    os.environ["VLLM_DP_MASTER_IP"] = dp_master_ip
    os.environ["VLLM_DP_MASTER_PORT"] = str(dp_master_port)

    # Give each DP rank a disjoint, contiguous slice of the visible GPUs,
    # e.g. rank 0 -> "0,1", rank 1 -> "2,3" when GPUs_per_dp_rank == 2.
    os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(
        str(i) for i in range(dp_rank * GPUs_per_dp_rank,
                              (dp_rank + 1) * GPUs_per_dp_rank))

    # Sample prompts.
    prompts = [
        "Hello, my name is",
        "The president of the United States is",
        "The capital of France is",
        "The future of AI is",
    ]

    # with DP, each rank should process different prompts.
    # usually all the DP ranks process a full dataset,
    # and each rank processes a different part of the dataset.
    prompts_per_rank = len(prompts) // dp_size
    start = dp_rank * prompts_per_rank
    end = start + prompts_per_rank
    prompts = prompts[start:end]
    if len(prompts) == 0:
        # if any rank has no prompts to process,
        # we need to set a placeholder prompt
        prompts = ["Placeholder"]
    print(f"DP rank {dp_rank} needs to process {len(prompts)} prompts")

    # Create a sampling params object.
    # since we are doing data parallel, every rank can have different
    # sampling params. here we set different max_tokens for different
    # ranks for demonstration.
    sampling_params = SamplingParams(temperature=0.8,
                                     top_p=0.95,
                                     max_tokens=16 * (dp_rank + 1))

    # Create an LLM.
    # The tensor-parallel size must match the number of GPUs this rank
    # owns (set via CUDA_VISIBLE_DEVICES above); deriving it from
    # GPUs_per_dp_rank keeps the two in sync if the launcher changes.
    llm = LLM(model="facebook/opt-125m",
              tensor_parallel_size=GPUs_per_dp_rank,
              enforce_eager=True)
    outputs = llm.generate(prompts, sampling_params)

    # Print the outputs.
    for output in outputs:
        prompt = output.prompt
        generated_text = output.outputs[0].text
        print(f"DP rank {dp_rank}, Prompt: {prompt!r}, "
              f"Generated text: {generated_text!r}")


if __name__ == "__main__":
    from multiprocessing import Process
    dp_size = 2
    GPUs_per_dp_rank = 2
    dp_master_ip = "127.0.0.1"
    dp_master_port = get_open_port()
    procs = []
    # One OS process per DP rank; each process builds its own vLLM engine.
    for i in range(dp_size):
        proc = Process(target=main,
                       args=(dp_size, i, dp_master_ip, dp_master_port,
                             GPUs_per_dp_rank))
        proc.start()
        procs.append(proc)
    for proc in procs:
        proc.join()