NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
NES::Runtime::WorkerContext Class Reference

A WorkerContext represents the current state of a worker thread Note that it is not thread-safe per se but it is meant to be used in a thread-safe manner by the ThreadPool. More...

#include <WorkerContext.hpp>

Public Member Functions

 WorkerContext (WorkerThreadId workerId, const BufferManagerPtr &bufferManager, uint64_t numberOfBuffersPerWorker, uint32_t queueId=0)
 
 ~WorkerContext ()
 
TupleBuffer allocateTupleBuffer ()
 Allocates a new tuple buffer. More...
 
WorkerContextBufferProviderPtr getBufferProvider ()
 Returns the thread-local buffer provider. More...
 
WorkerThreadId getId () const
 get current worker context thread id. This is assigned by calling NesThread::getId() More...
 
void setObjectRefCnt (void *object, uint32_t refCnt)
 Sets the ref counter for a generic object using its pointer address as lookup. More...
 
uint32_t decreaseObjectRefCnt (void *object)
 Reduces by one the ref cnt. It deletes the object as soon as ref cnt reaches 0. More...
 
uint32_t getQueueId () const
 get the queue id of the the current worker More...
 
void storeNetworkChannel (OperatorId id, DecomposedQueryPlanVersion version, Network::NetworkChannelPtr &&channel)
 This stores a network channel for an operator. More...
 
bool containsNetworkChannel (OperatorId id)
 
void storeNetworkChannelFuture (OperatorId id, DecomposedQueryPlanVersion version, std::pair< std::future< Network::NetworkChannelPtr >, std::promise< bool >> &&channelFuture)
 This stores a future for network channel creation and a promise which can be used to abort the creation. More...
 
bool containsNetworkChannelFuture (OperatorId id)
 
void createStorage (Network::NesPartition nesPartition)
 This method creates a network storage for a thread. More...
 
void insertIntoStorage (Network::NesPartition nesPartition, NES::Runtime::TupleBuffer buffer)
 This method inserts a tuple buffer into the storage. More...
 
bool trimStorage (Network::NesPartition nesPartition, uint64_t timestamp)
 This method deletes a tuple buffer from the storage. More...
 
std::optional< NES::Runtime::TupleBuffergetTopTupleFromStorage (Network::NesPartition nesPartition)
 get the oldest buffered tuple for the specified partition More...
 
void removeTopTupleFromStorage (Network::NesPartition nesPartition)
 if the storage is not empty remove the oldest buffered tuple for the specified partition More...
 
bool releaseNetworkChannel (OperatorId id, DecomposedQueryPlanVersion version, Runtime::QueryTerminationType terminationType, uint16_t sendingThreadCount, uint64_t currentMessageSequenceNumber, bool shouldPropagateMarker, const std::optional< ReconfigurationMarkerPtr > &reconfigurationMarker)
 removes a registered network channel with a termination type More...
 
void storeEventOnlyChannel (OperatorId id, Network::EventOnlyNetworkChannelPtr &&channel)
 This stores a network channel for an operator. More...
 
bool releaseEventOnlyChannel (OperatorId id, Runtime::QueryTerminationType terminationType)
 removes a registered network channel More...
 
Network::NetworkChannelgetNetworkChannel (OperatorId ownerId, DecomposedQueryPlanVersion ownerVersion)
 retrieve a registered output channel More...
 
std::optional< Network::NetworkChannelPtr > getAsyncConnectionResult (OperatorId operatorId, DecomposedQueryPlanVersion version)
 retrieves an asynchronously established output channel. More...
 
Network::NetworkChannelPtr waitForAsyncConnection (OperatorId operatorId)
 blocks until async connection of a network channel has succeeded or timed out More...
 
bool isAsyncConnectionInProgress (OperatorId operatorId)
 check if an async connection that was started by the operator with the specified id is currently in progress More...
 
