NebulaStream
0.6.213
NebulaStream is a data and application management framework for the internet of things
|
this class implements the CSV as an input source More...
#include <CSVSource.hpp>
Public Member Functions | |
CSVSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, CSVSourceTypePtr csvSourceType, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string &physicalSourceName, std::vector< Runtime::Execution::SuccessorExecutablePipeline > successors) | |
constructor of the CSV source More... | |
std::optional< Runtime::TupleBuffer > | receiveData () override |
override the receiveData method for the csv source More... | |
void | fillBuffer (Runtime::MemoryLayouts::TestTupleBuffer &) |
method to fill the buffer with tuples More... | |
std::string | toString () const override |
override the toString method for the csv source More... | |
SourceType | getType () const override |
Get source type. More... | |
std::string | getFilePath () const |
Get file path for the csv file. More... | |
const CSVSourceTypePtr & | getSourceConfig () const |
getter for source config More... | |
![]() | |
DataSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string &physicalSourceName, bool persistentSource, std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors=std::vector< Runtime::Execution::SuccessorExecutablePipeline >(), uint64_t sourceAffinity=std::numeric_limits< uint64_t >::max(), uint64_t taskQueueId=0) | |
public constructor for data source @Note the number of buffers to process is set to UINT64_MAX and the value is needed by some tests to produce deterministic behavior More... | |
DataSource ()=delete | |
virtual void | open () |
This method initializes thread-local state. For instance, it creates the local buffer pool and is necessary because we cannot do it in the constructor. More... | |
virtual void | close () |
This method cleans up thread-local state for the source. More... | |
virtual bool | start () |
method to start the source. 1.) check if bool running is true; if true, return; if not, start the source 2.) start new thread with runningRoutine More... | |
virtual bool | stop (Runtime::QueryTerminationType graceful) |
method to stop the source. 1.) check if bool running is false; if false, return; if not, stop the source 2.) stop thread by join More... | |
virtual void | runningRoutine () |
running routine while source is active More... | |
virtual void | createOrLoadPersistedProperties () |
This method defines the logic to load the properties persisted during the previous run of the data source. More... | |
virtual void | storePersistedProperties () |
This method stores all properties that need to be persisted for future retrieval during createOrLoadPersistedProperties call. More... | |
virtual void | clearPersistedProperties () |
This method clears the persisted properties of the data source on hard stop. More... | |
SchemaPtr | getSchema () const |
method to return the current schema of the source More... | |
std::string | getSourceSchemaAsString () |
method to return the current schema of the source as string More... | |
bool | isRunning () const noexcept |
debug function for testing to test if source is running More... | |
uint64_t | getNumberOfGeneratedTuples () const |
debug function for testing to get number of generated tuples More... | |
uint64_t | getNumberOfGeneratedBuffers () const |
debug function for testing to get number of generated buffer More... | |
virtual bool | performSoftStop () |
This method will mark running as false; this will result in 1. the interruption of the data gathering loop and 2. the initiation of the soft stop routine. More... | |
void | setGatheringInterval (std::chrono::milliseconds interval) |
method to set the sampling interval More... | |
virtual | ~DataSource () NES_NOEXCEPT(false) override |
Internal destructor to make sure that the data source is stopped before being deconstructed @Note must be public because of boost serialize. More... | |
uint64_t | getNumBuffersToProcess () const |
Get number of buffers to be processed. More... | |
std::chrono::milliseconds | getGatheringInterval () const |
Get gathering interval. More... | |
uint64_t | getGatheringIntervalCount () const |
Get number representation of gathering interval. More... | |
OperatorId | getOperatorId () const |
Gets the operator id for the data source. More... | |
void | setOperatorId (OperatorId operatorId) |
Set the operator id for the data source. More... | |
std::vector< Runtime::Execution::SuccessorExecutablePipeline > | getExecutableSuccessors () |
Returns the list of successor pipelines. More... | |
void | addExecutableSuccessors (std::vector< Runtime::Execution::SuccessorExecutablePipeline > newPipelines) |
Add a list of successor pipelines. More... | |
template<typename Derived > | |
std::shared_ptr< Derived > | shared_from_base () |
This method is necessary to avoid problems with the shared_from_this machinery combined with multi-inheritance. More... | |
virtual std::vector< Schema::MemoryLayoutType > | getSupportedLayouts () |
This method returns all supported layouts. More... | |
virtual void | onEvent (Runtime::BaseEvent &) override |
API method called upon receiving an event. More... | |
virtual void | onEvent (Runtime::BaseEvent &event, Runtime::WorkerContextRef workerContext) |
API method called upon receiving an event, whose handling requires the WorkerContext (e.g. its network channels). More... | |
virtual bool | fail () |
void | setSourceSharing (bool value) |
set source sharing value More... | |
void | incrementNumberOfConsumerQueries () |
increment the number of queries that use this source More... | |
virtual bool | handleReconfigurationMarker (ReconfigurationMarkerPtr marker) |
check if a reconfiguration marker contains an event for this source. If so, trigger the reconfiguration and propagate the marker downstream. More... | |
![]() | |
~Reconfigurable () NES_NOEXCEPT(false) override=default | |
virtual void | reconfigure (ReconfigurationMessage &, WorkerContext &) |
reconfigure callback that will be called per thread More... | |
virtual void | postReconfigurationCallback (ReconfigurationMessage &) |
callback that will be called on the last thread that executes the reconfiguration More... | |
![]() | |
~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default | |
std::shared_ptr< T1 > | shared_from_this () |
std::weak_ptr< T1 > | weak_from_this () |
![]() | |
virtual | ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default |
![]() | |
virtual | ~DataEmitter () NES_NOEXCEPT(false)=default |
virtual void | onEndOfStream (Runtime::QueryTerminationType) |
virtual DecomposedQueryPlanVersion | getVersion () const |
virtual bool | startNewVersion () |
start a previously scheduled new version for this data emitter More... | |
virtual bool | insertReconfigurationMarker (ReconfigurationMarkerPtr) |
check if a reconfiguration marker contains an event for this data emitter. If so, trigger the reconfiguration and propagate the marker downstream. More... | |
![]() | |
~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default | |
std::shared_ptr< T1 > | shared_from_this () |
std::weak_ptr< T1 > | weak_from_this () |
Additional Inherited Members | |
![]() | |
const bool | persistentSource |
const std::string | persistentSourceKey |
![]() | |
void | emitWork (Runtime::TupleBuffer &buffer, bool addBufferMetaData=true) override |
create a task using the provided buffer and submit it to a task consumer, e.g., query manager More... | |
NES::Runtime::MemoryLayouts::TestTupleBuffer | allocateBuffer () |
this class implements the CSV as an input source
|
explicit |
constructor of the CSV source
schema | of the source |
bufferManager | pointer to the buffer manager |
queryManager | pointer to the query manager |
csvSourceType | points to the current source configuration object, look at mqttSourceType and CSVSourceType for info |
operatorId | current operator id |
originId | represents the identifier of the upstream operator that represents the origin of the input stream |
statisticId | represents the unique identifier of components that we can track statistics for |
numSourceLocalBuffers | number of local source buffers |
gatheringMode | the gathering mode (INTERVAL_MODE, INGESTION_RATE_MODE, or ADAPTIVE_MODE) |
physicalSourceName | the name and unique identifier of a physical source |
successors | the subsequent operators in the pipeline to which the data is pushed |
References NES::DataSource::gatheringInterval, NES::DefaultPhysicalTypeFactory::getPhysicalType(), input, NES_DEBUG, NES_THROW_RUNTIME_ERROR, NES::DataSource::numberOfBuffersToProduce, and NES::DataSource::schema.
void NES::CSVSource::fillBuffer | ( | Runtime::MemoryLayouts::TestTupleBuffer & | buffer | ) |
method to fill the buffer with tuples
buffer | to be filled |
References fileEnded, NES::DataSource::generatedBuffers, NES::DataSource::generatedTuples, NES::Runtime::MemoryLayouts::TestTupleBuffer::getBuffer(), NES::Runtime::TupleBuffer::getBufferSize(), NES::Runtime::MemoryLayouts::TestTupleBuffer::getCapacity(), input, NES::DataSource::localBufferManager, NES_ASSERT2_FMT, NES_TRACE, NES_WARNING, NES::Util::printTupleBufferAsCSV(), NES::DataSource::schema, and NES::Runtime::MemoryLayouts::TestTupleBuffer::setNumberOfTuples().
Referenced by receiveData().
std::string NES::CSVSource::getFilePath | ( | ) | const |
Get file path for the csv file.
const CSVSourceTypePtr & NES::CSVSource::getSourceConfig | ( | ) | const |
getter for source config
|
overridevirtual |
|
overridevirtual |
override the receiveData method for the csv source
Implements NES::DataSource.
References NES::DataSource::allocateBuffer(), fillBuffer(), NES_TRACE, and NES::DataSource::operatorId.
|
overridevirtual |
override the toString method for the csv source
Implements NES::DataSource.
References NES::DataSource::gatheringInterval, NES::DataSource::numberOfBuffersToProduce, and NES::DataSource::schema.
|
protected |
Referenced by fillBuffer(), and NES::TEST_F().
|
protected |
Referenced by CSVSource(), and fillBuffer().