Stream Processing Operators
Operators in NebulaStream consume and transform records from a stream.
Filter
The filter operator evaluates an expression for each record and retains those for which the expression returns true. The output schema stays the same.
stream.filter(Attribute("f1") > 10);
stream.filter(Attribute("f1") > 10 && Attribute("f2") < 10);
stream.filter(attribute("f1").greaterThan(10));
stream.filter(attribute("f1").greaterThan(10).and(attribute("f2").lessThan(10)));
Projection
Selects a set of attributes from each record. The output schema only contains the selected attributes.
stream.project(Attribute("f1"), Attribute("f2"));
stream.project(attribute("f1"), attribute("f2"));
Furthermore, it is possible to rename a projected attribute.
stream.project(Attribute("f1").as("f1_new"));
stream.project(attribute("f1").as("f1_new"));
Transformations/Map
Executes an expression on each input record and assigns the result to a new or an already existing attribute.
stream.map(Attribute("f1") = Attribute("f1") + 1);
stream.map(Attribute("f1")++);
stream.map(Attribute("new_field") = Attribute("f1") * Attribute("f2"));
stream.map(Attribute("new_field") = POWER(Attribute("f1"), Attribute("f2"))); // raises f1 to the power of f2
stream.map(Attribute("new_field") = Attribute("f1") % Attribute("f2")); // modulo operator
stream.map("f1", attribute("f1").add(1));
// The ++ operator is not supported in Java; the line above is the equivalent of Attribute("f1")++.
stream.map("new_field", attribute("f1").multiply(attribute("f2")));
stream.map("new_field", Expressipons.power(attribute("f2"), attribute("f2")); // raises f1 to the power of f2
stream.map("new_field", attribute("f1").remainder(attribute("f2"))); // modulo operator
Stream Rename
Renames an incoming stream. This prevents attribute name collisions when performing a self-join.
stream.as("new_stream")
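For example, the rename disambiguates the two sides of a self-join. The following is a minimal sketch, assuming a stream named vehicles with attributes id and ts (all names are illustrative):
// Hypothetical self-join of the vehicles stream with itself; the rename
// avoids attribute name collisions between the two sides (all names are
// illustrative).
auto left = Query::from("vehicles");
auto right = Query::from("vehicles").as("vehicles2");
auto selfJoin = left
    .joinWith(right)
    .where(Attribute("id")).equalsTo(Attribute("id"))
    .window(TumblingWindow::of(EventTime(Attribute("ts")), Seconds(10)));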
Window Aggregations
Windows are a fundamental concept of NebulaStream to handle infinite data streams.
Windows split the stream into “buckets” of finite size, over which we can apply computations, e.g. aggregations or joins.
In general, windows are defined by a window type, a window measure, and a window function.
Furthermore, a window aggregation can be keyed or non-keyed; a keyed aggregation behaves similarly to a grouped aggregation in SQL.
The result schema of a window aggregation always consists of the window start, the window end, the window function result, and the key for keyed windows.
Window Types
The first thing to specify is the window type, which determines how the records of a stream are assigned to windows.
NebulaStream currently supports tumbling and sliding window types on keyed and non-keyed streams with a time-based window measure, as well as content-based threshold windows with a user-defined predicate over tuple attributes on non-keyed streams.
Tumbling Windows
A tumbling window assigns each record to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. For example, a tumbling window of 10 seconds collects all records within a 10-second time interval. Then, after this interval is over, it computes the aggregation and outputs the result to the next operator. Finally, it discards all records inside the window and starts over, collecting the records for the next 10 seconds in a new window and computing the aggregation function on them.
The following code snippet shows an event-time tumbling window with a size of 10 seconds. NebulaStream also supports milliseconds and minutes as window size units.
// Tumbling window over 10 seconds.
stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
[.byKey(KeyedAttribute)]
.apply(AggregationFunction)
// Tumbling window over 10 seconds.
stream.window(TumblingWindow.of(EventTime.eventTime("timestamp"), TimeMeasure.seconds(10)))
[.byKey(KeyedAttribute)]
.apply(AggregationFunction);
Sliding Windows
Like a tumbling window, the sliding window assigns records to windows of fixed length. The size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Thus, sliding windows can be overlapping if the slide is smaller than the window size. In this case, records are assigned to multiple windows.
For example, we could have a sliding window of size 10 minutes that slides by 5 minutes. This means that every 5 minutes, the aggregation function is triggered to compute the aggregation over the last 10 minutes. As a result, consecutive windows overlap.
// Sliding window with size of 10 minutes and slide of 5 minutes.
stream.window(SlidingWindow::of(EventTime(Attribute("timestamp")), Minutes(10), Minutes(5)))
[.byKey(KeyedAttribute)]
.apply(AggregationFunction)
// Sliding window with size of 10 minutes and slide of 5 minutes.
stream.window(SlidingWindow.of(EventTime.eventTime("timestamp"),
TimeMeasure.minutes(10), TimeMeasure.minutes(5)))
[.byKey(KeyedAttribute)]
.apply(AggregationFunction);
Threshold Windows
A threshold window assigns a sequence of records to a window where the value of a user-defined attribute satisfies a given predicate. Threshold windows have a variable size and a gap of at least one tuple between each consecutive window.
For example, a threshold window with the predicate Attribute("f1") < 45 starts with the first record whose f1 value is smaller than 45 and triggers after receiving a record with an f1 value greater than or equal to 45.
Then, it computes the aggregation and outputs the result to the next operator.
Finally, it discards all records inside the window and starts over.
Additionally, a threshold window can have an optional minimum window size, i.e., a window only triggers if the number of records in the sequence is greater than or equal to the user-defined minimum size. The default minimum count is 0.
💡 In NebulaStream, the threshold window takes an arbitrary expression as input to select the records for a window. Therefore, besides threshold filters, it also allows binary expressions over two attribute values.
💡 Threshold windows only support event time.
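For instance, a window that stays open while one attribute is smaller than another might look like the following minimal sketch (attribute names are illustrative; the optional minimum count argument is omitted here on the assumption that it defaults to 0):
// Hypothetical threshold window whose predicate compares two attributes:
// the window collects records while f1 < f2 (attribute names are illustrative).
stream.window(ThresholdWindow::of(Attribute("f1") < Attribute("f2")))
    .apply(AggregationFunction)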
The following example defines a threshold window with the predicate Attribute("f1") < 45 and a minimum size of 5.
// Threshold window with the predicate `Attribute("f1") < 45` and a minimum count of `5`
stream.window(ThresholdWindow::of(Attribute("f1") < 45, 5))
.apply(AggregationFunction)
// Threshold window with the predicate `Attribute("f1") < 45` and a minimum count of `5`
stream.window(ThresholdWindow.of(Attribute("f1").lessThan(45), 5))
.apply(AggregationFunction);
Keyed vs Non-Keyed Windows
After the window type, we have to specify if the stream should be keyed or not.
The byKey(...) method assigns a logical key to each record, and all operations on the window content are scoped to this key. If byKey(...) is not called, the stream is not keyed. In the case of keyed streams, any attribute of the incoming records can be used as a key.
For example, let’s say we have a data stream called vehicle, which collects data from vehicles and has the attributes id, vehicle_type, and speed. In vehicle_type we have car, bus, and bike. A keyed window on the attribute vehicle_type would create three logical streams: one where vehicle_type is car, another where vehicle_type is bus, and a third with only bike. These three data streams would result in three distinct window aggregates, one per vehicle_type (see the concrete sketch after the generic examples below).
💡 In contrast to other popular stream processing systems, the number of keys/partitions does not impact the level of parallelization in NebulaStream.
// Creates a non-keyed window
stream.window(WindowType)
.apply(AggregationFunction)
// Creates a keyed window on the attributes "key1" and "key2"
stream.window(WindowType)
.byKey(Attribute("key1"), Attribute("keys"))
.apply(AggregationFunction)
// Creates a non-keyed window
stream.window(WindowType)
.apply(AggregationFunction)
// Creates a keyed window on the attributes "key1" and "key2"
stream.window(WindowType)
.byKey("key1", "key2")
.apply(AggregationFunction)
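As a concrete sketch of the vehicle example above (assuming an event-time attribute named ts in addition to the listed attributes, and using the Avg aggregation described in the next section):
// Hypothetical keyed window over the vehicle stream: computes the average
// speed per vehicle_type every 10 seconds. Each result record contains the
// window start, the window end, the key (vehicle_type), and avg_speed.
auto vehicles = Query::from("vehicle");
vehicles.window(TumblingWindow::of(EventTime(Attribute("ts")), Seconds(10)))
    .byKey(Attribute("vehicle_type"))
    .apply(Avg(Attribute("speed"))->as(Attribute("avg_speed")));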
Window Functions
After defining the window type, we need to specify the computation we want to perform on each window.
This is the responsibility of the window function, which is used to process the elements of each (possibly keyed) window once the system determines that a window is ready for processing.
If the window is keyed, the window function is executed on each key individually.
Currently, NebulaStream supports several common aggregation functions, i.e., Count, Sum, Min, Max, Avg, and Median.
All aggregation functions except Count are computed over a specific record attribute. Furthermore, the user can specify a result attribute name using the as() method on the aggregation function.
// Computes the sum over attribute x1 and
// stores the result in the attribute sum.
stream.window(WindowType)
[.byKey(KeyedAttribute)]
.apply(Sum(Attribute("x1"))->as(Attribute("sum")))
// Counts all records in the window and
// stores the result in the attribute count.
stream.window(WindowType)
[.byKey(KeyedAttribute)]
.apply(Count()->as(Attribute("count")))
// Computes the sum over attribute x1 and
// stores the result in the attribute sum.
stream.window(WindowType)
[.byKey(KeyedAttribute)]
.apply(Aggregation.sum("x1").as("sum"));
stream.window(WindowType)
[.byKey(KeyedAttribute)]
.apply(Aggregation.count().as("count"));
Window Joins
A window join matches the records of two streams that share a common key and lie in the same window.
These windows are defined by a window type and are evaluated over records from both streams.
If the join predicate is valid, the records from both sides of the join are combined into a single record containing all attributes from both join sides.
For example, let’s say we have an Orders and a Products stream. The result of a windowed join of Orders and Products contains the attributes of both schemas, with a prefix that identifies the origin stream.
To join two streams, NebulaStream offers the streamA.joinWith(streamB) method. Following this call, we have to specify the join predicate and the window type. The following code shows how we can formulate a stream join between the Products and Orders streams.
auto orders = Query::from("orders");
auto products = Query::from("products");
auto result = orders
.joinWith(products)
.where(Attribute("product_id")).equalsTo(Attribute("id"))
.window(TumblingWindow::of(EventTime(Attribute("ts")), Seconds(10)));
Query orders = nebulaStreamRuntime.readFromSource("orders");
Query products = nebulaStreamRuntime.readFromSource("products");
orders.joinWith(products)
.where("product_id")
.equalsTo("id")
.window(TumblingWindow.of(EventTime.eventTime("ts"), TimeMeasure.seconds(10)));
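As an illustration of the prefixing mentioned above, the joined records might carry attributes along the following lines; the exact qualification format is an assumption and depends on the NebulaStream version:
// Hypothetical shape of a joined record (illustrative only):
// window start, window end, orders$product_id, orders$..., products$id, products$...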
Union
Union combines two data streams into a common result stream and interleaves the individual records. Thus, the schemas of both data streams have to be union compatible, i.e., both streams have the same number of attributes, and corresponding attributes have identical data types and names. To unify the schemas, we can first perform a projection. In the following, we first apply a projection on the cars and bikes streams before using unionWith.
auto cars = Query::from("cars").project(Attribute("f1"));
auto bikes = Query::from("bikes").project(Attribute("f1"));
auto vehicles = cars.unionWith(bikes);
Query cars = nebulaStreamRuntime.readFromSource("cars").project(attribute("f1"));
Query bikes = nebulaStreamRuntime.readFromSource("bikes").project(attribute("f1"));
Query vehicles = cars.unionWith(bikes);
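If the attribute names differ between the two streams, a rename during projection can make the schemas union compatible. A minimal sketch, assuming the bikes stream names the attribute velocity instead of f1 (the attribute name is illustrative):
auto cars = Query::from("cars").project(Attribute("f1"));
// Hypothetical rename to align the schemas before the union.
auto bikes = Query::from("bikes").project(Attribute("velocity").as("f1"));
auto vehicles = cars.unionWith(bikes);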
Sinks
NebulaStream offers different sinks, which transfer records of the result stream to external systems.
File Sink
The file sink materializes a result stream to a file in different file formats.
Parameters:
- Path: The file path, which has to be a valid path at the coordinator.
- Format:
- CSV_FORMAT: Stores the stream in a comma-separated format.
- NES_FORMAT: Stores the stream in NebulaStream’s native binary format.
- TEXT_FORMAT: Stores the stream in a human-readable format.
- Append: Flag to indicate if the file should be appended to or overwritten.
// Materializes the result stream as a file.
stream.sink(FileSinkDescriptor::create(
    "/path/to/file",
    "CSV_FORMAT|NES_FORMAT|TEXT_FORMAT",
    "true|false" // "true" for append, "false" for overwrite
));
// Materializes the result stream as a file.
stream.sink(new FileSink(
    "/path/to/file",
    "CSV_FORMAT|NES_FORMAT|TEXT_FORMAT",
    "APPEND|OVERWRITE"
));
ZMQ Sink
The ZMQ sink sends all records of the result stream to a ZMQ server. 💡 All data is sent in NebulaStream’s native binary format.
Parameters:
- Host: The address of the ZMQ server.
- Port: The port of the ZMQ server.
// Configures the connection to the ZMQ server.
stream.sink(ZmqSinkDescriptor::create("localhost", 8080));
// Configures the connection to the ZMQ server.
stream.sink(new ZMQSink("localhost", 8080));
MQTT Sink
The MQTT sink sends all records of the result stream to an MQTT broker.
Parameters:
- Host: The host of the MQTT broker.
- Topic: MQTT topic chosen to publish client data to.
- User: User identification for the client.
- MaxBufferedMSGs: Maximum number of messages that can be buffered by the client before disconnecting.
- TimeUnit: The time unit for the message delay.
- MessageDelay: Time the client waits before sending the next message to the broker.
- qualityOfService: Either ‘at most once’ or ‘at least once’. A QOS > 0 is required for a non-clean (persistent) session.
- asynchronousClient: Determines whether the client is asynchronous or synchronous.
// Configures the connection to the MQTT broker.
stream.sink(MQTTSinkDescriptor::create(
"ws://127.0.0.1:9001",
"/nesui",
"rfRqLGZRChg8eS30PEeR",
5,
MQTTSinkDescriptor::milliseconds,
500,
MQTTSinkDescriptor::atLeastOnce,
false));
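// Configures the connection to the MQTT broker.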
stream.sink(new MQTTSink(
"ws://127.0.0.1:9001",
"/nesui",
"rfRqLGZRChg8eS30PEeR",
5,
MQTTSink.TimeUnits.milliseconds,
500,
MQTTSink.ServiceQualities.atLeastOnce,
false));
OPC Sink
The OPC sink sends all records of the result stream to an OPC server.
Parameters:
- URL: The server URL used to connect to the OPC server.
- NodeId: Id of node to write data to.
- User: User name for server.
- Password: Password for server.
// Configures the connection to the OPC server.
stream.sink(OPCSinkDescriptor::create(url, nodeid, user, password));
Currently not supported in the Java client.
Print Sink
The print sink outputs data from the result stream to the console at the coordinator node.
// Configures a print sink.
stream.sink(PrintSinkDescriptor::create());
// Configures a print sink.
stream.sink(new PrintSink());