Complex Event Processing

NebulaStreamCEP is the experimental Complex Event Processing (CEP) feature of NebulaStream. It enables users to detect user-specified patterns in event streams. It extends NebulaStream’s operator set with CEP-specific operators, i.e., conjunction, sequence, disjunction, and times. The rest of this section uses Figure 1 to explain these operators. In addition, NebulaStream's regular stream processing operators, in particular the mandatory window operator, can be used to specify patterns.

💡 For accurate pattern detection, apply a sliding window over event time to the pattern. Its slide size should correspond to the ingestion rate of the most frequent stream, i.e., the time between two consecutive records of that stream. Other window types and slide sizes can be used, but they may cause matches to be missed.
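The following sketch shows why a coarse slide can lose matches. It is plain C++ and not NebulaStream code. It makes three assumptions: timestamps are non-negative integer minutes, windows of length `size` start at every multiple of `slide` from time 0, and the helpers `windowsContaining` and `shareWindow` are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical helper (not a NebulaStream API): returns the [start, end) bounds
// of every sliding window of length `size` and step `slide` that contains the
// event time `ts`. Windows are assumed to start at multiples of `slide`, from 0.
std::vector<std::pair<int64_t, int64_t>> windowsContaining(int64_t ts, int64_t size, int64_t slide) {
    std::vector<std::pair<int64_t, int64_t>> result;
    int64_t lastStart = (ts / slide) * slide;  // latest window start <= ts
    for (int64_t start = lastStart; start > ts - size; start -= slide) {
        if (start < 0) break;  // no windows before time 0 in this sketch
        result.emplace_back(start, start + size);
    }
    return result;
}

// Two records can only form a match if at least one window contains both.
bool shareWindow(int64_t ts1, int64_t ts2, int64_t size, int64_t slide) {
    for (const auto& w : windowsContaining(ts1, size, slide)) {
        if (ts2 >= w.first && ts2 < w.second) return true;
    }
    return false;
}
```

Take a 5-minute window with a 1-minute slide. Records at minutes 4 and 6 both fall into the window [2, 7). With a 5-minute slide (a tumbling window), they land in [0, 5) and [5, 10) instead, so no window ever contains both.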

Figure 1: CEP Example with Window

Conjunction

💡 Currently, it is not possible to create multi-conjunction or multi-sequence patterns.

The conjunction operator is a binary operator that expects a record (event) from stream1 and a record from stream2 to occur within a window, regardless of their temporal order. In the following, we apply the conjunction operator andWith to two logical streams. The output schema contains the attributes of both schemas with a prefix that identifies the origin stream.

Query::from("stream1")
.andWith(Query::from("stream2"))
.window(SlidingWindow::of(EventTime(Attribute("timestamp")),Minutes(5),Minutes(1)));
currently not supported

For the conjunction pattern above, the highlighted window of Figure 1 returns three matches, i.e., each record of stream2 combined with the record of stream1.
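The conjunction semantics described above can be sketched for a single window as a cross product of the records of both streams that fall into that window. This is an illustration only. The `Event` type, the function name, and the timestamps used below are hypothetical and are not taken from NebulaStream or from Figure 1.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical record type: an identifier plus an event-time timestamp.
struct Event {
    std::string id;
    int64_t timestamp;
};

// Conjunction within one window [start, end): each left record that falls into
// the window is paired with each right record in the same window, regardless
// of which of the two occurred first.
std::vector<std::pair<Event, Event>> conjunctionInWindow(
    const std::vector<Event>& left, const std::vector<Event>& right,
    int64_t start, int64_t end) {
    std::vector<std::pair<Event, Event>> matches;
    for (const auto& l : left) {
        if (l.timestamp < start || l.timestamp >= end) continue;
        for (const auto& r : right) {
            if (r.timestamp < start || r.timestamp >= end) continue;
            matches.emplace_back(l, r);  // no ordering constraint
        }
    }
    return matches;
}
```

In this model, one left record and three right records in the same window yield three matches, even if one of the right records occurred before the left record.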

Sequence

The sequence operator is a binary operator that expects a record from stream1 and a record from stream2 to occur within a window. Unlike the conjunction operator, the sequence operator is a temporal operator: the records must occur in the specified temporal order, i.e., the timestamp of the record from stream1 must be smaller than the timestamp of the record from stream2. In the following, we apply the sequence operator seqWith to two logical streams. The output schema contains the attributes of both schemas with a prefix that identifies the origin stream.

Query::from("stream1")
.seqWith(Query::from("stream2"))
.window(SlidingWindow::of(EventTime(Attribute("timestamp")),Minutes(5),Minutes(1)));
currently not supported

For the sequence pattern above, the highlighted window of Figure 1 returns two matches, i.e., the record of stream1 composed with each of the two subsequent records of stream2.
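The following minimal sketch adds the ordering constraint to a per-window pairing. Only pairs whose left timestamp is strictly smaller than the right timestamp are kept. As before, `Event`, `sequenceInWindow`, and the timestamps are hypothetical illustrations and not NebulaStream APIs.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical record type: an identifier plus an event-time timestamp.
struct Event {
    std::string id;
    int64_t timestamp;
};

// Sequence within one window [start, end): like a conjunction, but a pair is
// only a match if the left record strictly precedes the right record.
std::vector<std::pair<Event, Event>> sequenceInWindow(
    const std::vector<Event>& left, const std::vector<Event>& right,
    int64_t start, int64_t end) {
    std::vector<std::pair<Event, Event>> matches;
    for (const auto& l : left) {
        if (l.timestamp < start || l.timestamp >= end) continue;
        for (const auto& r : right) {
            if (r.timestamp < start || r.timestamp >= end) continue;
            if (l.timestamp < r.timestamp) matches.emplace_back(l, r);
        }
    }
    return matches;
}
```

Use the same input as a conjunction: one left record at time 3 and right records at times 1, 4, and 5. The sequence then yields only two matches, because the right record at time 1 precedes the left record.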

Disjunction

💡 Currently, it is not possible to apply a window operator to the disjunction pattern.

The disjunction operator is a binary operator that expects a record from either of its two input streams (stream1 or stream3 in the example below) to occur within a window, regardless of any temporal order. Furthermore, the schemas of both data streams have to be union compatible. If necessary, projections can be used to make the streams union compatible. In the following, we apply the disjunction operator orWith to two logical streams.

Query::from("stream1")
.orWith(Query::from("stream3"))
.window(SlidingWindow::of(EventTime(Attribute("timestamp")),Minutes(5),Minutes(1)));
currently not supported

For the disjunction pattern above, the highlighted window of Figure 1 returns the record of stream1 as a match.
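The disjunction semantics can be sketched as follows: every record of either stream that falls into a window is a match on its own. In this model, union compatibility means that both streams share one record type. The sketch follows the window-based description above. `Event`, `disjunctionInWindow`, and the timestamp-ordered output are assumptions of this illustration, not NebulaStream behaviour.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical record type; both streams use it, i.e., they are union compatible.
struct Event {
    std::string id;
    int64_t timestamp;
};

// Disjunction within one window [start, end): each record of either stream
// inside the window is a match by itself; no pairing, no ordering constraint.
std::vector<Event> disjunctionInWindow(const std::vector<Event>& left,
                                       const std::vector<Event>& right,
                                       int64_t start, int64_t end) {
    std::vector<Event> matches;
    auto collect = [&](const std::vector<Event>& stream) {
        for (const auto& e : stream) {
            if (e.timestamp >= start && e.timestamp < end) matches.push_back(e);
        }
    };
    collect(left);
    collect(right);
    // Output order by event time is a presentation choice of this sketch.
    std::stable_sort(matches.begin(), matches.end(),
                     [](const Event& a, const Event& b) { return a.timestamp < b.timestamp; });
    return matches;
}
```

If the second stream has no record inside the window, the only match is the record of the first stream.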

Times

The times operator, called the iteration operator in other CEP engines, is a unary operator that allows multiple records of a stream to occur in a sequence. In particular, NebulaStreamCEP supports the following variants:

  • .times() This variant handles an unlimited number of event occurrences but requires at least one record per window.

  • .times(n) This variant enables the specification of patterns that detect event sequences of n records in a window, where n > 0.

  • .times(n,m) This variant enables the specification of patterns that detect event sequences of n to m records in a window, where n, m > 0.

    • 💡 Note:
      • if n = 0, the result is equivalent to .times(m)
      • if m = 0, the result is an unbounded variant of the times operator that expects at least n occurrences of the specified event.
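The variants listed above can be read as lower and upper bounds on the number of records per window. The sketch below models them that way. The encoding `maxOccurrences == 0` for "unbounded" and all function names are hypothetical. The sketch also assumes that .times(n) means exactly n records. It only models n > 0, so the n = 0 special case from the note is not covered.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical bounds for the .times variants; maxOccurrences == 0 means "no upper bound".
struct TimesBounds {
    uint64_t minOccurrences;
    uint64_t maxOccurrences;
};

// .times(): at least one record, no upper bound.
TimesBounds timesUnbounded() { return {1, 0}; }

// .times(n): assumed here to mean exactly n records, n > 0.
TimesBounds timesExactly(uint64_t n) { return {n, n}; }

// .times(n, m): between n and m records; m == 0 yields "at least n" (see the note).
TimesBounds timesRange(uint64_t n, uint64_t m) {
    assert(n > 0 && "this sketch only models n > 0");
    if (m == 0) return {n, 0};
    return {n, m};
}

// Does a window holding `count` records of the stream satisfy the bounds?
bool satisfies(const TimesBounds& b, uint64_t count) {
    if (count < b.minOccurrences) return false;
    return b.maxOccurrences == 0 || count <= b.maxOccurrences;
}
```

In this model, a window with three records satisfies .times(3,10), while windows with two or eleven records do not.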

In the following, we apply variant .times(n,m) to a logical stream.

Query::from("stream2")
.times(3,10)
.window(SlidingWindow::of(EventTime(Attribute("timestamp")),Minutes(5),Minutes(1)));
currently not supported

For the iteration pattern above, the highlighted window of Figure 1 returns one match, i.e., a record with count 3.

Not

💡 Not yet supported

Selection and Consumption Policies

NebulaStreamCEP supports the selection policy skip-till-any-match and the consumption policy non-consuming. Support for more restrictive policies is planned.
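The following sketch illustrates what the two policies mean for a sequence of an A record followed by a B record. Under skip-till-any-match, any A may pair with any later B, and the records in between are skipped. Under non-consuming, a record that already took part in a match remains available for further matches. The sketch is a simplification. It replaces windows with a hypothetical `maxSpan` (both records must be closer together than one window length), reports each distinct pair once, and its function name is an assumption, not a NebulaStream API.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical sketch of SEQ(A, B) under skip-till-any-match and non-consuming.
// Inputs are event timestamps of A records and B records.
std::vector<std::pair<int64_t, int64_t>> seqAnyMatch(const std::vector<int64_t>& aTimes,
                                                     const std::vector<int64_t>& bTimes,
                                                     int64_t maxSpan) {
    std::vector<std::pair<int64_t, int64_t>> result;
    for (int64_t a : aTimes) {
        for (int64_t b : bTimes) {
            // skip-till-any-match: intermediate records are ignored, every valid pair counts.
            // non-consuming: `a` and `b` stay available for further pairs.
            if (a < b && b - a < maxSpan) {
                result.emplace_back(a, b);
            }
        }
    }
    return result;
}
```

With A records at times 1 and 4, B records at times 2, 3, and 5, and a span of 5, the sketch reports four matches. The A record at time 1 takes part in three of them, which a consuming policy would not allow.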