NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
NES::Runtime::DynamicQueryManager Class Reference

#include <QueryManager.hpp>

Collaboration diagram for NES::Runtime::DynamicQueryManager:
[legend]

Public Member Functions

 DynamicQueryManager (std::shared_ptr< AbstractQueryStatusListener > queryStatusListener, std::vector< BufferManagerPtr > bufferManager, WorkerId nodeEngineId, uint16_t numThreads, HardwareManagerPtr hardwareManager, uint64_t numberOfBuffersPerEpoch, std::vector< uint64_t > workerToCoreMapping={})
 
void destroy () override
 reset query manager. After this call, it won't be possible to use the query manager. More...
 
ExecutionResult processNextTask (bool running, WorkerContext &workerContext) override
 process task from task queue More...
 
void addWorkForNextPipeline (TupleBuffer &buffer, Execution::SuccessorExecutablePipeline executable, uint32_t queueId=0) override
 add work to the query manager, this method is source-driven and is called for each buffer generated by the window trigger More...
 
- Public Member Functions inherited from NES::Runtime::AbstractQueryManager
 AbstractQueryManager ()=delete
 
 AbstractQueryManager (const AbstractQueryManager &)=delete
 
AbstractQueryManager & operator= (const AbstractQueryManager &)=delete
 
 AbstractQueryManager (std::shared_ptr< AbstractQueryStatusListener > queryStatusListener, std::vector< BufferManagerPtr > bufferManagers, WorkerId nodeEngineId, uint16_t numThreads, HardwareManagerPtr hardwareManager, uint64_t numberOfBuffersPerEpoch, std::vector< uint64_t > workerToCoreMapping={})
 
virtual ~AbstractQueryManager () NES_NOEXCEPT(false) override
 
virtual bool registerExecutableQueryPlan (const Execution::ExecutableQueryPlanPtr &executableQueryPlan)
 register a query by extracting sources, windows and sink and add them to respective map More...
 
bool unregisterExecutableQueryPlan (const Execution::ExecutableQueryPlanPtr &executableQueryPlan)
 deregister a query by extracting sources, windows and sink and remove them from respective map More...
 
bool startExecutableQueryPlan (const Execution::ExecutableQueryPlanPtr &qep)
 method to start a query More...
 
bool stopExecutableQueryPlan (const Execution::ExecutableQueryPlanPtr &qep, Runtime::QueryTerminationType type=Runtime::QueryTerminationType::HardStop)
 method to stop a query More...
 
bool failExecutableQueryPlan (const Execution::ExecutableQueryPlanPtr &qep)
 method to fail a query More...
 
void postReconfigurationCallback (ReconfigurationMessage &task) override
 
void reconfigure (ReconfigurationMessage &, WorkerContext &context) override
 
Execution::ExecutableQueryPlanStatus getQepStatus (DecomposedQueryIdWithVersion decomposedQueryIdWithVersion)
 retrieve the execution status of a given local query sub plan id. More...
 
Execution::ExecutableQueryPlanPtr getQueryExecutionPlan (DecomposedQueryIdWithVersion decomposedQueryIdWithVersion) const
 Provides the QEP object for an id. More...
 
bool canTriggerEndOfStream (DataSourcePtr source, Runtime::QueryTerminationType)
 
QueryStatisticsPtr getQueryStatistics (DecomposedQueryId decomposedQueryId)
 method to return the query statistics More...
 
void resetQueryStatistics (DecomposedQueryId decomposedQueryId)
 Reset statistics for the decomposed query plan. More...
 
WorkerId getNodeId () const
 
BufferManagerPtr getBufferManager ()
 
void notifyTaskFailure (Execution::SuccessorExecutablePipeline pipeline, const std::string &message)
 This method informs the QueryManager that a task has failed. More...
 
uint64_t getNumberOfBuffersPerEpoch () const
 Returns the numberOfBuffersPerEpoch. More...
 
void notifySourceFailure (DataSourcePtr source, const std::string errorMessage)
 This method informs the QueryManager that a source has failed. More...
 
void notifyQueryStatusChange (const Execution::ExecutableQueryPlanPtr &qep, Execution::ExecutableQueryPlanStatus newStatus)
 Informs the query manager about a status change in a sub query plan. More...
 
SharedQueryId getSharedQueryId (DecomposedQueryIdWithVersion decomposedQueryIdWithVersion) const
 get the shared query id mapped to the decomposed query plan id More...
 
bool addEndOfStream (DataSourcePtr source, Runtime::QueryTerminationType graceful=Runtime::QueryTerminationType::Graceful)
 introduces end of stream to all QEPs connected to this source More...
 
