The Java client is a Java library that provides an interface for Java applications to interact with the NebulaStream coordinator. It offers functionalities to submit a query and manage running queries, sources, and the topology.
Obtaining the Java Client
The Java client is available as a Java library. The latest version can be obtained as follows:
<dependency>
    <groupId>stream.nebula</groupId>
    <artifactId>nebulastream-java-client</artifactId>
    <version>0.0.60</version>
</dependency>
Connecting to the Coordinator
Before performing any operation, we need to instantiate a NebulaStreamRuntime object and connect it to a running NebulaStream coordinator. We can create the connection as follows.
NebulaStreamRuntime nebulaStreamRuntime = NebulaStreamRuntime.getRuntime();
nebulaStreamRuntime.getConfig().setHost("localhost").setPort("8081");
Creating a query
You can create a query by reading from a particular named logical source through the NebulaStream runtime.
Query query = nebulaStreamRuntime.readFromSource("logical_source");
The code below assumes that we already have created a query on which we perform further operations.
Many operations in the query API operate on stream attributes. To access an attribute, use the static method Expressions.attribute. To make the code less verbose, you can statically import the method:

import static stream.nebula.expression.Expressions.attribute;

The examples below assume that the method has been statically imported and leave out the name of the Expressions class.
All operations of the query API return the query object, which allows the chaining of multiple operations in a fluent syntax. The only exception is the sink operator, which completes the query.
⚠️ At the moment, the following two snippets are equivalent.

// Chain two filter operations on a query.
Query q1 = nebulaStreamRuntime.readFromSource("logical_source")
    .filter(attribute("a"))
    .filter(attribute("b"));

// Perform two filter operations on the same query object.
Query q2 = nebulaStreamRuntime.readFromSource("logical_source");
q2.filter(attribute("a"));
q2.filter(attribute("b"));

In other words, the operations of the query API mutate the query object on which they are called. This will likely change in the future.
Filter
The filter operator accepts a predicate which evaluates to a boolean value. This predicate can be:
- a single boolean attribute,
- the result of a comparison between attributes or literal values,
- the result of a boolean operation (e.g., and, or),
- an arbitrary combination of the above.
Filter the stream to contain only the records for which the attribute my_attribute is true.
Filter the stream by comparing an attribute against a literal value.
Filter the stream by comparing two attributes.
Filter the stream by combining two attributes.
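The four cases above could look as follows. This is a sketch; the comparison and boolean method names used here (lessThan, equalTo, greaterThan, and) are assumptions based on the Expression API, so check that reference for the exact names.

```java
// Keep only records where the boolean attribute my_attribute is true.
query.filter(attribute("my_attribute"));

// Compare an attribute against a literal value.
query.filter(attribute("temperature").lessThan(50));

// Compare two attributes.
query.filter(attribute("a").equalTo(attribute("b")));

// Combine two predicates with a boolean operation.
query.filter(attribute("a").and(attribute("b").greaterThan(10)));
```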
Projection
The projection operator accepts a variable number of attributes which are copied to the output stream. Each attribute can also be renamed.
Project two attributes.
Project a single attribute and rename it.
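Both projections could look as follows. This sketch assumes the operator is exposed as a project method; the as rename method is the one used in the aggregation examples in this document.

```java
// Project two attributes; all other attributes are dropped from the output stream.
query.project(attribute("a"), attribute("b"));

// Project a single attribute and rename it in the output stream.
query.project(attribute("a").as("renamed_a"));
```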
Map
The map operator accepts two parameters. The first parameter is an attribute name, which can refer to an existing attribute in the stream or a new attribute. The second parameter is a complex expression over the existing stream attributes. The operator stores the result of the expression in the specified attribute.
The complex expression can contain arithmetic operations, arithmetic and boolean comparisons, and boolean functions. Refer to the Expression API for details.
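As a sketch, assuming the Expression API provides an add method for arithmetic, a map operation that stores the sum of two attributes in a (possibly new) attribute c might look like this:

```java
// Compute a + b for every tuple and store the result in attribute c.
// The arithmetic method name (add) is an assumption based on the Expression API.
query.map("c", attribute("a").add(attribute("b")));
```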
Window
The window operator is started by a window method, which specifies the type and size of the window, followed by an optional keyBy method, which specifies one or more attributes on which the window is grouped, followed by an apply method, which specifies one or more aggregation functions.

NebulaStream supports both tumbling and sliding windows. The size of windows can be specified in milliseconds, seconds, minutes, or hours. The time is always an explicit event time which is encoded as an attribute in the stream.
The supported aggregation functions include sum, min, max, and average. The aggregated attribute can be renamed.
Create a tumbling window of 10 seconds, using the stream attribute timestamp, and compute the sum of the attribute value for each window.

query.window(TumblingWindow.of(EventTime.eventTime("timestamp"), TimeMeasure.seconds(10)))
    .apply(Aggregation.sum("value"));
Create a sliding window of 1 minute with a slide of 30 seconds, group the window by the attributes key1 and key2, and compute the min and max of the attribute value for each window.

query.window(SlidingWindow.of(EventTime.eventTime("timestamp"), TimeMeasure.minutes(1), TimeMeasure.seconds(30)))
    .keyBy("key1", "key2")
    .apply(Aggregation.min("value"), Aggregation.max("value"));
💡 Multiple aggregation functions require the experimental THREAD_LOCAL window aggregation strategy.
Rename the result of an average aggregation.

query.window(TumblingWindow.of(EventTime.eventTime("timestamp"), TimeMeasure.seconds(10)))
    .apply(Aggregation.average("value").as("mean_value"));
Join
The join operator joins tuples coming from two event streams. To define a join operator, we need two queries, a field name from each stream for the join predicate, and a window definition.
Join q1 and q2 on the condition that the station_id in the bus stream is the same as the station_id in the metro stream and both tuples fall within the same 10-second tumbling window.

NebulaStreamRuntime runtime = NebulaStreamRuntime.getInstance();
Query q1 = runtime.readFromSource("bus");
Query q2 = runtime.readFromSource("metro");
q1.joinWith(q2)
    .where("station_id")
    .equalsTo("station_id")
    .window(TumblingWindow.of(EventTime.eventTime("timestamp"), TimeMeasure.seconds(10)));
Union
The union operator merges two streams into a single stream. In this case, both streams need to have the same schema. Similar to join, the union operator in the Java client takes a second query as a parameter.
NebulaStreamRuntime runtime = NebulaStreamRuntime.getInstance();
Query q1 = runtime.readFromSource("bus");
Query q2 = runtime.readFromSource("metro");
q1.unionWith(q2);
InferModel
The inferModel operator executes a TensorFlow Lite model for each tuple in the input stream and appends the result to the tuple in the output stream.
💡 At the moment, the data type of the TensorFlow Lite model output has to be explicitly specified for each output attribute.
💡 At the moment, the name of the TensorFlow Lite model has to be a valid path on the coordinator and the worker.
The following example loads a TensorFlow Lite model from a classpath resource and applies it to the fields f1 to f4 of the input stream. The output of the model is stored in the fields iris0, iris1, and iris2, which are appended to the tuples in the output stream.
Query q = nebulaStreamRuntime.readFromSource("iris")
    .inferModel(
        // TensorFlow model is read from a ResourceStream
        Objects.requireNonNull(getClass().getResourceAsStream("/iris_95acc.tflite")),
        // Name must be a valid file name on the coordinator and worker
        "/tmp/iris_95acc.tflite")
    // Input attributes
    .on(attribute("f1"), attribute("f2"), attribute("f3"), attribute("f4"))
    // Output attributes with explicit type information
    .as(attribute("iris0", BasicType.FLOAT32),
        attribute("iris1", BasicType.FLOAT32),
        attribute("iris2", BasicType.FLOAT32));
Sinks
The Java client allows queries to use the following sink operators:
- .sink(new PrintSink()) - writes the resulting stream to the coordinator's console.
- .sink(new ZMQSink("localhost", 1234)) - forwards the resulting stream to a ZMQ instance.
- .sink(new FileSink("/tmp/out.csv", "CSV_FORMAT", false)) - writes the resulting stream to a CSV file on the coordinator.
The query management API is a collection of methods from the Java client library to monitor and manage query execution in a NebulaStream instance.
Submitting a Query
Once we have the connection, we can submit a query to the Coordinator. Following is an example of submitting a simple query that will append the result stream to a CSV file.
String logicalSource = "default_logical";
Query query = nebulaStreamRuntime.readFromSource(logicalSource)
    .sink(new FileSink("out.csv", "CSV_FORMAT", true));
int queryId = nebulaStreamRuntime.executeQuery(query, "BottomUp");
The query submission interface requires two parameters: the query to be submitted and the operator placement strategy. The currently available operator placement strategies are: “BottomUp” and “TopDown”. The query is submitted to the coordinator and executed lazily on the NebulaStream cluster.
Getting the Query Catalog
The Java client represents a query as a QueryCatalogEntry object. A QueryCatalogEntry object stores the query id, query status, and query string. To get the whole query catalog as an ArrayList<QueryCatalogEntry>, we can use the following method.
ArrayList<QueryCatalogEntry> catalog = nebulaStreamRuntime.getAllRegisteredQueries();
We can also obtain queries with a specific status. The available statuses are Registered, Scheduling, Running, and Failed. For example:
ArrayList<QueryCatalogEntry> catalogRegistered = nebulaStreamRuntime.getRegisteredQueryWithStatus("Registered");
Getting the Query Plan
The Java client stores a query plan as a graph consisting of LogicalQuery objects as its nodes. A LogicalQuery object stores the operator id, name, and node type. To get the query plan of a specific query as a Graph<LogicalQuery, DefaultEdge>, we can use this method.
Graph<LogicalQuery, DefaultEdge> queryPlan = nebulaStreamRuntime.getQueryPlan(<query id>);
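Since the returned object is a standard JGraphT graph, it can be traversed with the usual vertexSet and edgeSet methods. A sketch, assuming LogicalQuery exposes getters named after the fields listed above (getName, getNodeType are assumptions):

```java
// Walk the operators (vertices) and data-flow edges of the query plan.
Graph<LogicalQuery, DefaultEdge> queryPlan = nebulaStreamRuntime.getQueryPlan(queryId);
for (LogicalQuery operator : queryPlan.vertexSet()) {
    System.out.println(operator.getName() + " (" + operator.getNodeType() + ")");
}
for (DefaultEdge edge : queryPlan.edgeSet()) {
    System.out.println(queryPlan.getEdgeSource(edge) + " -> " + queryPlan.getEdgeTarget(edge));
}
```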
Getting the Execution Plan
To get the execution plan of a specific query as a Graph<ExecutionNode, ExecutionLink>, we can use the following method:
Graph<ExecutionNode, ExecutionLink> executionPlanGraph = nebulaStreamRuntime.getExecutionPlan(<query id>);
Deleting a Query
To stop a query, we can use the stopQuery() method. The method returns true if the query was successfully marked for stopping.
boolean markedForStopped = nebulaStreamRuntime.stopQuery(queryId);
⚠️ This method only schedules the request to stop the query.
The source management API contains a collection of methods in the Java client to retrieve, add, update, and remove logical sources in a NebulaStream instance.
Retrieving Logical Source Catalog
To get the logical source catalog as a List<String>, we can use the following method.
List<String> sourceCatalog = nebulaStreamRuntime.getLogicalSources();
Adding a Logical Source
To add a logical source, we can use the addLogicalSource() method. The method expects a source name and the schema (in C++ syntax) as parameters and returns true if the new source was successfully added.
String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"test\",INT32);";
boolean added = nebulaStreamRuntime.addLogicalSource(sourceName, schema);
Updating a Logical Source
To update a logical source, we can use the updateLogicalSource() method. We need to pass the name of the source to be updated and the new schema (in C++ syntax) as parameters. The method returns true if the source was successfully updated.
String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"anothertest\",INT32);";
boolean updated = nebulaStreamRuntime.updateLogicalSource(sourceName, schema);
Deleting a Logical Source
We can remove a logical source from a coordinator using the deleteLogicalSource() method. The method accepts the name of the source to be deleted as a parameter. Following is an example of using the method.
String sourceName = "TestLogicalSource";
boolean deleted = nebulaStreamRuntime.deleteLogicalSource(sourceName);
Obtaining Physical Source Catalog
To get all physical sources of a logical source as an ArrayList<String>, we can use the getPhysicalSources() method. The method expects a logical source name as a parameter.
ArrayList<String> physicalStreamOfDefaultLogical = nebulaStreamRuntime.getPhysicalSources("default_logical");