NebulaStream
0.6.213
NebulaStream is a data and application management framework for the internet of things
|
The TupleBuffer allows Runtime components to access memory to store records in a reference-counted and thread-safe manner. More...
#include <TupleBuffer.hpp>
Public Types | |
using | NestedTupleBufferKey = uint32_t |
This is the logical identifier of a child tuple buffer. More... | |
Public Member Functions | |
constexpr | TupleBuffer () noexcept=default |
Default constructor creates an empty wrapper around nullptr without controlBlock (nullptr) and size 0. More... | |
constexpr | TupleBuffer (TupleBuffer const &other) noexcept |
Copy constructor: Increase the reference count associated to the control buffer. More... | |
constexpr | TupleBuffer (TupleBuffer &&other) noexcept |
Move constructor: Steal the resources from other . This does not affect the reference count. @dev In this constructor, other is cleared, because otherwise its destructor would release its old memory. More... | |
TupleBuffer & | operator= (TupleBuffer const &other) noexcept |
Assign the other resource to this TupleBuffer; increase and decrease reference count if necessary. More... | |
TupleBuffer & | operator= (TupleBuffer &&other) noexcept |
Assign the other resource to this TupleBuffer; Might release the resource this currently points to. More... | |
TupleBuffer * | operator& ()=delete |
Delete address-of operator to make it harder to circumvent reference counting mechanism with an l-value. More... | |
constexpr auto | operator! () const noexcept -> bool |
Return if this is not valid. More... | |
~TupleBuffer () noexcept | |
release the resource if necessary. More... | |
TupleBuffer & | retain () noexcept |
Increases the internal reference counter by one and return this. More... | |
void | release () noexcept |
Decrease internal reference counter by one and release the resource when the reference count reaches 0. More... | |
uint8_t * | getBuffer () noexcept |
template<typename T = uint8_t> | |
T * | getBuffer () noexcept |
return the TupleBuffer's content as pointer to T . More... | |
template<typename T = uint8_t> | |
const T * | getBuffer () const noexcept |
return the TupleBuffer's content as pointer to T . More... | |
uint32_t | getReferenceCounter () const noexcept |
uint64_t | getBufferSize () const noexcept |
get the buffer's size. More... | |
constexpr uint64_t | getNumberOfTuples () const noexcept |
get the number of tuples stored. More... | |
void | setNumberOfTuples (uint64_t numberOfTuples) noexcept |
set the number of tuples stored. More... | |
constexpr uint64_t | getWatermark () const noexcept |
get the watermark as a timestamp More... | |
void | setWatermark (uint64_t value) noexcept |
set the watermark from a timestamp More... | |
constexpr uint64_t | getCreationTimestampInMS () const noexcept |
get the creation timestamp in milliseconds More... | |
void | setSequenceNumber (uint64_t sequenceNumber) noexcept |
set the sequence number More... | |
void | setSequenceData (SequenceData sequenceData) noexcept |
set the sequence data, i.e., sequenceNumber, chunkNumber, and lastChunk More... | |
SequenceData | getSequenceData () const noexcept |
gets the sequence data from this buffer More... | |
constexpr uint64_t | getSequenceNumber () const noexcept |
get the sequence number More... | |
void | setChunkNumber (uint64_t chunkNumber) noexcept |
set the sequence number More... | |
constexpr uint64_t | getChunkNumber () const noexcept |
get the chunk number More... | |
void | setLastChunk (bool isLastChunk) noexcept |
set if this is the last chunk of a sequence number More... | |
constexpr bool | isLastChunk () const noexcept |
retrieves if this is the last chunk More... | |
void | setCreationTimestampInMS (uint64_t value) noexcept |
set the creation timestamp in milliseconds More... | |
constexpr OriginId | getOriginId () const noexcept |
get the buffer's origin id (the operator id that creates this buffer). More... | |
constexpr StatisticId | getStatisticId () const noexcept |
get the buffer's statistic id (where it was last touched). More... | |
void | setOriginId (OriginId id) noexcept |
set the buffer's origin id (the operator id that creates this buffer). More... | |
void | setStatisticId (StatisticId statisticId) noexcept |
set the buffer's statistic id (where it was last touched). More... | |
void | addRecycleCallback (std::function< void(detail::MemorySegment *, BufferRecycler *)> newCallback) noexcept |
set the buffer's recycle callback. More... | |
NestedTupleBufferKey | storeChildBuffer (TupleBuffer &buffer) const noexcept |
attach a child tuple buffer to the parent. the child tuple buffer is then identified via NestedTupleBufferKey More... | |
TupleBuffer | loadChildBuffer (NestedTupleBufferKey bufferIndex) const noexcept |
retrieve a child tuple buffer via its NestedTupleBufferKey More... | |
constexpr uint32_t | getNumberOfChildrenBuffer () const noexcept |
bool | hasSpaceLeft (uint64_t used, uint64_t needed) const |
returns true if there is enought space to write More... | |
Static Public Member Functions | |
static TupleBuffer | reinterpretAsTupleBuffer (void *bufferPointer) |
Interprets the void* as a pointer to the content of tuple buffer. More... | |
static TupleBuffer | wrapMemory (uint8_t *ptr, size_t length, BufferRecycler *parent) |
Creates a TupleBuffer of length bytes starting at ptr address. More... | |
static TupleBuffer | wrapMemory (uint8_t *ptr, size_t length, std::function< void(detail::MemorySegment *segment, BufferRecycler *recycler)> &&recycler) |
template<class T > | |
static TupleBuffer | wrapPtr (std::unique_ptr< T > object) |
Static Public Attributes | |
constexpr static uint64_t | INITIAL_SEQUENCE_NUMBER = 1 |
The initial sequence number of a stream of tuple buffers is 1. More... | |
constexpr static uint64_t | INITIAL_CHUNK_NUMBER = 1 |
When the content of a tuple buffer outgrows the buffer size tuple buffers are chunked to preserve the order of sequencenumbers (or prevent duplicated). The initial chunk number is 1. More... | |
Friends | |
class | BufferManager |
Utilize the wrapped-memory constructor. More... | |
class | FixedSizeBufferPool |
class | LocalBufferPool |
class | detail::MemorySegment |
template<typename T > | |
class | NES::Network::detail::NetworkDataSender |
Utilize the wrapped-memory constructor and requires direct access to the control block for the ZMQ sink. More... | |
template<typename T > | |
class | NES::Network::detail::NetworkEventSender |
void | swap (TupleBuffer &lhs, TupleBuffer &rhs) noexcept |
Swap lhs and rhs . @dev Accessible via ADL in an unqualified call. More... | |
std::ostream & | operator<< (std::ostream &os, const TupleBuffer &buff) noexcept |
Print the buffer's address. @dev TODO: consider changing the reinterpret_cast to std::bit_cast in C++2a if possible. More... | |
The TupleBuffer allows Runtime components to access memory to store records in a reference-counted and thread-safe manner.
The purpose of the TupleBuffer is to zero memory allocations and enable batching. In order to zero the memory allocation, a BufferManager keeps a fixed set of fixed-size buffers that it hands out to components. A TupleBuffer's content is automatically recycled or deleted once its reference count reaches zero.
Prefer passing the TupleBuffer by reference whenever possible, pass the TupleBuffer to another thread by value.
Important note: when a component is done with a TupleBuffer, it must be released. Not returning a TupleBuffer will result in a Runtime error that the BufferManager will raise by the termination of the NES program.
A TupleBuffer may store one or more child/nested TupleBuffer. As soon as a TupleBuffer is attached to a parent, it loses ownership of its internal MemorySegment, whose lifecycle is linked to the lifecycle of the parent. This means that when the parent TupleBuffer goes out of scope, no child TupleBuffer must be alive in the program. If that occurs, an error is raised.
Reminder: this class should be header-only to help inlining
using NES::Runtime::TupleBuffer::NestedTupleBufferKey = uint32_t |
This is the logical identifier of a child tuple buffer.
|
constexprdefaultnoexcept |
Default constructor creates an empty wrapper around nullptr without controlBlock (nullptr) and size 0.
Referenced by reinterpretAsTupleBuffer(), and wrapMemory().
|
inlineconstexprnoexcept |
Copy constructor: Increase the reference count associated to the control buffer.
References NES::Runtime::detail::BufferControlBlock::retain().
|
inlineconstexprnoexcept |
Move constructor: Steal the resources from other
. This does not affect the reference count. @dev In this constructor, other
is cleared, because otherwise its destructor would release its old memory.
|
inlinenoexcept |
|
inlinenoexcept |
set the buffer's recycle callback.
References NES::Runtime::detail::BufferControlBlock::addRecycleCallback(), and backward::details::move().
|
inlinenoexcept |
return the TupleBuffer's content as pointer to T
.
|
inlinenoexcept |
Referenced by NES::Runtime::allocateWithin(), NES::Runtime::MemoryLayouts::ColumnLayoutField< T, boundaryChecks >::create(), NES::Runtime::MemoryLayouts::RowLayoutField< T, boundaryChecks >::create(), NES::createSimpleInputStream(), NES::TestUtils::createVecFromTupleBuffer(), NES::Runtime::CustomEventWrapper::data(), MultifieldGPUPipelineStage::execute(), ColumnLayoutGPUPipelineStage::execute(), WindowedAggregationGPUPipelineStage::execute(), NES::MockedExecutablePipeline::execute(), NES::Runtime::detail::ReconfigurationEntryPointPipelineStage::execute(), NES::Runtime::Execution::ExecutablePipeline::execute(), NES::TextExecutablePipeline::execute(), NES::BinarySource::fillBuffer(), NES::NesFormat::getFormattedBuffer(), NES::DataGeneratorMultiKey::getSource(), NES::DataGenerator::getSource(), NES::DataGeneratorMultiValue::getSource(), NES::BenchmarkSource::open(), NES::Runtime::MemoryLayouts::DynamicTuple::operator[](), NES::Util::printTupleBufferAsCSV(), NES::Util::printTupleBufferAsText(), NES::Statistic::CountMinStatisticFormat::readStatisticsFromBuffer(), NES::Statistic::HyperLogLogStatisticFormat::readStatisticsFromBuffer(), NES::Runtime::MemoryLayouts::readVarSizedData(), NES::Network::detail::NetworkDataSender< BaseChannelType >::sendBuffer(), NES::TEST_F(), NES::Network::TEST_F(), NES::Runtime::Task::toString(), NES::Network::TestSink::writeData(), NES::CollectTestSink< Type >::writeData(), NES::MonitoringSink::writeData(), NES::RawBufferSink::writeData(), and NES::NESBinaryParser::writeInputTupleToTupleBuffer().
|
inlinenoexcept |
return the TupleBuffer's content as pointer to T
.
|
inlinenoexcept |
get the buffer's size.
References size().
Referenced by NES::Runtime::MemoryLayouts::TestTupleBuffer::createTestTupleBuffer(), ColumnLayoutGPUPipelineStage::execute(), NES::CSVSource::fillBuffer(), NES::BinarySource::fillBuffer(), NES::Runtime::MemoryLayouts::TestTupleBuffer::pushRecordToBufferAtIndex(), NES::Monitoring::CpuMetrics::readFromBuffer(), NES::Monitoring::DiskMetrics::readFromBuffer(), NES::Monitoring::MemoryMetrics::readFromBuffer(), NES::Monitoring::NetworkMetrics::readFromBuffer(), NES::Monitoring::RegistrationMetrics::readFromBuffer(), NES::Monitoring::RuntimeMetrics::readFromBuffer(), NES::Network::detail::NetworkDataSender< BaseChannelType >::sendBuffer(), NES::Network::TEST_F(), NES::Runtime::MemoryLayouts::TestTupleBuffer::TestTupleBuffer(), NES::Runtime::Task::toString(), NES::Network::TestSink::writeData(), NES::RawBufferSink::writeData(), NES::ZmqSink::writeData(), NES::NESBinaryParser::writeInputTupleToTupleBuffer(), NES::Monitoring::CpuMetrics::writeToBuffer(), NES::Monitoring::DiskMetrics::writeToBuffer(), NES::Monitoring::MemoryMetrics::writeToBuffer(), NES::Monitoring::NetworkMetrics::writeToBuffer(), NES::Monitoring::RegistrationMetrics::writeToBuffer(), NES::Monitoring::RuntimeMetrics::writeToBuffer(), NES::Monitoring::CpuMetricsWrapper::writeToBuffer(), and NES::Monitoring::NetworkMetricsWrapper::writeToBuffer().
|
inlineconstexprnoexcept |
get the chunk number
References NES::Runtime::detail::BufferControlBlock::getChunkNumber().
Referenced by NES::DataSource::emitWork(), getSequenceData(), NES::Network::detail::NetworkDataSender< BaseChannelType >::sendBuffer(), and NES::Runtime::AbstractQueryManager::updateStatistics().
|
inlineconstexprnoexcept |
get the creation timestamp in milliseconds
References NES::Runtime::detail::BufferControlBlock::getCreationTimestamp().
Referenced by NES::Network::detail::NetworkDataSender< BaseChannelType >::sendBuffer(), and NES::Runtime::AbstractQueryManager::updateStatistics().
|
inlineconstexprnoexcept |
References NES::Runtime::detail::BufferControlBlock::getNumberOfChildrenBuffer().
Referenced by NES::Network::detail::NetworkDataSender< BaseChannelType >::sendBuffer().
|
inlineconstexprnoexcept |
get the number of tuples stored.
References NES::Runtime::detail::BufferControlBlock::getNumberOfTuples().
Referenced by NES::TestUtils::createVecFromTupleBuffer(), NES::FormatIterator::end(), MultifieldGPUPipelineStage::execute(), ColumnLayoutGPUPipelineStage::execute(), WindowedAggregationGPUPipelineStage::execute(), NES::MockedExecutablePipeline::execute(), NES::TextExecutablePipeline::execute(), NES::NesFormat::getFormattedBuffer(), NES::Runtime::Task::getNumberOfTuples(), NES::Runtime::MemoryLayouts::TestTupleBuffer::getNumberOfTuples(), NES::Util::printTupleBufferAsCSV(), NES::Util::printTupleBufferAsText(), NES::Runtime::MemoryLayouts::TestTupleBuffer::pushRecordToBufferAtIndex(), NES::Monitoring::CpuMetricsWrapper::readFromBuffer(), NES::Monitoring::NetworkMetricsWrapper::readFromBuffer(), NES::Statistic::CountMinStatisticFormat::readStatisticsFromBuffer(), NES::Statistic::HyperLogLogStatisticFormat::readStatisticsFromBuffer(), NES::Runtime::MemoryLayouts::TestTupleBuffer::requires(), NES::Network::detail::NetworkDataSender< BaseChannelType >::sendBuffer(), NES::Runtime::Task::Task(), NES::Network::TEST_F(), NES::Runtime::Task::toString(), NES::Network::TestSink::writeData(), NES::CollectTestSink< Type >::writeData(), NES::MonitoringSink::writeData(), NES::RawBufferSink::writeData(), NES::ZmqSink::writeData(), NES::Monitoring::CpuMetrics::writeToBuffer(), NES::Monitoring::DiskMetrics::writeToBuffer(), NES::Monitoring::MemoryMetrics::writeToBuffer(), NES::Monitoring::NetworkMetrics::writeToBuffer(), NES::Monitoring::RegistrationMetrics::writeToBuffer(), and NES::Monitoring::RuntimeMetrics::writeToBuffer().
|
inlineconstexprnoexcept |
get the buffer's origin id (the operator id that creates this buffer).
References NES::Runtime::detail::BufferControlBlock::getOriginId().
Referenced by NES::DataSource::emitWork(), NES::Runtime::Execution::ExecutablePipeline::execute(), NES::Network::detail::NetworkDataSender< BaseChannelType >::sendBuffer(), NES::Runtime::Task::toString(), NES::Runtime::AbstractQueryManager::updateStatistics(), and NES::Network::NetworkSink::writeData().
|
inlinenoexcept |
References NES::Runtime::detail::BufferControlBlock::getReferenceCount().
|
inlinenoexcept |
gets the sequence data from this buffer
References getChunkNumber(), getSequenceNumber(), and isLastChunk().
|
inlineconstexprnoexcept |
get the sequence number
References NES::Runtime::detail::BufferControlBlock::getSequenceNumber().
Referenced by NES::DataSource::emitWork(), getSequenceData(), NES::Network::detail::NetworkDataSender< BaseChannelType >::sendBuffer(), and NES::Network::NetworkSink::writeData().
|
inlineconstexprnoexcept |
get the buffer's statistic id (where it was last touched).
References NES::Runtime::detail::BufferControlBlock::getStatisticId().
Referenced by NES::DataSource::emitWork().
|
inlineconstexprnoexcept |
get the watermark as a timestamp
References NES::Runtime::detail::BufferControlBlock::getWatermark().
Referenced by NES::Runtime::BufferSorter::operator()(), NES::Network::detail::NetworkDataSender< BaseChannelType >::sendBuffer(), NES::Runtime::Task::toString(), and NES::ZmqSink::writeData().
bool NES::Runtime::TupleBuffer::hasSpaceLeft | ( | uint64_t | used, |
uint64_t | needed | ||
) | const |
|
inlineconstexprnoexcept |
retrieves if this is the last chunk
References NES::Runtime::detail::BufferControlBlock::isLastChunk().
Referenced by NES::DataSource::emitWork(), getSequenceData(), NES::Network::detail::NetworkDataSender< BaseChannelType >::sendBuffer(), and setLastChunk().
|
noexcept |
retrieve a child tuple buffer via its NestedTupleBufferKey
References NES_ASSERT.
Referenced by NES::Runtime::MemoryLayouts::readVarSizedData(), NES::Network::detail::NetworkDataSender< BaseChannelType >::sendBuffer(), and NES::Network::TEST_F().
|
inlineconstexprnoexcept |
Return if this is not valid.
|
delete |
Delete address-of operator to make it harder to circumvent reference counting mechanism with an l-value.
|
inlinenoexcept |
Assign the other
resource to this TupleBuffer; Might release the resource this currently points to.
References PLACEHOLDER_UNLIKELY, std::swap(), and swap.
|
inlinenoexcept |
Assign the other
resource to this TupleBuffer; increase and decrease reference count if necessary.
References PLACEHOLDER_UNLIKELY, retain(), and size().
|
static |
Interprets the void* as a pointer to the content of tuple buffer.
bufferPointer |
References NES::Runtime::alignBufferSize(), NES::Runtime::detail::BufferControlBlock::getOwner(), and TupleBuffer().
|
inlinenoexcept |
Decrease internal reference counter by one and release the resource when the reference count reaches 0.
References NES::Runtime::detail::BufferControlBlock::release(), and size().
Referenced by NES::BenchmarkSource::close(), NES::BenchmarkSource::recyclePooledBuffer(), NES::BenchmarkSource::recycleUnpooledBuffer(), wrapPtr(), NES::BenchmarkSource::~BenchmarkSource(), and ~TupleBuffer().
|
inlinenoexcept |
Increases the internal reference counter by one and return this.
References NES::Runtime::detail::BufferControlBlock::retain().
Referenced by operator=(), and NES::Network::detail::NetworkDataSender< BaseChannelType >::sendBuffer().
|
inlinenoexcept |
set the sequence number
References NES::Runtime::detail::BufferControlBlock::setChunkNumber().
Referenced by NES::DataSource::emitWork(), and setSequenceData().
|
inlinenoexcept |
set the creation timestamp in milliseconds
References NES::Runtime::detail::BufferControlBlock::setCreationTimestamp(), and magic_enum::detail::value().
Referenced by NES::DataSource::emitWork().
|
inlinenoexcept |
set if this is the last chunk of a sequence number
References isLastChunk(), and NES::Runtime::detail::BufferControlBlock::setLastChunk().
Referenced by NES::DataSource::emitWork(), and setSequenceData().
|
inlinenoexcept |
set the number of tuples stored.
References NES::Runtime::detail::BufferControlBlock::setNumberOfTuples().
Referenced by NES::Runtime::allocateWithin(), NES::createSimpleInputStream(), NES::StatisticsIntegrationTest::createWorker(), NES::MockedExecutablePipeline::execute(), NES::TextExecutablePipeline::execute(), NES::BinarySource::fillBuffer(), NES::Network::fillBuffer(), fillBufferColumnLayout(), fillBufferToMultiFieldSchema(), fillBufferToSimpleSchema(), fillBufferToWindowSchema(), NES::DataGenerator::getSource(), NES::DataGeneratorMultiValue::getSource(), NES::Runtime::MemoryLayouts::TestTupleBuffer::setNumberOfTuples(), NES::NESBinaryParser::writeInputTupleToTupleBuffer(), NES::Statistic::CountMinStatisticFormat::writeStatisticsIntoBuffers(), NES::Statistic::HyperLogLogStatisticFormat::writeStatisticsIntoBuffers(), NES::Monitoring::CpuMetrics::writeToBuffer(), NES::Monitoring::DiskMetrics::writeToBuffer(), NES::Monitoring::MemoryMetrics::writeToBuffer(), NES::Monitoring::NetworkMetrics::writeToBuffer(), NES::Monitoring::RegistrationMetrics::writeToBuffer(), and NES::Monitoring::RuntimeMetrics::writeToBuffer().
|
inlinenoexcept |
set the buffer's origin id (the operator id that creates this buffer).
References NES::Runtime::detail::BufferControlBlock::setOriginId().
Referenced by NES::DataSource::emitWork().
|
inlinenoexcept |
set the sequence data, i.e., sequenceNumber, chunkNumber, and lastChunk
References setChunkNumber(), setLastChunk(), and setSequenceNumber().
Referenced by fillBuffer().
|
inlinenoexcept |
set the sequence number
References NES::Runtime::detail::BufferControlBlock::setSequenceNumber().
Referenced by NES::DataSource::emitWork(), and setSequenceData().
|
inlinenoexcept |
set the buffer's statistic id (where it was last touched).
References NES::Runtime::detail::BufferControlBlock::setStatisticId().
Referenced by NES::DataSource::emitWork().
|
inlinenoexcept |
set the watermark from a timestamp
References NES::Runtime::detail::BufferControlBlock::setWatermark(), and magic_enum::detail::value().
Referenced by NES::TEST_F().
|
noexcept |
attach a child tuple buffer to the parent. the child tuple buffer is then identified via NestedTupleBufferKey
References index, NES_ASSERT2_FMT, and std::swap().
Referenced by NES::Runtime::MemoryLayouts::DynamicTuple::writeVarSized(), and NES::Runtime::MemoryLayouts::writeVarSizedData().
|
static |
Creates a TupleBuffer of length bytes starting at ptr address.
ptr | resource's address. |
length | the size of the allocated memory. |
parent | will be notified of the buffer release. Only at that point, the ptr memory area can be freed, which is the caller's responsibility. |
References backward::details::move(), and TupleBuffer().
Referenced by NES::DataSourceProxy::getRecyclableBuffer(), and wrapPtr().
|
static |
|
inlinestatic |
Wrap an object in a tuple buffer. The tuple buffer retrieves ownership and frees the object when the buffer is released.
ownership | to object |
References release(), and wrapMemory().
|
friend |
Utilize the wrapped-memory constructor.
|
friend |
|
friend |
|
friend |
|
friend |
Utilize the wrapped-memory constructor and requires direct access to the control block for the ZMQ sink.
|
friend |
|
friend |
Print the buffer's address. @dev TODO: consider changing the reinterpret_cast to std::bit_cast in C++2a if possible.
|
friend |
Swap lhs
and rhs
. @dev Accessible via ADL in an unqualified call.
Referenced by operator=().
|
staticconstexpr |
When the content of a tuple buffer outgrows the buffer size tuple buffers are chunked to preserve the order of sequencenumbers (or prevent duplicated). The initial chunk number is 1.
|
staticconstexpr |
The initial sequence number of a stream of tuple buffers is 1.