远程存储插件#

LMCache 支持内置的远程存储连接器,包括 Redis、InfiniStore、MooncakeStore、S3、Hugging Face Buckets 等。远程存储插件系统提供了通过动态加载添加自定义存储连接器的能力。这使得在不修改核心代码的情况下扩展远程存储功能成为可能。

备注

remote_url 配置已**弃用**,并将在未来的版本中移除。请改用 remote_storage_plugins

连接器定义要求#

自定义远程存储连接器需要两个类:

  1. ConnectorAdapter: 处理 URL 方案匹配和连接器实例化

    • ConnectorAdapter 继承

    • 在构造函数中设置 URL 方案(例如,mystore://

    • 实现 create_connector 方法

  2. RemoteConnector: 实现实际的存储操作

    • RemoteConnector 继承

    • 实现所有抽象方法:existsexists_syncgetputlistclose

备注

ConnectorAdapter 构造函数不接收来自 LMCache 的任何参数。应通过调用父构造函数并传入方案字符串来设置方案。

create_connector 方法接收一个 ConnectorContext 对象,其中包含 URL、事件循环、本地 CPU 后端、配置、元数据和 plugin_name

插件命名约定#

插件名称遵循格式 {type}{type}.{instance}

  • {type} — 该连接器类型的单个实例(例如 fs, mooncakestore

  • {type}.{instance} — 一个命名实例,允许 **同类型的多个实例**(例如 fs.primary, fs.backup

框架提取 type 部分(在第一个 . 之前的所有内容)以定位匹配的 ConnectorAdapter。完整的插件名称用作配置键前缀。

通过插件使用内置连接器#

内置连接器(fs, mooncakestore, hfbucket 等)可以通过 remote_storage_plugins 直接使用,而无需指定 module_pathclass_name。它们的配置放在 extra_config 下:

chunk_size: 64
local_cpu: False
max_local_cpu_size: 5
remote_storage_plugins: ["fs"]
extra_config:
  remote_storage_plugin.fs.base_path: /tmp/lmcache

同一连接器类型的多个实例:

remote_storage_plugins: ["fs.primary", "fs.backup"]
extra_config:
  remote_storage_plugin.fs.primary.base_path: /data/cache1
  remote_storage_plugin.fs.backup.base_path: /data/cache2

混合不同的连接器类型:

remote_storage_plugins: ["fs.local", "mooncakestore"]
extra_config:
  remote_storage_plugin.fs.local.base_path: /data/cache
  remote_storage_plugin.mooncakestore.master_server_address: "localhost:50051"

内置 Hugging Face Buckets 示例:

remote_storage_plugins: ["hfbucket"]
extra_config:
  remote_storage_plugin.hfbucket.bucket_handle: hf://buckets/my-org/lmcache-kv/prod
  remote_storage_plugin.hfbucket.token_env: HF_TOKEN

如何将自定义远程存储与 LMCache 集成#

  1. 在 LMCache 环境中安装您的连接器包

  2. remote_storage_plugins 及其相关的 module_pathclass_name 添加到 LMCache 配置的 extra_config 部分,如下所示:

chunk_size: 64
local_cpu: False
max_local_cpu_size: 5
remote_storage_plugins: ["mystore"]
extra_config:
  remote_storage_plugin.mystore.module_path: <module_path>
  remote_storage_plugin.mystore.class_name: <adapter_class_name>

自定义远程存储连接器的示例配置:

chunk_size: 64
local_cpu: False
max_local_cpu_size: 5
remote_storage_plugins: ["mystore"]
extra_config:
  remote_storage_plugin.mystore.module_path: my_package.my_connector
  remote_storage_plugin.mystore.class_name: MyStoreConnectorAdapter

多个自定义连接器的实例:

remote_storage_plugins: ["mystore.region_a", "mystore.region_b"]
extra_config:
  remote_storage_plugin.mystore.region_a.module_path: my_package.my_connector
  remote_storage_plugin.mystore.region_a.class_name: MyStoreConnectorAdapter
  remote_storage_plugin.mystore.region_b.module_path: my_package.my_connector
  remote_storage_plugin.mystore.region_b.class_name: MyStoreConnectorAdapter

备注

  • remote_url 已被 弃用;请改用 remote_storage_plugins

  • remote_storage_plugin.<plugin_name> 使用完整的插件名称(包括实例后缀)作为键前缀

  • 可以同时加载多个远程存储插件

  • 内置连接器不需要 module_path / class_name

连接器适配器实现#

ConnectorAdapter 类负责:

  • 定义它处理的 URL 方案

  • 创建适当的 RemoteConnector 实例

from lmcache.v1.storage_backend.connector import (
    ConnectorAdapter,
    ConnectorContext,
)
from lmcache.v1.storage_backend.connector.base_connector import RemoteConnector

from lmcache.v1.storage_backend.connector import extract_plugin_type

PLUGIN_TYPE = "mystore"

class MyStoreConnectorAdapter(ConnectorAdapter):
    """Adapter for MyStore remote storage."""

    def __init__(self) -> None:
        # Register the URL scheme this adapter handles
        super().__init__("mystore://")

    def can_parse(self, url: str) -> bool:
        """Match legacy URL or plugin://{type}[.{instance}] format."""
        if url.startswith(self.schema):
            return True
        if url.startswith("plugin://"):
            pname = url[len("plugin://"):]
            return extract_plugin_type(pname) == PLUGIN_TYPE
        return False

    def create_connector(self, context: ConnectorContext) -> RemoteConnector:
        """Create and return a MyStoreConnector instance."""
        # Access context properties as needed:
        # - context.url: the full remote URL
        # - context.loop: asyncio event loop
        # - context.config: LMCacheEngineConfig
        # - context.metadata: LMCacheMetadata
        # - context.plugin_name: plugin instance name
        #   (e.g. "mystore", "mystore.region_a")
        return MyStoreConnector(
            context.config,
            context.metadata,
            plugin_name=context.plugin_name,
        )

RemoteConnector 实现#

RemoteConnector 类定义了远程存储操作的接口。您的实现必须提供以下抽象方法:

from typing import List, Optional
from lmcache.utils import CacheEngineKey
from lmcache.v1.config import LMCacheEngineConfig
from lmcache.v1.metadata import LMCacheMetadata
from lmcache.v1.memory_management import MemoryObj
from lmcache.v1.storage_backend.connector.base_connector import RemoteConnector

class MyStoreConnector(RemoteConnector):
    """Custom connector for MyStore remote storage."""

    def __init__(
        self,
        config: LMCacheEngineConfig,
        metadata: Optional[LMCacheMetadata]
    ):
        super().__init__(config, metadata)
        # Initialize your connection here

    async def exists(self, key: CacheEngineKey) -> bool:
        """Check if a key exists in the remote store."""
        raise NotImplementedError

    def exists_sync(self, key: CacheEngineKey) -> bool:
        """Synchronous version of exists."""
        raise NotImplementedError

    async def get(self, key: CacheEngineKey) -> Optional[MemoryObj]:
        """Retrieve a memory object by key. Return None if not found."""
        raise NotImplementedError

    async def put(self, key: CacheEngineKey, memory_obj: MemoryObj):
        """Store a memory object with the given key."""
        raise NotImplementedError

    async def list(self) -> List[str]:
        """List all keys in the remote store."""
        raise NotImplementedError

    async def close(self):
        """Close the connection to the remote store."""
        raise NotImplementedError

可选方法#

RemoteConnector 基类还提供了可选的方法,可以被重写以增强功能:

  • support_ping() / ping(): 健康检查支持

  • support_batched_get() / batched_get(): 批量检索操作

  • support_batched_put() / batched_put(): 批量存储操作

  • support_batched_contains() / batched_contains(): 批量存在性检查

  • remove_sync(): 同步键移除

实现示例#

一个完整的远程存储连接器实现将包括适配器和连接器类在一个模块或包中。以下是一个最小的工作示例结构:

my_connector_package/
├── __init__.py
├── adapter.py      # Contains MyStoreConnectorAdapter
└── connector.py    # Contains MyStoreConnector

适配器模块 (adapter.py):

from lmcache.v1.storage_backend.connector import (
    ConnectorAdapter,
    ConnectorContext,
    extract_plugin_type,
)
from lmcache.v1.storage_backend.connector.base_connector import RemoteConnector
from .connector import MyStoreConnector

PLUGIN_TYPE = "mystore"

class MyStoreConnectorAdapter(ConnectorAdapter):
    def __init__(self) -> None:
        super().__init__("mystore://")

    def can_parse(self, url: str) -> bool:
        if url.startswith(self.schema):
            return True
        if url.startswith("plugin://"):
            pname = url[len("plugin://"):]
            return extract_plugin_type(pname) == PLUGIN_TYPE
        return False

    def create_connector(self, context: ConnectorContext) -> RemoteConnector:
        return MyStoreConnector(
            context.config,
            context.metadata,
            plugin_name=context.plugin_name,
        )

配置将引用适配器:

remote_storage_plugins: ["mystore"]
extra_config:
  remote_storage_plugin.mystore.module_path: my_connector_package.adapter
  remote_storage_plugin.mystore.class_name: MyStoreConnectorAdapter