NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
TestUtils.hpp File Reference
#include <Catalogs/Query/QueryCatalog.hpp>
#include <Catalogs/Source/SourceCatalog.hpp>
#include <Components/NesCoordinator.hpp>
#include <Components/NesWorker.hpp>
#include <Configurations/Worker/PhysicalSourceTypes/CSVSourceType.hpp>
#include <Execution/Operators/Streaming/Join/StreamJoinUtil.hpp>
#include <Plans/Global/Query/GlobalQueryPlan.hpp>
#include <Runtime/QueryStatistics.hpp>
#include <Runtime/RuntimeForwardRefs.hpp>
#include <Runtime/TupleBuffer.hpp>
#include <Util/Logger/Logger.hpp>
#include <Util/Mobility/Waypoint.hpp>
#include <Util/StdInt.hpp>
#include <Util/Subprocess/Subprocess.hpp>
#include <Util/TestTupleBuffer.hpp>
#include <chrono>
#include <fstream>
#include <gtest/gtest.h>
#include <iostream>
#include <memory>
#include <nlohmann/json.hpp>
Include dependency graph for TestUtils.hpp:

Classes

struct  NES::TestUtils::CsvFileParams
 Struct for storing all csv file params for tests. It is solely a container for grouping csv files. More...
 
struct  NES::TestUtils::SourceTypeConfigCSV
 
struct  NES::TestUtils::JoinParams
 Struct for storing all parameter for the join. More...
 
class  NES::DummyQueryListener
 

Namespaces

 NES
 This exception represents a network error.
 
 NES::Runtime
 Turn this on to have Thread::current_num_threads_ keep a count of currently-active threads.
 
 NES::TestUtils
 this is a util class for the tests
 

Macros

#define ALL_JOIN_STRATEGIES
 This define states all join strategies that will be tested in all join-specific tests. More...
 
#define ALL_WINDOW_STRATEGIES    ::testing::Values(QueryCompilation::WindowingStrategy::SLICING, QueryCompilation::WindowingStrategy::BUCKETING)
 This define states all window strategies that will be tested in all join-specific tests. Note that BUCKETING is not supported for HASH_JOIN_VAR_SIZED. More...
 
#define JOIN_STRATEGIES_WINDOW_STRATEGIES   ::testing::Combine(ALL_JOIN_STRATEGIES, ALL_WINDOW_STRATEGIES)
 This combines all join strategies and window strategies that will be tested in all join-specific test cases. More...
 

Typedefs

using Clock = std::chrono::high_resolution_clock
 

Functions

const std::string NES::TestUtils::configOption (const std::string &name, const std::string &value, bool prefix=false)
 
template<typename T >
std::string NES::TestUtils::configOption (const std::string &name, T value, bool prefix=false)
 
std::string NES::TestUtils::bufferSizeInBytes (uint64_t size, bool prefix=false)
 Creates the command line argument with a buffer size. More...
 
std::string NES::TestUtils::configPath (const std::string &filename)
 Creates the command line argument for the fileName. More...
 
std::string NES::TestUtils::workerConfigPath (const std::string &filename)
 Creates the command line argument for the worker config path. More...
 
std::string NES::TestUtils::coordinatorPort (uint64_t coordinatorPort)
 Creates the command line argument for a coordinator port. More...
 
std::string NES::TestUtils::parentId (uint64_t parentId)
 Creates the command line argument for the parent id. More...
 
std::string NES::TestUtils::numberOfSlots (uint64_t coordinatorPort, bool prefix=false)
 Creates the command line argument for the numberOfSlots. More...
 
std::string NES::TestUtils::numLocalBuffers (uint64_t localBuffers, bool prefix=false)
 Creates the command line argument for the number of local buffers. More...
 
std::string NES::TestUtils::numGlobalBuffers (uint64_t globalBuffers, bool prefix=false)
 Creates the command line argument for the number of global buffers. More...
 
