Java Client

The Java client is a library that lets Java applications interact with the NebulaStream coordinator. It provides functionality to submit queries and to manage running queries, sources, and the topology.

Obtaining the Java Client

The Java client is available as a Java library. The latest version can be obtained as follows:

Gradle:

implementation 'stream.nebula:nebulastream-java-client:0.0.60'

Maven:

<dependency>
    <groupId>stream.nebula</groupId>
    <artifactId>nebulastream-java-client</artifactId>
    <version>0.0.60</version>
</dependency>

Connecting to the Coordinator

Before performing any operation, we need to instantiate a NebulaStreamRuntime object and connect it to a running NebulaStream coordinator. We can create the connection as follows.

NebulaStreamRuntime nebulaStreamRuntime = NebulaStreamRuntime.getRuntime();
nebulaStreamRuntime.getConfig().setHost("localhost")
               .setPort("8081");

Query API

Creating a query

You can create a query by reading from a named logical source through the NebulaStream runtime.

Query query = nebulaStreamRuntime.readFromSource("logical_source");

The code below assumes that we already have created a query on which we perform further operations.

Accessing attributes

Many operations in the query API operate on stream attributes. To access an attribute, use the (static) method Expressions.attribute(String). To make the code less verbose, you can statically import the method:

import static stream.nebula.expression.Expressions.attribute;

The examples below assume that the method has been statically imported and leave out the name of the Expressions class.

Chaining operations

All operations of the query API return the query object, which allows multiple operations to be chained in a fluent syntax. The only exception is the Sink Operator, which returns a Sink object.

⚠️ At the moment, the following operations are equivalent.

// Chain two filter operations on a query.
Query q1 = nebulaStreamRuntime.readFromSource("logical_source")
    .filter(attribute("a"))
    .filter(attribute("b"));

// Perform two filter operations on the same query object.
Query q2 = nebulaStreamRuntime.readFromSource("logical_source");
q2.filter(attribute("a"));
q2.filter(attribute("b"));

In other words, the operations of the query API mutate the query object on which they are called. This will likely change in the future.
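
The equivalence can be illustrated with a minimal stand-in class. SketchQuery below is hypothetical and not part of the client; it only mimics the behavior described here, where each operation mutates the receiver and returns it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a query object; NOT the client's Query class.
class SketchQuery {
    private final List<String> operators = new ArrayList<>();

    // Records the operation on this object and returns the same object.
    SketchQuery filter(String predicate) {
        operators.add("filter(" + predicate + ")");
        return this;
    }

    List<String> operators() {
        return operators;
    }
}

class ChainingSketch {
    public static void main(String[] args) {
        // Chained calls.
        SketchQuery chained = new SketchQuery().filter("a").filter("b");

        // Separate calls; the return value is ignored, the object is still mutated.
        SketchQuery stepwise = new SketchQuery();
        stepwise.filter("a");
        stepwise.filter("b");

        // Both objects now hold the same two operations.
        System.out.println(chained.operators().equals(stepwise.operators()));
    }
}
```

If the API becomes immutable in the future, the second style would silently drop both filters, so the chained style is the safer habit.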

Filter Operator

The filter operator accepts a predicate which can be evaluated to a boolean value. This predicate can be:

  • a single attribute,
  • the result of a comparison (lessThan, lessThanOrEqual, greaterThan, greaterThanOrEqual, equalTo, notEqualTo),
  • the result of a boolean operation (and, or, not),
  • an arbitrary combination of the above.

Examples

Filter the stream to contain only the records for which the attribute my_attribute is true.

query.filter(attribute("my_attribute"));

Filter the stream by comparing an attribute against a literal value.

query.filter(attribute("my_attribute").lessThan(5));

Filter the stream by comparing two attributes.

query.filter(attribute("attribute_1").greaterThan(attribute("attribute_2")));

Filter the stream by combining a boolean attribute with a comparison.

query.filter(attribute("boolean_attribute").and(attribute("numeric_attribute").lessThan(1)));
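
To make the semantics of the last example concrete, the sketch below evaluates the same condition on plain Java maps using java.util.function.Predicate. The record layout and the helper names (isTrue, lessThan, record) are assumptions made for illustration. The client itself builds an expression that is evaluated inside NebulaStream, not in the JVM.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

class FilterSketch {
    // Hypothetical helper mirroring a bare boolean attribute used as a predicate.
    static Predicate<Map<String, Object>> isTrue(String name) {
        return r -> Boolean.TRUE.equals(r.get(name));
    }

    // Hypothetical helper mirroring a lessThan comparison against a literal.
    static Predicate<Map<String, Object>> lessThan(String name, double bound) {
        return r -> ((Number) r.get(name)).doubleValue() < bound;
    }

    // Builds a sample record with the two attributes used in the example.
    static Map<String, Object> record(boolean flag, int number) {
        Map<String, Object> r = new HashMap<>();
        r.put("boolean_attribute", flag);
        r.put("numeric_attribute", number);
        return r;
    }

    public static void main(String[] args) {
        // boolean_attribute AND numeric_attribute < 1
        Predicate<Map<String, Object>> p =
                isTrue("boolean_attribute").and(lessThan("numeric_attribute", 1));

        System.out.println(p.test(record(true, 0)));  // both conditions hold
        System.out.println(p.test(record(true, 5)));  // comparison fails
        System.out.println(p.test(record(false, 0))); // boolean attribute is false
    }
}
```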

Projection Operator

The projection operator accepts a variable number of attributes, which are copied to the output stream. Each attribute can also be renamed.

💡 Examples

Project two attributes.

query.project(attribute("attribute_1"), attribute("attribute_2"));

Project a single attribute and rename it.

query.project(attribute("attribute_1").as("new_attribute_name"));

Map Operator

The map operator accepts two parameters. The first parameter is an attribute name, which can refer to an existing attribute in the stream or to a new attribute. The second parameter is a complex expression over the existing stream attributes. The operator stores the result of the expression in the specified attribute.

The complex expression can contain arithmetic operations, arithmetic and boolean comparisons, and boolean functions. Refer to the Expression API for details.

Example

query.map("total", attribute("field_1").add(attribute("field_2")));
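
The effect on a single record can be sketched in plain Java. The map helper below is hypothetical and works on an in-memory record. It only illustrates the two cases described above: writing to a new attribute and overwriting an existing one.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.ToIntFunction;

class MapSketch {
    // Returns a copy of the record in which `target` holds the value of `expr`.
    static Map<String, Integer> map(Map<String, Integer> record, String target,
                                    ToIntFunction<Map<String, Integer>> expr) {
        Map<String, Integer> out = new LinkedHashMap<>(record);
        out.put(target, expr.applyAsInt(record));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> record = new LinkedHashMap<>();
        record.put("field_1", 3);
        record.put("field_2", 4);

        // New attribute: "total" is added to the record.
        System.out.println(map(record, "total", r -> r.get("field_1") + r.get("field_2")));

        // Existing attribute: "field_1" is overwritten.
        System.out.println(map(record, "field_1", r -> r.get("field_1") * 10));
    }
}
```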

Window Operator

The window operator is started by a window method, which specifies the type and size of window, followed by an optional keyBy method, which specifies one or more attributes on which the window is grouped, followed by an apply method, which specifies one or more aggregation functions.

NebulaStream supports both tumbling and sliding windows. The size of windows can be specified in milliseconds, seconds, minutes, or hours. The time is always an explicit event time which is encoded as an attribute in the stream. The supported aggregation functions are min, max, count, sum, average, and median. The aggregated attribute can be renamed.

Examples

Create a tumbling window of 10 seconds, using the stream attribute timestamp, and compute the sum of the attribute value for each window.

query.window(TumblingWindow.of(EventTime.eventTime("timestamp"),
                               TimeMeasure.seconds(10)))
     .apply(Aggregation.sum("value"));

Create a sliding window of 1 minute with a slide of 30 seconds, group the window by the attributes key1 and key2, and compute the min and max of the attribute value for each window.

query.window(SlidingWindow.of(EventTime.eventTime("timestamp"),
                              TimeMeasure.minutes(1),
                              TimeMeasure.seconds(30)))
     .keyBy("key1", "key2")
     .apply(Aggregation.min("value"), Aggregation.max("value"));

💡 Multiple aggregation functions require the experimental THREAD_LOCAL window aggregation strategy.

Rename the result of an average aggregation.

query.window(TumblingWindow.of(EventTime.eventTime("timestamp"),
                               TimeMeasure.seconds(10)))
     .apply(Aggregation.average("value").as("mean_value"));
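
The difference between tumbling and sliding windows comes down to how many windows a single event time belongs to. The following sketch computes the window starts for a timestamp. It assumes non-negative timestamps in milliseconds, windows aligned to timestamp 0, and half-open intervals [start, start + size). These are common conventions, but they are assumptions here and are not confirmed by the text above. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class WindowSketch {
    // Start of the single tumbling window that contains the timestamp.
    static long tumblingStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Starts of all sliding windows [start, start + size) that contain the timestamp,
    // listed from the latest window to the earliest.
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long latest = timestamp - (timestamp % slide); // latest window start <= timestamp
        for (long start = latest; start > timestamp - size && start >= 0; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long eventTime = 75_000; // made-up event time: 75 s in milliseconds

        // 10 s tumbling window: exactly one window contains the event.
        System.out.println(tumblingStart(eventTime, 10_000));

        // 1 min window with a 30 s slide: the event falls into two overlapping windows.
        System.out.println(slidingStarts(eventTime, 60_000, 30_000));
    }
}
```

With a slide smaller than the window size, each event contributes to size / slide windows once the stream has been running for at least one full window, which is why sliding windows typically produce more output than tumbling windows of the same size.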

Join Operator

The join operator joins tuples coming from two event streams. To define a join operator, we need two queries, a field name from each stream to join for predicate evaluation, and a window definition.

Example

Join q1 and q2 on the condition that the station_id in the bus stream equals the station_id in the metro stream and both tuples fall within the same 10-second tumbling window.

NebulaStreamRuntime runtime = NebulaStreamRuntime.getRuntime();
Query q1 = runtime.readFromSource("bus");
Query q2 = runtime.readFromSource("metro");

q1.joinWith(q2)
    .where("station_id")
    .equalsTo("station_id")
    .window(TumblingWindow.of(EventTime.eventTime("timestamp"),
                              TimeMeasure.seconds(10)));
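
The join condition can be spelled out on in-memory data. The sketch below is a plain nested-loop illustration of the semantics, not how NebulaStream executes a join. The Event class is hypothetical, and windows are assumed to be aligned to timestamp 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class JoinSketch {
    // Hypothetical event type; the field names follow the example above.
    static final class Event {
        final int stationId;
        final long timestamp; // event time in milliseconds

        Event(int stationId, long timestamp) {
            this.stationId = stationId;
            this.timestamp = timestamp;
        }
    }

    // Pairs events with equal stationId whose timestamps fall into the same tumbling window.
    static List<Event[]> join(List<Event> left, List<Event> right, long windowSize) {
        List<Event[]> result = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.stationId == r.stationId;
                boolean sameWindow = l.timestamp / windowSize == r.timestamp / windowSize;
                if (sameKey && sameWindow) {
                    result.add(new Event[] {l, r});
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> bus = Arrays.asList(new Event(1, 1_000), new Event(2, 2_000));
        List<Event> metro = Arrays.asList(new Event(1, 9_000), new Event(1, 11_000));

        // Only (bus@1000, metro@9000) shares both the station and the window [0, 10000).
        System.out.println(join(bus, metro, 10_000).size());
    }
}
```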

Union Operator

The union operator merges two streams into a single stream. In this case, both streams need to have the same schema. Similar to join, the union operator in the Java client needs a second query as a parameter.

Example

NebulaStreamRuntime runtime = NebulaStreamRuntime.getRuntime();
Query q1 = runtime.readFromSource("bus");
Query q2 = runtime.readFromSource("metro");

q1.unionWith(q2);

Sink Operator

The Java client allows queries to use the following sink operators:

  • .sink(new PrintSink()) - write the resulting stream to the coordinator's console
  • .sink(new ZMQSink("localhost", 1234)) - forward the resulting stream to a ZMQ instance
  • .sink(new FileSink("/tmp/out.csv", "CSV", "OVERRIDE")) - write the resulting stream to a CSV file on the coordinator

Query Management

The query management API is a collection of methods from the Java client library to monitor and manage query execution in a NebulaStream instance.

Submitting a Query

Once we have the connection, we can submit a query to the coordinator. The following example submits a simple query that writes the result stream to a CSV file.

String logicalSource = "default_logical";

Query query = nebulaStreamRuntime.readFromSource(logicalSource)
                .sink(new FileSink("out.csv", "CSV", "APPEND"));

int queryId = nebulaStreamRuntime.executeQuery(query, "BottomUp");

The query submission interface requires two parameters: the query to be submitted and the operator placement strategy. The currently available operator placement strategies are: “BottomUp” and “TopDown”. The query is submitted to the coordinator and executed lazily on the NebulaStream cluster.

Query Catalog

The Java client represents a query as a QueryCatalogEntry object. A QueryCatalogEntry object stores the query id, query status, and query string. To get the whole query catalog as an ArrayList, we can use the following method.

ArrayList<QueryCatalogEntry> catalog =
            nebulaStreamRuntime.getAllRegisteredQueries();

We can also obtain queries with a specific status. The available statuses are Registered, Scheduling, Running, and Failed. For example:

ArrayList<QueryCatalogEntry> catalogRegistered =
        nebulaStreamRuntime.getRegisteredQueryWithStatus("Registered");

Getting the Query Plan

The Java client stores a query plan as a graph with LogicalQuery objects as its nodes. A LogicalQuery object stores the operator id, the name, and the nodeType. To get the query plan of a specific query as a Graph<LogicalQuery, DefaultEdge>, we can use the following method, where queryId is the id returned when the query was submitted.

Graph<LogicalQuery, DefaultEdge> queryPlan =
            nebulaStreamRuntime.getQueryPlan(queryId);

Getting the Execution Plan

To get the execution plan of a specific query as a Graph<ExecutionNode, ExecutionLink>, we can use the following method:

Graph<ExecutionNode, ExecutionLink> executionPlanGraph =
            nebulaStreamRuntime.getExecutionPlan(queryId);

Deleting a Query

To stop a query, we can use the stopQuery() method. The method returns true if the query was successfully marked for stop.

boolean markedForStopped = nebulaStreamRuntime.stopQuery(queryId);

⚠️ This method only schedules the request to stop the query.

Source Management

The source management API contains a collection of methods in the Java client to retrieve, add, update, and remove logical sources in a NebulaStream instance.

Retrieving Logical Source Catalog

To get the logical source catalog as a List<String>, we can use the following method.

List<String> sourceCatalog =
            nebulaStreamRuntime.getLogicalSources();

Adding a Logical Source

To add a logical source, we can use the addLogicalSource() method. The method expects a source name and the schema (in C++) as parameters and returns true if the new source was successfully added.

String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"test\",INT32);";

boolean added = nebulaStreamRuntime.addLogicalSource(sourceName, schema);
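
Writing the C++ schema string by hand requires escaping every quote. A small helper can generate it from field names and types. The sketch below is a hypothetical utility, not part of the client. It reproduces the format shown above, and it assumes that several fields are declared by chaining addField calls. INT32 is the only type name taken from this page.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SchemaStringSketch {
    // Builds a schema string from (field name -> type name) pairs, in insertion order.
    static String build(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("Schema::create()");
        for (Map.Entry<String, String> field : fields.entrySet()) {
            sb.append("->addField(\"").append(field.getKey()).append("\",")
              .append(field.getValue()).append(")");
        }
        return sb.append(";").toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("test", "INT32");

        // Produces the same string value as the hand-written schema above.
        System.out.println(build(fields));
    }
}
```

The resulting string could then be passed as the schema argument of addLogicalSource().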

Updating a Logical Source

To update a logical source, we can use the updateLogicalSource() method. We need to pass a source name to be updated and the new schema (in C++) as parameters. The method returns true if the source was successfully updated.

String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"anothertest\",INT32);";

boolean updated = nebulaStreamRuntime.updateLogicalSource(sourceName, schema);

Deleting a Logical Source

We can remove a logical source from a coordinator using the deleteLogicalSource() method. The method accepts the name of the source to be deleted as a parameter. The following example uses the deleteLogicalSource() method:

String sourceName = "TestLogicalSource";

boolean deleted = nebulaStreamRuntime.deleteLogicalSource(sourceName);

Obtaining Physical Source Catalog

To get all physical sources of a logical source as an ArrayList, we can use the getPhysicalSources() method. The method expects a logical source name as a parameter.

ArrayList<String> physicalStreamOfDefaultLogical =
            nebulaStreamRuntime.getPhysicalSources("default_logical");