NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
NES::Network::NetworkSink Class Reference

This represents a sink operator that acts as a connecting API between query processing and the network stack.

#include <NetworkSink.hpp>


Public Member Functions

 NetworkSink (const SchemaPtr &schema, OperatorId uniqueNetworkSinkDescriptorId, SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion, NodeLocation const &destination, NesPartition nesPartition, Runtime::NodeEnginePtr nodeEngine, size_t numOfProducers, std::chrono::milliseconds waitTime, uint8_t retryTimes, uint64_t numberOfOrigins, DecomposedQueryPlanVersion version)
 constructor for the network sink
 
bool writeData (Runtime::TupleBuffer &inputBuffer, Runtime::WorkerContext &workerContext) override
 Writes data to the underlying output channel.
 
std::string toString () const override
 
void reconfigure (Runtime::ReconfigurationMessage &task, Runtime::WorkerContext &workerContext) override
 reconfiguration machinery for the network sink
 
void postReconfigurationCallback (Runtime::ReconfigurationMessage &) override
 callback that will be called on the last thread that executes the reconfiguration
 
void setup () override
 setup method to configure the network sink via a reconfiguration
 
void preSetup ()
 
void shutdown () override
 Destroys the network sink.
 
SinkMediumTypes getSinkMediumType () override
 method to return the type of medium
 
OperatorId getUniqueNetworkSinkDescriptorId () const
 method to return the network sink's descriptor id
 
Runtime::NodeEnginePtr getNodeEngine ()
 method to return the node engine pointer
 
DecomposedQueryPlanVersion getVersion ()
 method to return the plan version
 
void configureNewSinkDescriptor (const NetworkSinkDescriptor &newNetworkSinkDescriptor)
 reconfigure this sink to point to another downstream network source
 
bool scheduleNewDescriptor (const NetworkSinkDescriptor &networkSinkDescriptor)
 schedule a new receiver location, receiver partition and version number to be set for this sink.
 
bool applyNextSinkDescriptor ()
 apply pending changes to the receiver location, receiver partition and version number
 
- Public Member Functions inherited from NES::SinkMedium
 SinkMedium (SinkFormatPtr sinkFormat, Runtime::NodeEnginePtr nodeEngine, uint32_t numOfProducers, SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion)
 public constructor for data sink
 
 SinkMedium (SinkFormatPtr sinkFormat, Runtime::NodeEnginePtr nodeEngine, uint32_t numOfProducers, SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion, uint64_t numberOfOrigins)
 public constructor for data sink
 
SharedQueryId getSharedQueryId () const
 get the id of the owning plan
 
DecomposedQueryId getParentPlanId () const
 get the subplan id of the owning plan
 
DecomposedQueryPlanVersion getParentPlanVersion () const
 get the subplan version of the owning plan
 
uint64_t getNumberOfWrittenOutBuffers ()
 debug function for testing to get number of written buffers
 
uint64_t getNumberOfWrittenOutTuples ()
 debug function for testing to get number of written tuples
 
SchemaPtr getSchemaPtr () const
 method to return the current schema of the sink
 
std::string getSinkFormat ()
 method to get the format as string
 
void reconfigure (Runtime::ReconfigurationMessage &message, Runtime::WorkerContext &context) override
 
void postReconfigurationCallback (Runtime::ReconfigurationMessage &message) override
 
OperatorId getOperatorId () const
 Get operator id.
 
void setMigrationFlag ()
 Sets that sink is used for state migration.
 
bool isForMigration () const
 Check whether this sink is used for state migration.
 
- Public Member Functions inherited from NES::Runtime::Reconfigurable
 ~Reconfigurable () NES_NOEXCEPT(false) override=default
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this_base< isNoexceptDestructible >
virtual ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< RuntimeEventListener, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 

Protected Member Functions

void onEvent (Runtime::BaseEvent &event) override
 This method is called once an event is triggered for the current sink.
 
void onEvent (Runtime::BaseEvent &event, Runtime::WorkerContextRef workerContext)
 API method called upon receiving an event.
 

Friends

bool operator< (const NetworkSink &lhs, const NetworkSink &rhs)
 

Additional Inherited Members

- Protected Attributes inherited from NES::SinkMedium
SinkFormatPtr sinkFormat
 
bool schemaWritten
 
Runtime::NodeEnginePtr nodeEngine
 
std::atomic< uint32_t > activeProducers
 termination machinery
 
SharedQueryId sharedQueryId
 
DecomposedQueryId decomposedQueryId
 
DecomposedQueryPlanVersion decomposedQueryVersion
 
uint64_t numberOfOrigins
 
uint64_t sentBuffer {0}
 
uint64_t sentTuples {0}
 
std::recursive_mutex writeMutex
 
bool migration {false}
 

Detailed Description

This represents a sink operator that acts as a connecting API between query processing and the network stack.

Constructor & Destructor Documentation

◆ NetworkSink()

NES::Network::NetworkSink::NetworkSink ( const SchemaPtr &  schema,
OperatorId  uniqueNetworkSinkDescriptorId,
SharedQueryId  sharedQueryId,
DecomposedQueryId  decomposedQueryId,
DecomposedQueryPlanVersion  decomposedQueryVersion,
NodeLocation const &  destination,
NesPartition  nesPartition,
Runtime::NodeEnginePtr  nodeEngine,
size_t  numOfProducers,
std::chrono::milliseconds  waitTime,
uint8_t  retryTimes,
uint64_t  numberOfOrigins,
DecomposedQueryPlanVersion  version 
)
explicit

constructor for the network sink

Parameters
schema
networkManager
nodeLocation
nesPartition
version  The initial version of this sink when it starts

References NES_ASSERT, and NES_DEBUG.
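
The constructor accepts a waitTime and a retryTimes argument. The source does not describe how they are used; a plausible reading is that they bound connection attempts to the downstream node. The following is a minimal sketch under that assumption. connectWithRetry and attemptsUntilConnected are hypothetical names and not part of NES.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// Hypothetical helper, not part of NES: calls tryConnect up to retryTimes times
// and sleeps for waitTime between failed attempts. Returns true on first success.
bool connectWithRetry(const std::function<bool()>& tryConnect,
                      std::chrono::milliseconds waitTime,
                      std::uint8_t retryTimes) {
    for (std::uint8_t attempt = 0; attempt < retryTimes; ++attempt) {
        if (tryConnect()) {
            return true;
        }
        // Do not sleep after the final failed attempt.
        if (attempt + 1 < retryTimes) {
            std::this_thread::sleep_for(waitTime);
        }
    }
    return false;
}

// Usage sketch: a fake connection that succeeds on the third attempt.
int attemptsUntilConnected() {
    int attempts = 0;
    bool connected = connectWithRetry([&attempts] { return ++attempts == 3; },
                                      std::chrono::milliseconds(1), 5);
    return connected ? attempts : -1;
}
```

With retryTimes typed as uint8_t, as in the constructor signature, at most 255 attempts are possible in this sketch.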

Member Function Documentation

◆ applyNextSinkDescriptor()

bool NES::Network::NetworkSink::applyNextSinkDescriptor ( )

apply pending changes to the receiver location, receiver partition and version number

Returns
true if pending changes were found and applied, false if no pending changes could be found

References configureNewSinkDescriptor().


◆ configureNewSinkDescriptor()

void NES::Network::NetworkSink::configureNewSinkDescriptor ( const NetworkSinkDescriptor &  newNetworkSinkDescriptor)

reconfigure this sink to point to another downstream network source

Parameters
newPartition  the partition of the new downstream source
newReceiverLocation  the location of the node where the new downstream source is located
newVersion  The new version number assigned to this sink

References NES::Runtime::ConnectToNewReceiver, NES::SinkMedium::decomposedQueryId, NES_ASSERT2_FMT, NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >::shared_from_this(), and NES::detail::virtual_enable_shared_from_this< RuntimeEventListener, false >::shared_from_this().

Referenced by applyNextSinkDescriptor().


◆ getNodeEngine()

Runtime::NodeEnginePtr NES::Network::NetworkSink::getNodeEngine ( )

method to return the node engine pointer

Returns
node engine pointer

◆ getSinkMediumType()

SinkMediumTypes NES::Network::NetworkSink::getSinkMediumType ( )
override virtual

method to return the type of medium

Returns
type of medium

Implements NES::SinkMedium.

References NES::NETWORK_SINK.

◆ getUniqueNetworkSinkDescriptorId()

OperatorId NES::Network::NetworkSink::getUniqueNetworkSinkDescriptorId ( ) const

method to return the network sink's descriptor id

Returns
id

Referenced by reconfigure(), and writeData().


◆ getVersion()

DecomposedQueryPlanVersion NES::Network::NetworkSink::getVersion ( )

method to return the plan version

Returns
decomposed plan version

◆ onEvent() [1/2]

void NES::Network::NetworkSink::onEvent ( Runtime::BaseEvent &  event)
override protected virtual

This method is called once an event is triggered for the current sink.

Parameters
event

Implements NES::Runtime::RuntimeEventListener.

References NES::SinkMedium::decomposedQueryId, NES::Runtime::BaseEvent::getEventType(), NES::Runtime::kStartSourceEvent, and NES_DEBUG.

Referenced by onEvent().


◆ onEvent() [2/2]

void NES::Network::NetworkSink::onEvent ( Runtime::BaseEvent &  event,
Runtime::WorkerContextRef  workerContext 
)
protected

API method called upon receiving an event.

Note
Only calls onEvent(event)
Parameters
event
workerContext

References NES_DEBUG, and onEvent().
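
The relationship between the two onEvent overloads can be sketched as follows. All types here are stand-ins invented for illustration. The source states only that the two-argument overload calls onEvent(event) and that the single-argument overload references getEventType() and kStartSourceEvent; counting start-source events is a placeholder for whatever the real handler does, and the members are public here only for brevity.

```cpp
#include <cassert>

// Stand-ins for NES::Runtime types; names and values are assumptions.
enum class EventTypeSketch { StartSource, Other };

struct BaseEventSketch {
    EventTypeSketch type;
    EventTypeSketch getEventType() const { return type; }
};

struct WorkerContextSketch {};

class ListenerSketch {
  public:
    // Single-argument handler: reacts only to the start-source event.
    void onEvent(BaseEventSketch& event) {
        if (event.getEventType() == EventTypeSketch::StartSource) {
            ++startSourceEvents;  // placeholder for the real reaction
        }
    }

    // Two-argument overload: ignores the worker context and forwards.
    void onEvent(BaseEventSketch& event, WorkerContextSketch& /*workerContext*/) {
        onEvent(event);
    }

    int startSourceEvents = 0;
};
```

Keeping the logic in the single-argument overload means both entry points behave identically, whichever one the runtime calls.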


◆ postReconfigurationCallback()

void NES::Network::NetworkSink::postReconfigurationCallback ( Runtime::ReconfigurationMessage & )
override virtual

callback that will be called on the last thread that executes the reconfiguration

Reimplemented from NES::Runtime::Reconfigurable.

References NES::Runtime::ConnectToNewReceiver, NES::SinkMedium::decomposedQueryId, NES::SinkMedium::decomposedQueryVersion, NES::Runtime::ReconfigurationMessage::getType(), NES::Runtime::ReconfigurationMessage::getUserData(), NES::ReconfigurationMetadata::instanceOf(), NES_ASSERT2_FMT, NES_DEBUG, NES_ERROR, NES_NOT_IMPLEMENTED, NES_WARNING, NES::SinkMedium::postReconfigurationCallback(), and NES::Runtime::ReconfigurationMarker.
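
One common way to run a callback on "the last thread that executes the reconfiguration" is an atomic countdown shared by all workers. The sketch below illustrates that general technique only; the source does not confirm that NES implements it this way, and ReconfigurationSketch, workerDone and callbackRunsAfter are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical countdown: each worker calls workerDone() once after finishing
// its per-thread reconfigure step; the call that brings the counter to zero
// runs the post-reconfiguration step.
class ReconfigurationSketch {
  public:
    explicit ReconfigurationSketch(std::uint32_t numThreads) : remaining(numThreads) {}

    // Returns true only for the last worker to finish.
    bool workerDone() {
        // fetch_sub returns the previous value, so exactly one caller observes 1.
        if (remaining.fetch_sub(1) == 1) {
            postCallbackRuns.fetch_add(1);  // stands in for postReconfigurationCallback
            return true;
        }
        return false;
    }

    int callbackRuns() const { return postCallbackRuns.load(); }

  private:
    std::atomic<std::uint32_t> remaining;
    std::atomic<int> postCallbackRuns{0};
};

// Usage sketch: the calls are sequential here, but because the counter is
// atomic the same code is safe when each call comes from a different thread.
int callbackRunsAfter(std::uint32_t numThreads) {
    ReconfigurationSketch message(numThreads);
    for (std::uint32_t i = 0; i < numThreads; ++i) {
        message.workerDone();
    }
    return message.callbackRuns();
}
```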


◆ preSetup()

void NES::Network::NetworkSink::preSetup ( )

References NES::SinkMedium::decomposedQueryId, NES_DEBUG, NES_WARNING, and NES::detail::virtual_enable_shared_from_this< RuntimeEventListener, false >::shared_from_this().


◆ reconfigure()

void NES::Network::NetworkSink::reconfigure ( Runtime::ReconfigurationMessage &  task,
Runtime::WorkerContext &  workerContext 
)
override virtual

reconfiguration machinery for the network sink

Parameters
task  descriptor of the reconfiguration
workerContext  the thread on which this is called

Reimplemented from NES::Runtime::Reconfigurable.

References NES::Runtime::WorkerContext::abortConnectionProcess(), NES::Runtime::ConnectionEstablished, NES::Runtime::ConnectToNewReceiver, NES::Runtime::WorkerContext::createStorage(), NES::SinkMedium::decomposedQueryId, NES::SinkMedium::decomposedQueryVersion, NES::Runtime::WorkerContext::decreaseObjectRefCnt(), NES::Runtime::FailEndOfStream, NES::Runtime::Failure, NES::Runtime::NesThread::getId(), NES::Runtime::ReconfigurationMessage::getType(), getUniqueNetworkSinkDescriptorId(), NES::Runtime::ReconfigurationMessage::getUserData(), NES::Runtime::Graceful, NES::Runtime::HardEndOfStream, NES::Runtime::HardStop, NES::Runtime::Initialize, NES::Runtime::Invalid, NES::Runtime::WorkerContext::isAsyncConnectionInProgress(), backward::details::move(), NES_ASSERT, NES_ASSERT2_FMT, NES_DEBUG, NES_NOT_IMPLEMENTED, NES_THROW_RUNTIME_ERROR, NES_WARNING, NES::Network::VersionUpdate::nodeLocation, NES::Runtime::ReconfigurationMarker, NES::Runtime::reconfigurationTypeToTerminationType(), NES::SinkMedium::reconfigure(), NES::Runtime::WorkerContext::releaseNetworkChannel(), NES::Runtime::WorkerContext::setObjectRefCnt(), NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >::shared_from_this(), NES::SinkMedium::sharedQueryId, NES::Runtime::SoftEndOfStream, NES::Runtime::WorkerContext::storeNetworkChannel(), NES::Runtime::WorkerContext::storeNetworkChannelFuture(), and NES::Runtime::WorkerContext::waitForAsyncConnection().
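
The References list suggests that reconfigure() dispatches on the message type and translates end-of-stream types into termination types. The sketch below shows that dispatch shape with stand-in enums. The enumerator names are taken from the list above, but how they pair up and what each branch does are assumptions, not the documented behaviour.

```cpp
#include <cassert>

// Stand-ins for NES::Runtime enumerators named in the References list.
enum class ReconfigurationTypeSketch {
    Initialize, ConnectToNewReceiver, SoftEndOfStream, HardEndOfStream, FailEndOfStream
};
enum class TerminationTypeSketch { Graceful, HardStop, Failure, Invalid };

// Assumed mapping; the source lists these names but not how they pair up.
TerminationTypeSketch toTerminationType(ReconfigurationTypeSketch type) {
    switch (type) {
        case ReconfigurationTypeSketch::SoftEndOfStream: return TerminationTypeSketch::Graceful;
        case ReconfigurationTypeSketch::HardEndOfStream: return TerminationTypeSketch::HardStop;
        case ReconfigurationTypeSketch::FailEndOfStream: return TerminationTypeSketch::Failure;
        default: return TerminationTypeSketch::Invalid;
    }
}

// Hypothetical per-worker channel state.
struct ChannelStateSketch {
    bool open = false;
    TerminationTypeSketch closedWith = TerminationTypeSketch::Invalid;
};

// Dispatch sketch: opening types establish the channel, end-of-stream types
// close it and record why.
void reconfigureSketch(ReconfigurationTypeSketch type, ChannelStateSketch& channel) {
    switch (type) {
        case ReconfigurationTypeSketch::Initialize:
        case ReconfigurationTypeSketch::ConnectToNewReceiver:
            channel.open = true;
            break;
        case ReconfigurationTypeSketch::SoftEndOfStream:
        case ReconfigurationTypeSketch::HardEndOfStream:
        case ReconfigurationTypeSketch::FailEndOfStream:
            channel.open = false;
            channel.closedWith = toTerminationType(type);
            break;
    }
}
```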


◆ scheduleNewDescriptor()

bool NES::Network::NetworkSink::scheduleNewDescriptor ( const NetworkSinkDescriptor &  networkSinkDescriptor)

schedule a new receiver location, receiver partition and version number to be set for this sink.

Parameters
networkSinkDescriptor: a sink descriptor containing the new location and partition which will be set as pending
Returns
false if the changes have already been applied, true otherwise
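
The return-value contract of scheduleNewDescriptor() and applyNextSinkDescriptor() can be modelled with a single pending slot. This is a minimal sketch, assuming that "already applied" is detected by comparing version numbers. DescriptorSketch and SinkSketch are stand-ins with invented fields, and thread safety is not modelled.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <utility>

// Stand-in for NetworkSinkDescriptor; the fields are assumptions for illustration.
struct DescriptorSketch {
    std::string receiverLocation;
    std::uint64_t partition;
    std::uint64_t version;
};

// Stand-in for the sink; models only the schedule/apply contract.
class SinkSketch {
  public:
    explicit SinkSketch(DescriptorSketch initial) : current(std::move(initial)) {}

    // Returns false if the requested version is already active, true otherwise.
    bool scheduleNewDescriptor(const DescriptorSketch& next) {
        if (next.version == current.version) {
            return false;
        }
        pending = next;
        return true;
    }

    // Returns true if a pending descriptor was found and applied,
    // false if nothing was pending.
    bool applyNextSinkDescriptor() {
        if (!pending.has_value()) {
            return false;
        }
        current = *pending;
        pending.reset();
        return true;
    }

    std::uint64_t getVersion() const { return current.version; }

  private:
    DescriptorSketch current;
    std::optional<DescriptorSketch> pending;
};
```

Separating scheduling from applying lets the caller record a new receiver at any time while the switch itself happens at a point of the sink's choosing.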

◆ setup()

void NES::Network::NetworkSink::setup ( )
override virtual

setup method to configure the network sink via a reconfiguration

Implements NES::SinkMedium.

References NES::SinkMedium::decomposedQueryId, NES::Runtime::Initialize, NES_DEBUG, NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >::shared_from_this(), and NES::SinkMedium::sharedQueryId.


◆ shutdown()

void NES::Network::NetworkSink::shutdown ( )
override virtual

Destroys the network sink.

Implements NES::SinkMedium.

References NES::SinkMedium::decomposedQueryId, NES_DEBUG, and NES::SinkMedium::sharedQueryId.

◆ toString()

std::string NES::Network::NetworkSink::toString ( ) const
override virtual
Returns
the string representation of the network sink

Implements NES::SinkMedium.

◆ writeData()

bool NES::Network::NetworkSink::writeData ( Runtime::TupleBuffer &  inputBuffer,
Runtime::WorkerContext &  workerContext 
)
override virtual

Friends And Related Function Documentation

◆ operator<

bool operator< ( const NetworkSink &  lhs,
const NetworkSink &  rhs 
)
friend
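
A friend operator< gives a class a strict ordering, which is what ordered containers such as std::set require. The source does not say what NetworkSink's operator< compares; the sketch below assumes ordering by a numeric id purely for illustration, and OrderedSinkSketch and smallestId are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

class OrderedSinkSketch {
  public:
    explicit OrderedSinkSketch(std::uint64_t id) : id(id) {}

    std::uint64_t getId() const { return id; }

    // Friend comparison with access to private state.
    // Ordering by id is an assumption made for this sketch.
    friend bool operator<(const OrderedSinkSketch& lhs, const OrderedSinkSketch& rhs) {
        return lhs.id < rhs.id;
    }

  private:
    std::uint64_t id;
};

// Usage sketch: std::set uses operator< both to sort and to detect duplicates.
std::uint64_t smallestId() {
    std::set<OrderedSinkSketch> sinks;
    sinks.emplace(7);
    sinks.emplace(3);
    sinks.emplace(7);  // equivalent to an existing element, so not inserted
    return sinks.begin()->getId();
}
```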

The documentation for this class was generated from the following files: