# SPDX-License-Identifier: Apache-2.0
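
"""Test LLM.collective_rpc: calling a worker method either by its name or as
a serialized callable, across tensor-parallel sizes and executor backends."""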

import pytest

from vllm import LLM

from ...utils import create_new_process_for_each_test


@pytest.mark.parametrize("tp_size", [1, 2])
@pytest.mark.parametrize("backend", ["mp", "ray"])
@create_new_process_for_each_test()
def test_collective_rpc(tp_size, backend):
    # With tp_size == 1 the backend is overridden to None below, so the
    # "ray" case would duplicate the "mp" case.
    if tp_size == 1 and backend == "ray":
        pytest.skip("Skip duplicate test case")
    if tp_size == 1:
        # A single GPU needs no distributed executor; fall back to the
        # default uniprocess backend.
        backend = None

    # Intentionally define the method and class in the test function
    # to test whether they can be serialized and sent to the workers.
    def echo_rank(self):
        return self.rank

    from vllm.worker.worker import Worker

    class MyWorker(Worker):

        def echo_rank(self):
            return self.rank

    # load_format="dummy" initializes random weights instead of downloading
    # the real checkpoint, keeping the test lightweight.
    llm = LLM(model="meta-llama/Llama-3.2-1B-Instruct",
              enforce_eager=True,
              load_format="dummy",
              tensor_parallel_size=tp_size,
              distributed_executor_backend=backend,
              worker_cls=MyWorker)
    # Exercise both RPC forms: the method name as a string, and the callable
    # itself (serialized and sent to each worker).
    for method in ["echo_rank", echo_rank]:
        assert llm.collective_rpc(method) == list(range(tp_size))
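

# --- Hedged sketch (not part of the upstream test) ---
# A minimal illustration of forwarding extra arguments through
# collective_rpc, assuming its documented signature
# collective_rpc(method, timeout=None, args=(), kwargs=None).
# The helper name `scale_rank` is hypothetical; on the worker side,
# `self` is the Worker instance, as in the test above.
@create_new_process_for_each_test()
def test_collective_rpc_with_args_sketch():

    def scale_rank(self, factor, offset=0):
        # Runs on each worker and returns a value derived from its rank.
        return self.rank * factor + offset

    llm = LLM(model="meta-llama/Llama-3.2-1B-Instruct",
              enforce_eager=True,
              load_format="dummy")
    # Default tensor_parallel_size=1 -> a single worker with rank 0,
    # so the expected result is [0 * 2 + 1] == [1].
    assert llm.collective_rpc(scale_rank, args=(2, ),
                              kwargs={"offset": 1}) == [1]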