WIP: add `operation` field handling to queue messages

This commit is contained in:
Matthias Bisping 2022-05-31 13:47:40 +02:00
parent ae2509dc59
commit 18a9683ddb

View File

@ -2,7 +2,7 @@ import logging
from functools import lru_cache
from operator import itemgetter
from funcy import rcompose, pluck
from funcy import rcompose
from pyinfra.config import CONFIG
from pyinfra.exceptions import AnalysisFailure
@ -48,12 +48,83 @@ def get_response_strategy(storage=None):
return AggregationStorageStrategy(storage or get_storage())
# @lru_cache(maxsize=None)
# def make_callback(endpoint):
# def callback(body: dict):
# def perform_operation(operation):
#
# if operation in operation2pipeline:
# print(1111111111111111)
# pipeline = operation2pipeline[operation]
#
# else:
# print(2222222222222222)
# pipeline = get_pipeline(f"{url}/{operation}")
# operation2pipeline[operation] = pipeline
#
# try:
# data, metadata = itemgetter("data", "metadata")(body)
# logging.debug(f"Requesting analysis from {endpoint}...")
# # TODO: since data and metadata are passed as singletons, there is no buffering and hence no batching
# # happening within the pipeline. However, the queue acknowledgment logic needs to be changed in order to
# # facilitate passing non-singletons, to only ack a message, once a response is pulled from the output
# # queue of the pipeline. Probably the pipeline return value needs to contains the queue message frame (or
# # so), in order for the queue manager to tell which message to ack.
# analysis_response_stream = pipeline([data], [metadata])
# # TODO: casting list is a temporary solution, while the client pipeline operates on singletons
# # ([data], [metadata]).
# return list(analysis_response_stream)
# except Exception as err:
# logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
# raise AnalysisFailure() from err
#
# operation2pipeline = {}
#
# operation = body.get("operations", "submit")
# results = perform_operation(operation)
#
# if operation == "submit":
# r = list(results)
# print(r)
# return r
# else:
# print(333333333333333333333333333333333333333333333333333333333333333333, operation)
# raise Exception
#
# url = "/".join(endpoint.split("/")[:-1])
#
# return callback
@lru_cache(maxsize=None)
def make_callback(endpoint):
    """Build (and memoize per endpoint) a ``Callback`` for the given analysis endpoint.

    The last path segment of *endpoint* is dropped: the ``Callback`` appends the
    per-message operation name to the remaining base URL itself.
    """
    url = "/".join(endpoint.split("/")[:-1])
    return Callback(url)
class Callback:
    """Callable that dispatches queue messages to an analysis pipeline per operation.

    Pipelines are created lazily via ``get_pipeline`` and cached per endpoint,
    so repeated messages for the same operation reuse the same pipeline.
    """

    def __init__(self, base_url):
        # Base URL of the analysis service; the operation name is appended per message.
        self.base_url = base_url
        # Cache of endpoint URL -> pipeline, filled lazily in __get_pipeline.
        self.endpoint2pipeline = {}

    def __make_endpoint(self, operation):
        """Return the full endpoint URL for *operation*."""
        return f"{self.base_url}/{operation}"

    def __get_pipeline(self, endpoint):
        """Return the cached pipeline for *endpoint*, creating it on first use."""
        if endpoint not in self.endpoint2pipeline:
            self.endpoint2pipeline[endpoint] = get_pipeline(endpoint)
        return self.endpoint2pipeline[endpoint]

    @staticmethod
    def __run_pipeline(pipeline, body):
        """Feed *body*'s data/metadata through *pipeline* and return the results as a list.

        Raises:
            AnalysisFailure: wrapping any exception raised by the pipeline call
                or by a malformed *body* (missing "data"/"metadata" keys).
        """
        try:
            data, metadata = itemgetter("data", "metadata")(body)
            # TODO: since data and metadata are passed as singletons, there is no buffering and hence no batching
            # happening within the pipeline. However, the queue acknowledgment logic needs to be changed in order to
            # facilitate passing non-singletons, to only ack a message, once a response is pulled from the output
            # queue of the pipeline. Probably the pipeline return value needs to contain the queue message frame (or
            # so), in order for the queue manager to tell which message to ack.
            analysis_response_stream = pipeline([data], [metadata])
            # TODO: casting list is a temporary solution, while the client pipeline operates on singletons
            # ([data], [metadata]).
            return list(analysis_response_stream)
        except Exception as err:
            raise AnalysisFailure from err

    def __call__(self, body: dict):
        """Handle one queue message: pick the pipeline for its operation and run it.

        The operation is read from body["operations"], defaulting to "submit".
        Re-raises AnalysisFailure after logging which endpoint failed.
        """
        operation = body.get("operations", "submit")
        endpoint = self.__make_endpoint(operation)
        pipeline = self.__get_pipeline(endpoint)
        try:
            logging.debug(f"Requesting analysis from {endpoint}...")
            return self.__run_pipeline(pipeline, body)
        except AnalysisFailure:
            logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
            # Preserve the original failure (and its __cause__) instead of
            # re-raising a fresh instance from an undefined name.
            raise
# @lru_cache(maxsize=None)
# def make_callback(analysis_endpoint):
# def callback(body: dict):
# try:
# data, metadata = itemgetter("data", "metadata")(body)
# logging.debug(f"Requesting analysis from {endpoint}...")
# # TODO: since data and metadata are passed as singletons, there is no buffering and hence no batching
# # happening within the pipeline. However, the queue acknowledgment logic needs to be changed in order to
# # facilitate passing non-singletons, to only ack a message, once a response is pulled from the output
# # queue of the pipeline. Probably the pipeline return value needs to contains the queue message frame (or
# # so), in order for the queue manager to tell which message to ack.
# analysis_response_stream = pipeline([data], [metadata])
# # TODO: casting list is a temporary solution, while the client pipeline operates on singletons
# # ([data], [metadata]).
# return list(analysis_response_stream)
# except Exception as err:
# logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
# raise AnalysisFailure() from err
#
# endpoint = f"{analysis_endpoint}"
# pipeline = get_pipeline(endpoint)
#
# return callback
@lru_cache(maxsize=None)