NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
ZMQTest.cpp File Reference
#include <BaseIntegrationTest.hpp>
#include <array>
#include <gtest/gtest.h>
#include <iostream>
#include <memory>
#include <thread>
#include <zmq.hpp>
#include <API/Schema.hpp>
#include <API/TestSchemas.hpp>
#include <Catalogs/Source/PhysicalSource.hpp>
#include <Runtime/NodeEngine.hpp>
#include <Runtime/NodeEngineBuilder.hpp>
#include <Sources/SourceCreator.hpp>
#include <Util/Logger/Logger.hpp>
#include <Util/TestUtils.hpp>
Include dependency graph for ZMQTest.cpp:

Classes

class  ZMQTest
 

Macros

#define LOCAL_ADDRESS   "127.0.0.1"
 

Functions

 TEST_F (ZMQTest, testZmqSourceReceiveData)
 
 TEST_F (ZMQTest, DISABLED_testZmqSinkSendData)
 
 TEST_F (ZMQTest, DISABLED_testZmqSinkToSource)
 

Macro Definition Documentation

◆ LOCAL_ADDRESS

#define LOCAL_ADDRESS   "127.0.0.1"

Function Documentation

◆ TEST_F() [1/3]

TEST_F ( ZMQTest  ,
DISABLED_testZmqSinkSendData   
)

Create ZeroMQ Data Sink. auto testSchema = Schema::create()->addField("KEY", BasicType::UINT32)->addField("VALUE", UINT32); auto zmq_sink = createBinaryZmqSink(testSchema, LOCAL_ADDRESS, LOCAL_PORT); NES_DEBUG("{}", zmq_sink->toString());

Put test data into a TupleBuffer vector. void *buffer = new char[testDataSize]; auto tuple_buffer = TupleBuffer(buffer, testDataSize, sizeof(uint32_t) * 2, test_data.size() / 2); auto tuple_buffer = bufferManager->getFixedSizeBuffer();

std::memcpy(tuple_buffer->getBuffer(), &test_data, testDataSize); auto tuple_buffer_vec = std::vector<TupleBufferPtr>(); tuple_buffer_vec.push_back(tuple_buffer);

Start thread for receiving the data. bool receiving_finished = false; auto receiving_thread = std::thread([&]() { Starting receiving environment. int linger = 0; auto context = zmq::context_t(1); auto socket = zmq::socket_t(context, ZMQ_SUB); socket.connect(address.c_str()); socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));

Receive message. zmq::message_t new_data; socket.recv(&new_data); // envelope - not needed at the moment socket.recv(&new_data); // actual data

Test received data. uint32_t* tuple = (uint32_t*) new_data.data(); for (uint64_t i = 0; i != new_data.size() / testSchema->getSchemaSizeInBytes(); ++i) { EXPECT_EQ(*(tuple++), i); uint64_t expected = 100 - i; EXPECT_EQ(*(tuple++), expected); }

socket.close();
receiving_finished = true;

});

Wait until receiving is complete. while (!receiving_finished) { zmq_sink->writeDataInBatch(tuple_buffer_vec); } receiving_thread.join();

Release buffer memory. delete[] (char*) buffer;

◆ TEST_F() [2/3]

TEST_F ( ZMQTest  ,
DISABLED_testZmqSinkToSource   
)

FIXME: this test makes no sense, redo it. Put test data into a TupleBuffer vector. void *buffer = new char[testDataSize]; auto tuple_buffer = TupleBuffer(buffer, testDataSize, sizeof(uint32_t) * 2, test_data.size() / 2); auto tuple_buffer = bufferManager->getFixedSizeBuffer();

std::memcpy(tuple_buffer->getBuffer(), &test_data, testDataSize); auto tuple_buffer_vec = std::vector<TupleBufferPtr>(); tuple_buffer_vec.push_back(tuple_buffer);

Create ZeroMQ Data Sink. auto zmq_sink = createBinaryZmqSink(testSchema, LOCAL_ADDRESS, LOCAL_PORT); NES_DEBUG("{}", zmq_sink->toString());

Create ZeroMQ Data Source. auto zmq_source = createZmqSource(testSchema, LOCAL_ADDRESS, LOCAL_PORT); NES_DEBUG("{}", zmq_source->toString());

Start thread for receiving the data. bool receiving_finished = false; auto receiving_thread = std::thread([&]() { Receive data. auto new_data = zmq_source->receiveData();

Test received data. uint32_t* tuple = (uint32_t*) new_data->getBuffer(); for (uint64_t i = 0; i != new_data->getNumberOfTuples(); ++i) { EXPECT_EQ(*(tuple++), i); uint64_t expected = 100 - i; EXPECT_EQ(*(tuple++), expected); } bufferManager->releaseFixedSizeBuffer(new_data); receiving_finished = true; });

Wait until receiving is complete. while (!receiving_finished) { zmq_sink->writeDataInBatch(tuple_buffer_vec); } receiving_thread.join();

Release buffer memory. delete[] (char*) buffer;

◆ TEST_F() [3/3]

TEST_F ( ZMQTest  ,
testZmqSourceReceiveData   
)

References NES::createZmqSource(), NES::TestSchemas::getSchemaTemplate(), LOCAL_ADDRESS, apex::memcpy(), NES_DEBUG, NES_ERROR, and socket().

Here is the call graph for this function: