NebulaStream
0.6.213
NebulaStream is a data and application management framework for the internet of things
This class provides a ZMQ-based data source.
#include <NetworkSource.hpp>
Public Member Functions | |
NetworkSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, NetworkManagerPtr networkManager, NesPartition nesPartition, NodeLocation sinkLocation, size_t numSourceLocalBuffers, std::chrono::milliseconds waitTime, uint8_t retryTimes, std::vector< Runtime::Execution::SuccessorExecutablePipeline > successors, DecomposedQueryPlanVersion version, OperatorId uniqueNetworkSourceIdentifier, const std::string &physicalSourceName="defaultPhysicalSourceName") | |
std::optional< Runtime::TupleBuffer > | receiveData () override |
This method is only a placeholder; it is replaced by the ZmqServer in the NetworkStack. Do not use! | |
std::string | toString () const override |
Overrides the toString method. | |
void | onEvent (Runtime::BaseEvent &event) override |
This method is called once an event is triggered for the current source. | |
SourceType | getType () const override |
Get source type. | |
bool | start () final |
This method is overridden here to prevent the NetworkSource from starting a thread. It registers the source on the NetworkManager. | |
bool | stop (Runtime::QueryTerminationType=Runtime::QueryTerminationType::Graceful) final |
This method is overridden here to prevent the NetworkSource from starting a thread. It de-registers the source on the NetworkManager. | |
bool | fail () final |
This method is overridden here to manage failures of the NetworkSource. It de-registers the source on the NetworkManager. | |
void | reconfigure (Runtime::ReconfigurationMessage &message, Runtime::WorkerContext &context) override |
This method is invoked when the source receives a reconfiguration message. | |
void | postReconfigurationCallback (Runtime::ReconfigurationMessage &message) override |
This method is invoked when the source receives a reconfiguration message. | |
void | onEvent (Runtime::BaseEvent &event, Runtime::WorkerContextRef workerContext) override |
API method called upon receiving an event; it sends the event further upstream via the network channel. | |
void | onEndOfStream (Runtime::QueryTerminationType terminationType) override |
bool | startNewVersion () override |
Reconfigures this source with ReconfigurationType::UpdateVersion, causing it to close event channels to the old upstream sink and open channels to the new one. | |
DecomposedQueryPlanVersion | getVersion () const override |
Getter for the initial version. | |
OperatorId | getUniqueId () const |
Getter for the network sink's unique id. | |
bool | scheduleNewDescriptor (const NetworkSourceDescriptor &networkSourceDescriptor) |
Sets a new source descriptor to be applied once startNewVersion() is called. | |
bool | handleReconfigurationMarker (ReconfigurationMarkerPtr marker) override |
Checks if a reconfiguration marker contains an event for this source. If so, triggers the reconfiguration and propagates the marker downstream. | |
bool | insertReconfigurationMarker (ReconfigurationMarkerPtr marker) override |
Inserts a reconfiguration marker into the stream, starting at this source. | |
bool | bind () |
Public Member Functions inherited from NES::DataSource | |
DataSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string &physicalSourceName, bool persistentSource, std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors=std::vector< Runtime::Execution::SuccessorExecutablePipeline >(), uint64_t sourceAffinity=std::numeric_limits< uint64_t >::max(), uint64_t taskQueueId=0) | |
Public constructor for a data source. Note: the number of buffers to process is set to UINT64_MAX; this value is needed by some tests to produce deterministic behavior. | |
DataSource ()=delete | |
virtual void | open () |
This method initializes thread-local state. For instance, it creates the local buffer pool, which is necessary because this cannot be done in the constructor. | |
virtual void | close () |
This method cleans up thread-local state for the source. | |
virtual void | runningRoutine () |
Running routine while the source is active. | |
virtual void | createOrLoadPersistedProperties () |
This method defines the logic to load the properties persisted during the previous run of the data source. | |
virtual void | storePersistedProperties () |
This method stores all properties that need to be persisted for future retrieval during the createOrLoadPersistedProperties call. | |
virtual void | clearPersistedProperties () |
This method clears the persisted properties of the data source on hard stop. | |
SchemaPtr | getSchema () const |
Returns the current schema of the source. | |
std::string | getSourceSchemaAsString () |
Returns the current schema of the source as a string. | |
bool | isRunning () const noexcept |
Debug function for testing whether the source is running. | |
uint64_t | getNumberOfGeneratedTuples () const |
Debug function for testing to get the number of generated tuples. | |
uint64_t | getNumberOfGeneratedBuffers () const |
Debug function for testing to get the number of generated buffers. | |
virtual bool | performSoftStop () |
This method marks the source as no longer running, which results in (1) the interruption of the data gathering loop and (2) the initiation of the soft stop routine. | |
void | setGatheringInterval (std::chrono::milliseconds interval) |
Sets the sampling interval. | |
virtual | ~DataSource () NES_NOEXCEPT(false) override |
Internal destructor to make sure that the data source is stopped before it is destructed. Note: must be public because of boost serialize. | |
uint64_t | getNumBuffersToProcess () const |
Get number of buffers to be processed. | |
std::chrono::milliseconds | getGatheringInterval () const |
Get gathering interval. | |
uint64_t | getGatheringIntervalCount () const |
Get number representation of gathering interval. | |
OperatorId | getOperatorId () const |
Gets the operator id for the data source. | |
void | setOperatorId (OperatorId operatorId) |
Set the operator id for the data source. | |
std::vector< Runtime::Execution::SuccessorExecutablePipeline > | getExecutableSuccessors () |
Returns the list of successor pipelines. | |
void | addExecutableSuccessors (std::vector< Runtime::Execution::SuccessorExecutablePipeline > newPipelines) |
Add a list of successor pipelines. | |
template<typename Derived > | |
std::shared_ptr< Derived > | shared_from_base () |
This method is necessary to avoid problems with the shared_from_this machinery combined with multiple inheritance. | |
virtual std::vector< Schema::MemoryLayoutType > | getSupportedLayouts () |
This method returns all supported layouts. | |
void | setSourceSharing (bool value) |
Set source sharing value. | |
void | incrementNumberOfConsumerQueries () |
Set the number of queries that use this source. | |
Public Member Functions inherited from NES::Runtime::Reconfigurable | |
~Reconfigurable () NES_NOEXCEPT(false) override=default | |
Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this | |
~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default | |
std::shared_ptr< T1 > | shared_from_this () |
std::weak_ptr< T1 > | weak_from_this () |
Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this_base | |
virtual | ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default |
Public Member Functions inherited from NES::DataEmitter | |
virtual | ~DataEmitter () NES_NOEXCEPT(false)=default |
Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this | |
~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default | |
std::shared_ptr< T1 > | shared_from_this () |
std::weak_ptr< T1 > | weak_from_this () |
Static Public Member Functions | |
static void | runningRoutine (const Runtime::BufferManagerPtr &, const Runtime::QueryManagerPtr &) |
This method is overridden here to prevent the NetworkSource from starting a thread. | |
Friends | |
bool | operator< (const NetworkSource &lhs, const NetworkSource &rhs) |
This class provides a ZMQ-based data source.
NES::Network::NetworkSource::NetworkSource | ( | SchemaPtr | schema, |
Runtime::BufferManagerPtr | bufferManager, | ||
Runtime::QueryManagerPtr | queryManager, | ||
NetworkManagerPtr | networkManager, | ||
NesPartition | nesPartition, | ||
NodeLocation | sinkLocation, | ||
size_t | numSourceLocalBuffers, | ||
std::chrono::milliseconds | waitTime, | ||
uint8_t | retryTimes, | ||
std::vector< Runtime::Execution::SuccessorExecutablePipeline > | successors, | ||
DecomposedQueryPlanVersion | version, | ||
OperatorId | uniqueNetworkSourceIdentifier, | ||
const std::string & | physicalSourceName = "defaultPhysicalSourceName" |
||
) |
schema | |
bufferManager | |
queryManager | |
networkManager | |
nesPartition | |
sinkLocation | |
numSourceLocalBuffers | |
waitTime | |
retryTimes | |
successors | |
version | The initial version number of this source when it starts |
uniqueNetworkSourceIdentifier | system wide unique id that persists even if the partition of this source is changed |
physicalSourceName |
References NES::INTERVAL_MODE, and NES_ASSERT.
bool NES::Network::NetworkSource::bind | ( | ) |
bool NES::Network::NetworkSource::fail | ( | ) |
final virtual |
This method is overridden here to manage failures of the NetworkSource. It de-registers the source on the NetworkManager.
Reimplemented from NES::DataSource.
References NES::Runtime::Failure, NES_DEBUG, NES::DataSource::queryManager, and NES::DataSource::running.
SourceType NES::Network::NetworkSource::getType | ( | ) | const |
override virtual |
Get source type.
Implements NES::DataSource.
OperatorId NES::Network::NetworkSource::getUniqueId | ( | ) | const |
Getter for the network sink's unique id.
DecomposedQueryPlanVersion NES::Network::NetworkSource::getVersion | ( | ) | const |
override virtual |
Getter for the initial version.
Reimplemented from NES::DataEmitter.
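bool NES::Network::NetworkSource::handleReconfigurationMarker | ( | ReconfigurationMarkerPtr | marker | ) |
override virtual |
Checks if a reconfiguration marker contains an event for this source. If so, triggers the reconfiguration and propagates the marker downstream.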
marker | a marker containing a set of reconfiguration events |
Reimplemented from NES::DataSource.
References NES::ReconfigurationMetadata::instanceOf(), backward::details::move(), NES_ERROR, NES_NOT_IMPLEMENTED, and NES::DataSource::queryManager.
Referenced by insertReconfigurationMarker().
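The marker handling described above can be illustrated with a minimal, self-contained sketch. It is not the NebulaStream implementation: `ToyMarker`, `ToyMarkedSource`, the map from source id to event name, and the choice to forward the marker only when it carries an event for this source are all assumptions made for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical marker: maps a source id to the name of the event meant for it.
struct ToyMarker {
    std::map<std::uint64_t, std::string> events;
};
using ToyMarkerPtr = std::shared_ptr<ToyMarker>;

// Hypothetical source that records what it applied and what it forwarded.
class ToyMarkedSource {
  public:
    explicit ToyMarkedSource(std::uint64_t sourceId) : sourceId(sourceId) {}

    // Returns true only if the marker carries an event for this source.
    bool handleReconfigurationMarker(const ToyMarkerPtr& marker) {
        auto it = marker->events.find(sourceId);
        if (it == marker->events.end()) {
            return false;// nothing addressed to this source
        }
        applied.push_back(it->second);// "trigger the reconfiguration"
        forwarded.push_back(marker);  // "propagate the marker downstream"
        return true;
    }

    // Inserting a marker at this source simply delegates to the handler.
    bool insertReconfigurationMarker(ToyMarkerPtr marker) {
        return handleReconfigurationMarker(marker);
    }

    const std::vector<std::string>& appliedEvents() const { return applied; }
    const std::vector<ToyMarkerPtr>& forwardedMarkers() const { return forwarded; }

  private:
    std::uint64_t sourceId;
    std::vector<std::string> applied;
    std::vector<ToyMarkerPtr> forwarded;
};
```

In this sketch a source whose id is absent from the marker reports `false` and forwards nothing; whether the real class behaves the same way in that case is not stated on this page.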
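bool NES::Network::NetworkSource::insertReconfigurationMarker | ( | ReconfigurationMarkerPtr | marker | ) |
override virtual |
Inserts a reconfiguration marker into the stream, starting at this source.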
marker | a marker containing a set of reconfiguration events |
Reimplemented from NES::DataEmitter.
References handleReconfigurationMarker(), and backward::details::move().
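void NES::Network::NetworkSource::onEndOfStream | ( | Runtime::QueryTerminationType | terminationType | ) |
override virtual |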
terminationType |
Reimplemented from NES::DataEmitter.
References NES::Runtime::Graceful, NES_DEBUG, NES_WARNING, and NES::DataSource::queryManager.
void NES::Network::NetworkSource::onEvent | ( | Runtime::BaseEvent & | event | ) |
override virtual |
This method is called once an event is triggered for the current source.
event |
Reimplemented from NES::DataSource.
References NES_DEBUG.
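void NES::Network::NetworkSource::onEvent | ( | Runtime::BaseEvent & | event, |
Runtime::WorkerContextRef | workerContext |
) |
override virtual |
API method called upon receiving an event; it sends the event further upstream via the network channel.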
event | |
workerContext |
Reimplemented from NES::DataSource.
References NES::Runtime::WorkerContext::getAsyncEventChannelConnectionResult(), NES::Runtime::WorkerContext::getEventOnlyNetworkChannel(), NES::Runtime::BaseEvent::getEventType(), NES::Runtime::kStartSourceEvent, backward::details::move(), NES_DEBUG, NES::DataSource::operatorId, NES::Network::detail::NetworkEventSender< BaseChannelType >::sendEvent(), and NES::Runtime::WorkerContext::storeEventOnlyChannel().
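void NES::Network::NetworkSource::postReconfigurationCallback | ( | Runtime::ReconfigurationMessage & | message | ) |
override virtual |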
This method is invoked when the source receives a reconfiguration message.
message | the reconfiguration message |
Reimplemented from NES::Runtime::Reconfigurable.
References NES::Runtime::FailEndOfStream, NES::Runtime::Failure, NES::Runtime::ReconfigurationMessage::getType(), NES::Runtime::ReconfigurationMessage::getUserData(), NES::Runtime::Graceful, NES::Runtime::HardEndOfStream, NES::Runtime::HardStop, NES::ReconfigurationMetadata::instanceOf(), NES::Runtime::Invalid, NES_DEBUG, NES_NOT_IMPLEMENTED, NES_WARNING, NES::Runtime::Reconfigurable::postReconfigurationCallback(), NES::DataSource::queryManager, NES::Runtime::Reconfiguration, NES::Runtime::ReconfigurationMarker, NES::DataSource::running, NES::Runtime::SoftEndOfStream, and NES::Runtime::UpdateVersion.
std::optional< Runtime::TupleBuffer > NES::Network::NetworkSource::receiveData | ( | ) |
override virtual |
This method is only a placeholder; it is replaced by the ZmqServer in the NetworkStack. Do not use!
Implements NES::DataSource.
References NES_THROW_RUNTIME_ERROR.
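As a rough illustration of why such a placeholder exists, the sketch below shows a push-based source that must still implement a pull-style `receiveData()` required by its base class and therefore throws when it is called. All types here (`ToyBuffer`, `ToyPullSource`, `ToyPushSource`, `onBufferArrived`) are hypothetical stand-ins, and `std::runtime_error` is used in place of the project's `NES_THROW_RUNTIME_ERROR` macro.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <utility>
#include <vector>

using ToyBuffer = std::vector<char>;// hypothetical stand-in for a tuple buffer

// Hypothetical pull-style interface: the runtime asks the source for data.
class ToyPullSource {
  public:
    virtual ~ToyPullSource() = default;
    virtual std::optional<ToyBuffer> receiveData() = 0;
};

// Hypothetical push-style source: data is delivered by a network server,
// so pulling from it is a programming error.
class ToyPushSource final : public ToyPullSource {
  public:
    std::optional<ToyBuffer> receiveData() override {
        throw std::runtime_error("receiveData() must not be called on a push-based source");
    }

    // Called by the (hypothetical) server when a buffer arrives.
    void onBufferArrived(ToyBuffer buffer) { delivered.push_back(std::move(buffer)); }

    std::size_t deliveredCount() const { return delivered.size(); }

  private:
    std::vector<ToyBuffer> delivered;
};
```

Throwing instead of returning an empty optional makes an accidental call fail loudly rather than look like an idle source.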
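void NES::Network::NetworkSource::reconfigure | ( | Runtime::ReconfigurationMessage & | message, |
Runtime::WorkerContext & | context |
) |
override virtual |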
This method is invoked when the source receives a reconfiguration message.
message | the reconfiguration message |
context | thread context |
Reimplemented from NES::Runtime::Reconfigurable.
References NES::Network::Deleted, NES::Runtime::Destroy, magic_enum::enum_name(), NES::Runtime::FailEndOfStream, NES::Runtime::Failure, NES::Runtime::NesThread::getId(), NES::Runtime::ReconfigurationMessage::getType(), NES::Runtime::ReconfigurationMessage::getUserData(), NES::Runtime::Graceful, NES::Runtime::HardEndOfStream, NES::Runtime::HardStop, NES::Runtime::Initialize, NES::ReconfigurationMetadata::instanceOf(), NES_DEBUG, NES_NOT_IMPLEMENTED, NES_THROW_RUNTIME_ERROR, NES_WARNING, NES::Runtime::Reconfiguration, NES::Runtime::ReconfigurationMarker, NES::Runtime::Reconfigurable::reconfigure(), NES::Runtime::WorkerContext::releaseEventOnlyChannel(), NES::Runtime::SoftEndOfStream, and NES::Runtime::UpdateVersion.
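void NES::Network::NetworkSource::runningRoutine | ( | const Runtime::BufferManagerPtr & | , |
const Runtime::QueryManagerPtr & | |
) |
static |
This method is overridden here to prevent the NetworkSource from starting a thread.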
bufferManager | |
queryManager |
References NES_THROW_RUNTIME_ERROR.
bool NES::Network::NetworkSource::scheduleNewDescriptor | ( | const NetworkSourceDescriptor & | networkSourceDescriptor | ) |
Sets a new source descriptor to be applied once startNewVersion() is called.
networkSourceDescriptor | the new descriptor |
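The two-step pattern implied here (stage a descriptor now, apply it when `startNewVersion()` is called) can be sketched as follows. This is only an illustration under stated assumptions: `ToyDescriptor`, `ToyVersionedSource`, and the rejection rules (only one pending descriptor, and its version must differ from the active one) are hypothetical and not taken from the NebulaStream sources.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <utility>

// Hypothetical descriptor: a version number plus the upstream sink it points to.
struct ToyDescriptor {
    std::uint64_t version;
    std::string upstreamSink;
};

class ToyVersionedSource {
  public:
    explicit ToyVersionedSource(ToyDescriptor initial) : active(std::move(initial)) {}

    // Stage a descriptor without touching the active one.
    // Assumption: at most one descriptor may be pending and it must change the version.
    bool scheduleNewDescriptor(const ToyDescriptor& next) {
        if (pending.has_value() || next.version == active.version) {
            return false;
        }
        pending = next;
        return true;
    }

    // Apply the staged descriptor; returns false if nothing was scheduled.
    bool startNewVersion() {
        if (!pending.has_value()) {
            return false;
        }
        active = *pending;
        pending.reset();
        return true;
    }

    std::uint64_t getVersion() const { return active.version; }
    const std::string& upstreamSink() const { return active.upstreamSink; }

  private:
    ToyDescriptor active;
    std::optional<ToyDescriptor> pending;
};
```

Separating scheduling from activation lets the caller prepare the switch ahead of time while the source keeps serving the old version until the switch is triggered.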
bool NES::Network::NetworkSource::start | ( | ) |
final virtual |
This method is overridden here to prevent the NetworkSource from starting a thread. It registers the source on the NetworkManager.
Reimplemented from NES::DataSource.
References NES::DataSource::executableSuccessors, NES::Runtime::Initialize, NES_DEBUG, NES::DataSource::queryManager, and NES::DataSource::running.
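A minimal sketch of this override pattern, assuming toy stand-in types rather than the real NebulaStream classes: the base `start()` marks where a regular source would launch its worker thread, while the network variant overrides `start()` and `stop()` so that they only register and de-register a partition on a manager. `ToyNetworkManager`, `ToySource`, `ToyNetworkSource`, and their members are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical stand-in for a network manager that tracks registered partitions.
class ToyNetworkManager {
  public:
    bool registerPartition(std::uint64_t id) { return partitions.insert(id).second; }
    bool unregisterPartition(std::uint64_t id) { return partitions.erase(id) == 1; }
    bool isRegistered(std::uint64_t id) const { return partitions.count(id) == 1; }

  private:
    std::set<std::uint64_t> partitions;
};

// Hypothetical base class: a regular source would launch its worker thread in start().
class ToySource {
  public:
    virtual ~ToySource() = default;
    virtual bool start() {
        if (running) {
            return false;
        }
        running = true;
        threadSpawned = true;// placeholder for launching a real thread
        return true;
    }
    virtual bool stop() {
        if (!running) {
            return false;
        }
        running = false;
        return true;
    }
    bool hasSpawnedThread() const { return threadSpawned; }

  protected:
    bool running = false;
    bool threadSpawned = false;
};

// Network variant: no thread of its own, it only (de-)registers on the manager.
class ToyNetworkSource : public ToySource {
  public:
    ToyNetworkSource(ToyNetworkManager& manager, std::uint64_t partitionId)
        : manager(manager), partitionId(partitionId) {}

    bool start() final {
        if (running) {
            return false;// already started
        }
        running = true;
        return manager.registerPartition(partitionId);
    }

    bool stop() final {
        if (!running) {
            return false;// never started or already stopped
        }
        running = false;
        return manager.unregisterPartition(partitionId);
    }

  private:
    ToyNetworkManager& manager;
    std::uint64_t partitionId;
};
```

Marking the overrides `final`, as the real class does for `start()`, `stop()`, and `fail()`, prevents further subclasses from reintroducing a thread-starting implementation.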
bool NES::Network::NetworkSource::startNewVersion | ( | ) |
override virtual |
Reconfigures this source with ReconfigurationType::UpdateVersion, causing it to close event channels to the old upstream sink and open channels to the new one.
Reimplemented from NES::DataEmitter.
References bind(), NES_DEBUG, NES::DataSource::queryManager, NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >::shared_from_this(), and NES::Runtime::UpdateVersion.
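bool NES::Network::NetworkSource::stop | ( | Runtime::QueryTerminationType | = Runtime::QueryTerminationType::Graceful | ) |
final virtual |
This method is overridden here to prevent the NetworkSource from starting a thread. It de-registers the source on the NetworkManager.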
Reimplemented from NES::DataSource.
References NES::Runtime::HardStop, NES_ASSERT2_FMT, NES_DEBUG, NES::DataSource::queryManager, NES::DataSource::running, and NES::DataSource::type.
std::string NES::Network::NetworkSource::toString | ( | ) | const |
override virtual |
Overrides the toString method.
Implements NES::DataSource.
bool operator< | ( | const NetworkSource & | lhs, |
const NetworkSource & | rhs |
) |
friend |