Network::EventOnlyNetworkChannelgetEventOnlyNetworkChannel (OperatorId operatorId)
 retrieve a registered output channel More...
 
void insertIntoReconnectBufferStorage (OperatorId operatorId, NES::Runtime::TupleBuffer buffer)
 insert a tuple buffer into the reconnect buffer storage More...
 
std::optional< TupleBufferremoveBufferFromReconnectBufferStorage (OperatorId operatorId)
 retrieve and delete a tuple buffer from the tuple buffer storage More...
 
void abortConnectionProcess (OperatorId operatorId)
 stop a connection process which is currently in progress More...
 
bool doesNetworkChannelExist (OperatorId operatorId)
 check if a network channel exists for the sink in question More...
 
void storeEventChannelFuture (OperatorId id, DecomposedQueryPlanVersion version, std::pair< std::future< Network::EventOnlyNetworkChannelPtr >, std::promise< bool >> &&channelFuture)
 store a future for an event channel that is in the process of connecting More...
 
std::optional< Network::EventOnlyNetworkChannelPtr > getAsyncEventChannelConnectionResult (OperatorId operatorId)
 retrieves an asynchronously established event channel. More...
 
Network::EventOnlyNetworkChannelPtr waitForAsyncConnectionEventChannel (OperatorId operatorId)
 blocks until async connection of an event channel has succeeded or timed out More...
 
bool doesEventChannelExist (OperatorId operatorId)
 check if a network channel exists for the operator in question More...
 

Static Public Member Functions

static WorkerContextBufferProviderRawPtr getBufferProviderTLS ()
 Returns the thread-local buffer provider singleton. This can be accessed at any point in time also without the pointer to the context. Calling this method from a non worker thread results in undefined behaviour. More...
 

Detailed Description

A WorkerContext represents the current state of a worker thread Note that it is not thread-safe per se but it is meant to be used in a thread-safe manner by the ThreadPool.

Constructor & Destructor Documentation

◆ WorkerContext()

NES::Runtime::WorkerContext::WorkerContext ( WorkerThreadId  workerId,
const BufferManagerPtr bufferManager,
uint64_t  numberOfBuffersPerWorker,
uint32_t  queueId = 0 
)
explicit

References NES_ASSERT.

◆ ~WorkerContext()

NES::Runtime::WorkerContext::~WorkerContext ( )

Member Function Documentation

◆ abortConnectionProcess()

void NES::Runtime::WorkerContext::abortConnectionProcess ( OperatorId  operatorId)

stop a connection process which is currently in progress

Parameters
operatorIdthe id of the operator that started the connection process

References NES::Runtime::Failure.

Referenced by NES::Network::NetworkSink::reconfigure().

Here is the caller graph for this function:

◆ allocateTupleBuffer()

TupleBuffer NES::Runtime::WorkerContext::allocateTupleBuffer ( )

Allocates a new tuple buffer.

Returns
TupleBuffer

Referenced by MultifieldGPUPipelineStage::execute(), ColumnLayoutGPUPipelineStage::execute(), WindowedAggregationGPUPipelineStage::execute(), NES::MockedExecutablePipeline::execute(), and NES::TextExecutablePipeline::execute().

Here is the caller graph for this function:

◆ containsNetworkChannel()

bool NES::Runtime::WorkerContext::containsNetworkChannel ( OperatorId  id)

◆ containsNetworkChannelFuture()

bool NES::Runtime::WorkerContext::containsNetworkChannelFuture ( OperatorId  id)

◆ createStorage()

void NES::Runtime::WorkerContext::createStorage ( Network::NesPartition  nesPartition)

This method creates a network storage for a thread.

Parameters
nesPartitionIdpartition

Referenced by NES::Network::NetworkSink::reconfigure().

Here is the caller graph for this function:

◆ decreaseObjectRefCnt()

uint32_t NES::Runtime::WorkerContext::decreaseObjectRefCnt ( void *  object)

