The Java client is a Java library that provides an interface for Java applications to interact with the NebulaStream coordinator. It offers functionality to submit queries and to manage running queries, sources, and the topology.
Obtaining the Java Client
The Java client is available as a Java library. The latest version can be obtained as follows:
<dependency>
    <groupId>stream.nebula</groupId>
    <artifactId>nebulastream-java-client</artifactId>
    <version>0.0.50</version>
</dependency>
Connecting to the Coordinator
Before performing any operation, we need to instantiate a NebulaStreamRuntime object and connect it to a running NebulaStream coordinator. We can create the connection as follows.
NebulaStreamRuntime nebulaStreamRuntime = NebulaStreamRuntime.getRuntime();
nebulaStreamRuntime.getConfig().setHost("localhost")
    .setPort("8081");
Submitting a Query
Once we have the connection, we can submit a query to the Coordinator. Following is an example of submitting a simple query that will write the result stream to a CSV file.
String logicalSource = "default_logical";
Query query = new Query().from(logicalSource)
    .sink(new FileSink("out.csv", "CSV", "APPEND"));
int queryId = nebulaStreamRuntime.executeQuery(query.generateCppCode(), "BottomUp");
The query submission interface requires two parameters: the query to be submitted and the operator placement strategy. The currently available operator placement strategies are: “BottomUp” and “TopDown”. The query is submitted to the coordinator and executed lazily on the NebulaStream cluster.
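To illustrate the second placement strategy, the same query can be submitted with "TopDown" instead of "BottomUp". This is a sketch: the source name and output path are illustrative, and it assumes a connected NebulaStreamRuntime as shown above.

```java
// Sketch: submit a query using the "TopDown" operator placement strategy.
// The logical source and output file name are illustrative values.
Query query = new Query().from("default_logical")
        .sink(new FileSink("out_topdown.csv", "CSV", "APPEND"));
int queryId = nebulaStreamRuntime.executeQuery(query.generateCppCode(), "TopDown");
```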
...
.from(<logicalSourceName>)
...
...
.filter(Predicate.Attribute(<fieldname>).<predicate>)
...
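As a sketch, a complete query with a filter operator might look as follows. The field name and threshold are assumptions for illustration; greaterThan is one of the predicate expressions shown later in this section.

```java
// Sketch: keep only tuples whose "value" field exceeds 42 and write them to a file.
// The logical source name, field name, and threshold are assumed for illustration.
Query filterQuery = new Query().from("default_logical")
        .filter(Predicate.Attribute("value").greaterThan(42))
        .sink(new FileSink("filtered.csv", "CSV", "APPEND"));
```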
Currently, the Java client supports the following predicate expressions:
The Java client allows multiple expressions to be combined into a single predicate using the and and or operators. For example:
...
.filter(Predicate.Attribute("key").greaterThan(0)
    .and(Predicate.Attribute("key").lessThan(10)))
...
The map operator accepts two parameters, i.e., a field name and a map expression.
...
.map(<fieldname>, Expression.<expression>())
...
Currently, the Java client supports the following map expressions:
Following is an example of a query with a map operator:
Query mapQuery = new Query().from("defaultLogical")
    .map("total", Expression.add("field_1", "field_2"));
The Java client supports two types of windows, i.e., tumbling windows and sliding windows. For each type, we can set the time measures and aggregation function to use.
// tumbling window
...
.window(TumblingWindow.of(new EventTime(<field name>),
    TimeMeasure.<time measure>(<window size>)))
.apply(Aggregation.<aggregation function>(<field name>));
...

// sliding window
...
.window(SlidingWindow.of(new EventTime(<field name>),
    TimeMeasure.<time measure>(<window size>),
    TimeMeasure.seconds(<window slide>)))
.apply(Aggregation.<aggregation function>(<field name>));
...
The available time measures are:
The Java client supports the following aggregation functions:
Furthermore, we can write the aggregation result to a new field using the as() method:

...
.apply(Aggregation.<aggregation function>(<field name>)
    .as(<target field name>));
...
The Java client also supports submission of queries with a window-by-key operator:
.window(TumblingWindow.of(new EventTime(<field name>),
    TimeMeasure.<time measure>(<window size>))
    .byKey("id"))
.apply(Aggregation.<aggregation function>(<field name>));
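A concrete keyed-window query might look as follows. This is a sketch: the field names, the 10-second window size, and the sum aggregation (assumed here to be exposed as Aggregation.sum()) are illustrative assumptions.

```java
// Sketch: per-"id" sum of the "value" field over 10-second tumbling windows.
// Field names and the Aggregation.sum() function are assumed for illustration.
Query windowQuery = new Query().from("default_logical")
        .window(TumblingWindow.of(new EventTime("timestamp"),
                TimeMeasure.seconds(10))
                .byKey("id"))
        .apply(Aggregation.sum("value"));
```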
The join operator joins tuples coming from two event streams. To define a join operator, we need two queries, a field name from each stream to join for predicate evaluation, and a window definition. Following is an example of a join operator:
Query q1 = new Query().from("bus");
Query q2 = new Query().from("metro");
q1.joinWith(q2)
    .where("station_id")
    .equalsTo("station_id")
    .window(TumblingWindow.of(new EventTime("timestamp"),
        TimeMeasure.seconds(10)));
The above example joins q1 and q2 on the condition that the station_id in the bus stream equals the station_id in the metro stream and both tuples fall within the same 10-second tumbling window.
The union operator merges two streams into a single stream. In this case, both streams need to have the same schema. Similar to join, the union operator in the Java client needs a second query as a parameter. Following is an example of a query with a union operator:
Query q1 = new Query().from("bus");
Query q2 = new Query().from("metro");
// union q1 and q2
q1.unionWith(q2);
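A unioned query is submitted like any other. The following is a sketch that assumes unionWith() returns the combined query so that a sink can be chained onto it; the sink path is illustrative.

```java
// Sketch: union two streams, attach a sink, and submit the combined query.
// Assumption: unionWith() returns the combined Query, allowing the chained sink.
Query q1 = new Query().from("bus");
Query q2 = new Query().from("metro");
q1.unionWith(q2)
        .sink(new FileSink("/tmp/union_out.csv", "CSV", "APPEND"));
int queryId = nebulaStreamRuntime.executeQuery(q1.generateCppCode(), "BottomUp");
```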
The Java client allows queries to use the following sink operators:
.sink(new PrintSink()) - writes the resulting stream to the Coordinator's console
.sink(new ZMQSink("localhost", 1234)) - forwards the resulting stream to a ZMQ instance
.sink(new FileSink("/tmp/out.csv", "CSV", "OVERRIDE")) - writes the resulting stream to a CSV file on the Coordinator
The query management API is a collection of methods from the Java client library to monitor and manage query execution in a NebulaStream instance.
The Java client represents a query as a QueryCatalogEntry object, which stores the query id, query status, and query string. To get the whole query catalog as an ArrayList:
ArrayList<QueryCatalogEntry> catalog = nebulaStreamRuntime.getAllRegisteredQueries();
We can also obtain queries with a specific status. The available statuses are Registered, Scheduling, Running, and Failed. For example:
ArrayList<QueryCatalogEntry> catalogRegistered = nebulaStreamRuntime.getRegisteredQueryWithStatus("Registered");
Getting the Query Plan
The Java client stores a query plan as a graph consisting of LogicalQuery objects as its nodes. A LogicalQuery object stores the operator id, name, and nodeType. To get the query plan of a specific query as a Graph<LogicalQuery, DefaultEdge>, we can use this method.
Graph<LogicalQuery, DefaultEdge> queryPlan = nebulaStreamRuntime.getQueryPlan(<query id>);
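Since the returned plan is a JGraphT graph, it can be traversed with the standard JGraphT API; the following sketch prints the plan's operators and edge count. The queryId variable is assumed to hold the id of a previously submitted query.

```java
import org.jgrapht.Graph;
import org.jgrapht.graph.DefaultEdge;

// Sketch: inspect a query plan by walking the JGraphT graph it is stored in.
Graph<LogicalQuery, DefaultEdge> queryPlan = nebulaStreamRuntime.getQueryPlan(queryId);
for (LogicalQuery operator : queryPlan.vertexSet()) {
    System.out.println(operator);   // one vertex per logical operator
}
System.out.println(queryPlan.edgeSet().size() + " edges in the plan");
```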
Getting the Execution Plan
To get the execution plan of a specific query as a Graph<ExecutionNode, ExecutionLink>, we can use the following method:
Graph<ExecutionNode, ExecutionLink> executionPlanGraph = nebulaStreamRuntime.getExecutionPlan(<query id>);
Deleting a Query
To stop a query, we can use the stopQuery() method. The method returns true if the query was successfully marked for stopping.
boolean markedForStopped = nebulaStreamRuntime.stopQuery(queryId);
⚠️ This method only schedules the request to stop the query.
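Because stopQuery() only schedules the stop request, a caller that needs confirmation can poll the query catalog until the query leaves the Running state. This is a minimal sketch; the getQueryId() accessor on QueryCatalogEntry is an assumed, illustrative name.

```java
// Sketch: wait until the stopped query no longer appears among running queries.
boolean scheduled = nebulaStreamRuntime.stopQuery(queryId);
while (scheduled) {
    ArrayList<QueryCatalogEntry> running =
            nebulaStreamRuntime.getRegisteredQueryWithStatus("Running");
    // Assumption: QueryCatalogEntry exposes the query id via getQueryId().
    boolean stillRunning = running.stream()
            .anyMatch(entry -> entry.getQueryId() == queryId);
    if (!stillRunning) {
        break;
    }
    Thread.sleep(1000);   // back off before polling again
}
```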
The source management API contains a collection of methods in the Java client to retrieve, add, update, and remove logical sources in a NebulaStream instance.
Retrieving Logical Source Catalog
To get the logical source catalog as a List<String>, we can use the following method.
List<String> sourceCatalog = nebulaStreamRuntime.getLogicalSources();
Adding a Logical Source
To add a logical source, we can use the addLogicalSource() method. The method expects a source name and the schema (in C++) as parameters and returns true if the new source was successfully added.
String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"test\",INT32);";
boolean added = nebulaStreamRuntime.addLogicalSource(sourceName, schema);
Updating a Logical Source
To update a logical source, we can use the updateLogicalSource() method. We need to pass the name of the source to be updated and the new schema (in C++) as parameters. The method returns true if the source was successfully updated.
String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"anothertest\",INT32);";
boolean updated = nebulaStreamRuntime.updateLogicalSource(sourceName, schema);
Deleting a Logical Source
We can remove a logical source from a Coordinator using the deleteLogicalSource() method. The method accepts the name of the source to be deleted as a parameter. Following is an example of using the deleteLogicalSource() method:
String sourceName = "TestLogicalSource";
boolean deleted = nebulaStreamRuntime.deleteLogicalSource(sourceName);
Obtaining Physical Source Catalog
To get all physical sources of a logical source as an ArrayList<String>, we can use the getPhysicalSources() method. The method expects a logical source name as a parameter.
ArrayList<String> physicalStreamOfDefaultLogical = nebulaStreamRuntime.getPhysicalSources("default_logical");