How to add a new operator placement algorithm?

This is an implementation guide on adding a new operator placement algorithm to NebulaStream. In the remainder, we describe the main concepts, features, and essential development steps, with examples of how to add your custom operator placement strategy.

In general, an operator placement algorithm determines on which node of the processing node topology graph each operator of a shared query plan (itself a graph of operators) is executed. The placement must be valid, i.e., it must leave the semantics of the query unchanged. The optimization goal can vary, focusing on latency, throughput, energy consumption, or other criteria.
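To make the notions of a valid placement and an optimization goal more concrete, the following is a minimal sketch and not NebulaStream code. It assumes a strongly simplified model in which the query is a linear chain of operators and the topology is a single path of nodes. All names (OperatorId, NodeId, isValidPlacement, countNetworkHops) are hypothetical, and "valid" is reduced here to "every operator is placed on the path and data never flows backwards".

```cpp
#include <cstddef>
#include <optional>
#include <unordered_map>
#include <vector>

// Hypothetical, simplified model: the query is a chain of operator ids ordered
// from source to sink, and the topology is one path of node ids ordered from
// the source node to the sink node.
using OperatorId = int;
using NodeId = int;

// Returns the index of `node` on `path`, or std::nullopt if it is not on the path.
std::optional<std::size_t> positionOnPath(const std::vector<NodeId>& path, NodeId node) {
    for (std::size_t i = 0; i < path.size(); ++i) {
        if (path[i] == node) {
            return i;
        }
    }
    return std::nullopt;
}

// Treats a placement as valid if every operator is assigned to a node on the
// path and the path position never decreases while walking downstream.
bool isValidPlacement(const std::vector<OperatorId>& chain,
                      const std::vector<NodeId>& path,
                      const std::unordered_map<OperatorId, NodeId>& placement) {
    std::size_t previous = 0;
    for (OperatorId op : chain) {
        auto assigned = placement.find(op);
        if (assigned == placement.end()) {
            return false; // operator was never placed
        }
        auto position = positionOnPath(path, assigned->second);
        if (!position || *position < previous) {
            return false; // node not on the path, or data would flow backwards
        }
        previous = *position;
    }
    return true;
}

// One possible optimization criterion: the number of network hops between
// successive operators. Precondition: the placement is valid.
std::size_t countNetworkHops(const std::vector<OperatorId>& chain,
                             const std::vector<NodeId>& path,
                             const std::unordered_map<OperatorId, NodeId>& placement) {
    std::size_t hops = 0;
    for (std::size_t i = 1; i < chain.size(); ++i) {
        std::size_t from = *positionOnPath(path, placement.at(chain[i - 1]));
        std::size_t to = *positionOnPath(path, placement.at(chain[i]));
        hops += to - from;
    }
    return hops;
}
```

A real strategy works on graphs rather than chains and may optimize for latency, throughput, or energy instead of hop count. The structure stays the same: first ensure validity, then compare valid candidates by a cost function.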

Existing Interface

To add your operator placement algorithm, NebulaStream provides a base class called BasePlacementAdditionStrategy as the interface. We designed this interface to facilitate the placement of operators from new queries even when NebulaStream is already processing other queries, potentially within a shared query plan. In our interface, operator placement algorithms assign operators to specific computing nodes. The interface therefore represents existing placements as pinned upstream and downstream operators, i.e., operators that are already assigned to a computing node. When a new query arrives, NebulaStream receives operators that are not yet assigned to any computing node. The operator placement algorithm is then responsible for assigning these new operators to appropriate nodes.
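The distinction between pinned and unpinned operators can be sketched as follows. This is an illustrative model only and does not use NebulaStream's operator classes: Operator, QueryPlan, and collectUnpinnedOperators are hypothetical names, operators are identified by plain integers, and an operator counts as pinned if it carries a node id. The sketch requires C++17.

```cpp
#include <map>
#include <optional>
#include <queue>
#include <set>
#include <vector>

// Hypothetical, simplified operator: it knows its direct downstream operators
// and, once placed, the id of the computing node it is pinned to.
struct Operator {
    std::vector<int> downstream;
    std::optional<int> pinnedNodeId;
};

// Hypothetical query plan: operator id -> operator.
using QueryPlan = std::map<int, Operator>;

// Walks the plan from the pinned upstream operators towards the pinned
// downstream operators and collects every operator that is not pinned yet.
// The traversal does not continue past a pinned downstream operator.
std::set<int> collectUnpinnedOperators(const QueryPlan& plan,
                                       const std::set<int>& pinnedUpstream,
                                       const std::set<int>& pinnedDownstream) {
    std::set<int> unpinned;
    std::set<int> visited;
    std::queue<int> frontier;
    for (int id : pinnedUpstream) {
        frontier.push(id);
    }
    while (!frontier.empty()) {
        int current = frontier.front();
        frontier.pop();
        if (!visited.insert(current).second) {
            continue; // already handled via another branch
        }
        const Operator& op = plan.at(current);
        if (!op.pinnedNodeId) {
            unpinned.insert(current);
        }
        if (pinnedDownstream.count(current) > 0) {
            continue; // boundary of the region that needs placement
        }
        for (int next : op.downstream) {
            frontier.push(next);
        }
    }
    return unpinned;
}
```

For a plan 1 -> 2 -> 3 -> 4 in which operators 1 and 4 are pinned, the function returns operators 2 and 3, which are exactly the operators a placement strategy would have to assign.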

💡 We recommend taking a look at the existing strategies, which may serve as a starting point for your solution. The BottomUpStrategy and TopDownStrategy offer relatively simple, constructive placements. Alternatively, the ILPStrategy (Integer Linear Programming) offers a more complex, cost-based optimization of the placement.
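As an illustration of what a constructive placement can look like, the sketch below greedily fills the nodes of a path starting at the source side. It is not the actual BottomUpStrategy. It assumes a linear chain of operators, a single path, and a hypothetical per-node capacity called freeSlots; Node and placeBottomUp are made-up names.

```cpp
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical node model: an id and the number of operators it can still host.
struct Node {
    int id;
    int freeSlots;
};

// Greedy constructive placement for a chain of `operatorCount` operators
// (ordered source to sink) along `path` (ordered source node to sink node).
// Each operator goes to the first node, starting from the source side, that
// still has a free slot. Returns the chosen node id per operator, or
// std::nullopt if the path does not have enough capacity.
std::optional<std::vector<int>> placeBottomUp(std::size_t operatorCount, std::vector<Node> path) {
    std::vector<int> assignment;
    std::size_t nodeIndex = 0;
    for (std::size_t op = 0; op < operatorCount; ++op) {
        // Skip nodes that are already full; never move back towards the source.
        while (nodeIndex < path.size() && path[nodeIndex].freeSlots <= 0) {
            ++nodeIndex;
        }
        if (nodeIndex == path.size()) {
            return std::nullopt; // ran out of capacity on this path
        }
        --path[nodeIndex].freeSlots;
        assignment.push_back(path[nodeIndex].id);
    }
    return assignment;
}
```

A top-down variant would mirror this loop by starting at the sink side of the path. A cost-based approach such as an ILP formulation would instead enumerate or model all valid assignments and pick the one with the lowest cost.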

Tasks

  1. Create a new class for the new placement strategy and extend the base class BasePlacementAdditionStrategy.
  2. Override the updateGlobalExecutionPlan() function of the base class BasePlacementAdditionStrategy and define the logic of the new operator placement in it. The main logic should pin the unpinned operators of the shared query plan to computing nodes that lie between the nodes on which pinnedUpStreamOperators and pinnedDownStreamOperators are pinned, i.e., on a path. The base class also provides helper functions that are commonly needed in an operator placement strategy, e.g.:
    • If required, feasible paths can be computed using performPathSelection(), i.e., this function selects a path from the computing node on which the latest upstream operator is placed to the computing node on which the latest downstream operator is placed.
    • Add the network source and sink operators using addNetworkOperator(), i.e., this function inserts system-generated operators that send and receive data between operators over the network (e.g., when two successive operators are placed on two different computing nodes).
  3. Call updateExecutionNodes() at the end of the updateGlobalExecutionPlan() function to finalize the placement.
  4. Make the new operator placement strategy visible by performing the following steps:
    • Add the new placement strategy as an enum in the PlacementStrategy class.
    • Add the new placement strategy to the QueryPlacementAmendmentPhase::getStrategy().
  5. Add a dedicated unit test for the new operator placement strategy and extend QueryPlacementAmendmentTest.cpp with a test case that performs query amendment using your placement strategy.
  6. Finally, add all new source files to the respective CMakeLists.txt files (in the corresponding folders; watch out for subfolders).
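The role of the network source and sink operators mentioned in Task 2 can be illustrated with a small, self-contained sketch. It does not call addNetworkOperator() and does not reflect its signature. PlacedOperator and insertNetworkOperators are hypothetical names, the query is assumed to be a linear chain, and the two nodes involved are assumed to be directly connected.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical placed operator: a name and the id of the node it runs on.
struct PlacedOperator {
    std::string name;
    int nodeId;
};

// Takes a placed chain (ordered source to sink) and returns a new chain in
// which every pair of successive operators on different nodes is connected by
// a network sink on the sending node and a network source on the receiving node.
std::vector<PlacedOperator> insertNetworkOperators(const std::vector<PlacedOperator>& chain) {
    std::vector<PlacedOperator> result;
    for (std::size_t i = 0; i < chain.size(); ++i) {
        result.push_back(chain[i]);
        bool hasNext = i + 1 < chain.size();
        if (hasNext && chain[i].nodeId != chain[i + 1].nodeId) {
            result.push_back({"NetworkSink", chain[i].nodeId});       // sends data
            result.push_back({"NetworkSource", chain[i + 1].nodeId}); // receives data
        }
    }
    return result;
}
```

For the chain Source (node 1), Filter (node 1), Sink (node 3), the sketch yields Source, Filter, NetworkSink (node 1), NetworkSource (node 3), Sink. If the selected path contains intermediate nodes, a real implementation would additionally have to forward the data across them.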

Location: nes-optimizer

Tasks:

  • optimizer -> QueryPlacementAddition
  • optimizer -> Phases -> PlacementAmendment -> QueryPlacementAmendmentPhase

Tests:

  • tests -> UnitTests -> Optimizer -> QueryPlacement
  • tests -> UnitTests -> Optimizer -> Phases -> QueryPlacementAmendmentTest.cpp

Location: nes-common

Tasks:

  • Util -> PlacementAmendment -> QueryPlacementAmendmentPhase

The following code shows an implementation of updateGlobalExecutionPlan() (Tasks 2 and 3) that uses some helper functions from BasePlacementAdditionStrategy.

PlacementAdditionResult YourPlacementStrategy::updateGlobalExecutionPlan(SharedQueryId sharedQueryId,
                                                                    const std::set<LogicalOperatorPtr>& pinnedUpStreamOperators,
                                                                    const std::set<LogicalOperatorPtr>& pinnedDownStreamOperators,
                                                                    DecomposedQueryPlanVersion querySubPlanVersion) {
    try {
        NES_DEBUG("Perform placement of the pinned and all their downstream operators.");
        
        // 1. Create copy of the query plan
        auto copy =
            CopiedPinnedOperators::create(pinnedUpStreamOperators, pinnedDownStreamOperators, operatorIdToOriginalOperatorMap);

        // 2. Find the path where operators need to be placed
        performPathSelection(copy.copiedPinnedUpStreamOperators, copy.copiedPinnedDownStreamOperators);

        // ... add your placement strategy here 

        // Last, update execution nodes (Task 3)
        return updateExecutionNodes(sharedQueryId, computedQuerySubPlans, querySubPlanVersion);
    } catch (const std::exception& ex) {
        NES_ERROR("Exception occurred during placement with YourPlacementStrategy: {}", ex.what());
        throw Exceptions::QueryPlacementAdditionException(sharedQueryId, ex.what());
    }
}

The following code shows the enum class PlacementStrategy in which you need to add your strategy (Task 4).

enum class PlacementStrategy : uint8_t {
    TopDown = 0,
    BottomUp = 1,
    ...
    YourPlacementStrategy = 8
};

The following code shows the getStrategy() function of the QueryPlacementAmendmentPhase class, in which you need to add your strategy (Task 4).

BasePlacementStrategyPtr QueryPlacementAmendmentPhase::getStrategy(PlacementStrategy placementStrategy) {

    auto plannerURL = coordinatorConfiguration->elegant.plannerServiceURL;

    switch (placementStrategy) {
        case PlacementStrategy::ILP:
            return ILPStrategy::create(globalExecutionPlan, topology, typeInferencePhase, placementAmendmentMode);
        case PlacementStrategy::BottomUp:
            return BottomUpStrategy::create(globalExecutionPlan, topology, typeInferencePhase, placementAmendmentMode);
        ...
        case PlacementStrategy::YourPlacementStrategy:
            return YourPlacementStrategy::create(globalExecutionPlan, topology, typeInferencePhase, placementAmendmentMode);
        default:
            throw Exceptions::RuntimeException("Unknown placement strategy type "
                                               + std::string(magic_enum::enum_name(placementStrategy)));
    }
}
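The effect of registering, or forgetting to register, a strategy in such a switch can be tried out in isolation with the sketch below. It is a stand-alone model, not NebulaStream code: StrategyKind, Strategy, BottomUpSketch, YourSketch, and makeStrategy are hypothetical names, and the strategies only report their name.

```cpp
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical, reduced enum; TopDown is deliberately left unregistered below.
enum class StrategyKind : std::uint8_t { TopDown = 0, BottomUp = 1, YourPlacementStrategy = 2 };

// Hypothetical strategy interface that only exposes a name.
struct Strategy {
    virtual ~Strategy() = default;
    virtual std::string name() const = 0;
};

struct BottomUpSketch : Strategy {
    std::string name() const override { return "BottomUp"; }
};

struct YourSketch : Strategy {
    std::string name() const override { return "YourPlacementStrategy"; }
};

// Factory in the style of getStrategy(): every enum value needs its own case,
// otherwise the request ends up in the default branch and throws.
std::unique_ptr<Strategy> makeStrategy(StrategyKind kind) {
    switch (kind) {
        case StrategyKind::BottomUp:
            return std::make_unique<BottomUpSketch>();
        case StrategyKind::YourPlacementStrategy:
            return std::make_unique<YourSketch>();
        default:
            throw std::invalid_argument("Unknown placement strategy type "
                                        + std::to_string(static_cast<int>(kind)));
    }
}
```

Requesting StrategyKind::YourPlacementStrategy returns the new strategy, whereas StrategyKind::TopDown, which has an enum value but no case, throws. This is the failure you would see if you extended the enum but skipped the second step of Task 4.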