REST API

The NebulaStream Coordinator runs a REST server that allows external clients to interact with it. This page describes all available REST endpoints of NebulaStream.

⚠️ NOTE:

  • The NebulaStream REST APIs are versioned. A specific version is queried by prefixing the URL path with the version prefix, which is always of the form /v[version_number]/nes. For example, version 1 of the NebulaStream endpoint /foo/bar is available at /v1/nes/foo/bar.

  • Querying unsupported versions or non-existing endpoints will return an HTTP 404 error.

  • There exist several async operations among these APIs, e.g. operations to submit or stop a query. These async calls will return a unique id which can be used to query the status of the requested operation.

  • Endpoints marked with a 🔧 symbol are experimental.
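The version-prefix rule from the notes above can be sketched as a small helper. This is only an illustration: the function names, the host, and the port are hypothetical and not part of NebulaStream.

```python
def versioned_path(endpoint: str, version: int = 1) -> str:
    """Prefix an endpoint such as '/foo/bar' with '/v<version>/nes'."""
    if not endpoint.startswith("/"):
        endpoint = "/" + endpoint
    return f"/v{version}/nes{endpoint}"


def endpoint_url(host: str, port: int, endpoint: str, version: int = 1) -> str:
    """Build the complete URL for an endpoint on a Coordinator (host and port are assumptions)."""
    return f"http://{host}:{port}{versioned_path(endpoint, version)}"
```

For example, versioned_path("/foo/bar") yields /v1/nes/foo/bar, matching the example in the note.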

Query

Endpoints for accessing or submitting queries.

Submit Query

Submit a query for execution.

API: /query/execute-query
Verb: POST
Response Codes:

  • 202 Accepted if query successfully scheduled for execution.
  • 400 Bad Request:
    • Incorrect or missing key word for user query.
    • No placement strategy specified or specified placement strategy not supported.
    • When providing lineage mode: lineage mode not supported.
    • When providing fault tolerance mode: fault tolerance mode not supported.
  • 500 Internal Error if Coordinator has some error.

Request:

curl -X POST \
  http://[HOST:PORT]/v1/nes/query/execute-query \
  -H 'cache-control: no-cache' \
  -H 'content-type: application/json' \
  -d '{
"userQuery":"[QUERY STRING]",
"placement": "[PLACEMENT STRATEGY]"
}'

Response:

{"queryId": "[QUERY ID]"}

Parameters:

  • [QUERY STRING] : the user query to execute, given as a string.
  • [PLACEMENT STRATEGY]: name of the placement strategy. (💡 valid inputs: BottomUp, TopDown)
  • [QUERY ID] : unique identifier assigned to the submitted query, returned in the response.
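The same request can be assembled programmatically. The following is a minimal sketch using only the Python standard library; the function name and the base URL are hypothetical, and the request object is built but not sent.

```python
import json
import urllib.request

# Placement strategies listed as valid inputs in this section.
VALID_PLACEMENTS = ("BottomUp", "TopDown")


def build_submit_request(base_url: str, user_query: str,
                         placement: str = "BottomUp") -> urllib.request.Request:
    """Build (but do not send) the POST request for /query/execute-query."""
    if placement not in VALID_PLACEMENTS:
        raise ValueError(f"unsupported placement strategy: {placement}")
    body = json.dumps({"userQuery": user_query, "placement": placement})
    return urllib.request.Request(
        f"{base_url}/v1/nes/query/execute-query",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json", "Cache-Control": "no-cache"},
        method="POST",
    )
```

Passing the returned object to urllib.request.urlopen would send it; rejecting unknown placement strategies on the client side simply avoids a round trip that would end in 400 Bad Request.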

Get Query Plan

Get the logical query plan of a query.

API: /query/query-plan?queryId=[QUERY ID]
Verb: GET
Response Codes:

  • 200 OK if query plan returned.
  • 400 Bad Request if parameter queryId is not provided.
  • 404 Not Found if provided queryId does not exist.
  • 500 Internal Error if Coordinator has some error.

Request:

curl -X GET \
  http://[HOST:PORT]/v1/nes/query/query-plan?queryId=[QUERY ID] \
  -H 'cache-control: no-cache'

Response:

{"edges":[{
        "source":"source",
        "target":"target"}],
"nodes":[{
         "id":"id",
         "name":"name",
         "nodeType":"nodeType"
        }]}

Parameters:

  • [QUERY ID] : unique identifier of the query. (💡 Returned when the query was submitted for execution)
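The response is a graph given as separate edge and node lists. A client might turn it into an adjacency map as sketched below; the function name is hypothetical, and the sketch assumes that the source and target fields of an edge hold node ids.

```python
import json
from collections import defaultdict


def plan_adjacency(plan_json: str) -> dict:
    """Index a query plan response by node id and collect outgoing edges."""
    plan = json.loads(plan_json)
    # Map each node id to its display name.
    names = {node["id"]: node["name"] for node in plan.get("nodes", [])}
    adjacency = defaultdict(list)
    for edge in plan.get("edges", []):
        adjacency[edge["source"]].append(edge["target"])
    return {"names": names, "adjacency": dict(adjacency)}
```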

Get Execution Plan of a Query

Get the execution plan of a running query.

API: /query/execution-plan?queryId=[QUERY ID]
Verb: GET
Response Codes:

  • 200 OK if execution plan returned.
  • 400 Bad Request if parameter queryId is not provided.
  • 404 Not Found if provided queryId does not exist.
  • 500 Internal Error if Coordinator has some error.

Request:

curl -X GET \
  http://[HOST:PORT]/v1/nes/query/execution-plan?queryId=[QUERY ID] \
  -H 'cache-control: no-cache'

Response:

{
"executionNodes":[{
                  "ScheduledQueries":[{
                                     "queryId":"1",
                                     "querySubPlans":[{
                                                     "operator":"operator",
                                                     "querySubPlanId":"querySubPlanId"
                                                     }]
                                     }],
                  "executionNodeId":"1",
                  "topologyNodeId":"1",
                  "topologyNodeIpAddress":"topologyNodeIpAddress"
                  }]
}

Parameters:

  • [QUERY ID] : unique identifier of the query. (💡 Returned when the query was submitted for execution)
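To see which sub plans were placed on which topology node, the nested response can be flattened. This is a sketch against the response shape shown above; the function name is hypothetical.

```python
def sub_plans_by_node(execution_plan: dict) -> dict:
    """Map each topologyNodeId to the querySubPlanIds scheduled on that node."""
    result = {}
    for node in execution_plan.get("executionNodes", []):
        sub_plan_ids = [
            sub_plan["querySubPlanId"]
            for query in node.get("ScheduledQueries", [])
            for sub_plan in query.get("querySubPlans", [])
        ]
        result[node["topologyNodeId"]] = sub_plan_ids
    return result
```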

Stop Query

Stop a running query.

API: /query/stop-query?queryId=[QUERY ID]
Verb: DELETE
Response Codes:

  • 202 Accepted if query scheduled for deletion.
  • 400 Bad Request:
    • Parameter queryId not provided.
    • Query status was either failed or already stopped.
  • 404 Not Found if provided queryId does not exist.
  • 500 Internal Error if Coordinator has some error.

Request:

curl -X DELETE \
  http://[HOST:PORT]/v1/nes/query/stop-query?queryId=[QUERY ID] \
  -H 'cache-control: no-cache'

Response:

{"success": "[true|false]"}

Parameters:

  • [QUERY ID] : unique identifier of the query. (💡 Returned when the query was submitted for execution)

Topology

Endpoints for accessing or updating the topology of underlying NebulaStream cluster.

Get Topology Graph

Get the NebulaStream topology graph as JSON.

API: /topology
Verb: GET
Response Codes:

  • 200 OK if current topology is returned.

Request:

curl -X GET \
  http://[HOST:PORT]/v1/nes/topology \
  -H 'cache-control: no-cache'

Response:

{
    "edges":[{
             "source": "source",
             "target": "target"
            }],
    "nodes": [{
             "available_resources": "available_resources",
             "id": "nodeId",
             "ip_address": "node_ip_address"
             }]
}

Add a Parent Node 🔧

Link two workers in the NebulaStream topology using their associated node IDs.

API: /topology/addParent
Verb: POST
Response Codes:

  • 200 OK if successfully added.
  • 400 Bad Request if [PARENT NODE ID] or [CHILD NODE ID] are not provided or invalid.
  • 500 Internal Error if Coordinator has some error.

Request:

curl -X POST \
http://[HOST:PORT]/v1/nes/topology/addParent \
-H 'cache-control: no-cache' \
-H 'content-type: application/json' \
-d '{
"parentId":"[PARENT NODE ID]",
"childId": "[CHILD NODE ID]"
}'

Response:

{"Success": "[true|false]"}

Parameters:

  • [PARENT NODE ID] : unique identifier of the worker node that will act as parent.
  • [CHILD NODE ID] : unique identifier of the worker node that will act as parent’s child.

⚠️ A parent node is a worker which is running closer to the cloud data center. A child node on the other hand is a worker running closer to the IoT devices.

Remove a Parent Node 🔧

Remove the link between two workers in the NebulaStream topology using their associated node IDs.

API: /topology/removeParent
Verb: POST
Response Codes:

  • 200 OK if successfully removed.
  • 400 Bad Request if [PARENT NODE ID] or [CHILD NODE ID] are not provided or invalid.
  • 500 Internal Error if Coordinator has some error.

Request:

curl -X POST \
http://[HOST:PORT]/v1/nes/topology/removeParent \
-H 'cache-control: no-cache' \
-H 'content-type: application/json' \
-d '{
"parentId":"[PARENT NODE ID]",
"childId": "[CHILD NODE ID]"
}'

Response:

{"Success": "[true|false]"}

Parameters:

  • [PARENT NODE ID] : unique identifier of the worker node that currently acts as parent.
  • [CHILD NODE ID] : unique identifier of the worker node that currently acts as the parent’s child.
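Because addParent and removeParent take the same payload, a client can share one request builder for both. The sketch below only constructs the request; the function name and base URL are hypothetical, and sending the ids as JSON strings follows the curl examples rather than a confirmed schema.

```python
import json
import urllib.request

# The two link operations documented in this section.
LINK_ACTIONS = ("addParent", "removeParent")


def build_link_request(base_url: str, action: str,
                       parent_id, child_id) -> urllib.request.Request:
    """Build (but do not send) the POST request for /topology/<action>."""
    if action not in LINK_ACTIONS:
        raise ValueError(f"unknown topology action: {action}")
    body = json.dumps({"parentId": str(parent_id), "childId": str(child_id)})
    return urllib.request.Request(
        f"{base_url}/v1/nes/topology/{action}",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json", "Cache-Control": "no-cache"},
        method="POST",
    )
```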

Query Catalog

Endpoints for querying or updating the query catalog.

Get All Queries

Get all queries registered at NebulaStream.

API: /queryCatalog/allRegisteredQueries
Verb: GET
Response Codes:

  • 200 OK if all registered queries are returned successfully.
  • 204 No Content if the query catalog is empty.
  • 400 Bad Request if there is an issue interacting with the query catalog.
  • 500 Internal Error if Coordinator has some error.

Request:

curl -X GET \
http://[HOST:PORT]/v1/nes/queryCatalog/allRegisteredQueries \
-H 'cache-control: no-cache'

Response:

{[
    {"queryId": "...",
    "queryString": "...",
    "queryStatus": "...",
    "queryPlan": "...",
    "queryInfo": "..." }
    ,...
]}

Get Queries With Status

Get all queries with a specific status from NebulaStream.

API: /queryCatalog/queries?status=[STATUS]
Verb: GET
Response Codes:

  • 200 OK if all queries with the requested status are returned successfully.
  • 204 No Content if the query catalog is empty.
  • 400 Bad Request if requested status is invalid.
  • 500 Internal Error if Coordinator has some error.

Request:

curl -X GET \
http://[HOST:PORT]/v1/nes/queryCatalog/queries?status=[STATUS] \
-H 'cache-control: no-cache'

Response:

{[
    {"[QUERY ID]": "[QUERY STRING]"}
    ,...
]}

Parameters:

  • [STATUS] : status of the queries to extract. (💡 valid statuses are: REGISTERED, SCHEDULING, RUNNING, MARKED_FOR_STOP, STOPPED, FAILED, RESTARTING, MIGRATING)
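Since an invalid status is answered with 400 Bad Request, a client may want to validate the value before building the URL. The following sketch uses the status names listed above; the function name is hypothetical, and upper-casing the input is a convenience of this sketch rather than documented server behavior.

```python
from urllib.parse import urlencode

# Status names taken from the parameter description above.
VALID_STATUSES = frozenset({
    "REGISTERED", "SCHEDULING", "RUNNING", "MARKED_FOR_STOP",
    "STOPPED", "FAILED", "RESTARTING", "MIGRATING",
})


def queries_by_status_path(status: str) -> str:
    """Return the versioned path for /queryCatalog/queries with a validated status."""
    normalized = status.strip().upper()
    if normalized not in VALID_STATUSES:
        raise ValueError(f"invalid query status: {status!r}")
    return "/v1/nes/queryCatalog/queries?" + urlencode({"status": normalized})
```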

Get Query Status

Get the status of a specific query.

API: /queryCatalog/status?queryId=[QUERY ID]
Verb: GET
Response Codes:

  • 200 OK if successfully returns the status of the query.
  • 400 Bad Request if requested query id is invalid or missing.
  • 500 Internal Error if Coordinator has some error.

Request:

curl -X GET \
http://[HOST:PORT]/v1/nes/queryCatalog/status?queryId=[QUERY ID] \
-H 'cache-control: no-cache'

Response:

{
    "status": "[QUERY STATUS]"
}

Parameters:

  • [QUERY ID] : ID of the query to check
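Submitting and stopping a query are asynchronous, so a client typically polls this endpoint until the query reaches the desired status. The loop below is a sketch of that pattern: fetch_status is a hypothetical callable supplied by the caller (for example a function that performs the GET request and returns the "status" field), and treating STOPPED and FAILED as final states is an assumption of this sketch.

```python
import time

# Assumed final states: once reached, waiting for another status is pointless.
FINAL_STATUSES = frozenset({"STOPPED", "FAILED"})


def wait_for_status(fetch_status, query_id, wanted, timeout_s=30.0,
                    interval_s=0.5, sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status(query_id) until it returns `wanted` or the timeout expires."""
    deadline = clock() + timeout_s
    while True:
        status = fetch_status(query_id)
        if status == wanted:
            return status
        if status in FINAL_STATUSES:
            raise RuntimeError(f"query {query_id} ended in status {status}")
        if clock() >= deadline:
            raise TimeoutError(f"query {query_id} did not reach {wanted}")
        sleep(interval_s)
```

The sleep and clock parameters exist only so the loop can be exercised without real delays.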

Source Catalog

Endpoints for querying or updating the source catalog.

Get Logical Sources

Get all logical sources registered at NebulaStream.

API: /sourceCatalog/allLogicalSource
Verb: GET
Response Codes:

  • 200 OK if all registered logical sources are returned successfully.
  • 404 Not Found if the source catalog is empty.

Request:

curl -X GET \
http://[HOST:PORT]/v1/nes/sourceCatalog/allLogicalSource \
-H 'cache-control: no-cache'

Response:

{[
    {"[LOGICAL SOURCE NAME]" : "[SCHEMA]"}
    ,...
]}

Get Physical Sources

Get all physical sources for a given logical source.

API: /sourceCatalog/allPhysicalSource?logicalSourceName=[LOGICAL SOURCE NAME]
Verb: GET
Response Codes:

  • 200 OK if all registered physical sources are returned successfully.
  • 400 Bad Request if the logical source name is not provided or the logical source does not exist.
  • 404 Not Found if the logical source has no physical source defined.
  • 500 Internal Error if Coordinator has some error.

Request:

curl -X GET \
http://[HOST:PORT]/v1/nes/sourceCatalog/allPhysicalSource?logicalSourceName=[LOGICAL SOURCE NAME] \
-H 'cache-control: no-cache'

Response:

{"Physical Sources":  ["string description of the physical source", ... ]}

Parameters:

  • [LOGICAL SOURCE NAME] : name of the logical source for which the physical sources are to be returned.
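A logical source name is passed as a query parameter, so any characters that are not URL-safe should be percent-encoded first. The following sketch shows one way to do that with the standard library; the function name and the example source name are hypothetical.

```python
from urllib.parse import quote, urlencode


def physical_sources_path(logical_source_name: str) -> str:
    """Return the versioned path with the source name percent-encoded."""
    # quote_via=quote encodes spaces as %20 instead of '+'.
    query = urlencode({"logicalSourceName": logical_source_name}, quote_via=quote)
    return "/v1/nes/sourceCatalog/allPhysicalSource?" + query
```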

Get Schema

Get the schema of a logical source.

API: /sourceCatalog/schema?logicalSourceName=[LOGICAL SOURCE NAME]
Verb: GET
Response Codes:

  • 200 OK if the schema of the logical source is returned successfully.
  • 400 Bad Request if the logical source name is not provided or the logical source does not exist.
  • 404 Not Found if the logical source has no physical source defined.
  • 500 Internal Error if Coordinator has some error.

Request:

curl -X GET \
http://[HOST:PORT]/v1/nes/sourceCatalog/schema?logicalSourceName=[LOGICAL SOURCE NAME] \
-H 'cache-control: no-cache'

Response: A Protobuf encoded schema.

Parameters:

  • [LOGICAL SOURCE NAME] : name of the logical source for which the schema is to be returned.

Add Logical Source

Add a new logical source.

API: /sourceCatalog/addLogicalSource
Verb: POST
Response Codes:

  • 200 OK if successfully added logical source.
  • 400 Bad Request if logical source already exists or issue in the payload.
  • 500 Internal Error if Coordinator has some error.

Request:

curl -X POST \
http://[HOST:PORT]/v1/nes/sourceCatalog/addLogicalSource \
-H 'cache-control: no-cache' \
-H 'content-type: application/json' \
-d '{
"logicalSourceName": "[LOGICAL SOURCE NAME]",
"schema": "[SCHEMA STRING]"
}'

Response:

{"Success": "[true|false]"}

Parameters:

  • [LOGICAL SOURCE NAME] : name of the logical source to add.
  • [SCHEMA STRING]: schema in string form.

Update Logical Source

Update an existing logical source.

API: /sourceCatalog/updateLogicalSource
Verb: POST
Response Codes:

  • 200 OK if successfully updated logical source.
  • 400 Bad Request if the logical source does not exist or there is an issue in the payload.
  • 500 Internal Error if Coordinator has some error.

Request:

curl -X POST \
http://[HOST:PORT]/v1/nes/sourceCatalog/updateLogicalSource \
-H 'cache-control: no-cache' \
-H 'content-type: application/json' \
-d '{
"logicalSourceName": "[LOGICAL SOURCE NAME]",
"schema": "[SCHEMA STRING]"
}'

Response:

{"Success": "[true|false]"}

Parameters:

  • [LOGICAL SOURCE NAME] : name of the logical source for which schema needs to be updated.
  • [SCHEMA STRING]: schema in string form.

Delete Logical Source

Delete a logical source.

API: /sourceCatalog/deleteLogicalSource?logicalSourceName=[LOGICAL SOURCE NAME]
Verb: DELETE
Response Codes:

  • 200 OK if successfully removed logical source.
  • 400 Bad Request if the logical source does not exist or there is an issue in the request.
  • 500 Internal Error if Coordinator has some error.

Request:

curl -X DELETE \
  http://[HOST:PORT]/v1/nes/sourceCatalog/deleteLogicalSource?logicalSourceName=[LOGICAL SOURCE NAME] \
  -H 'cache-control: no-cache'

Response:

{"Success": "[true|false]"}

Parameters:

  • [LOGICAL SOURCE NAME] : name of the logical source to remove.

Monitoring

Endpoints to query the NebulaStream monitoring data.

⚠️ The first numbers in the JSON dict correspond to the IDs of the nodes in the topology.

Get Monitoring Data From All Nodes

Get monitoring data as JSON from all nodes in the topology.

API: /monitoring/metrics
Verb: GET
Response Code: 200 OK

Request:

curl -X GET \
  http://[HOST:PORT]/v1/nes/monitoring/metrics \
  -H 'cache-control: no-cache'

Response:

{
    "1": {
        "wrapped_cpu": {
            "CORE_1": {
                "CORE_NUM": 1,
                "GUEST": 0,
                "GUESTNICE": 0,
                "IDLE": 165593,
                "IOWAIT": 208,
                "IRQ": 0,
                "NICE": 109,
                "SOFTIRQ": 5,
                "STEAL": 0,
                "SYSTEM": 10830,
                "USER": 43651
            },
            [...]
            "CORE_8": {
                "CORE_NUM": 8,
                "GUEST": 0,
                "GUESTNICE": 0,
                "IDLE": 163532,
                "IOWAIT": 158,
                "IRQ": 0,
                "NICE": 60,
                "SOFTIRQ": 2738,
                "STEAL": 0,
                "SYSTEM": 10901,
                "USER": 42541
            },
            "TOTAL": {
                "CORE_NUM": 0,
                "GUEST": 0,
                "GUESTNICE": 0,
                "IDLE": 1321569,
                "IOWAIT": 1502,
                "IRQ": 0,
                "NICE": 290,
                "SOFTIRQ": 3018,
                "STEAL": 0,
                "SYSTEM": 89345,
                "USER": 346209
            }
        },
        "disk": {
            "F_BAVAIL": 26742328,
            "F_BFREE": 29946424,
            "F_BLOCKS": 62733020,
            "F_BSIZE": 4096,
            "F_FRSIZE": 4096
        },
        "memory": {
            "BUFFER_RAM": 1684271104,
            "FREE_HIGH": 0,
            "FREE_RAM": 17513349120,
            "FREE_SWAP": 2147479552,
            "LOADS_15MIN": 122336,
            "LOADS_1MIN": 80704,
            "LOADS_5MIN": 146432,
            "MEM_UNIT": 1,
            "PROCS": 1666,
            "SHARED_RAM": 116293632,
            "TOTAL_HIGH": 0,
            "TOTAL_RAM": 41721716736,
            "TOTAL_SWAP": 2147479552
        },
        "wrapped_network": [
            {
              "R_BYTES": 1083094,
              "R_COMPRESSED": 0,
              "R_DROP": 0,
              "R_ERRS": 0,
              "R_FIFO": 0,
              "R_FRAME": 0,
              "R_MULTICAST": 0,
              "R_PACKETS": 10634,
              "T_BYTES": 1083094,
              "T_CARRIER": 0,
              "T_COLLS": 0,
              "T_COMPRESSED": 0,
              "T_DROP": 0,
              "T_ERRS": 0,
              "T_FIFO": 0,
              "T_PACKETS": 10634
            },
            [...]
        ]
    },
    "2": {
      [...]
    },
    [...]
}

Get Monitoring Data From a Single Node

Get monitoring data from a specific node.

API: /monitoring/metrics/[NODE ID]
Verb: GET
Response Code: 200 OK

Request:

curl -X GET \
  http://[HOST:PORT]/v1/nes/monitoring/metrics/[NODE ID] \
  -H 'cache-control: no-cache'

Response:

{
    "wrapped_cpu": {
        "CORE_1": {
            "CORE_NUM": 1,
            "GUEST": 0,
            "GUESTNICE": 0,
            "IDLE": 165593,
            "IOWAIT": 208,
            "IRQ": 0,
            "NICE": 109,
            "SOFTIRQ": 5,
            "STEAL": 0,
            "SYSTEM": 10830,
            "USER": 43651
        },
        [...]
        "CORE_8": {
            "CORE_NUM": 8,
            "GUEST": 0,
            "GUESTNICE": 0,
            "IDLE": 163532,
            "IOWAIT": 158,
            "IRQ": 0,
            "NICE": 60,
            "SOFTIRQ": 2738,
            "STEAL": 0,
            "SYSTEM": 10901,
            "USER": 42541
        },
        "TOTAL": {
            "CORE_NUM": 0,
            "GUEST": 0,
            "GUESTNICE": 0,
            "IDLE": 1321569,
            "IOWAIT": 1502,
            "IRQ": 0,
            "NICE": 290,
            "SOFTIRQ": 3018,
            "STEAL": 0,
            "SYSTEM": 89345,
            "USER": 346209
        }
    },
    "disk": {
        "F_BAVAIL": 26742328,
        "F_BFREE": 29946424,
        "F_BLOCKS": 62733020,
        "F_BSIZE": 4096,
        "F_FRSIZE": 4096
    },
    "memory": {
        "BUFFER_RAM": 1684271104,
        "FREE_HIGH": 0,
        "FREE_RAM": 17513349120,
        "FREE_SWAP": 2147479552,
        "LOADS_15MIN": 122336,
        "LOADS_1MIN": 80704,
        "LOADS_5MIN": 146432,
        "MEM_UNIT": 1,
        "PROCS": 1666,
        "SHARED_RAM": 116293632,
        "TOTAL_HIGH": 0,
        "TOTAL_RAM": 41721716736,
        "TOTAL_SWAP": 2147479552
    },
    "wrapped_network": [
        {
          "R_BYTES": 1083094,
          "R_COMPRESSED": 0,
          "R_DROP": 0,
          "R_ERRS": 0,
          "R_FIFO": 0,
          "R_FRAME": 0,
          "R_MULTICAST": 0,
          "R_PACKETS": 10634,
          "T_BYTES": 1083094,
          "T_CARRIER": 0,
          "T_COLLS": 0,
          "T_COMPRESSED": 0,
          "T_DROP": 0,
          "T_ERRS": 0,
          "T_FIFO": 0,
          "T_PACKETS": 10634
        },
        [...]
    ]
}

Available Metrics

CPU

On Linux, the metrics are read from /proc/stat. The CPU metrics are measured in jiffies.

  • usr : CPU usage at the user level
  • nice : CPU usage for user processes labeled “nice”
  • sys : CPU usage at the system (Linux kernel) level
  • iowait : CPU usage idling waiting on a disk read/write
  • irq : CPU usage handling hardware interrupts
  • soft : CPU usage handling software interrupts
  • steal : CPU usage being forced to wait for a hypervisor handling other virtual processors
  • guest : CPU usage spent running a virtual processor
  • idle : CPU usage on idle time (no processes, and not waiting on a disk read/write)
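These counters can be combined into a single utilization figure. The sketch below computes the busy fraction from one CPU entry of the monitoring response (for example the TOTAL entry); the function name is hypothetical. It assumes the counters are cumulative, as in /proc/stat, so the result is an average over the whole counting period, and it leaves out GUEST and GUESTNICE on the assumption that Linux already accounts for them in USER and NICE.

```python
# Counters that together make up all accounted CPU time.
CPU_FIELDS = ("USER", "NICE", "SYSTEM", "IDLE", "IOWAIT", "IRQ", "SOFTIRQ", "STEAL")


def busy_fraction(cpu: dict) -> float:
    """Return the fraction of accounted CPU time that was not idle or waiting on I/O."""
    total = sum(cpu[field] for field in CPU_FIELDS)
    if total == 0:
        return 0.0
    idle = cpu["IDLE"] + cpu["IOWAIT"]
    return (total - idle) / total
```

To estimate current rather than average utilization, the same calculation can be applied to the difference between two samples taken some time apart.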

Memory

The metrics are obtained via the “sysinfo” library, which reads underneath /proc/meminfo.

  • loads[3] : 1, 5, and 15 minute load averages
  • totalram : Total usable main memory size
  • freeram : Available memory size
  • sharedram: Amount of shared memory
  • bufferram: Memory used by buffers
  • totalswap: Total swap space size
  • freeswap : Swap space still available
  • procs : Number of current processes
  • totalhigh: Total high memory size
  • freehigh : Available high memory size
  • mem_unit : Memory unit size in bytes
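The raw values can be converted into more familiar units as sketched below. The function name is hypothetical. The sketch assumes the values are passed through unchanged from the Linux sysinfo structure, in which memory sizes are multiples of mem_unit bytes and load averages are fixed-point numbers scaled by 2^16.

```python
# Fixed-point scale of the sysinfo load averages (1 << SI_LOAD_SHIFT on Linux).
LOAD_SCALE = 1 << 16


def memory_summary(memory: dict) -> dict:
    """Convert the raw 'memory' metrics into bytes and floating-point load averages."""
    unit = memory["MEM_UNIT"]
    return {
        "total_bytes": memory["TOTAL_RAM"] * unit,
        "free_bytes": memory["FREE_RAM"] * unit,
        "load_1min": memory["LOADS_1MIN"] / LOAD_SCALE,
        "load_5min": memory["LOADS_5MIN"] / LOAD_SCALE,
        "load_15min": memory["LOADS_15MIN"] / LOAD_SCALE,
    }
```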

Disk

The metrics are obtained via the “statvfs” library.

  • f_bsize : Filesystem block size
  • f_frsize: Fragment size
  • f_blocks: Size of fs in f_frsize units
  • f_bfree : Number of free blocks
  • f_bavail: Number of free blocks for unprivileged users
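The block counts can be turned into byte sizes by multiplying with the fragment size. The following is a sketch with a hypothetical function name, assuming that F_BFREE and F_BAVAIL are expressed in F_FRSIZE units just like F_BLOCKS.

```python
def disk_usage(disk: dict) -> dict:
    """Derive byte sizes and the used fraction from the raw 'disk' metrics."""
    fragment_size = disk["F_FRSIZE"]
    total = disk["F_BLOCKS"] * fragment_size
    free = disk["F_BFREE"] * fragment_size
    return {
        "total_bytes": total,
        "available_bytes": disk["F_BAVAIL"] * fragment_size,
        "used_fraction": (total - free) / total if total else 0.0,
    }
```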

Network

Network metrics are represented in a list for each network interface. The metrics are obtained via /proc/net/dev.

Meaning of the metrics (the prefix R_ stands for received, T_ for transmitted):

  • bytes : The total number of bytes of data transmitted or received by the interface.
  • packets : The total number of packets of data transmitted or received by the interface.
  • errs : The total number of transmit or receive errors detected by the device driver.
  • drop : The total number of packets dropped by the device driver.
  • fifo : The number of FIFO buffer errors.
  • frame : The number of packet framing errors.
  • colls : The number of collisions detected on the interface.
  • compressed: The number of compressed packets transmitted or received by the device driver. (This appears to be unused in the 2.2.15 kernel.)
  • carrier : The number of carrier losses detected by the device driver.
  • multicast : The number of multicast frames transmitted or received by the device driver.
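Because the network metrics arrive as one entry per interface, per-node totals have to be summed on the client side. A minimal sketch follows; the function name is hypothetical, and missing counters are treated as zero.

```python
def network_totals(interfaces: list) -> dict:
    """Sum byte and error counters over all interface entries of 'wrapped_network'."""
    return {
        "received_bytes": sum(i.get("R_BYTES", 0) for i in interfaces),
        "transmitted_bytes": sum(i.get("T_BYTES", 0) for i in interfaces),
        "errors": sum(i.get("R_ERRS", 0) + i.get("T_ERRS", 0) for i in interfaces),
    }
```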

UDF Catalog

The REST interface provides the following API to manipulate the UDF catalog.

The API encodes Java UDF descriptors using the following Protobuf message.

message JavaUdfDescriptorMessage {
  message JavaUdfClassDefinition {
    string class_name = 1;
    bytes byte_code = 2;
  }

  string udf_class_name = 1;
  string udf_method_name = 2;
  bytes serialized_instance = 3;
  repeated JavaUdfClassDefinition classes = 4;
}

Register a Java UDF 🔧

API: /udf-catalog/registerJavaUdf
Verb: POST
Response codes:

  • 200 OK, if the UDF could be registered.
  • 400 Bad Request, if the UDF with given name already exists.

Request:

A Protobuf message encoding a UDF name and a Java UDF descriptor:

message RegisterJavaUdfRequest {
  string udf_name = 1;
  JavaUdfDescriptorMessage java_udf_descriptor = 2;
}

Get a Java UDF Descriptor 🔧

API: /udf-catalog/getUdfDescriptor?udfName=[UDF NAME]
Verb: GET
Status codes:

  • 200 OK, whether or not the UDF was found (the found field of the response tells the two cases apart).
  • 400 Bad Request, if [UDF NAME] parameter is missing.
  • 400 Bad Request, if the URL contains additional parameters.

Request:

A URI with the UDF name passed as the udfName parameter.

Response:

A Protobuf message encoding if the UDF was found and, if it was found, a Java UDF descriptor:

message GetJavaUdfDescriptorResponse {
  bool found = 1;
  optional JavaUdfDescriptorMessage java_udf_descriptor = 2;
}

Remove UDF 🔧

API: /udf-catalog/removeUdf?udfName=[UDF NAME]
Verb: DELETE
Status codes:

  • 200 OK, if the UDF was removed.
  • 200 OK, if the UDF was not found.
  • 400 Bad Request, if [UDF NAME] parameter is missing.
  • 400 Bad Request, if the URL contains additional parameters.

Request:

A URI with the UDF name passed as the udfName parameter.

Response:

A JSON object indicating if the UDF was removed. (Note that it is not an error condition if a client tries to remove an unknown UDF because it could have been removed by another client.)

{
"removed": [true|false]
}

List UDFs 🔧

API: /udf-catalog/listUdfs
Verb: GET
Status codes:

  • 200 OK

Request:

Empty.

Response:

A JSON object containing a list of Java UDFs in the UDF catalog.

{
"udfs": [ "my_udf" ]
}