Reduces by one the ref cnt. It deletes the object as soon as ref cnt reaches 0.

Parameters
objectthe object that we want to ref count
Returns
the prev ref cnt

Referenced by NES::Runtime::Execution::ExecutablePipeline::reconfigure(), and NES::Network::NetworkSink::reconfigure().

Here is the caller graph for this function:

◆ doesEventChannelExist()

bool NES::Runtime::WorkerContext::doesEventChannelExist ( OperatorId  operatorId)

check if a network channel exists for the operator in question

Parameters
operatorIdthe unique identifier of the operator to which the channel belongs
Returns
true if a channel was found

◆ doesNetworkChannelExist()

bool NES::Runtime::WorkerContext::doesNetworkChannelExist ( OperatorId  operatorId)

check if a network channel exists for the sink in question

Parameters
operatorId
Returns

◆ getAsyncConnectionResult()

std::optional< Network::NetworkChannelPtr > NES::Runtime::WorkerContext::getAsyncConnectionResult ( OperatorId  operatorId,
DecomposedQueryPlanVersion  version 
)

retrieves an asynchronously established output channel.

Parameters
operatorIdid of the operator which will use the network channel
Returns
an optional containing a network channel ptr:
  • nullopt if the operation has not yet completed
  • optional containing nullptr if the conneciton timed out
  • optional containing valid ptr if connection succeeded

References NES_TRACE.

◆ getAsyncEventChannelConnectionResult()

std::optional< Network::EventOnlyNetworkChannelPtr > NES::Runtime::WorkerContext::getAsyncEventChannelConnectionResult ( OperatorId  operatorId)

retrieves an asynchronously established event channel.

Parameters
operatorIdid of the operator which will use the event channel
Returns
an optional containing a event channel ptr:
  • nullopt if the operation has not yet completed
  • optional containing nullptr if the conneciton timed out
  • optional containing valid ptr if connection succeeded

References NES_TRACE.

Referenced by NES::Network::NetworkSource::onEvent().

Here is the caller graph for this function:

◆ getBufferProvider()

LocalBufferPoolPtr NES::Runtime::WorkerContext::getBufferProvider ( )

Returns the thread-local buffer provider.

Returns
shared_ptr to LocalBufferPool

◆ getBufferProviderTLS()

LocalBufferPool * NES::Runtime::WorkerContext::getBufferProviderTLS ( )
static

Returns the thread-local buffer provider singleton. This can be accessed at any point in time also without the pointer to the context. Calling this method from a non worker thread results in undefined behaviour.

Returns
raw pointer to AbstractBufferProvider

◆ getEventOnlyNetworkChannel()

Network::EventOnlyNetworkChannel * NES::Runtime::WorkerContext::getEventOnlyNetworkChannel ( OperatorId  operatorId)

retrieve a registered output channel

Parameters
operatorIdid of the operator that we want to store the output channel
Returns
an output channel

References NES_TRACE.

Referenced by NES::Network::NetworkSource::onEvent().

Here is the caller graph for this function:

◆ getId()

WorkerThreadId NES::Runtime::WorkerContext::getId ( ) const

get current worker context thread id. This is assigned by calling NesThread::getId()

Returns
current worker context thread id

Referenced by NES::Runtime::AbstractQueryManager::completedWork(), NES::Runtime::detail::ReconfigurationEntryPointPipelineStage::execute(), NES::Runtime::AbstractQueryManager::updateStatistics(), and NES::Network::NetworkSink::writeData().

Here is the caller graph for this function:

◆ getNetworkChannel()

Network::NetworkChannel * NES::Runtime::WorkerContext::getNetworkChannel ( OperatorId  ownerId,
DecomposedQueryPlanVersion  ownerVersion 
)

retrieve a registered output channel

Parameters
ownerIdid of the operator that we want to store the output channel
Returns
an output channel

References NES_TRACE.

