NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
NES::DataSource Class Reference (abstract)

Base class for all data sources in NES. Only three cases are allowed: (1) If the user specifies numBuffersToProcess, then (1.1) if the source (e.g., a file) is large enough, we read numBuffersToProcess buffers and then terminate; (1.2) if the source is not large enough, we start again from the beginning until numBuffersToProcess buffers have been produced. (2) If the user sets numBuffersToProcess to 0, we read the source until it ends, e.g., until the file ends. (3) If the user sets numBuffersToProcess to n but does not specify how many tuples each buffer should hold, we loop over the source until the buffer is full.

#include <DataSource.hpp>

Public Member Functions

 DataSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string &physicalSourceName, bool persistentSource, std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors=std::vector< Runtime::Execution::SuccessorExecutablePipeline >(), uint64_t sourceAffinity=std::numeric_limits< uint64_t >::max(), uint64_t taskQueueId=0)
 Public constructor for the data source. Note: the number of buffers to process is set to UINT64_MAX; some tests need this value to produce deterministic behavior.
 
 DataSource ()=delete
 
virtual void open ()
 Initializes thread-local state. For instance, it creates the local buffer pool; this is necessary because it cannot be done in the constructor.
 
virtual void close ()
 Cleans up thread-local state for the source.
 
virtual bool start ()
 Starts the source: 1.) checks whether running is true and, if so, returns; otherwise 2.) starts a new thread that executes runningRoutine.
 
virtual bool stop (Runtime::QueryTerminationType graceful)
 Stops the source: 1.) checks whether running is false and, if so, returns; otherwise 2.) stops the thread by joining it.
 
virtual void runningRoutine ()
 Running routine that executes while the source is active.
 
virtual std::optional< Runtime::TupleBuffer > receiveData ()=0
 Virtual function to receive a buffer. Note: this function is overridden by the particular data source.
 
virtual void createOrLoadPersistedProperties ()
 Defines the logic to load the properties persisted during the previous run of the data source.
 
virtual void storePersistedProperties ()
 Stores all properties that need to be persisted for future retrieval during a createOrLoadPersistedProperties call.
 
virtual void clearPersistedProperties ()
 Clears the persisted properties of the data source on hard stop.
 
virtual std::string toString () const =0
 Virtual function to get a string describing the particular source. Note: this function is overridden by the particular data source.
 
virtual SourceType getType () const =0
 Gets the source type.
 
SchemaPtr getSchema () const
 Returns the current schema of the source.
 
std::string getSourceSchemaAsString ()
 Returns the current schema of the source as a string.
 
bool isRunning () const noexcept
 Debug function for testing whether the source is running.
 
uint64_t getNumberOfGeneratedTuples () const
 Debug function for testing that returns the number of generated tuples.
 
uint64_t getNumberOfGeneratedBuffers () const
 Debug function for testing that returns the number of generated buffers.
 
virtual bool performSoftStop ()
 Marks running as false, which results in 1. the interruption of the data gathering loop and 2. the initiation of the soft stop routine.
 
void setGatheringInterval (std::chrono::milliseconds interval)
 Sets the sampling interval.
 
virtual ~DataSource () NES_NOEXCEPT(false) override
 Internal destructor that makes sure the data source is stopped before it is destructed. Note: must be public because of Boost serialization.
 
uint64_t getNumBuffersToProcess () const
 Get number of buffers to be processed.
 
std::chrono::milliseconds getGatheringInterval () const
 Get gathering interval.
 
uint64_t getGatheringIntervalCount () const
 Get number representation of gathering interval.
 
OperatorId getOperatorId () const
 Gets the operator id for the data source.
 
void setOperatorId (OperatorId operatorId)
 Set the operator id for the data source.
 
std::vector< Runtime::Execution::SuccessorExecutablePipeline > getExecutableSuccessors ()
 Returns the list of successor pipelines.
 
void addExecutableSuccessors (std::vector< Runtime::Execution::SuccessorExecutablePipeline > newPipelines)
 Add a list of successor pipelines.
 
template<typename Derived >
std::shared_ptr< Derived > shared_from_base ()
 This method is necessary to avoid problems with the shared_from_this machinery combined with multiple inheritance.
 
virtual std::vector< Schema::MemoryLayoutType > getSupportedLayouts ()
 Returns all supported layouts.
 
virtual void onEvent (Runtime::BaseEvent &) override
 API method called upon receiving an event.
 
virtual void onEvent (Runtime::BaseEvent &event, Runtime::WorkerContextRef workerContext)
 API method called upon receiving an event whose handling requires the WorkerContext (e.g., its network channels).
 
virtual bool fail ()
 
void setSourceSharing (bool value)
 Sets the source sharing value.
 
void incrementNumberOfConsumerQueries ()
 Increments the number of queries that use this source.
 
virtual bool handleReconfigurationMarker (ReconfigurationMarkerPtr marker)
 Checks whether a reconfiguration marker contains an event for this source. If so, triggers the reconfiguration and propagates the marker downstream.
 
- Public Member Functions inherited from NES::Runtime::Reconfigurable
 ~Reconfigurable () NES_NOEXCEPT(false) override=default
 
virtual void reconfigure (ReconfigurationMessage &, WorkerContext &)
 Reconfigure callback that will be called per thread.
 
virtual void postReconfigurationCallback (ReconfigurationMessage &)
 Callback that will be called on the last thread that executes the reconfiguration.
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this_base< isNoexceptDestructible >
virtual ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default
 
- Public Member Functions inherited from NES::DataEmitter
virtual ~DataEmitter () NES_NOEXCEPT(false)=default
 
virtual void onEndOfStream (Runtime::QueryTerminationType)
 
virtual DecomposedQueryPlanVersion getVersion () const
 
virtual bool startNewVersion ()
 Starts a previously scheduled new version for this data emitter.
 
virtual bool insertReconfigurationMarker (ReconfigurationMarkerPtr)
 Checks whether a reconfiguration marker contains an event for this data emitter. If so, triggers the reconfiguration and propagates the marker downstream.
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< RuntimeEventListener, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 

Public Attributes

const bool persistentSource
 
const std::string persistentSourceKey
 

Protected Member Functions

void emitWork (Runtime::TupleBuffer &buffer, bool addBufferMetaData=true) override
 Creates a task using the provided buffer and submits it to a task consumer, e.g., the query manager.
 
NES::Runtime::MemoryLayouts::TestTupleBuffer allocateBuffer ()
 

Protected Attributes

Runtime::QueryManagerPtr queryManager
 
Runtime::BufferManagerPtr localBufferManager
 
Runtime::FixedSizeBufferPoolPtr bufferManager {nullptr}
 
std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors
 
OperatorId operatorId
 
OriginId originId
 
StatisticId statisticId
 
SchemaPtr schema
 
uint64_t generatedTuples {0}
 
uint64_t generatedBuffers {0}
 
uint64_t numberOfBuffersToProduce = std::numeric_limits<decltype(numberOfBuffersToProduce)>::max()
 
uint64_t numSourceLocalBuffers
 
uint64_t gatheringIngestionRate {}
 
std::chrono::milliseconds gatheringInterval {0}
 
GatheringMode gatheringMode
 
SourceType type
 
Runtime::QueryTerminationType wasGracefullyStopped {Runtime::QueryTerminationType::Graceful}
 
std::atomic_bool wasStarted {false}
 
std::atomic_bool futureRetrieved {false}
 
std::atomic_bool running {false}
 
std::promise< bool > completedPromise
 
uint64_t sourceAffinity
 
uint64_t taskQueueId
 
bool sourceSharing = false
 
const std::string physicalSourceName
 
std::atomic< uint64_t > refCounter = 0
 
std::atomic< uint64_t > numberOfConsumerQueries = 1
 
Runtime::MemoryLayouts::MemoryLayoutPtr memoryLayout
 

Detailed Description

Base class for all data sources in NES. Only three cases are allowed:

1.) If the user specifies numBuffersToProcess:
    1.1) If the source (e.g., a file) is large enough, we read numBuffersToProcess buffers and then terminate.
    1.2) If the source is not large enough, we start again from the beginning until numBuffersToProcess buffers have been produced.
2.) If the user sets numBuffersToProcess to 0, we read the source until it ends, e.g., until the file ends.
3.) If the user sets numBuffersToProcess to n but does not specify how many tuples each buffer should hold, we loop over the source until the buffer is full.
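
The cases above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not the NES implementation: produceBuffers, the Buffer alias, the std::vector<int> that stands in for a file, and the tuplesPerBuffer parameter are all hypothetical names introduced for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using Buffer = std::vector<int>;

// Hypothetical model, not the NES implementation: `file` stands in for the
// source, and each Buffer holds at most `tuplesPerBuffer` tuples.
std::vector<Buffer> produceBuffers(const std::vector<int>& file,
                                   uint64_t numBuffersToProcess,
                                   std::size_t tuplesPerBuffer) {
    assert(tuplesPerBuffer > 0);
    std::vector<Buffer> buffers;
    if (file.empty()) {
        return buffers;
    }
    std::size_t pos = 0;
    if (numBuffersToProcess == 0) {
        // Case 2: read until the source ends; the last buffer may be partial.
        while (pos < file.size()) {
            Buffer buffer;
            while (buffer.size() < tuplesPerBuffer && pos < file.size()) {
                buffer.push_back(file[pos++]);
            }
            buffers.push_back(buffer);
        }
        return buffers;
    }
    // Cases 1.1, 1.2, and 3: produce exactly numBuffersToProcess buffers and
    // wrap around to the beginning whenever the source is exhausted, so that
    // every buffer is filled completely.
    for (uint64_t i = 0; i < numBuffersToProcess; ++i) {
        Buffer buffer;
        while (buffer.size() < tuplesPerBuffer) {
            buffer.push_back(file[pos]);
            pos = (pos + 1) % file.size();
        }
        buffers.push_back(buffer);
    }
    return buffers;
}
```

In this model, a three-tuple source asked for two buffers of two tuples each wraps around once, whereas the same source with numBuffersToProcess set to 0 yields one full and one partial buffer.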

Constructor & Destructor Documentation

◆ DataSource() [1/2]

NES::DataSource::DataSource ( SchemaPtr  schema,
Runtime::BufferManagerPtr  bufferManager,
Runtime::QueryManagerPtr  queryManager,
OperatorId  operatorId,
OriginId  originId,
StatisticId  statisticId,
size_t  numSourceLocalBuffers,
GatheringMode  gatheringMode,
const std::string &  physicalSourceName,
bool  persistentSource,
std::vector< Runtime::Execution::SuccessorExecutablePipeline >  executableSuccessors = std::vector<Runtime::Execution::SuccessorExecutablePipeline>(),
uint64_t  sourceAffinity = std::numeric_limits<uint64_t>::max(),
uint64_t  taskQueueId = 0 
)
explicit

Public constructor for the data source.

Note
The number of buffers to process is set to UINT64_MAX; some tests need this value to produce deterministic behavior.

Parameters
schema                 the schema of the data that this source produces
bufferManager          pointer to the buffer manager
queryManager           pointer to the query manager
operatorId             current operator id
originId               the identifier of the upstream operator that represents the origin of the input stream
statisticId            the unique identifier of components that we can track statistics for
numSourceLocalBuffers  number of local source buffers
gatheringMode          the gathering mode (INTERVAL_MODE, INGESTION_RATE_MODE, or ADAPTIVE_MODE)
physicalSourceName     the name and unique identifier of a physical source
persistentSource       whether the source has to persist properties that can be loaded when restarting the source
executableSuccessors   the subsequent operators in the pipeline to which the data is pushed
sourceAffinity         the affinity of the source thread, used by start() (defaults to std::numeric_limits<uint64_t>::max())
taskQueueId            the ID of the queue to which the task is pushed

References NES::Schema::COLUMNAR_LAYOUT, NES::Runtime::MemoryLayouts::ColumnLayout::create(), NES::Runtime::MemoryLayouts::RowLayout::create(), localBufferManager, memoryLayout, NES_ASSERT, NES_DEBUG, operatorId, NES::Schema::ROW_LAYOUT, and schema.

◆ DataSource() [2/2]

NES::DataSource::DataSource ( )
delete

◆ ~DataSource()

NES::DataSource::~DataSource ( )
override virtual

Internal destructor that makes sure the data source is stopped before it is destructed.

Note
Must be public because of Boost serialization.

References executableSuccessors, NES_ASSERT, NES_DEBUG, operatorId, and running.

Member Function Documentation

◆ addExecutableSuccessors()

void NES::DataSource::addExecutableSuccessors ( std::vector< Runtime::Execution::SuccessorExecutablePipeline >  newPipelines)

Add a list of successor pipelines.

References executableSuccessors.

◆ allocateBuffer()

Runtime::MemoryLayouts::TestTupleBuffer NES::DataSource::allocateBuffer ( )
protected

◆ clearPersistedProperties()

void NES::DataSource::clearPersistedProperties ( )
virtual

This method clears the persisted properties of the data source on hard stop.

Reimplemented in NES::TCPSource.

References NES_WARNING.

Referenced by close().

◆ close()

void NES::DataSource::close ( )
virtual

This method cleans up thread-local state for the source.

Reimplemented in NES::TCPSource, NES::TCPSource, and NES::BenchmarkSource.

References bufferManager, clearPersistedProperties(), magic_enum::enum_name(), NES::Runtime::Graceful, NES_ASSERT2_FMT, NES_WARNING, operatorId, persistentSource, queryManager, NES::Runtime::Reconfiguration, toString(), and wasGracefullyStopped.

Referenced by NES::BenchmarkSource::close(), NES::TCPSource::close(), and NES::Testing::NonRunnableDataSource::runningRoutine().

◆ createOrLoadPersistedProperties()

void NES::DataSource::createOrLoadPersistedProperties ( )
virtual

This method defines the logic to load the properties persisted during the previous run of the data source.

Reimplemented in NES::TCPSource.

References NES_WARNING.

Referenced by open().

◆ emitWork()

void NES::DataSource::emitWork ( Runtime::TupleBuffer &  buffer,
bool  addBufferMetaData = true 
)
override protected virtual

Creates a task using the provided buffer and submits it to a task consumer, e.g., the query manager.

Parameters
buffer
addBufferMetaData  If true, buffer meta data (e.g., sequence number, statistic id, origin id, ...) is added to the buffer

Implements NES::DataEmitter.

References executableSuccessors, NES::Runtime::TupleBuffer::getChunkNumber(), NES::Runtime::TupleBuffer::getOriginId(), NES::Runtime::TupleBuffer::getSequenceNumber(), NES::Runtime::TupleBuffer::getStatisticId(), NES::Runtime::TupleBuffer::isLastChunk(), NES_DEBUG, NES_TRACE, originId, queryManager, NES::Runtime::TupleBuffer::setChunkNumber(), NES::Runtime::TupleBuffer::setCreationTimestampInMS(), NES::Runtime::TupleBuffer::setLastChunk(), NES::Runtime::TupleBuffer::setOriginId(), NES::Runtime::TupleBuffer::setSequenceNumber(), NES::Runtime::TupleBuffer::setStatisticId(), sourceSharing, statisticId, and taskQueueId.

Referenced by NES::Testing::NonRunnableDataSource::emitBuffer().
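
The effect of the addBufferMetaData flag can be sketched as follows. This is a simplified illustration, not the NES implementation: MiniBuffer, MiniEmitter, and the Consumer callback are hypothetical stand-ins for Runtime::TupleBuffer, DataSource, and the query manager, and starting sequence numbers at 1 is an assumption.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical stand-in for Runtime::TupleBuffer: only the metadata fields.
struct MiniBuffer {
    uint64_t sequenceNumber = 0;
    uint64_t originId = 0;
    uint64_t statisticId = 0;
    uint64_t creationTimestampInMS = 0;
};

// Hypothetical emitter: tags buffers with metadata and hands them to a consumer.
class MiniEmitter {
  public:
    using Consumer = std::function<void(const MiniBuffer&)>;

    MiniEmitter(uint64_t originId, uint64_t statisticId, Consumer consumer)
        : originId(originId), statisticId(statisticId), consumer(std::move(consumer)) {
        assert(this->consumer && "a consumer is required");
    }

    void emitWork(MiniBuffer& buffer, bool addBufferMetaData = true) {
        if (addBufferMetaData) {
            // Assumption: sequence numbers start at 1 and increase per buffer.
            buffer.sequenceNumber = ++maxSequenceNumber;
            buffer.originId = originId;
            buffer.statisticId = statisticId;
            buffer.creationTimestampInMS = nowInMs();
        }
        // With addBufferMetaData == false the buffer is passed on untouched.
        consumer(buffer);
    }

  private:
    static uint64_t nowInMs() {
        using namespace std::chrono;
        return static_cast<uint64_t>(
            duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
    }

    uint64_t originId;
    uint64_t statisticId;
    Consumer consumer;
    uint64_t maxSequenceNumber = 0;
};
```

Passing false is useful in this sketch when a buffer already carries metadata from an upstream component and must be forwarded unchanged.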

◆ fail()

bool NES::DataSource::fail ( )
virtual

Reimplemented in NES::Network::NetworkSource.

References NES::Runtime::Failure, NES_DEBUG, operatorId, queryManager, and stop().

◆ getExecutableSuccessors()

std::vector< Runtime::Execution::SuccessorExecutablePipeline > NES::DataSource::getExecutableSuccessors ( )

Returns the list of successor pipelines.

Returns
std::vector<Runtime::Execution::SuccessorExecutablePipeline>

References executableSuccessors.

◆ getGatheringInterval()

std::chrono::milliseconds NES::DataSource::getGatheringInterval ( ) const

Get gathering interval.

References gatheringInterval.

◆ getGatheringIntervalCount()

uint64_t NES::DataSource::getGatheringIntervalCount ( ) const

Get number representation of gathering interval.

References gatheringInterval.

◆ getNumberOfGeneratedBuffers()

uint64_t NES::DataSource::getNumberOfGeneratedBuffers ( ) const

Debug function for testing that returns the number of generated buffers.

Returns
number of generated buffers

References generatedBuffers.

◆ getNumberOfGeneratedTuples()

uint64_t NES::DataSource::getNumberOfGeneratedTuples ( ) const

debug function for testing to get number of generated tuples

Returns
number of generated tuples

References generatedTuples.

◆ getNumBuffersToProcess()

uint64_t NES::DataSource::getNumBuffersToProcess ( ) const

Get number of buffers to be processed.

References numberOfBuffersToProduce.

◆ getOperatorId()

OperatorId NES::DataSource::getOperatorId ( ) const

Gets the operator id for the data source.

Returns
OperatorId

References operatorId.

◆ getSchema()

SchemaPtr NES::DataSource::getSchema ( ) const

method to return the current schema of the source

Returns
schema description of the source

References schema.

◆ getSourceSchemaAsString()

std::string NES::DataSource::getSourceSchemaAsString ( )

method to return the current schema of the source as string

Returns
schema description of the source as string

References schema.

◆ getSupportedLayouts()

std::vector< Schema::MemoryLayoutType > NES::DataSource::getSupportedLayouts ( )
virtual

This method returns all supported layouts.

Returns

Reimplemented in NES::DefaultSource.

References NES::Schema::ROW_LAYOUT.

◆ getType()

virtual SourceType NES::DataSource::getType ( ) const
pure virtual

get source Type

Returns

Implemented in NES::ZmqSource, NES::TCPSource, NES::TCPSource, NES::Experimental::StaticDataSource, NES::SenseSource, NES::MonitoringSource, NES::MemorySource, NES::LambdaSource, NES::GeneratorSource, NES::DefaultSource, NES::CSVSource, NES::BinarySource, NES::BenchmarkSource, and NES::Network::NetworkSource.

Referenced by start().

◆ handleReconfigurationMarker()

bool NES::DataSource::handleReconfigurationMarker ( ReconfigurationMarkerPtr  marker)
virtual

check if a reconfiguration marker contains an event for this source. If so, trigger the reconfiguration and propagate the marker downstream.

Parameters
marker  a marker containing a set of reconfiguration events
Returns
true if a reconfiguration was triggered, false if the marker did not contain any event to be handled by this source

Reimplemented in NES::Network::NetworkSource.

References completedPromise, NES::Runtime::Failure, futureRetrieved, NES::ReconfigurationMetadata::instanceOf(), backward::details::move(), NES_ASSERT, NES_ASSERT2_FMT, NES_DEBUG, NES_ERROR, NES_NOT_IMPLEMENTED, NES_WARNING, operatorId, queryManager, NES::Runtime::Reconfiguration, running, NES::detail::waitForFuture(), wasGracefullyStopped, and wasStarted.

◆ incrementNumberOfConsumerQueries()

void NES::DataSource::incrementNumberOfConsumerQueries ( )
inline

Increments the number of queries that use this source.

Parameters
value

References numberOfConsumerQueries.

◆ isRunning()

bool NES::DataSource::isRunning ( ) const
inline noexcept

Debug function for testing whether the source is running.

Returns
bool indicating whether the source is running
Note
This function is intentionally non-virtual. If implementations of this class should be able to override it, we have to ensure that isRunning and this class's member running stay consistent, or that this class does not evaluate running directly when checking whether it is running.

References running.

Referenced by NES::Testing::NonRunnableDataSource::stop().

◆ onEvent() [1/2]

void NES::DataSource::onEvent ( Runtime::BaseEvent &  event)
override virtual

API method called upon receiving an event.

Note
Currently has no behaviour. We need to overwrite DataEmitter::onEvent for compliance.
Parameters
event

Reimplemented from NES::DataEmitter.

Reimplemented in NES::Network::NetworkSource, and NES::Experimental::StaticDataSource.

References NES_DEBUG, NES::DataEmitter::onEvent(), and operatorId.

Referenced by onEvent().

◆ onEvent() [2/2]

void NES::DataSource::onEvent ( Runtime::BaseEvent &  event,
Runtime::WorkerContextRef  workerContext 
)
virtual

API method called upon receiving an event, whose handling requires the WorkerContext (e.g. its network channels).

Note
Only calls onEvent(event) of this class or derived classes.
Parameters
event
workerContext

Reimplemented in NES::Network::NetworkSource.

References NES_DEBUG, onEvent(), and operatorId.
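
The forwarding behavior described in the note above can be sketched with two overloads. This is a minimal illustration under assumptions, not the NES code: BaseEvent, WorkerContext, MiniSource, and NetworkLikeSource are hypothetical simplifications of the real types.

```cpp
#include <cassert>
#include <string>

// Hypothetical, simplified event and worker-context types.
struct BaseEvent { std::string name; };
struct WorkerContext { int workerId = 0; };

class MiniSource {
  public:
    virtual ~MiniSource() = default;

    // Single-argument handler; derived sources may override it.
    virtual void onEvent(BaseEvent& event) {
        assert(!event.name.empty());
        ++baseCalls;
    }

    // Handler with worker context: by default it only forwards to
    // onEvent(event), which dispatches virtually to the most derived override.
    virtual void onEvent(BaseEvent& event, WorkerContext&) { onEvent(event); }

    int baseCalls = 0;
};

class NetworkLikeSource : public MiniSource {
  public:
    using MiniSource::onEvent; // keep the two-argument overload visible

    void onEvent(BaseEvent& event) override {
        lastEvent = event.name;
        MiniSource::onEvent(event);
    }

    std::string lastEvent;
};
```

Because the two-argument overload calls onEvent(event) through virtual dispatch, a derived class that overrides only the single-argument handler still sees events delivered with a worker context.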

◆ open()

void NES::DataSource::open ( )
virtual

This method initializes thread-local state. For instance, it creates the local buffer pool; this is necessary because it cannot be done in the constructor.

Reimplemented in NES::TCPSource, NES::TCPSource, NES::Experimental::StaticDataSource, and NES::BenchmarkSource.

References bufferManager, createOrLoadPersistedProperties(), localBufferManager, numSourceLocalBuffers, and storePersistedProperties().

Referenced by NES::BenchmarkSource::open(), NES::Experimental::StaticDataSource::open(), NES::TCPSource::open(), NES::Experimental::StaticDataSource::preloadBuffers(), NES::Testing::NonRunnableDataSource::runningRoutine(), and NES::TEST_F().

◆ performSoftStop()

bool NES::DataSource::performSoftStop ( )
virtual

This method marks running as false, which results in 1. the interruption of the data gathering loop and 2. the initiation of the soft stop routine.

Returns
true if successfully interrupted else false

Reimplemented in NES::Testing::NonRunnableDataSource.

References NES_WARNING, operatorId, and running.

Referenced by NES::Testing::NonRunnableDataSource::performSoftStop().

◆ receiveData()

virtual std::optional<Runtime::TupleBuffer> NES::DataSource::receiveData ( )
pure virtual

Virtual function to receive a buffer.

Note
This function is overridden by the particular data source.

Returns
returns a tuple buffer

Implemented in NES::GeneratorSource, NES::ZmqSource, NES::TCPSource, NES::TCPSource, NES::Experimental::StaticDataSource, NES::SenseSource, NES::MonitoringSource, NES::MemorySource, NES::LambdaSource, NES::DefaultSource, NES::CSVSource, NES::BinarySource, NES::BenchmarkSource, and NES::Network::NetworkSource.
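
How a concrete source might implement this pure virtual function can be sketched as below. This is a hedged illustration, not NES code: MiniBuffer, MiniDataSource, CountingSource, and drain() are hypothetical, and treating an empty optional as "no more data" is an assumption about the contract.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical stand-in for Runtime::TupleBuffer.
using MiniBuffer = std::vector<int>;

class MiniDataSource {
  public:
    virtual ~MiniDataSource() = default;

    // Each concrete source decides how a buffer is obtained.
    virtual std::optional<MiniBuffer> receiveData() = 0;

    // Calls receiveData() until it yields no buffer (assumed end of data)
    // and returns the number of buffers received.
    uint64_t drain() {
        uint64_t buffers = 0;
        while (auto buffer = receiveData()) {
            generatedTuples += buffer->size();
            ++buffers;
        }
        return buffers;
    }

    uint64_t generatedTuples = 0;
};

// Hypothetical source that emits the numbers 0..limit-1, two per buffer.
class CountingSource : public MiniDataSource {
  public:
    explicit CountingSource(int limit) : limit(limit) { assert(limit >= 0); }

    std::optional<MiniBuffer> receiveData() override {
        if (next >= limit) {
            return std::nullopt;
        }
        MiniBuffer buffer;
        for (int i = 0; i < 2 && next < limit; ++i) {
            buffer.push_back(next++);
        }
        return buffer;
    }

  private:
    int limit;
    int next = 0;
};
```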

◆ runningRoutine()

void NES::DataSource::runningRoutine ( )
virtual

running routine while source is active

Reimplemented in NES::BenchmarkSource, NES::Network::detail::TestSourceWithLatch, and NES::Testing::NonRunnableDataSource.

References NES::ADAPTIVE_MODE, completedPromise, gatheringMode, NES::INGESTION_RATE_MODE, NES::INTERVAL_MODE, NES_DEBUG, NES_FATAL_ERROR, NES_NOT_IMPLEMENTED, operatorId, and queryManager.

Referenced by NES::MockDataSourceWithRunningRoutine::MockDataSourceWithRunningRoutine(), start(), and NES::TEST_F().

◆ setGatheringInterval()

void NES::DataSource::setGatheringInterval ( std::chrono::milliseconds  interval)

Sets the sampling interval.

Note
the source will sleep for the given interval (in milliseconds) and then produce the next buffer
Parameters
interval  the interval at which to gather

References gatheringInterval.
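
The relationship between the gathering interval and buffer production can be sketched as follows. This is an assumption-laden illustration rather than the NES implementation: IntervalSource and produce() are hypothetical, and only the accessor names mirror the documented interface.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <thread>

// Hypothetical source that waits for the gathering interval before each buffer.
class IntervalSource {
  public:
    void setGatheringInterval(std::chrono::milliseconds interval) {
        assert(interval.count() >= 0);
        gatheringInterval = interval;
    }

    std::chrono::milliseconds getGatheringInterval() const { return gatheringInterval; }

    // Number representation of the interval, i.e., its length in milliseconds.
    uint64_t getGatheringIntervalCount() const {
        return static_cast<uint64_t>(gatheringInterval.count());
    }

    // Produces `buffers` buffers, sleeping for the interval before each one,
    // and returns the total number of buffers generated so far.
    uint64_t produce(uint64_t buffers) {
        for (uint64_t i = 0; i < buffers; ++i) {
            std::this_thread::sleep_for(gatheringInterval);
            ++generatedBuffers;
        }
        return generatedBuffers;
    }

  private:
    std::chrono::milliseconds gatheringInterval{0};
    uint64_t generatedBuffers = 0;
};
```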

◆ setOperatorId()

void NES::DataSource::setOperatorId ( OperatorId  operatorId)

Set the operator id for the data source.

Parameters
operatorId

References operatorId.

◆ setSourceSharing()

void NES::DataSource::setSourceSharing ( bool  value)
inline

set source sharing value

Parameters
value

References sourceSharing, and magic_enum::detail::value().

◆ shared_from_base()

template<typename Derived >
std::shared_ptr<Derived> NES::DataSource::shared_from_base ( )
inline

This method is necessary to avoid problems with the shared_from_this machinery combined with multiple inheritance.

Template Parameters
Derived  the class type to which we want to cast the shared ptr
Returns
this instance cast to the desired shared_ptr<Derived> type

References NES::detail::virtual_enable_shared_from_this< RuntimeEventListener, false >::shared_from_this().
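
The general idiom behind shared_from_base can be sketched with the standard library alone. This is a simplification under stated assumptions: it uses std::enable_shared_from_this and std::static_pointer_cast instead of NES's virtual_enable_shared_from_this, and Listener, MiniSource, CsvLikeSource, and example() are hypothetical names.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>

// Hypothetical base that owns the shared_from_this machinery.
class Listener : public std::enable_shared_from_this<Listener> {
  public:
    virtual ~Listener() = default;
};

class MiniSource : public Listener {
  public:
    // shared_from_this() yields shared_ptr<Listener>; cast it down to the
    // requested derived type without creating a second control block.
    template<typename Derived>
    std::shared_ptr<Derived> shared_from_base() {
        static_assert(std::is_base_of<MiniSource, Derived>::value,
                      "Derived must inherit from MiniSource");
        return std::static_pointer_cast<Derived>(shared_from_this());
    }
};

class CsvLikeSource : public MiniSource {
  public:
    int rows = 3;
};

// Usage: obtain a correctly typed shared_ptr from a base-class reference.
inline void example() {
    auto source = std::make_shared<CsvLikeSource>();
    MiniSource& base = *source;
    auto same = base.shared_from_base<CsvLikeSource>();
    assert(same == source);
}
```

The object must already be owned by a std::shared_ptr when shared_from_base is called; otherwise shared_from_this() throws std::bad_weak_ptr (since C++17).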

◆ start()

bool NES::DataSource::start ( )
virtual

Starts the source: 1.) checks whether running is true and, if so, returns; otherwise 2.) starts a new thread that executes runningRoutine.

Reimplemented in NES::Experimental::StaticDataSource, and NES::Network::NetworkSource.

References getType(), NES_ASSERT, NES_DEBUG, NES_THROW_RUNTIME_ERROR, NES_WARNING, operatorId, running, runningRoutine(), sourceAffinity, type, and wasStarted.

Referenced by NES::Experimental::StaticDataSource::startStaticDataSourceManually().
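
The check-then-start logic can be sketched with an atomic flag and a worker thread. This is a minimal sketch under assumptions, not the NES implementation: MiniSource is hypothetical, the routine body is a placeholder, and using compare_exchange_strong for the check is a design choice of this example.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical source whose start() and stop() are idempotent.
class MiniSource {
  public:
    ~MiniSource() { stop(); }

    // Returns true only for the call that actually started the thread.
    bool start() {
        bool expected = false;
        if (!running.compare_exchange_strong(expected, true)) {
            return false; // already running
        }
        assert(!worker.joinable());
        worker = std::thread([this] { runningRoutine(); });
        return true;
    }

    // Returns true only for the call that actually stopped the thread.
    bool stop() {
        bool expected = true;
        if (!running.compare_exchange_strong(expected, false)) {
            return false; // was not running
        }
        worker.join();
        return true;
    }

    bool isRunning() const noexcept { return running.load(); }

  private:
    // Placeholder for the data gathering loop.
    void runningRoutine() {
        while (running.load()) {
            std::this_thread::yield();
        }
    }

    std::atomic_bool running{false};
    std::thread worker;
};
```

The compare-and-exchange makes the check and the state change a single atomic step, so two concurrent start() calls cannot both spawn a thread.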

◆ stop()

bool NES::DataSource::stop ( Runtime::QueryTerminationType  graceful)
virtual

Stops the source: 1.) checks whether running is false and, if so, returns; otherwise 2.) stops the thread by joining it.

Reimplemented in NES::Network::NetworkSource, NES::Testing::NonRunnableDataSource, and NES::ZmqSource.

References completedPromise, NES::Runtime::Failure, futureRetrieved, NES_ASSERT2_FMT, NES_DEBUG, NES_WARNING, numberOfConsumerQueries, operatorId, queryManager, refCounter, running, NES::detail::waitForFuture(), wasGracefullyStopped, and wasStarted.

Referenced by fail(), NES::ZmqSource::stop(), and NES::Testing::NonRunnableDataSource::stop().
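
The references to completedPromise, futureRetrieved, and waitForFuture suggest that stopping waits for the running routine to signal completion. One way such a handshake could look is sketched below; this is an assumption-based illustration, not the NES implementation, and CompletableSource as well as the timeout parameter are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <future>
#include <thread>

// Hypothetical source whose routine reports completion through a promise.
class CompletableSource {
  public:
    ~CompletableSource() {
        running = false;
        if (worker.joinable()) {
            worker.join();
        }
    }

    void start() {
        assert(!worker.joinable()); // this sketch supports a single start
        running = true;
        worker = std::thread([this] {
            while (running.load()) {
                std::this_thread::yield();
            }
            completedPromise.set_value(true); // routine finished cleanly
        });
    }

    // Returns true if the routine signalled completion within `timeout`.
    bool stop(std::chrono::milliseconds timeout) {
        running = false;
        // get_future() may be called only once per promise, hence the guard.
        if (futureRetrieved.exchange(true)) {
            return false;
        }
        auto completed = completedPromise.get_future();
        bool ok = completed.wait_for(timeout) == std::future_status::ready && completed.get();
        if (worker.joinable()) {
            worker.join();
        }
        return ok;
    }

  private:
    std::atomic_bool running{false};
    std::atomic_bool futureRetrieved{false};
    std::promise<bool> completedPromise;
    std::thread worker;
};
```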

◆ storePersistedProperties()

void NES::DataSource::storePersistedProperties ( )
virtual

This method stores all properties that need to be persisted for future retrieval during createOrLoadPersistedProperties call.

Reimplemented in NES::TCPSource.

References NES_WARNING.

Referenced by open().

◆ toString()

virtual std::string NES::DataSource::toString ( ) const
pure virtual

Virtual function to get a string describing the particular source.

Note
This function is overridden by the particular data source.

Returns
string with name and additional information about the source

Implemented in NES::ZmqSource, NES::TCPSource, NES::TCPSource, NES::Experimental::StaticDataSource, NES::SenseSource, NES::MonitoringSource, NES::MemorySource, NES::LambdaSource, NES::GeneratorSource, NES::CSVSource, NES::BinarySource, NES::BenchmarkSource, and NES::Network::NetworkSource.

Referenced by close().

Member Data Documentation

◆ bufferManager

◆ completedPromise

◆ executableSuccessors

◆ futureRetrieved

std::atomic_bool NES::DataSource::futureRetrieved {false}
protected

◆ gatheringIngestionRate

uint64_t NES::DataSource::gatheringIngestionRate {}
protected

◆ gatheringInterval

◆ gatheringMode

◆ generatedBuffers

◆ generatedTuples

◆ localBufferManager

◆ memoryLayout

Runtime::MemoryLayouts::MemoryLayoutPtr NES::DataSource::memoryLayout
protected

Referenced by allocateBuffer(), and DataSource().

◆ numberOfBuffersToProduce

◆ numberOfConsumerQueries

std::atomic<uint64_t> NES::DataSource::numberOfConsumerQueries = 1
protected

◆ numSourceLocalBuffers

uint64_t NES::DataSource::numSourceLocalBuffers
protected

◆ operatorId

◆ originId

OriginId NES::DataSource::originId
protected

Referenced by emitWork().

◆ persistentSource

◆ persistentSourceKey

◆ physicalSourceName

const std::string NES::DataSource::physicalSourceName
protected

◆ queryManager

◆ refCounter

std::atomic<uint64_t> NES::DataSource::refCounter = 0
protected

Referenced by stop().

◆ running

◆ schema

◆ sourceAffinity

◆ sourceSharing

bool NES::DataSource::sourceSharing = false
protected

Referenced by emitWork(), and setSourceSharing().

◆ statisticId

StatisticId NES::DataSource::statisticId
protected

Referenced by emitWork().

◆ taskQueueId

◆ type

SourceType NES::DataSource::type
protected

◆ wasGracefullyStopped

◆ wasStarted

std::atomic_bool NES::DataSource::wasStarted {false}
protected

The documentation for this class was generated from the following files: