远程存储插件#
LMCache 支持内置的远程存储连接器,包括 Redis、InfiniStore、MooncakeStore、S3、Hugging Face Buckets 等。远程存储插件系统提供了通过动态加载添加自定义存储连接器的能力。这使得在不修改核心代码的情况下扩展远程存储功能成为可能。
备注
remote_url 配置已**弃用**,并将在未来的版本中移除。请改用 remote_storage_plugins。
连接器定义要求#
自定义远程存储连接器需要两个类:
ConnectorAdapter: 处理 URL 方案匹配和连接器实例化
从
ConnectorAdapter继承在构造函数中设置 URL 方案(例如,
mystore://)实现
create_connector方法
RemoteConnector: 实现实际的存储操作
从
RemoteConnector继承实现所有抽象方法:
exists、exists_sync、get、put、list、close
备注
ConnectorAdapter 构造函数不接收来自 LMCache 的任何参数。应通过调用父构造函数并传入方案字符串来设置方案。
create_connector 方法接收一个 ConnectorContext 对象,其中包含 URL、事件循环、本地 CPU 后端、配置、元数据和 plugin_name。
插件命名约定#
插件名称遵循格式 {type} 或 {type}.{instance}:
{type}— 该连接器类型的单个实例(例如fs,mooncakestore){type}.{instance}— 一个命名实例,允许 **同类型的多个实例**(例如fs.primary,fs.backup)
框架提取 type 部分(在第一个 . 之前的所有内容)以定位匹配的 ConnectorAdapter。完整的插件名称用作配置键前缀。
通过插件使用内置连接器#
内置连接器(fs, mooncakestore, hfbucket 等)可以通过 remote_storage_plugins 直接使用,而无需指定 module_path 或 class_name。它们的配置放在 extra_config 下:
chunk_size: 64
local_cpu: False
max_local_cpu_size: 5
remote_storage_plugins: ["fs"]
extra_config:
remote_storage_plugin.fs.base_path: /tmp/lmcache
同一连接器类型的多个实例:
remote_storage_plugins: ["fs.primary", "fs.backup"]
extra_config:
remote_storage_plugin.fs.primary.base_path: /data/cache1
remote_storage_plugin.fs.backup.base_path: /data/cache2
混合不同的连接器类型:
remote_storage_plugins: ["fs.local", "mooncakestore"]
extra_config:
remote_storage_plugin.fs.local.base_path: /data/cache
remote_storage_plugin.mooncakestore.master_server_address: "localhost:50051"
内置 Hugging Face Buckets 示例:
remote_storage_plugins: ["hfbucket"]
extra_config:
remote_storage_plugin.hfbucket.bucket_handle: hf://buckets/my-org/lmcache-kv/prod
remote_storage_plugin.hfbucket.token_env: HF_TOKEN
如何将自定义远程存储与 LMCache 集成#
在 LMCache 环境中安装您的连接器包
将
remote_storage_plugins及其相关的module_path和class_name添加到 LMCache 配置的extra_config部分,如下所示:
chunk_size: 64
local_cpu: False
max_local_cpu_size: 5
remote_storage_plugins: ["mystore"]
extra_config:
remote_storage_plugin.mystore.module_path: <module_path>
remote_storage_plugin.mystore.class_name: <adapter_class_name>
自定义远程存储连接器的示例配置:
chunk_size: 64
local_cpu: False
max_local_cpu_size: 5
remote_storage_plugins: ["mystore"]
extra_config:
remote_storage_plugin.mystore.module_path: my_package.my_connector
remote_storage_plugin.mystore.class_name: MyStoreConnectorAdapter
多个自定义连接器的实例:
remote_storage_plugins: ["mystore.region_a", "mystore.region_b"]
extra_config:
remote_storage_plugin.mystore.region_a.module_path: my_package.my_connector
remote_storage_plugin.mystore.region_a.class_name: MyStoreConnectorAdapter
remote_storage_plugin.mystore.region_b.module_path: my_package.my_connector
remote_storage_plugin.mystore.region_b.class_name: MyStoreConnectorAdapter
备注
remote_url已被 弃用;请改用remote_storage_plugins。remote_storage_plugin.<plugin_name>使用完整的插件名称(包括实例后缀)作为键前缀可以同时加载多个远程存储插件
内置连接器不需要
module_path/class_name
连接器适配器实现#
ConnectorAdapter 类负责:
定义它处理的 URL 方案
创建适当的
RemoteConnector实例
from lmcache.v1.storage_backend.connector import (
ConnectorAdapter,
ConnectorContext,
)
from lmcache.v1.storage_backend.connector.base_connector import RemoteConnector
from lmcache.v1.storage_backend.connector import extract_plugin_type
PLUGIN_TYPE = "mystore"
class MyStoreConnectorAdapter(ConnectorAdapter):
"""Adapter for MyStore remote storage."""
def __init__(self) -> None:
# Register the URL scheme this adapter handles
super().__init__("mystore://")
def can_parse(self, url: str) -> bool:
"""Match legacy URL or plugin://{type}[.{instance}] format."""
if url.startswith(self.schema):
return True
if url.startswith("plugin://"):
pname = url[len("plugin://"):]
return extract_plugin_type(pname) == PLUGIN_TYPE
return False
def create_connector(self, context: ConnectorContext) -> RemoteConnector:
"""Create and return a MyStoreConnector instance."""
# Access context properties as needed:
# - context.url: the full remote URL
# - context.loop: asyncio event loop
# - context.config: LMCacheEngineConfig
# - context.metadata: LMCacheMetadata
# - context.plugin_name: plugin instance name
# (e.g. "mystore", "mystore.region_a")
return MyStoreConnector(
context.config,
context.metadata,
plugin_name=context.plugin_name,
)
RemoteConnector 实现#
RemoteConnector 类定义了远程存储操作的接口。您的实现必须提供以下抽象方法:
from typing import List, Optional
from lmcache.utils import CacheEngineKey
from lmcache.v1.config import LMCacheEngineConfig
from lmcache.v1.metadata import LMCacheMetadata
from lmcache.v1.memory_management import MemoryObj
from lmcache.v1.storage_backend.connector.base_connector import RemoteConnector
class MyStoreConnector(RemoteConnector):
"""Custom connector for MyStore remote storage."""
def __init__(
self,
config: LMCacheEngineConfig,
metadata: Optional[LMCacheMetadata]
):
super().__init__(config, metadata)
# Initialize your connection here
async def exists(self, key: CacheEngineKey) -> bool:
"""Check if a key exists in the remote store."""
raise NotImplementedError
def exists_sync(self, key: CacheEngineKey) -> bool:
"""Synchronous version of exists."""
raise NotImplementedError
async def get(self, key: CacheEngineKey) -> Optional[MemoryObj]:
"""Retrieve a memory object by key. Return None if not found."""
raise NotImplementedError
async def put(self, key: CacheEngineKey, memory_obj: MemoryObj):
"""Store a memory object with the given key."""
raise NotImplementedError
async def list(self) -> List[str]:
"""List all keys in the remote store."""
raise NotImplementedError
async def close(self):
"""Close the connection to the remote store."""
raise NotImplementedError
可选方法#
RemoteConnector 基类还提供了可选的方法,可以被重写以增强功能:
support_ping()/ping(): 健康检查支持support_batched_get()/batched_get(): 批量检索操作support_batched_put()/batched_put(): 批量存储操作support_batched_contains()/batched_contains(): 批量存在性检查remove_sync(): 同步键移除
实现示例#
一个完整的远程存储连接器实现将包括适配器和连接器类在一个模块或包中。以下是一个最小的工作示例结构:
my_connector_package/
├── __init__.py
├── adapter.py # Contains MyStoreConnectorAdapter
└── connector.py # Contains MyStoreConnector
适配器模块 (adapter.py):
from lmcache.v1.storage_backend.connector import (
ConnectorAdapter,
ConnectorContext,
extract_plugin_type,
)
from lmcache.v1.storage_backend.connector.base_connector import RemoteConnector
from .connector import MyStoreConnector
PLUGIN_TYPE = "mystore"
class MyStoreConnectorAdapter(ConnectorAdapter):
def __init__(self) -> None:
super().__init__("mystore://")
def can_parse(self, url: str) -> bool:
if url.startswith(self.schema):
return True
if url.startswith("plugin://"):
pname = url[len("plugin://"):]
return extract_plugin_type(pname) == PLUGIN_TYPE
return False
def create_connector(self, context: ConnectorContext) -> RemoteConnector:
return MyStoreConnector(
context.config,
context.metadata,
plugin_name=context.plugin_name,
)
配置将引用适配器:
remote_storage_plugins: ["mystore"]
extra_config:
remote_storage_plugin.mystore.module_path: my_connector_package.adapter
remote_storage_plugin.mystore.class_name: MyStoreConnectorAdapter