NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
NES::Runtime::AbstractQueryManager Class Referenceabstract

#include <QueryManager.hpp>

Collaboration diagram for NES::Runtime::AbstractQueryManager:
[legend]

Public Types

enum class  QueryManagerStatus : uint8_t {
  Created , Running , Stopped , Destroyed ,
  Failed
}
 
using inherited0 = NES::detail::virtual_enable_shared_from_this< AbstractQueryManager, false >
 
using inherited1 = Reconfigurable
 

Public Member Functions

 AbstractQueryManager ()=delete
 
 AbstractQueryManager (const AbstractQueryManager &)=delete
 
AbstractQueryManageroperator= (const AbstractQueryManager &)=delete
 
 AbstractQueryManager (std::shared_ptr< AbstractQueryStatusListener > queryStatusListener, std::vector< BufferManagerPtr > bufferManagers, WorkerId nodeEngineId, uint16_t numThreads, HardwareManagerPtr hardwareManager, uint64_t numberOfBuffersPerEpoch, std::vector< uint64_t > workerToCoreMapping={})
 
virtual ~AbstractQueryManager () NES_NOEXCEPT(false) override
 
virtual bool registerExecutableQueryPlan (const Execution::ExecutableQueryPlanPtr &executableQueryPlan)
 register a query by extracting sources, windows and sink and add them to respective map More...
 
bool unregisterExecutableQueryPlan (const Execution::ExecutableQueryPlanPtr &executableQueryPlan)
 deregister a query by extracting sources, windows and sink and remove them from respective map More...
 
bool startExecutableQueryPlan (const Execution::ExecutableQueryPlanPtr &qep)
 method to start a query More...
 
bool stopExecutableQueryPlan (const Execution::ExecutableQueryPlanPtr &qep, Runtime::QueryTerminationType type=Runtime::QueryTerminationType::HardStop)
 method to start a query More...
 
bool failExecutableQueryPlan (const Execution::ExecutableQueryPlanPtr &qep)
 method to fail a query More...
 
virtual ExecutionResult processNextTask (bool running, WorkerContext &workerContext)=0
 process task from task queue More...
 
virtual void addWorkForNextPipeline (TupleBuffer &buffer, Execution::SuccessorExecutablePipeline executable, uint32_t queueId=0)=0
 add work to the query manager, this methods is source-driven and is called for each buffer generated by the window trigger More...
 
void postReconfigurationCallback (ReconfigurationMessage &task) override
 
void reconfigure (ReconfigurationMessage &, WorkerContext &context) override
 
Execution::ExecutableQueryPlanStatus getQepStatus (DecomposedQueryIdWithVersion decomposedQueryIdWithVersion)
 retrieve the execution status of a given local query sub plan id. More...
 
Execution::ExecutableQueryPlanPtr getQueryExecutionPlan (DecomposedQueryIdWithVersion decomposedQueryIdWithVersion) const
 Provides the QEP object for an id. More...
 
bool canTriggerEndOfStream (DataSourcePtr source, Runtime::QueryTerminationType)
 
virtual void poisonWorkers ()=0
 notify all waiting threads in getWork() to wake up and finish up More...
 
virtual void destroy ()
 reset query manager. After this call, it wont be possible to use the query manager. More...
 
QueryStatisticsPtr getQueryStatistics (DecomposedQueryId decomposedQueryId)
 method to return the query statistics More...
 
void resetQueryStatistics (DecomposedQueryId decomposedQueryId)
 Reset statistics for the decomposed query plan. More...
 
WorkerId getNodeId () const
 
virtual bool addReconfigurationMessage (SharedQueryId sharedQueryId, DecomposedQueryId queryExecutionPlanId, DecomposedQueryPlanVersion queryExecutionPlanVersion, const ReconfigurationMessage &reconfigurationMessage, bool blocking=false)=0
 this methods adds a reconfiguration task on the worker queue More...
 
BufferManagerPtr getBufferManager ()
 
void notifyTaskFailure (Execution::SuccessorExecutablePipeline pipeline, const std::string &message)
 This method informs the QueryManager that a task has failed. More...
 
uint64_t getNumberOfBuffersPerEpoch () const
 Returns the numberOfBuffersPerEpoch. More...
 
void notifySourceFailure (DataSourcePtr source, const std::string errorMessage)
 This method informs the QueryManager that a source has failed. More...
 
