vLLM Source Code Study


kv_cache_manager.py

Path: vllm/v1/core/kv_cache_manager.py

This code implements the core memory-management layer of the vLLM v1 architecture. It sits between the scheduler (Scheduler) and the underlying physical memory pool (Block Pool), and is responsible for allocating and reclaiming memory blocks under the PagedAttention mechanism, as well as for Prefix Caching.

The code consists of two main classes:

1. KVCacheBlocks: an isolation layer between the scheduler and the manager
2. KVCacheManager: the control center for caching and allocation

```python
@dataclass
class KVCacheBlocks:
    # A dataclass that is essentially a wrapper around lists of physical
    # memory blocks. Its job is to hide KVCacheManager's internal data
    # structures and expose only a stable interface to the scheduler above.

    blocks: tuple[Sequence[KVCacheBlock], ...]
    # `blocks` is a tuple with one entry per kv_cache_group.
    # This design leaves room for a future architecture in which different
    # groups use different block_sizes.

    def __add__(self, other: "KVCacheBlocks") -> "KVCacheBlocks":
        # Concatenate two KVCacheBlocks objects group by group.
        # Commonly used to merge newly computed blocks into existing ones.
        return KVCacheBlocks(
            tuple(
                list(itertools.chain(blk1, blk2))
                for blk1, blk2 in zip(self.blocks, other.blocks)
            )
        )

    @overload
    def get_block_ids(
        self,
        allow_none: Literal[False] = False,
    ) -> tuple[list[int], ...]: ...

    @overload
    def get_block_ids(
        self,
        allow_none: Literal[True] = True,
    ) -> tuple[list[int], ...] | None: ...

    def get_block_ids(
        self,
        allow_none: bool = False,
    ) -> tuple[list[int], ...] | None:
        # Convert KVCacheBlocks into per-group lists of block_ids, which is
        # the form the scheduler wants to read.
        # With allow_none=True, return None when every group is empty.
        if allow_none and all(len(group) == 0 for group in self.blocks):
            return None
        return tuple([blk.block_id for blk in group] for group in self.blocks)

    def get_unhashed_block_ids(self) -> list[int]:
        # Return the IDs of blocks that have not been hashed yet, i.e. that
        # are not yet managed by the prefix cache.
        # Only the single-group case is supported here.
        assert len(self.blocks) == 1, "Only one group is supported"
        return [block.block_id for block in self.blocks[0] if block.block_hash is None]

    def get_unhashed_block_ids_all_groups(self) -> list[list[int]]:
        # Return unhashed block IDs per group, skipping padding blocks.
        return [
            [
                block.block_id
                for block in group
                if block.block_hash is None and not block.is_null
            ]
            for group in self.blocks
        ]

    def new_empty(self) -> "KVCacheBlocks":
        # Create a new object with the same number of groups but no blocks.
        return KVCacheBlocks(tuple(() for _ in range(len(self.blocks))))
```
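To see the wrapper in action, here is a small hedged example (`FakeBlock` is an invented stand-in for vLLM's real KVCacheBlock, carrying just the fields used above):

```python
from dataclasses import dataclass

@dataclass
class FakeBlock:
    block_id: int
    block_hash: int | None = None
    is_null: bool = False

# One kv_cache_group; a multi-group setup would add more tuple entries.
a = KVCacheBlocks(([FakeBlock(0, block_hash=123), FakeBlock(1)],))
b = KVCacheBlocks(([FakeBlock(2)],))

merged = a + b                          # group-wise concatenation via __add__
print(merged.get_block_ids())           # ([0, 1, 2],)
print(merged.get_unhashed_block_ids())  # [1, 2] -- not yet in the prefix cache
```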
```python
class KVCacheManager:
    def __init__(
        self,
        kv_cache_config: KVCacheConfig,
        max_model_len: int,
        hash_block_size: int,
        max_num_batched_tokens: int | None = None,
        enable_caching: bool = True,
        use_eagle: bool = False,
        log_stats: bool = False,
        enable_kv_cache_events: bool = False,
        dcp_world_size: int = 1,
        pcp_world_size: int = 1,
        metrics_collector: KVCacheMetricsCollector | None = None,
    ) -> None:
        # KVCacheManager is the brain that manages the KV cache lifecycle.
        # It handles regular generation requests and also supports complex
        # scenarios such as speculative decoding and externally cached
        # multimodal KV data.

        self.max_model_len = max_model_len
        # If the scheduler does not pass an upper bound, fall back to
        # max_model_len.
        if max_num_batched_tokens is None:
            max_num_batched_tokens = max_model_len

        self.enable_caching = enable_caching
        self.use_eagle = use_eagle
        self.log_stats = log_stats
        self.metrics_collector = metrics_collector

        # prefix_cache_stats is only created when stats logging is enabled.
        self.prefix_cache_stats = PrefixCacheStats() if log_stats else None

        # The coordinator is the collaborator that actually organizes the
        # underlying block pool and the cache.
        self.coordinator = get_kv_cache_coordinator(
            kv_cache_config=kv_cache_config,
            max_model_len=self.max_model_len,
            max_num_batched_tokens=max_num_batched_tokens,
            use_eagle=self.use_eagle,
            enable_caching=self.enable_caching,
            enable_kv_cache_events=enable_kv_cache_events,
            dcp_world_size=dcp_world_size,
            pcp_world_size=pcp_world_size,
            hash_block_size=hash_block_size,
            metrics_collector=self.metrics_collector,
        )
        self.num_kv_cache_groups = len(kv_cache_config.kv_cache_groups)
        self.block_pool = self.coordinator.block_pool
        self.kv_cache_config = kv_cache_config

        # Pre-build an empty KVCacheBlocks so we do not repeatedly create
        # empty objects and pay the GC overhead.
        self.empty_kv_cache_blocks = KVCacheBlocks(
            tuple(() for _ in range(self.num_kv_cache_groups))
        )

    @property
    def usage(self) -> float:
        # Return the current KV cache usage ratio.
        return self.block_pool.get_usage()

    def make_prefix_cache_stats(self) -> PrefixCacheStats | None:
        # Read and reset the prefix cache statistics.
        # Returns None when log_stats is disabled.
        if not self.log_stats:
            return None
        stats = self.prefix_cache_stats
        self.prefix_cache_stats = PrefixCacheStats()
        return stats

    def get_computed_blocks(self, request: Request) -> tuple[KVCacheBlocks, int]:
        # Core mechanism 1: prefix cache matching.
        # When enable_caching is on, this method is called before a new
        # request is processed. It looks up the coordinator with
        # request.block_hashes to check whether reusable KV cache already
        # exists.

        # Typical scenarios are system-prompt reuse and multi-turn chat.
        # On a hit, it returns the already-cached physical blocks together
        # with the number of matched tokens, so the attention computation
        # for that prefix can be skipped, significantly reducing the
        # time-to-first-token (TTFT).

        if not self.enable_caching or request.skip_reading_prefix_cache:
            return self.empty_kv_cache_blocks, 0

        max_cache_hit_length = request.num_tokens - 1
        computed_blocks, num_new_computed_tokens = (
            self.coordinator.find_longest_cache_hit(
                request.block_hashes, max_cache_hit_length
            )
        )

        if self.log_stats:
            assert self.prefix_cache_stats is not None
            self.prefix_cache_stats.record(
                num_tokens=request.num_tokens,
                num_hits=num_new_computed_tokens,
                preempted=request.num_preemptions > 0,
            )

        return self.create_kv_cache_blocks(computed_blocks), num_new_computed_tokens
```
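The lookup keys here, `request.block_hashes`, are per-block hashes chained so that each hash identifies the entire prefix up to that block, not just the block itself. A minimal sketch of the chaining idea (BLOCK_SIZE and the sha256 choice are assumptions for illustration; vLLM's real hashing also mixes in additional keys, but likewise only hashes full blocks):

```python
import hashlib

BLOCK_SIZE = 16  # assumed block size for this sketch

def chained_block_hashes(token_ids: list[int]) -> list[bytes]:
    # Hash each FULL block, folding in the previous block's hash so a
    # match on block i implies the whole prefix up to i matches.
    hashes: list[bytes] = []
    parent = b""
    num_full = len(token_ids) // BLOCK_SIZE
    for i in range(num_full):
        block = token_ids[i * BLOCK_SIZE : (i + 1) * BLOCK_SIZE]
        parent = hashlib.sha256(parent + str(block).encode()).digest()
        hashes.append(parent)
    return hashes

# Two requests sharing a 16-token system prompt get identical first hashes,
# so find_longest_cache_hit can return that block for the second request.
sys_prompt = list(range(16))
h1 = chained_block_hashes(sys_prompt + [100, 101])
h2 = chained_block_hashes(sys_prompt + [200, 201])
assert h1[0] == h2[0]
```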
```python
def allocate_slots(
    self,
    request: Request,
    num_new_tokens: int,
    num_new_computed_tokens: int = 0,
    new_computed_blocks: KVCacheBlocks | None = None,
    num_lookahead_tokens: int = 0,
    num_external_computed_tokens: int = 0,
    delay_cache_blocks: bool = False,
    num_encoder_tokens: int = 0,
    full_sequence_must_fit: bool = False,
) -> KVCacheBlocks | None:
    # Core mechanism 2: slot allocation.
    # This is the most central and most complex function in the class.
    # It requests physical memory blocks for a request in the current
    # inference step.

    # The logical layout of a request in memory is split finely:
    # Parts already computed or cached:
    #   comp     = tokens this request computed in earlier steps
    #   new_comp = tokens just matched in the prefix cache
    #   ext_comp = tokens computed externally (e.g. a multimodal connector)
    # Parts that need new space:
    #   new       = new tokens to be fed or generated in this step
    #   lookahead = extra slots reserved for speculative decoding, holding
    #               several future draft tokens

    # The allocation flow has four steps:
    # 1. Capacity pre-check: verify up front that the full sequence fits,
    #    to avoid OOM in the middle of a request.
    # 2. Reclaim obsolete blocks, e.g. tokens outside the sliding window.
    # 3. Physical allocation: request real blocks from the block pool.
    # 4. State commit: write finalized tokens into the prefix cache pool
    #    for later requests to reuse.

    # When loading external KV data, there may be no new tokens, yet slots
    # still need to be allocated.
    if num_new_tokens == 0 and num_external_computed_tokens == 0:
        raise ValueError(
            "num_new_tokens must be greater than 0 when there are no "
            "external computed tokens"
        )

    # When new_computed_blocks is None, fall back to the shared empty
    # object so later code needs no extra branches.
    if new_computed_blocks is not None:
        new_computed_block_list = new_computed_blocks.blocks
    else:
        new_computed_block_list = self.empty_kv_cache_blocks.blocks

    # Count the tokens that are already computed.
    num_local_computed_tokens = (
        request.num_computed_tokens + num_new_computed_tokens
    )
    total_computed_tokens = min(
        num_local_computed_tokens + num_external_computed_tokens,
        self.max_model_len,
    )

    # full_sequence_must_fit: the whole sequence must be confirmed to fit
    # before any allocation happens.
    if full_sequence_must_fit:
        full_num_tokens = min(request.num_tokens, self.max_model_len)

        num_blocks_to_allocate = self.coordinator.get_num_blocks_to_allocate(
            request_id=request.request_id,
            num_tokens=full_num_tokens,
            new_computed_blocks=new_computed_block_list,
            num_encoder_tokens=num_encoder_tokens,
            total_computed_tokens=total_computed_tokens,
            num_tokens_main_model=full_num_tokens,
            apply_admission_cap=True,
        )
        if num_blocks_to_allocate > self.block_pool.get_num_free_blocks():
            return None

    # Total main-model tokens for this step.
    num_tokens_main_model = total_computed_tokens + num_new_tokens
    # Total slots to reserve, including lookahead.
    num_tokens_need_slot = min(
        num_tokens_main_model + num_lookahead_tokens, self.max_model_len
    )

    # First free the blocks attention no longer needs, e.g. tokens outside
    # the sliding window.
    self.coordinator.remove_skipped_blocks(
        request.request_id, total_computed_tokens
    )

    num_blocks_to_allocate = self.coordinator.get_num_blocks_to_allocate(
        request_id=request.request_id,
        num_tokens=num_tokens_need_slot,
        new_computed_blocks=new_computed_block_list,
        num_encoder_tokens=num_encoder_tokens,
        total_computed_tokens=num_local_computed_tokens
        + num_external_computed_tokens,
        num_tokens_main_model=num_tokens_main_model,
    )

    if num_blocks_to_allocate > self.block_pool.get_num_free_blocks():
        return None

    # If there are newly matched prefix blocks or external blocks, attach
    # them to the request first so they cannot be lost if a later
    # allocation fails.
    if (
        new_computed_block_list is not self.empty_kv_cache_blocks.blocks
        or num_external_computed_tokens > 0
    ):
        self.coordinator.allocate_new_computed_blocks(
            request_id=request.request_id,
            new_computed_blocks=new_computed_block_list,
            num_local_computed_tokens=num_local_computed_tokens,
            num_external_computed_tokens=num_external_computed_tokens,
        )

    # Actually allocate the new blocks needed for this step.
    new_blocks = self.coordinator.allocate_new_blocks(
        request.request_id,
        num_tokens_need_slot,
        num_tokens_main_model,
        num_encoder_tokens,
    )

    # If caching is disabled, or this is the delayed-caching path, return
    # only the new blocks without writing back to the prefix cache.
    if not self.enable_caching or delay_cache_blocks:
        return self.create_kv_cache_blocks(new_blocks)

    # Only cache finalized tokens, so draft tokens that may still be
    # rejected never enter the cache.
    num_tokens_to_cache = min(
        total_computed_tokens + num_new_tokens,
        request.num_tokens,
    )
    self.coordinator.cache_blocks(request, num_tokens_to_cache)

    return self.create_kv_cache_blocks(new_blocks)
```
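To make the bookkeeping concrete, here is a hedged worked example of the layout arithmetic (every number, and the assumption that `comp` and `new_comp` are block-aligned, is invented for illustration; the real block accounting lives in `coordinator.get_num_blocks_to_allocate`, which also handles sliding windows and per-group block sizes):

```python
import math

block_size = 16  # assumed
comp = 48        # tokens computed in earlier steps (3 full blocks held)
new_comp = 32    # fresh prefix-cache hit from get_computed_blocks (2 blocks)
ext_comp = 0     # no connector-provided KV in this example
new = 24         # tokens scheduled for this step
lookahead = 4    # draft slots reserved for speculative decoding

num_local_computed_tokens = comp + new_comp                   # 80
total_computed_tokens = num_local_computed_tokens + ext_comp  # 80
num_tokens_main_model = total_computed_tokens + new           # 104
num_tokens_need_slot = num_tokens_main_model + lookahead      # 108

# Slots map to blocks by ceiling division; blocks already held by the
# request or matched in the cache need no new allocation.
blocks_needed_total = math.ceil(num_tokens_need_slot / block_size)  # 7
blocks_already_held = comp // block_size                            # 3
blocks_from_cache_hit = new_comp // block_size                      # 2
print(blocks_needed_total - blocks_already_held - blocks_from_cache_hit)  # 2
```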
```python
def free(self, request: Request) -> None:
    # Core mechanism 3: resource reclamation and maintenance.
    # When a request finishes or is aborted, return all of its physical
    # blocks to the underlying pool.
    self.coordinator.free(request.request_id)

def remove_skipped_blocks(self, request_id: str, total_computed_tokens: int) -> None:
    # Remove blocks no longer needed by the attention computation and
    # replace the removed positions with the null block.
    self.coordinator.remove_skipped_blocks(request_id, total_computed_tokens)

def evict_blocks(self, block_ids: set[int]) -> None:
    # Evict the given blocks from the prefix cache, typically for LRU-style
    # eviction once the cache pool is full.
    self.block_pool.evict_blocks(block_ids)

def reset_prefix_cache(self) -> bool:
    # Reset the prefix cache, e.g. after weight updates during training or
    # for benchmarking.
    if not self.block_pool.reset_prefix_cache():
        return False
    if self.log_stats:
        assert self.prefix_cache_stats is not None
        self.prefix_cache_stats.reset = True
    return True

def get_num_common_prefix_blocks(self, running_request_id: str) -> list[int]:
    # Count how many prefix blocks are shared by all currently running
    # requests. This metric is key to scheduling-policy tuning and to
    # evaluating cache efficiency.
    return self.coordinator.get_num_common_prefix_blocks(running_request_id)

def take_events(self) -> list[KVCacheEvent]:
    # Drain the KV cache events accumulated in the block pool.
    return self.block_pool.take_events()

def get_blocks(self, request_id: str) -> KVCacheBlocks:
    # Get the blocks currently held by a request.
    return self.create_kv_cache_blocks(self.coordinator.get_blocks(request_id))

def get_block_ids(self, request_id: str) -> tuple[list[int], ...]:
    # Get the block_id lists of a request.
    return self.get_blocks(request_id).get_block_ids()

def cache_blocks(self, request: Request, num_computed_tokens: int) -> None:
    # Write a request's blocks into the prefix cache.
    if self.enable_caching:
        self.coordinator.cache_blocks(request, num_computed_tokens)

def create_kv_cache_blocks(self, blocks: tuple[list[KVCacheBlock], ...]) -> KVCacheBlocks:
    # Only create a new object when blocks are non-empty; otherwise reuse
    # empty_kv_cache_blocks.
    return KVCacheBlocks(blocks) if any(blocks) else self.empty_kv_cache_blocks

def take_new_block_ids(self) -> list[int]:
    # Drain the newly allocated attention block IDs, used later for zeroing.
    ids: list[int] = []
    for mgr in self.coordinator.single_type_managers:
        ids.extend(mgr.take_new_block_ids())
    return ids

def new_step_starts(self) -> None:
    # Called when a new step starts, to advance the coordinator's internal
    # state.
    self.coordinator.new_step_starts()
```
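Note that `free()` returns blocks to the pool rather than destroying their contents, which is what allows still-intact cached blocks to be resurrected by later prefix hits. A toy sketch of that free-list cycle (`ToyBlockPool` is invented; the real block pool additionally tracks cached blocks by hash and reference count):

```python
from collections import deque

class ToyBlockPool:
    """Invented free-list pool showing the reclaim-and-reuse cycle."""

    def __init__(self, num_blocks: int) -> None:
        self.free_queue: deque[int] = deque(range(num_blocks))

    def allocate(self, n: int) -> list[int]:
        if n > len(self.free_queue):
            raise RuntimeError("out of blocks")
        return [self.free_queue.popleft() for _ in range(n)]

    def free(self, block_ids: list[int]) -> None:
        # Freeing in reverse order means a request's tail blocks re-enter
        # the queue first, so they are reused (evicted) before its head
        # blocks, which are more likely to be shared prefixes.
        self.free_queue.extend(reversed(block_ids))

pool = ToyBlockPool(8)
req_blocks = pool.allocate(3)  # [0, 1, 2]
pool.free(req_blocks)          # queue tail is now 2, 1, 0
```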
Summary: this code shows how vLLM turns "PagedAttention on paper" into an industrial-grade memory scheduling system. Through hash matching it achieves cross-request memory sharing (Prefix Caching), and through its very fine-grained slot typing it supports many of the cutting-edge acceleration techniques in today's large-model inference.

Closing Q&A

  1. When is a block allocated?

    When a new request arrives, or as prefill/decode advances, a new block is allocated whenever the existing blocks are not enough.
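Because allocation is lazy, a decode step grabs a new block only when the token count crosses a block boundary. A minimal sketch (a block_size of 16 is assumed; the real value comes from the KV cache config):

```python
import math

block_size = 16

def blocks_needed(num_tokens: int) -> int:
    return math.ceil(num_tokens / block_size)

# Growing from 16 to 17 tokens is what triggers allocating a second block.
print(blocks_needed(16), blocks_needed(17))  # 1 2
```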

  2. How does a logical token map to a physical block?

    Through the block table: a logical token first locates its logical block, and the block table then maps that logical block to the corresponding physical block ID.
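A minimal sketch of that mapping (the block_size and the table contents are made up for illustration):

```python
block_size = 16
block_table = [7, 3, 42]  # logical block index -> physical block id

def locate(token_idx: int) -> tuple[int, int]:
    logical_block = token_idx // block_size
    offset = token_idx % block_size
    return block_table[logical_block], offset

print(locate(20))  # (3, 4): token 20 -> logical block 1 -> physical block 3
```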

  3. How are blocks reclaimed after a request finishes?

    The physical blocks recorded in the block table are returned to the block pool / allocator and re-enter the free pool.

  4. How does the attention kernel read KV?

    The kernel does not require KV to be contiguous: it follows the block table to the corresponding physical block and then reads the K/V at the proper offset within that block.
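The same table drives the kernel's reads. A toy gather in plain Python/numpy (shapes and values invented) shows that K for consecutive tokens can come from scattered physical blocks:

```python
import numpy as np

num_blocks, block_size, head_dim = 8, 4, 2
k_pool = np.random.rand(num_blocks, block_size, head_dim)  # physical K pool
block_table = np.array([5, 0, 7])  # this request's logical -> physical map

def read_k(token_idx: int) -> np.ndarray:
    phys = block_table[token_idx // block_size]
    return k_pool[phys, token_idx % block_size]

# Tokens 0..11 pull K from physical blocks 5, 0, 7 -- no contiguity needed.
ks = np.stack([read_k(t) for t in range(12)])
print(ks.shape)  # (12, 2)
```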
For reference, the post now reproduces the original English source from vllm/v1/core/kv_cache_manager.py, starting with `get_computed_blocks`:

```python
def get_computed_blocks(self, request: Request) -> tuple[KVCacheBlocks, int]:
    """...

    Returns:
        A tuple containing:
        - A list of blocks that are computed for the request.
        - The number of computed tokens.
    """
    # We skip finding the prefix cache hit when prefix caching is
    # disabled or the request is marked as skipping kv cache read
    # (which happens when the request requires prompt logprobs
    # or calls a pooling model with all pooling).
    if not self.enable_caching or request.skip_reading_prefix_cache:
        return self.empty_kv_cache_blocks, 0

    # NOTE: When all tokens hit the cache, we must recompute the last token
    # to obtain logits. Thus, set max_cache_hit_length to prompt_length - 1.
    # This can trigger recomputation of an entire block, rather than just
    # the single last token, because allocate_slots() requires
    # num_computed_tokens to be block-size aligned. Removing this limitation
    # could slightly improve performance in the future.
    max_cache_hit_length = request.num_tokens - 1
    computed_blocks, num_new_computed_tokens = (
        self.coordinator.find_longest_cache_hit(
            request.block_hashes, max_cache_hit_length
        )
    )

    if self.log_stats:
        assert self.prefix_cache_stats is not None
        self.prefix_cache_stats.record(
            num_tokens=request.num_tokens,
            num_hits=num_new_computed_tokens,
            preempted=request.num_preemptions > 0,
        )

    return self.create_kv_cache_blocks(computed_blocks), num_new_computed_tokens
```

```python
def allocate_slots(
    self,
    request: Request,
    num_new_tokens: int,
    num_new_computed_tokens: int = 0,
    new_computed_blocks: KVCacheBlocks | None = None,
    num_lookahead_tokens: int = 0,
    num_external_computed_tokens: int = 0,
    delay_cache_blocks: bool = False,
    num_encoder_tokens: int = 0,
    full_sequence_must_fit: bool = False,
) -> KVCacheBlocks | None:
    """Add slots for a request with new tokens to append.

    Args:
        request: The request to allocate slots.
        num_new_tokens: The number of new tokens to be allocated and computed.
        num_new_computed_tokens: The number of new computed tokens just
            hitting the prefix caching, excluding external tokens.
        new_computed_blocks: The cached blocks for the above new computed
            tokens, grouped as a tuple by kv cache groups.
        num_lookahead_tokens: The number of speculative tokens to allocate.
            This is used by spec decode proposers with kv-cache such
            as eagle.
        num_external_computed_tokens: The number of tokens that their
            KV caches are not cached by vLLM but cached by the connector.
        delay_cache_blocks: Whether to skip caching the blocks. This is
            used by P/D when allocating blocks used in a KV transfer
            which will complete in a future step.
        num_encoder_tokens: The number of encoder tokens to allocate for
            cross-attention in encoder-decoder models (e.g., Whisper).
            For decoder-only models, this should be 0.
        full_sequence_must_fit: Only allocate blocks if the KV cache has
            enough free blocks to hold the full sequence, accounting for
            prefix cache hits and sliding window. Used as an admission gate
            to prevent over-admitting requests when chunked prefill would
            otherwise only check the first chunk.

    Blocks layout:
    ```
    ----------------------------------------------------------------------
    | < comp > | < new_comp > | < ext_comp > | <  new  > | < lookahead > |
    ----------------------------------------------------------------------
    |          |              |              | <    to be computed    >  |
    ----------------------------------------------------------------------
    |          |              | <           to be allocated           >  |
    ----------------------------------------------------------------------
    | <     to be cached (roughly, details below)     >  |
    ----------------------------------------------------------------------
    | Prefix-cached tokens from either vLLM |
    | or connector. Can be safely removed if|
    | they are outside sliding window.      |
    ----------------------------------------------------------------------
    | <   cached by vLLM   >  | not cached by|
    |                         | vLLM, but    |
    | ref_cnt  | ref_cnt not  | cached by    |
    | increased| increased yet| connector    |
    ----------------------------------------------------------------------
    ```

    Abbreviations:

    ```
    comp = request.num_computed_tokens
    new_comp = num_new_computed_tokens
             = len(new_computed_blocks) * block_size
    ext_comp = num_external_computed_tokens, cached by the connector
    new = num_new_tokens, including unverified draft tokens
    lookahead = num_lookahead_tokens
    ```

    NOTE: for new tokens which include both verified and unverified draft
    tokens, we only cache the verified tokens (by capping the number at
    `request.num_tokens`).

    The allocation has three stages:
    - Free unnecessary blocks in `comp` and check
      if we have sufficient free blocks (return None if not).
    - Handle prefix tokens (`comp + new_comp + ext_comp`):
        - Free unnecessary blocks (e.g. outside sliding window)
        - Allocate new blocks for `ext_comp` tokens inside
          sliding window
    - Allocate new blocks for tokens to be computed (`new + lookahead`)

    Returns:
        A list of new allocated blocks.
    """
    # When loading KV data asynchronously, we may have zero new tokens to
    # compute while still allocating slots for externally computed tokens.
    if num_new_tokens == 0 and num_external_computed_tokens == 0:
        raise ValueError(
            "num_new_tokens must be greater than 0 when there are no "
            "external computed tokens"
        )

    if new_computed_blocks is not None:
        new_computed_block_list = new_computed_blocks.blocks
    else:
        new_computed_block_list = self.empty_kv_cache_blocks.blocks

    # The number of computed tokens is the number of computed tokens plus
    # the new prefix caching hits
    num_local_computed_tokens = (
        request.num_computed_tokens + num_new_computed_tokens
    )
    total_computed_tokens = min(
        num_local_computed_tokens + num_external_computed_tokens,
        self.max_model_len,
    )

    if full_sequence_must_fit:
        # First check and fail if the full request sequence won't fit.
        full_num_tokens = min(request.num_tokens, self.max_model_len)

        num_blocks_to_allocate = self.coordinator.get_num_blocks_to_allocate(
            request_id=request.request_id,
            num_tokens=full_num_tokens,
            new_computed_blocks=new_computed_block_list,
            num_encoder_tokens=num_encoder_tokens,
            total_computed_tokens=total_computed_tokens,
            num_tokens_main_model=full_num_tokens,
            apply_admission_cap=True,
        )
        if num_blocks_to_allocate > self.block_pool.get_num_free_blocks():
            return None

    num_tokens_main_model = total_computed_tokens + num_new_tokens
    num_tokens_need_slot = min(
        num_tokens_main_model + num_lookahead_tokens, self.max_model_len
    )

    # Free the blocks that are skipped during the attention computation
    # (e.g., tokens outside the sliding window).
    # We can do this even if we cannot schedule this request due to
    # insufficient free blocks.
    # Should call this function before allocating new blocks to reduce
    # the number of evicted blocks.
    self.coordinator.remove_skipped_blocks(
        request.request_id, total_computed_tokens
    )

    num_blocks_to_allocate = self.coordinator.get_num_blocks_to_allocate(
        request_id=request.request_id,
        num_tokens=num_tokens_need_slot,
        new_computed_blocks=new_computed_block_list,
        num_encoder_tokens=num_encoder_tokens,
        total_computed_tokens=num_local_computed_tokens
        + num_external_computed_tokens,
        num_tokens_main_model=num_tokens_main_model,
    )

    if num_blocks_to_allocate > self.block_pool.get_num_free_blocks():
        # Cannot allocate new blocks
        return None

    if (
        new_computed_block_list is not self.empty_kv_cache_blocks.blocks
        or num_external_computed_tokens > 0
    ):
        # Append the new computed blocks to the request blocks until now to
        # avoid the case where the new blocks cannot be allocated.
        self.coordinator.allocate_new_computed_blocks(
            request_id=request.request_id,
            new_computed_blocks=new_computed_block_list,
            num_local_computed_tokens=num_local_computed_tokens,
            num_external_computed_tokens=num_external_computed_tokens,
        )

    new_blocks = self.coordinator.allocate_new_blocks(
        request.request_id,
        num_tokens_need_slot,
        num_tokens_main_model,
        num_encoder_tokens,
    )

    # P/D: delay caching blocks if we have to recv from
    # remote. Update state for locally cached blocks.
    if not self.enable_caching or delay_cache_blocks:
        return self.create_kv_cache_blocks(new_blocks)

    # NOTE(woosuk): We want to commit (cache) up to num_local_computed_tokens
    # + num_external_computed_tokens + num_new_tokens, but must exclude
    # "non-committable" tokens (e.g., draft tokens that could be rejected).
    # Therefore, we cap the number at `request.num_tokens`, ensuring only
    # "finalized" tokens are cached.
    num_tokens_to_cache = min(
        total_computed_tokens + num_new_tokens,
        request.num_tokens,
    )
    self.coordinator.cache_blocks(request, num_tokens_to_cache)

    return self.create_kv_cache_blocks(new_blocks)

def free(self, request: Request) -> None:
    """Free the blocks allocated for the request.
    We free the blocks in reverse order so that the tail blocks are evicted
    first when caching is enabled.

    Args:
        request: The request to free the blocks.
    """
    self.coordinator.free(request.request_id)

def remove_skipped_blocks(
    self, request_id: str, total_computed_tokens: int
) -> None:
    """Remove the blocks that are no longer needed from `blocks` and replace
    the removed blocks with null_block.

    Args:
        request_id: The request ID.
        total_computed_tokens: The total number of computed tokens, including
            local computed tokens and external computed tokens.
    """
    self.coordinator.remove_skipped_blocks(request_id, total_computed_tokens)

def evict_blocks(self, block_ids: set[int]) -> None:
    """Evict blocks from the prefix cache by their block IDs.

    Args:
        block_ids: Set of block IDs to evict from cache.
    """
    self.block_pool.evict_blocks(block_ids)

def reset_prefix_cache(self) -> bool:
    """Reset prefix cache. This function may be used in RLHF
    flows to invalidate prefix caching after the weights are updated,
    or used for resetting prefix caching status for benchmarking.

    Returns:
        bool: True if the prefix cache is successfully reset,
        False otherwise.
    """
    if not self.block_pool.reset_prefix_cache():
        return False
    if self.log_stats:
        assert self.prefix_cache_stats is not None
        self.prefix_cache_stats.reset = True
    return True

def get_num_common_prefix_blocks(self, running_request_id: str) -> list[int]:
    """Calculate the number of common prefix blocks for each kv cache group.

    The function selects a running request and iterates through its blocks.
    A block is considered a common prefix block if ALL requests with
    allocated KV cache share it (i.e., ref_cnt equals the number of entries
    in req_to_blocks).

    NOTE(woosuk): The number of requests with allocated KV cache is **greater
    than or equal to** the number of requests scheduled in the current step.
    This is because having allocated KV cache only indicates that:
    1. The request has not yet finished, and
    2. The request holds its blocks unfreed.

    While all scheduled requests must have allocated KV cache, the inverse
    is not necessarily true. There may be requests with allocated KV cache
    that are not scheduled in the current step.

    This can result in an edge case where the number of common prefix blocks
    is 0, even though all scheduled requests share a common prefix. This
    occurs because there may be unscheduled requests that do not share the
    common prefix. Currently, this case cannot be easily detected, so the
    function returns 0 in such cases.

    Args:
        running_request_id: The request ID of any running request, used to
            identify the common prefix blocks.

    Returns:
        list[int]: The number of common prefix blocks for each kv cache
        group.
    """
    return self.coordinator.get_num_common_prefix_blocks(running_request_id)

def take_events(self) -> list[KVCacheEvent]:
    """Take the KV cache events from the block pool.

    Returns:
        A list of KV cache events.
    """
    return self.block_pool.take_events()

def get_blocks(self, request_id: str) -> KVCacheBlocks:
    """Get the blocks of a request."""
    return self.create_kv_cache_blocks(self.coordinator.get_blocks(request_id))

def get_block_ids(self, request_id: str) -> tuple[list[int], ...]:
    """Get the block ids of a request."""
    return self.get_blocks(request_id).get_block_ids()

def cache_blocks(self, request: Request, num_computed_tokens: int) -> None:
    """Cache the blocks for the request, if enabled.

    Args:
        request: The request to cache the blocks.
        num_computed_tokens: The number of computed tokens, including tokens
            that are already cached and tokens to be cached.
    """
    if self.enable_caching:
        self.coordinator.cache_blocks(request, num_computed_tokens)

def create_kv_cache_blocks(
    self, blocks: tuple[list[KVCacheBlock], ...]
) -> KVCacheBlocks:
    # Only create new KVCacheBlocks for non-empty blocks
    return KVCacheBlocks(blocks) if any(blocks) else self.empty_kv_cache_blocks

def take_new_block_ids(self) -> list[int]:
    """Drain and return new attention block IDs for zeroing."""
    ids: list[int] = []
    for mgr in self.coordinator.single_type_managers:
        ids.extend(mgr.take_new_block_ids())
    return ids

def new_step_starts(self) -> None:
    """Called when a new step is started."""
    self.coordinator.new_step_starts()
```
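To tie the API together, here is a hedged sketch of how a scheduler might drive the manager for one request (`run_model_step` and the surrounding control flow are invented scaffolding; the method names and signatures are the ones shown above):

```python
def admit_and_step(manager, request, num_new_tokens: int) -> bool:
    # 1. Look for a reusable prefix before computing anything.
    computed_blocks, num_hits = manager.get_computed_blocks(request)

    # 2. Try to reserve slots for this step's tokens.
    new_blocks = manager.allocate_slots(
        request,
        num_new_tokens,
        num_new_computed_tokens=num_hits,
        new_computed_blocks=computed_blocks,
    )
    if new_blocks is None:
        return False  # not enough free blocks; the scheduler retries later

    # 3. Hand the combined block IDs to the model runner for this step.
    block_ids = (computed_blocks + new_blocks).get_block_ids()
    run_model_step(request, block_ids)  # placeholder for the real runner
    return True

# When the request finishes or is aborted:
# manager.free(request)
```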

Code Walkthrough

This code implements the core memory-management layer of the vLLM v1 architecture. It sits between the scheduler (Scheduler) and the underlying physical memory pool (Block Pool), and is responsible for allocating and reclaiming memory blocks under the PagedAttention mechanism, as well as for Prefix Caching.

The code consists of two main classes, broken down module by module below.

1. KVCacheBlocks: the isolation layer between scheduler and manager

This is a dataclass that essentially wraps lists of physical memory blocks, hiding KVCacheManager's internal data structures and exposing only a stable interface to the scheduler above.

  • Design goal: serve as the object returned to the Scheduler, so the scheduler never touches the internal structure of the block pool directly.
  • Multi-group support: blocks is a tuple with one entry per kv_cache_group, which keeps the design compatible with a future architecture in which different groups use different block_sizes.
  • Core methods:
    • get_block_ids: converts the object into lists of physical block IDs for the scheduler to use.
    • get_unhashed_block_ids: returns the IDs of blocks that have not been hashed yet, i.e. are not yet managed by the prefix cache.
    • get_unhashed_block_ids_all_groups: returns unhashed block IDs per group, skipping padding blocks.

2. KVCacheManager: the control center for caching and allocation

This is the brain that manages the KV cache lifecycle. It handles not only regular generation requests but also complex scenarios such as speculative decoding and externally cached multimodal KV data.

3. Core mechanism 1: prefix cache matching via get_computed_blocks

When enable_caching is on, this method is called before a new request is processed.

  • It uses the request's block_hashes to ask the underlying coordinator whether reusable KV cache already exists.
  • Typical scenarios are system-prompt reuse and multi-turn chat. On a hit, it directly returns the already-cached physical block list together with the number of matched tokens.
  • This skips the attention computation for the matched prefix and significantly reduces the time-to-first-token (TTFT).

4. Core mechanism 2: the intricate slot allocation in allocate_slots

This is the most central and most complex function in the class. It requests physical memory blocks for a request in the current inference step. The code divides a request's logical memory layout into fine-grained segments:

  • Parts already computed or cached:
    • comp: tokens this request computed in earlier steps.
    • new_comp: tokens just matched in the prefix cache.
    • ext_comp: tokens computed and supplied by an external component (for example a multimodal connector).
  • Parts that need new space:
    • new: new tokens to be fed or generated in this step.
    • lookahead: extra slots reserved specifically for speculative decoding, holding the several future draft tokens the model proposes in one shot.

The allocation flow can be summarized in four steps:

  1. Capacity pre-check: if full_sequence_must_fit is set, the total number of blocks the request needs up to its maximum length is computed up front. If the system lacks enough free blocks, the allocation is refused outright, preventing OOM partway through a request.
  2. Reclaiming obsolete blocks: remove_skipped_blocks is called. If the model uses sliding-window attention, blocks occupied by old tokens outside the window are released early.
  3. Physical allocation: real physical blocks are requested from the system via coordinator.allocate_new_blocks.
  4. State commit: the tokens confirmed as final, in particular after excluding draft tokens that speculative decoding might reject, are pushed into the prefix cache pool for later requests to reuse.

5. Core mechanism 3: resource reclamation and maintenance

  • free(request): when a request finishes generation or is aborted, all physical blocks it holds are returned to the underlying pool.
  • evict_blocks(block_ids): forcibly evicts the given blocks from the prefix cache, typically for least-recently-used (LRU) style eviction once the cache pool is full.
  • get_num_common_prefix_blocks: counts how many prefix blocks all currently running requests share, which is key to scheduling-policy tuning and cache-efficiency evaluation.

6. Summary

This code shows how vLLM turns "PagedAttention on paper" into an industrial-grade memory scheduling system. Through hash matching it achieves cross-request memory sharing (Prefix Caching), and through its very fine-grained slot typing it supports many of the cutting-edge acceleration techniques in today's large-model inference.

  • Title: vLLM Source Code Study
  • Author: Ikko
  • Created at : 2026-05-04 13:57:24
  • Updated at : 2026-05-04 14:41:52
  • Link: http://ikko-debug.github.io/2026/05/04/vllm-1/
  • License: This work is licensed under CC BY-NC-SA 4.0.