From 0104395790400739b89109f302f79430e4e02515 Mon Sep 17 00:00:00 2001 From: Matthias Bisping Date: Tue, 31 May 2022 13:50:44 +0200 Subject: [PATCH] removed obsolete code --- pyinfra/default_objects.py | 115 ++++--------------------------------- 1 file changed, 10 insertions(+), 105 deletions(-) diff --git a/pyinfra/default_objects.py b/pyinfra/default_objects.py index ea0e0cc..caa87e1 100644 --- a/pyinfra/default_objects.py +++ b/pyinfra/default_objects.py @@ -48,54 +48,6 @@ def get_response_strategy(storage=None): return AggregationStorageStrategy(storage or get_storage()) -# @lru_cache(maxsize=None) -# def make_callback(endpoint): -# def callback(body: dict): -# def perform_operation(operation): -# -# if operation in operation2pipeline: -# print(1111111111111111) -# pipeline = operation2pipeline[operation] -# -# else: -# print(2222222222222222) -# pipeline = get_pipeline(f"{url}/{operation}") -# operation2pipeline[operation] = pipeline -# -# try: -# data, metadata = itemgetter("data", "metadata")(body) -# logging.debug(f"Requesting analysis from {endpoint}...") -# # TODO: since data and metadata are passed as singletons, there is no buffering and hence no batching -# # happening within the pipeline. However, the queue acknowledgment logic needs to be changed in order to -# # facilitate passing non-singletons, to only ack a message, once a response is pulled from the output -# # queue of the pipeline. Probably the pipeline return value needs to contains the queue message frame (or -# # so), in order for the queue manager to tell which message to ack. -# analysis_response_stream = pipeline([data], [metadata]) -# # TODO: casting list is a temporary solution, while the client pipeline operates on singletons -# # ([data], [metadata]). -# return list(analysis_response_stream) -# except Exception as err: -# logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.") -# raise AnalysisFailure() from err -# -# operation2pipeline = {} -# -# operation = body.get("operations", "submit") -# results = perform_operation(operation) -# -# if operation == "submit": -# r = list(results) -# print(r) -# return r -# else: -# print(333333333333333333333333333333333333333333333333333333333333333333, operation) -# raise Exception -# -# url = "/".join(endpoint.split("/")[:-1]) -# -# return callback - - @lru_cache(maxsize=None) def make_callback(endpoint): url = "/".join(endpoint.split("/")[:-1]) @@ -122,17 +74,20 @@ class Callback: @staticmethod def __run_pipeline(pipeline, body): + """ + TODO: since data and metadata are passed as singletons, there is no buffering and hence no batching happening + within the pipeline. However, the queue acknowledgment logic needs to be changed in order to facilitate + passing non-singletons, to only ack a message, once a response is pulled from the output queue of the + pipeline. Probably the pipeline return value needs to contains the queue message frame (or so), in order for + the queue manager to tell which message to ack. + + TODO: casting list on `analysis_response_stream` is a temporary solution, while the client pipeline operates + on singletons ([data], [metadata]). + """ try: data, metadata = itemgetter("data", "metadata")(body) - # TODO: since data and metadata are passed as singletons, there is no buffering and hence no batching - # happening within the pipeline. However, the queue acknowledgment logic needs to be changed in order to - # facilitate passing non-singletons, to only ack a message, once a response is pulled from the output - # queue of the pipeline. Probably the pipeline return value needs to contains the queue message frame (or - # so), in order for the queue manager to tell which message to ack. analysis_response_stream = pipeline([data], [metadata]) - # TODO: casting list is a temporary solution, while the client pipeline operates on singletons - # ([data], [metadata]). return list(analysis_response_stream) except Exception as err: raise AnalysisFailure from err @@ -149,58 +104,8 @@ class Callback: logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.") -# @lru_cache(maxsize=None) -# def make_callback(analysis_endpoint): -# def callback(body: dict): -# try: -# data, metadata = itemgetter("data", "metadata")(body) -# logging.debug(f"Requesting analysis from {endpoint}...") -# # TODO: since data and metadata are passed as singletons, there is no buffering and hence no batching -# # happening within the pipeline. However, the queue acknowledgment logic needs to be changed in order to -# # facilitate passing non-singletons, to only ack a message, once a response is pulled from the output -# # queue of the pipeline. Probably the pipeline return value needs to contains the queue message frame (or -# # so), in order for the queue manager to tell which message to ack. -# analysis_response_stream = pipeline([data], [metadata]) -# # TODO: casting list is a temporary solution, while the client pipeline operates on singletons -# # ([data], [metadata]). -# return list(analysis_response_stream) -# except Exception as err: -# logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.") -# raise AnalysisFailure() from err -# -# endpoint = f"{analysis_endpoint}" -# pipeline = get_pipeline(endpoint) -# -# return callback - - @lru_cache(maxsize=None) def get_pipeline(endpoint): return ClientPipeline( RestPacker(), RestDispatcher(endpoint), RestReceiver(), rcompose(RestPickupStreamer(), RestReceiver()) ) - - -# def make_callback(analysis_endpoint): -# def callback(message: dict): -# def perform_operation(operation): -# endpoint = f"{analysis_endpoint}/{operation}" -# try: -# logging.debug(f"Requesting analysis from {endpoint}...") -# analysis_response = requests.post(endpoint, data=message["data"]) -# analysis_response.raise_for_status() -# analysis_response = analysis_response.json() -# logging.debug(f"Received response.") -# return analysis_response -# except Exception as err: -# logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.") -# raise AnalysisFailure() from err -# -# operations = message.get("operations", [""]) -# results = map(perform_operation, operations) -# result = dict(zip(operations, results)) -# if list(result.keys()) == ["/"]: -# result = list(result.values())[0] -# return result -# -# return callback