Referenced by NES::Network::NetworkSink::writeData().

Here is the caller graph for this function:

◆ getQueueId()

uint32_t NES::Runtime::WorkerContext::getQueueId ( ) const

get the queue id of the the current worker

Returns
current queue id

Referenced by NES::Runtime::MultiQueueQueryManager::processNextTask(), NES::Runtime::MultiQueueQueryManager::terminateLoop(), and NES::Runtime::MultiQueueQueryManager::updateStatistics().

Here is the caller graph for this function:

◆ getTopTupleFromStorage()

std::optional< NES::Runtime::TupleBuffer > NES::Runtime::WorkerContext::getTopTupleFromStorage ( Network::NesPartition  nesPartition)

get the oldest buffered tuple for the specified partition

Parameters
nesPartitionpartition
Returns
an optional containing the tuple or nullopt if the storage is empty

◆ insertIntoReconnectBufferStorage()

void NES::Runtime::WorkerContext::insertIntoReconnectBufferStorage ( OperatorId  operatorId,
NES::Runtime::TupleBuffer  buffer 
)

insert a tuple buffer into the reconnect buffer storage

Parameters
operatorIdthe id of the buffering sink
bufferthe data to be buffered

References backward::details::move().

Referenced by NES::Network::NetworkSink::writeData().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ insertIntoStorage()

void NES::Runtime::WorkerContext::insertIntoStorage ( Network::NesPartition  nesPartition,
NES::Runtime::TupleBuffer  buffer 
)

This method inserts a tuple buffer into the storage.

Parameters
nesPartitionpartition
TupleBuffertuple buffer

References NES_WARNING.

◆ isAsyncConnectionInProgress()

bool NES::Runtime::WorkerContext::isAsyncConnectionInProgress ( OperatorId  operatorId)

check if an async connection that was started by the operator with the specified id is currently in progress

Parameters
operatorIdid of the operator which will use the network channel
Returns
true if a connection is currently being established

References NES_TRACE.

Referenced by NES::Network::NetworkSink::reconfigure(), and NES::Network::NetworkSink::writeData().

Here is the caller graph for this function:

◆ releaseEventOnlyChannel()

bool NES::Runtime::WorkerContext::releaseEventOnlyChannel ( OperatorId  id,
Runtime::QueryTerminationType  terminationType 
)

removes a registered network channel

Parameters
idof the operator that we want to store the output channel
terminationTypethe termination type

References NES_TRACE.

Referenced by NES::Network::NetworkSource::reconfigure().

Here is the caller graph for this function:

◆ releaseNetworkChannel()

bool NES::Runtime::WorkerContext::releaseNetworkChannel ( OperatorId  id,
DecomposedQueryPlanVersion  version,
Runtime::QueryTerminationType  terminationType,
uint16_t  sendingThreadCount,
uint64_t  currentMessageSequenceNumber,
bool  shouldPropagateMarker,
const std::optional< ReconfigurationMarkerPtr > &  reconfigurationMarker 
)

removes a registered network channel with a termination type

Parameters
idof the operator that we want to store the output channel
terminationTypethe termination type
currentMessageSequenceNumberrepresents the total number of data buffer messages sent
shouldPropagateMarkermarker should be sent to downstream, when closing channel
reconfigurationMarkeran optional containing the reconfiguration marker if this channel is closed as part of a reconfiguration

References NES_TRACE.

Referenced by NES::Network::NetworkSink::reconfigure().

Here is the caller graph for this function:

◆ removeBufferFromReconnectBufferStorage()

std::optional< NES::Runtime::TupleBuffer > NES::Runtime::WorkerContext::removeBufferFromReconnectBufferStorage ( OperatorId  operatorId)

retrieve and delete a tuple buffer from the tuple buffer storage

Parameters
operatorIdthe id of the buffering sink
Returns
the buffer that was removed from the storage

◆ removeTopTupleFromStorage()

