NebulaStream
0.6.213
NebulaStream is a data and application management framework for the internet of things
|
#include <TestHarness.hpp>
Public Member Functions | |
TestHarness (Query queryWithoutSink, uint16_t restPort, uint16_t rpcPort, std::filesystem::path testHarnessResourcePath, uint64_t memSrcFrequency=0, uint64_t memSrcNumBuffToProcess=1) | |
The constructor of TestHarness. More... | |
TestHarness & | setJoinStrategy (QueryCompilation::StreamJoinStrategy &newJoinStrategy) |
Sets the join strategy. More... | |
TestHarness & | setWindowingStrategy (QueryCompilation::WindowingStrategy &newWindowingStrategy) |
Sets the join strategy. More... | |
template<typename T > | |
TestHarness & | pushElement (T element, WorkerId::Underlying workerId) |
push a single element/tuple to specific source More... | |
template<typename T > | |
TestHarness & | pushElement (T element, WorkerId workerId) |
push a single element/tuple to specific source More... | |
TestHarness & | addLogicalSource (const std::string &logicalSourceName, const SchemaPtr &schema) |
void | checkAndAddLogicalSources () |
check the schema size of the logical source and if it already exists More... | |
TestHarness & | attachWorkerWithMemorySourceToWorkerWithId (const std::string &logicalSourceName, WorkerId parentId, WorkerConfigurationPtr workerConfiguration=WorkerConfiguration::create()) |
add a memory source to be used in the test and connect to parent with specific parent id More... | |
TestHarness & | attachWorkerWithMemorySourceToCoordinator (const std::string &logicalSourceName) |
add a memory source to be used in the test More... | |
TestHarness & | attachWorkerWithLambdaSourceToCoordinator (PhysicalSourceTypePtr physicalSourceType, WorkerConfigurationPtr workerConfiguration) |
add a memory source to be used in the test More... | |
TestHarness & | attachWorkerWithCSVSourceToWorkerWithId (const CSVSourceTypePtr &csvSourceType, WorkerId parentId) |
add a csv source to be used in the test and connect to parent with specific parent id More... | |
TestHarness & | attachWorkerWithCSVSourceToCoordinator (const CSVSourceTypePtr &csvSourceType) |
add a csv source to be used in the test More... | |
TestHarness & | attachWorkerToWorkerWithId (WorkerId parentId) |
add worker and connect to parent with specific parent id More... | |
TestHarness & | attachWorkerToCoordinator () |
add non source worker More... | |
uint64_t | getWorkerCount () |
TestHarness & | validate () |
PhysicalSourceTypePtr | createPhysicalSourceOfLambdaType (TestHarnessWorkerConfigurationPtr workerConf) |
PhysicalSourceTypePtr | createPhysicalSourceOfMemoryType (TestHarnessWorkerConfigurationPtr workerConf) |
TestHarness & | setOutputFilePath (const std::string &newOutputFilePath) |
TestHarness & | setAppendMode (const std::string_view newAppendMode) |
TestHarness & | setupTopology (std::function< void(CoordinatorConfigurationPtr)> crdConfigFunctor=[](CoordinatorConfigurationPtr) { }, const std::vector< nlohmann::json > &distributionList={}) |
Method to setup the topology. More... | |
TestHarness & | addFileSink () |
TestHarness & | validateAndQueueAddQueryRequest (const Optimizer::PlacementStrategy &placementStrategy=Optimizer::PlacementStrategy::BottomUp) |
TestHarness & | runQuery (uint64_t numberOfBytesToExpect, const std::string &placementStrategyName="BottomUp", uint64_t testTimeoutInSeconds=60) |
Runs the query based on the given operator, pushed elements, and number of workers. More... | |
bool | checkFailedOrTimeout () const |
TestHarness & | stopCoordinatorAndWorkers () |
std::vector< Runtime::MemoryLayouts::TestTupleBuffer > | getOutput () |
Returns the output for the previously run query. Support also data types with variable data size. More... | |
SchemaPtr | getOutputSchema () |
Returns the output schema of the query. More... | |
TopologyPtr | getTopology () |
const QueryPlanPtr & | getQueryPlan () const |
const Optimizer::GlobalExecutionPlanPtr & | getExecutionPlan () const |
Runtime::BufferManagerPtr | getBufferManager () const |
|
explicit |
The constructor of TestHarness.
numWorkers | number of worker (each for one physical source) to be used in the test |
queryWithoutSink | query object to test (without the sink operator) |
restPort | port for the rest service |
rpcPort | for for the grpc |
TestHarness & NES::TestHarness::addFileSink | ( | ) |
Add a CSV file sink to the query so that it can be executed.
Referenced by runQuery().
TestHarness & NES::TestHarness::addLogicalSource | ( | const std::string & | logicalSourceName, |
const SchemaPtr & | schema | ||
) |
References NES::TestUtils::logicalSourceName().
Referenced by NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::TEST_F().
TestHarness & NES::TestHarness::attachWorkerToCoordinator | ( | ) |
add non source worker
References attachWorkerToWorkerWithId().
Referenced by NES::TEST_F().
TestHarness & NES::TestHarness::attachWorkerToWorkerWithId | ( | WorkerId | parentId | ) |
add worker and connect to parent with specific parent id
parentId | id of the Test Harness worker to connect Note: The parent id can not be greater than the current testharness worker id |
References NES::TestHarnessWorkerConfiguration::create(), and NES::TestUtils::parentId().
Referenced by attachWorkerToCoordinator(), NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), and NES::TEST_F().
TestHarness & NES::TestHarness::attachWorkerWithCSVSourceToCoordinator | ( | const CSVSourceTypePtr & | csvSourceType | ) |
add a csv source to be used in the test
logicalSourceName | logical source name |
csvSourceType | csv source type |
References attachWorkerWithCSVSourceToWorkerWithId().
Referenced by NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::TEST_F().
TestHarness & NES::TestHarness::attachWorkerWithCSVSourceToWorkerWithId | ( | const CSVSourceTypePtr & | csvSourceType, |
WorkerId | parentId | ||
) |
add a csv source to be used in the test and connect to parent with specific parent id
logicalSourceName | logical source name |
csvSourceType | csv source type |
parentId | id of the parent to connect |
References NES::TestHarnessWorkerConfiguration::create(), NES::TestHarnessWorkerConfiguration::CSVSource, and NES::TestUtils::parentId().
Referenced by attachWorkerWithCSVSourceToCoordinator(), NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), and NES::TEST_F().
TestHarness & NES::TestHarness::attachWorkerWithLambdaSourceToCoordinator | ( | PhysicalSourceTypePtr | physicalSourceType, |
WorkerConfigurationPtr | workerConfiguration | ||
) |
add a memory source to be used in the test
physicalSourceType | schema of the source |
workerConfiguration | source name |
References NES::TestHarnessWorkerConfiguration::create(), and NES::TestHarnessWorkerConfiguration::LambdaSource.
Referenced by NES::TEST_F().
TestHarness & NES::TestHarness::attachWorkerWithMemorySourceToCoordinator | ( | const std::string & | logicalSourceName | ) |
add a memory source to be used in the test
logical | source name |
schema | schema of the source |
physical | source name |
References attachWorkerWithMemorySourceToWorkerWithId(), NES::TestUtils::logicalSourceName(), and backward::details::move().
Referenced by NES::TEST_F().
TestHarness & NES::TestHarness::attachWorkerWithMemorySourceToWorkerWithId | ( | const std::string & | logicalSourceName, |
WorkerId | parentId, | ||
WorkerConfigurationPtr | workerConfiguration = WorkerConfiguration::create() |
||
) |
add a memory source to be used in the test and connect to parent with specific parent id
logical | source name |
schema | schema of the source |
physical | source name |
parentId | id of the parent to connect |
References NES::TestHarnessWorkerConfiguration::create(), NES::TestUtils::logicalSourceName(), NES::TestHarnessWorkerConfiguration::MemorySource, NES::TestUtils::parentId(), and NES::TestUtils::physicalSourceName().
Referenced by attachWorkerWithMemorySourceToCoordinator(), and NES::TEST_F().
void NES::TestHarness::checkAndAddLogicalSources | ( | ) |
check the schema size of the logical source and if it already exists
logical | source name |
schema | schema of the source |
physical | source name |
References NES::TestUtils::logicalSourceName(), and NES_TRACE.
Referenced by setupTopology().
bool NES::TestHarness::checkFailedOrTimeout | ( | ) | const |
Check if the query has failed.
References NES::TestUtils::checkFailedOrTimeout().
PhysicalSourceTypePtr NES::TestHarness::createPhysicalSourceOfLambdaType | ( | TestHarnessWorkerConfigurationPtr | workerConf | ) |
References NES::TestUtils::logicalSourceName().
Referenced by setupTopology().
PhysicalSourceTypePtr NES::TestHarness::createPhysicalSourceOfMemoryType | ( | TestHarnessWorkerConfigurationPtr | workerConf | ) |
References NES::INTERVAL_MODE, NES::TestUtils::logicalSourceName(), apex::memcpy(), and NES_DEBUG.
Referenced by setupTopology().
Runtime::BufferManagerPtr NES::TestHarness::getBufferManager | ( | ) | const |
const Optimizer::GlobalExecutionPlanPtr & NES::TestHarness::getExecutionPlan | ( | ) | const |
std::vector< Runtime::MemoryLayouts::TestTupleBuffer > NES::TestHarness::getOutput | ( | ) |
Returns the output for the previously run query. Support also data types with variable data size.
References NES::TestUtils::createExpectedBuffersFromCsv(), and NES::TestUtils::createTestTupleBuffers().
Referenced by NES::TEST_F().
SchemaPtr NES::TestHarness::getOutputSchema | ( | ) |
Returns the output schema of the query.
Referenced by NES::TEST_F().
const QueryPlanPtr & NES::TestHarness::getQueryPlan | ( | ) | const |
TopologyPtr NES::TestHarness::getTopology | ( | ) |
uint64_t NES::TestHarness::getWorkerCount | ( | ) |
|
inline |
push a single element/tuple to specific source
element | element of Record to push |
workerId | id of the worker whose source will produce the pushed element |
References apex::memcpy(), NES::TestHarnessWorkerConfiguration::MemorySource, NES_THROW_RUNTIME_ERROR, NES::NESStrongType< T, Tag, invalid, initial >::toString(), and magic_enum::detail::value().
|
inline |
push a single element/tuple to specific source
element | element of Record to push |
workerId | id of the worker whose source will produce the pushed element |
Referenced by NES::TEST_F().
TestHarness & NES::TestHarness::runQuery | ( | uint64_t | numberOfBytesToExpect, |
const std::string & | placementStrategyName = "BottomUp" , |
||
uint64_t | testTimeoutInSeconds = 60 |
||
) |
Runs the query based on the given operator, pushed elements, and number of workers.
numberOfBytesToExpect | |
placementStrategyName | placement strategy name |
testTimeoutInSeconds |
References addFileSink(), NES::TestUtils::checkOutputContentLengthOrTimeout(), NES::TestUtils::checkStoppedOrTimeout(), NES_DEBUG, NES_THROW_RUNTIME_ERROR, validateAndQueueAddQueryRequest(), magic_enum::detail::value(), NES::TestUtils::waitForQueryToStart(), and worker.
Referenced by NES::TEST_F().
TestHarness & NES::TestHarness::setAppendMode | ( | const std::string_view | newAppendMode | ) |
Set the append mode of the CSV file sink that is created to execute the query.
newAppendMode | The file sink APPEND mode (i.e., APPEND to append, OVERWRITE to overwrite). |
TestHarness & NES::TestHarness::setJoinStrategy | ( | QueryCompilation::StreamJoinStrategy & | newJoinStrategy | ) |
Sets the join strategy.
joinStrategy |
Referenced by NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::Runtime::Execution::MultipleJoinsTest::runJoinQuery().
TestHarness & NES::TestHarness::setOutputFilePath | ( | const std::string & | newOutputFilePath | ) |
Set the output file of the CSV file sink that is created to execute the query.
newOutputFilePath | The output file path. |
TestHarness & NES::TestHarness::setupTopology | ( | std::function< void(CoordinatorConfigurationPtr)> | crdConfigFunctor = [](CoordinatorConfigurationPtr) { } , |
const std::vector< nlohmann::json > & | distributionList = {} |
||
) |
Method to setup the topology.
crdConfigFunctor | A function pointer to specify the config changes of the CoordinatorConfiguration |
distributionList | A distribution of keys of each source for joins in JSON format |
blocking
blocking
withConnect
References checkAndAddLogicalSources(), createPhysicalSourceOfLambdaType(), createPhysicalSourceOfMemoryType(), NES::TestHarnessWorkerConfiguration::LambdaSource, NES::TestHarnessWorkerConfiguration::MemorySource, backward::details::move(), NES_DEBUG, NES_THROW_RUNTIME_ERROR, and magic_enum::detail::value().
Referenced by NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::TEST_F().
TestHarness & NES::TestHarness::setWindowingStrategy | ( | QueryCompilation::WindowingStrategy & | newWindowingStrategy | ) |
Sets the join strategy.
joinStrategy |
Referenced by NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::Runtime::Execution::MultipleJoinsTest::runJoinQuery().
TestHarness & NES::TestHarness::stopCoordinatorAndWorkers | ( | ) |
TestHarness & NES::TestHarness::validate | ( | ) |
References NES::TestHarnessWorkerConfiguration::CSVSource, NES::TestHarnessWorkerConfiguration::LambdaSource, and NES::TestHarnessWorkerConfiguration::MemorySource.
Referenced by NES::DistributedMatrixJoinIntegrationTest::createTestHarness(), NES::DistributedNemoJoinIntegrationTest::createTestHarness(), NES::Runtime::Execution::JoinDeploymentTest::runAndValidateJoinQueryTwoLogicalStreams(), and NES::TEST_F().
TestHarness & NES::TestHarness::validateAndQueueAddQueryRequest | ( | const Optimizer::PlacementStrategy & | placementStrategy = Optimizer::PlacementStrategy::BottomUp | ) |
Submit the query to the coordinator.
placementStrategy | The placement strategy for the query. |
Referenced by runQuery().