NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
NES::MemorySource Class Reference

Memory Source that reads from main memory and produces buffers. The memory area out of which buffers will be produced must be initialized beforehand and allocated as a shared_ptr that must have ownership of the area, i.e., it must control when to free it. Do not use in distributed settings but only for single node dev and testing. More...

#include <MemorySource.hpp>

Collaboration diagram for NES::MemorySource:
[legend]

Public Member Functions

 MemorySource (SchemaPtr schema, const std::shared_ptr< uint8_t > &memoryArea, size_t memoryAreaSize, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, uint64_t numBuffersToProcess, uint64_t gatheringValue, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, uint64_t sourceAffinity, uint64_t taskQueueId, const std::string &physicalSourceName, std::vector< Runtime::Execution::SuccessorExecutablePipeline > successors)
 The constructor of a MemorySource. More...
 
std::optional< Runtime::TupleBufferreceiveData () override
 This method is implemented only to comply with the API: it will crash the system if called. More...
 
std::string toString () const override
 Provides a string representation of the source. More...
 
SourceType getType () const override
 Provides the type of the source. More...
 
virtual void recyclePooledBuffer (Runtime::detail::MemorySegment *) override
 Interface method for pooled buffer recycling. More...
 
virtual void recycleUnpooledBuffer (Runtime::detail::MemorySegment *) override
 Interface method for unpooled buffer recycling. More...
 
- Public Member Functions inherited from NES::GeneratorSource
 GeneratorSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, uint64_t numberOfBufferToProduce, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, std::vector< Runtime::Execution::SuccessorExecutablePipeline > successors, const std::string &physicalSourceName=std::string("defaultPhysicalStreamName"))
 constructor to create a generator source More...
 
- Public Member Functions inherited from NES::DataSource
 DataSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string &physicalSourceName, bool persistentSource, std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors=std::vector< Runtime::Execution::SuccessorExecutablePipeline >(), uint64_t sourceAffinity=std::numeric_limits< uint64_t >::max(), uint64_t taskQueueId=0)
 public constructor for data source @Note the number of buffers to process is set to UINT64_MAX and the value is needed by some test to produce a deterministic behavior More...
 
 DataSource ()=delete
 
virtual void open ()
 This methods initializes thread-local state. For instance, it creates the local buffer pool and is necessary because we cannot do it in the constructor. More...
 
virtual void close ()
 This method cleans up thread-local state for the source. More...
 
virtual bool start ()
 method to start the source. 1.) check if bool running is true, if true return if not start source 2.) start new thread with runningRoutine More...
 
virtual bool stop (Runtime::QueryTerminationType graceful)
 method to stop the source. 1.) check if bool running is false, if false return, if not stop source 2.) stop thread by join More...
 
virtual void runningRoutine ()
 running routine while source is active More...
 
virtual void createOrLoadPersistedProperties ()
 This method defines the logic to load the properties persisted during the previous run of the data source. More...
 
virtual void storePersistedProperties ()
 This method stores all properties that need to be persisted for future retrieval during createOrLoadPersistedProperties call. More...
 
virtual void clearPersistedProperties ()
 This method clears the persisted properties of the data source on had stop. More...
 
SchemaPtr getSchema () const
 method to return the current schema of the source More...
 
std::string getSourceSchemaAsString ()
 method to return the current schema of the source as string More...
 
bool isRunning () const noexcept
 debug function for testing to test if source is running More...
 
uint64_t getNumberOfGeneratedTuples () const
 debug function for testing to get number of generated tuples More...
 
uint64_t getNumberOfGeneratedBuffers () const
 debug function for testing to get number of generated buffer More...
 
virtual bool performSoftStop ()
 This method will mark the running as false this will result in 1. the interruption of data gathering loop and 2. initiation of soft stop routine. More...
 
void setGatheringInterval (std::chrono::milliseconds interval)
 method to set the sampling interval More...
 
virtual ~DataSource () NES_NOEXCEPT(false) override
 Internal destructor to make sure that the data source is stopped before deconstrcuted @Note must be public because of boost serialize. More...
 
uint64_t getNumBuffersToProcess () const
 Get number of buffers to be processed. More...
 
std::chrono::milliseconds getGatheringInterval () const
 Get gathering interval. More...
 
uint64_t getGatheringIntervalCount () const
 Get number representation of gathering interval. More...
 
