Java Client
The Java client is a Java library that provides an interface for Java applications to interact with the NebulaStream coordinator. It offers functionality to submit queries and to manage running queries, sources, and the topology.
Obtaining the Java Client
The Java client is available as a Java library. To use it in a Gradle project, add the following dependency:
implementation 'stream.nebula:nebulastream-java-client:0.0.50'
For Maven, add the following dependency instead:
<dependency>
  <groupId>stream.nebula</groupId>
  <artifactId>nebulastream-java-client</artifactId>
  <version>0.0.50</version>
</dependency>
Connecting to the Coordinator
Before performing any operation, we need to instantiate a NebulaStreamRuntime object and connect it to a running NebulaStream coordinator. We can create the connection as follows:
NebulaStreamRuntime nebulaStreamRuntime = NebulaStreamRuntime.getRuntime();
nebulaStreamRuntime.getConfig().setHost("localhost")
.setPort("8081");
Submitting a Query
Once connected, we can submit a query to the coordinator. The following example submits a simple query that writes its result stream to a CSV file.
String logicalSource = "default_logical";
Query query = new Query().from(logicalSource)
.sink(new FileSink("out.csv", "CSV", "APPEND"));
int queryId = nebulaStreamRuntime.executeQuery(query.generateCppCode(), "BottomUp");
The executeQuery() method requires two parameters: the query to be submitted (here, the C++ code produced by generateCppCode()) and the operator placement strategy. It returns the id of the submitted query. The currently available operator placement strategies are “BottomUp” and “TopDown”. The query is submitted to the coordinator and executed lazily on the NebulaStream cluster.
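Because the placement strategy is passed as a plain string, a typo is only detected once the request reaches the coordinator. One option is to keep the accepted names in an enum on the application side. The sketch below assumes only the two strategy names listed above; PlacementStrategy, wireName(), and parse() are hypothetical application helpers, not part of the Java client.

```java
import java.util.Arrays;

// Hypothetical application-side helper; not part of the NebulaStream Java client.
// It keeps the placement strategy names documented above in one place.
enum PlacementStrategy {
    BOTTOM_UP("BottomUp"),
    TOP_DOWN("TopDown");

    private final String wireName;

    PlacementStrategy(String wireName) {
        this.wireName = wireName;
    }

    // The string to pass as the second argument of executeQuery().
    String wireName() {
        return wireName;
    }

    // Looks up a strategy by its documented name and fails fast on typos.
    static PlacementStrategy parse(String name) {
        return Arrays.stream(values())
                .filter(s -> s.wireName.equals(name))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(
                        "unknown placement strategy: " + name));
    }

    public static void main(String[] args) {
        System.out.println(parse("BottomUp").wireName()); // prints BottomUp
    }
}
```

With such a helper, the submission call would read nebulaStreamRuntime.executeQuery(query.generateCppCode(), PlacementStrategy.BOTTOM_UP.wireName());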
Query API
Source Operator
...
.from(<logicalSourceName>)
...
Filter Operator
...
.filter(Predicate.Attribute(<fieldname>).<predicate>)
...
Currently, the Java client supports the following predicate expressions:
.equal(intValue)
.greaterThan(intValue)
.greaterThanOrEqual(intValue)
.lessThan(intValue)
.lessThanOrEqual(intValue)
The Java client allows multiple expressions to be combined into a single predicate using the and() and or() operators. For example:
...
.filter(Predicate.Attribute("key").greaterThan(0)
.and(Predicate.Attribute("key").lessThan(10)))
...
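To make the semantics of the combined predicate concrete, the following plain-Java sketch evaluates the same condition (key > 0 and key < 10) with java.util.function.Predicate. It only models which tuples the filter keeps and does not use the Java client; representing a tuple as a Map from field name to value is an assumption made for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java model of
// .filter(Predicate.Attribute("key").greaterThan(0).and(Predicate.Attribute("key").lessThan(10)))
// Tuples are modelled as maps from field name to value (an illustrative assumption).
class FilterSketch {
    static Predicate<Map<String, Integer>> greaterThan(String field, int value) {
        return t -> t.get(field) > value;
    }

    static Predicate<Map<String, Integer>> lessThan(String field, int value) {
        return t -> t.get(field) < value;
    }

    // Equivalent of combining both expressions with and().
    static final Predicate<Map<String, Integer>> KEY_IN_RANGE =
            greaterThan("key", 0).and(lessThan("key", 10));

    // Returns the keys of the tuples that pass the filter, in input order.
    static List<Integer> keptKeys(List<Integer> keys) {
        return keys.stream()
                .map(k -> Map.of("key", k))
                .filter(KEY_IN_RANGE)
                .map(t -> t.get("key"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Only keys strictly between 0 and 10 pass the filter.
        System.out.println(keptKeys(List.of(-1, 0, 5, 9, 10, 42))); // [5, 9]
    }
}
```

Replacing and() with or() in KEY_IN_RANGE would keep a tuple when either expression holds, mirroring the client's or() operator.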
Map Operator
The map operator accepts two parameters: the name of the field that stores the result and a map expression.
...
.map(<fieldname>, Expression.<expression>())
...
Currently, the Java client supports the following map expressions:
.constant(intValue)
.add(<fieldname1>,<fieldname2>)
.substract(<fieldname1>,<fieldname2>)
.multiply(<fieldname1>,<fieldname2>)
.divide(<fieldname1>,<fieldname2>)
The following is an example of a query with a map operator:
Query mapQuery = new Query().from("default_logical")
.map("total", Expression.add("field_1","field_2"));
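The following plain-Java sketch shows what the map example above computes for each tuple: it copies the tuple and stores field_1 + field_2 in a new field named total. It does not use the Java client; modelling tuples as maps, and overwriting the target field if it already exists, are choices of this sketch rather than documented client behaviour.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of .map("total", Expression.add("field_1", "field_2")).
class MapSketch {
    // Returns a copy of the tuple with target = left + right.
    static Map<String, Integer> mapAdd(Map<String, Integer> tuple,
                                       String target, String left, String right) {
        Map<String, Integer> out = new LinkedHashMap<>(tuple);
        out.put(target, tuple.get(left) + tuple.get(right));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> in = new LinkedHashMap<>(); // keeps field order for printing
        in.put("field_1", 3);
        in.put("field_2", 4);
        System.out.println(mapAdd(in, "total", "field_1", "field_2"));
        // prints {field_1=3, field_2=4, total=7}
    }
}
```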
Window Operator
The Java client supports two types of windows: tumbling windows and sliding windows. For each type, we set the event-time field, the window size (and, for sliding windows, the slide) as a time measure, and the aggregation function to apply.
// tumbling window
...
.window(TumblingWindow.of(new EventTime(<field name>),
TimeMeasure.<time measure>(<window size>)))
.apply(Aggregation.<aggregation function>(<field name>));
...
// sliding window
...
.window(SlidingWindow.of(new EventTime(<field name>),
TimeMeasure.<time measure>(<window size>),
TimeMeasure.<time measure>(<window slide>)))
.apply(Aggregation.<aggregation function>(<field name>));
...
The available time measures are:
TimeMeasure.milliseconds(intValue)
TimeMeasure.seconds(intValue)
TimeMeasure.minutes(intValue)
TimeMeasure.hours(intValue)
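To make the window definitions above concrete, the sketch below computes which window(s) an event-time timestamp falls into, with all durations in milliseconds (so a 10-second window has size 10_000). It assumes windows aligned to timestamp 0 and non-negative timestamps, which is the common textbook assignment scheme; NebulaStream's internal window assignment may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Window assignment sketch for event-time windows aligned at timestamp 0.
// Assumes non-negative timestamps, size > 0 and 0 < slide <= size (all in milliseconds).
class WindowAssignment {
    // Tumbling windows: each timestamp belongs to exactly one window [start, start + size).
    static long tumblingStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Sliding windows: a timestamp belongs to every window [start, start + size)
    // whose start is a non-negative multiple of slide in (timestamp - size, timestamp].
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size && start >= 0; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long tenSeconds = 10_000L;
        long fiveSeconds = 5_000L;
        // A tuple at t = 12.5 s falls into the tumbling window [10 s, 20 s).
        System.out.println(tumblingStart(12_500L, tenSeconds)); // 10000
        // With size 10 s and slide 5 s it falls into the windows starting at 10 s and 5 s.
        System.out.println(slidingStarts(12_500L, tenSeconds, fiveSeconds)); // [10000, 5000]
    }
}
```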
The Java client currently supports the following aggregation function:
sum(<field name>)
Furthermore, we can write the aggregation result to a new field using the as() operator:
...
.apply(Aggregation.<aggregation function>(<field name>)
.as(<target field name>));
...
The Java client also supports submission of queries with a window-by-key operator:
.window(TumblingWindow.of(new EventTime(<field name>),
TimeMeasure.<time measure>(<window size>))
.byKey("id"))
.apply(Aggregation.<aggregation function>(<field name>));
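As a rough model of a window-by-key sum, the sketch below groups tuples by tumbling window and key and sums one field per group. It is plain Java, not the client API: the Event record, its field names, and the nested-map result shape are hypothetical, and late or out-of-order data is ignored.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a keyed tumbling-window sum; not the Java client API.
class KeyedWindowSum {
    // Hypothetical input tuple: key field "id", event-time field, and the value to sum.
    record Event(int id, long timestamp, int value) {}

    // Returns windowStart -> (id -> sum of value) for tumbling windows of the given size.
    static Map<Long, Map<Integer, Integer>> sumByKey(List<Event> events, long windowSize) {
        Map<Long, Map<Integer, Integer>> result = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestamp() - (e.timestamp() % windowSize);
            result.computeIfAbsent(windowStart, w -> new TreeMap<>())
                  .merge(e.id(), e.value(), Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1, 1_000L, 5),
                new Event(2, 2_000L, 7),
                new Event(1, 8_000L, 3),
                new Event(1, 12_000L, 4));
        // With 10-second windows: {0={1=8, 2=7}, 10000={1=4}}
        System.out.println(sumByKey(events, 10_000L));
    }
}
```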
Join Operator
The join operator joins tuples coming from two event streams. To define a join operator, we need two queries, a key field from each stream whose values are compared for equality, and a window definition. The following is an example of a join operator:
Query q1 = new Query().from("bus");
Query q2 = new Query().from("metro");
q1.joinWith(q2)
.where("station_id")
.equalsTo("station_id")
.window(TumblingWindow.of(new EventTime("timestamp"),
TimeMeasure.seconds(10)));
The above example joins q1 and q2 on the condition that the station_id in the bus stream equals the station_id in the metro stream and that both tuples fall within the same 10-second tumbling window.
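The following plain-Java sketch models the join semantics just described: it pairs a bus tuple with a metro tuple when their station_id values are equal and both timestamps fall into the same 10-second tumbling window. The Tuple and Joined records and the nested-loop strategy are illustrative assumptions; the sketch says nothing about how NebulaStream executes joins internally.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a tumbling-window equi-join on station_id; not the client API.
class WindowJoinSketch {
    // Hypothetical tuple with the join key and the event-time field (milliseconds).
    record Tuple(int stationId, long timestamp) {}

    record Joined(Tuple bus, Tuple metro) {}

    // Assumes non-negative timestamps so that integer division yields the window index.
    static List<Joined> join(List<Tuple> bus, List<Tuple> metro, long windowSize) {
        List<Joined> out = new ArrayList<>();
        for (Tuple b : bus) {
            for (Tuple m : metro) {
                boolean sameKey = b.stationId() == m.stationId();
                boolean sameWindow = b.timestamp() / windowSize == m.timestamp() / windowSize;
                if (sameKey && sameWindow) {
                    out.add(new Joined(b, m));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tuple> bus = List.of(new Tuple(7, 1_000L), new Tuple(8, 2_000L));
        List<Tuple> metro = List.of(new Tuple(7, 9_000L), new Tuple(7, 11_000L));
        // Only (bus 7 @ 1 s, metro 7 @ 9 s) match: 11 s lies in the next window.
        System.out.println(join(bus, metro, 10_000L).size()); // 1
    }
}
```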
Union Operator
The union operator merges two streams into a single stream; both streams must have the same schema. As with join, the union operator in the Java client takes a second query as a parameter. The following is an example of a query with a union operator:
Query q1 = new Query().from("bus");
Query q2 = new Query().from("metro");
// union q1 and q2
q1.unionWith(q2);
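The sketch below illustrates the schema requirement of union: it models a schema as an ordered list of (name, type) fields and merges two finite streams only when their schemas are equal. It is plain Java, not the client's union implementation; the Field record is hypothetical, and emitting all of q1's tuples before q2's is a simplification, since a real stream union interleaves tuples as they arrive.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a union on two finite streams; not the Java client API.
class UnionSketch {
    // Hypothetical schema field: name plus type name, e.g. ("station_id", "INT32").
    record Field(String name, String type) {}

    static <T> List<T> union(List<Field> leftSchema, List<T> left,
                             List<Field> rightSchema, List<T> right) {
        // Union is only defined for streams with identical schemas.
        if (!leftSchema.equals(rightSchema)) {
            throw new IllegalArgumentException(
                    "schemas differ: " + leftSchema + " vs " + rightSchema);
        }
        List<T> merged = new ArrayList<>(left);
        merged.addAll(right);
        return merged;
    }

    public static void main(String[] args) {
        List<Field> schema = List.of(new Field("station_id", "INT32"));
        System.out.println(union(schema, List.of("bus-1"), schema, List.of("metro-1")));
        // prints [bus-1, metro-1]
    }
}
```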
Sink Operator
The Java client allows queries to use the following sink operators:
.sink(new PrintSink());
- writes the resulting stream to the coordinator’s console.
.sink(new ZMQSink("localhost", 1234));
- forwards the resulting stream to a ZMQ instance.
.sink(new FileSink("/tmp/out.csv", "CSV", "OVERRIDE"));
- writes the resulting stream to a CSV file on the coordinator.
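The last FileSink argument is a write mode; the examples on this page use "APPEND" and "OVERRIDE". The plain-Java sketch below shows the behaviour those names suggest on a local file using java.nio. Mapping "APPEND" to appending and "OVERRIDE" to truncating the file is an assumption based on the mode names, and FileSinkModes is a hypothetical helper, not how the coordinator writes sink output.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical helper illustrating the two FileSink write modes on a local file.
// The APPEND/OVERRIDE mapping below is inferred from the mode names.
class FileSinkModes {
    // Maps a FileSink mode string to the java.nio option with the matching behaviour.
    static StandardOpenOption optionFor(String mode) {
        return switch (mode) {
            case "APPEND" -> StandardOpenOption.APPEND;              // keep existing rows
            case "OVERRIDE" -> StandardOpenOption.TRUNCATE_EXISTING; // discard existing rows
            default -> throw new IllegalArgumentException("unknown mode: " + mode);
        };
    }

    static void writeCsv(Path file, List<String> rows, String mode) throws IOException {
        Files.write(file, rows, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, optionFor(mode));
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempFile("out", ".csv");
        writeCsv(out, List.of("1,2"), "OVERRIDE");
        writeCsv(out, List.of("3,4"), "APPEND");
        System.out.println(Files.readAllLines(out)); // [1,2, 3,4]
        writeCsv(out, List.of("5,6"), "OVERRIDE");
        System.out.println(Files.readAllLines(out)); // [5,6]
        Files.delete(out);
    }
}
```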
Query Management
The query management API is a collection of methods from the Java client library to monitor and manage query execution in a NebulaStream instance.
Query Catalog
The Java client represents a query as a QueryCatalogEntry object, which stores the query id, query status, and query string. To get the whole query catalog as an ArrayList<QueryCatalogEntry>, we can use the following method:
ArrayList<QueryCatalogEntry> catalog =
nebulaStreamRuntime.getAllRegisteredQueries();
We can also obtain queries with a specific status. The available statuses are Registered, Scheduling, Running, and Failed. For example:
ArrayList<QueryCatalogEntry> catalogRegistered =
nebulaStreamRuntime.getRegisteredQueryWithStatus("Registered");
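When the full catalog has already been fetched, entries can also be summarised per status on the client side instead of issuing one request per status. The sketch below uses a hypothetical CatalogEntry record holding the three fields a QueryCatalogEntry is described as storing; the real class's accessor names are not documented here, so this is not its API.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for QueryCatalogEntry with the fields described above.
class CatalogByStatus {
    record CatalogEntry(int queryId, String queryStatus, String queryString) {}

    // Counts catalog entries per status, sorted by status name.
    static Map<String, Long> countByStatus(List<CatalogEntry> catalog) {
        return catalog.stream()
                .collect(Collectors.groupingBy(CatalogEntry::queryStatus,
                        TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<CatalogEntry> catalog = List.of(
                new CatalogEntry(1, "Running", "q1"),
                new CatalogEntry(2, "Failed", "q2"),
                new CatalogEntry(3, "Running", "q3"));
        System.out.println(countByStatus(catalog)); // {Failed=1, Running=2}
    }
}
```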
Getting the Query Plan
The Java client represents a query plan as a graph whose nodes are LogicalQuery objects. A LogicalQuery object stores the operator id, name, and nodeType. To get the query plan of a specific query as a Graph<LogicalQuery, DefaultEdge>, we can use the following method:
Graph<LogicalQuery, DefaultEdge> queryPlan =
nebulaStreamRuntime.getQueryPlan(<query id>);
Getting the Execution Plan
To get the execution plan of a specific query as a Graph<ExecutionNode, ExecutionLink>, we can use the following method:
Graph<ExecutionNode, ExecutionLink> executionPlanGraph =
nebulaStreamRuntime.getExecutionPlan(<query id>);
Deleting a Query
To stop a query, we can use the stopQuery() method. The method returns true if the query was successfully marked for stopping.
boolean markedForStopped = nebulaStreamRuntime.stopQuery(queryId);
⚠️ This method only schedules the request to stop the query; the query may still be running when the method returns.
Source Management
The source management API contains a collection of methods in the Java client to retrieve, add, update, and remove logical sources in a NebulaStream instance.
Retrieving Logical Source Catalog
To get the logical source catalog as a List<String>, we can use the following method:
List<String> sourceCatalog =
nebulaStreamRuntime.getLogicalSources();
Adding a Logical Source
To add a logical source, we can use the addLogicalSource() method. It expects a source name and the schema (as a C++ expression string) as parameters and returns true if the new source was successfully added.
String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"test\",INT32);";
boolean added = nebulaStreamRuntime.addLogicalSource(sourceName, schema);
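Writing the C++ schema by hand means escaping quotes inside a Java string literal. A small helper can build the string from field names and types instead. The sketch below is hypothetical: it reproduces the Schema::create()->addField(...) pattern from the examples on this page, and chaining several ->addField(...) calls for multi-field schemas is an assumption, since the page only shows single-field schemas.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that builds the C++ schema string passed to
// addLogicalSource()/updateLogicalSource(). Chaining several ->addField(...)
// calls for multiple fields is an assumption based on the single-field examples.
class SchemaString {
    static String build(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("Schema::create()");
        for (Map.Entry<String, String> field : fields.entrySet()) {
            sb.append("->addField(\"")
              .append(field.getKey())
              .append("\",")
              .append(field.getValue())
              .append(')');
        }
        return sb.append(';').toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>(); // keeps field order
        fields.put("test", "INT32");
        // prints Schema::create()->addField("test",INT32);
        System.out.println(build(fields));
    }
}
```

A LinkedHashMap is used so that fields appear in the schema in insertion order.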
Updating a Logical Source
To update a logical source, we can use the updateLogicalSource() method. It expects the name of the source to update and the new schema (as a C++ expression string) as parameters, and returns true if the source was successfully updated.
String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"anothertest\",INT32);";
boolean updated = nebulaStreamRuntime.updateLogicalSource(sourceName, schema);
Deleting a Logical Source
We can remove a logical source from the coordinator using the deleteLogicalSource() method, which accepts the name of the source to delete as a parameter. The following is an example of using the deleteLogicalSource() method:
String sourceName = "TestLogicalSource";
boolean deleted = nebulaStreamRuntime.deleteLogicalSource(sourceName);
Obtaining Physical Source Catalog
To get all physical sources of a logical source as an ArrayList<String>, we can use the getPhysicalSources() method. The method expects a logical source name as a parameter.
ArrayList<String> physicalStreamOfDefaultLogical =
nebulaStreamRuntime.getPhysicalSources("default_logical");