NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
NES::CSVSource Class Reference

This class implements a CSV file as an input source. More...

#include <CSVSource.hpp>

Public Member Functions

 CSVSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, CSVSourceTypePtr csvSourceType, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string &physicalSourceName, std::vector< Runtime::Execution::SuccessorExecutablePipeline > successors)
 constructor of the CSV source More...
 
std::optional< Runtime::TupleBuffer > receiveData () override
 override the receiveData method for the csv source More...
 
void fillBuffer (Runtime::MemoryLayouts::TestTupleBuffer &)
 method to fill the buffer with tuples More...
 
std::string toString () const override
 override the toString method for the csv source More...
 
SourceType getType () const override
 Get source type. More...
 
std::string getFilePath () const
 Get file path for the csv file. More...
 
const CSVSourceTypePtr & getSourceConfig () const
 getter for source config More...
 
- Public Member Functions inherited from NES::DataSource
 DataSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string &physicalSourceName, bool persistentSource, std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors=std::vector< Runtime::Execution::SuccessorExecutablePipeline >(), uint64_t sourceAffinity=std::numeric_limits< uint64_t >::max(), uint64_t taskQueueId=0)
 Public constructor for the data source. Note: the number of buffers to process is set to UINT64_MAX; some tests need this value to produce deterministic behavior. More...
 
 DataSource ()=delete
 
virtual void open ()
 This method initializes thread-local state. For instance, it creates the local buffer pool, which cannot be done in the constructor. More...
 
virtual void close ()
 This method cleans up thread-local state for the source. More...
 
virtual bool start ()
 Method to start the source: 1.) check whether running is true and return if so; otherwise start the source, 2.) start a new thread that executes runningRoutine. More...
 
virtual bool stop (Runtime::QueryTerminationType graceful)
 Method to stop the source: 1.) check whether running is false and return if so; otherwise stop the source, 2.) stop the thread by joining it. More...
 
virtual void runningRoutine ()
 running routine while source is active More...
 
virtual void createOrLoadPersistedProperties ()
 This method defines the logic to load the properties persisted during the previous run of the data source. More...
 
virtual void storePersistedProperties ()
 This method stores all properties that need to be persisted for future retrieval during createOrLoadPersistedProperties call. More...
 
virtual void clearPersistedProperties ()
 This method clears the persisted properties of the data source on hard stop. More...
 
SchemaPtr getSchema () const
 method to return the current schema of the source More...
 
std::string getSourceSchemaAsString ()
 method to return the current schema of the source as string More...
 
bool isRunning () const noexcept
 debug function for testing to test if source is running More...
 
uint64_t getNumberOfGeneratedTuples () const
 debug function for testing to get number of generated tuples More...
 
uint64_t getNumberOfGeneratedBuffers () const
 debug function for testing to get number of generated buffer More...
 
virtual bool performSoftStop ()
 This method marks running as false, which results in 1. the interruption of the data gathering loop and 2. the initiation of the soft stop routine. More...
 
void setGatheringInterval (std::chrono::milliseconds interval)
 method to set the sampling interval More...
 
virtual ~DataSource () NES_NOEXCEPT(false) override
 Internal destructor to make sure that the data source is stopped before it is destructed. Note: must be public because of boost serialize. More...
 
uint64_t getNumBuffersToProcess () const
 Get number of buffers to be processed. More...
 
std::chrono::milliseconds getGatheringInterval () const
 Get gathering interval. More...
 
uint64_t getGatheringIntervalCount () const
 Get number representation of gathering interval. More...
 
OperatorId getOperatorId () const
 Gets the operator id for the data source. More...
 
void setOperatorId (OperatorId operatorId)
 Set the operator id for the data source. More...
 
std::vector< Runtime::Execution::SuccessorExecutablePipeline > getExecutableSuccessors ()
 Returns the list of successor pipelines. More...
 
void addExecutableSuccessors (std::vector< Runtime::Execution::SuccessorExecutablePipeline > newPipelines)
 Add a list of successor pipelines. More...
 
template<typename Derived >
std::shared_ptr< Derived > shared_from_base ()
 This method is necessary to avoid problems with the shared_from_this machinery combined with multi-inheritance. More...
 
virtual std::vector< Schema::MemoryLayoutType > getSupportedLayouts ()
 This method returns all supported layouts. More...
 
virtual void onEvent (Runtime::BaseEvent &) override
 API method called upon receiving an event. More...
 
virtual void onEvent (Runtime::BaseEvent &event, Runtime::WorkerContextRef workerContext)
 API method called upon receiving an event, whose handling requires the WorkerContext (e.g. its network channels). More...
 
virtual bool fail ()
 
void setSourceSharing (bool value)
 set source sharing value More...
 
void incrementNumberOfConsumerQueries ()
 increment the number of queries that use this source More...
 
virtual bool handleReconfigurationMarker (ReconfigurationMarkerPtr marker)
 check if a reconfiguration marker contains an event for this source. If so, trigger the reconfiguration and propagate the marker downstream. More...
 
- Public Member Functions inherited from NES::Runtime::Reconfigurable
 ~Reconfigurable () NES_NOEXCEPT(false) override=default
 
virtual void reconfigure (ReconfigurationMessage &, WorkerContext &)
 reconfigure callback that will be called per thread More...
 
virtual void postReconfigurationCallback (ReconfigurationMessage &)
 callback that will be called on the last thread that executes the reconfiguration More...
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this_base< isNoexceptDestructible >
virtual ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default
 
- Public Member Functions inherited from NES::DataEmitter
virtual ~DataEmitter () NES_NOEXCEPT(false)=default
 
virtual void onEndOfStream (Runtime::QueryTerminationType)
 
virtual DecomposedQueryPlanVersion getVersion () const
 
virtual bool startNewVersion ()
 start a previously scheduled new version for this data emitter More...
 
virtual bool insertReconfigurationMarker (ReconfigurationMarkerPtr)
 check if a reconfiguration marker contains an event for this data emitter. If so, trigger the reconfiguration and propagate the marker downstream. More...
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< RuntimeEventListener, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 

Protected Attributes

std::ifstream input
 
bool fileEnded
 
- Protected Attributes inherited from NES::DataSource
Runtime::QueryManagerPtr queryManager
 
Runtime::BufferManagerPtr localBufferManager
 
Runtime::FixedSizeBufferPoolPtr bufferManager {nullptr}
 
std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors
 
OperatorId operatorId
 
OriginId originId
 
StatisticId statisticId
 
SchemaPtr schema
 
uint64_t generatedTuples {0}
 
uint64_t generatedBuffers {0}
 
uint64_t numberOfBuffersToProduce = std::numeric_limits<decltype(numberOfBuffersToProduce)>::max()
 
uint64_t numSourceLocalBuffers
 
uint64_t gatheringIngestionRate {}
 
std::chrono::milliseconds gatheringInterval {0}
 
GatheringMode gatheringMode
 
SourceType type
 
Runtime::QueryTerminationType wasGracefullyStopped {Runtime::QueryTerminationType::Graceful}
 
std::atomic_bool wasStarted {false}
 
std::atomic_bool futureRetrieved {false}
 
std::atomic_bool running {false}
 
std::promise< bool > completedPromise
 
uint64_t sourceAffinity
 
uint64_t taskQueueId
 
bool sourceSharing = false
 
const std::string physicalSourceName
 
std::atomic< uint64_t > refCounter = 0
 
std::atomic< uint64_t > numberOfConsumerQueries = 1
 
Runtime::MemoryLayouts::MemoryLayoutPtr memoryLayout
 

Additional Inherited Members

- Public Attributes inherited from NES::DataSource
const bool persistentSource
 
const std::string persistentSourceKey
 
- Protected Member Functions inherited from NES::DataSource
void emitWork (Runtime::TupleBuffer &buffer, bool addBufferMetaData=true) override
 create a task using the provided buffer and submit it to a task consumer, e.g., query manager More...
 
NES::Runtime::MemoryLayouts::TestTupleBuffer allocateBuffer ()
 

Detailed Description

This class implements a CSV file as an input source.

Constructor & Destructor Documentation

◆ CSVSource()

NES::CSVSource::CSVSource ( SchemaPtr  schema,
Runtime::BufferManagerPtr  bufferManager,
Runtime::QueryManagerPtr  queryManager,
CSVSourceTypePtr  csvSourceType,
OperatorId  operatorId,
OriginId  originId,
StatisticId  statisticId,
size_t  numSourceLocalBuffers,
GatheringMode  gatheringMode,
const std::string &  physicalSourceName,
std::vector< Runtime::Execution::SuccessorExecutablePipeline >  successors 
)
explicit

constructor of the CSV source

Parameters
schema  of the source
bufferManager  pointer to the buffer manager
queryManager  pointer to the query manager
csvSourceType  points to the current source configuration object, look at mqttSourceType and CSVSourceType for info
operatorId  current operator id
originId  represents the identifier of the upstream operator that represents the origin of the input stream
statisticId  represents the unique identifier of components that we can track statistics for
numSourceLocalBuffers  number of local source buffers
gatheringMode  the gathering mode (INTERVAL_MODE, INGESTION_RATE_MODE, or ADAPTIVE_MODE)
physicalSourceName  the name and unique identifier of a physical source
successors  the subsequent operators in the pipeline to which the data is pushed
Returns
a DataSourcePtr pointing to the data source

References NES::DataSource::gatheringInterval, NES::DefaultPhysicalTypeFactory::getPhysicalType(), input, NES_DEBUG, NES_THROW_RUNTIME_ERROR, NES::DataSource::numberOfBuffersToProduce, and NES::DataSource::schema.
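
The reference list above mentions input and NES_THROW_RUNTIME_ERROR, which suggests that the constructor touches the file stream and can fail. The following is a minimal sketch of that general pattern only, assuming the file is opened eagerly and an unreadable path is reported as an exception. FileBackedSource and its members are hypothetical names and are not part of the NebulaStream API.

```cpp
#include <fstream>
#include <stdexcept>
#include <string>

// Hypothetical, simplified source; this is not the NES::CSVSource implementation.
class FileBackedSource {
  public:
    explicit FileBackedSource(const std::string& filePath) : filePath(filePath), input(filePath) {
        // Fail early so that a bad path surfaces at construction time
        // instead of on the first read.
        if (!input.is_open()) {
            throw std::runtime_error("Could not open CSV file: " + filePath);
        }
    }

    // Returns the path the source was constructed with.
    const std::string& getFilePath() const { return filePath; }

  private:
    std::string filePath;
    std::ifstream input;
};
```

Under these assumptions, constructing FileBackedSource with a path that does not exist throws std::runtime_error, so callers can handle the failure before the source is started.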

Member Function Documentation

◆ fillBuffer()

void NES::CSVSource::fillBuffer ( Runtime::MemoryLayouts::TestTupleBuffer &  buffer )

method to fill the buffer with tuples

Parameters
buffer  to be filled

References fileEnded, NES::DataSource::generatedBuffers, NES::DataSource::generatedTuples, NES::Runtime::MemoryLayouts::TestTupleBuffer::getBuffer(), NES::Runtime::TupleBuffer::getBufferSize(), NES::Runtime::MemoryLayouts::TestTupleBuffer::getCapacity(), input, NES::DataSource::localBufferManager, NES_ASSERT2_FMT, NES_TRACE, NES_WARNING, NES::Util::printTupleBufferAsCSV(), NES::DataSource::schema, and NES::Runtime::MemoryLayouts::TestTupleBuffer::setNumberOfTuples().

Referenced by receiveData().
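
As an illustration of what filling a buffer with tuples from a CSV stream can look like, here is a simplified sketch. It assumes a fixed row capacity per buffer, a plain comma delimiter without quoting, and an end-of-input flag comparable to fileEnded. RowBuffer, splitCsvLine, and fillRowBuffer are hypothetical names; the actual method writes typed fields into a TestTupleBuffer according to the schema.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-in for a tuple buffer: holds at most `capacity` rows.
struct RowBuffer {
    std::size_t capacity;
    std::vector<std::vector<std::string>> rows;
};

// Splits one CSV line on `delimiter` (no quoting support in this sketch).
std::vector<std::string> splitCsvLine(const std::string& line, char delimiter = ',') {
    std::vector<std::string> fields;
    std::string field;
    std::istringstream stream(line);
    while (std::getline(stream, field, delimiter)) {
        fields.push_back(field);
    }
    return fields;
}

// Reads lines until the buffer is full or the input is exhausted.
// Returns true once the end of the input has been reached.
bool fillRowBuffer(std::istream& input, RowBuffer& buffer) {
    assert(buffer.capacity > 0);
    buffer.rows.clear();
    std::string line;
    while (buffer.rows.size() < buffer.capacity && std::getline(input, line)) {
        if (line.empty()) {
            continue;  // skip blank lines
        }
        buffer.rows.push_back(splitCsvLine(line));
    }
    return input.eof();
}
```

A caller would invoke fillRowBuffer repeatedly with the same stream; each call overwrites the buffer with the next batch of rows, and the return value signals when no further data will arrive.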

◆ getFilePath()

std::string NES::CSVSource::getFilePath ( ) const

Get file path for the csv file.

◆ getSourceConfig()

const CSVSourceTypePtr & NES::CSVSource::getSourceConfig ( ) const

getter for source config

Returns
csvSourceType

◆ getType()

SourceType NES::CSVSource::getType ( ) const
override virtual

Get source type.

Returns
source type

Implements NES::DataSource.

◆ receiveData()

std::optional< Runtime::TupleBuffer > NES::CSVSource::receiveData ( )
override virtual

override the receiveData method for the csv source

Returns
returns a buffer if available

Implements NES::DataSource.

References NES::DataSource::allocateBuffer(), fillBuffer(), NES_TRACE, and NES::DataSource::operatorId.
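
The "buffer if available" contract maps naturally onto std::optional. The sketch below shows that calling pattern in isolation, assuming a source that serves pre-loaded lines in fixed-size batches. Batch and BatchSource are hypothetical names chosen for illustration, and Batch merely stands in for Runtime::TupleBuffer.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical batch type standing in for Runtime::TupleBuffer.
using Batch = std::vector<std::string>;

// Hypothetical source that serves pre-loaded lines in fixed-size batches.
class BatchSource {
  public:
    BatchSource(std::vector<std::string> lines, std::size_t batchSize)
        : lines(std::move(lines)), batchSize(batchSize) {
        assert(batchSize > 0);
    }

    // Returns the next batch, or std::nullopt once all lines were handed out.
    std::optional<Batch> receiveData() {
        if (next >= lines.size()) {
            return std::nullopt;
        }
        const std::size_t end = std::min(next + batchSize, lines.size());
        Batch batch(lines.begin() + static_cast<std::ptrdiff_t>(next),
                    lines.begin() + static_cast<std::ptrdiff_t>(end));
        next = end;
        return batch;
    }

  private:
    std::vector<std::string> lines;
    std::size_t batchSize;
    std::size_t next{0};
};
```

A consumer can then loop with `while (auto batch = source.receiveData()) { ... }` and stop as soon as an empty optional is returned.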

◆ toString()

std::string NES::CSVSource::toString ( ) const
override virtual

override the toString method for the csv source

Returns
a string describing the CSV source

Implements NES::DataSource.

References NES::DataSource::gatheringInterval, NES::DataSource::numberOfBuffersToProduce, and NES::DataSource::schema.
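
Since the method refers to gatheringInterval, numberOfBuffersToProduce, and schema, the returned description presumably combines these values. The exact format is not documented here, so the following sketch uses an assumed layout. describeCsvSource, its parameters, and the output format are hypothetical.

```cpp
#include <chrono>
#include <cstdint>
#include <sstream>
#include <string>

// Hypothetical helper; the parameter names and the output format are assumptions.
std::string describeCsvSource(const std::string& schema,
                              const std::string& filePath,
                              std::chrono::milliseconds gatheringInterval,
                              std::uint64_t numberOfBuffersToProduce) {
    std::ostringstream out;
    out << "CSV_SOURCE(SCHEMA(" << schema << "), FILE=" << filePath
        << ", freq=" << gatheringInterval.count() << "ms"
        << ", numBuff=" << numberOfBuffersToProduce << ")";
    return out.str();
}
```

Building the string with std::ostringstream keeps the formatting of the interval and the buffer count in one place, which is convenient when the description is used in log messages.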

Member Data Documentation

◆ fileEnded

bool NES::CSVSource::fileEnded
protected

Referenced by fillBuffer(), and NES::TEST_F().

◆ input

std::ifstream NES::CSVSource::input
protected

Referenced by CSVSource(), and fillBuffer().
