NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
NES::TestHarness Class Reference

#include <TestHarness.hpp>

Public Member Functions

 TestHarness (Query queryWithoutSink, uint16_t restPort, uint16_t rpcPort, std::filesystem::path testHarnessResourcePath, uint64_t memSrcFrequency=0, uint64_t memSrcNumBuffToProcess=1)
 The constructor of TestHarness. More...
 
TestHarnesssetJoinStrategy (QueryCompilation::StreamJoinStrategy &newJoinStrategy)
 Sets the join strategy. More...
 
TestHarnesssetWindowingStrategy (QueryCompilation::WindowingStrategy &newWindowingStrategy)
 Sets the join strategy. More...
 
template<typename T >
TestHarnesspushElement (T element, WorkerId::Underlying workerId)
 push a single element/tuple to specific source More...
 
template<typename T >
TestHarnesspushElement (T element, WorkerId workerId)
 push a single element/tuple to specific source More...
 
TestHarnessaddLogicalSource (const std::string &logicalSourceName, const SchemaPtr &schema)
 
void checkAndAddLogicalSources ()
 check the schema size of the logical source and if it already exists More...
 
TestHarnessattachWorkerWithMemorySourceToWorkerWithId (const std::string &logicalSourceName, WorkerId parentId, WorkerConfigurationPtr workerConfiguration=WorkerConfiguration::create())
 add a memory source to be used in the test and connect to parent with specific parent id More...
 
TestHarnessattachWorkerWithMemorySourceToCoordinator (const std::string &logicalSourceName)
 add a memory source to be used in the test More...
 
TestHarnessattachWorkerWithLambdaSourceToCoordinator (PhysicalSourceTypePtr physicalSourceType, WorkerConfigurationPtr workerConfiguration)
 add a memory source to be used in the test More...
 
TestHarnessattachWorkerWithCSVSourceToWorkerWithId (const CSVSourceTypePtr &csvSourceType, WorkerId parentId)
 add a csv source to be used in the test and connect to parent with specific parent id More...
 
TestHarnessattachWorkerWithCSVSourceToCoordinator (const CSVSourceTypePtr &csvSourceType)
 add a csv source to be used in the test More...
 
TestHarnessattachWorkerToWorkerWithId (WorkerId parentId)
 add worker and connect to parent with specific parent id More...
 
TestHarnessattachWorkerToCoordinator ()
 add non source worker More...
 
uint64_t getWorkerCount ()
 
TestHarnessvalidate ()
 
PhysicalSourceTypePtr createPhysicalSourceOfLambdaType (TestHarnessWorkerConfigurationPtr workerConf)
 
PhysicalSourceTypePtr createPhysicalSourceOfMemoryType (TestHarnessWorkerConfigurationPtr workerConf)
 
TestHarnesssetOutputFilePath (const std::string &newOutputFilePath)
 
TestHarnesssetAppendMode (const std::string_view newAppendMode)
 
TestHarnesssetupTopology (std::function< void(CoordinatorConfigurationPtr)> crdConfigFunctor=[](CoordinatorConfigurationPtr) { }, const std::vector< nlohmann::json > &distributionList={})
 Method to setup the topology. More...
 
TestHarnessaddFileSink ()
 
TestHarnessvalidateAndQueueAddQueryRequest (const Optimizer::PlacementStrategy &placementStrategy=Optimizer::PlacementStrategy::BottomUp)
 
TestHarnessrunQuery (uint64_t numberOfBytesToExpect, const std::string &placementStrategyName="BottomUp", uint64_t testTimeoutInSeconds=60)
 Runs the query based on the given operator, pushed elements, and number of workers. More...
 
bool checkFailedOrTimeout () const
 
TestHarnessstopCoordinatorAndWorkers ()
 
std::vector< Runtime::MemoryLayouts::TestTupleBuffergetOutput ()
 Returns the output for the previously run query. Support also data types with variable data size. More...
 
SchemaPtr getOutputSchema ()
 Returns the output schema of the query. More...
 
TopologyPtr getTopology ()
 
