NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
NES::Runtime::BufferManager Class Reference

The BufferManager is responsible for managing pooled and unpooled buffers.

#include <BufferManager.hpp>


Public Member Functions

 BufferManager (uint32_t bufferSize=DEFAULT_BUFFER_SIZE, uint32_t numOfBuffers=DEFAULT_NUMBER_OF_BUFFERS, std::shared_ptr< std::pmr::memory_resource > memoryResource=std::make_shared< NesDefaultMemoryAllocator >(), uint32_t withAlignment=DEFAULT_ALIGNMENT)
 Creates a new global buffer manager.
 
 BufferManager (const BufferManager &)=delete
 
BufferManager & operator= (const BufferManager &)=delete
 
 ~BufferManager () override
 
BufferManagerType getBufferManagerType () const override
 
TupleBuffer getBufferBlocking () override
 Provides a new TupleBuffer. This blocks until a buffer is available.
 
std::optional< TupleBuffer > getBufferNoBlocking () override
 Returns a new TupleBuffer wrapped in an optional, or an empty optional if no buffer is available.
 
std::optional< TupleBuffer > getBufferTimeout (std::chrono::milliseconds timeout_ms) override
 Returns a new TupleBuffer wrapped in an optional, or an empty optional if no buffer becomes available within timeout_ms.
 
std::optional< TupleBuffer > getUnpooledBuffer (size_t bufferSize) override
 Returns an unpooled buffer of size bufferSize wrapped in an optional, or an empty optional if an error occurs.
 
size_t getBufferSize () const override
 
size_t getNumOfPooledBuffers () const override
 
size_t getNumOfUnpooledBuffers () const override
 
size_t getAvailableBuffers () const override
 
size_t getAvailableBuffersInFixedSizePools () const
 
LocalBufferPoolPtr createLocalBufferPool (size_t numberOfReservedBuffers) override
 Create a local buffer manager that is assigned to one pipeline or thread.
 
FixedSizeBufferPoolPtr createFixedSizeBufferPool (size_t numberOfReservedBuffers) override
 Create a fixed-size buffer pool that is assigned to one pipeline or thread.
 
void recyclePooledBuffer (detail::MemorySegment *segment) override
 Recycle a pooled buffer by making it available to others.
 
void recycleUnpooledBuffer (detail::MemorySegment *segment) override
 Recycle an unpooled buffer by making it available to others.
 
void destroy () override
 Clears all local buffer pools and removes all buffers from the global buffer manager.
 
Public Member Functions inherited from NES::Runtime::AbstractBufferProvider
virtual ~AbstractBufferProvider ()
 

Friends

class TupleBuffer
 
class detail::MemorySegment
 

Detailed Description

The BufferManager is responsible for:

  1. Pooled Buffers: preallocated fixed-size buffers of memory that must be reference counted
  2. Unpooled Buffers: variable-sized buffers that are allocated on the fly. They are also subject to reference counting.

The reference counting mechanism of the TupleBuffer is explained in TupleBuffer.hpp

The BufferManager stores the pooled buffers as MemorySegments. When a component asks for a pooled buffer, the BufferManager retrieves an available buffer, blocking the calling thread if none is available. It then hands out a TupleBuffer that is constructed from the pointer stored inside a MemorySegment. This indirection is necessary because the BufferManager must keep every buffer it has ever created, so that all of them are deallocated when its destructor is called. The BufferManager also checks that every reference counter is zero and throws a fatal exception if a component has not returned all of its buffers. This check is necessary to avoid memory leaks.

Unpooled buffers are either allocated on the spot or served via a previously allocated, unpooled buffer that has been returned to the BufferManager by some component.
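
The pooled-buffer lifecycle described above can be illustrated with a small, self-contained sketch. It is not the NebulaStream implementation: ToyPool, Handle, acquire(), available() and allReturned() are hypothetical names, std::shared_ptr stands in for the TupleBuffer reference counter, and an empty handle is returned where the real class would block. The point is only that the pool keeps ownership of every segment and that a segment becomes available again when its last reference is dropped.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical stand-in for a pool that owns every segment it ever created.
class ToyPool {
  public:
    using Segment = std::vector<std::uint8_t>;
    // The shared_ptr use count plays the role of the buffer's reference counter.
    using Handle = std::shared_ptr<Segment>;

    ToyPool(std::size_t bufferSize, std::size_t numOfBuffers)
        : segments(numOfBuffers, Segment(bufferSize)) {
        // segments is never resized afterwards, so these pointers stay valid.
        for (auto& segment : segments) {
            freeList.push_back(&segment);
        }
    }

    // Hands out a free segment, or an empty handle if none is free
    // (assumption: the sketch does not block).
    Handle acquire() {
        Segment* segment = nullptr;
        {
            std::lock_guard<std::mutex> lock(mtx);
            if (freeList.empty()) {
                return nullptr;
            }
            segment = freeList.back();
            freeList.pop_back();
        }
        // The deleter runs when the last copy of the handle goes away and
        // puts the segment back on the free list instead of freeing it.
        return Handle(segment, [this](Segment* s) { recycle(s); });
    }

    std::size_t available() const {
        std::lock_guard<std::mutex> lock(mtx);
        return freeList.size();
    }

    // True when no handle is outstanding; a real destructor-time check
    // would treat the opposite case as a fatal error.
    bool allReturned() const { return available() == segments.size(); }

  private:
    void recycle(Segment* s) {
        assert(s != nullptr);
        std::lock_guard<std::mutex> lock(mtx);
        freeList.push_back(s);
    }

    mutable std::mutex mtx;
    std::vector<Segment> segments;
    std::vector<Segment*> freeList;
};
```

In this sketch the pool must outlive every handle, which mirrors the requirement that all buffers are returned before the manager is destroyed.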

Constructor & Destructor Documentation

◆ BufferManager() [1/2]

NES::Runtime::BufferManager::BufferManager ( uint32_t  bufferSize = DEFAULT_BUFFER_SIZE,
uint32_t  numOfBuffers = DEFAULT_NUMBER_OF_BUFFERS,
std::shared_ptr< std::pmr::memory_resource >  memoryResource = std::make_shared<NesDefaultMemoryAllocator>(),
uint32_t  withAlignment = DEFAULT_ALIGNMENT 
)
explicit

Creates a new global buffer manager.

Parameters
bufferSize  the size of each buffer in bytes
numOfBuffers  the total number of buffers in the pool
withAlignment  the alignment of each buffer; the default is 64, so buffers are cache-line aligned. This value must be a power of two and smaller than the page size
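
The constraint on withAlignment can be checked with a few bit operations. The helpers below are a sketch only: isValidAlignment and roundUpToAlignment are hypothetical names, and the 4096-byte page size is an assumption (the actual page size is platform dependent).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical check mirroring the documented constraint: the alignment
// must be a power of two and smaller than the page size.
// Assumption: 4096-byte pages unless the caller says otherwise.
constexpr bool isValidAlignment(std::uint32_t alignment, std::uint32_t pageSize = 4096) {
    // A power of two has exactly one bit set, so clearing the lowest set
    // bit with (x & (x - 1)) must give zero.
    return alignment != 0 && (alignment & (alignment - 1)) == 0 && alignment < pageSize;
}

// Rounds size up to the next multiple of alignment.
// Precondition: alignment is a non-zero power of two.
constexpr std::size_t roundUpToAlignment(std::size_t size, std::size_t alignment) {
    assert(alignment != 0 && (alignment & (alignment - 1)) == 0);
    return (size + alignment - 1) & ~(alignment - 1);
}
```

With the documented default of 64, a request for 100 bytes would occupy 128 bytes after rounding.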

◆ BufferManager() [2/2]

NES::Runtime::BufferManager::BufferManager ( const BufferManager & )
delete

◆ ~BufferManager()

NES::Runtime::BufferManager::~BufferManager ( )
override

References destroy().


Member Function Documentation

◆ createFixedSizeBufferPool()

FixedSizeBufferPoolPtr NES::Runtime::BufferManager::createFixedSizeBufferPool ( size_t  numberOfReservedBuffers)
override virtual

Create a fixed-size buffer pool that is assigned to one pipeline or thread.

Parameters
numberOfReservedBuffers  number of exclusive buffers to give to the pool
Returns
a fixed-size buffer pool with numberOfReservedBuffers exclusive buffers

Implements NES::Runtime::AbstractPoolProvider.

References backward::details::move(), and NES_ASSERT2_FMT.


◆ createLocalBufferPool()

LocalBufferPoolPtr NES::Runtime::BufferManager::createLocalBufferPool ( size_t  numberOfReservedBuffers)
override virtual

Create a local buffer manager that is assigned to one pipeline or thread.

Parameters
numberOfReservedBuffers  number of exclusive buffers to give to the pool
Returns
a local buffer manager with numberOfReservedBuffers exclusive buffers

Implements NES::Runtime::AbstractPoolProvider.

References backward::details::move(), NES_ASSERT2_FMT, and NES_DEBUG.
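
The idea of reserving exclusive buffers for one pipeline or thread can be sketched with plain counters. ToyGlobalPool, ToyLocalPool and createLocalPool are hypothetical names; returning nullptr when too few buffers are free, and giving the buffers back when the local pool is destroyed, are assumptions made for this sketch rather than documented behaviour. The sketch is not thread-safe.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>

class ToyGlobalPool;

// Hypothetical local pool holding a fixed number of exclusive buffers.
class ToyLocalPool {
  public:
    ToyLocalPool(ToyGlobalPool& owner, std::size_t reservedBuffers)
        : owner(owner), reserved(reservedBuffers) {
        assert(reservedBuffers > 0);
    }
    ToyLocalPool(const ToyLocalPool&) = delete;
    ToyLocalPool& operator=(const ToyLocalPool&) = delete;
    ~ToyLocalPool();

    std::size_t size() const { return reserved; }

  private:
    ToyGlobalPool& owner;
    std::size_t reserved;
};

// Hypothetical global pool that only tracks how many buffers are free.
class ToyGlobalPool {
  public:
    explicit ToyGlobalPool(std::size_t numOfBuffers) : freeBuffers(numOfBuffers) {}

    // Moves numberOfReservedBuffers out of the shared free count.
    // Assumption: failure is reported as nullptr in this sketch.
    std::unique_ptr<ToyLocalPool> createLocalPool(std::size_t numberOfReservedBuffers) {
        if (numberOfReservedBuffers > freeBuffers) {
            return nullptr;
        }
        freeBuffers -= numberOfReservedBuffers;
        return std::make_unique<ToyLocalPool>(*this, numberOfReservedBuffers);
    }

    std::size_t available() const { return freeBuffers; }

  private:
    friend class ToyLocalPool;
    std::size_t freeBuffers;
};

// Assumption: destroying the local pool hands its buffers back.
inline ToyLocalPool::~ToyLocalPool() { owner.freeBuffers += reserved; }
```

While a local pool exists, its reserved buffers no longer count as available in the global pool, so other consumers cannot starve the pipeline that owns them.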


◆ destroy()

void NES::Runtime::BufferManager::destroy ( )
override virtual

Clears all local buffer pools and removes all buffers from the global buffer manager.

Implements NES::Runtime::AbstractBufferProvider.

References NES_ASSERT2_FMT, NES_DEBUG, NES_ERROR, and NES_THROW_RUNTIME_ERROR.

Referenced by ~BufferManager().


◆ getAvailableBuffers()

size_t NES::Runtime::BufferManager::getAvailableBuffers ( ) const
override virtual
Returns
Number of available buffers in the pool

Implements NES::Runtime::AbstractBufferProvider.

◆ getAvailableBuffersInFixedSizePools()

size_t NES::Runtime::BufferManager::getAvailableBuffersInFixedSizePools ( ) const
Returns
Number of available buffers in the fixed size pool

References NES::Runtime::FIXED, and type.

◆ getBufferBlocking()

TupleBuffer NES::Runtime::BufferManager::getBufferBlocking ( )
override virtual

Provides a new TupleBuffer. This blocks until a buffer is available.

Returns
a new buffer

Implements NES::Runtime::AbstractBufferProvider.

References NES_THROW_RUNTIME_ERROR, NES_TRACE, and TupleBuffer.

Referenced by NES::Statistic::CountMinStatisticFormat::writeStatisticsIntoBuffers(), and NES::Statistic::HyperLogLogStatisticFormat::writeStatisticsIntoBuffers().
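
Blocking acquisition of this kind is commonly built on a mutex and a condition variable. The sketch below shows that general pattern under stated assumptions: BlockingFreeList, acquireBlocking() and release() are hypothetical names, and buffers are represented by plain ids. It is not taken from the NebulaStream sources.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical free list whose acquire call blocks while the list is empty.
class BlockingFreeList {
  public:
    explicit BlockingFreeList(std::size_t numOfBuffers) {
        assert(numOfBuffers > 0);
        for (std::size_t id = 0; id < numOfBuffers; ++id) {
            freeIds.push_back(id);
        }
    }

    // Blocks the calling thread until some buffer id is free.
    std::size_t acquireBlocking() {
        std::unique_lock<std::mutex> lock(mtx);
        // The predicate form of wait() re-checks the condition after every
        // wake-up, so spurious wake-ups are harmless.
        bufferReturned.wait(lock, [this] { return !freeIds.empty(); });
        const std::size_t id = freeIds.back();
        freeIds.pop_back();
        return id;
    }

    // Returns a buffer id and wakes one waiting thread, if any.
    void release(std::size_t id) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            freeIds.push_back(id);
        }
        bufferReturned.notify_one();
    }

  private:
    std::mutex mtx;
    std::condition_variable bufferReturned;
    std::vector<std::size_t> freeIds;
};
```

A thread that calls acquireBlocking() on an exhausted list stays parked until another thread calls release(), so a caller that holds every buffer and then requests one more on the same thread would wait forever.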


◆ getBufferManagerType()

BufferManagerType NES::Runtime::BufferManager::getBufferManagerType ( ) const
override virtual

◆ getBufferNoBlocking()

std::optional< TupleBuffer > NES::Runtime::BufferManager::getBufferNoBlocking ( )
override virtual

Returns a new TupleBuffer wrapped in an optional, or an empty optional if no buffer is available.

Returns
a new buffer, or an empty optional if no buffer is available

Implements NES::Runtime::AbstractBufferProvider.

References NES_THROW_RUNTIME_ERROR, and TupleBuffer.

Referenced by NES::getBufferNoBlocking().
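
Because the result is a std::optional, callers have to handle the empty case explicitly. The helper below sketches one way to do that. withBufferIfAvailable and FakeProvider are hypothetical; the only thing assumed about the provider is a getBufferNoBlocking() member that returns a std::optional, and the fake returns plain integers instead of TupleBuffers so the example stays self-contained.

```cpp
#include <cassert>
#include <optional>
#include <utility>

// Hypothetical helper: runs onBuffer only if a buffer could be obtained
// without blocking, and reports whether it did.
template <typename Provider, typename OnBuffer>
bool withBufferIfAvailable(Provider& provider, OnBuffer&& onBuffer) {
    auto maybeBuffer = provider.getBufferNoBlocking();
    if (!maybeBuffer.has_value()) {
        // Nothing free right now; the caller decides whether to retry,
        // back off, or drop the work.
        return false;
    }
    std::forward<OnBuffer>(onBuffer)(*maybeBuffer);
    return true;
}

// Hypothetical provider used only to exercise the helper.
struct FakeProvider {
    int remaining;

    std::optional<int> getBufferNoBlocking() {
        assert(remaining >= 0);
        if (remaining == 0) {
            return std::nullopt;
        }
        return remaining--;
    }
};
```

Returning a bool keeps the "no buffer" case visible at the call site instead of hiding it behind a default-constructed buffer.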


◆ getBufferSize()

size_t NES::Runtime::BufferManager::getBufferSize ( ) const
override virtual
Returns
Configured size of the buffers

Implements NES::Runtime::AbstractBufferProvider.

◆ getBufferTimeout()

std::optional< TupleBuffer > NES::Runtime::BufferManager::getBufferTimeout ( std::chrono::milliseconds  timeout_ms)
override virtual

Returns a new TupleBuffer wrapped in an optional, or an empty optional if no buffer becomes available within timeout_ms.

Parameters
timeout_ms  the amount of time to wait for a new buffer to be returned
Returns
a new buffer, or an empty optional if the timeout expires

Implements NES::Runtime::AbstractBufferProvider.

References backward::details::move(), NES_THROW_RUNTIME_ERROR, and TupleBuffer.
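
A bounded wait can be expressed with std::condition_variable::wait_for, which reports whether the predicate became true before the deadline. The sketch below illustrates that pattern; TimedFreeList, acquireWithTimeout() and release() are hypothetical names, buffers are represented by ids, and nothing here is taken from the NebulaStream sources.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <vector>

// Hypothetical free list with a bounded wait.
class TimedFreeList {
  public:
    explicit TimedFreeList(std::size_t numOfBuffers) {
        for (std::size_t id = 0; id < numOfBuffers; ++id) {
            freeIds.push_back(id);
        }
    }

    // Waits at most timeout for a free id; an empty optional means the
    // timeout expired first.
    std::optional<std::size_t> acquireWithTimeout(std::chrono::milliseconds timeout) {
        assert(timeout.count() >= 0);
        std::unique_lock<std::mutex> lock(mtx);
        // wait_for with a predicate returns the predicate's final value:
        // false means the list was still empty when the timeout expired.
        const bool gotOne =
            bufferReturned.wait_for(lock, timeout, [this] { return !freeIds.empty(); });
        if (!gotOne) {
            return std::nullopt;
        }
        const std::size_t id = freeIds.back();
        freeIds.pop_back();
        return id;
    }

    // Returns a buffer id and wakes one waiting thread, if any.
    void release(std::size_t id) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            freeIds.push_back(id);
        }
        bufferReturned.notify_one();
    }

  private:
    std::mutex mtx;
    std::condition_variable bufferReturned;
    std::vector<std::size_t> freeIds;
};
```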


◆ getNumOfPooledBuffers()

size_t NES::Runtime::BufferManager::getNumOfPooledBuffers ( ) const
override virtual
Returns
Number of total buffers in the pool

Implements NES::Runtime::AbstractBufferProvider.

◆ getNumOfUnpooledBuffers()

size_t NES::Runtime::BufferManager::getNumOfUnpooledBuffers ( ) const
override virtual
Returns
number of unpooled buffers

Implements NES::Runtime::AbstractBufferProvider.

◆ getUnpooledBuffer()

std::optional< TupleBuffer > NES::Runtime::BufferManager::getUnpooledBuffer ( size_t  bufferSize)
override virtual

Returns an unpooled buffer of size bufferSize wrapped in an optional, or an empty optional if an error occurs.

Parameters
bufferSize
Returns
a new buffer

Implements NES::Runtime::AbstractBufferProvider.

References NES::Runtime::alignBufferSize(), NES::bufferSize, backward::details::move(), NES_THROW_RUNTIME_ERROR, NES_TRACE, NES::Runtime::BufferRecycler::recycleUnpooledBuffer(), and TupleBuffer.

Referenced by NES::Runtime::MemoryLayouts::DynamicTuple::writeVarSized(), and NES::Runtime::MemoryLayouts::writeVarSizedData().
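
The class description states that unpooled buffers are either allocated on the spot or served from a previously returned unpooled buffer. The sketch below shows one possible shape of that reuse. ToyUnpooledStore, Buffer, acquire(), recycle() and cached() are hypothetical names, and matching returned buffers by exact size is an assumption made for simplicity; the real matching policy may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <utility>

// Hypothetical store that reuses returned variable-sized allocations.
class ToyUnpooledStore {
  public:
    using Chunk = std::unique_ptr<std::uint8_t[]>;

    struct Buffer {
        Chunk data;
        std::size_t size;
        bool reused;  // true if served from a previously returned chunk
    };

    Buffer acquire(std::size_t size) {
        assert(size > 0);
        // Assumption: only a returned chunk of exactly this size is reused.
        auto it = returned.find(size);
        if (it != returned.end()) {
            Buffer buffer{std::move(it->second), size, true};
            returned.erase(it);
            return buffer;
        }
        // Nothing suitable was returned earlier, so allocate on the spot.
        return Buffer{std::make_unique<std::uint8_t[]>(size), size, false};
    }

    // Keeps the allocation around for a later request of the same size.
    void recycle(Buffer buffer) {
        returned.emplace(buffer.size, std::move(buffer.data));
    }

    std::size_t cached() const { return returned.size(); }

  private:
    std::multimap<std::size_t, Chunk> returned;
};
```

Keying the cache by size makes the lookup cheap, at the cost of never reusing a larger chunk for a smaller request.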


◆ operator=()

BufferManager& NES::Runtime::BufferManager::operator= ( const BufferManager & )
delete

◆ recyclePooledBuffer()

void NES::Runtime::BufferManager::recyclePooledBuffer ( detail::MemorySegment * segment)
override virtual

Recycle a pooled buffer by making it available to others.

Parameters
buffer

Implements NES::Runtime::BufferRecycler.

References NES_THROW_RUNTIME_ERROR.

◆ recycleUnpooledBuffer()

void NES::Runtime::BufferManager::recycleUnpooledBuffer ( detail::MemorySegment * segment)
override virtual

Recycle an unpooled buffer by making it available to others.

Parameters
buffer

Implements NES::Runtime::BufferRecycler.

References NES_THROW_RUNTIME_ERROR.

Friends And Related Function Documentation

◆ detail::MemorySegment

friend class detail::MemorySegment
friend

◆ TupleBuffer

friend class TupleBuffer
friend


The documentation for this class was generated from the following files: