Compare commits

...

12 Commits

Author SHA1 Message Date
Julius Unverfehrt
3ef4246d1e chore: fuzzy pin kn-utils to allow for future updates 2025-01-22 12:36:38 +01:00
Julius Unverfehrt
841c492639 Merge branch 'chore/RES-871-update-callback' into 'master'
feat:BREAKING CHANGE: download callback no forwards all files as bytes

See merge request knecon/research/pyinfra!108
2025-01-16 11:11:59 +01:00
Julius Unverfehrt
ead069d3a7 chore: adjust docstrings 2025-01-16 10:35:06 +01:00
Julius Unverfehrt
044ea6cf0a feat: streamline download to always include the filename of the downloaded file 2025-01-16 10:29:50 +01:00
Julius Unverfehrt
ff7547e2c6 fix: remove faulty import 2025-01-16 10:29:50 +01:00
Julius Unverfehrt
fbf79ef758 chore: regenerate BOM 2025-01-16 10:29:50 +01:00
Julius Unverfehrt
f382887d40 chore: seek and destroy proto in code 2025-01-16 10:29:50 +01:00
Julius Unverfehrt
5c4400aa8b feat:BREAKING CHANGE: download callback no forwards all files as bytes 2025-01-16 10:29:46 +01:00
Jonathan Kössler
5ce66f18a0 Merge branch 'bugfix/RED-10722' into 'master'
fix: dlq init

See merge request knecon/research/pyinfra!109
2025-01-15 10:56:12 +01:00
Jonathan Kössler
ea0c55930a chore: remove test nack 2025-01-15 10:00:50 +01:00
Jonathan Kössler
87f57e2244 fix: dlq init 2025-01-14 16:39:47 +01:00
Jonathan Kössler
3fb8c4e641 fix: do not use groups for packages 2024-12-18 16:33:35 +01:00
28 changed files with 8716 additions and 6733 deletions

View File

