添加本地连接器#

概述#

本地连接器是高性能的 C++ 存储后端,通过 pybind11 与 LMCache 集成。它们在 两种 LMCache 操作模式下均可工作:

  • **非 MP 模式**(单进程):通过 ``ConnectorClientBase``(asyncio 集成)

  • 多进程模式 (multiprocess): 通过 NativeConnectorL2Adapter (L2 适配器接口)

一次编写连接器,免费获得两种模式。

该框架位于 csrc/storage_backends/,Redis RESP 连接器作为参考实现。

架构#

Non-MP mode:
  CacheEngine -> RemoteBackend -> ConnectorClientBase -> native client (C++)
                                    (asyncio event loop)

MP mode:
  StoreController / PrefetchController
        |
  NativeConnectorL2Adapter (Python bridge)
    +-- 3 eventfds (store, lookup, load)
    +-- completion demux thread
    +-- ObjectKey <-> string serialization
    +-- client-side lock tracking
        |
  native client (C++)
    +-- 1 eventfd, worker threads, GIL-free I/O

设计原则:

  1. 在 pybind 层释放 GIL 以实现本地线程之间的真正并发

  2. 批处理与分块:批量请求的工作在线程之间均匀分配

  3. 基于 eventfd 的完成:内核唤醒 Python -- 无需轮询

  4. 非阻塞提交:提交队列 / 完成队列架构

步骤 1:C++ 连接器#

创建你的连接器目录(例如,csrc/storage_backends/mybackend/),并从 ConnectorBase<YourConnectionType> 继承。你需要重写 4 个必需的方法(并可选地重写 do_single_delete 以支持逐出)。

connector.h:

// csrc/storage_backends/mybackend/connector.h
#pragma once
#include "../connector_base.h"

namespace lmcache {
namespace connector {

// Per-thread connection state
struct MyConn {
  int fd = -1;
  // your connection fields
};

class MyConnector : public ConnectorBase<MyConn> {
 public:
  MyConnector(std::string host, int port, int num_workers)
      : ConnectorBase(num_workers), host_(host), port_(port) {
    start_workers();  // IMPORTANT: call at END of constructor
  }

 protected:
  // 1. Create a connection (called once per worker thread)
  MyConn create_connection() override {
    MyConn conn;
    // connect to server...
    return conn;
  }

  // 2. GET: read value for key into buf
  void do_single_get(MyConn& conn, const std::string& key,
                     void* buf, size_t len,
                     size_t chunk_size) override {
    // send GET command, recv response into buf
  }

  // 3. SET: write data from buf under key
  void do_single_set(MyConn& conn, const std::string& key,
                     const void* buf, size_t len,
                     size_t chunk_size) override {
    // send SET command with data from buf
  }

  // 4. EXISTS: check if key exists
  bool do_single_exists(MyConn& conn,
                        const std::string& key) override {
    // send EXISTS, return true/false
  }

  // 5. DELETE: remove key (optional, has default no-op)
  bool do_single_delete(MyConn& conn,
                        const std::string& key) override {
    // send DELETE, return true if deleted, false if not found
  }

  // Optional: clean shutdown
  void shutdown_connections() override {
    // close sockets, free resources
  }

 private:
  std::string host_;
  int port_;
};

}  // namespace connector
}  // namespace lmcache

ConnectorBase 为您免费提供的内容:

  • 每个线程连接的工作线程池

  • 提交队列(无锁入队)和完成队列

  • 自动平铺:批量操作在工作节点之间拆分

  • 完成时的 eventfd 信号(内核唤醒 Python)

  • 优雅关闭(停止标志,排空,加入)

重要

始终在派生构造函数的**末尾**调用 start_workers(),在所有成员变量初始化之后。工作线程会立即调用 create_connection(),因此对象必须完全构造。

参考: csrc/storage_backends/redis/connector.hconnector.cpp

步骤 2:Pybind 模块#

使用 LMCACHE_BIND_CONNECTOR_METHODS 宏,该宏绑定所有 7 个方法(event_fdsubmit_batch_get/set/exists/deletedrain_completionsclose),并正确处理 GIL 释放和 Python 缓冲区协议。

// csrc/storage_backends/mybackend/pybind.cpp
#include <pybind11/pybind11.h>
#include "../connector_pybind_utils.h"
#include "connector.h"

namespace py = pybind11;

PYBIND11_MODULE(lmcache_mybackend, m) {
  py::class_<lmcache::connector::MyConnector>(m, "LMCacheMyBackendClient")
      .def(py::init<std::string, int, int>(),
           py::arg("host"), py::arg("port"),
           py::arg("num_workers"))
      LMCACHE_BIND_CONNECTOR_METHODS(
          lmcache::connector::MyConnector);
}