void notifyQueryStatusChange (const Execution::ExecutableQueryPlanPtr &qep, Execution::ExecutableQueryPlanStatus newStatus)
 Informs the query manager about a status change in a sub query plan. More...
 
SharedQueryId getSharedQueryId (DecomposedQueryIdWithVersion decomposedQueryIdWithVersion) const
 get the shared query id mapped to the decomposed query plan id More...
 
bool addEndOfStream (DataSourcePtr source, Runtime::QueryTerminationType graceful=Runtime::QueryTerminationType::Graceful)
 introduces end of stream to all QEPs connected to this source More...
 
bool propagateReconfigurationMarker (const ReconfigurationMarkerPtr &marker, DataSourcePtr source)
 propagates a reconfiguration marker to all downstream operators of a source operator More...
 
bool startNewExecutableQueryPlanAndPropagateMarker (const ReconfigurationMarkerPtr marker, DecomposedQueryIdWithVersion decomposedQueryIdWithVersion)
 starts new executable plan and propagates reconfiguration marker to it's source More...
 
bool isThreadPoolRunning () const
 
virtual uint64_t getNumberOfTasksInWorkerQueues () const =0
 get number of tasks in the queue More...
 
uint64_t getCurrentTaskSum ()
 
uint64_t getNumberOfWorkerThreads ()
 
void notifySourceCompletion (DataSourcePtr source, QueryTerminationType terminationType)
 Notifies that a source operator is done with its execution. More...
 
void notifyPipelineCompletion (DecomposedQueryId decomposedQueryId, Execution::ExecutablePipelinePtr pipeline, QueryTerminationType terminationType)
 Notifies that a pipeline is done with its execution. More...
 
void notifySinkCompletion (DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion, DataSinkPtr sink, QueryTerminationType terminationType)
 Notifies that a sink operator is done with its execution. More...
 
std::optional< ReconfigurationMarkerEventPtrgetReconfigurationEvent (Network::NetworkSourcePtr networkSource, SharedQueryId sharedQueryId, const ReconfigurationMarker &marker) const
 
std::unordered_set< DecomposedQueryIdWithVersiongetExecutablePlanIdsForSource (DataSourcePtr source) const
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< AbstractQueryManager, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this_base< isNoexceptDestructible >
virtual ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default
 
- Public Member Functions inherited from NES::Runtime::Reconfigurable
 ~Reconfigurable () NES_NOEXCEPT(false) override=default
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 

Public Attributes

folly::Synchronized< std::unordered_map< std::string, std::shared_ptr< BasePersistentSourceProperties > > > persistentSourceProperties
 

Protected Member Functions

void completedWork (Task &task, WorkerContext &workerContext)
 finalize task execution by: 1.) update statistics (number of processed tuples and tasks) 2.) release input buffer (give back to the buffer manager) More...
 
virtual void updateStatistics (const Task &task, SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId, PipelineId pipelineId, WorkerContext &workerContext)
 Method to update the statistics. More...
 
virtual ExecutionResult terminateLoop (WorkerContext &)=0
 Executes cleaning up logic on the task queue. More...
 
bool addSoftEndOfStream (DataSourcePtr source)
 Triggers a soft end of stream for a source. More...
 
bool addHardEndOfStream (DataSourcePtr source)
 Triggers a hard end of stream for a source. More...
 
bool addFailureEndOfStream (DataSourcePtr source)
 Triggers a failure end of stream for a source. More...
 
uint64_t getNextTaskId ()
 Returns the next free task id. More...
 

Protected Attributes

WorkerId nodeEngineId
 
std::atomic_uint64_t taskIdCounter = 0
 
std::vector< BufferManagerPtrbufferManagers
 
uint16_t numThreads
 
HardwareManagerPtr hardwareManager
 
ThreadPoolPtr threadPool {nullptr}
 worker threads running compute tasks More...
 
AsyncTaskExecutorPtr asyncTaskExecutor
 worker thread for async maintenance task, e.g., fail queryIdAndCatalogEntryMapping More...
 
std::unordered_map< DecomposedQueryIdWithVersion, Execution::ExecutableQueryPlanPtrrunningQEPs
 
std::mutex statisticsMutex
 
cuckoohash_map< DecomposedQueryId, QueryStatisticsPtrqueryToStatisticsMap
 
std::mutex reconfigurationMutex
 
std::vector< uint64_t > workerToCoreMapping
 
std::recursive_mutex queryMutex
 
