NebulaStream
0.6.213
NebulaStream is a data and application management framework for the internet of things
|
this class represents the interface and entrance point into the query processing part of NES. It provides basic functionality such as deploying, undeploying, starting, and stopping. More...
#include <NodeEngine.hpp>
Public Types | |
enum class | NodeEngineQueryStatus : uint8_t { started , stopped , registered } |
Public Member Functions | |
virtual | ~NodeEngine () override |
NodeEngine ()=delete | |
NodeEngine (const NodeEngine &)=delete | |
NodeEngine & | operator= (const NodeEngine &)=delete |
void | onFatalError (int signalNumber, std::string callstack) override |
signal handler: behaviour not clear yet! More... | |
void | onFatalException (std::shared_ptr< std::exception > exception, std::string callstack) override |
exception handler: behaviour not clear yet! More... | |
bool | deployExecutableQueryPlan (const Execution::ExecutableQueryPlanPtr &executableQueryPlan) |
deploy registers and starts a query More... | |
bool | registerExecutableQueryPlan (const Execution::ExecutableQueryPlanPtr &executableQueryPlan) |
registers an executable query plan More... | |
bool | undeployDecomposedQueryPlan (SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId) |
Stops and undeploy a decomposed query plan. More... | |
bool | registerDecomposableQueryPlan (const DecomposedQueryPlanPtr &decomposedQueryPlan) |
registers a decomposed query plan More... | |
Execution::ExecutableQueryPlanPtr | checkDecomposableQueryPlanToStart (DecomposedQueryId id, DecomposedQueryPlanVersion version) |
checks and returns decomposed query plan, delayed to register More... | |
bool | unregisterDecomposedQueryPlan (SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId) |
unregisters a decomposed query More... | |
bool | startDecomposedQueryPlan (SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId) |
method to start a already deployed query More... | |
bool | stopDecomposedQueryPlan (SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId, Runtime::QueryTerminationType terminationType=Runtime::QueryTerminationType::HardStop) |
method to stop a decomposed query plan More... | |
bool | bufferData (DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion, OperatorId uniqueNetworkSinkDescriptorId) |
method to trigger the buffering of data on a NetworkSink of a Query Sub Plan with the given id More... | |
bool | updateNetworkSink (WorkerId newNodeId, const std::string &newHostname, uint32_t newPort, DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion, OperatorId uniqueNetworkSinkDescriptorId) |
method to trigger the reconfiguration of a NetworkSink so that it points to a new downstream node. More... | |
bool | stop (bool markQueriesAsFailed=false) |
release all resource of the node engine More... | |
QueryManagerPtr | getQueryManager () |
getter of query manager More... | |
BufferManagerPtr | getBufferManager (uint32_t bufferManagerIndex=0) const |
getter of buffer manager for the i-th numa region (defaul: 0) More... | |
WorkerId | getWorkerId () |
getter of node id More... | |
Network::NetworkManagerPtr | getNetworkManager () |
getter of network manager More... | |
AbstractQueryStatusListenerPtr | getQueryStatusListener () |
getter of query status listener More... | |
Execution::ExecutableQueryPlanStatus | getQueryStatus (SharedQueryId sharedQueryId) |
std::vector< QueryStatisticsPtr > | getQueryStatistics (SharedQueryId sharedQueryId) |
method to return the query statistics More... | |
std::vector< QueryStatistics > | getQueryStatistics (bool withReset=false) |
method to return the query statistics More... | |
Network::PartitionManagerPtr | getPartitionManager () |
void | onDataBuffer (Network::NesPartition, TupleBuffer &) override |
this callback is called once a tuple buffer arrives on the network manager for a given nes partition More... | |
void | onEvent (Network::NesPartition, Runtime::BaseEvent &) override |
this callback is called once a tuple buffer arrives on the network manager for a given nes partition More... | |
void | onEndOfStream (Network::Messages::EndOfStreamMessage) override |
this callback is called once an end of stream message arrives More... | |
void | onServerError (Network::Messages::ErrorMessage) override |
this callback is called once an error is raised on the server side More... | |
void | onChannelError (Network::Messages::ErrorMessage) override |
this callback is called once an error is raised on the channel(client) side More... | |
HardwareManagerPtr | getHardwareManager () const |
Provide the hardware manager. More... | |
const std::vector< PhysicalSourceTypePtr > & | getPhysicalSourceTypes () const |
Get physical sources configured. More... | |
std::shared_ptr< const Execution::ExecutableQueryPlan > | getExecutableQueryPlan (DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion) const |
finds executable query plan for a given sub query id More... | |
std::vector< DecomposedQueryIdWithVersion > | getDecomposedQueryIds (SharedQueryId sharedQueryId) |
finds sub query ids for a given query id More... | |
Monitoring::MetricStorePtr | getMetricStore () |
void | setMetricStore (Monitoring::MetricStorePtr metricStore) |
WorkerId | getNodeId () const |
void | setNodeId (const WorkerId NodeId) |
void | updatePhysicalSources (const std::vector< PhysicalSourceTypePtr > &physicalSources) |
Updates the physical sources on the node engine. More... | |
const OpenCLManagerPtr | getOpenCLManager () const |
const Statistic::StatisticManagerPtr | getStatisticManager () const |
bool | reconfigureSubPlan (DecomposedQueryPlanPtr &reconfiguredDecomposedQueryPlan) |
applies reconfigurations to the sources or sinks of a sub plan. Reconfigured sources will start expecting connections from a new upstream sink. Reconfigured sinks will scheduled a pending change of the downstream source to which they send their data. More... | |
bool | addReconfigureMarker (SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId, ReconfigurationMarkerPtr &reconfigurationMarker) |
add reconfiguration marker to the decomposed query plan More... | |
NodeEngine (std::vector< PhysicalSourceTypePtr > physicalSources, HardwareManagerPtr &&, std::vector< BufferManagerPtr > &&, QueryManagerPtr &&, std::function< Network::NetworkManagerPtr(std::shared_ptr< NodeEngine >)> &&, Network::PartitionManagerPtr &&, OperatorHandlerStorePtr, QueryCompilation::QueryCompilerPtr &&, std::weak_ptr< AbstractQueryStatusListener > &&, OpenCLManagerPtr &&, WorkerId nodeEngineId, uint64_t numberOfBuffersInGlobalBufferManager, uint64_t numberOfBuffersInSourceLocalBufferPool, uint64_t numberOfBuffersPerWorker, bool sourceSharing) | |
Create a node engine and gather node information and initialize QueryManager, BufferManager and ThreadPool. More... | |
![]() | |
virtual | ~ExchangeProtocolListener ()=default |
virtual void | onEvent (NesPartition, Runtime::BaseEvent &)=0 |
This is called on every event buffer received by the network stack. More... | |
virtual void | onDataBuffer (NesPartition, Runtime::TupleBuffer &)=0 |
This is called on every data buffer that the network stack receives for a specific nes partition. More... | |
![]() | |
~virtual_enable_shared_from_this () NES_NOEXCEPT(true) override=default | |
std::shared_ptr< T1 > | shared_from_this () |
std::weak_ptr< T1 > | weak_from_this () |
![]() | |
virtual | ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default |
![]() | |
~virtual_enable_shared_from_this () NES_NOEXCEPT(true) override=default | |
std::shared_ptr< T1 > | shared_from_this () |
std::weak_ptr< T1 > | weak_from_this () |
Friends | |
class | NodeEngineBuilder |
this class represents the interface and entrance point into the query processing part of NES. It provides basic functionality such as deploying, undeploying, starting, and stopping.
|
strong |
|
overridevirtual |
|
delete |
|
delete |
|
explicit |
Create a node engine and gather node information and initialize QueryManager, BufferManager and ThreadPool.
bool NES::Runtime::NodeEngine::addReconfigureMarker | ( | SharedQueryId | sharedQueryId, |
DecomposedQueryId | decomposedQueryId, | ||
ReconfigurationMarkerPtr & | reconfigurationMarker | ||
) |
add reconfiguration marker to the decomposed query plan
sharedQueryId | shared query id |
decomposedQueryId | Decomposed query id |
reconfigurationMarker | reconfiguration marker containing information about how to reconfigure decomposed query plans |
References NES_WARNING.
bool NES::Runtime::NodeEngine::bufferData | ( | DecomposedQueryId | decomposedQueryId, |
DecomposedQueryPlanVersion | decomposedQueryVersion, | ||
OperatorId | uniqueNetworkSinkDescriptorId | ||
) |
method to trigger the buffering of data on a NetworkSink of a Query Sub Plan with the given id
decomposedQueryId | the id of the Query Sub Plan to which the Network Sink belongs to |
decomposedQueryVersion | the version of the Query Sub Plan to which the Network Sink belongs to |
uniqueNetworkSinkDescriptorId | : the id of the Network Sink Descriptor. Helps identify the Network Sink on which to buffer data |
References NES_DEBUG, and NES_NOT_IMPLEMENTED.
Execution::ExecutableQueryPlanPtr NES::Runtime::NodeEngine::checkDecomposableQueryPlanToStart | ( | DecomposedQueryId | id, |
DecomposedQueryPlanVersion | version | ||
) |
checks and returns decomposed query plan, delayed to register
id | the decomposed query plan id |
version | the decomposed query plan version |
References NES_DEBUG.
bool NES::Runtime::NodeEngine::deployExecutableQueryPlan | ( | const Execution::ExecutableQueryPlanPtr & | executableQueryPlan | ) |
deploy registers and starts a query
executableQueryPlan | the executable query plan to deploy |
References NES_DEBUG, NES_ERROR, registerExecutableQueryPlan(), and startDecomposedQueryPlan().
BufferManagerPtr NES::Runtime::NodeEngine::getBufferManager | ( | uint32_t | bufferManagerIndex = 0 | ) | const |
getter of buffer manager for the i-th numa region (defaul: 0)
References NES_ASSERT2_FMT.
std::vector< DecomposedQueryIdWithVersion > NES::Runtime::NodeEngine::getDecomposedQueryIds | ( | SharedQueryId | sharedQueryId | ) |
finds sub query ids for a given query id
sharedQueryId | query id |
std::shared_ptr< const Execution::ExecutableQueryPlan > NES::Runtime::NodeEngine::getExecutableQueryPlan | ( | DecomposedQueryId | decomposedQueryId, |
DecomposedQueryPlanVersion | decomposedQueryVersion | ||
) | const |
finds executable query plan for a given sub query id
decomposedQueryId | query sub plan id |
decomposedQueryVersion | query sub plan version |
HardwareManagerPtr NES::Runtime::NodeEngine::getHardwareManager | ( | ) | const |
Provide the hardware manager.
Monitoring::MetricStorePtr NES::Runtime::NodeEngine::getMetricStore | ( | ) |
Getter for the metric store
Network::NetworkManagerPtr NES::Runtime::NodeEngine::getNetworkManager | ( | ) |
getter of network manager
WorkerId NES::Runtime::NodeEngine::getNodeId | ( | ) | const |
Getter for node Id
const OpenCLManagerPtr NES::Runtime::NodeEngine::getOpenCLManager | ( | ) | const |
Network::PartitionManagerPtr NES::Runtime::NodeEngine::getPartitionManager | ( | ) |
const std::vector< PhysicalSourceTypePtr > & NES::Runtime::NodeEngine::getPhysicalSourceTypes | ( | ) | const |
Get physical sources configured.
QueryManagerPtr NES::Runtime::NodeEngine::getQueryManager | ( | ) |
getter of query manager
std::vector< QueryStatistics > NES::Runtime::NodeEngine::getQueryStatistics | ( | bool | withReset = false | ) |
method to return the query statistics
withReset | specifies if the statistics is deleted after reading (so we start with 0) |
References NES_TRACE.
std::vector< QueryStatisticsPtr > NES::Runtime::NodeEngine::getQueryStatistics | ( | SharedQueryId | sharedQueryId | ) |
Execution::ExecutableQueryPlanStatus NES::Runtime::NodeEngine::getQueryStatus | ( | SharedQueryId | sharedQueryId | ) |
References NES::Runtime::Execution::Invalid, and NES_ERROR.
AbstractQueryStatusListenerPtr NES::Runtime::NodeEngine::getQueryStatusListener | ( | ) |
getter of query status listener
const Statistic::StatisticManagerPtr NES::Runtime::NodeEngine::getStatisticManager | ( | ) | const |
WorkerId NES::Runtime::NodeEngine::getWorkerId | ( | ) |
getter of node id
|
overridevirtual |
this callback is called once an error is raised on the channel(client) side
Implements NES::Network::ExchangeProtocolListener.
References NES::Network::Messages::DeletedPartitionError, NES::Network::Messages::ExchangeMessage::getChannelId(), NES::Network::Messages::ErrorMessage::getErrorType(), NES::Network::Messages::ErrorMessage::getErrorTypeAsString(), NES_INFO, NES_THROW_RUNTIME_ERROR, NES_WARNING, NES::Network::Messages::PartitionNotRegisteredError, and NES::Network::Messages::VersionMismatchError.
|
override |
this callback is called once a tuple buffer arrives on the network manager for a given nes partition
|
overridevirtual |
this callback is called once an end of stream message arrives
Implements NES::Network::ExchangeProtocolListener.
|
override |
this callback is called once a tuple buffer arrives on the network manager for a given nes partition
|
overridevirtual |
signal handler: behaviour not clear yet!
signalNumber | |
callstack |
Implements NES::Exceptions::ErrorListener.
References NES_ERROR.
|
overridevirtual |
exception handler: behaviour not clear yet!
exception | |
callstack |
Implements NES::Exceptions::ErrorListener.
References NES_ERROR.
|
overridevirtual |
this callback is called once an error is raised on the server side
Implements NES::Network::ExchangeProtocolListener.
References NES::Network::Messages::DeletedPartitionError, NES::Network::Messages::ExchangeMessage::getChannelId(), NES::Network::Messages::ErrorMessage::getErrorType(), NES::Network::Messages::ErrorMessage::getErrorTypeAsString(), NES_ASSERT, NES_INFO, NES_WARNING, NES::Network::Messages::PartitionNotRegisteredError, and NES::Network::Messages::VersionMismatchError.
|
delete |
bool NES::Runtime::NodeEngine::reconfigureSubPlan | ( | DecomposedQueryPlanPtr & | reconfiguredDecomposedQueryPlan | ) |
applies reconfigurations to the sources or sinks of a sub plan. Reconfigured sources will start expecting connections from a new upstream sink. Reconfigured sinks will scheduled a pending change of the downstream source to which they send their data.
reconfiguredDecomposedQueryPlan | A query plan containing source or sink descriptors which contain the updated sender/receiver date. |
References NES_DEBUG.
bool NES::Runtime::NodeEngine::registerDecomposableQueryPlan | ( | const DecomposedQueryPlanPtr & | decomposedQueryPlan | ) |
registers a decomposed query plan
@caution !This method should be called from separate thread that can be blocked! !!Calling this method from task queue will result in deadlock!!
decomposedQueryPlan | the decomposed query plan to be registered |
References NES::QueryCompilation::QueryCompilationRequest::create(), NES_ERROR, NES_INFO, and registerExecutableQueryPlan().
bool NES::Runtime::NodeEngine::registerExecutableQueryPlan | ( | const Execution::ExecutableQueryPlanPtr & | executableQueryPlan | ) |
registers an executable query plan
executableQueryPlan | executable query plan to register |
References NES_ASSERT, and NES_DEBUG.
Referenced by deployExecutableQueryPlan(), and registerDecomposableQueryPlan().
void NES::Runtime::NodeEngine::setMetricStore | ( | Monitoring::MetricStorePtr | metricStore | ) |
void NES::Runtime::NodeEngine::setNodeId | ( | const WorkerId | NodeId | ) |
Setter for node ID
NodeId |
bool NES::Runtime::NodeEngine::startDecomposedQueryPlan | ( | SharedQueryId | sharedQueryId, |
DecomposedQueryId | decomposedQueryId | ||
) |
method to start a already deployed query
sharedQueryId | id of the shared query which is served by the decomposed query plan |
decomposedQueryId | id of the decomposed query plan to be started |
References NES_DEBUG, and NES_ERROR.
Referenced by deployExecutableQueryPlan().
bool NES::Runtime::NodeEngine::stop | ( | bool | markQueriesAsFailed = false | ) |
release all resource of the node engine
withError | true if the node engine stopped with an error |
References NES_DEBUG, NES_ERROR, and NES_WARNING.
Referenced by ~NodeEngine().
bool NES::Runtime::NodeEngine::stopDecomposedQueryPlan | ( | SharedQueryId | sharedQueryId, |
DecomposedQueryId | decomposedQueryId, | ||
Runtime::QueryTerminationType | terminationType = Runtime::QueryTerminationType::HardStop |
||
) |
method to stop a decomposed query plan
sharedQueryId | id of the shared query which is served by the decomposed query plan |
decomposedQueryId | id of the decomposed query plan to be stopped |
graceful | hard or soft termination |
References NES::Runtime::Failure, NES::Runtime::Graceful, NES::Runtime::HardStop, NES::Runtime::Invalid, NES_DEBUG, NES_ERROR, NES_NOT_IMPLEMENTED, NES_WARNING, and NES::Runtime::Reconfiguration.
Referenced by undeployDecomposedQueryPlan().
bool NES::Runtime::NodeEngine::undeployDecomposedQueryPlan | ( | SharedQueryId | sharedQueryId, |
DecomposedQueryId | decomposedQueryId | ||
) |
Stops and undeploy a decomposed query plan.
sharedQueryId | the shared query plan id that is served by the decomposed query plan |
decomposedQueryId | id of the decomposed query plan to undeploy |
References NES_DEBUG, NES_ERROR, stopDecomposedQueryPlan(), and unregisterDecomposedQueryPlan().
bool NES::Runtime::NodeEngine::unregisterDecomposedQueryPlan | ( | SharedQueryId | sharedQueryId, |
DecomposedQueryId | decomposedQueryId | ||
) |
unregisters a decomposed query
sharedQueryId | id of the shared query which is served by the decomposed query plan |
decomposedQueryId | id of the decomposed query plan to be unregistered |
References NES::Runtime::Execution::Created, NES::Runtime::Execution::Deployed, NES::Runtime::HardStop, NES_DEBUG, NES_ERROR, and NES::Runtime::Execution::Running.
Referenced by undeployDecomposedQueryPlan().
bool NES::Runtime::NodeEngine::updateNetworkSink | ( | WorkerId | newNodeId, |
const std::string & | newHostname, | ||
uint32_t | newPort, | ||
DecomposedQueryId | decomposedQueryId, | ||
DecomposedQueryPlanVersion | decomposedQueryVersion, | ||
OperatorId | uniqueNetworkSinkDescriptorId | ||
) |
method to trigger the reconfiguration of a NetworkSink so that it points to a new downstream node.
newNodeId | : the id of the new node |
newHostname | : the hostname of the new node |
newPort | : the port of the new node |
decomposedQueryId | : the id of the Query Sub Plan to which the Network Sink belongs to |
DecomposedQueryPlanVersion | : the version of the Query Sub Plan to which the Network Sink belongs to |
uniqueNetworkSinkDescriptorId | : the id of the Network Sink Descriptor. Helps identify the Network Sink to reconfigure. |
References NES_DEBUG, NES_ERROR, and NES_NOT_IMPLEMENTED.
void NES::Runtime::NodeEngine::updatePhysicalSources | ( | const std::vector< PhysicalSourceTypePtr > & | physicalSources | ) |
Updates the physical sources on the node engine.
physicalSources |
References backward::details::move().
|
friend |