NebulaStream
0.6.213
NebulaStream is a data and application management framework for the internet of things
|
A WorkerContext represents the current state of a worker thread Note that it is not thread-safe per se but it is meant to be used in a thread-safe manner by the ThreadPool. More...
#include <WorkerContext.hpp>
Public Member Functions | |
WorkerContext (WorkerThreadId workerId, const BufferManagerPtr &bufferManager, uint64_t numberOfBuffersPerWorker, uint32_t queueId=0) | |
~WorkerContext () | |
TupleBuffer | allocateTupleBuffer () |
Allocates a new tuple buffer. More... | |
WorkerContextBufferProviderPtr | getBufferProvider () |
Returns the thread-local buffer provider. More... | |
WorkerThreadId | getId () const |
get current worker context thread id. This is assigned by calling NesThread::getId() More... | |
void | setObjectRefCnt (void *object, uint32_t refCnt) |
Sets the ref counter for a generic object using its pointer address as lookup. More... | |
uint32_t | decreaseObjectRefCnt (void *object) |
Reduces by one the ref cnt. It deletes the object as soon as ref cnt reaches 0. More... | |
uint32_t | getQueueId () const |
get the queue id of the the current worker More... | |
void | storeNetworkChannel (OperatorId id, DecomposedQueryPlanVersion version, Network::NetworkChannelPtr &&channel) |
This stores a network channel for an operator. More... | |
bool | containsNetworkChannel (OperatorId id) |
void | storeNetworkChannelFuture (OperatorId id, DecomposedQueryPlanVersion version, std::pair< std::future< Network::NetworkChannelPtr >, std::promise< bool >> &&channelFuture) |
This stores a future for network channel creation and a promise which can be used to abort the creation. More... | |
bool | containsNetworkChannelFuture (OperatorId id) |
void | createStorage (Network::NesPartition nesPartition) |
This method creates a network storage for a thread. More... | |
void | insertIntoStorage (Network::NesPartition nesPartition, NES::Runtime::TupleBuffer buffer) |
This method inserts a tuple buffer into the storage. More... | |
bool | trimStorage (Network::NesPartition nesPartition, uint64_t timestamp) |
This method deletes a tuple buffer from the storage. More... | |
std::optional< NES::Runtime::TupleBuffer > | getTopTupleFromStorage (Network::NesPartition nesPartition) |
get the oldest buffered tuple for the specified partition More... | |
void | removeTopTupleFromStorage (Network::NesPartition nesPartition) |
if the storage is not empty remove the oldest buffered tuple for the specified partition More... | |
bool | releaseNetworkChannel (OperatorId id, DecomposedQueryPlanVersion version, Runtime::QueryTerminationType terminationType, uint16_t sendingThreadCount, uint64_t currentMessageSequenceNumber, bool shouldPropagateMarker, const std::optional< ReconfigurationMarkerPtr > &reconfigurationMarker) |
removes a registered network channel with a termination type More... | |
void | storeEventOnlyChannel (OperatorId id, Network::EventOnlyNetworkChannelPtr &&channel) |
This stores a network channel for an operator. More... | |
bool | releaseEventOnlyChannel (OperatorId id, Runtime::QueryTerminationType terminationType) |
removes a registered network channel More... | |
Network::NetworkChannel * | getNetworkChannel (OperatorId ownerId, DecomposedQueryPlanVersion ownerVersion) |
retrieve a registered output channel More... | |
std::optional< Network::NetworkChannelPtr > | getAsyncConnectionResult (OperatorId operatorId, DecomposedQueryPlanVersion version) |
retrieves an asynchronously established output channel. More... | |
Network::NetworkChannelPtr | waitForAsyncConnection (OperatorId operatorId) |
blocks until async connection of a network channel has succeeded or timed out More... | |
bool | isAsyncConnectionInProgress (OperatorId operatorId) |
check if an async connection that was started by the operator with the specified id is currently in progress More... | |
Network::EventOnlyNetworkChannel * | getEventOnlyNetworkChannel (OperatorId operatorId) |
retrieve a registered output channel More... | |
void | insertIntoReconnectBufferStorage (OperatorId operatorId, NES::Runtime::TupleBuffer buffer) |
insert a tuple buffer into the reconnect buffer storage More... | |
std::optional< TupleBuffer > | removeBufferFromReconnectBufferStorage (OperatorId operatorId) |
retrieve and delete a tuple buffer from the tuple buffer storage More... | |
void | abortConnectionProcess (OperatorId operatorId) |
stop a connection process which is currently in progress More... | |
bool | doesNetworkChannelExist (OperatorId operatorId) |
check if a network channel exists for the sink in question More... | |
void | storeEventChannelFuture (OperatorId id, DecomposedQueryPlanVersion version, std::pair< std::future< Network::EventOnlyNetworkChannelPtr >, std::promise< bool >> &&channelFuture) |
store a future for an event channel that is in the process of connecting More... | |
std::optional< Network::EventOnlyNetworkChannelPtr > | getAsyncEventChannelConnectionResult (OperatorId operatorId) |
retrieves an asynchronously established event channel. More... | |
Network::EventOnlyNetworkChannelPtr | waitForAsyncConnectionEventChannel (OperatorId operatorId) |
blocks until async connection of an event channel has succeeded or timed out More... | |
bool | doesEventChannelExist (OperatorId operatorId) |
check if a network channel exists for the operator in question More... | |
Static Public Member Functions | |
static WorkerContextBufferProviderRawPtr | getBufferProviderTLS () |
Returns the thread-local buffer provider singleton. This can be accessed at any point in time also without the pointer to the context. Calling this method from a non worker thread results in undefined behaviour. More... | |
A WorkerContext represents the current state of a worker thread Note that it is not thread-safe per se but it is meant to be used in a thread-safe manner by the ThreadPool.
|
explicit |
References NES_ASSERT.
NES::Runtime::WorkerContext::~WorkerContext | ( | ) |
void NES::Runtime::WorkerContext::abortConnectionProcess | ( | OperatorId | operatorId | ) |
stop a connection process which is currently in progress
operatorId | the id of the operator that started the connection process |
References NES::Runtime::Failure.
Referenced by NES::Network::NetworkSink::reconfigure().
TupleBuffer NES::Runtime::WorkerContext::allocateTupleBuffer | ( | ) |
Allocates a new tuple buffer.
Referenced by MultifieldGPUPipelineStage::execute(), ColumnLayoutGPUPipelineStage::execute(), WindowedAggregationGPUPipelineStage::execute(), NES::MockedExecutablePipeline::execute(), and NES::TextExecutablePipeline::execute().
bool NES::Runtime::WorkerContext::containsNetworkChannel | ( | OperatorId | id | ) |
bool NES::Runtime::WorkerContext::containsNetworkChannelFuture | ( | OperatorId | id | ) |
void NES::Runtime::WorkerContext::createStorage | ( | Network::NesPartition | nesPartition | ) |
This method creates a network storage for a thread.
nesPartitionId | partition |
Referenced by NES::Network::NetworkSink::reconfigure().
uint32_t NES::Runtime::WorkerContext::decreaseObjectRefCnt | ( | void * | object | ) |
Reduces by one the ref cnt. It deletes the object as soon as ref cnt reaches 0.
object | the object that we want to ref count |
Referenced by NES::Runtime::Execution::ExecutablePipeline::reconfigure(), and NES::Network::NetworkSink::reconfigure().
bool NES::Runtime::WorkerContext::doesEventChannelExist | ( | OperatorId | operatorId | ) |
check if a network channel exists for the operator in question
operatorId | the unique identifier of the operator to which the channel belongs |
bool NES::Runtime::WorkerContext::doesNetworkChannelExist | ( | OperatorId | operatorId | ) |
check if a network channel exists for the sink in question
operatorId |
std::optional< Network::NetworkChannelPtr > NES::Runtime::WorkerContext::getAsyncConnectionResult | ( | OperatorId | operatorId, |
DecomposedQueryPlanVersion | version | ||
) |
retrieves an asynchronously established output channel.
operatorId | id of the operator which will use the network channel |
References NES_TRACE.
std::optional< Network::EventOnlyNetworkChannelPtr > NES::Runtime::WorkerContext::getAsyncEventChannelConnectionResult | ( | OperatorId | operatorId | ) |
retrieves an asynchronously established event channel.
operatorId | id of the operator which will use the event channel |
References NES_TRACE.
Referenced by NES::Network::NetworkSource::onEvent().
LocalBufferPoolPtr NES::Runtime::WorkerContext::getBufferProvider | ( | ) |
Returns the thread-local buffer provider.
|
static |
Returns the thread-local buffer provider singleton. This can be accessed at any point in time also without the pointer to the context. Calling this method from a non worker thread results in undefined behaviour.
Network::EventOnlyNetworkChannel * NES::Runtime::WorkerContext::getEventOnlyNetworkChannel | ( | OperatorId | operatorId | ) |
retrieve a registered output channel
operatorId | id of the operator that we want to store the output channel |
References NES_TRACE.
Referenced by NES::Network::NetworkSource::onEvent().
WorkerThreadId NES::Runtime::WorkerContext::getId | ( | ) | const |
get current worker context thread id. This is assigned by calling NesThread::getId()
Referenced by NES::Runtime::AbstractQueryManager::completedWork(), NES::Runtime::detail::ReconfigurationEntryPointPipelineStage::execute(), NES::Runtime::AbstractQueryManager::updateStatistics(), and NES::Network::NetworkSink::writeData().
Network::NetworkChannel * NES::Runtime::WorkerContext::getNetworkChannel | ( | OperatorId | ownerId, |
DecomposedQueryPlanVersion | ownerVersion | ||
) |
retrieve a registered output channel
ownerId | id of the operator that we want to store the output channel |
References NES_TRACE.
Referenced by NES::Network::NetworkSink::writeData().
uint32_t NES::Runtime::WorkerContext::getQueueId | ( | ) | const |
get the queue id of the the current worker
Referenced by NES::Runtime::MultiQueueQueryManager::processNextTask(), NES::Runtime::MultiQueueQueryManager::terminateLoop(), and NES::Runtime::MultiQueueQueryManager::updateStatistics().
std::optional< NES::Runtime::TupleBuffer > NES::Runtime::WorkerContext::getTopTupleFromStorage | ( | Network::NesPartition | nesPartition | ) |
get the oldest buffered tuple for the specified partition
nesPartition | partition |
void NES::Runtime::WorkerContext::insertIntoReconnectBufferStorage | ( | OperatorId | operatorId, |
NES::Runtime::TupleBuffer | buffer | ||
) |
insert a tuple buffer into the reconnect buffer storage
operatorId | the id of the buffering sink |
buffer | the data to be buffered |
References backward::details::move().
Referenced by NES::Network::NetworkSink::writeData().
void NES::Runtime::WorkerContext::insertIntoStorage | ( | Network::NesPartition | nesPartition, |
NES::Runtime::TupleBuffer | buffer | ||
) |
This method inserts a tuple buffer into the storage.
nesPartition | partition |
TupleBuffer | tuple buffer |
References NES_WARNING.
bool NES::Runtime::WorkerContext::isAsyncConnectionInProgress | ( | OperatorId | operatorId | ) |
check if an async connection that was started by the operator with the specified id is currently in progress
operatorId | id of the operator which will use the network channel |
References NES_TRACE.
Referenced by NES::Network::NetworkSink::reconfigure(), and NES::Network::NetworkSink::writeData().
bool NES::Runtime::WorkerContext::releaseEventOnlyChannel | ( | OperatorId | id, |
Runtime::QueryTerminationType | terminationType | ||
) |
removes a registered network channel
id | of the operator that we want to store the output channel |
terminationType | the termination type |
References NES_TRACE.
Referenced by NES::Network::NetworkSource::reconfigure().
bool NES::Runtime::WorkerContext::releaseNetworkChannel | ( | OperatorId | id, |
DecomposedQueryPlanVersion | version, | ||
Runtime::QueryTerminationType | terminationType, | ||
uint16_t | sendingThreadCount, | ||
uint64_t | currentMessageSequenceNumber, | ||
bool | shouldPropagateMarker, | ||
const std::optional< ReconfigurationMarkerPtr > & | reconfigurationMarker | ||
) |
removes a registered network channel with a termination type
id | of the operator that we want to store the output channel |
terminationType | the termination type |
currentMessageSequenceNumber | represents the total number of data buffer messages sent |
shouldPropagateMarker | marker should be sent to downstream, when closing channel |
reconfigurationMarker | an optional containing the reconfiguration marker if this channel is closed as part of a reconfiguration |
References NES_TRACE.
Referenced by NES::Network::NetworkSink::reconfigure().
std::optional< NES::Runtime::TupleBuffer > NES::Runtime::WorkerContext::removeBufferFromReconnectBufferStorage | ( | OperatorId | operatorId | ) |
retrieve and delete a tuple buffer from the tuple buffer storage
operatorId | the id of the buffering sink |
void NES::Runtime::WorkerContext::removeTopTupleFromStorage | ( | Network::NesPartition | nesPartition | ) |
if the storage is not empty remove the oldest buffered tuple for the specified partition
nesPartition | partition |
void NES::Runtime::WorkerContext::setObjectRefCnt | ( | void * | object, |
uint32_t | refCnt | ||
) |
Sets the ref counter for a generic object using its pointer address as lookup.
object | the object that we want to track |
refCnt | the initial ref cnt |
Referenced by NES::Runtime::Execution::ExecutablePipeline::reconfigure(), and NES::Network::NetworkSink::reconfigure().
void NES::Runtime::WorkerContext::storeEventChannelFuture | ( | OperatorId | id, |
DecomposedQueryPlanVersion | version, | ||
std::pair< std::future< Network::EventOnlyNetworkChannelPtr >, std::promise< bool >> && | channelFuture | ||
) |
store a future for an event channel that is in the process of connecting
id | the id of the operator which the channel belongs to |
channelFuture | the future to be stored |
References backward::details::move(), and NES_TRACE.
void NES::Runtime::WorkerContext::storeEventOnlyChannel | ( | OperatorId | id, |
Network::EventOnlyNetworkChannelPtr && | channel | ||
) |
This stores a network channel for an operator.
id | of the operator that we want to store the output channel |
channel | the output channel |
References backward::details::move(), and NES_TRACE.
Referenced by NES::Network::NetworkSource::onEvent().
void NES::Runtime::WorkerContext::storeNetworkChannel | ( | OperatorId | id, |
DecomposedQueryPlanVersion | version, | ||
Network::NetworkChannelPtr && | channel | ||
) |
This stores a network channel for an operator.
id | of the operator that we want to store the output channel |
channel | the output channel |
References backward::details::move(), and NES_TRACE.
Referenced by NES::Network::NetworkSink::reconfigure().
void NES::Runtime::WorkerContext::storeNetworkChannelFuture | ( | OperatorId | id, |
DecomposedQueryPlanVersion | version, | ||
std::pair< std::future< Network::NetworkChannelPtr >, std::promise< bool >> && | channelFuture | ||
) |
This stores a future for network channel creation and a promise which can be used to abort the creation.
id | of the operator that we want to store the output channel |
channelFuture | a pair of a future waiting for the output channel to be connected and a promise to be used if the connection process is to be aborted |
References backward::details::move(), and NES_TRACE.
Referenced by NES::Network::NetworkSink::reconfigure().
bool NES::Runtime::WorkerContext::trimStorage | ( | Network::NesPartition | nesPartition, |
uint64_t | timestamp | ||
) |
This method deletes a tuple buffer from the storage.
nesPartition | partition |
timestamp | timestamp |
References timestamp.
Network::NetworkChannelPtr NES::Runtime::WorkerContext::waitForAsyncConnection | ( | OperatorId | operatorId | ) |
blocks until async connection of a network channel has succeeded or timed out
operatorId | id of the operator which will use the network channel |
Referenced by NES::Network::NetworkSink::reconfigure().
Network::EventOnlyNetworkChannelPtr NES::Runtime::WorkerContext::waitForAsyncConnectionEventChannel | ( | OperatorId | operatorId | ) |
blocks until async connection of an event channel has succeeded or timed out
operatorId | id of the operator which will use the event channel |
References NES_WARNING.