std::atomic< QueryManagerStatusqueryManagerStatus {QueryManagerStatus::Created}
 
std::vector< AtomicCounter< uint64_t > > tempCounterTasksCompleted
 
std::shared_ptr< AbstractQueryStatusListenerqueryStatusListener
 
std::unordered_map< DecomposedQueryIdWithVersion, std::vector< OperatorId > > decomposeQueryToSourceIdMapping
 
std::unordered_map< OperatorId, std::vector< Execution::ExecutableQueryPlanPtr > > sourceToQEPMapping
 
uint64_t numberOfBuffersPerEpoch
 

Friends

class ThreadPool
 
class NodeEngine
 

Member Typedef Documentation

◆ inherited0

◆ inherited1

Member Enumeration Documentation

◆ QueryManagerStatus

Enumerator
Created 
Running 
Stopped 
Destroyed 
Failed 

Constructor & Destructor Documentation

◆ AbstractQueryManager() [1/3]

NES::Runtime::AbstractQueryManager::AbstractQueryManager ( )
delete

◆ AbstractQueryManager() [2/3]

NES::Runtime::AbstractQueryManager::AbstractQueryManager ( const AbstractQueryManager )
delete

◆ AbstractQueryManager() [3/3]

NES::Runtime::AbstractQueryManager::AbstractQueryManager ( std::shared_ptr< AbstractQueryStatusListener queryStatusListener,
std::vector< BufferManagerPtr bufferManagers,
WorkerId  nodeEngineId,
uint16_t  numThreads,
HardwareManagerPtr  hardwareManager,
uint64_t  numberOfBuffersPerEpoch,
std::vector< uint64_t >  workerToCoreMapping = {} 
)
explicit
Parameters
bufferManager

References asyncTaskExecutor, numThreads, and tempCounterTasksCompleted.

◆ ~AbstractQueryManager()

NES::Runtime::AbstractQueryManager::~AbstractQueryManager ( )
overridevirtual

References destroy().

Here is the call graph for this function:

Member Function Documentation

◆ addEndOfStream()

bool NES::Runtime::AbstractQueryManager::addEndOfStream ( DataSourcePtr  source,
Runtime::QueryTerminationType  graceful = Runtime::QueryTerminationType::Graceful 
)

introduces end of stream to all QEPs connected to this source

Parameters
sourcethe source
gracefulhard or soft termination
Returns
true if it went through

References addFailureEndOfStream(), addHardEndOfStream(), addSoftEndOfStream(), NES::Runtime::Failure, NES::Runtime::Graceful, NES::Runtime::HardStop, NES_ASSERT2_FMT, NES_WARNING, queryMutex, sourceToQEPMapping, and threadPool.

Here is the call graph for this function:

◆ addFailureEndOfStream()

bool NES::Runtime::AbstractQueryManager::addFailureEndOfStream ( DataSourcePtr  source)
protected

Triggers a failure end of stream for a source.

Parameters
sourcethe source for which to trigger the failure end of stream
Returns
true if successful

References addReconfigurationMessage(), magic_enum::enum_name(), NES::Runtime::FailEndOfStream, NES_DEBUG, and threadPool.

Referenced by addEndOfStream().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ addHardEndOfStream()

bool NES::Runtime::AbstractQueryManager::addHardEndOfStream ( DataSourcePtr  source)
protected

Triggers a hard end of stream for a source.

Parameters
sourcethe source for which to trigger the hard end of stream
Returns
true if successful

References addReconfigurationMessage(), magic_enum::enum_name(), NES::Runtime::HardEndOfStream, NES_DEBUG, and threadPool.

Referenced by addEndOfStream().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ addReconfigurationMessage()

virtual bool NES::Runtime::AbstractQueryManager::addReconfigurationMessage ( SharedQueryId  sharedQueryId,
DecomposedQueryId  queryExecutionPlanId,
DecomposedQueryPlanVersion  queryExecutionPlanVersion,
const ReconfigurationMessage reconfigurationMessage,
bool  blocking = false 
)
pure virtual

this methods adds a reconfiguration task on the worker queue

Returns
true if the reconfiguration task was added correctly on the worker queue N.B.: this does not not mean that the reconfiguration took place but it means that it was scheduled to be executed!
Parameters
sharedQueryIdthe local QEP to reconfigure
queryExecutionPlanIdthe local sub QEP id to reconfigure
queryExecutionPlanVersionthe local sub QEP version to reconfigure
reconfigurationDescriptorwhat to do
blockingwhether to block until the reconfiguration is done. Mind this parameter because it blocks!