std::string NES::TestUtils::rpcPort (uint64_t rpcPort)
 Creates the command line argument for the rpc port. More...
 
std::string NES::TestUtils::sourceType (SourceType sourceType)
 Creates the command line argument for the source type. More...
 
std::string NES::TestUtils::csvSourceFilePath (std::string filePath)
 Creates the command line argument for the csv source file path. More...
 
std::string NES::TestUtils::dataPort (uint64_t dataPort)
 Creates the command line argument for the data port. More...
 
std::string NES::TestUtils::numberOfTuplesToProducePerBuffer (uint64_t numberOfTuplesToProducePerBuffer)
 Creates the command line argument for the number of tuples of tuples to produce per buffer. More...
 
std::string NES::TestUtils::physicalSourceName (std::string physicalSourceName)
 Creates the command line argument for the physical source name. More...
 
std::string NES::TestUtils::logicalSourceName (std::string logicalSourceName)
 Creates the command line argument for setting the logical source name. More...
 
std::string NES::TestUtils::numberOfBuffersToProduce (uint64_t numberOfBuffersToProduce)
 Creates the command line argument for setting the number of buffers to produce. More...
 
std::string NES::TestUtils::sourceGatheringInterval (uint64_t sourceGatheringInterval)
 Creates the command line argument for setting the source gathering interval. More...
 
std::string NES::TestUtils::tcpSocketHost (std::string host)
 Enables the usage of tcp socket host. More...
 
std::string NES::TestUtils::tcpSocketPort (std::string port)
 Enables the usage of tcp socket port. More...
 
std::string NES::TestUtils::inputFormat (std::string format)
 Enables the usage of tcp socket input format. More...
 
std::string NES::TestUtils::tcpSocketPersistentSource (std::string persistentSource)
 Enables the usage of tcp socket persistent source. More...
 
std::string NES::TestUtils::tcpSocketDecidedMessageSize (std::string decidedSize)
 Enables the usage of tcp socket decided message size. More...
 
std::string NES::TestUtils::tcpSocketBufferSize (std::string bufferSize)
 Enables the usage of tcp socket decided message size. More...
 
std::string NES::TestUtils::restPort (uint64_t restPort)
 Creates the command line argument for setting the rest port. More...
 
std::string NES::TestUtils::enableDebug ()
 Creates the command line argument to enable debugging. More...
 
std::string NES::TestUtils::workerHealthCheckWaitTime (uint64_t workerWaitTime)
 Creates the command line argument for setting the health check wait time for the worker. More...
 
std::string NES::TestUtils::coordinatorHealthCheckWaitTime (uint64_t coordinatorWaitTime)
 Creates the command line argument for setting the health check wait time for the coordinator. More...
 
std::string NES::TestUtils::enableMonitoring (bool prefix=false)
 Creates the command line argument if to enable monitoring. More...
 
std::string NES::TestUtils::monitoringWaitTime (uint64_t monitoringWaitTime)
 Creates the command line argument if to set monitoring wait time. More...
 
std::string NES::TestUtils::disableDistributedWindowingOptimization ()
 
std::string NES::TestUtils::enableNemoPlacement ()
 Creates the command line argument for enabling nemo placement. More...
 
std::string NES::TestUtils::enableNemoJoin ()
 Creates the command line argument for enabling nemo join. More...
 
std::string NES::TestUtils::enableMatrixJoin ()
 Creates the command line argument for enabling matrix join. More...
 
std::string NES::TestUtils::setDistributedWindowChildThreshold (uint64_t val)
 Creates the command line argument for setting the threshold of the distributed window child. More...
 
std::string NES::TestUtils::setDistributedWindowCombinerThreshold (uint64_t val)
 Creates the command line argument for setting the threshold of the distributed window combiner. More...
 
std::string NES::TestUtils::enableSlicingWindowing (bool prefix=false)
 Creates the command line argument if to enable slicing windowing. More...
 
std::string NES::TestUtils::enableNautilusWorker ()
 Enables the usage of Nautilus. More...
 
