NebulaStream
0.6.213
NebulaStream is a data and application management framework for the internet of things
|
The BufferManager is responsible for: More...
#include <BufferManager.hpp>
Public Member Functions | |
BufferManager (uint32_t bufferSize=DEFAULT_BUFFER_SIZE, uint32_t numOfBuffers=DEFAULT_NUMBER_OF_BUFFERS, std::shared_ptr< std::pmr::memory_resource > memoryResource=std::make_shared< NesDefaultMemoryAllocator >(), uint32_t withAlignment=DEFAULT_ALIGNMENT) | |
Creates a new global buffer manager. More... | |
BufferManager (const BufferManager &)=delete | |
BufferManager & | operator= (const BufferManager &)=delete |
~BufferManager () override | |
BufferManagerType | getBufferManagerType () const override |
TupleBuffer | getBufferBlocking () override |
Provides a new TupleBuffer. This blocks until a buffer is available. More... | |
std::optional< TupleBuffer > | getBufferNoBlocking () override |
Returns a new TupleBuffer wrapped in an optional or an invalid option if there is no buffer. More... | |
std::optional< TupleBuffer > | getBufferTimeout (std::chrono::milliseconds timeout_ms) override |
Returns a new Buffer wrapped in an optional or an invalid option if there is no buffer available within timeout_ms. More... | |
std::optional< TupleBuffer > | getUnpooledBuffer (size_t bufferSize) override |
Returns an unpooled buffer of size bufferSize wrapped in an optional or an invalid option if an error occurs. More... | |
size_t | getBufferSize () const override |
size_t | getNumOfPooledBuffers () const override |
size_t | getNumOfUnpooledBuffers () const override |
size_t | getAvailableBuffers () const override |
size_t | getAvailableBuffersInFixedSizePools () const |
LocalBufferPoolPtr | createLocalBufferPool (size_t numberOfReservedBuffers) override |
Create a local buffer manager that is assigned to one pipeline or thread. More... | |
FixedSizeBufferPoolPtr | createFixedSizeBufferPool (size_t numberOfReservedBuffers) override |
Create a local buffer manager that is assigned to one pipeline or thread. More... | |
void | recyclePooledBuffer (detail::MemorySegment *segment) override |
Recycle a pooled buffer by making it available to others. More... | |
void | recycleUnpooledBuffer (detail::MemorySegment *segment) override |
Recycle an unpooled buffer by making it available to others. More... | |
void | destroy () override |
this method clears all local buffers pools and remove all buffers from the global buffer manager More... | |
![]() | |
virtual | ~AbstractBufferProvider () |
Friends | |
class | TupleBuffer |
class | detail::MemorySegment |
The BufferManager is responsible for:
The reference counting mechanism of the TupleBuffer is explained in TupleBuffer.hpp
The BufferManager stores the pooled buffers as MemorySegment-s. When a component asks for a Pooled buffer, then the BufferManager retrieves an available buffer (it blocks the calling thread, if no buffer is available). It then hands out a TupleBuffer that is constructed through the pointer stored inside a MemorySegment. This is necessary because the BufferManager must keep all buffers stored to ensure that when its destructor is called, all buffers that it has ever created are deallocated. Note the BufferManager will check also that no reference counter is non-zero and will throw a fatal exception, if a component hasnt returned every buffers. This is necessary to avoid memory leaks.
Unpooled buffers are either allocated on the spot or served via a previously allocated, unpooled buffer that has been returned to the BufferManager by some component.
|
explicit |
Creates a new global buffer manager.
bufferSize | the size of each buffer in bytes |
numOfBuffers | the total number of buffers in the pool |
withAlignment | the alignment of each buffer, default is 64 so ony cache line aligned buffers, This value must be a pow of two and smaller than page size |
|
delete |
|
override |
|
overridevirtual |
Create a local buffer manager that is assigned to one pipeline or thread.
numberOfReservedBuffers | number of exclusive buffers to give to the pool |
Implements NES::Runtime::AbstractPoolProvider.
References backward::details::move(), and NES_ASSERT2_FMT.
|
overridevirtual |
Create a local buffer manager that is assigned to one pipeline or thread.
numberOfReservedBuffers | number of exclusive buffers to give to the pool |
Implements NES::Runtime::AbstractPoolProvider.
References backward::details::move(), NES_ASSERT2_FMT, and NES_DEBUG.
|
overridevirtual |
this method clears all local buffers pools and remove all buffers from the global buffer manager
Implements NES::Runtime::AbstractBufferProvider.
References NES_ASSERT2_FMT, NES_DEBUG, NES_ERROR, and NES_THROW_RUNTIME_ERROR.
Referenced by ~BufferManager().
|
overridevirtual |
Implements NES::Runtime::AbstractBufferProvider.
size_t NES::Runtime::BufferManager::getAvailableBuffersInFixedSizePools | ( | ) | const |
References NES::Runtime::FIXED, and type.
|
overridevirtual |
Provides a new TupleBuffer. This blocks until a buffer is available.
Implements NES::Runtime::AbstractBufferProvider.
References NES_THROW_RUNTIME_ERROR, NES_TRACE, and TupleBuffer.
Referenced by NES::Statistic::CountMinStatisticFormat::writeStatisticsIntoBuffers(), and NES::Statistic::HyperLogLogStatisticFormat::writeStatisticsIntoBuffers().
|
overridevirtual |
Implements NES::Runtime::AbstractBufferProvider.
References NES::Runtime::GLOBAL.
|
overridevirtual |
Returns a new TupleBuffer wrapped in an optional or an invalid option if there is no buffer.
Implements NES::Runtime::AbstractBufferProvider.
References NES_THROW_RUNTIME_ERROR, and TupleBuffer.
Referenced by NES::getBufferNoBlocking().
|
overridevirtual |
Implements NES::Runtime::AbstractBufferProvider.
|
overridevirtual |
Returns a new Buffer wrapped in an optional or an invalid option if there is no buffer available within timeout_ms.
timeout_ms | the amount of time to wait for a new buffer to be retuned |
Implements NES::Runtime::AbstractBufferProvider.
References backward::details::move(), NES_THROW_RUNTIME_ERROR, and TupleBuffer.
|
overridevirtual |
Implements NES::Runtime::AbstractBufferProvider.
|
overridevirtual |
Implements NES::Runtime::AbstractBufferProvider.
|
overridevirtual |
Returns an unpooled buffer of size bufferSize wrapped in an optional or an invalid option if an error occurs.
bufferSize |
Implements NES::Runtime::AbstractBufferProvider.
References NES::Runtime::alignBufferSize(), NES::bufferSize, backward::details::move(), NES_THROW_RUNTIME_ERROR, NES_TRACE, NES::Runtime::BufferRecycler::recycleUnpooledBuffer(), and TupleBuffer.
Referenced by NES::Runtime::MemoryLayouts::DynamicTuple::writeVarSized(), and NES::Runtime::MemoryLayouts::writeVarSizedData().
|
delete |
|
overridevirtual |
Recycle a pooled buffer by making it available to others.
buffer |
Implements NES::Runtime::BufferRecycler.
References NES_THROW_RUNTIME_ERROR.
|
overridevirtual |
Recycle an unpooled buffer by making it available to others.
buffer |
Implements NES::Runtime::BufferRecycler.
References NES_THROW_RUNTIME_ERROR.
|
friend |
|
friend |
Referenced by getBufferBlocking(), getBufferNoBlocking(), getBufferTimeout(), and getUnpooledBuffer().