Referenced by addFailureEndOfStream(), addHardEndOfStream(), addSoftEndOfStream(), failExecutableQueryPlan(), notifyQueryStatusChange(), propagateReconfigurationMarker(), and stopExecutableQueryPlan().

Here is the caller graph for this function:

◆ addSoftEndOfStream()

bool NES::Runtime::AbstractQueryManager::addSoftEndOfStream ( DataSourcePtr  source)
protected

Triggers a soft end of stream for a source.

Parameters
sourcethe source for which to trigger the soft end of stream
Returns
true if successful

References addReconfigurationMessage(), magic_enum::enum_name(), NES_DEBUG, NES::Runtime::SoftEndOfStream, and threadPool.

Referenced by addEndOfStream().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ addWorkForNextPipeline()

virtual void NES::Runtime::AbstractQueryManager::addWorkForNextPipeline ( TupleBuffer buffer,
Execution::SuccessorExecutablePipeline  executable,
uint32_t  queueId = 0 
)
pure virtual

add work to the query manager, this methods is source-driven and is called for each buffer generated by the window trigger

Parameters
Pointerto the tuple buffer containing the data
Pointerto the pipeline stage that will be executed next
idof the queue where to put the task (only necessary if multiple queues are used)

Implemented in NES::Runtime::MultiQueueQueryManager, and NES::Runtime::DynamicQueryManager.

◆ canTriggerEndOfStream()

bool NES::Runtime::AbstractQueryManager::canTriggerEndOfStream ( DataSourcePtr  source,
Runtime::QueryTerminationType  terminationType 
)

◆ completedWork()

void NES::Runtime::AbstractQueryManager::completedWork ( Task task,
WorkerContext workerContext 
)
protected

finalize task execution by: 1.) update statistics (number of processed tuples and tasks) 2.) release input buffer (give back to the buffer manager)

Parameters
referenceto processed task @oaram reference to worker context

References NES::Runtime::Task::getExecutable(), NES::Runtime::WorkerContext::getId(), NES::Runtime::Task::isReconfiguration(), NES_TRACE, NES::Runtime::Task::toString(), and updateStatistics().

Referenced by NES::Runtime::DynamicQueryManager::processNextTask(), and NES::Runtime::MultiQueueQueryManager::processNextTask().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ destroy()

void NES::Runtime::AbstractQueryManager::destroy ( )
virtual

reset query manager. After this call, it wont be possible to use the query manager.

Reimplemented in NES::Runtime::MultiQueueQueryManager, and NES::Runtime::DynamicQueryManager.

References Destroyed, NES::Runtime::HardStop, NES_ASSERT2_FMT, NES_DEBUG, queryManagerStatus, queryMutex, queryToStatisticsMap, Running, runningQEPs, statisticsMutex, stopExecutableQueryPlan(), Stopped, and threadPool.

Referenced by NES::Runtime::DynamicQueryManager::destroy(), NES::Runtime::MultiQueueQueryManager::destroy(), and ~AbstractQueryManager().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ failExecutableQueryPlan()

bool NES::Runtime::AbstractQueryManager::failExecutableQueryPlan ( const Execution::ExecutableQueryPlanPtr qep)

method to fail a query

Parameters
qepof the query to fail
Returns
bool indicating success

References addReconfigurationMessage(), NES::Runtime::Destroy, NES::Runtime::Execution::ErrorState, NES::Runtime::Execution::Fail, NES::Runtime::Execution::Finished, NES::Runtime::Execution::Invalid, NES_ASSERT2_FMT, NES_DEBUG, NES_FATAL_ERROR, NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >::shared_from_this(), NES::Runtime::Execution::Stopped, and timeout.

Referenced by notifySourceFailure(), and notifyTaskFailure().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ getBufferManager()

BufferManagerPtr NES::Runtime::AbstractQueryManager::getBufferManager ( )
inline

method to get the first buffer manger

Returns
first buffer manager

References bufferManagers.

◆ getCurrentTaskSum()

uint64_t NES::Runtime::AbstractQueryManager::getCurrentTaskSum ( )

Return the current occupation of the task queue

Returns
number of tasks in the queue

References tempCounterTasksCompleted.

◆ getExecutablePlanIdsForSource()

std::unordered_set< DecomposedQueryIdWithVersion > NES::Runtime::AbstractQueryManager::getExecutablePlanIdsForSource ( DataSourcePtr  source) const

