How to add a new operator placement algorithm?
This is an implementation guide for adding a new operator placement algorithm to NebulaStream. In the remainder, we describe the main concepts, features, and essential development steps, with examples of how to add your own operator placement strategy.
In general, an operator placement algorithm decides, with respect to an optimization goal, on which node of the processing-node topology graph each operator of a shared query plan (itself a graph of operators) is executed. The placement must be valid, i.e., it must not change the semantics of the query. The optimization goal can vary, e.g., latency, throughput, energy consumption, or other criteria.
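The following minimal sketch makes the notion of a valid placement concrete. It rests on three simplifying assumptions. First, the query plan is a linear chain of operators. Second, the candidate nodes form a single path from the source side to the sink side. Third, "valid" only means that every operator is placed on the path and that no operator runs upstream of its predecessor. The type aliases and isValidPlacement are hypothetical and not part of the NebulaStream API.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified model (not NebulaStream types): operators and nodes are
// identified by name; both vectors are ordered from the source side to the sink side.
using OperatorId = std::string;
using NodeId = std::string;

// Returns true if every operator of the chain is placed on a node of the path and
// no operator is placed upstream of its predecessor (data only flows towards the sink).
bool isValidPlacement(const std::vector<OperatorId>& operatorChain,
                      const std::vector<NodeId>& path,
                      const std::map<OperatorId, NodeId>& placement) {
    std::size_t lastPosition = 0;
    for (const auto& op : operatorChain) {
        auto assigned = placement.find(op);
        if (assigned == placement.end()) {
            return false;  // operator has not been placed at all
        }
        auto nodeIt = std::find(path.begin(), path.end(), assigned->second);
        if (nodeIt == path.end()) {
            return false;  // operator placed on a node outside the path
        }
        auto position = static_cast<std::size_t>(nodeIt - path.begin());
        if (position < lastPosition) {
            return false;  // data would have to flow back towards the source
        }
        lastPosition = position;
    }
    return true;
}
```

For the chain source, filter, map, sink on the path sensor, edge, cloud, placing filter on edge but map back on sensor is rejected, because data would have to flow backwards.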
Existing Interface
To add your operator placement algorithm, NebulaStream provides the base class BasePlacementAdditionStrategy as an interface.
We designed this interface to support placing the operators of new queries while NebulaStream is already processing other queries,
potentially within a shared query plan. In this interface, operator placement algorithms assign operators to
specific computing nodes. Existing placements are therefore represented by pinned upstream and downstream operators, i.e., operators that are already assigned to a computing node.
When a new query arrives, NebulaStream receives operators that are not yet assigned to any computing node.
The operator placement algorithm is then responsible for assigning these new operators to the appropriate nodes.
💡 We recommend looking at the existing strategies, which may serve as a starting point for your solution. The BottomUpStrategy
and TopDownStrategy
offer relatively simple constructive placements. Alternatively, the ILPStrategy
(Integer Linear Programming) performs a more complex, cost-based optimization to compute a placement.
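A toy model can illustrate the difference between the two constructive strategies. It assumes a single path of nodes, where index 0 hosts the pinned upstream operator and the last index hosts the pinned downstream operator. It also assumes a hypothetical per-node capacity given as a number of free operator slots. The functions placeBottomUp and placeTopDown are illustrative only and do not reproduce the actual BottomUpStrategy or TopDownStrategy implementations.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Toy constructive placement on a single path of nodes (hypothetical model).
// capacity[i] = free operator slots on node i; node 0 is closest to the source.
// Operators are numbered in upstream-to-downstream order; the result holds the
// node index chosen for each operator.

// Bottom-up: start at the source side and move towards the sink only when full.
std::vector<std::size_t> placeBottomUp(std::size_t numOperators, std::vector<std::size_t> capacity) {
    std::vector<std::size_t> assignment;
    std::size_t node = 0;
    for (std::size_t op = 0; op < numOperators; ++op) {
        while (node < capacity.size() && capacity[node] == 0) {
            ++node;  // current node is full, try the next one towards the sink
        }
        if (node == capacity.size()) {
            throw std::runtime_error("not enough capacity on the path");
        }
        --capacity[node];
        assignment.push_back(node);
    }
    return assignment;
}

// Top-down: walk operators from downstream to upstream, filling nodes from the sink side.
std::vector<std::size_t> placeTopDown(std::size_t numOperators, std::vector<std::size_t> capacity) {
    std::vector<std::size_t> assignment(numOperators);
    std::size_t node = capacity.size();  // one past the node closest to the sink
    for (std::size_t op = numOperators; op-- > 0;) {
        while (node > 0 && capacity[node - 1] == 0) {
            --node;  // current node is full, try the next one towards the source
        }
        if (node == 0) {
            throw std::runtime_error("not enough capacity on the path");
        }
        --capacity[node - 1];
        assignment[op] = node - 1;
    }
    return assignment;
}
```

For example, with three operators and capacities {1, 1, 5}, the bottom-up variant spreads the operators over nodes 0, 1, and 2. The top-down variant places all three on node 2, the node closest to the sink. Because both variants only ever move in one direction along the path, they never place an operator upstream of its predecessor.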
Tasks
1. Create a new class for the new placement strategy that extends the base class BasePlacementAdditionStrategy. Like the existing strategies, it should provide a static create() function, which is called in QueryPlacementAmendmentPhase::getStrategy().
2. Override the updateGlobalExecutionPlan() function of BasePlacementAdditionStrategy with the logic of the new operator placement. The main logic must pin the unpinned operators of the shared query plan to computing nodes on the path between the nodes on which pinnedUpStreamOperators and pinnedDownStreamOperators are pinned. The base class also provides helper functions that are commonly needed by an operator placement strategy, e.g.:
   - If required, feasible paths can be computed using performPathSelection(). This function selects a path from the computing node hosting the pinned upstream operators to the computing node hosting the pinned downstream operators.
   - Network source and sink operators can be added using addNetworkOperator(). This function inserts system-generated operators that send and receive data over the network whenever two successive operators are placed on different computing nodes.
3. Call updateExecutionNodes() at the end of updateGlobalExecutionPlan() to finalize the placement.
4. Make the new operator placement strategy visible:
   - Add the new placement strategy as a value of the PlacementStrategy enum class.
   - Add the new placement strategy to QueryPlacementAmendmentPhase::getStrategy().
5. Add a dedicated unit test for the new operator placement strategy, and extend QueryPlacementAmendmentTest.cpp with a test case that performs query amendment using your placement strategy.
6. Finally, add all new source files to the respective CMakeLists.txt files (in the corresponding folders; watch out for subfolders).
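Task 2 places operators on a path between the nodes that host the pinned operators. The following sketch shows one way such a path could be found. It assumes a topology given as a hypothetical undirected adjacency list and uses a fewest-hops criterion. It is a plain breadth-first search, not the implementation of performPathSelection(), whose selection criteria may differ.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <queue>
#include <stdexcept>
#include <vector>

// Hypothetical topology model (not NebulaStream types): node id -> neighbouring node ids.
using NodeId = int;
using Topology = std::map<NodeId, std::vector<NodeId>>;

// Breadth-first search for a fewest-hops path from the node hosting the pinned
// upstream operator to the node hosting the pinned downstream operator.
std::vector<NodeId> selectPath(const Topology& topology, NodeId upstreamNode, NodeId downstreamNode) {
    std::map<NodeId, NodeId> parent{{upstreamNode, upstreamNode}};  // also marks visited nodes
    std::queue<NodeId> frontier;
    frontier.push(upstreamNode);
    while (!frontier.empty()) {
        NodeId current = frontier.front();
        frontier.pop();
        if (current == downstreamNode) {
            // Reconstruct the path by following the parent links back to the start.
            std::vector<NodeId> path{current};
            while (current != upstreamNode) {
                current = parent.at(current);
                path.push_back(current);
            }
            std::reverse(path.begin(), path.end());
            return path;
        }
        auto neighbours = topology.find(current);
        if (neighbours == topology.end()) {
            continue;  // node without links
        }
        for (NodeId next : neighbours->second) {
            if (parent.emplace(next, current).second) {  // true only on the first visit
                frontier.push(next);
            }
        }
    }
    throw std::runtime_error("no path between the nodes of the pinned operators");
}
```

In a topology where sensor node 4 is linked to fog nodes 2 and 3, and only node 2 is linked to cloud node 1, this returns the path 4, 2, 1.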
Location: nes-optimizer
Tasks:
- optimizer -> QueryPlacementAddition
- optimizer -> Phases -> PlacementAmendment -> QueryPlacementAmendmentPhase
Tests:
- tests -> UnitTests -> Optimizer -> QueryPlacement
- tests -> UnitTests -> Optimizer -> Phases -> QueryPlacementAmendmentTest.cpp
Location: nes-common
Tasks:
- Util -> PlacementAmendment -> QueryPlacementAmendmentPhase
The following code shows an implementation skeleton of updateGlobalExecutionPlan() (Tasks 2 and 3) that uses helper functions from BasePlacementAdditionStrategy.
PlacementAdditionResult YourPlacementStrategy::updateGlobalExecutionPlan(SharedQueryId sharedQueryId,
                                                                         const std::set<LogicalOperatorPtr>& pinnedUpStreamOperators,
                                                                         const std::set<LogicalOperatorPtr>& pinnedDownStreamOperators,
                                                                         DecomposedQueryPlanVersion querySubPlanVersion) {
    try {
        NES_DEBUG("Perform placement of the pinned and all their downstream operators.");
        // 1. Create a copy of the query plan
        auto copy =
            CopiedPinnedOperators::create(pinnedUpStreamOperators, pinnedDownStreamOperators, operatorIdToOriginalOperatorMap);
        // 2. Find the path on which the operators need to be placed
        performPathSelection(copy.copiedPinnedUpStreamOperators, copy.copiedPinnedDownStreamOperators);
        // ... add your placement strategy here: pin the unpinned operators and compute
        // computedQuerySubPlans, which are passed to updateExecutionNodes() below
        // Last, update the execution nodes (Task 3)
        return updateExecutionNodes(sharedQueryId, computedQuerySubPlans, querySubPlanVersion);
    } catch (const std::exception& ex) {
        NES_ERROR("Exception occurred during YourPlacementStrategy placement: {}", ex.what());
        throw Exceptions::QueryPlacementAdditionException(sharedQueryId, ex.what());
    }
}
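Once the placeholder in the skeleton above has pinned the operators, any two successive operators on different nodes must be connected over the network. In NebulaStream, addNetworkOperator() takes care of this. The following minimal sketch shows the idea. It assumes a linear chain and a hypothetical PlacedOperator type, and it omits relaying data over intermediate nodes of the path. It is not the NebulaStream implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical, simplified model of a pinned operator (not a NebulaStream type).
struct PlacedOperator {
    std::string name;
    int node;  // id of the computing node the operator is pinned to
};

// Walks a chain ordered from upstream to downstream. Wherever two successive operators
// sit on different nodes, it inserts a network sink on the upstream node and a matching
// network source on the downstream node. Relaying over intermediate hops is omitted.
std::vector<PlacedOperator> insertNetworkOperators(const std::vector<PlacedOperator>& chain) {
    std::vector<PlacedOperator> result;
    for (std::size_t i = 0; i < chain.size(); ++i) {
        if (i > 0 && chain[i - 1].node != chain[i].node) {
            result.push_back({"NetworkSink", chain[i - 1].node});  // sends data off node
            result.push_back({"NetworkSource", chain[i].node});    // receives it on the next node
        }
        result.push_back(chain[i]);
    }
    return result;
}
```

For the chain Source and Filter on node 1 followed by Sink on node 2, the result is Source, Filter, NetworkSink (node 1), NetworkSource (node 2), Sink.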
The following code shows the enum class PlacementStrategy, to which you need to add your strategy (Task 4).
enum class PlacementStrategy : uint8_t {
    TopDown = 0,
    BottomUp = 1,
    ...
    YourPlacementStrategy = 8
};
The following code shows the getStrategy() function of the QueryPlacementAmendmentPhase class, to which you need to add your strategy (Task 4).
BasePlacementStrategyPtr QueryPlacementAmendmentPhase::getStrategy(PlacementStrategy placementStrategy) {
    auto plannerURL = coordinatorConfiguration->elegant.plannerServiceURL;
    switch (placementStrategy) {
        case PlacementStrategy::ILP:
            return ILPStrategy::create(globalExecutionPlan, topology, typeInferencePhase, placementAmendmentMode);
        case PlacementStrategy::BottomUp:
            return BottomUpStrategy::create(globalExecutionPlan, topology, typeInferencePhase, placementAmendmentMode);
        ...
        case PlacementStrategy::YourPlacementStrategy:
            return YourPlacementStrategy::create(globalExecutionPlan, topology, typeInferencePhase, placementAmendmentMode);
        default:
            throw Exceptions::RuntimeException("Unknown placement strategy type "
                                               + std::string(magic_enum::enum_name(placementStrategy)));
    }
}
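getStrategy() is a plain factory that maps each PlacementStrategy value to a strategy instance. The self-contained sketch below mimics that dispatch with hypothetical stand-in types. BasePlacementStrategy is a stand-in, and BasePlacementStrategyPtr is assumed to be a std::shared_ptr. This is not the NebulaStream code.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-ins for the NebulaStream types (not the real API).
enum class PlacementStrategy { TopDown, BottomUp, YourPlacementStrategy };

struct BasePlacementStrategy {
    virtual ~BasePlacementStrategy() = default;
    virtual std::string name() const = 0;
};
// Assumption: the real BasePlacementStrategyPtr is modelled here as a shared pointer.
using BasePlacementStrategyPtr = std::shared_ptr<BasePlacementStrategy>;

struct TopDownStrategy : BasePlacementStrategy {
    std::string name() const override { return "TopDown"; }
};
struct BottomUpStrategy : BasePlacementStrategy {
    std::string name() const override { return "BottomUp"; }
};
struct YourPlacementStrategy : BasePlacementStrategy {
    std::string name() const override { return "YourPlacementStrategy"; }
};

BasePlacementStrategyPtr getStrategy(PlacementStrategy placementStrategy) {
    // No default label: with -Wswitch (enabled by -Wall), GCC and Clang warn when an
    // enumerator of PlacementStrategy has no matching case here.
    switch (placementStrategy) {
        case PlacementStrategy::TopDown: return std::make_shared<TopDownStrategy>();
        case PlacementStrategy::BottomUp: return std::make_shared<BottomUpStrategy>();
        case PlacementStrategy::YourPlacementStrategy: return std::make_shared<YourPlacementStrategy>();
    }
    // Only reached for values outside the named enumerators, e.g. after a bad cast.
    throw std::runtime_error("Unknown placement strategy type");
}
```

This sketch differs from the function above in one design choice. The original getStrategy() uses a default label, and a default label suppresses the -Wswitch warning. With a default label, a forgotten case for a new strategy only surfaces at runtime, as the "Unknown placement strategy type" exception.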