Source code for lmcache.storage_backend.connector.base_connector
import abc
import time
from typing import List, Optional
from lmcache.logging import init_logger
from lmcache.utils import _lmcache_nvtx_annotate
logger = init_logger(__name__)
[docs]
class RemoteConnector(metaclass=abc.ABCMeta):
"""
Interface for remote connector
"""
[docs]
@abc.abstractmethod
def exists(self, key: str) -> bool:
"""
Check if the remote server contains the key
Input:
key: a string
Returns:
True if the cache engine contains the key, False otherwise
"""
raise NotImplementedError
[docs]
@abc.abstractmethod
def get(self, key: str) -> Optional[bytes]:
"""
Get the objects (bytes) of the corresponding key
Input:
key: the key of the corresponding object
Returns:
The object (bytes) of the corresponding key
Return None if the key does not exist
"""
raise NotImplementedError
[docs]
@abc.abstractmethod
def set(self, key: str, obj: bytes) -> None:
"""
Send the objects (bytes) with the corresponding key to the remote server
Input:
key: the key of the corresponding object
obj: the object (bytes) of the corresponding key
"""
raise NotImplementedError
[docs]
@abc.abstractmethod
def list(self) -> List[str]:
"""
List all keys in the remote server
Returns:
A list of keys in the remote server
"""
raise NotImplementedError
[docs]
@abc.abstractmethod
def close(self) -> None:
"""
Close remote server
"""
raise NotImplementedError
[docs]
class RemoteConnectorDebugWrapper(RemoteConnector):
def __init__(self, connector: RemoteConnector):
self.connector = connector
[docs]
def exists(self, key: str) -> bool:
return self.connector.exists(key)
[docs]
@_lmcache_nvtx_annotate
def get(self, key: str) -> Optional[bytes]:
start = time.perf_counter()
ret = self.connector.get(key)
end = time.perf_counter()
if ret is None or len(ret) == 0:
logger.debug(
"Didn't get any data from the remote backend, key is {key}")
return None
logger.debug(
"Get %.2f MBytes data from the remote backend takes %.2f ms",
len(ret) / 1e6,
(end - start) * 1e3,
)
return ret
[docs]
def set(self, key: str, obj: bytes) -> None:
start = time.perf_counter()
self.connector.set(key, obj)
end = time.perf_counter()
logger.debug(
"Put %.2f MBytes data to the remote backend takes %.2f ms",
len(obj) / 1e6,
(end - start) * 1e3,
)
[docs]
def list(self) -> List[str]:
return self.connector.list()
[docs]
def close(self) -> None:
return self.connector.close()