References sourceToQEPMapping.

◆ getNextTaskId()

uint64_t NES::Runtime::AbstractQueryManager::getNextTaskId ( )
protected

Returns the next free task id.

Returns
next task id

References taskIdCounter.

Referenced by NES::Runtime::DynamicQueryManager::addWorkForNextPipeline(), and NES::Runtime::MultiQueueQueryManager::addWorkForNextPipeline().

Here is the caller graph for this function:

◆ getNodeId()

WorkerId NES::Runtime::AbstractQueryManager::getNodeId ( ) const

Get the id of the current node

Returns
node id

References nodeEngineId.

◆ getNumberOfBuffersPerEpoch()

uint64_t NES::Runtime::AbstractQueryManager::getNumberOfBuffersPerEpoch ( ) const

Returns the numberOfBuffersPerEpoch.

Returns
numberOfBuffersPerEpoch

References numberOfBuffersPerEpoch.

◆ getNumberOfTasksInWorkerQueues()

virtual uint64_t NES::Runtime::AbstractQueryManager::getNumberOfTasksInWorkerQueues ( ) const
pure virtual

get number of tasks in the queue

Returns
task count

Implemented in NES::Runtime::MultiQueueQueryManager, and NES::Runtime::DynamicQueryManager.

◆ getNumberOfWorkerThreads()

uint64_t NES::Runtime::AbstractQueryManager::getNumberOfWorkerThreads ( )

Returns the current number of worker threads

Returns
thread cnt

References numThreads.

◆ getQepStatus()

Execution::ExecutableQueryPlanStatus NES::Runtime::AbstractQueryManager::getQepStatus ( DecomposedQueryIdWithVersion  decomposedQueryIdWithVersion)

retrieve the execution status of a given local query sub plan id.

Parameters
decomposedQueryId: the query sub plan id with version
Returns
status of the query sub plan

References NES::Runtime::Execution::Invalid, queryMutex, and runningQEPs.

Referenced by postReconfigurationCallback().

Here is the caller graph for this function:

◆ getQueryExecutionPlan()

Execution::ExecutableQueryPlanPtr NES::Runtime::AbstractQueryManager::getQueryExecutionPlan ( DecomposedQueryIdWithVersion  decomposedQueryIdWithVersion) const

Provides the QEP object for an id.

Parameters
idthe plan to lookup
Returns
the QEP or null, if not found

References queryMutex, and runningQEPs.

Referenced by startNewExecutableQueryPlanAndPropagateMarker().

Here is the caller graph for this function:

◆ getQueryStatistics()

QueryStatisticsPtr NES::Runtime::AbstractQueryManager::getQueryStatistics ( DecomposedQueryId  decomposedQueryId)

method to return the query statistics

Parameters
decomposedQueryIdid of the particular decomposed query
Returns

References queryToStatisticsMap.

◆ getReconfigurationEvent()

std::optional<ReconfigurationMarkerEventPtr> NES::Runtime::AbstractQueryManager::getReconfigurationEvent ( Network::NetworkSourcePtr  networkSource,
SharedQueryId  sharedQueryId,
const ReconfigurationMarker marker 
) const

◆ getSharedQueryId()

SharedQueryId NES::Runtime::AbstractQueryManager::getSharedQueryId ( DecomposedQueryIdWithVersion  decomposedQueryIdWithVersion) const

get the shared query id mapped to the decomposed query plan id

Parameters
decomposedQueryIdthe decomposed query plan id with version
Returns
shared query id

References runningQEPs, and statisticsMutex.

◆ isThreadPoolRunning()

bool NES::Runtime::AbstractQueryManager::isThreadPoolRunning ( ) const
Returns
true if thread pool is running

References threadPool.

◆ notifyPipelineCompletion()

void NES::Runtime::AbstractQueryManager::notifyPipelineCompletion ( DecomposedQueryId  decomposedQueryId,
Execution::ExecutablePipelinePtr  pipeline,
QueryTerminationType  terminationType 
)

Notifies that a pipeline is done with its execution.

Parameters
decomposedQueryIdthe plan the pipeline belongs to
pipelinethe terminated pipeline
terminationTypethe type of termination (e.g., failure, soft)

References NES_ASSERT2_FMT, queryMutex, and runningQEPs.

◆ notifyQueryStatusChange()