void NES::Runtime::WorkerContext::removeTopTupleFromStorage ( Network::NesPartition  nesPartition)

if the storage is not empty remove the oldest buffered tuple for the specified partition

Parameters
nesPartitionpartition

◆ setObjectRefCnt()

void NES::Runtime::WorkerContext::setObjectRefCnt ( void *  object,
uint32_t  refCnt 
)

Sets the ref counter for a generic object using its pointer address as lookup.

Parameters
objectthe object that we want to track
refCntthe initial ref cnt

Referenced by NES::Runtime::Execution::ExecutablePipeline::reconfigure(), and NES::Network::NetworkSink::reconfigure().

Here is the caller graph for this function:

◆ storeEventChannelFuture()

void NES::Runtime::WorkerContext::storeEventChannelFuture ( OperatorId  id,
DecomposedQueryPlanVersion  version,
std::pair< std::future< Network::EventOnlyNetworkChannelPtr >, std::promise< bool >> &&  channelFuture 
)

store a future for an event channel that is in the process of connecting

Parameters
idthe id of the operator which the channel belongs to
channelFuturethe future to be stored

References backward::details::move(), and NES_TRACE.

Here is the call graph for this function:

◆ storeEventOnlyChannel()

void NES::Runtime::WorkerContext::storeEventOnlyChannel ( OperatorId  id,
Network::EventOnlyNetworkChannelPtr &&  channel 
)

This stores a network channel for an operator.

Parameters
idof the operator that we want to store the output channel
channelthe output channel

References backward::details::move(), and NES_TRACE.

Referenced by NES::Network::NetworkSource::onEvent().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ storeNetworkChannel()

void NES::Runtime::WorkerContext::storeNetworkChannel ( OperatorId  id,
DecomposedQueryPlanVersion  version,
Network::NetworkChannelPtr &&  channel 
)

This stores a network channel for an operator.

Parameters
idof the operator that we want to store the output channel
channelthe output channel

References backward::details::move(), and NES_TRACE.

Referenced by NES::Network::NetworkSink::reconfigure().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ storeNetworkChannelFuture()

void NES::Runtime::WorkerContext::storeNetworkChannelFuture ( OperatorId  id,
DecomposedQueryPlanVersion  version,
std::pair< std::future< Network::NetworkChannelPtr >, std::promise< bool >> &&  channelFuture 
)

This stores a future for network channel creation and a promise which can be used to abort the creation.

Parameters
idof the operator that we want to store the output channel
channelFuturea pair of a future waiting for the output channel to be connected and a promise to be used if the connection process is to be aborted

References backward::details::move(), and NES_TRACE.

Referenced by NES::Network::NetworkSink::reconfigure().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ trimStorage()

bool NES::Runtime::WorkerContext::trimStorage ( Network::NesPartition  nesPartition,
uint64_t  timestamp 
)

This method deletes a tuple buffer from the storage.

Parameters
nesPartitionpartition
timestamptimestamp
Returns
success in the case something was trimmed

References timestamp.

◆ waitForAsyncConnection()

Network::NetworkChannelPtr NES::Runtime::WorkerContext::waitForAsyncConnection ( OperatorId  operatorId)

blocks until async connection of a network channel has succeeded or timed out

Parameters
operatorIdid of the operator which will use the network channel
Returns
a pointer to the network channel or nullptr if the connection timed out

Referenced by NES::Network::NetworkSink::reconfigure().

Here is the caller graph for this function:

◆ waitForAsyncConnectionEventChannel()

Network::EventOnlyNetworkChannelPtr NES::Runtime::WorkerContext::waitForAsyncConnectionEventChannel ( OperatorId  operatorId)

blocks until async connection of an event channel has succeeded or timed out

Parameters
operatorIdid of the operator which will use the event channel
Returns
a pointer to the event channel or nullptr if the connection timed out

References NES_WARNING.


The documentation for this class was generated from the following files: