diff --git a/README.md b/README.md index 192a9b55..42e338a3 100644 --- a/README.md +++ b/README.md @@ -5,15 +5,17 @@ [![Python unit CI][ff_python_unit_img]][ff_python_unit_link] [![Python lint CI][ff_python_lint_img]][ff_python_lint_link] [![Python conformace CI][ff_python_conformance_img]][ff_python_conformance_link] An open source FaaS (Function as a service) framework for writing portable -Python functions -- brought to you by the Google Cloud Functions team. +Python functions. The Functions Framework lets you write lightweight functions that run in many different environments, including: +* [OpenFunction](https://github.com/OpenFunction/OpenFunction) +* [Knative](https://github.com/knative/)-based environments +* [Dapr](https://dapr.io/)-based environments * [Google Cloud Functions](https://cloud.google.com/functions/) * Your local development machine * [Cloud Run and Cloud Run for Anthos](https://cloud.google.com/run/) -* [Knative](https://github.com/knative/)-based environments The framework allows you to go from: @@ -292,6 +294,22 @@ https://cloud.google.com/functions/docs/tutorials/pubsub#functions_helloworld_pu ## Run your function on serverless platforms +### Container environments based on Knative + +The Functions Framework is designed to be compatible with Knative environments. Build and deploy your container to a Knative environment. + +### OpenFunction + +![OpenFunction Platform Overview](https://openfunction.dev/openfunction-0.5-architecture.png) + +Besides Knative function support, one notable feature of OpenFunction is embracing Dapr system, so far Dapr pub/sub and bindings have been support. + +Dapr bindings allows you to trigger your applications or services with events coming in from external systems, or interface with external systems. OpenFunction [0.6.0 release](https://openfunction.dev/blog/2022/03/25/announcing-openfunction-0.6.0-faas-observability-http-trigger-and-more/) adds Dapr output bindings to its synchronous functions which enables HTTP triggers for asynchronous functions. For example, synchronous functions backed by the Knative runtime can now interact with middlewares defined by Dapr output binding or pub/sub, and an asynchronous function will be triggered by the events sent from the synchronous function. + +Asynchronous function introduces Dapr pub/sub to provide a platform-agnostic API to send and receive messages. A typical use case is that you can leverage synchronous functions to receive an event in plain JSON or Cloud Events format, and then send the received event to a Dapr output binding or pub/sub component, most likely a message queue (e.g. Kafka, NATS Streaming, GCP PubSub, MQTT). Finally, the asynchronous function could be triggered from the message queue. + +More details would be brought up to you in some quickstart samples, stay tuned. + ### Google Cloud Functions This Functions Framework is based on the [Python Runtime on Google Cloud Functions](https://cloud.google.com/functions/docs/concepts/python-runtime). diff --git a/docs/async-server.puml b/docs/async-server.puml new file mode 100644 index 00000000..0d1beff4 --- /dev/null +++ b/docs/async-server.puml @@ -0,0 +1,59 @@ +@startuml Async Server + +box Function Process in Local Environment or Container +control ENTRYPOINT +participant "~__main__" as Main +participant AsyncServer +participant DaprServer +participant gRPCServer [ + Web Server + ---- + ""gprc.server"" +] +end box + +entity "Dapr Sidecar " as DaprSidecar + +== OpenFunction Serving == + +ENTRYPOINT -> Main ** : execute +note over ENTRYPOINT, Main: Pass through __CLI arguments__ and \ncontainer __environment variables__ + +Main -> Main : load user function file +note left: ""function (ctx, data) {}"" + +Main -> AsyncServer ** : create +note over Main, AsyncServer: Hand over __user function__ and __context__ + +AsyncServer -> DaprServer ** : ""new"" +note over AsyncServer, DaprServer: Extract __port__ from __context__ and pass down + +DaprServer -> gRPCServer ** : ""new"" +||| +DaprServer --> DaprSidecar : Waiting till Dapr sidecar started +... +AsyncServer -> DaprServer : register __user function__ as handler \nfor each of __inputs__ in __context__ +DaprServer -> gRPCServer : add routes for Dapr style \nsubscriptions and input bindings + +... + +== OpenFunction Triggering == + +DaprSidecar <-- : sub / input data + +DaprSidecar -> gRPCServer ++ : Dapr request with "data" + +gRPCServer -> gRPCServer ++ : invoke user function + +alt + gRPCServer -> DaprSidecar ++ : publish data or invoke output binding + DaprSidecar --> gRPCServer -- : execution result +end + +return + +return server app response + +... + +@enduml \ No newline at end of file diff --git a/docs/http-binding.puml b/docs/http-binding.puml new file mode 100644 index 00000000..20b12f18 --- /dev/null +++ b/docs/http-binding.puml @@ -0,0 +1,73 @@ +@startuml HTTP Binding + +box Function Process in Local Environment or Container +control ENTRYPOINT +participant "~__main__" as Main +participant HTTPServer +participant Server [ + Web Server + ---- + ""Flask/Gunicorn"" +] +participant Middleware +participant "User Function" as UserFunction +participant DaprClient +end box + +entity "Dapr Sidecar " as DaprSidecar + +== OpenFunction Serving == + +ENTRYPOINT -> Main ** : execute +note over ENTRYPOINT, Main: Pass through __CLI arguments__ and \ncontainer __environment variables__ + +Main -> Main : load user fnction file +note left: ""function (request) {}"" + +Main -> HTTPServer ** : create +note over Main, HTTPServer: Hand over __user function__, __function type__ \nand __context__ parsed from env variables + +HTTPServer -> Server ** : new +note over Server: Depend on debug mode + +HTTPServer -> Middleware ** : new +HTTPServer -> Server : use Middleware +note over HTTPServer, Server: Pass context to middleware +||| +HTTPServer -> Server : use others middlewares +||| +HTTPServer -> UserFunction ** : wrap user function +note over HTTPServer, UserFunction: Register as HTTP or CloudEvent Function +HTTPServer -> Server : bind wrapper to "/*" route + +... + +== OpenFunction Invocation == + +[-> Server ++ : HTTP request to "/" + +Server -> UserFunction ++ : execute user function +UserFunction --> Server -- : return execution result "data" + +alt ""runtime"" = ""knative"" and ""outputs"" is not empty + Server -> Middleware ++ : invoke Middleware + + Middleware -> DaprClient ** : new + + loop each OpenFunction Output + Middleware -> DaprClient ++ : send "data" + + DaprClient -> DaprSidecar ++ : invoke binding or publication with "data" + DaprSidecar --> DaprClient -- : return result + + DaprClient --> Middleware -- : forward result + end + + Middleware --> Server -- : return "data" as response +end + +[<- Server -- : send response + +... + +@enduml diff --git a/setup.py b/setup.py index 18727fd6..d25934bb 100644 --- a/setup.py +++ b/setup.py @@ -25,13 +25,13 @@ setup( name="functions-framework", - version="3.0.0", - description="An open source FaaS (Function as a service) framework for writing portable Python functions -- brought to you by the Google Cloud Functions team.", + version="3.1.0", + description="An open source FaaS (Function as a service) framework for writing portable Python functions.", long_description=long_description, long_description_content_type="text/markdown", - url="https://github.com/googlecloudplatform/functions-framework-python", - author="Google LLC", - author_email="googleapis-packages@google.com", + url="https://github.com/OpenFunction/functions-framework-python", + author="OpenFunction", + author_email="openfunction@kubesphere.io", classifiers=[ "Development Status :: 3 - Alpha", "Intended Audience :: Developers", @@ -55,6 +55,7 @@ "watchdog>=1.0.0,<2.0.0", "gunicorn>=19.2.0,<21.0; platform_system!='Windows'", "cloudevents>=1.2.0,<2.0.0", + "dapr>=1.6.0", ], entry_points={ "console_scripts": [ diff --git a/src/functions_framework/__init__.py b/src/functions_framework/__init__.py index 46c8882b..a8b18e19 100644 --- a/src/functions_framework/__init__.py +++ b/src/functions_framework/__init__.py @@ -34,6 +34,8 @@ MissingSourceException, ) from google.cloud.functions.context import Context +from openfunction.dapr_output_middleware import dapr_output_middleware +from openfunction.async_server import AsyncApp MAX_CONTENT_LENGTH = 10 * 1024 * 1024 @@ -94,6 +96,11 @@ def setup_logging(): logging.getLogger().addHandler(warn_handler) +def setup_logging_level(debug): + if debug: + logging.getLogger().setLevel(logging.DEBUG) + + def _http_view_func_wrapper(function, request): def view_func(path): return function(request._get_current_object()) @@ -175,7 +182,7 @@ def view_func(path): return view_func -def _configure_app(app, function, signature_type): +def _configure_app(app, function, signature_type, func_context): # Mount the function at the root. Support GCF's default path behavior # Modify the url_map and view_functions directly here instead of using # add_url_rule in order to create endpoints that route all methods @@ -189,6 +196,7 @@ def _configure_app(app, function, signature_type): app.view_functions["run"] = _http_view_func_wrapper(function, flask.request) app.view_functions["error"] = lambda: flask.abort(404, description="Not Found") app.after_request(read_request) + app.after_request(dapr_output_middleware(func_context)) elif signature_type == _function_registry.BACKGROUNDEVENT_SIGNATURE_TYPE: app.url_map.add( werkzeug.routing.Rule( @@ -241,8 +249,31 @@ def crash_handler(e): """ return str(e), 500, {_FUNCTION_STATUS_HEADER_FIELD: _CRASH} +def create_async_app(target=None, source=None, func_context=None, debug=False): + target = _function_registry.get_function_target(target) + source = _function_registry.get_function_source(source) + + if not os.path.exists(source): + raise MissingSourceException( + "File {source} that is expected to define function doesn't exist".format( + source=source + ) + ) + + source_module, spec = _function_registry.load_function_module(source) + spec.loader.exec_module(source_module) + + function = _function_registry.get_user_function(source, source_module, target) + + setup_logging_level(debug) + + async_app = AsyncApp(func_context) + async_app.bind(function) + + return async_app.app + -def create_app(target=None, source=None, signature_type=None): +def create_app(target=None, source=None, signature_type=None, func_context=None, debug=False): target = _function_registry.get_function_target(target) source = _function_registry.get_function_source(source) @@ -282,6 +313,8 @@ def handle_none(rv): sys.stdout = _LoggingHandler("INFO", sys.stderr) sys.stderr = _LoggingHandler("ERROR", sys.stderr) setup_logging() + + setup_logging_level(debug) # Execute the module, within the application context with _app.app_context(): @@ -291,7 +324,7 @@ def handle_none(rv): signature_type = _function_registry.get_func_signature_type(target, signature_type) function = _function_registry.get_user_function(source, source_module, target) - _configure_app(_app, function, signature_type) + _configure_app(_app, function, signature_type, func_context) return _app @@ -302,7 +335,7 @@ class LazyWSGIApp: at import-time """ - def __init__(self, target=None, source=None, signature_type=None): + def __init__(self, target=None, source=None, signature_type=None, func_context=None, debug=False): # Support HTTP frameworks which support WSGI callables. # Note: this ability is currently broken in Gunicorn 20.0, and # environment variables should be used for configuration instead: @@ -310,13 +343,15 @@ def __init__(self, target=None, source=None, signature_type=None): self.target = target self.source = source self.signature_type = signature_type + self.func_context = func_context + self.debug = debug # Placeholder for the app which will be initialized on first call self.app = None def __call__(self, *args, **kwargs): if not self.app: - self.app = create_app(self.target, self.source, self.signature_type) + self.app = create_app(self.target, self.source, self.signature_type, self.func_context, self.debug) return self.app(*args, **kwargs) diff --git a/src/functions_framework/_cli.py b/src/functions_framework/_cli.py index 663ea50f..9699a5c2 100644 --- a/src/functions_framework/_cli.py +++ b/src/functions_framework/_cli.py @@ -16,9 +16,9 @@ import click -from functions_framework import create_app +from functions_framework import create_app, create_async_app from functions_framework._http import create_server - +from functions_framework import _function_registry @click.command() @click.option("--target", envvar="FUNCTION_TARGET", type=click.STRING, required=True) @@ -34,10 +34,24 @@ @click.option("--debug", envvar="DEBUG", is_flag=True) @click.option("--dry-run", envvar="DRY_RUN", is_flag=True) def _cli(target, source, signature_type, host, port, debug, dry_run): - app = create_app(target, source, signature_type) - if dry_run: - click.echo("Function: {}".format(target)) - click.echo("URL: http://{}:{}/".format(host, port)) - click.echo("Dry run successful, shutting down.") + context = _function_registry.get_openfunction_context(None) + + # determine if async or knative + if context and context.is_runtime_async(): + app = create_async_app(target, source, context, debug) + if dry_run: + run_dry(target, host, port) + else: + app.run(context.port) else: - create_server(app, debug).run(host, port) + app = create_app(target, source, signature_type, context, debug) + if dry_run: + run_dry(target, host, port) + else: + create_server(app, debug).run(host, port) + + +def run_dry(target, host, port): + click.echo("Function: {}".format(target)) + click.echo("URL: http://{}:{}/".format(host, port)) + click.echo("Dry run successful, shutting down.") \ No newline at end of file diff --git a/src/functions_framework/_function_registry.py b/src/functions_framework/_function_registry.py index cedb7e15..fc3a39f6 100644 --- a/src/functions_framework/_function_registry.py +++ b/src/functions_framework/_function_registry.py @@ -15,6 +15,7 @@ import os import sys import types +import json from functions_framework.exceptions import ( InvalidConfigurationException, @@ -22,9 +23,12 @@ MissingTargetException, ) +from openfunction.function_context import FunctionContext + DEFAULT_SOURCE = os.path.realpath("./main.py") FUNCTION_SIGNATURE_TYPE = "FUNCTION_SIGNATURE_TYPE" +FUNC_CONTEXT = "FUNC_CONTEXT" HTTP_SIGNATURE_TYPE = "http" CLOUDEVENT_SIGNATURE_TYPE = "cloudevent" BACKGROUNDEVENT_SIGNATURE_TYPE = "event" @@ -115,3 +119,17 @@ def get_func_signature_type(func_name: str, signature_type: str) -> str: if os.environ.get("ENTRY_POINT"): os.environ["FUNCTION_TRIGGER_TYPE"] = sig_type return sig_type + + +def get_openfunction_context(func_context: str) -> FunctionContext: + """Get openfunction context""" + context_str = ( + func_context + or os.environ.get(FUNC_CONTEXT) + ) + + if context_str: + context = FunctionContext.from_json(json.loads(context_str)) + return context + + return None diff --git a/src/openfunction/__init__.py b/src/openfunction/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/openfunction/async_server.py b/src/openfunction/async_server.py new file mode 100644 index 00000000..c94916ad --- /dev/null +++ b/src/openfunction/async_server.py @@ -0,0 +1,32 @@ +from dapr.ext.grpc import App, BindingRequest +from cloudevents.sdk.event import v1 +from openfunction.function_context import OPEN_FUNC_BINDING, OPEN_FUNC_TOPIC +from openfunction.function_runtime import OpenFunctionRuntime + +class AsyncApp(object): + """Init async server with dapr server.""" + + def __init__(self, func_context): + """Inits async server. + Args: + func_context: OpenFunction context + """ + self.ctx = OpenFunctionRuntime.parse(func_context) + self.app = App() + + + def bind(self, function): + """Bind user function with input binding/subscription. + Args: + function: user function + """ + for input in self.ctx.inputs.values(): + type = input.get_type() + if type == OPEN_FUNC_BINDING: + @self.app.binding(input.component_name) + def binding(request: BindingRequest): + function(self.ctx, request.data) + elif type == OPEN_FUNC_TOPIC: + @self.app.subscribe(pubsub_name=input.component_name, topic=input.uri, metadata=input.metadata) + def mytopic(event: v1.Event) -> None: + function(self.ctx, bytes(event.data)) \ No newline at end of file diff --git a/src/openfunction/dapr_output_middleware.py b/src/openfunction/dapr_output_middleware.py new file mode 100644 index 00000000..f7d59134 --- /dev/null +++ b/src/openfunction/dapr_output_middleware.py @@ -0,0 +1,19 @@ +import logging + +from openfunction.function_runtime import OpenFunctionRuntime + +def dapr_output_middleware(context): + """Flask middleware for output binding.""" + def dapr_output_middleware(response): + if not context or not context.outputs or not context.is_runtime_knative(): + return response + + runtime = OpenFunctionRuntime.parse(context) + resp = runtime.send(response.get_data(True)) + + for key, value in resp.items(): + logging.debug("Dapr result for %s: %s", key, value.text()) + + return response + + return dapr_output_middleware \ No newline at end of file diff --git a/src/openfunction/function_context.py b/src/openfunction/function_context.py new file mode 100644 index 00000000..7be2a9d0 --- /dev/null +++ b/src/openfunction/function_context.py @@ -0,0 +1,85 @@ +OPEN_FUNC_BINDING = "bindings" +OPEN_FUNC_TOPIC = "pubsub" + +KNATIVE_RUNTIME_TYPE = "knative" +ASYNC_RUNTIME_TYPE = "async" + + +class FunctionContext(object): + """OpenFunction's serving context.""" + + def __init__(self, name="", version="", runtime="", inputs=None, outputs=None, port=8080): + self.name = name + self.version = version + self.runtime = runtime + self.inputs = inputs + self.outputs = outputs + self.port = port + + def is_runtime_async(self): + return self.runtime.lower() == ASYNC_RUNTIME_TYPE + + def is_runtime_knative(self): + return self.runtime.lower() == KNATIVE_RUNTIME_TYPE + + @staticmethod + def from_json(json_dct): + name = json_dct.get('name') + version = json_dct.get('version') + runtime = json_dct.get('runtime') + inputs_list = json_dct.get('inputs') + outputs_list = json_dct.get('outputs') + + inputs = None + if inputs_list: + inputs = {} + for k, v in inputs_list.items(): + input = Component.from_json(v) + inputs[k] = input + + outputs = None + if outputs_list: + outputs = {} + for k, v in outputs_list.items(): + output = Component.from_json(v) + outputs[k] = output + + return FunctionContext(name, version, runtime, inputs, outputs) + + +class Component(object): + """Components for inputs and outputs.""" + + def __init__(self, uri="", componentName="", componentType="", metadata=None, operation=""): + self.uri = uri + self.component_name = componentName + self.component_type = componentType + self.metadata = metadata + self.operation = operation + + def get_type(self): + type_split = self.component_type.split(".") + if len(type_split) > 1: + t = type_split[0] + if t == OPEN_FUNC_BINDING or t == OPEN_FUNC_TOPIC: + return t + + return "" + + def __str__(self): + return "{uri: %s, component_name: %s, component_type: %s, operation: %s, metadata: %s}" % ( + self.uri, + self.component_name, + self.component_type, + self.operation, + self.metadata + ) + + @staticmethod + def from_json(json_dct): + uri = json_dct.get('uri', '') + component_name = json_dct.get('componentName', '') + metadata = json_dct.get('metadata') + component_type = json_dct.get('componentType', '') + operation = json_dct.get('operation', '') + return Component(uri, component_name, component_type, metadata, operation) \ No newline at end of file diff --git a/src/openfunction/function_runtime.py b/src/openfunction/function_runtime.py new file mode 100644 index 00000000..f09efcab --- /dev/null +++ b/src/openfunction/function_runtime.py @@ -0,0 +1,85 @@ +from abc import abstractmethod +import os + +from dapr.clients import DaprGrpcClient +from dapr.conf import settings + +from openfunction.function_context import FunctionContext + +DAPR_GRPC_PORT = "DAPR_GRPC_PORT" +OPEN_FUNC_BINDING = "bindings" +OPEN_FUNC_TOPIC = "pubsub" + + +class OpenFunctionRuntime(object): + """OpenFunction Runtime.""" + + def __init__(self, context=None): + """Inits OpenFunction Runtime. + Args: + context: OpenFunction context + """ + self.context = context + + def __getattribute__(self, item): + try: + target = object.__getattribute__(self, item) + return target + except AttributeError: + target = object.__getattribute__(self, "context") + return getattr(target, item) + + def set_dapr_grpc_port(self): + port = os.environ.get(DAPR_GRPC_PORT) + if port: + settings.DAPR_GRPC_PORT = port + + @staticmethod + def parse(context: FunctionContext): + return DaprRuntime(context) + + @abstractmethod + def send(self, data, output): + """send data to certain ouput binding or pubsub topic""" + + +class DaprRuntime(OpenFunctionRuntime): + """Dapr runtime derived from OpenFunctionRuntime.""" + + def __init__(self, context=None): + """Inits Dapr Runtime. + Args: + context: OpenFunction context + """ + super().__init__(context) + super().set_dapr_grpc_port() + + self.client = DaprGrpcClient() + + def send(self, data, output=None): + """Inits Dapr Runtime. + Args: + data: Bytes or str to send. + output: A string of designated output name. Only send this output if designated. + Returns: + A dict mapping keys to the corresponding dapr response. + """ + outputs = self.context.outputs + filtered_outputs = {} + responses = {} + + if output and output in outputs: + filtered_outputs[output] = outputs[output] + else: + filtered_outputs = outputs + + for key, value in filtered_outputs.items(): + type = value.get_type() + if type == OPEN_FUNC_BINDING: + resp = self.client.invoke_binding(value.component_name, value.operation, data, value.metadata) + elif type == OPEN_FUNC_TOPIC: + resp = self.client.publish_event(value.component_name, value.uri, data, value.metadata) + responses[key] = resp + + return responses + diff --git a/tests/test_async.py b/tests/test_async.py new file mode 100644 index 00000000..3a13a4a1 --- /dev/null +++ b/tests/test_async.py @@ -0,0 +1,184 @@ +import random +import pathlib +import json +import subprocess +import threading +import time +import os +import re +import pytest + +from paho.mqtt import client as mqtt_client + +from openfunction.function_context import FunctionContext +from openfunction.async_server import AsyncApp + +TEST_PAYLOAD = {"data": "hello world"} +APP_ID="async.dapr" +TEST_CONTEXT = { + "name": "test-context", + "version": "1.0.0", + "runtime": "Async", + "port": "8080", + "inputs": { + "cron": { + "uri": "cron_input", + "componentName": "binding-cron", + "componentType": "bindings.cron" + }, + "mqtt_binding": { + "uri": "of-async-default", + "componentName": "binding-mqtt", + "componentType": "bindings.mqtt" + }, + "mqtt_sub": { + "uri": "of-async-default-sub", + "componentName": "pubsub-mqtt", + "componentType": "pubsub.mqtt" + } + }, + "outputs": { + "cron": { + "uri": 'cron_output', + "operation": 'delete', + "componentName": 'binding-cron', + "componentType": 'bindings.cron', + }, + "localfs": { + "operation": "create", + "componentName": "binding-localfs", + "componentType": "bindings.localstorage", + "metadata": { + "fileName": "output-file.txt" + } + }, + "localfs-delete": { + "operation": "delete", + "componentName": "binding-localfs", + "componentType": "bindings.localstorage", + "metadata": { + "fileName": "output-file.txt" + } + }, + "mqtt_pub": { + "uri": 'of-async-default-pub', + "componentName": 'pubsub-mqtt', + "componentType": 'pubsub.mqtt', + } + } +} + +CLIENT_ID = f'of-async-mqtt-{random.randint(0, 1000)}' +BROKER = 'broker.emqx.io' +MQTT_PORT = 1883 + + +@pytest.fixture(scope="module", autouse=True) +def hook(request): + subprocess.Popen( + "dapr run -G 50001 -d ./tests/test_data/components/async -a {} -p {} --app-protocol grpc".format(APP_ID, TEST_CONTEXT["port"]), + shell=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) + + yield request + + subprocess.Popen("dapr stop {}".format(APP_ID), shell=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) + time.sleep(10) + +@pytest.fixture +def client(): + return AsyncApp(FunctionContext.from_json(TEST_CONTEXT)) + + +@pytest.fixture +def mqtt_test_client(): + client = mqtt_client.Client(CLIENT_ID) + client.connect(BROKER, MQTT_PORT) + return client + + +def test_cron(client): + def cron(client): + def user_function(context, data): + assert context.runtime == TEST_CONTEXT["runtime"] + assert context.inputs["cron"].uri == TEST_CONTEXT["inputs"]["cron"]["uri"] + context.send("", "cron") + client.app.stop() + + return + return user_function + + client.bind(cron(client)) + client.app.run(TEST_CONTEXT["port"]) + + +def test_mqtt_binding(client, mqtt_test_client): + def binding(client): + def user_function(context, data): + context.send(data, "localfs") + filename = TEST_CONTEXT["outputs"]["localfs"]["metadata"]["fileName"] + exist = os.path.exists(filename) + assert exist + + context.send(data, "localfs-delete") + client.app.stop() + + return + return user_function + + client.bind(binding(client)) + + def loop(): + client.app.run(TEST_CONTEXT["port"]) + + t = threading.Thread(target=loop, name='LoopThread') + t.start() + + time.sleep(10) + mqtt_test_client.publish("of-async-default", payload=json.dumps(TEST_PAYLOAD).encode('utf-8')) + + t.join() + + +def test_mqtt_subscribe(client, mqtt_test_client): + def binding(client): + def user_function(context, data): + output = 'mqtt_pub' + # subscribe from mqtt_pub + def on_message(client, userdata, msg): + assert msg.payload == json.dumps(TEST_PAYLOAD).encode('utf-8') + + print(msg.payload.decode('utf-8')) + + mqtt_test_client.subscribe(TEST_CONTEXT["outputs"]["mqtt_pub"]["uri"]) + mqtt_test_client.on_message = on_message + mqtt_test_client.loop_start() + + context.send(json.dumps(TEST_PAYLOAD), output) + + time.sleep(5) + mqtt_test_client.loop_stop() + + client.app.stop() + time.sleep(5) + + return + return user_function + + client.bind(binding(client)) + + def loop(): + client.app.run(TEST_CONTEXT["port"]) + + t = threading.Thread(target=loop, name='LoopThread') + t.start() + + time.sleep(10) + + formatted = re.sub(r'"', '\\"', json.dumps(TEST_PAYLOAD)) + subprocess.Popen("dapr publish -i {} -p {} -t {} -d '{}'".format( + APP_ID, TEST_CONTEXT["inputs"]["mqtt_sub"]["componentName"], + TEST_CONTEXT["inputs"]["mqtt_sub"]["uri"], + formatted + ), shell=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) + + t.join() \ No newline at end of file diff --git a/tests/test_binding.py b/tests/test_binding.py new file mode 100644 index 00000000..0f9219b0 --- /dev/null +++ b/tests/test_binding.py @@ -0,0 +1,76 @@ +import pathlib +import os +import subprocess +import time + +import pytest + +from functions_framework import create_app +from openfunction.function_context import FunctionContext + +TEST_FUNCTIONS_DIR = pathlib.Path(__file__).resolve().parent / "test_functions" +TEST_RESPONSE = "Hello world!" +FILENAME = "test-binding.txt" +APP_ID="http.dapr" + + +@pytest.fixture(scope="module", autouse=True) +def dapr(request): + subprocess.Popen("dapr run -G 50001 -d ./tests/test_data/components/http -a {}".format(APP_ID), + shell=True) + time.sleep(3) + + yield request + + subprocess.Popen("dapr stop {}".format(APP_ID), shell=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) + time.sleep(10) + + +def create_func_context(param): + return { + "name": param["name"], + "version": "1.0.0", + "runtime": "Knative", + "outputs": { + "file": { + "componentName": "local", + "componentType": "bindings.localstorage", + "operation": param["operation"], + "metadata": { + "fileName": FILENAME + } + } + } + } + + +@pytest.fixture +def client(): + def return_client(param): + source = TEST_FUNCTIONS_DIR / "http_basic" / "main.py" + target = "hello" + + context = create_func_context(param) + + return create_app(target, source, "http", FunctionContext.from_json(context)).test_client() + + return return_client + + +test_data = [ + {"name": "Save data", "operation": "create", "listable": True}, + {"name": "Get data", "operation": "get", "listable": True}, + {"name": "Delete data", "operation": "delete", "listable": False}, +] + + +@pytest.mark.parametrize("test_data", test_data) +def test_http_binding(client, test_data): + resp = client(test_data).get("/") + + assert resp.status_code == 200 + assert TEST_RESPONSE == resp.get_data().decode() + + exist = os.path.exists(FILENAME) + + assert exist == test_data["listable"] \ No newline at end of file diff --git a/tests/test_cli.py b/tests/test_cli.py index aa4a901e..8332ba74 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -36,67 +36,67 @@ def test_cli_no_arguments(): ( ["--target", "foo"], {}, - [pretend.call("foo", None, "http")], + [pretend.call("foo", None, "http", None, False)], [pretend.call("0.0.0.0", 8080)], ), ( [], {"FUNCTION_TARGET": "foo"}, - [pretend.call("foo", None, "http")], + [pretend.call("foo", None, "http", None, False)], [pretend.call("0.0.0.0", 8080)], ), ( ["--target", "foo", "--source", "/path/to/source.py"], {}, - [pretend.call("foo", "/path/to/source.py", "http")], + [pretend.call("foo", "/path/to/source.py", "http", None, False)], [pretend.call("0.0.0.0", 8080)], ), ( [], {"FUNCTION_TARGET": "foo", "FUNCTION_SOURCE": "/path/to/source.py"}, - [pretend.call("foo", "/path/to/source.py", "http")], + [pretend.call("foo", "/path/to/source.py", "http", None, False)], [pretend.call("0.0.0.0", 8080)], ), ( ["--target", "foo", "--signature-type", "event"], {}, - [pretend.call("foo", None, "event")], + [pretend.call("foo", None, "event", None, False)], [pretend.call("0.0.0.0", 8080)], ), ( [], {"FUNCTION_TARGET": "foo", "FUNCTION_SIGNATURE_TYPE": "event"}, - [pretend.call("foo", None, "event")], + [pretend.call("foo", None, "event", None, False)], [pretend.call("0.0.0.0", 8080)], ), ( ["--target", "foo", "--dry-run"], {}, - [pretend.call("foo", None, "http")], + [pretend.call("foo", None, "http", None, False)], [], ), ( [], {"FUNCTION_TARGET": "foo", "DRY_RUN": "True"}, - [pretend.call("foo", None, "http")], + [pretend.call("foo", None, "http", None, False)], [], ), ( ["--target", "foo", "--host", "127.0.0.1"], {}, - [pretend.call("foo", None, "http")], + [pretend.call("foo", None, "http", None, False)], [pretend.call("127.0.0.1", 8080)], ), ( ["--target", "foo", "--debug"], {}, - [pretend.call("foo", None, "http")], + [pretend.call("foo", None, "http", None, True)], [pretend.call("0.0.0.0", 8080)], ), ( [], {"FUNCTION_TARGET": "foo", "DEBUG": "True"}, - [pretend.call("foo", None, "http")], + [pretend.call("foo", None, "http", None, True)], [pretend.call("0.0.0.0", 8080)], ), ], diff --git a/tests/test_cloud_event_functions.py b/tests/test_cloud_event_functions.py index 4ad8a527..a673c6e4 100644 --- a/tests/test_cloud_event_functions.py +++ b/tests/test_cloud_event_functions.py @@ -190,7 +190,7 @@ def test_unparsable_cloud_event(client): resp = client.post("/", headers={}, data="") assert resp.status_code == 400 - assert "MissingRequiredFields" in resp.data.decode() + assert "Bad Request" in resp.data.decode() @pytest.mark.parametrize("specversion", ["0.3", "1.0"]) diff --git a/tests/test_convert.py b/tests/test_convert.py index 9202f567..0d41d5ed 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -15,6 +15,7 @@ import pathlib import flask +import pretend import pytest from cloudevents.http import from_json, to_binary @@ -259,6 +260,13 @@ def test_firebase_db_event_to_cloud_event_missing_domain( ) +def test_marshal_background_event_data_bad_request(): + req = pretend.stub(headers={}, get_json=lambda: None) + + with pytest.raises(EventConversionException): + event_conversion.background_event_to_cloud_event(req) + + @pytest.mark.parametrize( "background_resource", [ diff --git a/tests/test_data/components/async/binding-cron.yaml b/tests/test_data/components/async/binding-cron.yaml new file mode 100644 index 00000000..d80aba75 --- /dev/null +++ b/tests/test_data/components/async/binding-cron.yaml @@ -0,0 +1,10 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: binding-cron +spec: + type: bindings.cron + version: v1 + metadata: + - name: schedule + value: "@every 5s" diff --git a/tests/test_data/components/async/binding-localfs.yaml b/tests/test_data/components/async/binding-localfs.yaml new file mode 100644 index 00000000..b6ba81bb --- /dev/null +++ b/tests/test_data/components/async/binding-localfs.yaml @@ -0,0 +1,10 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: binding-localfs +spec: + type: bindings.localstorage + version: v1 + metadata: + - name: rootPath + value: . diff --git a/tests/test_data/components/async/binding-mqtt.yaml b/tests/test_data/components/async/binding-mqtt.yaml new file mode 100644 index 00000000..c3317a1f --- /dev/null +++ b/tests/test_data/components/async/binding-mqtt.yaml @@ -0,0 +1,14 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: binding-mqtt +spec: + type: bindings.mqtt + version: v1 + metadata: + - name: consumerID + value: "{uuid}" + - name: url + value: "tcp://broker.emqx.io:1883" + - name: topic + value: "of-async-default" \ No newline at end of file diff --git a/tests/test_data/components/async/pubsub-mqtt.yaml b/tests/test_data/components/async/pubsub-mqtt.yaml new file mode 100644 index 00000000..4d7e73ec --- /dev/null +++ b/tests/test_data/components/async/pubsub-mqtt.yaml @@ -0,0 +1,12 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: pubsub-mqtt +spec: + type: pubsub.mqtt + version: v1 + metadata: + # - name: consumerID + # value: "{uuid}" + - name: url + value: "tcp://broker.emqx.io:1883" \ No newline at end of file diff --git a/tests/test_data/components/http/localstorage.yaml b/tests/test_data/components/http/localstorage.yaml new file mode 100644 index 00000000..88971d83 --- /dev/null +++ b/tests/test_data/components/http/localstorage.yaml @@ -0,0 +1,10 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: local +spec: + type: bindings.localstorage + version: v1 + metadata: + - name: rootPath + value: . \ No newline at end of file diff --git a/tests/test_functions.py b/tests/test_functions.py index c343205f..69931b44 100644 --- a/tests/test_functions.py +++ b/tests/test_functions.py @@ -418,17 +418,17 @@ def test_error_paths(path): @pytest.mark.parametrize( - "target, source, signature_type", - [(None, None, None), (pretend.stub(), pretend.stub(), pretend.stub())], + "target, source, signature_type, func_context, debug", + [(None, None, None, None, False), (pretend.stub(), pretend.stub(), pretend.stub(), pretend.stub(), pretend.stub())], ) -def test_lazy_wsgi_app(monkeypatch, target, source, signature_type): +def test_lazy_wsgi_app(monkeypatch, target, source, signature_type, func_context, debug): actual_app_stub = pretend.stub() wsgi_app = pretend.call_recorder(lambda *a, **kw: actual_app_stub) create_app = pretend.call_recorder(lambda *a: wsgi_app) monkeypatch.setattr(functions_framework, "create_app", create_app) # Test that it's lazy - lazy_app = LazyWSGIApp(target, source, signature_type) + lazy_app = LazyWSGIApp(target, source, signature_type, func_context, debug) assert lazy_app.app == None @@ -439,7 +439,7 @@ def test_lazy_wsgi_app(monkeypatch, target, source, signature_type): app = lazy_app(*args, **kwargs) assert app == actual_app_stub - assert create_app.calls == [pretend.call(target, source, signature_type)] + assert create_app.calls == [pretend.call(target, source, signature_type, func_context, debug)] assert wsgi_app.calls == [pretend.call(*args, **kwargs)] # Test that it's only initialized once diff --git a/tests/test_functions/http_basic/main.py b/tests/test_functions/http_basic/main.py new file mode 100644 index 00000000..53ff228d --- /dev/null +++ b/tests/test_functions/http_basic/main.py @@ -0,0 +1,2 @@ +def hello(request): + return "Hello world!" diff --git a/tests/test_view_functions.py b/tests/test_view_functions.py index 219313f9..8de543d1 100644 --- a/tests/test_view_functions.py +++ b/tests/test_view_functions.py @@ -14,6 +14,8 @@ import json import pretend +import pytest +import werkzeug from cloudevents.http import from_http @@ -63,6 +65,20 @@ def test_event_view_func_wrapper(monkeypatch): ] +def test_event_view_func_wrapper_bad_request(monkeypatch): + request = pretend.stub(headers={}, get_json=lambda: None) + + context_stub = pretend.stub() + context_class = pretend.call_recorder(lambda *a, **kw: context_stub) + monkeypatch.setattr(functions_framework, "Context", context_class) + function = pretend.call_recorder(lambda data, context: "Hello") + + view_func = functions_framework._event_view_func_wrapper(function, request) + + with pytest.raises(werkzeug.exceptions.BadRequest): + view_func("/some/path") + + def test_run_cloud_event(): headers = {"Content-Type": "application/cloudevents+json"} data = json.dumps(