void NES::Runtime::AbstractQueryManager::notifyQueryStatusChange ( const Execution::ExecutableQueryPlanPtr qep,
Execution::ExecutableQueryPlanStatus  newStatus 
)

Informs the query manager about a status change in a sub query plan.

Parameters
qepthe sub query plan
newStatusthe new status of the query plan

References addReconfigurationMessage(), NES::Runtime::Destroy, NES::Runtime::Execution::ErrorState, NES::Runtime::Execution::Finished, NES::Runtime::Graceful, NES_ASSERT, NES_ASSERT2_FMT, queryStatusListener, and NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >::shared_from_this().

Here is the call graph for this function:

◆ notifySinkCompletion()

void NES::Runtime::AbstractQueryManager::notifySinkCompletion ( DecomposedQueryId  decomposedQueryId,
DecomposedQueryPlanVersion  decomposedQueryVersion,
DataSinkPtr  sink,
QueryTerminationType  terminationType 
)

Notifies that a sink operator is done with its execution.

Parameters
decomposedQueryIdthe id of the plan the sink belongs to
decomposedQueryVersionthe version of the plan the sink belongs to
sinkthe terminated sink
terminationTypethe type of termination (e.g., failure, soft)

References NES_ASSERT2_FMT, queryMutex, runningQEPs, and NES::NESStrongType< T, Tag, invalid, initial >::toString().

Here is the call graph for this function:

◆ notifySourceCompletion()

void NES::Runtime::AbstractQueryManager::notifySourceCompletion ( DataSourcePtr  source,
QueryTerminationType  terminationType 
)

Notifies that a source operator is done with its execution.

Parameters
sourcethe completed source
terminationTypethe type of termination (e.g., failure, soft)

References NES::Runtime::Execution::Finished, NES::Runtime::Graceful, NES_TRACE, queryMutex, queryStatusListener, NES::Runtime::Reconfiguration, and sourceToQEPMapping.

◆ notifySourceFailure()

void NES::Runtime::AbstractQueryManager::notifySourceFailure ( DataSourcePtr  source,
const std::string  errorMessage 
)

This method informs the QueryManager that a source has failed.

Parameters
sourcethe failed source
errorMessagethe reason of the feature

References asyncTaskExecutor, failExecutableQueryPlan(), backward::details::move(), NES_DEBUG, queryMutex, queryStatusListener, and sourceToQEPMapping.

Here is the call graph for this function:

◆ notifyTaskFailure()

void NES::Runtime::AbstractQueryManager::notifyTaskFailure ( Execution::SuccessorExecutablePipeline  pipeline,
const std::string &  message 
)

This method informs the QueryManager that a task has failed.

Parameters
pipelinethe enclosed pipeline or sink
messagethe reason of the feature

References asyncTaskExecutor, failExecutableQueryPlan(), backward::details::move(), NES_DEBUG, NES_WARNING, queryMutex, queryStatusListener, and runningQEPs.

Referenced by NES::Runtime::DynamicQueryManager::processNextTask().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ operator=()

AbstractQueryManager& NES::Runtime::AbstractQueryManager::operator= ( const AbstractQueryManager )
delete

◆ poisonWorkers()

virtual void NES::Runtime::AbstractQueryManager::poisonWorkers ( )
pure virtual

notify all waiting threads in getWork() to wake up and finish up

◆ postReconfigurationCallback()

void NES::Runtime::AbstractQueryManager::postReconfigurationCallback ( ReconfigurationMessage task)
overridevirtual

◆ processNextTask()

virtual ExecutionResult NES::Runtime::AbstractQueryManager::processNextTask ( bool  running,
WorkerContext workerContext 
)
pure virtual

process task from task queue

Parameters
boolindicating if the thread pool is still running
workercontext
Returns
an execution result

Implemented in NES::Runtime::MultiQueueQueryManager, and NES::Runtime::DynamicQueryManager.

◆ propagateReconfigurationMarker()

bool NES::Runtime::AbstractQueryManager::propagateReconfigurationMarker ( const ReconfigurationMarkerPtr marker,
DataSourcePtr  source 
)

propagates a reconfiguration marker to all downstream operators of a source operator

Parameters
sourcethe source operator
Returns
true if it went through

References addReconfigurationMessage(), magic_enum::enum_name(), NES_DEBUG, NES::Runtime::ReconfigurationMarker, NES::Runtime::SoftEndOfStream, and threadPool.

Here is the call graph for this function:

◆ reconfigure()

