NebulaStream
0.6.213
NebulaStream is a data and application management framework for the internet of things
|
#include <BaseIntegrationTest.hpp>
#include <array>
#include <gtest/gtest.h>
#include <iostream>
#include <memory>
#include <thread>
#include <zmq.hpp>
#include <API/Schema.hpp>
#include <API/TestSchemas.hpp>
#include <Catalogs/Source/PhysicalSource.hpp>
#include <Runtime/NodeEngine.hpp>
#include <Runtime/NodeEngineBuilder.hpp>
#include <Sources/SourceCreator.hpp>
#include <Util/Logger/Logger.hpp>
#include <Util/TestUtils.hpp>
Classes | |
class | ZMQTest |
Macros | |
#define | LOCAL_ADDRESS "127.0.0.1" |
Functions | |
TEST_F (ZMQTest, testZmqSourceReceiveData) | |
TEST_F (ZMQTest, DISABLED_testZmqSinkSendData) | |
TEST_F (ZMQTest, DISABLED_testZmqSinkToSource) | |
#define LOCAL_ADDRESS "127.0.0.1" |
TEST_F | ( | ZMQTest | , |
DISABLED_testZmqSinkSendData | |||
) |
Create ZeroMQ Data Sink. auto testSchema = Schema::create()->addField("KEY", BasicType::UINT32)->addField("VALUE", UINT32); auto zmq_sink = createBinaryZmqSink(testSchema, LOCAL_ADDRESS, LOCAL_PORT); NES_DEBUG("{}", zmq_sink->toString());
Put test data into a TupleBuffer vector. void *buffer = new char[testDataSize]; auto tuple_buffer = TupleBuffer(buffer, testDataSize, sizeof(uint32_t) * 2, test_data.size() / 2); auto tuple_buffer = bufferManager->getFixedSizeBuffer();
std::memcpy(tuple_buffer->getBuffer(), &test_data, testDataSize); auto tuple_buffer_vec = std::vector<TupleBufferPtr>(); tuple_buffer_vec.push_back(tuple_buffer);
Start thread for receiving the data. bool receiving_finished = false; auto receiving_thread = std::thread([&]() { Starting receiving enviroment. int linger = 0; auto context = zmq::context_t(1); auto socket = zmq::socket_t(context, ZMQ_SUB); socket.connect(address.c_str()); socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
Receive message. zmq::message_t new_data; socket.recv(&new_data); // envelope - not needed at the moment socket.recv(&new_data); // actual data
Test received data. uint32_t tuple = (uint32_t) new_data.data(); for (uint64_t i = 0; i != new_data.size() / testSchema->getSchemaSizeInBytes(); ++i) { EXPECT_EQ(*(tuple++), i); uint64_t expected = 100 - i; EXPECT_EQ(*(tuple++), expected); }
socket.close(); receiving_finished = true;
});
Wait until receiving is complete. while (!receiving_finished) { zmq_sink->writeDataInBatch(tuple_buffer_vec); } receiving_thread.join();
Release buffer memory. delete[] (char*) buffer;
TEST_F | ( | ZMQTest | , |
DISABLED_testZmqSinkToSource | |||
) |
FIXME: this test makes no sense, redo it Put test data into a TupleBuffer vector. void *buffer = new char[testDataSize]; auto tuple_buffer = TupleBuffer(buffer, testDataSize, sizeof(uint32_t) * 2, test_data.size() / 2); auto tuple_buffer = bufferManager->getFixedSizeBuffer();
std::memcpy(tuple_buffer->getBuffer(), &test_data, testDataSize); auto tuple_buffer_vec = std::vector<TupleBufferPtr>(); tuple_buffer_vec.push_back(tuple_buffer);
Create ZeroMQ Data Sink. auto zmq_sink = createBinaryZmqSink(testSchema, LOCAL_ADDRESS, LOCAL_PORT); NES_DEBUG("{}", zmq_sink->toString());
Create ZeroMQ Data Source. auto zmq_source = createZmqSource(testSchema, LOCAL_ADDRESS, LOCAL_PORT); NES_DEBUG("{}", zmq_source->toString());
Start thread for receivingh the data. bool receiving_finished = false; auto receiving_thread = std::thread([&]() { Receive data. auto new_data = zmq_source->receiveData();
Test received data. uint32_t tuple = (uint32_t) new_data->getBuffer(); for (uint64_t i = 0; i != new_data->getNumberOfTuples(); ++i) { EXPECT_EQ(*(tuple++), i); uint64_t expected = 100 - i; EXPECT_EQ(*(tuple++), expected); } bufferManager->releaseFixedSizeBuffer(new_data); receiving_finished = true; });
Wait until receiving is complete. while (!receiving_finished) { zmq_sink->writeDataInBatch(tuple_buffer_vec); } receiving_thread.join();
Release buffer memory. delete[] (char*) buffer;
TEST_F | ( | ZMQTest | , |
testZmqSourceReceiveData | |||
) |
References NES::createZmqSource(), NES::TestSchemas::getSchemaTemplate(), LOCAL_ADDRESS, apex::memcpy(), NES_DEBUG, NES_ERROR, and socket().