std::string NES::TestUtils::enableNautilusCoordinator ()
 Enables the usage of Nautilus at the coordinator. More...
 
Util::Subprocess NES::TestUtils::startCoordinator (std::initializer_list< std::string > list)
 start a new instance of a nes coordinator with a set of configuration flags More...
 
Util::Subprocess NES::TestUtils::startWorker (std::initializer_list< std::string > flags)
 start a new instance of a nes worker with a set of configuration flags More...
 
std::shared_ptr< Util::Subprocess > NES::TestUtils::startWorkerPtr (std::initializer_list< std::string > flags)
 start a new instance of a nes worker with a set of configuration flags More...
 
bool NES::TestUtils::checkCompleteOrTimeout (const Runtime::NodeEnginePtr &ptr, QueryId queryId, uint64_t expectedResult)
 method to check the produced buffers and tasks for n seconds and either return true or timeout More...
 
bool NES::TestUtils::waitForQueryToStart (QueryId queryId, const Catalogs::Query::QueryCatalogPtr &queryCatalog, std::chrono::seconds timeoutInSec=std::chrono::seconds(defaultStartQueryTimeout))
 This method is used for waiting till the query gets into running status or a timeout occurs. More...
 
template<typename Predicate = std::equal_to<uint64_t>>
bool NES::TestUtils::checkCompleteOrTimeout (const NesWorkerPtr &nesWorker, QueryId queryId, const GlobalQueryPlanPtr &globalQueryPlan, uint64_t expectedResult)
 method to check the produced buffers and tasks for n seconds and either return true or timeout More...
 
template<typename Predicate = std::equal_to<uint64_t>>
bool NES::TestUtils::checkCompleteOrTimeout (const NesCoordinatorPtr &nesCoordinator, QueryId queryId, const GlobalQueryPlanPtr &globalQueryPlan, uint64_t expectedResult, bool minOneProcessedTask=false, std::chrono::seconds timeoutSeconds=defaultTimeout)
 method to check the produced buffers and tasks for n seconds and either return true or timeout More...
 
bool NES::TestUtils::checkRemovedDecomposedQueryOrTimeoutAtWorker (DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion, NesWorkerPtr worker, std::chrono::seconds timeout=defaultTimeout)
 
bool NES::TestUtils::checkStoppedOrTimeout (QueryId queryId, const Catalogs::Query::QueryCatalogPtr &queryCatalog, std::chrono::seconds timeout=defaultTimeout)
 Check if the query is been stopped successfully within the timeout. More...
 
bool NES::TestUtils::checkStoppedOrTimeoutAtWorker (SharedQueryId sharedQueryId, NesWorkerPtr worker, std::chrono::seconds timeout=defaultTimeout)
 Check if the query is been stopped successfully within the timeout. More...
 
bool NES::TestUtils::checkFailedOrTimeout (QueryId queryId, const Catalogs::Query::QueryCatalogPtr &queryCatalog, std::chrono::seconds timeout=defaultTimeout)
 Check if the query has failed within the timeout. More...
 
bool NES::TestUtils::checkOutputOrTimeout (string expectedContent, const string &outputFilePath, uint64_t customTimeoutInSeconds=0)
 Check if the query result was produced. More...
 
bool NES::TestUtils::checkIfOutputFileIsNotEmtpy (uint64_t minNumberOfLines, const string &outputFilePath, uint64_t customTimeout=0)
 Check if any query result was produced. More...
 
bool NES::TestUtils::checkOutputContentLengthOrTimeout (QueryId queryId, Catalogs::Query::QueryCatalogPtr queryCatalog, uint64_t numberOfRecordsToExpect, const string &outputFilePath, auto testTimeout=defaultTimeout)
 Check if the query result was produced. More...
 
bool NES::TestUtils::checkFileCreationOrTimeout (const string &outputFilePath)
 Check if a outputfile is created. More...
 
