NebuLi: Single-Node Deployment

In this section, we describe how NebulaStream operates in a single-node environment, focusing on the nebuli client and its interaction with a single NebulaStream worker.

query.yaml Example

NebulaStream queries, along with their associated sources and sinks, can be defined in a query.yaml file. Here is an example:

query: |
    SELECT * FROM pm_abp_source INTO pm_abp_sink
sink:
  name: pm_abp_sink
  type: MQTT
  config:
    inputFormat: JSON
    serverURI: mqtt://localhost:1883
    clientId: pm-abp-s2s
    topic: sink/live/patient_monitor_abp
    qos: 1
logical:
  - name: pm_abp_source
    schema:
      - name: timestamp
        type: UINT64
      - name: value
        type: FLOAT64
physical:
  - logical: pm_abp_source
    parserConfig:
      type: CSV
      tupleDelimiter: "|"
      fieldDelimiter: ","
    sourceConfig:
      type: MQTT
      serverURI: mqtt://localhost:1883
      topic: source/live/patient_monitor_abp
      qos: 1

Logical Sources

A logical source describes the schema of incoming data. Multiple physical sources can attach to a single logical source. Queries reference only logical sources; a query on a logical source then ingests data from all physical sources attached to it.
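
The relationship between logical and physical sources can be sketched in Python. The dictionaries below mirror the logical and physical sections of the query.yaml example. The second physical source (topic source/backup/patient_monitor_abp) and the helper name attach_physical_sources are hypothetical, added only to show several physical sources attaching to one logical source. This illustrates the concept and is not NebulaStream's implementation.

```python
from collections import defaultdict

# Mirrors the `logical` section of the query.yaml example.
logical_sources = [
    {
        "name": "pm_abp_source",
        "schema": [
            {"name": "timestamp", "type": "UINT64"},
            {"name": "value", "type": "FLOAT64"},
        ],
    }
]

# Mirrors the `physical` section. The second entry is hypothetical.
physical_sources = [
    {
        "logical": "pm_abp_source",
        "sourceConfig": {"type": "MQTT", "topic": "source/live/patient_monitor_abp"},
    },
    {
        "logical": "pm_abp_source",
        "sourceConfig": {"type": "MQTT", "topic": "source/backup/patient_monitor_abp"},
    },
]


def attach_physical_sources(logical, physical):
    """Group physical sources by the logical source they attach to.

    Raises ValueError if a physical source names an undefined logical source.
    """
    known = {entry["name"] for entry in logical}
    attached = defaultdict(list)
    for source in physical:
        target = source["logical"]
        if target not in known:
            raise ValueError(f"unknown logical source: {target}")
        attached[target].append(source)
    return dict(attached)


attached = attach_physical_sources(logical_sources, physical_sources)
# A query on pm_abp_source would read from every topic listed here.
topics = [s["sourceConfig"]["topic"] for s in attached["pm_abp_source"]]
print(topics)
```

A check like this catches a physical source whose logical field names a source that was never declared, before the query description is handed to nebuli.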

nebuli Client for Single-Node Deployment

nebuli acts as a client to communicate with a NebulaStream worker. While the worker runs continuously, nebuli is ephemeral; it is primarily used to launch, stop, and monitor queries.

nebuli is stateless and performs parsing and logical optimization of queries before sending them to the worker.

To connect to a running NebulaStream worker, nebuli requires the -s parameter to specify the gRPC URI of the worker (e.g., 127.0.0.1:8080).

Here is the help message for the nebuli command-line tool in single-node mode:

Usage: nebuli [-d | --debug] <command> [<args>]

NebulaStream Command Line Interface

Options:
  -d, --debug      Dump the Query plan and enable debug logging

Commands:
  register         Register a query
    Usage: nebuli register [-s <server>] [-m <target-machine>] [-i <input>] [-x] <query>
      -s, --server   grpc uri e.g., 127.0.0.1:8080
      -m, --target-machine The hosts machine architecture. This is required when deploying a model. It defaults to host. (Choices: host, generic_x86, rpi5)
      -i, --input    Read the query description. Use - for stdin which is the default (default: "-")
      -x             Immediately start the query after registration (equivalent to calling start) (flag)

  start            Start a query
    Usage: nebuli start <queryId> [-s <server>]
      <queryId>      ID of the query to start
      -s, --server   grpc uri e.g., 127.0.0.1:8080

  stop             Stop a query
    Usage: nebuli stop <queryId> [-s <server>]
      <queryId>      ID of the query to stop
      -s, --server   grpc uri e.g., 127.0.0.1:8080

  unregister       Unregister a query
    Usage: nebuli unregister <queryId> [-s <server>]
      <queryId>      ID of the query to unregister
      -s, --server   grpc uri e.g., 127.0.0.1:8080

  dump             Dump the DecomposedQueryPlan
    Usage: nebuli dump [-o <output>] [-m <target-machine>] [-i <input>] <query>
      -o, --output   Write the DecomposedQueryPlan to file. Use - for stdout (default: "-")
      -m, --target-machine The hosts machine architecture. This is required when deploying a model. It defaults to host. (Choices: host, generic_x86, rpi5)
      -i, --input    Read the query description. Use - for stdin which is the default (default: "-")
      <query>        Query Statement (default: "")

  status           Observe the state of individual queries
    Usage: nebuli status <queryId> [-s <server>]
      <queryId>      ID of the query to observe
      -s, --server   grpc uri e.g., 127.0.0.1:8080
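
As a rough illustration of how these commands compose, the Python sketch below builds nebuli argument lists using only the flags shown in the help message. The helper names register_cmd and query_cmd are hypothetical, the query ID 1 is a placeholder (the format of query IDs is not specified here), and passing a file path to -i is an assumption based on the help text. The commands are only printed, not executed.

```python
import shlex

DEFAULT_SERVER = "127.0.0.1:8080"  # example URI taken from the help message


def register_cmd(input_path, server=DEFAULT_SERVER, start=False):
    """Build `nebuli register`, reading the query description from input_path."""
    cmd = ["nebuli", "register", "-s", server, "-i", input_path]
    if start:
        cmd.append("-x")  # start the query immediately after registration
    return cmd


def query_cmd(command, query_id, server=DEFAULT_SERVER):
    """Build start/stop/unregister/status, which share the same argument shape."""
    if command not in {"start", "stop", "unregister", "status"}:
        raise ValueError(f"unsupported command: {command}")
    return ["nebuli", command, str(query_id), "-s", server]


# Print the command lines instead of running them.
print(shlex.join(register_cmd("query.yaml", start=True)))
print(shlex.join(query_cmd("status", 1)))
```

To actually run a command, the returned list could be passed to subprocess.run, assuming nebuli is on the PATH and a worker is reachable at the given URI.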

Query Lifecycle: Register, Start, Stop, and Unregister

The register and start commands are distinct steps:

  • register: This command loads the query onto the server and performs validation without actually running it. This is useful for testing query validity or preparing multiple queries for later execution.
  • start: This command initiates the execution of a previously registered query.

The -x flag with the register command allows you to immediately start the query after registration, combining both steps into one.

Similarly, for stopping queries:

  • stop: This command pauses a running query. A stopped query can be restarted later.
  • unregister: This command completely removes a query from the server.
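
The lifecycle described above can be modeled as a small state machine. The Python sketch below is a reading of this section, not the worker's actual implementation: the state names and the apply and register helpers are hypothetical, and allowing unregister from any state is an assumption, since the text does not say whether a running query must be stopped first.

```python
from enum import Enum


class State(Enum):
    REGISTERED = "registered"      # loaded and validated, not running
    RUNNING = "running"
    STOPPED = "stopped"            # paused, can be started again
    UNREGISTERED = "unregistered"  # removed from the server


# Allowed state transitions for each command.
TRANSITIONS = {
    "start": {State.REGISTERED: State.RUNNING, State.STOPPED: State.RUNNING},
    "stop": {State.RUNNING: State.STOPPED},
    # Assumption: unregister is accepted in any state where the query still exists.
    "unregister": {
        s: State.UNREGISTERED
        for s in (State.REGISTERED, State.RUNNING, State.STOPPED)
    },
}


def apply(state, command):
    """Return the state that follows `command`, or raise ValueError if not allowed."""
    try:
        return TRANSITIONS[command][state]
    except KeyError:
        raise ValueError(f"cannot {command} a query in state {state.value}") from None


def register(start=False):
    """Model `register`; start=True corresponds to the -x flag."""
    state = State.REGISTERED
    return apply(state, "start") if start else state


state = register()             # register only
state = apply(state, "start")  # registered -> running
state = apply(state, "stop")   # running -> stopped
state = apply(state, "start")  # a stopped query can be restarted
print(state.value)
```

In this model, register with start=True reaches the same state as register followed by start, which matches the description of the -x flag.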