NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
NES::Experimental::StaticDataSource Class Reference

Table Source todo Still under development. More...

#include <StaticDataSource.hpp>

Collaboration diagram for NES::Experimental::StaticDataSource:
[legend]

Public Member Functions

 StaticDataSource (SchemaPtr schema, std::string pathTableFile, const bool lateStart, ::NES::Runtime::BufferManagerPtr bufferManager, ::NES::Runtime::QueryManagerPtr queryManager, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, const std::string &physicalSourceName, std::vector<::NES::Runtime::Execution::SuccessorExecutablePipeline > successors)
 The constructor of a StaticDataSource. More...
 
bool start () final
 overwrite DataSource::start(). Only start runningRoutine, if lateStart==false. More...
 
bool startStaticDataSourceManually ()
 method to start the source, calls DataSource::start(). 1.) check if bool running is true, if true return if not start source 2.) start new thread with runningRoutine More...
 
void onEvent (Runtime::BaseEvent &) final
 API method called upon receiving an event. At startSourceEvent, start source. More...
 
void open () override
 This methods initializes thread-local state. For instance, it creates the local buffer pool and is necessary because we cannot do it in the constructor. More...
 
void preloadBuffers ()
 
std::optional<::NES::Runtime::TupleBufferreceiveData () override
 This method is implemented only to comply with the API: it will crash the system if called. More...
 
std::string toString () const override
 Provides a string representation of the source. More...
 
SourceType getType () const override
 Provides the type of the source. More...
 
virtual void recyclePooledBuffer (::NES::Runtime::detail::MemorySegment *) override
 
virtual void recycleUnpooledBuffer (::NES::Runtime::detail::MemorySegment *) override
 Interface method for unpooled buffer recycling. More...
 
- Public Member Functions inherited from NES::GeneratorSource
 GeneratorSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, uint64_t numberOfBufferToProduce, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, std::vector< Runtime::Execution::SuccessorExecutablePipeline > successors, const std::string &physicalSourceName=std::string("defaultPhysicalStreamName"))
 constructor to create a generator source More...
 
- Public Member Functions inherited from NES::DataSource
 DataSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string &physicalSourceName, bool persistentSource, std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors=std::vector< Runtime::Execution::SuccessorExecutablePipeline >(), uint64_t sourceAffinity=std::numeric_limits< uint64_t >::max(), uint64_t taskQueueId=0)
 public constructor for data source @Note the number of buffers to process is set to UINT64_MAX and the value is needed by some test to produce a deterministic behavior More...
 
 DataSource ()=delete
 
virtual void close ()
 This method cleans up thread-local state for the source. More...
 
virtual bool stop (Runtime::QueryTerminationType graceful)
 method to stop the source. 1.) check if bool running is false, if false return, if not stop source 2.) stop thread by join More...
 
virtual void runningRoutine ()
 running routine while source is active More...
 
virtual void createOrLoadPersistedProperties ()
 This method defines the logic to load the properties persisted during the previous run of the data source. More...
 
virtual void storePersistedProperties ()
 This method stores all properties that need to be persisted for future retrieval during createOrLoadPersistedProperties call. More...
 
virtual void clearPersistedProperties ()
 This method clears the persisted properties of the data source on had stop. More...
 
SchemaPtr getSchema () const
 method to return the current schema of the source More...
 
std::string getSourceSchemaAsString ()
 method to return the current schema of the source as string More...
 
bool isRunning () const noexcept
 debug function for testing to test if source is running More...
 
uint64_t getNumberOfGeneratedTuples () const
 debug function for testing to get number of generated tuples More...
 
uint64_t getNumberOfGeneratedBuffers () const
 debug function for testing to get number of generated buffer More...
 
virtual bool performSoftStop ()
 This method will mark the running as false this will result in 1. the interruption of data gathering loop and 2. initiation of soft stop routine. More...
 
void setGatheringInterval (std::chrono::milliseconds interval)
 method to set the sampling interval More...
 
virtual ~DataSource () NES_NOEXCEPT(false) override
 Internal destructor to make sure that the data source is stopped before deconstrcuted @Note must be public because of boost serialize. More...
 
uint64_t getNumBuffersToProcess () const
 Get number of buffers to be processed. More...
 
std::chrono::milliseconds getGatheringInterval () const
 Get gathering interval. More...
 
uint64_t getGatheringIntervalCount () const
 Get number representation of gathering interval. More...
 
OperatorId getOperatorId () const
 Gets the operator id for the data source. More...
 
void setOperatorId (OperatorId operatorId)
 Set the operator id for the data source. More...
 
std::vector< Runtime::Execution::SuccessorExecutablePipelinegetExecutableSuccessors ()
 Returns the list of successor pipelines. More...
 
