NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
NES::Network::NetworkSource Class Reference

This class provides a ZMQ-backed data source. More...

#include <NetworkSource.hpp>

Public Member Functions

 NetworkSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, NetworkManagerPtr networkManager, NesPartition nesPartition, NodeLocation sinkLocation, size_t numSourceLocalBuffers, std::chrono::milliseconds waitTime, uint8_t retryTimes, std::vector< Runtime::Execution::SuccessorExecutablePipeline > successors, DecomposedQueryPlanVersion version, OperatorId uniqueNetworkSourceIdentifier, const std::string &physicalSourceName="defaultPhysicalSourceName")
 
std::optional< Runtime::TupleBuffer > receiveData () override
 This method is only a dummy; it is replaced by the ZmqServer in the NetworkStack. Do not use! More...
 
std::string toString () const override
 override the toString method More...
 
void onEvent (Runtime::BaseEvent &event) override
 This method is called once an event is triggered for the current source. More...
 
SourceType getType () const override
 Get source type. More...
 
bool start () final
 This method is overridden here to prevent the NetworkSource from starting a thread. It registers the source on the NetworkManager. More...
 
bool stop (Runtime::QueryTerminationType=Runtime::QueryTerminationType::Graceful) final
 This method is overridden here to prevent the NetworkSource from starting a thread. It de-registers the source on the NetworkManager. More...
 
bool fail () final
 This method is overridden here to manage failures of NetworkSource. It de-registers the source on the NetworkManager. More...
 
void reconfigure (Runtime::ReconfigurationMessage &message, Runtime::WorkerContext &context) override
 This method is invoked when the source receives a reconfiguration message. More...
 
void postReconfigurationCallback (Runtime::ReconfigurationMessage &message) override
 This method is invoked when the source receives a reconfiguration message. More...
 
void onEvent (Runtime::BaseEvent &event, Runtime::WorkerContextRef workerContext) override
 API method called upon receiving an event; sends the event further upstream via the network channel. More...
 
void onEndOfStream (Runtime::QueryTerminationType terminationType) override
 
bool startNewVersion () override
 Reconfigures this source with ReconfigurationType::UpdateVersion causing it to close event channels to the old upstream sink and open channels to the new one. More...
 
DecomposedQueryPlanVersion getVersion () const override
 Getter for the initial version. More...
 
OperatorId getUniqueId () const
 Getter for the network source's unique id. More...
 
bool scheduleNewDescriptor (const NetworkSourceDescriptor &networkSourceDescriptor)
 set a new source descriptor to be applied once startNewVersion() is called More...
 
bool handleReconfigurationMarker (ReconfigurationMarkerPtr marker) override
 check if a reconfiguration marker contains an event for this source. If so, trigger the reconfiguration and propagate the marker downstream. More...
 
bool insertReconfigurationMarker (ReconfigurationMarkerPtr marker) override
 Insert a reconfiguration marker into the stream starting at this source. More...
 
bool bind ()
 
- Public Member Functions inherited from NES::DataSource
 DataSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string &physicalSourceName, bool persistentSource, std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors=std::vector< Runtime::Execution::SuccessorExecutablePipeline >(), uint64_t sourceAffinity=std::numeric_limits< uint64_t >::max(), uint64_t taskQueueId=0)
 Public constructor for the data source. Note: the number of buffers to process is set to UINT64_MAX; this value is needed by some tests to produce deterministic behavior. More...
 
 DataSource ()=delete
 
virtual void open ()
 This method initializes thread-local state. For instance, it creates the local buffer pool; this is necessary because we cannot do it in the constructor. More...
 
virtual void close ()
 This method cleans up thread-local state for the source. More...
 
virtual void runningRoutine ()
 running routine while source is active More...
 
virtual void createOrLoadPersistedProperties ()
 This method defines the logic to load the properties persisted during the previous run of the data source. More...
 
virtual void storePersistedProperties ()
 This method stores all properties that need to be persisted for future retrieval during createOrLoadPersistedProperties call. More...
 
virtual void clearPersistedProperties ()
 This method clears the persisted properties of the data source on hard stop. More...
 
SchemaPtr getSchema () const
 method to return the current schema of the source More...
 
std::string getSourceSchemaAsString ()
 method to return the current schema of the source as string More...
 
bool isRunning () const noexcept
 debug function for testing to test if source is running More...
 
uint64_t getNumberOfGeneratedTuples () const
 debug function for testing to get number of generated tuples More...
 
uint64_t getNumberOfGeneratedBuffers () const
 debug function for testing to get number of generated buffer More...
 
virtual bool performSoftStop ()
 This method marks the source as not running, which results in (1) the interruption of the data-gathering loop and (2) the initiation of the soft-stop routine. More...
 
void setGatheringInterval (std::chrono::milliseconds interval)
 method to set the sampling interval More...
 
virtual ~DataSource () NES_NOEXCEPT(false) override
 Internal destructor that makes sure the data source is stopped before it is destructed. Note: must be public because of boost serialize. More...
 
uint64_t getNumBuffersToProcess () const
 Get number of buffers to be processed. More...
 
std::chrono::milliseconds getGatheringInterval () const
 Get gathering interval. More...
 
uint64_t getGatheringIntervalCount () const
 Get number representation of gathering interval. More...
 
OperatorId getOperatorId () const
 Gets the operator id for the data source. More...
 
void setOperatorId (OperatorId operatorId)
 Set the operator id for the data source. More...
 
std::vector< Runtime::Execution::SuccessorExecutablePipeline > getExecutableSuccessors ()
 Returns the list of successor pipelines. More...
 
void addExecutableSuccessors (std::vector< Runtime::Execution::SuccessorExecutablePipeline > newPipelines)
 Add a list of successor pipelines. More...
 
template<typename Derived >
std::shared_ptr< Derived > shared_from_base ()
 This method is necessary to avoid problems with the shared_from_this machinery combined with multi-inheritance. More...
 
virtual std::vector< Schema::MemoryLayoutType > getSupportedLayouts ()
 This method returns all supported layouts. More...
 
void setSourceSharing (bool value)
 set source sharing value More...
 
void incrementNumberOfConsumerQueries ()
 Increment the number of queries that use this source. More...
 
- Public Member Functions inherited from NES::Runtime::Reconfigurable
 ~Reconfigurable () NES_NOEXCEPT(false) override=default
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this_base< isNoexceptDestructible >
virtual ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default
 
- Public Member Functions inherited from NES::DataEmitter
virtual ~DataEmitter () NES_NOEXCEPT(false)=default
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< RuntimeEventListener, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 

Static Public Member Functions

static void runningRoutine (const Runtime::BufferManagerPtr &, const Runtime::QueryManagerPtr &)
 This method is overridden here to prevent the NetworkSource from starting a thread. More...
 

Friends

bool operator< (const NetworkSource &lhs, const NetworkSource &rhs)
 

Additional Inherited Members

- Public Attributes inherited from NES::DataSource
const bool persistentSource
 
const std::string persistentSourceKey
 
- Protected Member Functions inherited from NES::DataSource
void emitWork (Runtime::TupleBuffer &buffer, bool addBufferMetaData=true) override
 create a task using the provided buffer and submit it to a task consumer, e.g., query manager More...
 
NES::Runtime::MemoryLayouts::TestTupleBuffer allocateBuffer ()
 
- Protected Attributes inherited from NES::DataSource
Runtime::QueryManagerPtr queryManager
 
Runtime::BufferManagerPtr localBufferManager
 
Runtime::FixedSizeBufferPoolPtr bufferManager {nullptr}
 
std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors
 
OperatorId operatorId
 
OriginId originId
 
StatisticId statisticId
 
SchemaPtr schema
 
uint64_t generatedTuples {0}
 
uint64_t generatedBuffers {0}
 
uint64_t numberOfBuffersToProduce = std::numeric_limits<decltype(numberOfBuffersToProduce)>::max()
 
uint64_t numSourceLocalBuffers
 
uint64_t gatheringIngestionRate {}
 
std::chrono::milliseconds gatheringInterval {0}
 
GatheringMode gatheringMode
 
SourceType type
 
Runtime::QueryTerminationType wasGracefullyStopped {Runtime::QueryTerminationType::Graceful}
 
std::atomic_bool wasStarted {false}
 
std::atomic_bool futureRetrieved {false}
 
std::atomic_bool running {false}
 
std::promise< bool > completedPromise
 
uint64_t sourceAffinity
 
uint64_t taskQueueId
 
bool sourceSharing = false
 
const std::string physicalSourceName
 
std::atomic< uint64_t > refCounter = 0
 
std::atomic< uint64_t > numberOfConsumerQueries = 1
 
Runtime::MemoryLayouts::MemoryLayoutPtr memoryLayout
 

Detailed Description

This class provides a ZMQ-backed data source.

Constructor & Destructor Documentation

◆ NetworkSource()

NES::Network::NetworkSource::NetworkSource ( SchemaPtr  schema,
Runtime::BufferManagerPtr  bufferManager,
Runtime::QueryManagerPtr  queryManager,
NetworkManagerPtr  networkManager,
NesPartition  nesPartition,
NodeLocation  sinkLocation,
size_t  numSourceLocalBuffers,
std::chrono::milliseconds  waitTime,
uint8_t  retryTimes,
std::vector< Runtime::Execution::SuccessorExecutablePipeline >  successors,
DecomposedQueryPlanVersion  version,
OperatorId  uniqueNetworkSourceIdentifier,
const std::string &  physicalSourceName = "defaultPhysicalSourceName" 
)
Parameters
SchemaPtr
bufferManager
queryManager
networkManager
nesPartition
sinkLocation
numSourceLocalBuffers
waitTime
retryTimes
successors
version  The initial version number of this source when it starts
uniqueNetworkSourceIdentifier  System-wide unique id that persists even if the partition of this source is changed
physicalSourceName

References NES::INTERVAL_MODE, and NES_ASSERT.

Member Function Documentation

◆ bind()

bool NES::Network::NetworkSource::bind ( )

Referenced by startNewVersion().

◆ fail()

bool NES::Network::NetworkSource::fail ( )
final virtual

This method is overridden here to manage failures of NetworkSource. It de-registers the source on the NetworkManager.

Returns
true if deregistration on the network stack is successful

Reimplemented from NES::DataSource.

References NES::Runtime::Failure, NES_DEBUG, NES::DataSource::queryManager, and NES::DataSource::running.

◆ getType()

SourceType NES::Network::NetworkSource::getType ( ) const
override virtual

Get source type.

Implements NES::DataSource.

◆ getUniqueId()

OperatorId NES::Network::NetworkSource::getUniqueId ( ) const

Getter for the network source's unique id.

Returns
the unique id

◆ getVersion()

DecomposedQueryPlanVersion NES::Network::NetworkSource::getVersion ( ) const
override virtual

Getter for the initial version.

Returns
The version this source was started with

Reimplemented from NES::DataEmitter.

◆ handleReconfigurationMarker()

bool NES::Network::NetworkSource::handleReconfigurationMarker ( ReconfigurationMarkerPtr  marker)
override virtual

Check if a reconfiguration marker contains an event for this source. If so, trigger the reconfiguration and propagate the marker downstream.

Parameters
marker  a marker containing a set of reconfiguration events
Returns
true if a reconfiguration was triggered, false if the marker did not contain any event to be handled by this source

Reimplemented from NES::DataSource.

References NES::ReconfigurationMetadata::instanceOf(), backward::details::move(), NES_ERROR, NES_NOT_IMPLEMENTED, and NES::DataSource::queryManager.

Referenced by insertReconfigurationMarker().
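
The return-value contract described above (true only when the marker carries an event for this source) can be pictured with a minimal sketch. Everything in it is a hypothetical stand-in rather than NebulaStream code: `ToyMarker` models the marker as a map keyed by operator id, which is an assumption, and `ToyMarkedSource` records the marker instead of actually pushing it downstream.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical marker: reconfiguration events keyed by the id of the
// operator they target (the keying scheme is an assumption of this sketch).
struct ToyMarker {
    std::unordered_map<std::uint64_t, std::string> events;
};
using ToyMarkerPtr = std::shared_ptr<ToyMarker>;

// Hypothetical source that only reacts to events addressed to its own id.
class ToyMarkedSource {
  public:
    explicit ToyMarkedSource(std::uint64_t sourceId) : id(sourceId) {}

    // Returns true only if the marker carries an event for this source.
    bool handleReconfigurationMarker(const ToyMarkerPtr& marker) {
        auto it = marker->events.find(id);
        if (it == marker->events.end()) {
            return false; // nothing addressed to this source
        }
        applied.push_back(it->second); // stands in for triggering the reconfiguration
        forwarded.push_back(marker);   // stands in for propagating the marker downstream
        return true;
    }

    std::size_t appliedCount() const { return applied.size(); }
    std::size_t forwardedCount() const { return forwarded.size(); }

  private:
    std::uint64_t id;
    std::vector<std::string> applied;
    std::vector<ToyMarkerPtr> forwarded;
};
```

In this sketch a source whose id is absent from the marker neither applies nor forwards anything, which mirrors the "false if the marker did not contain any event" case.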

◆ insertReconfigurationMarker()

bool NES::Network::NetworkSource::insertReconfigurationMarker ( ReconfigurationMarkerPtr  marker)
override virtual

Insert a reconfiguration marker into the stream starting at this source.

Parameters
marker  a marker containing a set of reconfiguration events
Returns
true if a reconfiguration was triggered, false if the marker did not contain any event to be handled by this source

Reimplemented from NES::DataEmitter.

References handleReconfigurationMarker(), and backward::details::move().

◆ onEndOfStream()

void NES::Network::NetworkSource::onEndOfStream ( Runtime::QueryTerminationType  terminationType)
override virtual
Parameters
terminationType

Reimplemented from NES::DataEmitter.

References NES::Runtime::Graceful, NES_DEBUG, NES_WARNING, and NES::DataSource::queryManager.

◆ onEvent() [1/2]

void NES::Network::NetworkSource::onEvent ( Runtime::BaseEvent &  event)
override virtual

This method is called once an event is triggered for the current source.

Parameters
event

Reimplemented from NES::DataSource.

References NES_DEBUG.

◆ onEvent() [2/2]

void NES::Network::NetworkSource::onEvent ( Runtime::BaseEvent &  event,
Runtime::WorkerContextRef  workerContext 
)
override virtual

API method called upon receiving an event; sends the event further upstream via the network channel.

Parameters
event
workerContext

Reimplemented from NES::DataSource.

References NES::Runtime::WorkerContext::getAsyncEventChannelConnectionResult(), NES::Runtime::WorkerContext::getEventOnlyNetworkChannel(), NES::Runtime::BaseEvent::getEventType(), NES::Runtime::kStartSourceEvent, backward::details::move(), NES_DEBUG, NES::DataSource::operatorId, NES::Network::detail::NetworkEventSender< BaseChannelType >::sendEvent(), and NES::Runtime::WorkerContext::storeEventOnlyChannel().

◆ postReconfigurationCallback()

void NES::Network::NetworkSource::postReconfigurationCallback ( Runtime::ReconfigurationMessage &  message)
override virtual

◆ receiveData()

std::optional< Runtime::TupleBuffer > NES::Network::NetworkSource::receiveData ( )
override virtual

This method is only a dummy; it is replaced by the ZmqServer in the NetworkStack. Do not use!

Returns
TupleBufferPtr containing the received buffer

Implements NES::DataSource.

References NES_THROW_RUNTIME_ERROR.
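
The "do not use" contract can be illustrated with a minimal sketch. The types below (`ToyBuffer`, `ToySource`, `ToyNetworkSource`) and the `deliver()` push path are hypothetical stand-ins, not NebulaStream classes: the pull-style `receiveData()` fails loudly, while buffers arrive through a push-style call, which is the role this page attributes to the ZmqServer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical stand-in for Runtime::TupleBuffer.
struct ToyBuffer {
    std::vector<std::uint8_t> bytes;
};

// Hypothetical pull-based source interface.
class ToySource {
  public:
    virtual ~ToySource() = default;
    virtual std::optional<ToyBuffer> receiveData() = 0;
};

// A source that is fed by a server instead of being polled.
class ToyNetworkSource : public ToySource {
  public:
    // Pulling is unsupported: throw rather than return an empty buffer.
    std::optional<ToyBuffer> receiveData() override {
        throw std::runtime_error("receiveData() must not be called on a network source");
    }

    // Assumed push path: the server hands over each buffer it received.
    void deliver(ToyBuffer buffer) { received.push_back(std::move(buffer)); }

    std::size_t receivedCount() const { return received.size(); }

  private:
    std::vector<ToyBuffer> received;
};

// Returns true if calling receiveData() on the given source throws.
bool pullThrows(ToySource& source) {
    try {
        source.receiveData();
    } catch (const std::runtime_error&) {
        return true;
    }
    return false;
}
```

Throwing keeps an accidental caller from mistaking an empty result for "no data yet"; the reference to NES_THROW_RUNTIME_ERROR above suggests the real method takes a similar approach.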

◆ reconfigure()

◆ runningRoutine()

void NES::Network::NetworkSource::runningRoutine ( const Runtime::BufferManagerPtr &  ,
const Runtime::QueryManagerPtr &  
)
static

This method is overridden here to prevent the NetworkSource from starting a thread.

Parameters
bufferManager
queryManager

References NES_THROW_RUNTIME_ERROR.

◆ scheduleNewDescriptor()

bool NES::Network::NetworkSource::scheduleNewDescriptor ( const NetworkSourceDescriptor &  networkSourceDescriptor)

set a new source descriptor to be applied once startNewVersion() is called

Parameters
networkSourceDescriptor  the new descriptor
Returns
true if the partition to be scheduled is different from the current one and the descriptor was scheduled.

◆ start()

bool NES::Network::NetworkSource::start ( )
final virtual

This method is overridden here to prevent the NetworkSource from starting a thread. It registers the source on the NetworkManager.

Returns
true if registration on the network stack is successful

Reimplemented from NES::DataSource.

References NES::DataSource::executableSuccessors, NES::Runtime::Initialize, NES_DEBUG, NES::DataSource::queryManager, and NES::DataSource::running.

◆ startNewVersion()

bool NES::Network::NetworkSource::startNewVersion ( )
override virtual

Reconfigures this source with ReconfigurationType::UpdateVersion causing it to close event channels to the old upstream sink and open channels to the new one.

Returns
true if a scheduled new version was found and applied, false otherwise

Reimplemented from NES::DataEmitter.

References bind(), NES_DEBUG, NES::DataSource::queryManager, NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >::shared_from_this(), and NES::Runtime::UpdateVersion.
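
The interplay between scheduleNewDescriptor() and startNewVersion() is a two-phase "schedule, then apply" pattern. The sketch below models only the documented return values; `ToyDescriptor`, `ToyVersionedSource`, and `currentVersion()` are hypothetical names, and the real method additionally closes and reopens event channels, which is not modeled here.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical stand-in for NetworkSourceDescriptor: only the fields this sketch needs.
struct ToyDescriptor {
    std::uint64_t partitionId;
    std::uint64_t version;
};

class ToyVersionedSource {
  public:
    explicit ToyVersionedSource(ToyDescriptor initial) : current(initial) {}

    // Phase 1: remember a descriptor for later. Rejected if the partition is unchanged.
    bool scheduleNewDescriptor(const ToyDescriptor& next) {
        if (next.partitionId == current.partitionId) {
            return false;
        }
        pending = next;
        return true;
    }

    // Phase 2: apply the pending descriptor. Returns false when nothing was scheduled.
    bool startNewVersion() {
        if (!pending.has_value()) {
            return false;
        }
        current = *pending;
        pending.reset();
        return true;
    }

    // Hypothetical accessor used only to observe the effect in this sketch.
    std::uint64_t currentVersion() const { return current.version; }

  private:
    ToyDescriptor current;
    std::optional<ToyDescriptor> pending;
};
```

Separating the two phases lets a caller stage the new descriptor ahead of time and switch over at a well-defined point.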

◆ stop()

bool NES::Network::NetworkSource::stop ( Runtime::QueryTerminationType  type = Runtime::QueryTerminationType::Graceful)
final virtual

This method is overridden here to prevent the NetworkSource from starting a thread. It de-registers the source on the NetworkManager.

Returns
true if deregistration on the network stack is successful

Reimplemented from NES::DataSource.

References NES::Runtime::HardStop, NES_ASSERT2_FMT, NES_DEBUG, NES::DataSource::queryManager, NES::DataSource::running, and NES::DataSource::type.
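
The registration-based lifecycle described for start() and stop() can be sketched as follows. `ToyRegistry` is a hypothetical stand-in for the NetworkManager and `ToyRegisteredSource` for the source; the method names on the registry are assumptions, and the sketch models only the success and failure return values.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical stand-in for the NetworkManager: tracks registered partitions.
class ToyRegistry {
  public:
    bool registerPartition(std::uint64_t partition) { return partitions.insert(partition).second; }
    bool unregisterPartition(std::uint64_t partition) { return partitions.erase(partition) == 1; }
    bool isRegistered(std::uint64_t partition) const { return partitions.count(partition) == 1; }

  private:
    std::set<std::uint64_t> partitions;
};

class ToyRegisteredSource {
  public:
    ToyRegisteredSource(ToyRegistry& reg, std::uint64_t part) : registry(reg), partition(part) {}

    // No thread is spawned: starting only registers the partition.
    bool start() {
        if (running) {
            return false; // already started
        }
        running = registry.registerPartition(partition);
        return running;
    }

    // Stopping de-registers the partition; true if deregistration succeeded.
    bool stop() {
        if (!running) {
            return false; // never started or already stopped
        }
        running = false;
        return registry.unregisterPartition(partition);
    }

  private:
    ToyRegistry& registry;
    std::uint64_t partition;
    bool running = false;
};
```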

◆ toString()

std::string NES::Network::NetworkSource::toString ( ) const
override virtual

override the toString method

Returns
returns string describing the network source

Implements NES::DataSource.

Friends And Related Function Documentation

◆ operator<

bool operator< ( const NetworkSource &  lhs,
const NetworkSource &  rhs 
)
friend

The documentation for this class was generated from the following files: