添加本地连接器#
概述#
本地连接器是高性能的 C++ 存储后端,通过 pybind11 与 LMCache 集成。它们在 两种 LMCache 操作模式下均可工作:
多进程模式 (multiprocess): 通过
NativeConnectorL2Adapter(L2 适配器接口)
一次编写连接器,免费获得两种模式。
该框架位于 csrc/storage_backends/,Redis RESP 连接器作为参考实现。
架构#
Non-MP mode:
CacheEngine -> RemoteBackend -> ConnectorClientBase -> native client (C++)
(asyncio event loop)
MP mode:
StoreController / PrefetchController
|
NativeConnectorL2Adapter (Python bridge)
+-- 3 eventfds (store, lookup, load)
+-- completion demux thread
+-- ObjectKey <-> string serialization
+-- client-side lock tracking
|
native client (C++)
+-- 1 eventfd, worker threads, GIL-free I/O
设计原则:
在 pybind 层释放 GIL 以实现本地线程之间的真正并发
批处理与分块:批量请求的工作在线程之间均匀分配
基于 eventfd 的完成:内核唤醒 Python -- 无需轮询
非阻塞提交:提交队列 / 完成队列架构
步骤 1:C++ 连接器#
创建你的连接器目录(例如,csrc/storage_backends/mybackend/),并从 ConnectorBase<YourConnectionType> 继承。你需要重写 4 个必需的方法(并可选地重写 do_single_delete 以支持逐出)。
connector.h:
// csrc/storage_backends/mybackend/connector.h
#pragma once
#include "../connector_base.h"
namespace lmcache {
namespace connector {
// Per-thread connection state
struct MyConn {
int fd = -1;
// your connection fields
};
class MyConnector : public ConnectorBase<MyConn> {
public:
MyConnector(std::string host, int port, int num_workers)
: ConnectorBase(num_workers), host_(host), port_(port) {
start_workers(); // IMPORTANT: call at END of constructor
}
protected:
// 1. Create a connection (called once per worker thread)
MyConn create_connection() override {
MyConn conn;
// connect to server...
return conn;
}
// 2. GET: read value for key into buf
void do_single_get(MyConn& conn, const std::string& key,
void* buf, size_t len,
size_t chunk_size) override {
// send GET command, recv response into buf
}
// 3. SET: write data from buf under key
void do_single_set(MyConn& conn, const std::string& key,
const void* buf, size_t len,
size_t chunk_size) override {
// send SET command with data from buf
}
// 4. EXISTS: check if key exists
bool do_single_exists(MyConn& conn,
const std::string& key) override {
// send EXISTS, return true/false
}
// 5. DELETE: remove key (optional, has default no-op)
bool do_single_delete(MyConn& conn,
const std::string& key) override {
// send DELETE, return true if deleted, false if not found
}
// Optional: clean shutdown
void shutdown_connections() override {
// close sockets, free resources
}
private:
std::string host_;
int port_;
};
} // namespace connector
} // namespace lmcache
ConnectorBase 为您免费提供的内容:
每个线程连接的工作线程池
提交队列(无锁入队)和完成队列
自动平铺:批量操作在工作节点之间拆分
完成时的 eventfd 信号(内核唤醒 Python)
优雅关闭(停止标志,排空,加入)
重要
始终在派生构造函数的**末尾**调用 start_workers(),在所有成员变量初始化之后。工作线程会立即调用 create_connection(),因此对象必须完全构造。
参考: csrc/storage_backends/redis/connector.h 和 connector.cpp
步骤 2:Pybind 模块#
使用 LMCACHE_BIND_CONNECTOR_METHODS 宏,该宏绑定所有 7 个方法(event_fd、submit_batch_get/set/exists/delete、drain_completions、close),并正确处理 GIL 释放和 Python 缓冲区协议。
// csrc/storage_backends/mybackend/pybind.cpp
#include <pybind11/pybind11.h>
#include "../connector_pybind_utils.h"
#include "connector.h"
namespace py = pybind11;
PYBIND11_MODULE(lmcache_mybackend, m) {
py::class_<lmcache::connector::MyConnector>(m, "LMCacheMyBackendClient")
.def(py::init<std::string, int, int>(),
py::arg("host"), py::arg("port"),
py::arg("num_workers"))
LMCACHE_BIND_CONNECTOR_METHODS(
lmcache::connector::MyConnector);
}
pybind 工具自动:
在 GIL 下从 Python
memoryview对象中提取缓冲区指针在调用 C++ 之前释放 GIL
将 C++
Completion结构转换为 Python 元组(future_id, ok, error, result_bools)
参考: csrc/storage_backends/redis/pybind.cpp
步骤 3:构建系统#
在 setup.py 中注册您的 C++ 源文件,与现有的 Redis 扩展一起:
# In cuda_extension() and rocm_extension():
mybackend_sources = [
"csrc/storage_backends/mybackend/pybind.cpp",
"csrc/storage_backends/mybackend/connector.cpp",
]
# Add to ext_modules list:
cpp_extension.CppExtension(
"lmcache.lmcache_mybackend",
sources=mybackend_sources,
include_dirs=[
"csrc/storage_backends",
"csrc/storage_backends/mybackend",
],
extra_compile_args={"cxx": ["-O3", "-std=c++17"]},
),
然后重建:
pip install -e .
步骤 4:Python 客户端(非 MP 模式)#
从 ConnectorClientBase 继承,该类提供 asyncio 事件循环集成、未来管理以及同步和异步方法。
# lmcache/v1/storage_backend/native_clients/mybackend_client.py
from .connector_client_base import ConnectorClientBase
from lmcache.lmcache_mybackend import LMCacheMyBackendClient
class MyBackendClient(ConnectorClientBase[LMCacheMyBackendClient]):
def __init__(self, host: str, port: int,
num_workers: int, loop=None):
native = LMCacheMyBackendClient(host, port, num_workers)
super().__init__(native, loop)
这为您提供了 batch_get、batch_set、``batch_exists``(异步)及其同步变体,所有这些都具有自动的基于 eventfd 的完成处理。
参考: lmcache/v1/storage_backend/native_clients/resp_client.py
步骤 5:L2 适配器(MP 模式)#
要将您的连接器用作 MP 模式下的 L2 适配器,请创建一个单独的 Python 模块,该模块定义配置类、工厂函数,并自我注册这两者。NativeConnectorL2Adapter 桥接处理所有复杂性(eventfd 解复用、键序列化、锁定)。
在 L2 适配器包中创建一个新文件:
# lmcache/v1/distributed/l2_adapters/mybackend_l2_adapter.py
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from lmcache.v1.distributed.internal_api import L1MemoryDesc
from lmcache.v1.distributed.l2_adapters.base import (
L2AdapterInterface,
)
from lmcache.v1.distributed.l2_adapters.config import (
L2AdapterConfigBase,
register_l2_adapter_type,
)
from lmcache.v1.distributed.l2_adapters.factory import (
register_l2_adapter_factory,
)
class MyBackendL2AdapterConfig(L2AdapterConfigBase):
def __init__(self, host: str, port: int,
num_workers: int = 8,
max_capacity_gb: float = 0):
self.host = host
self.port = port
self.num_workers = num_workers
self.max_capacity_gb = max_capacity_gb
@classmethod
def from_dict(cls, d: dict) -> "MyBackendL2AdapterConfig":
host = d.get("host")
if not isinstance(host, str) or not host:
raise ValueError("host must be a non-empty string")
port = d.get("port")
if not isinstance(port, int) or port <= 0:
raise ValueError("port must be a positive integer")
num_workers = d.get("num_workers", 8)
max_capacity_gb = d.get("max_capacity_gb", 0)
return cls(host=host, port=port,
num_workers=num_workers,
max_capacity_gb=max_capacity_gb)
@classmethod
def help(cls) -> str:
return (
"MyBackend L2 adapter config fields:\n"
"- host (str): server hostname (required)\n"
"- port (int): server port (required)\n"
"- num_workers (int): worker threads (default 8)"
)
def _create_mybackend_l2_adapter(
config: L2AdapterConfigBase,
l1_memory_desc: "Optional[L1MemoryDesc]" = None,
) -> L2AdapterInterface:
from lmcache.lmcache_mybackend import LMCacheMyBackendClient
from lmcache.v1.distributed.l2_adapters \
.native_connector_l2_adapter import (
NativeConnectorL2Adapter,
)
assert isinstance(config, MyBackendL2AdapterConfig)
native_client = LMCacheMyBackendClient(
config.host, config.port, config.num_workers
)
return NativeConnectorL2Adapter(
native_client,
max_capacity_gb=config.max_capacity_gb,
)
# Self-register -- runs automatically when the module
# is imported by the L2 adapter auto-discovery mechanism
register_l2_adapter_type("mybackend", MyBackendL2AdapterConfig)
register_l2_adapter_factory("mybackend", _create_mybackend_l2_adapter)
备注
L2 适配器包使用 pkgutil.iter_modules 自动发现 lmcache/v1/distributed/l2_adapters/ 中的所有模块。仅仅创建上述文件就足够了——不需要对 __init__.py 或任何其他现有文件进行更改。
命令行使用:
lmcache server \
--l1-size-gb 100 --eviction-policy LRU \
--l2-adapter '{"type": "mybackend", "host": "10.0.0.1", "port": 9000}'
原生连接器 L2Adapter 如何弥合差距#
C++ 连接器有 1 个 eventfd 和混合完成。MP 模式的 L2AdapterInterface 需要 3 个独立的 eventfd 和类型化结果。桥接器对此进行了透明处理:
L2 适配器方法 |
本地调用 |
额外逻辑 |
|---|---|---|
|
|
ObjectKey 转换为字符串,MemoryObj 转换为 memoryview |
|
|
|
|
|
ObjectKey 转换为字符串,MemoryObj 转换为 memoryview |
|
(无) |
客户端锁减少 |
|
通过 |
按操作类型解复用 |
|
通过 |
存在结果到位图,应用锁定 |
|
通过 |
成功/失败到位图 |
一个后台解复用线程轮询本地 eventfd,调用 drain_completions(),查找每个 future_id 以确定其操作类型,将结果路由到正确的完成字典,并发出相应的 Python eventfd 信号。
第三方原生连接器插件(native_plugin)#
上述步骤描述了如何在 LMCache 源代码树中**内部**添加一个原生连接器。如果您想将连接器作为**单独的、可通过 pip 安装的包**(例如,专有存储后端)进行发布,请改用 native_plugin L2 适配器类型。它在运行时动态加载您的连接器——无需修改 LMCache 源代码。
如何工作#
native_plugin 适配器类型加载一个第三方 **连接器对象**(pybind 封装的 C++ 或纯 Python),并用内置的 NativeConnectorL2Adapter 桥接器将其包装。这意味着您只需实现 6 个连接器方法——Python 端的解复用/锁定桥接逻辑是从 LMCache 重用的。
方面 |
|
|
|---|---|---|
加载的内容是什么 |
一个完整的 |
一个 连接器 对象(较低级别) |
桥接逻辑 |
由插件本身提供 |
从 |
第三方努力 |
必须实现所有抽象方法 + 3 个 eventfds |
仅需 6 个连接器方法 |
必需的连接器接口#
动态加载的连接器实例必须暴露以下方法(与 pybind LMCACHE_BIND_CONNECTOR_METHODS 合同相同):
class NativeConnectorProtocol:
def event_fd(self) -> int: ...
def submit_batch_get(
self,
keys: list[str],
memoryviews: list[memoryview],
) -> int: ...
def submit_batch_set(
self,
keys: list[str],
memoryviews: list[memoryview],
) -> int: ...
def submit_batch_exists(
self,
keys: list[str],
) -> int: ...
def submit_batch_delete(
self,
keys: list[str],
) -> int: ...
def drain_completions(
self,
) -> list[tuple[int, bool, str, list[bool] | None]]: ...
def close(self) -> None: ...
工厂在创建时验证前 6 个方法,如果缺少任何方法则引发 TypeError。submit_batch_delete 是 可选 的——如果缺失,适配器的 delete() 方法将不执行任何操作(逐出不会从后端移除键)。
配置#
{
"type": "native_plugin",
"module_path": "my_ext_package",
"class_name": "MyConnectorClient",
"adapter_params": {
"host": "localhost",
"port": 1234
}
}
字段 |
类型 |
必需 |
描述 |
|---|---|---|---|
|
|
是 |
连接器类所在模块的点分 Python 导入路径。 |
|
|
是 |
|
|
|
不 |
作为 |
|
|
不 |
客户端使用跟踪的最大 L2 存储容量(以 GB 为单位)。这是 L2 逐出的必要条件。默认值为 0(禁用)。 |
加载流程#
CLI / config JSON
|
v
NativePluginL2AdapterConfig.from_dict(d)
|
v
_create_native_plugin_l2_adapter(config, ...)
|
+-- importlib.import_module(config.module_path)
+-- getattr(module, config.class_name)
+-- connector_cls(**config.adapter_params)
+-- validate 6 required methods
+-- NativeConnectorL2Adapter(native_client)
|
v
L2AdapterInterface instance (ready for use)
逐步:构建外部原生连接器插件#
创建一个 Python 包,其中包含一个继承自
ConnectorBase<T>的 C++ pybind11 扩展(与内置连接器相同的基类)。项目布局:
my_ext_connector/ +-- csrc/ | +-- connector.h # C++ connector class | +-- connector.cpp # C++ implementation | +-- pybind.cpp # pybind11 bindings +-- src/ | +-- my_ext_connector/ | +-- __init__.py # re-export the factory class | +-- connector.py # Python factory wrapper +-- pyproject.toml +-- setup.py # build C++ extension实现 C++ 连接器,继承自
ConnectorBase<T>,并重写 4 个必需的方法(create_connection、do_single_get、do_single_set、do_single_exists),可选地重写do_single_delete以支持逐出。使用 ``LMCACHE_BIND_CONNECTOR_METHODS`` 宏创建 pybind11 绑定:
#include <pybind11/pybind11.h> #include "connector_pybind_utils.h" #include "connector.h" namespace py = pybind11; PYBIND11_MODULE(_native, m) { py::class_<MyFSConnector>(m, "MyFSConnector") .def(py::init<std::string, int>(), py::arg("base_path"), py::arg("num_workers")) LMCACHE_BIND_CONNECTOR_METHODS(MyFSConnector); }编写一个 Python 工厂类,选择后端并返回本地连接器实例:
from my_ext_connector._native import MyFSConnector class MyConnectorClient: def __new__( cls, base_path: str = "/tmp/my_ext", num_workers: int = 2, ): return MyFSConnector(base_path, num_workers)构建和安装 包:
cd my_ext_connector pip install -e .配置 LMCache 以使用它:
--l2-adapter '{ "type": "native_plugin", "module_path": "my_ext_connector", "class_name": "MyConnectorClient", "adapter_params": { "base_path": "/tmp/my_ext", "num_workers": 2 } }'
参考实现#
请参阅 examples/lmc_external_native_connector/ 以获取完整的、可通过 pip 安装的示例连接器插件,该插件演示了:
从
ConnectorBase<T>继承的 C++ pybind11 封装连接器(与内置的 Redis/FS 相同)。两个后端:文件系统(
ExampleFSConnector)和内存(ExampleMemoryConnector),均为 C++ 实现。一个薄的 Python 工厂类 (
ExampleNativeConnector),通过"backend"参数选择后端。带有 eventfd 通知的工作线程池(继承自
ConnectorBase)。通过
pip install -e .使用 pybind11 + setuptools 构建。
清单#
在添加新的本地连接器时,请使用此检查清单:
继承
ConnectorBase<T>的 C++ 连接器,具有 4 个必需的方法重写和 1 个可选的方法重写(do_single_delete)使用
LMCACHE_BIND_CONNECTOR_METHODS的 Pybind 模块setup.py中用于新的CppExtension的条目继承
ConnectorClientBase的 Python 客户端(非 MP 模式)带有配置类和工厂自注册的 L2 适配器模块(多进程模式)
单元测试(请参见
tests/v1/distributed/test_native_connector_l2_adapter.py)使用
pip install -e .重新构建并验证两种模式均正常工作
对于 外部 原生连接器插件 (native_plugin):
带有 C++ pybind11 扩展的独立 pip 可安装包
连接器类,暴露 6 个必需的方法(+ 可选的
submit_batch_delete用于逐出)后端选择的 Python 工厂类
pip install -e .并通过--l2-adapterJSON 进行配置单元测试(参见
examples/lmc_external_native_connector/tests/)
附加资源#
框架源代码:
csrc/storage_backends/ConnectorBase模板:csrc/storage_backends/connector_base.hIStorageConnector接口:csrc/storage_backends/connector_interface.hPybind 工具:
csrc/storage_backends/connector_pybind_utils.hRedis 参考实现:
csrc/storage_backends/redis/架构自述文件:
csrc/storage_backends/README.md外部原生连接器示例:
examples/lmc_external_native_connector/原生插件适配器源:
lmcache/v1/distributed/l2_adapters/native_connector_l2_adapter.py设计文档:
lmcache/v1/distributed/l2_adapters/design_docs/plugin.mdRESP 后端用户指南: RESP (原生 Redis/Valkey)