|
NebulaStream
0.6.213
NebulaStream is a data and application management framework for the internet of things
|
this class provide a zmq as data source More...
#include <NetworkSource.hpp>
Public Member Functions | |
| NetworkSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, NetworkManagerPtr networkManager, NesPartition nesPartition, NodeLocation sinkLocation, size_t numSourceLocalBuffers, std::chrono::milliseconds waitTime, uint8_t retryTimes, std::vector< Runtime::Execution::SuccessorExecutablePipeline > successors, DecomposedQueryPlanVersion version, OperatorId uniqueNetworkSourceIdentifier, const std::string &physicalSourceName="defaultPhysicalSourceName") | |
| std::optional< Runtime::TupleBuffer > | receiveData () override |
| this method is just dummy and is replaced by the ZmqServer in the NetworkStack. Do not use! More... | |
| std::string | toString () const override |
| override the toString method More... | |
| void | onEvent (Runtime::BaseEvent &event) override |
| This method is called once an event is triggered for the current source. More... | |
| SourceType | getType () const override |
| Get source type. More... | |
| bool | start () final |
| This method is overridden here to prevent the NetworkSoure to start a thread. It registers the source on the NetworkManager. More... | |
| bool | stop (Runtime::QueryTerminationType=Runtime::QueryTerminationType::Graceful) final |
| This method is overridden here to prevent the NetworkSource to start a thread. It de-registers the source on the NetworkManager. More... | |
| bool | fail () final |
| This method is overridden here to manage failures of NetworkSource. It de-registers the source on the NetworkManager. More... | |
| void | reconfigure (Runtime::ReconfigurationMessage &message, Runtime::WorkerContext &context) override |
| This method is invoked when the source receives a reconfiguration message. More... | |
| void | postReconfigurationCallback (Runtime::ReconfigurationMessage &message) override |
| This method is invoked when the source receives a reconfiguration message. More... | |
| void | onEvent (Runtime::BaseEvent &event, Runtime::WorkerContextRef workerContext) override |
| API method called upon receiving an event, send event further upstream via Network Channel. More... | |
| void | onEndOfStream (Runtime::QueryTerminationType terminationType) override |
| bool | startNewVersion () override |
| Reconfigures this source with ReconfigurationType::UpdateVersion causing it to close event channels to the old upstream sink and open channels to the new one. More... | |
| DecomposedQueryPlanVersion | getVersion () const override |
| Getter for the initial version. More... | |
| OperatorId | getUniqueId () const |
| getter for the network sinks unique id More... | |
| bool | scheduleNewDescriptor (const NetworkSourceDescriptor &networkSourceDescriptor) |
| set a new source descriptor to be applied once startNewVersion() is called More... | |
| bool | handleReconfigurationMarker (ReconfigurationMarkerPtr marker) override |
| check if a reconfiguration marker contains an event for this source. If so, trigger the reconfiguration and propagate the marker downstream. More... | |
| bool | insertReconfigurationMarker (ReconfigurationMarkerPtr marker) override |
| insert a reconfiguratin marker into the stream starting at this source. More... | |
| bool | bind () |
Public Member Functions inherited from NES::DataSource | |
| DataSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string &physicalSourceName, bool persistentSource, std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors=std::vector< Runtime::Execution::SuccessorExecutablePipeline >(), uint64_t sourceAffinity=std::numeric_limits< uint64_t >::max(), uint64_t taskQueueId=0) | |
| public constructor for data source @Note the number of buffers to process is set to UINT64_MAX and the value is needed by some test to produce a deterministic behavior More... | |
| DataSource ()=delete | |
| virtual void | open () |
| This methods initializes thread-local state. For instance, it creates the local buffer pool and is necessary because we cannot do it in the constructor. More... | |
| virtual void | close () |
| This method cleans up thread-local state for the source. More... | |
| virtual void | runningRoutine () |
| running routine while source is active More... | |
| virtual void | createOrLoadPersistedProperties () |
| This method defines the logic to load the properties persisted during the previous run of the data source. More... | |
| virtual void | storePersistedProperties () |
| This method stores all properties that need to be persisted for future retrieval during createOrLoadPersistedProperties call. More... | |
| virtual void | clearPersistedProperties () |
| This method clears the persisted properties of the data source on had stop. More... | |
| SchemaPtr | getSchema () const |
| method to return the current schema of the source More... | |
| std::string | getSourceSchemaAsString () |
| method to return the current schema of the source as string More... | |
| bool | isRunning () const noexcept |
| debug function for testing to test if source is running More... | |
| uint64_t | getNumberOfGeneratedTuples () const |
| debug function for testing to get number of generated tuples More... | |
| uint64_t | getNumberOfGeneratedBuffers () const |
| debug function for testing to get number of generated buffer More... | |
| virtual bool | performSoftStop () |
| This method will mark the running as false this will result in 1. the interruption of data gathering loop and 2. initiation of soft stop routine. More... | |
| void | setGatheringInterval (std::chrono::milliseconds interval) |
| method to set the sampling interval More... | |
| virtual | ~DataSource () NES_NOEXCEPT(false) override |
| Internal destructor to make sure that the data source is stopped before deconstrcuted @Note must be public because of boost serialize. More... | |
| uint64_t | getNumBuffersToProcess () const |
| Get number of buffers to be processed. More... | |
| std::chrono::milliseconds | getGatheringInterval () const |
| Get gathering interval. More... | |
| uint64_t | getGatheringIntervalCount () const |
| Get number representation of gathering interval. More... | |
| OperatorId | getOperatorId () const |
| Gets the operator id for the data source. More... | |
| void | setOperatorId (OperatorId operatorId) |
| Set the operator id for the data source. More... | |
| std::vector< Runtime::Execution::SuccessorExecutablePipeline > | getExecutableSuccessors () |
| Returns the list of successor pipelines. More... | |
| void | addExecutableSuccessors (std::vector< Runtime::Execution::SuccessorExecutablePipeline > newPipelines) |
| Add a list of successor pipelines. More... | |
| template<typename Derived > | |
| std::shared_ptr< Derived > | shared_from_base () |
| This method is necessary to avoid problems with the shared_from_this machinery combined with multi-inheritance. More... | |
| virtual std::vector< Schema::MemoryLayoutType > | getSupportedLayouts () |
| This method returns all supported layouts. More... | |
| void | setSourceSharing (bool value) |
| set source sharing value More... | |
| void | incrementNumberOfConsumerQueries () |
| set the number of queries that use this source More... | |
Public Member Functions inherited from NES::Runtime::Reconfigurable | |
| ~Reconfigurable () NES_NOEXCEPT(false) override=default | |
Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< Reconfigurable, false > | |
| ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default | |
| std::shared_ptr< T1 > | shared_from_this () |
| std::weak_ptr< T1 > | weak_from_this () |
Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this_base< isNoexceptDestructible > | |
| virtual | ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default |
Public Member Functions inherited from NES::DataEmitter | |
| virtual | ~DataEmitter () NES_NOEXCEPT(false)=default |
Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< RuntimeEventListener, false > | |
| ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default | |
| std::shared_ptr< T1 > | shared_from_this () |
| std::weak_ptr< T1 > | weak_from_this () |
Static Public Member Functions | |
| static void | runningRoutine (const Runtime::BufferManagerPtr &, const Runtime::QueryManagerPtr &) |
| This method is overridden here to prevent the NetworkSoure to start a thread. More... | |
Friends | |
| bool | operator< (const NetworkSource &lhs, const NetworkSource &rhs) |
this class provide a zmq as data source
| NES::Network::NetworkSource::NetworkSource | ( | SchemaPtr | schema, |
| Runtime::BufferManagerPtr | bufferManager, | ||
| Runtime::QueryManagerPtr | queryManager, | ||
| NetworkManagerPtr | networkManager, | ||
| NesPartition | nesPartition, | ||
| NodeLocation | sinkLocation, | ||
| size_t | numSourceLocalBuffers, | ||
| std::chrono::milliseconds | waitTime, | ||
| uint8_t | retryTimes, | ||
| std::vector< Runtime::Execution::SuccessorExecutablePipeline > | successors, | ||
| DecomposedQueryPlanVersion | version, | ||
| OperatorId | uniqueNetworkSourceIdentifier, | ||
| const std::string & | physicalSourceName = "defaultPhysicalSourceName" |
||
| ) |
| SchemaPtr | |
| bufferManager | |
| queryManager | |
| networkManager | |
| nesPartition | |
| sinkLocation | |
| numSourceLocalBuffers | |
| waitTime | |
| retryTimes | |
| successors | |
| version | The initial version number of this source when it starts |
| uniqueNetworkSourceIdentifier | system wide unique id that persists even if the partition of this source is changed |
| physicalSourceName |
References NES::INTERVAL_MODE, and NES_ASSERT.
| bool NES::Network::NetworkSource::bind | ( | ) |
|
finalvirtual |
This method is overridden here to manage failures of NetworkSource. It de-registers the source on the NetworkManager.
Reimplemented from NES::DataSource.
References NES::Runtime::Failure, NES_DEBUG, NES::DataSource::queryManager, and NES::DataSource::running.
|
overridevirtual |
Get source type.
Implements NES::DataSource.
| OperatorId NES::Network::NetworkSource::getUniqueId | ( | ) | const |
getter for the network sinks unique id
|
overridevirtual |
Getter for the initial version.
Reimplemented from NES::DataEmitter.
|
overridevirtual |
check if a reconfiguration marker contains an event for this source. If so, trigger the reconfiguration and propagate the marker downstream.
| marker | a marker containing a set of reconfiguration events |
Reimplemented from NES::DataSource.
References NES::ReconfigurationMetadata::instanceOf(), backward::details::move(), NES_ERROR, NES_NOT_IMPLEMENTED, and NES::DataSource::queryManager.
Referenced by insertReconfigurationMarker().
|
overridevirtual |
insert a reconfiguratin marker into the stream starting at this source.
| marker | a marker containing a set of reconfiguration events |
Reimplemented from NES::DataEmitter.
References handleReconfigurationMarker(), and backward::details::move().
|
overridevirtual |
| terminationType |
Reimplemented from NES::DataEmitter.
References NES::Runtime::Graceful, NES_DEBUG, NES_WARNING, and NES::DataSource::queryManager.
|
overridevirtual |
This method is called once an event is triggered for the current source.
| event |
Reimplemented from NES::DataSource.
References NES_DEBUG.
|
overridevirtual |
API method called upon receiving an event, send event further upstream via Network Channel.
| event | |
| workerContext |
Reimplemented from NES::DataSource.
References NES::Runtime::WorkerContext::getAsyncEventChannelConnectionResult(), NES::Runtime::WorkerContext::getEventOnlyNetworkChannel(), NES::Runtime::BaseEvent::getEventType(), NES::Runtime::kStartSourceEvent, backward::details::move(), NES_DEBUG, NES::DataSource::operatorId, NES::Network::detail::NetworkEventSender< BaseChannelType >::sendEvent(), and NES::Runtime::WorkerContext::storeEventOnlyChannel().
|
overridevirtual |
This method is invoked when the source receives a reconfiguration message.
| message | the reconfiguration message |
Reimplemented from NES::Runtime::Reconfigurable.
References NES::Runtime::FailEndOfStream, NES::Runtime::Failure, NES::Runtime::ReconfigurationMessage::getType(), NES::Runtime::ReconfigurationMessage::getUserData(), NES::Runtime::Graceful, NES::Runtime::HardEndOfStream, NES::Runtime::HardStop, NES::ReconfigurationMetadata::instanceOf(), NES::Runtime::Invalid, NES_DEBUG, NES_NOT_IMPLEMENTED, NES_WARNING, NES::Runtime::Reconfigurable::postReconfigurationCallback(), NES::DataSource::queryManager, NES::Runtime::Reconfiguration, NES::Runtime::ReconfigurationMarker, NES::DataSource::running, NES::Runtime::SoftEndOfStream, and NES::Runtime::UpdateVersion.
|
overridevirtual |
this method is just dummy and is replaced by the ZmqServer in the NetworkStack. Do not use!
Implements NES::DataSource.
References NES_THROW_RUNTIME_ERROR.
|
overridevirtual |
This method is invoked when the source receives a reconfiguration message.
| message | the reconfiguration message |
| context | thread context |
Reimplemented from NES::Runtime::Reconfigurable.
References NES::Network::Deleted, NES::Runtime::Destroy, magic_enum::enum_name(), NES::Runtime::FailEndOfStream, NES::Runtime::Failure, NES::Runtime::NesThread::getId(), NES::Runtime::ReconfigurationMessage::getType(), NES::Runtime::ReconfigurationMessage::getUserData(), NES::Runtime::Graceful, NES::Runtime::HardEndOfStream, NES::Runtime::HardStop, NES::Runtime::Initialize, NES::ReconfigurationMetadata::instanceOf(), NES_DEBUG, NES_NOT_IMPLEMENTED, NES_THROW_RUNTIME_ERROR, NES_WARNING, NES::Runtime::Reconfiguration, NES::Runtime::ReconfigurationMarker, NES::Runtime::Reconfigurable::reconfigure(), NES::Runtime::WorkerContext::releaseEventOnlyChannel(), NES::Runtime::SoftEndOfStream, and NES::Runtime::UpdateVersion.
|
static |
This method is overridden here to prevent the NetworkSoure to start a thread.
| bufferManager | |
| queryManager |
References NES_THROW_RUNTIME_ERROR.
| bool NES::Network::NetworkSource::scheduleNewDescriptor | ( | const NetworkSourceDescriptor & | networkSourceDescriptor | ) |
set a new source descriptor to be applied once startNewVersion() is called
| networkSourceDescriptor | the new descriptor |
|
finalvirtual |
This method is overridden here to prevent the NetworkSoure to start a thread. It registers the source on the NetworkManager.
Reimplemented from NES::DataSource.
References NES::DataSource::executableSuccessors, NES::Runtime::Initialize, NES_DEBUG, NES::DataSource::queryManager, and NES::DataSource::running.
|
overridevirtual |
Reconfigures this source with ReconfigurationType::UpdateVersion causing it to close event channels to the old upstream sink and open channels to the new one.
Reimplemented from NES::DataEmitter.
References bind(), NES_DEBUG, NES::DataSource::queryManager, NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >::shared_from_this(), and NES::Runtime::UpdateVersion.
|
finalvirtual |
This method is overridden here to prevent the NetworkSource to start a thread. It de-registers the source on the NetworkManager.
Reimplemented from NES::DataSource.
References NES::Runtime::HardStop, NES_ASSERT2_FMT, NES_DEBUG, NES::DataSource::queryManager, NES::DataSource::running, and NES::DataSource::type.
|
overridevirtual |
override the toString method
Implements NES::DataSource.
|
friend |