@lru_cache(maxsize=None)
def make_callback(endpoint):
    """Build (and memoize, one per endpoint) a queue-message callback.

    The last path segment of *endpoint* names a default operation; the
    remaining prefix is the base URL shared by all operations, so one
    callback can serve e.g. ``{base}/submit`` and ``{base}/status``.
    """
    base_url = "/".join(endpoint.split("/")[:-1])
    return Callback(base_url)


class Callback:
    """Callable that dispatches a queue message to the analysis pipeline
    for the operation named inside the message body.

    Pipelines are constructed lazily per operation endpoint and cached on
    the instance, so repeated messages for the same operation reuse one
    pipeline.
    """

    def __init__(self, base_url):
        self.base_url = base_url
        # endpoint URL -> pipeline; filled lazily by __get_pipeline.
        self.endpoint2pipeline = {}

    def __make_endpoint(self, operation):
        """Join the base URL and an operation name into a full endpoint URL."""
        return f"{self.base_url}/{operation}"

    def __get_pipeline(self, endpoint):
        """Return the cached pipeline for *endpoint*, building it on first use."""
        try:
            return self.endpoint2pipeline[endpoint]
        except KeyError:
            pipeline = self.endpoint2pipeline[endpoint] = get_pipeline(endpoint)
            return pipeline

    @staticmethod
    def __run_pipeline(pipeline, body):
        """Feed the message *body* through *pipeline* and return the results.

        Any failure (missing keys, pipeline errors) is wrapped in
        ``AnalysisFailure`` so the caller sees a single failure type.
        """
        try:
            data, metadata = itemgetter("data", "metadata")(body)
            # TODO: data and metadata are passed as singletons, so there is no
            # buffering and hence no batching happening within the pipeline.
            # However, the queue acknowledgment logic needs to be changed in
            # order to facilitate passing non-singletons, to only ack a
            # message once a response is pulled from the output queue of the
            # pipeline. Probably the pipeline return value needs to contain
            # the queue message frame (or so), in order for the queue manager
            # to tell which message to ack.
            analysis_response_stream = pipeline([data], [metadata])
            # TODO: casting to list is a temporary solution, while the client
            # pipeline operates on singletons ([data], [metadata]).
            return list(analysis_response_stream)
        except Exception as err:
            raise AnalysisFailure from err

    def __call__(self, body: dict):
        """Handle one queue message: pick the operation, run its pipeline.

        Raises ``AnalysisFailure`` (after logging a warning) when the
        pipeline fails.
        """
        # NOTE(review): the field is read as "operations" (plural) although
        # the commit intent says "operation field" — confirm the key name
        # against the message producer before relying on it.
        operation = body.get("operations", "submit")
        endpoint = self.__make_endpoint(operation)
        pipeline = self.__get_pipeline(endpoint)

        try:
            logging.debug(f"Requesting analysis from {endpoint}...")
            return self.__run_pipeline(pipeline, body)
        except AnalysisFailure:
            logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
            # BUGFIX: the WIP version logged and fell through, silently
            # returning None to the queue consumer; the pre-refactor code
            # propagated the failure. Re-raise so callers can react.
            raise