NebulaStream  0.6.213
NebulaStream is a data and application management framework for the internet of things
NES::RequestHandlerService Class Reference

This class is responsible for handling requests related to submitting, fetching information about, and deleting queries, as well as modifying the topology.

#include <RequestHandlerService.hpp>

Public Member Functions

 RequestHandlerService (Configurations::OptimizerConfiguration optimizerConfiguration, const QueryParsingServicePtr &queryParsingService, const Catalogs::Query::QueryCatalogPtr &queryCatalog, const Catalogs::Source::SourceCatalogPtr &sourceCatalog, const Catalogs::UDF::UDFCatalogPtr &udfCatalog, const NES::RequestProcessor::AsyncRequestProcessorPtr &asyncRequestExecutor, const z3::ContextPtr &z3Context, const Statistic::AbstractStatisticQueryGeneratorPtr &statisticQueryGenerator, const Statistic::StatisticRegistryPtr &statisticRegistry, const Optimizer::PlacementAmendmentHandlerPtr &placementAmendmentHandler)
 
QueryId validateAndQueueAddQueryRequest (const std::string &queryString, const Optimizer::PlacementStrategy placementStrategy)
 Registers the incoming query in the system by adding it to the scheduling queue for further processing, and returns the assigned query id.
 
QueryId validateAndQueueAddQueryRequest (const QueryPlanPtr &queryPlan, const Optimizer::PlacementStrategy placementStrategy)
 Registers the incoming query in the system by adding it to the scheduling queue for further processing, and returns the assigned query id.
 
nlohmann::json validateAndQueueExplainQueryRequest (const QueryPlanPtr &queryPlan, const Optimizer::PlacementStrategy placementStrategy)
 Validates the query plan and queues an explain request for it; the result is returned as a JSON object.
 
bool validateAndQueueStopQueryRequest (QueryId queryId)
 
bool validateAndQueueFailQueryRequest (SharedQueryId sharedQueryId, DecomposedQueryId decomposedQueryId, DecomposedQueryPlanVersion decomposedQueryVersion, const std::string &failureReason)
 
RequestProcessor::ISQPRequestResponsePtr queueISQPRequest (const std::vector< RequestProcessor::ISQPEventPtr > &isqpEvents)
 Processes multiple query and topology change requests, represented by ISQP events, in a batch.
 
std::vector< Statistic::StatisticKey > trackStatisticRequest (const Statistic::CharacteristicPtr &characteristic, const Windowing::WindowTypePtr &window, const Statistic::TriggerConditionPtr &triggerCondition, const Statistic::SendingPolicyPtr &sendingPolicy, std::function< void(Statistic::CharacteristicPtr)> callBack)
 Processes a track request by mapping it to a statistic query and returning the statistic keys.
 
std::vector< Statistic::StatisticKey > trackStatisticRequest (const Statistic::CharacteristicPtr &characteristic, const Windowing::WindowTypePtr &window)
 Processes a track request by mapping it to a statistic query and returning the statistic keys.
 
bool queueRegisterPhysicalSourceRequest (std::vector< RequestProcessor::PhysicalSourceDefinition > additions, WorkerId workerId) const
 register one or multiple new physical sources
 
bool queueRegisterLogicalSourceRequest (const std::string &logicalSourceName, SchemaPtr schema) const
 register a new logical source
 
bool queueUnregisterPhysicalSourceRequest (const std::string &physicalSourceName, const std::string &logicalSourceName, WorkerId workerId) const
 unregister an existing physical source
 
bool queueUnregisterAllPhysicalSourcesByWorkerRequest (WorkerId workerId) const
 unregister all existing physical sources of a worker
 
bool queueAddKeyDistributionEntryRequest (const std::string &logicalSourceName, const std::string &physicalSourceName, WorkerId workerId, const std::string &value) const
 add key distribution entry
 
bool queueUnregisterLogicalSourceRequest (const std::string &logicalSourceName) const
 unregister an existing logical source
 
bool queueUpdateLogicalSourceRequest (const std::string &logicalSourceName, SchemaPtr schema) const
 update an existing logical source
 
nlohmann::json queueGetAllLogicalSourcesRequest () const
 get all logical sources
 
nlohmann::json queueGetPhysicalSourcesRequest (std::string logicelSourceName) const
 get all physical sources belonging to a logical source
 
SchemaPtr queueGetLogicalSourceSchemaRequest (std::string logicelSourceName) const
 get the schema of a logical source
 

Detailed Description

This class is responsible for handling requests related to submitting, fetching information about, and deleting queries, as well as modifying the topology.

Constructor & Destructor Documentation

◆ RequestHandlerService()

NES::RequestHandlerService::RequestHandlerService ( Configurations::OptimizerConfiguration  optimizerConfiguration,
const QueryParsingServicePtr &  queryParsingService,
const Catalogs::Query::QueryCatalogPtr &  queryCatalog,
const Catalogs::Source::SourceCatalogPtr &  sourceCatalog,
const Catalogs::UDF::UDFCatalogPtr &  udfCatalog,
const NES::RequestProcessor::AsyncRequestProcessorPtr &  asyncRequestExecutor,
const z3::ContextPtr &  z3Context,
const Statistic::AbstractStatisticQueryGeneratorPtr &  statisticQueryGenerator,
const Statistic::StatisticRegistryPtr &  statisticRegistry,
const Optimizer::PlacementAmendmentHandlerPtr &  placementAmendmentHandler 
)
explicit

References NES::Optimizer::SemanticQueryValidation::create(), NES::Optimizer::SyntacticQueryValidation::create(), and NES_DEBUG.


Member Function Documentation

◆ queueAddKeyDistributionEntryRequest()

bool NES::RequestHandlerService::queueAddKeyDistributionEntryRequest ( const std::string &  logicalSourceName,
const std::string &  physicalSourceName,
WorkerId  workerId,
const std::string &  value 
) const

add key distribution entry

Parameters
logicalSourceName: the name of the logical source to which the physical source belongs
physicalSourceName: the name of the physical source
workerId: the id of the worker hosting the physical source
value: the statistics from the request
Returns
true on success

References NES::RequestProcessor::AddKeyDistributionEntryEvent::create(), NES::TestUtils::logicalSourceName(), NES_DEBUG, NES::TestUtils::physicalSourceName(), and magic_enum::detail::value().


◆ queueGetAllLogicalSourcesRequest()

nlohmann::json NES::RequestHandlerService::queueGetAllLogicalSourcesRequest ( ) const

get all logical sources

Returns
json object containing all logical sources

References NES::RequestProcessor::GetAllLogicalSourcesEvent::create(), NES::RequestProcessor::GetSourceCatalogRequest::create(), NES::RequestProcessor::DEFAULT_RETRIES, and NES_DEBUG.


◆ queueGetLogicalSourceSchemaRequest()

SchemaPtr NES::RequestHandlerService::queueGetLogicalSourceSchemaRequest ( std::string  logicelSourceName) const

get the schema of a logical source

Returns
pointer to the schema of the logical source

References NES::RequestProcessor::GetSourceCatalogRequest::create(), NES::RequestProcessor::GetSchemaEvent::create(), NES::RequestProcessor::DEFAULT_RETRIES, and NES_DEBUG.


◆ queueGetPhysicalSourcesRequest()

nlohmann::json NES::RequestHandlerService::queueGetPhysicalSourcesRequest ( std::string  logicelSourceName) const

get all physical sources belonging to a logical source

Returns
json object containing the sources

References NES::RequestProcessor::GetSourceCatalogRequest::create(), NES::RequestProcessor::GetPhysicalSourcesEvent::create(), NES::RequestProcessor::DEFAULT_RETRIES, and NES_DEBUG.


◆ queueISQPRequest()

RequestProcessor::ISQPRequestResponsePtr NES::RequestHandlerService::queueISQPRequest ( const std::vector< RequestProcessor::ISQPEventPtr > &  isqpEvents)

Processes multiple query and topology change requests, represented by ISQP events, in a batch.

Parameters
isqpEvents: a vector of ISQP events to be handled
Returns
response to the execution of the request

References NES::RequestProcessor::ISQPRequest::create(), and NES::RequestProcessor::DEFAULT_RETRIES.
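
The batching idea can be illustrated with a minimal, self-contained sketch. It does not use the NebulaStream headers: every type below (MockISQPEvent, MockAddQueryEvent, MockRemoveNodeEvent, MockISQPResponse and its fields) is a hypothetical stand-in, and the free function only mirrors the documented shape "vector of event pointers in, response pointer out". What the real request does with each event is not described on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for RequestProcessor::ISQPEvent.
struct MockISQPEvent {
    virtual ~MockISQPEvent() = default;
};
using MockISQPEventPtr = std::shared_ptr<MockISQPEvent>;

// Hypothetical query-change event.
struct MockAddQueryEvent : MockISQPEvent {
    explicit MockAddQueryEvent(std::string query) : query(std::move(query)) {}
    std::string query;
};

// Hypothetical topology-change event.
struct MockRemoveNodeEvent : MockISQPEvent {
    explicit MockRemoveNodeEvent(int workerId) : workerId(workerId) {}
    int workerId;
};

// Hypothetical response; the fields are assumptions made for this sketch.
struct MockISQPResponse {
    std::size_t handledEvents = 0;
    bool success = false;
};
using MockISQPResponsePtr = std::shared_ptr<MockISQPResponse>;

// Mirrors the documented signature shape: one call handles the whole batch.
MockISQPResponsePtr queueISQPRequest(const std::vector<MockISQPEventPtr>& isqpEvents) {
    auto response = std::make_shared<MockISQPResponse>();
    for (const auto& event : isqpEvents) {
        assert(event != nullptr && "a batch must not contain null events");
        ++response->handledEvents;  // the mock only counts; it applies nothing
    }
    response->success = true;
    return response;
}
```

A caller would collect query and topology events in one vector and submit them together, so that both kinds of change arrive in a single request rather than in several independent ones.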


◆ queueRegisterLogicalSourceRequest()

bool NES::RequestHandlerService::queueRegisterLogicalSourceRequest ( const std::string &  logicalSourceName,
SchemaPtr  schema 
) const

register a new logical source

Parameters
logicalSourceName: the name of the logical source
schema: the schema of the logical source
Returns
true on success

References NES::RequestProcessor::AddLogicalSourceEvent::create(), NES::TestUtils::logicalSourceName(), and NES_DEBUG.


◆ queueRegisterPhysicalSourceRequest()

bool NES::RequestHandlerService::queueRegisterPhysicalSourceRequest ( std::vector< RequestProcessor::PhysicalSourceDefinition >  additions,
WorkerId  workerId 
) const

register one or multiple new physical sources

Parameters
additions: a vector of physical source additions
workerId: the id of the worker hosting the physical source
Returns
true on success

References NES::RequestProcessor::AddPhysicalSourcesEvent::create(), and NES_DEBUG.
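
The following sketch shows how the source-related requests might be combined. It is a mock under stated assumptions, not the NebulaStream implementation: MockSourceRequests, MockPhysicalSourceDefinition and its two fields are hypothetical, the schema argument of the logical-source request is omitted, and the rules "a physical source needs an existing logical source" and "a duplicate logical source is rejected" are assumptions chosen for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <vector>

using WorkerId = std::uint64_t;  // stand-in for the NES WorkerId type

// Hypothetical, reduced version of RequestProcessor::PhysicalSourceDefinition.
struct MockPhysicalSourceDefinition {
    std::string logicalSourceName;
    std::string physicalSourceName;
};

class MockSourceRequests {
  public:
    // Assumed rule: registering an already known logical source fails.
    bool queueRegisterLogicalSourceRequest(const std::string& logicalSourceName) {
        return logicalSources.insert(logicalSourceName).second;
    }

    // Assumed rule: the whole batch is rejected if any logical source is unknown.
    bool queueRegisterPhysicalSourceRequest(std::vector<MockPhysicalSourceDefinition> additions,
                                            WorkerId workerId) {
        for (const auto& addition : additions) {
            if (logicalSources.count(addition.logicalSourceName) == 0) {
                return false;
            }
        }
        for (const auto& addition : additions) {
            physicalSources.emplace(workerId, addition.physicalSourceName);
        }
        return true;
    }

    // Removes every physical source of the worker; true if anything was removed.
    bool queueUnregisterAllPhysicalSourcesByWorkerRequest(WorkerId workerId) {
        return physicalSources.erase(workerId) > 0;
    }

    std::size_t physicalSourceCount() const { return physicalSources.size(); }

  private:
    std::set<std::string> logicalSources;
    std::multimap<WorkerId, std::string> physicalSources;
};

// Walks through the assumed order: logical source first, then its physical sources.
std::size_t registerExampleSources(MockSourceRequests& requests) {
    if (!requests.queueRegisterLogicalSourceRequest("cars")) {
        return 0;
    }
    std::vector<MockPhysicalSourceDefinition> additions;
    additions.push_back(MockPhysicalSourceDefinition{"cars", "cars_csv"});
    additions.push_back(MockPhysicalSourceDefinition{"cars", "cars_mqtt"});
    const bool physicalAdded = requests.queueRegisterPhysicalSourceRequest(additions, 1);
    assert(physicalAdded && "the logical source was registered just above");
    return physicalAdded ? requests.physicalSourceCount() : 0;
}
```

Since every documented request in this group returns a bool, callers are expected to check the result rather than rely on exceptions; the mock follows that convention.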


◆ queueUnregisterAllPhysicalSourcesByWorkerRequest()

bool NES::RequestHandlerService::queueUnregisterAllPhysicalSourcesByWorkerRequest ( WorkerId  workerId) const

unregister all existing physical sources of a worker

Parameters
workerId: the id of the worker hosting the physical sources
Returns
true on success

References NES::RequestProcessor::RemoveAllPhysicalSourcesByWorkerEvent::create(), and NES_DEBUG.


◆ queueUnregisterLogicalSourceRequest()

bool NES::RequestHandlerService::queueUnregisterLogicalSourceRequest ( const std::string &  logicalSourceName) const

unregister an existing logical source

Parameters
logicalSourceName: the name of the logical source
Returns
true on success

References NES::RequestProcessor::RemoveLogicalSourceEvent::create(), NES::TestUtils::logicalSourceName(), and NES_DEBUG.


◆ queueUnregisterPhysicalSourceRequest()

bool NES::RequestHandlerService::queueUnregisterPhysicalSourceRequest ( const std::string &  physicalSourceName,
const std::string &  logicalSourceName,
WorkerId  workerId 
) const

unregister an existing physical source

Parameters
physicalSourceName: the name of the physical source to unregister
logicalSourceName: the name of the logical source to which the physical source belongs
workerId: the id of the worker hosting the physical source
Returns
true on success

References NES::RequestProcessor::RemovePhysicalSourceEvent::create(), NES::TestUtils::logicalSourceName(), NES_DEBUG, and NES::TestUtils::physicalSourceName().


◆ queueUpdateLogicalSourceRequest()

bool NES::RequestHandlerService::queueUpdateLogicalSourceRequest ( const std::string &  logicalSourceName,
SchemaPtr  schema 
) const

update an existing logical source

Parameters
logicalSourceName: the name of the logical source
schema: the new schema of the logical source
Returns
true on success

References NES::RequestProcessor::UpdateLogicalSourceEvent::create(), NES::TestUtils::logicalSourceName(), and NES_DEBUG.


◆ trackStatisticRequest() [1/2]

std::vector< Statistic::StatisticKey > NES::RequestHandlerService::trackStatisticRequest ( const Statistic::CharacteristicPtr &  characteristic,
const Windowing::WindowTypePtr &  window 
)

Processes a track request by mapping it to a statistic query and returning the statistic keys.

Parameters
characteristic: the type of statistic being tracked, i.e., data, infrastructure, or workload
window: the window over which to track the statistics
Returns
Vector of StatisticKeys

References NES::Statistic::DEFAULT, and trackStatisticRequest().
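
The reference list suggests that this overload forwards to the five-parameter overload with default arguments. The sketch below illustrates that delegation pattern only. All Mock* types, the enumerator names, and the key format are hypothetical, and which argument NES::Statistic::DEFAULT actually fills in is not stated on this page.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical, simplified stand-ins for the Statistic:: and Windowing:: types.
struct MockCharacteristic { std::string name; };
struct MockWindow { int sizeInMs; };
enum class MockTriggerCondition { Never };
enum class MockSendingPolicy { Default };
using MockStatisticKey = std::string;

class MockStatisticRequests {
  public:
    int fullOverloadCalls = 0;

    // Full overload: the caller chooses trigger, policy, and callback.
    std::vector<MockStatisticKey> trackStatisticRequest(const MockCharacteristic& characteristic,
                                                        const MockWindow& window,
                                                        MockTriggerCondition triggerCondition,
                                                        MockSendingPolicy sendingPolicy,
                                                        std::function<void(MockCharacteristic)> callBack) {
        assert(callBack && "the full overload expects a callable callback");
        (void) triggerCondition;
        (void) sendingPolicy;
        ++fullOverloadCalls;
        // The mock invents a key format; the real keys come from the statistic query.
        return {characteristic.name + "@" + std::to_string(window.sizeInMs)};
    }

    // Short overload: fills in assumed defaults and delegates to the full overload.
    std::vector<MockStatisticKey> trackStatisticRequest(const MockCharacteristic& characteristic,
                                                        const MockWindow& window) {
        return trackStatisticRequest(characteristic, window, MockTriggerCondition::Never,
                                     MockSendingPolicy::Default, [](MockCharacteristic) {});
    }
};
```

Keeping the logic in one overload and letting the shorter one delegate means both entry points return keys produced by the same code path.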


◆ trackStatisticRequest() [2/2]

std::vector< Statistic::StatisticKey > NES::RequestHandlerService::trackStatisticRequest ( const Statistic::CharacteristicPtr &  characteristic,
const Windowing::WindowTypePtr &  window,
const Statistic::TriggerConditionPtr &  triggerCondition,
const Statistic::SendingPolicyPtr &  sendingPolicy,
std::function< void(Statistic::CharacteristicPtr)>  callBack 
)

Processes a track request by mapping it to a statistic query and returning the statistic keys.

Parameters
characteristic: the type of statistic being tracked, i.e., data, infrastructure, or workload
window: the window over which to track the statistics
triggerCondition: the condition that triggers the callback function; it is evaluated whenever a statistic is created
sendingPolicy: the policy for sending the data to the statistic sink
callBack: the function to call if triggerCondition evaluates to true
Returns
Vector of StatisticKeys

References NES::Optimizer::BottomUp, and validateAndQueueAddQueryRequest().

Referenced by trackStatisticRequest().


◆ validateAndQueueAddQueryRequest() [1/2]

QueryId NES::RequestHandlerService::validateAndQueueAddQueryRequest ( const QueryPlanPtr &  queryPlan,
const Optimizer::PlacementStrategy  placementStrategy 
)

Registers the incoming query in the system by adding it to the scheduling queue for further processing, and returns the assigned query id.

Parameters
queryPlan: Query Plan Pointer Object
placementStrategy: Name of the placement strategy
Returns
query id

References NES::RequestProcessor::AddQueryRequest::create(), and NES::RequestProcessor::DEFAULT_RETRIES.


◆ validateAndQueueAddQueryRequest() [2/2]

QueryId NES::RequestHandlerService::validateAndQueueAddQueryRequest ( const std::string &  queryString,
const Optimizer::PlacementStrategy  placementStrategy 
)

Registers the incoming query in the system by adding it to the scheduling queue for further processing, and returns the assigned query id.

Parameters
queryString: query in string form.
placementStrategy: name of the placement strategy to be used.
Returns
queryId : query id of the valid input query.
Exceptions
InvalidQueryException: when query string is not valid.
InvalidArgumentException: when the placement strategy is not valid.

References NES::RequestProcessor::AddQueryRequest::create(), and NES::RequestProcessor::DEFAULT_RETRIES.

Referenced by trackStatisticRequest().
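
A caller of the string overload has to be prepared for a rejected query. The sketch below shows one way to wrap the call. It is self-contained and does not link against NebulaStream: MockRequestHandlerService, its empty-string check, the single-value PlacementStrategy enum, and the helper submitOrZero are hypothetical. Only the method name, parameter order, and the InvalidQueryException name are taken from this page.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <string>

using QueryId = std::uint64_t;               // stand-in for the NES QueryId type
enum class PlacementStrategy { BottomUp };   // stand-in with a single strategy

// Stand-in for the exception named in the documentation above.
struct InvalidQueryException : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Mock with the same shape as the documented string overload.
class MockRequestHandlerService {
  public:
    QueryId validateAndQueueAddQueryRequest(const std::string& queryString,
                                            const PlacementStrategy placementStrategy) {
        (void) placementStrategy;
        // Assumed validation rule: only an empty string is treated as invalid here.
        if (queryString.empty()) {
            throw InvalidQueryException("query string is empty");
        }
        return nextQueryId++;
    }

  private:
    QueryId nextQueryId = 1;
};

// Returns the assigned query id, or 0 if the query was rejected.
QueryId submitOrZero(MockRequestHandlerService& service, const std::string& queryString) {
    try {
        const QueryId queryId =
            service.validateAndQueueAddQueryRequest(queryString, PlacementStrategy::BottomUp);
        assert(queryId != 0 && "the mock never hands out id 0, so 0 can signal rejection");
        return queryId;
    } catch (const InvalidQueryException&) {
        return 0;
    }
}
```

Using 0 as a sentinel is a choice made for this sketch; code built on the real service may prefer to let the exception propagate to the layer that reports errors to the user.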


◆ validateAndQueueExplainQueryRequest()

nlohmann::json NES::RequestHandlerService::validateAndQueueExplainQueryRequest ( const QueryPlanPtr &  queryPlan,
const Optimizer::PlacementStrategy  placementStrategy 
)

Validates the query plan and queues an explain request for it; the result is returned as a JSON object.

Parameters
queryPlan: Query Plan Pointer Object
placementStrategy: Name of the placement strategy
Returns
json object containing the response to the explain request

References NES::RequestProcessor::ExplainRequest::create().


◆ validateAndQueueFailQueryRequest()

bool NES::RequestHandlerService::validateAndQueueFailQueryRequest ( SharedQueryId  sharedQueryId,
DecomposedQueryId  decomposedQueryId,
DecomposedQueryPlanVersion  decomposedQueryVersion,
const std::string &  failureReason 
)

Registers a request to fail a shared query plan.

Warning
This method is primarily designed to be called only by the system.
Parameters
sharedQueryId: id of the shared query plan to fail.
decomposedQueryId: id of the subquery plan that failed.
decomposedQueryVersion: version of the subquery plan that failed.
failureReason: reason for the shared query plan failure.
Returns
true if successful

References NES::RequestProcessor::FailQueryRequest::create(), and NES::RequestProcessor::DEFAULT_RETRIES.


◆ validateAndQueueStopQueryRequest()

bool NES::RequestHandlerService::validateAndQueueStopQueryRequest ( QueryId  queryId)

Registers the incoming stop query request in the system by adding it to the scheduling queue for further processing.

Parameters
queryId: query id of the query to be stopped.
Returns
true if successful
Exceptions
QueryNotFoundException: when query id is not found in the query catalog.
InvalidQueryStateException: when the query is found to be in an invalid state.

References NES::RequestProcessor::StopQueryRequest::create(), NES::RequestProcessor::DEFAULT_RETRIES, NES::MARKED_FOR_HARD_STOP, and NES::STOPPED.
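
The two documented exceptions can be told apart at the call site, as in the following sketch. It is a mock and not the NebulaStream code: MockStopRequests, its in-memory catalog, the RUNNING state, the StopOutcome enum, and the rule "only a running query may be stopped" are assumptions. The state names MARKED_FOR_HARD_STOP and STOPPED and the exception names are the ones listed above.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <stdexcept>

using QueryId = std::uint64_t;  // stand-in for the NES QueryId type

// RUNNING is a hypothetical state added for the sketch.
enum class MockQueryState { RUNNING, MARKED_FOR_HARD_STOP, STOPPED };

struct QueryNotFoundException : std::runtime_error {
    using std::runtime_error::runtime_error;
};
struct InvalidQueryStateException : std::runtime_error {
    using std::runtime_error::runtime_error;
};

class MockStopRequests {
  public:
    std::map<QueryId, MockQueryState> catalog;  // stand-in for the query catalog

    bool validateAndQueueStopQueryRequest(QueryId queryId) {
        const auto entry = catalog.find(queryId);
        if (entry == catalog.end()) {
            throw QueryNotFoundException("query id is not in the catalog");
        }
        // Assumed rule: a query that is already stopping or stopped is invalid.
        if (entry->second != MockQueryState::RUNNING) {
            throw InvalidQueryStateException("query is already stopping or stopped");
        }
        entry->second = MockQueryState::MARKED_FOR_HARD_STOP;
        return true;
    }
};

enum class StopOutcome { Queued, NotFound, InvalidState };

// Maps the boolean result and both exceptions onto one outcome value.
StopOutcome tryStop(MockStopRequests& requests, QueryId queryId) {
    try {
        const bool queued = requests.validateAndQueueStopQueryRequest(queryId);
        assert(queued && "the mock returns true whenever it does not throw");
        return queued ? StopOutcome::Queued : StopOutcome::InvalidState;
    } catch (const QueryNotFoundException&) {
        return StopOutcome::NotFound;
    } catch (const InvalidQueryStateException&) {
        return StopOutcome::InvalidState;
    }
}
```

With this mock, stopping the same query twice yields Queued and then InvalidState, because the first call moves the query to MARKED_FOR_HARD_STOP.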

