NebulaStream
0.6.213
NebulaStream is a data and application management framework for the internet of things
#include <ChunkCollector.hpp>
Public Member Functions
std::optional<std::pair<SequenceNumber, uint64_t>> collect(SequenceData data, uint64_t watermark)
    Collect a chunk and return its SequenceNumber and the associated watermark if all chunks of that sequence number have been collected.
The purpose of the ChunkCollector is to keep track of SequenceNumbers that have been split into multiple chunks. The ChunkCollector can collect chunks from multiple threads concurrently and reports when all chunks of a specific sequence number have been seen.
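To make the contract concrete, here is a deliberately naive baseline in C++. It is not the ChunkCollector itself: it holds one global lock for the entire update and keeps an explicit seen/total count per sequence number in a hash map. The `SequenceNumber` alias, the `SequenceData` fields (`sequenceNumber`, `chunkNumber`, `lastChunk`), the class name `NaiveChunkCollector`, chunk numbers starting at 0, and returning the watermark passed with the completing chunk are all assumptions made for illustration.

```cpp
#include <cstdint>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <utility>

// Hypothetical stand-ins for the NES types; the real definitions may differ.
using SequenceNumber = uint64_t;
struct SequenceData {
    SequenceNumber sequenceNumber;
    uint64_t chunkNumber; // assumed to start at 0
    bool lastChunk;
};

// Naive, fully locked baseline: one map entry per in-flight sequence number.
class NaiveChunkCollector {
    struct Progress {
        uint64_t seen = 0;             // chunks received so far
        std::optional<uint64_t> total; // known once the last chunk has arrived
    };

    std::mutex mutex_;
    std::unordered_map<SequenceNumber, Progress> inFlight_;

public:
    std::optional<std::pair<SequenceNumber, uint64_t>> collect(SequenceData data, uint64_t watermark) {
        std::lock_guard<std::mutex> lock(mutex_);
        Progress& progress = inFlight_[data.sequenceNumber];
        ++progress.seen;
        if (data.lastChunk) {
            progress.total = data.chunkNumber + 1; // 0-based chunk numbers
        }
        if (!progress.total || progress.seen != *progress.total) {
            return std::nullopt; // some chunks are still missing
        }
        inFlight_.erase(data.sequenceNumber); // release state of the completed sequence
        return std::make_pair(data.sequenceNumber, watermark);
    }
};
```

Exactly one call per sequence number returns a value: whichever thread delivers the final missing chunk. The design described for collect differs in that it only takes the lock to locate a node and updates the chunk counters with atomics.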
Template Parameters
NodeSize: Internally, the ChunkCollector uses a linked list of arrays, called nodes; NodeSize sets the size of each node. A larger node size usually yields better performance but uses more memory.
Alignment: The alignment of the atomic counters. In benchmarks, changing the alignment did not appear to improve performance. This may change in the future, in which case an alignment of std::hardware_destructive_interference_size could improve performance.
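A possible counter and node layout, sketched to show how the two template parameters interact. `AlignedCounter`, `CounterNode`, `kDestructiveSize`, and the 64-byte fallback are hypothetical; the real ChunkCollector's types may look different. Because `alignas` uses the strictest of its arguments, `Alignment` may be smaller than the counter's natural alignment, but it must be a power of two.

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <new>

// Use the standard constant when the library provides it; otherwise fall back
// to an assumed typical cache-line size.
#ifdef __cpp_lib_hardware_interference_size
inline constexpr std::size_t kDestructiveSize = std::hardware_destructive_interference_size;
#else
inline constexpr std::size_t kDestructiveSize = 64;
#endif

// One chunk counter. Its size is padded up to its alignment, so with
// Alignment >= cache-line size, neighbouring counters land on different lines.
template <std::size_t Alignment>
struct alignas(std::atomic<int64_t>) alignas(Alignment) AlignedCounter {
    std::atomic<int64_t> value{0};
};

// Hypothetical node: NodeSize counters plus the link to the next node.
template <std::size_t NodeSize, std::size_t Alignment>
struct CounterNode {
    std::array<AlignedCounter<Alignment>, NodeSize> counters{};
    CounterNode* next = nullptr;
};
```

In this layout a node occupies at least NodeSize × max(Alignment, alignof(std::atomic<int64_t>)) bytes. For example, with NodeSize = 1024, 64-byte alignment costs 64 KiB per node, compared with 8 KiB at the natural 8-byte alignment, which is why larger nodes and stricter alignment trade memory for potential speed.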
std::optional<std::pair<SequenceNumber, uint64_t>> NES::ChunkCollector<NodeSize, Alignment>::collect(SequenceData data, uint64_t watermark)
Collect a chunk and, if all chunks of its sequence number have now been collected, return that SequenceNumber and the associated watermark.
We store a chunk counter for every sequence number that has not yet completed. Counters of completed sequence numbers must be released so that memory does not grow without bound. To this end, we use a linked list in which each node holds N chunk counters. Once all sequence numbers in a node have completed, the node can be removed from the linked list without invalidating (moving) any other counters.

We lock the linked list to locate the relevant node; if no such node exists, we append a new one. Within the node, we locate the chunk counter for the sequence number. Every non-last chunk decrements the counter. When the last chunk arrives, it increases the counter by its own chunk number, which is the maximum chunk number for this sequence. If the counter is 0 after an update, the SequenceNumber is complete. Because all updates are additions, the order in which the chunks arrive does not matter.

Each node additionally counts how many of its sequence numbers are still incomplete; the thread that completes a sequence number decrements the counter of that sequence number's node. We assume that once a sequence number has completed, it never reappears. Thus, the thread whose decrement brings the node's counter to zero can delete the node from the linked list.