Tutorial

In this tutorial we will recreate the NebulaStream demo on our local machine. We will learn how to:

  • Set up and configure a NebulaStream instance inside Docker,
  • Set up physical and logical sources which provide streaming data,
  • Run a NebulaStream query using different clients, and
  • Visualize the results of the query in the NebulaStream UI.

Getting Started

In this tutorial, we use Docker images which contain the NebulaStream binaries and the UI. We also need additional files to configure the NebulaStream instance and provide streaming data.

First, we clone the public nebulastream-tutorial GitHub repository.

git clone https://github.com/nebulastream/nebulastream-tutorial
cd nebulastream-tutorial

To use the NebulaStream Java client to run queries on NebulaStream, we also have to install Gradle.

Setting up a NebulaStream Instance With Docker

We use Docker Compose to set up a NebulaStream instance on our local machine. Specifically, we create four Docker containers.

  • In the first container, we run the NebulaStream coordinator, which is a central component managing the NebulaStream instance.
  • In the second container, we run a NebulaStream worker, which runs on a device in the fog or cloud layers and provides data and processes queries. In our tutorial, the worker represents a device that is connected to a wind turbine. Later, we will simulate a continuous stream of data detailing the energy generated by the wind turbine.
  • The third container runs the NebulaStream UI with which we will visualize the results of our query.
  • The fourth container runs the MQTT broker Eclipse Mosquitto, which is used by the NebulaStream UI to receive the results of a streaming query.

The file docker-compose.yml contains the definition of the Docker containers. We also configure a local network nebulastream-network between the Docker containers.

docker-compose.yml
version: "3"
services:
  coordinator:
    image: nebulastream/nes-executable-image:latest
    entrypoint: ["bash", "-c", "nesCoordinator --configPath=/tutorial/coordinator.yml"]
    ports:
      - 8081:8081
    volumes:
      - ./:/tutorial
    networks:
      nebulastream-network:
        ipv4_address: 172.31.0.2
  worker:
    image: nebulastream/nes-executable-image:latest
    entrypoint: ["bash", "-c", "sleep 3 && nesWorker --configPath=/tutorial/worker-1.yml"]
    volumes:
      - ./:/tutorial
    networks:
      nebulastream-network:
        ipv4_address: 172.31.0.3
  mosquitto:
    image: eclipse-mosquitto
    ports:
      - 9001:9001
    volumes:
      - ./mosquitto/config:/mosquitto/config
    networks:
      nebulastream-network:
        ipv4_address: 172.31.0.11
  ui:
    image: nebulastream/nes-ui-image:latest
    ports:
      - 9000:9000
    networks:
      nebulastream-network:
        ipv4_address: 172.31.0.12
networks:
  nebulastream-network:
    ipam:
      config:
        - subnet: 172.31.0.0/24
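
As a quick sanity check of the static addresses in this file, the following sketch tests whether each container address lies inside the 172.31.0.0/24 subnet. It is plain Java and not part of the tutorial repository; the class and method names are made up for illustration.

```java
import java.util.List;

public class SubnetCheck {

    // Convert a dotted-quad IPv4 address into a 32-bit integer.
    static int toInt(String ip) {
        int value = 0;
        for (String octet : ip.split("\\.")) {
            value = (value << 8) | Integer.parseInt(octet);
        }
        return value;
    }

    // Return true if the address lies inside the CIDR block, e.g. "172.31.0.0/24".
    static boolean inSubnet(String ip, String cidr) {
        String[] parts = cidr.split("/");
        int prefix = Integer.parseInt(parts[1]);
        // A /24 prefix keeps the upper 24 bits and clears the lower 8.
        int mask = prefix == 0 ? 0 : -1 << (32 - prefix);
        return (toInt(ip) & mask) == (toInt(parts[0]) & mask);
    }

    public static void main(String[] args) {
        String subnet = "172.31.0.0/24";
        // The four addresses assigned in docker-compose.yml.
        for (String ip : List.of("172.31.0.2", "172.31.0.3", "172.31.0.11", "172.31.0.12")) {
            System.out.println(ip + " in " + subnet + ": " + inSubnet(ip, subnet));
        }
    }
}
```

If a container address is changed to one outside the subnet, the check returns false for it, which is a hint that the ipv4_address and subnet entries no longer agree.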

To start the NebulaStream instance, execute the following commands inside the nebulastream-tutorial folder.

docker-compose pull    # Make sure we have the latest images.
docker-compose up      # Start the Docker containers

Afterwards, you should see the output of the various services starting up. The output should look similar to the following screenshot:

Output of running docker-compose up. The NebulaStream coordinator and worker banners are shown.

Configuring the NebulaStream Instance

We configure the NebulaStream instance through YAML configuration files, using the --configPath parameter on the command line. The file coordinator.yml contains the configuration for the NebulaStream coordinator and the file worker-1.yml contains the configuration for the NebulaStream worker.

We have to configure three aspects of the NebulaStream instance:

  • The network between the coordinator and the worker.
  • A logical source in the coordinator, describing the schema of the wind_turbines data stream.
  • A physical source in the worker, to produce data for the wind_turbines stream.

Network Configuration

Docker Compose takes care of forwarding ports between the Docker containers, so we can focus on configuring the NebulaStream network settings. The following excerpts show how to configure the network between the coordinator and the worker.

In coordinator.yml, we have to define the interfaces on which the coordinator listens for remote procedure calls (setting coordinatorIp) and for REST API calls (setting restIp). For both settings, we use the IP address which we assigned to the coordinator Docker container in docker-compose.yml. We also have to configure the port on which the coordinator accepts REST calls (setting restPort).

Network configuration in coordinator.yml
###
### Network configuration
###
coordinatorIp: 172.31.0.2
restIp: 172.31.0.2
restPort: 8081

In worker-1.yml, we have to define the interface on which the worker listens for remote procedure calls. Again, we use the IP address which we assigned to the worker Docker container in docker-compose.yml. We also have to let the worker know how to reach the coordinator. For this, we use the name of the coordinator Docker container defined in docker-compose.yml.

Network configuration in worker-1.yml
###
### Network configuration
###
localWorkerIp: 172.31.0.3
coordinatorIp: coordinator

Logical Source Configuration

In NebulaStream, a logical source represents a data stream on which we can execute queries. Each logical source contains the data of one or more physical sources (see below). In order to access the data of a logical source, we have to define it in the coordinator, i.e., we have to give it a name and describe the data schema.

In this tutorial, we use the wind_turbines logical source. The schema of this source contains a number of fields, but we are only interested in three: (1) The field metadata_id identifies an individual wind turbine. (2) The field features_properties_updated contains the timestamp when a stream record was produced. (3) The field features_properties_mag contains the amount of energy produced by this wind turbine during the interval represented by the record.

We define the logical source and its schema centrally in the coordinator. The following excerpt shows how to do that in the coordinator.yml configuration file.

Logical source configuration in coordinator.yml
###
### Logical source configuration
###
logicalSources:
        # The name of the logical source.
        - logicalSourceName: "wind_turbines"
          # The description of the schema.
          fields:
                  - name: type
                    type: CHAR
                    length: 50
                  - name: metadata_generated
                    type: UINT64
                  - name: metadata_title
                    type: CHAR
                    length: 50
                  - name: metadata_id
                    type: CHAR
                    length: 50
                  - name: features_type
                    type: CHAR
                    length: 50
                  - name: features_properties_capacity
                    type: UINT64
                  - name: features_properties_efficiency
                    type: FLOAT32
                  - name: features_properties_mag
                    type: FLOAT32
                  - name: features_properties_time
                    type: UINT64
                  - name: features_properties_updated
                    type: UINT64
                  - name: features_properties_type
                    type: CHAR
                    length: 50
                  - name: features_geometry_type
                    type: CHAR
                    length: 50
                  - name: features_geometry_coordinates_longitude
                    type: FLOAT32
                  - name: features_geometry_coordinates_latitude
                    type: FLOAT32
                  - name: features_eventId
                    type: CHAR
                    length: 50

Physical Source Configuration

In NebulaStream, a physical source represents the data produced by a sensor.

In this tutorial, we simulate a sensor, called wind_turbine_1, that produces data related to a wind turbine. The file wind-turbine-1.csv contains an entry for every minute of a single day. It shows, e.g., how much energy is produced by this wind turbine in one minute.

We configure the worker to read the data from this CSV file using a CSV source. The following excerpt shows how to do that in the worker-1.yml configuration file.

Physical source configuration in worker-1.yml
###
### Physical source configuration
###
physicalSources:
        # This physical source is part of the `wind_turbines` logical source.
        - logicalSourceName: wind_turbines
          # The name of this physical source.
          physicalSourceName: wind_turbine_1
          # This physical source reads data from a CSV file.
          type: CSVSource
          configuration:
                  # The path of the CSV file.
                  filePath: /tutorial/wind-turbine-1.csv
                  # Skip the first line because it contains the field names of the CSV file.
                  skipHeader: true
                  # Additional settings to configure how data is provided to the logical source.
                  numberOfBuffersToProduce: 1024
                  numberOfTuplesToProducePerBuffer: 0

Running Queries

Now that our NebulaStream instance is running, and we have defined a logical and physical source, we can run a streaming query.

Submitting a Query With the Java Client

The NebulaStream Java client is one way to run a streaming query on NebulaStream. In this tutorial, we provide an example program in the folder java-client-example. The example contains the following Java program, which uses the Java API to connect to our NebulaStream instance and execute a query.

NebulaStreamTutorial.java
import stream.nebula.exceptions.RESTException;
import stream.nebula.operators.Aggregation;
import stream.nebula.operators.sinks.FileSink;
import stream.nebula.operators.sinks.Sink;
import stream.nebula.operators.window.TumblingWindow;
import stream.nebula.runtime.NebulaStreamRuntime;
import stream.nebula.runtime.Query;

import java.io.IOException;

import static stream.nebula.operators.window.EventTime.eventTime;
import static stream.nebula.operators.window.TimeMeasure.minutes;

public class NebulaStreamTutorial {

    public static void main(String[] args) throws RESTException, IOException {
        // Create a NebulaStream runtime and connect it to the NebulaStream coordinator.
        NebulaStreamRuntime nebulaStreamRuntime = NebulaStreamRuntime.getRuntime();
        nebulaStreamRuntime.getConfig().setHost("localhost").setPort("8081");

        // Create a streaming query
        Query query = nebulaStreamRuntime
                .readFromSource("wind_turbines")
                .window(TumblingWindow.of(eventTime("features_properties_updated"), minutes(10)))
                .byKey("metadata_id")
                .apply(Aggregation.sum("features_properties_mag"));

        // Finish the query with a sink
        Sink sink = query.sink(new FileSink("/tutorial/java-query-results.csv", "CSV_FORMAT", true));

        // Submit the query to the coordinator.
        int queryId = nebulaStreamRuntime.executeQuery(query, "BottomUp");
    }

}

In this example, we first create a NebulaStreamRuntime object and configure it to connect to the coordinator. We then create a query that reads data from the wind_turbines logical source, creates a 10-minute tumbling window over the attribute features_properties_updated, groups the records by metadata_id, and sums the field features_properties_mag. The output of the query is written to a CSV file, /tutorial/java-query-results.csv, on the coordinator. Because docker-compose.yml mounts the nebulastream-tutorial folder at /tutorial in the container, you can find this file in the nebulastream-tutorial folder after execution.
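
To make the window semantics concrete, here is a minimal plain-Java sketch of what a keyed 10-minute tumbling-window sum computes. It does not use the NebulaStream client or reflect how NebulaStream implements windows internally. The Reading class, the turbine name, and the sample values are invented for illustration, and the sketch assumes that windows are aligned to multiples of the window size since the epoch.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    // Hypothetical, simplified record: turbine id, event time in ms, produced energy.
    static final class Reading {
        final String metadataId;
        final long updated;
        final double mag;

        Reading(String metadataId, long updated, double mag) {
            this.metadataId = metadataId;
            this.updated = updated;
            this.mag = mag;
        }
    }

    static final long WINDOW_MS = 10 * 60 * 1000L; // 10 minutes

    // Start of the tumbling window that contains the given event time.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Sum mag per "windowStart,metadataId" key.
    static Map<String, Double> sumPerWindow(List<Reading> readings) {
        Map<String, Double> sums = new TreeMap<>();
        for (Reading r : readings) {
            String key = windowStart(r.updated) + "," + r.metadataId;
            sums.merge(key, r.mag, Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Reading> readings = List.of(
                new Reading("turbine-a", 1647385200000L, 1.0),  // first window
                new Reading("turbine-a", 1647385260000L, 2.0),  // one minute later, same window
                new Reading("turbine-a", 1647385800000L, 4.0)); // ten minutes later, next window
        sumPerWindow(readings).forEach((key, sum) -> System.out.println(key + " -> " + sum));
    }
}
```

Each record falls into exactly one window, so one day of per-minute records for a single turbine yields 1440 / 10 = 144 windows.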

You can run the example program using Gradle:

cd java-client-example
gradle run

This should produce the following output:

It shows that the query was submitted to NebulaStream and the coordinator returned the query ID 1.

In the screenshot, you can see the JSON object that is sent to the coordinator to execute the query. The CSV file java-query-results.csv, which is produced by this program, should contain the following content (only the first six out of 144 lines are shown):

wind_turbines$start:INTEGER,wind_turbines$end:INTEGER,wind_turbines$metadata_id:ArrayType,wind_turbines$features_properties_mag:(Float)
1647385200000,1647385800000,650f0dac-6228-4ee8-906d-4173261d7095,7542428.500000
1647385800000,1647386400000,650f0dac-6228-4ee8-906d-4173261d7095,9941600.000000
1647386400000,1647387000000,650f0dac-6228-4ee8-906d-4173261d7095,10842671.000000
1647387000000,1647387600000,650f0dac-6228-4ee8-906d-4173261d7095,14288031.000000
1647387600000,1647388200000,650f0dac-6228-4ee8-906d-4173261d7095,8157870.000000
...
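
If you want to process the result file further, each data row can be split into its four columns. The following is a small sketch in plain Java; the WindowResult class and the parse method are hypothetical helpers, and the column order is taken from the header line shown above.

```java
public class ResultLineParser {

    // Hypothetical holder for one row of java-query-results.csv.
    static final class WindowResult {
        final long start;        // window start in ms since the epoch
        final long end;          // window end in ms since the epoch
        final String metadataId; // id of the wind turbine
        final double sum;        // summed features_properties_mag

        WindowResult(long start, long end, String metadataId, double sum) {
            this.start = start;
            this.end = end;
            this.metadataId = metadataId;
            this.sum = sum;
        }
    }

    // Parse one data row (not the header line) of the result file.
    static WindowResult parse(String line) {
        String[] fields = line.split(",");
        return new WindowResult(
                Long.parseLong(fields[0]),
                Long.parseLong(fields[1]),
                fields[2],
                Double.parseDouble(fields[3]));
    }

    public static void main(String[] args) {
        WindowResult r = parse(
                "1647385200000,1647385800000,650f0dac-6228-4ee8-906d-4173261d7095,7542428.500000");
        long minutes = (r.end - r.start) / 60_000;
        System.out.println(r.metadataId + ": " + r.sum + " over " + minutes + " minutes");
    }
}
```

The difference between the end and start columns is 600000 ms, which matches the 10-minute window size of the query.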

Submitting a Query Over the REST API

Alternatively, we can submit the query directly to the REST API, e.g., using curl in a shell. The following example executes the same query as in the Java example.

curl -d@rest-query-with-csv-sink.json http://localhost:8081/v1/nes/query/execute-query

The payload of the REST call is contained in the file rest-query-with-csv-sink.json, which we provide with the tutorial. It contains the following JSON object:

{
  "userQuery": "Query::from(\"wind_turbines\").window(TumblingWindow::of(EventTime(Attribute(\"features_properties_updated\")), Minutes(10))).byKey(Attribute(\"metadata_id\")).apply(Sum(Attribute(\"features_properties_mag\"))).sink(FileSinkDescriptor::create(\"/tutorial/rest-query-result.csv\", \"CSV_FORMAT\", \"APPEND\"));",
  "placement": "BottomUp"
}

⚠️ At the moment, we submit queries using the C++ syntax over the REST API. This will change in the future.

This command should produce the following output:

{
  "queryId": 1
}

After we run this query, we can see its results in the file rest-query-result.csv, i.e., the path given in the FileSinkDescriptor of the JSON payload.
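
The same REST call can also be issued programmatically. The following sketch uses the HTTP client from the Java standard library (Java 11 or later) instead of the NebulaStream Java client. The class name and the buildRequest helper are made up for illustration, and the Content-Type header is an assumption: curl -d sends application/x-www-form-urlencoded by default, so adjust the header if the coordinator rejects the request.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class RestQuerySubmit {

    // Endpoint used by the curl command in this tutorial.
    static final String ENDPOINT = "http://localhost:8081/v1/nes/query/execute-query";

    // Build a POST request that carries the JSON payload as its body.
    static HttpRequest buildRequest(String jsonBody) {
        return HttpRequest.newBuilder(URI.create(ENDPOINT))
                // Assumption: the coordinator accepts a JSON content type.
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Run from the nebulastream-tutorial folder, with the containers started.
        String body = Files.readString(Path.of("rest-query-with-csv-sink.json"));
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(body), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Keeping the request construction in its own method makes it possible to inspect the method, URI, and headers without a running coordinator.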

Visualizing the Query Results in the NebulaStream UI

As the last step in this tutorial, we will visualize the query results in the NebulaStream UI. With the Docker containers running, open the NebulaStream UI in the browser at http://localhost:9000.

⚠️ The NebulaStream UI is still very experimental.

Connecting the UI to the NebulaStream Coordinator

First, we have to connect the UI to the NebulaStream coordinator. Click on the settings icon in the sidebar on the left of the UI. We can connect to the coordinator running on localhost on port 8081. This port is exposed from the Docker container to our host system.

UI settings with localhost as host and 8081 as port

Next, click on the source catalog icon to open the source catalog. There we can see the wind_turbines logical source.

Source catalog with wind_turbines source.

Running a Query With an MQTT Sink

The NebulaStream UI reads query results from an MQTT broker. In our Docker setup, we use Eclipse Mosquitto as the MQTT broker. The file rest-query-with-mqtt-sink.json contains a version of our original query which sends its results to an MQTT sink. To submit the query over the REST API, run the following command.

curl -d@rest-query-with-mqtt-sink.json http://localhost:8081/v1/nes/query/execute-query

We can see that the query is running by clicking on the query catalog icon in the UI.

Query catalog with query 1 running.

Configuring a Visualization Chart

Now that the query is running and sending data to an MQTT broker, we can set up a visualization chart. Click on the result visualization icon to open the result visualization page. On this page, click on the “Add graph” button in the top-right corner, and create a multi line chart as shown in the following screenshot:

Creating a multi line chart

On this page, select the query in the drop-down menu at the top and select the multi line chart.

⚠️ On macOS, we have to use localhost as the host of the MQTT broker. On Linux, the IP address of the MQTT service configured in docker-compose.yml, i.e., 172.31.0.11, also works. In both cases, we use 9001 as the port of the MQTT broker.

On the following page, click on “Edit graph” and set up the chart as shown in the following screenshot:

Creating a multi line chart

Select “features_properties_mag” as the attribute and “start” as the x axis. Then click “Draw graph”. We should now see the values of the tumbling window displayed as in the following screenshot:

Creating a multi line chart

Next Steps

You now know how to set up a NebulaStream instance, how to define logical and physical sources, how to run a query, and how to visualize its results. Next, you can learn more about:

If you want to look under the hood, or contribute to NebulaStream, we are happy to provide you access to a source code prerelease.