refactoring
This commit is contained in:
parent
a69f613fe6
commit
2954bbc1ad
@ -8,7 +8,7 @@ from copy import deepcopy
|
||||
from operator import itemgetter
|
||||
from typing import Callable, Dict, Union
|
||||
|
||||
from funcy import omit, filter
|
||||
from funcy import omit, filter, lmap, lflatten
|
||||
from more_itertools import peekable
|
||||
|
||||
from pyinfra.config import CONFIG, parse_disjunction_string
|
||||
@ -251,16 +251,13 @@ class QueueVisitor:
|
||||
def load_item_from_storage_and_process_with_callback(self, queue_item_body):
|
||||
"""Bundles the result from processing a storage item with the body of the corresponding queue item."""
|
||||
|
||||
storage_items = self.load_data(queue_item_body)
|
||||
|
||||
result_body = {"data": [], **queue_item_body}
|
||||
|
||||
for storage_item in storage_items:
|
||||
def process_storage_item(storage_item):
|
||||
analysis_input = {**storage_item, **queue_item_body}
|
||||
result = self.process_storage_item(analysis_input)
|
||||
result_body["data"].extend(result)
|
||||
return self.process_storage_item(analysis_input)
|
||||
|
||||
return result_body
|
||||
storage_items = self.load_data(queue_item_body)
|
||||
results = lflatten(map(process_storage_item, storage_items))
|
||||
return {"data": results, **queue_item_body}
|
||||
|
||||
def __call__(self, queue_item_body):
|
||||
analysis_result_body = self.load_item_from_storage_and_process_with_callback(queue_item_body)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user