removed obsolete code
This commit is contained in:
parent
18a9683ddb
commit
0104395790
@ -48,54 +48,6 @@ def get_response_strategy(storage=None):
|
||||
return AggregationStorageStrategy(storage or get_storage())
|
||||
|
||||
|
||||
# @lru_cache(maxsize=None)
|
||||
# def make_callback(endpoint):
|
||||
# def callback(body: dict):
|
||||
# def perform_operation(operation):
|
||||
#
|
||||
# if operation in operation2pipeline:
|
||||
# print(1111111111111111)
|
||||
# pipeline = operation2pipeline[operation]
|
||||
#
|
||||
# else:
|
||||
# print(2222222222222222)
|
||||
# pipeline = get_pipeline(f"{url}/{operation}")
|
||||
# operation2pipeline[operation] = pipeline
|
||||
#
|
||||
# try:
|
||||
# data, metadata = itemgetter("data", "metadata")(body)
|
||||
# logging.debug(f"Requesting analysis from {endpoint}...")
|
||||
# # TODO: since data and metadata are passed as singletons, there is no buffering and hence no batching
|
||||
# # happening within the pipeline. However, the queue acknowledgment logic needs to be changed in order to
|
||||
# # facilitate passing non-singletons, to only ack a message, once a response is pulled from the output
|
||||
# # queue of the pipeline. Probably the pipeline return value needs to contains the queue message frame (or
|
||||
# # so), in order for the queue manager to tell which message to ack.
|
||||
# analysis_response_stream = pipeline([data], [metadata])
|
||||
# # TODO: casting list is a temporary solution, while the client pipeline operates on singletons
|
||||
# # ([data], [metadata]).
|
||||
# return list(analysis_response_stream)
|
||||
# except Exception as err:
|
||||
# logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
|
||||
# raise AnalysisFailure() from err
|
||||
#
|
||||
# operation2pipeline = {}
|
||||
#
|
||||
# operation = body.get("operations", "submit")
|
||||
# results = perform_operation(operation)
|
||||
#
|
||||
# if operation == "submit":
|
||||
# r = list(results)
|
||||
# print(r)
|
||||
# return r
|
||||
# else:
|
||||
# print(333333333333333333333333333333333333333333333333333333333333333333, operation)
|
||||
# raise Exception
|
||||
#
|
||||
# url = "/".join(endpoint.split("/")[:-1])
|
||||
#
|
||||
# return callback
|
||||
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def make_callback(endpoint):
|
||||
url = "/".join(endpoint.split("/")[:-1])
|
||||
@ -122,17 +74,20 @@ class Callback:
|
||||
|
||||
@staticmethod
|
||||
def __run_pipeline(pipeline, body):
|
||||
"""
|
||||
TODO: since data and metadata are passed as singletons, there is no buffering and hence no batching happening
|
||||
within the pipeline. However, the queue acknowledgment logic needs to be changed in order to facilitate
|
||||
passing non-singletons, to only ack a message, once a response is pulled from the output queue of the
|
||||
pipeline. Probably the pipeline return value needs to contains the queue message frame (or so), in order for
|
||||
the queue manager to tell which message to ack.
|
||||
|
||||
TODO: casting list on `analysis_response_stream` is a temporary solution, while the client pipeline operates
|
||||
on singletons ([data], [metadata]).
|
||||
"""
|
||||
|
||||
try:
|
||||
data, metadata = itemgetter("data", "metadata")(body)
|
||||
# TODO: since data and metadata are passed as singletons, there is no buffering and hence no batching
|
||||
# happening within the pipeline. However, the queue acknowledgment logic needs to be changed in order to
|
||||
# facilitate passing non-singletons, to only ack a message, once a response is pulled from the output
|
||||
# queue of the pipeline. Probably the pipeline return value needs to contains the queue message frame (or
|
||||
# so), in order for the queue manager to tell which message to ack.
|
||||
analysis_response_stream = pipeline([data], [metadata])
|
||||
# TODO: casting list is a temporary solution, while the client pipeline operates on singletons
|
||||
# ([data], [metadata]).
|
||||
return list(analysis_response_stream)
|
||||
except Exception as err:
|
||||
raise AnalysisFailure from err
|
||||
@ -149,58 +104,8 @@ class Callback:
|
||||
logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
|
||||
|
||||
|
||||
# @lru_cache(maxsize=None)
|
||||
# def make_callback(analysis_endpoint):
|
||||
# def callback(body: dict):
|
||||
# try:
|
||||
# data, metadata = itemgetter("data", "metadata")(body)
|
||||
# logging.debug(f"Requesting analysis from {endpoint}...")
|
||||
# # TODO: since data and metadata are passed as singletons, there is no buffering and hence no batching
|
||||
# # happening within the pipeline. However, the queue acknowledgment logic needs to be changed in order to
|
||||
# # facilitate passing non-singletons, to only ack a message, once a response is pulled from the output
|
||||
# # queue of the pipeline. Probably the pipeline return value needs to contains the queue message frame (or
|
||||
# # so), in order for the queue manager to tell which message to ack.
|
||||
# analysis_response_stream = pipeline([data], [metadata])
|
||||
# # TODO: casting list is a temporary solution, while the client pipeline operates on singletons
|
||||
# # ([data], [metadata]).
|
||||
# return list(analysis_response_stream)
|
||||
# except Exception as err:
|
||||
# logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
|
||||
# raise AnalysisFailure() from err
|
||||
#
|
||||
# endpoint = f"{analysis_endpoint}"
|
||||
# pipeline = get_pipeline(endpoint)
|
||||
#
|
||||
# return callback
|
||||
|
||||
|
||||
@lru_cache(maxsize=None)
|
||||
def get_pipeline(endpoint):
|
||||
return ClientPipeline(
|
||||
RestPacker(), RestDispatcher(endpoint), RestReceiver(), rcompose(RestPickupStreamer(), RestReceiver())
|
||||
)
|
||||
|
||||
|
||||
# def make_callback(analysis_endpoint):
|
||||
# def callback(message: dict):
|
||||
# def perform_operation(operation):
|
||||
# endpoint = f"{analysis_endpoint}/{operation}"
|
||||
# try:
|
||||
# logging.debug(f"Requesting analysis from {endpoint}...")
|
||||
# analysis_response = requests.post(endpoint, data=message["data"])
|
||||
# analysis_response.raise_for_status()
|
||||
# analysis_response = analysis_response.json()
|
||||
# logging.debug(f"Received response.")
|
||||
# return analysis_response
|
||||
# except Exception as err:
|
||||
# logging.warning(f"Exception caught when calling analysis endpoint {endpoint}.")
|
||||
# raise AnalysisFailure() from err
|
||||
#
|
||||
# operations = message.get("operations", [""])
|
||||
# results = map(perform_operation, operations)
|
||||
# result = dict(zip(operations, results))
|
||||
# if list(result.keys()) == ["/"]:
|
||||
# result = list(result.values())[0]
|
||||
# return result
|
||||
#
|
||||
# return callback
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user