Stream Processing Operators

Operators in NebulaStream consume and transform records from a stream.

Filter

The filter operator evaluates an expression for each record and retains those for which the expression returns true. The output schema stays the same.

// C++
stream.filter(Attribute("f1") > 10);
stream.filter(Attribute("f1") > 10 && Attribute("f2") < 10);
// Java
stream.filter(attribute("f1").greaterThan(10));
stream.filter(attribute("f1").greaterThan(10).and(attribute("f2").lessThan(10)));
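
The filter semantics can also be illustrated outside of NebulaStream. The following is a minimal standalone C++ sketch, not NebulaStream code: the Record struct and the filterRecords function are hypothetical names introduced only for this illustration, and the sketch assumes a fixed schema with two integer attributes f1 and f2.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical record type with two integer attributes (not a NebulaStream type).
struct Record {
    int64_t f1;
    int64_t f2;
};

// Keeps exactly those records for which the predicate returns true.
// The record layout (the schema) is unchanged by the filter.
std::vector<Record> filterRecords(const std::vector<Record>& input,
                                  const std::function<bool(const Record&)>& predicate) {
    std::vector<Record> output;
    for (const Record& record : input) {
        if (predicate(record)) {
            output.push_back(record);
        }
    }
    return output;
}
```

Passing the lambda [](const Record& r) { return r.f1 > 10 && r.f2 < 10; } as the predicate corresponds to the second filter expression above.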

Projection

Selects a set of attributes from each record. The output schema only contains the selected attributes.

// C++
stream.project(Attribute("f1"), Attribute("f2"));
// Java
stream.project(attribute("f1"), attribute("f2"));

Furthermore, it is possible to rename the projected attribute.

// C++
stream.project(Attribute("f1").as("f1_new"));
// Java
stream.project(attribute("f1").as("f1_new"));

Transformations/Map

Executes an expression on each input record and assigns the result to a new or an already existing attribute.

// C++
stream.map(Attribute("f1") = Attribute("f1") + 1);
stream.map(Attribute("f1")++);
stream.map(Attribute("new_field") = Attribute("f1") * Attribute("f2"));
stream.map(Attribute("new_field") = POWER(Attribute("f1"), Attribute("f2"))); // raises f1 to the power of f2
stream.map(Attribute("new_field") = Attribute("f1") % Attribute("f2")); // modulo operator
// Java
stream.map("f1", attribute("f1").add(1));
// The ++ operator is not supported in Java; the line above is the equivalent.
stream.map("new_field", attribute("f1").multiply(attribute("f2")));
stream.map("new_field", Expressions.power(attribute("f1"), attribute("f2"))); // raises f1 to the power of f2
stream.map("new_field", attribute("f1").remainder(attribute("f2"))); // modulo operator

Stream Rename

Renames an incoming stream to a different name. This prevents attribute name collisions when performing a self-join.

stream.as("new_stream")

Window Aggregations

Windows are a fundamental concept of NebulaStream to handle infinite data streams. Windows split the stream into “buckets” of finite size, over which we can apply computations, e.g. aggregations or joins. In general, windows are defined by a window type, a window measure, and a window function. Furthermore, a window aggregation can be keyed or non-keyed; a keyed aggregation behaves similarly to a grouped aggregation in SQL.

The result schema of a window aggregation always consists of the window start, the window end, the window function result, and the key for keyed windows.

erDiagram
    WindowResult {
        UINT64 start
        UINT64 end
        AggregateType aggregate
    }
    KeyedWindowResult {
        UINT64 start
        UINT64 end
        KeyType key
        AggregateType aggregate
    }

Window Types

The first thing to specify is the window type, which determines how the records of a stream are assigned to windows. NebulaStream currently supports two window types with a time-based window measure, tumbling and sliding windows, on both keyed and non-keyed streams. In addition, it supports content-based threshold windows, which use a user-defined predicate on the tuple attributes, on non-keyed streams.

Tumbling Windows

A tumbling window assigns each record to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. For example, for a tumbling window of 10s, the window collects all records within the time interval of 10s. Then, after this interval is over it computes the aggregation and outputs the result to the next operator. Finally, it discards all records inside the window and starts over. It collects the records for the next 10 seconds in a new window and computes the aggregation function on those records.

The following code snippet shows an event time tumbling window with the size of 10 seconds. NebulaStream also supports milliseconds and minutes as window sizes.

// C++: Tumbling window over 10 seconds.
stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
      [.byKey(KeyedAttribute)]
      .apply(AggregationFunction);
// Java: Tumbling window over 10 seconds.
stream.window(TumblingWindow.of(EventTime.eventTime("timestamp"), TimeMeasure.seconds(10)))
      [.byKey(KeyedAttribute)]
      .apply(AggregationFunction);
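
To make the assignment of records to tumbling windows concrete, here is a minimal standalone C++ sketch. It is not taken from NebulaStream: WindowBounds and assignTumblingWindow are hypothetical names, and the sketch assumes that windows are aligned to timestamp 0 and that the window end is exclusive, neither of which is stated above.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper type; NebulaStream's internal representation may differ.
struct WindowBounds {
    uint64_t start;  // inclusive (assumption)
    uint64_t end;    // exclusive (assumption)
};

// Returns the single tumbling window that contains the given event timestamp.
// Assumes windows are aligned to timestamp 0 and that both values use the same time unit.
WindowBounds assignTumblingWindow(uint64_t timestamp, uint64_t windowSize) {
    assert(windowSize > 0);
    uint64_t start = timestamp - (timestamp % windowSize);
    return {start, start + windowSize};
}
```

With a window size of 10000 ms (10 seconds), a record with timestamp 12345 ms falls into the window from 10000 ms to 20000 ms, and every record belongs to exactly one window.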

Sliding Windows

Like a tumbling window, the sliding window assigns records to windows of fixed length. The size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Thus, sliding windows can be overlapping if the slide is smaller than the window size. In this case, records are assigned to multiple windows.

For example, we could have a sliding window of size 10 minutes that slides by 5 minutes. This means that every 5 minutes, the aggregation function is triggered to compute the aggregation over the last 10 minutes. As a result, consecutive windows overlap by 5 minutes, and each record is assigned to two windows.

// C++: Sliding window with size of 10 minutes and slide of 5 minutes.
stream.window(SlidingWindow::of
              (EventTime(Attribute("timestamp")), Minutes(10), Minutes(5)))
      [.byKey(KeyedAttribute)]
      .apply(AggregationFunction);
// Java: Sliding window with size of 10 minutes and slide of 5 minutes.
stream.window(SlidingWindow.of(EventTime.eventTime("timestamp"),
      TimeMeasure.minutes(10), TimeMeasure.minutes(5)))
      [.byKey(KeyedAttribute)]
      .apply(AggregationFunction);
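
The assignment of one record to several overlapping windows can be sketched in standalone C++ as follows. This is an illustration under assumptions, not NebulaStream's implementation: WindowBounds and assignSlidingWindows are hypothetical names, windows are assumed to start at multiples of the slide beginning at 0, and the window end is assumed to be exclusive.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper type; start is inclusive and end is exclusive (assumption).
struct WindowBounds {
    uint64_t start;
    uint64_t end;
};

// Returns all sliding windows that contain the given timestamp, newest window first.
// Assumes window starts are multiples of `slide`, beginning at 0.
std::vector<WindowBounds> assignSlidingWindows(uint64_t timestamp, uint64_t size, uint64_t slide) {
    assert(size > 0 && slide > 0);
    std::vector<WindowBounds> windows;
    // Latest window start that is not after the timestamp.
    uint64_t start = timestamp - (timestamp % slide);
    // Walk back over earlier starts as long as the window still covers the timestamp.
    while (start + size > timestamp) {
        windows.push_back({start, start + size});
        if (start < slide) {
            break;  // no earlier window exists before timestamp 0
        }
        start -= slide;
    }
    return windows;
}
```

For a size of 10 and a slide of 5 (in minutes), a record at minute 12 is assigned to the windows starting at minute 10 and minute 5. If the slide is larger than the size, some timestamps fall into no window and the function returns an empty list.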

Threshold Windows

A threshold window assigns a sequence of records to a window where the value of a user-defined attribute satisfies a given predicate. Threshold windows have a variable size and a gap of at least one tuple between each consecutive window.

For example, a threshold window with the predicate Attribute("f1") < 45 starts with the first record having an f1 value smaller than 45 and triggers after receiving a record with an f1 value greater than or equal to 45. Then, it computes the aggregation and outputs the result to the next operator. Finally, it discards all records inside the window and starts over.

Additionally, a threshold window can have an optional minimum window size, i.e., a window only triggers if it contains at least the user-defined minimum number of records. The default minimum count is 0.

💡 In NebulaStream, the threshold window takes an arbitrary expression as input to select the records for a window. Therefore, besides simple threshold filters, it also allows binary expressions over two attribute values.

💡 Threshold windows only support event time.

The following example defines a threshold window with the predicate Attribute("f1") < 45 and a minimum size of 5.

// C++: Threshold window with the predicate `Attribute("f1") < 45` and a minimum count of `5`
stream.window(ThresholdWindow::of(Attribute("f1") < 45, 5))
      .apply(AggregationFunction);
// Java: Threshold window with the predicate `attribute("f1").lessThan(45)` and a minimum count of `5`
stream.window(ThresholdWindow.of(attribute("f1").lessThan(45), 5))
      .apply(AggregationFunction);
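
The open, trigger, and discard cycle of a threshold window can be sketched in standalone C++. The sketch below is illustrative only and is not NebulaStream code: thresholdWindowSums is a hypothetical function, the input is reduced to a sequence of f1 values, and Sum is chosen arbitrarily as the aggregation. It assumes that a window still open at the end of the input is not emitted, because no closing record has arrived yet.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Splits a sequence of values into threshold windows and returns one sum per triggered window.
// A window collects consecutive values that satisfy the predicate and is closed by the
// first value that does not; the closing value itself belongs to no window.
std::vector<int64_t> thresholdWindowSums(const std::vector<int64_t>& values,
                                         const std::function<bool(int64_t)>& predicate,
                                         uint64_t minCount) {
    std::vector<int64_t> results;
    int64_t sum = 0;
    uint64_t count = 0;
    bool open = false;
    for (int64_t value : values) {
        if (predicate(value)) {
            open = true;
            sum += value;
            ++count;
        } else if (open) {
            // The window only triggers if it holds at least minCount records.
            if (count >= minCount) {
                results.push_back(sum);
            }
            // Discard the window content and start over.
            sum = 0;
            count = 0;
            open = false;
        }
    }
    return results;  // a window that is still open here is not emitted (assumption)
}
```

For the values 50, 10, 20, 60, 30, 70, 40 with the predicate value < 45 and a minimum count of 2, only the window containing 10 and 20 triggers: the window containing 30 is too small, and the window containing 40 has not been closed.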

Keyed vs Non-Keyed Windows

After the window type, we have to specify whether the stream should be keyed. The byKey(...) method assigns a logical key to each record, and all operations on the window content are scoped to this key. If byKey(...) is not called, the stream is not keyed. In the case of keyed streams, any attribute of the incoming records can be used as a key.

For example, suppose we have a data stream called vehicle, which collects data from vehicles and has the following attributes: id, vehicle_type, speed. The attribute vehicle_type takes the values car, bus, and bike. A keyed window on the attribute vehicle_type would create three logical streams: one where vehicle_type is car, one where it is bus, and one where it is bike. These three logical streams would result in three distinct window aggregates, one per vehicle_type.

💡 In contrast to other popular stream processing systems, the number of keys/partitions does not impact the level of parallelization in NebulaStream.

// C++: Creates a non-keyed window
stream.window(WindowType)
  .apply(AggregationFunction)

// C++: Creates a keyed window on the attributes "key1" and "key2"
stream.window(WindowType)
  .byKey(Attribute("key1"), Attribute("key2"))
  .apply(AggregationFunction)
// Java: Creates a non-keyed window
stream.window(WindowType)
  .apply(AggregationFunction)

// Java: Creates a keyed window on the attributes "key1" and "key2"
stream.window(WindowType)
  .byKey("key1", "key2")
  .apply(AggregationFunction)
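
The effect of keying can be sketched in standalone C++ using the vehicle example. This is an illustration under assumptions rather than NebulaStream code: the Vehicle struct, the WindowKey alias, and maxSpeedPerTypeAndWindow are hypothetical names, the timestamp field is added here because a time-based window needs one, and Max over speed in a tumbling window is chosen arbitrarily as the aggregation.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical record type following the vehicle example; timestamp is an added assumption.
struct Vehicle {
    uint64_t id;
    std::string vehicleType;
    uint64_t speed;
    uint64_t timestamp;  // event time in seconds
};

// One aggregate exists per (window start, key) pair.
using WindowKey = std::pair<uint64_t, std::string>;

// Computes the maximum speed per vehicle type within each tumbling window.
std::map<WindowKey, uint64_t> maxSpeedPerTypeAndWindow(const std::vector<Vehicle>& records,
                                                       uint64_t windowSize) {
    assert(windowSize > 0);
    std::map<WindowKey, uint64_t> result;
    for (const Vehicle& v : records) {
        uint64_t windowStart = v.timestamp - (v.timestamp % windowSize);
        WindowKey key{windowStart, v.vehicleType};
        auto it = result.find(key);
        if (it == result.end()) {
            result.emplace(key, v.speed);  // first record for this key in this window
        } else if (v.speed > it->second) {
            it->second = v.speed;
        }
    }
    return result;
}
```

Each entry of the returned map corresponds to one row of the keyed window result: the window start, the key, and the aggregate. A non-keyed window would instead produce a single aggregate per window.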

Window Functions

After defining the window type, we need to specify the computation we want to perform on each window. This is the responsibility of the window function, which is used to process the elements of each (possibly keyed) window once the system determines that a window is ready for processing. If the window is keyed, the window function is executed on each key individually. Currently, NebulaStream supports several common aggregation functions, i.e., Count, Sum, Min, Max, Avg, and Median. All aggregation functions, except Count, are computed over a specific record attribute. Furthermore, the user can specify a result attribute name by calling the method as() on the aggregation function.

// C++: Computes the sum over attribute x1 and
// stores the result in the attribute sum.
stream.window(WindowType)
      [.byKey(KeyedAttribute)]
      .apply(Sum(Attribute("x1"))->as(Attribute("sum")));

// C++: Counts all records in the window and
// stores the result in the attribute count.
stream.window(WindowType)
      [.byKey(KeyedAttribute)]
      .apply(Count()->as(Attribute("count")));
// Java: Computes the sum over attribute x1 and
// stores the result in the attribute sum.
stream.window(WindowType)
      [.byKey(KeyedAttribute)]
      .apply(Aggregation.sum("x1").as("sum"));

// Java: Counts all records in the window and
// stores the result in the attribute count.
stream.window(WindowType)
      [.byKey(KeyedAttribute)]
      .apply(Aggregation.count().as("count"));

Window Joins

A window join combines the records of two streams that share a common key and lie in the same window. These windows are defined by a window type and are evaluated on records from both streams. If the join predicate holds, the records from both sides of the join are combined into a single record containing all attributes from both join sides. For example, let’s say we have an Orders and a Products stream. The result of a windowed join of Orders and Products contains the attributes of both schemas with a prefix that identifies the origin stream.

erDiagram
    Orders {
        UINT64 id
        UINT64 product_id
        UINT64 items
        UINT64 ts
    }
    Products {
        UINT64 id
        DOUBLE value
        UINT64 ts
    }
    OrdersXProducts {
        UINT64 orders_id
        UINT64 orders_product_id
        UINT64 orders_items
        UINT64 products_id
        UINT64 products_value
    }

To join two streams, NebulaStream offers the streamA.joinWith(streamB) method. Following this call, we have to specify the join predicate and the window type. The following code shows how to formulate a stream join between the Products and the Orders streams.

// C++
auto orders = Query::from("orders");
auto products = Query::from("products");

auto result = orders
      .joinWith(products)
      .where(Attribute("product_id")).equalsTo(Attribute("id"))
      .window(TumblingWindow::of(EventTime(Attribute("ts")), Seconds(10)));
// Java
Query orders = nebulaStreamRuntime.readFromSource("orders");
Query products = nebulaStreamRuntime.readFromSource("products");

orders.joinWith(products)
      .where("product_id")
      .equalsTo("id")
      .window(TumblingWindow.of(EventTime.eventTime("ts"), TimeMeasure.seconds(10)));
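
The semantics of this join can be sketched in standalone C++ with a simple nested loop. The sketch is illustrative and makes no claim about how NebulaStream executes joins: Order, Product, OrderXProduct, and tumblingWindowJoin are hypothetical names that mirror the schemas above, and windows are assumed to be aligned to timestamp 0.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical record types mirroring the Orders and Products schemas.
struct Order {
    uint64_t id;
    uint64_t productId;
    uint64_t items;
    uint64_t ts;
};

struct Product {
    uint64_t id;
    double value;
    uint64_t ts;
};

// Joined record with prefixed attribute names, as in OrdersXProducts.
struct OrderXProduct {
    uint64_t ordersId;
    uint64_t ordersProductId;
    uint64_t ordersItems;
    uint64_t productsId;
    double productsValue;
};

// Joins orders and products that lie in the same tumbling window and satisfy
// orders.product_id == products.id. Nested loops keep the sketch short.
std::vector<OrderXProduct> tumblingWindowJoin(const std::vector<Order>& orders,
                                              const std::vector<Product>& products,
                                              uint64_t windowSize) {
    assert(windowSize > 0);
    std::vector<OrderXProduct> result;
    for (const Order& o : orders) {
        for (const Product& p : products) {
            bool sameWindow = (o.ts / windowSize) == (p.ts / windowSize);
            if (sameWindow && o.productId == p.id) {
                result.push_back({o.id, o.productId, o.items, p.id, p.value});
            }
        }
    }
    return result;
}
```

Two records with matching keys are not joined if their timestamps fall into different windows, which is the main difference from a join over finite tables.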

Union

Union combines two data streams into a common result stream and interleaves the individual records. Thus, the schemas of both data streams have to be union compatible, i.e., both streams have the same number of attributes, and corresponding attributes have identical data types and names. To unify the schemas, we can first perform a projection. In the following, we first apply a projection on the cars and bikes streams before using unionWith.

// C++
auto cars = Query::from("cars").project(Attribute("f1"));
auto bikes = Query::from("bikes").project(Attribute("f1"));
auto vehicles = cars.unionWith(bikes);
// Java
Query cars = nebulaStreamRuntime.readFromSource("cars").project(attribute("f1"));
Query bikes = nebulaStreamRuntime.readFromSource("bikes").project(attribute("f1"));
Query vehicles = cars.unionWith(bikes);
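
The union compatibility rule can be expressed as a small standalone C++ check. This is a sketch only: the Schema alias and isUnionCompatible are hypothetical names, and representing a schema as an ordered list of (attribute name, data type name) pairs is an assumption made for the illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical schema representation: ordered (attribute name, data type name) pairs.
using Schema = std::vector<std::pair<std::string, std::string>>;

// Two schemas are union compatible if they have the same number of attributes
// and corresponding attributes agree in both name and data type.
bool isUnionCompatible(const Schema& left, const Schema& right) {
    if (left.size() != right.size()) {
        return false;
    }
    for (std::size_t i = 0; i < left.size(); ++i) {
        if (left[i].first != right[i].first || left[i].second != right[i].second) {
            return false;
        }
    }
    return true;
}
```

In the example above, projecting both streams to the single attribute f1 yields compatible schemas, provided f1 has the same data type in both streams.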

Sinks

NebulaStream offers different sinks, which transfer records of the result stream to external systems.

File Sink

The file sink materializes a result stream to a file in different file formats. Parameters:

  • Path: The file path, which has to be a valid path at the coordinator.
  • Format:
    • CSV_FORMAT: Stores the stream in a comma-separated format.
    • NES_FORMAT: Stores the stream in NebulaStream’s native binary format.
    • TEXT_FORMAT: Stores the stream in a human-readable format.
  • Append: Flag to indicate whether the file should be appended to or overwritten.

// C++: Materializes the result stream as a file.
stream.sink(FileSinkDescriptor::create(
  "/path/to/file",
  "CSV_FORMAT|NES_FORMAT|TEXT_FORMAT",
  "true|false" // "true" for append, "false" for overwrite
  ));
// Java: Materializes the result stream as a file.
stream.sink(new FileSink(
  "/path/to/file",
  "CSV_FORMAT|NES_FORMAT|TEXT_FORMAT",
  "APPEND|OVERWRITE"
  ));

ZMQ Sink

The ZMQ sink sends all records of the result stream to a ZMQ server. 💡 All data is sent in NebulaStream’s native binary format.

Parameters:

  • Host: The address of the ZMQ server.
  • Port: The port of the ZMQ server.

// C++: Configures the connection to the ZMQ server.
stream.sink(ZmqSinkDescriptor::create("localhost", 8080));
// Java: Configures the connection to the ZMQ server.
stream.sink(new ZMQSink("localhost", 8080));

MQTT Sink

The MQTT sink sends all records of the result stream to an MQTT broker.

Parameters:

  • Host: The host of the MQTT broker.
  • Topic: MQTT topic chosen to publish client data to.
  • User: User identification for client.
  • MaxBufferedMSGs: Maximum number of messages that the client can buffer before disconnecting.
  • TimeUnit: Time unit chosen by the user for the message delay.
  • MessageDelay: Time before the client sends the next message to the broker.
  • qualityOfService: Either ‘at most once’ or ‘at least once’. QoS > 0 is required for a non-clean (persistent) session.
  • asynchronousClient: Determines whether the client is asynchronous or synchronous.

// C++: Configures the connection to the MQTT broker.
stream.sink(MQTTSinkDescriptor::create(
  "ws://127.0.0.1:9001",
  "/nesui",
  "rfRqLGZRChg8eS30PEeR",
  5,
  MQTTSinkDescriptor::milliseconds,
  500,
  MQTTSinkDescriptor::atLeastOnce,
  false));
// Java: Configures the connection to the MQTT broker.
stream.sink(new MQTTSink(
  "ws://127.0.0.1:9001",
  "/nesui",
  "rfRqLGZRChg8eS30PEeR",
  5,
  MQTTSink.TimeUnits.milliseconds,
  500,
  MQTTSink.ServiceQualities.atLeastOnce,
  false));

OPC Sink

The OPC sink sends all records of the result stream to an OPC server.

Parameters:

  • URL: The server URL used to connect to the OPC server.
  • NodeId: ID of the node to write data to.
  • User: User name for the server.
  • Password: Password for the server.

// C++: Configures the connection to the OPC server.
stream.sink(OPCSinkDescriptor::create(url, nodeid, user, password));
// Java: The OPC sink is currently not supported.

Print Sink

The print sink outputs data from the result stream to the console at the coordinator node.

// C++: Configures a print sink.
stream.sink(PrintSinkDescriptor::create());
// Java: Configures a print sink.
stream.sink(new PrintSink());