Java Client

The Java client is a Java library that provides an interface for Java applications to interact with the NebulaStream coordinator. It offers functionality to submit queries and to manage running queries, sources, and the topology.

Obtaining the Java Client

The Java client is available as a Java library. The latest version can be obtained as follows:

Gradle:

implementation 'stream.nebula:nebulastream-java-client:0.0.50'

Maven:

<dependency>
    <groupId>stream.nebula</groupId>
    <artifactId>nebulastream-java-client</artifactId>
    <version>0.0.50</version>
</dependency>

Connecting to the Coordinator

Before performing any operation, we need to instantiate a NebulaStreamRuntime object and connect it to a running NebulaStream coordinator. We can create the connection as follows:

NebulaStreamRuntime nebulaStreamRuntime = NebulaStreamRuntime.getRuntime();
nebulaStreamRuntime.getConfig().setHost("localhost")
               .setPort("8081");

Submitting a Query

Once we have the connection, we can submit a query to the coordinator. The following example submits a simple query that writes the result stream to a CSV file.

String logicalSource = "default_logical";

Query query = new Query().from(logicalSource)
                .sink(new FileSink("out.csv", "CSV", "APPEND"));

int queryId = nebulaStreamRuntime.executeQuery(query.generateCppCode(), "BottomUp");

The query submission interface requires two parameters: the query to be submitted and the operator placement strategy. The currently available operator placement strategies are "BottomUp" and "TopDown". The query is submitted to the coordinator and executed lazily on the NebulaStream cluster.
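
Because the placement strategy is passed as a plain string, a typo in its name would not be caught at compile time. A minimal sketch of an enum that keeps the two documented names in one place follows; PlacementStrategy and its methods are hypothetical helpers for illustration and are not part of the Java client:

```java
/** Hypothetical helper (not part of the Java client) holding the two documented strategy names. */
enum PlacementStrategy {
    BOTTOM_UP("BottomUp"),
    TOP_DOWN("TopDown");

    private final String id;

    PlacementStrategy(String id) {
        this.id = id;
    }

    /** The strategy name as a string, as used in the submission example above. */
    String id() {
        return id;
    }

    /** Parses a strategy name and rejects anything other than the two documented values. */
    static PlacementStrategy parse(String name) {
        for (PlacementStrategy strategy : values()) {
            if (strategy.id.equals(name)) {
                return strategy;
            }
        }
        throw new IllegalArgumentException("Unknown placement strategy: " + name);
    }

    public static void main(String[] args) {
        System.out.println(parse("TopDown").id());
    }
}
```

The value returned by PlacementStrategy.BOTTOM_UP.id() could then be passed as the second argument of executeQuery().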

Query API

Source Operator

    ...
    .from(<logicalSourceName>)
    ...

Filter Operator

    ...
    .filter(Predicate.Attribute(<fieldname>).<predicate>)
    ...

Currently, the Java client supports the following predicate expressions:

  • .equal(intValue)
  • .greaterThan(intValue)
  • .greaterThanOrEqual(intValue)
  • .lessThan(intValue)
  • .lessThanOrEqual(intValue)

The Java client allows multiple expressions to be combined into a single predicate using the and() and or() operators. For example:

    ...
    .filter(Predicate.Attribute("key").greaterThan(0)
        .and(Predicate.Attribute("key").lessThan(10)))
    ...
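
To make the meaning of the combined predicate concrete, the following sketch evaluates the same conditions on plain integers using java.util.function.IntPredicate from the standard library. It does not use the Java client; the class and constant names are illustrative, and the or() combination shown is a made-up example:

```java
import java.util.function.IntPredicate;

class FilterPredicateSketch {
    /** Same condition as greaterThan(0).and(lessThan(10)) in the example above. */
    static final IntPredicate BETWEEN_0_AND_10 =
            ((IntPredicate) key -> key > 0).and(key -> key < 10);

    /** Illustrative or-combination: the key is at most 0 or at least 10. */
    static final IntPredicate OUTSIDE_0_AND_10 =
            ((IntPredicate) key -> key <= 0).or(key -> key >= 10);

    public static void main(String[] args) {
        for (int key : new int[] {0, 1, 9, 10}) {
            System.out.println(key + " passes and-filter: " + BETWEEN_0_AND_10.test(key)
                    + ", passes or-filter: " + OUTSIDE_0_AND_10.test(key));
        }
    }
}
```

Tuples with key 1 to 9 pass the and-filter, while 0 and 10 are dropped because both comparisons are strict.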

Map Operator

The map operator accepts two parameters, i.e., a field name and a map expression.

    ...
    .map(<fieldname>, Expression.<expression>())
    ...

Currently, the Java client supports the following map expressions:

  • .constant(intValue)
  • .add(<fieldname1>,<fieldname2>)
  • .substract(<fieldname1>,<fieldname2>)
  • .multiply(<fieldname1>,<fieldname2>)
  • .divide(<fieldname1>,<fieldname2>)

Following is an example of a query with a map operator:

   Query mapQuery = new Query().from("defaultLogical")
            .map("total", Expression.add("field_1","field_2"));
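
The effect of this map on a single tuple can be sketched in plain Java. The sketch assumes that the map operator writes the expression result into the named field and adds the field if it does not exist yet; it models a tuple as a Map and does not use the Java client:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MapOperatorSketch {
    /** Returns a copy of the tuple in which target = left + right (assumed map semantics). */
    static Map<String, Integer> addInto(Map<String, Integer> tuple,
                                        String target, String left, String right) {
        Map<String, Integer> result = new LinkedHashMap<>(tuple);
        result.put(target, tuple.get(left) + tuple.get(right));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> tuple = new LinkedHashMap<>();
        tuple.put("field_1", 3);
        tuple.put("field_2", 4);
        // prints {field_1=3, field_2=4, total=7}
        System.out.println(addInto(tuple, "total", "field_1", "field_2"));
    }
}
```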

Window Operator

The Java client supports two types of windows, i.e., tumbling windows and sliding windows. For each type, we can set the time measures and aggregation function to use.



    // tumbling window
    ...
    .window(TumblingWindow.of(new EventTime(<field name>),
                              TimeMeasure.<time measure>(<window size>)))
    .apply(Aggregation.<aggregation function>(<field name>));
    ...


    // sliding window
    ...
    .window(SlidingWindow.of(new EventTime(<field name>),
                             TimeMeasure.<time measure>(<window size>),
                             TimeMeasure.<time measure>(<window slide>)))
    .apply(Aggregation.<aggregation function>(<field name>));
    ...

The available time measures are:

  • TimeMeasure.milliseconds(intValue)
  • TimeMeasure.seconds(intValue)
  • TimeMeasure.minutes(intValue)
  • TimeMeasure.hours(intValue)
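
How a timestamp is assigned to windows can be sketched with plain arithmetic. The sketch below assumes that windows are aligned to timestamp 0 and cover the half-open interval [start, start + size); it illustrates the general tumbling and sliding window technique and is not taken from the NebulaStream implementation:

```java
import java.util.ArrayList;
import java.util.List;

class WindowAssignmentSketch {
    /** Start of the single tumbling window [start, start + size) that contains the timestamp. */
    static long tumblingStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Starts of all sliding windows [start, start + size) that contain the timestamp, oldest first. */
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long latest = timestamp - Math.floorMod(timestamp, slide); // latest window start <= timestamp
        for (long start = latest; start > timestamp - size; start -= slide) {
            starts.add(0, start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // All values in milliseconds: a 10 s window, and a 10 s window sliding every 5 s.
        System.out.println(tumblingStart(12_500, 10_000));        // prints 10000
        System.out.println(slidingStarts(12_500, 10_000, 5_000)); // prints [5000, 10000]
    }
}
```

A tuple belongs to exactly one tumbling window, but to size / slide sliding windows when the slide divides the size evenly.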

The Java client supports the following aggregation function:

  • sum(<field name>)

Furthermore, we can write the aggregation result to a new field using the as() operator:

    ...
    .apply(Aggregation.<aggregation function>(<field name>)
        .as(<target field name>));
    ...

The Java client also supports submission of queries with a window-by-key operator:

    .window(TumblingWindow.of(new EventTime(<field name>),
                        TimeMeasure.<time measure>(<window size>))
                        .byKey("id"))
    .apply(Aggregation.<aggregation function>(<field name>));
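
The result of a keyed tumbling-window sum can be sketched in plain Java. The sketch assumes that byKey() produces one aggregate per key and window, and that windows are aligned to timestamp 0; events are modeled as {key, timestamp, value} arrays and the class name is illustrative:

```java
import java.util.Map;
import java.util.TreeMap;

class KeyedWindowSumSketch {
    /**
     * Sums the value of each event per key and tumbling window.
     * Each event is {key, timestamp, value}; the result maps "key@windowStart" to the sum.
     */
    static Map<String, Long> sumByKeyAndWindow(long[][] events, long windowSize) {
        Map<String, Long> sums = new TreeMap<>();
        for (long[] event : events) {
            long windowStart = event[1] - Math.floorMod(event[1], windowSize);
            sums.merge(event[0] + "@" + windowStart, event[2], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[][] events = {
            {1, 1_000, 5}, {1, 9_000, 7}, {2, 3_000, 1}, {1, 12_000, 2}
        };
        // prints {1@0=12, 1@10000=2, 2@0=1}
        System.out.println(sumByKeyAndWindow(events, 10_000));
    }
}
```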

Join Operator

The join operator joins tuples from two event streams. To define a join operator, we need two queries, a field name from each stream for the join predicate, and a window definition. The following is an example of a join operator:

Query q1 = new Query().from("bus");
Query q2 = new Query().from("metro");

q1.joinWith(q2)
    .where("station_id")
    .equalsTo("station_id")
    .window(TumblingWindow.of(new EventTime("timestamp"),
                              TimeMeasure.seconds(10)));

The above example joins q1 and q2 on the condition that the station_id in the bus stream equals the station_id in the metro stream and that both tuples fall within the same 10-second tumbling window.
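
The join condition described above can be sketched in plain Java as a nested loop over two small batches. This only illustrates the semantics (equal station_id and same tumbling window); it assumes windows aligned to timestamp 0, models events as {station_id, timestamp} arrays, and says nothing about how NebulaStream executes the join:

```java
import java.util.ArrayList;
import java.util.List;

class WindowedJoinSketch {
    /** Returns one entry "leftTs-rightTs@stationId" per pair with equal key in the same window. */
    static List<String> join(long[][] left, long[][] right, long windowSize) {
        List<String> matches = new ArrayList<>();
        for (long[] l : left) {
            for (long[] r : right) {
                boolean sameKey = l[0] == r[0];
                boolean sameWindow =
                        Math.floorDiv(l[1], windowSize) == Math.floorDiv(r[1], windowSize);
                if (sameKey && sameWindow) {
                    matches.add(l[1] + "-" + r[1] + "@" + l[0]);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        long[][] bus = {{7, 1_000}, {7, 11_000}, {8, 2_000}};
        long[][] metro = {{7, 9_000}, {8, 15_000}};
        // prints [1000-9000@7]
        System.out.println(join(bus, metro, 10_000));
    }
}
```

Only the first bus event finds a partner: the second one has the same station_id as a metro event but falls into the next window, and the third one has no metro event in its window.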

Union Operator

The union operator merges two streams into a single stream. In this case, both streams need to have the same schema. Similar to join, the union operator in the Java client needs a second query as a parameter. Following is an example of a query with a union operator:

Query q1 = new Query().from("bus");
Query q2 = new Query().from("metro");

// union q1 and q2
q1.unionWith(q2);
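
The schema requirement can be illustrated with a small plain-Java sketch that refuses to merge two batches whose field names differ. Schemas are modeled as lists of field names and rows as strings; the class and method are illustrative and not part of the Java client, and a real stream union interleaves tuples by arrival rather than concatenating them:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class UnionSketch {
    /** Merges two batches of rows after checking that both declare the same field names. */
    static List<String> union(List<String> schemaA, List<String> rowsA,
                              List<String> schemaB, List<String> rowsB) {
        if (!schemaA.equals(schemaB)) {
            throw new IllegalArgumentException(
                    "Schemas differ: " + schemaA + " vs " + schemaB);
        }
        List<String> merged = new ArrayList<>(rowsA);
        merged.addAll(rowsB);
        return merged;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("station_id", "timestamp");
        System.out.println(union(schema, Arrays.asList("7,1000"),
                                 schema, Arrays.asList("7,9000", "8,15000")));
    }
}
```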

Sink Operator

The Java client allows queries to use the following sink operators:

  • .sink(new PrintSink()) - writes the resulting stream to the Coordinator’s console
  • .sink(new ZMQSink("localhost", 1234)) - forwards the resulting stream to a ZMQ instance
  • .sink(new FileSink("/tmp/out.csv", "CSV", "OVERRIDE")) - writes the resulting stream to a CSV file on the Coordinator

Query Management

The query management API is a collection of methods in the Java client library for monitoring and managing query execution in a NebulaStream instance.

Query Catalog

The Java client represents a query as a QueryCatalogEntry object. A QueryCatalogEntry object stores the query id, query status, and query string. To get the whole query catalog as an ArrayList, we can use the following method.

ArrayList<QueryCatalogEntry> catalog =
            nebulaStreamRuntime.getAllRegisteredQueries();

We can also obtain queries with a specific status. The available statuses are Registered, Scheduling, Running, and Failed. For example:

ArrayList<QueryCatalogEntry> catalogRegistered =
        nebulaStreamRuntime.getRegisteredQueryWithStatus("Registered");

Getting the Query Plan

The Java client stores a query plan as a graph with LogicalQuery objects as its nodes. A LogicalQuery object stores the operator id, name, and nodeType. To get the query plan of a specific query as a Graph<LogicalQuery, DefaultEdge>, we can use the following method.

Graph<LogicalQuery, DefaultEdge> queryPlan =
            nebulaStreamRuntime.getQueryPlan(<query id>);

Getting the Execution Plan

To get the execution plan of a specific query as a Graph<ExecutionNode, ExecutionLink>, we can use the following method:

Graph<ExecutionNode, ExecutionLink> executionPlanGraph =
            nebulaStreamRuntime.getExecutionPlan(<query id>);

Deleting a Query

To stop a query, we can use the stopQuery() method. The method returns true if the query was successfully marked to be stopped.

boolean markedForStop = nebulaStreamRuntime.stopQuery(queryId);

⚠️ This method only schedules the request to stop the query.
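
Because stopQuery() only schedules the stop request, a caller that needs to know when the query has actually stopped may have to poll. The following is a generic sketch using only the standard library; waitUntil is a hypothetical helper, and the condition passed to it (for example, checking that the query no longer appears in the result of getRegisteredQueryWithStatus("Running")) is an assumption about how completion might be detected:

```java
import java.util.function.BooleanSupplier;

class StopPollingSketch {
    /** Checks the condition up to maxAttempts times, pausing between checks; returns whether it held. */
    static boolean waitUntil(BooleanSupplier condition, int maxAttempts, long pauseMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Stand-in condition that becomes true on the third check.
        int[] checks = {0};
        System.out.println(waitUntil(() -> ++checks[0] >= 3, 5, 10));
    }
}
```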

Source Management

The source management API contains a collection of methods in the Java client to retrieve, add, update, and remove logical sources in a NebulaStream instance.

Retrieving Logical Source Catalog

To get the logical source catalog as a List<String>, we can use the following method.

List<String> sourceCatalog =
            nebulaStreamRuntime.getLogicalSources();

Adding a Logical Source

To add a logical source, we can use the addLogicalSource() method. The method expects a source name and the schema (in C++) as parameters and returns true if the new source was successfully added.

String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"test\",INT32);";

boolean added = nebulaStreamRuntime.addLogicalSource(sourceName, schema);
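
Writing the C++ schema string by hand requires escaping every quote. A small sketch of a helper that builds the string from a map of field names to type names is shown below. SchemaStringSketch is hypothetical and not part of the Java client; for one field it reproduces the string used above, while chaining several addField calls for multiple fields is an assumption about the schema syntax:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SchemaStringSketch {
    /** Builds a schema string in the format shown above from field name to type name pairs. */
    static String toCppSchema(Map<String, String> fields) {
        StringBuilder schema = new StringBuilder("Schema::create()");
        for (Map.Entry<String, String> field : fields.entrySet()) {
            schema.append("->addField(\"")
                  .append(field.getKey())
                  .append("\",")
                  .append(field.getValue())
                  .append(")");
        }
        return schema.append(";").toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("test", "INT32");
        // prints Schema::create()->addField("test",INT32);
        System.out.println(toCppSchema(fields));
    }
}
```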

Updating a Logical Source

To update a logical source, we can use the updateLogicalSource() method. We need to pass a source name to be updated and the new schema (in C++) as parameters. The method returns true if the source was successfully updated.

String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"anothertest\",INT32);";

boolean updated = nebulaStreamRuntime.updateLogicalSource(sourceName, schema);

Deleting a Logical Source

We can remove a logical source from the coordinator using the deleteLogicalSource() method. The method accepts the name of the source to delete as a parameter. The following is an example of using the deleteLogicalSource() method:

String sourceName = "TestLogicalSource";

boolean deleted = nebulaStreamRuntime.deleteLogicalSource(sourceName);

Obtaining Physical Source Catalog

To get all physical sources of a logical source as an ArrayList<String>, we can use the getPhysicalSources() method. The method expects a logical source name as a parameter.

ArrayList<String> physicalStreamOfDefaultLogical =
            nebulaStreamRuntime.getPhysicalSources("default_logical");