NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
NES::FileSink Class Reference

The file sink writes the stream result to a text file, in CSV or JSON format.

#include <FileSink.hpp>

Public Member Functions

 FileSink (SinkFormatPtr format, Runtime::NodeEnginePtr nodeEngine, uint32_t numOfProducers, const std::string &filePath, bool append, SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion, uint64_t numberOfOrigins=1)
 Create a file sink.

void setup () override
 Set up the file sink.

void shutdown () override
 Clean up the file sink.

bool writeData (Runtime::TupleBuffer &inputBuffer, Runtime::WorkerContextRef) override
 Write the contents of a tuple buffer to the file sink.

std::string toString () const override
 Return a string representation of the file sink.

SinkMediumTypes getSinkMediumType () override
 Return the type of the sink medium.
 
- Public Member Functions inherited from NES::SinkMedium
 SinkMedium (SinkFormatPtr sinkFormat, Runtime::NodeEnginePtr nodeEngine, uint32_t numOfProducers, SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion)
 public constructor for data sink

 SinkMedium (SinkFormatPtr sinkFormat, Runtime::NodeEnginePtr nodeEngine, uint32_t numOfProducers, SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion, uint64_t numberOfOrigins)
 public constructor for data sink

virtual bool writeData (Runtime::TupleBuffer &inputBuffer, Runtime::WorkerContext &workerContext)=0
 method to write a TupleBuffer

SharedQueryId getSharedQueryId () const
 get the id of the owning plan

DecomposedQueryId getParentPlanId () const
 get the subplan id of the owning plan

DecomposedQueryPlanVersion getParentPlanVersion () const
 get the subplan version of the owning plan

uint64_t getNumberOfWrittenOutBuffers ()
 debug function for testing to get number of written buffers

uint64_t getNumberOfWrittenOutTuples ()
 debug function for testing to get number of written tuples

SchemaPtr getSchemaPtr () const
 method to return the current schema of the sink

std::string getSinkFormat ()
 method to get the format as string

void reconfigure (Runtime::ReconfigurationMessage &message, Runtime::WorkerContext &context) override

void postReconfigurationCallback (Runtime::ReconfigurationMessage &message) override

OperatorId getOperatorId () const
 Get operator id.

void setMigrationFlag ()
 Sets that sink is used for state migration.

bool isForMigration () const
 Check whether this sink is used for state migration.
 
- Public Member Functions inherited from NES::Runtime::Reconfigurable
 ~Reconfigurable () NES_NOEXCEPT(false) override=default
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this< Reconfigurable, false >
 ~virtual_enable_shared_from_this () NES_NOEXCEPT(isNoexceptDestructible) override=default
 
std::shared_ptr< T1 > shared_from_this ()
 
std::weak_ptr< T1 > weak_from_this ()
 
- Public Member Functions inherited from NES::detail::virtual_enable_shared_from_this_base< isNoexceptDestructible >
virtual ~virtual_enable_shared_from_this_base () NES_NOEXCEPT(isNoexceptDestructible)=default
 

Protected Attributes

std::string filePath
 The output file path of the file sink.

std::ofstream outputFile
 The output file stream.

bool append {false}
 Indicate if the output should be appended to an existing file.

bool isOpen {false}
 Indicate if the file could be opened during setup.
 
- Protected Attributes inherited from NES::SinkMedium
SinkFormatPtr sinkFormat
 
bool schemaWritten
 
Runtime::NodeEnginePtr nodeEngine
 
std::atomic< uint32_t > activeProducers
 termination machinery
 
SharedQueryId sharedQueryId
 
DecomposedQueryId decomposedQueryId
 
DecomposedQueryPlanVersion decomposedQueryVersion
 
uint64_t numberOfOrigins
 
uint64_t sentBuffer {0}
 
uint64_t sentTuples {0}
 
std::recursive_mutex writeMutex
 
bool migration {false}
 

Detailed Description

The file sink writes the stream result to a text file, in CSV or JSON format.

Constructor & Destructor Documentation

◆ FileSink()

NES::FileSink::FileSink ( SinkFormatPtr  format,
Runtime::NodeEnginePtr  nodeEngine,
uint32_t  numOfProducers,
const std::string &  filePath,
bool  append,
SharedQueryId  sharedQueryId,
DecomposedQueryId  decomposedQueryId,
DecomposedQueryPlanVersion  decomposedQueryVersion,
uint64_t  numberOfOrigins = 1 
)
explicit

Create a file sink.

Parameters
nodeEngine  The node engine of the worker.
numOfProducers  ?
filePath  Name of the file to which the stream is written.
append  True, if the stream should be appended to an existing file. If false, an existing file is first removed.
sharedQueryId  ?
decomposedQueryId  ?
numberOfOrigins  Number of origins of a given query.

Member Function Documentation

◆ getSinkMediumType()

SinkMediumTypes NES::FileSink::getSinkMediumType ( )
override virtual

Return the type of the sink medium.

Returns
SinkMediumTypes::FILE_SINK indicating that this is a file sink.

Implements NES::SinkMedium.

References NES::FILE_SINK.

◆ setup()

void NES::FileSink::setup ( )
override virtual

Set up the file sink.

This method attempts to open the file. If the file exists, it is first removed, unless append is true. If the file cannot be opened, subsequent calls to writeData will fail.

Implements NES::SinkMedium.

References append, filePath, isOpen, NES_DEBUG, NES_ERROR, outputFile, NES::SinkMedium::schemaWritten, and NES::SinkMedium::sinkFormat.
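
The setup behaviour described above can be sketched in standalone C++. This is a minimal illustration and not the NebulaStream implementation: `FileSinkSketch` is a hypothetical stand-in that only borrows the documented member names (`filePath`, `outputFile`, `append`, `isOpen`), and the use of `std::filesystem` to remove an existing file is an assumption.

```cpp
#include <filesystem>
#include <fstream>
#include <string>
#include <system_error>
#include <utility>

// Hypothetical stand-in for the file-related state of a file sink.
// It is NOT NES::FileSink; only the member names follow the documentation.
struct FileSinkSketch {
    std::string filePath;
    std::ofstream outputFile;
    bool append{false};
    bool isOpen{false};

    FileSinkSketch(std::string path, bool appendToFile)
        : filePath(std::move(path)), append(appendToFile) {}

    // Remove an existing file unless append is set, then try to open the
    // file and record whether that succeeded.
    void setup() {
        std::error_code ec; // removal errors are ignored in this sketch
        if (!append && std::filesystem::exists(filePath, ec)) {
            std::filesystem::remove(filePath, ec);
        }
        const std::ios_base::openmode mode =
            append ? (std::ios::out | std::ios::app) : (std::ios::out | std::ios::trunc);
        outputFile.open(filePath, mode);
        isOpen = outputFile.is_open();
    }
};
```

A caller would construct the sketch with a path and the append flag, call `setup()`, and check `isOpen` before writing. If the path cannot be opened (for example, because its directory does not exist), `isOpen` stays false.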

◆ shutdown()

void NES::FileSink::shutdown ( )
override virtual

Clean up the file sink.

This method closes the file.

Implements NES::SinkMedium.

References filePath, NES_WARNING, and outputFile.

◆ toString()

std::string NES::FileSink::toString ( ) const
override virtual

Return a string representation of the file sink.

Returns
A string describing the file sink.

Implements NES::SinkMedium.

References NES::SinkMedium::sinkFormat.

◆ writeData()

bool NES::FileSink::writeData ( Runtime::TupleBuffer &  inputBuffer,
Runtime::WorkerContextRef
)
override

Write the contents of a tuple buffer to the file sink.

Parameters
inputBuffer  The tuple buffer that should be written to the file sink.
Returns
True, if the contents of the tuple buffer could be written completely to the file sink.

References filePath, isOpen, NES_DEBUG, NES_ERROR, NES::NES_FORMAT, outputFile, NES::SinkMedium::schemaWritten, NES::SinkMedium::sinkFormat, and NES::SinkMedium::writeMutex.
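
The write path can be illustrated with a small standalone sketch. It is not the NebulaStream code: `FileWriterSketch` is hypothetical, rows are pre-formatted strings instead of a `Runtime::TupleBuffer`, and writing a schema line once before the first rows is an assumption inferred from the `schemaWritten` reference above. The sketch shows the three documented ingredients: the `isOpen` check, the `writeMutex` guard, and a boolean result that reports whether everything was written.

```cpp
#include <fstream>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for the write path of a file sink; NOT NES::FileSink.
struct FileWriterSketch {
    std::ofstream outputFile;
    bool isOpen{false};
    bool schemaWritten{false};
    std::recursive_mutex writeMutex;

    // Open (and truncate) the target file; records success in isOpen.
    void open(const std::string& path) {
        outputFile.open(path, std::ios::out | std::ios::trunc);
        isOpen = outputFile.is_open();
    }

    // Returns true only if the file is open and all rows reached the stream.
    bool writeData(const std::string& schemaLine, const std::vector<std::string>& rows) {
        std::lock_guard<std::recursive_mutex> lock(writeMutex); // serialize writers
        if (!isOpen) {
            return false; // the file could not be opened earlier, so the write fails
        }
        if (!schemaWritten) {
            outputFile << schemaLine << '\n'; // assumed: header is emitted once
            schemaWritten = true;
        }
        for (const auto& row : rows) {
            outputFile << row << '\n';
        }
        outputFile.flush();
        return outputFile.good();
    }
};
```

Calling `writeData` on an instance that was never opened returns false. After a successful `open`, repeated calls append rows and emit the schema line only on the first call.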

Member Data Documentation

◆ append

bool NES::FileSink::append {false}
protected

Indicate if the output should be appended to an existing file.

Referenced by setup().

◆ filePath

std::string NES::FileSink::filePath
protected

The output file path of the file sink.

Referenced by setup(), shutdown(), and writeData().

◆ isOpen

bool NES::FileSink::isOpen {false}
protected

Indicate if the file could be opened during setup.

Referenced by setup() and writeData().

◆ outputFile

std::ofstream NES::FileSink::outputFile
protected

The output file stream.

Referenced by setup(), shutdown(), and writeData().


The documentation for this class was generated from the following files: