NebulaStream
0.6.213
NebulaStream is a data and application management framework for the internet of things
This class is responsible for handling requests that submit, fetch information about, or delete queryIdAndCatalogEntryMapping entries, as well as requests that modify the topology.
#include <RequestHandlerService.hpp>
Public Member Functions | |
RequestHandlerService (Configurations::OptimizerConfiguration optimizerConfiguration, const QueryParsingServicePtr &queryParsingService, const Catalogs::Query::QueryCatalogPtr &queryCatalog, const Catalogs::Source::SourceCatalogPtr &sourceCatalog, const Catalogs::UDF::UDFCatalogPtr &udfCatalog, const NES::RequestProcessor::AsyncRequestProcessorPtr &asyncRequestExecutor, const z3::ContextPtr &z3Context, const Statistic::AbstractStatisticQueryGeneratorPtr &statisticQueryGenerator, const Statistic::StatisticRegistryPtr &statisticRegistry, const Optimizer::PlacementAmendmentHandlerPtr &placementAmendmentHandler) | |
QueryId | validateAndQueueAddQueryRequest (const std::string &queryString, const Optimizer::PlacementStrategy placementStrategy) |
Register the incoming query in the system by adding it to the scheduling queue for further processing, and return the query id assigned. | |
QueryId | validateAndQueueAddQueryRequest (const QueryPlanPtr &queryPlan, const Optimizer::PlacementStrategy placementStrategy) |
Register the incoming query in the system by adding it to the scheduling queue for further processing, and return the query id assigned. | |
nlohmann::json | validateAndQueueExplainQueryRequest (const QueryPlanPtr &queryPlan, const Optimizer::PlacementStrategy placementStrategy) |
Queue an explain request for the given query plan and placement strategy, and return the result as JSON. | |
bool | validateAndQueueStopQueryRequest (QueryId queryId) |
bool | validateAndQueueFailQueryRequest (SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion, const std::string &failureReason) |
RequestProcessor::ISQPRequestResponsePtr | queueISQPRequest (const std::vector< RequestProcessor::ISQPEventPtr > &isqpEvents) |
Process multiple query and topology change requests, represented by ISQP events, in a batch. | |
std::vector< Statistic::StatisticKey > | trackStatisticRequest (const Statistic::CharacteristicPtr &characteristic, const Windowing::WindowTypePtr &window, const Statistic::TriggerConditionPtr &triggerCondition, const Statistic::SendingPolicyPtr &sendingPolicy, std::function< void(Statistic::CharacteristicPtr)> callBack) |
Processes a track request by mapping it to a statistic query and returning the statistic keys. | |
std::vector< Statistic::StatisticKey > | trackStatisticRequest (const Statistic::CharacteristicPtr &characteristic, const Windowing::WindowTypePtr &window) |
Processes a track request by mapping it to a statistic query and returning the statistic keys. | |
bool | queueRegisterPhysicalSourceRequest (std::vector< RequestProcessor::PhysicalSourceDefinition > additions, WorkerId workerId) const |
register one or multiple new physical sources | |
bool | queueRegisterLogicalSourceRequest (const std::string &logicalSourceName, SchemaPtr schema) const |
register a new logical source | |
bool | queueUnregisterPhysicalSourceRequest (const std::string &physicalSourceName, const std::string &logicalSourceName, WorkerId workerId) const |
unregister an existing physical source | |
bool | queueUnregisterAllPhysicalSourcesByWorkerRequest (WorkerId workerId) const |
unregister all existing physical sources hosted by a worker | |
bool | queueAddKeyDistributionEntryRequest (const std::string &logicalSourceName, const std::string &physicalSourceName, WorkerId workerId, const std::string &value) const |
add key distribution entry | |
bool | queueUnregisterLogicalSourceRequest (const std::string &logicalSourceName) const |
unregister an existing logical source | |
bool | queueUpdateLogicalSourceRequest (const std::string &logicalSourceName, SchemaPtr schema) const |
update an existing logical source | |
nlohmann::json | queueGetAllLogicalSourcesRequest () const |
get all logical sources | |
nlohmann::json | queueGetPhysicalSourcesRequest (std::string logicelSourceName) const |
get all physical sources belonging to a logical source | |
SchemaPtr | queueGetLogicalSourceSchemaRequest (std::string logicelSourceName) const |
get the schema of a logical source | |
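NES::RequestHandlerService::RequestHandlerService | ( | Configurations::OptimizerConfiguration | optimizerConfiguration, |
const QueryParsingServicePtr & | queryParsingService, | ||
const Catalogs::Query::QueryCatalogPtr & | queryCatalog, | ||
const Catalogs::Source::SourceCatalogPtr & | sourceCatalog, | ||
const Catalogs::UDF::UDFCatalogPtr & | udfCatalog, | ||
const NES::RequestProcessor::AsyncRequestProcessorPtr & | asyncRequestExecutor, | ||
const z3::ContextPtr & | z3Context, | ||
const Statistic::AbstractStatisticQueryGeneratorPtr & | statisticQueryGenerator, | ||
const Statistic::StatisticRegistryPtr & | statisticRegistry, | ||
const Optimizer::PlacementAmendmentHandlerPtr & | placementAmendmentHandler | ||
) | explicit |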
References NES::Optimizer::SemanticQueryValidation::create(), NES::Optimizer::SyntacticQueryValidation::create(), and NES_DEBUG.
bool NES::RequestHandlerService::queueAddKeyDistributionEntryRequest | ( | const std::string & | logicalSourceName, |
const std::string & | physicalSourceName, | ||
WorkerId | workerId, | ||
const std::string & | value | ||
) | const |
add key distribution entry
logicalSourceName | the name of the logical source to which the physical source belongs |
physicalSourceName | the name of the physical source |
workerId | the id of the worker hosting the physical source |
value | the statistics value carried by the request |
References NES::RequestProcessor::AddKeyDistributionEntryEvent::create(), NES::TestUtils::logicalSourceName(), NES_DEBUG, NES::TestUtils::physicalSourceName(), and magic_enum::detail::value().
nlohmann::json NES::RequestHandlerService::queueGetAllLogicalSourcesRequest | ( | ) | const |
get all logical sources
References NES::RequestProcessor::GetAllLogicalSourcesEvent::create(), NES::RequestProcessor::GetSourceCatalogRequest::create(), NES::RequestProcessor::DEFAULT_RETRIES, and NES_DEBUG.
SchemaPtr NES::RequestHandlerService::queueGetLogicalSourceSchemaRequest | ( | std::string | logicelSourceName | ) | const |
get the schema of a logical source
References NES::RequestProcessor::GetSourceCatalogRequest::create(), NES::RequestProcessor::GetSchemaEvent::create(), NES::RequestProcessor::DEFAULT_RETRIES, and NES_DEBUG.
nlohmann::json NES::RequestHandlerService::queueGetPhysicalSourcesRequest | ( | std::string | logicelSourceName | ) | const |
get all physical sources belonging to a logical source
References NES::RequestProcessor::GetSourceCatalogRequest::create(), NES::RequestProcessor::GetPhysicalSourcesEvent::create(), NES::RequestProcessor::DEFAULT_RETRIES, and NES_DEBUG.
RequestProcessor::ISQPRequestResponsePtr NES::RequestHandlerService::queueISQPRequest | ( | const std::vector< RequestProcessor::ISQPEventPtr > & | isqpEvents | ) |
Process multiple query and topology change requests, represented by ISQP events, in a batch.
isqpEvents | a vector of ISQP events to be handled |
References NES::RequestProcessor::ISQPRequest::create(), and NES::RequestProcessor::DEFAULT_RETRIES.
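The batching idea behind queueISQPRequest can be illustrated with a minimal, self-contained sketch. It does not use the NebulaStream API: `SketchEvent`, `SketchEventKind`, `SketchBatchResponse`, and `processBatch` are hypothetical names, and the split into query events and topology events is an assumption drawn only from the description above.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical event kinds; the real ISQP event types are not shown on this page.
enum class SketchEventKind { AddQuery, RemoveQuery, AddNode, RemoveNode };

struct SketchEvent {
    SketchEventKind kind;
    std::string payload;  // e.g. a query string or a node description
};

// Hypothetical summary of what one batch changed.
struct SketchBatchResponse {
    std::size_t queryChanges;
    std::size_t topologyChanges;
};

// Handle all events of one batch in a single pass and report what was applied.
SketchBatchResponse processBatch(const std::vector<SketchEvent>& events) {
    SketchBatchResponse response{0, 0};
    for (const SketchEvent& event : events) {
        switch (event.kind) {
            case SketchEventKind::AddQuery:
            case SketchEventKind::RemoveQuery:
                ++response.queryChanges;
                break;
            case SketchEventKind::AddNode:
            case SketchEventKind::RemoveNode:
                ++response.topologyChanges;
                break;
        }
    }
    return response;
}
```

Submitting the events together lets the handler treat them as one unit of work instead of issuing one request per event.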
bool NES::RequestHandlerService::queueRegisterLogicalSourceRequest | ( | const std::string & | logicalSourceName, |
SchemaPtr | schema | ||
) | const |
register a new logical source
logicalSourceName | the name of the logical source |
schema | the schema of the logical source |
References NES::RequestProcessor::AddLogicalSourceEvent::create(), NES::TestUtils::logicalSourceName(), and NES_DEBUG.
bool NES::RequestHandlerService::queueRegisterPhysicalSourceRequest | ( | std::vector< RequestProcessor::PhysicalSourceDefinition > | additions, |
WorkerId | workerId | ||
) | const |
register one or multiple new physical sources
additions | a vector of physical source additions |
workerId | the id of the worker hosting the physical source |
References NES::RequestProcessor::AddPhysicalSourcesEvent::create(), and NES_DEBUG.
bool NES::RequestHandlerService::queueUnregisterAllPhysicalSourcesByWorkerRequest | ( | WorkerId | workerId | ) | const |
unregister all existing physical sources hosted by a worker
workerId | the id of the worker hosting the physical sources |
References NES::RequestProcessor::RemoveAllPhysicalSourcesByWorkerEvent::create(), and NES_DEBUG.
bool NES::RequestHandlerService::queueUnregisterLogicalSourceRequest | ( | const std::string & | logicalSourceName | ) | const |
unregister an existing logical source
logicalSourceName | the name of the logical source |
References NES::RequestProcessor::RemoveLogicalSourceEvent::create(), NES::TestUtils::logicalSourceName(), and NES_DEBUG.
bool NES::RequestHandlerService::queueUnregisterPhysicalSourceRequest | ( | const std::string & | physicalSourceName, |
const std::string & | logicalSourceName, | ||
WorkerId | workerId | ||
) | const |
unregister an existing physical source
physicalSourceName | the name of the physical source to unregister |
logicalSourceName | the name of the logical source to which the physical source belongs |
workerId | the id of the worker hosting the physical source |
References NES::RequestProcessor::RemovePhysicalSourceEvent::create(), NES::TestUtils::logicalSourceName(), NES_DEBUG, and NES::TestUtils::physicalSourceName().
bool NES::RequestHandlerService::queueUpdateLogicalSourceRequest | ( | const std::string & | logicalSourceName, |
SchemaPtr | schema | ||
) | const |
update an existing logical source
logicalSourceName | the name of the logical source |
schema | the new schema of the logical source |
References NES::RequestProcessor::UpdateLogicalSourceEvent::create(), NES::TestUtils::logicalSourceName(), and NES_DEBUG.
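The register, update, and unregister requests for logical sources all return bool. The sketch below shows one plausible reading of that contract on a stand-in catalog. `SketchSourceCatalog` and its methods are hypothetical, the schema is reduced to a plain string, and the meaning of the return values (false for a duplicate name or a missing source) is an assumption, since this page does not state it.

```cpp
#include <string>
#include <unordered_map>

// Hypothetical stand-in for a source catalog; not a NebulaStream type.
class SketchSourceCatalog {
  public:
    // Assumed semantics: returns false if the name is already registered.
    bool registerLogicalSource(const std::string& name, const std::string& schema) {
        return sources.emplace(name, schema).second;
    }

    // Assumed semantics: returns false if the source does not exist.
    bool updateLogicalSource(const std::string& name, const std::string& schema) {
        auto it = sources.find(name);
        if (it == sources.end()) {
            return false;
        }
        it->second = schema;
        return true;
    }

    // Assumed semantics: returns false if there was nothing to remove.
    bool unregisterLogicalSource(const std::string& name) {
        return sources.erase(name) == 1;
    }

    // Throws std::out_of_range if the source is unknown.
    std::string schemaOf(const std::string& name) const {
        return sources.at(name);
    }

  private:
    std::unordered_map<std::string, std::string> sources;
};
```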
std::vector< Statistic::StatisticKey > NES::RequestHandlerService::trackStatisticRequest | ( | const Statistic::CharacteristicPtr & | characteristic, |
const Windowing::WindowTypePtr & | window | ||
) |
Processes a track request by mapping it to a statistic query and returning the statistic keys.
characteristic | The type of statistic being tracked, i.e., data, infrastructure, or workload |
window | The window over which to track the statistic |
References NES::Statistic::DEFAULT, and trackStatisticRequest().
std::vector< Statistic::StatisticKey > NES::RequestHandlerService::trackStatisticRequest | ( | const Statistic::CharacteristicPtr & | characteristic, |
const Windowing::WindowTypePtr & | window, | ||
const Statistic::TriggerConditionPtr & | triggerCondition, | ||
const Statistic::SendingPolicyPtr & | sendingPolicy, | ||
std::function< void(Statistic::CharacteristicPtr)> | callBack | ||
) |
Processes a track request by mapping it to a statistic query and returning the statistic keys.
characteristic | The type of statistic being tracked, i.e., data, infrastructure, or workload |
window | The window over which to track the statistic |
triggerCondition | Condition that triggers the callback function; it is checked when a statistic is created |
sendingPolicy | Policy for sending the data to the statistic sink |
callBack | Function to call when triggerCondition evaluates to true |
References NES::Optimizer::BottomUp, and validateAndQueueAddQueryRequest().
Referenced by trackStatisticRequest().
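The reference lists above suggest that the two-argument overload of trackStatisticRequest forwards to the five-argument one with default values. A minimal sketch of that delegation pattern follows. All names (`SketchStatisticHandler`, `SketchTrackRequest`, `track`) are hypothetical, the arguments are reduced to strings, and the default values and the no-op callback are assumptions rather than the library's actual defaults.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical record of one track request; not a NebulaStream type.
struct SketchTrackRequest {
    std::string characteristic;
    std::string window;
    std::string triggerCondition;
    std::string sendingPolicy;
};

class SketchStatisticHandler {
  public:
    using CallBack = std::function<void(const std::string&)>;

    // Full overload: stores the request and returns its key (here: its index).
    std::vector<std::size_t> track(const std::string& characteristic,
                                   const std::string& window,
                                   const std::string& triggerCondition,
                                   const std::string& sendingPolicy,
                                   CallBack callBack) {
        requests.push_back({characteristic, window, triggerCondition, sendingPolicy});
        callBacks.push_back(std::move(callBack));
        return {requests.size() - 1};
    }

    // Convenience overload: fills in assumed defaults and delegates.
    std::vector<std::size_t> track(const std::string& characteristic,
                                   const std::string& window) {
        return track(characteristic, window, "never", "default",
                     [](const std::string&) {});
    }

    // Throws std::out_of_range for an unknown key.
    const SketchTrackRequest& request(std::size_t key) const {
        return requests.at(key);
    }

  private:
    std::vector<SketchTrackRequest> requests;
    std::vector<CallBack> callBacks;
};
```

Keeping the logic in one overload means the defaults live in a single place and both entry points return keys in the same way.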
QueryId NES::RequestHandlerService::validateAndQueueAddQueryRequest | ( | const QueryPlanPtr & | queryPlan, |
const Optimizer::PlacementStrategy | placementStrategy | ||
) |
Register the incoming query in the system by adding it to the scheduling queue for further processing, and return the query id assigned.
queryPlan | pointer to the query plan |
placementStrategy | the placement strategy to be used |
References NES::RequestProcessor::AddQueryRequest::create(), and NES::RequestProcessor::DEFAULT_RETRIES.
QueryId NES::RequestHandlerService::validateAndQueueAddQueryRequest | ( | const std::string & | queryString, |
const Optimizer::PlacementStrategy | placementStrategy | ||
) |
Register the incoming query in the system by adding it to the scheduling queue for further processing, and return the query id assigned.
queryString | the query in string form |
placementStrategy | the placement strategy to be used |
InvalidQueryException | when the query string is not valid |
InvalidArgumentException | when the placement strategy is not valid |
References NES::RequestProcessor::AddQueryRequest::create(), and NES::RequestProcessor::DEFAULT_RETRIES.
Referenced by trackStatisticRequest().
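The validate-then-queue flow described for validateAndQueueAddQueryRequest can be sketched without the NebulaStream headers. Everything below is hypothetical: `SketchHandler`, `SketchQueryId`, and `SketchPlacement` are stand-ins, `std::invalid_argument` takes the place of InvalidQueryException, and the empty-string check is only a placeholder for real query validation.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <stdexcept>
#include <string>

// Hypothetical stand-ins; none of these are NebulaStream types.
using SketchQueryId = std::uint64_t;
enum class SketchPlacement { BottomUp };

class SketchHandler {
  public:
    // Validate first, then enqueue, then hand the assigned id back to the caller.
    SketchQueryId validateAndQueue(const std::string& queryString, SketchPlacement placement) {
        if (queryString.empty()) {
            // Stand-in for InvalidQueryException; nothing is queued on failure.
            throw std::invalid_argument("query string is not valid");
        }
        const SketchQueryId id = nextId++;
        queue.push_back(Request{id, queryString, placement});
        return id;
    }

    // Number of requests waiting for further processing.
    std::size_t pending() const { return queue.size(); }

  private:
    struct Request {
        SketchQueryId id;
        std::string query;
        SketchPlacement placement;
    };

    std::deque<Request> queue;
    SketchQueryId nextId = 1;
};
```

A caller would wrap the call in a try block and treat the returned id as a handle for later requests, such as stopping the query.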
nlohmann::json NES::RequestHandlerService::validateAndQueueExplainQueryRequest | ( | const QueryPlanPtr & | queryPlan, |
const Optimizer::PlacementStrategy | placementStrategy | ||
) |
Queue an explain request for the given query plan and placement strategy, and return the result as JSON.
queryPlan | pointer to the query plan |
placementStrategy | the placement strategy to be used |
References NES::RequestProcessor::ExplainRequest::create().
bool NES::RequestHandlerService::validateAndQueueFailQueryRequest | ( | SharedQueryId | sharedQueryId, |
DecomposedQueryId | decomposedQueryId, | ||
DecomposedQueryPlanVersion | decomposedQueryVersion, | ||
const std::string & | failureReason | ||
) |
Register a request to fail a shared query plan.
sharedQueryId | id of the shared query plan to be failed |
decomposedQueryId | id of the subquery plan that failed |
decomposedQueryVersion | version of the subquery plan that failed |
failureReason | reason for the shared query plan failure |
References NES::RequestProcessor::FailQueryRequest::create(), and NES::RequestProcessor::DEFAULT_RETRIES.
bool NES::RequestHandlerService::validateAndQueueStopQueryRequest | ( | QueryId | queryId | ) |
Register the incoming stop query request in the system by adding it to the scheduling queue for further processing.
queryId | id of the query to be stopped |
QueryNotFoundException | when the query id is not found in the query catalog |
InvalidQueryStateException | when the query is found to be in an invalid state |
References NES::RequestProcessor::StopQueryRequest::create(), NES::RequestProcessor::DEFAULT_RETRIES, NES::MARKED_FOR_HARD_STOP, and NES::STOPPED.
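The two exceptions and the two referenced states hint at a lookup followed by a state check before the stop request is queued. The sketch below is one possible shape of such a check, not the actual implementation. `SketchCatalog`, `SketchState`, `SketchQueryNotFound`, and `SketchInvalidState` are hypothetical, and treating STOPPED and MARKED_FOR_HARD_STOP as the invalid states is an assumption based only on the reference list.

```cpp
#include <cstdint>
#include <stdexcept>
#include <unordered_map>

// Hypothetical query states; only the last two names appear on this page.
enum class SketchState { RUNNING, MARKED_FOR_HARD_STOP, STOPPED };

// Stand-ins for QueryNotFoundException and InvalidQueryStateException.
struct SketchQueryNotFound : std::runtime_error {
    using std::runtime_error::runtime_error;
};
struct SketchInvalidState : std::runtime_error {
    using std::runtime_error::runtime_error;
};

class SketchCatalog {
  public:
    void add(std::uint64_t id) { states[id] = SketchState::RUNNING; }

    // Look the query up, reject states that cannot be stopped again (assumed),
    // then mark the query for a hard stop.
    bool requestStop(std::uint64_t id) {
        auto it = states.find(id);
        if (it == states.end()) {
            throw SketchQueryNotFound("query id not found in catalog");
        }
        if (it->second == SketchState::STOPPED ||
            it->second == SketchState::MARKED_FOR_HARD_STOP) {
            throw SketchInvalidState("query is already stopped or stopping");
        }
        it->second = SketchState::MARKED_FOR_HARD_STOP;
        return true;
    }

    // Throws std::out_of_range for an unknown id.
    SketchState stateOf(std::uint64_t id) const { return states.at(id); }

  private:
    std::unordered_map<std::uint64_t, SketchState> states;
};
```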