void addExecutableSuccessors (std::vector< Runtime::Execution::SuccessorExecutablePipeline > newPipelines)
 Add a list of successor pipelines. More...
 
template<typename Derived >
std::shared_ptr< Derived > shared_from_base ()
 This method is necessary to avoid problems with the shared_from_this machinery combined with multi-inheritance. More...
 
virtual std::vector< Schema::MemoryLayoutTypegetSupportedLayouts ()
 This method returns all supported layouts. More...
 
virtual void onEvent (Runtime::BaseEvent &event, Runtime::WorkerContextRef workerContext)
 API method called upon receiving an event, whose handling requires the WorkerContext (e.g. its network channels). More...
 
virtual bool fail ()
 
void setSourceSharing (bool value)
 set source sharing value More...
 
void incrementNumberOfConsumerQueries ()
 set the number of queries that use this source More...
 
virtual bool handleReconfigurationMarker (ReconfigurationMarkerPtr marker)
 check if a reconfiguration marker contains an event for this source. If so, trigger the reconfiguration and propagate the marker downstream. More...
 
- Public Member Functions inherited from NES::Runtime::Reconfigurable
 ~Reconfigurable () NES_NOEXCEPT(false) override=default
 
virtual void reconfigure (ReconfigurationMessage &, WorkerContext &)
 reconfigure callback that will be called per thread More...
 
virtual void postReconfigurationCallback (ReconfigurationMessage &)
 callback that will be called on the last thread the executes the reconfiguration More...
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this_base< isNoexceptDestructible >
virtual ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default
 
- Public Member Functions inherited from NES::DataEmitter
virtual ~DataEmitter () NES_NOEXCEPT(false)=default
 
virtual void onEndOfStream (Runtime::QueryTerminationType)
 
virtual DecomposedQueryPlanVersion getVersion () const
 
virtual bool startNewVersion ()
 start a previously scheduled new version for this data emitter More...
 
virtual bool insertReconfigurationMarker (ReconfigurationMarkerPtr)
 check if a reconfiguration marker contains an event for this data emitter. If so, trigger the reconfiguration and propagate the marker downstream. More...
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< RuntimeEventListener, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 
- Public Member Functions inherited from NES::Runtime::BufferRecycler
virtual void recyclePooledBuffer (detail::MemorySegment *buffer)=0
 Interface method for pooled buffer recycling. More...
 
virtual void recycleUnpooledBuffer (detail::MemorySegment *buffer)=0
 Interface method for unpooled buffer recycling. More...
 

Additional Inherited Members

- Public Attributes inherited from NES::DataSource
const bool persistentSource
 
const std::string persistentSourceKey
 
- Protected Member Functions inherited from NES::DataSource
void emitWork (Runtime::TupleBuffer &buffer, bool addBufferMetaData=true) override
 create a task using the provided buffer and submit it to a task consumer, e.g., query manager More...
 
NES::Runtime::MemoryLayouts::TestTupleBuffer allocateBuffer ()
 
- Protected Attributes inherited from NES::DataSource
Runtime::QueryManagerPtr queryManager
 
Runtime::BufferManagerPtr localBufferManager
 
Runtime::FixedSizeBufferPoolPtr bufferManager {nullptr}
 
std::vector< Runtime::Execution::SuccessorExecutablePipelineexecutableSuccessors
 
OperatorId operatorId
 
OriginId originId
 
StatisticId statisticId
 
SchemaPtr schema
 
uint64_t generatedTuples {0}
 
uint64_t generatedBuffers {0}
 
uint64_t numberOfBuffersToProduce = std::numeric_limits<decltype(numberOfBuffersToProduce)>::max()
 
uint64_t numSourceLocalBuffers
 
uint64_t gatheringIngestionRate {}
 
std::chrono::milliseconds gatheringInterval {0}
 
GatheringMode gatheringMode
 
SourceType type
 
Runtime::QueryTerminationType wasGracefullyStopped {Runtime::QueryTerminationType::Graceful}
 
std::atomic_bool wasStarted {false}
 
std::atomic_bool futureRetrieved {false}
 
std::atomic_bool running {false}
 
std::promise< bool > completedPromise
 
uint64_t sourceAffinity
 
uint64_t taskQueueId
 
bool sourceSharing = false
 
const std::string physicalSourceName
 
std::atomic< uint64_t > refCounter = 0
 
std::atomic< uint64_t > numberOfConsumerQueries = 1
 
Runtime::MemoryLayouts::MemoryLayoutPtr memoryLayout
 

Detailed Description

Table Source todo Still under development.

Constructor & Destructor Documentation

◆ StaticDataSource()