void NES::Runtime::AbstractQueryManager::reconfigure ( ReconfigurationMessage task,
WorkerContext context 
)
overridevirtual

This methods triggers the reconfiguration

Parameters
contextworkercontext

Reimplemented from NES::Runtime::Reconfigurable.

References NES::Runtime::Destroy, NES::Runtime::ReconfigurationMessage::getType(), NES_THROW_RUNTIME_ERROR, and NES::Runtime::Reconfigurable::reconfigure().

Here is the call graph for this function:

◆ registerExecutableQueryPlan()

bool NES::Runtime::AbstractQueryManager::registerExecutableQueryPlan ( const Execution::ExecutableQueryPlanPtr executableQueryPlan)
virtual

register a query by extracting sources, windows and sink and add them to respective map

Parameters
executableQueryPlanto be deployed

Reimplemented in NES::Runtime::MultiQueueQueryManager.

References asyncTaskExecutor, NES::Runtime::Execution::Created, decomposeQueryToSourceIdMapping, NES_ASSERT, NES_ASSERT2_FMT, NES_DEBUG, NES_FATAL_ERROR, NES_WARNING, queryManagerStatus, queryMutex, queryToStatisticsMap, Running, runningQEPs, and sourceToQEPMapping.

Referenced by NES::Runtime::MultiQueueQueryManager::registerExecutableQueryPlan().

Here is the caller graph for this function:

◆ resetQueryStatistics()

void NES::Runtime::AbstractQueryManager::resetQueryStatistics ( NES::DecomposedQueryId  decomposedQueryId)

Reset statistics for the decomposed query plan.

Parameters
decomposedQueryId: the decomposed query plan id

References queryToStatisticsMap.

◆ startExecutableQueryPlan()

bool NES::Runtime::AbstractQueryManager::startExecutableQueryPlan ( const Execution::ExecutableQueryPlanPtr qep)

method to start a query

Parameters
qepof the query to start
Returns
bool indicating success

References NES_ASSERT2_FMT, NES_DEBUG, NES_FATAL_ERROR, NES_THROW_RUNTIME_ERROR, NES_WARNING, queryManagerStatus, queryToStatisticsMap, and Running.

Referenced by startNewExecutableQueryPlanAndPropagateMarker().

Here is the caller graph for this function:

◆ startNewExecutableQueryPlanAndPropagateMarker()

bool NES::Runtime::AbstractQueryManager::startNewExecutableQueryPlanAndPropagateMarker ( const ReconfigurationMarkerPtr  marker,
DecomposedQueryIdWithVersion  decomposedQueryIdWithVersion 
)

starts new executable plan and propagates reconfiguration marker to it's source

Parameters
markerreconfiguration marker
decomposedQueryIdid and version of current plan
Returns
true if plan was started

References NES::Runtime::Execution::Finished, getQueryExecutionPlan(), and startExecutableQueryPlan().

Here is the call graph for this function:

◆ stopExecutableQueryPlan()

bool NES::Runtime::AbstractQueryManager::stopExecutableQueryPlan ( const Execution::ExecutableQueryPlanPtr qep,
Runtime::QueryTerminationType  type = Runtime::QueryTerminationType::HardStop 
)

method to start a query

Parameters
qepof the query to start
gracefulstop the query gracefully or not
Returns
bool indicating success

References addReconfigurationMessage(), NES::Runtime::Destroy, magic_enum::enum_name(), NES::Runtime::Execution::ErrorState, NES::Runtime::Failure, NES::Runtime::Execution::Finished, NES::Runtime::Graceful, NES::Runtime::HardStop, NES::Runtime::Execution::Invalid, NES_ASSERT2_FMT, NES_DEBUG, NES_FATAL_ERROR, NES_WARNING, NES::Runtime::Execution::Ok, NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >::shared_from_this(), NES::Runtime::Execution::Stopped, timeout, and type.

Referenced by destroy().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ terminateLoop()

virtual ExecutionResult NES::Runtime::AbstractQueryManager::terminateLoop ( WorkerContext )
protectedpure virtual

Executes cleaning up logic on the task queue.

Returns
an ExecutionResult

Implemented in NES::Runtime::MultiQueueQueryManager, and NES::Runtime::DynamicQueryManager.

◆ unregisterExecutableQueryPlan()

bool NES::Runtime::AbstractQueryManager::unregisterExecutableQueryPlan ( const Execution::ExecutableQueryPlanPtr executableQueryPlan)