bool NES::TestUtils::checkRESTServerStartedOrTimeout (uint64_t restPort, uint64_t customTimeout=0)
 Check if Coordinator REST API is available or timeout. More...
 
bool NES::TestUtils::checkCompleteOrTimeout (QueryId queryId, uint64_t expectedNumberBuffers, const std::string &restPort="8081")
 This method is used for checking if the submitted query produced the expected result within the timeout. More...
 
bool NES::TestUtils::checkRunningOrTimeout (QueryId queryId, const std::string &restPort="8081")
 This method is used for checking if the submitted query is running. More...
 
bool NES::TestUtils::stopQueryViaRest (QueryId queryId, const std::string &restPort="8081")
 This method is used for stop a query. More...
 
nlohmann::json NES::TestUtils::getExecutionPlan (QueryId queryId, const std::string &restPort)
 This method is used for getting the execution plan via REST. More...
 
nlohmann::json NES::TestUtils::startQueryViaRest (const string &queryString, const std::string &restPort="8081")
 This method is used for executing a query. More...
 
nlohmann::json NES::TestUtils::addSourceStatistics (const string &queryString, const std::string &restPort="8081")
 This method is used for adding source statistics. More...
 
nlohmann::json NES::TestUtils::makeMonitoringRestCall (const string &restCall, const std::string &restPort="8081")
 This method is used for making a monitoring rest call. More...
 
bool NES::TestUtils::addLogicalSource (const string &schemaString, const std::string &restPort="8081")
 This method is used adding a logical source. More...
 
bool NES::TestUtils::waitForWorkers (uint64_t restPort, uint16_t maxTimeout, uint16_t expectedWorkers)
 
nlohmann::json NES::TestUtils::getTopology (uint64_t restPort)
 This method is used for making a REST call to coordinator to get the topology as Json. More...
 
std::vector< Runtime::TupleBuffer > NES::TestUtils::createExpectedBuffersFromCsv (const std::string &csvFileName, const SchemaPtr &schema, const Runtime::BufferManagerPtr &bufferManager, uint64_t numTuplesPerBuffer)
 Creates the expected buffers from the csv file. More...
 
std::vector< Runtime::TupleBuffer > NES::TestUtils::createExpectedBuffersFromCsv (const std::string &csvFileName, const SchemaPtr &schema, const Runtime::BufferManagerPtr &bufferManager, bool skipHeader=false, uint64_t numTuplesPerBuffer=0, const std::string &delimiter=",")
 Creates the expected buffers from the csv file. More...
 
std::vector< Runtime::TupleBuffer > NES::TestUtils::createExpectedBufferFromStream (std::istream &istream, const SchemaPtr &schema, const Runtime::BufferManagerPtr &bufferManager, bool skipHeader=false, uint64_t numTuplesPerBuffer=0, const std::string &delimiter=",")
 Fills the buffer from a stream. More...
 
std::vector< Runtime::TupleBuffer > NES::TestUtils::createExpectedBuffersFromCsvSpecificLines (const std::string &csvFileName, const SchemaPtr &schema, const Runtime::BufferManagerPtr &bufferManager, const int fromLine, const int toLine, bool skipHeader=false, uint64_t numTuplesPerBuffer=0, const std::string &delimiter=",")
 
std::vector< Runtime::TupleBuffer > NES::TestUtils::createExpectedBufferFromStreamSpecificLines (std::istream &istream, const SchemaPtr &schema, const Runtime::BufferManagerPtr &bufferManager, const int fromLine, const int toLine, bool skipHeader=false, uint64_t numTuplesPerBuffer=0, const std::string &delimiter=",")
 
std::vector< Runtime::TupleBuffer > NES::TestUtils::createExpectedBufferFromCSVString (std::string str, const SchemaPtr &schema, const Runtime::BufferManagerPtr &bufferManager, bool skipHeader=false, uint64_t numTuplesPerBuffer=0, const std::string &delimiter=",")
 Fills the buffer from a stream. More...
 