@ -6,7 +6,7 @@
4. [ Module Installation ](#module-installation)
5. [ Scripts ](#scripts)
6. [ Tests ](#tests)
7. [ Protobuf ](#protobuf)
7. [ Opentelemetry protobuf dependency hell ](#opentelemetry-protobuf-dependency-hell)
## About
@ -213,48 +213,8 @@ $ python scripts/send_request.py
Tests require a running minio and rabbitmq container, meaning you have to run `docker compose up` in the tests folder
before running the tests.
## Protobuf
## OpenTelemetry Protobuf Dependency Hell
### Opentelemetry Compatibility Issue
**Note**: Status: 31/07/2024, the currently used `opentelemetry-exporter-otlp-proto-http` version `1.25.0` requires
a `protobuf` version < `5.x.x` and is not compatible with the latest protobuf version `5.27.x`. This is an [open issue](https://github.com/open-telemetry/opentelemetry-python/issues/3958) in opentelemetry, because [support for 4.25.x ends in Q2 '25](https://protobuf.dev/support/version-support/#python). Therefore, we should keep this in mind and update the dependency once opentelemetry includes support for `protobuf 5.27.x`.
### Install Protobuf Compiler
**Linux**
1. Download the version of the protobuf compiler matching the protobuf package, currently v4.25.4 so protoc v25.4, from [GitHub](https://github.com/protocolbuffers/protobuf/releases) -> `protobuf-25.4.zip`
2. Extract the files under `$HOME/.local` or another directory of your choice
```bash
unzip protoc-<version>-linux-x86_64.zip -d $HOME/.local
```
3. Ensure that the `bin` directory is in your `PATH` by adding the following line to your `.bashrc` or `.zshrc`:
```bash
export PATH="$PATH:$HOME/.local/bin"
```
**MacOS**
1. Download the version of the protobuf compiler matching the protobuf package, currently v4.25.4 so protoc v25.4, from [GitHub](https://github.com/protocolbuffers/protobuf/releases) -> `protoc-25.4-osx-universal_binary.zip`
2. Extract the files to a directory of your choice
3. Copy the executable bin `protoc` to `/usr/local/bin`
```bash
sudo cp /Users/you/location-of-unzipped-dir/bin/protoc /usr/local/bin/
```
4. Open `protoc` in `/usr/local/bin/` via Finder to make it executable, now it should be also on your `PATH`
### Compile Protobuf Files
1. Ensure that the protobuf compiler is installed on your system. You can check this by running:
```bash
protoc --version
```
2. Compile proto files:
```bash
protoc --proto_path=./config/proto --python_out=./pyinfra/proto ./config/proto/*.proto
```
3. Manually adjust import statements in the generated files to match the package structure, e.g.:
`import EntryData_pb2 as EntryData__pb2` -> `import pyinfra.proto.EntryData_pb2 as EntryData__pb2`.
This does not work automatically because the generated files are not in the same directory as the proto files.
**Note**: Status 2025/01/09: the currently used `opentelemetry-exporter-otlp-proto-http` version `1.25.0` requires
a `protobuf` version < `5.x.x` and is not compatible with the latest protobuf version `5.27.x`. This is an [open issue](https://github.com/open-telemetry/opentelemetry-python/issues/3958) in opentelemetry, because [support for 4.25.x ends in Q2 '25](https://protobuf.dev/support/version-support/#python).
Therefore, we should keep this in mind and update the dependency once opentelemetry includes support for `protobuf 5.27.x`.

11934
bom.json

File diff suppressed because it is too large Load Diff

View File

@ -1,21 +0,0 @@
syntax = "proto3";
message AllDocumentPages {
repeated DocumentPage documentPages = 1;
}
message DocumentPage {
// The page number, starting with 1.
int32 number = 1;
// The page height in PDF user units.
int32 height = 2;
// The page width in PDF user units.
int32 width = 3;
// The page rotation as specified by the PDF.
int32 rotation = 4;
}

View File

@ -1,25 +0,0 @@
syntax = "proto3";
message AllDocumentPositionData {
repeated DocumentPositionData documentPositionData = 1;
}
message DocumentPositionData {
// Identifier of the text block.
int64 id = 1;
// For each string coordinate in the search text of the text block, the array contains an entry relating the string coordinate to the position coordinate.
// This is required due to the text and position coordinates not being equal.
repeated int32 stringIdxToPositionIdx = 2;
// The bounding box for each glyph as a rectangle. This matrix is of size (n,4), where n is the number of glyphs in the text block.
// The second dimension specifies the rectangle with the value x, y, width, height, with x, y specifying the lower left corner.
// In order to access this information, the stringIdxToPositionIdx array must be used to transform the coordinates.
repeated Position positions = 3;
// Definition of a BoundingBox that contains x, y, width, and height.
message Position {
repeated float value = 1;
}
}

View File

@ -1,8 +0,0 @@
syntax = "proto3";
import "EntryData.proto";
message DocumentStructure {
// The root EntryData represents the Document.
EntryData root = 1;
}

View File

@ -1,29 +0,0 @@
syntax = "proto3";
message AllDocumentTextData {
repeated DocumentTextData documentTextData = 1;
}
message DocumentTextData {
// Identifier of the text block.
int64 id = 1;
// The page the text block occurs on.
int64 page = 2;
// The text of the text block.
string searchText = 3;
// Each text block is assigned a number on a page, starting from 0.
int32 numberOnPage = 4;
// The text blocks are ordered, this number represents the start of the text block as a string offset.
int32 start = 5;
// The text blocks are ordered, this number represents the end of the text block as a string offset.
int32 end = 6;
// The line breaks in the text of this semantic node in string offsets. They are exclusive end. At the end of each semantic node there is an implicit linebreak.
repeated int32 lineBreaks = 7;
}

View File

@ -1,27 +0,0 @@
syntax = "proto3";
import "LayoutEngine.proto";
import "NodeType.proto";
message EntryData {
// Type of the semantic node.
NodeType type = 1;
// Specifies the position in the parsed tree structure.
repeated int32 treeId = 2;
// Specifies the text block IDs associated with this semantic node.
repeated int64 atomicBlockIds = 3;
// Specifies the pages this semantic node appears on.
repeated int64 pageNumbers = 4;
// Some semantic nodes have additional information, this information is stored in this Map.
map<string, string> properties = 5;
// All child Entries of this Entry.
repeated EntryData children = 6;
// Describes the origin of the semantic node.
repeated LayoutEngine engines = 7;
}

View File

@ -1,7 +0,0 @@
syntax = "proto3";
enum LayoutEngine {
ALGORITHM = 0;
AI = 1;
OUTLINE = 2;
}

View File

@ -1,14 +0,0 @@
syntax = "proto3";
enum NodeType {
DOCUMENT = 0;
SECTION = 1;
SUPER_SECTION = 2;
HEADLINE = 3;
PARAGRAPH = 4;
TABLE = 5;
TABLE_CELL = 6;
IMAGE = 7;
HEADER = 8;
FOOTER = 9;
}

2612
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,29 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: DocumentPage.proto
# Protobuf Python Version: 4.25.5
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x12\x44ocumentPage.proto"8\n\x10\x41llDocumentPages\x12$\n\rdocumentPages\x18\x01 \x03(\x0b\x32\r.DocumentPage"O\n\x0c\x44ocumentPage\x12\x0e\n\x06number\x18\x01 \x01(\x05\x12\x0e\n\x06height\x18\x02 \x01(\x05\x12\r\n\x05width\x18\x03 \x01(\x05\x12\x10\n\x08rotation\x18\x04 \x01(\x05\x62\x06proto3'
)
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "DocumentPage_pb2", _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals["_ALLDOCUMENTPAGES"]._serialized_start = 22
_globals["_ALLDOCUMENTPAGES"]._serialized_end = 78
_globals["_DOCUMENTPAGE"]._serialized_start = 80
_globals["_DOCUMENTPAGE"]._serialized_end = 159
# @@protoc_insertion_point(module_scope)

View File

@ -1,31 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: DocumentPositionData.proto
# Protobuf Python Version: 4.25.5
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x1a\x44ocumentPositionData.proto"N\n\x17\x41llDocumentPositionData\x12\x33\n\x14\x64ocumentPositionData\x18\x01 \x03(\x0b\x32\x15.DocumentPositionData"\x90\x01\n\x14\x44ocumentPositionData\x12\n\n\x02id\x18\x01 \x01(\x03\x12\x1e\n\x16stringIdxToPositionIdx\x18\x02 \x03(\x05\x12\x31\n\tpositions\x18\x03 \x03(\x0b\x32\x1e.DocumentPositionData.Position\x1a\x19\n\x08Position\x12\r\n\x05value\x18\x01 \x03(\x02\x62\x06proto3'
)
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "DocumentPositionData_pb2", _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals["_ALLDOCUMENTPOSITIONDATA"]._serialized_start = 30
_globals["_ALLDOCUMENTPOSITIONDATA"]._serialized_end = 108
_globals["_DOCUMENTPOSITIONDATA"]._serialized_start = 111
_globals["_DOCUMENTPOSITIONDATA"]._serialized_end = 255
_globals["_DOCUMENTPOSITIONDATA_POSITION"]._serialized_start = 230
_globals["_DOCUMENTPOSITIONDATA_POSITION"]._serialized_end = 255
# @@protoc_insertion_point(module_scope)

View File

@ -1,29 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: DocumentStructure.proto
# Protobuf Python Version: 4.25.5
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
import pyinfra.proto.EntryData_pb2 as EntryData__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x17\x44ocumentStructure.proto\x1a\x0f\x45ntryData.proto"-\n\x11\x44ocumentStructure\x12\x18\n\x04root\x18\x01 \x01(\x0b\x32\n.EntryDatab\x06proto3'
)
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "DocumentStructure_pb2", _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals["_DOCUMENTSTRUCTURE"]._serialized_start = 44
_globals["_DOCUMENTSTRUCTURE"]._serialized_end = 89
# @@protoc_insertion_point(module_scope)

View File

@ -1,29 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: DocumentTextData.proto
# Protobuf Python Version: 4.25.5
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x16\x44ocumentTextData.proto"B\n\x13\x41llDocumentTextData\x12+\n\x10\x64ocumentTextData\x18\x01 \x03(\x0b\x32\x11.DocumentTextData"\x86\x01\n\x10\x44ocumentTextData\x12\n\n\x02id\x18\x01 \x01(\x03\x12\x0c\n\x04page\x18\x02 \x01(\x03\x12\x12\n\nsearchText\x18\x03 \x01(\t\x12\x14\n\x0cnumberOnPage\x18\x04 \x01(\x05\x12\r\n\x05start\x18\x05 \x01(\x05\x12\x0b\n\x03\x65nd\x18\x06 \x01(\x05\x12\x12\n\nlineBreaks\x18\x07 \x03(\x05\x62\x06proto3'
)
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "DocumentTextData_pb2", _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals["_ALLDOCUMENTTEXTDATA"]._serialized_start = 26
_globals["_ALLDOCUMENTTEXTDATA"]._serialized_end = 92
_globals["_DOCUMENTTEXTDATA"]._serialized_start = 95
_globals["_DOCUMENTTEXTDATA"]._serialized_end = 229
# @@protoc_insertion_point(module_scope)

View File

@ -1,34 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: EntryData.proto
# Protobuf Python Version: 4.25.5
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
import pyinfra.proto.LayoutEngine_pb2 as LayoutEngine__pb2
import pyinfra.proto.NodeType_pb2 as NodeType__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x0f\x45ntryData.proto\x1a\x12LayoutEngine.proto\x1a\x0eNodeType.proto"\x82\x02\n\tEntryData\x12\x17\n\x04type\x18\x01 \x01(\x0e\x32\t.NodeType\x12\x0e\n\x06treeId\x18\x02 \x03(\x05\x12\x16\n\x0e\x61tomicBlockIds\x18\x03 \x03(\x03\x12\x13\n\x0bpageNumbers\x18\x04 \x03(\x03\x12.\n\nproperties\x18\x05 \x03(\x0b\x32\x1a.EntryData.PropertiesEntry\x12\x1c\n\x08\x63hildren\x18\x06 \x03(\x0b\x32\n.EntryData\x12\x1e\n\x07\x65ngines\x18\x07 \x03(\x0e\x32\r.LayoutEngine\x1a\x31\n\x0fPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x62\x06proto3'
)
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "EntryData_pb2", _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals["_ENTRYDATA_PROPERTIESENTRY"]._options = None
_globals["_ENTRYDATA_PROPERTIESENTRY"]._serialized_options = b"8\001"
_globals["_ENTRYDATA"]._serialized_start = 56
_globals["_ENTRYDATA"]._serialized_end = 314
_globals["_ENTRYDATA_PROPERTIESENTRY"]._serialized_start = 265
_globals["_ENTRYDATA_PROPERTIESENTRY"]._serialized_end = 314
# @@protoc_insertion_point(module_scope)

View File

@ -1,27 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: LayoutEngine.proto
# Protobuf Python Version: 4.25.5
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b"\n\x12LayoutEngine.proto*2\n\x0cLayoutEngine\x12\r\n\tALGORITHM\x10\x00\x12\x06\n\x02\x41I\x10\x01\x12\x0b\n\x07OUTLINE\x10\x02\x62\x06proto3"
)
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "LayoutEngine_pb2", _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals["_LAYOUTENGINE"]._serialized_start = 22
_globals["_LAYOUTENGINE"]._serialized_end = 72
# @@protoc_insertion_point(module_scope)

View File

@ -1,27 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: NodeType.proto
# Protobuf Python Version: 4.25.5
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b"\n\x0eNodeType.proto*\x93\x01\n\x08NodeType\x12\x0c\n\x08\x44OCUMENT\x10\x00\x12\x0b\n\x07SECTION\x10\x01\x12\x11\n\rSUPER_SECTION\x10\x02\x12\x0c\n\x08HEADLINE\x10\x03\x12\r\n\tPARAGRAPH\x10\x04\x12\t\n\x05TABLE\x10\x05\x12\x0e\n\nTABLE_CELL\x10\x06\x12\t\n\x05IMAGE\x10\x07\x12\n\n\x06HEADER\x10\x08\x12\n\n\x06\x46OOTER\x10\tb\x06proto3"
)
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "NodeType_pb2", _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals["_NODETYPE"]._serialized_start = 19
_globals["_NODETYPE"]._serialized_end = 166
# @@protoc_insertion_point(module_scope)

View File

@ -122,6 +122,11 @@ class AsyncQueueManager:
self.config.service_response_exchange_name, ExchangeType.DIRECT, durable=True
)
# we must declare DLQ to handle error messages
self.dead_letter_queue = await self.channel.declare_queue(
self.config.service_dead_letter_queue_name, durable=True
)
@retry(tries=5, exceptions=(AMQPConnectionError, ChannelInvalidStateError), reraise=True, logger=logger)
async def setup_tenant_queue(self) -> None:
self.tenant_exchange_queue = await self.channel.declare_queue(
@ -160,6 +165,10 @@ class AsyncQueueManager:
input_queue = await self.channel.declare_queue(
queue_name,
durable=True,
arguments={
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self.config.service_dead_letter_queue_name,
},
)
await input_queue.bind(self.input_exchange, routing_key=tenant_id)
self.consumer_tags[tenant_id] = await input_queue.consume(self.process_input_message)

View File

@ -1,15 +1,16 @@
from typing import Callable, Union
from typing import Callable
from dynaconf import Dynaconf
from kn_utils.logging import logger
from pyinfra.storage.connection import get_storage
from pyinfra.storage.utils import (
download_data_as_specified_in_message,
download_data_bytes_as_specified_in_message,
upload_data_as_specified_in_message,
DownloadedData,
)
DataProcessor = Callable[[Union[dict, bytes], dict], Union[dict, list, str]]
DataProcessor = Callable[[dict[str, DownloadedData] | DownloadedData, dict], dict | list | str]
Callback = Callable[[dict], dict]
@ -28,7 +29,9 @@ def make_download_process_upload_callback(data_processor: DataProcessor, setting
storage = get_storage(settings, queue_message_payload.get("X-TENANT-ID"))
data = download_data_as_specified_in_message(storage, queue_message_payload)
data: dict[str, DownloadedData] | DownloadedData = download_data_bytes_as_specified_in_message(
storage, queue_message_payload
)
result = data_processor(data, queue_message_payload)

View File

@ -1,127 +0,0 @@
import re
from enum import Enum
from pathlib import Path
from google.protobuf.json_format import MessageToDict
from kn_utils.logging import logger
from pyinfra.proto import (
DocumentPage_pb2,
DocumentPositionData_pb2,
DocumentStructure_pb2,
DocumentTextData_pb2,
)
class ProtoDataLoader:
"""Loads proto data from a file and returns it as a dictionary or list.
The loader is a singleton and should be used as a callable. The file name and byte data are passed as arguments.
The document type is determined based on the file name and the data is returned as a dictionary or list, depending
on the document type.
The DocumentType enum contains all supported document types and their corresponding proto schema.
KEYS_TO_UNPACK contains the keys that should be unpacked from the message dictionary. Keys are unpacked if the
message dictionary contains only one key. This behaviour is necessary since lists are wrapped in a dictionary.
"""
_instance = None
_pattern = None
class DocumentType(Enum):
STRUCTURE = (DocumentStructure_pb2.DocumentStructure, "DocumentStructure")
TEXT = (DocumentTextData_pb2.AllDocumentTextData, "AllDocumentTextData")
PAGES = (DocumentPage_pb2.AllDocumentPages, "AllDocumentPages")
POSITION = (DocumentPositionData_pb2.AllDocumentPositionData, "AllDocumentPositionData")
KEYS_TO_UNPACK = ["documentTextData", "documentPages", "documentPositionData"]
@classmethod
def _build_pattern(cls) -> re.Pattern:
types = "|".join([dt.name for dt in cls.DocumentType])
return re.compile(rf"\..*({types}).*\.proto.*")
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._pattern = cls._build_pattern()
return cls._instance
def __call__(self, file_name: str | Path, data: bytes) -> dict:
return self._load(file_name, data)
def _load(self, file_name: str | Path, data: bytes) -> dict | list:
file_name = str(file_name)
document_type = self._match(file_name)
if not document_type:
logger.error(f"Unknown document type: {file_name}, supported types: {self.DocumentType}")
return {}
logger.debug(f"Loading document type: {document_type}")
schema, _ = self.DocumentType[document_type].value
message = schema()
message.ParseFromString(data)
message_dict = MessageToDict(message, including_default_value_fields=True)
message_dict = convert_int64_fields(message_dict)
if document_type == "POSITION":
message_dict = transform_positions_to_list(message_dict)
return self._unpack(message_dict)
def _match(self, file_name: str) -> str | None:
match = self._pattern.search(file_name)
return match.group(1) if match else None
def _unpack(self, message_dict: dict) -> list | dict:
if len(message_dict) > 1:
return message_dict
for key in self.KEYS_TO_UNPACK:
if key in message_dict:
logger.debug(f"Unpacking key: {key}")
return message_dict[key]
return message_dict
def convert_int64_fields(obj):
# FIXME: find a more sophisticated way to convert int64 fields (defaults to str in python)
# we skip the following keys because the values are expected to be of type str
skip_keys = ["col", "row", "numberOfCols", "numberOfRows"]
if isinstance(obj, dict):
for key, value in obj.items():
if key in skip_keys:
continue
obj[key] = convert_int64_fields(value)
elif isinstance(obj, list):
return [convert_int64_fields(item) for item in obj]
elif isinstance(obj, str) and obj.isdigit():
return int(obj)
return obj
def transform_positions_to_list(obj: dict | list) -> dict:
"""Transforms the repeated fields 'positions' to a lists of lists of floats
as expected by DocumentReader.
Args:
obj (dict | list): Proto message dict
Returns:
dict: Proto message dict
"""
if isinstance(obj, dict):
# Check if 'positions' is in the dictionary and reshape it as list of lists of floats
if "positions" in obj and isinstance(obj["positions"], list):
obj["positions"] = [pos["value"] for pos in obj["positions"] if isinstance(pos, dict) and "value" in pos]
# Recursively apply to all nested dictionaries
for key, value in obj.items():
obj[key] = transform_positions_to_list(value)
elif isinstance(obj, list):
# Recursively apply to all items in the list
obj = [transform_positions_to_list(item) for item in obj]
return obj

View File

@ -1,12 +1,11 @@
import gzip
import json
from functools import singledispatch
from typing import Union
from typing import TypedDict
from kn_utils.logging import logger
from pydantic import BaseModel, ValidationError
from pyinfra.storage.proto_data_loader import ProtoDataLoader
from pyinfra.storage.storages.storage import Storage
@ -53,28 +52,27 @@ class TenantIdDossierIdFileIdUploadPayload(BaseModel):
class TargetResponseFilePathDownloadPayload(BaseModel):
targetFilePath: Union[str, dict]
targetFilePath: str | dict[str, str]
class TargetResponseFilePathUploadPayload(BaseModel):
responseFilePath: str
def download_data_as_specified_in_message(storage: Storage, raw_payload: dict) -> Union[dict, bytes]:
class DownloadedData(TypedDict):
data: bytes
file_path: str
def download_data_bytes_as_specified_in_message(
storage: Storage, raw_payload: dict
) -> dict[str, DownloadedData] | DownloadedData:
"""Convenience function to download a file specified in a message payload.
Supports both legacy and new payload formats. Also supports downloading multiple files at once, which should
be specified in a dictionary under the 'targetFilePath' key with the file path as value.
If the content is compressed with gzip (.gz), it will be decompressed (-> bytes).
If the content is a json file, it will be decoded (-> dict).
If no file is specified in the payload or the file does not exist in storage, an exception will be raised.
In all other cases, the content will be returned as is (-> bytes).
This function can be extended in the future as needed (e.g. handling of more file types), but since further
requirements are not specified at this point in time, and it is unclear what these would entail, the code is kept
simple for now to improve readability, maintainability and avoid refactoring efforts of generic solutions that
weren't as generic as they seemed.
The data is downloaded as bytes and returned as a dictionary with the file path as key and the data as value.
In case of several download targets, a nested dictionary is returned with the same keys and dictionaries with
the file path and data as values.
"""
try:
@ -93,33 +91,25 @@ def download_data_as_specified_in_message(storage: Storage, raw_payload: dict) -
@singledispatch
def _download(file_path_or_file_path_dict: Union[str, dict], storage: Storage) -> Union[dict, bytes]:
def _download(
file_path_or_file_path_dict: str | dict[str, str], storage: Storage
) -> dict[str, DownloadedData] | DownloadedData:
pass
@_download.register(str)
def _download_single_file(file_path: str, storage: Storage) -> bytes:
def _download_single_file(file_path: str, storage: Storage) -> DownloadedData:
if not storage.exists(file_path):
raise FileNotFoundError(f"File '{file_path}' does not exist in storage.")
data = storage.get_object(file_path)
data = gzip.decompress(data) if ".gz" in file_path else data
if ".json" in file_path:
data = json.loads(data.decode("utf-8"))
elif ".proto" in file_path:
data = ProtoDataLoader()(file_path, data)
else:
pass # identity for other file types
logger.info(f"Downloaded {file_path} from storage.")
return data
return DownloadedData(data=data, file_path=file_path)
@_download.register(dict)
def _download_multiple_files(file_path_dict: dict, storage: Storage) -> dict:
def _download_multiple_files(file_path_dict: dict, storage: Storage) -> dict[str, DownloadedData]:
return {key: _download(value, storage) for key, value in file_path_dict.items()}

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "pyinfra"
version = "3.4.0"
version = "4.1.0"
description = ""
authors = ["Team Research <research@knecon.com>"]
license = "All rights reseverd"
@ -21,10 +21,9 @@ pycryptodome = "^3.19"
fastapi = "^0.109.0"
uvicorn = "^0.26.0"
[tool.poetry.group.internal.dependencies]
kn-utils = { version = "0.4.0", source = "nexus" }
# [tool.poetry.group.telemetry.dependencies]
# DONT USE GROUPS BECAUSE THEY ARE NOT INSTALLED FOR PACKAGES
# [tool.poetry.group.internal.dependencies] <<< THIS IS NOT WORKING
kn-utils = { version = ">=0.4.0", source = "nexus" }
# We set all opentelemetry dependencies to lower bound because the image classification service depends on a protobuf version <4, but does not use proto files.
# Therefore, we allow latest possible protobuf version in the services which use proto files. As soon as the dependency issue is fixed set this to the latest possible opentelemetry version
opentelemetry-instrumentation-pika = ">=0.46b0,<0.50"
@ -42,6 +41,7 @@ azure-monitor-opentelemetry = "^1.6.0"
aio-pika = "^9.4.2"
aiohttp = "^3.9.5"
# THIS IS NOT AVAILABLE FOR SERVICES THAT IMPLEMENT PYINFRA
[tool.poetry.group.dev.dependencies]
pytest = "^7"
ipykernel = "^6.26.0"

View File

@ -27,6 +27,8 @@ def storage(storage_backend, settings):
def queue_manager(settings):
settings.rabbitmq_heartbeat = 10
settings.connection_sleep = 5
settings.rabbitmq.max_retries = 3
settings.rabbitmq.max_delay = 10
queue_manager = QueueManager(settings)
yield queue_manager

View File

@ -3,7 +3,6 @@ from sys import stdout
from time import sleep
import pika
import pytest
from kn_utils.logging import logger
logger.remove()

View File

@ -7,7 +7,7 @@ from fastapi import FastAPI
from pyinfra.storage.connection import get_storage_for_tenant
from pyinfra.storage.utils import (
download_data_as_specified_in_message,
download_data_bytes_as_specified_in_message,
upload_data_as_specified_in_message,
)
from pyinfra.utils.cipher import encrypt
@ -139,16 +139,6 @@ def payload(payload_type):
}
@pytest.fixture
def expected_data(payload_type):
if payload_type == "target_response_file_path":
return {"data": "success"}
elif payload_type == "dossier_id_file_id":
return {"dossierId": "test", "fileId": "file", "data": "success"}
elif payload_type == "target_file_dict":
return {"file_1": {"data": "success"}, "file_2": {"data": "success"}}
@pytest.mark.parametrize(
"payload_type",
[
@ -160,17 +150,17 @@ def expected_data(payload_type):
)
@pytest.mark.parametrize("storage_backend", ["azure", "s3"], scope="class")
class TestDownloadAndUploadFromMessage:
def test_download_and_upload_from_message(self, storage, payload, expected_data, payload_type):
def test_download_and_upload_from_message(self, storage, payload, payload_type):
storage.clear_bucket()
upload_data = expected_data if payload_type != "target_file_dict" else expected_data["file_1"]
storage.put_object("test/file.target.json.gz", gzip.compress(json.dumps(upload_data).encode()))
result = {"process_result": "success"}
storage_data = {**payload, "data": result}
packed_data = gzip.compress(json.dumps(storage_data).encode())
data = download_data_as_specified_in_message(storage, payload)
storage.put_object("test/file.target.json.gz", packed_data)
assert data == expected_data
upload_data_as_specified_in_message(storage, payload, expected_data)
_ = download_data_bytes_as_specified_in_message(storage, payload)
upload_data_as_specified_in_message(storage, payload, result)
data = json.loads(gzip.decompress(storage.get_object("test/file.response.json.gz")).decode())
assert data == {**payload, "data": expected_data}
assert data == storage_data

View File

@ -1,197 +0,0 @@
import gzip
import json
from pathlib import Path
import pytest
from deepdiff import DeepDiff
from pyinfra.storage.proto_data_loader import ProtoDataLoader
enum = 1
@pytest.fixture
def test_data_dir():
return Path(__file__).parents[1] / "data"
@pytest.fixture
def document_data(request, test_data_dir) -> (str, bytes, dict | list):
doc_type = request.param
# Search for relevant doc_type file pairs - there should be one proto and one json file per document type
input_file_path = next(test_data_dir.glob(f"*.{doc_type}.proto.gz"), None)
target_file_path = next(test_data_dir.glob(f"*.{doc_type}.json.gz"), None)
input_data = input_file_path.read_bytes()
target_data = json.loads(gzip.decompress(target_file_path.read_bytes()))
return input_file_path, input_data, target_data
@pytest.fixture
def proto_data_loader():
return ProtoDataLoader()
@pytest.fixture
def should_match():
return [
"a.DOCUMENT_STRUCTURE.proto.gz",
"a.DOCUMENT_TEXT.proto.gz",
"a.DOCUMENT_PAGES.proto.gz",
"a.DOCUMENT_POSITION.proto.gz",
"b.DOCUMENT_STRUCTURE.proto",
"b.DOCUMENT_TEXT.proto",
"b.DOCUMENT_PAGES.proto",
"b.DOCUMENT_POSITION.proto",
"c.STRUCTURE.proto.gz",
"c.TEXT.proto.gz",
"c.PAGES.proto.gz",
"c.POSITION.proto.gz",
]
@pytest.mark.xfail(
reason="FIXME: The test is not stable, but has to work before we can deploy the code! Right now, we don't have parity between the proto and the json data."
)
# As DOCUMENT_POSITION is a very large file, the test takes forever. If you want to test it, add "DOCUMENT_POSITION" to the list below. - Added per default
@pytest.mark.parametrize("document_data", ["DOCUMENT_STRUCTURE", "DOCUMENT_TEXT", "DOCUMENT_PAGES", "DOCUMENT_POSITION"], indirect=True)
def test_proto_data_loader_end2end(document_data, proto_data_loader):
file_path, data, target = document_data
data = gzip.decompress(data)
loaded_data = proto_data_loader(file_path, data)
loaded_data_str = json.dumps(loaded_data, sort_keys=True)
target_str = json.dumps(target, sort_keys=True)
# If you want to look at the files in more detail uncomment code below
# global enum
# with open(f"input-{enum}.json", "w") as f:
# json.dump(target, f, sort_keys=True, indent=4)
# with open(f"output-{enum}.json", "w") as f:
# json.dump(loaded_data, f, sort_keys=True, indent=4)
# enum += 1
diff = DeepDiff(loaded_data_str, target_str, ignore_order=True)
# FIXME: remove this block when the test is stable
# if diff:
# with open(f"diff_test.json", "w") as f:
# f.write(diff.to_json(indent=4))
assert not diff
def test_proto_data_loader_unknown_document_type(proto_data_loader):
assert not proto_data_loader("unknown_document_type.proto", b"")
def test_proto_data_loader_file_name_matching(proto_data_loader, should_match):
for file_name in should_match:
assert proto_data_loader._match(file_name) is not None
@pytest.mark.parametrize("document_data", ["DOCUMENT_PAGES"], indirect=True)
def test_document_page_types(document_data, proto_data_loader):
# types from document reader
# number: int
# height: int
# width: int
# rotation: int
file_path, data, _ = document_data
data = gzip.decompress(data)
loaded_data = proto_data_loader(file_path, data)
assert isinstance(loaded_data, list)
assert all(isinstance(entry, dict) for entry in loaded_data)
# since all values need to be int anyway we can summarize it
assert all(all(isinstance(value, int) for value in entry.values()) for entry in loaded_data)
@pytest.mark.parametrize("document_data", ["DOCUMENT_POSITION"], indirect=True)
def test_document_position_data_types(document_data, proto_data_loader):
# types from document reader
# id: int
# stringIdxToPositionIdx: list[int]
# positions: list[list[float]]
file_path, data, _ = document_data
data = gzip.decompress(data)
loaded_data = proto_data_loader(file_path, data)
assert isinstance(loaded_data, list)
assert all(isinstance(entry, dict) for entry in loaded_data)
for entry in loaded_data:
assert isinstance(entry["id"], int)
assert isinstance(entry["stringIdxToPositionIdx"], list)
assert isinstance(entry["positions"], list)
assert all(isinstance(position, list) for position in entry["positions"])
assert all(all(isinstance(coordinate, float) for coordinate in position) for position in entry["positions"])
@pytest.mark.parametrize("document_data", ["DOCUMENT_STRUCTURE"], indirect=True)
def test_document_structure_types(document_data, proto_data_loader):
# types from document reader for DocumentStructure
# root: dict
# types from document reader for EntryData
# type: str
# tree_id: list[int]
# atomic_block_ids: list[int]
# page_numbers: list[int]
# properties: dict[str, str]
# children: list[dict]
file_path, data, _ = document_data
data = gzip.decompress(data)
loaded_data = proto_data_loader(file_path, data)
assert isinstance(loaded_data, dict)
assert isinstance(loaded_data["root"], dict)
assert isinstance(loaded_data["root"]["type"], str)
assert isinstance(loaded_data["root"]["treeId"], list)
assert isinstance(loaded_data["root"]["atomicBlockIds"], list)
assert isinstance(loaded_data["root"]["pageNumbers"], list)
assert isinstance(loaded_data["root"]["children"], list)
assert all(isinstance(value, int) for value in loaded_data["root"]["treeId"])
assert all(isinstance(value, int) for value in loaded_data["root"]["atomicBlockIds"])
assert all(isinstance(value, int) for value in loaded_data["root"]["pageNumbers"])
assert all(isinstance(value, dict) for value in loaded_data["root"]["properties"].values())
assert all(
all(isinstance(value, dict) for value in entry.values()) for entry in loaded_data["root"]["properties"].values()
)
assert all(isinstance(value, dict) for value in loaded_data["root"]["children"])
@pytest.mark.parametrize("document_data", ["DOCUMENT_TEXT"], indirect=True)
def test_document_text_data_types(document_data, proto_data_loader):
# types from document reader
# id: int
# page: int
# search_text: str
# number_on_page: int
# start: int
# end: int
# lineBreaks: list[int]
file_path, data, _ = document_data
data = gzip.decompress(data)
loaded_data = proto_data_loader(file_path, data)
assert isinstance(loaded_data, list)
assert all(isinstance(entry, dict) for entry in loaded_data)
for entry in loaded_data:
assert isinstance(entry["id"], int)
assert isinstance(entry["page"], int)
assert isinstance(entry["searchText"], str)
assert isinstance(entry["numberOnPage"], int)
assert isinstance(entry["start"], int)
assert isinstance(entry["end"], int)
assert all(isinstance(value, int) for value in entry["lineBreaks"])

View File

@ -0,0 +1,83 @@
import json
import pytest
from unittest.mock import patch
from pyinfra.storage.utils import (
download_data_bytes_as_specified_in_message,
upload_data_as_specified_in_message,
DownloadedData,
)
from pyinfra.storage.storages.storage import Storage
@pytest.fixture
def mock_storage():
with patch("pyinfra.storage.utils.Storage") as MockStorage:
yield MockStorage()
@pytest.fixture(
params=[
{
"raw_payload": {
"tenantId": "tenant1",
"dossierId": "dossier1",
"fileId": "file1",
"targetFileExtension": "txt",
"responseFileExtension": "json",
},
"expected_result": {
"data": b'{"key": "value"}',
"file_path": "tenant1/dossier1/file1.txt"
}
},
{
"raw_payload": {
"targetFilePath": "some/path/to/file.txt.gz",
"responseFilePath": "some/path/to/file.json"
},
"expected_result": {
"data": b'{"key": "value"}',
"file_path": "some/path/to/file.txt.gz"
}
},
{
"raw_payload": {
"targetFilePath": {
"file1": "some/path/to/file1.txt.gz",
"file2": "some/path/to/file2.txt.gz"
},
"responseFilePath": "some/path/to/file.json"
},
"expected_result": {
"file1": {
"data": b'{"key": "value"}',
"file_path": "some/path/to/file1.txt.gz"
},
"file2": {
"data": b'{"key": "value"}',
"file_path": "some/path/to/file2.txt.gz"
}
}
},
]
)
def payload_and_expected_result(request):
return request.param
def test_download_data_bytes_as_specified_in_message(mock_storage, payload_and_expected_result):
raw_payload = payload_and_expected_result["raw_payload"]
expected_result = payload_and_expected_result["expected_result"]
mock_storage.get_object.return_value = b'{"key": "value"}'
result = download_data_bytes_as_specified_in_message(mock_storage, raw_payload)
assert isinstance(result, dict)
assert result == expected_result
mock_storage.get_object.assert_called()
def test_upload_data_as_specified_in_message(mock_storage, payload_and_expected_result):
raw_payload = payload_and_expected_result["raw_payload"]
data = {"key": "value"}
upload_data_as_specified_in_message(mock_storage, raw_payload, data)
mock_storage.put_object.assert_called_once()