# NebuLi: Distributed Deployment
In this section, we describe how NebulaStream operates in a distributed environment, supporting cloud-edge infrastructures and geo-distributed deployments.
## Overview of Distributed Architecture
The distributed implementation of NebulaStream is currently experimental, and we are actively working on Kubernetes integration. The distributed version of NebulaStream can be used, but it is currently on a specific branch (e.g., `ls-distributed-poc`, though this may change).
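If you want to try it, a minimal checkout sketch (assuming the public NebulaStream repository on GitHub; the branch name may change, as noted above):

```sh
# Check out the experimental distributed branch (branch name may change)
git clone https://github.com/nebulastream/nebulastream.git
cd nebulastream
git checkout ls-distributed-poc
```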
Fundamentally, the distributed architecture still works with a stateless optimizer, namely `nebuli`.
## Topology Configuration with `topology.yaml`
When starting `nebuli` in distributed mode, you need to supply a `topology.yaml` file which describes the workers in the topology.

Here is an example `topology.yaml`:
```yaml
logical:
  - name: stream
    schema:
      - type: UINT64
        name: id
      - type: UINT64
        name: value
      - type: UINT64
        name: timestamp
nodes:
  - connection: source-1:9090
    grpc: source-1:8080
    capacity: 10
    physical:
      - logical: stream
        parserConfig:
          type: CSV
        sourceConfig:
          filePath: '/sources/Map_Add_stream.csv'
          type: File
  - connection: root:9090
    grpc: root:8080
    capacity: 10
    links:
      upstreams: [ source-1:9090 ]
    sinks:
      - name: sink
        type: Print
        config:
          ingestion: 100
          inputFormat: CSV
```
Workers are named in the system, and `upstreams` and `downstreams` establish connections between workers. Additionally, the topology places sources and sinks within the topology. Logical sources are globally defined for all workers, but each worker can choose its own physical sources and attach them to the logical source. Currently, there is no concept of logical sinks, so sinks are registered directly at a worker. The `capacity` parameter is used to steer the bottom-up placement for testing purposes. Since `nebuli` in its current state is stateless, each invocation will use all available capacities.
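The example above only uses `upstreams`. Assuming `downstreams` mirrors the `upstreams` syntax (this is an assumption; only `upstreams` appears in the example), an intermediate worker in a three-node chain might look like this hypothetical fragment:

```yaml
# Hypothetical fragment: an intermediate worker declaring links in both directions.
# Assumes `downstreams` takes the same address-list syntax as `upstreams`.
- connection: relay-1:9090
  grpc: relay-1:8080
  capacity: 10
  links:
    upstreams: [ source-1:9090 ]
    downstreams: [ root:9090 ]
```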
## Two-Node Topology Example
```mermaid
graph TD
    A[source-1:9090] --> B[root:9090]
    subgraph Worker 1
        A
    end
    subgraph Worker 2
        B
    end
```
## `nebuli` Command-Line Usage
`nebuli` loads the topology via the `-t` parameter.

Here is the help message for the `nebuli` command-line tool:
```
Usage: nebuli [-d | --debug] [-t <topology>] <command> [<args>]
NebulaStream Command Line Interface

Options:
  -d, --debug     Dump the Query plan and enable debug logging
  -t, --topology  Enables distributed deployment, based on a topology

Commands:
  register      Register a query
    Usage: nebuli register <query> [-i <config>] [-s <address>] [-x]
      <query>       Query Statement (default: "")
      -i            Legacy SingleNodeQuery configuration
      -s            Legacy SingleNode server address
      -x            (flag)

  start         Start a query
    Usage: nebuli start <queryId>
      <queryId>     ID of the query to start

  stop          Stop a query
    Usage: nebuli stop <queryId>
      <queryId>     ID of the query to stop

  unregister    Unregister a query
    Usage: nebuli unregister <queryId>
      <queryId>     ID of the query to unregister

  dump          Dump the DecomposedQueryPlan
    Usage: nebuli dump [-o <output>] [-i <config>] [-s <address>] <query>
      -o, --output  Write the DecomposedQueryPlan to file. Use - for stdout (default: "-")
      -i            Legacy SingleNode query configuration
      -s            Legacy SingleNode server address
      <query>       Query Statement (default: "")

  status        Observe the state of individual queries
    Usage: nebuli status [-i <config>] [-s <address>] [--after <timestamp>]
      -i            Legacy SingleNodeQuery configuration
      -s            Legacy SingleNode server address
      --after       Request only status updates that happen after this unix timestamp.
```
Example command to register a query:

```sh
nebuli -t topology.yaml register -x "SELECT * FROM stream INTO sink"
```
Just like the non-distributed implementation, `nebuli` will perform parsing and logical optimization. Additionally, it will perform placement of each logical operator and decompose the single query plan into multiple query plans. The decomposed query plans are stitched together using network sources and sinks, making them effectively their own queries. `nebuli` will then deploy each partial query plan to the relevant workers, returning a distributed ID.
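To inspect the result of this decomposition without deploying anything, the `dump` command from the help above writes the DecomposedQueryPlan to a file or to stdout:

```sh
# Print the DecomposedQueryPlan for the example query to stdout ("-" is the default)
nebuli -t topology.yaml dump -o - "SELECT * FROM stream INTO sink"
```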
You can observe the state of the individual queries with:

```sh
nebuli status ID
```

To stop and unregister all partial queries belonging to a distributed ID:

```sh
nebuli unregister ID
```
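For repeated polling, the `--after` option documented in the help above limits the response to newer updates; the timestamp below is purely illustrative:

```sh
# Request only status updates that occurred after the given unix timestamp
nebuli status --after 1700000000
```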
The design document for distributed processing contains additional information and can be found at `20250120_Distributed.md`. It describes the data channel implementation as well as the algorithms used for placement and decomposition.
## Starting Workers in Distributed Mode
By default, NebulaStream workers are not configured for distributed mode. To enable distributed operation, you need to pass two specific arguments when starting a worker:

- `--bind=<address>:<port>`: This argument specifies the address and port the worker will listen on. Use `0.0.0.0:<port>` to bind to all available network interfaces.
- `--connection=<connection-name>:<port>`: This argument specifies the name the worker will use when reaching out to other workers in the distributed topology.
Example:

```sh
nes-single-node-worker --bind=0.0.0.0:9090 --connection=dns-name:9090 --query...
```
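For the two-node example topology above, each entry in `nodes` corresponds to one worker, and `--connection` must match the name used in `topology.yaml`. A sketch, assuming each worker runs on its own host (so both can bind port 9090) and that the names `source-1` and `root` resolve via DNS:

```sh
# On the first host: the worker that peers reach as source-1:9090
nes-single-node-worker --bind=0.0.0.0:9090 --connection=source-1:9090

# On the second host: the root worker of the example topology
nes-single-node-worker --bind=0.0.0.0:9090 --connection=root:9090
```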