pybind 工具自动:

  • 在 GIL 下从 Python memoryview 对象中提取缓冲区指针

  • 在调用 C++ 之前释放 GIL

  • 将 C++ Completion 结构转换为 Python 元组 (future_id, ok, error, result_bools)

参考: csrc/storage_backends/redis/pybind.cpp

步骤 3:构建系统#

setup.py 中注册您的 C++ 源文件,与现有的 Redis 扩展一起:

# In cuda_extension() and rocm_extension():
mybackend_sources = [
    "csrc/storage_backends/mybackend/pybind.cpp",
    "csrc/storage_backends/mybackend/connector.cpp",
]

# Add to ext_modules list:
cpp_extension.CppExtension(
    "lmcache.lmcache_mybackend",
    sources=mybackend_sources,
    include_dirs=[
        "csrc/storage_backends",
        "csrc/storage_backends/mybackend",
    ],
    extra_compile_args={"cxx": ["-O3", "-std=c++17"]},
),

然后重建:

pip install -e .

步骤 4:Python 客户端(非 MP 模式)#

ConnectorClientBase 继承,该类提供 asyncio 事件循环集成、未来管理以及同步和异步方法。

# lmcache/v1/storage_backend/native_clients/mybackend_client.py
from .connector_client_base import ConnectorClientBase
from lmcache.lmcache_mybackend import LMCacheMyBackendClient

class MyBackendClient(ConnectorClientBase[LMCacheMyBackendClient]):
    def __init__(self, host: str, port: int,
                 num_workers: int, loop=None):
        native = LMCacheMyBackendClient(host, port, num_workers)
        super().__init__(native, loop)

这为您提供了 batch_getbatch_set``batch_exists``(异步)及其同步变体,所有这些都具有自动的基于 eventfd 的完成处理。

参考: lmcache/v1/storage_backend/native_clients/resp_client.py

步骤 5:L2 适配器(MP 模式)#

要将您的连接器用作 MP 模式下的 L2 适配器,请创建一个单独的 Python 模块,该模块定义配置类、工厂函数,并自我注册这两者。NativeConnectorL2Adapter 桥接处理所有复杂性(eventfd 解复用、键序列化、锁定)。

在 L2 适配器包中创建一个新文件:

# lmcache/v1/distributed/l2_adapters/mybackend_l2_adapter.py
from __future__ import annotations
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from lmcache.v1.distributed.internal_api import L1MemoryDesc

from lmcache.v1.distributed.l2_adapters.base import (
    L2AdapterInterface,
)
from lmcache.v1.distributed.l2_adapters.config import (
    L2AdapterConfigBase,
    register_l2_adapter_type,
)
from lmcache.v1.distributed.l2_adapters.factory import (
    register_l2_adapter_factory,
)


class MyBackendL2AdapterConfig(L2AdapterConfigBase):
    def __init__(self, host: str, port: int,
                 num_workers: int = 8,
                 max_capacity_gb: float = 0):
        self.host = host
        self.port = port
        self.num_workers = num_workers
        self.max_capacity_gb = max_capacity_gb

    @classmethod
    def from_dict(cls, d: dict) -> "MyBackendL2AdapterConfig":
        host = d.get("host")
        if not isinstance(host, str) or not host:
            raise ValueError("host must be a non-empty string")
        port = d.get("port")
        if not isinstance(port, int) or port <= 0:
            raise ValueError("port must be a positive integer")
        num_workers = d.get("num_workers", 8)
        max_capacity_gb = d.get("max_capacity_gb", 0)
        return cls(host=host, port=port,
                   num_workers=num_workers,
                   max_capacity_gb=max_capacity_gb)

    @classmethod
    def help(cls) -> str:
        return (
            "MyBackend L2 adapter config fields:\n"
            "- host (str): server hostname (required)\n"
            "- port (int): server port (required)\n"
            "- num_workers (int): worker threads (default 8)"
        )


def _create_mybackend_l2_adapter(
    config: L2AdapterConfigBase,
    l1_memory_desc: "Optional[L1MemoryDesc]" = None,
) -> L2AdapterInterface:
    from lmcache.lmcache_mybackend import LMCacheMyBackendClient
    from lmcache.v1.distributed.l2_adapters \
        .native_connector_l2_adapter import (
        NativeConnectorL2Adapter,
    )

    assert isinstance(config, MyBackendL2AdapterConfig)
    native_client = LMCacheMyBackendClient(
        config.host, config.port, config.num_workers
    )
    return NativeConnectorL2Adapter(
        native_client,
        max_capacity_gb=config.max_capacity_gb,
    )


# Self-register -- runs automatically when the module
# is imported by the L2 adapter auto-discovery mechanism
register_l2_adapter_type("mybackend", MyBackendL2AdapterConfig)
register_l2_adapter_factory("mybackend", _create_mybackend_l2_adapter)

备注