const QueryPlanPtrgetQueryPlan () const
 
const Optimizer::GlobalExecutionPlanPtrgetExecutionPlan () const
 
Runtime::BufferManagerPtr getBufferManager () const
 

Constructor & Destructor Documentation

◆ TestHarness()

NES::TestHarness::TestHarness ( Query  queryWithoutSink,
uint16_t  restPort,
uint16_t  rpcPort,
std::filesystem::path  testHarnessResourcePath,
uint64_t  memSrcFrequency = 0,
uint64_t  memSrcNumBuffToProcess = 1 
)
explicit

The constructor of TestHarness.

Parameters
numWorkersnumber of worker (each for one physical source) to be used in the test
queryWithoutSinkquery object to test (without the sink operator)
restPortport for the rest service
rpcPortfor for the grpc

Member Function Documentation

◆ addFileSink()

TestHarness & NES::TestHarness::addFileSink ( )

Add a CSV file sink to the query so that it can be executed.

Returns
This test harness.

Referenced by runQuery().

Here is the caller graph for this function:

◆ addLogicalSource()

TestHarness & NES::TestHarness::addLogicalSource ( const std::string &  logicalSourceName,
const SchemaPtr schema 
)

References NES::TestUtils::logicalSourceName().

Referenced by NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ attachWorkerToCoordinator()

TestHarness & NES::TestHarness::attachWorkerToCoordinator ( )

add non source worker

References attachWorkerToWorkerWithId().

Referenced by NES::TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ attachWorkerToWorkerWithId()

TestHarness & NES::TestHarness::attachWorkerToWorkerWithId ( WorkerId  parentId)

add worker and connect to parent with specific parent id

Parameters
parentIdid of the Test Harness worker to connect Note: The parent id can not be greater than the current testharness worker id

References NES::TestHarnessWorkerConfiguration::create(), and NES::TestUtils::parentId().

Referenced by attachWorkerToCoordinator(), NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), and NES::TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ attachWorkerWithCSVSourceToCoordinator()

TestHarness & NES::TestHarness::attachWorkerWithCSVSourceToCoordinator ( const CSVSourceTypePtr csvSourceType)

add a csv source to be used in the test

Parameters
logicalSourceNamelogical source name
csvSourceTypecsv source type

References attachWorkerWithCSVSourceToWorkerWithId().

Referenced by NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ attachWorkerWithCSVSourceToWorkerWithId()

TestHarness & NES::TestHarness::attachWorkerWithCSVSourceToWorkerWithId ( const CSVSourceTypePtr csvSourceType,
WorkerId  parentId 
)

add a csv source to be used in the test and connect to parent with specific parent id

Parameters
logicalSourceNamelogical source name
csvSourceTypecsv source type
parentIdid of the parent to connect

References NES::TestHarnessWorkerConfiguration::create(), NES::TestHarnessWorkerConfiguration::CSVSource, and NES::TestUtils::parentId().

Referenced by attachWorkerWithCSVSourceToCoordinator(), NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), and NES::TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ attachWorkerWithLambdaSourceToCoordinator()

TestHarness & NES::TestHarness::attachWorkerWithLambdaSourceToCoordinator ( PhysicalSourceTypePtr  physicalSourceType,
WorkerConfigurationPtr  workerConfiguration 
)

add a memory source to be used in the test

Parameters
physicalSourceTypeschema of the source
workerConfigurationsource name

References NES::TestHarnessWorkerConfiguration::create(), and NES::TestHarnessWorkerConfiguration::LambdaSource.

Referenced by NES::TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ attachWorkerWithMemorySourceToCoordinator()

TestHarness & NES::TestHarness::attachWorkerWithMemorySourceToCoordinator ( const std::string &  logicalSourceName)

add a memory source to be used in the test

Parameters
logicalsource name
schemaschema of the source
physicalsource name

References attachWorkerWithMemorySourceToWorkerWithId(), NES::TestUtils::logicalSourceName(), and backward::details::move().

Referenced by NES::TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ attachWorkerWithMemorySourceToWorkerWithId()

TestHarness & NES::TestHarness::attachWorkerWithMemorySourceToWorkerWithId ( const std::string &  logicalSourceName,
WorkerId  parentId,
WorkerConfigurationPtr  workerConfiguration = WorkerConfiguration::create() 
)

add a memory source to be used in the test and connect to parent with specific parent id

Parameters
logicalsource name
schemaschema of the source
physicalsource name
parentIdid of the parent to connect

References NES::TestHarnessWorkerConfiguration::create(), NES::TestUtils::logicalSourceName(), NES::TestHarnessWorkerConfiguration::MemorySource, NES::TestUtils::parentId(), and NES::TestUtils::physicalSourceName().

Referenced by attachWorkerWithMemorySourceToCoordinator(), and NES::TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ checkAndAddLogicalSources()

void NES::TestHarness::checkAndAddLogicalSources ( )

check the schema size of the logical source and if it already exists

Parameters
logicalsource name
schemaschema of the source
physicalsource name

References NES::TestUtils::logicalSourceName(), and NES_TRACE.

Referenced by setupTopology().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ checkFailedOrTimeout()

bool NES::TestHarness::checkFailedOrTimeout ( ) const

Check if the query has failed.

Returns
True if the query has failed; false, otherwise or if a timeout was reached.

References NES::TestUtils::checkFailedOrTimeout().

Here is the call graph for this function:

◆ createPhysicalSourceOfLambdaType()

PhysicalSourceTypePtr NES::TestHarness::createPhysicalSourceOfLambdaType ( TestHarnessWorkerConfigurationPtr  workerConf)

References NES::TestUtils::logicalSourceName().

Referenced by setupTopology().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ createPhysicalSourceOfMemoryType()

PhysicalSourceTypePtr NES::TestHarness::createPhysicalSourceOfMemoryType ( TestHarnessWorkerConfigurationPtr  workerConf)

References NES::INTERVAL_MODE, NES::TestUtils::logicalSourceName(), apex::memcpy(), and NES_DEBUG.

Referenced by setupTopology().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ getBufferManager()

Runtime::BufferManagerPtr NES::TestHarness::getBufferManager ( ) const

Referenced by NES::TEST_F().

Here is the caller graph for this function:

◆ getExecutionPlan()

const Optimizer::GlobalExecutionPlanPtr & NES::TestHarness::getExecutionPlan ( ) const

◆ getOutput()

std::vector< Runtime::MemoryLayouts::TestTupleBuffer > NES::TestHarness::getOutput ( )

Returns the output for the previously run query. Support also data types with variable data size.

Returns
Vector of TestTupleBuffers

References NES::TestUtils::createExpectedBuffersFromCsv(), and NES::TestUtils::createTestTupleBuffers().

Referenced by NES::TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ getOutputSchema()

SchemaPtr NES::TestHarness::getOutputSchema ( )

Returns the output schema of the query.

Returns
SchemaPtr

Referenced by NES::TEST_F().

Here is the caller graph for this function:

◆ getQueryPlan()

const QueryPlanPtr & NES::TestHarness::getQueryPlan ( ) const

◆ getTopology()

TopologyPtr NES::TestHarness::getTopology ( )

Referenced by NES::TEST_F().

Here is the caller graph for this function:

◆ getWorkerCount()

uint64_t NES::TestHarness::getWorkerCount ( )

Referenced by NES::TEST_F().

Here is the caller graph for this function:

◆ pushElement() [1/2]

template<typename T >
TestHarness& NES::TestHarness::pushElement ( element,
WorkerId  workerId 
)
inline

push a single element/tuple to specific source

Parameters
elementelement of Record to push
workerIdid of the worker whose source will produce the pushed element

References apex::memcpy(), NES::TestHarnessWorkerConfiguration::MemorySource, NES_THROW_RUNTIME_ERROR, NES::NESStrongType< T, Tag, invalid, initial >::toString(), and magic_enum::detail::value().

Here is the call graph for this function:

◆ pushElement() [2/2]

