The Java client is a Java library that provides an interface for Java applications to interact with the NebulaStream coordinator. It offers functionalities to submit a query and manage running queries, sources, and the topology.
Obtaining the Java Client
The Java client is available as a Java library. The latest version can be obtained as follows:
<dependency>
    <groupId>stream.nebula</groupId>
    <artifactId>nebulastream-java-client</artifactId>
    <version>0.0.60</version>
</dependency>
Connecting to the Coordinator
Before performing any operation, we need to instantiate a NebulaStreamRuntime object and connect it to a running NebulaStream coordinator. We can create the connection as follows.
NebulaStreamRuntime nebulaStreamRuntime = NebulaStreamRuntime.getRuntime();
nebulaStreamRuntime.getConfig().setHost("localhost").setPort("8081");
Creating a query
You can create a query by reading from a particular named logical source through the NebulaStream runtime.
Query query = nebulaStreamRuntime.readFromSource("logical_source");
The code below assumes that we already have created a query on which we perform further operations.
Many operations in the query API operate on stream attributes. To access an attribute, use the static method Expressions.attribute. To make the code less verbose, you can statically import the method:

import static stream.nebula.expression.Expressions.attribute;

The examples below assume that the method has been statically imported and leave out the name of the Expressions class.
All operations of the query API return the query object, which allows the chaining of multiple operations in a fluent syntax. The only exception is the sink operator, which completes the query.
⚠️ At the moment, the following two snippets are equivalent.

// Chain two filter operations on a query.
Query q1 = nebulaStreamRuntime.readFromSource("logical_source")
    .filter(attribute("a"))
    .filter(attribute("b"));

// Perform two filter operations on the same query object.
Query q2 = nebulaStreamRuntime.readFromSource("logical_source");
q2.filter(attribute("a"));
q2.filter(attribute("b"));

In other words, the operations of the query API mutate the query object on which they are called. This will likely change in the future.
Filter
The filter operator accepts a predicate which evaluates to a boolean value. This predicate can be:
- a single boolean attribute,
- the result of a comparison between attributes or literal values,
- the result of a boolean operation (e.g., and, or),
- an arbitrary combination of the above.
Filter the stream to contain only the records for which the attribute my_attribute is true.
Filter the stream by comparing an attribute against a literal value.
Filter the stream by comparing two attributes.
Filter the stream by combining two attributes.
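The four cases above could look as follows. This is a sketch; the comparison and boolean method names used here (lessThan, equalTo, greaterThan, and) are assumptions based on the Expression API, so check that reference for the exact names.

```java
// Keep only records where the boolean attribute my_attribute is true.
query.filter(attribute("my_attribute"));

// Compare an attribute against a literal value.
query.filter(attribute("temperature").lessThan(50));

// Compare two attributes.
query.filter(attribute("a").equalTo(attribute("b")));

// Combine two predicates with a boolean operation.
query.filter(attribute("a").and(attribute("b").greaterThan(10)));
```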
Projection
The projection operator accepts a variable number of attributes which are copied to the output stream. Each attribute can also be renamed.
Project two attributes.
Project a single attribute and rename it.
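Both projections could look as follows. This sketch assumes the operator is exposed as a project method; the as rename method is the one used in the aggregation examples in this document.

```java
// Project two attributes; all other attributes are dropped from the output stream.
query.project(attribute("a"), attribute("b"));

// Project a single attribute and rename it in the output stream.
query.project(attribute("a").as("renamed_a"));
```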
Map
The map operator accepts two parameters. The first parameter is an attribute name, which can refer to an existing attribute in the stream or a new attribute. The second parameter is a complex expression over the existing stream attributes. The operator stores the result of the expression in the specified attribute.
The complex expression can contain arithmetic operations, arithmetic and boolean comparisons, and boolean functions. Refer to the Expression API for details.
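As a sketch, assuming the Expression API provides an add method for arithmetic, a map operation that stores the sum of two attributes in a (possibly new) attribute c might look like this:

```java
// Compute a + b for every tuple and store the result in attribute c.
// The arithmetic method name (add) is an assumption based on the Expression API.
query.map("c", attribute("a").add(attribute("b")));
```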
Window
The window operator is started by a window method, which specifies the type and size of the window, followed by an optional keyBy method, which specifies one or more attributes on which the window is grouped, followed by an apply method, which specifies one or more aggregation functions.

NebulaStream supports both tumbling and sliding windows. The size of windows can be specified in milliseconds, seconds, minutes, or hours. The time is always an explicit event time which is encoded as an attribute in the stream.
The supported aggregation functions include sum, min, max, and average. The aggregated attribute can be renamed.
Create a tumbling window of 10 seconds, using the stream attribute timestamp, and compute the sum of the attribute value for each window.

query.window(TumblingWindow.of(EventTime.eventTime("timestamp"), TimeMeasure.seconds(10)))
    .apply(Aggregation.sum("value"));
Create a sliding window of 1 minute with a slide of 30 seconds, group the window by the attributes key1 and key2, and compute the min and max of the attribute value for each window.

query.window(SlidingWindow.of(EventTime.eventTime("timestamp"), TimeMeasure.minutes(1), TimeMeasure.seconds(30)))
    .keyBy("key1", "key2")
    .apply(Aggregation.min("value"), Aggregation.max("value"));
💡 Multiple aggregation functions require the experimental THREAD_LOCAL window aggregation strategy.
Rename the result of an average aggregation.

query.window(TumblingWindow.of(EventTime.eventTime("timestamp"), TimeMeasure.seconds(10)))
    .apply(Aggregation.average("value").as("mean_value"));
Join
The join operator joins tuples coming from two event streams. To define a join operator, we need two queries, a field name from each stream for the join predicate, and a window definition.
Join q1 and q2 on the condition that the station_id in the bus stream is the same as the station_id in the metro stream and both tuples fall within the same 10-second tumbling window.

NebulaStreamRuntime runtime = NebulaStreamRuntime.getInstance();
Query q1 = runtime.readFromSource("bus");
Query q2 = runtime.readFromSource("metro");
q1.joinWith(q2)
    .where("station_id")
    .equalsTo("station_id")
    .window(TumblingWindow.of(EventTime.eventTime("timestamp"), TimeMeasure.seconds(10)));
Union
The union operator merges two streams into a single stream. In this case, both streams need to have the same schema. Similar to join, the union operator in the Java client takes a second query as a parameter.
NebulaStreamRuntime runtime = NebulaStreamRuntime.getInstance();
Query q1 = runtime.readFromSource("bus");
Query q2 = runtime.readFromSource("metro");
q1.unionWith(q2);
InferModel
The inferModel operator executes a TensorFlow Lite model for each tuple in the input stream and appends the result to the tuple in the output stream.
💡 At the moment, the data type of the TensorFlow Lite model output has to be explicitly specified for each output attribute.
💡 At the moment, the name of the TensorFlow Lite model has to be a valid path on the coordinator and the worker.
The following example loads a TensorFlow Lite model from a classpath resource and applies it to the fields f1 to f4 of the input stream. The output of the model is stored in the fields iris0, iris1, and iris2, which are appended to the tuples in the output stream.
Query q = nebulaStreamRuntime.readFromSource("iris")
    .inferModel(
        // TensorFlow model is read from a ResourceStream
        Objects.requireNonNull(getClass().getResourceAsStream("/iris_95acc.tflite")),
        // Name must be a valid file name on the coordinator and worker
        "/tmp/iris_95acc.tflite")
    // Input attributes
    .on(attribute("f1"), attribute("f2"), attribute("f3"), attribute("f4"))
    // Output attributes with explicit type information
    .as(attribute("iris0", BasicType.FLOAT32),
        attribute("iris1", BasicType.FLOAT32),
        attribute("iris2", BasicType.FLOAT32));
Sinks
The Java client allows queries to use the following sink operators:
- .sink(new PrintSink()) - writes the resulting stream to the coordinator's console.
- .sink(new ZMQSink("localhost", 1234)) - forwards the resulting stream to a ZMQ instance.
- .sink(new FileSink("/tmp/out.csv", "CSV_FORMAT", false)) - writes the resulting stream to a CSV file on the coordinator.
The query management API is a collection of methods from the Java client library to monitor and manage query execution in a NebulaStream instance.
Submitting a Query
Once we have the connection, we can submit a query to the Coordinator. Following is an example of submitting a simple query that will append the result stream to a CSV file.
String logicalSource = "default_logical";
Query query = nebulaStreamRuntime.readFromSource(logicalSource)
    .sink(new FileSink("out.csv", "CSV_FORMAT", true));
int queryId = nebulaStreamRuntime.executeQuery(query, "BottomUp");
The query submission interface requires two parameters: the query to be submitted and the operator placement strategy. The currently available operator placement strategies are: “BottomUp” and “TopDown”. The query is submitted to the coordinator and executed lazily on the NebulaStream cluster.
Getting the Query Catalog
The Java client represents a query as a QueryCatalogEntry object. A QueryCatalogEntry object stores the query id, query status, and query string. To get the whole query catalog as an ArrayList<QueryCatalogEntry>, we can use the following method.
ArrayList<QueryCatalogEntry> catalog = nebulaStreamRuntime.getAllRegisteredQueries();
We can also obtain queries with a specific status. The available statuses are Registered, Scheduling, Running, and Failed. For example:
ArrayList<QueryCatalogEntry> catalogRegistered = nebulaStreamRuntime.getRegisteredQueryWithStatus("Registered");
Getting the Query Plan
The Java client stores a query plan as a graph consisting of LogicalQuery objects as its nodes. A LogicalQuery object stores the operator id, name, and node type. To get the query plan of a specific query as a Graph<LogicalQuery, DefaultEdge>, we can use this method.
Graph<LogicalQuery, DefaultEdge> queryPlan = nebulaStreamRuntime.getQueryPlan(<query id>);
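Since the returned object is a standard JGraphT graph, it can be traversed with the usual vertexSet and edgeSet methods. A sketch, assuming LogicalQuery exposes getters named after the fields listed above (getName, getNodeType are assumptions):

```java
// Walk the operators (vertices) and data-flow edges of the query plan.
Graph<LogicalQuery, DefaultEdge> queryPlan = nebulaStreamRuntime.getQueryPlan(queryId);
for (LogicalQuery operator : queryPlan.vertexSet()) {
    System.out.println(operator.getName() + " (" + operator.getNodeType() + ")");
}
for (DefaultEdge edge : queryPlan.edgeSet()) {
    System.out.println(queryPlan.getEdgeSource(edge) + " -> " + queryPlan.getEdgeTarget(edge));
}
```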
Getting the Execution Plan
To get the execution plan of a specific query as a Graph<ExecutionNode, ExecutionLink>, we can use the following method:
Graph<ExecutionNode, ExecutionLink> executionPlanGraph = nebulaStreamRuntime.getExecutionPlan(<query id>);
Deleting a Query
To stop a query, we can use the stopQuery() method. The method returns true if the query was successfully marked for stopping.
boolean markedForStopped = nebulaStreamRuntime.stopQuery(queryId);
⚠️ This method only schedules the request to stop the query.
The source management API contains a collection of methods in the Java client to retrieve, add, update, and remove logical sources in a NebulaStream instance.
Retrieving Logical Source Catalog
To get the logical source catalog as a List<String>, we can use the following method.
List<String> sourceCatalog = nebulaStreamRuntime.getLogicalSources();
Adding a Logical Source
To add a logical source, we can use the addLogicalSource() method. The method expects a source name and the schema (in C++ syntax) as parameters and returns true if the new source was successfully added.
String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"test\",INT32);";
boolean added = nebulaStreamRuntime.addLogicalSource(sourceName, schema);
Updating a Logical Source
To update a logical source, we can use the updateLogicalSource() method. We need to pass the name of the source to be updated and the new schema (in C++ syntax) as parameters. The method returns true if the source was successfully updated.
String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"anothertest\",INT32);";
boolean updated = nebulaStreamRuntime.updateLogicalSource(sourceName, schema);
Deleting a Logical Source
We can remove a logical source from a coordinator using the deleteLogicalSource() method. The method accepts the name of the source to be deleted as a parameter. Following is an example of using the method.
String sourceName = "TestLogicalSource";
boolean deleted = nebulaStreamRuntime.deleteLogicalSource(sourceName);
Obtaining Physical Source Catalog
To get all physical sources of a logical source as an ArrayList<String>, we can use the getPhysicalSources() method. The method expects a logical source name as a parameter.
ArrayList<String> physicalStreamOfDefaultLogical = nebulaStreamRuntime.getPhysicalSources("default_logical");