How to add a new operator

This is a brief tutorial on adding a new operator to NebulaStream. We introduce the following components and describe the essential development steps for each of them.

  • a. Nautilus Operator
  • b. Physical Operator
  • c. Logical Operator & QueryPlanBuilder
  • d. Serialization Logic
  • e. Query API & Clients

In general, you can implement your operator in either direction: bottom-up (a.-e.) or top-down (e.-a.). Note: Steps b.-e. are rather simple if you know the functionality and properties of your operator, although they currently involve a lot of boilerplate code. Step a. is the most cumbersome one and may take more time than the other steps if you do it for the first time and need to understand how Nautilus works.

a. Nautilus Operator (Worker)

A Nautilus Operator contains the code that is executed during the operator’s runtime. It implements the ExecutableOperator interface, whose execute function is called for each received tuple. The code in execute is (query-)compiled by NebulaStream’s query compilation backend.

Tasks:

  1. Create a new class that inherits from the base class ExecutableOperator.
  2. The core of the operator is the execute function. Here, you apply your operator logic tuple by tuple, i.e., the execute function is called for each incoming tuple.
  3. If your operator requires access to global state, you must also create an operator handler that keeps this state across tuples, e.g., the results of previous computations. Provide your handler with the attributes your operator depends on. Your operator can access this state in the execute function via the execution context.
  4. Add all new source files to the respective CMakeLists.txt files (in the corresponding folders; watch out for subfolders).
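To make the tuple-at-a-time model of execute concrete, here is a minimal, self-contained sketch of a push-based operator chain: each operator receives one record, applies its logic, and hands the record to its child. Record, ExecutionContext, ExecutableOperator, Filter, and Collect are hypothetical plain-C++ stand-ins, not the NebulaStream/Nautilus classes; in NebulaStream the execute code is traced and compiled by Nautilus instead of being called directly like this.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for Nautilus types; these are NOT the NebulaStream classes.
using Record = std::map<std::string, int64_t>;  // field name -> value
struct ExecutionContext {};                     // placeholder for the per-pipeline context

class ExecutableOperator {
  public:
    virtual ~ExecutableOperator() = default;
    // Called once for every incoming tuple.
    virtual void execute(ExecutionContext& ctx, Record& record) const = 0;
    void setChild(std::shared_ptr<ExecutableOperator> next) { child = std::move(next); }

  protected:
    std::shared_ptr<ExecutableOperator> child;
};

// Stateless example: forwards a record to its child only if `field` exceeds `threshold`.
class Filter : public ExecutableOperator {
  public:
    Filter(std::string field, int64_t threshold) : field(std::move(field)), threshold(threshold) {}
    void execute(ExecutionContext& ctx, Record& record) const override {
        if (record.at(field) > threshold && child) {
            child->execute(ctx, record);
        }
    }

  private:
    std::string field;
    int64_t threshold;
};

// Terminal operator that collects every record it receives (stands in for an emit operator).
class Collect : public ExecutableOperator {
  public:
    void execute(ExecutionContext&, Record& record) const override { results->push_back(record); }
    std::shared_ptr<std::vector<Record>> results = std::make_shared<std::vector<Record>>();
};
```

Feeding the records x=5, x=15, and x=20 into a Filter("x", 10) whose child is a Collect leaves two records in the collector. Because the Filter keeps no state between calls, it needs no operator handler; the limit operator below does.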

Location: nes-execution

Tasks:

  • execution -> Operators -> ExecutableOperator

Tests:

  • tests -> UnitTests -> Execution -> Operators
  • tests -> UnitTests -> Execution -> PipelineExecution
// Task 1
// @brief Limit operator that limits the number of records returned by the query.
  class Limit : public ExecutableOperator {
  public:
  // @brief Creates a limit operator
  // @param operatorHandlerIndex index of the LimitOperatorHandler that holds the limit and the shared record counter
      explicit Limit(const uint64_t operatorHandlerIndex) : operatorHandlerIndex(operatorHandlerIndex) {}
      void execute(ExecutionContext& ctx, Record& record) const override;

    private:
      const uint64_t operatorHandlerIndex;
  };
// Task 3: 
// @brief Limit operator handler to manage the global state of a limit operator
  class LimitOperatorHandler : public Runtime::Execution::OperatorHandler,
                               public ::NES::detail::virtual_enable_shared_from_this<LimitOperatorHandler, false> {
  public:
  // @brief Creates the operator handler.
      explicit LimitOperatorHandler(const uint64_t limit) : limit(limit){};
      
      void start(Runtime::Execution::PipelineExecutionContextPtr, uint32_t) override { NES_DEBUG("start LimitOperatorHandler"); }
      void stop(Runtime::QueryTerminationType queryTerminationType, Runtime::Execution::PipelineExecutionContextPtr) override {}
      
      // attributes 
      const uint64_t limit;
      std::atomic<uint64_t> counter = 0;
  };
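The Limit class above only declares execute. The per-tuple decision it has to make can be sketched in plain C++ as follows. LimitState and admitRecord are hypothetical simplifications of LimitOperatorHandler and of the body of Limit::execute; in NebulaStream, the handler would be fetched from the execution context via operatorHandlerIndex, and the code would be traced by Nautilus rather than run directly.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical simplification of LimitOperatorHandler: state shared by all worker threads of the query.
struct LimitState {
    explicit LimitState(uint64_t limit) : limit(limit) {}
    const uint64_t limit;
    std::atomic<uint64_t> counter{0};
};

// Hypothetical helper capturing the per-tuple decision of Limit::execute:
// returns true if the current record may be passed on to the child operator.
// fetch_add returns the previous value, so exactly `limit` calls return true,
// even if several threads call it concurrently on the same state.
bool admitRecord(LimitState& state) {
    return state.counter.fetch_add(1, std::memory_order_relaxed) < state.limit;
}
```

With a limit of 3, five calls admit exactly three records. Keeping a single atomic counter in the handler, rather than one counter per thread, is what makes the limit global across all worker threads.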

b. Physical Operator (Worker)

A physical operator represents the concrete realization of a logical operator on the workers. For stateless operators, this is often a direct mapping, e.g., a LogicalFilterOperator maps to a PhysicalFilterOperator; more complex operators, e.g., windowing, may be represented by multiple physical operators.

Tasks:

  1. Create a physical operator class that inherits from the appropriate PhysicalOperator base class (the unary or binary variant, e.g., PhysicalUnaryOperator for a limit operator).
  2. Add all new source files to the respective CMakeLists.txt files (in the corresponding folders; watch out for subfolders).
  3. Add the missing lowering method(s) and else-if branches to the DefaultPhysicalOperatorProvider class to enable lowering from a logical operator to a physical one.
  4. Afterward, add your operator to the lower method of the LowerPhysicalToNautilusOperators class to enable lowering from the physical operator to the executable Nautilus operator.
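Both lowering steps (tasks 3 and 4) follow the same if/else-if dispatch pattern: test the concrete type of the node and, on a match, create the operator of the next level. The sketch below models this with hypothetical, heavily simplified classes; std::dynamic_pointer_cast plays the role of instanceOf/as, and unsupported operators simply yield nullptr here.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>

// Hypothetical, heavily simplified operator hierarchy; NOT the NebulaStream classes.
struct Operator {
    virtual ~Operator() = default;
};
struct LogicalLimit : Operator {
    explicit LogicalLimit(uint64_t l) : limit(l) {}
    uint64_t limit;
};
struct LogicalFilter : Operator {};
struct PhysicalLimit : Operator {
    explicit PhysicalLimit(uint64_t l) : limit(l) {}
    uint64_t limit;
};
struct NautilusLimit : Operator {
    explicit NautilusLimit(uint64_t l) : limit(l) {}
    uint64_t limit;
};

// Phase 1 (cf. DefaultPhysicalOperatorProvider): logical -> physical.
std::shared_ptr<Operator> lowerToPhysical(const std::shared_ptr<Operator>& op) {
    if (auto limit = std::dynamic_pointer_cast<LogicalLimit>(op)) {
        return std::make_shared<PhysicalLimit>(limit->limit);
    }
    // ... one else-if branch per supported logical operator ...
    return nullptr;  // unsupported operator
}

// Phase 2 (cf. LowerPhysicalToNautilusOperators::lower): physical -> executable Nautilus operator.
std::shared_ptr<Operator> lowerToNautilus(const std::shared_ptr<Operator>& op) {
    if (auto limit = std::dynamic_pointer_cast<PhysicalLimit>(op)) {
        return std::make_shared<NautilusLimit>(limit->limit);
    }
    return nullptr;  // unsupported operator
}
```

Forgetting the branch in either phase is the most common reason a new operator fails to lower, which is why both classes have to be touched.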

Location: nes-execution

Tasks:

  • QueryCompiler -> Operators -> PhysicalOperators
  • QueryCompiler -> Phases -> Translations -> DefaultPhysicalOperatorProvider

Tests:

  • tests -> UnitTests -> Execution -> Operators
// Task 1
PhysicalLimitOperator::PhysicalLimitOperator(OperatorId id,
                                             StatisticId statisticId,
                                             SchemaPtr inputSchema,
                                             SchemaPtr outputSchema,
                                             uint64_t limit)
    : Operator(id, statisticId),
      PhysicalUnaryOperator(id, statisticId, std::move(inputSchema), std::move(outputSchema)),
      limit(limit) {}

PhysicalOperatorPtr PhysicalLimitOperator::create(OperatorId id,
                                                  StatisticId statisticId,
                                                  const SchemaPtr& inputSchema,
                                                  const SchemaPtr& outputSchema,
                                                  uint64_t limit) {
    return std::make_shared<PhysicalLimitOperator>(id, statisticId, inputSchema, outputSchema, limit);
}

uint64_t PhysicalLimitOperator::getLimit() { return limit; }

PhysicalOperatorPtr PhysicalLimitOperator::create(StatisticId statisticId, SchemaPtr inputSchema, SchemaPtr outputSchema, uint64_t limit) {
    return create(getNextOperatorId(), statisticId, std::move(inputSchema), std::move(outputSchema), limit);
}
// Task 3: in DefaultPhysicalOperatorProvider, replace the logical limit operator with a physical one
        if (operatorNode->instanceOf<ANYOperator>()) {
            ...
        } else if (operatorNode->instanceOf<LogicalLimitOperator>()) {
            auto limitOperator = operatorNode->as<LogicalLimitOperator>();
            auto physicalLimitOperator = PhysicalOperators::PhysicalLimitOperator::create(operatorNode->getStatisticId(),
                                                                                          limitOperator->getInputSchema(),
                                                                                          limitOperator->getOutputSchema(),
                                                                                          limitOperator->getLimit());
            operatorNode->replace(physicalLimitOperator);
        } else if (...) {
            ...
        }
// Task 4: in LowerPhysicalToNautilusOperators::lower, map the physical limit operator to the Nautilus Limit operator
std::shared_ptr<Runtime::Execution::Operators::Operator> LowerPhysicalToNautilusOperators::lower(Runtime::Execution::PhysicalOperatorPipeline& pipeline,
                                                                                                 std::shared_ptr<Runtime::Execution::Operators::Operator> parentOperator,
                                                                                                 const PhysicalOperators::PhysicalOperatorPtr& operatorNode, size_t bufferSize,
                                                                                                 std::vector<Runtime::Execution::OperatorHandlerPtr>& operatorHandlers) {
        if (operatorNode->instanceOf<PhysicalOperators::ANYOperator>()) {
            ...
        } else if (operatorNode->instanceOf<PhysicalOperators::PhysicalLimitOperator>()) {
            auto limit = lowerLimit(pipeline, operatorNode, operatorHandlers);
            parentOperator->setChild(limit);
            return limit;
        } else if (...) {
            ...
        }
        ...
}

c. Logical Operator & QueryPlanBuilder (Coordinator)

The logical operator is the logical representation of the operation. It is used to create the logical operator tree and enables schema inference and signature computation.

Tasks:

  1. Create a new class that inherits from the appropriate operator class, e.g., a unary or binary logical operator, or from an existing operator that is similar to yours.
  2. Add the required attributes and their getter and setter functions to your logical operator, e.g., predicate and getPredicate() for the filter operator, and adjust the inherited functions, such as equal(), if required.
  3. Add a factory function for your operator, e.g., createLimitOperator(), to the LogicalOperatorFactory class.
  4. Add all new source files to the respective CMakeLists.txt files (in the corresponding folders; watch out for subfolders).
  5. Add your operator to the logical query plan in the QueryPlanBuilder and include essential checks or additional logic for your operator. Note that your logical operator must exist before you can do this.

Location: nes-operators

Tasks:

  • Operators -> LogicalOperators
  • Plans -> Query -> QueryPlanBuilder

Tests:

  • tests -> UnitTests -> Plans -> Query -> QueryPlanBuilderTest
// Task 2: 
    LogicalLimitOperator::LogicalLimitOperator(uint64_t limit, OperatorId id):  Operator(id), 
                                                                                LogicalUnaryOperator(id), 
                                                                                limit(limit) {}

    uint64_t LogicalLimitOperator::getLimit() const { return limit; }

    bool LogicalLimitOperator::equal(NodePtr const& rhs) const {
        if (rhs->instanceOf<LogicalLimitOperator>()) {
            auto limitOperator = rhs->as<LogicalLimitOperator>();
            return limit == limitOperator->limit;
        }
        return false;
    }
// Task 5: 
    QueryPlanPtr QueryPlanBuilder::addLimit(const uint64_t limit, QueryPlanPtr queryPlan) {
        OperatorPtr op = LogicalOperatorFactory::createLimitOperator(limit);
        queryPlan->appendOperatorAsNewRoot(op);
        return queryPlan;
    }
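Task 3 of this section has no code above. The following self-contained sketch shows how a factory function with a defaulted operator id could serve both call sites used in this tutorial, createLimitOperator(limit) and createLimitOperator(limit, getNextOperatorId()), together with a trivial schema inference. All types here (LogicalLimit, Schema, this getNextOperatorId) are hypothetical stand-ins; check the actual LogicalOperatorFactory header for the real signature.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins; NOT the NebulaStream classes.
using OperatorId = uint64_t;
using Schema = std::vector<std::string>;  // field names only

OperatorId getNextOperatorId() {
    static std::atomic<OperatorId> nextId{1};
    return nextId.fetch_add(1);
}

class LogicalLimit {
  public:
    LogicalLimit(uint64_t limit, OperatorId id) : id(id), limit(limit) {}
    uint64_t getLimit() const { return limit; }
    OperatorId getId() const { return id; }
    // A limit neither adds nor removes fields, so the output schema equals the input schema.
    Schema inferSchema(const Schema& input) const { return input; }
    // Structural equality ignores the operator id, like LogicalLimitOperator::equal.
    bool equal(const LogicalLimit& rhs) const { return limit == rhs.limit; }

  private:
    OperatorId id;
    uint64_t limit;
};

struct LogicalOperatorFactory {
    // The default argument is evaluated on every call, so each operator gets a fresh id
    // unless the caller (e.g., deserialization) passes one explicitly.
    static std::shared_ptr<LogicalLimit> createLimitOperator(uint64_t limit, OperatorId id = getNextOperatorId()) {
        return std::make_shared<LogicalLimit>(limit, id);
    }
};
```

Two operators created with the same limit get different ids but still compare equal, which is what signature computation and plan comparison need.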

d. Serialization (Coordinator -> Worker)

You must provide serialization-specific details in order to serialize and send your operator from the coordinator to the workers.

  1. Create an operator-specific serialization message for your operator that contains its private attributes.
  2. Delete your CMake build folder and rebuild the project to trigger the auto-generation of the proto files.
  3. Add the specific code to the serialization and deserialization of your new operator in the OperatorSerializationUtil class

Location: nes-operators

Tasks:

  • grpc -> SerializableOperator.proto
  • Operators -> Serialization -> OperatorSerializationUtil

Tests:

  • nes-coordinator -> tests -> UnitTests -> Serialization
// Task 1:
// LimitDetails contains the properties of the limit operator.
// It is declared inside `message SerializableOperator`, hence the generated C++ name SerializableOperator_LimitDetails.
message LimitDetails {
    uint64 limit = 1;
}
// Task 3:
void OperatorSerializationUtil::serializeLimitOperator(const LogicalLimitOperator& limitOperator,
                                                       SerializableOperator& serializedOperator) {
    auto limitDetails = SerializableOperator_LimitDetails();
    limitDetails.set_limit(limitOperator.getLimit());
    serializedOperator.mutable_details()->PackFrom(limitDetails);
}

LogicalUnaryOperatorPtr
OperatorSerializationUtil::deserializeLimitOperator(const SerializableOperator_LimitDetails& limitDetails) {
    return LogicalOperatorFactory::createLimitOperator(limitDetails.limit(), getNextOperatorId());
}
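To see what a serialized LimitDetails message looks like on the wire, the sketch below hand-encodes and decodes the `uint64 limit = 1` field following the protobuf wire format, assuming proto3 syntax: a tag byte for field 1 with the varint wire type, followed by the value as a base-128 varint. It only illustrates the bytes; in NebulaStream you use the protoc-generated SerializableOperator_LimitDetails, and PackFrom additionally wraps these bytes in a google.protobuf.Any together with a type URL. encodeLimitDetails and decodeLimitDetails are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hand-written illustration of the protobuf encoding of `message LimitDetails { uint64 limit = 1; }`.
std::vector<uint8_t> encodeLimitDetails(uint64_t limit) {
    std::vector<uint8_t> out;
    if (limit == 0) {
        return out;  // proto3 does not serialize scalar fields holding the default value
    }
    out.push_back((1 << 3) | 0);  // tag: field number 1, wire type 0 (varint) -> 0x08
    while (limit != 0) {
        auto byte = static_cast<uint8_t>(limit & 0x7F);  // low 7 bits
        limit >>= 7;
        if (limit != 0) {
            byte |= 0x80;  // continuation bit: more bytes follow
        }
        out.push_back(byte);
    }
    return out;
}

uint64_t decodeLimitDetails(const std::vector<uint8_t>& bytes) {
    if (bytes.empty()) {
        return 0;  // absent field -> default value
    }
    if (bytes[0] != 0x08) {
        throw std::runtime_error("unexpected tag");
    }
    uint64_t value = 0;
    int shift = 0;
    for (std::size_t i = 1; i < bytes.size(); ++i) {
        value |= static_cast<uint64_t>(bytes[i] & 0x7F) << shift;
        if ((bytes[i] & 0x80) == 0) {
            return value;  // last byte of the varint
        }
        shift += 7;
    }
    throw std::runtime_error("truncated varint");
}
```

For example, a limit of 150 is encoded as the three bytes 08 96 01, so small limits stay very compact on the wire.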

e. Query API & Clients

You must add your operator to the Query API to enable users to use it in a query.

  1. Add your operator as a method to the Query API; the method adds the operator to the query plan via the QueryPlanBuilder.

Location: nes-client

Tasks:

  • API -> Query

Tests:

  • tests -> UnitTests -> API -> QueryAPITest
  • nes-coordinator -> tests -> UnitTests -> QueryExecution
  • nes-coordinator -> tests -> Integration

💡 If you want your operator to be available in the NebulaStream clients, you also have to add it to those clients.

// Task 1: 
Query& Query::limit(const uint64_t limit) {
    this->queryPlan = QueryPlanBuilder::addLimit(limit, this->queryPlan);
    return *this;
}
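Returning *this from Query::limit is what makes the query API chainable. The sketch below shows this pattern with hypothetical, simplified stand-ins for Query, QueryPlan, and QueryPlanBuilder; the real query plan is an operator tree rather than a list of strings, and Query::from here is only a minimal placeholder that records a source name.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins: operators are recorded as strings; the last element is the current root.
struct QueryPlan {
    std::vector<std::string> operators;
};
using QueryPlanPtr = std::shared_ptr<QueryPlan>;

struct QueryPlanBuilder {
    static QueryPlanPtr addLimit(uint64_t limit, QueryPlanPtr queryPlan) {
        queryPlan->operators.push_back("limit(" + std::to_string(limit) + ")");  // append as new root
        return queryPlan;
    }
};

class Query {
  public:
    static Query from(const std::string& sourceName) {
        auto plan = std::make_shared<QueryPlan>();
        plan->operators.push_back("source(" + sourceName + ")");
        return Query(std::move(plan));
    }

    Query& limit(uint64_t limit) {
        queryPlan = QueryPlanBuilder::addLimit(limit, queryPlan);
        return *this;  // returning a reference enables chaining: query.limit(10).limit(5)
    }

    const QueryPlan& getQueryPlan() const { return *queryPlan; }

  private:
    explicit Query(QueryPlanPtr plan) : queryPlan(std::move(plan)) {}
    QueryPlanPtr queryPlan;
};
```

Keeping the API method this thin, with all plan manipulation in the QueryPlanBuilder, means the same builder logic can be reused by other entry points, such as the clients mentioned above.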