Source code for lmcache.storage_backend.connector.redis_connector

import inspect
import os
from typing import List, Optional, Tuple, Union

import redis

from lmcache.logging import init_logger
from lmcache.storage_backend.connector.base_connector import RemoteConnector

logger = init_logger(__name__)


class RedisConnector(RemoteConnector):
    """
    Remote connector backed by a single Redis server.

    The remote url should start with "redis://" and only have one host-port
    pair.
    """

    def __init__(self, host: str, port: int):
        """Create the Redis client for the given ``host`` and ``port``."""
        self.connection = redis.Redis(host=host, port=port)

    def exists(self, key: str) -> bool:
        """Return True when ``key`` is present on the server."""
        hit_count = self.connection.exists(key)
        return bool(hit_count)

    def get(self, key: str) -> Optional[bytes]:
        """Return the value stored under ``key`` as bytes, or None if the
        server returned no value."""
        payload = self.connection.get(key)
        # The synchronous client is used here, so the reply must not be a
        # co-routine.
        assert not inspect.isawaitable(payload)
        if payload is None:
            return payload
        return bytes(payload)

    def set(self, key: str, obj: bytes) -> None:
        """Store ``obj`` under ``key``."""
        self.connection.set(key, obj)

    def list(self):
        """Return every key on the server, decoded as UTF-8 strings.

        The keyspace is walked with SCAN until the server hands back
        cursor 0, which marks the end of the iteration.
        """
        raw_keys: List[bytes] = []
        position = 0
        while True:
            page: Tuple[int, List[bytes]] = self.connection.scan(
                cursor=position, match="*")  # type: ignore
            position, batch = page
            raw_keys.extend(batch)
            if position == 0:
                break
        return [raw.decode("utf-8") for raw in raw_keys]

    def close(self):
        """Close the underlying Redis client."""
        self.connection.close()
class RedisSentinelConnector(RemoteConnector):
    """
    Uses redis.Sentinel to connect to a Redis cluster.

    Writes are sent through the client returned by ``master_for``; reads
    (``exists``, ``get``, ``list``) are sent through the client returned by
    ``slave_for``.

    The hosts are specified in the config file, started with
    "redis-sentinel://" and separated by commas.

    Example:
        remote_url: "redis-sentinel://localhost:26379,localhost:26380,localhost:26381"

    Extra environment variables:
    - REDIS_SERVICE_NAME (optional) -- service name for redis. If not set,
      a warning is logged and "mymaster" is used.
    - REDIS_TIMEOUT (optional) -- Timeout in seconds, default is 1 if not set
    """

    ENV_REDIS_TIMEOUT = "REDIS_TIMEOUT"
    ENV_REDIS_SERVICE_NAME = "REDIS_SERVICE_NAME"

    def __init__(self, hosts_and_ports: List[Tuple[str, Union[str, int]]]):
        """Resolve the master and replica clients through Sentinel.

        Args:
            hosts_and_ports: ``(host, port)`` pairs of the Sentinel nodes.

        Raises:
            ValueError: if REDIS_TIMEOUT is set but is not a valid number.
        """
        # Get service name
        service_name = os.environ.get(self.ENV_REDIS_SERVICE_NAME)
        if service_name is None:
            logger.warning(
                f"Environment variable {self.ENV_REDIS_SERVICE_NAME} is not "
                f"found, using default value 'mymaster'")
            service_name = "mymaster"

        # Get timeout
        raw_timeout = os.environ.get(self.ENV_REDIS_TIMEOUT)
        if raw_timeout is None:
            timeout = 1.0
        else:
            try:
                timeout = float(raw_timeout)
            except ValueError as err:
                raise ValueError(
                    f"Environment variable {self.ENV_REDIS_TIMEOUT} must be "
                    f"a number of seconds, got {raw_timeout!r}") from err

        # The timeout has to be passed by keyword: the second positional
        # parameter of redis.Sentinel is `min_other_sentinels`, not a timeout.
        self.sentinel = redis.Sentinel(hosts_and_ports,
                                       socket_timeout=timeout)
        self.master = self.sentinel.master_for(service_name,
                                               socket_timeout=timeout)
        self.slave = self.sentinel.slave_for(service_name,
                                             socket_timeout=timeout)

    def exists(self, key: str) -> bool:
        """Return True when ``key`` is present (queried via the replica
        client)."""
        # EXISTS replies with an integer count; normalise to the annotated
        # bool, as RedisConnector.exists does.
        return bool(self.slave.exists(key))

    def get(self, key: str) -> Optional[bytes]:
        """Return the value stored under ``key`` as bytes, or None if the
        replica client returned no value."""
        result = self.slave.get(key)
        return None if result is None else bytes(result)

    def set(self, key: str, obj: bytes) -> None:
        """Store ``obj`` under ``key`` through the master client."""
        self.master.set(key, obj)

    def list(self):
        """Return every key seen by the replica client, decoded as UTF-8.

        The keyspace is walked with SCAN until the server hands back
        cursor 0, which marks the end of the iteration.
        """
        cursor = 0
        all_keys = []
        while True:
            cursor, keys = self.slave.scan(cursor=cursor, match="*")
            all_keys.extend(keys)
            if cursor == 0:
                break
        return [key.decode("utf-8") for key in all_keys]

    def close(self):
        """Close both the master and the replica clients."""
        try:
            self.master.close()
        finally:
            # Release the replica client even if closing the master raised.
            self.slave.close()