C++ Client

The C++ Client is a library that provides an interface for applications to interact with a NebulaStream Coordinator. The C++ Client provides functionalities to submit a query, and to manage running queries, sources, and the topology.

Obtaining the C++ Client

The C++ Client is provided in the main repository as a separate target. You can build and link your code against the libnes-client.so shared object.

Using the C++ Client

The C++ Client can be integrated into your project using CMake.

Add the following into your CMakeFiles.txt:

cmake_minimum_required(VERSION 3.21)
project(NebulaStreamC++Example)

set(CMAKE_CXX_STANDARD 20)

find_package(NebulaStream)
message(STATUS "NES_LIBRARIES found at ${NebulaStream_FOUND}")
add_executable(NebulaStreamC++Example main.cpp)
target_link_libraries(NebulaStreamC++Example NebulaStream::nes-client  NebulaStream::nes)

And then include the headers of the C++ Client in your source code:

#include <iostream>
#include <API/Query.hpp>
#include <Client/RemoteClient.hpp>
#include <API/QueryAPI.hpp>
int main() {
    auto client = NES::Client::RemoteClient();
    auto query = Query::from("test").sink(NullOutputSinkDescriptor::create());
    client.submitQuery(query);
    std::cout << "connection : "<< client.testConnection()<< std::endl;
    return 0;
}

Connecting to the Coordinator

Before performing any operation, we need to instantiate a RemoteClient object and connect it to a running Coordinator. We can make the connection as follows.

Client::RemoteClientPtr client;
client = std::make_shared<Client::RemoteClient>("localhost", 8081, std::chrono::seconds(20), true);

We tell our client to connect to "localhost", over port 8081, use a timeout of 20 seconds and set logging to true.

Submitting a Query

Once we setup a connection, we can submit queries to the Coordinator. Following is an example of submitting a simple query that will write a stream of results to a CSV file.

Query query = Query::from("default_logical")
                .sink(FileSinkDescriptor::create("out.csv", "CSV_FORMAT", "APPEND"));

Client::QueryConfig queryConfig;
    queryConfig.setPlacementType(PlacementStrategy::BottomUp);

int64_t queryId = client->submitQuery(query, queryConfig);

The query submission interface requires two parameters: the query to be submitted and the operator placement strategy. The currently available operator placement strategies are: “BottomUp” and “TopDown”. The query will be submitted to the coordinator and will be executed lazily on the NebulaStream cluster.

Query Management

The query management API is a collection of methods from the C++ Client to monitor and manage query execution in a NebulaStream instance.

Query Catalog

The C++ Client represents a query as a QueryCatalogEntry object. A QueryCatalogentry object stores the query id, query status, and query string. To get the whole Query catalog, we can use the following method.

auto allRegisteredQueries = client->getQueries(QueryStatus::Registered);

​ This way we can obtain all queries with the Registered status. We can also obtain queries with a specific status. The available statuses are Registered, Scheduling, Running, MarkedForStop, Stopped, Failed, Restarting, and Migrating.

Getting the Query Plan

To get the query plan of a specific query using the C++ Client, we can use the following method:

// store the queryId
int64_t queryId = client->submitQuery(query);
...
auto queryPlan = client->getQueryPlan(queryId);

Getting the Execution Plan

To get the execution plan of a specific query using the C++ Client, we can use the following method:

auto executionPlan = client->getQueryExecutionPlan(queryId);

Stopping a Query

To delete a queryusing the C++ Client, we can use the below method. The method returns true on successful deletion, false otherwise.

auto success = client->stopQuery(queryId);

Source Management

The source management API contains a collection of methods in the Java Client to retrieve, add, update and remove logical sources in a NebulaStream instance.

Retrieving Logical Source catalog

To get the logical source catalog using the C++ Client, we can use the following method:

auto logicalStreams = client->getLogicalSources();

Adding a Logical Source

To add a logical source, we can use the below method. The method expects the schema and a source name (in C++) as parameters and returns true on successful addition. ​

SchemaPtr schema = Schema::create()->addField("id", BasicType::UINT32);
auto success = client->addLogicalSource(schema, "sourceName");

Obtaining Physical Source Catalog

To get all physical sources using the C++ Client we can use the below method:

auto physicalSources = client->getPhysicalSources();

Query API

Currently the C++ Client covers an extensive list of operators. For a comprehensive description and examples refer to Query API.