Adjust log levels to reduce log clutter
Also updates readme and adds pytest execution to CI script.
This commit is contained in:
parent
c09476cfae
commit
72547201f3
@ -5,4 +5,4 @@ include:
|
||||
|
||||
run-tests:
|
||||
script:
|
||||
- echo "skipping tests"
|
||||
- pytest .
|
||||
|
||||
66
README.md
66
README.md
@ -45,6 +45,19 @@ A configuration is located in `/config.yaml`. All relevant variables can be conf
|
||||
|
||||
### Expected AMQP input message:
|
||||
|
||||
Either use the legacy format with dossierId and fileId as strings or the new format where absolute paths are used.
|
||||
A tenant ID can be optionally provided in the message header (key: "X-TENANT-ID")
|
||||
|
||||
|
||||
```json
|
||||
{
|
||||
"targetFilePath": "",
|
||||
"responseFilePath": ""
|
||||
}
|
||||
```
|
||||
|
||||
or
|
||||
|
||||
```json
|
||||
{
|
||||
"dossierId": "",
|
||||
@ -58,6 +71,16 @@ Optionally, the input message can contain a field with the key `"operations"`.
|
||||
|
||||
### AMQP output message:
|
||||
|
||||
|
||||
```json
|
||||
{
|
||||
"targetFilePath": "",
|
||||
"responseFilePath": ""
|
||||
}
|
||||
```
|
||||
|
||||
or
|
||||
|
||||
```json
|
||||
{
|
||||
"dossierId": "",
|
||||
@ -69,31 +92,37 @@ Optionally, the input message can contain a field with the key `"operations"`.
|
||||
|
||||
### Setup
|
||||
|
||||
Install project dependencies
|
||||
Add the respective version of the pyinfra package to your pyproject.toml file. Make sure to add our gitlab registry as a source.
|
||||
For now, all internal packages used by pyinfra also have to be added to the pyproject.toml file.
|
||||
Execute `poetry lock` and `poetry install` to install the packages.
|
||||
|
||||
```bash
|
||||
make poetry
|
||||
```
|
||||
```toml
|
||||
[tool.poetry.dependencies]
|
||||
pyinfra = { version = "1.6.0", source = "gitlab-research" }
|
||||
kn-utils = { version = "0.1.4", source = "gitlab-research" }
|
||||
|
||||
You don't have to install it independently in the project repo, just `import pyinfra` in any `.py`-file
|
||||
|
||||
or install form another project
|
||||
|
||||
```bash
|
||||
poetry add git+ssh://git@git.iqser.com:2222/rr/pyinfra.git#TAG-NUMBER
|
||||
[[tool.poetry.source]]
|
||||
name = "gitlab-research"
|
||||
url = "https://gitlab.knecon.com/api/v4/groups/19/-/packages/pypi/simple"
|
||||
priority = "explicit"
|
||||
```
|
||||
|
||||
### API
|
||||
|
||||
```python
|
||||
from pyinfra.config import get_config
|
||||
from pyinfra import config
|
||||
from pyinfra.payload_processing.processor import make_payload_processor
|
||||
from pyinfra.queue.queue_manager import QueueManager
|
||||
|
||||
queue_manager = QueueManager(get_config())
|
||||
queue_manager.start_consuming(make_payload_processor(data_processor))
|
||||
pyinfra_config = config.get_config()
|
||||
|
||||
process_payload = make_payload_processor(process_data, config=pyinfra_config)
|
||||
|
||||
queue_manager = QueueManager(pyinfra_config)
|
||||
queue_manager.start_consuming(process_payload)
|
||||
```
|
||||
The data_processor should expect a dict or bytes (pdf) as input and should return a list of results.
|
||||
|
||||
`process_data` should expect a dict (json) or bytes (pdf) as input and should return a list of results.
|
||||
|
||||
## Scripts
|
||||
|
||||
@ -111,11 +140,12 @@ $ python scripts/start_pyinfra.py
|
||||
|
||||
**Shell 3**: Upload dummy content on storage and publish message
|
||||
```bash
|
||||
$ python scripts/mock_process_request.py
|
||||
$ python scripts/send_request.py
|
||||
```
|
||||
|
||||
## Tests
|
||||
|
||||
The tests take a bit longer than you are probably used to, because among other things the required startup times are
|
||||
quite high. The test runtime can be accelerated by setting 'autouse' to 'False'. In that case, run 'docker-compose up'
|
||||
in the tests dir manually before running the tests.
|
||||
Running all tests take a bit longer than you are probably used to, because among other things the required startup times are
|
||||
quite high for docker-compose dependent tests. This is why the tests are split into two parts. The first part contains all
|
||||
tests that do not require docker-compose and the second part contains all tests that require docker-compose.
|
||||
Per default, only the first part is executed, but when releasing a new version, all tests should be executed.
|
||||
@ -1,4 +1,4 @@
|
||||
from kn_utils.logging import getLogger
|
||||
from kn_utils.logging import logger
|
||||
from dataclasses import asdict
|
||||
from typing import Callable, List
|
||||
|
||||
@ -14,9 +14,6 @@ from pyinfra.payload_processing.payload import (
|
||||
from pyinfra.storage.storage import make_downloader, make_uploader
|
||||
from pyinfra.storage.storage_provider import StorageProvider
|
||||
|
||||
logger = getLogger()
|
||||
logger.setLevel(get_config().logging_level_root)
|
||||
|
||||
|
||||
class PayloadProcessor:
|
||||
def __init__(
|
||||
|
||||
@ -119,7 +119,6 @@ class QueueManager:
|
||||
"""
|
||||
callback = self._create_queue_callback(process_payload)
|
||||
self._set_consumer_token(None)
|
||||
logger.info("Consuming from queue ...")
|
||||
|
||||
try:
|
||||
self._open_channel()
|
||||
@ -165,7 +164,7 @@ class QueueManager:
|
||||
def acknowledge_message_and_publish_response(frame, headers, response_body):
|
||||
response_properties = pika.BasicProperties(headers=headers) if headers else None
|
||||
self._channel.basic_publish("", self._output_queue, json.dumps(response_body).encode(), response_properties)
|
||||
logger.info(f"Result published, acknowledging incoming message with delivery_tag {frame.delivery_tag}.")
|
||||
logger.debug(f"Result published, acknowledging incoming message with delivery_tag {frame.delivery_tag}.")
|
||||
self._channel.basic_ack(frame.delivery_tag)
|
||||
|
||||
def callback(_channel, frame, properties, body):
|
||||
@ -183,7 +182,7 @@ class QueueManager:
|
||||
|
||||
try:
|
||||
logger.debug(f"Processing {frame}, {properties}, {body}")
|
||||
filtered_message_headers = safe_project(properties.headers, ["X-TENANT-ID"]) # TODO: parametrize key?
|
||||
filtered_message_headers = safe_project(properties.headers, ["X-TENANT-ID"])
|
||||
message_body = {**json.loads(body), **filtered_message_headers}
|
||||
|
||||
processing_result = process_message_body_and_await_result(message_body)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user