diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py index 5c5d14c..767dce9 100644 --- a/pyinfra/visitor.py +++ b/pyinfra/visitor.py @@ -3,7 +3,6 @@ import gzip import hashlib import json import logging -import time from collections import deque from operator import itemgetter from typing import Callable, Dict, Union @@ -23,11 +22,9 @@ from pyinfra.storage.storage import Storage logger = logging.getLogger(__name__) -def unique_hash(pages, seed=""): - assert isinstance(seed, str) +def unique_hash(pages): pages_str = "-".join(map(str, pages)) - seed = seed or str(time.time()) - rand_str = (pages_str + seed).encode(encoding="UTF-8", errors="strict") + rand_str = pages_str.encode(encoding="UTF-8", errors="strict") hsh = hashlib.md5(rand_str).hexdigest() return hsh @@ -68,11 +65,11 @@ def get_response_object_descriptor(body): class ResponseStrategy(abc.ABC): @abc.abstractmethod - def handle_response(self, body): + def handle_response(self, analysis_payload: dict): pass - def __call__(self, body): - return self.handle_response(body) + def __call__(self, analysis_payload: dict): + return self.handle_response(analysis_payload) class StorageStrategy(ResponseStrategy): @@ -144,19 +141,19 @@ class AggregationStorageStrategy(ResponseStrategy): analysis_payload : {data: ..., metadata: ...} """ - storage_upload_info = {**request_metadata, "id": analysis_payload.get("id", "0")} + storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", "0")} if analysis_payload["data"]: return self.put_object(json.dumps(analysis_payload).encode(), storage_upload_info) else: self.buffer.append(analysis_payload) - if last or self.dispatch_callback(request_metadata): + if last or self.dispatch_callback(storage_upload_info): return self.upload_queue_items(storage_upload_info) - def handle_response(self, payload, final=False): - request_metadata = omit(payload, ["data"]) - result_data = peekable(payload["data"]) + def handle_response(self, analysis_payload, final=False): + request_metadata = omit(analysis_payload, ["data"]) + result_data = peekable(analysis_payload["data"]) for analysis_payload in result_data: yield self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False)) diff --git a/test/fixtures/server.py b/test/fixtures/server.py index 70395a3..cae4fde 100644 --- a/test/fixtures/server.py +++ b/test/fixtures/server.py @@ -85,7 +85,8 @@ def core_operation(item_type, one_to_many, analysis_task): return string.decode().upper().encode(), metadata def extract(string: bytes, metadata): - for c in project(dict(enumerate(string.decode())), metadata["pages"]).values(): + for i, c in project(dict(enumerate(string.decode())), metadata["pages"]).items(): + metadata["id"] = i yield c.encode(), metadata def rotate(im: bytes, metadata): @@ -98,6 +99,7 @@ def core_operation(item_type, one_to_many, analysis_task): def stream_pages(pdf: bytes, metadata): for i, page in enumerate(fitz.open(stream=pdf)): # yield page.get_pixmap().tobytes("png"), metadata + metadata["id"] = i yield f"page_{i}".encode(), metadata params2op = {