L2 适配器包使用 pkgutil.iter_modules 自动发现 lmcache/v1/distributed/l2_adapters/ 中的所有模块。仅仅创建上述文件就足够了——不需要对 __init__.py 或任何其他现有文件进行更改。

命令行使用:

lmcache server \
    --l1-size-gb 100 --eviction-policy LRU \
    --l2-adapter '{"type": "mybackend", "host": "10.0.0.1", "port": 9000}'

原生连接器 L2Adapter 如何弥合差距#

C++ 连接器有 1 个 eventfd 和混合完成。MP 模式的 L2AdapterInterface 需要 3 个独立的 eventfd 和类型化结果。桥接器对此进行了透明处理:

L2 适配器方法

本地调用

额外逻辑

submit_store_task(keys, objs)

submit_batch_set

ObjectKey 转换为字符串,MemoryObj 转换为 memoryview

submit_lookup_and_lock_task(keys)

submit_batch_exists

  • 客户端锁引用计数

submit_load_task(keys, objs)

submit_batch_get

ObjectKey 转换为字符串,MemoryObj 转换为 memoryview

submit_unlock(keys)

(无)

客户端锁减少

pop_completed_store_tasks()

通过 drain_completions

按操作类型解复用

query_lookup_and_lock_result()

通过 drain_completions

存在结果到位图,应用锁定

query_load_result()

通过 drain_completions

成功/失败到位图

一个后台解复用线程轮询本地 eventfd,调用 drain_completions(),查找每个 future_id 以确定其操作类型,将结果路由到正确的完成字典,并发出相应的 Python eventfd 信号。

第三方原生连接器插件(native_plugin#

上述步骤描述了如何在 LMCache 源代码树中**内部**添加一个原生连接器。如果您想将连接器作为**单独的、可通过 pip 安装的包**(例如,专有存储后端)进行发布,请改用 native_plugin L2 适配器类型。它在运行时动态加载您的连接器——无需修改 LMCache 源代码。

如何工作#

native_plugin 适配器类型加载一个第三方 **连接器对象**(pybind 封装的 C++ 或纯 Python),并用内置的 NativeConnectorL2Adapter 桥接器将其包装。这意味着您只需实现 6 个连接器方法——Python 端的解复用/锁定桥接逻辑是从 LMCache 重用的。

pluginnative_plugin#

方面

plugin

native_plugin

加载的内容是什么

一个完整的 L2AdapterInterface 子类

一个 连接器 对象(较低级别)

桥接逻辑

由插件本身提供

NativeConnectorL2Adapter 重用

第三方努力

必须实现所有抽象方法 + 3 个 eventfds

仅需 6 个连接器方法

必需的连接器接口#

动态加载的连接器实例必须暴露以下方法(与 pybind LMCACHE_BIND_CONNECTOR_METHODS 合同相同):

class NativeConnectorProtocol:
    def event_fd(self) -> int: ...
    def submit_batch_get(
        self,
        keys: list[str],
        memoryviews: list[memoryview],
    ) -> int: ...
    def submit_batch_set(
        self,
        keys: list[str],
        memoryviews: list[memoryview],
    ) -> int: ...
    def submit_batch_exists(
        self,
        keys: list[str],
    ) -> int: ...
    def submit_batch_delete(
        self,
        keys: list[str],
    ) -> int: ...
    def drain_completions(
        self,
    ) -> list[tuple[int, bool, str, list[bool] | None]]: ...
    def close(self) -> None: ...

工厂在创建时验证前 6 个方法,如果缺少任何方法则引发 TypeErrorsubmit_batch_delete可选 的——如果缺失,适配器的 delete() 方法将不执行任何操作(逐出不会从后端移除键)。

配置#

{
  "type": "native_plugin",
  "module_path": "my_ext_package",
  "class_name": "MyConnectorClient",
  "adapter_params": {
    "host": "localhost",
    "port": 1234
  }
}
NativePluginL2AdapterConfig 字段#

字段

类型

必需

描述

module_path

str

连接器类所在模块的点分 Python 导入路径。

class_name

str

module_path 中连接器类的名称。

adapter_params

dict

作为 **kwargs 转发给连接器类构造函数。

max_capacity_gb

float

客户端使用跟踪的最大 L2 存储容量(以 GB 为单位)。这是 L2 逐出的必要条件。默认值为 0(禁用)。

加载流程#

CLI / config JSON
  |
  v
NativePluginL2AdapterConfig.from_dict(d)
  |
  v
_create_native_plugin_l2_adapter(config, ...)
  |
  +-- importlib.import_module(config.module_path)
  +-- getattr(module, config.class_name)
  +-- connector_cls(**config.adapter_params)
  +-- validate 6 required methods
  +-- NativeConnectorL2Adapter(native_client)
          |
          v
  L2AdapterInterface instance (ready for use)

逐步:构建外部原生连接器插件#

  1. 创建一个 Python 包,其中包含一个继承自 ConnectorBase<T> 的 C++ pybind11 扩展(与内置连接器相同的基类)。

    项目布局:

    my_ext_connector/
    +-- csrc/
    |   +-- connector.h      # C++ connector class
    |   +-- connector.cpp    # C++ implementation
    |   +-- pybind.cpp       # pybind11 bindings
    +-- src/
    |   +-- my_ext_connector/
    |       +-- __init__.py   # re-export the factory class
    |       +-- connector.py  # Python factory wrapper
    +-- pyproject.toml
    +-- setup.py              # build C++ extension
    
  2. 实现 C++ 连接器,继承自 ConnectorBase<T>,并重写 4 个必需的方法(create_connectiondo_single_getdo_single_setdo_single_exists),可选地重写 do_single_delete 以支持逐出。

  3. 使用 ``LMCACHE_BIND_CONNECTOR_METHODS`` 宏创建 pybind11 绑定:

    #include <pybind11/pybind11.h>
    #include "connector_pybind_utils.h"
    #include "connector.h"
    
    namespace py = pybind11;
    
    PYBIND11_MODULE(_native, m) {
      py::class_<MyFSConnector>(m, "MyFSConnector")
          .def(py::init<std::string, int>(),
               py::arg("base_path"),
               py::arg("num_workers"))
          LMCACHE_BIND_CONNECTOR_METHODS(MyFSConnector);
    }
    
  4. 编写一个 Python 工厂类,选择后端并返回本地连接器实例:

    from my_ext_connector._native import MyFSConnector
    
    class MyConnectorClient:
        def __new__(
            cls,
            base_path: str = "/tmp/my_ext",
            num_workers: int = 2,
        ):
            return MyFSConnector(base_path, num_workers)
    
  5. 构建和安装 包:

    cd my_ext_connector
    pip install -e .
    
  6. 配置 LMCache 以使用它:

    --l2-adapter '{
      "type": "native_plugin",
      "module_path": "my_ext_connector",
      "class_name": "MyConnectorClient",
      "adapter_params": {
        "base_path": "/tmp/my_ext",
        "num_workers": 2
      }
    }'
    

