How to add a new data source?

This is an implementation guide for adding a new data source to NebulaStream. In the remainder, we describe the main concepts, features, and essential development steps, with examples of how to add your custom data source. Throughout this guide, we use the implementation of the TCPSource as a running example.

a. Source
b. Source Descriptor
c. Physical Source Type
d. Serialization (Coordinator -> Worker)
e. Clients

a. Source

The main logic of the data source is implemented in the NebulaStream runtime.

Tasks:

  1. Create a data source inheriting from the base class DataSource.
  2. Add all new source files to the respective CMakeLists.txt files (in the corresponding folders; watch out for subfolders).
  3. Finally, add a factory function for your source to the SourceCreator (see createTCPSource below).

Location: nes-runtime

Tasks:

  • Sources
// Task 1
// @brief Source to receive data via TCP connection
class TCPSource : public DataSource {
    public:
         // @brief constructor of a TCP Source
         // @param schema the schema of the data
         // @param bufferManager The BufferManager is responsible for: 1. Pooled Buffers: preallocated fixed-size buffers of memory that
         // must be reference counted 2. Unpooled Buffers: variable sized buffers that are allocated on-the-fly.
         // They are also subject to reference counting.
         // @param queryManager comes with functionality to manage the queries
         // @param tcpSourceType points at current TCPSourceType config object, look at same named file for info
         // @param operatorId the id of this source operator
         // @param originId represents an origin
         // @param statisticId represents the unique identifier of components that we can track statistics for
         // @param numSourceLocalBuffers number of local source buffers
         // @param gatheringMode the gathering mode used
         // @param physicalSourceName the name and unique identifier of a physical source
         // @param executableSuccessors executable operators coming after this source
        explicit TCPSource(SchemaPtr schema,
                Runtime::BufferManagerPtr bufferManager,
                Runtime::QueryManagerPtr queryManager,
                TCPSourceTypePtr tcpSourceType,
                OperatorId operatorId,
                OriginId originId,
                StatisticId statisticId,
                size_t numSourceLocalBuffers,
                GatheringMode gatheringMode,
                const std::string& physicalSourceName,
                std::vector<Runtime::Execution::SuccessorExecutablePipeline> executableSuccessors);
        ...
};
// Task 3
DataSourcePtr createTCPSource(const SchemaPtr& schema,
        const Runtime::BufferManagerPtr& bufferManager,
        const Runtime::QueryManagerPtr& queryManager,
        const TCPSourceTypePtr& tcpSourceType,
        OperatorId operatorId,
        OriginId originId,
        StatisticId statisticId,
        size_t numSourceLocalBuffers,
        const std::string& physicalSourceName,
        const std::vector<Runtime::Execution::SuccessorExecutablePipeline>& successors) {
    return std::make_shared<TCPSource>(schema,
            bufferManager,
            queryManager,
            tcpSourceType,
            operatorId,
            originId,
            statisticId,
            numSourceLocalBuffers,
            GatheringMode::INTERVAL_MODE,
            physicalSourceName,
            successors);
}
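A minimal, self-contained sketch of the two pieces above (a subclass of a source base class and a SourceCreator-style factory function) may help when starting a new source. It does not reproduce NebulaStream's real DataSource interface: DataSourceSketch, ReplaySource, receiveData, and createReplaySource are hypothetical names chosen for illustration, and the source replays an in-memory list instead of reading from a socket.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, heavily simplified stand-in for NebulaStream's DataSource base class.
// The real class also receives a buffer manager, a query manager, successors, etc.
class DataSourceSketch {
  public:
    explicit DataSourceSketch(std::string physicalSourceName) : physicalSourceName(std::move(physicalSourceName)) {}
    virtual ~DataSourceSketch() = default;

    // Returns the next chunk of raw data, or std::nullopt once the source is exhausted.
    virtual std::optional<std::string> receiveData() = 0;
    virtual std::string toString() const = 0;

    const std::string& getPhysicalSourceName() const { return physicalSourceName; }

  private:
    std::string physicalSourceName;
};

using DataSourceSketchPtr = std::shared_ptr<DataSourceSketch>;

// Task 1 (sketch): a concrete source that replays in-memory records instead of reading a socket.
class ReplaySource : public DataSourceSketch {
  public:
    ReplaySource(std::string physicalSourceName, std::vector<std::string> records)
        : DataSourceSketch(std::move(physicalSourceName)), records(std::move(records)) {}

    std::optional<std::string> receiveData() override {
        if (next >= records.size()) {
            return std::nullopt;
        }
        return records[next++];
    }

    std::string toString() const override { return "ReplaySource(" + getPhysicalSourceName() + ")"; }

  private:
    std::vector<std::string> records;
    std::size_t next = 0;  // index of the next record to hand out
};

// Task 3 (sketch): a SourceCreator-style factory that hides the concrete type behind the base pointer.
DataSourceSketchPtr createReplaySource(const std::string& physicalSourceName, std::vector<std::string> records) {
    assert(!physicalSourceName.empty() && "a physical source name is required");
    return std::make_shared<ReplaySource>(physicalSourceName, std::move(records));
}
```

Calling createReplaySource("replay_1", {"1,2", "3,4"}) yields a source whose receiveData() returns "1,2", then "3,4", then std::nullopt. Returning the base-class pointer from the factory mirrors how createTCPSource returns a DataSourcePtr, so callers do not need to know the concrete source type.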

b. Source Descriptor

The data source descriptor provides the configuration for a data source.

Tasks:

  1. Create a source descriptor inheriting from the SourceDescriptor base class.
  2. Add the new source files to the respective CMakeLists.txt files (in the corresponding folders; watch out for subfolders).

Location: nes-operators

Tasks:

  • Operators -> LogicalOperators -> Sources
// Task 1
class TCPSourceDescriptor : public SourceDescriptor {
    public:
        static SourceDescriptorPtr create(SchemaPtr schema,
                TCPSourceTypePtr tcpSourceType,
                const std::string& logicalSourceName,
                const std::string& physicalSourceName);

        static SourceDescriptorPtr create(SchemaPtr schema, TCPSourceTypePtr sourceConfig);
        
        // @brief get TCP source config
        // @return tcp source config
        TCPSourceTypePtr getSourceConfig() const;

        // @brief checks if two tcp source descriptors are the same
        // @param other
        // @return true if they are the same
        [[nodiscard]] bool equal(SourceDescriptorPtr const& other) const override;

        std::string toString() const override;

        SourceDescriptorPtr copy() override;

    private:
        // @brief tcp source descriptor constructor
        // @param schema the schema of the data
        // @param tcpSourceType the TCP source configuration object
        // @param logicalSourceName the name of the logical source
        // @param physicalSourceName the name of the physical source
        explicit TCPSourceDescriptor(SchemaPtr schema,
                TCPSourceTypePtr tcpSourceType,
                const std::string& logicalSourceName,
                const std::string& physicalSourceName);

        TCPSourceTypePtr tcpSourceType;
};
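The descriptor pattern above (a private constructor reached through static create functions, plus equal, toString, and copy overrides) can be sketched in isolation as follows. SourceDescriptorSketch, TcpConfigSketch, and TcpSourceDescriptorSketch are hypothetical simplifications, not NebulaStream classes; in particular, the real descriptor also carries a schema and a logical source name.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

class SourceDescriptorSketch;
using SourceDescriptorSketchPtr = std::shared_ptr<SourceDescriptorSketch>;

// Hypothetical stand-in for the SourceDescriptor base class.
class SourceDescriptorSketch {
  public:
    virtual ~SourceDescriptorSketch() = default;
    [[nodiscard]] virtual bool equal(const SourceDescriptorSketchPtr& other) const = 0;
    virtual std::string toString() const = 0;
    virtual SourceDescriptorSketchPtr copy() = 0;
};

// Hypothetical configuration object playing the role of TCPSourceType.
struct TcpConfigSketch {
    std::string socketHost;
    unsigned socketPort;

    bool operator==(const TcpConfigSketch& other) const {
        return socketHost == other.socketHost && socketPort == other.socketPort;
    }
};

class TcpSourceDescriptorSketch : public SourceDescriptorSketch {
  public:
    // The constructor is private, so the factory uses `new` (std::make_shared cannot reach it).
    static SourceDescriptorSketchPtr create(TcpConfigSketch config, std::string physicalSourceName) {
        assert(!physicalSourceName.empty() && "a physical source name is required");
        return SourceDescriptorSketchPtr(new TcpSourceDescriptorSketch(std::move(config), std::move(physicalSourceName)));
    }

    // Equal only if `other` is also a TCP descriptor with the same configuration and name.
    [[nodiscard]] bool equal(const SourceDescriptorSketchPtr& other) const override {
        auto otherTcp = std::dynamic_pointer_cast<const TcpSourceDescriptorSketch>(other);
        return otherTcp != nullptr && config == otherTcp->config && physicalSourceName == otherTcp->physicalSourceName;
    }

    std::string toString() const override {
        return "TcpSourceDescriptor(" + config.socketHost + ":" + std::to_string(config.socketPort) + ")";
    }

    // Returns an independent descriptor with the same settings.
    SourceDescriptorSketchPtr copy() override { return create(config, physicalSourceName); }

  private:
    TcpSourceDescriptorSketch(TcpConfigSketch config, std::string physicalSourceName)
        : config(std::move(config)), physicalSourceName(std::move(physicalSourceName)) {}

    TcpConfigSketch config;
    std::string physicalSourceName;
};
```

Because equal checks the dynamic type first, a descriptor of a different source kind never compares equal, and copy() returns a distinct object that still satisfies equal() against the original.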

c. Physical Source Type

The physical source types provide the source’s physical representation and are used to create the physical query plan.

Tasks:

  1. Create a physical source type inheriting from PhysicalSourceType.
  2. Add the newly created physical source type to the PhysicalSourceTypeFactory.
  3. Add the source name to the SourceType enum in PhysicalSourceType.
  4. Add your source to i) ConvertLogicalToPhysicalSource::createDataSource and ii) LowerToExecutableQueryPlanPhase::createSourceDescriptor so that the logical source operator can be lowered into the executable query plan.
  5. Add all new source files to the respective CMakeLists.txt files (in the corresponding folders; watch out for subfolders).

Location: nes-configuration

Tasks:

  • Configurations -> Worker -> PhysicalSourceType

Tests:

  • nes-coordinator -> tests -> UnitTests -> Phases
  • nes-coordinator -> tests -> UnitTests -> Sources
// Task 1
class TCPSourceType : public PhysicalSourceType {
    public:
        // @brief create a TCPSourceTypePtr object
        // @param sourceConfigMap configuration options as key-value pairs
        // @return TCPSourceTypePtr
        static TCPSourceTypePtr create(const std::string& logicalSourceName,
                const std::string& physicalSourceName,
                std::map<std::string, std::string> sourceConfigMap);

        // @brief create a TCPSourceTypePtr object
        // @param yamlConfig configuration options as a YAML node
        // @return TCPSourceTypePtr
        static TCPSourceTypePtr
            create(const std::string& logicalSourceName, const std::string& physicalSourceName, Yaml::Node yamlConfig);

        // @brief create a TCPSourceTypePtr object with default values
        // @return TCPSourceTypePtr
        static TCPSourceTypePtr create(const std::string& logicalSourceName, const std::string& physicalSourceName);
        ...
};
// Task 2: 
PhysicalSourceTypePtr
PhysicalSourceTypeFactory::createPhysicalSourceType(std::string logicalSourceName,
                                                    std::string physicalSourceName,
                                                    std::string sourceType,
                                                    const std::map<std::string, std::string>& commandLineParams) {
    ...
    switch (magic_enum::enum_cast<SourceType>(sourceType).value()) {
        ...
        case SourceType::TCP_SOURCE: return TCPSourceType::create(logicalSourceName, physicalSourceName, commandLineParams);
        ...
    }
    ...
}

// Task 3: 
enum class SourceType : uint8_t {
    ...
    TCP_SOURCE,
    ...
};
// Task 4.1: 
DataSourcePtr
ConvertLogicalToPhysicalSource::createDataSource(OperatorId operatorId,
        OriginId originId,
        StatisticId statisticId,
        const SourceDescriptorPtr& sourceDescriptor,
        const Runtime::NodeEnginePtr& nodeEngine,
        size_t numSourceLocalBuffers,
        const std::vector<Runtime::Execution::SuccessorExecutablePipeline>& successors) {
    ...
    } else if (sourceDescriptor->instanceOf<TCPSourceDescriptor>()) {
        NES_INFO("ConvertLogicalToPhysicalSource: Creating TCP source");
        auto tcpSourceDescriptor = sourceDescriptor->as<TCPSourceDescriptor>();
        return createTCPSource(tcpSourceDescriptor->getSchema(),
                bufferManager,
                queryManager,
                tcpSourceDescriptor->getSourceConfig(),
                operatorId,
                originId,
                statisticId,
                numSourceLocalBuffers,
                sourceDescriptor->getPhysicalSourceName(),
                successors);
    } else {
    ...
// Task 4.2
SourceDescriptorPtr LowerToExecutableQueryPlanPhase::createSourceDescriptor(SchemaPtr schema,
        PhysicalSourceTypePtr physicalSourceType) {
    ...
    case SourceType::TCP_SOURCE: {
        auto tcpSourceType = physicalSourceType->as<TCPSourceType>();
        return TCPSourceDescriptor::create(schema, tcpSourceType, logicalSourceName, physicalSourceName);
    }
    ...
}
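The following sketch shows, under simplifying assumptions, how the factory (Task 2) and the enum (Task 3) fit together: the source type string is mapped to an enum value, and a switch dispatches to the matching create function, which falls back to defaults for unset keys. All names, config keys (socketHost, socketPort), and default values here are hypothetical; sourceTypeFromString stands in for magic_enum::enum_cast.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical subset of the SourceType enum (Task 3).
enum class SourceTypeSketch : uint8_t { CSV_SOURCE, TCP_SOURCE };

// Stand-in for magic_enum::enum_cast: maps an enum name to its value.
std::optional<SourceTypeSketch> sourceTypeFromString(const std::string& name) {
    if (name == "CSV_SOURCE") return SourceTypeSketch::CSV_SOURCE;
    if (name == "TCP_SOURCE") return SourceTypeSketch::TCP_SOURCE;
    return std::nullopt;
}

// Hypothetical stand-in for PhysicalSourceType.
struct PhysicalSourceTypeSketch {
    virtual ~PhysicalSourceTypeSketch() = default;
    virtual SourceTypeSketch getSourceType() const = 0;
};
using PhysicalSourceTypeSketchPtr = std::shared_ptr<PhysicalSourceTypeSketch>;

// Task 1 (sketch): keys missing from the config map keep their defaults.
struct TcpSourceTypeSketch : PhysicalSourceTypeSketch {
    std::string socketHost = "127.0.0.1";  // hypothetical default, not NebulaStream's
    uint32_t socketPort = 3000;            // hypothetical default, not NebulaStream's

    static std::shared_ptr<TcpSourceTypeSketch> create(const std::map<std::string, std::string>& config) {
        auto type = std::make_shared<TcpSourceTypeSketch>();
        if (auto it = config.find("socketHost"); it != config.end()) type->socketHost = it->second;
        if (auto it = config.find("socketPort"); it != config.end()) type->socketPort = static_cast<uint32_t>(std::stoul(it->second));
        return type;
    }

    SourceTypeSketch getSourceType() const override { return SourceTypeSketch::TCP_SOURCE; }
};

// Task 2 (sketch): string -> enum -> concrete create function.
PhysicalSourceTypeSketchPtr createPhysicalSourceType(const std::string& sourceType,
                                                     const std::map<std::string, std::string>& config) {
    auto parsed = sourceTypeFromString(sourceType);
    if (!parsed) {
        throw std::invalid_argument("unknown source type: " + sourceType);
    }
    switch (*parsed) {
        case SourceTypeSketch::TCP_SOURCE: return TcpSourceTypeSketch::create(config);
        case SourceTypeSketch::CSV_SOURCE: break;  // omitted in this sketch
    }
    throw std::invalid_argument("source type not supported by this sketch: " + sourceType);
}
```

The .value() call in PhysicalSourceTypeFactory throws std::bad_optional_access for an unknown name; the sketch instead reports unknown source types with an explicit std::invalid_argument message, which is easier to diagnose.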

d. Serialization (Coordinator -> Worker)

To send your operator from the coordinator to the workers, you must provide serialization logic for it.

  1. Create a serialization message for your operator in SerializableOperator.proto that holds its private attributes.
  2. Delete your CMake build directory and rebuild the project to trigger the regeneration of the protobuf sources.
  3. Add the specific code to the serialization and deserialization of your new operator in the OperatorSerializationUtil class.

Location: nes-operators

Tasks:

  • grpc -> SerializableOperator.proto
  • Operators -> Serialization -> OperatorSerializationUtil

Tests:

  • nes-coordinator -> tests -> UnitTest -> Serialization
// Task 1: 
    message SerializableTCPSourceDescriptor {
      SerializablePhysicalSourceType physicalSourceType = 1;
      SerializableSchema sourceSchema = 2;
    }
    ...
    // The serializable wrapper for the tcp source config object
    message SerializableTCPSourceType {
        string socketHost = 1;
        uint32 socketPort = 2;
        uint32 socketDomain = 3;
        uint32 socketType = 4;
        float flushIntervalMS = 6;
        InputFormat inputFormat = 7;
        TCPDecideMessageSize tcpDecideMessageSize = 8;
        string tupleSeparator = 9;
        uint32 socketBufferSize = 10;
        uint32 bytesUsedForSocketBufferSizeTransfer = 11;
    }
// Task 3: 
    ...
    else if (sourceDescriptor.instanceOf<const TCPSourceDescriptor>()) {
        // serialize TCP source descriptor
        NES_TRACE("OperatorSerializationUtil:: serialized SourceDescriptor as "
                  "SerializableOperator_SourceDetails_SerializableTCPSourceDescriptor");
        auto tcpSourceDescriptor = sourceDescriptor.as<const TCPSourceDescriptor>();
        //init serializable source config
        auto serializedPhysicalSourceType = new SerializablePhysicalSourceType();
        serializedPhysicalSourceType->set_sourcetype(tcpSourceDescriptor->getSourceConfig()->getSourceTypeAsString());
        //init serializable tcp source config
        auto tcpSerializedSourceConfig = SerializablePhysicalSourceType_SerializableTCPSourceType();
        tcpSerializedSourceConfig.set_sockethost(tcpSourceDescriptor->getSourceConfig()->getSocketHost()->getValue());
        tcpSerializedSourceConfig.set_socketport(tcpSourceDescriptor->getSourceConfig()->getSocketPort()->getValue());
        tcpSerializedSourceConfig.set_socketdomain(tcpSourceDescriptor->getSourceConfig()->getSocketDomain()->getValue());
        tcpSerializedSourceConfig.set_sockettype(tcpSourceDescriptor->getSourceConfig()->getSocketType()->getValue());
        std::string tupleSeparator;
        tupleSeparator = tcpSourceDescriptor->getSourceConfig()->getTupleSeparator()->getValue();
        tcpSerializedSourceConfig.set_tupleseparator(tupleSeparator);
        tcpSerializedSourceConfig.set_flushintervalms(tcpSourceDescriptor->getSourceConfig()->getFlushIntervalMS()->getValue());
        switch (tcpSourceDescriptor->getSourceConfig()->getInputFormat()->getValue()) {
            case Configurations::InputFormat::JSON:
                tcpSerializedSourceConfig.set_inputformat(SerializablePhysicalSourceType_InputFormat_JSON);
                break;
            case Configurations::InputFormat::CSV:
                tcpSerializedSourceConfig.set_inputformat(SerializablePhysicalSourceType_InputFormat_CSV);
                break;
            case Configurations::InputFormat::NES_BINARY:
                tcpSerializedSourceConfig.set_inputformat(SerializablePhysicalSourceType_InputFormat_NES_BINARY);
                break;
        }

        using enum Configurations::TCPDecideMessageSize;
        switch (tcpSourceDescriptor->getSourceConfig()->getDecideMessageSize()->getValue()) {
            case TUPLE_SEPARATOR:
                tcpSerializedSourceConfig.set_tcpdecidemessagesize(
                    SerializablePhysicalSourceType_TCPDecideMessageSize_TUPLE_SEPARATOR);
                break;
            case USER_SPECIFIED_BUFFER_SIZE:
                tcpSerializedSourceConfig.set_tcpdecidemessagesize(
                    SerializablePhysicalSourceType_TCPDecideMessageSize_USER_SPECIFIED_BUFFER_SIZE);
                break;
            case BUFFER_SIZE_FROM_SOCKET:
                tcpSerializedSourceConfig.set_tcpdecidemessagesize(
                    SerializablePhysicalSourceType_TCPDecideMessageSize_BUFFER_SIZE_FROM_SOCKET);
                break;
        }
        tcpSerializedSourceConfig.set_socketbuffersize(tcpSourceDescriptor->getSourceConfig()->getSocketBufferSize()->getValue());
        tcpSerializedSourceConfig.set_bytesusedforsocketbuffersizetransfer(
            tcpSourceDescriptor->getSourceConfig()->getBytesUsedForSocketBufferSizeTransfer()->getValue());
        serializedPhysicalSourceType->mutable_specificphysicalsourcetype()->PackFrom(tcpSerializedSourceConfig);
        //init serializable tcp source descriptor
        auto tcpSerializedSourceDescriptor = SerializableOperator_SourceDetails_SerializableTCPSourceDescriptor();
        tcpSerializedSourceDescriptor.set_allocated_physicalsourcetype(serializedPhysicalSourceType);

        // serialize source schema
        SchemaSerializationUtil::serializeSchema(tcpSourceDescriptor->getSchema(),
                                                 tcpSerializedSourceDescriptor.mutable_sourceschema());
        sourceDetails.mutable_sourcedescriptor()->PackFrom(tcpSerializedSourceDescriptor);
    }
    ...
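The serialization code above covers only the coordinator side; the worker must perform the inverse mapping in the deserialization path of OperatorSerializationUtil. The sketch below illustrates that round trip with plain structs instead of protobuf messages, so ConfigInputFormat, WireInputFormat, WireTcpSourceType, TcpSourceConfig, serialize, and deserialize are all hypothetical. Both enum mappings use switches without a default branch, so a compiler with -Wswitch enabled warns when a case is missing, and at runtime an unhandled value reaches the throw instead of being sent silently.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical mirrors of Configurations::InputFormat and the protoc-generated wire enum.
enum class ConfigInputFormat { JSON, CSV, NES_BINARY };
enum WireInputFormat : int { WIRE_JSON = 0, WIRE_CSV = 1, WIRE_NES_BINARY = 2 };

// Hypothetical flat message standing in for SerializableTCPSourceType.
struct WireTcpSourceType {
    std::string socketHost;
    uint32_t socketPort = 0;
    WireInputFormat inputFormat = WIRE_JSON;
};

// Hypothetical in-memory configuration standing in for TCPSourceType.
struct TcpSourceConfig {
    std::string socketHost;
    uint32_t socketPort = 0;
    ConfigInputFormat inputFormat = ConfigInputFormat::CSV;
};

WireInputFormat toWire(ConfigInputFormat format) {
    switch (format) {
        case ConfigInputFormat::JSON: return WIRE_JSON;
        case ConfigInputFormat::CSV: return WIRE_CSV;
        case ConfigInputFormat::NES_BINARY: return WIRE_NES_BINARY;
    }
    throw std::invalid_argument("unhandled ConfigInputFormat");
}

ConfigInputFormat fromWire(WireInputFormat format) {
    switch (format) {
        case WIRE_JSON: return ConfigInputFormat::JSON;
        case WIRE_CSV: return ConfigInputFormat::CSV;
        case WIRE_NES_BINARY: return ConfigInputFormat::NES_BINARY;
    }
    throw std::invalid_argument("unknown wire input format");
}

// Coordinator side: copy every field into the wire message.
WireTcpSourceType serialize(const TcpSourceConfig& config) {
    WireTcpSourceType wire;
    wire.socketHost = config.socketHost;
    wire.socketPort = config.socketPort;
    wire.inputFormat = toWire(config.inputFormat);
    return wire;
}

// Worker side: must read back every field that serialize() writes.
TcpSourceConfig deserialize(const WireTcpSourceType& wire) {
    return TcpSourceConfig{wire.socketHost, wire.socketPort, fromWire(wire.inputFormat)};
}
```

A round-trip check of this shape (serialize, deserialize, compare every field) is a cheap way to catch a field that is written on the coordinator but never read on the worker. With generated protobuf C++ code, accessor names are derived from the lowercased .proto field names, which is why socketHost becomes set_sockethost.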

e. Clients

💡 If you want your data source to be available in the NebulaStream clients, you must also add it to the following repositories: