NebulaStream
0.6.213
NebulaStream is a data and application management framework for the internet of things
#include <TestHarness.hpp>
Public Member Functions | |
| TestHarness (Query queryWithoutSink, uint16_t restPort, uint16_t rpcPort, std::filesystem::path testHarnessResourcePath, uint64_t memSrcFrequency=0, uint64_t memSrcNumBuffToProcess=1) | |
| The constructor of TestHarness. More... | |
| TestHarness & | setJoinStrategy (QueryCompilation::StreamJoinStrategy &newJoinStrategy) |
| Sets the join strategy. More... | |
| TestHarness & | setWindowingStrategy (QueryCompilation::WindowingStrategy &newWindowingStrategy) |
| Sets the windowing strategy. More... | |
| template<typename T > | |
| TestHarness & | pushElement (T element, WorkerId::Underlying workerId) |
| push a single element/tuple to a specific source More... | |
| template<typename T > | |
| TestHarness & | pushElement (T element, WorkerId workerId) |
| push a single element/tuple to a specific source More... | |
| TestHarness & | addLogicalSource (const std::string &logicalSourceName, const SchemaPtr &schema) |
| void | checkAndAddLogicalSources () |
| check the schema size of the logical source and if it already exists More... | |
| TestHarness & | attachWorkerWithMemorySourceToWorkerWithId (const std::string &logicalSourceName, WorkerId parentId, WorkerConfigurationPtr workerConfiguration=WorkerConfiguration::create()) |
| add a memory source to be used in the test and connect to parent with specific parent id More... | |
| TestHarness & | attachWorkerWithMemorySourceToCoordinator (const std::string &logicalSourceName) |
| add a memory source to be used in the test More... | |
| TestHarness & | attachWorkerWithLambdaSourceToCoordinator (PhysicalSourceTypePtr physicalSourceType, WorkerConfigurationPtr workerConfiguration) |
| add a lambda source to be used in the test More... | |
| TestHarness & | attachWorkerWithCSVSourceToWorkerWithId (const CSVSourceTypePtr &csvSourceType, WorkerId parentId) |
| add a csv source to be used in the test and connect to parent with specific parent id More... | |
| TestHarness & | attachWorkerWithCSVSourceToCoordinator (const CSVSourceTypePtr &csvSourceType) |
| add a csv source to be used in the test More... | |
| TestHarness & | attachWorkerToWorkerWithId (WorkerId parentId) |
| add worker and connect to parent with specific parent id More... | |
| TestHarness & | attachWorkerToCoordinator () |
| add non source worker More... | |
| uint64_t | getWorkerCount () |
| TestHarness & | validate () |
| PhysicalSourceTypePtr | createPhysicalSourceOfLambdaType (TestHarnessWorkerConfigurationPtr workerConf) |
| PhysicalSourceTypePtr | createPhysicalSourceOfMemoryType (TestHarnessWorkerConfigurationPtr workerConf) |
| TestHarness & | setOutputFilePath (const std::string &newOutputFilePath) |
| TestHarness & | setAppendMode (const std::string_view newAppendMode) |
| TestHarness & | setupTopology (std::function< void(CoordinatorConfigurationPtr)> crdConfigFunctor=[](CoordinatorConfigurationPtr) { }, const std::vector< nlohmann::json > &distributionList={}) |
| Sets up the topology. More... | |
| TestHarness & | addFileSink () |
| TestHarness & | validateAndQueueAddQueryRequest (const Optimizer::PlacementStrategy &placementStrategy=Optimizer::PlacementStrategy::BottomUp) |
| TestHarness & | runQuery (uint64_t numberOfBytesToExpect, const std::string &placementStrategyName="BottomUp", uint64_t testTimeoutInSeconds=60) |
| Runs the query based on the given operator, pushed elements, and number of workers. More... | |
| bool | checkFailedOrTimeout () const |
| TestHarness & | stopCoordinatorAndWorkers () |
| std::vector< Runtime::MemoryLayouts::TestTupleBuffer > | getOutput () |
| Returns the output of the previously run query. Also supports data types with variable data size. More... | |
| SchemaPtr | getOutputSchema () |
| Returns the output schema of the query. More... | |
| TopologyPtr | getTopology () |
| const QueryPlanPtr & | getQueryPlan () const |
| const Optimizer::GlobalExecutionPlanPtr & | getExecutionPlan () const |
| Runtime::BufferManagerPtr | getBufferManager () const |
|
explicit |
The constructor of TestHarness.
| numWorkers | number of workers (one per physical source) to be used in the test |
| queryWithoutSink | query object to test (without the sink operator) |
| restPort | port for the REST service |
| rpcPort | port for the gRPC service |
| TestHarness & NES::TestHarness::addFileSink | ( | ) |
Add a CSV file sink to the query so that it can be executed.
Referenced by runQuery().
| TestHarness & NES::TestHarness::addLogicalSource | ( | const std::string & | logicalSourceName, |
| const SchemaPtr & | schema | ||
| ) |
References NES::TestUtils::logicalSourceName().
Referenced by NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::TEST_F().
| TestHarness & NES::TestHarness::attachWorkerToCoordinator | ( | ) |
add non source worker
References attachWorkerToWorkerWithId().
Referenced by NES::TEST_F().
| TestHarness & NES::TestHarness::attachWorkerToWorkerWithId | ( | WorkerId | parentId | ) |
add worker and connect to parent with specific parent id
| parentId | id of the test harness worker to connect to. Note: the parent id cannot be greater than the current test harness worker id. |
References NES::TestHarnessWorkerConfiguration::create(), and NES::TestUtils::parentId().
Referenced by attachWorkerToCoordinator(), NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), and NES::TEST_F().
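The note on `parentId` can be illustrated with a small stand-in class. This is only a sketch under stated assumptions: `MiniTopology` and `parentOf` are hypothetical names, the coordinator is assumed to have id 1, worker ids are assumed to be handed out sequentially, and a violation is assumed to throw. None of this is taken from the NebulaStream implementation.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for illustration; not the NES::TestHarness class.
class MiniTopology {
  public:
    // Assumption of this sketch: the coordinator owns id 1.
    static constexpr uint64_t coordinatorId = 1;

    // Attach a new worker below an existing node and return *this for chaining.
    MiniTopology& attachWorkerToWorkerWithId(uint64_t parentId) {
        // Highest id handed out so far (coordinator plus attached workers).
        const uint64_t currentId = coordinatorId + parents.size();
        if (parentId < coordinatorId || parentId > currentId) {
            throw std::invalid_argument("parent id refers to a node that does not exist yet");
        }
        parents.push_back(parentId);
        return *this;
    }

    // Convenience wrapper: the coordinator is just the parent with the lowest id.
    MiniTopology& attachWorkerToCoordinator() { return attachWorkerToWorkerWithId(coordinatorId); }

    uint64_t getWorkerCount() const { return parents.size(); }

    // Parent of the worker with the given id (the first worker gets id 2).
    uint64_t parentOf(uint64_t workerId) const { return parents.at(workerId - coordinatorId - 1); }

  private:
    std::vector<uint64_t> parents; // parents[i] belongs to worker id coordinatorId + 1 + i
};
```

With this sketch, `MiniTopology{}.attachWorkerToCoordinator().attachWorkerToWorkerWithId(2)` builds a two-level chain, while passing an id that has not been assigned yet is rejected.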
| TestHarness & NES::TestHarness::attachWorkerWithCSVSourceToCoordinator | ( | const CSVSourceTypePtr & | csvSourceType | ) |
add a csv source to be used in the test
| logicalSourceName | logical source name |
| csvSourceType | csv source type |
References attachWorkerWithCSVSourceToWorkerWithId().
Referenced by NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::TEST_F().
| TestHarness & NES::TestHarness::attachWorkerWithCSVSourceToWorkerWithId | ( | const CSVSourceTypePtr & | csvSourceType, |
| WorkerId | parentId | ||
| ) |
add a csv source to be used in the test and connect to parent with specific parent id
| logicalSourceName | logical source name |
| csvSourceType | csv source type |
| parentId | id of the parent to connect |
References NES::TestHarnessWorkerConfiguration::create(), NES::TestHarnessWorkerConfiguration::CSVSource, and NES::TestUtils::parentId().
Referenced by attachWorkerWithCSVSourceToCoordinator(), NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), and NES::TEST_F().
| TestHarness & NES::TestHarness::attachWorkerWithLambdaSourceToCoordinator | ( | PhysicalSourceTypePtr | physicalSourceType, |
| WorkerConfigurationPtr | workerConfiguration | ||
| ) |
add a lambda source to be used in the test
| physicalSourceType | physical source type of the lambda source |
| workerConfiguration | configuration of the worker |
References NES::TestHarnessWorkerConfiguration::create(), and NES::TestHarnessWorkerConfiguration::LambdaSource.
Referenced by NES::TEST_F().
| TestHarness & NES::TestHarness::attachWorkerWithMemorySourceToCoordinator | ( | const std::string & | logicalSourceName | ) |
add a memory source to be used in the test
| logicalSourceName | logical source name |
| schema | schema of the source |
| physical | source name |
References attachWorkerWithMemorySourceToWorkerWithId(), NES::TestUtils::logicalSourceName(), and backward::details::move().
Referenced by NES::TEST_F().
| TestHarness & NES::TestHarness::attachWorkerWithMemorySourceToWorkerWithId | ( | const std::string & | logicalSourceName, |
| WorkerId | parentId, | ||
| WorkerConfigurationPtr | workerConfiguration = WorkerConfiguration::create() |
||
| ) |
add a memory source to be used in the test and connect to parent with specific parent id
| logicalSourceName | logical source name |
| schema | schema of the source |
| physical | source name |
| parentId | id of the parent to connect |
References NES::TestHarnessWorkerConfiguration::create(), NES::TestUtils::logicalSourceName(), NES::TestHarnessWorkerConfiguration::MemorySource, NES::TestUtils::parentId(), and NES::TestUtils::physicalSourceName().
Referenced by attachWorkerWithMemorySourceToCoordinator(), and NES::TEST_F().
| void NES::TestHarness::checkAndAddLogicalSources | ( | ) |
check the schema size of the logical source and if it already exists
| logical | source name |
| schema | schema of the source |
| physical | source name |
References NES::TestUtils::logicalSourceName(), and NES_TRACE.
Referenced by setupTopology().
| bool NES::TestHarness::checkFailedOrTimeout | ( | ) | const |
Check whether the query has failed or timed out.
References NES::TestUtils::checkFailedOrTimeout().
| PhysicalSourceTypePtr NES::TestHarness::createPhysicalSourceOfLambdaType | ( | TestHarnessWorkerConfigurationPtr | workerConf | ) |
References NES::TestUtils::logicalSourceName().
Referenced by setupTopology().
| PhysicalSourceTypePtr NES::TestHarness::createPhysicalSourceOfMemoryType | ( | TestHarnessWorkerConfigurationPtr | workerConf | ) |
References NES::INTERVAL_MODE, NES::TestUtils::logicalSourceName(), apex::memcpy(), and NES_DEBUG.
Referenced by setupTopology().
| Runtime::BufferManagerPtr NES::TestHarness::getBufferManager | ( | ) | const |
| const Optimizer::GlobalExecutionPlanPtr & NES::TestHarness::getExecutionPlan | ( | ) | const |
| std::vector< Runtime::MemoryLayouts::TestTupleBuffer > NES::TestHarness::getOutput | ( | ) |
Returns the output of the previously run query. Also supports data types with variable data size.
References NES::TestUtils::createExpectedBuffersFromCsv(), and NES::TestUtils::createTestTupleBuffers().
Referenced by NES::TEST_F().
| SchemaPtr NES::TestHarness::getOutputSchema | ( | ) |
Returns the output schema of the query.
Referenced by NES::TEST_F().
| const QueryPlanPtr & NES::TestHarness::getQueryPlan | ( | ) | const |
| TopologyPtr NES::TestHarness::getTopology | ( | ) |
| uint64_t NES::TestHarness::getWorkerCount | ( | ) |
|
inline |
push a single element/tuple to a specific source
| element | element of Record to push |
| workerId | id of the worker whose source will produce the pushed element |
References apex::memcpy(), NES::TestHarnessWorkerConfiguration::MemorySource, NES_THROW_RUNTIME_ERROR, NES::NESStrongType< T, Tag, invalid, initial >::toString(), and magic_enum::detail::value().
|
inline |
push a single element/tuple to a specific source
| element | element of Record to push |
| workerId | id of the worker whose source will produce the pushed element |
Referenced by NES::TEST_F().
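The reference list for `pushElement` mentions `memcpy`, which suggests that records are copied byte-wise into the storage of a memory source. The following is a minimal sketch of that idea, assuming trivially copyable records. `MiniRecordBuffer`, `SampleRecord`, `at`, and `sizeInBytes` are hypothetical names introduced for illustration and are not part of the documented interface.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <type_traits>
#include <vector>

// Hypothetical record type used only in this sketch.
struct SampleRecord {
    uint64_t key;
    uint64_t value;
};

// Hypothetical stand-in for the per-worker record storage.
class MiniRecordBuffer {
  public:
    // Append one record by copying its bytes; returns *this so calls can be chained.
    template<typename T>
    MiniRecordBuffer& pushElement(const T& element) {
        static_assert(std::is_trivially_copyable_v<T>, "records are copied byte-wise");
        const std::size_t offset = bytes.size();
        bytes.resize(offset + sizeof(T));
        std::memcpy(bytes.data() + offset, &element, sizeof(T));
        return *this;
    }

    // Read back the record at the given index, assuming all records have type T.
    template<typename T>
    T at(std::size_t index) const {
        static_assert(std::is_trivially_copyable_v<T>, "records are copied byte-wise");
        if ((index + 1) * sizeof(T) > bytes.size()) {
            throw std::out_of_range("record index outside of buffer");
        }
        T out;
        std::memcpy(&out, bytes.data() + index * sizeof(T), sizeof(T));
        return out;
    }

    std::size_t sizeInBytes() const { return bytes.size(); }

  private:
    std::vector<uint8_t> bytes;
};
```

Copying through `memcpy` rather than casting pointers keeps the sketch free of alignment and aliasing problems, at the cost of restricting records to trivially copyable types.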
| TestHarness & NES::TestHarness::runQuery | ( | uint64_t | numberOfBytesToExpect, |
| const std::string & | placementStrategyName = "BottomUp", |
||
| uint64_t | testTimeoutInSeconds = 60 |
||
| ) |
Runs the query based on the given operator, pushed elements, and number of workers.
| numberOfBytesToExpect | |
| placementStrategyName | placement strategy name |
| testTimeoutInSeconds |
References addFileSink(), NES::TestUtils::checkOutputContentLengthOrTimeout(), NES::TestUtils::checkStoppedOrTimeout(), NES_DEBUG, NES_THROW_RUNTIME_ERROR, validateAndQueueAddQueryRequest(), magic_enum::detail::value(), NES::TestUtils::waitForQueryToStart(), and worker.
Referenced by NES::TEST_F().
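The parameters `numberOfBytesToExpect` and `testTimeoutInSeconds`, together with the reference to `checkOutputContentLengthOrTimeout()`, suggest a wait-until-complete-or-timeout loop. A rough sketch of such a loop follows. `waitForBytesOrTimeout`, its polling interval, and the `>=` comparison are assumptions made for illustration; the actual check in NebulaStream may differ.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// Hypothetical helper, not part of NebulaStream.
// Polls currentOutputSize until it reports at least numberOfBytesToExpect bytes
// or until the timeout expires. Returns true if the expected size was reached.
inline bool waitForBytesOrTimeout(const std::function<uint64_t()>& currentOutputSize,
                                  uint64_t numberOfBytesToExpect,
                                  std::chrono::milliseconds timeout,
                                  std::chrono::milliseconds pollInterval = std::chrono::milliseconds(10)) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (true) {
        if (currentOutputSize() >= numberOfBytesToExpect) {
            return true;
        }
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;
        }
        std::this_thread::sleep_for(pollInterval);
    }
}
```

In a test, `currentOutputSize` could wrap a call such as `std::filesystem::file_size` on the sink's output file; the sketch takes a callable so that it stays independent of where the output is stored.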
| TestHarness & NES::TestHarness::setAppendMode | ( | const std::string_view | newAppendMode | ) |
Set the append mode of the CSV file sink that is created to execute the query.
| newAppendMode | The file sink APPEND mode (i.e., APPEND to append, OVERWRITE to overwrite). |
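To make the two accepted values concrete, the sketch below maps them to standard file open modes. The function `toOpenMode` is hypothetical, and both the use of `std::ios_base` flags and the rejection of unknown values are assumptions; the source only states that `APPEND` appends and `OVERWRITE` overwrites.

```cpp
#include <cassert>
#include <ios>
#include <stdexcept>
#include <string>
#include <string_view>

// Hypothetical helper: translate the append-mode string into stream open flags.
inline std::ios_base::openmode toOpenMode(std::string_view appendMode) {
    if (appendMode == "APPEND") {
        return std::ios_base::out | std::ios_base::app; // keep existing content
    }
    if (appendMode == "OVERWRITE") {
        return std::ios_base::out | std::ios_base::trunc; // discard existing content
    }
    // Assumption of this sketch: any other value is treated as an error.
    throw std::invalid_argument("unknown append mode: " + std::string(appendMode));
}
```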
| TestHarness & NES::TestHarness::setJoinStrategy | ( | QueryCompilation::StreamJoinStrategy & | newJoinStrategy | ) |
Sets the join strategy.
| newJoinStrategy | the join strategy to set |
Referenced by NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::Runtime::Execution::MultipleJoinsTest::runJoinQuery().
| TestHarness & NES::TestHarness::setOutputFilePath | ( | const std::string & | newOutputFilePath | ) |
Set the output file of the CSV file sink that is created to execute the query.
| newOutputFilePath | The output file path. |
| TestHarness & NES::TestHarness::setupTopology | ( | std::function< void(CoordinatorConfigurationPtr)> | crdConfigFunctor = [](CoordinatorConfigurationPtr) { }, |
| const std::vector< nlohmann::json > & | distributionList = {} |
||
| ) |
Sets up the topology.
| crdConfigFunctor | A callable that applies configuration changes to the CoordinatorConfiguration |
| distributionList | A distribution of keys of each source for joins in JSON format |
References checkAndAddLogicalSources(), createPhysicalSourceOfLambdaType(), createPhysicalSourceOfMemoryType(), NES::TestHarnessWorkerConfiguration::LambdaSource, NES::TestHarnessWorkerConfiguration::MemorySource, backward::details::move(), NES_DEBUG, NES_THROW_RUNTIME_ERROR, and magic_enum::detail::value().
Referenced by NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::TEST_F().
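The `crdConfigFunctor` parameter follows a common pattern: a `std::function` with a no-op lambda as default, so callers only pass a callable when they want to adjust the configuration. A minimal sketch of that pattern is shown here. `MiniCoordinatorConfig` and `buildConfig` are hypothetical, and applying the functor after the ports have been set is an assumption about ordering, not something the source states.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>

// Hypothetical stand-in for CoordinatorConfiguration.
struct MiniCoordinatorConfig {
    uint16_t restPort = 0;
    uint16_t rpcPort = 0;
};
using MiniCoordinatorConfigPtr = std::shared_ptr<MiniCoordinatorConfig>;

// Build a configuration and let the caller adjust it through an optional functor.
inline MiniCoordinatorConfigPtr buildConfig(
    uint16_t restPort,
    uint16_t rpcPort,
    std::function<void(MiniCoordinatorConfigPtr)> crdConfigFunctor = [](MiniCoordinatorConfigPtr) {}) {
    auto config = std::make_shared<MiniCoordinatorConfig>();
    config->restPort = restPort;
    config->rpcPort = rpcPort;
    // Assumption of this sketch: caller overrides are applied last.
    crdConfigFunctor(config);
    return config;
}
```

Calling `buildConfig(8081, 4000)` leaves the defaults untouched, whereas `buildConfig(8081, 4000, [](MiniCoordinatorConfigPtr c) { c->rpcPort = 4100; })` overrides a single field without needing a dedicated setter.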
| TestHarness & NES::TestHarness::setWindowingStrategy | ( | QueryCompilation::WindowingStrategy & | newWindowingStrategy | ) |
Sets the windowing strategy.
| newWindowingStrategy | the windowing strategy to set |
Referenced by NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::Runtime::Execution::MultipleJoinsTest::runJoinQuery().
| TestHarness & NES::TestHarness::stopCoordinatorAndWorkers | ( | ) |
| TestHarness & NES::TestHarness::validate | ( | ) |
References NES::TestHarnessWorkerConfiguration::CSVSource, NES::TestHarnessWorkerConfiguration::LambdaSource, and NES::TestHarnessWorkerConfiguration::MemorySource.
Referenced by NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::TEST_F().
| TestHarness & NES::TestHarness::validateAndQueueAddQueryRequest | ( | const Optimizer::PlacementStrategy & | placementStrategy = Optimizer::PlacementStrategy::BottomUp | ) |
Submit the query to the coordinator.
| placementStrategy | The placement strategy for the query. |
Referenced by runQuery().