User-Defined Functions

NebulaStream can execute user-defined functions on the tuples in a stream.

Tuple-based TensorFlow Lite UDFs

NebulaStream can execute a TensorFlow Lite model that takes one or more attributes of a stream tuple as input parameters and appends the model's output to the tuple.
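The tuple-level behavior described above can be illustrated with a minimal, self-contained sketch. It does not use NebulaStream or TensorFlow Lite: `Field`, `Tuple`, `Model`, `fieldValue`, and `inferOnTuple` are hypothetical names introduced only for this illustration, and the model is assumed to be an arbitrary function from a vector of floats to a vector of floats.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; these are not NebulaStream types.
struct Field {
    std::string name;
    float value;
};
using Tuple = std::vector<Field>;
using Model = std::function<std::vector<float>(const std::vector<float>&)>;

// Looks up a field by name; throws if the tuple has no such field.
float fieldValue(const Tuple& tuple, const std::string& name) {
    const auto it = std::find_if(tuple.begin(), tuple.end(),
                                 [&name](const Field& f) { return f.name == name; });
    if (it == tuple.end()) {
        throw std::out_of_range("unknown field: " + name);
    }
    return it->value;
}

// Feeds the selected input fields to the model and returns a copy of the
// tuple with one new field per model output appended at the end.
Tuple inferOnTuple(const Tuple& tuple,
                   const std::vector<std::string>& inputFields,
                   const std::vector<std::string>& outputFields,
                   const Model& model) {
    std::vector<float> inputs;
    inputs.reserve(inputFields.size());
    for (const auto& name : inputFields) {
        inputs.push_back(fieldValue(tuple, name));
    }

    const std::vector<float> outputs = model(inputs);
    if (outputs.size() != outputFields.size()) {
        throw std::invalid_argument("model output count does not match output fields");
    }

    Tuple result = tuple;  // existing fields are kept unchanged
    for (std::size_t i = 0; i < outputs.size(); ++i) {
        result.push_back(Field{outputFields[i], outputs[i]});
    }
    return result;
}
```

In this sketch, the input fields are selected by name, the original fields stay in place, and the outputs are appended in the order in which the output fields are listed.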

💡 At the moment, the user must explicitly specify the data type of each output parameter, and that data type must be FLOAT32.

💡 When using the C++ client, the TensorFlow model must be stored in a file on the coordinator.

💡 When using the Java client, the TensorFlow model is automatically transferred from the Java client to the coordinator. However, the user must specify a valid path on the coordinator as the model name.

Example

In the following example, the model stored in the file iris_95acc.tflite is applied to the fields f1, f2, f3, and f4 of the input stream. The model's output is stored in the fields iris0, iris1, and iris2, which are appended to the tuples in the output stream. The query is shown first for the C++ client and then for the Java client.

C++ client:

Query q = Query::from("irisData").inferModel(
    // Existing TensorFlow model file on the coordinator
    "/path/to/iris_95acc.tflite",
    // Input attributes
    {
       Attribute("f1"),
       Attribute("f2"),
       Attribute("f3"),
       Attribute("f4")
    },
    // Output attributes with explicit type information
    {
       Attribute("iris0", FLOAT32),
       Attribute("iris1", FLOAT32),
       Attribute("iris2", FLOAT32)
    });

Java client:

Query q = nebulaStreamRuntime.readFromSource("iris")
  .inferModel(
    // TensorFlow model is read from a ResourceStream
    Objects.requireNonNull(getClass().getResourceAsStream("/iris_95acc.tflite")),
    // Name must be a valid file name on the coordinator and worker
    "/tmp/iris_95acc.tflite")
  // Input attributes
  .on(
    attribute("f1"),
    attribute("f2"),
    attribute("f3"),
    attribute("f4"))
  // Output attributes with explicit type information
  .as(
    attribute("iris0", BasicType.FLOAT32),
    attribute("iris1", BasicType.FLOAT32),
    attribute("iris2", BasicType.FLOAT32));
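A consumer of the output stream may want to reduce the three appended fields to a single label. The following sketch assumes that iris0, iris1, and iris2 are per-class scores, which the text above does not state, and picks the index of the largest one. The helper name `predictedClass` is hypothetical and not part of any NebulaStream client.

```cpp
#include <algorithm>
#include <array>
#include <cstddef>
#include <iterator>

// Returns the index (0, 1, or 2) of the largest of the three output values.
// Assumption: the values are per-class scores, so the largest marks the
// predicted class. On ties, the lowest index wins.
std::size_t predictedClass(float iris0, float iris1, float iris2) {
    const std::array<float, 3> scores{iris0, iris1, iris2};
    const auto best = std::max_element(scores.begin(), scores.end());
    return static_cast<std::size_t>(std::distance(scores.begin(), best));
}
```

If the model emits something other than class scores, this reduction does not apply.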