Tutorial
In this tutorial we will recreate the NebulaStream demo on our local machine. We will learn how to:
- Set up and configure a NebulaStream instance inside Docker,
- Set up physical and logical sources which provide streaming data,
- Run a NebulaStream query using different clients, and
- Visualize the results of the query in the NebulaStream UI.
Getting Started
In this tutorial, we use Docker images which contain the NebulaStream binaries and the UI. We also need additional files to configure the NebulaStream instance and provide streaming data.
First, we clone the public nebulastream-tutorial GitHub repository.
git clone https://github.com/nebulastream/nebulastream-tutorial
cd nebulastream-tutorial
To run queries with the NebulaStream Java client, we also have to install Gradle.
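If you are not sure whether Gradle is already installed on your machine, you can check its version on the command line:
gradle --version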
Setting Up a NebulaStream Instance With Docker
We use Docker Compose to set up a NebulaStream instance on our local machine. Specifically, we create four Docker containers.
- In the first container, we run the NebulaStream coordinator, which is a central component managing the NebulaStream instance.
- In the second container, we run a NebulaStream worker, which runs on a device in the fog or cloud layers and provides data and processes queries. In our tutorial, the worker represents a device that is connected to a wind turbine. Later, we will simulate a continuous stream of data detailing the energy generated by the wind turbine.
- The third container runs the NebulaStream UI with which we will visualize the results of our query.
- The fourth container runs the MQTT broker Eclipse Mosquitto, which is used by the NebulaStream UI to receive the results of a streaming query.
The file docker-compose.yml contains the definition of the Docker containers.
We also configure a local network nebulastream-network between the Docker containers.
docker-compose.yml
version: "3"
services:
coordinator:
image: nebulastream/nes-executable-image:latest
entrypoint: ["bash", "-c", "nesCoordinator --configPath=/tutorial/coordinator.yml"]
ports:
- 8081:8081
volumes:
- ./:/tutorial
networks:
nebulastream-network:
ipv4_address: 172.31.0.2
worker:
image: nebulastream/nes-executable-image:latest
entrypoint: ["bash", "-c", "sleep 3 && nesWorker --configPath=/tutorial/worker-1.yml"]
volumes:
- ./:/tutorial
networks:
nebulastream-network:
ipv4_address: 172.31.0.3
mosquitto:
image: eclipse-mosquitto
ports:
- 9001:9001
volumes:
- ./mosquitto/config:/mosquitto/config
networks:
nebulastream-network:
ipv4_address: 172.31.0.11
ui:
image: nebulastream/nes-ui-image:latest
ports:
- 9000:9000
networks:
nebulastream-network:
ipv4_address: 172.31.0.12
networks:
nebulastream-network:
ipam:
config:
- subnet: 172.31.0.0/24
To start the NebulaStream instance, execute the following commands inside the nebulastream-tutorial folder.
docker-compose pull # Make sure we have the latest images.
docker-compose up   # Start the Docker containers.
Afterwards, you should see the output of the various services starting up. The output should look similar to the following screenshot:
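To verify that all four containers are up, you can also list them with Docker Compose. Run this in a second terminal, because docker-compose up keeps running in the foreground:
docker-compose ps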
Configuring the NebulaStream Instance
We configure the NebulaStream instance through YAML configuration files, using the --configPath parameter on the command line. The file coordinator.yml contains the configuration for the NebulaStream coordinator, and the file worker-1.yml contains the configuration for the NebulaStream worker.
We have to configure three aspects of the NebulaStream instance:
- The network between the coordinator and the worker.
- A logical source in the coordinator, describing the schema of the wind_turbines data stream.
- A physical source in the worker, to produce data for the wind_turbines stream.
Network Configuration
Docker Compose takes care of forwarding ports between the Docker containers, so we can focus on configuring our network. The following excerpts show how to configure the network between the coordinator and the worker.
In coordinator.yml, we have to define the interfaces on which the coordinator listens for remote procedure calls (setting coordinatorIp) and for REST API calls (setting restIp). For both settings, we use the IP address which we assigned to the coordinator Docker container in docker-compose.yml. We also have to configure the port on which the coordinator accepts REST calls (setting restPort).
Network configuration in coordinator.yml
###
### Network configuration
###
coordinatorIp: 172.31.0.2
restIp: 172.31.0.2
restPort: 8081
In worker-1.yml, we have to define the interface on which the worker listens for remote procedure calls (setting localWorkerIp). Again, we use the IP address which we assigned to the worker Docker container in docker-compose.yml. We also have to let the worker know how to reach the coordinator (setting coordinatorIp). For this, we use the name of the coordinator Docker container defined in docker-compose.yml, which Docker resolves to the container's IP address on the shared network.
Network configuration in worker-1.yml
###
### Network configuration
###
localWorkerIp: 172.31.0.3
coordinatorIp: coordinator
Logical Source Configuration
In NebulaStream, a logical source represents a data stream on which we can execute queries. Each logical source contains the data of one or more physical sources (see below). In order to access the data of a logical source, we have to define it in the coordinator, i.e., we have to give it a name and describe the data schema.
In this tutorial, we use the wind_turbines logical source. The schema of this source contains a number of fields, but we are only interested in three: (1) the field metadata_id identifies an individual wind turbine, (2) the field features_properties_updated contains the timestamp at which a stream record was produced, and (3) the field features_properties_mag contains the amount of energy produced by this wind turbine during the interval represented by the record. We define the logical source and its schema centrally in the coordinator. The following excerpt shows how to do that in the coordinator.yml configuration file.
Logical source configuration in coordinator.yml
###
### Logical source configuration
###
logicalSources:
  # The name of the logical source.
  - logicalSourceName: "wind_turbines"
    # The description of the schema.
    fields:
      - name: type
        type: CHAR
        length: 50
      - name: metadata_generated
        type: UINT64
      - name: metadata_title
        type: CHAR
        length: 50
      - name: metadata_id
        type: CHAR
        length: 50
      - name: features_type
        type: CHAR
        length: 50
      - name: features_properties_capacity
        type: UINT64
      - name: features_properties_efficiency
        type: FLOAT32
      - name: features_properties_mag
        type: FLOAT32
      - name: features_properties_time
        type: UINT64
      - name: features_properties_updated
        type: UINT64
      - name: features_properties_type
        type: CHAR
        length: 50
      - name: features_geometry_type
        type: CHAR
        length: 50
      - name: features_geometry_coordinates_longitude
        type: FLOAT32
      - name: features_geometry_coordinates_latitude
        type: FLOAT32
      - name: features_eventId
        type: CHAR
        length: 50
Physical Source Configuration
In NebulaStream, a physical source represents the data produced by a sensor. In this tutorial, we simulate a sensor called wind_turbine_1 that produces data related to a wind turbine. The file wind-turbine-1.csv contains an entry for every minute of a single day. It shows, e.g., how much energy is produced by this wind turbine in one minute.
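If you want to take a look at the simulated data yourself, you can print the first lines of the CSV file from inside the nebulastream-tutorial folder:
head -n 3 wind-turbine-1.csv
The first line contains the field names of the schema; each following line is one measurement.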
We configure the worker to read the data from this CSV file using a CSV source. The following excerpt shows how to do that in the worker-1.yml configuration file.
Physical source configuration in worker-1.yml
###
### Physical source configuration
###
physicalSources:
  # This physical source is part of the `wind_turbines` logical source.
  - logicalSourceName: wind_turbines
    # The name of this physical source.
    physicalSourceName: wind_turbine_1
    # This physical source reads data from a CSV file.
    type: CSVSource
    configuration:
      # The path of the CSV file.
      filePath: /tutorial/wind-turbine-1.csv
      # Skip the first line because it contains the field names of the CSV file.
      skipHeader: true
      # Additional settings to configure how data is provided to the logical source.
      numberOfBuffersToProduce: 1024
      numberOfTuplesToProducePerBuffer: 0
Running Queries
Now that our NebulaStream instance is running and we have defined a logical and a physical source, we can run a streaming query.
Submitting a Query With the Java Client
The NebulaStream Java client is one way to run a streaming query on NebulaStream. In this tutorial, we provide an example program in the folder java-client-example. The example contains the following Java program, which uses the Java API to connect to our NebulaStream instance and execute a query.
NebulaStreamTutorial.java
import stream.nebula.exceptions.RESTException;
import stream.nebula.operators.Aggregation;
import stream.nebula.operators.sinks.FileSink;
import stream.nebula.operators.sinks.Sink;
import stream.nebula.operators.window.TumblingWindow;
import stream.nebula.runtime.NebulaStreamRuntime;
import stream.nebula.runtime.Query;

import java.io.IOException;

import static stream.nebula.operators.window.EventTime.eventTime;
import static stream.nebula.operators.window.TimeMeasure.minutes;

public class NebulaStreamTutorial {

    public static void main(String[] args) throws RESTException, IOException {
        // Create a NebulaStream runtime and connect it to the NebulaStream coordinator.
        NebulaStreamRuntime nebulaStreamRuntime = NebulaStreamRuntime.getRuntime();
        nebulaStreamRuntime.getConfig().setHost("localhost").setPort("8081");

        // Create a streaming query.
        Query query = nebulaStreamRuntime
                .readFromSource("wind_turbines")
                .window(TumblingWindow.of(eventTime("features_properties_updated"), minutes(10)))
                .byKey("metadata_id")
                .apply(Aggregation.sum("features_properties_mag"));

        // Finish the query with a sink.
        Sink sink = query.sink(new FileSink("/tutorial/java-query-results.csv", "CSV_FORMAT", true));

        // Submit the query to the coordinator.
        int queryId = nebulaStreamRuntime.executeQuery(query, "BottomUp");
    }
}
In this example, we first create a NebulaStreamRuntime object and configure it to connect to the coordinator. We then create a query that reads data from the wind_turbines logical source, creates a 10-minute tumbling window over the attribute features_properties_updated, and sums the field features_properties_mag per window and wind turbine. The output of the query is written to another CSV file, /tutorial/java-query-results.csv, on the coordinator. After execution, you can find this file in the nebulastream-tutorial folder. You can run the example program using gradle:
cd java-client-example
gradle run
This should produce the following output:
In the screenshot, you can see the JSON object that is sent to the coordinator to execute the query.
The CSV file java-query-results.csv, which is produced by this program, should contain the following content (only the first six out of 144 lines are shown):
wind_turbines$start:INTEGER,wind_turbines$end:INTEGER,wind_turbines$metadata_id:ArrayType,wind_turbines$features_properties_mag:(Float)
1647385200000,1647385800000,650f0dac-6228-4ee8-906d-4173261d7095,7542428.500000
1647385800000,1647386400000,650f0dac-6228-4ee8-906d-4173261d7095,9941600.000000
1647386400000,1647387000000,650f0dac-6228-4ee8-906d-4173261d7095,10842671.000000
1647387000000,1647387600000,650f0dac-6228-4ee8-906d-4173261d7095,14288031.000000
1647387600000,1647388200000,650f0dac-6228-4ee8-906d-4173261d7095,8157870.000000
...
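The query shown above is easy to vary because it is built from a handful of API calls. As an example, the following sketch computes hourly instead of 10-minute sums. It reuses only API calls from the example program; the output file name hourly-query-results.csv is a hypothetical choice:
// Variation of the tutorial query: hourly energy sums per wind turbine.
Query hourlyQuery = nebulaStreamRuntime
        .readFromSource("wind_turbines")
        .window(TumblingWindow.of(eventTime("features_properties_updated"), minutes(60)))
        .byKey("metadata_id")
        .apply(Aggregation.sum("features_properties_mag"));
// The output file name is a hypothetical choice for this sketch.
Sink hourlySink = hourlyQuery.sink(new FileSink("/tutorial/hourly-query-results.csv", "CSV_FORMAT", true));
int hourlyQueryId = nebulaStreamRuntime.executeQuery(hourlyQuery, "BottomUp");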
Submitting a Query Over the REST API
Alternatively, we can also submit the query directly to the REST API, e.g., using curl in a shell. The following example executes the same query as in the Java example.
curl -d@rest-query-with-csv-sink.json http://localhost:8081/v1/nes/query/execute-query
The payload of the REST call is contained in the file rest-query-with-csv-sink.json, which we provide with the tutorial. It contains the following JSON object:
{
  "userQuery": "Query::from(\"wind_turbines\").window(TumblingWindow::of(EventTime(Attribute(\"features_properties_updated\")), Minutes(10))).byKey(Attribute(\"metadata_id\")).apply(Sum(Attribute(\"features_properties_mag\"))).sink(FileSinkDescriptor::create(\"/tutorial/rest-query-result.csv\", \"CSV_FORMAT\", \"APPEND\"));",
  "placement": "BottomUp"
}
⚠️ At the moment, we submit queries using the C++ syntax over the REST API. This will change in the future.
This command should produce the following output:
{
  "queryId": 1
}
After we run this query, we can see its results in the file rest-query-result.csv.
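Because the userQuery string mirrors the structure of the Java query, variations only require editing the embedded C++ expression. For example, an hourly version of the query could look like the following sketch, which reuses only the constructs from the JSON object above (the output file name is again a hypothetical choice):
{
  "userQuery": "Query::from(\"wind_turbines\").window(TumblingWindow::of(EventTime(Attribute(\"features_properties_updated\")), Minutes(60))).byKey(Attribute(\"metadata_id\")).apply(Sum(Attribute(\"features_properties_mag\"))).sink(FileSinkDescriptor::create(\"/tutorial/hourly-rest-query-result.csv\", \"CSV_FORMAT\", \"APPEND\"));",
  "placement": "BottomUp"
}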
Visualizing the Query Results in the NebulaStream UI
As the last step in this tutorial, we will visualize the query results in the NebulaStream UI. With the Docker containers running, open the NebulaStream UI in the browser at http://localhost:9000, the port we exposed in docker-compose.yml.
⚠️ The NebulaStream UI is still very experimental.
Connecting the UI to the NebulaStream Coordinator
First, we have to connect the UI to the NebulaStream coordinator. Click on the settings icon in the sidebar on the left. We can connect to the coordinator running on localhost on port 8081. This port is exposed from the Docker container to our host system. Next, click on the source catalog icon to open the source catalog. There we can see the wind_turbines logical source.
Running a Query With an MQTT Sink
The NebulaStream UI reads query results from an MQTT broker.
In our Docker setup, we use Eclipse Mosquitto as an MQTT broker.
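Since the NebulaStream UI runs in the browser, it presumably reaches Mosquitto over WebSockets on port 9001, the port we exposed in docker-compose.yml. The actual broker configuration ships in the repository's mosquitto/config folder; a minimal configuration that enables WebSockets might look like the following sketch (an illustrative assumption, not the verbatim file):
# Illustrative sketch of a minimal Mosquitto configuration for this setup;
# the actual file is provided in mosquitto/config in the tutorial repository.
listener 9001
protocol websockets
allow_anonymous true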
The file rest-query-with-mqtt-sink.json contains a version of our original query which sends its results to an MQTT sink. To submit the query over the REST API, run the following command.
curl -d@rest-query-with-mqtt-sink.json http://localhost:8081/v1/nes/query/execute-query
We can see that the query is running by clicking on the query catalog icon in the UI.
Configuring a Visualization Chart
Now that the query is running and sending data to an MQTT broker, we can set up a visualization chart.
Click on the result visualization icon to open the result visualization page.
On this page, click on the “Add graph” button in the top-right corner. Then select the query in the drop-down menu at the top and choose the multi-line chart, as shown in the following screenshot:
⚠️ On macOS, we have to use localhost as the host of the MQTT broker. On Linux, the IP address of the MQTT service configured in docker-compose.yml, i.e., 172.31.0.11, also works. In both cases, we use 9001 as the port of the MQTT broker.
On the following page, click on “Edit graph” and set up the chart as shown in the following screenshot:
Select “features_properties_mag” as the attribute and “start” as the x-axis. Then click “Draw graph”. We should now see the values of the tumbling window displayed as in the following screenshot:
Next Steps
You now know how to set up a NebulaStream instance, how to define logical and physical sources, how to run a query, and how to visualize its results. Next, you can learn more about:
- The general concepts of NebulaStream.
- How to configure the NebulaStream coordinator and worker.
- The general query API, and how to use the Java client, the C++ client, or the REST API.
If you want to look under the hood, or contribute to NebulaStream, we are happy to provide you access to a source code prerelease.