uint64_t NES::TestUtils::countTuples (std::vector< Runtime::TupleBuffer > &buffers)
 Counts the tuple in all buffers. More...
 
uint64_t NES::TestUtils::countTuples (std::vector< Runtime::MemoryLayouts::TestTupleBuffer > &buffers)
 
std::vector< Runtime::MemoryLayouts::TestTupleBuffer > NES::TestUtils::createTestTupleBuffers (std::vector< Runtime::TupleBuffer > &buffers, const SchemaPtr &schema)
 Converts all of the tuple buffers to dynamic tuple buffers. More...
 
bool NES::TestUtils::buffersContainSameTuples (std::vector< Runtime::MemoryLayouts::TestTupleBuffer > &expectedBuffers, std::vector< Runtime::MemoryLayouts::TestTupleBuffer > &actualBuffers, bool orderSensitive=false)
 Compares if leftBuffers contain the same tuples as rightBuffers. More...
 
template<typename T >
std::vector< T > NES::TestUtils::createVecFromPointer (T *startPtr, T *endPtr)
 Creates a vector for the memory [startPtr, endPtr]. More...
 
template<typename T >
std::vector< T > NES::TestUtils::createVecFromPointer (T *startPtr, uint64_t numItems)
 Creates a vector for the memory [startPtr, startPtr + numItems]. More...
 
template<typename T >
std::vector< T > NES::TestUtils::createVecFromTupleBuffer (Runtime::TupleBuffer buffer)
 Creates a vector for the memory that this tupleBuffer is responsible for. More...
 
CSVSourceTypePtr NES::TestUtils::createSourceTypeCSV (const SourceTypeConfigCSV &sourceTypeConfigCSV)
 Creates a csv source that produces as many buffers as the csv file contains. More...
 
std::vector< PhysicalTypePtr > NES::TestUtils::getPhysicalTypes (const SchemaPtr &schema)
 
std::vector< NES::Spatial::DataTypes::Experimental::WaypointNES::getWaypointsFromCsv (const std::string &csvPath, Timestamp startTime)
 read mobile device path waypoints from csv More...
 
void NES::writeWaypointsToCsv (const std::string &csvPath, const std::vector< NES::Spatial::DataTypes::Experimental::Waypoint > &waypoints)
 write mobile device path waypoints to a csv file to use as input for the LocationProviderCSV class More...
 
uint64_t NES::countOccurrences (const std::string &searchString, const std::string &targetString)
 

Macro Definition Documentation

◆ ALL_JOIN_STRATEGIES

#define ALL_JOIN_STRATEGIES
Value:
::testing::Values(QueryCompilation::StreamJoinStrategy::NESTED_LOOP_JOIN, \
QueryCompilation::StreamJoinStrategy::HASH_JOIN_GLOBAL_LOCKING, \
QueryCompilation::StreamJoinStrategy::HASH_JOIN_GLOBAL_LOCK_FREE, \
QueryCompilation::StreamJoinStrategy::HASH_JOIN_LOCAL, \
QueryCompilation::StreamJoinStrategy::HASH_JOIN_VAR_SIZED)

This define states all join strategies that will be tested in all join-specific tests.

◆ ALL_WINDOW_STRATEGIES

#define ALL_WINDOW_STRATEGIES    ::testing::Values(QueryCompilation::WindowingStrategy::SLICING, QueryCompilation::WindowingStrategy::BUCKETING)

This define states all window strategies that will be tested in all join-specific tests. Note that BUCKETING is not supported for HASH_JOIN_VAR_SIZED.

◆ JOIN_STRATEGIES_WINDOW_STRATEGIES

#define JOIN_STRATEGIES_WINDOW_STRATEGIES   ::testing::Combine(ALL_JOIN_STRATEGIES, ALL_WINDOW_STRATEGIES)

This combines all join strategies and window strategies that will be tested in all join-specific test cases.

Typedef Documentation

◆ Clock

using Clock = std::chrono::high_resolution_clock