added id setting to operatin mocks and thus removed need for random seed of randomly seeded hash in storage item identifier
This commit is contained in:
parent
7e46a66698
commit
1940b974b1
@ -3,7 +3,6 @@ import gzip
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from collections import deque
|
||||
from operator import itemgetter
|
||||
from typing import Callable, Dict, Union
|
||||
@ -23,11 +22,9 @@ from pyinfra.storage.storage import Storage
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def unique_hash(pages, seed=""):
|
||||
assert isinstance(seed, str)
|
||||
def unique_hash(pages):
|
||||
pages_str = "-".join(map(str, pages))
|
||||
seed = seed or str(time.time())
|
||||
rand_str = (pages_str + seed).encode(encoding="UTF-8", errors="strict")
|
||||
rand_str = pages_str.encode(encoding="UTF-8", errors="strict")
|
||||
hsh = hashlib.md5(rand_str).hexdigest()
|
||||
return hsh
|
||||
|
||||
@ -68,11 +65,11 @@ def get_response_object_descriptor(body):
|
||||
|
||||
class ResponseStrategy(abc.ABC):
|
||||
@abc.abstractmethod
|
||||
def handle_response(self, body):
|
||||
def handle_response(self, analysis_payload: dict):
|
||||
pass
|
||||
|
||||
def __call__(self, body):
|
||||
return self.handle_response(body)
|
||||
def __call__(self, analysis_payload: dict):
|
||||
return self.handle_response(analysis_payload)
|
||||
|
||||
|
||||
class StorageStrategy(ResponseStrategy):
|
||||
@ -144,19 +141,19 @@ class AggregationStorageStrategy(ResponseStrategy):
|
||||
analysis_payload : {data: ..., metadata: ...}
|
||||
"""
|
||||
|
||||
storage_upload_info = {**request_metadata, "id": analysis_payload.get("id", "0")}
|
||||
storage_upload_info = {**request_metadata, "id": analysis_payload["metadata"].get("id", "0")}
|
||||
|
||||
if analysis_payload["data"]:
|
||||
return self.put_object(json.dumps(analysis_payload).encode(), storage_upload_info)
|
||||
|
||||
else:
|
||||
self.buffer.append(analysis_payload)
|
||||
if last or self.dispatch_callback(request_metadata):
|
||||
if last or self.dispatch_callback(storage_upload_info):
|
||||
return self.upload_queue_items(storage_upload_info)
|
||||
|
||||
def handle_response(self, payload, final=False):
|
||||
request_metadata = omit(payload, ["data"])
|
||||
result_data = peekable(payload["data"])
|
||||
def handle_response(self, analysis_payload, final=False):
|
||||
request_metadata = omit(analysis_payload, ["data"])
|
||||
result_data = peekable(analysis_payload["data"])
|
||||
for analysis_payload in result_data:
|
||||
yield self.upload_or_aggregate(analysis_payload, request_metadata, last=not result_data.peek(False))
|
||||
|
||||
|
||||
4
test/fixtures/server.py
vendored
4
test/fixtures/server.py
vendored
@ -85,7 +85,8 @@ def core_operation(item_type, one_to_many, analysis_task):
|
||||
return string.decode().upper().encode(), metadata
|
||||
|
||||
def extract(string: bytes, metadata):
|
||||
for c in project(dict(enumerate(string.decode())), metadata["pages"]).values():
|
||||
for i, c in project(dict(enumerate(string.decode())), metadata["pages"]).items():
|
||||
metadata["id"] = i
|
||||
yield c.encode(), metadata
|
||||
|
||||
def rotate(im: bytes, metadata):
|
||||
@ -98,6 +99,7 @@ def core_operation(item_type, one_to_many, analysis_task):
|
||||
def stream_pages(pdf: bytes, metadata):
|
||||
for i, page in enumerate(fitz.open(stream=pdf)):
|
||||
# yield page.get_pixmap().tobytes("png"), metadata
|
||||
metadata["id"] = i
|
||||
yield f"page_{i}".encode(), metadata
|
||||
|
||||
params2op = {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user