NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
NES::TCPSource Class Reference

source to receive data via TCP connection

#include <TPCSource-old.hpp>


Public Member Functions

 TCPSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, const TCPSourceTypePtr &tcpSourceType, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string &physicalSourceName, std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors)
 constructor of a TCP Source
 
std::optional< Runtime::TupleBuffer > receiveData () override
 override the receiveData method for the TCP source
 
bool fillBuffer (Runtime::MemoryLayouts::TestTupleBuffer &)
 method to fill the buffer with tuples
 
void createOrLoadPersistedProperties () override
 This method defines the logic to load the properties persisted during the previous run of the data source.
 
void storePersistedProperties () override
 This method stores all properties that need to be persisted for future retrieval during the createOrLoadPersistedProperties call.
 
void clearPersistedProperties () override
 This method clears the persisted properties of the data source on a hard stop.
 
std::string toString () const override
 override the toString method for the TCP source
 
SourceType getType () const override
 Get source type.
 
const TCPSourceTypePtr & getSourceConfig () const
 getter for source config
 
void open () override
 opens TCP connection
 
void close () override
 closes TCP connection
 
 TCPSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, TCPSourceTypePtr tcpSourceType, OperatorId operatorId, OriginId originId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string &physicalSourceName, std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors)
 constructor of a TCP Source
 
std::optional< Runtime::TupleBuffer > receiveData () override
 override the receiveData method for the TCP source
 
bool fillBuffer (Runtime::MemoryLayouts::TestTupleBuffer &)
 method to fill the buffer with tuples
 
uint64_t sizeUntilSearchToken (char token)
 search from the back (first inputted item) to the front for the given search token
 
bool popGivenNumberOfValues (uint64_t numberOfValuesToPop, bool popTextDivider)
 pop number of values given and fill temp with popped values. If popTextDivider is true, pop one more value and discard it
 
std::string toString () const override
 override the toString method for the TCP source
 
SourceType getType () const override
 Get source type.
 
const TCPSourceTypePtr & getSourceConfig () const
 getter for source config
 
void open () override
 opens TCP connection
 
void close () override
 closes TCP connection
 
- Public Member Functions inherited from NES::DataSource
 DataSource (SchemaPtr schema, Runtime::BufferManagerPtr bufferManager, Runtime::QueryManagerPtr queryManager, OperatorId operatorId, OriginId originId, StatisticId statisticId, size_t numSourceLocalBuffers, GatheringMode gatheringMode, const std::string &physicalSourceName, bool persistentSource, std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors=std::vector< Runtime::Execution::SuccessorExecutablePipeline >(), uint64_t sourceAffinity=std::numeric_limits< uint64_t >::max(), uint64_t taskQueueId=0)
 public constructor for data source. Note: the number of buffers to process is set to UINT64_MAX; the value is needed by some tests to produce deterministic behavior
 
 DataSource ()=delete
 
virtual bool start ()
 method to start the source: 1) check whether running is true; if so, return, otherwise start the source; 2) start a new thread with runningRoutine
 
virtual bool stop (Runtime::QueryTerminationType graceful)
 method to stop the source: 1) check whether running is false; if so, return, otherwise stop the source; 2) stop the thread by join
 
virtual void runningRoutine ()
 running routine while source is active
 
SchemaPtr getSchema () const
 method to return the current schema of the source
 
std::string getSourceSchemaAsString ()
 method to return the current schema of the source as string
 
bool isRunning () const noexcept
 debug function for testing to test if source is running
 
uint64_t getNumberOfGeneratedTuples () const
 debug function for testing to get number of generated tuples
 
uint64_t getNumberOfGeneratedBuffers () const
 debug function for testing to get number of generated buffers
 
virtual bool performSoftStop ()
 This method marks running as false, which results in 1. the interruption of the data gathering loop and 2. the initiation of the soft stop routine.
 
void setGatheringInterval (std::chrono::milliseconds interval)
 method to set the sampling interval
 
virtual ~DataSource () NES_NOEXCEPT(false) override
 Internal destructor to make sure that the data source is stopped before it is destructed. Note: must be public because of boost serialize.
 
uint64_t getNumBuffersToProcess () const
 Get number of buffers to be processed.
 
std::chrono::milliseconds getGatheringInterval () const
 Get gathering interval.
 
uint64_t getGatheringIntervalCount () const
 Get number representation of gathering interval.
 
OperatorId getOperatorId () const
 Gets the operator id for the data source.
 
void setOperatorId (OperatorId operatorId)
 Set the operator id for the data source.
 
std::vector< Runtime::Execution::SuccessorExecutablePipeline > getExecutableSuccessors ()
 Returns the list of successor pipelines.
 
void addExecutableSuccessors (std::vector< Runtime::Execution::SuccessorExecutablePipeline > newPipelines)
 Add a list of successor pipelines.
 
template<typename Derived >
std::shared_ptr< Derived > shared_from_base ()
 This method is necessary to avoid problems with the shared_from_this machinery combined with multi-inheritance.
 
virtual std::vector< Schema::MemoryLayoutType > getSupportedLayouts ()
 This method returns all supported layouts.
 
virtual void onEvent (Runtime::BaseEvent &) override
 API method called upon receiving an event.
 
virtual void onEvent (Runtime::BaseEvent &event, Runtime::WorkerContextRef workerContext)
 API method called upon receiving an event, whose handling requires the WorkerContext (e.g. its network channels).
 
virtual bool fail ()
 
void setSourceSharing (bool value)
 set source sharing value
 
void incrementNumberOfConsumerQueries ()
 increment the number of queries that use this source
 
virtual bool handleReconfigurationMarker (ReconfigurationMarkerPtr marker)
 check if a reconfiguration marker contains an event for this source. If so, trigger the reconfiguration and propagate the marker downstream.
 
- Public Member Functions inherited from NES::Runtime::Reconfigurable
 ~Reconfigurable () NES_NOEXCEPT(false) override=default
 
virtual void reconfigure (ReconfigurationMessage &, WorkerContext &)
 reconfigure callback that will be called per thread
 
virtual void postReconfigurationCallback (ReconfigurationMessage &)
 callback that will be called on the last thread that executes the reconfiguration
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this_base< isNoexceptDestructible >
virtual ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default
 
- Public Member Functions inherited from NES::DataEmitter
virtual ~DataEmitter () NES_NOEXCEPT(false)=default
 
virtual void onEndOfStream (Runtime::QueryTerminationType)
 
virtual DecomposedQueryPlanVersion getVersion () const
 
virtual bool startNewVersion ()
 start a previously scheduled new version for this data emitter
 
virtual bool insertReconfigurationMarker (ReconfigurationMarkerPtr)
 check if a reconfiguration marker contains an event for this data emitter. If so, trigger the reconfiguration and propagate the marker downstream.
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< RuntimeEventListener, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 

Additional Inherited Members

- Public Attributes inherited from NES::DataSource
const bool persistentSource
 
const std::string persistentSourceKey
 
- Protected Member Functions inherited from NES::DataSource
void emitWork (Runtime::TupleBuffer &buffer, bool addBufferMetaData=true) override
 create a task using the provided buffer and submit it to a task consumer, e.g., query manager
 
NES::Runtime::MemoryLayouts::TestTupleBuffer allocateBuffer ()
 
- Protected Attributes inherited from NES::DataSource
Runtime::QueryManagerPtr queryManager
 
Runtime::BufferManagerPtr localBufferManager
 
Runtime::FixedSizeBufferPoolPtr bufferManager {nullptr}
 
std::vector< Runtime::Execution::SuccessorExecutablePipeline > executableSuccessors
 
OperatorId operatorId
 
OriginId originId
 
StatisticId statisticId
 
SchemaPtr schema
 
uint64_t generatedTuples {0}
 
uint64_t generatedBuffers {0}
 
uint64_t numberOfBuffersToProduce = std::numeric_limits<decltype(numberOfBuffersToProduce)>::max()
 
uint64_t numSourceLocalBuffers
 
uint64_t gatheringIngestionRate {}
 
std::chrono::milliseconds gatheringInterval {0}
 
GatheringMode gatheringMode
 
SourceType type
 
Runtime::QueryTerminationType wasGracefullyStopped {Runtime::QueryTerminationType::Graceful}
 
std::atomic_bool wasStarted {false}
 
std::atomic_bool futureRetrieved {false}
 
std::atomic_bool running {false}
 
std::promise< bool > completedPromise
 
uint64_t sourceAffinity
 
uint64_t taskQueueId
 
bool sourceSharing = false
 
const std::string physicalSourceName
 
std::atomic< uint64_t > refCounter = 0
 
std::atomic< uint64_t > numberOfConsumerQueries = 1
 
Runtime::MemoryLayouts::MemoryLayoutPtr memoryLayout
 

Detailed Description

source to receive data via TCP connection

Constructor & Destructor Documentation

◆ TCPSource() [1/2]

NES::TCPSource::TCPSource ( SchemaPtr  schema,
Runtime::BufferManagerPtr  bufferManager,
Runtime::QueryManagerPtr  queryManager,
const TCPSourceTypePtr &  tcpSourceType,
OperatorId  operatorId,
OriginId  originId,
StatisticId  statisticId,
size_t  numSourceLocalBuffers,
GatheringMode  gatheringMode,
const std::string &  physicalSourceName,
std::vector< Runtime::Execution::SuccessorExecutablePipeline >  executableSuccessors 
)
explicit

constructor of a TCP Source

Parameters
schema  the schema of the data
bufferManager  the BufferManager is responsible for: 1. Pooled buffers: preallocated fixed-size buffers of memory that must be reference counted. 2. Unpooled buffers: variable-sized buffers that are allocated on the fly. They are also subject to reference counting.
queryManager  comes with functionality to manage the queries
tcpSourceType  points at the current TCPSourceType config object; see the file of the same name for details
operatorId  represents a locally running query execution plan
originId  represents an origin
statisticId  represents the unique identifier of components that we can track statistics for
numSourceLocalBuffers  number of local source buffers
gatheringMode  the gathering mode used
physicalSourceName  the name and unique identifier of a physical source
executableSuccessors  executable operators coming after this source

References NES::DefaultPhysicalTypeFactory::getPhysicalType(), NES_TRACE, and NES::DataSource::schema.


◆ TCPSource() [2/2]

NES::TCPSource::TCPSource ( SchemaPtr  schema,
Runtime::BufferManagerPtr  bufferManager,
Runtime::QueryManagerPtr  queryManager,
TCPSourceTypePtr  tcpSourceType,
OperatorId  operatorId,
OriginId  originId,
size_t  numSourceLocalBuffers,
GatheringMode  gatheringMode,
const std::string &  physicalSourceName,
std::vector< Runtime::Execution::SuccessorExecutablePipeline >  executableSuccessors 
)
explicit

constructor of a TCP Source

Parameters
schema  the schema of the data
bufferManager  the BufferManager is responsible for: 1. Pooled buffers: preallocated fixed-size buffers of memory that must be reference counted. 2. Unpooled buffers: variable-sized buffers that are allocated on the fly. They are also subject to reference counting.
queryManager  comes with functionality to manage the queries
tcpSourceType  points at the current TCPSourceType config object; see the file of the same name for details
operatorId  represents a locally running query execution plan
originId  represents an origin
numSourceLocalBuffers  number of local source buffers
gatheringMode  the gathering mode used
physicalSourceName  the name and unique identifier of a physical source
executableSuccessors  executable operators coming after this source

Member Function Documentation

◆ clearPersistedProperties()

void NES::TCPSource::clearPersistedProperties ( )
override virtual

This method clears the persisted properties of the data source on a hard stop.

Reimplemented from NES::DataSource.

References NES::Runtime::HardStop, NES_WARNING, NES::DataSource::persistentSource, NES::DataSource::persistentSourceKey, NES::DataSource::queryManager, and NES::DataSource::wasGracefullyStopped.

◆ close() [1/2]

void NES::TCPSource::close ( )
override virtual

closes TCP connection

Reimplemented from NES::DataSource.

References NES::DataSource::close(), NES::Runtime::HardStop, NES_ERROR, NES_TRACE, NES::DataSource::persistentSource, and NES::DataSource::wasGracefullyStopped.


◆ close() [2/2]

void NES::TCPSource::close ( )
override virtual

closes TCP connection

Reimplemented from NES::DataSource.

◆ createOrLoadPersistedProperties()

void NES::TCPSource::createOrLoadPersistedProperties ( )
override virtual

This method defines the logic to load the properties persisted during the previous run of the data source.

Reimplemented from NES::DataSource.

References connect(), NES_ERROR, NES_TRACE, NES::DataSource::persistentSource, NES::DataSource::persistentSourceKey, NES::DataSource::queryManager, socket(), and NES::timeout.


◆ fillBuffer() [1/2]

bool NES::TCPSource::fillBuffer ( Runtime::MemoryLayouts::TestTupleBuffer &  tupleBuffer)

method to fill the buffer with tuples

Parameters
buffer  to be filled

References NES::DataSource::generatedBuffers, NES::DataSource::generatedTuples, NES::Runtime::MemoryLayouts::TestTupleBuffer::getCapacity(), NES::Runtime::MemoryLayouts::TestTupleBuffer::getNumberOfTuples(), NES::DataSource::localBufferManager, NES_ASSERT, NES_DEBUG, NES_TRACE, NES_WARNING, NES::DataSource::operatorId, NES::DataSource::running, NES::DataSource::schema, NES::Runtime::MemoryLayouts::TestTupleBuffer::setNumberOfTuples(), size(), and sizeUntilSearchToken().

Referenced by receiveData().


◆ fillBuffer() [2/2]

bool NES::TCPSource::fillBuffer ( Runtime::MemoryLayouts::TestTupleBuffer & )

method to fill the buffer with tuples

Parameters
buffer  to be filled

◆ getSourceConfig() [1/2]

const TCPSourceTypePtr & NES::TCPSource::getSourceConfig ( ) const

getter for source config

Returns
tcpSourceType

◆ getSourceConfig() [2/2]

const TCPSourceTypePtr& NES::TCPSource::getSourceConfig ( ) const

getter for source config

Returns
tcpSourceType

◆ getType() [1/2]

SourceType NES::TCPSource::getType ( ) const
override virtual

Get source type.

Returns
source type

Implements NES::DataSource.

◆ getType() [2/2]

SourceType NES::TCPSource::getType ( ) const
override virtual

Get source type.

Returns
source type

Implements NES::DataSource.

◆ open() [1/2]

void NES::TCPSource::open ( )
override virtual

opens TCP connection

Reimplemented from NES::DataSource.

References NES_THROW_RUNTIME_ERROR, NES_TRACE, and NES::DataSource::open().


◆ open() [2/2]

void NES::TCPSource::open ( )
override virtual

opens TCP connection

Reimplemented from NES::DataSource.

◆ popGivenNumberOfValues()

bool NES::TCPSource::popGivenNumberOfValues ( uint64_t  numberOfValuesToPop,
bool  popTextDivider 
)

pop number of values given and fill temp with popped values. If popTextDivider is true, pop one more value and discard it

Parameters
numberOfValuesToPop  number of values to pop and fill temp with
popTextDivider  if true, pop one more value and discard it; if false, only pop the given number of values
Returns
true if the requested number of values was successfully popped, false otherwise
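
A minimal sketch of the pop behaviour described above, assuming the received bytes sit in a FIFO queue. The `ByteQueue` alias, the free function `popValues`, and the explicit `temp` out-parameter are hypothetical stand-ins; this page does not show the buffer type or the `temp` member that TCPSource actually uses.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>

// Hypothetical stand-in for the source's internal byte buffer:
// the oldest byte is at the front, new bytes are appended at the back.
using ByteQueue = std::deque<char>;

// Pops `numberOfValuesToPop` bytes from the queue into `temp`.
// If `popTextDivider` is true, one additional byte (the divider) is popped
// and discarded. Returning false and leaving the queue untouched when too
// few bytes are available is an assumption of this sketch.
bool popValues(ByteQueue& queue,
               std::uint64_t numberOfValuesToPop,
               bool popTextDivider,
               std::string& temp) {
    const std::uint64_t needed = numberOfValuesToPop + (popTextDivider ? 1 : 0);
    if (queue.size() < needed) {
        return false;
    }
    const auto valuesEnd = queue.begin() + static_cast<std::ptrdiff_t>(numberOfValuesToPop);
    temp.assign(queue.begin(), valuesEnd);
    queue.erase(queue.begin(), queue.begin() + static_cast<std::ptrdiff_t>(needed));
    return true;
}
```

With a queue holding `ab\ncd`, calling `popValues(queue, 2, true, temp)` would leave `ab` in `temp` and `cd` in the queue, because the newline divider is popped and dropped.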

◆ receiveData() [1/2]

std::optional< Runtime::TupleBuffer > NES::TCPSource::receiveData ( )
override virtual

override the receiveData method for the TCP source

Returns
returns a buffer if available

Implements NES::DataSource.

References NES::DataSource::allocateBuffer(), fillBuffer(), NES_DEBUG, NES_ERROR, NES::DataSource::running, and toString().
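
The "returns a buffer if available" contract can be illustrated with a small sketch. It assumes that an empty `std::optional` signals "no data this round"; `SketchBuffer` and `receiveOnce` are hypothetical names, and the callback only stands in for the allocateBuffer() and fillBuffer() calls listed in the references. It is not the NES implementation.

```cpp
#include <cstdint>
#include <functional>
#include <optional>
#include <string>

// Hypothetical stand-in for a tuple buffer; not the NES TupleBuffer type.
struct SketchBuffer {
    std::string bytes;
    std::uint64_t numberOfTuples = 0;
};

// Creates a buffer, asks `fill` to populate it, and hands it back only if
// filling succeeded and produced at least one tuple.
std::optional<SketchBuffer> receiveOnce(const std::function<bool(SketchBuffer&)>& fill) {
    SketchBuffer buffer;               // stands in for allocateBuffer()
    if (!fill(buffer)) {               // stands in for fillBuffer()
        return std::nullopt;
    }
    if (buffer.numberOfTuples == 0) {  // nothing to emit
        return std::nullopt;
    }
    return buffer;
}
```

A caller would check `has_value()` before passing the buffer on, which keeps "no data yet" distinct from an error path.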


◆ receiveData() [2/2]

std::optional<Runtime::TupleBuffer> NES::TCPSource::receiveData ( )
override virtual

override the receiveData method for the TCP source

Returns
returns a buffer if available

Implements NES::DataSource.

◆ sizeUntilSearchToken()

uint64_t NES::TCPSource::sizeUntilSearchToken ( char  token)

search from the back (first inputted item) to the front for the given search token

Parameters
token  the token to search for

Returns
number of places until first occurrence of token (place of token not included)

Referenced by fillBuffer().
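
As a rough illustration of the return value described above, the following sketch counts the bytes that precede the first occurrence of a token, starting at the oldest byte. `ByteQueue` and `sizeUntilToken` are hypothetical names; the real buffer type is not shown on this page, and returning the queue size when the token is absent is an assumption of this sketch.

```cpp
#include <algorithm>
#include <cstdint>
#include <deque>
#include <iterator>

// Hypothetical stand-in for the source's internal byte buffer:
// the oldest byte is at the front, new bytes are appended at the back.
using ByteQueue = std::deque<char>;

// Returns how many bytes lie before the first occurrence of `token`,
// counted from the oldest byte. The token's own position is not included.
// If the token is absent, the queue size is returned (assumption).
std::uint64_t sizeUntilToken(const ByteQueue& queue, char token) {
    const auto it = std::find(queue.begin(), queue.end(), token);
    return static_cast<std::uint64_t>(std::distance(queue.begin(), it));
}
```

For a queue holding `12,34` followed by a newline, the result for `','` would be 2 and for the newline 5, so the caller knows how many bytes make up the next value or the next tuple.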


◆ storePersistedProperties()

void NES::TCPSource::storePersistedProperties ( )
override virtual

This method stores all properties that need to be persisted for future retrieval during the createOrLoadPersistedProperties call.

Reimplemented from NES::DataSource.

References NES_WARNING, NES::DataSource::persistentSource, NES::DataSource::persistentSourceKey, and NES::DataSource::queryManager.
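
The three persistence hooks (createOrLoadPersistedProperties, storePersistedProperties, clearPersistedProperties) form a load, store, clear cycle keyed by persistentSourceKey. The sketch below shows that cycle under loud assumptions: `SketchPropertyStore`, `SketchPersistentSource`, and the single `position` counter are all hypothetical, and this page does not say which properties TCPSource persists or how the query manager stores them.

```cpp
#include <cstdint>
#include <map>
#include <optional>
#include <string>

// Hypothetical key-value store standing in for wherever properties are kept.
class SketchPropertyStore {
public:
    std::optional<std::uint64_t> load(const std::string& key) const {
        const auto it = values.find(key);
        if (it == values.end()) {
            return std::nullopt;
        }
        return it->second;
    }
    void store(const std::string& key, std::uint64_t value) { values[key] = value; }
    void clear(const std::string& key) { values.erase(key); }

private:
    std::map<std::string, std::uint64_t> values;
};

// Hypothetical source that persists one counter under its key.
struct SketchPersistentSource {
    SketchPropertyStore& store;
    std::string persistentSourceKey;
    std::uint64_t position = 0;  // illustrative property only

    // Load the value from a previous run, or start from zero.
    void createOrLoadPersistedProperties() {
        position = store.load(persistentSourceKey).value_or(0);
    }
    // Save the current value for the next run.
    void storePersistedProperties() { store.store(persistentSourceKey, position); }
    // Drop the saved value, but only when the stop was a hard stop.
    void clearPersistedProperties(bool hardStop) {
        if (hardStop) {
            store.clear(persistentSourceKey);
        }
    }
};
```

A second source created with the same key would pick up the stored value on load, while a hard stop removes it so the next run starts fresh.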

◆ toString() [1/2]

std::string NES::TCPSource::toString ( ) const
override virtual

override the toString method for the TCP source

Returns
returns string describing the TCP source

Implements NES::DataSource.

References NES::DataSource::schema.

Referenced by receiveData().


◆ toString() [2/2]

std::string NES::TCPSource::toString ( ) const
override virtual

override the toString method for the TCP source

Returns
returns string describing the TCP source

Implements NES::DataSource.

