[Core] Scheduler perf fix (#4270)

This commit is contained in:
SangBin Cho 2024-04-23 06:11:06 +09:00 committed by GitHub
parent 3d925165f2
commit ad8d696a99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 11 additions and 14 deletions

View File

@@ -540,7 +540,7 @@ def test_decode_schedule_preempted():
curr_loras = None curr_loras = None
for i in range(3): for i in range(3):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60) _, seq_group = create_dummy_prompt(str(i), prompt_length=60)
scheduler._allocate_and_set_running(seq_group, 60) scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1) append_new_token_seq_group(60, seq_group, 1)
running.append(seq_group) running.append(seq_group)
scheduler.block_manager.can_append_slots = MagicMock() scheduler.block_manager.can_append_slots = MagicMock()
@@ -581,7 +581,7 @@ def test_decode_swap_beam_search():
budget = create_token_budget() budget = create_token_budget()
for i in range(3): for i in range(3):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2) _, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
scheduler._allocate_and_set_running(seq_group, 60) scheduler._allocate_and_set_running(seq_group)
running.append(seq_group) running.append(seq_group)
append_new_token_seq_group(60, seq_group, 1) append_new_token_seq_group(60, seq_group, 1)
budget.add_num_seqs(seq_group.request_id, budget.add_num_seqs(seq_group.request_id,
@@ -629,7 +629,7 @@ def test_schedule_decode_blocks_to_copy_update():
running = deque() running = deque()
policy = PolicyFactory.get_policy(policy_name="fcfs") policy = PolicyFactory.get_policy(policy_name="fcfs")
curr_loras = None curr_loras = None
scheduler._allocate_and_set_running(seq_group, 60) scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1) append_new_token_seq_group(60, seq_group, 1)
running.append(seq_group) running.append(seq_group)
@@ -659,7 +659,7 @@ def test_schedule_swapped_simple():
curr_loras = None curr_loras = None
blocks_to_swap_out = {} blocks_to_swap_out = {}
_, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2) _, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
scheduler._allocate_and_set_running(seq_group, 60) scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1) append_new_token_seq_group(60, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out) scheduler._swap_out(seq_group, blocks_to_swap_out)
swapped.append(seq_group) swapped.append(seq_group)
@@ -687,7 +687,7 @@ def test_schedule_swapped_max_token_budget():
blocks_to_swap_out = {} blocks_to_swap_out = {}
for _ in range(2): for _ in range(2):
_, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2) _, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
scheduler._allocate_and_set_running(seq_group, 60) scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1) append_new_token_seq_group(60, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out) scheduler._swap_out(seq_group, blocks_to_swap_out)
swapped.append(seq_group) swapped.append(seq_group)
@@ -721,7 +721,7 @@ def test_schedule_swapped_max_seqs():
blocks_to_swap_out = {} blocks_to_swap_out = {}
for i in range(4): for i in range(4):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60) _, seq_group = create_dummy_prompt(str(i), prompt_length=60)
scheduler._allocate_and_set_running(seq_group, 60) scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1) append_new_token_seq_group(60, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out) scheduler._swap_out(seq_group, blocks_to_swap_out)
swapped.append(seq_group) swapped.append(seq_group)
@@ -759,7 +759,7 @@ def test_schedule_swapped_max_loras():
lora_name=str(i), lora_name=str(i),
lora_int_id=i + 1, lora_int_id=i + 1,
lora_local_path="abc")) lora_local_path="abc"))
scheduler._allocate_and_set_running(seq_group, 60) scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1) append_new_token_seq_group(60, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out) scheduler._swap_out(seq_group, blocks_to_swap_out)
swapped.append(seq_group) swapped.append(seq_group)
@@ -783,7 +783,7 @@ def test_schedule_swapped_cannot_swap_in():
blocks_to_swap_out = {} blocks_to_swap_out = {}
for _ in range(2): for _ in range(2):
_, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2) _, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
scheduler._allocate_and_set_running(seq_group, 60) scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1) append_new_token_seq_group(60, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out) scheduler._swap_out(seq_group, blocks_to_swap_out)
swapped.append(seq_group) swapped.append(seq_group)
@@ -808,7 +808,7 @@ def test_schedule_swapped_blocks_to_copy():
policy = PolicyFactory.get_policy(policy_name="fcfs") policy = PolicyFactory.get_policy(policy_name="fcfs")
curr_loras = None curr_loras = None
_, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2) _, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
scheduler._allocate_and_set_running(seq_group, 60) scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1) append_new_token_seq_group(60, seq_group, 1)
blocks_to_swap_out = {} blocks_to_swap_out = {}
scheduler._swap_out(seq_group, blocks_to_swap_out) scheduler._swap_out(seq_group, blocks_to_swap_out)

View File

@@ -297,7 +297,6 @@ class Scheduler:
def add_seq_group(self, seq_group: SequenceGroup) -> None: def add_seq_group(self, seq_group: SequenceGroup) -> None:
# Add sequence groups to the waiting queue. # Add sequence groups to the waiting queue.
logger.debug(f"add_seq_group {seq_group.request_id}")
self.waiting.append(seq_group) self.waiting.append(seq_group)
def abort_seq_group(self, request_id: Union[str, Iterable[str]]) -> None: def abort_seq_group(self, request_id: Union[str, Iterable[str]]) -> None:
@@ -427,7 +426,6 @@ class Scheduler:
swapped_out.append(seq_group) swapped_out.append(seq_group)
break break
else: else:
logger.debug(f"append slot for {seq_group}")
self._append_slots(seq_group, blocks_to_copy) self._append_slots(seq_group, blocks_to_copy)
is_prefill = seq_group.is_prefill() is_prefill = seq_group.is_prefill()
if is_prefill: if is_prefill:
@@ -659,7 +657,7 @@ class Scheduler:
if curr_loras is not None and lora_int_id > 0: if curr_loras is not None and lora_int_id > 0:
curr_loras.add(lora_int_id) curr_loras.add(lora_int_id)
waiting_queue.popleft() waiting_queue.popleft()
self._allocate_and_set_running(seq_group, num_new_tokens) self._allocate_and_set_running(seq_group)
seq_groups.append( seq_groups.append(
ScheduledSequenceGroup(seq_group=seq_group, ScheduledSequenceGroup(seq_group=seq_group,
token_chunk_size=num_new_tokens)) token_chunk_size=num_new_tokens))
@@ -952,8 +950,7 @@ class Scheduler:
self.running = deque(seq_group for seq_group in self.running self.running = deque(seq_group for seq_group in self.running
if not seq_group.is_finished()) if not seq_group.is_finished())
def _allocate_and_set_running(self, seq_group: SequenceGroup, def _allocate_and_set_running(self, seq_group: SequenceGroup) -> None:
num_new_tokens: int) -> None:
self.block_manager.allocate(seq_group) self.block_manager.allocate(seq_group)
for seq in seq_group.get_seqs(status=SequenceStatus.WAITING): for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
seq.status = SequenceStatus.RUNNING seq.status = SequenceStatus.RUNNING