OperatorId getOperatorId () const
 Gets the operator id for the data source. More...
 
void setOperatorId (OperatorId operatorId)
 Set the operator id for the data source. More...
 
std::vector< Runtime::Execution::SuccessorExecutablePipelinegetExecutableSuccessors ()
 Returns the list of successor pipelines. More...
 
void addExecutableSuccessors (std::vector< Runtime::Execution::SuccessorExecutablePipeline > newPipelines)
 Add a list of successor pipelines. More...
 
template<typename Derived >
std::shared_ptr< Derived > shared_from_base ()
 This method is necessary to avoid problems with the shared_from_this machinery combined with multi-inheritance. More...
 
virtual std::vector< Schema::MemoryLayoutTypegetSupportedLayouts ()
 This method returns all supported layouts. More...
 
virtual void onEvent (Runtime::BaseEvent &) override
 API method called upon receiving an event. More...
 
virtual void onEvent (Runtime::BaseEvent &event, Runtime::WorkerContextRef workerContext)
 API method called upon receiving an event, whose handling requires the WorkerContext (e.g. its network channels). More...
 
virtual bool fail ()
 
void setSourceSharing (bool value)
 set source sharing value More...
 
void incrementNumberOfConsumerQueries ()
 set the number of queries that use this source More...
 
virtual bool handleReconfigurationMarker (ReconfigurationMarkerPtr marker)
 check if a reconfiguration marker contains an event for this source. If so, trigger the reconfiguration and propagate the marker downstream. More...
 
- Public Member Functions inherited from NES::Runtime::Reconfigurable
 ~Reconfigurable () NES_NOEXCEPT(false) override=default
 
virtual void reconfigure (ReconfigurationMessage &, WorkerContext &)
 reconfigure callback that will be called per thread More...
 
virtual void postReconfigurationCallback (ReconfigurationMessage &)
 callback that will be called on the last thread the executes the reconfiguration More...
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this_base< isNoexceptDestructible >
virtual ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default
 
- Public Member Functions inherited from NES::DataEmitter
virtual ~DataEmitter () NES_NOEXCEPT(false)=default
 
virtual void onEndOfStream (Runtime::QueryTerminationType)
 
virtual DecomposedQueryPlanVersion getVersion () const
 
virtual bool startNewVersion ()
 start a previously scheduled new version for this data emitter More...
 
virtual bool insertReconfigurationMarker (ReconfigurationMarkerPtr)
 check if a reconfiguration marker contains an event for this data emitter. If so, trigger the reconfiguration and propagate the marker downstream. More...
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< RuntimeEventListener, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 

Additional Inherited Members

- Public Attributes inherited from NES::DataSource
const bool persistentSource
 
const std::string persistentSourceKey
 
- Protected Member Functions inherited from NES::DataSource
void emitWork (Runtime::TupleBuffer &buffer, bool addBufferMetaData=true) override
 create a task using the provided buffer and submit it to a task consumer, e.g., query manager More...
 
NES::Runtime::MemoryLayouts::TestTupleBuffer allocateBuffer ()
 
- Protected Attributes inherited from NES::DataSource
Runtime::QueryManagerPtr queryManager
 
Runtime::BufferManagerPtr localBufferManager
 
Runtime::FixedSizeBufferPoolPtr bufferManager {nullptr}
 
std::vector< Runtime::Execution::SuccessorExecutablePipelineexecutableSuccessors
 
OperatorId operatorId
 
OriginId originId
 
StatisticId statisticId
 
SchemaPtr schema
 
uint64_t generatedTuples {0}
 
uint64_t generatedBuffers {0}
 
uint64_t numberOfBuffersToProduce = std::numeric_limits<decltype(numberOfBuffersToProduce)>::max()
 
uint64_t numSourceLocalBuffers
 
uint64_t gatheringIngestionRate {}
 
std::chrono::milliseconds gatheringInterval {0}
 
GatheringMode gatheringMode
 
SourceType type
 
Runtime::QueryTerminationType wasGracefullyStopped {Runtime::QueryTerminationType::Graceful}
 
std::atomic_bool wasStarted {false}
 
std::atomic_bool futureRetrieved {false}
 
std::atomic_bool running {false}
 
std::promise< bool > completedPromise
 
uint64_t sourceAffinity
 
uint64_t taskQueueId
 
bool sourceSharing = false
 
const std::string physicalSourceName
 
