How to add a Source
In NebulaStream, sources are responsible for ingesting data into the system. They typically connect to external systems or hardware through communication channels—such as a client library or kernel API—to read data from a file, network, serial device, or other medium.
Common examples of sources:
- File
- TCP
- MQTT
- Kafka
- PostgreSQL
- Serial Port
These can represent external software systems, networking protocols, or hardware devices.
Most of the time, a source wraps a client library to interact with the system we want to connect to and read data from. This guide walks you through implementing a source plugin, using MQTTSource as an example. The MQTT source reads data from an MQTT broker via the MQTT network protocol, which is popular in the IoT space.
1. Overview
Plugins are implemented in the `nes-plugins` top-level directory.
For each new source, create a new directory under `Sources/`, such as `MQTTSource/`.
This directory will contain your implementation.
Generally, you can structure it however you see fit and describe the resulting structure in the `CMakeLists.txt`.
In our example, we use one header and one `.cpp` file for the MQTT source:
nes-plugins/
├── Sources/
│ ├── MQTTSource/
│ │ ├── MQTTSource.hpp
│ │ ├── MQTTSource.cpp
│ │ ├── CMakeLists.txt
│ │ └── ...
│ ├── KafkaSource/
│ └── ...
├── Sinks/
├── Functions/
└── ...
2. Dependencies & CMake
Each source plugin must define two plugin libraries:
- The actual source plugin used at runtime by workers.
- A validation plugin used during query planning by `nebuli` (our stateless coordinator).
In the `CMakeLists.txt` file of our directory, we tell CMake how we want our plugins to be built:
find_package(PahoMqttCpp CONFIG REQUIRED) # <-- We depend on the paho-mqtt lib
# 1) Source Plugin
add_plugin_as_library(
MQTT # Plugin name
Source # Registry name
nes-sources-registry # Component registry name
mqtt-source-plugin-library # Plugin library name
MQTTSource.cpp # Source files list
)
target_include_directories(mqtt-source-plugin-library
PUBLIC include
PRIVATE .
)
target_link_libraries(mqtt-source-plugin-library PRIVATE PahoMqttCpp::paho-mqttpp3-static)
# 2) Source Validation Plugin
add_plugin_as_library(MQTT SourceValidation nes-sources-registry mqtt-source-validation-plugin-library MQTTSource.cpp)
target_include_directories(mqtt-source-validation-plugin-library
PUBLIC include
PRIVATE .
)
target_link_libraries(mqtt-source-validation-plugin-library PRIVATE PahoMqttCpp::paho-mqttpp3-static)
Notes:
- Plugin name, Registry type, and Registry name are fixed and must match internal expectations.
- The output library name can be freely chosen.
- Use `FetchContent` for third-party dependencies that are not already included in our internal `vcpkg/vcpkg.json`.
Both plugins will be compiled into a library that is linked against the source registry.
Upon activation of the plugins, CMake generates a `Registrar` type for each registry, which passes the registration function (source plugin) or validation function (validation plugin) for our `MQTTSource` to the source registry.
We will look at construction and validation more closely in the next section.
3. Validation & Construction
During lowering of a query plan that uses a source, NebulaStream asks the `SourceRegistry` for an instance of our source through the aforementioned registration function.
It looks like this:
using SourceRegistryReturnType = std::unique_ptr<Source>;
SourceRegistryReturnType SourceGeneratedRegistrar::RegisterMQTTSource(SourceRegistryArguments sourceRegistryArguments)
{
    return std::make_unique<MQTTSource>(sourceRegistryArguments.sourceDescriptor);
}
Our `MQTTSource` constructor receives a `SourceDescriptor`, which contains configuration metadata passed from the frontend.
The register function is associated with the registrar type that CMake generates during the build and is handed to the registry.
Currently, sources should define at least one constructor that takes an argument of type `SourceDescriptor`.
At its heart, the descriptor holds a `DescriptorConfig` that stores all parameters needed to fully specify a source.
The configuration maps each parameter name to a `std::variant` of all types that we allow as values for config parameters.
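Conceptually, you can picture the stored configuration roughly like this (a simplified sketch, not the actual NebulaStream definitions):
#include <cstdint>
#include <string>
#include <unordered_map>
#include <variant>

/// Simplified sketch only: the exact set of allowed value types and the surrounding
/// machinery are defined by NebulaStream's DescriptorConfig, not by this snippet.
using ConfigValue = std::variant<bool, uint32_t, uint64_t, float, std::string>;
using Config = std::unordered_map<std::string, ConfigValue>;

/// After validation, an MQTT source config could hold typed entries such as
/// {"topic", std::string{"sensors/temperature"}} or {"qos", uint32_t{1}}.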
Example constructor usage:
MQTTSource::MQTTSource(const SourceDescriptor& sourceDescriptor)
    : serverUri(sourceDescriptor.getFromConfig(ConfigParametersMQTT::SERVER_URI))
    , clientId(sourceDescriptor.getFromConfig(ConfigParametersMQTT::CLIENT_ID))
    , topic(sourceDescriptor.getFromConfig(ConfigParametersMQTT::TOPIC))
    , qos(sourceDescriptor.getFromConfig(ConfigParametersMQTT::QOS))
    /// ...other parameters
{
}
The `getFromConfig` function extracts and converts configuration parameters stored in the `DescriptorConfig`.
At the point of source construction, we expect all mandatory parameters to exist in the config and to contain valid values, or sensible defaults, of the correct types.
To give this guarantee, we perform a validation phase during query planning that fails in the presence of any wrong or missing parameters.
Configurations::DescriptorConfig::Config MQTTSource::validateAndFormat(std::unordered_map<std::string, std::string> config)
{
    return Configurations::DescriptorConfig::validateAndFormat<ConfigParametersMQTT>(std::move(config), NAME);
}

Configurations::DescriptorConfig::Config
SourceValidationGeneratedRegistrar::RegisterMQTTSourceValidation(SourceValidationRegistryArguments sourceConfig)
{
    return MQTTSource::validateAndFormat(std::move(sourceConfig.config));
}
This validation function, `RegisterMQTTSourceValidation`, will be registered in the aforementioned validation plugin registry.
We need to define it explicitly in one of our source files, with exactly this name, so that it matches the generated registrar.
The naming pattern is `Register{plugin-name}{plugin-registry}`.
The function takes an `std::unordered_map<std::string, std::string>` and is expected to:
- Look up each parameter name to check whether it has been supplied by the user
- If it has not been supplied, either report an error or provide a sensible default (see the sketch further below)
- Parse the string into the correct type expected by the source
- Check its validity, e.g., a port needs to be a valid `uint16_t`
Let’s look at an actual example:
static inline const NES::Configurations::DescriptorConfig::ConfigParameter<uint32_t> PORT{
    "port",
    std::nullopt,
    [](const std::unordered_map<std::string, std::string>& config)
    {
        const auto portNumber = NES::Configurations::DescriptorConfig::tryGet(PORT, config);
        if (portNumber.has_value())
        {
            constexpr uint32_t PORT_NUMBER_MAX = 65535;
            if (portNumber.value() <= PORT_NUMBER_MAX)
            {
                return portNumber;
            }
            NES_ERROR("Specified port is: {}, but ports must be between 0 and {}", portNumber.value(), PORT_NUMBER_MAX);
            return std::optional<uint32_t>{}; /// invalid port: fail validation by returning an empty optional
        }
        return portNumber;
    }};
Here, we first use the `tryGet` function of the `DescriptorConfig`, which returns an optional.
If the parameter is set, we move on to the port number validation; otherwise, we simply return the empty optional.
We check whether the port number is within the allowed bounds and return it if so, or an empty optional otherwise.
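As mentioned in the list above, a validator can also supply a sensible default instead of rejecting a missing parameter. Here is a hypothetical sketch for a QoS parameter (the actual `ConfigParametersMQTT::QOS` may be defined differently):
/// Hypothetical sketch: a "qos" parameter that defaults to 0 when not supplied by the user
/// and rejects values outside the valid MQTT QoS range (0-2).
static inline const NES::Configurations::DescriptorConfig::ConfigParameter<uint32_t> QOS{
    "qos",
    std::nullopt,
    [](const std::unordered_map<std::string, std::string>& config)
    {
        const auto qos = NES::Configurations::DescriptorConfig::tryGet(QOS, config);
        if (not qos.has_value())
        {
            return std::optional<uint32_t>{0}; /// parameter missing: supply a sensible default
        }
        if (qos.value() <= 2)
        {
            return qos; /// valid MQTT QoS level
        }
        NES_ERROR("Specified QoS is: {}, but MQTT QoS must be 0, 1, or 2", qos.value());
        return std::optional<uint32_t>{}; /// invalid value: fail validation
    }};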
4. Implementation
The interface to implement a source is straightforward:
/// Read data into a `TupleBuffer`, until the TupleBuffer is full, or stop was requested.
/// @return the number of bytes read
virtual size_t fillTupleBuffer(NES::Memory::TupleBuffer& tupleBuffer, const std::stop_token& stopToken) = 0;
/// If applicable, opens resources like file descriptors, database connections, etc.
virtual void open() = 0;
/// If applicable, closes resources like file descriptors, database connections, etc.
virtual void close() = 0;
The central method we need to implement is `fillTupleBuffer`, which takes a reference to a buffer and a `stop_token`.
It should implement the necessary logic to ingest data into the buffer.
Under normal circumstances, on each invocation, you should write to the beginning of the buffer’s memory.
We return from this function in one of four cases:
- An error internal to our source happens, e.g., the MQTT source could not establish a connection to the specified broker
- The `stop_token` indicates that a stop has been requested by a higher-level component (e.g., the query was stopped)
- We have received an “end of stream” (EOS) signal
- The buffer has been filled to its capacity
In case (1), we can decide whether the error is recoverable or unrecoverable, and either try again or throw an exception.
When a stop has been requested (2), we should return, signalling the number of bytes read so far.
To remain responsive to stop requests, we should check the `stop_token` regularly, if possible.
Case (3) indicates an EOS signal (e.g., a terminated connection), meaning that we do not expect any further data, which we report by returning 0.
Finally, if the buffer has been filled entirely (4), we should also return, reporting the buffer size as the number of bytes read.
The `open` and `close` methods can be used to acquire or release resources like file descriptors.
`open` is called once before the first invocation of `fillTupleBuffer`; `close` is called likewise at the end, before the source is dropped.
For our MQTT source, `open` might look like this:
void MQTTSource::open()
{
    client = std::make_unique<mqtt::async_client>(serverUri, clientId);
    try
    {
        const auto connectOptions = mqtt::connect_options_builder().automatic_reconnect(true).clean_session(false).finalize();
        client->start_consuming();
        const auto token = client->connect(connectOptions);
        if (const auto response = token->get_connect_response(); !response.is_session_present())
        {
            client->subscribe(topic, qos)->wait();
        }
    }
    catch (const mqtt::exception& exception)
    {
        /// Convert an MQTT exception to an internal exception
        throw CannotOpenSource("Could not connect to MQTT broker: {}", exception.what());
    }
}
In short, we instantiate an MQTT client through which we can interact with a broker, passing the necessary information.
Then, we try to establish a session and subscribe to a topic, indicating that we are ready to start consuming data.
If the client throws any exception, we translate it into a `CannotOpenSource` exception, which is a NebulaStream-specific exception.
A list of all NebulaStream-specific exceptions can be found in `ExceptionDefinitions.inc`.
Similarly, `close` can be implemented:
void MQTTSource::close() try
{
    client->unsubscribe(topic)->wait();
    client->disconnect()->wait();
}
catch (const mqtt::exception& e)
{
    NES_ERROR("Error on close: {}", e.what());
}
Because no methods of the `MQTTSource` will be called after `close`, we simply log any MQTT-related error that occurs while closing the session and return.
In most scenarios, `fillTupleBuffer` implements an I/O request loop that stops when the buffer is full, an error occurs, or EOS is signalled.
size_t MQTTSource::fillTupleBuffer(NES::Memory::TupleBuffer& buf, const std::stop_token& stopToken)
{
    /// (1) Setup offset within buffer and size of buffer
    size_t offset = 0;
    const size_t size = buf.getBufferSize();

    /// (2) Process stashed payload first (from prior invocation of `fillTupleBuffer`)
    if (!payloadStash.empty())
    {
        auto payload = payloadStash.consume(size);
        writePayloadToBuffer(payload, buf, offset);
    }

    /// (3) Fill buffer with new messages until full, stop requested, error, or EOS
    while (offset < size && !stopToken.stop_requested()) /// <-- if stop requested, end loop
    {
        if (const auto message = client->consume_message(); message)
        {
            writePayloadToBuffer(message->get_payload(), buf, offset); /// <-- message received successfully
        }
        else
        {
            break; /// <-- empty message signals EOS (e.g., the connection was terminated)
        }
    }
    return std::min(offset, size);
}
One thing to notice here is that this MQTT client library allocates its own receive buffer for incoming data, so a received message may be larger than the space currently available in our buffer.
To handle this case, we stash the leftover payload until the next invocation of `fillTupleBuffer`, where it is consumed before making another consume request.
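The `payloadStash` and `writePayloadToBuffer` helpers are not shown above. A minimal sketch of how they could look (names, signatures, and the `TupleBuffer` accessor are assumptions, not the actual MQTTSource implementation):
#include <algorithm>
#include <cstring>
#include <string>
#include <string_view>

/// Hypothetical sketch of the stashing helpers referenced in fillTupleBuffer above.
class PayloadStash
{
public:
    bool empty() const { return data.empty(); }

    /// Hand out at most `maxBytes` of the stashed payload; the remainder stays stashed.
    std::string consume(const size_t maxBytes)
    {
        const size_t take = std::min(maxBytes, data.size());
        std::string front = data.substr(0, take);
        data.erase(0, take);
        return front;
    }

    void stash(const std::string_view payload) { data.append(payload); }

private:
    std::string data;
};

/// Copies as much of `payload` as fits into the buffer, advances `offset`, and stashes the rest.
void MQTTSource::writePayloadToBuffer(const std::string_view payload, NES::Memory::TupleBuffer& buf, size_t& offset)
{
    const size_t fits = std::min(payload.size(), buf.getBufferSize() - offset);
    std::memcpy(buf.getBuffer<char>() + offset, payload.data(), fits); /// assumed raw-byte accessor of TupleBuffer
    offset += fits;
    payloadStash.stash(payload.substr(fits)); /// keep the overflow for the next invocation
}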
When errors occur during ingestion, the client throws an exception, which is handled by a higher-level component.
If you want to handle a recoverable error, catch the exception inside this method and run whatever code is required for recovery.
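For illustration, such a recovery path could be factored into a helper like the following hypothetical sketch (`consumeWithRetry` is not part of the actual MQTTSource; it assumes the Paho `async_client` API):
/// Hypothetical helper (not part of the actual MQTTSource): retry a consume once after
/// re-establishing a dropped connection; rethrow anything we consider unrecoverable.
mqtt::const_message_ptr MQTTSource::consumeWithRetry()
{
    try
    {
        return client->consume_message();
    }
    catch (const mqtt::exception& exception)
    {
        if (client->is_connected())
        {
            throw; /// not a connection problem: let a higher-level component handle it
        }
        client->reconnect()->wait(); /// connection dropped: try to re-establish it
        return client->consume_message(); /// ...and retry the consume once
    }
}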
5. Testing
Currently, there is no way to test sources easily, especially ones that require external systems to be booted up. However, you can write unit tests if you require any additional logic that does not revolve around I/O. In the near future, we plan to integrate a toolkit for containerized testing of sources/sinks that interact with external systems.