diff --git a/pyinfra/default_objects.py b/pyinfra/default_objects.py index 3281a86..6048829 100644 --- a/pyinfra/default_objects.py +++ b/pyinfra/default_objects.py @@ -88,7 +88,7 @@ class Callback: raise AnalysisFailure from err def __call__(self, body: dict): - operation = body.get("operations", "submit") + operation = body.get("operation", "submit") endpoint = self.__make_endpoint(operation) pipeline = self.__get_pipeline(endpoint) diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index 8f6e4ef..ec001c3 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -21,6 +21,12 @@ def __set_up_processing_server(queued_stream_function: QueuedStreamFunction): resp.status_code = 200 return resp + @app.route("/health", methods=["GET"]) + def healthy(): + resp = jsonify("OK") + resp.status_code = 200 + return resp + @app.route("/submit", methods=["POST", "PATCH"]) def submit(): return processor.push(request) diff --git a/pyinfra/visitor.py b/pyinfra/visitor.py index 98857f3..61ffbd1 100644 --- a/pyinfra/visitor.py +++ b/pyinfra/visitor.py @@ -6,7 +6,7 @@ import logging import time from collections import deque from operator import itemgetter -from typing import Callable, Iterable +from typing import Callable from funcy import omit from more_itertools import peekable @@ -105,11 +105,6 @@ class IdentifierDispatchCallback(DispatchCallback): return identifier != self.identifier - # def data_is_non_empty(self, data): - # - # if isinstance(data, str): - # self.put_object(data, metadata) - def __call__(self, metadata): return self.has_new_identifier(metadata) @@ -200,12 +195,17 @@ class QueueVisitor: return {"data": data, "metadata": {}} try: - data = json.loads(data.decode()) - except json.JSONDecodeError: # case 1 fallback - return wrap(data.decode()) + data = data.decode() + try: + data = json.loads(data) + except json.JSONDecodeError: # case 1 fallback + return wrap(data) + except Exception: + return wrap(data) if not isinstance(data, dict): # case 1 return wrap(string_to_bytes(data)) + else: # case 2 validate(data) data["data"] = string_to_bytes(data["data"]) diff --git a/scripts/mock_client.py b/scripts/mock_client.py index 0a97e03..e8ad0fa 100644 --- a/scripts/mock_client.py +++ b/scripts/mock_client.py @@ -54,7 +54,7 @@ def build_message_bodies(analyse_container_type, bucket_name): return message_dict storage = get_s3_storage() - for bucket_name, pdf_name in storage.get_all_object_names(bucket_name): + for pdf_name in storage.get_all_object_names(bucket_name): if "pdf" not in pdf_name: continue file_id = pdf_name.split(".")[0] diff --git a/test/integration_tests/serve_test.py b/test/integration_tests/serve_test.py index 246e18c..4c79cc1 100644 --- a/test/integration_tests/serve_test.py +++ b/test/integration_tests/serve_test.py @@ -153,3 +153,5 @@ def test_serving( uploaded_files = starmap(storage.get_object, zip(repeat(bucket_name), names_of_uploaded_files)) outputs = sorted(chain(*map(decode, uploaded_files)), key=itemgetter(0)) assert outputs == targets + + storage.clear_bucket(bucket_name)