参考实现#

请参阅 examples/lmc_external_native_connector/ 以获取完整的、可通过 pip 安装的示例连接器插件,该插件演示了:

  • ConnectorBase<T> 继承的 C++ pybind11 封装连接器(与内置的 Redis/FS 相同)。

  • 两个后端:文件系统(ExampleFSConnector)和内存(ExampleMemoryConnector),均为 C++ 实现。

  • 一个薄的 Python 工厂类 (ExampleNativeConnector),通过 "backend" 参数选择后端。

  • 带有 eventfd 通知的工作线程池(继承自 ConnectorBase)。

  • 通过 pip install -e . 使用 pybind11 + setuptools 构建。

清单#

在添加新的本地连接器时,请使用此检查清单:

  1. 继承 ConnectorBase<T> 的 C++ 连接器,具有 4 个必需的方法重写和 1 个可选的方法重写(do_single_delete

  2. 使用 LMCACHE_BIND_CONNECTOR_METHODS 的 Pybind 模块

  3. setup.py 中用于新的 CppExtension 的条目

  4. 继承 ConnectorClientBase 的 Python 客户端(非 MP 模式)

  5. 带有配置类和工厂自注册的 L2 适配器模块(多进程模式)

  6. 单元测试(请参见 tests/v1/distributed/test_native_connector_l2_adapter.py

  7. 使用 pip install -e . 重新构建并验证两种模式均正常工作

对于 外部 原生连接器插件 (native_plugin):

  1. 带有 C++ pybind11 扩展的独立 pip 可安装包

  2. 连接器类,暴露 6 个必需的方法(+ 可选的 submit_batch_delete 用于逐出)

  3. 后端选择的 Python 工厂类

  4. pip install -e . 并通过 --l2-adapter JSON 进行配置

  5. 单元测试(参见 examples/lmc_external_native_connector/tests/

附加资源#

  • 框架源代码:csrc/storage_backends/

  • ConnectorBase 模板: csrc/storage_backends/connector_base.h

  • IStorageConnector 接口: csrc/storage_backends/connector_interface.h

  • Pybind 工具: csrc/storage_backends/connector_pybind_utils.h

  • Redis 参考实现: csrc/storage_backends/redis/

  • 架构自述文件: csrc/storage_backends/README.md

  • 外部原生连接器示例:examples/lmc_external_native_connector/

  • 原生插件适配器源: lmcache/v1/distributed/l2_adapters/native_connector_l2_adapter.py

  • 设计文档:lmcache/v1/distributed/l2_adapters/design_docs/plugin.md

  • RESP 后端用户指南: RESP (原生 Redis/Valkey)