NES::Experimental::StaticDataSource::StaticDataSource ( SchemaPtr  schema,
std::string  pathTableFile,
const bool  lateStart,
::NES::Runtime::BufferManagerPtr  bufferManager,
::NES::Runtime::QueryManagerPtr  queryManager,
OperatorId  operatorId,
OriginId  originId,
StatisticId  statisticId,
size_t  numSourceLocalBuffers,
const std::string &  physicalSourceName,
std::vector<::NES::Runtime::Execution::SuccessorExecutablePipeline successors 
)
explicit

The constructor of a StaticDataSource.

Parameters
schemathe schema of the data
pathTableFile
lateStartwether to start the source late
bufferManagerpointer to the buffer manager
queryManagerpointer to the query manager
operatorIdcurrent operator id
originIdrepresents the identifier of the upstream operator that represents the origin of the input stream
statisticIdrepresents the unique identifier of components that we can track statistics for
numSourceLocalBuffersthe number of buffers allocated to a source
physicalSourceNamethe name and unique identifier of a physical source
successorsthe subsequent operators in the pipeline to which the data is pushed

References NES::DefaultPhysicalTypeFactory::getPhysicalType(), NES::INTERVAL_MODE, NES::DataSource::localBufferManager, NES_ASSERT, NES_DEBUG, NES::DataSource::numberOfBuffersToProduce, NES::DataSource::operatorId, preloadBuffers(), NES::DataSource::sourceAffinity, and NES::NESStrongType< T, Tag, invalid, initial >::toString().

Here is the call graph for this function:

Member Function Documentation

◆ getType()

NES::SourceType NES::Experimental::StaticDataSource::getType ( ) const
overridevirtual

Provides the type of the source.

Returns
the type of the source

Reimplemented from NES::GeneratorSource.

◆ onEvent()

void NES::Experimental::StaticDataSource::onEvent ( Runtime::BaseEvent event)
finalvirtual

API method called upon receiving an event. At startSourceEvent, start source.

Parameters
event

Reimplemented from NES::DataSource.

References NES::Runtime::BaseEvent::getEventType(), NES::Runtime::kStartSourceEvent, NES_DEBUG, NES::DataSource::operatorId, and startStaticDataSourceManually().

Here is the call graph for this function:

◆ open()

void NES::Experimental::StaticDataSource::open ( )
overridevirtual

This methods initializes thread-local state. For instance, it creates the local buffer pool and is necessary because we cannot do it in the constructor.

Reimplemented from NES::DataSource.

References NES::DataSource::bufferManager, NES::DataSource::numberOfBuffersToProduce, and NES::DataSource::open().

Here is the call graph for this function:

◆ preloadBuffers()

void NES::Experimental::StaticDataSource::preloadBuffers ( )

References NES::DataSource::allocateBuffer(), NES::DataSource::numberOfBuffersToProduce, and NES::DataSource::open().

Referenced by StaticDataSource().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ receiveData()

std::optional<::NES::Runtime::TupleBuffer > NES::Experimental::StaticDataSource::receiveData ( )
overridevirtual

This method is implemented only to comply with the API: it will crash the system if called.

Returns
a nullopt

Implements NES::GeneratorSource.

References NES::DataSource::allocateBuffer(), NES_ASSERT2_FMT, NES_DEBUG, NES::DataSource::numberOfBuffersToProduce, and NES::DataSource::operatorId.

Here is the call graph for this function:

◆ recyclePooledBuffer()

virtual void NES::Experimental::StaticDataSource::recyclePooledBuffer ( ::NES::Runtime::detail::MemorySegment )
inlineoverridevirtual

◆ recycleUnpooledBuffer()

virtual void NES::Experimental::StaticDataSource::recycleUnpooledBuffer ( ::NES::Runtime::detail::MemorySegment )
inlineoverridevirtual

Interface method for unpooled buffer recycling.

Parameters
bufferthe buffer to recycle

◆ start()

bool NES::Experimental::StaticDataSource::start ( )
finalvirtual

overwrite DataSource::start(). Only start runningRoutine, if lateStart==false.

Reimplemented from NES::DataSource.

References NES_DEBUG, NES::DataSource::operatorId, and startStaticDataSourceManually().

Here is the call graph for this function:

◆ startStaticDataSourceManually()

bool NES::Experimental::StaticDataSource::startStaticDataSourceManually ( )

method to start the source, calls DataSource::start(). 1.) check if bool running is true, if true return if not start source 2.) start new thread with runningRoutine

References NES::DataSource::start().

Referenced by onEvent(), and start().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ toString()

std::string NES::Experimental::StaticDataSource::toString ( ) const
overridevirtual

Provides a string representation of the source.

Returns
The string representation of the source

Reimplemented from NES::GeneratorSource.

References NES::DataSource::numberOfBuffersToProduce, and NES::DataSource::schema.


The documentation for this class was generated from the following files: