NebulaStream
0.6.213
NebulaStream is a data and application management framework for the Internet of Things.
Base class for all data sources in NES.
#include <DataSource.hpp>
Public Member Functions
DataSource(SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string& physicalSourceName, bool persistentSource, std::vector<Runtime::Execution::SuccessorExecutablePipeline> executableSuccessors = std::vector<Runtime::Execution::SuccessorExecutablePipeline>(), uint64_t sourceAffinity = std::numeric_limits<uint64_t>::max(), uint64_t taskQueueId = 0)
  Public constructor for the data source. Note: the number of buffers to process is set to UINT64_MAX; some tests need this value to produce deterministic behavior.
DataSource() = delete
virtual void open()
  This method initializes thread-local state. For instance, it creates the local buffer pool; this is necessary because it cannot be done in the constructor.
virtual void close()
  This method cleans up thread-local state for the source.
virtual bool start()
  Method to start the source: 1.) check whether running is true; if so, return, otherwise start the source. 2.) Start a new thread with runningRoutine().
virtual bool stop(Runtime::QueryTerminationType graceful)
  Method to stop the source: 1.) check whether running is false; if so, return, otherwise stop the source. 2.) Stop the thread by joining it.
virtual void runningRoutine()
  Running routine while the source is active.
virtual std::optional<Runtime::TupleBuffer> receiveData() = 0
  Virtual function to receive a buffer. Note: this function is overridden by the particular data source.
virtual void createOrLoadPersistedProperties()
  This method defines the logic to load the properties persisted during the previous run of the data source.
virtual void storePersistedProperties()
  This method stores all properties that need to be persisted for future retrieval during the createOrLoadPersistedProperties() call.
virtual void clearPersistedProperties()
  This method clears the persisted properties of the data source on a hard stop.
virtual std::string toString() const = 0
  Virtual function to get a string describing the particular source. Note: this function is overridden by the particular data source.
virtual SourceType getType() const = 0
  Get the source type.
SchemaPtr getSchema() const
  Method to return the current schema of the source.
std::string getSourceSchemaAsString()
  Method to return the current schema of the source as a string.
bool isRunning() const noexcept
  Debug function for testing whether the source is running.
uint64_t getNumberOfGeneratedTuples() const
  Debug function for testing to get the number of generated tuples.
uint64_t getNumberOfGeneratedBuffers() const
  Debug function for testing to get the number of generated buffers.
virtual bool performSoftStop()
  This method marks running as false, which results in 1. the interruption of the data-gathering loop and 2. the initiation of the soft-stop routine.
void setGatheringInterval(std::chrono::milliseconds interval)
  Method to set the sampling interval.
virtual ~DataSource() NES_NOEXCEPT(false) override
  Internal destructor to make sure that the data source is stopped before it is destructed. Note: it must be public because of Boost serialization.
uint64_t getNumBuffersToProcess() const
  Get the number of buffers to be processed.
std::chrono::milliseconds getGatheringInterval() const
  Get the gathering interval.
uint64_t getGatheringIntervalCount() const
  Get the number representation of the gathering interval.
OperatorId getOperatorId() const
  Gets the operator id for the data source.
void setOperatorId(OperatorId operatorId)
  Set the operator id for the data source.
std::vector<Runtime::Execution::SuccessorExecutablePipeline> getExecutableSuccessors()
  Returns the list of successor pipelines.
void addExecutableSuccessors(std::vector<Runtime::Execution::SuccessorExecutablePipeline> newPipelines)
  Add a list of successor pipelines.
template<typename Derived> std::shared_ptr<Derived> shared_from_base()
  This method is necessary to avoid problems with the shared_from_this machinery combined with multiple inheritance.
virtual std::vector<Schema::MemoryLayoutType> getSupportedLayouts()
  This method returns all supported layouts.
virtual void onEvent(Runtime::BaseEvent&) override
  API method called upon receiving an event.
virtual void onEvent(Runtime::BaseEvent& event, Runtime::WorkerContextRef workerContext)
  API method called upon receiving an event whose handling requires the WorkerContext (e.g., its network channels).
virtual bool fail()
void setSourceSharing(bool value)
  Set the source sharing value.
void incrementNumberOfConsumerQueries()
  Increment the number of queries that use this source.
virtual bool handleReconfigurationMarker(ReconfigurationMarkerPtr marker)
  Check whether a reconfiguration marker contains an event for this source. If so, trigger the reconfiguration and propagate the marker downstream.
Public Member Functions inherited from Reconfigurable
~Reconfigurable() NES_NOEXCEPT(false) override = default
virtual void reconfigure(ReconfigurationMessage&, WorkerContext&)
  Reconfigure callback that will be called per thread.
virtual void postReconfigurationCallback(ReconfigurationMessage&)
  Callback that will be called on the last thread that executes the reconfiguration.
Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this
~virtual_enable_shared_from_this() NES_NOEXCEPT(isNoexceptDestructible) override = default
std::shared_ptr<T1> shared_from_this()
std::weak_ptr<T1> weak_from_this()
Public Member Functions inherited from virtual_enable_shared_from_this_base
virtual ~virtual_enable_shared_from_this_base() NES_NOEXCEPT(isNoexceptDestructible) = default
Public Member Functions inherited from NES::DataEmitter
virtual ~DataEmitter() NES_NOEXCEPT(false) = default
virtual void onEndOfStream(Runtime::QueryTerminationType)
virtual DecomposedQueryPlanVersion getVersion() const
virtual bool startNewVersion()
  Start a previously scheduled new version for this data emitter.
virtual bool insertReconfigurationMarker(ReconfigurationMarkerPtr)
  Check whether a reconfiguration marker contains an event for this data emitter. If so, trigger the reconfiguration and propagate the marker downstream.
Public Attributes
const bool persistentSource
const std::string persistentSourceKey
Protected Member Functions
void emitWork(Runtime::TupleBuffer& buffer, bool addBufferMetaData = true) override
  Create a task using the provided buffer and submit it to a task consumer, e.g., the query manager.
NES::Runtime::MemoryLayouts::TestTupleBuffer allocateBuffer()
Base class for all data sources in NES. Only three cases are allowed:
1.) If the user specifies numBuffersToProcess:
  1.1) if the source (e.g., a file) is large enough, we read numBuffersToProcess buffers and then terminate;
  1.2) if the file is not large enough, we start again from the beginning until we have produced numBuffersToProcess buffers.
2.) If the user sets numBuffersToProcess to 0, we read the source until it ends, e.g., until the end of the file.
3.) If the user sets numBuffersToProcess to n but does not say how many tuples they want per buffer, we loop over the source until the buffer is full.
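The three cases above can be modelled as two small functions. This is a minimal sketch, not the NES implementation: planBufferReads and fillBufferByLooping are hypothetical names, and the source is reduced to a count of readable units (buffers for cases 1 and 2, tuples for case 3) whose positions are recorded in read order.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper for cases 1.1, 1.2 and 2: which source positions get read
// for a given numBuffersToProcess when the source holds `sourceLength` buffers.
std::vector<std::uint64_t> planBufferReads(std::uint64_t numBuffersToProcess, std::uint64_t sourceLength) {
    std::vector<std::uint64_t> reads;
    if (sourceLength == 0) {
        return reads;  // an empty source yields nothing (and must not loop forever)
    }
    if (numBuffersToProcess == 0) {
        // Case 2: read the source once, until it ends.
        for (std::uint64_t i = 0; i < sourceLength; ++i) {
            reads.push_back(i);
        }
        return reads;
    }
    // Case 1.1: a large enough source stops after numBuffersToProcess reads.
    // Case 1.2: a too small source wraps around to the beginning until enough buffers exist.
    for (std::uint64_t produced = 0; produced < numBuffersToProcess; ++produced) {
        reads.push_back(produced % sourceLength);
    }
    return reads;
}

// Hypothetical helper for case 3: no tuples-per-buffer value is given, so loop over
// the source's tuples until a buffer with `bufferCapacity` slots is full.
std::vector<std::uint64_t> fillBufferByLooping(std::uint64_t bufferCapacity, std::uint64_t sourceTuples) {
    std::vector<std::uint64_t> tuples;
    if (sourceTuples == 0) {
        return tuples;
    }
    for (std::uint64_t slot = 0; slot < bufferCapacity; ++slot) {
        tuples.push_back(slot % sourceTuples);
    }
    return tuples;
}
```

For example, planBufferReads(7, 3) reads positions 0, 1, 2, 0, 1, 2, 0, which is case 1.2: the source is restarted twice to reach seven buffers.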
NES::DataSource::DataSource(SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string& physicalSourceName, bool persistentSource, std::vector<Runtime::Execution::SuccessorExecutablePipeline> executableSuccessors = std::vector<Runtime::Execution::SuccessorExecutablePipeline>(), uint64_t sourceAffinity = std::numeric_limits<uint64_t>::max(), uint64_t taskQueueId = 0) [explicit]
Public constructor for the data source. Note: the number of buffers to process is set to UINT64_MAX; some tests need this value to produce deterministic behavior.
Parameters:
schema: the schema of the data that this source produces
bufferManager: pointer to the buffer manager
queryManager: pointer to the query manager
operatorId: the ID of this source operator
originId: identifier of the upstream operator that represents the origin of the input stream
statisticId: unique identifier of the component, used to track statistics for it
numSourceLocalBuffers: number of source-local buffers
gatheringMode: the gathering mode (INTERVAL_MODE, INGESTION_RATE_MODE, or ADAPTIVE_MODE)
physicalSourceName: the name and unique identifier of a physical source
persistentSource: whether the source has to persist properties that can be loaded when the source is restarted
executableSuccessors: the subsequent operators in the pipeline to which the data is pushed
sourceAffinity: the affinity of the source's thread, i.e., the CPU core it is pinned to; the default, std::numeric_limits<uint64_t>::max(), leaves it unpinned
taskQueueId: the ID of the queue to which tasks are pushed
References NES::Schema::COLUMNAR_LAYOUT, NES::Runtime::MemoryLayouts::ColumnLayout::create(), NES::Runtime::MemoryLayouts::RowLayout::create(), localBufferManager, memoryLayout, NES_ASSERT, NES_DEBUG, operatorId, NES::Schema::ROW_LAYOUT, and schema.
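The constructor's references to NES::Schema::ROW_LAYOUT and NES::Schema::COLUMNAR_LAYOUT suggest that it picks either a row or a columnar memory layout for the source's buffers. The sketch below only illustrates what that choice means for addressing a field inside a buffer; MemoryLayoutType and fieldOffset are hypothetical stand-ins, not the RowLayout or ColumnLayout API.

```cpp
#include <cassert>
#include <cstddef>
#include <numeric>
#include <stdexcept>
#include <vector>

// Hypothetical mirror of the two layout choices; not NES::Schema::MemoryLayoutType.
enum class MemoryLayoutType { ROW_LAYOUT, COLUMNAR_LAYOUT };

// Byte offset of `field` in `record` for a buffer holding `capacity` records,
// where fieldSizes[i] is the size in bytes of field i of the schema.
std::size_t fieldOffset(MemoryLayoutType layout, const std::vector<std::size_t>& fieldSizes,
                        std::size_t capacity, std::size_t record, std::size_t field) {
    if (field >= fieldSizes.size() || record >= capacity) {
        throw std::out_of_range("record or field index outside the buffer");
    }
    auto fieldBegin = fieldSizes.begin() + static_cast<std::ptrdiff_t>(field);
    // Total size of all fields that precede `field` in the schema.
    std::size_t precedingBytes = std::accumulate(fieldSizes.begin(), fieldBegin, std::size_t{0});
    if (layout == MemoryLayoutType::ROW_LAYOUT) {
        // Row layout: whole records are stored one after another.
        std::size_t recordSize = std::accumulate(fieldSizes.begin(), fieldSizes.end(), std::size_t{0});
        return record * recordSize + precedingBytes;
    }
    // Columnar layout: all values of one field are stored contiguously.
    return capacity * precedingBytes + record * fieldSizes[field];
}
```

With fields of 8 and 4 bytes and room for 10 records, field 1 of record 2 sits at byte 32 in a row layout but at byte 88 in a columnar layout, because the whole first column (80 bytes) comes first.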
NES::DataSource::DataSource() [delete]
NES::DataSource::~DataSource() NES_NOEXCEPT(false) [override, virtual]
Internal destructor to make sure that the data source is stopped before it is destructed. Note: it must be public because of Boost serialization.
References executableSuccessors, NES_ASSERT, NES_DEBUG, operatorId, and running.
void NES::DataSource::addExecutableSuccessors(std::vector<Runtime::Execution::SuccessorExecutablePipeline> newPipelines)
Add a list of successor pipelines.
References executableSuccessors.
NES::Runtime::MemoryLayouts::TestTupleBuffer NES::DataSource::allocateBuffer() [protected]
References bufferManager, and memoryLayout.
Referenced by NES::Testing::NonRunnableDataSource::getBuffer(), NES::Experimental::StaticDataSource::preloadBuffers(), NES::CSVSource::receiveData(), NES::DefaultSource::receiveData(), NES::LambdaSource::receiveData(), NES::Experimental::StaticDataSource::receiveData(), and NES::TCPSource::receiveData().
void NES::DataSource::clearPersistedProperties() [virtual]
This method clears the persisted properties of the data source on a hard stop.
Reimplemented in NES::TCPSource.
References NES_WARNING.
Referenced by close().
void NES::DataSource::close() [virtual]
This method cleans up thread-local state for the source.
Reimplemented in NES::TCPSource, NES::TCPSource, and NES::BenchmarkSource.
References bufferManager, clearPersistedProperties(), magic_enum::enum_name(), NES::Runtime::Graceful, NES_ASSERT2_FMT, NES_WARNING, operatorId, persistentSource, queryManager, NES::Runtime::Reconfiguration, toString(), and wasGracefullyStopped.
Referenced by NES::BenchmarkSource::close(), NES::TCPSource::close(), and NES::Testing::NonRunnableDataSource::runningRoutine().
void NES::DataSource::createOrLoadPersistedProperties() [virtual]
This method defines the logic to load the properties persisted during the previous run of the data source.
Reimplemented in NES::TCPSource.
References NES_WARNING.
Referenced by open().
void NES::DataSource::emitWork(Runtime::TupleBuffer& buffer, bool addBufferMetaData = true) [override, protected, virtual]
Create a task using the provided buffer and submit it to a task consumer, e.g., the query manager.
Parameters:
buffer: the buffer to emit
addBufferMetaData: if true, buffer metadata (e.g., sequence number, statistic id, origin id, ...) is added to the buffer
Implements NES::DataEmitter.
References executableSuccessors, NES::Runtime::TupleBuffer::getChunkNumber(), NES::Runtime::TupleBuffer::getOriginId(), NES::Runtime::TupleBuffer::getSequenceNumber(), NES::Runtime::TupleBuffer::getStatisticId(), NES::Runtime::TupleBuffer::isLastChunk(), NES_DEBUG, NES_TRACE, originId, queryManager, NES::Runtime::TupleBuffer::setChunkNumber(), NES::Runtime::TupleBuffer::setCreationTimestampInMS(), NES::Runtime::TupleBuffer::setLastChunk(), NES::Runtime::TupleBuffer::setOriginId(), NES::Runtime::TupleBuffer::setSequenceNumber(), NES::Runtime::TupleBuffer::setStatisticId(), sourceSharing, statisticId, and taskQueueId.
Referenced by NES::Testing::NonRunnableDataSource::emitBuffer().
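The references of emitWork() (setOriginId(), setStatisticId(), setSequenceNumber(), setChunkNumber(), setLastChunk(), setCreationTimestampInMS()) indicate which metadata is stamped onto a buffer when addBufferMetaData is true. The following is a minimal sketch of that stamping step only. BufferMetaData and MetaDataStampingEmitter are hypothetical, and numbering sequences from 1 with one chunk per buffer are assumptions, not documented NES behavior.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical stand-in for the metadata a tuple buffer carries.
struct BufferMetaData {
    std::uint64_t originId = 0;
    std::uint64_t statisticId = 0;
    std::uint64_t sequenceNumber = 0;
    std::uint64_t chunkNumber = 0;
    bool lastChunk = false;
    std::int64_t creationTimestampMs = 0;
};

// Hypothetical emitter that stamps source-level metadata before "submitting" a buffer.
class MetaDataStampingEmitter {
public:
    MetaDataStampingEmitter(std::uint64_t origin, std::uint64_t statistic) : originId(origin), statisticId(statistic) {}

    void emitWork(BufferMetaData& buffer, bool addBufferMetaData = true) {
        if (addBufferMetaData) {
            buffer.originId = originId;
            buffer.statisticId = statisticId;
            buffer.sequenceNumber = ++lastSequenceNumber;  // assumption: numbering starts at 1
            buffer.chunkNumber = 1;                         // assumption: one chunk per buffer
            buffer.lastChunk = true;
            buffer.creationTimestampMs = std::chrono::duration_cast<std::chrono::milliseconds>(
                                             std::chrono::system_clock::now().time_since_epoch())
                                             .count();
        }
        ++submittedTasks;  // stands in for creating a task and handing it to the query manager
    }

    std::uint64_t submittedTasks = 0;

private:
    std::uint64_t originId;
    std::uint64_t statisticId;
    std::uint64_t lastSequenceNumber = 0;
};
```

Passing addBufferMetaData = false leaves the buffer's existing metadata untouched, which is useful when a caller has already set it.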
bool NES::DataSource::fail() [virtual]
Reimplemented in NES::Network::NetworkSource.
References NES::Runtime::Failure, NES_DEBUG, operatorId, queryManager, and stop().
std::vector<Runtime::Execution::SuccessorExecutablePipeline> NES::DataSource::getExecutableSuccessors()
Returns the list of successor pipelines.
References executableSuccessors.
std::chrono::milliseconds NES::DataSource::getGatheringInterval() const
Get the gathering interval.
References gatheringInterval.
uint64_t NES::DataSource::getGatheringIntervalCount() const
Get the number representation of the gathering interval.
References gatheringInterval.
uint64_t NES::DataSource::getNumberOfGeneratedBuffers() const
Debug function for testing to get the number of generated buffers.
References generatedBuffers.
uint64_t NES::DataSource::getNumberOfGeneratedTuples() const
Debug function for testing to get the number of generated tuples.
References generatedTuples.
uint64_t NES::DataSource::getNumBuffersToProcess() const
Get the number of buffers to be processed.
References numberOfBuffersToProduce.
OperatorId NES::DataSource::getOperatorId() const
Gets the operator id for the data source.
References operatorId.
SchemaPtr NES::DataSource::getSchema() const
Method to return the current schema of the source.
References schema.
std::string NES::DataSource::getSourceSchemaAsString()
Method to return the current schema of the source as a string.
References schema.
std::vector<Schema::MemoryLayoutType> NES::DataSource::getSupportedLayouts() [virtual]
This method returns all supported layouts.
Reimplemented in NES::DefaultSource.
References NES::Schema::ROW_LAYOUT.
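The base implementation references only NES::Schema::ROW_LAYOUT, and NES::DefaultSource reimplements the method. A minimal sketch of that override pattern, under the assumption that a caller checks a requested layout against the reported list; SourceBase, ColumnarCapableSource and supportsLayout are hypothetical and say nothing about what DefaultSource actually returns.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical mirror of the layout enum; not NES::Schema::MemoryLayoutType.
enum class MemoryLayoutType { ROW_LAYOUT, COLUMNAR_LAYOUT };

// Hypothetical base: by default a source reports row-layout support only.
struct SourceBase {
    virtual ~SourceBase() = default;
    virtual std::vector<MemoryLayoutType> getSupportedLayouts() { return {MemoryLayoutType::ROW_LAYOUT}; }
};

// Hypothetical derived source that can also fill columnar buffers.
struct ColumnarCapableSource : SourceBase {
    std::vector<MemoryLayoutType> getSupportedLayouts() override {
        return {MemoryLayoutType::ROW_LAYOUT, MemoryLayoutType::COLUMNAR_LAYOUT};
    }
};

// Checks a requested layout against what the source reports, before buffers are created.
bool supportsLayout(SourceBase& source, MemoryLayoutType requested) {
    std::vector<MemoryLayoutType> layouts = source.getSupportedLayouts();
    return std::find(layouts.begin(), layouts.end(), requested) != layouts.end();
}
```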
SourceType NES::DataSource::getType() const [pure virtual]
Get the source type.
Implemented in NES::ZmqSource, NES::TCPSource, NES::TCPSource, NES::Experimental::StaticDataSource, NES::SenseSource, NES::MonitoringSource, NES::MemorySource, NES::LambdaSource, NES::GeneratorSource, NES::DefaultSource, NES::CSVSource, NES::BinarySource, NES::BenchmarkSource, and NES::Network::NetworkSource.
Referenced by start().
bool NES::DataSource::handleReconfigurationMarker(ReconfigurationMarkerPtr marker) [virtual]
Check whether a reconfiguration marker contains an event for this source. If so, trigger the reconfiguration and propagate the marker downstream.
Parameters:
marker: a marker containing a set of reconfiguration events
Reimplemented in NES::Network::NetworkSource.
References completedPromise, NES::Runtime::Failure, futureRetrieved, NES::ReconfigurationMetadata::instanceOf(), backward::details::move(), NES_ASSERT, NES_ASSERT2_FMT, NES_DEBUG, NES_ERROR, NES_NOT_IMPLEMENTED, NES_WARNING, operatorId, queryManager, NES::Runtime::Reconfiguration, running, NES::detail::waitForFuture(), wasGracefullyStopped, and wasStarted.
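The description of handleReconfigurationMarker() can be read as a lookup followed by two actions. The sketch below follows that sentence literally: a marker that carries an event for this source is applied and forwarded, any other marker is ignored. ReconfigurationMarker, eventFor() and MarkerAwareSource are hypothetical; the real method additionally waits on futures and handles failures, which is omitted here.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical marker: maps the key of a target component to a reconfiguration event.
struct ReconfigurationMarker {
    std::map<std::string, std::string> events;

    std::optional<std::string> eventFor(const std::string& key) const {
        auto it = events.find(key);
        if (it == events.end()) {
            return std::nullopt;
        }
        return it->second;
    }
};

// Hypothetical source that applies and forwards only markers addressed to it.
class MarkerAwareSource {
public:
    explicit MarkerAwareSource(std::string sourceKey) : key(std::move(sourceKey)) {}

    bool handleReconfigurationMarker(const ReconfigurationMarker& marker) {
        std::optional<std::string> event = marker.eventFor(key);
        if (!event) {
            return false;  // the marker contains no event for this source
        }
        appliedEvents.push_back(*event);     // trigger the reconfiguration
        forwardedMarkers.push_back(marker);  // propagate the marker downstream
        return true;
    }

    std::vector<std::string> appliedEvents;
    std::vector<ReconfigurationMarker> forwardedMarkers;

private:
    std::string key;
};
```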
void NES::DataSource::incrementNumberOfConsumerQueries() [inline]
bool NES::DataSource::isRunning() const [inline, noexcept]
Debug function for testing whether the source is running.
Note: … isRunning() and this class' private member running are consistent, or that this class does not evaluate running directly when checking whether it is running.
References running.
Referenced by NES::Testing::NonRunnableDataSource::stop().
void NES::DataSource::onEvent(Runtime::BaseEvent& event) [override, virtual]
API method called upon receiving an event.
Parameters:
event: the received event
Reimplemented from NES::DataEmitter.
Reimplemented in NES::Network::NetworkSource, and NES::Experimental::StaticDataSource.
References NES_DEBUG, NES::DataEmitter::onEvent(), and operatorId.
Referenced by onEvent().
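void NES::DataSource::onEvent(Runtime::BaseEvent& event, Runtime::WorkerContextRef workerContext) [virtual]
API method called upon receiving an event whose handling requires the WorkerContext (e.g., its network channels).
Parameters:
event: the received event
workerContext: the WorkerContext required to handle the event
Reimplemented in NES::Network::NetworkSource.
References NES_DEBUG, onEvent(), and operatorId.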
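The cross-references (the two-argument overload references onEvent(), and the one-argument overload is referenced by onEvent()) suggest that the context-aware overload falls back to the context-free one unless a subclass such as NetworkSource needs the worker context. A minimal sketch of that overload pattern; BaseEvent, WorkerContext and both classes are hypothetical stand-ins.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-ins for Runtime::BaseEvent and the worker context.
struct BaseEvent {
    std::string name;
};
struct WorkerContext {
    std::vector<std::string> openChannels;
};

class EventListeningSource {
public:
    virtual ~EventListeningSource() = default;

    // Context-free handler: the default just records the event.
    virtual void onEvent(BaseEvent& event) { handled.push_back(event.name); }

    // Context-aware overload: by default the worker context is not needed,
    // so the call is forwarded to the context-free handler.
    virtual void onEvent(BaseEvent& event, WorkerContext& /*workerContext*/) { onEvent(event); }

    std::vector<std::string> handled;
};

// Hypothetical subclass that really uses the worker context (e.g. its network channels).
class ChannelAwareSource : public EventListeningSource {
public:
    using EventListeningSource::onEvent;  // keep the one-argument overload visible
    void onEvent(BaseEvent& event, WorkerContext& workerContext) override {
        workerContext.openChannels.push_back(event.name);
        EventListeningSource::onEvent(event);
    }
};
```

The using-declaration matters: without it, declaring the two-argument override in the subclass would hide the one-argument overload.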
void NES::DataSource::open() [virtual]
This method initializes thread-local state. For instance, it creates the local buffer pool; this is necessary because it cannot be done in the constructor.
Reimplemented in NES::TCPSource, NES::TCPSource, NES::Experimental::StaticDataSource, and NES::BenchmarkSource.
References bufferManager, createOrLoadPersistedProperties(), localBufferManager, numSourceLocalBuffers, and storePersistedProperties().
Referenced by NES::BenchmarkSource::open(), NES::Experimental::StaticDataSource::open(), NES::TCPSource::open(), NES::Experimental::StaticDataSource::preloadBuffers(), NES::Testing::NonRunnableDataSource::runningRoutine(), and NES::TEST_F().
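The references of open() and close() outline a lifecycle: open() creates the local buffer pool and, for persistent sources, loads and then stores the persisted properties; close() may clear them. The sketch below models that lifecycle with a std::map as the property store. LifecycleSource, PropertyStore, TerminationType and the resumeOffset property are hypothetical, and clearing only on a hard stop is an assumption taken from the clearPersistedProperties() description.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical termination types; only HardStop triggers clearing below.
enum class TerminationType { Graceful, HardStop, Failure };

// Hypothetical key-value store standing in for wherever persisted properties are kept.
using PropertyStore = std::map<std::string, std::string>;

class LifecycleSource {
public:
    LifecycleSource(std::size_t localBuffers, bool persistent, std::string key, PropertyStore& propertyStore)
        : numSourceLocalBuffers(localBuffers), persistentSource(persistent),
          persistentSourceKey(std::move(key)), store(propertyStore) {}

    // Per-run state (here a pool of empty local buffers) is created in open(), not in the constructor.
    void open() {
        localBufferPool = std::make_unique<std::vector<std::vector<char>>>(numSourceLocalBuffers);
        if (persistentSource) {
            createOrLoadPersistedProperties();
            storePersistedProperties();
        }
    }

    // Assumption: persisted properties survive a graceful stop and are cleared on a hard stop.
    void close(TerminationType termination) {
        if (persistentSource && termination == TerminationType::HardStop) {
            clearPersistedProperties();
        }
        localBufferPool.reset();
    }

    std::size_t localPoolSize() const { return localBufferPool ? localBufferPool->size() : 0; }

    std::string resumeOffset = "0";  // example property a restarted source could resume from

private:
    void createOrLoadPersistedProperties() {
        auto it = store.find(persistentSourceKey);
        if (it != store.end()) {
            resumeOffset = it->second;  // load the value persisted by the previous run
        }
    }
    void storePersistedProperties() { store[persistentSourceKey] = resumeOffset; }
    void clearPersistedProperties() { store.erase(persistentSourceKey); }

    std::size_t numSourceLocalBuffers;
    bool persistentSource;
    std::string persistentSourceKey;
    PropertyStore& store;
    std::unique_ptr<std::vector<std::vector<char>>> localBufferPool;
};
```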
bool NES::DataSource::performSoftStop() [virtual]
This method marks running as false, which results in 1. the interruption of the data-gathering loop and 2. the initiation of the soft-stop routine.
Reimplemented in NES::Testing::NonRunnableDataSource.
References NES_WARNING, operatorId, and running.
Referenced by NES::Testing::NonRunnableDataSource::performSoftStop().
std::optional<Runtime::TupleBuffer> NES::DataSource::receiveData() [pure virtual]
Virtual function to receive a buffer. Note: this function is overridden by the particular data source.
Implemented in NES::GeneratorSource, NES::ZmqSource, NES::TCPSource, NES::TCPSource, NES::Experimental::StaticDataSource, NES::SenseSource, NES::MonitoringSource, NES::MemorySource, NES::LambdaSource, NES::DefaultSource, NES::CSVSource, NES::BinarySource, NES::BenchmarkSource, and NES::Network::NetworkSource.
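Each concrete source supplies receiveData() and toString(). A minimal sketch of that contract, assuming std::nullopt signals that no further data is available; AbstractSource, VectorSource and Buffer are hypothetical stand-ins, loosely in the spirit of an in-memory source rather than a copy of NES::MemorySource.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for Runtime::TupleBuffer.
struct Buffer {
    std::vector<int> tuples;
};

// Hypothetical abstract source mirroring the pure virtual hooks documented here.
class AbstractSource {
public:
    virtual ~AbstractSource() = default;
    virtual std::optional<Buffer> receiveData() = 0;  // std::nullopt: no more data (assumption)
    virtual std::string toString() const = 0;
};

// Hypothetical concrete source serving an in-memory vector in fixed-size buffers.
class VectorSource : public AbstractSource {
public:
    VectorSource(std::vector<int> values, std::size_t bufferCapacity)
        : data(std::move(values)), tuplesPerBuffer(bufferCapacity) {}

    std::optional<Buffer> receiveData() override {
        if (position >= data.size() || tuplesPerBuffer == 0) {
            return std::nullopt;
        }
        Buffer buffer;
        while (buffer.tuples.size() < tuplesPerBuffer && position < data.size()) {
            buffer.tuples.push_back(data[position++]);
        }
        return buffer;  // the last buffer may be only partially filled
    }

    std::string toString() const override { return "VectorSource(tuples=" + std::to_string(data.size()) + ")"; }

private:
    std::vector<int> data;
    std::size_t tuplesPerBuffer;
    std::size_t position = 0;
};
```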
void NES::DataSource::runningRoutine() [virtual]
Running routine while the source is active.
Reimplemented in NES::BenchmarkSource, NES::Network::detail::TestSourceWithLatch, and NES::Testing::NonRunnableDataSource.
References NES::ADAPTIVE_MODE, completedPromise, gatheringMode, NES::INGESTION_RATE_MODE, NES::INTERVAL_MODE, NES_DEBUG, NES_FATAL_ERROR, NES_NOT_IMPLEMENTED, operatorId, and queryManager.
Referenced by NES::MockDataSourceWithRunningRoutine::MockDataSourceWithRunningRoutine(), start(), and NES::TEST_F().
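runningRoutine() references the three gathering modes, so it presumably dispatches on gatheringMode before entering its gathering loop. The sketch below is a hedged model of such a loop, not the NES routine: runGatheringLoop is hypothetical, INTERVAL_MODE is modelled as a fixed pause of gatheringInterval between buffers, INGESTION_RATE_MODE as a pause derived from a buffers-per-second rate, and ADAPTIVE_MODE is simply rejected because its behavior is not described here.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical mirror of the documented gathering modes.
enum class GatheringMode { INTERVAL_MODE, INGESTION_RATE_MODE, ADAPTIVE_MODE };

// Hypothetical gathering loop: pull buffers until receiveData() returns std::nullopt,
// hand each one to emitWork(), and pace the loop according to the mode.
template <typename Receive, typename Emit>
void runGatheringLoop(GatheringMode mode, std::chrono::milliseconds gatheringInterval,
                      std::uint64_t buffersPerSecond, Receive receiveData, Emit emitWork) {
    std::chrono::microseconds pause{0};
    switch (mode) {
        case GatheringMode::INTERVAL_MODE:
            pause = gatheringInterval;  // fixed pause between two buffers
            break;
        case GatheringMode::INGESTION_RATE_MODE:
            if (buffersPerSecond == 0) {
                throw std::invalid_argument("ingestion rate must be positive");
            }
            pause = std::chrono::microseconds(1'000'000 / buffersPerSecond);  // spread buffers over a second
            break;
        case GatheringMode::ADAPTIVE_MODE:
            throw std::logic_error("adaptive mode is not modelled in this sketch");
    }
    while (auto buffer = receiveData()) {
        emitWork(*buffer);
        std::this_thread::sleep_for(pause);
    }
}
```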
void NES::DataSource::setGatheringInterval(std::chrono::milliseconds interval)
Method to set the sampling interval.
Parameters:
interval: the interval at which to gather data
References gatheringInterval.
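setGatheringInterval(), getGatheringInterval() and getGatheringIntervalCount() all reference the same gatheringInterval member. The sketch below assumes that the "number representation" is the std::chrono tick count of that interval in milliseconds; GatheringIntervalHolder is a hypothetical class used only for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical holder mirroring the three gathering-interval accessors.
class GatheringIntervalHolder {
public:
    void setGatheringInterval(std::chrono::milliseconds interval) { gatheringInterval = interval; }
    std::chrono::milliseconds getGatheringInterval() const { return gatheringInterval; }

    // Assumption: the number representation is the tick count, i.e. milliseconds.
    std::uint64_t getGatheringIntervalCount() const { return static_cast<std::uint64_t>(gatheringInterval.count()); }

private:
    std::chrono::milliseconds gatheringInterval{0};
};
```

Because std::chrono::seconds converts implicitly and losslessly to std::chrono::milliseconds, callers may pass coarser units such as std::chrono::seconds(2).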
void NES::DataSource::setOperatorId(OperatorId operatorId)
Set the operator id for the data source.
References operatorId.
void NES::DataSource::setSourceSharing(bool value) [inline]
Set the source sharing value.
Parameters:
value: whether source sharing is enabled
References sourceSharing, and magic_enum::detail::value().
template<typename Derived> std::shared_ptr<Derived> NES::DataSource::shared_from_base() [inline]
This method is necessary to avoid problems with the shared_from_this machinery combined with multiple inheritance.
Template Parameters:
Derived: the class type to which the shared pointer is cast
References NES::detail::virtual_enable_shared_from_this< RuntimeEventListener, false >::shared_from_this().
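A common way to write such a helper is to cast the result of shared_from_this() to the derived type. The sketch below uses plain std::enable_shared_from_this with single inheritance, which is a simplification: NES uses its own virtual_enable_shared_from_this to cope with multiple inheritance. SourceBase and ConcreteSource are hypothetical, and std::static_pointer_cast is only valid when the object really is a Derived.

```cpp
#include <cassert>
#include <memory>

// Hypothetical base using the standard enable_shared_from_this machinery.
class SourceBase : public std::enable_shared_from_this<SourceBase> {
public:
    virtual ~SourceBase() = default;

    // Returns this object as std::shared_ptr<Derived>, sharing ownership with existing owners.
    template <typename Derived>
    std::shared_ptr<Derived> shared_from_base() {
        return std::static_pointer_cast<Derived>(shared_from_this());
    }
};

// Hypothetical concrete source.
class ConcreteSource : public SourceBase {
public:
    int id = 7;
};
```

The object must already be owned by a std::shared_ptr (e.g. created with std::make_shared); calling shared_from_this() on an object that is not owned this way throws std::bad_weak_ptr since C++17.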
bool NES::DataSource::start() [virtual]
Method to start the source: 1.) check whether running is true; if so, return, otherwise start the source. 2.) Start a new thread with runningRoutine().
Reimplemented in NES::Experimental::StaticDataSource, and NES::Network::NetworkSource.
References getType(), NES_ASSERT, NES_DEBUG, NES_THROW_RUNTIME_ERROR, NES_WARNING, operatorId, running, runningRoutine(), sourceAffinity, type, and wasStarted.
Referenced by NES::Experimental::StaticDataSource::startStaticDataSourceManually().
bool NES::DataSource::stop(Runtime::QueryTerminationType graceful) [virtual]
Method to stop the source: 1.) check whether running is false; if so, return, otherwise stop the source. 2.) Stop the thread by joining it.
Reimplemented in NES::Network::NetworkSource, NES::Testing::NonRunnableDataSource, and NES::ZmqSource.
References completedPromise, NES::Runtime::Failure, futureRetrieved, NES_ASSERT2_FMT, NES_DEBUG, NES_WARNING, numberOfConsumerQueries, operatorId, queryManager, refCounter, running, NES::detail::waitForFuture(), wasGracefullyStopped, and wasStarted.
Referenced by fail(), NES::ZmqSource::stop(), and NES::Testing::NonRunnableDataSource::stop().
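The start(), stop() and performSoftStop() descriptions can be modelled with an atomic running flag and one worker thread. This is a minimal sketch under stated assumptions, not the NES code: ThreadedSource is hypothetical, start() and stop() are assumed to be called from a single controlling thread, and stop() still joins a thread that already ended after a soft stop, so that no thread is left unjoined. Source sharing, futures and termination types are omitted.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <thread>

// Hypothetical source modelling the documented start()/stop()/performSoftStop() steps.
// Assumption: start() and stop() are called from one controlling thread.
class ThreadedSource {
public:
    ~ThreadedSource() { stop(); }  // never destroy a source whose thread has not been joined

    bool start() {
        // 1.) If the source is running (or a soft-stopped run has not been joined yet), return.
        if (running || worker.joinable()) {
            return false;
        }
        running = true;
        // 2.) Start a new thread that executes runningRoutine().
        worker = std::thread([this] { runningRoutine(); });
        return true;
    }

    bool stop() {
        // 1.) Report whether the source was running at all ...
        const bool wasRunning = running.exchange(false);
        // 2.) ... and stop the thread by joining it, even if it ended after a soft stop.
        if (worker.joinable()) {
            worker.join();
        }
        return wasRunning;
    }

    // Marking running as false makes the loop in runningRoutine() exit.
    bool performSoftStop() {
        running = false;
        return true;
    }

    bool isRunning() const noexcept { return running; }
    std::uint64_t getNumberOfGeneratedBuffers() const { return generatedBuffers; }

private:
    void runningRoutine() {
        while (running) {
            ++generatedBuffers;  // stands in for receiveData() followed by emitWork()
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    std::atomic<bool> running{false};
    std::atomic<std::uint64_t> generatedBuffers{0};
    std::thread worker;
};
```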
void NES::DataSource::storePersistedProperties() [virtual]
This method stores all properties that need to be persisted for future retrieval during the createOrLoadPersistedProperties() call.
Reimplemented in NES::TCPSource.
References NES_WARNING.
Referenced by open().
std::string NES::DataSource::toString() const [pure virtual]
Virtual function to get a string describing the particular source. Note: this function is overridden by the particular data source.
Implemented in NES::ZmqSource, NES::TCPSource, NES::TCPSource, NES::Experimental::StaticDataSource, NES::SenseSource, NES::MonitoringSource, NES::MemorySource, NES::LambdaSource, NES::GeneratorSource, NES::CSVSource, NES::BinarySource, NES::BenchmarkSource, and NES::Network::NetworkSource.
Referenced by close().
NES::DataSource::bufferManager [protected]
Referenced by allocateBuffer(), close(), NES::MockDataSource::create(), open(), NES::Experimental::StaticDataSource::open(), NES::BinarySource::receiveData(), NES::MemorySource::receiveData(), NES::MonitoringSource::receiveData(), NES::SenseSource::receiveData(), NES::ZmqSource::receiveData(), and NES::BenchmarkSource::runningRoutine().
[protected]
[protected]
NES::DataSource::futureRetrieved [protected]
Referenced by handleReconfigurationMarker(), and stop().
[protected]
NES::DataSource::gatheringInterval [protected]
Referenced by NES::BenchmarkSource::BenchmarkSource(), NES::CSVSource::CSVSource(), NES::DefaultSource::DefaultSource(), getGatheringInterval(), getGatheringIntervalCount(), NES::LambdaSource::LambdaSource(), NES::MemorySource::MemorySource(), setGatheringInterval(), NES::CSVSource::toString(), and NES::GeneratorSource::toString().
[protected]
[protected]
[protected]
[protected]
NES::DataSource::memoryLayout [protected]
Referenced by allocateBuffer(), and DataSource().
NES::DataSource::numberOfBuffersToProduce [protected]
Referenced by NES::BenchmarkSource::BenchmarkSource(), NES::CSVSource::CSVSource(), NES::GeneratorSource::GeneratorSource(), getNumBuffersToProcess(), NES::MemorySource::MemorySource(), NES::Experimental::StaticDataSource::open(), NES::Experimental::StaticDataSource::preloadBuffers(), NES::MemorySource::receiveData(), NES::Experimental::StaticDataSource::receiveData(), NES::BenchmarkSource::runningRoutine(), NES::Experimental::StaticDataSource::StaticDataSource(), NES::CSVSource::toString(), NES::GeneratorSource::toString(), and NES::Experimental::StaticDataSource::toString().
NES::DataSource::numberOfConsumerQueries [protected]
Referenced by incrementNumberOfConsumerQueries(), and stop().
NES::DataSource::numSourceLocalBuffers [protected]
Referenced by NES::MockDataSource::create(), and open().
NES::DataSource::operatorId [protected]
Referenced by close(), NES::MockDataSource::create(), DataSource(), fail(), NES::TCPSource::fillBuffer(), getOperatorId(), handleReconfigurationMarker(), NES::LambdaSource::LambdaSource(), NES::Experimental::StaticDataSource::onEvent(), onEvent(), NES::Network::NetworkSource::onEvent(), performSoftStop(), NES::CSVSource::receiveData(), NES::DefaultSource::receiveData(), NES::LambdaSource::receiveData(), NES::MemorySource::receiveData(), NES::Experimental::StaticDataSource::receiveData(), runningRoutine(), setOperatorId(), start(), NES::Experimental::StaticDataSource::start(), NES::Experimental::StaticDataSource::StaticDataSource(), stop(), and ~DataSource().
NES::DataSource::originId [protected]
Referenced by emitWork().
const bool NES::DataSource::persistentSource |
const std::string NES::DataSource::persistentSourceKey |
[protected]
NES::DataSource::queryManager [protected]
Referenced by NES::TCPSource::clearPersistedProperties(), close(), NES::MockDataSource::create(), NES::TCPSource::createOrLoadPersistedProperties(), emitWork(), fail(), NES::Network::NetworkSource::fail(), handleReconfigurationMarker(), NES::Network::NetworkSource::handleReconfigurationMarker(), NES::Network::NetworkSource::onEndOfStream(), NES::Network::NetworkSource::postReconfigurationCallback(), runningRoutine(), NES::Network::NetworkSource::start(), NES::Network::NetworkSource::startNewVersion(), stop(), NES::Network::NetworkSource::stop(), and NES::TCPSource::storePersistedProperties().
NES::DataSource::refCounter [protected]
Referenced by stop().
NES::DataSource::running [protected]
Referenced by NES::Network::NetworkSource::fail(), NES::TCPSource::fillBuffer(), handleReconfigurationMarker(), isRunning(), performSoftStop(), NES::Network::NetworkSource::postReconfigurationCallback(), NES::TCPSource::receiveData(), NES::BenchmarkSource::runningRoutine(), start(), NES::Network::NetworkSource::start(), stop(), NES::Network::NetworkSource::stop(), and ~DataSource().
NES::DataSource::schema [protected]
Referenced by NES::BinarySource::BinarySource(), NES::MockDataSource::create(), NES::CSVSource::CSVSource(), DataSource(), NES::CSVSource::fillBuffer(), NES::TCPSource::fillBuffer(), getSchema(), getSourceSchemaAsString(), NES::MonitoringSource::MonitoringSource(), NES::DefaultSource::receiveData(), NES::LambdaSource::receiveData(), NES::MonitoringSource::receiveData(), NES::TCPSource::TCPSource(), NES::BinarySource::toString(), NES::CSVSource::toString(), NES::GeneratorSource::toString(), NES::MonitoringSource::toString(), NES::SenseSource::toString(), NES::Experimental::StaticDataSource::toString(), NES::TCPSource::toString(), and NES::ZmqSource::toString().
[protected]
NES::DataSource::sourceSharing [protected]
Referenced by emitWork(), and setSourceSharing().
[protected]
Referenced by emitWork().
[protected]
[protected]
[protected]
NES::DataSource::wasStarted [protected]
Referenced by handleReconfigurationMarker(), start(), and stop().