diff --git a/pyinfra/server/server.py b/pyinfra/server/server.py index 1fe9a54..b860964 100644 --- a/pyinfra/server/server.py +++ b/pyinfra/server/server.py @@ -24,7 +24,7 @@ def make_processor_fn(operation, buffer_size, batched): return operation -class OnDemandProcessor(abc.ABC): +class OnDemandProcessor: def __init__(self, processor_fn): self.execution_queue = chain([]) self.processor_fn = processor_fn @@ -41,22 +41,12 @@ class OnDemandProcessor(abc.ABC): return Nothing -class OnDemandProcessorAdapter(abc.ABC): - @abc.abstractmethod - def submit(self, *args, **kwargs) -> None: - raise NotImplementedError +class RestOndDemandProcessorAdapter(OnDemandProcessor): + def __init__(self, processor_fn): + super(RestOndDemandProcessorAdapter, self).__init__(processor_fn=processor_fn) - def compute_next(self) -> Union[Nothing, Any]: - result = self.processor.compute_next() - return result - - -class RestProcessorAdapter(OnDemandProcessorAdapter): - def __init__(self, processor: OnDemandProcessor): - self.processor = processor - - def submit(self, request) -> None: - self.processor.submit(request.json, final=request.method == "POST") + def submit(self, request, **kwargs) -> None: + super(RestOndDemandProcessorAdapter, self).submit(request.json, final=request.method == "POST") class RestStreamer: @@ -93,7 +83,7 @@ def valid(result): def set_up_processing_server(process_fn): app = Flask(__name__) - streamer = RestStreamer(RestProcessorAdapter(OnDemandProcessor(process_fn))) + streamer = RestStreamer(RestOndDemandProcessorAdapter(process_fn)) @app.route("/ready", methods=["GET"]) def ready():