bool propagateReconfigurationMarker (const ReconfigurationMarkerPtr &marker, DataSourcePtr source)
 propagates a reconfiguration marker to all downstream operators of a source operator More...
 
bool startNewExecutableQueryPlanAndPropagateMarker (const ReconfigurationMarkerPtr marker, DecomposedQueryIdWithVersion decomposedQueryIdWithVersion)
 starts new executable plan and propagates reconfiguration marker to its source More...
 
bool isThreadPoolRunning () const
 
uint64_t getCurrentTaskSum ()
 
uint64_t getNumberOfWorkerThreads ()
 
void notifySourceCompletion (DataSourcePtr source, QueryTerminationType terminationType)
 Notifies that a source operator is done with its execution. More...
 
void notifyPipelineCompletion (DecomposedQueryId decomposedQueryId, Execution::ExecutablePipelinePtr pipeline, QueryTerminationType terminationType)
 Notifies that a pipeline is done with its execution. More...
 
void notifySinkCompletion (DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion, DataSinkPtr sink, QueryTerminationType terminationType)
 Notifies that a sink operator is done with its execution. More...
 
std::optional< ReconfigurationMarkerEventPtr > getReconfigurationEvent (Network::NetworkSourcePtr networkSource, SharedQueryId sharedQueryId, const ReconfigurationMarker &marker) const
 
std::unordered_set< DecomposedQueryIdWithVersion > getExecutablePlanIdsForSource (DataSourcePtr source) const
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< AbstractQueryManager, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this_base< isNoexceptDestructible >
virtual ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default
 
- Public Member Functions inherited from NES::Runtime::Reconfigurable
 ~Reconfigurable () NES_NOEXCEPT(false) override=default
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 

Protected Member Functions

ExecutionResult terminateLoop (WorkerContext &) override
 
void updateStatistics (const Task &task, SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId, PipelineId pipeId, WorkerContext &workerContext) override
 
uint64_t getNumberOfTasksInWorkerQueues () const override
 get number of tasks in the queue More...
 
uint64_t getNumberOfBuffersPerEpoch () const
 Returns the numberOfBuffersPerEpoch. More...
 
- Protected Member Functions inherited from NES::Runtime::AbstractQueryManager
void completedWork (Task &task, WorkerContext &workerContext)
 finalize task execution by: 1.) update statistics (number of processed tuples and tasks) 2.) release input buffer (give back to the buffer manager) More...
 
bool addSoftEndOfStream (DataSourcePtr source)
 Triggers a soft end of stream for a source. More...
 
bool addHardEndOfStream (DataSourcePtr source)
 Triggers a hard end of stream for a source. More...
 
bool addFailureEndOfStream (DataSourcePtr source)
 Triggers a failure end of stream for a source. More...
 
uint64_t getNextTaskId ()
 Returns the next free task id. More...
 

Additional Inherited Members

- Public Types inherited from NES::Runtime::AbstractQueryManager
enum class  QueryManagerStatus : uint8_t {
  Created , Running , Stopped , Destroyed ,
  Failed
}
 
using inherited0 = NES::detail::virtual_enable_shared_from_this< AbstractQueryManager, false >
 
using inherited1 = Reconfigurable
 
- Public Attributes inherited from NES::Runtime::AbstractQueryManager
folly::Synchronized< std::unordered_map< std::string, std::shared_ptr< BasePersistentSourceProperties > > > persistentSourceProperties
 
- Protected Attributes inherited from NES::Runtime::AbstractQueryManager
WorkerId nodeEngineId
 
std::atomic_uint64_t taskIdCounter = 0
 
std::vector< BufferManagerPtr > bufferManagers
 
uint16_t numThreads
 
HardwareManagerPtr hardwareManager
 
ThreadPoolPtr threadPool {nullptr}
 worker threads running compute tasks More...
 
AsyncTaskExecutorPtr asyncTaskExecutor
 worker thread for async maintenance task, e.g., fail queryIdAndCatalogEntryMapping More...
 
std::unordered_map< DecomposedQueryIdWithVersion, Execution::ExecutableQueryPlanPtr > runningQEPs
 
std::mutex statisticsMutex
 
cuckoohash_map< DecomposedQueryId, QueryStatisticsPtr > queryToStatisticsMap
 
std::mutex reconfigurationMutex
 
std::vector< uint64_t > workerToCoreMapping
 
std::recursive_mutex queryMutex
 
std::atomic< QueryManagerStatus > queryManagerStatus {QueryManagerStatus::Created}
 
std::vector< AtomicCounter< uint64_t > > tempCounterTasksCompleted
 