deregister a query by extracting sources, windows and sink and remove them from respective map

Parameters
executableQueryPlanto be unregistered
Returns
bool indicating if register was successful

References decomposeQueryToSourceIdMapping, NES::Runtime::Execution::ErrorState, NES::Runtime::Execution::Finished, NES_ASSERT2_FMT, NES_DEBUG, queryMutex, runningQEPs, sourceToQEPMapping, and NES::Runtime::Execution::Stopped.

◆ updateStatistics()

void NES::Runtime::AbstractQueryManager::updateStatistics ( const Task task,
SharedQueryId  sharedQueryId,
DecomposedQueryId  decomposedQueryId,
PipelineId  pipelineId,
WorkerContext workerContext 
)
protectedvirtual

Method to update the statistics.

Parameters
task
sharedQueryId
decomposedQueryId
pipelineId
workerContext

Reimplemented in NES::Runtime::MultiQueueQueryManager, and NES::Runtime::DynamicQueryManager.

References bufferManagers, NES::Runtime::Task::getBufferRef(), NES::Runtime::TupleBuffer::getChunkNumber(), NES::Runtime::TupleBuffer::getCreationTimestampInMS(), NES::Runtime::WorkerContext::getId(), NES::Runtime::Task::getNumberOfInputTuples(), NES::Runtime::TupleBuffer::getOriginId(), NES_ERROR, NES_WARNING, queryToStatisticsMap, and tempCounterTasksCompleted.

Referenced by completedWork(), NES::Runtime::DynamicQueryManager::updateStatistics(), and NES::Runtime::MultiQueueQueryManager::updateStatistics().

Here is the call graph for this function:
Here is the caller graph for this function:

Friends And Related Function Documentation

◆ NodeEngine

friend class NodeEngine
friend

◆ ThreadPool

friend class ThreadPool
friend

Member Data Documentation

◆ asyncTaskExecutor

AsyncTaskExecutorPtr NES::Runtime::AbstractQueryManager::asyncTaskExecutor
protected

worker thread for async maintenance task, e.g., fail queryIdAndCatalogEntryMapping

Referenced by AbstractQueryManager(), notifySourceFailure(), notifyTaskFailure(), and registerExecutableQueryPlan().

◆ bufferManagers

std::vector<BufferManagerPtr> NES::Runtime::AbstractQueryManager::bufferManagers
protected

◆ decomposeQueryToSourceIdMapping

std::unordered_map<DecomposedQueryIdWithVersion, std::vector<OperatorId> > NES::Runtime::AbstractQueryManager::decomposeQueryToSourceIdMapping
protected

◆ hardwareManager

HardwareManagerPtr NES::Runtime::AbstractQueryManager::hardwareManager
protected

◆ nodeEngineId

WorkerId NES::Runtime::AbstractQueryManager::nodeEngineId
protected

Referenced by getNodeId().

◆ numberOfBuffersPerEpoch

◆ numThreads

◆ persistentSourceProperties

folly::Synchronized<std::unordered_map<std::string, std::shared_ptr<BasePersistentSourceProperties> > > NES::Runtime::AbstractQueryManager::persistentSourceProperties

◆ queryManagerStatus

◆ queryMutex

◆ queryStatusListener

std::shared_ptr<AbstractQueryStatusListener> NES::Runtime::AbstractQueryManager::queryStatusListener
protected

◆ queryToStatisticsMap

◆ reconfigurationMutex

std::mutex NES::Runtime::AbstractQueryManager::reconfigurationMutex
mutableprotected

◆ runningQEPs

◆ sourceToQEPMapping

◆ statisticsMutex

std::mutex NES::Runtime::AbstractQueryManager::statisticsMutex
mutableprotected

Referenced by destroy(), and getSharedQueryId().

◆ taskIdCounter

std::atomic_uint64_t NES::Runtime::AbstractQueryManager::taskIdCounter = 0
protected

Referenced by getNextTaskId().

◆ tempCounterTasksCompleted

std::vector<AtomicCounter<uint64_t> > NES::Runtime::AbstractQueryManager::tempCounterTasksCompleted
protected

◆ threadPool

ThreadPoolPtr NES::Runtime::AbstractQueryManager::threadPool {nullptr}
protected

◆ workerToCoreMapping

std::vector<uint64_t> NES::Runtime::AbstractQueryManager::workerToCoreMapping
protected

The documentation for this class was generated from the following files: