# Source code for lmcache.experimental.storage_backend.connector

import asyncio
import re
from dataclasses import dataclass
from typing import Dict, List, Optional

from lmcache.experimental.memory_management import MemoryAllocatorInterface
from lmcache.experimental.storage_backend.connector.base_connector import \
    RemoteConnector
from lmcache.experimental.storage_backend.connector.lm_connector import \
    LMCServerConnector
from lmcache.experimental.storage_backend.connector.redis_connector import (
    RedisConnector, RedisSentinelConnector)
from lmcache.logging import init_logger

from .infinistore_connector import InfinistoreConnector

logger = init_logger(__name__)


@dataclass
class ParsedRemoteURL:
    """Structured form of a remote-storage URL.

    A URL of the shape
    ``<connector_type>://<host>:<port>[/path][?query],<host2>:<port2>...``
    is split into its scheme plus one entry per comma-separated host
    definition. As filled in by ``parse_remote_url``, the four per-host
    lists are parallel: index ``i`` of each one describes the ``i``-th
    host definition of the URL.
    """

    # Scheme text in front of "://" (e.g. "redis", "lm").
    connector_type: str
    # Host name of every host definition, in URL order.
    hosts: List[str]
    # Port number of every host definition.
    ports: List[int]
    # Path of every host definition (parse_remote_url stores "" if absent).
    paths: List[str]
    # Query parameters of every host definition, as raw key -> value text.
    query_params: List[Dict[str, str]]
# Splits "<connector_type>://<rest>" at the FIRST "://" (non-greedy), so a
# "://" appearing later, e.g. inside a query value, stays in <rest>.
_SCHEME_PATTERN = re.compile(r"(.+?)://(.*)")

# One comma-separated host definition: <host>:<port>[/path][?query].
_HOST_PATTERN = re.compile(
    r"""
    ^
    ([^:]+)             # hostname: everything up to the first ':'
    :
    (\d+)               # port: one or more digits
    ((?:/[^?]*)?)       # path: optional, but must start with '/'
    (?:\?(.*))?         # query: optional, everything after '?'
    $
    """,
    re.VERBOSE,
)


def _parse_query(query_str: str) -> Dict[str, str]:
    """Split a raw ``k1=v1&k2&...`` query string into a dict.

    Values are kept verbatim (no percent-decoding). A parameter without
    ``=`` maps to ``""``, empty segments are skipped, and a repeated key
    keeps its last value.
    """
    params: Dict[str, str] = {}
    for param in query_str.split("&"):
        if not param:
            continue
        # partition splits at the first '=' only, so values may contain '='.
        key, _, value = param.partition("=")
        params[key] = value
    return params


def parse_remote_url(url: str) -> "ParsedRemoteURL":
    """Parse a remote URL into its constituent parts.

    Accepted format::

        <connector_type>://<host>:<port>[/path][?query],<host2>:<port2>...

    Supports multiple comma-separated hosts, each with an optional path
    (which must start with ``/``) and an optional query string. This stays
    compatible with the legacy single ``<host>:<port>`` form.

    Args:
        url: The remote URL string from the configuration.

    Returns:
        A ParsedRemoteURL whose per-host lists are parallel. Leading ``/``
        characters are stripped from each path (``""`` when there is no
        path), and query values are not URL-decoded.

    Raises:
        ValueError: If the URL has no ``://`` separator, or if any host
            definition is not ``<host>:<port>[/path][?query]``. This
            includes trailing junk after the port such as ``host:12a34``.
    """
    scheme_match = _SCHEME_PATTERN.match(url)
    if scheme_match is None:
        logger.error(f"Cannot parse remote_url {url} in the config")
        raise ValueError(f"Invalid remote url {url}")
    connector_type, hosts_section = scheme_match.groups()

    hosts: List[str] = []
    ports: List[int] = []
    paths: List[str] = []
    query_params: List[Dict[str, str]] = []
    for host_def in hosts_section.split(","):
        host_match = _HOST_PATTERN.match(host_def)
        if host_match is None:
            raise ValueError(
                f"Invalid host definition: {host_def} in URL: {url}")
        host, port_str, path, query_str = host_match.groups()
        hosts.append(host)
        ports.append(int(port_str))
        paths.append(path.lstrip("/"))
        # The query group is None when no '?' is present.
        query_params.append(_parse_query(query_str or ""))

    return ParsedRemoteURL(connector_type=connector_type,
                           hosts=hosts,
                           ports=ports,
                           paths=paths,
                           query_params=query_params)
[docs] def CreateConnector( url: str, loop: asyncio.AbstractEventLoop, memory_allocator: MemoryAllocatorInterface, ) -> RemoteConnector: """ Creates the corresponding remote connector from the given URL. """ m = re.match(r"(.*)://(.*):(\d+)", url) if m is None: raise ValueError(f"Invalid remote url {url}") parsed_url = parse_remote_url(url) num_hosts = len(parsed_url.hosts) connector: Optional[RemoteConnector] = None match parsed_url.connector_type: case "redis": if num_hosts == 1: host, port = parsed_url.hosts[0], parsed_url.ports[0] connector = RedisConnector(host, port, loop, memory_allocator) else: raise ValueError( f"Redis connector only supports a single host, but got url:" f" {url}") case "redis-sentinel": connector = RedisSentinelConnector( list(zip(parsed_url.hosts, parsed_url.ports)), loop, memory_allocator, ) case "lm": if num_hosts == 1: host, port = parsed_url.hosts[0], parsed_url.ports[0] connector = LMCServerConnector(host, port, loop, memory_allocator) else: raise ValueError( f"LM connector only supports a single host, but got url:" f" {url}") case "infinistore": host, port = parsed_url.hosts[0], parsed_url.ports[0] device_name = parsed_url.query_params[0].get("device", "mlx5_0") connector = InfinistoreConnector(host, port, device_name, loop, memory_allocator) case _: raise ValueError( f"Unknown connector type {parsed_url.connector_type} " f"(url is: {url})") return connector