std::shared_ptr< AbstractQueryStatusListener > queryStatusListener
 
std::unordered_map< DecomposedQueryIdWithVersion, std::vector< OperatorId > > decomposeQueryToSourceIdMapping
 
std::unordered_map< OperatorId, std::vector< Execution::ExecutableQueryPlanPtr > > sourceToQEPMapping
 
uint64_t numberOfBuffersPerEpoch
 

Constructor & Destructor Documentation

◆ DynamicQueryManager()

NES::Runtime::DynamicQueryManager::DynamicQueryManager ( std::shared_ptr< AbstractQueryStatusListener > queryStatusListener,
std::vector< BufferManagerPtr > bufferManager,
WorkerId  nodeEngineId,
uint16_t  numThreads,
HardwareManagerPtr  hardwareManager,
uint64_t  numberOfBuffersPerEpoch,
std::vector< uint64_t >  workerToCoreMapping = {} 
)
explicit

Member Function Documentation

◆ addWorkForNextPipeline()

void NES::Runtime::DynamicQueryManager::addWorkForNextPipeline ( TupleBuffer & buffer,
Execution::SuccessorExecutablePipeline  executable,
uint32_t  queueId = 0 
)
override virtual

add work to the query manager, this method is source-driven and is called for each buffer generated by the window trigger

Parameters
Pointer to the tuple buffer containing the data
Pointer to the pipeline stage that will be executed next
id of the queue where to put the task (only necessary if multiple queues are used)

Implements NES::Runtime::AbstractQueryManager.

References NES::Runtime::AbstractQueryManager::getNextTaskId(), NES_THROW_RUNTIME_ERROR, NES_TRACE, and NES_WARNING.

Here is the call graph for this function:

◆ destroy()

void NES::Runtime::DynamicQueryManager::destroy ( )
override virtual

reset query manager. After this call, it won't be possible to use the query manager.

Reimplemented from NES::Runtime::AbstractQueryManager.

References NES::Runtime::AbstractQueryManager::destroy(), NES::Runtime::AbstractQueryManager::Destroyed, and NES::Runtime::AbstractQueryManager::queryManagerStatus.

Here is the call graph for this function:

◆ getNumberOfBuffersPerEpoch()

uint64_t NES::Runtime::DynamicQueryManager::getNumberOfBuffersPerEpoch ( ) const
protected

Returns the numberOfBuffersPerEpoch.

Returns
numberOfBuffersPerEpoch

References NES::Runtime::AbstractQueryManager::numberOfBuffersPerEpoch.

◆ getNumberOfTasksInWorkerQueues()

uint64_t NES::Runtime::DynamicQueryManager::getNumberOfTasksInWorkerQueues ( ) const
override protected virtual

get number of tasks in the queue

Returns
task count

Implements NES::Runtime::AbstractQueryManager.

◆ processNextTask()

ExecutionResult NES::Runtime::DynamicQueryManager::processNextTask ( bool  running,
WorkerContext & workerContext 
)
override virtual

process task from task queue

Parameters
bool indicating if the thread pool is still running
worker context
Returns
an execution result

Implements NES::Runtime::AbstractQueryManager.

References NES::Runtime::AbstractQueryManager::completedWork(), NES::Error, NES::Finished, NES::Runtime::Task::getExecutable(), NES::Runtime::NesThread::getId(), NES::Runtime::Task::getNumberOfInputTuples(), NES_ERROR, NES_TRACE, NES::Runtime::AbstractQueryManager::notifyTaskFailure(), NES::Ok, terminateLoop(), and NES::Runtime::Task::toString().

Here is the call graph for this function:

◆ terminateLoop()

ExecutionResult NES::Runtime::DynamicQueryManager::terminateLoop ( WorkerContext & workerContext)
override protected virtual
Returns

Implements NES::Runtime::AbstractQueryManager.

References NES::Finished, and NES::Runtime::Task::isReconfiguration().

Referenced by processNextTask().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ updateStatistics()

void NES::Runtime::DynamicQueryManager::updateStatistics ( const Task & task,
SharedQueryId  sharedQueryId,
DecomposedQueryId  decomposedQueryId,
PipelineId  pipeId,
WorkerContext & workerContext 
)
override protected virtual
Parameters
task
sharedQueryId
decomposedQueryId
pipeId
workerContext

Reimplemented from NES::Runtime::AbstractQueryManager.

References NES::Runtime::AbstractQueryManager::queryToStatisticsMap, and NES::Runtime::AbstractQueryManager::updateStatistics().

Here is the call graph for this function:

The documentation for this class was generated from the following files: