Configuration options

This page describes how to configure the runtime behavior of the NebulaStream Coordinator and Workers.

The NebulaStream Coordinator and Worker provide two ways to configure their runtime behavior.

  1. Setting configuration options on the command line.
  2. Setting configuration options in a YAML configuration file.

❗ For the NebulaStream Coordinator, configuration setting is specified on the command line take precedence over the same option in the YAML configuration file.

❗ For the NebulaStream Worker, command line options do not override the settings in the YAML configuration file.

❗ For most configuration options, the key is always the same, regardless whether it is specified on the command line or in a YAML configuration file. The exception is the configuration of logical sources in the NebulaStream coordinator and physical sources in the NebulaStream worker.

In this page, we describe the general way how to set configuration options on the command line or in a YAML configuration file. We also describe every configuration option and provide their default values.

Configuration on the Command Line

If the NebulaStream Coordinator or Worker are started without any command line parameters, all configuration options are initialized to their default values.

To set the configuration option key to the value value, use the syntax --key=value.

💡 In the following example, the configuration option logLevel is set to LOG_INFO:

nesCoordinator --logLevel=LOG_INFO

Configuration in a YAML file

It is also possible to create a YAML configuration file. To set configuration options set from this file, use the command line parameter configPath.

💡 In the following example, configuration options are set using the contents of the file coordinator.yml:

nesCoordinator --configPath=coordinator.yml

We provide examples of the YAML configuration files in the NebulaStream Tutorial repository:

Coordinator Configuration Options

The configuration options of the Coordinator configure network settings, the NebulaStream optimizer, logical sources, and enable experimental features.

General Coordinator Configuration

The following table lists general configuration options of the NebulaStream Coordinator in alphabetical order. As the Coordinator encapsulates a worker internally, it supports all worker configurations via the “worker:” configuration option.

KeyDefault valueDescription
configPathNo defaultPath to a YAML configuration file.
coordinatorIp127.0.0.1Coordinator RPC server IP address.
enableQueryReconfigurationfalseEnable reconfiguration of running query plans.
logLevelLOG_DEBUGThe detail of log messages. Possible values are: LOG_NONE, LOG_WARNING, LOG_DEBUG, LOG_INFO, or LOG_TRACE.
restIp127.0.0.1Coordinator REST server IP address.
restPort8081Coordinator REST server TCP port.
rpcPort4000Coordinator RPC server TCP port. Used to receive control messages.
workerConfigPathNo defaultPath to a YAML configuration file for the internal worker.

Optimizer configuration

The following table lists configuration options of the NebulaStream optimizer in alphabetical order. These configuration options begin with the prefix optimizer..

KeyDefault valueDescription
optimizer.distributedWindowChildThreshold2Threshold for the distribution of window aggregations. Indicates the number of child operators from which a window operator is distributed.
optimizer.distributedWindowCombinerThreshold4Threshold for the insertion of pre-aggregation operators. Indicates the number of child nodes from which on we will introduce combine operator between the pre-aggregation operator and the final aggregation.
optimizer.memoryLayoutPolicyFORCE_ROW_LAYOUTIndicates the memory layout policy and allows the engine to prefer a row or columnar layout.

Possible values are:

  • FORCE_ROW_LAYOUT: Enforces a row layout between all operators.
  • FORCE_COLUMN_LAYOUT: Enforces a column layout between all operators.
    optimizer.performAdvanceSemanticValidationfalsePerform advance semantic validation on the incoming queries.
    ❗ This option is set to false by default as currently not all operators are supported by Z3 based signature generator. Because of this, in some cases, enabling this check may result in a crash or incorrect behavior.
    optimizer.performDistributedWindowsOptimizationtrueEnables the distribution of window aggregations. This optimization will enable the distribution of window aggregation across multiple nodes. To this end, the optimizer will create pre-aggregation operators that are located close to the data source.
    optimizer.performOnlySourceOperatorExpansionfalsePerform only source operator duplication when applying Logical Source Expansion Rewrite Rule.
    optimizer.queryBatchSize1The number of queries to be processed together.
    optimizer.queryMergerRuleDefaultQueryMergerRuleThe rule to be used for performing query merging. Valid options are: SyntaxBasedCompleteQueryMergerRule, SyntaxBasedPartialQueryMergerRule, Z3SignatureBasedCompleteQueryMergerRule, Z3SignatureBasedPartialQueryMergerRule, Z3SignatureBasedPartialQueryMergerBottomUpRule, HashSignatureBasedCompleteQueryMergerRule, ImprovedStringSignatureBasedCompleteQueryMergerRule, ImprovedStringSignatureBasedPartialQueryMergerRule, StringSignatureBasedPartialQueryMergerRule, DefaultQueryMergerRule, HybridCompleteQueryMergerRule.

    Internal worker configuration

    At the moment, the coordinator starts an internal worker. This internal worker can be configured in the coordinator configuration by prefixing a worker configuration option with the prefix worker.. It is also possible to extract the configuration options for the internal worker in a separate YAML configuration file, using the configuration option --workerConfigPath. The evaluation order of options which configure the internal worker are as follows:

    1. A configuration option prefixed with worker. in the YAML configuration file for the coordinator.
    2. The contents of a YAML configuration file that is specified by the key workerConfigPath inside the YAML configuration file of the coordinator.
    3. The contents of a YAML configuration file that is specified by the key workerConfigPath on the command line when starting the coordinator.
    4. A configuration option prefixed with worker. on the command line when starting the coordinator.

    This order follows the expectation that options on the command line (including workerConfigPath) overwrite options in the configuration file.

    There are three exceptions, which are the worker options coordinatorIp, coordinatorPort, and localWorkerIp. These are always set to the respective values of the coordinator.

    Logical Sources Configuration

    Logical sources can only be configured in the YAML configuration file and not on the command line. That is because it is not possible to define multiple logical sources on the command line.

    A logical source is defined by a name (logicalSourceName) and a schema. The schema consists of a number of fields that also have a name and a type. Valid types are: INT8, UINT8, INT16, UINT16, INT32, UINT32, INT64, FLOAT32, UINT64, FLOAT64, BOOLEAN, TEXT and CHAR. The TEXT type represents a variable-length string. The CHAR type represents a fixed-length string and also requires a length value. The type FLOAT32 is represented as a single precision, and the type FLOAT64 is double precision.

    💡 The example below shows how to define a logical source with the name default_logical and a schema consisting of the fields id, value, and char_value.

    logicalSources:
      - logicalSourceName: "default_logical"
        fields:
          - name: "id"
            type: "UINT32"
          - name: "sensor_value"
            type: "UINT64"
          - name: "char_value"
            type: "CHAR"
            length: 5
    

    Worker Configuration Options

    The configuration options of the Worker configure network settings, the NebulaStream query compiler, physical sources, and enable experimental features.

    General Worker Configuration

    The following table lists general configuration options of the NebulaStream Worker in alphabetical order.

    KeyDefault valueDescription
    bufferSizeInBytes4096The size of individual TupleBuffers in bytes. This property has to be the same over a whole deployment.
    configPathNo defaultPath to a YAML configuration file.
    coordinatorIp127.0.0.1Coordinator RPC server IP address.
    coordinatorPort4000Coordinator RPC server TCP port. Needs to the same as rpcPort in Coordinator.
    dataPort0Data server TCP port of this worker. Used to receive data. A value of 0 means that the port is selected automatically.
    localWorkerIp127.0.0.1IP of the Worker.
    locationCoordinatesNo defaultCoordinates of the physical location of the worker.
    logLevelLOG_DEBUGThe detail of log messages. Possible values are: LOG_NONE, LOG_WARNING, LOG_DEBUG, LOG_INFO, or LOG_TRACE.
    numWorkerThreads1The number of worker threads.
    numaAwarenessfalseEnables support for Non-Uniform Memory Access (NUMA) systems.
    numberOfBuffersInGlobalBufferManager1024The number of buffers in the global buffer manager. Controls how much memory is consumed by the system.
    numberOfBuffersInSourceLocalBufferPool64The number of buffers in source local buffer pool. Indicates how many buffers a single data source can allocate. This property controls the backpressure mechanism as a data source that can’t allocate new records can’t ingest more data.
    numberOfBuffersPerWorker128The number of buffers in task local buffer pool. Indicates how many buffers a single worker thread can allocate.
    numberOfQueues1The number of processing queues.
    numberOfSlots65535The number of slots define the amount of computing resources that are usable at this worker. This enables the restriction of the amount of concurrently deployed queries and operators.
    numberOfThreadsPerQueue0Number of threads per processing queue.
    parentId0The ID of this node’s parent in the NebulaStream IoT network topology.
    physicalSourcessee
    queryManagerModeDynamicThe mode in which the query manager is running.

    • Dynamic: Only one queue overall.
    • Static: Use queue per query and a specified number of threads per queue.
    queuePinListNo defaultPins specific worker threads to specific queues.
    ❗ This setting is deprecated and will be removed.
    rpcPort0Worker RPC server TCP port. Used to receive control messages. A value of 0 means that the port is selected automatically.
    sourcePinListNo defaultPin specific data sources to specific CPU cores.
    ❗ This setting is deprecated and will be removed.
    workerPinListNo defaultPin specific worker threads to specific CPU cores.
    ❗ This setting is deprecated and will be removed.

    Query Compiler Configuration

    The following table lists configuration options of the NebulaStream query compiler in alphabetical order. These configuration options begin with the prefix queryCompiler..

    KeyDefault valueDescription
    queryCompiler.pipeliningStrategyOPERATOR_FUSIONIndicates the pipelining strategy for the query compiler. Possible values are: OPERATOR_FUSION or OPERATOR_AT_A_TIME.
    queryCompiler.compilationStrategyOPTIMIZEIndicates the optimization strategy for the query compiler. Possible values are: FAST, DEBUG or OPTIMIZE.
    queryCompiler.outputBufferOptimizationLevelALLIndicates the OutputBufferAllocationStrategy. Possible values are: ALL, NO, ONLY_INPLACE_OPERATIONS_NO_FALLBACK, REUSE_INPUT_BUFFER_AND_OMIT_OVERFLOW_CHECK_NO_FALLBACK, REUSE_INPUT_BUFFER_NO_FALLBACK, OR OMIT_OVERFLOW_CHECK_NO_FALLBACK.
    queryCompiler.windowingStrategyDEFAULTIndicates the windowingStrategy. Possible values are: DEFAULT, THREAD_LOCAL.
    queryCompiler.queryCompilerTypeDEFAULT_QUERY_COMPILERIndicates the query compiler. Possible values are: DEFAULT_QUERY_COMPILER for the legacy C++ query compiler and NAUTILUS_QUERY_COMPILER for the new Nautilus query compiler.

    Physical Sources Configuration

    Physical sources can be defined both on the command line and also in the YAML configuration file.

    ❗ On the command line, we can only define a single physical source. In contrast, in the YAML configuration file, we can define multiple physical sources.

    The following table lists the configuration options that have to be specified for every physical source. The configuration options for physical sources begin with the prefix physicalSources..

    KeyDefault valueDescription
    physicalSources.logicalSourceNameNo defaultThe name of the logical source to which this physical source belongs.
    physicalSources.physicalSourceNameNo defaultThe name of this physical source.
    physicalSources.typeNo defaultThe type of this physical source. See below for a description of the types.

    NebulaStream supports the following physical sources types:

    • BinarySource: Reads data from a binary file.
    • CSVSource: Reads data from a CSV file and repeats the data multiple times.
    • KafkaSource: Reads data from a Kafka broker.
    • MQTTSource: Reads data from a MQTT broker.
    • MaterializedViewSource: Read from a materialized view.
    • OPCSource: Reads data from an OPC server.

    These source types require additional configuration options which we describe below.

    BinarySource

    A BinarySource can be configured with the following configuration options.

    KeyDefault valueDescription
    physicalSources.filePathNo defaultRequired. The path to the binary file that should be read.

    CSVSource

    A CSVSource can be configured with the following configuration options.

    KeyDefault valueDescription
    physicalSources.delimiter“,”The delimiter between the values of a record.
    physicalSources.filePathNo defaultRequired. The path to the CSV file that should be read.
    physicalSources.numberOfBuffersToProduce0Number of buffers to produce. If 0, the entire file is read.
    physicalSources.numberOfTuplesToProducePerBuffer0Number of tuples to produce per buffer. If 0, the tuple buffers are filled to capacity.
    physicalSources.skipHeaderfalseSkip first line of the file.
    physicalSources.sourceGatheringInterval1Gathering interval of the source.

    KafkaSource

    A KafkaSource can be configured with the following configuration options.

    KeyDefault valueDescription
    physicalSources.autoCommit1Boolean value where 1 equals true and 0 equals false.
    physicalSources.brokersNo defaultKafka brokers.
    physicalSources.connectionTimeout10Connection timeout for source.
    physicalSources.groupIdtestGroupUser name.
    physicalSources.topictestTopicTopic to listen to.

    MQTTSource

    A MQTTSource can be configured with the following configuration options.

    KeyDefault valueDescription
    physicalSources.topictestTopicTopic to listen to.
    physicalSources.cleanSessiontrueIf true, clean up session after client loses connection. If false, keep data for client after connection loss (persistent session).
    physicalSources.clientIdtestClientClient ID. Needs to be unique for each connected MQTTSource.
    physicalSources.flushIntervalMS-1TupleBuffer flush interval in milliseconds.
    physicalSources.inputFormatJSONInput format. Possible values are: JSON or CSV.
    physicalSources.urlws://127.0.0.1:9001URL to connect to.
    physicalSources.qos2Quality of service.
    physicalSources.userNametestUserUser name. Can be chosen arbitrarily.

    MaterializedViewSource

    ❗ This source type is experimental.

    A MaterializedViewSource can be configured with the following configuration options.

    KeyDefault valueDescription
    physicalSources.materializedViewId1The id of the materialized view to read from.

    OPCSource

    An OPCSource can be configured with the following configuration options.

    KeyDefault valueDescription
    physicalSources.namespaceIndex1Namespace index of the node.
    physicalSources.nodeIdentifierthe.answerNode identifier.
    physicalSources.passwordNo defaultPassword.
    physicalSources.userNametestUserUser name.