NebulaStream
0.6.213
NebulaStream is a data and application management framework for the internet of things
|
this is a util class for the tests More...
Classes | |
class | TestPhaseProvider |
class | TestSinkDescriptor |
This class is used for representing the description of a test sink operator. More... | |
class | TestSinkProvider |
class | TestSourceDescriptor |
class | TestSourceProvider |
struct | CsvFileParams |
Struct for storing all csv file params for tests. It is solely a container for grouping csv files. More... | |
struct | SourceTypeConfigCSV |
struct | JoinParams |
Struct for storing all parameter for the join. More... | |
Functions | |
QueryCompilation::QueryCompilerPtr | createTestQueryCompiler (QueryCompilation::QueryCompilerOptionsPtr options=QueryCompilation::QueryCompilerOptions::createDefaultOptions()) |
utility method necessary if one wants to write a test that uses a mocked sink using a test sink descriptor More... | |
const std::string | configOption (const std::string &name, const std::string &value, bool prefix=false) |
template<typename T > | |
std::string | configOption (const std::string &name, T value, bool prefix=false) |
std::string | bufferSizeInBytes (uint64_t size, bool prefix=false) |
Creates the command line argument with a buffer size. More... | |
std::string | configPath (const std::string &filename) |
Creates the command line argument for the fileName. More... | |
std::string | workerConfigPath (const std::string &filename) |
Creates the command line argument for the worker config path. More... | |
std::string | coordinatorPort (uint64_t coordinatorPort) |
Creates the command line argument for a coordinator port. More... | |
std::string | parentId (uint64_t parentId) |
Creates the command line argument for the parent id. More... | |
std::string | numberOfSlots (uint64_t coordinatorPort, bool prefix=false) |
Creates the command line argument for the numberOfSlots. More... | |
std::string | numLocalBuffers (uint64_t localBuffers, bool prefix=false) |
Creates the command line argument for the number of local buffers. More... | |
std::string | numGlobalBuffers (uint64_t globalBuffers, bool prefix=false) |
Creates the command line argument for the number of global buffers. More... | |
std::string | rpcPort (uint64_t rpcPort) |
Creates the command line argument for the rpc port. More... | |
std::string | sourceType (SourceType sourceType) |
Creates the command line argument for the source type. More... | |
std::string | csvSourceFilePath (std::string filePath) |
Creates the command line argument for the csv source file path. More... | |
std::string | dataPort (uint64_t dataPort) |
Creates the command line argument for the data port. More... | |
std::string | numberOfTuplesToProducePerBuffer (uint64_t numberOfTuplesToProducePerBuffer) |
Creates the command line argument for the number of tuples of tuples to produce per buffer. More... | |
std::string | physicalSourceName (std::string physicalSourceName) |
Creates the command line argument for the physical source name. More... | |
std::string | logicalSourceName (std::string logicalSourceName) |
Creates the command line argument for setting the logical source name. More... | |
std::string | numberOfBuffersToProduce (uint64_t numberOfBuffersToProduce) |
Creates the command line argument for setting the number of buffers to produce. More... | |
std::string | sourceGatheringInterval (uint64_t sourceGatheringInterval) |
Creates the command line argument for setting the source gathering interval. More... | |
std::string | tcpSocketHost (std::string host) |
Enables the usage of tcp socket host. More... | |
std::string | tcpSocketPort (std::string port) |
Enables the usage of tcp socket port. More... | |
std::string | inputFormat (std::string format) |
Enables the usage of tcp socket input format. More... | |
std::string | tcpSocketPersistentSource (std::string persistentSource) |
Enables the usage of tcp socket persistent source. More... | |
std::string | tcpSocketDecidedMessageSize (std::string decidedSize) |
Enables the usage of tcp socket decided message size. More... | |
std::string | tcpSocketBufferSize (std::string bufferSize) |
Enables the usage of tcp socket decided message size. More... | |
std::string | restPort (uint64_t restPort) |
Creates the command line argument for setting the rest port. More... | |
std::string | enableDebug () |
Creates the command line argument to enable debugging. More... | |
std::string | workerHealthCheckWaitTime (uint64_t workerWaitTime) |
Creates the command line argument for setting the health check wait time for the worker. More... | |
std::string | coordinatorHealthCheckWaitTime (uint64_t coordinatorWaitTime) |
Creates the command line argument for setting the health check wait time for the coordinator. More... | |
std::string | enableMonitoring (bool prefix=false) |
Creates the command line argument if to enable monitoring. More... | |
std::string | monitoringWaitTime (uint64_t monitoringWaitTime) |
Creates the command line argument if to set monitoring wait time. More... | |
std::string | disableDistributedWindowingOptimization () |
std::string | enableNemoPlacement () |
Creates the command line argument for enabling nemo placement. More... | |
std::string | enableNemoJoin () |
Creates the command line argument for enabling nemo join. More... | |
std::string | enableMatrixJoin () |
Creates the command line argument for enabling matrix join. More... | |
std::string | setDistributedWindowChildThreshold (uint64_t val) |
Creates the command line argument for setting the threshold of the distributed window child. More... | |
std::string | setDistributedWindowCombinerThreshold (uint64_t val) |
Creates the command line argument for setting the threshold of the distributed window combiner. More... | |
std::string | enableSlicingWindowing (bool prefix=false) |
Creates the command line argument if to enable slicing windowing. More... | |
std::string | enableNautilusWorker () |
Enables the usage of Nautilus. More... | |
std::string | enableNautilusCoordinator () |
Enables the usage of Nautilus at the coordinator. More... | |
Util::Subprocess | startCoordinator (std::initializer_list< std::string > list) |
start a new instance of a nes coordinator with a set of configuration flags More... | |
Util::Subprocess | startWorker (std::initializer_list< std::string > flags) |
start a new instance of a nes worker with a set of configuration flags More... | |
std::shared_ptr< Util::Subprocess > | startWorkerPtr (std::initializer_list< std::string > flags) |
start a new instance of a nes worker with a set of configuration flags More... | |
bool | checkCompleteOrTimeout (const Runtime::NodeEnginePtr &ptr, QueryId queryId, uint64_t expectedResult) |
method to check the produced buffers and tasks for n seconds and either return true or timeout More... | |
bool | waitForQueryToStart (QueryId queryId, const Catalogs::Query::QueryCatalogPtr &queryCatalog, std::chrono::seconds timeoutInSec=std::chrono::seconds(defaultStartQueryTimeout)) |
This method is used for waiting till the query gets into running status or a timeout occurs. More... | |
template<typename Predicate = std::equal_to<uint64_t>> | |
bool | checkCompleteOrTimeout (const NesWorkerPtr &nesWorker, QueryId queryId, const GlobalQueryPlanPtr &globalQueryPlan, uint64_t expectedResult) |
method to check the produced buffers and tasks for n seconds and either return true or timeout More... | |
template<typename Predicate = std::equal_to<uint64_t>> | |
bool | checkCompleteOrTimeout (const NesCoordinatorPtr &nesCoordinator, QueryId queryId, const GlobalQueryPlanPtr &globalQueryPlan, uint64_t expectedResult, bool minOneProcessedTask=false, std::chrono::seconds timeoutSeconds=defaultTimeout) |
method to check the produced buffers and tasks for n seconds and either return true or timeout More... | |
bool | checkRemovedDecomposedQueryOrTimeoutAtWorker (DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion, NesWorkerPtr worker, std::chrono::seconds timeout=defaultTimeout) |
bool | checkStoppedOrTimeout (QueryId queryId, const Catalogs::Query::QueryCatalogPtr &queryCatalog, std::chrono::seconds timeout=defaultTimeout) |
Check if the query is been stopped successfully within the timeout. More... | |
bool | checkStoppedOrTimeoutAtWorker (SharedQueryId sharedQueryId, NesWorkerPtr worker, std::chrono::seconds timeout=defaultTimeout) |
Check if the query is been stopped successfully within the timeout. More... | |
bool | checkFailedOrTimeout (QueryId queryId, const Catalogs::Query::QueryCatalogPtr &queryCatalog, std::chrono::seconds timeout=defaultTimeout) |
Check if the query has failed within the timeout. More... | |
bool | checkOutputOrTimeout (string expectedContent, const string &outputFilePath, uint64_t customTimeoutInSeconds=0) |
Check if the query result was produced. More... | |
bool | checkIfOutputFileIsNotEmtpy (uint64_t minNumberOfLines, const string &outputFilePath, uint64_t customTimeout=0) |
Check if any query result was produced. More... | |
bool | checkOutputContentLengthOrTimeout (QueryId queryId, Catalogs::Query::QueryCatalogPtr queryCatalog, uint64_t numberOfRecordsToExpect, const string &outputFilePath, auto testTimeout=defaultTimeout) |
Check if the query result was produced. More... | |
bool | checkFileCreationOrTimeout (const string &outputFilePath) |
Check if a outputfile is created. More... | |
bool | checkRESTServerStartedOrTimeout (uint64_t restPort, uint64_t customTimeout=0) |
Check if Coordinator REST API is available or timeout. More... | |
bool | checkCompleteOrTimeout (QueryId queryId, uint64_t expectedNumberBuffers, const std::string &restPort="8081") |
This method is used for checking if the submitted query produced the expected result within the timeout. More... | |
bool | checkRunningOrTimeout (QueryId queryId, const std::string &restPort="8081") |
This method is used for checking if the submitted query is running. More... | |
bool | stopQueryViaRest (QueryId queryId, const std::string &restPort="8081") |
This method is used for stop a query. More... | |
nlohmann::json | getExecutionPlan (QueryId queryId, const std::string &restPort) |
This method is used for getting the execution plan via REST. More... | |
nlohmann::json | startQueryViaRest (const string &queryString, const std::string &restPort="8081") |
This method is used for executing a query. More... | |
nlohmann::json | addSourceStatistics (const string &queryString, const std::string &restPort="8081") |
This method is used for adding source statistics. More... | |
nlohmann::json | makeMonitoringRestCall (const string &restCall, const std::string &restPort="8081") |
This method is used for making a monitoring rest call. More... | |
bool | addLogicalSource (const string &schemaString, const std::string &restPort="8081") |
This method is used adding a logical source. More... | |
bool | waitForWorkers (uint64_t restPort, uint16_t maxTimeout, uint16_t expectedWorkers) |
nlohmann::json | getTopology (uint64_t restPort) |
This method is used for making a REST call to coordinator to get the topology as Json. More... | |
std::vector< Runtime::TupleBuffer > | createExpectedBuffersFromCsv (const std::string &csvFileName, const SchemaPtr &schema, const Runtime::BufferManagerPtr &bufferManager, uint64_t numTuplesPerBuffer) |
Creates the expected buffers from the csv file. More... | |
std::vector< Runtime::TupleBuffer > | createExpectedBuffersFromCsv (const std::string &csvFileName, const SchemaPtr &schema, const Runtime::BufferManagerPtr &bufferManager, bool skipHeader=false, uint64_t numTuplesPerBuffer=0, const std::string &delimiter=",") |
Creates the expected buffers from the csv file. More... | |
std::vector< Runtime::TupleBuffer > | createExpectedBufferFromStream (std::istream &istream, const SchemaPtr &schema, const Runtime::BufferManagerPtr &bufferManager, bool skipHeader=false, uint64_t numTuplesPerBuffer=0, const std::string &delimiter=",") |
Fills the buffer from a stream. More... | |
std::vector< Runtime::TupleBuffer > | createExpectedBuffersFromCsvSpecificLines (const std::string &csvFileName, const SchemaPtr &schema, const Runtime::BufferManagerPtr &bufferManager, const int fromLine, const int toLine, bool skipHeader=false, uint64_t numTuplesPerBuffer=0, const std::string &delimiter=",") |
std::vector< Runtime::TupleBuffer > | createExpectedBufferFromStreamSpecificLines (std::istream &istream, const SchemaPtr &schema, const Runtime::BufferManagerPtr &bufferManager, const int fromLine, const int toLine, bool skipHeader=false, uint64_t numTuplesPerBuffer=0, const std::string &delimiter=",") |
std::vector< Runtime::TupleBuffer > | createExpectedBufferFromCSVString (std::string str, const SchemaPtr &schema, const Runtime::BufferManagerPtr &bufferManager, bool skipHeader=false, uint64_t numTuplesPerBuffer=0, const std::string &delimiter=",") |
Fills the buffer from a stream. More... | |
uint64_t | countTuples (std::vector< Runtime::TupleBuffer > &buffers) |
Counts the tuple in all buffers. More... | |
uint64_t | countTuples (std::vector< Runtime::MemoryLayouts::TestTupleBuffer > &buffers) |
std::vector< Runtime::MemoryLayouts::TestTupleBuffer > | createTestTupleBuffers (std::vector< Runtime::TupleBuffer > &buffers, const SchemaPtr &schema) |
Converts all of the tuple buffers to dynamic tuple buffers. More... | |
bool | buffersContainSameTuples (std::vector< Runtime::MemoryLayouts::TestTupleBuffer > &expectedBuffers, std::vector< Runtime::MemoryLayouts::TestTupleBuffer > &actualBuffers, bool orderSensitive=false) |
Compares if leftBuffers contain the same tuples as rightBuffers. More... | |
template<typename T > | |
std::vector< T > | createVecFromPointer (T *startPtr, T *endPtr) |
Creates a vector for the memory [startPtr, endPtr]. More... | |
template<typename T > | |
std::vector< T > | createVecFromPointer (T *startPtr, uint64_t numItems) |
Creates a vector for the memory [startPtr, startPtr + numItems]. More... | |
template<typename T > | |
std::vector< T > | createVecFromTupleBuffer (Runtime::TupleBuffer buffer) |
Creates a vector for the memory that this tupleBuffer is responsible for. More... | |
CSVSourceTypePtr | createSourceTypeCSV (const SourceTypeConfigCSV &sourceTypeConfigCSV) |
Creates a csv source that produces as many buffers as the csv file contains. More... | |
std::vector< PhysicalTypePtr > | getPhysicalTypes (const SchemaPtr &schema) |
bool | checkCompleteOrTimeout (const Runtime::NodeEnginePtr &ptr, SharedQueryId sharedQueryId, uint64_t expectedResult) |
method to check the produced buffers and tasks for n seconds and either return true or timeout More... | |
this is a util class for the tests
bool NES::TestUtils::addLogicalSource | ( | const string & | schemaString, |
const std::string & | restPort = "8081" |
||
) |
This method is used adding a logical source.
query | string |
References NES_DEBUG, and restPort().
Referenced by NES::TEST_F().
nlohmann::json NES::TestUtils::addSourceStatistics | ( | const string & | queryString, |
const std::string & | restPort = "8081" |
||
) |
This method is used for adding source statistics.
This method is used for adding statistics to a source.
query | string |
References NES_DEBUG, and restPort().
Referenced by NES::TEST_F().
bool NES::TestUtils::buffersContainSameTuples | ( | std::vector< Runtime::MemoryLayouts::TestTupleBuffer > & | expectedBuffers, |
std::vector< Runtime::MemoryLayouts::TestTupleBuffer > & | actualBuffers, | ||
bool | orderSensitive = false |
||
) |
Compares if leftBuffers contain the same tuples as rightBuffers.
expectedBuffers | |
actualBuffers | |
orderSensitive | If set to true, the order is taken into account |
References NES_ERROR.
Referenced by NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), NES::Runtime::Execution::MultipleJoinsTest::runJoinQuery(), NES::FileSinkIntegrationTest::runQueryAndVerifyExpectedResults(), NES::TEST_F(), and TEST_P().
std::string NES::TestUtils::bufferSizeInBytes | ( | uint64_t | size, |
bool | prefix = false |
||
) |
Creates the command line argument with a buffer size.
size | |
prefix |
References configOption(), and size().
Referenced by NES::TEST_F().
bool NES::TestUtils::checkCompleteOrTimeout | ( | const NesCoordinatorPtr & | nesCoordinator, |
QueryId | queryId, | ||
const GlobalQueryPlanPtr & | globalQueryPlan, | ||
uint64_t | expectedResult, | ||
bool | minOneProcessedTask = false , |
||
std::chrono::seconds | timeoutSeconds = defaultTimeout |
||
) |
method to check the produced buffers and tasks for n seconds and either return true or timeout
nesCoordinator | to NesCoordinator |
queryId | |
queryCatalog | |
expectedResult |
bool NES::TestUtils::checkCompleteOrTimeout | ( | const NesWorkerPtr & | nesWorker, |
QueryId | queryId, | ||
const GlobalQueryPlanPtr & | globalQueryPlan, | ||
uint64_t | expectedResult | ||
) |
method to check the produced buffers and tasks for n seconds and either return true or timeout
nesWorker | to NesWorker |
queryId | |
queryCatalog | |
expectedResult |
bool NES::TestUtils::checkCompleteOrTimeout | ( | const Runtime::NodeEnginePtr & | ptr, |
QueryId | queryId, | ||
uint64_t | expectedResult | ||
) |
method to check the produced buffers and tasks for n seconds and either return true or timeout
ptr | to Runtime |
queryId | |
expectedResult |
Referenced by NES::MonitoringQueriesTest::runMetricsQueryTest(), and NES::TEST_F().
bool NES::TestUtils::checkCompleteOrTimeout | ( | const Runtime::NodeEnginePtr & | ptr, |
SharedQueryId | sharedQueryId, | ||
uint64_t | expectedResult | ||
) |
bool NES::TestUtils::checkCompleteOrTimeout | ( | QueryId | queryId, |
uint64_t | expectedNumberBuffers, | ||
const std::string & | restPort = "8081" |
||
) |
This method is used for checking if the submitted query produced the expected result within the timeout.
queryId | Id of the query |
expectedNumberBuffers | The expected value |
References NES_DEBUG, restPort(), and NES::NESStrongType< T, Tag, invalid, initial >::toString().
bool NES::TestUtils::checkFailedOrTimeout | ( | QueryId | queryId, |
const Catalogs::Query::QueryCatalogPtr & | queryCatalog, | ||
std::chrono::seconds | timeout = defaultTimeout |
||
) |
Check if the query has failed within the timeout.
queryId | Id of the query to be stopped |
queryCatalog | the catalog containig the queries in the system |
References magic_enum::enum_name(), NES::FAILED, NES_DEBUG, NES_TRACE, NES_WARNING, and NES::timeout.
Referenced by NES::TestHarness::checkFailedOrTimeout(), and NES::TEST_F().
bool NES::TestUtils::checkFileCreationOrTimeout | ( | const string & | outputFilePath | ) |
Check if a outputfile is created.
expectedContent | |
outputFilePath |
References NES_TRACE.
Referenced by NES::TEST_F().
bool NES::TestUtils::checkIfOutputFileIsNotEmtpy | ( | uint64_t | minNumberOfLines, |
const string & | outputFilePath, | ||
uint64_t | customTimeout = 0 |
||
) |
Check if any query result was produced.
outputFilePath |
References NES_ERROR, and NES_TRACE.
Referenced by NES::TEST_F().
bool NES::TestUtils::checkOutputContentLengthOrTimeout | ( | QueryId | queryId, |
Catalogs::Query::QueryCatalogPtr | queryCatalog, | ||
uint64_t | numberOfRecordsToExpect, | ||
const string & | outputFilePath, | ||
auto | testTimeout = defaultTimeout |
||
) |
Check if the query result was produced.
queryId | |
queryCatalogService | |
numberOfRecordsToExpect | |
outputFilePath | |
testTimeout |
Referenced by NES::TestHarness::runQuery().
bool NES::TestUtils::checkOutputOrTimeout | ( | string | expectedContent, |
const string & | outputFilePath, | ||
uint64_t | customTimeoutInSeconds = 0 |
||
) |
Check if the query result was produced.
expectedContent | |
outputFilePath |
expectedContent | |
outputFilePath | |
customTimeoutInSeconds |
References NES_ERROR, and NES_TRACE.
Referenced by NES::TEST_F(), and NES::TEST_P().
bool NES::TestUtils::checkRemovedDecomposedQueryOrTimeoutAtWorker | ( | DecomposedQueryId | decomposedQueryId, |
DecomposedQueryPlanVersion | decomposedQueryVersion, | ||
NesWorkerPtr | worker, | ||
std::chrono::seconds | timeout = defaultTimeout |
||
) |
References NES::Runtime::Execution::Created, NES::Runtime::Execution::Deployed, NES::Runtime::Execution::ErrorState, NES::Runtime::Execution::Finished, NES::Runtime::Execution::Invalid, NES_DEBUG, NES_TRACE, NES::Runtime::Execution::Running, NES::Runtime::Execution::Stopped, NES::timeout, and worker.
bool NES::TestUtils::checkRESTServerStartedOrTimeout | ( | uint64_t | restPort, |
uint64_t | customTimeout = 0 |
||
) |
Check if Coordinator REST API is available or timeout.
expectedContent | |
outputFilePath |
References NES_INFO, NES_TRACE, and restPort().
Referenced by NES::TEST_F().
bool NES::TestUtils::checkRunningOrTimeout | ( | QueryId | queryId, |
const std::string & | restPort = "8081" |
||
) |
This method is used for checking if the submitted query is running.
queryId | Id of the query |
References NES_DEBUG, restPort(), and NES::NESStrongType< T, Tag, invalid, initial >::toString().
Referenced by NES::TEST_F(), and NES::MonitoringControllerTest::waitForMonitoringQuery().
bool NES::TestUtils::checkStoppedOrTimeout | ( | QueryId | queryId, |
const Catalogs::Query::QueryCatalogPtr & | queryCatalog, | ||
std::chrono::seconds | timeout = defaultTimeout |
||
) |
Check if the query is been stopped successfully within the timeout.
queryId | Id of the query to be stopped |
queryCatalog | the catalog containig the queries in the system |
References magic_enum::enum_name(), NES_DEBUG, NES_TRACE, NES_WARNING, NES::STOPPED, and NES::timeout.
Referenced by NES::MonitoringQueriesTest::runMetricsQueryTest(), NES::TestHarness::runQuery(), and NES::TEST_F().
bool NES::TestUtils::checkStoppedOrTimeoutAtWorker | ( | SharedQueryId | sharedQueryId, |
NesWorkerPtr | worker, | ||
std::chrono::seconds | timeout = defaultTimeout |
||
) |
Check if the query is been stopped successfully within the timeout.
sharedQueryId | Id of the query to be stopped |
worker | the worker which the query runs on |
References NES::Runtime::Execution::Created, NES::Runtime::Execution::Deployed, NES::Runtime::Execution::ErrorState, NES::Runtime::Execution::Finished, NES::Runtime::Execution::Invalid, NES_DEBUG, NES_TRACE, NES::Runtime::Execution::Running, NES::Runtime::Execution::Stopped, NES::timeout, and worker.
Referenced by NES::TEST_F().
const std::string NES::TestUtils::configOption | ( | const std::string & | name, |
const std::string & | value, | ||
bool | prefix = false |
||
) |
Create a command line parameter for a configuration option for the coordinator or worker.
name | The name of the command line option. |
value | The value of the command line option. |
prefix | If true, prefix the name of the option with "worker." to configure the internal worker of the coordinator. |
References magic_enum::detail::value().
Referenced by bufferSizeInBytes(), enableMonitoring(), enableSlicingWindowing(), numberOfSlots(), numGlobalBuffers(), and numLocalBuffers().
std::string NES::TestUtils::configOption | ( | const std::string & | name, |
T | value, | ||
bool | prefix = false |
||
) |
Create a command line parameter for a configuration option for the coordinator or worker.
name | The name of the command line option. |
value | The value of the command line option. |
prefix | If true, prefix the name of the option with "worker." to configure the internal worker of the coordinator. |
References magic_enum::detail::value().
std::string NES::TestUtils::configPath | ( | const std::string & | filename | ) |
Creates the command line argument for the fileName.
filename |
Referenced by NES::TEST_F().
std::string NES::TestUtils::coordinatorHealthCheckWaitTime | ( | uint64_t | coordinatorWaitTime | ) |
Creates the command line argument for setting the health check wait time for the coordinator.
coordinatorWaitTime |
std::string NES::TestUtils::coordinatorPort | ( | uint64_t | coordinatorPort | ) |
Creates the command line argument for a coordinator port.
coordinatorPort |
Referenced by numberOfSlots(), and NES::TEST_F().
uint64_t NES::TestUtils::countTuples | ( | std::vector< Runtime::MemoryLayouts::TestTupleBuffer > & | buffers | ) |
uint64_t NES::TestUtils::countTuples | ( | std::vector< Runtime::TupleBuffer > & | buffers | ) |
Counts the tuple in all buffers.
buffers |
Referenced by NES::TEST_F(), and NES::TestSink::writeData().
std::vector<Runtime::TupleBuffer> NES::TestUtils::createExpectedBufferFromCSVString | ( | std::string | str, |
const SchemaPtr & | schema, | ||
const Runtime::BufferManagerPtr & | bufferManager, | ||
bool | skipHeader = false , |
||
uint64_t | numTuplesPerBuffer = 0 , |
||
const std::string & | delimiter = "," |
||
) |
Fills the buffer from a stream.
str | |
schema | |
bufferManager | |
skipHeader | |
numTuplesPerBuffer | |
delimiter |
Referenced by NES::Runtime::Execution::MultipleJoinsTest::runJoinQuery(), NES::FileSinkIntegrationTest::runQueryAndVerifyExpectedResults(), and NES::TEST_F().
std::vector< Runtime::TupleBuffer > NES::TestUtils::createExpectedBufferFromStream | ( | std::istream & | istream, |
const SchemaPtr & | schema, | ||
const Runtime::BufferManagerPtr & | bufferManager, | ||
bool | skipHeader = false , |
||
uint64_t | numTuplesPerBuffer = 0 , |
||
const std::string & | delimiter = "," |
||
) |
Fills the buffer from a stream.
csvFileName | |
schema | |
bufferManager | |
skipHeader | |
numTuplesPerBuffer | |
delimiter |
References NES::Runtime::MemoryLayouts::TestTupleBuffer::createTestTupleBuffer(), getPhysicalTypes(), and NES_DEBUG.
Referenced by createExpectedBuffersFromCsv(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::TEST_F().
std::vector< Runtime::TupleBuffer > NES::TestUtils::createExpectedBufferFromStreamSpecificLines | ( | std::istream & | istream, |
const SchemaPtr & | schema, | ||
const Runtime::BufferManagerPtr & | bufferManager, | ||
const int | fromLine, | ||
const int | toLine, | ||
bool | skipHeader = false , |
||
uint64_t | numTuplesPerBuffer = 0 , |
||
const std::string & | delimiter = "," |
||
) |
References NES::Runtime::MemoryLayouts::TestTupleBuffer::createTestTupleBuffer(), getPhysicalTypes(), and NES_DEBUG.
std::vector< Runtime::TupleBuffer > NES::TestUtils::createExpectedBuffersFromCsv | ( | const std::string & | csvFileName, |
const SchemaPtr & | schema, | ||
const Runtime::BufferManagerPtr & | bufferManager, | ||
bool | skipHeader = false , |
||
uint64_t | numTuplesPerBuffer = 0 , |
||
const std::string & | delimiter = "," |
||
) |
Creates the expected buffers from the csv file.
csvFileName | |
schema | |
bufferManager | |
skipHeader | |
numTuplesPerBuffer | |
delimiter |
References createExpectedBufferFromStream(), NES_ASSERT2_FMT, and NES_DEBUG.
std::vector<Runtime::TupleBuffer> NES::TestUtils::createExpectedBuffersFromCsv | ( | const std::string & | csvFileName, |
const SchemaPtr & | schema, | ||
const Runtime::BufferManagerPtr & | bufferManager, | ||
uint64_t | numTuplesPerBuffer | ||
) |
Creates the expected buffers from the csv file.
csvFileName | |
schema | |
bufferManager | |
numTuplesPerBuffer |
Referenced by NES::TestHarness::getOutput(), NES::MultiThreadedTest::runQuery(), NES::JoinMultiThreadedTest::runQueryAndPrintMissingRecords(), NES::Runtime::Execution::StreamJoinQueryExecutionTest::runQueryWithCsvFiles(), NES::Runtime::Execution::StreamIntervalJoinQueryExecutionTest::runQueryWithCsvFiles(), NES::Runtime::Execution::StreamJoinQueryExecutionTest::runQueryWithCsvFilesWithoutStoppingQuery(), NES::Runtime::Execution::StreamIntervalJoinQueryExecutionTest::runSingleJoinQuery(), NES::Runtime::Execution::StreamJoinQueryExecutionTest::runSingleJoinQuery(), NES::TEST_F(), and NES::Runtime::Execution::TEST_P().
std::vector<Runtime::TupleBuffer> NES::TestUtils::createExpectedBuffersFromCsvSpecificLines | ( | const std::string & | csvFileName, |
const SchemaPtr & | schema, | ||
const Runtime::BufferManagerPtr & | bufferManager, | ||
const int | fromLine, | ||
const int | toLine, | ||
bool | skipHeader = false , |
||
uint64_t | numTuplesPerBuffer = 0 , |
||
const std::string & | delimiter = "," |
||
) |
CSVSourceTypePtr NES::TestUtils::createSourceTypeCSV | ( | const SourceTypeConfigCSV & | sourceTypeConfigCSV | ) |
Creates a csv source that produces as many buffers as the csv file contains.
SourceTypeConfig | container for configuration parameters of a source type. |
References NES::TestUtils::SourceTypeConfigCSV::fileName, NES::TestUtils::SourceTypeConfigCSV::gatheringInterval, NES::TestUtils::SourceTypeConfigCSV::isSkipHeader, NES::TestUtils::SourceTypeConfigCSV::logicalSourceName, NES::TestUtils::SourceTypeConfigCSV::numberOfBuffersToProduce, NES::TestUtils::SourceTypeConfigCSV::numberOfTuplesToProduce, NES::TestUtils::SourceTypeConfigCSV::physicalSourceName, and sourceType().
Referenced by NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), NES::Runtime::Execution::MultipleJoinsTest::runJoinQuery(), and NES::UnionDeploymentTest::SetUp().
|
inline |
utility method necessary if one wants to write a test that uses a mocked sink using a test sink descriptor
References NES::Compiler::CPPCompiler::create(), and NES::OperatorHandlerStore::create().
Referenced by TEST_F().
std::vector< Runtime::MemoryLayouts::TestTupleBuffer > NES::TestUtils::createTestTupleBuffers | ( | std::vector< Runtime::TupleBuffer > & | buffers, |
const SchemaPtr & | schema | ||
) |
Converts all of the tuple buffers to dynamic tuple buffers.
buffers | |
schema |
References NES::Runtime::MemoryLayouts::TestTupleBuffer::createTestTupleBuffer().
Referenced by NES::TestHarness::getOutput(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), NES::Runtime::Execution::MultipleJoinsTest::runJoinQuery(), NES::FileSinkIntegrationTest::runQueryAndVerifyExpectedResults(), and NES::TEST_F().
|
inline |
Creates a vector for the memory [startPtr, endPtr].
T |
startPtr | |
endPtr |
|
inline |
Creates a vector for the memory [startPtr, startPtr + numItems].
T |
startPtr | |
numItems |
|
inline |
Creates a vector for the memory that this tupleBuffer is responsible for.
T |
startPtr | |
numItems |
References NES::Runtime::TupleBuffer::getBuffer(), and NES::Runtime::TupleBuffer::getNumberOfTuples().
std::string NES::TestUtils::csvSourceFilePath | ( | std::string | filePath | ) |
Creates the command line argument for the csv source file path.
filePath |
Referenced by NES::TEST_F().
std::string NES::TestUtils::dataPort | ( | uint64_t | dataPort | ) |
Creates the command line argument for the data port.
dataPort |
Referenced by NES::CoordinatorRPCServer::RegisterWorker(), NES::NodeEngineTest::SetUp(), NES::NodeEngineTest::TearDown(), NES::TEST_F(), and NES::RequestProcessor::TEST_F().
std::string NES::TestUtils::disableDistributedWindowingOptimization | ( | ) |
std::string NES::TestUtils::enableDebug | ( | ) |
Creates the command line argument to enable debugging.
Referenced by NES::TEST_F().
std::string NES::TestUtils::enableMatrixJoin | ( | ) |
Creates the command line argument for enabling matrix join.
std::string NES::TestUtils::enableMonitoring | ( | bool | prefix = false | ) |
Creates the command line argument if to enable monitoring.
prefix |
References configOption().
Referenced by NES::TEST_F().
std::string NES::TestUtils::enableNautilusCoordinator | ( | ) |
Enables the usage of Nautilus at the coordinator.
Referenced by NES::TEST_F().
std::string NES::TestUtils::enableNautilusWorker | ( | ) |
Enables the usage of Nautilus.
Referenced by NES::TEST_F().
std::string NES::TestUtils::enableNemoJoin | ( | ) |
Creates the command line argument for enabling nemo join.
Referenced by NES::TEST_F().
std::string NES::TestUtils::enableNemoPlacement | ( | ) |
Creates the command line argument for enabling nemo placement.
std::string NES::TestUtils::enableSlicingWindowing | ( | bool | prefix = false | ) |
Creates the command line argument if to enable slicing windowing.
prefix |
References configOption().
nlohmann::json NES::TestUtils::getExecutionPlan | ( | QueryId | queryId, |
const std::string & | restPort | ||
) |
This method is used for getting the execution plan via REST.
This method is used for stop a query.
rest | port string |
queryId | Id of the query |
References NES_DEBUG, restPort(), and NES::NESStrongType< T, Tag, invalid, initial >::toString().
std::vector< PhysicalTypePtr > NES::TestUtils::getPhysicalTypes | ( | const SchemaPtr & | schema | ) |
References NES::DefaultPhysicalTypeFactory::getPhysicalType().
Referenced by NES::Util::createBuffersFromCSVFile(), createExpectedBufferFromStream(), and createExpectedBufferFromStreamSpecificLines().
nlohmann::json NES::TestUtils::getTopology | ( | uint64_t | restPort | ) |
This method is used for making a REST call to coordinator to get the topology as Json.
1 | the rest port |
References NES_INFO, and restPort().
Referenced by NES::TEST_F().
std::string NES::TestUtils::inputFormat | ( | std::string | format | ) |
Enables the usage of tcp socket input format.
Referenced by NES::TEST_F().
std::string NES::TestUtils::logicalSourceName | ( | std::string | logicalSourceName | ) |
Creates the command line argument for setting the logical source name.
logicalSourceName |
Referenced by NES::TestHarness::addLogicalSource(), NES::QueryReconfigurationTest::addLogicalSourceAndCreatePhysicalSourceType(), NES::TestHarness::attachWorkerWithMemorySourceToCoordinator(), NES::TestHarness::attachWorkerWithMemorySourceToWorkerWithId(), NES::SemanticQueryValidationTest::CallValidation(), NES::TestHarness::checkAndAddLogicalSources(), NES::RequestProcessor::AddKeyDistributionEntryEvent::create(), NES::TestHarness::createPhysicalSourceOfLambdaType(), NES::TestHarness::createPhysicalSourceOfMemoryType(), NES::createSimpleInputStream(), NES::Statistic::DefaultStatisticQueryGenerator::createStatisticQuery(), NES::REST::Controller::SourceCatalogController::ENDPOINT(), NES::RequestProcessor::AddKeyDistributionEntryEvent::getLogicalSourceName(), NES::RequestProcessor::RemoveLogicalSourceEvent::getLogicalSourceName(), NES::RequestProcessor::RemovePhysicalSourceEvent::getLogicalSourceName(), NES::Client::RemoteClient::getPhysicalSources(), NES::DataGeneratorMultiKey::getSource(), NES::DataGenerator::getSource(), NES::DataGeneratorMultiValue::getSource(), NES::RequestHandlerService::queueAddKeyDistributionEntryRequest(), NES::RequestHandlerService::queueRegisterLogicalSourceRequest(), NES::RequestHandlerService::queueUnregisterLogicalSourceRequest(), NES::RequestHandlerService::queueUnregisterPhysicalSourceRequest(), NES::RequestHandlerService::queueUpdateLogicalSourceRequest(), NES::Monitoring::MonitoringManager::registerLogicalMonitoringStreams(), NES::Runtime::Execution::MultipleJoinsTest::runJoinQuery(), NES::QueryReconfigurationTest::startWorkerWithLambdaSource(), NES::TEST_F(), NES::RequestProcessor::TEST_F(), TEST_F(), and NES::TEST_P().
nlohmann::json NES::TestUtils::makeMonitoringRestCall | ( | const string & | restCall, |
const std::string & | restPort = "8081" |
||
) |
This method is used for making a monitoring rest call.
1 | the rest call |
2 | the rest port |
References NES_DEBUG, and restPort().
Referenced by NES::TEST_F(), and NES::MetricValidator::waitForMonitoringStreamsOrTimeout().
std::string NES::TestUtils::monitoringWaitTime | ( | uint64_t | monitoringWaitTime | ) |
Creates the command line argument if to set monitoring wait time.
prefix |
std::string NES::TestUtils::numberOfBuffersToProduce | ( | uint64_t | numberOfBuffersToProduce | ) |
Creates the command line argument for setting the number of buffers to produce.
numberOfBuffersToProduce |
Referenced by NES::createDefaultDataSourceWithSchemaForVarBuffers(), NES::createLambdaSource(), NES::StatisticsIntegrationTest::createWorker(), NES::StatisticsIntegrationTest::SetUp(), and NES::TEST_F().
std::string NES::TestUtils::numberOfSlots | ( | uint64_t | coordinatorPort, |
bool | prefix = false |
||
) |
Creates the command line argument for the numberOfSlots.
coordinatorPort | |
prefix |
References configOption(), and coordinatorPort().
Referenced by NES::RequestProcessor::TEST_F().
std::string NES::TestUtils::numberOfTuplesToProducePerBuffer | ( | uint64_t | numberOfTuplesToProducePerBuffer | ) |
Creates the command line argument for the number of tuples of tuples to produce per buffer.
numberOfTuplesToProducePerBuffer |
Referenced by NES::TEST_F().
std::string NES::TestUtils::numGlobalBuffers | ( | uint64_t | globalBuffers, |
bool | prefix = false |
||
) |
Creates the command line argument for the number of global buffers.
globalBuffers | |
prefix |
References configOption().
std::string NES::TestUtils::numLocalBuffers | ( | uint64_t | localBuffers, |
bool | prefix = false |
||
) |
Creates the command line argument for the number of local buffers.
localBuffers | |
prefix |
References configOption().
std::string NES::TestUtils::parentId | ( | uint64_t | parentId | ) |
Creates the command line argument for the parent id.
parentId |
Referenced by NES::TestHarness::attachWorkerToWorkerWithId(), NES::TestHarness::attachWorkerWithCSVSourceToWorkerWithId(), NES::TestHarness::attachWorkerWithMemorySourceToWorkerWithId(), NES::REST::Controller::TopologyController::ENDPOINT(), NES::CoordinatorRPCServer::GetParents(), and NES::TEST_F().
std::string NES::TestUtils::physicalSourceName | ( | std::string | physicalSourceName | ) |
Creates the command line argument for the physical source name.
physicalSourceName |
Referenced by NES::QueryReconfigurationTest::addLogicalSourceAndCreatePhysicalSourceType(), NES::TestHarness::attachWorkerWithMemorySourceToWorkerWithId(), NES::RequestProcessor::AddKeyDistributionEntryEvent::create(), NES::createBenchmarkSource(), NES::createBinaryFileSource(), NES::createCSVFileSource(), NES::DistributedMatrixJoinIntegrationTest::createCSVSourceType(), NES::DistributedNemoJoinIntegrationTest::createCSVSourceType(), NES::createDefaultDataSourceWithSchemaForOneBuffer(), NES::createDefaultDataSourceWithSchemaForVarBuffers(), NES::createDefaultSourceWithoutSchemaForOneBuffer(), NES::createLambdaSource(), NES::createMemorySource(), NES::createMonitoringSource(), NES::createNetworkSource(), NES::createSenseSource(), NES::createSimpleInputStream(), NES::Experimental::createStaticDataSource(), NES::createTCPSource(), NES::StatisticsIntegrationTest::createWorker(), NES::createZmqSource(), NES::RequestProcessor::AddKeyDistributionEntryEvent::getPhysicalSourceName(), NES::RequestProcessor::RemovePhysicalSourceEvent::getPhysicalSourceName(), NES::DataGeneratorMultiKey::getSource(), NES::DataGenerator::getSource(), NES::DataGeneratorMultiValue::getSource(), NES::RequestHandlerService::queueAddKeyDistributionEntryRequest(), NES::RequestHandlerService::queueUnregisterPhysicalSourceRequest(), NES::Runtime::Execution::MultipleJoinsTest::runJoinQuery(), NES::QueryReconfigurationTest::startWorker(), NES::QueryReconfigurationTest::startWorkerWithLambdaSource(), NES::TEST_F(), and TEST_F().
std::string NES::TestUtils::restPort | ( | uint64_t | restPort | ) |
Creates the command line argument for setting the rest port.
restPort |
Referenced by addLogicalSource(), addSourceStatistics(), checkCompleteOrTimeout(), checkRESTServerStartedOrTimeout(), checkRunningOrTimeout(), NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), getExecutionPlan(), getTopology(), makeMonitoringRestCall(), NES::Runtime::Execution::MultipleJoinsTest::runJoinQuery(), NES::RemoteClientTest::SetUp(), NES::FilterPushDownTest::SetUp(), NES::MillisecondIntervalTest::SetUp(), NES::QueryReconfigurationTest::startCoordinator(), NES::SourceCatalogControllerTest::startCoordinator(), NES::UDFCatalogControllerTest::startCoordinator(), startQueryViaRest(), stopQueryViaRest(), NES::TEST_F(), NES::Spatial::TEST_F(), NES::RequestProcessor::TEST_F(), NES::TEST_P(), NES::MetricValidator::waitForMonitoringStreamsOrTimeout(), and waitForWorkers().
std::string NES::TestUtils::rpcPort | ( | uint64_t | rpcPort | ) |
Creates the command line argument for the rpc port.
rpcPort |
Referenced by NES::TEST_F().
std::string NES::TestUtils::setDistributedWindowChildThreshold | ( | uint64_t | val | ) |
Creates the command line argument for setting the threshold of the distributed window child.
val |
std::string NES::TestUtils::setDistributedWindowCombinerThreshold | ( | uint64_t | val | ) |
Creates the command line argument for setting the threshold of the distributed window combiner.
val |
std::string NES::TestUtils::sourceGatheringInterval | ( | uint64_t | sourceGatheringInterval | ) |
Creates the command line argument for setting the source gathering interval.
sourceGatheringInterval |
Referenced by NES::TEST_F().
std::string NES::TestUtils::sourceType | ( | SourceType | sourceType | ) |
Creates the command line argument for the source type.
sourceType |
References magic_enum::enum_name().
Referenced by NES::Monitoring::MonitoringAgent::addMonitoringStreams(), createSourceTypeCSV(), NES::MonitoringQueriesTest::runMetricsQueryTest(), NES::QueryReconfigurationTest::startWorker(), and NES::TEST_F().
Util::Subprocess NES::TestUtils::startCoordinator | ( | std::initializer_list< std::string > | list | ) |
start a new instance of a nes coordinator with a set of configuration flags
flags |
References NES_ERROR, and NES_INFO.
Referenced by NES::TEST_F().
nlohmann::json NES::TestUtils::startQueryViaRest | ( | const string & | queryString, |
const std::string & | restPort = "8081" |
||
) |
This method is used for executing a query.
query | string |
References NES_DEBUG, and restPort().
Referenced by NES::TEST_F().
Util::Subprocess NES::TestUtils::startWorker | ( | std::initializer_list< std::string > | flags | ) |
start a new instance of a nes worker with a set of configuration flags
flags |
References NES_ERROR, and NES_INFO.
Referenced by NES::QueryReconfigurationTest::startWorkerAsChildOf(), and NES::TEST_F().
std::shared_ptr< Util::Subprocess > NES::TestUtils::startWorkerPtr | ( | std::initializer_list< std::string > | flags | ) |
start a new instance of a nes worker with a set of configuration flags
flags |
References NES_INFO.
bool NES::TestUtils::stopQueryViaRest | ( | QueryId | queryId, |
const std::string & | restPort = "8081" |
||
) |
This method is used for stop a query.
queryId | Id of the query |
References NES_DEBUG, restPort(), and NES::NESStrongType< T, Tag, invalid, initial >::toString().
std::string NES::TestUtils::tcpSocketBufferSize | ( | std::string | bufferSize | ) |
Enables the usage of tcp socket decided message size.
References NES::bufferSize.
Referenced by NES::TEST_F().
std::string NES::TestUtils::tcpSocketDecidedMessageSize | ( | std::string | decidedSize | ) |
Enables the usage of tcp socket decided message size.
Referenced by NES::TEST_F().
std::string NES::TestUtils::tcpSocketHost | ( | std::string | host | ) |
Enables the usage of tcp socket host.
References host.
Referenced by NES::TEST_F().
std::string NES::TestUtils::tcpSocketPersistentSource | ( | std::string | persistentSource | ) |
Enables the usage of tcp socket persistent source.
Referenced by NES::TEST_F().
std::string NES::TestUtils::tcpSocketPort | ( | std::string | port | ) |
Enables the usage of tcp socket port.
Referenced by NES::TEST_F().
bool NES::TestUtils::waitForQueryToStart | ( | QueryId | queryId, |
const Catalogs::Query::QueryCatalogPtr & | queryCatalog, | ||
std::chrono::seconds | timeoutInSec = std::chrono::seconds(defaultStartQueryTimeout) |
||
) |
This method is used for waiting till the query gets into running status or a timeout occurs.
queryId | : the query id to check for |
queryCatalog | the catalog to look into for status change |
timeoutInSec | time to wait before stop checking |
References magic_enum::enum_name(), NES::FAILED, NES::MARKED_FOR_HARD_STOP, NES::MARKED_FOR_SOFT_STOP, NES_ERROR, NES_TRACE, NES_WARNING, NES::RUNNING, NES::SOFT_STOP_COMPLETED, NES::SOFT_STOP_TRIGGERED, and NES::STOPPED.
Referenced by NES::MonitoringQueriesTest::runMetricsQueryTest(), NES::TestHarness::runQuery(), and NES::TEST_F().
bool NES::TestUtils::waitForWorkers | ( | uint64_t | restPort, |
uint16_t | maxTimeout, | ||
uint16_t | expectedWorkers | ||
) |
References NES_ASSERT2_FMT, NES_DEBUG, NES_ERROR, NES_INFO, and restPort().
Referenced by NES::RemoteClientTest::SetUp(), NES::LocationControllerIntegrationTest::startCoordinator(), and NES::TEST_F().
std::string NES::TestUtils::workerConfigPath | ( | const std::string & | filename | ) |
Creates the command line argument for the worker config path.
filename |
Referenced by main().
std::string NES::TestUtils::workerHealthCheckWaitTime | ( | uint64_t | workerWaitTime | ) |
Creates the command line argument for setting the health check wait time for the worker.
workerWaitTime |
Referenced by NES::TEST_F().