User Defined Functions
NebulaStream can execute user-defined functions on the tuples in a stream.
Tuple-based TensorFlow Lite UDFs
NebulaStream can execute a TensorFlow Lite model that takes one or more attributes of a stream tuple as input parameters and appends the output of the model to the tuple.
💡 At the moment, the data type of the output parameters must be FLOAT32, and the user has to specify it explicitly.
💡 When using the C++ client, the TensorFlow model must be stored in a file on the coordinator.
💡 When using the Java client, the TensorFlow model is automatically transferred from the Java client to the coordinator. However, the user must specify a valid path on the coordinator as the model name.
Example
In the following example, the model stored in the file iris_95acc.tflite is applied to the fields f1, f2, f3, and f4 of the input stream.
The output of the model is stored in the fields iris0, iris1, and iris2, which are appended to the tuples in the output stream.
// C++ client
Query q = Query::from("irisData").inferModel(
    // Existing TensorFlow model file on the coordinator
    "/path/to/iris_95acc.tflite",
    // Input attributes
    {
        Attribute("f1"),
        Attribute("f2"),
        Attribute("f3"),
        Attribute("f4")
    },
    // Output attributes with explicit type information
    {
        Attribute("iris0", FLOAT32),
        Attribute("iris1", FLOAT32),
        Attribute("iris2", FLOAT32)
    });

// Java client
Query q = nebulaStreamRuntime.readFromSource("iris")
    .inferModel(
        // TensorFlow model is read from a resource stream
        Objects.requireNonNull(getClass().getResourceAsStream("/iris_95acc.tflite")),
        // Name must be a valid file name on the coordinator and worker
        "/tmp/iris_95acc.tflite")
    // Input attributes
    .on(
        attribute("f1"),
        attribute("f2"),
        attribute("f3"),
        attribute("f4"))
    // Output attributes with explicit type information
    .as(
        attribute("iris0", BasicType.FLOAT32),
        attribute("iris1", BasicType.FLOAT32),
        attribute("iris2", BasicType.FLOAT32));
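
To make the effect of the queries above on a single tuple concrete, the following plain-Java sketch mimics what the operator is described as doing: it reads the selected input fields, passes them to a model, and appends the model outputs to the tuple as 32-bit floats. It does not use the NebulaStream or TensorFlow Lite APIs. The class InferModelSketch, the helper inferModel, and placeholderModel are hypothetical names introduced only for this illustration, and the placeholder arithmetic stands in for a real model.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class InferModelSketch {

    // Hypothetical helper (not part of the NebulaStream API): copies the tuple,
    // feeds the selected input fields to the model, and appends the outputs.
    public static Map<String, Object> inferModel(
            Map<String, Object> tuple,
            List<String> inputFields,
            List<String> outputFields,
            Function<float[], float[]> model) {
        // Collect the input attributes in the order they were listed.
        float[] input = new float[inputFields.size()];
        for (int i = 0; i < input.length; i++) {
            input[i] = ((Number) tuple.get(inputFields.get(i))).floatValue();
        }
        float[] output = model.apply(input);
        if (output.length != outputFields.size()) {
            throw new IllegalArgumentException(
                    "model returned " + output.length + " values, expected " + outputFields.size());
        }
        // Keep the original fields and append one float field per model output.
        Map<String, Object> result = new LinkedHashMap<>(tuple);
        for (int i = 0; i < output.length; i++) {
            result.put(outputFields.get(i), output[i]);
        }
        return result;
    }

    // Placeholder standing in for a real model: four inputs, three outputs.
    public static float[] placeholderModel(float[] in) {
        return new float[] {in[0], in[1], in[2] + in[3]};
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("f1", 1.0f);
        tuple.put("f2", 2.0f);
        tuple.put("f3", 3.0f);
        tuple.put("f4", 4.0f);

        Map<String, Object> out = inferModel(
                tuple,
                List.of("f1", "f2", "f3", "f4"),
                List.of("iris0", "iris1", "iris2"),
                InferModelSketch::placeholderModel);

        // The input fields are preserved and the output fields follow them.
        System.out.println(out.keySet()); // prints [f1, f2, f3, f4, iris0, iris1, iris2]
    }
}
```

In this sketch the output tuple keeps every input field and gains one field per model output, which matches the description of iris0, iris1, and iris2 being appended to the tuples in the output stream.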