Source code for lmcache.storage_backend.mem_pool.local_pool
from math import prod
from typing import List, Optional

import torch

from lmcache.config import LMCacheMemPoolMetadata
from lmcache.logging import init_logger
from lmcache.storage_backend.mem_pool.base_pool import BasePool, KVObj

logger = init_logger(__name__)


class LocalPool(BasePool):

    def __init__(self, metadata: LMCacheMemPoolMetadata):
        self.chunk_size = metadata.kv_shape[2]
        # Placeholder capacity; concrete subclasses overwrite this with
        # init_max_chunk_num() based on the configured cache size.
        self.max_chunk_num = 200
        self.size_per_chunk = (prod(metadata.kv_shape) *
                               metadata.kv_dtype.itemsize)
        self.mem_pool: List[torch.Tensor] = []
        self.free_pool = list(range(self.max_chunk_num))

    def init_max_chunk_num(self, metadata: LMCacheMemPoolMetadata) -> int:
        """
        Compute the maximum number of chunks the pool can hold, given the
        configured cache size (max_local_cache_size, in GB).
        """
        max_chunk_num = int(metadata.max_local_cache_size *
                            1024**3) // self.size_per_chunk + 1
        return max_chunk_num
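
    # Worked example of the arithmetic above (the shape, dtype, and cache
    # size here are illustrative assumptions, not values from this module):
    # with kv_shape = (32, 2, 256, 8, 128) and float16 (itemsize 2),
    # size_per_chunk = 32 * 2 * 256 * 8 * 128 * 2 = 33_554_432 bytes (32 MiB),
    # so max_local_cache_size = 5 GB gives
    # (5 * 1024**3) // 33_554_432 + 1 = 161 chunks.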

    def allocate(self, kv_chunk: torch.Tensor) -> Optional[KVObj]:
        """
        Allocate a buffer memory pointer from the memory pool.

        Input:
            kv_chunk: the KV tensor to be stored

        Returns:
            A KVObj holding a memory pointer (a torch tensor view into the
            pool), or None if memory is full. Note that this base
            implementation raises instead of returning None; see
            LocalCPUBufferPool for the non-raising variant.

        Note:
            This does not perform the actual memory movement.
        """
        num_tok = kv_chunk.shape[2]
        assert num_tok <= self.chunk_size
        if not self.free_pool:
            logger.warning("No free memory chunks. "
                           "This shouldn't happen; the evictor might be "
                           "failing!")
            raise RuntimeError("Mempool allocation failed")
        chunk_idx = self.free_pool.pop()
        return KVObj(chunk_idx, self.size_per_chunk,
                     self.mem_pool[chunk_idx][:, :, 0:num_tok])

    def free(self, kv_obj: KVObj) -> None:
        """
        Return the corresponding memory chunk to the free pool.

        Input:
            kv_obj: the KVObj to be freed
        """
        self.free_pool.append(kv_obj.chunk_idx)
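

# Usage sketch for the allocate/free pair (hypothetical caller code; it
# assumes a concrete subclass such as LocalCPUPool below, a `kv` tensor
# shaped like metadata.kv_shape, and that KVObj exposes its tensor view
# as `.data`):
#
#     pool = LocalCPUPool(metadata)
#     kv_obj = pool.allocate(kv)
#     if kv_obj is not None:
#         kv_obj.data.copy_(kv)  # the caller performs the memory movement
#         ...                    # later, when the chunk is evicted:
#         pool.free(kv_obj)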


class LocalCPUPool(LocalPool):

    def __init__(self, metadata: LMCacheMemPoolMetadata):
        self.chunk_size = metadata.kv_shape[2]
        self.size_per_chunk = (prod(metadata.kv_shape) *
                               metadata.kv_dtype.itemsize)
        self.max_chunk_num = self.init_max_chunk_num(metadata)
        use_pinned_memory = True
        kv_dtype = metadata.kv_dtype
        logger.info(
            f"Initializing cpu mem, is_pinned: {use_pinned_memory}, "
            f"max_local_cache_size: {metadata.max_local_cache_size} GB, "
            f"max_chunk_num: {self.max_chunk_num}.")
        with torch.inference_mode():
            self.mem_pool = [
                torch.empty(metadata.kv_shape,
                            dtype=kv_dtype,
                            device='cpu',
                            pin_memory=use_pinned_memory)
                for _ in range(self.max_chunk_num)
            ]
        self.free_pool = list(range(self.max_chunk_num))
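
# Design note: pinned (page-locked) host memory enables truly asynchronous
# GPU<->CPU transfers. A minimal sketch of what that allows (illustrative
# only; `gpu_kv` and `stream` are assumed names, not part of this module):
#
#     with torch.cuda.stream(stream):
#         kv_obj.data.copy_(gpu_kv, non_blocking=True)  # async D2H copy
#
# With pageable (non-pinned) memory, non_blocking=True would silently fall
# back to a synchronous copy.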


class LocalCPUBufferPool(LocalCPUPool):
    """
    Same as LocalCPUPool, except that allocate() returns None instead of
    raising when the pool is exhausted, so the caller can wait and retry.
    """

    def allocate(self, kv_chunk: torch.Tensor) -> Optional[KVObj]:
        num_tok = kv_chunk.shape[2]
        assert num_tok <= self.chunk_size
        if not self.free_pool:
            logger.info("No free memory chunks. Waiting...")
            return None
        chunk_idx = self.free_pool.pop()
        return KVObj(chunk_idx, self.size_per_chunk,
                     self.mem_pool[chunk_idx][:, :, 0:num_tok])
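

# Caller-side sketch for the buffer pool (hypothetical; the polling loop and
# interval are assumptions, not part of this module):
#
#     import time
#     while (kv_obj := buffer_pool.allocate(kv)) is None:
#         time.sleep(0.01)  # wait for another thread to free() a chunk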


class LocalGPUPool(LocalPool):
    """
    Only for unit testing; might not be useful in production.

    Incurs a double copy, but can serve as the only GPU buffer.
    """

    def __init__(self, metadata: LMCacheMemPoolMetadata):
        self.chunk_size = metadata.kv_shape[2]
        self.size_per_chunk = (prod(metadata.kv_shape) *
                               metadata.kv_dtype.itemsize)
        self.max_chunk_num = self.init_max_chunk_num(metadata)
        kv_dtype = metadata.kv_dtype
        logger.info("Initializing gpu mem")
        with torch.inference_mode():
            self.mem_pool = [
                torch.empty(metadata.kv_shape, dtype=kv_dtype, device='cuda')
                for _ in range(self.max_chunk_num)
            ]
        self.free_pool = list(range(self.max_chunk_num))
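
# Construction sketch (hypothetical values; LMCacheMemPoolMetadata's exact
# constructor signature is an assumption here):
#
#     metadata = LMCacheMemPoolMetadata(
#         kv_shape=(32, 2, 256, 8, 128),  # dim 2 must be the token axis
#         kv_dtype=torch.float16,
#         max_local_cache_size=5,         # GB
#     )
#     pool = LocalGPUPool(metadata)       # unit-testing only, per docstring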