std::atomic< uint64_t > refCounter = 0
 
std::atomic< uint64_t > numberOfConsumerQueries = 1
 
Runtime::MemoryLayouts::MemoryLayoutPtr memoryLayout
 

Detailed Description

Memory Source that reads from main memory and produces buffers. The memory area out of which buffers will be produced must be initialized beforehand and allocated as a shared_ptr that must have ownership of the area, i.e., it must control when to free it. Do not use in distributed settings but only for single node dev and testing.

Constructor & Destructor Documentation

◆ MemorySource()

NES::MemorySource::MemorySource ( SchemaPtr  schema,
const std::shared_ptr< uint8_t > &  memoryArea,
size_t  memoryAreaSize,
Runtime::BufferManagerPtr  bufferManager,
Runtime::QueryManagerPtr  queryManager,
uint64_t  numBuffersToProcess,
uint64_t  gatheringValue,
OperatorId  operatorId,
OriginId  originId,
StatisticId  statisticId,
size_t  numSourceLocalBuffers,
GatheringMode  gatheringMode,
uint64_t  sourceAffinity,
uint64_t  taskQueueId,
const std::string &  physicalSourceName,
std::vector< Runtime::Execution::SuccessorExecutablePipeline successors 
)
explicit

The constructor of a MemorySource.

Parameters
schemathe schema of the source
memoryAreathe non-null memory area that stores the data that will be used by the source
memoryAreaSizethe non-zero size of the memory area
bufferManagerpointer to the buffer manager
queryManagerpointer to the query manager
numBuffersToProcess
gatheringValuehow many tuples to collect per interval
operatorIdthe id of the source
originIdrepresents the identifier of the upstream operator that represents the origin of the input stream
statisticIdrepresents the unique identifier of components that we can track statistics for
numSourceLocalBuffersthe number of buffers allocated to a source
gatheringModethe gathering mode (INTERVAL_MODE, INGESTION_RATE_MODE, or ADAPTIVE_MODE)
sourceAffinitythe subsequent operators in the pipeline to which the data is pushed
taskQueueIdthe ID of the queue to which the task is pushed
physicalSourceNamethe name and unique identifier of a physical source
successorsthe subsequent operators in the pipeline to which the data is pushed

References magic_enum::enum_name(), NES::DataSource::gatheringIngestionRate, NES::DataSource::gatheringInterval, NES::DataSource::gatheringMode, NES::INGESTION_RATE_MODE, NES::INTERVAL_MODE, NES::DataSource::localBufferManager, NES_ASSERT, NES_DEBUG, NES_THROW_RUNTIME_ERROR, NES::DataSource::numberOfBuffersToProduce, NES::DataSource::sourceAffinity, and NES::DataSource::taskQueueId.

Here is the call graph for this function:

Member Function Documentation

◆ getType()

NES::SourceType NES::MemorySource::getType ( ) const
overridevirtual

Provides the type of the source.

Returns
the type of the source

Reimplemented from NES::GeneratorSource.

◆ receiveData()

std::optional< Runtime::TupleBuffer > NES::MemorySource::receiveData ( )
overridevirtual

This method is implemented only to comply with the API: it will crash the system if called.

Returns
a nullopt

Implements NES::GeneratorSource.

References NES::DataSource::bufferManager, NES::DataSource::generatedBuffers, NES::DataSource::generatedTuples, apex::memcpy(), NES_DEBUG, NES_TRACE, NES::DataSource::numberOfBuffersToProduce, and NES::DataSource::operatorId.

Here is the call graph for this function:

◆ recyclePooledBuffer()

virtual void NES::MemorySource::recyclePooledBuffer ( Runtime::detail::MemorySegment buffer)
inlineoverridevirtual

Interface method for pooled buffer recycling.

Parameters
bufferthe buffer to recycle

Implements NES::Runtime::BufferRecycler.

◆ recycleUnpooledBuffer()

virtual void NES::MemorySource::recycleUnpooledBuffer ( Runtime::detail::MemorySegment )
inlineoverridevirtual

Interface method for unpooled buffer recycling.

Parameters
bufferthe buffer to recycle

Implements NES::Runtime::BufferRecycler.

◆ toString()

std::string NES::MemorySource::toString ( ) const
overridevirtual

Provides a string representation of the source.

Returns
The string representation of the source

Reimplemented from NES::GeneratorSource.


The documentation for this class was generated from the following files: