Configuration options
This page describes how to configure the runtime behavior of the NebulaStream Coordinator and Workers.
The NebulaStream Coordinator and Worker provide two ways to configure their runtime behavior.
- Setting configuration options on the command line.
- Setting configuration options in a YAML configuration file.
❗ For the NebulaStream Coordinator, configuration settings specified on the command line take precedence over the same options in the YAML configuration file.
❗ For the NebulaStream Worker, command line options do not override the settings in the YAML configuration file.
❗ For most configuration options, the key is always the same, regardless of whether it is specified on the command line or in a YAML configuration file. The exceptions are the configuration of logical sources in the NebulaStream Coordinator and physical sources in the NebulaStream Worker.
On this page, we describe the general way to set configuration options on the command line or in a YAML configuration file. We also describe every configuration option and provide its default value.
Configuration on the Command Line
If the NebulaStream Coordinator or Worker are started without any command line parameters, all configuration options are initialized to their default values.
To set the configuration option key to the value value, use the syntax --key=value.
💡 In the following example, the configuration option logLevel is set to LOG_INFO:
nesCoordinator --logLevel=LOG_INFO
Configuration in a YAML file
It is also possible to create a YAML configuration file.
To load configuration options from this file, pass its path with the command line parameter configPath.
💡 In the following example, configuration options are set using the contents of the file coordinator.yml:
nesCoordinator --configPath=coordinator.yml
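💡 For illustration, a minimal coordinator.yml could contain the following options; all keys are described in the tables below, and the values shown here are just examples:
logLevel: LOG_INFO
restPort: 8081
rpcPort: 4000
Because command line options take precedence for the Coordinator, starting it with nesCoordinator --configPath=coordinator.yml --logLevel=LOG_WARNING would use LOG_WARNING instead of the LOG_INFO value from the file.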
We provide examples of YAML configuration files in the NebulaStream Tutorial repository.
Coordinator Configuration Options
The configuration options of the Coordinator configure network settings, the NebulaStream optimizer, logical sources, and enable experimental features.
General Coordinator Configuration
The following table lists general configuration options of the NebulaStream Coordinator in alphabetical order. As the Coordinator encapsulates a worker internally, it supports all worker configurations via the “worker:” configuration option.
Key | Default value | Description |
---|---|---|
configPath | No default | Path to a YAML configuration file. |
coordinatorHost | 127.0.0.1 | Coordinator RPC server IP address or hostname. |
enableQueryReconfiguration | false | Enable reconfiguration of running query plans. |
logLevel | LOG_DEBUG | The detail of log messages. Possible values are: LOG_NONE, LOG_WARNING, LOG_DEBUG, LOG_INFO, or LOG_TRACE. |
restIp | 127.0.0.1 | Coordinator REST server IP address. |
restPort | 8081 | Coordinator REST server TCP port. |
rpcPort | 4000 | Coordinator RPC server TCP port. Used to receive control messages. |
workerConfigPath | No default | Path to a YAML configuration file for the internal worker. |
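💡 For example, to make the Coordinator and its REST API reachable on a specific address, the network options above can be combined on the command line (the address 10.0.0.10 is only a placeholder):
nesCoordinator --coordinatorHost=10.0.0.10 --restIp=10.0.0.10 --restPort=8081 --rpcPort=4000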
Optimizer configuration
The following table lists configuration options of the NebulaStream optimizer in alphabetical order. These configuration options begin with the prefix optimizer.
Key | Default value | Description |
---|---|---|
optimizer.distributedWindowChildThreshold | 2 | Threshold for the distribution of window aggregations. Indicates the number of child operators from which a window operator is distributed. |
optimizer.distributedWindowCombinerThreshold | 4 | Threshold for the insertion of pre-aggregation operators. Indicates the number of child nodes from which a combiner operator is inserted between the pre-aggregation operators and the final aggregation. |
optimizer.memoryLayoutPolicy | FORCE_ROW_LAYOUT | Indicates the memory layout policy and allows the engine to prefer a row or columnar layout. |
optimizer.performAdvanceSemanticValidation | false | Perform advanced semantic validation on incoming queries. ❗ This option is disabled by default because not all operators are currently supported by the Z3-based signature generator; enabling this check may therefore cause crashes or incorrect behavior in some cases. |
optimizer.performDistributedWindowsOptimization | true | Enables the distribution of window aggregations. This optimization will enable the distribution of window aggregation across multiple nodes. To this end, the optimizer will create pre-aggregation operators that are located close to the data source. |
optimizer.performOnlySourceOperatorExpansion | false | Perform only source operator duplication when applying Logical Source Expansion Rewrite Rule. |
optimizer.queryBatchSize | 1 | The number of queries to be processed together. |
optimizer.queryMergerRule | DefaultQueryMergerRule | The rule to be used for performing query merging. Valid options are: DefaultQueryMergerRule, Z3SignatureBasedCompleteQueryMergerRule, Z3SignatureBasedPartialQueryMergerRule, HashSignatureBasedCompleteQueryMergerRule, HashSignatureBasedPartialQueryMergerRule, HybridCompleteQueryMergerRule. |
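💡 Like any other option, an optimizer setting is passed on the command line with its prefixed key. For example, to enable Z3 signature based complete query merging:
nesCoordinator --optimizer.queryMergerRule=Z3SignatureBasedCompleteQueryMergerRule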
Internal worker configuration
At the moment, the coordinator starts an internal worker.
This internal worker can be configured in the coordinator configuration by adding the prefix worker. to any worker configuration option.
It is also possible to place the configuration options for the internal worker in a separate YAML configuration file, using the configuration option workerConfigPath.
The evaluation order of options which configure the internal worker is as follows; options that appear later in this list overwrite options that appear earlier:
1. A configuration option prefixed with worker. in the YAML configuration file for the coordinator.
2. The contents of a YAML configuration file that is specified by the key workerConfigPath inside the YAML configuration file of the coordinator.
3. The contents of a YAML configuration file that is specified by the key workerConfigPath on the command line when starting the coordinator.
4. A configuration option prefixed with worker. on the command line when starting the coordinator.
This order follows the expectation that options on the command line (including workerConfigPath) overwrite options in the configuration file.
There are three exceptions: the worker options coordinatorHost, coordinatorPort, and localWorkerHost. These are always set to the respective values of the coordinator.
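💡 For example, to give the internal worker four worker threads, the prefixed option can be set on the command line:
nesCoordinator --worker.numWorkerThreads=4
Assuming the worker: section of the coordinator YAML file nests the worker options, the same setting could also be placed in the configuration file:
worker:
  numWorkerThreads: 4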
Logical Sources Configuration
Logical sources can only be configured in the YAML configuration file and not on the command line. That is because it is not possible to define multiple logical sources on the command line.
A logical source is defined by a name (logicalSourceName) and a schema.
The schema consists of a number of fields, each of which has a name and a type.
Valid types are: INT8, UINT8, INT16, UINT16, INT32, UINT32, INT64, UINT64, FLOAT32, FLOAT64, BOOLEAN, TEXT, and CHAR.
The TEXT type represents a variable-length string.
The CHAR type represents a fixed-length string and also requires a length value.
The type FLOAT32 is a single-precision floating-point number, and the type FLOAT64 is a double-precision floating-point number.
💡 The example below shows how to define a logical source with the name default_logical and a schema consisting of the fields id, sensor_value, and char_value.
logicalSources:
  - logicalSourceName: "default_logical"
    fields:
      - name: "id"
        type: "UINT32"
      - name: "sensor_value"
        type: "UINT64"
      - name: "char_value"
        type: "CHAR"
        length: 5
Worker Configuration Options
The configuration options of the Worker configure network settings, the NebulaStream query compiler, physical sources, and enable experimental features.
General Worker Configuration
The following table lists general configuration options of the NebulaStream Worker in alphabetical order.
Key | Default value | Description |
---|---|---|
bufferSizeInBytes | 4096 | The size of individual TupleBuffers in bytes. This property has to be the same over a whole deployment. |
configPath | No default | Path to a YAML configuration file. |
coordinatorHost | 127.0.0.1 | Coordinator RPC server IP address or hostname. |
coordinatorPort | 4000 | Coordinator RPC server TCP port. Needs to be the same as rpcPort in the Coordinator. |
dataPort | 0 | Data server TCP port of this worker. Used to receive data. A value of 0 means that the port is selected automatically. |
localWorkerHost | 127.0.0.1 | IP or hostname of the Worker. |
locationCoordinates | No default | Coordinates of the physical location of the worker. |
logLevel | LOG_DEBUG | The detail of log messages. Possible values are: LOG_NONE, LOG_WARNING, LOG_DEBUG, LOG_INFO, or LOG_TRACE. |
numWorkerThreads | 1 | The number of worker threads. |
numaAwareness | false | Enables support for Non-Uniform Memory Access (NUMA) systems. |
numberOfBuffersInGlobalBufferManager | 1024 | The number of buffers in the global buffer manager. Controls how much memory is consumed by the system. |
numberOfBuffersInSourceLocalBufferPool | 64 | The number of buffers in the source-local buffer pool. Indicates how many buffers a single data source can allocate. This property controls the backpressure mechanism, as a data source that cannot allocate new buffers cannot ingest more data. |
numberOfBuffersPerWorker | 128 | The number of buffers in the task-local buffer pool. Indicates how many buffers a single worker thread can allocate. |
numberOfQueues | 1 | The number of processing queues. |
numberOfSlots | 65535 | The number of slots defines the amount of computing resources that are usable at this worker. This allows restricting the number of concurrently deployed queries and operators. |
numberOfThreadsPerQueue | 0 | Number of threads per processing queue. |
parentId | 0 | The ID of this node’s parent in the NebulaStream IoT network topology. |
physicalSources | No default | Defines the physical sources of this worker. See the Physical Sources Configuration section below. |
queryManagerMode | Dynamic | The mode in which the query manager is running. |
queuePinList | No default | Pins specific worker threads to specific queues. ❗ This setting is deprecated and will be removed. |
rpcPort | 0 | Worker RPC server TCP port. Used to receive control messages. A value of 0 means that the port is selected automatically. |
sourcePinList | No default | Pin specific data sources to specific CPU cores. ❗ This setting is deprecated and will be removed. |
workerPinList | No default | Pin specific worker threads to specific CPU cores. ❗ This setting is deprecated and will be removed. |
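💡 As an illustration, a minimal worker.yml for a worker that connects to a Coordinator on another host could look as follows; the addresses are placeholders, and coordinatorPort must match the Coordinator's rpcPort:
coordinatorHost: 10.0.0.10
coordinatorPort: 4000
localWorkerHost: 10.0.0.20
numWorkerThreads: 4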
Query Compiler Configuration
The following table lists configuration options of the NebulaStream query compiler in alphabetical order. These configuration options begin with the prefix queryCompiler.
Key | Default value | Description |
---|---|---|
queryCompiler.compilationStrategy | OPTIMIZE | Indicates the optimization strategy for the query compiler. Possible values are: FAST, DEBUG, or OPTIMIZE. |
queryCompiler.outputBufferOptimizationLevel | ALL | Indicates the OutputBufferAllocationStrategy. Possible values are: ALL, NO, ONLY_INPLACE_OPERATIONS_NO_FALLBACK, REUSE_INPUT_BUFFER_AND_OMIT_OVERFLOW_CHECK_NO_FALLBACK, REUSE_INPUT_BUFFER_NO_FALLBACK, or OMIT_OVERFLOW_CHECK_NO_FALLBACK. |
queryCompiler.pipeliningStrategy | OPERATOR_FUSION | Indicates the pipelining strategy for the query compiler. Possible values are: OPERATOR_FUSION or OPERATOR_AT_A_TIME. |
queryCompiler.queryCompilerType | DEFAULT_QUERY_COMPILER | Indicates the query compiler to use. Possible values are: DEFAULT_QUERY_COMPILER for the legacy C++ query compiler and NAUTILUS_QUERY_COMPILER for the new Nautilus query compiler. |
queryCompiler.windowingStrategy | DEFAULT | Indicates the windowing strategy. Possible values are: DEFAULT or THREAD_LOCAL. |
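💡 For example, to try the Nautilus query compiler, the corresponding option can be set when starting the worker (assuming the worker binary is named nesWorker):
nesWorker --queryCompiler.queryCompilerType=NAUTILUS_QUERY_COMPILER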
Physical Sources Configuration
Physical sources can be defined both on the command line and in the YAML configuration file.
❗ On the command line, we can only define a single physical source. In contrast, in the YAML configuration file, we can define multiple physical sources.
The following table lists the configuration options that have to be specified for every physical source.
The configuration options for physical sources begin with the prefix physicalSources.
Key | Default value | Description |
---|---|---|
physicalSources.logicalSourceName | No default | The name of the logical source to which this physical source belongs. |
physicalSources.physicalSourceName | No default | The name of this physical source. |
physicalSources.type | No default | The type of this physical source. See below for a description of the types. |
NebulaStream supports the following physical source types:
- BinarySource: Reads data from a binary file.
- CSVSource: Reads data from a CSV file and can repeat the data multiple times.
- KafkaSource: Reads data from a Kafka broker.
- MQTTSource: Reads data from an MQTT broker.
- MaterializedViewSource: Reads data from a materialized view.
- OPCSource: Reads data from an OPC server.
These source types require additional configuration options which we describe below.
BinarySource
A BinarySource can be configured with the following configuration options.
Key | Default value | Description |
---|---|---|
physicalSources.filePath | No default | Required. The path to the binary file that should be read. |
CSVSource
A CSVSource can be configured with the following configuration options.
Key | Default value | Description |
---|---|---|
physicalSources.delimiter | "," | The delimiter between the values of a record. |
physicalSources.filePath | No default | Required. The path to the CSV file that should be read. |
physicalSources.numberOfBuffersToProduce | 0 | Number of buffers to produce. If 0, the entire file is read. |
physicalSources.numberOfTuplesToProducePerBuffer | 0 | Number of tuples to produce per buffer. If 0, the tuple buffers are filled to capacity. |
physicalSources.skipHeader | false | Skip first line of the file. |
physicalSources.sourceGatheringInterval | 1 | Gathering interval of the source. |
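💡 For example, a single CSV physical source for the logical source default_logical could be defined on the command line when starting the worker; the worker binary name nesWorker, the physical source name csv_source_1, and the file path are placeholders:
nesWorker --physicalSources.type=CSVSource --physicalSources.logicalSourceName=default_logical --physicalSources.physicalSourceName=csv_source_1 --physicalSources.filePath=/path/to/data.csv --physicalSources.skipHeader=true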
KafkaSource
A KafkaSource can be configured with the following configuration options.
Key | Default value | Description |
---|---|---|
physicalSources.autoCommit | 1 | Whether the Kafka consumer auto-commits offsets. Boolean value where 1 equals true and 0 equals false. |
physicalSources.brokers | No default | Kafka brokers. |
physicalSources.connectionTimeout | 10 | Connection timeout for source. |
physicalSources.groupId | testGroup | Kafka consumer group ID. |
physicalSources.topic | testTopic | Topic to listen to. |
MQTTSource
An MQTTSource can be configured with the following configuration options.
Key | Default value | Description |
---|---|---|
physicalSources.topic | testTopic | Topic to listen to. |
physicalSources.cleanSession | true | If true, clean up session after client loses connection. If false, keep data for client after connection loss (persistent session). |
physicalSources.clientId | testClient | Client ID. Needs to be unique for each connected MQTTSource. |
physicalSources.flushIntervalMS | -1 | TupleBuffer flush interval in milliseconds. |
physicalSources.inputFormat | JSON | Input format. Possible values are: JSON or CSV . |
physicalSources.url | ws://127.0.0.1:9001 | URL to connect to. |
physicalSources.qos | 2 | Quality of service. |
physicalSources.userName | testUser | User name. Can be chosen arbitrarily. |
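💡 Analogously, a single MQTT physical source could be defined on the command line; again the worker binary name nesWorker, the broker URL, the topic, and the chosen names are placeholders:
nesWorker --physicalSources.type=MQTTSource --physicalSources.logicalSourceName=default_logical --physicalSources.physicalSourceName=mqtt_source_1 --physicalSources.url=ws://127.0.0.1:9001 --physicalSources.topic=sensor_data --physicalSources.clientId=nes_mqtt_client_1 --physicalSources.inputFormat=JSON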
MaterializedViewSource
❗ This source type is experimental.
A MaterializedViewSource can be configured with the following configuration options.
Key | Default value | Description |
---|---|---|
physicalSources.materializedViewId | 1 | The id of the materialized view to read from. |
OPCSource
An OPCSource can be configured with the following configuration options.
Key | Default value | Description |
---|---|---|
physicalSources.namespaceIndex | 1 | Namespace index of the node. |
physicalSources.nodeIdentifier | the.answer | Node identifier. |
physicalSources.password | No default | Password. |
physicalSources.userName | testUser | User name. |