NebulaStream
0.6.213
NebulaStream is a data and application management framework for the internet of things
Benchmark Source is a special source for benchmarking purposes only that strips away all overhead. The memory area from which buffers are produced must be initialized beforehand and allocated as a shared_ptr that owns the area, i.e., it must control when the area is freed. Do not use it in distributed settings; it is intended only for single-node development and testing.
#include <BenchmarkSource.hpp>
Public Member Functions | |
BenchmarkSource (SchemaPtr schema, const std::shared_ptr< uint8_t > &memoryArea, size_t memoryAreaSize, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, uint64_t numberOfBuffersToProcess, uint64_t gatheringValue, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, SourceMode sourceMode, uint64_t sourceAffinity, uint64_t taskQueueId, const std::string &physicalSourceName, std::vector< Runtime::Execution::SuccessorExecutablePipeline > successors) | |
The constructor of a BenchmarkSource. | |
~BenchmarkSource () | |
std::optional< Runtime::TupleBuffer > | receiveData () override |
This method is implemented only to comply with the API; it crashes the system if called. | |
std::string | toString () const override |
Provides a string representation of the source. | |
SourceType | getType () const override |
Provides the type of the source. | |
virtual void | runningRoutine () override |
The routine that runs while the source is active. | |
virtual void | recyclePooledBuffer (Runtime::detail::MemorySegment *) override |
Interface method for pooled buffer recycling. | |
virtual void | recycleUnpooledBuffer (Runtime::detail::MemorySegment *) override |
Interface method for unpooled buffer recycling. | |
void | open () override |
This method creates the local buffer pool; it is necessary because the pool cannot be created in the constructor. | |
void | close () override |
This method cleans up thread-local state for the source. | |
Public Member Functions inherited from NES::GeneratorSource | |
GeneratorSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, uint64_t numberOfBufferToProduce, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, std::vector< Runtime::Execution::SuccessorExecutablePipeline > successors, const std::string &physicalSourceName=std::string("defaultPhysicalStreamName")) | |
Constructor to create a generator source. | |
std::string | toString () const override |
Overrides the toString method for the generator source. | |
SourceType | getType () const override |
Gets the source type. | |
Public Member Functions inherited from NES::DataSource | |
DataSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string &physicalSourceName, bool persistentSource, std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors=std::vector< Runtime::Execution::SuccessorExecutablePipeline >(), uint64_t sourceAffinity=std::numeric_limits< uint64_t >::max(), uint64_t taskQueueId=0) | |
Public constructor for a data source. Note: the number of buffers to process is set to UINT64_MAX; some tests rely on this value to produce deterministic behavior. | |
DataSource ()=delete | |
virtual bool | start () |
Starts the source: (1) if the running flag is already true, return; otherwise (2) start a new thread that executes runningRoutine. | |
virtual bool | stop (Runtime::QueryTerminationType graceful) |
Stops the source: (1) if the running flag is already false, return; otherwise (2) stop the thread by joining it. | |
virtual void | createOrLoadPersistedProperties () |
This method defines the logic to load the properties persisted during the previous run of the data source. | |
virtual void | storePersistedProperties () |
This method stores all properties that need to be persisted for future retrieval during a createOrLoadPersistedProperties call. | |
virtual void | clearPersistedProperties () |
This method clears the persisted properties of the data source on hard stop. | |
SchemaPtr | getSchema () const |
Returns the current schema of the source. | |
std::string | getSourceSchemaAsString () |
Returns the current schema of the source as a string. | |
bool | isRunning () const noexcept |
Debug function for tests that checks whether the source is running. | |
uint64_t | getNumberOfGeneratedTuples () const |
Debug function for tests that returns the number of generated tuples. | |
uint64_t | getNumberOfGeneratedBuffers () const |
Debug function for tests that returns the number of generated buffers. | |
virtual bool | performSoftStop () |
This method marks the source as no longer running, which (1) interrupts the data gathering loop and (2) initiates the soft stop routine. | |
void | setGatheringInterval (std::chrono::milliseconds interval) |
Sets the sampling interval. | |
virtual | ~DataSource () NES_NOEXCEPT(false) override |
Internal destructor that makes sure the data source is stopped before it is destroyed. Note: it must be public because of boost serialize. | |
uint64_t | getNumBuffersToProcess () const |
Get number of buffers to be processed. | |
std::chrono::milliseconds | getGatheringInterval () const |
Get gathering interval. | |
uint64_t | getGatheringIntervalCount () const |
Get number representation of gathering interval. | |
OperatorId | getOperatorId () const |
Gets the operator id for the data source. | |
void | setOperatorId (OperatorId operatorId) |
Set the operator id for the data source. | |
std::vector< Runtime::Execution::SuccessorExecutablePipeline > | getExecutableSuccessors () |
Returns the list of successor pipelines. | |
void | addExecutableSuccessors (std::vector< Runtime::Execution::SuccessorExecutablePipeline > newPipelines) |
Add a list of successor pipelines. | |
template<typename Derived > | |
std::shared_ptr< Derived > | shared_from_base () |
This method is necessary to avoid problems with the shared_from_this machinery combined with multiple inheritance. | |
virtual std::vector< Schema::MemoryLayoutType > | getSupportedLayouts () |
This method returns all supported layouts. | |
virtual void | onEvent (Runtime::BaseEvent &) override |
API method called upon receiving an event. | |
virtual void | onEvent (Runtime::BaseEvent &event, Runtime::WorkerContextRef workerContext) |
API method called upon receiving an event whose handling requires the WorkerContext (e.g. its network channels). | |
virtual bool | fail () |
void | setSourceSharing (bool value) |
Sets the source sharing value. | |
void | incrementNumberOfConsumerQueries () |
Increments the number of queries that use this source. | |
virtual bool | handleReconfigurationMarker (ReconfigurationMarkerPtr marker) |
Checks whether a reconfiguration marker contains an event for this source. If so, triggers the reconfiguration and propagates the marker downstream. | |
Public Member Functions inherited from Reconfigurable | |
~Reconfigurable () NES_NOEXCEPT(false) override=default | |
virtual void | reconfigure (ReconfigurationMessage &, WorkerContext &) |
Reconfigure callback that is called once per thread. | |
virtual void | postReconfigurationCallback (ReconfigurationMessage &) |
Callback that is called on the last thread that executes the reconfiguration. | |
Public Member Functions inherited from virtual_enable_shared_from_this | |
~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default | |
std::shared_ptr< T1 > | shared_from_this () |
std::weak_ptr< T1 > | weak_from_this () |
Public Member Functions inherited from virtual_enable_shared_from_this_base | |
virtual | ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default |
Public Member Functions inherited from DataEmitter | |
virtual | ~DataEmitter () NES_NOEXCEPT(false)=default |
virtual void | onEndOfStream (Runtime::QueryTerminationType) |
virtual DecomposedQueryPlanVersion | getVersion () const |
virtual bool | startNewVersion () |
Starts a previously scheduled new version for this data emitter. | |
virtual bool | insertReconfigurationMarker (ReconfigurationMarkerPtr) |
Checks whether a reconfiguration marker contains an event for this data emitter. If so, triggers the reconfiguration and propagates the marker downstream. | |
Public Member Functions inherited from virtual_enable_shared_from_this | |
~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default | |
std::shared_ptr< T1 > | shared_from_this () |
std::weak_ptr< T1 > | weak_from_this () |
Benchmark Source is a special source for benchmarking purposes only that strips away all overhead. The memory area from which buffers are produced must be initialized beforehand and allocated as a shared_ptr that owns the area, i.e., it must control when the area is freed. Do not use it in distributed settings; it is intended only for single-node development and testing.
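The ownership requirement on the memory area can be illustrated with a minimal sketch. It does not use any NebulaStream headers: the Record struct and the makeMemoryArea helper are hypothetical names introduced only for this example, and the record layout is an assumption rather than a real schema.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <new>
#include <stdexcept>

// Hypothetical fixed-size record; a real source would follow its schema.
struct Record {
    uint64_t id;
    uint64_t value;
};

// Hypothetical helper: allocates the memory area, fills it with records, and
// returns a shared_ptr that owns it. The custom deleter calls std::free, so
// the area is released only when the last owner goes away.
std::shared_ptr<uint8_t> makeMemoryArea(std::size_t numRecords, std::size_t& sizeOut) {
    if (numRecords == 0) {
        throw std::invalid_argument("the memory area must have a non-zero size");
    }
    sizeOut = numRecords * sizeof(Record);
    auto* raw = static_cast<uint8_t*>(std::malloc(sizeOut));
    if (raw == nullptr) {
        throw std::bad_alloc();
    }
    std::shared_ptr<uint8_t> area(raw, [](uint8_t* p) { std::free(p); });
    assert(area != nullptr);
    // Initialize the area before it is handed to a source.
    for (std::size_t i = 0; i < numRecords; ++i) {
        Record r{i, i * 10};
        std::memcpy(area.get() + i * sizeof(Record), &r, sizeof(Record));
    }
    return area;
}
```

The returned pointer and the size written to sizeOut would correspond to the memoryArea and memoryAreaSize constructor arguments described below.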
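NES::BenchmarkSource::BenchmarkSource (SchemaPtr schema, const std::shared_ptr< uint8_t > &memoryArea, size_t memoryAreaSize, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, uint64_t numberOfBuffersToProcess, uint64_t gatheringValue, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, SourceMode sourceMode, uint64_t sourceAffinity, uint64_t taskQueueId, const std::string &physicalSourceName, std::vector< Runtime::Execution::SuccessorExecutablePipeline > successors) | explicit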
The constructor of a BenchmarkSource.
schema | the schema of the source |
memoryArea | the non-null memory area that stores the data that will be used by the source |
memoryAreaSize | the non-zero size of the memory area |
bufferManager | pointer to the buffer manager |
queryManager | pointer to the query manager |
numberOfBuffersToProcess | the number of buffers to be produced by the source |
gatheringValue | how many tuples to collect per interval |
operatorId | current operator id |
originId | represents the identifier of the upstream operator that represents the origin of the input stream |
statisticId | represents the unique identifier of components that we can track statistics for |
numSourceLocalBuffers | the number of buffers allocated to a source |
gatheringMode | the gathering mode (INTERVAL_MODE, INGESTION_RATE_MODE, or ADAPTIVE_MODE) |
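sourceMode | how buffers are produced from the memory area (runningRoutine() references EMPTY_BUFFER and COPY_BUFFER_SIMD_RTE) |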
sourceAffinity | the CPU core to which the source thread is pinned |
taskQueueId | the ID of the queue to which the task is pushed |
physicalSourceName | the name and unique identifier of a physical source |
successors | the subsequent operators in the pipeline to which the data is pushed |
References magic_enum::enum_name(), NES::DataSource::gatheringIngestionRate, NES::DataSource::gatheringInterval, NES::DataSource::gatheringMode, NES::INGESTION_RATE_MODE, NES::INTERVAL_MODE, NES::DataSource::localBufferManager, NES_ASSERT, NES_DEBUG, NES_THROW_RUNTIME_ERROR, NES::DataSource::numberOfBuffersToProduce, NES::DataSource::sourceAffinity, and NES::DataSource::taskQueueId.
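The reference list above suggests that the constructor interprets gatheringValue according to gatheringMode. The following is a sketch of that idea under stated assumptions, not the actual constructor body: GatheringConfig and interpretGatheringValue are hypothetical names, the enum is a stand-in for the NES type, and rejecting ADAPTIVE_MODE is an assumption drawn only from the fact that the references name just the other two modes.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <stdexcept>

// Stand-in for the gathering modes named in the parameter list.
enum class GatheringMode { INTERVAL_MODE, INGESTION_RATE_MODE, ADAPTIVE_MODE };

// Hypothetical holder for the two values a gathering value can map to.
struct GatheringConfig {
    std::chrono::milliseconds gatheringInterval{0};
    uint64_t gatheringIngestionRate = 0;
};

// Hypothetical helper: maps the single gatheringValue argument onto the field
// that matches the selected mode and rejects any other mode (assumption).
GatheringConfig interpretGatheringValue(GatheringMode mode, uint64_t gatheringValue) {
    GatheringConfig config;
    switch (mode) {
        case GatheringMode::INTERVAL_MODE:
            config.gatheringInterval = std::chrono::milliseconds(gatheringValue);
            break;
        case GatheringMode::INGESTION_RATE_MODE:
            config.gatheringIngestionRate = gatheringValue;
            break;
        default:
            throw std::runtime_error("unsupported gathering mode");
    }
    return config;
}
```

Keeping one numeric argument and letting the mode decide its meaning keeps the constructor signature short, at the cost of making the unit of gatheringValue depend on another parameter.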
NES::BenchmarkSource::~BenchmarkSource ()
void NES::BenchmarkSource::close () | override virtual
This method cleans up thread-local state for the source.
Reimplemented from NES::DataSource.
References NES::DataSource::close(), and NES::Runtime::TupleBuffer::release().
SourceType NES::BenchmarkSource::getType () const | override virtual
void NES::BenchmarkSource::open () | override virtual
This method creates the local buffer pool; it is necessary because the pool cannot be created in the constructor.
Reimplemented from NES::DataSource.
References NES::Runtime::TupleBuffer::getBuffer(), NES::DataSource::localBufferManager, apex::memcpy(), and NES::DataSource::open().
Referenced by runningRoutine().
std::optional< Runtime::TupleBuffer > NES::BenchmarkSource::receiveData () | override virtual
This method is implemented only to comply with the API: it will crash the system if called.
Implements NES::GeneratorSource.
References NES_NOT_IMPLEMENTED.
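The pattern of overriding a pure virtual method only to satisfy the interface can be sketched as follows. PullSource, PushOnlySource, and Buffer are hypothetical stand-ins, and the sketch throws std::logic_error so that the behavior can be observed in a test; the real method relies on NES_NOT_IMPLEMENTED, whose exact behavior is not shown on this page.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <vector>

// Stand-in for Runtime::TupleBuffer.
using Buffer = std::vector<uint8_t>;

// Hypothetical base class that requires a pull-style receiveData().
class PullSource {
  public:
    virtual ~PullSource() = default;
    virtual std::optional<Buffer> receiveData() = 0;
};

// Hypothetical source that produces data only from its own running routine.
// receiveData() exists solely so that the class is not abstract; calling it
// is treated as a programming error.
class PushOnlySource : public PullSource {
  public:
    std::optional<Buffer> receiveData() override {
        throw std::logic_error("receiveData() is not implemented for this source");
    }
};
```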
void NES::BenchmarkSource::recyclePooledBuffer (Runtime::detail::MemorySegment *buffer) | override virtual
Interface method for pooled buffer recycling.
buffer | the buffer to recycle |
Implements NES::Runtime::BufferRecycler.
References NES::Runtime::TupleBuffer::release().
void NES::BenchmarkSource::recycleUnpooledBuffer (Runtime::detail::MemorySegment *buffer) | override virtual
Interface method for unpooled buffer recycling.
buffer | the buffer to recycle |
Implements NES::Runtime::BufferRecycler.
References NES::Runtime::TupleBuffer::release().
void NES::BenchmarkSource::runningRoutine () | override virtual
The routine that runs while the source is active.
Reimplemented from NES::DataSource.
References NES::DataSource::bufferManager, NES::COPY_BUFFER_SIMD_RTE, NES::EMPTY_BUFFER, NES_INFO, NES::DataSource::numberOfBuffersToProduce, open(), and NES::DataSource::running.
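A running routine that produces a fixed number of buffers from a pre-filled memory area could look roughly like the sketch below. It is an illustration under assumptions, not the NebulaStream implementation: produceBuffers, Buffer, and the two-valued SourceMode enum are hypothetical stand-ins, a plain memcpy replaces whatever copy routine the real COPY_BUFFER_SIMD_RTE mode uses, and wrapping around at the end of the area is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <utility>
#include <vector>

// Stand-ins: EMPTY_BUFFER emits untouched buffers, COPY_BUFFER copies data in.
enum class SourceMode { EMPTY_BUFFER, COPY_BUFFER };

// Stand-in for Runtime::TupleBuffer.
using Buffer = std::vector<uint8_t>;

// Hypothetical routine: emits numberOfBuffersToProduce buffers of bufferSize
// bytes. In COPY_BUFFER mode each buffer is filled from the memory area,
// starting again at offset 0 when the next chunk would run past the end.
std::vector<Buffer> produceBuffers(const std::shared_ptr<uint8_t>& memoryArea,
                                   std::size_t memoryAreaSize,
                                   std::size_t bufferSize,
                                   uint64_t numberOfBuffersToProduce,
                                   SourceMode mode) {
    assert(memoryArea != nullptr && memoryAreaSize > 0 && bufferSize > 0);
    std::vector<Buffer> emitted;
    std::size_t offset = 0;
    for (uint64_t i = 0; i < numberOfBuffersToProduce; ++i) {
        Buffer buffer(bufferSize, 0);
        if (mode == SourceMode::COPY_BUFFER) {
            if (offset + bufferSize > memoryAreaSize) {
                offset = 0;  // wrap around to the start of the area
            }
            const std::size_t n = std::min(bufferSize, memoryAreaSize - offset);
            std::memcpy(buffer.data(), memoryArea.get() + offset, n);
            offset += n;
        }
        emitted.push_back(std::move(buffer));
    }
    return emitted;
}
```

The EMPTY_BUFFER branch skips the copy entirely, which is the kind of overhead removal a benchmarking source is meant to allow.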
std::string NES::BenchmarkSource::toString () const | override virtual
Provides a string representation of the source.
Implements NES::DataSource.