Java Client

The Java client is a library that provides an interface for Java applications to interact with the NebulaStream coordinator. It offers functionality to submit queries and to manage running queries, sources, and the topology.

Obtaining the Java Client

The Java client is available as a Java library. The latest version can be obtained via Gradle:

implementation 'stream.nebula:nebulastream-java-client:0.0.74'

or via Maven:

<dependency>
    <groupId>stream.nebula</groupId>
    <artifactId>nebulastream-java-client</artifactId>
    <version>0.0.74</version>
</dependency>

Connecting to the Coordinator

Before performing any operation, we need to instantiate a NebulaStreamRuntime object and connect it to a running NebulaStream coordinator. We can create the connection as follows.

NebulaStreamRuntime nebulaStreamRuntime = NebulaStreamRuntime.getRuntime();
nebulaStreamRuntime.getConfig().setHost("localhost").setPort("8081");

Query API

Creating a query

You can create a query by reading from a particular named source through the NebulaStream runtime.

Query query = nebulaStreamRuntime.readFromSource("logical_source");

The code below assumes that we have already created a query on which we perform further operations.

Accessing attributes

Many operations in the query API operate on stream attributes. To access an attribute, use the (static) method Expressions.attribute(String). To make the code less verbose, you can statically import the method:

import static stream.nebula.expression.Expressions.attribute;

The examples below assume that the method has been statically imported and leave out the name of the Expressions class.

Chaining operations

All operations of the query API return the query object, which allows chaining multiple operations in a fluent syntax. The only exception is the sink operator, which returns a Sink object.

⚠️ At the moment, the following operations are equivalent.

// Chain two filter operations on a query.
Query q1 = nebulaStreamRuntime.readFromSource("logical_source")
    .filter(attribute("a"))
    .filter(attribute("b"));

// Perform two filter operations on the same query object.
Query q2 = nebulaStreamRuntime.readFromSource("logical_source");
q2.filter(attribute("a"));
q2.filter(attribute("b"));

In other words, the operations of the query API mutate the query object on which they are called. This will likely change in the future.

Filter Operator

The filter operator accepts a predicate which can be evaluated to a boolean value. This predicate can be:

  • a single attribute,
  • the result of a comparison (lessThan, lessThanOrEqual, greaterThan, greaterThanOrEqual, equalTo, notEqualTo),
  • the result of a boolean operation (and, or, not),
  • an arbitrary combination of the above.

Examples

Filter the stream to contain only the records for which the attribute my_attribute is true.

query.filter(attribute("my_attribute"));

Filter the stream by comparing an attribute against a literal value.

query.filter(attribute("my_attribute").lessThan(5));

Filter the stream by comparing two attributes.

query.filter(attribute("attribute_1").greaterThan(attribute("attribute_2")));

Filter the stream by combining two predicates with a boolean operation.

query.filter(attribute("boolean_attribute").and(attribute("numeric_attribute").lessThan(1)));

Projection Operator

The projection operator accepts a variable number of attributes which are copied to the output stream. Each attribute can also be renamed.

Examples

Project two attributes.

query.project(attribute("attribute_1"), attribute("attribute_2));

Project a single attribute and rename it.

query.project(attribute("attribute_1").as("new_attribute_name"));

Map Operator

The map operator accepts two parameters. The first parameter is an attribute name, which can refer to an existing attribute in the stream or a new attribute. The second parameter is a complex expression over the existing stream attributes. The operator stores the result of the expression in the specified attribute; a new attribute is appended to the input tuples.

The complex expression can contain arithmetic operations, arithmetic and boolean comparisons, and boolean functions. Refer to the Expression API for details.

Example

query.map("total", attribute("field_1").add(attribute("field_2")));

Map Operator with Java UDFs

It is also possible to pass a Java UDF which implements the MapFunction interface to the map operator. The operator will execute the UDF for every record in the input stream and write the results to the output stream.

💡 This operation is only available through the Java client and has no corresponding operation in the C++ client.

Input and output types

The UDF can accept both primitive types (e.g., int, double, String, or boolean) and complex types expressed as POJOs. A complex type expressed as a POJO may only contain fields which are themselves primitive types, i.e., it is not possible to nest complex types or to use arrays or collections.
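
For illustration, a complex type like the following hypothetical SensorReading satisfies these constraints, because every field is a primitive type.

// Hypothetical POJO: usable as a UDF input/output type since all fields are primitives.
static class SensorReading {
    int sensorId;
    double value;
    boolean valid;
}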

Input and output schema

The input schema of the Java UDF has to match the output schema of the previous operator. If necessary, an explicit project operator can be used to remove other attributes.

For complex input/output types, the attribute names of the input/output stream match the field names of the POJO. For primitive input types, the name of the attribute in the input stream does not matter. For primitive output types, the name of the attribute in the output stream must be specified when invoking the map operator.

The Java UDF map operator replaces the stream with the output of the UDF for each input record. 💡 This is different from the native map operator, which replaces existing fields of the stream or appends new fields to it.

Example: Map UDF with primitive input and output type

Project a stream to a single attribute (which is an INT32) and return a stream with a BOOLEAN attribute is_even, indicating whether the input is even.

query
  .project(attribute("int_attribute"))                    // Project stream to a single attribute
  .map("is_even", new MapFunction<Integer, Boolean>() {   // Create output stream with a single attribute
    public Boolean map(final Integer value) {
      return value % 2 == 0;
    }
  });

💡 The UDF must be specified as an anonymous inner class and not as a Java lambda.

Example: Map UDF with complex input and output type

The following code defines the complex types CartesianCoordinate and PolarCoordinate, and a UDF CartesianToPolar that converts between both types.

static class CartesianCoordinate {
    double x;
    double y;
}

static class PolarCoordinate {
    double angle;
    double radius;
}

static class CartesianToPolar implements MapFunction<CartesianCoordinate, PolarCoordinate> {
    public PolarCoordinate map(final CartesianCoordinate value) {
        PolarCoordinate output = new PolarCoordinate();
        output.radius = Math.sqrt(value.x * value.x + value.y * value.y);
        output.angle = Math.atan2(value.y, value.x);
        return output;
    }
}

This UDF can then be used without specifying the names of the attributes in the output stream.

query
  .project(attribute("x"), attribute("y"))   // Project stream to required attributes
  .map(new CartesianToPolar());              // Call UDF to transform stream

Window Operator

The window operator is started by a window method, which specifies the type and size of the window, followed by an optional keyBy method, which specifies one or more attributes on which the window is grouped, and an apply method, which specifies one or more aggregation functions.

NebulaStream supports both tumbling and sliding windows. The size of windows can be specified in milliseconds, seconds, minutes, or hours. The time is always an explicit event time which is encoded as an attribute in the stream. The supported aggregation functions are min, max, count, sum, average, and median. The aggregated attribute can be renamed.

Examples

Create a tumbling window of 10 seconds, using the stream attribute timestamp, and compute the sum of the attribute value for each window.

query.window(TumblingWindow.of(EventTime.eventTime("timestamp"),
                               TimeMeasure.seconds(10)))
     .apply(Aggregation.sum("value"));

Create a sliding window of 1 minute with a slide of 30 seconds, group the window by the attributes key1 and key2, and compute the min and max of the attribute value for each window.

query.window(SlidingWindow.of(EventTime.eventTime("timestamp"),
                              TimeMeasure.minutes(1),
                              TimeMeasure.seconds(30)))
     .keyBy("key1", "key2")
     .apply(Aggregation.min("value"), Aggregation.max("value"));

💡 Multiple aggregation functions require the experimental THREAD_LOCAL window aggregation strategy.

Rename the result of an average aggregation.

query.window(TumblingWindow.of(EventTime.eventTime("timestamp"),
                               TimeMeasure.seconds(10)))
     .apply(Aggregation.average("value").as("mean_value"));

Join Operator

The join operator joins tuples coming from two event streams. To define a join operator, we need two queries, a field name from each stream on which the join predicate is evaluated, and a window definition.

Example

Join q1 and q2 on the condition that the station_id in the bus stream equals the station_id in the metro stream and both tuples fall within the same 10-second tumbling window.

NebulaStreamRuntime runtime = NebulaStreamRuntime.getRuntime();
Query q1 = runtime.readFromSource("bus");
Query q2 = runtime.readFromSource("metro");

q1.joinWith(q2)
    .where("station_id")
    .equalsTo("station_id")
    .window(TumblingWindow.of(EventTime.eventTime("timestamp"),
                              TimeMeasure.seconds(10)));

Union Operator

The union operator merges two streams into a single stream. Both streams need to have the same schema. Similar to join, the union operator in the Java client takes a second query as a parameter.

Example

NebulaStreamRuntime runtime = NebulaStreamRuntime.getRuntime();
Query q1 = runtime.readFromSource("bus");
Query q2 = runtime.readFromSource("metro");

q1.unionWith(q2);
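
The merged stream behaves like any other query and can be processed further. As a sketch, the following filters the merged stream and writes it to a CSV file; the attribute station_id and the threshold are assumptions, and it presumes greaterThan accepts a literal value just as lessThan does.

// Filter the merged stream and write it to a CSV file on the coordinator (sketch).
q1.unionWith(q2)
  .filter(attribute("station_id").greaterThan(0))
  .sink(new FileSink("/tmp/union_out.csv", "CSV_FORMAT", false));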

TensorFlow Inference

The inferModel operator executes a TensorFlow Lite model for each tuple in the input stream and appends the result to the tuple in the output stream.

💡 At the moment, the data type of the TensorFlow Lite model output has to be explicitly specified and it has to be FLOAT32.

💡 At the moment, the name of the TensorFlow Lite model has to be a valid path on the coordinator and the worker.

Example

The following example loads a TensorFlow Lite model from a classpath resource and applies it to the fields f1, f2, f3, and f4 of the input stream. The output of the model is stored in the fields iris0, iris1, iris2, which are appended to the tuples in the output stream.

Query q = nebulaStreamRuntime.readFromSource("iris")
  .inferModel(
    // TensorFlow model is read from a ResourceStream
    Objects.requireNonNull(getClass().getResourceAsStream("/iris_95acc.tflite")),
    // Name must be a valid file name on the coordinator and worker
    "/tmp/iris_95acc.tflite")
  // Input attributes
  .on(
    attribute("f1"),
    attribute("f2"),
    attribute("f3"),
    attribute("f4"))
  // Output attributes with explicit type information
  .as(
    attribute("iris0", BasicType.FLOAT32),
    attribute("iris1", BasicType.FLOAT32),
    attribute("iris2", BasicType.FLOAT32))

Sink Operator

The Java client allows queries to use the following sink operators:

  • .sink(new PrintSink()) - write the resulting stream to the coordinator's console
  • .sink(new ZMQSink("localhost", 1234)) - forward the resulting stream to a ZMQ instance
  • .sink(new FileSink("/tmp/out.csv", "CSV_FORMAT", false)) - write the resulting stream to a CSV file on the coordinator

Query Management

The query management API is a collection of methods from the Java client library to monitor and manage query execution in a NebulaStream instance.

Submitting a Query

Once we have the connection, we can submit a query to the coordinator. The following is an example of submitting a simple query that appends the result stream to a CSV file.

String logicalSource = "default_logical";

Query query = nebulaStreamRuntime.readFromSource(logicalSource)
                .sink(new FileSink("out.csv", "CSV_FORMAT", true));

int queryId = nebulaStreamRuntime.executeQuery(query, "BottomUp");

The query submission interface requires two parameters: the query to be submitted and the operator placement strategy. The currently available operator placement strategies are “BottomUp” and “TopDown”. The query is submitted to the coordinator and executed lazily on the NebulaStream cluster.

Query Catalog

The Java client represents a query as a QueryCatalogEntry object. A QueryCatalogEntry object stores the query id, query status, and query string. To get the whole query catalog as an ArrayList, we can use the following method.

ArrayList<QueryCatalogEntry> catalog =
            nebulaStreamRuntime.getAllRegisteredQueries();
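
Each QueryCatalogEntry can then be inspected. The following sketch lists all queries; it assumes that QueryCatalogEntry exposes its fields through conventional getters such as getQueryId() and getQueryStatus(), which may be named differently in the actual API.

// Print the id and current status of every registered query (sketch).
for (QueryCatalogEntry entry : catalog) {
    System.out.println(entry.getQueryId() + ": " + entry.getQueryStatus());
}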

We can also obtain queries with a specific status. The available statuses are Registered, Scheduling, Running, and Failed. For example:

ArrayList<QueryCatalogEntry> catalogRegistered =
        nebulaStreamRuntime.getRegisteredQueryWithStatus("Registered");

Getting the Query Plan

The Java client stores a query plan as a graph consisting of LogicalQuery objects as its nodes. A LogicalQuery object stores the operator id, name, and nodeType. To get the query plan of a specific query as a Graph<LogicalQuery, DefaultEdge>, we can use this method.

Graph<LogicalQuery, DefaultEdge> queryPlan =
            nebulaStreamRuntime.getQueryPlan(<query id>);
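
Judging by the DefaultEdge type, the returned object is a JGraphT graph, so the plan can be traversed with the usual graph operations. The following minimal sketch prints every operator in the plan, relying only on JGraphT's vertexSet() and on LogicalQuery's toString().

// Visit every operator (vertex) of the logical query plan (sketch).
for (LogicalQuery operator : queryPlan.vertexSet()) {
    System.out.println(operator);
}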

Getting the Execution Plan

To get the execution plan of a specific query as a Graph<ExecutionNode, ExecutionLink>, we can use the following method:

Graph<ExecutionNode, ExecutionLink> executionPlanGraph =
            nebulaStreamRuntime.getExecutionPlan(<query id>);
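
The execution plan is likewise a JGraphT graph; for instance, its links can be enumerated via edgeSet() (a sketch relying on the toString() of ExecutionLink).

// Enumerate all links between execution nodes (sketch).
for (ExecutionLink link : executionPlanGraph.edgeSet()) {
    System.out.println(link);
}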

Deleting a Query

To stop a query, we can use the stopQuery() method. The method returns true if the query was successfully marked to be stopped.

boolean markedForStop = nebulaStreamRuntime.stopQuery(queryId);

⚠️ This method only schedules the request to stop the query.

Source Management

The source management API contains a collection of methods in the Java client to retrieve, add, update, and remove logical sources in a NebulaStream instance.

Retrieving Logical Source Catalog

To get the logical source catalog as a List<String>, we can use the following method.

List<String> sourceCatalog =
            nebulaStreamRuntime.getLogicalSources();

Adding a Logical Source

To add a logical source, we can use the addLogicalSource() method. The method expects a source name and the schema (in C++ syntax) as parameters and returns true if the new source was successfully added.

String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"test\",INT32);";

boolean added = nebulaStreamRuntime.addLogicalSource(sourceName, schema);

Updating a Logical Source

To update a logical source, we can use the updateLogicalSource() method. We need to pass the name of the source to be updated and the new schema (in C++ syntax) as parameters. The method returns true if the source was successfully updated.

String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"anothertest\",INT32);";

boolean updated = nebulaStreamRuntime.updateLogicalSource(sourceName, schema);

Deleting a Logical Source

We can remove a logical source from a coordinator using the deleteLogicalSource() method. The method accepts the name of the source to be deleted as a parameter. The following is an example of using the deleteLogicalSource() method:

String sourceName = "TestLogicalSource";

boolean deleted = nebulaStreamRuntime.deleteLogicalSource(sourceName);

Obtaining Physical Source Catalog

To get all physical sources of a logical source as an ArrayList, we can use the getPhysicalSources() method. The method expects a logical source name as a parameter.

ArrayList<String> physicalSourcesOfDefaultLogical =
            nebulaStreamRuntime.getPhysicalSources("default_logical");