Source code for lmcache.experimental.storage_backend.connector.infinistore_connector

import asyncio
import ctypes
from typing import List, Optional, Union, no_type_check

import infinistore

from lmcache.experimental.memory_management import (MemoryAllocatorInterface,
                                                    MemoryObj)
# Reuse RedisMetadata as the metadata format for InfiniStore entries.
from lmcache.experimental.protocol import RedisMetadata
from lmcache.experimental.storage_backend.connector.base_connector import \
    RemoteConnector
from lmcache.logging import init_logger
from lmcache.utils import CacheEngineKey

# Module-level logger, named after this module.
logger = init_logger(__name__)

# Number of leading bytes of the connector's metadata buffer that are passed
# to RedisMetadata.deserialize when an entry is read back.
# NOTE(review): presumably the fixed serialized size of RedisMetadata --
# confirm against lmcache.experimental.protocol.
METADATA_BYTES_LEN = 28


def _get_ptr(mv: Union[bytearray, memoryview]) -> int:
    """Return the memory address of the first byte of a writable buffer.

    Args:
        mv: A writable buffer object (``bytearray`` or ``memoryview``).

    Returns:
        The address of the buffer's first byte as a plain integer.

    The caller must keep ``mv`` alive (and must not resize it) for as long
    as the returned address is in use.
    """
    # Overlay a one-byte ctypes object on the start of the buffer; its
    # address is the address of the underlying storage.
    first_byte = ctypes.c_char.from_buffer(mv)
    return ctypes.addressof(first_byte)


