Java Client
The Java client is a Java library that provides an interface for Java applications to interact with the NebulaStream coordinator. It offers functionality to submit queries and to manage running queries, sources, and the topology.
Obtaining the Java Client
The Java client is distributed as a Java library. The latest version can be obtained via Gradle:
implementation 'stream.nebula:nebulastream-java-client:0.0.74'
or via Maven:
<dependency>
    <groupId>stream.nebula</groupId>
    <artifactId>nebulastream-java-client</artifactId>
    <version>0.0.74</version>
</dependency>
Connecting to the Coordinator
Before performing any operation, we need to instantiate a NebulaStreamRuntime object and connect it to a running NebulaStream coordinator. We can create the connection as follows.
NebulaStreamRuntime nebulaStreamRuntime = NebulaStreamRuntime.getRuntime();
nebulaStreamRuntime.getConfig().setHost("localhost").setPort("8081");
Query API
Creating a query
You can create a query by reading from a named logical source through the NebulaStream runtime.
Query query = nebulaStreamRuntime.readFromSource("logical_source");
The code below assumes that we have already created a query on which we perform further operations.
Accessing attributes
Many operations in the query API operate on stream attributes.
To access an attribute, use the static method Expressions.attribute(String).
To make the code less verbose, you can statically import the method:
import static stream.nebula.expression.Expressions.attribute;
The examples below assume that the method has been statically imported and leave out the name of the Expressions class.
Chaining operations
All operations of the query API return the query object, which allows chaining multiple operations in a fluent syntax.
The only exception is the sink operator, which returns a Sink object.
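For example, a complete chain ending in a sink (the PrintSink sink is described under Sink Operator below) looks like this:
Sink sink = nebulaStreamRuntime.readFromSource("logical_source")
        .filter(attribute("a"))
        .sink(new PrintSink());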
⚠️ At the moment, the following operations are equivalent.
// Chain two filter operations on a query.
Query q1 = nebulaStreamRuntime.readFromSource("logical_source")
        .filter(attribute("a"))
        .filter(attribute("b"));
// Perform two filter operations on the same query object.
Query q2 = nebulaStreamRuntime.readFromSource("logical_source");
q2.filter(attribute("a"));
q2.filter(attribute("b"));
In other words, the operations of the query API mutate the query object on which they are called. This behavior will likely change in the future.
Filter Operator
The filter operator accepts a predicate which can be evaluated to a boolean value. This predicate can be:
- a single attribute,
- the result of a comparison (lessThan, lessThanOrEqual, greaterThan, greaterThanOrEqual, equalTo, notEqualTo),
- the result of a boolean operation (and, or, not),
- an arbitrary combination of the above.
Examples
Filter the stream to contain only the records for which the attribute my_attribute is true.
query.filter(attribute("my_attribute"));
Filter the stream by comparing an attribute against a literal value.
query.filter(attribute("my_attribute").lessThan(5));
Filter the stream by comparing two attributes.
query.filter(attribute("attribute_1").greaterThan(attribute("attribute_2")));
Filter the stream by combining two predicates with a boolean operation.
query.filter(attribute("boolean_attribute").and(attribute("numeric_attribute").lessThan(1)));
Projection Operator
The projection operator accepts a variable number of attributes which are copied to the output stream. Each attribute can also be renamed.
Examples
Project two attributes.
query.project(attribute("attribute_1"), attribute("attribute_2));
Project a single attribute and rename it.
query.project(attribute("attribute_1").as("new_attribute_name"));
Map Operator
The map operator accepts two parameters. The first parameter is an attribute name, which can refer to an existing attribute in the stream or to a new attribute. The second parameter is a complex expression over the existing stream attributes. The operator stores the result of the expression in the specified attribute; a new attribute is appended to the input tuples.
The complex expression can contain arithmetic operations, arithmetic and boolean comparisons, and boolean functions. Refer to the Expression API for details.
Example
query.map("total", attribute("field_1").add(attribute("field_2")));
Map Operator with Java UDFs
It is also possible to pass a Java UDF which implements the MapFunction interface to the map operator.
The operator will execute the UDF for every record in the input stream and write the results to the output stream.
💡 This operation is only available through the Java client and has no corresponding operation in the C++ client.
Input and output types
The UDF can accept both primitive types (e.g., int, double, String, or boolean) as well as complex types expressed as POJOs.
A complex type expressed as a POJO may only contain fields which are themselves of primitive types, i.e., it is not possible to nest complex types or to use arrays or collections.
Input and output schema
The input schema of the Java UDF has to match the output schema of the previous operator. If necessary, an explicit project operator can be used to remove other attributes.
For complex input/output types, the attribute names of the input/output stream match the field names of the POJO. For primitive input types, the name of the attribute in the input stream does not matter; for primitive output types, the name of the attribute in the output stream must be specified when invoking the map operator.
The Java UDF map operator replaces the stream with the output of the UDF for each input record.
💡 This is different from the native map operator, which replaces existing fields in the stream or appends new fields to it.
Example: Map UDF with primitive input and output type
Project a stream to a single attribute (which is an INT32) and return a stream with a BOOLEAN attribute is_even, indicating whether the input is even.
query
    .project(attribute("int_attribute"))  // Project stream to a single attribute
    .map("is_even", new MapFunction<Integer, Boolean>() {  // Create output stream with a single attribute
        @Override
        public Boolean map(final Integer value) {
            return value % 2 == 0;
        }
    });
💡 The UDF must be specified as an anonymous inner class and not as a Java lambda.
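To make the restriction concrete, the following lambda version would therefore not work, presumably because the client needs the concrete class of the UDF to determine its input and output types:
// Does NOT work: a lambda does not expose the generic type parameters of MapFunction.
query.map("is_even", (Integer value) -> value % 2 == 0);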
Example: Map UDF with complex input and output type
The following code defines the complex types CartesianCoordinate and PolarCoordinate, and a UDF CartesianToPolar that converts between both types.
static class CartesianCoordinate {
    double x;
    double y;
}

static class PolarCoordinate {
    double angle;
    double radius;
}

static class CartesianToPolar implements MapFunction<CartesianCoordinate, PolarCoordinate> {
    @Override
    public PolarCoordinate map(final CartesianCoordinate value) {
        PolarCoordinate output = new PolarCoordinate();
        output.radius = Math.sqrt(value.x * value.x + value.y * value.y);
        // Math.atan2 takes the y coordinate first.
        output.angle = Math.atan2(value.y, value.x);
        return output;
    }
}
This UDF can then be used without specifying the names of the attributes in the output stream.
query
    .project(attribute("x"), attribute("y"))  // Project stream to the required attributes
    .map(new CartesianToPolar());             // Call UDF to transform the stream
Window Operator
A window operation is started by a window method, which specifies the type and size of the window. It can be followed by an optional keyBy method, which specifies one or more attributes on which the window is grouped. It ends with an apply method, which specifies one or more aggregation functions.
NebulaStream supports both tumbling and sliding windows.
The size of windows can be specified in milliseconds, seconds, minutes, or hours.
The time is always an explicit event time which is encoded as an attribute in the stream.
The supported aggregation functions are min, max, count, sum, average, and median.
The aggregated attribute can be renamed.
Examples
Create a tumbling window of 10 seconds, using the stream attribute timestamp, and compute the sum of the attribute value for each window.
query.window(TumblingWindow.of(EventTime.eventTime("timestamp"),
                               TimeMeasure.seconds(10)))
     .apply(Aggregation.sum("value"));
Create a sliding window of 1 minute with a slide of 30 seconds, group the window by the attributes key1 and key2, and compute the min and max for each window.
query.window(SlidingWindow.of(EventTime.eventTime("timestamp"),
                              TimeMeasure.minutes(1),
                              TimeMeasure.seconds(30)))
     .keyBy("key1", "key2")
     .apply(Aggregation.min("value"), Aggregation.max("value"));
💡 Multiple aggregation functions require the experimental THREAD_LOCAL window aggregation strategy.
Rename the result of an average aggregation.
query.window(TumblingWindow.of(EventTime.eventTime("timestamp"),
                               TimeMeasure.seconds(10)))
     .apply(Aggregation.average("value").as("mean_value"));
Join Operator
The join operator joins tuples coming from two event streams. To define a join operator, we need two queries, a field name from each stream on which the join predicate is evaluated, and a window definition.
Example
Join q1 and q2 on a condition where the station_id in the bus stream is the same as the station_id in the metro stream and both tuples fall within the same 10-second tumbling window.
NebulaStreamRuntime runtime = NebulaStreamRuntime.getRuntime();
Query q1 = runtime.readFromSource("bus");
Query q2 = runtime.readFromSource("metro");
q1.joinWith(q2)
  .where("station_id")
  .equalsTo("station_id")
  .window(TumblingWindow.of(EventTime.eventTime("timestamp"),
                            TimeMeasure.seconds(10)));
Union Operator
The union operator merges two streams into a single stream. In this case, both streams need to have the same schema. Similar to join, the union operator in the Java client needs a second query as a parameter.
Example
NebulaStreamRuntime runtime = NebulaStreamRuntime.getRuntime();
Query q1 = runtime.readFromSource("bus");
Query q2 = runtime.readFromSource("metro");
q1.unionWith(q2);
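Because the operations mutate the query object, operators chained after unionWith apply to the merged stream. An illustrative sketch using only operators from this document:
q1.unionWith(q2)
  .filter(attribute("station_id").equalTo(5))
  .sink(new PrintSink());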
TensorFlow inference
The inferModel operator executes a TensorFlow Lite model for each tuple in the input stream and appends the result to the tuple in the output stream.
💡 At the moment, the data type of the TensorFlow Lite model output has to be explicitly specified and it has to be FLOAT32.
💡 At the moment, the name of the TensorFlow Lite model has to be a valid path on the coordinator and the worker.
Example
The following example loads a TensorFlow Lite model from a classpath resource and applies it to the fields f1, f2, f3, and f4 of the input stream. The output of the model is stored in the fields iris0, iris1, and iris2, which are appended to the tuples in the output stream.
Query q = nebulaStreamRuntime.readFromSource("iris")
.inferModel(
// TensorFlow model is read from a ResourceStream
Objects.requireNonNull(getClass().getResourceAsStream("/iris_95acc.tflite")),
// Name must be a valid file name on the coordinator and worker
"/tmp/iris_95acc.tflite")
// Input attributes
.on(
attribute("f1"),
attribute("f2"),
attribute("f3"),
attribute("f4"))
// Output attributes with explicit type information
.as(
attribute("iris0", BasicType.FLOAT32),
attribute("iris1", BasicType.FLOAT32),
attribute("iris2", BasicType.FLOAT32))
Sink Operator
The Java client allows queries to use the following sink operators:
- .sink(new PrintSink()) - write the resulting stream to the Coordinator's console.
- .sink(new ZMQSink("localhost", 1234)) - forward the resulting stream to a ZMQ instance.
- .sink(new FileSink("/tmp/out.csv", "CSV_FORMAT", false)) - write the resulting stream to a CSV file on the Coordinator.
Query Management
The query management API is a collection of methods from the Java client library to monitor and manage query execution in a NebulaStream instance.
Submitting a Query
Once we have the connection, we can submit a query to the Coordinator. Following is an example of submitting a simple query that will append the result stream to a CSV file.
String logicalSource = "default_logical";
Query query = nebulaStreamRuntime.readFromSource(logicalSource);
query.sink(new FileSink("out.csv", "CSV_FORMAT", true));
int queryId = nebulaStreamRuntime.executeQuery(query, "BottomUp");
The query submission interface requires two parameters: the query to be submitted and the operator placement strategy. The currently available operator placement strategies are: “BottomUp” and “TopDown”. The query is submitted to the coordinator and executed lazily on the NebulaStream cluster.
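Putting submission and management together, a minimal end-to-end sketch (the attribute value is assumed here to exist in the default_logical schema; stopQuery() is described under Deleting a Query below):
Query q = nebulaStreamRuntime.readFromSource("default_logical");
q.filter(attribute("value").greaterThan(0))
 .sink(new FileSink("out.csv", "CSV_FORMAT", true));
int queryId = nebulaStreamRuntime.executeQuery(q, "BottomUp");
// ... later: request that the running query be stopped.
boolean markedForStop = nebulaStreamRuntime.stopQuery(queryId);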
Query Catalog
The Java client represents a query as a QueryCatalogEntry object. A QueryCatalogEntry object stores the query id, query status, and query string. To get the whole query catalog as an ArrayList:
ArrayList<QueryCatalogEntry> catalog =
nebulaStreamRuntime.getAllRegisteredQueries();
We can also obtain queries with a specific status. The available statuses are Registered, Scheduling, Running, and Failed. For example:
ArrayList<QueryCatalogEntry> catalogRegistered =
nebulaStreamRuntime.getRegisteredQueryWithStatus("Registered");
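The returned entries can then be inspected in a loop. A sketch, assuming hypothetical getter names such as getQueryId() and getQueryStatus():
for (QueryCatalogEntry entry : catalog) {
    // Hypothetical getters; a QueryCatalogEntry stores the query id, status, and query string.
    System.out.println(entry.getQueryId() + ": " + entry.getQueryStatus());
}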
Getting the Query Plan
The Java client stores a query plan as a graph consisting of LogicalQuery objects as its nodes. A LogicalQuery object stores the operator id, name, and nodeType. To get the query plan of a specific query as a Graph<LogicalQuery, DefaultEdge>, we can use this method.
Graph<LogicalQuery, DefaultEdge> queryPlan =
nebulaStreamRuntime.getQueryPlan(<query id>);
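Graph and DefaultEdge appear to be JGraphT types, so the plan can be traversed with the standard JGraphT API. A sketch, assuming a hypothetical getName() getter on LogicalQuery:
// Iterate over all operators (nodes) of the query plan.
for (LogicalQuery operator : queryPlan.vertexSet()) {
    System.out.println(operator.getName());  // hypothetical getter
}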
Getting the Execution Plan
To get the execution plan of a specific query as a Graph<ExecutionNode, ExecutionLink>, we can use the following method:
Graph<ExecutionNode, ExecutionLink> executionPlanGraph =
nebulaStreamRuntime.getExecutionPlan(<query id>);
Deleting a Query
To stop a query, we can use the stopQuery() method. The method returns true if the query was successfully marked for stopping.
boolean markedForStop = nebulaStreamRuntime.stopQuery(queryId);
⚠️ This method only schedules the request to stop the query.
Source Management
The source management API contains a collection of methods in the Java client to retrieve, add, update, and remove logical sources in a NebulaStream instance.
Retrieving Logical Source Catalog
To get the logical source catalog as a List<String>, we can use the following method.
List<String> sourceCatalog =
nebulaStreamRuntime.getLogicalSources();
Adding a Logical Source
To add a logical source, we can use the addLogicalSource() method. The method expects a source name and the schema (in C++) as parameters and returns true if the new source was successfully added.
String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"test\",INT32);";
boolean added = nebulaStreamRuntime.addLogicalSource(sourceName, schema);
Updating a Logical Source
To update a logical source, we can use the updateLogicalSource() method. We need to pass the name of the source to be updated and the new schema (in C++) as parameters. The method returns true if the source was successfully updated.
String sourceName = "TestLogicalSource";
String schema = "Schema::create()->addField(\"anothertest\",INT32);";
boolean updated = nebulaStreamRuntime.updateLogicalSource(sourceName, schema);
Deleting a Logical Source
We can remove a logical source from the Coordinator using the deleteLogicalSource() method. The method accepts the name of the source to be deleted as a parameter. Following is an example of using the deleteLogicalSource() method:
String sourceName = "TestLogicalSource";
boolean deleted = nebulaStreamRuntime.deleteLogicalSource(sourceName);
Obtaining Physical Source Catalog
To get all physical sources of a logical source as an ArrayList<String>, we can use the getPhysicalSources() method. The method expects a logical source name as a parameter.
ArrayList<String> physicalStreamOfDefaultLogical =
nebulaStreamRuntime.getPhysicalSources("default_logical");