How to add a new operator?
This is an implementation guide for adding a new operator to NebulaStream. In the remainder, we describe the main concepts, features, and essential development steps, with examples of how to add your custom operator.
- a. Executable Nautilus Operator
- b. Physical Operator
- c. Logical Operator & QueryPlanBuilder
- d. Serialization Logic
- e. Query API & Clients
In general, you can implement your operator in either direction: bottom-up (a.-e.) or top-down (e.-a.). Note: Steps b.-e. are comparatively simple once you know the functionality and properties of your operator, and they currently consist mostly of boilerplate code. In contrast, implementing the executable operator is the most cumbersome task and, if you are doing it for the first time, may take longer than the other steps, because you need to understand how the Nautilus compiler works.
a. Executable Nautilus Operator (Worker)
An executable operator contains the code that is executed during the operator's runtime.
It implements the ExecutableOperator interface via the execute function, which is called for each received tuple (record). The code in execute is (query-)compiled by NebulaStream's query compilation backend, Nautilus.
Tasks:
- Create a new class that inherits from the base class ExecutableOperator.
- The core of the operator is the execute function. Here, you apply your operator logic on a per-tuple basis, i.e., execute is called for each incoming tuple.
- If your operator requires access to global state (e.g., a counter shared across all records), you must also create an operator handler that provides context for previous computations. Provide your handler with the attributes your operator depends on. Your operator can access this state in the execute function via the execution context.
- Add all new src-files to the respective CMakeLists (in the corresponding folders, watch out for subfolders).
Location: nes-execution
Tasks:
- execution -> Operators -> ExecutableOperator
Tests:
- tests -> UnitTests -> Execution -> Operators
- tests -> UnitTests -> Execution -> PipelineExecution
// Task 1
// @brief Limit operator that limits the number of records returned by the query.
class Limit : public ExecutableOperator {
public:
// @brief Creates a limit operator
// @param operatorHandlerIndex index of the LimitOperatorHandler that holds the global limit state
explicit Limit(const uint64_t operatorHandlerIndex) : operatorHandlerIndex(operatorHandlerIndex) {}
void execute(ExecutionContext& ctx, Record& record) const override;
private:
const uint64_t operatorHandlerIndex;
};
// Task 3:
// @brief Limit operator handler to manage the global state of a limit operator
class LimitOperatorHandler : public Runtime::Execution::OperatorHandler,
public ::NES::detail::virtual_enable_shared_from_this<LimitOperatorHandler, false> {
public:
// @brief Creates the operator handler.
explicit LimitOperatorHandler(const uint64_t limit) : limit(limit){};
void start(Runtime::Execution::PipelineExecutionContextPtr, uint32_t) { NES_DEBUG("start LimitOperatorHandler"); }
void stop(Runtime::QueryTerminationType queryTerminationType, Runtime::Execution::PipelineExecutionContextPtr) {}
// attributes
const uint64_t limit;
std::atomic<uint64_t> counter = 0;
};
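The declarations above leave the body of execute open. The following plain C++ sketch (it does not use Nautilus) shows one way to realize the limit semantics: the execution context resolves handlers by index, and each call to execute atomically increments the shared counter and forwards the record to the child only while the count is below the limit. Record, ExecutionContext, Operator, CollectSink, and runLimitDemo are simplified, hypothetical stand-ins, not NebulaStream types.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a tuple; a real Record holds typed fields.
struct Record {
    int64_t value;
};

// Hypothetical stand-in for LimitOperatorHandler: global state shared by all threads.
struct LimitOperatorHandler {
    explicit LimitOperatorHandler(uint64_t limit) : limit(limit) {}
    const uint64_t limit;
    std::atomic<uint64_t> counter{0};
};

// Hypothetical execution context that resolves handlers by their index.
struct ExecutionContext {
    std::vector<std::shared_ptr<LimitOperatorHandler>> handlers;
    LimitOperatorHandler& getGlobalOperatorHandler(uint64_t index) { return *handlers.at(index); }
};

struct Operator {
    virtual ~Operator() = default;
    virtual void execute(ExecutionContext& ctx, Record& record) const = 0;
};

// Downstream operator that only collects the records it receives.
struct CollectSink : Operator {
    mutable std::vector<int64_t> seen;
    void execute(ExecutionContext&, Record& record) const override { seen.push_back(record.value); }
};

class Limit : public Operator {
public:
    Limit(uint64_t operatorHandlerIndex, std::shared_ptr<Operator> child)
        : operatorHandlerIndex(operatorHandlerIndex), child(std::move(child)) {}

    // Called once per record. fetch_add returns the previous count, so at most
    // `limit` records are forwarded even if several threads share the handler.
    void execute(ExecutionContext& ctx, Record& record) const override {
        auto& handler = ctx.getGlobalOperatorHandler(operatorHandlerIndex);
        if (handler.counter.fetch_add(1) < handler.limit) {
            child->execute(ctx, record);
        }
    }

private:
    const uint64_t operatorHandlerIndex;
    std::shared_ptr<Operator> child;
};

// Pushes records 0..numRecords-1 through a Limit operator and returns what reached the sink.
std::vector<int64_t> runLimitDemo(uint64_t limit, int64_t numRecords) {
    ExecutionContext ctx;
    ctx.handlers.push_back(std::make_shared<LimitOperatorHandler>(limit));
    auto sink = std::make_shared<CollectSink>();
    Limit limitOperator(0, sink);
    for (int64_t i = 0; i < numRecords; ++i) {
        Record record{i};
        limitOperator.execute(ctx, record);
    }
    return sink->seen;
}
```

For example, runLimitDemo(3, 10) pushes ten records through the operator and returns only the first three values. Using a single fetch_add instead of a separate load and store keeps the check race-free when several worker threads share one handler.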
b. Physical Operator (Worker)
A physical operator represents the concrete realization of a logical operator on the workers. For stateless operators, it can be a direct mapping, e.g., a LogicalFilterOperator maps to a PhysicalFilterOperator; other operators, e.g., windowing, require a more complex representation that results in multiple physical operators.
Tasks:
- Create a physical operator description inheriting from the PhysicalOperator base classes (unary or binary operator, respectively).
- Add all new src-files to the respective CMakeLists (in the corresponding folders, watch out for subfolders).
- Add the missing lowering method(s) and else-if conditions to the DefaultPhysicalOperatorProvider class to enable lowering from a logical operator to a physical one.
- Afterward, add your operator to the lower method of the LowerPhysicalToNautilusOperators class to enable lowering from the physical operator to the executable Nautilus operator.
Location: nes-execution
Tasks:
- QueryCompiler -> Operators -> PhysicalOperators
- QueryCompiler -> Phases -> Translations -> DefaultPhysicalOperatorProvider
Tests:
- tests -> UnitTests -> Execution -> Operators
// Task 1
PhysicalLimitOperator::PhysicalLimitOperator(OperatorId id, StatisticId statisticId, SchemaPtr inputSchema, SchemaPtr outputSchema, uint64_t limit)
: Operator(id, statisticId), PhysicalUnaryOperator(id, statisticId, std::move(inputSchema), std::move(outputSchema)), limit(limit) {}
// Creates the operator with an explicitly given operator id
PhysicalOperatorPtr PhysicalLimitOperator::create(OperatorId id, StatisticId statisticId, const SchemaPtr& inputSchema, const SchemaPtr& outputSchema, uint64_t limit) {
return std::make_shared<PhysicalLimitOperator>(id, statisticId, inputSchema, outputSchema, limit);
}
// Returns the maximum number of records that pass this operator
uint64_t PhysicalLimitOperator::getLimit() { return limit; }
// Convenience overload that uses the next operator id from getNextOperatorId()
PhysicalOperatorPtr PhysicalLimitOperator::create(StatisticId statisticId, SchemaPtr inputSchema, SchemaPtr outputSchema, uint64_t limit) {
return create(getNextOperatorId(), statisticId, std::move(inputSchema), std::move(outputSchema), limit);
}
// Task 3: DefaultPhysicalOperatorProvider (lowering a logical operator to a physical one)
if (operatorNode->instanceOf<ANYOperator>()) {
    ...
} else if (operatorNode->instanceOf<LogicalLimitOperator>()) {
    auto limitOperator = operatorNode->as<LogicalLimitOperator>();
    auto physicalLimitOperator = PhysicalOperators::PhysicalLimitOperator::create(operatorNode->getStatisticId(),
                                                                                  limitOperator->getInputSchema(),
                                                                                  limitOperator->getOutputSchema(),
                                                                                  limitOperator->getLimit());
    // replace the logical operator node with its physical counterpart
    operatorNode->replace(physicalLimitOperator);
} else if (...) {
    ...
}
// Task 4: LowerPhysicalToNautilusOperators (lowering a physical operator to an executable Nautilus operator)
std::shared_ptr<Runtime::Execution::Operators::Operator>
LowerPhysicalToNautilusOperators::lower(Runtime::Execution::PhysicalOperatorPipeline& pipeline,
                                        std::shared_ptr<Runtime::Execution::Operators::Operator> parentOperator,
                                        const PhysicalOperators::PhysicalOperatorPtr& operatorNode,
                                        size_t bufferSize,
                                        std::vector<Runtime::Execution::OperatorHandlerPtr>& operatorHandlers) {
    if (operatorNode->instanceOf<PhysicalOperators::ANYOperator>()) {
        ...
    } else if (operatorNode->instanceOf<PhysicalOperators::PhysicalLimitOperator>()) {
        // create the executable Limit operator and attach it below the parent operator
        auto limit = lowerLimit(pipeline, operatorNode, operatorHandlers);
        parentOperator->setChild(limit);
        return limit;
    } else if (...) {
        ...
    }
    ...
}
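The lowerLimit helper called in Task 4 is not shown here. The following standalone sketch illustrates both lowering stages under the assumption, consistent with the Limit constructor in section a, that lowerLimit registers a LimitOperatorHandler in operatorHandlers and passes that handler's index to the executable operator. Type dispatch uses std::dynamic_pointer_cast in place of NebulaStream's instanceOf/as, and all types are hypothetical stand-ins.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <vector>

// Hypothetical, heavily simplified stand-ins for the three operator layers.
struct LogicalOperator { virtual ~LogicalOperator() = default; };
struct LogicalLimitOperator : LogicalOperator {
    explicit LogicalLimitOperator(uint64_t limit) : limit(limit) {}
    uint64_t limit;
};

struct PhysicalOperator { virtual ~PhysicalOperator() = default; };
struct PhysicalLimitOperator : PhysicalOperator {
    explicit PhysicalLimitOperator(uint64_t limit) : limit(limit) {}
    uint64_t limit;
};

struct OperatorHandler { virtual ~OperatorHandler() = default; };
struct LimitOperatorHandler : OperatorHandler {
    explicit LimitOperatorHandler(uint64_t limit) : limit(limit) {}
    uint64_t limit;
};

// The executable operator only remembers where its handler lives.
struct ExecutableLimit {
    explicit ExecutableLimit(uint64_t handlerIndex) : operatorHandlerIndex(handlerIndex) {}
    uint64_t operatorHandlerIndex;
};

// Stage 1 (cf. DefaultPhysicalOperatorProvider): logical -> physical via type dispatch.
std::shared_ptr<PhysicalOperator> lowerToPhysical(const std::shared_ptr<LogicalOperator>& op) {
    if (auto limitOp = std::dynamic_pointer_cast<LogicalLimitOperator>(op)) {
        return std::make_shared<PhysicalLimitOperator>(limitOp->limit);
    }
    throw std::runtime_error("no physical operator for this logical operator");
}

// Stage 2 (cf. lowerLimit): physical -> executable. The handler is appended to the
// pipeline's handler list, and its position becomes the operator's handler index.
std::shared_ptr<ExecutableLimit> lowerLimit(const std::shared_ptr<PhysicalOperator>& op,
                                            std::vector<std::shared_ptr<OperatorHandler>>& operatorHandlers) {
    auto limitOp = std::dynamic_pointer_cast<PhysicalLimitOperator>(op);
    if (!limitOp) {
        throw std::invalid_argument("expected a PhysicalLimitOperator");
    }
    operatorHandlers.push_back(std::make_shared<LimitOperatorHandler>(limitOp->limit));
    return std::make_shared<ExecutableLimit>(operatorHandlers.size() - 1);
}
```

Passing an index instead of a pointer keeps the executable operator independent of how the pipeline stores its handlers; at runtime, the operator looks the handler up through the execution context.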
c. Logical Operator & QueryPlanBuilder (Coordinator)
The logical operator is the logical representation of the operation. It is used to create the logical operator tree and enables schema inference and signature computation.
Tasks:
- Create a new class that inherits from the desired operator class, i.e., a unary or binary operator or an operator similar to yours.
- Add required attributes and their getter and setter functions, e.g., predicate and getPredicate() for the filter operator, to your logical operator and adjust the inherited functions if required.
- Add a createMyOperator() function to the LogicalOperatorFactory class.
- Add all new src-files to the respective CMakeLists (in the corresponding folders, watch out for subfolders).
- Add your operator node to the logical operator plan in the QueryPlanBuilder, and include essential checks or additional logic for your operator. Note that you have to write your logical operator first.
Location: nes-operators
Tasks:
- Operators -> LogicalOperators
- Plans -> Query -> QueryPlanBuilder
Tests:
- tests -> UnitTests -> Plans -> Query -> QueryPlanBuilderTest
// Task 2:
LogicalLimitOperator::LogicalLimitOperator(uint64_t limit, OperatorId id): Operator(id),
LogicalUnaryOperator(id),
limit(limit) {}
uint64_t LogicalLimitOperator::getLimit() const { return limit; }
bool LogicalLimitOperator::equal(NodePtr const& rhs) const {
if (rhs->instanceOf<LogicalLimitOperator>()) {
auto limitOperator = rhs->as<LogicalLimitOperator>();
return limit == limitOperator->limit;
}
return false;
}
// Task 5:
QueryPlanPtr QueryPlanBuilder::addLimit(const uint64_t limit, QueryPlanPtr queryPlan) {
OperatorPtr op = LogicalOperatorFactory::createLimitOperator(limit);
queryPlan->appendOperatorAsNewRoot(op);
return queryPlan;
}
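To illustrate what equal() and addLimit() do, here is a simplified model with a single-rooted plan. It assumes that appendOperatorAsNewRoot makes the current root a child of the new operator; the real QueryPlan may handle multiple roots and builds on NebulaStream's Node class, which this sketch replaces with hypothetical stand-ins.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal operator node; children point toward the sources.
struct LogicalOperator {
    virtual ~LogicalOperator() = default;
    virtual bool equal(const std::shared_ptr<LogicalOperator>& rhs) const = 0;
    std::vector<std::shared_ptr<LogicalOperator>> children;
};

struct SourceOperator : LogicalOperator {
    explicit SourceOperator(std::string name) : name(std::move(name)) {}
    bool equal(const std::shared_ptr<LogicalOperator>& rhs) const override {
        auto other = std::dynamic_pointer_cast<SourceOperator>(rhs);
        return other && other->name == name;
    }
    std::string name;
};

struct LogicalLimitOperator : LogicalOperator {
    explicit LogicalLimitOperator(uint64_t limit) : limit(limit) {}
    uint64_t getLimit() const { return limit; }
    // Same rule as LogicalLimitOperator::equal above: same type and same limit.
    bool equal(const std::shared_ptr<LogicalOperator>& rhs) const override {
        auto other = std::dynamic_pointer_cast<LogicalLimitOperator>(rhs);
        return other && other->limit == limit;
    }

private:
    uint64_t limit;
};

struct QueryPlan {
    std::shared_ptr<LogicalOperator> root;
    // Hypothetical analogue of appendOperatorAsNewRoot for a single-rooted plan.
    void appendOperatorAsNewRoot(const std::shared_ptr<LogicalOperator>& op) {
        if (root) {
            op->children.push_back(root);
        }
        root = op;
    }
};

// Mirrors QueryPlanBuilder::addLimit: create the operator and make it the new root.
std::shared_ptr<QueryPlan> addLimit(uint64_t limit, std::shared_ptr<QueryPlan> queryPlan) {
    queryPlan->appendOperatorAsNewRoot(std::make_shared<LogicalLimitOperator>(limit));
    return queryPlan;
}
```

Because the limit becomes the new root, operators added later by the Query API end up above it, which matches the order in which the user chains the calls.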
d. Serialization (Coordinator -> Worker)
You must provide serialization-specific details in order to serialize and send your operator from the coordinator to the workers.
- Create an operator-specific serialization message for your operator (covering its private attributes) in SerializableOperator.proto.
- Delete your CMake build folder and recompile your project to trigger the auto-generation of the proto files.
- Add the code for serializing and deserializing your new operator to the OperatorSerializationUtil class.
Location: nes-operators
Tasks:
- grpc -> SerializableOperator.proto
- Operators -> Serialization -> OperatorSerializationUtil
Tests:
- nes-coordinator -> tests -> UnitTest -> Serialization
// Task 1:
// LimitDetails contains properties for the limit operator
message LimitDetails {
uint64 limit = 1;
}
// Task 3:
void OperatorSerializationUtil::serializeLimitOperator( const LogicalLimitOperator& limitOperator,
SerializableOperator& serializedOperator) {
auto limitDetails = SerializableOperator_LimitDetails();
limitDetails.set_limit(limitOperator.getLimit());
serializedOperator.mutable_details()->PackFrom(limitDetails);
}
LogicalUnaryOperatorPtr
OperatorSerializationUtil::deserializeLimitOperator(const SerializableOperator_LimitDetails& limitDetails) {
return LogicalOperatorFactory::createLimitOperator(limitDetails.limit(), getNextOperatorId());
}
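The OperatorSerializationUtil code leaves the byte-level work to protobuf. To make visible what the LimitDetails message turns into, the following sketch hand-encodes it in protobuf's wire format (a tag byte for field 1 with varint wire type, followed by the value as a base-128 varint) and decodes it again; this round trip is the property a serialization test should check. It is illustrative only: it ignores the google.protobuf.Any wrapper that PackFrom adds, and in NebulaStream you should always use the generated classes. encodeLimitDetails and decodeLimitDetails are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>

// Encodes the body of `message LimitDetails { uint64 limit = 1; }` in protobuf wire format.
std::string encodeLimitDetails(uint64_t limit) {
    std::string out;
    if (limit == 0) {
        return out;  // proto3 omits scalar fields that hold the default value
    }
    out.push_back(static_cast<char>(0x08));  // tag: field number 1, wire type 0 (varint)
    while (limit >= 0x80) {
        out.push_back(static_cast<char>((limit & 0x7F) | 0x80));  // low 7 bits + continuation bit
        limit >>= 7;
    }
    out.push_back(static_cast<char>(limit));  // last byte, continuation bit clear
    return out;
}

// Decodes the bytes produced above; an empty message yields the default value 0.
uint64_t decodeLimitDetails(const std::string& bytes) {
    if (bytes.empty()) {
        return 0;
    }
    if (static_cast<unsigned char>(bytes[0]) != 0x08) {
        throw std::runtime_error("unexpected tag");
    }
    uint64_t limit = 0;
    int shift = 0;
    for (std::size_t i = 1; i < bytes.size(); ++i) {
        if (shift > 63) {
            throw std::runtime_error("varint too long");
        }
        auto byte = static_cast<unsigned char>(bytes[i]);
        limit |= static_cast<uint64_t>(byte & 0x7F) << shift;
        if ((byte & 0x80) == 0) {
            return limit;
        }
        shift += 7;
    }
    throw std::runtime_error("truncated varint");
}
```

For instance, a limit of 150 is encoded as the three bytes 0x08 0x96 0x01, and decoding them yields 150 again.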
e. Query API & Clients
You must add your operator to the Query API to enable users to use it in a query.
- Add your operator as a method to the Query API that forwards the call to the QueryPlanBuilder.
Location: nes-client
Tasks:
- API -> Query
Tests:
- tests -> UnitTests -> API -> QueryAPITest
- nes-coordinator -> tests -> UnitTests -> QueryExecution
- nes-coordinator -> tests -> Integration
💡 If you want your operator to be available in the NebulaStream clients, you also have to add it there.
// Task 1:
Query& Query::limit(const uint64_t limit) {
this->queryPlan = QueryPlanBuilder::addLimit(limit, this->queryPlan);
return *this;
}
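Query::limit follows a fluent-builder pattern: it updates the wrapped query plan and returns *this so that calls can be chained. The following self-contained sketch mimics that pattern; its Query, QueryPlan, and QueryPlanBuilder are hypothetical stand-ins that record operator names as strings instead of building an operator tree.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in: the plan is just the list of operator names, from source to root.
struct QueryPlan {
    std::vector<std::string> operators;
};

// Hypothetical stand-in for QueryPlanBuilder with a single static method.
struct QueryPlanBuilder {
    static QueryPlan addLimit(uint64_t limit, QueryPlan plan) {
        plan.operators.push_back("limit(" + std::to_string(limit) + ")");
        return plan;
    }
};

class Query {
public:
    static Query from(const std::string& source) {
        Query query;
        query.queryPlan.operators.push_back("source(" + source + ")");
        return query;
    }

    // Same pattern as Query::limit above: delegate to the builder, return *this for chaining.
    Query& limit(uint64_t limit) {
        queryPlan = QueryPlanBuilder::addLimit(limit, queryPlan);
        return *this;
    }

    const QueryPlan& getQueryPlan() const { return queryPlan; }

private:
    QueryPlan queryPlan;
};
```

With this sketch, Query::from("cars").limit(10) yields a plan whose operators are source(cars) followed by limit(10).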