class LMSLocalBackend(LMSBackendInterface):
    """
    Cache engine for storing the KV cache of the tokens in the local
    cpu/gpu memory.
    """

    def __init__(
        self,
    ):
        """
        Throws:
            RuntimeError if the loaded configuration does not match the
            current configuration
        """
        super().__init__()
        # key -> serialized KV chunk. OrderedDict preserves insertion
        # order, which the evictor uses for its recency bookkeeping.
        self.dict: OrderedDict[str, bytearray] = OrderedDict()
        # Guards self.dict and the evictor's view of it.
        self.update_lock = threading.Lock()
        self.evictor = DummyEvictor()

    def contains(
        self,
        key: str,
    ) -> bool:
        """
        Check if the cache engine contains the key.

        Input:
            key: the key of the token chunk, including prefix hash and format

        Returns:
            True if the cache engine contains the key, False otherwise
        """
        return key in self.dict

    def remove(
        self,
        key: str,
    ) -> None:
        """
        Remove the KV cache chunk by the given key

        Input:
            key: the key of the token chunk, including prefix hash and format
        """
        self.dict.pop(key)

    def put(
        self,
        key: str,
        kv_chunk_bytes: bytearray,
        blocking: bool = True,
    ) -> None:
        """
        Store the KV cache of the tokens into the cache engine.

        Input:
            key: the key of the token chunk, including prefix hash and format
            kv_chunk_bytes: the kv cache of the token chunk, in the format
                of bytearray

        Returns:
            None

        Note:
            The KV cache should NOT have the "batch" dimension.
        """
        if not blocking:
            # logger.warn is deprecated; logger.warning is the supported API.
            logger.warning("Non-blocking is not implemented for local backend")

        # Use the lock as a context manager so it is released even if the
        # evictor or the dict update raises (a bare acquire()/release()
        # pair would leak the lock on an exception).
        with self.update_lock:
            # Obtain keys to evict
            evict_keys, put_status = self.evictor.update_on_put(
                self.dict, self.evictor.get_size(kv_chunk_bytes))

            # Abort put if cache too big
            if put_status == PutStatus.ILLEGAL:
                return

            # Evict caches
            for evict_key in evict_keys:
                self.remove(evict_key)

            # Store new chunk
            self.dict[key] = kv_chunk_bytes

    @_lmcache_nvtx_annotate
    def get(
        self,
        key: str,
    ) -> Optional[bytearray]:
        """
        Retrieve the KV cache chunk by the given key

        Input:
            key: the key of the token chunk, including prefix hash and format

        Output:
            the kv cache of the token chunk, in the format of nested tuples
            None if the key is not found
        """
        with self.update_lock:
            kv_chunk = self.dict.get(key, None)

            # Update cache recency
            if kv_chunk is not None:
                self.evictor.update_on_get(key, self.dict)

        return kv_chunk
# TODO(Jiayi): need to optimize disk loading.
# The current implementation with naive open/read/write might not be
# efficient (though it is better than torch.load).
class LMSLocalDiskBackend(LMSBackendInterface):
    """
    Cache engine for storing the KV cache of the tokens in the local disk.
    """

    def __init__(
        self,
        path: str,
    ):
        """
        Throws:
            RuntimeError if the loaded configuration does not match the
            current configuration
        """
        super().__init__()
        self.path = path
        if not os.path.exists(self.path):
            os.makedirs(self.path)
        # key -> on-disk metadata (file path + size); the KV bytes
        # themselves live in files under self.path.
        self.dict: OrderedDict[str, DiskCacheMetadata] = OrderedDict()
        # Guards self.dict and the evictor's view of it.
        self.update_lock = threading.Lock()
        self.evictor = DummyEvictor()

    def contains(
        self,
        key: str,
    ) -> bool:
        """
        Check if the cache engine contains the key.

        Input:
            key: the key of the token chunk, including prefix hash and format

        Returns:
            True if the cache engine contains the key, False otherwise
        """
        return key in self.dict

    def _key_to_path(
        self,
        key: str,
    ) -> str:
        """
        Convert key to path_name

        Input:
            key: the key of the token chunk, including prefix hash and format

        Returns:
            returns the path name
        """
        # NOTE(review): plain string concatenation assumes self.path ends
        # with a path separator — TODO confirm callers pass e.g. "/cache/".
        return self.path + key.replace("/", "-") + ".bin"

    def remove(
        self,
        key: str,
    ) -> None:
        """
        Remove the KV cache chunk by the given key

        Input:
            key: the key of the token chunk, including prefix hash and format
        """
        # Context manager releases the lock even if the key lookup raises
        # (a bare acquire()/release() pair would leak the lock).
        with self.update_lock:
            path = self.dict[key].path
            self.dict.pop(key)
        # Delete the backing file outside the lock to keep the critical
        # section short.
        os.remove(path)

    def put(
        self,
        key: str,
        kv_chunk_bytes: bytearray,
        blocking: bool = True,
    ) -> None:
        """
        Store the KV cache of the tokens into the cache engine.

        Input:
            key: the key of the token chunk, including prefix hash and format
            kv_chunk: the kv cache of the token chunk, in the format of
                nested tuples

        Returns:
            None

        Note:
            The KV cache should NOT have the "batch" dimension.
        """
        if not blocking:
            # logger.warn is deprecated; logger.warning is the supported API.
            logger.warning("Non-blocking is not implemented for local backend")

        path = self._key_to_path(key)

        # Obtain keys to evict
        evict_keys, put_status = self.evictor.update_on_put(
            self.dict, self.evictor.get_size(kv_chunk_bytes))

        # Abort put if cache too big
        if put_status == PutStatus.ILLEGAL:
            return

        # Evict caches (self.remove takes the lock itself, so this must
        # happen outside the critical section below to avoid deadlock)
        for evict_key in evict_keys:
            self.remove(evict_key)

        logger.info(f"Saving cache to {path}")
        # torch.save(kv_chunk_bytes, self._key_to_path(key))
        # Reuse the already-computed path instead of calling
        # self._key_to_path(key) a second time.
        with open(path, "wb") as binary_file:
            binary_file.write(kv_chunk_bytes)

        with self.update_lock:
            self.dict[key] = DiskCacheMetadata(
                path, self.evictor.get_size(kv_chunk_bytes))

    @_lmcache_nvtx_annotate
    def get(
        self,
        key: str,
    ) -> Optional[bytes]:
        """
        Retrieve the KV cache chunk by the given key

        Input:
            key: the key of the token chunk, including prefix hash and format

        Output:
            the kv cache of the token chunk, in the format of nested tuples
            None if the key is not found
        """
        # Context manager guarantees release even if the file read raises
        # while the lock is held (the original would leak the lock there).
        with self.update_lock:
            if key not in self.dict:
                return None
            path = self.dict[key].path
            self.evictor.update_on_get(key, self.dict)
            with open(path, "rb") as binary_file:
                kv_chunk = binary_file.read()
        return kv_chunk