template<typename T >
TestHarness& NES::TestHarness::pushElement ( element,
WorkerId::Underlying  workerId 
)
inline

push a single element/tuple to specific source

Parameters
elementelement of Record to push
workerIdid of the worker whose source will produce the pushed element

Referenced by NES::TEST_F().

Here is the caller graph for this function:

◆ runQuery()

TestHarness & NES::TestHarness::runQuery ( uint64_t  numberOfBytesToExpect,
const std::string &  placementStrategyName = "BottomUp",
uint64_t  testTimeoutInSeconds = 60 
)

Runs the query based on the given operator, pushed elements, and number of workers.

Parameters
numberOfBytesToExpect
placementStrategyNameplacement strategy name
testTimeoutInSeconds
Returns
TestHarness

References addFileSink(), NES::TestUtils::checkOutputContentLengthOrTimeout(), NES::TestUtils::checkStoppedOrTimeout(), NES_DEBUG, NES_THROW_RUNTIME_ERROR, validateAndQueueAddQueryRequest(), magic_enum::detail::value(), NES::TestUtils::waitForQueryToStart(), and worker.

Referenced by NES::TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ setAppendMode()

TestHarness & NES::TestHarness::setAppendMode ( const std::string_view  newAppendMode)

Set the append mode of the CSV file sink that is created to execute the query.

Parameters
newAppendModeThe file sink APPEND mode (i.e., APPEND to append, OVERWRITE to overwrite).
Returns
This test harness.

◆ setJoinStrategy()

TestHarness & NES::TestHarness::setJoinStrategy ( QueryCompilation::StreamJoinStrategy newJoinStrategy)

Sets the join strategy.

Parameters
joinStrategy
Returns
Self

Referenced by NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::Runtime::Execution::MultipleJoinsTest::runJoinQuery().

Here is the caller graph for this function:

◆ setOutputFilePath()

TestHarness & NES::TestHarness::setOutputFilePath ( const std::string &  newOutputFilePath)

Set the output file of the CSV file sink that is created to execute the query.

Parameters
newOutputFilePathThe output file path.
Returns
This test harness.

◆ setupTopology()

TestHarness & NES::TestHarness::setupTopology ( std::function< void(CoordinatorConfigurationPtr)>  crdConfigFunctor = [](CoordinatorConfigurationPtr) { },
const std::vector< nlohmann::json > &  distributionList = {} 
)

Method to setup the topology.

Parameters
crdConfigFunctorA function pointer to specify the config changes of the CoordinatorConfiguration
distributionListA distribution of keys of each source for joins in JSON format
Returns
the TestHarness

blocking

blocking

withConnect

References checkAndAddLogicalSources(), createPhysicalSourceOfLambdaType(), createPhysicalSourceOfMemoryType(), NES::TestHarnessWorkerConfiguration::LambdaSource, NES::TestHarnessWorkerConfiguration::MemorySource, backward::details::move(), NES_DEBUG, NES_THROW_RUNTIME_ERROR, and magic_enum::detail::value().

Referenced by NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ setWindowingStrategy()

TestHarness & NES::TestHarness::setWindowingStrategy ( QueryCompilation::WindowingStrategy &  newWindowingStrategy)

Sets the join strategy.

Parameters
joinStrategy
Returns
Self

Referenced by NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::Runtime::Execution::MultipleJoinsTest::runJoinQuery().

Here is the caller graph for this function:

◆ stopCoordinatorAndWorkers()

TestHarness & NES::TestHarness::stopCoordinatorAndWorkers ( )

Stop the coordinator and all workers.

Returns
This test harness.

References worker.

◆ validate()

◆ validateAndQueueAddQueryRequest()

TestHarness & NES::TestHarness::validateAndQueueAddQueryRequest ( const Optimizer::PlacementStrategy placementStrategy = Optimizer::PlacementStrategy::BottomUp)

Submit the query to the coordinator.

Parameters
placementStrategyThe placement strategy for the query.
Returns
This test harness.

Referenced by runQuery().

Here is the caller graph for this function:

The documentation for this class was generated from the following files: