Source code for lmcache.server.server_storage_backend.local_backend

import os
import threading
from collections import OrderedDict
from typing import List, Optional

from lmcache.logging import init_logger
from lmcache.server.server_storage_backend.abstract_backend import \
    LMSBackendInterface
from lmcache.storage_backend.evictor import DummyEvictor
from lmcache.storage_backend.evictor.base_evictor import PutStatus
from lmcache.utils import DiskCacheMetadata, _lmcache_nvtx_annotate

logger = init_logger(__name__)


class LMSLocalBackend(LMSBackendInterface):
    """
    Cache engine for storing the KV cache of the tokens in the local
    CPU/GPU memory.
    """

    def __init__(self):
        """
        Throws:
            RuntimeError if the loaded configuration does not match the
            current configuration
        """
        super().__init__()

        self.dict: OrderedDict[str, bytearray] = OrderedDict()
        self.update_lock = threading.Lock()
        self.evictor = DummyEvictor()

    def list_keys(self) -> List[str]:
        return list(self.dict.keys())

    def contains(
        self,
        key: str,
    ) -> bool:
        """
        Check if the cache engine contains the key.

        Input:
            key: the key of the token chunk, including prefix hash and format

        Returns:
            True if the cache engine contains the key, False otherwise
        """
        return key in self.dict

    def remove(
        self,
        key: str,
    ) -> None:
        """
        Remove the KV cache chunk by the given key

        Input:
            key: the key of the token chunk, including prefix hash and format
        """
        self.dict.pop(key)

    def put(
        self,
        key: str,
        kv_chunk_bytes: bytearray,
        blocking: bool = True,
    ) -> None:
        """
        Store the KV cache of the tokens into the cache engine.

        Input:
            key: the key of the token chunk, including prefix hash and format

            kv_chunk_bytes: the kv cache of the token chunk, in the format
                of bytearray

        Returns:
            None

        Note:
            The KV cache should NOT have the "batch" dimension.
        """
        if not blocking:
            logger.warning("Non-blocking is not implemented for local backend")

        with self.update_lock:
            # Obtain keys to evict
            evict_keys, put_status = self.evictor.update_on_put(
                self.dict, kv_chunk_bytes)

            # Abort the put if the chunk is too big for the cache
            if put_status == PutStatus.ILLEGAL:
                return

            # Evict caches
            for evict_key in evict_keys:
                self.remove(evict_key)

            # Store new chunk
            self.dict[key] = kv_chunk_bytes

    @_lmcache_nvtx_annotate
    def get(
        self,
        key: str,
    ) -> Optional[bytearray]:
        """
        Retrieve the KV cache chunk by the given key

        Input:
            key: the key of the token chunk, including prefix hash and format

        Output:
            the kv cache of the token chunk as a bytearray;
            None if the key is not found
        """
        with self.update_lock:
            kv_chunk = self.dict.get(key, None)

            # Update cache recency
            if kv_chunk is not None:
                self.evictor.update_on_get(key, self.dict)

        return kv_chunk

    def close(self):
        pass
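

# Example usage (a minimal, hedged sketch; the key string and payload below
# are invented for illustration, since real keys are produced by the cache
# engine):
#
#     backend = LMSLocalBackend()
#     backend.put("prefix_hash@format", bytearray(b"serialized kv chunk"))
#     assert backend.contains("prefix_hash@format")
#     chunk = backend.get("prefix_hash@format")  # bytearray, or None on miss
#     backend.close()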


# TODO(Jiayi): need to optimize disk loading
# current impl. with "naive open read/write" might not be efficient
# (better than torch.load)
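# One possible direction (an illustrative sketch only, not the project's
# implementation; the helper name `_mmap_read` is hypothetical): memory-map
# the file so the OS page cache can serve repeated reads without extra
# read() syscalls.
#
#     import mmap
#
#     def _mmap_read(path: str) -> bytes:
#         with open(path, "rb") as f:
#             with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
#                 return mm.read()  # still one copy into a bytes object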


class LMSLocalDiskBackend(LMSBackendInterface):
    """
    Cache engine for storing the KV cache of the tokens in the local disk.
    """

    def __init__(
        self,
        path: str,
    ):
        """
        Throws:
            RuntimeError if the loaded configuration does not match the
            current configuration
        """
        super().__init__()

        self.path = path
        if not os.path.exists(self.path):
            os.makedirs(self.path)

        self.dict: OrderedDict[str, DiskCacheMetadata] = OrderedDict()
        self.update_lock = threading.Lock()
        self.evictor = DummyEvictor()

    def list_keys(self) -> List[str]:
        return list(self.dict.keys())

    def contains(
        self,
        key: str,
    ) -> bool:
        """
        Check if the cache engine contains the key.

        Input:
            key: the key of the token chunk, including prefix hash and format

        Returns:
            True if the cache engine contains the key, False otherwise
        """
        return key in self.dict

    def _key_to_path(
        self,
        key: str,
    ) -> str:
        """
        Convert key to path_name

        Input:
            key: the key of the token chunk, including prefix hash and format

        Returns:
            the file path backing this key
        """
        # os.path.join keeps the result correct whether or not `self.path`
        # ends with a trailing separator
        return os.path.join(self.path, key.replace("/", "-") + ".bin")
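
    # For example (hypothetical values): a backend constructed with
    # path "/tmp/lmcache" maps the key "abc123/kv_bytes" to
    # "/tmp/lmcache/abc123-kv_bytes.bin", keeping keys filesystem-safe.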

    def remove(
        self,
        key: str,
    ) -> None:
        """
        Remove the KV cache chunk by the given key

        Input:
            key: the key of the token chunk, including prefix hash and format
        """
        with self.update_lock:
            path = self.dict[key].path
            self.dict.pop(key)

        os.remove(path)

    def put(
        self,
        key: str,
        kv_chunk_bytes: bytearray,
        blocking: bool = True,
    ) -> None:
        """
        Store the KV cache of the tokens into the cache engine.

        Input:
            key: the key of the token chunk, including prefix hash and format

            kv_chunk_bytes: the kv cache of the token chunk, in the format
                of bytearray

        Returns:
            None

        Note:
            The KV cache should NOT have the "batch" dimension.
        """
        if not blocking:
            logger.warning("Non-blocking is not implemented for local backend")

        path = self._key_to_path(key)

        # Obtain keys to evict
        evict_keys, put_status = self.evictor.update_on_put(
            self.dict, kv_chunk_bytes)

        # Abort the put if the chunk is too big for the cache
        if put_status == PutStatus.ILLEGAL:
            return

        # Evict caches
        for evict_key in evict_keys:
            self.remove(evict_key)

        logger.info(f"Saving cache to {path}")
        # torch.save(kv_chunk_bytes, path)
        with open(path, "wb") as binary_file:
            binary_file.write(kv_chunk_bytes)

        with self.update_lock:
            self.dict[key] = DiskCacheMetadata(
                path, self.evictor.get_size(kv_chunk_bytes))

    @_lmcache_nvtx_annotate
    def get(
        self,
        key: str,
    ) -> Optional[bytes]:
        """
        Retrieve the KV cache chunk by the given key

        Input:
            key: the key of the token chunk, including prefix hash and format

        Output:
            the serialized kv cache of the token chunk as bytes;
            None if the key is not found
        """
        with self.update_lock:
            if key not in self.dict:
                return None

            path = self.dict[key].path
            self.evictor.update_on_get(key, self.dict)

            with open(path, "rb") as binary_file:
                kv_chunk = binary_file.read()
            # return torch.load(self._key_to_path(key))

        return kv_chunk

    def close(self):
        pass
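

if __name__ == "__main__":
    # Minimal smoke test (a hedged sketch: the path and keys below are
    # invented for illustration, and a real deployment drives these
    # backends through the LMCache server rather than directly).
    mem = LMSLocalBackend()
    mem.put("demo-key", bytearray(b"kv bytes"))
    assert mem.get("demo-key") == bytearray(b"kv bytes")
    mem.close()

    disk = LMSLocalDiskBackend("/tmp/lmcache_demo")
    disk.put("demo-key", bytearray(b"kv bytes"))
    assert disk.get("demo-key") == b"kv bytes"  # disk get returns bytes
    disk.close()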