[Misc] Update usage with mooncake lib for kv transfer (#16523)

Signed-off-by: Shangming Cai <caishangming@linux.alibaba.com>
This commit is contained in:
shangmingc 2025-04-14 19:31:37 +08:00 committed by GitHub
parent 7cbfc10943
commit 1dd23386ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 11 additions and 11 deletions

View File

@ -70,7 +70,7 @@ class MooncakeStore(KVStoreBufferBase):
):
try:
from mooncake_vllm_adaptor import MooncakeDistributedStore
from mooncake.store import MooncakeDistributedStore
except ImportError as e:
raise ImportError(
"Please install mooncake by following the instructions at "

View File

@ -57,14 +57,14 @@ class MooncakeTransferEngine:
def __init__(self, kv_rank: int, local_rank: int):
try:
import mooncake_vllm_adaptor as mva
from mooncake.engine import TransferEngine
except ImportError as e:
raise ImportError(
"Please install mooncake by following the instructions at "
"https://github.com/kvcache-ai/Mooncake/blob/main/doc/en/build.md " # noqa: E501
"to run vLLM with MooncakeConnector.") from e
self.engine = mva.mooncake_vllm_adaptor()
self.engine = TransferEngine()
self.local_rank = local_rank
try:
@ -140,12 +140,12 @@ class MooncakeTransferEngine:
"Mooncake Configuration error. `metadata_backend`"
f" should be one of {supported_backend}.")
self.engine.initializeExt(local_hostname, metadata_server,
self.engine.initialize_ext(local_hostname, metadata_server,
protocol, device_name, metadata_backend)
def allocate_managed_buffer(self, length: int) -> int:
"""Allocate a managed buffer of the specified length."""
ret = self.engine.allocateManagedBuffer(length)
ret = self.engine.allocate_managed_buffer(length)
if ret <= 0:
logger.error("Allocation Return Error")
raise Exception("Allocation Return Error")
@ -153,12 +153,12 @@ class MooncakeTransferEngine:
def free_managed_buffer(self, buffer: int, length: int) -> int:
"""Free a previously allocated managed buffer."""
return self.engine.freeManagedBuffer(buffer, length)
return self.engine.free_managed_buffer(buffer, length)
def transfer_sync(self, buffer: int, peer_buffer_address: int,
length: int) -> int:
"""Synchronously transfer data to the specified address."""
ret = self.engine.transferSync(self.remote_url, buffer,
ret = self.engine.transfer_sync_read(self.remote_url, buffer,
peer_buffer_address, length)
if ret < 0:
logger.error("Transfer Return Error")
@ -168,11 +168,11 @@ class MooncakeTransferEngine:
def write_bytes_to_buffer(self, buffer: int, user_data: bytes,
length: int) -> int:
"""Write bytes to the allocated buffer."""
return self.engine.writeBytesToBuffer(buffer, user_data, length)
return self.engine.write_bytes_to_buffer(buffer, user_data, length)
def read_bytes_from_buffer(self, buffer: int, length: int) -> bytes:
"""Read bytes from the allocated buffer."""
return self.engine.readBytesFromBuffer(buffer, length)
return self.engine.read_bytes_from_buffer(buffer, length)
def wait_for_ack(self, src_ptr: int, length: int) -> None:
"""Asynchronously wait for ACK from the receiver."""