class InfinistoreConnector(RemoteConnector):
    """Remote connector that stores KV-cache entries in InfiniStore via RDMA.

    Every cache entry is kept under two InfiniStore keys derived from
    ``key.to_string()``:

    * ``<key>metadata`` -- a serialized ``RedisMetadata`` record, and
    * ``<key>kv_bytes`` -- the raw payload bytes.

    NOTE(review): ``self.buffer`` is a single scratch buffer shared by
    ``get`` and ``put`` across ``await`` points; overlapping calls on the
    same connector could interleave their use of it -- confirm callers
    serialize access.
    """

    def __init__(self, host: str, port: int, dev_name,
                 loop: asyncio.AbstractEventLoop,
                 memory_allocator: MemoryAllocatorInterface):
        """Connect to the InfiniStore server and register the scratch buffer.

        Args:
            host: Address of the InfiniStore server.
            port: Service port of the InfiniStore server.
            dev_name: Device name forwarded to ``infinistore.ClientConfig``.
            loop: Event loop whose default executor runs blocking calls.
            memory_allocator: Allocator used to create the objects returned
                by ``get`` and to release references.
        """
        config = infinistore.ClientConfig(
            host_addr=host,
            service_port=port,
            log_level="info",
            connection_type=infinistore.TYPE_RDMA,
            ib_port=1,
            link_type=infinistore.LINK_ETHERNET,
            dev_name=dev_name,
        )

        self.rdma_conn = infinistore.InfinityConnection(config)
        self.memory_allocator = memory_allocator
        self.loop = loop
        self.rdma_conn.connect()

        # 4KB scratch buffer used for both reading and writing metadata.
        # It is registered once here because registration is expensive.
        self.buffer_size = 4 << 10
        self.buffer = bytearray(self.buffer_size)
        self.rdma_conn.register_mr(_get_ptr(self.buffer), self.buffer_size)

    async def exists(self, key: CacheEngineKey) -> bool:
        """Return whether the metadata record for ``key`` exists remotely."""

        def blocking_io():
            return self.rdma_conn.check_exist(key.to_string() + "metadata")

        # check_exist is a synchronous call; keep it off the event loop.
        return await self.loop.run_in_executor(None, blocking_io)

    async def get(self, key: CacheEngineKey) -> Optional[MemoryObj]:
        """Fetch the entry stored under ``key``.

        Returns:
            A newly allocated ``MemoryObj`` holding the payload, or ``None``
            when either remote key is missing or the allocator cannot
            provide memory.

        Any object allocated here is released again if it cannot be handed
        back to the caller (missing payload, error, or cancellation).
        """
        key_str = key.to_string()

        try:
            await self.rdma_conn.read_cache_single_async(
                key_str + "metadata", _get_ptr(self.buffer),
                len(self.buffer))
        except infinistore.lib.InfiniStoreKeyNotFound:
            return None

        metadata = RedisMetadata.deserialize(self.buffer[:METADATA_BYTES_LEN])

        memory_obj = self.memory_allocator.allocate(
            metadata.shape,
            metadata.dtype,
            metadata.fmt,
        )
        if memory_obj is None:
            logger.warning("Failed to allocate memory during remote receive")
            return None

        size = memory_obj.get_size()
        delivered = False
        try:
            # TODO: we could have memory allocator which pre-allocate
            # and register RDMA memory.
            # register memory is a heavy operation, so we should avoid it.
            #
            # The receive buffer must be mutable: RDMA writes into it, and
            # writing through a pointer into an immutable ``bytes`` object
            # is not allowed.
            kv_bytes = bytearray(size)
            ptr = _get_ptr(kv_bytes)
            await self.loop.run_in_executor(None, self.rdma_conn.register_mr,
                                            ptr, size)

            try:
                await self.rdma_conn.read_cache_single_async(
                    key_str + "kv_bytes", ptr, size)
            except infinistore.lib.InfiniStoreKeyNotFound:
                return None

            view = memoryview(memory_obj.byte_array)
            view[:metadata.length] = kv_bytes
            delivered = True
            return memory_obj
        finally:
            if not delivered:
                # The object never reaches the caller, so nobody else can
                # release it; drop the reference here to avoid a leak.
                self.memory_allocator.ref_count_down(memory_obj)

    async def put(self, key: CacheEngineKey, memory_obj: MemoryObj):
        """Store ``memory_obj`` under ``key`` and drop one reference to it.

        Raises:
            ValueError: If the serialized metadata does not fit into the
                scratch buffer.

        NOTE(review): the reference is only dropped on success; whether a
        failed put should also release it depends on the caller's contract.
        """
        # TODO(Jiayi): The following code is ugly.
        # Please use a function like `memory_obj.to_meta()`.
        key_str = key.to_string()
        kv_bytes = memory_obj.byte_array
        kv_shape = memory_obj.get_shape()
        kv_dtype = memory_obj.get_dtype()
        memory_format = memory_obj.get_memory_format()

        metadata_bytes = RedisMetadata(len(kv_bytes), kv_shape, kv_dtype,
                                       memory_format).serialize()

        # Not likely to happen, but it must hold even when asserts are
        # disabled: an oversized slice assignment would resize the
        # bytearray and invalidate the registered address.
        if len(metadata_bytes) > self.buffer_size:
            raise ValueError("metadata size exceeds buffer size")

        # copy metadata to self.buffer
        self.buffer[:len(metadata_bytes)] = metadata_bytes

        await self.rdma_conn.rdma_write_cache_single_async(
            key_str + "metadata", _get_ptr(self.buffer), len(self.buffer))

        pointer = ctypes.cast(ctypes.c_char_p(memory_obj.byte_array),
                              ctypes.POINTER(ctypes.c_char))
        ptr = ctypes.addressof(pointer.contents)
        size = memory_obj.get_size()
        await self.loop.run_in_executor(None, self.rdma_conn.register_mr, ptr,
                                        size)

        await self.rdma_conn.rdma_write_cache_single_async(
            key_str + "kv_bytes", ptr, size)

        self.memory_allocator.ref_count_down(memory_obj)

    # TODO
    @no_type_check
    async def list(self) -> List[str]:
        """List stored keys. Not implemented yet; currently returns None."""
        pass

    async def close(self):
        """Close the InfiniStore connection."""
        self.rdma_conn.close()
        logger.info("Closed the infinistore connection")