From 5409cce4a8abec97213a40fc9bd2662f038f909e Mon Sep 17 00:00:00 2001 From: laminar Date: Tue, 20 Jun 2023 21:09:41 +0800 Subject: [PATCH 1/9] develop ff-python based on ofn v1beta2 Signed-off-by: laminar --- setup.cfg | 2 +- setup.py | 7 +- src/functions_framework/__init__.py | 240 +----------- src/functions_framework/_cli.py | 46 +-- src/functions_framework/_function_registry.py | 4 +- src/functions_framework/_http/__init__.py | 42 --- src/functions_framework/_http/flask.py | 25 -- src/functions_framework/_http/gunicorn.py | 37 -- src/functions_framework/background_event.py | 44 --- .../context}/__init__.py | 0 .../context}/function_context.py | 45 ++- .../context/runtime_context.py | 39 ++ src/functions_framework/event_conversion.py | 346 ------------------ .../openfunction/__init__.py | 0 .../openfunction/async_server.py | 6 +- .../openfunction/dapr_output_middleware.py | 2 +- .../openfunction/function_runtime.py | 2 +- src/functions_framework/runner.py | 55 +++ src/functions_framework/triggers/__init__.py | 0 .../triggers/dapr_trigger/__init__.py | 0 .../triggers/dapr_trigger/dapr.py | 30 ++ .../triggers/http_trigger/__init__.py | 0 src/functions_framework/triggers/trigger.py | 10 + src/google/__init__.py | 22 -- src/google/cloud/__init__.py | 22 -- src/google/cloud/functions/__init__.py | 13 - src/google/cloud/functions/context.py | 19 - src/google/cloud/functions_v1/__init__.py | 13 - src/google/cloud/functions_v1/context.py | 33 -- .../cloud/functions_v1beta2/__init__.py | 13 - src/google/cloud/functions_v1beta2/context.py | 33 -- tests/test_async.py | 5 +- tests/test_binding.py | 2 +- tests/test_convert.py | 50 +-- tests/test_functions.py | 18 +- .../background_load_error/main.py | 2 +- .../background_missing_dependency/main.py | 2 +- .../background_multiple_entry_points/main.py | 4 +- .../test_functions/background_trigger/main.py | 2 +- .../converted_background_event.py | 4 +- 40 files changed, 251 insertions(+), 988 deletions(-) delete mode 100644 src/functions_framework/_http/__init__.py delete mode 100644 src/functions_framework/_http/flask.py delete mode 100644 src/functions_framework/_http/gunicorn.py delete mode 100644 src/functions_framework/background_event.py rename src/{openfunction => functions_framework/context}/__init__.py (100%) rename src/{openfunction => functions_framework/context}/function_context.py (61%) create mode 100644 src/functions_framework/context/runtime_context.py delete mode 100644 src/functions_framework/event_conversion.py create mode 100644 src/functions_framework/openfunction/__init__.py rename src/{ => functions_framework}/openfunction/async_server.py (82%) rename src/{ => functions_framework}/openfunction/dapr_output_middleware.py (86%) rename src/{ => functions_framework}/openfunction/function_runtime.py (97%) create mode 100644 src/functions_framework/runner.py create mode 100644 src/functions_framework/triggers/__init__.py create mode 100644 src/functions_framework/triggers/dapr_trigger/__init__.py create mode 100644 src/functions_framework/triggers/dapr_trigger/dapr.py create mode 100644 src/functions_framework/triggers/http_trigger/__init__.py create mode 100644 src/functions_framework/triggers/trigger.py delete mode 100644 src/google/__init__.py delete mode 100644 src/google/cloud/__init__.py delete mode 100644 src/google/cloud/functions/__init__.py delete mode 100644 src/google/cloud/functions/context.py delete mode 100644 src/google/cloud/functions_v1/__init__.py delete mode 100644 src/google/cloud/functions_v1/context.py delete mode 100644 src/google/cloud/functions_v1beta2/__init__.py delete mode 100644 src/google/cloud/functions_v1beta2/context.py diff --git a/setup.cfg b/setup.cfg index 4a639876..47f09686 100644 --- a/setup.cfg +++ b/setup.cfg @@ -7,4 +7,4 @@ line_length = 88 lines_between_types = 1 combine_as_imports = True default_section = THIRDPARTY -known_first_party = functions_framework, google.cloud.functions +known_first_party = functions_framework diff --git a/setup.py b/setup.py index d25934bb..dcd0f46c 100644 --- a/setup.py +++ b/setup.py @@ -46,16 +46,17 @@ ], keywords="functions-framework", packages=find_packages(where="src"), - namespace_packages=["google", "google.cloud"], package_dir={"": "src"}, python_requires=">=3.5, <4", install_requires=[ + "grpcio==1.54.2", "flask>=1.0,<3.0", "click>=7.0,<9.0", - "watchdog>=1.0.0,<2.0.0", + # "watchdog>=1.0.0,<2.0.0", "gunicorn>=19.2.0,<21.0; platform_system!='Windows'", "cloudevents>=1.2.0,<2.0.0", - "dapr>=1.6.0", + "dapr>=1.10.0", + "aiohttp==3.8.4", ], entry_points={ "console_scripts": [ diff --git a/src/functions_framework/__init__.py b/src/functions_framework/__init__.py index a8b18e19..9bd258b8 100644 --- a/src/functions_framework/__init__.py +++ b/src/functions_framework/__init__.py @@ -12,30 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -import functools import io import json import logging -import os.path -import pathlib import sys -import cloudevents.exceptions as cloud_exceptions import flask -import werkzeug -from cloudevents.http import from_http, is_binary +from cloudevents.http import from_http +from functions_framework.runner import Runner -from functions_framework import _function_registry, event_conversion -from functions_framework.background_event import BackgroundEvent from functions_framework.exceptions import ( EventConversionException, FunctionsFrameworkException, - MissingSourceException, ) -from google.cloud.functions.context import Context -from openfunction.dapr_output_middleware import dapr_output_middleware -from openfunction.async_server import AsyncApp +from functions_framework.openfunction import dapr_output_middleware MAX_CONTENT_LENGTH = 10 * 1024 * 1024 @@ -58,32 +49,6 @@ def write(self, out): return self.stderr.write(json.dumps(payload) + "\n") -def cloud_event(func): - """Decorator that registers cloudevent as user function signature type.""" - _function_registry.REGISTRY_MAP[ - func.__name__ - ] = _function_registry.CLOUDEVENT_SIGNATURE_TYPE - - @functools.wraps(func) - def wrapper(*args, **kwargs): - return func(*args, **kwargs) - - return wrapper - - -def http(func): - """Decorator that registers http as user function signature type.""" - _function_registry.REGISTRY_MAP[ - func.__name__ - ] = _function_registry.HTTP_SIGNATURE_TYPE - - @functools.wraps(func) - def wrapper(*args, **kwargs): - return func(*args, **kwargs) - - return wrapper - - def setup_logging(): logging.getLogger().setLevel(logging.INFO) info_handler = logging.StreamHandler(sys.stdout) @@ -114,124 +79,6 @@ def _run_cloud_event(function, request): function(event) -def _cloud_event_view_func_wrapper(function, request): - def view_func(path): - ce_exception = None - event = None - try: - event = from_http(request.headers, request.get_data()) - except ( - cloud_exceptions.MissingRequiredFields, - cloud_exceptions.InvalidRequiredFields, - ) as e: - ce_exception = e - - if not ce_exception: - function(event) - return "OK" - - # Not a CloudEvent. Try converting to a CloudEvent. - try: - function(event_conversion.background_event_to_cloud_event(request)) - except EventConversionException as e: - flask.abort( - 400, - description=( - "Function was defined with FUNCTION_SIGNATURE_TYPE=cloudevent but" - " parsing CloudEvent failed and converting from background event to" - f" CloudEvent also failed.\nGot HTTP headers: {request.headers}\nGot" - f" data: {request.get_data()}\nGot CloudEvent exception: {repr(ce_exception)}" - f"\nGot background event conversion exception: {repr(e)}" - ), - ) - return "OK" - - return view_func - - -def _event_view_func_wrapper(function, request): - def view_func(path): - if event_conversion.is_convertable_cloud_event(request): - # Convert this CloudEvent to the equivalent background event data and context. - data, context = event_conversion.cloud_event_to_background_event(request) - function(data, context) - elif is_binary(request.headers): - # Support CloudEvents in binary content mode, with data being the - # whole request body and context attributes retrieved from request - # headers. - data = request.get_data() - context = Context( - eventId=request.headers.get("ce-eventId"), - timestamp=request.headers.get("ce-timestamp"), - eventType=request.headers.get("ce-eventType"), - resource=request.headers.get("ce-resource"), - ) - function(data, context) - else: - # This is a regular CloudEvent - event_data = event_conversion.marshal_background_event_data(request) - if not event_data: - flask.abort(400) - event_object = BackgroundEvent(**event_data) - data = event_object.data - context = Context(**event_object.context) - function(data, context) - - return "OK" - - return view_func - - -def _configure_app(app, function, signature_type, func_context): - # Mount the function at the root. Support GCF's default path behavior - # Modify the url_map and view_functions directly here instead of using - # add_url_rule in order to create endpoints that route all methods - if signature_type == _function_registry.HTTP_SIGNATURE_TYPE: - app.url_map.add( - werkzeug.routing.Rule("/", defaults={"path": ""}, endpoint="run") - ) - app.url_map.add(werkzeug.routing.Rule("/robots.txt", endpoint="error")) - app.url_map.add(werkzeug.routing.Rule("/favicon.ico", endpoint="error")) - app.url_map.add(werkzeug.routing.Rule("/", endpoint="run")) - app.view_functions["run"] = _http_view_func_wrapper(function, flask.request) - app.view_functions["error"] = lambda: flask.abort(404, description="Not Found") - app.after_request(read_request) - app.after_request(dapr_output_middleware(func_context)) - elif signature_type == _function_registry.BACKGROUNDEVENT_SIGNATURE_TYPE: - app.url_map.add( - werkzeug.routing.Rule( - "/", defaults={"path": ""}, endpoint="run", methods=["POST"] - ) - ) - app.url_map.add( - werkzeug.routing.Rule("/", endpoint="run", methods=["POST"]) - ) - app.view_functions["run"] = _event_view_func_wrapper(function, flask.request) - # Add a dummy endpoint for GET / - app.url_map.add(werkzeug.routing.Rule("/", endpoint="get", methods=["GET"])) - app.view_functions["get"] = lambda: "" - elif signature_type == _function_registry.CLOUDEVENT_SIGNATURE_TYPE: - app.url_map.add( - werkzeug.routing.Rule( - "/", defaults={"path": ""}, endpoint=signature_type, methods=["POST"] - ) - ) - app.url_map.add( - werkzeug.routing.Rule( - "/", endpoint=signature_type, methods=["POST"] - ) - ) - - app.view_functions[signature_type] = _cloud_event_view_func_wrapper( - function, flask.request - ) - else: - raise FunctionsFrameworkException( - "Invalid signature type: {signature_type}".format( - signature_type=signature_type - ) - ) - def read_request(response): """ @@ -249,85 +96,6 @@ def crash_handler(e): """ return str(e), 500, {_FUNCTION_STATUS_HEADER_FIELD: _CRASH} -def create_async_app(target=None, source=None, func_context=None, debug=False): - target = _function_registry.get_function_target(target) - source = _function_registry.get_function_source(source) - - if not os.path.exists(source): - raise MissingSourceException( - "File {source} that is expected to define function doesn't exist".format( - source=source - ) - ) - - source_module, spec = _function_registry.load_function_module(source) - spec.loader.exec_module(source_module) - - function = _function_registry.get_user_function(source, source_module, target) - - setup_logging_level(debug) - - async_app = AsyncApp(func_context) - async_app.bind(function) - - return async_app.app - - -def create_app(target=None, source=None, signature_type=None, func_context=None, debug=False): - target = _function_registry.get_function_target(target) - source = _function_registry.get_function_source(source) - - # Set the template folder relative to the source path - # Python 3.5: join does not support PosixPath - template_folder = str(pathlib.Path(source).parent / "templates") - - if not os.path.exists(source): - raise MissingSourceException( - "File {source} that is expected to define function doesn't exist".format( - source=source - ) - ) - - source_module, spec = _function_registry.load_function_module(source) - - # Create the application - _app = flask.Flask(target, template_folder=template_folder) - _app.config["MAX_CONTENT_LENGTH"] = MAX_CONTENT_LENGTH - _app.register_error_handler(500, crash_handler) - global errorhandler - errorhandler = _app.errorhandler - - # Handle legacy GCF Python 3.7 behavior - if os.environ.get("ENTRY_POINT"): - os.environ["FUNCTION_NAME"] = os.environ.get("K_SERVICE", target) - _app.make_response_original = _app.make_response - - def handle_none(rv): - if rv is None: - rv = "OK" - return _app.make_response_original(rv) - - _app.make_response = handle_none - - # Handle log severity backwards compatibility - sys.stdout = _LoggingHandler("INFO", sys.stderr) - sys.stderr = _LoggingHandler("ERROR", sys.stderr) - setup_logging() - - setup_logging_level(debug) - - # Execute the module, within the application context - with _app.app_context(): - spec.loader.exec_module(source_module) - - # Get the configured function signature type - signature_type = _function_registry.get_func_signature_type(target, signature_type) - function = _function_registry.get_user_function(source, source_module, target) - - _configure_app(_app, function, signature_type, func_context) - - return _app - class LazyWSGIApp: """ @@ -351,7 +119,7 @@ def __init__(self, target=None, source=None, signature_type=None, func_context=N def __call__(self, *args, **kwargs): if not self.app: - self.app = create_app(self.target, self.source, self.signature_type, self.func_context, self.debug) + self.app = Runner(self.target, self.source, self.signature_type, self.func_context, self.debug) return self.app(*args, **kwargs) diff --git a/src/functions_framework/_cli.py b/src/functions_framework/_cli.py index 9699a5c2..c153ff26 100644 --- a/src/functions_framework/_cli.py +++ b/src/functions_framework/_cli.py @@ -16,39 +16,41 @@ import click -from functions_framework import create_app, create_async_app -from functions_framework._http import create_server from functions_framework import _function_registry +from functions_framework.runner import Runner + @click.command() @click.option("--target", envvar="FUNCTION_TARGET", type=click.STRING, required=True) @click.option("--source", envvar="FUNCTION_SOURCE", type=click.Path(), default=None) -@click.option( - "--signature-type", - envvar="FUNCTION_SIGNATURE_TYPE", - type=click.Choice(["http", "event", "cloudevent"]), - default="http", -) +# @click.option( +# "--signature-type", +# envvar="FUNCTION_SIGNATURE_TYPE", +# type=click.Choice(["http", "event", "cloudevent"]), +# default="http", +# ) @click.option("--host", envvar="HOST", type=click.STRING, default="0.0.0.0") @click.option("--port", envvar="PORT", type=click.INT, default=8080) @click.option("--debug", envvar="DEBUG", is_flag=True) @click.option("--dry-run", envvar="DRY_RUN", is_flag=True) -def _cli(target, source, signature_type, host, port, debug, dry_run): +def _cli(target, source, host, port, debug, dry_run): context = _function_registry.get_openfunction_context(None) - # determine if async or knative - if context and context.is_runtime_async(): - app = create_async_app(target, source, context, debug) - if dry_run: - run_dry(target, host, port) - else: - app.run(context.port) - else: - app = create_app(target, source, signature_type, context, debug) - if dry_run: - run_dry(target, host, port) - else: - create_server(app, debug).run(host, port) + # # determine if async or knative + # if context and context.is_runtime_async(): + # app = create_async_app(target, source, context, debug) + # if dry_run: + # run_dry(target, host, port) + # else: + # app.run(context.port) + # else: + # app = create_app(target, source, signature_type, context, debug) + # if dry_run: + # run_dry(target, host, port) + # else: + # create_server(app, debug).run(host, port) + runner = Runner(context, target, source, host, port, debug, dry_run) + runner.run() def run_dry(target, host, port): diff --git a/src/functions_framework/_function_registry.py b/src/functions_framework/_function_registry.py index fc3a39f6..48bddf09 100644 --- a/src/functions_framework/_function_registry.py +++ b/src/functions_framework/_function_registry.py @@ -1,4 +1,4 @@ -# Copyright 2021 Google LLC +# Copyright 2023 OpenFunction # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ MissingTargetException, ) -from openfunction.function_context import FunctionContext +from functions_framework.context.function_context import FunctionContext DEFAULT_SOURCE = os.path.realpath("./main.py") diff --git a/src/functions_framework/_http/__init__.py b/src/functions_framework/_http/__init__.py deleted file mode 100644 index ca9b0f5c..00000000 --- a/src/functions_framework/_http/__init__.py +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from functions_framework._http.flask import FlaskApplication - - -class HTTPServer: - def __init__(self, app, debug, **options): - self.app = app - self.debug = debug - self.options = options - - if self.debug: - self.server_class = FlaskApplication - else: - try: - from functions_framework._http.gunicorn import GunicornApplication - - self.server_class = GunicornApplication - except ImportError as e: - self.server_class = FlaskApplication - - def run(self, host, port): - http_server = self.server_class( - self.app, host, port, self.debug, **self.options - ) - http_server.run() - - -def create_server(wsgi_app, debug, **options): - return HTTPServer(wsgi_app, debug, **options) diff --git a/src/functions_framework/_http/flask.py b/src/functions_framework/_http/flask.py deleted file mode 100644 index b2edf563..00000000 --- a/src/functions_framework/_http/flask.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -class FlaskApplication: - def __init__(self, app, host, port, debug, **options): - self.app = app - self.host = host - self.port = port - self.debug = debug - self.options = options - - def run(self): - self.app.run(self.host, self.port, debug=self.debug, **self.options) diff --git a/src/functions_framework/_http/gunicorn.py b/src/functions_framework/_http/gunicorn.py deleted file mode 100644 index 25fdb790..00000000 --- a/src/functions_framework/_http/gunicorn.py +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import gunicorn.app.base - - -class GunicornApplication(gunicorn.app.base.BaseApplication): - def __init__(self, app, host, port, debug, **options): - self.options = { - "bind": "%s:%s" % (host, port), - "workers": 1, - "threads": 8, - "timeout": 0, - "loglevel": "error", - "limit_request_line": 0, - } - self.options.update(options) - self.app = app - super().__init__() - - def load_config(self): - for key, value in self.options.items(): - self.cfg.set(key, value) - - def load(self): - return self.app diff --git a/src/functions_framework/background_event.py b/src/functions_framework/background_event.py deleted file mode 100644 index be01960b..00000000 --- a/src/functions_framework/background_event.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -class BackgroundEvent(object): - """BackgroundEvent is an event passed to GCF background event functions. - - Background event functions take data and context as parameters, both of - which this class represents. By contrast, CloudEvent functions take a - single CloudEvent object as their parameter. This class does not represent - CloudEvents. - """ - - # Supports v1beta1, v1beta2, and v1 event formats. - def __init__( - self, - context=None, - data="", - eventId="", - timestamp="", - eventType="", - resource="", - **kwargs, - ): - self.context = context - if not self.context: - self.context = { - "eventId": eventId, - "timestamp": timestamp, - "eventType": eventType, - "resource": resource, - } - self.data = data diff --git a/src/openfunction/__init__.py b/src/functions_framework/context/__init__.py similarity index 100% rename from src/openfunction/__init__.py rename to src/functions_framework/context/__init__.py diff --git a/src/openfunction/function_context.py b/src/functions_framework/context/function_context.py similarity index 61% rename from src/openfunction/function_context.py rename to src/functions_framework/context/function_context.py index 7be2a9d0..aaf87803 100644 --- a/src/openfunction/function_context.py +++ b/src/functions_framework/context/function_context.py @@ -8,27 +8,33 @@ class FunctionContext(object): """OpenFunction's serving context.""" - def __init__(self, name="", version="", runtime="", inputs=None, outputs=None, port=8080): + def __init__(self, name="", version="", dapr_triggers=None, http_trigger=None, + inputs=None, outputs=None, states=None, + pre_hooks=None, post_hooks=None, tracing=None, port=8080): self.name = name self.version = version - self.runtime = runtime + self.dapr_triggers = dapr_triggers + self.http_trigger = http_trigger self.inputs = inputs self.outputs = outputs + self.states = states + self.pre_hooks = pre_hooks + self.post_hooks = post_hooks + self.tracing = tracing self.port = port - def is_runtime_async(self): - return self.runtime.lower() == ASYNC_RUNTIME_TYPE - - def is_runtime_knative(self): - return self.runtime.lower() == KNATIVE_RUNTIME_TYPE - @staticmethod def from_json(json_dct): name = json_dct.get('name') version = json_dct.get('version') - runtime = json_dct.get('runtime') inputs_list = json_dct.get('inputs') outputs_list = json_dct.get('outputs') + dapr_triggers = json_dct.get('triggers', {}).get('dapr', []) + http_trigger = json_dct.get('triggers', {}).get('http', None) + states = json_dct.get('states', {}) + pre_hooks = json_dct.get('pre_hooks', []) + post_hooks = json_dct.get('post_hooks', []) + tracing = json_dct.get('tracing', {}) inputs = None if inputs_list: @@ -43,8 +49,8 @@ def from_json(json_dct): for k, v in outputs_list.items(): output = Component.from_json(v) outputs[k] = output - - return FunctionContext(name, version, runtime, inputs, outputs) + return FunctionContext(name, version, dapr_triggers, http_trigger, + inputs, outputs, states, pre_hooks, post_hooks, tracing) class Component(object): @@ -82,4 +88,19 @@ def from_json(json_dct): metadata = json_dct.get('metadata') component_type = json_dct.get('componentType', '') operation = json_dct.get('operation', '') - return Component(uri, component_name, component_type, metadata, operation) \ No newline at end of file + return Component(uri, component_name, component_type, metadata, operation) + + +class OpenFunctionTrigger(object): + + def __init__(self, name, component_type, topic): + self.name = name + self.component_type = component_type + self.topic = topic + + @staticmethod + def from_json(json_dct): + name = json_dct.get('name', '') + component_type = json_dct.get('type', '') + topic = json_dct.get('topic') + return OpenFunctionTrigger(name, component_type, topic) diff --git a/src/functions_framework/context/runtime_context.py b/src/functions_framework/context/runtime_context.py new file mode 100644 index 00000000..ccd00f1e --- /dev/null +++ b/src/functions_framework/context/runtime_context.py @@ -0,0 +1,39 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from functions_framework.context.function_context import FunctionContext, OpenFunctionTrigger + + +class RuntimeContext(object): + """Context for runtime.""" + + def __int__(self, context: FunctionContext = None): + self.context = context + self.request = None + + def has_dapr_trigger(self): + """Check if the function has dapr trigger.""" + return self.context and self.context.dapr_triggers + + def has_http_trigger(self): + """Check if the function has http trigger.""" + return self.context and self.context.http_trigger + + def get_dapr_triggers(self) -> [OpenFunctionTrigger]: + """Get dapr triggers.""" + triggers = [] + for trigger in self.context.dapr_triggers: + triggers.append( + OpenFunctionTrigger(name=trigger.get('name'), topic=trigger.get('topic'), component_type=trigger.get('type')) + ) + return triggers diff --git a/src/functions_framework/event_conversion.py b/src/functions_framework/event_conversion.py deleted file mode 100644 index 28cf2a1b..00000000 --- a/src/functions_framework/event_conversion.py +++ /dev/null @@ -1,346 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import re - -from datetime import datetime -from typing import Any, Optional, Tuple - -from cloudevents.exceptions import MissingRequiredFields -from cloudevents.http import CloudEvent, from_http, is_binary - -from functions_framework.background_event import BackgroundEvent -from functions_framework.exceptions import EventConversionException -from google.cloud.functions.context import Context - -_CLOUD_EVENT_SPEC_VERSION = "1.0" - -# Maps background/legacy event types to their equivalent CloudEvent types. -# For more info on event mappings see -# https://github.com/GoogleCloudPlatform/functions-framework-conformance/blob/master/docs/mapping.md -_BACKGROUND_TO_CE_TYPE = { - "google.pubsub.topic.publish": "google.cloud.pubsub.topic.v1.messagePublished", - "providers/cloud.pubsub/eventTypes/topic.publish": "google.cloud.pubsub.topic.v1.messagePublished", - "google.storage.object.finalize": "google.cloud.storage.object.v1.finalized", - "google.storage.object.delete": "google.cloud.storage.object.v1.deleted", - "google.storage.object.archive": "google.cloud.storage.object.v1.archived", - "google.storage.object.metadataUpdate": "google.cloud.storage.object.v1.metadataUpdated", - "providers/cloud.firestore/eventTypes/document.write": "google.cloud.firestore.document.v1.written", - "providers/cloud.firestore/eventTypes/document.create": "google.cloud.firestore.document.v1.created", - "providers/cloud.firestore/eventTypes/document.update": "google.cloud.firestore.document.v1.updated", - "providers/cloud.firestore/eventTypes/document.delete": "google.cloud.firestore.document.v1.deleted", - "providers/firebase.auth/eventTypes/user.create": "google.firebase.auth.user.v1.created", - "providers/firebase.auth/eventTypes/user.delete": "google.firebase.auth.user.v1.deleted", - "providers/google.firebase.analytics/eventTypes/event.log": "google.firebase.analytics.log.v1.written", - "providers/google.firebase.database/eventTypes/ref.create": "google.firebase.database.ref.v1.created", - "providers/google.firebase.database/eventTypes/ref.write": "google.firebase.database.ref.v1.written", - "providers/google.firebase.database/eventTypes/ref.update": "google.firebase.database.ref.v1.updated", - "providers/google.firebase.database/eventTypes/ref.delete": "google.firebase.database.ref.v1.deleted", - "providers/cloud.storage/eventTypes/object.change": "google.cloud.storage.object.v1.finalized", -} - -# _BACKGROUND_TO_CE_TYPE contains duplicate values for some keys. This set contains the duplicates -# that should be dropped when generating the inverse mapping _CE_TO_BACKGROUND_TYPE -_NONINVERTALBE_CE_TYPES = { - "providers/cloud.pubsub/eventTypes/topic.publish", - "providers/cloud.storage/eventTypes/object.change", -} - -# Maps CloudEvent types to the equivalent background/legacy event types (inverse -# of _BACKGROUND_TO_CE_TYPE) -_CE_TO_BACKGROUND_TYPE = { - v: k for k, v in _BACKGROUND_TO_CE_TYPE.items() if k not in _NONINVERTALBE_CE_TYPES -} - -# CloudEvent service names. -_FIREBASE_AUTH_CE_SERVICE = "firebaseauth.googleapis.com" -_FIREBASE_CE_SERVICE = "firebase.googleapis.com" -_FIREBASE_DB_CE_SERVICE = "firebasedatabase.googleapis.com" -_FIRESTORE_CE_SERVICE = "firestore.googleapis.com" -_PUBSUB_CE_SERVICE = "pubsub.googleapis.com" -_STORAGE_CE_SERVICE = "storage.googleapis.com" - -# Raw pubsub types -_PUBSUB_EVENT_TYPE = "google.pubsub.topic.publish" -_PUBSUB_MESSAGE_TYPE = "type.googleapis.com/google.pubsub.v1.PubsubMessage" - -_PUBSUB_TOPIC_REQUEST_PATH = re.compile(r"projects\/[^/?]+\/topics\/[^/?]+") - -# Maps background event services to their equivalent CloudEvent services. -_SERVICE_BACKGROUND_TO_CE = { - "providers/cloud.firestore/": _FIRESTORE_CE_SERVICE, - "providers/google.firebase.analytics/": _FIREBASE_CE_SERVICE, - "providers/firebase.auth/": _FIREBASE_AUTH_CE_SERVICE, - "providers/google.firebase.database/": _FIREBASE_DB_CE_SERVICE, - "providers/cloud.pubsub/": _PUBSUB_CE_SERVICE, - "providers/cloud.storage/": _STORAGE_CE_SERVICE, - "google.pubsub": _PUBSUB_CE_SERVICE, - "google.storage": _STORAGE_CE_SERVICE, -} - -# Maps CloudEvent service strings to regular expressions used to split a background -# event resource string into CloudEvent resource and subject strings. Each regex -# must have exactly two capture groups: the first for the resource and the second -# for the subject. -_CE_SERVICE_TO_RESOURCE_RE = { - _FIREBASE_CE_SERVICE: re.compile(r"^(projects/[^/]+)/(events/[^/]+)$"), - _FIREBASE_DB_CE_SERVICE: re.compile(r"^projects/_/(instances/[^/]+)/(refs/.+)$"), - _FIRESTORE_CE_SERVICE: re.compile( - r"^(projects/[^/]+/databases/\(default\))/(documents/.+)$" - ), - _STORAGE_CE_SERVICE: re.compile(r"^(projects/[^/]/buckets/[^/]+)/(objects/.+)$"), -} - -# Maps Firebase Auth background event metadata field names to their equivalent -# CloudEvent field names. -_FIREBASE_AUTH_METADATA_FIELDS_BACKGROUND_TO_CE = { - "createdAt": "createTime", - "lastSignedInAt": "lastSignInTime", -} -# Maps Firebase Auth CloudEvent metadata field names to their equivalent -# background event field names (inverse of _FIREBASE_AUTH_METADATA_FIELDS_BACKGROUND_TO_CE). -_FIREBASE_AUTH_METADATA_FIELDS_CE_TO_BACKGROUND = { - v: k for k, v in _FIREBASE_AUTH_METADATA_FIELDS_BACKGROUND_TO_CE.items() -} - - -def background_event_to_cloud_event(request) -> CloudEvent: - """Converts a background event represented by the given HTTP request into a CloudEvent.""" - event_data = marshal_background_event_data(request) - if not event_data: - raise EventConversionException("Failed to parse JSON") - - event_object = BackgroundEvent(**event_data) - data = event_object.data - context = Context(**event_object.context) - - if context.event_type not in _BACKGROUND_TO_CE_TYPE: - raise EventConversionException( - f'Unable to find CloudEvent equivalent type for "{context.event_type}"' - ) - new_type = _BACKGROUND_TO_CE_TYPE[context.event_type] - - service, resource, subject = _split_resource(context) - source = f"//{service}/{resource}" - - # Handle Pub/Sub events. - if service == _PUBSUB_CE_SERVICE: - if "messageId" not in data: - data["messageId"] = context.event_id - if "publishTime" not in data: - data["publishTime"] = context.timestamp - data = {"message": data} - - # Handle Firebase Auth events. - if service == _FIREBASE_AUTH_CE_SERVICE: - if "metadata" in data: - for old, new in _FIREBASE_AUTH_METADATA_FIELDS_BACKGROUND_TO_CE.items(): - if old in data["metadata"]: - data["metadata"][new] = data["metadata"][old] - del data["metadata"][old] - if "uid" in data: - uid = data["uid"] - subject = f"users/{uid}" - - # Handle Firebase DB events. - if service == _FIREBASE_DB_CE_SERVICE: - # The CE source of firebasedatabase CloudEvents includes location information - # that is inferred from the 'domain' field of legacy events. - if "domain" not in event_data: - raise EventConversionException( - "Invalid FirebaseDB event payload: missing 'domain'" - ) - - domain = event_data["domain"] - location = "us-central1" - if domain != "firebaseio.com": - location = domain.split(".")[0] - - resource = f"projects/_/locations/{location}/{resource}" - source = f"//{service}/{resource}" - - metadata = { - "id": context.event_id, - "time": context.timestamp, - "specversion": _CLOUD_EVENT_SPEC_VERSION, - "datacontenttype": "application/json", - "type": new_type, - "source": source, - } - - if subject: - metadata["subject"] = subject - - return CloudEvent(metadata, data) - - -def is_convertable_cloud_event(request) -> bool: - """Is the given request a known CloudEvent that can be converted to background event.""" - if is_binary(request.headers): - event_type = request.headers.get("ce-type") - event_source = request.headers.get("ce-source") - return ( - event_source is not None - and event_type is not None - and event_type in _CE_TO_BACKGROUND_TYPE - ) - return False - - -def _split_ce_source(source) -> Tuple[str, str]: - """Splits a CloudEvent source string into resource and subject components.""" - regex = re.compile(r"\/\/([^/]+)\/(.+)") - match = regex.fullmatch(source) - if not match: - raise EventConversionException("Unexpected CloudEvent source.") - - return match.group(1), match.group(2) - - -def cloud_event_to_background_event(request) -> Tuple[Any, Context]: - """Converts a background event represented by the given HTTP request into a CloudEvent.""" - try: - event = from_http(request.headers, request.get_data()) - data = event.data - service, name = _split_ce_source(event["source"]) - - if event["type"] not in _CE_TO_BACKGROUND_TYPE: - raise EventConversionException( - f'Unable to find background event equivalent type for "{event["type"]}"' - ) - - if service == _PUBSUB_CE_SERVICE: - resource = {"service": service, "name": name, "type": _PUBSUB_MESSAGE_TYPE} - if "message" in data: - data = data["message"] - if "messageId" in data: - del data["messageId"] - if "publishTime" in data: - del data["publishTime"] - elif service == _FIREBASE_AUTH_CE_SERVICE: - resource = name - if "metadata" in data: - for old, new in _FIREBASE_AUTH_METADATA_FIELDS_CE_TO_BACKGROUND.items(): - if old in data["metadata"]: - data["metadata"][new] = data["metadata"][old] - del data["metadata"][old] - elif service == _STORAGE_CE_SERVICE: - resource = { - "name": f"{name}/{event['subject']}", - "service": service, - "type": data["kind"], - } - elif service == _FIREBASE_DB_CE_SERVICE: - name = re.sub("/locations/[^/]+", "", name) - resource = f"{name}/{event['subject']}" - else: - resource = f"{name}/{event['subject']}" - - context = Context( - eventId=event["id"], - timestamp=event["time"], - eventType=_CE_TO_BACKGROUND_TYPE[event["type"]], - resource=resource, - ) - return (data, context) - except (AttributeError, KeyError, TypeError, MissingRequiredFields): - raise EventConversionException( - "Failed to convert CloudEvent to BackgroundEvent." - ) - - -def _split_resource(context: Context) -> Tuple[str, str, str]: - """Splits a background event's resource into a CloudEvent service, resource, and subject.""" - service = "" - resource = "" - if isinstance(context.resource, dict): - service = context.resource.get("service", "") - resource = context.resource["name"] - else: - resource = context.resource - - # If there's no service we'll choose an appropriate one based on the event type. - if not service: - for b_service, ce_service in _SERVICE_BACKGROUND_TO_CE.items(): - if context.event_type.startswith(b_service): - service = ce_service - break - if not service: - raise EventConversionException( - "Unable to find CloudEvent equivalent service " - f"for {context.event_type}" - ) - - # If we don't need to split the resource string then we're done. - if service not in _CE_SERVICE_TO_RESOURCE_RE: - return service, resource, "" - - # Split resource into resource and subject. - match = _CE_SERVICE_TO_RESOURCE_RE[service].fullmatch(resource) - if not match: - raise EventConversionException("Resource regex did not match") - - return service, match.group(1), match.group(2) - - -def marshal_background_event_data(request): - """Marshal the request body of a raw Pub/Sub HTTP request into the schema that is expected of - a background event""" - try: - request_data = request.get_json() - if not _is_raw_pubsub_payload(request_data): - # If this in not a raw Pub/Sub request, return the unaltered request data. - return request_data - return { - "context": { - "eventId": request_data["message"]["messageId"], - "timestamp": request_data["message"].get( - "publishTime", datetime.utcnow().isoformat() + "Z" - ), - "eventType": _PUBSUB_EVENT_TYPE, - "resource": { - "service": _PUBSUB_CE_SERVICE, - "type": _PUBSUB_MESSAGE_TYPE, - "name": _parse_pubsub_topic(request.path), - }, - }, - "data": { - "@type": _PUBSUB_MESSAGE_TYPE, - "data": request_data["message"]["data"], - "attributes": request_data["message"]["attributes"], - }, - } - except (AttributeError, KeyError, TypeError): - raise EventConversionException("Failed to convert Pub/Sub payload to event") - - -def _is_raw_pubsub_payload(request_data) -> bool: - """Does the given request body match the schema of a unmarshalled Pub/Sub request""" - return ( - request_data is not None - and "context" not in request_data - and "subscription" in request_data - and "message" in request_data - and "data" in request_data["message"] - and "messageId" in request_data["message"] - ) - - -def _parse_pubsub_topic(request_path) -> Optional[str]: - match = _PUBSUB_TOPIC_REQUEST_PATH.search(request_path) - if match: - return match.group(0) - else: - # It is possible to configure a Pub/Sub subscription to push directly to this function - # without passing the topic name in the URL path. - return "" diff --git a/src/functions_framework/openfunction/__init__.py b/src/functions_framework/openfunction/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/openfunction/async_server.py b/src/functions_framework/openfunction/async_server.py similarity index 82% rename from src/openfunction/async_server.py rename to src/functions_framework/openfunction/async_server.py index c94916ad..4cc6dfb4 100644 --- a/src/openfunction/async_server.py +++ b/src/functions_framework/openfunction/async_server.py @@ -1,7 +1,7 @@ -from dapr.ext.grpc import App, BindingRequest +from dapr.aio.grpc import App, BindingRequest from cloudevents.sdk.event import v1 -from openfunction.function_context import OPEN_FUNC_BINDING, OPEN_FUNC_TOPIC -from openfunction.function_runtime import OpenFunctionRuntime +from functions_framework.context.function_context import OPEN_FUNC_BINDING, OPEN_FUNC_TOPIC +from functions_framework.openfunction.function_runtime import OpenFunctionRuntime class AsyncApp(object): """Init async server with dapr server.""" diff --git a/src/openfunction/dapr_output_middleware.py b/src/functions_framework/openfunction/dapr_output_middleware.py similarity index 86% rename from src/openfunction/dapr_output_middleware.py rename to src/functions_framework/openfunction/dapr_output_middleware.py index f7d59134..a65ff63e 100644 --- a/src/openfunction/dapr_output_middleware.py +++ b/src/functions_framework/openfunction/dapr_output_middleware.py @@ -1,6 +1,6 @@ import logging -from openfunction.function_runtime import OpenFunctionRuntime +from functions_framework.openfunction.function_runtime import OpenFunctionRuntime def dapr_output_middleware(context): """Flask middleware for output binding.""" diff --git a/src/openfunction/function_runtime.py b/src/functions_framework/openfunction/function_runtime.py similarity index 97% rename from src/openfunction/function_runtime.py rename to src/functions_framework/openfunction/function_runtime.py index f09efcab..eb5ec9e4 100644 --- a/src/openfunction/function_runtime.py +++ b/src/functions_framework/openfunction/function_runtime.py @@ -4,7 +4,7 @@ from dapr.clients import DaprGrpcClient from dapr.conf import settings -from openfunction.function_context import FunctionContext +from functions_framework.context.function_context import FunctionContext DAPR_GRPC_PORT = "DAPR_GRPC_PORT" OPEN_FUNC_BINDING = "bindings" diff --git a/src/functions_framework/runner.py b/src/functions_framework/runner.py new file mode 100644 index 00000000..22c37cd8 --- /dev/null +++ b/src/functions_framework/runner.py @@ -0,0 +1,55 @@ +import os +from dapr.ext.grpc import App + +from functions_framework import _function_registry +from functions_framework.exceptions import MissingSourceException +from functions_framework.context.function_context import FunctionContext +from functions_framework.context.runtime_context import RuntimeContext +from functions_framework.triggers.dapr_trigger.dapr import DaprTrigger + + +class Runner: + def __init__(self, context: FunctionContext, target=None, source=None, + host=None, port=None, debug=None, dry_run=None): + self.target = target + self.source = source + self.context = context + self.user_function = None + self.request = None + self.app = App() + self.load_user_function() + + def load_user_function(self): + _target = _function_registry.get_function_target(self.target) + _source = _function_registry.get_function_source(self.source) + + if not os.path.exists(_source): + raise MissingSourceException( + "File {source} that is expected to define function doesn't exist".format( + source=_source + ) + ) + + source_module, spec = _function_registry.load_function_module(_source) + spec.loader.exec_module(source_module) + + self.user_function = _function_registry.get_user_function(_source, source_module, _target) + + def invoke_user_function(self, request): + self.request = request + if self.user_function: + output_data = self.user_function(self) + return output_data + else: + raise ValueError("User function is not loaded.") + + def run(self): + runtime_context = RuntimeContext() + runtime_context.context = self.context + + if runtime_context.has_dapr_trigger(): + _triggers = runtime_context.get_dapr_triggers() + dapr_trigger = DaprTrigger(self.context.port, self.app, _triggers, self.user_function) + dapr_trigger.start(runtime_context) + + self.app.run(50055) diff --git a/src/functions_framework/triggers/__init__.py b/src/functions_framework/triggers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/functions_framework/triggers/dapr_trigger/__init__.py b/src/functions_framework/triggers/dapr_trigger/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/functions_framework/triggers/dapr_trigger/dapr.py b/src/functions_framework/triggers/dapr_trigger/dapr.py new file mode 100644 index 00000000..160bf064 --- /dev/null +++ b/src/functions_framework/triggers/dapr_trigger/dapr.py @@ -0,0 +1,30 @@ +import logging +from copy import deepcopy + +from functions_framework.context.function_context import OpenFunctionTrigger +from functions_framework.context.runtime_context import RuntimeContext +from functions_framework.triggers.trigger import Trigger + +from dapr.ext.grpc import App, BindingRequest + + +class DaprTrigger(Trigger): + def __init__(self, port, app: App = None, triggers: [OpenFunctionTrigger] = None, user_function=None): + self.port = port + self.triggers = triggers + self.app = app + self.user_function = user_function + + def start(self, context: RuntimeContext): + if not self.triggers: + raise Exception("No triggers specified for DaprTrigger") + + for trigger in self.triggers: + if trigger.component_type.startswith("bindings"): + @self.app.binding(trigger.name) + def binding_handler(request: BindingRequest): + ctx = deepcopy(context) + ctx.request = request + logging.basicConfig(level=logging.INFO) + logging.info('Received Message : ' + request.text()) + self.user_function(ctx) diff --git a/src/functions_framework/triggers/http_trigger/__init__.py b/src/functions_framework/triggers/http_trigger/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/functions_framework/triggers/trigger.py b/src/functions_framework/triggers/trigger.py new file mode 100644 index 00000000..3384f98b --- /dev/null +++ b/src/functions_framework/triggers/trigger.py @@ -0,0 +1,10 @@ +from abc import ABC, abstractmethod + +from functions_framework.context.runtime_context import RuntimeContext + + +class Trigger(ABC): + @abstractmethod + def start(self, context: RuntimeContext): + pass + diff --git a/src/google/__init__.py b/src/google/__init__.py deleted file mode 100644 index 72a55585..00000000 --- a/src/google/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -try: - import pkg_resources - - pkg_resources.declare_namespace(__name__) -except ImportError: - import pkgutil - - __path__ = pkgutil.extend_path(__path__, __name__) diff --git a/src/google/cloud/__init__.py b/src/google/cloud/__init__.py deleted file mode 100644 index 72a55585..00000000 --- a/src/google/cloud/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -try: - import pkg_resources - - pkg_resources.declare_namespace(__name__) -except ImportError: - import pkgutil - - __path__ = pkgutil.extend_path(__path__, __name__) diff --git a/src/google/cloud/functions/__init__.py b/src/google/cloud/functions/__init__.py deleted file mode 100644 index 6913f02e..00000000 --- a/src/google/cloud/functions/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/src/google/cloud/functions/context.py b/src/google/cloud/functions/context.py deleted file mode 100644 index 665d8b29..00000000 --- a/src/google/cloud/functions/context.py +++ /dev/null @@ -1,19 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Definition of types used by Cloud Functions in Python..""" - -from google.cloud.functions_v1.context import Context - -__all__ = ["Context"] diff --git a/src/google/cloud/functions_v1/__init__.py b/src/google/cloud/functions_v1/__init__.py deleted file mode 100644 index 6913f02e..00000000 --- a/src/google/cloud/functions_v1/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/src/google/cloud/functions_v1/context.py b/src/google/cloud/functions_v1/context.py deleted file mode 100644 index 12670867..00000000 --- a/src/google/cloud/functions_v1/context.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Definition of the context type used by Cloud Functions in Python.""" - - -class Context(object): - """Context passed to background functions.""" - - def __init__(self, eventId="", timestamp="", eventType="", resource=""): - self.event_id = eventId - self.timestamp = timestamp - self.event_type = eventType - self.resource = resource - - def __str__(self): - return "{event_id: %s, timestamp: %s, event_type: %s, resource: %s}" % ( - self.event_id, - self.timestamp, - self.event_type, - self.resource, - ) diff --git a/src/google/cloud/functions_v1beta2/__init__.py b/src/google/cloud/functions_v1beta2/__init__.py deleted file mode 100644 index 6913f02e..00000000 --- a/src/google/cloud/functions_v1beta2/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/src/google/cloud/functions_v1beta2/context.py b/src/google/cloud/functions_v1beta2/context.py deleted file mode 100644 index 12670867..00000000 --- a/src/google/cloud/functions_v1beta2/context.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Definition of the context type used by Cloud Functions in Python.""" - - -class Context(object): - """Context passed to background functions.""" - - def __init__(self, eventId="", timestamp="", eventType="", resource=""): - self.event_id = eventId - self.timestamp = timestamp - self.event_type = eventType - self.resource = resource - - def __str__(self): - return "{event_id: %s, timestamp: %s, event_type: %s, resource: %s}" % ( - self.event_id, - self.timestamp, - self.event_type, - self.resource, - ) diff --git a/tests/test_async.py b/tests/test_async.py index 3a13a4a1..edd321e1 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -1,5 +1,4 @@ import random -import pathlib import json import subprocess import threading @@ -10,8 +9,8 @@ from paho.mqtt import client as mqtt_client -from openfunction.function_context import FunctionContext -from openfunction.async_server import AsyncApp +from functions_framework.openfunction import FunctionContext +from functions_framework.openfunction import AsyncApp TEST_PAYLOAD = {"data": "hello world"} APP_ID="async.dapr" diff --git a/tests/test_binding.py b/tests/test_binding.py index 0f9219b0..4ae9f906 100644 --- a/tests/test_binding.py +++ b/tests/test_binding.py @@ -6,7 +6,7 @@ import pytest from functions_framework import create_app -from openfunction.function_context import FunctionContext +from functions_framework.openfunction import FunctionContext TEST_FUNCTIONS_DIR = pathlib.Path(__file__).resolve().parent / "test_functions" TEST_RESPONSE = "Hello world!" diff --git a/tests/test_convert.py b/tests/test_convert.py index 0d41d5ed..6128cd5d 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -22,7 +22,7 @@ from functions_framework import event_conversion from functions_framework.exceptions import EventConversionException -from google.cloud.functions.context import Context +from google_origin.cloud.functions.context import Context TEST_DATA_DIR = pathlib.Path(__file__).resolve().parent / "test_data" @@ -31,11 +31,11 @@ "context": { "eventId": "1215011316659232", "timestamp": "2020-05-18T12:13:19Z", - "eventType": "google.pubsub.topic.publish", + "eventType": "google_origin.pubsub.topic.publish", "resource": { "service": "pubsub.googleapis.com", "name": "projects/sample-project/topics/gcf-test", - "type": "type.googleapis.com/google.pubsub.v1.PubsubMessage", + "type": "type.googleapis.com/google_origin.pubsub.v1.PubsubMessage", }, }, "data": { @@ -71,7 +71,7 @@ "id": "1215011316659232", "source": "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test", "time": "2020-05-18T12:13:19Z", - "type": "google.cloud.pubsub.topic.v1.messagePublished", + "type": "google_origin.cloud.pubsub.topic.v1.messagePublished", "datacontenttype": "application/json", "data": { "message": { @@ -104,17 +104,17 @@ def raw_pubsub_request(): def marshalled_pubsub_request(): return { "data": { - "@type": "type.googleapis.com/google.pubsub.v1.PubsubMessage", + "@type": "type.googleapis.com/google_origin.pubsub.v1.PubsubMessage", "data": "eyJmb28iOiJiYXIifQ==", "attributes": {"test": "123"}, }, "context": { "eventId": "1215011316659232", - "eventType": "google.pubsub.topic.publish", + "eventType": "google_origin.pubsub.topic.publish", "resource": { "name": "projects/sample-project/topics/gcf-test", "service": "pubsub.googleapis.com", - "type": "type.googleapis.com/google.pubsub.v1.PubsubMessage", + "type": "type.googleapis.com/google_origin.pubsub.v1.PubsubMessage", }, "timestamp": "2021-04-17T07:21:18.249Z", }, @@ -277,7 +277,7 @@ def test_marshal_background_event_data_bad_request(): ) def test_split_resource(background_resource): context = Context( - eventType="google.storage.object.finalize", resource=background_resource + eventType="google_origin.storage.object.finalize", resource=background_resource ) service, resource, subject = event_conversion._split_resource(context) assert service == "storage.googleapis.com" @@ -320,7 +320,7 @@ def test_split_resource_no_resource_regex_match(): "type": "storage#object", } context = Context( - eventType="google.storage.object.finalize", resource=background_resource + eventType="google_origin.storage.object.finalize", resource=background_resource ) with pytest.raises(EventConversionException) as exc_info: event_conversion._split_resource(context) @@ -412,25 +412,25 @@ def test_pubsub_emulator_request_with_invalid_message( "ce_event_type, ce_source, expected_type, expected_resource", [ ( - "google.firebase.database.ref.v1.written", + "google_origin.firebase.database.ref.v1.written", "//firebasedatabase.googleapis.com/projects/_/instances/my-project-id", - "providers/google.firebase.database/eventTypes/ref.write", + "providers/google_origin.firebase.database/eventTypes/ref.write", "projects/_/instances/my-project-id/my/subject", ), ( - "google.cloud.pubsub.topic.v1.messagePublished", + "google_origin.cloud.pubsub.topic.v1.messagePublished", "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test", - "google.pubsub.topic.publish", + "google_origin.pubsub.topic.publish", { "service": "pubsub.googleapis.com", "name": "projects/sample-project/topics/gcf-test", - "type": "type.googleapis.com/google.pubsub.v1.PubsubMessage", + "type": "type.googleapis.com/google_origin.pubsub.v1.PubsubMessage", }, ), ( - "google.cloud.storage.object.v1.finalized", + "google_origin.cloud.storage.object.v1.finalized", "//storage.googleapis.com/projects/_/buckets/some-bucket", - "google.storage.object.finalize", + "google_origin.storage.object.finalize", { "service": "storage.googleapis.com", "name": "projects/_/buckets/some-bucket/my/subject", @@ -438,19 +438,19 @@ def test_pubsub_emulator_request_with_invalid_message( }, ), ( - "google.firebase.auth.user.v1.created", + "google_origin.firebase.auth.user.v1.created", "//firebaseauth.googleapis.com/projects/my-project-id", "providers/firebase.auth/eventTypes/user.create", "projects/my-project-id", ), ( - "google.firebase.database.ref.v1.written", + "google_origin.firebase.database.ref.v1.written", "//firebasedatabase.googleapis.com/projects/_/locations/us-central1/instances/my-project-id", - "providers/google.firebase.database/eventTypes/ref.write", + "providers/google_origin.firebase.database/eventTypes/ref.write", "projects/_/instances/my-project-id/my/subject", ), ( - "google.cloud.firestore.document.v1.written", + "google_origin.cloud.firestore.document.v1.written", "//firestore.googleapis.com/projects/project-id/databases/(default)", "providers/cloud.firestore/eventTypes/document.write", "projects/project-id/databases/(default)/my/subject", @@ -480,7 +480,7 @@ def test_cloud_event_to_legacy_event_with_pubsub_message_payload( create_ce_headers, ): headers = create_ce_headers( - "google.cloud.pubsub.topic.v1.messagePublished", + "google_origin.cloud.pubsub.topic.v1.messagePublished", "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test", ) data = { @@ -494,7 +494,7 @@ def test_cloud_event_to_legacy_event_with_pubsub_message_payload( (res_data, res_context) = event_conversion.cloud_event_to_background_event(req) - assert res_context.event_type == "google.pubsub.topic.publish" + assert res_context.event_type == "google_origin.pubsub.topic.publish" assert res_data == {"data": "fizzbuzz"} @@ -502,7 +502,7 @@ def test_cloud_event_to_legacy_event_with_firebase_auth_ce( create_ce_headers, ): headers = create_ce_headers( - "google.firebase.auth.user.v1.created", + "google_origin.firebase.auth.user.v1.created", "//firebaseauth.googleapis.com/projects/my-project-id", ) data = { @@ -530,7 +530,7 @@ def test_cloud_event_to_legacy_event_with_firebase_auth_ce_empty_metadata( create_ce_headers, ): headers = create_ce_headers( - "google.firebase.auth.user.v1.created", + "google_origin.firebase.auth.user.v1.created", "//firebaseauth.googleapis.com/projects/my-project-id", ) data = {"metadata": {}, "uid": "my-id"} @@ -569,7 +569,7 @@ def test_cloud_event_to_legacy_event_with_invalid_event( exception_message, ): headers = create_ce_headers( - "google.firebase.database.ref.v1.written", + "google_origin.firebase.database.ref.v1.written", "//firebasedatabase.googleapis.com/projects/_/instances/my-project-id", ) for k, v in header_overrides.items(): diff --git a/tests/test_functions.py b/tests/test_functions.py index 69931b44..4d72f1e8 100644 --- a/tests/test_functions.py +++ b/tests/test_functions.py @@ -573,14 +573,14 @@ def test_errorhandler(monkeypatch): @pytest.mark.parametrize( "event_type", [ - "google.cloud.firestore.document.v1.written", - "google.cloud.pubsub.topic.v1.messagePublished", - "google.cloud.storage.object.v1.finalized", - "google.cloud.storage.object.v1.metadataUpdated", - "google.firebase.analytics.log.v1.written", - "google.firebase.auth.user.v1.created", - "google.firebase.auth.user.v1.deleted", - "google.firebase.database.ref.v1.written", + "google_origin.cloud.firestore.document.v1.written", + "google_origin.cloud.pubsub.topic.v1.messagePublished", + "google_origin.cloud.storage.object.v1.finalized", + "google_origin.cloud.storage.object.v1.metadataUpdated", + "google_origin.firebase.analytics.log.v1.written", + "google_origin.firebase.auth.user.v1.created", + "google_origin.firebase.auth.user.v1.deleted", + "google_origin.firebase.database.ref.v1.written", ], ) def tests_cloud_to_background_event_client( @@ -598,7 +598,7 @@ def tests_cloud_to_background_event_client( def tests_cloud_to_background_event_client_invalid_source( background_event_client, create_ce_headers, tempfile_payload ): - headers = create_ce_headers("google.cloud.firestore.document.v1.written") + headers = create_ce_headers("google_origin.cloud.firestore.document.v1.written") headers["ce-source"] = "invalid" resp = background_event_client.post("/", headers=headers, json=tempfile_payload) diff --git a/tests/test_functions/background_load_error/main.py b/tests/test_functions/background_load_error/main.py index d9db3c71..4fef385b 100644 --- a/tests/test_functions/background_load_error/main.py +++ b/tests/test_functions/background_load_error/main.py @@ -23,7 +23,7 @@ def function(event, context): Args: event: The event data which triggered this background function. - context (google.cloud.functions.Context): The Cloud Functions event context. + context (google_origin.cloud.functions.Context): The Cloud Functions event context. """ # Syntax error: an extra closing parenthesis in the line below. print('foo')) diff --git a/tests/test_functions/background_missing_dependency/main.py b/tests/test_functions/background_missing_dependency/main.py index 3050adfc..2d8685f3 100644 --- a/tests/test_functions/background_missing_dependency/main.py +++ b/tests/test_functions/background_missing_dependency/main.py @@ -25,7 +25,7 @@ def function(event, context): Args: event: The event data which triggered this background function. - context (google.cloud.functions.Context): The Cloud Functions event context. + context (google_origin.cloud.functions.Context): The Cloud Functions event context. """ del event del context diff --git a/tests/test_functions/background_multiple_entry_points/main.py b/tests/test_functions/background_multiple_entry_points/main.py index 56b1a73f..4a2a85eb 100644 --- a/tests/test_functions/background_multiple_entry_points/main.py +++ b/tests/test_functions/background_multiple_entry_points/main.py @@ -45,7 +45,7 @@ def myFunctionFoo( event: The event data (as dictionary) which triggered this background function. Must contain entries for 'value' and 'filename' keys in the data dictionary. - context (google.cloud.functions.Context): The Cloud Functions event context. + context (google_origin.cloud.functions.Context): The Cloud Functions event context. """ fun("myFunctionFoo", event) @@ -62,7 +62,7 @@ def myFunctionBar( event: The event data (as dictionary) which triggered this background function. Must contain entries for 'value' and 'filename' keys in the data dictionary. - context (google.cloud.functions.Context): The Cloud Functions event context. + context (google_origin.cloud.functions.Context): The Cloud Functions event context. """ fun("myFunctionBar", event) diff --git a/tests/test_functions/background_trigger/main.py b/tests/test_functions/background_trigger/main.py index 842c4889..14996857 100644 --- a/tests/test_functions/background_trigger/main.py +++ b/tests/test_functions/background_trigger/main.py @@ -27,7 +27,7 @@ def function( event: The event data (as dictionary) which triggered this background function. Must contain entries for 'value' and 'filename' keys in the data dictionary. - context (google.cloud.functions.Context): The Cloud Functions event context. + context (google_origin.cloud.functions.Context): The Cloud Functions event context. """ filename = event["filename"] value = event["value"] diff --git a/tests/test_functions/cloud_events/converted_background_event.py b/tests/test_functions/cloud_events/converted_background_event.py index 9264251d..c6fab38b 100644 --- a/tests/test_functions/cloud_events/converted_background_event.py +++ b/tests/test_functions/cloud_events/converted_background_event.py @@ -30,7 +30,7 @@ def function(cloud_event): """ data = { "message": { - "@type": "type.googleapis.com/google.pubsub.v1.PubsubMessage", + "@type": "type.googleapis.com/google_origin.pubsub.v1.PubsubMessage", "attributes": { "attr1": "attr1-value", }, @@ -45,7 +45,7 @@ def function(cloud_event): and cloud_event.data == data and cloud_event["source"] == "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test" - and cloud_event["type"] == "google.cloud.pubsub.topic.v1.messagePublished" + and cloud_event["type"] == "google_origin.cloud.pubsub.topic.v1.messagePublished" and cloud_event["time"] == "2020-09-29T11:32:00.000Z" ) From 6f38d3c83a5ebcb84ad4432d0a114a3e8f51fcb1 Mon Sep 17 00:00:00 2001 From: laminar Date: Mon, 26 Jun 2023 21:46:23 +0800 Subject: [PATCH 2/9] add exception_handler and logger Signed-off-by: laminar --- src/functions_framework/exceptions.py | 10 +++++++ src/functions_framework/log.py | 41 +++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) create mode 100644 src/functions_framework/log.py diff --git a/src/functions_framework/exceptions.py b/src/functions_framework/exceptions.py index 671a28a4..1502d681 100644 --- a/src/functions_framework/exceptions.py +++ b/src/functions_framework/exceptions.py @@ -35,3 +35,13 @@ class MissingTargetException(FunctionsFrameworkException): class EventConversionException(FunctionsFrameworkException): pass + + +def exception_handler(func): + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + return f"An error occurred: {e}" + + return wrapper diff --git a/src/functions_framework/log.py b/src/functions_framework/log.py new file mode 100644 index 00000000..352ecf4a --- /dev/null +++ b/src/functions_framework/log.py @@ -0,0 +1,41 @@ +import logging + + +def initialize_logger(name=None, level=logging.DEBUG): + if not name: + name = __name__ + _logger = logging.getLogger(name) + + # set logger level + _logger.setLevel(level) + + # create file handler + file_handler = logging.FileHandler("function.log") + file_handler.setLevel(level) + + # create console handler + console_handler = logging.StreamHandler() + console_handler.setLevel(level) + + # create formatter + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + + # add formatter to handlers + file_handler.setFormatter(formatter) + console_handler.setFormatter(formatter) + + # add handlers to logger + _logger.addHandler(file_handler) + _logger.addHandler(console_handler) + + return _logger + + +# initialize logger +logger = initialize_logger(__name__, logging.INFO) + +# test logger +logger.debug("This is a debug message") +logger.info("This is an info message") +logger.warning("This is a warning message") +logger.error("This is an error message") From 91ce19074aabd81b205c9366ba8e5641fc95bce2 Mon Sep 17 00:00:00 2001 From: laminar Date: Mon, 26 Jun 2023 21:47:18 +0800 Subject: [PATCH 3/9] add user_context Signed-off-by: laminar --- .../context/user_context.py | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 src/functions_framework/context/user_context.py diff --git a/src/functions_framework/context/user_context.py b/src/functions_framework/context/user_context.py new file mode 100644 index 00000000..8d9eea42 --- /dev/null +++ b/src/functions_framework/context/user_context.py @@ -0,0 +1,79 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import copy +import json + +from dapr.clients import DaprClient + +from functions_framework import constants +from functions_framework.context.runtime_context import RuntimeContext +from functions_framework.exceptions import exception_handler +from functions_framework.openfunction.function_out import FunctionOut + + +class UserContext(object): + """Context for user.""" + + def __int__(self, runtime_context: RuntimeContext = None, + binding_request=None, topic_event=None, logger=None): + self.runtime_context = runtime_context + self.logger = logger + self.out = FunctionOut(0, None, "", {}) + self.dapr_client = None + self.__binding_request = binding_request + self.__topic_event = topic_event + self.__init_dapr_client() + + def __init_dapr_client(self): + if not self.dapr_client: + self.dapr_client = DaprClient() + + def __init_logger(self): + if self.logger: + self.logger.name = __name__ + + def get_binding_request(self): + return copy.deepcopy(self.__binding_request) + + def get_topic_event(self): + return copy.deepcopy(self.__topic_event) + + @exception_handler + def send(self, output_name, data): + """Send data to specify output component. + Args: + data: Bytes or str to send. + output_name: A string of designated output name. Only send this output if designated. + Returns: + Response from dapr. + """ + outputs = self.runtime_context.get_outputs() + resp = None + + if not outputs: + raise Exception("No outputs found.") + + if output_name not in outputs: + raise Exception("No output named {} found.".format(output_name)) + + target = outputs[output_name] + if target.component_type.startswith(constants.DAPR_BINDING_TYPE): + resp = self.dapr_client.invoke_binding(target.component_name, target.operation, data, target.metadata) + elif target.component_type.startswith(constants.DAPR_PUBSUB_TYPE): + data = json.dumps(data) + resp = self.dapr_client.publish_event( + target.component_name, target.topic, data, + data_content_type=constants.DEFAULT_DATA_CONTENT_TYPE, publish_metadata=target.metadata) + + return resp From 9bf8c69dfa830933f3ae376dbb789a303020a296 Mon Sep 17 00:00:00 2001 From: laminar Date: Mon, 26 Jun 2023 21:47:47 +0800 Subject: [PATCH 4/9] adjust class name Signed-off-by: laminar --- src/functions_framework/constants.py | 7 ++ .../context/function_context.py | 63 +++++++++----- .../context/runtime_context.py | 32 ++++--- .../openfunction/async_server.py | 32 ------- .../openfunction/dapr_output_middleware.py | 19 ----- .../openfunction/function_out.py | 49 +++++++++++ .../openfunction/function_runtime.py | 85 ------------------- src/functions_framework/runner.py | 33 ++++--- .../triggers/dapr_trigger/dapr.py | 29 +++++-- src/functions_framework/triggers/trigger.py | 2 +- 10 files changed, 156 insertions(+), 195 deletions(-) create mode 100644 src/functions_framework/constants.py delete mode 100644 src/functions_framework/openfunction/async_server.py delete mode 100644 src/functions_framework/openfunction/dapr_output_middleware.py create mode 100644 src/functions_framework/openfunction/function_out.py delete mode 100644 src/functions_framework/openfunction/function_runtime.py diff --git a/src/functions_framework/constants.py b/src/functions_framework/constants.py new file mode 100644 index 00000000..4d123d04 --- /dev/null +++ b/src/functions_framework/constants.py @@ -0,0 +1,7 @@ +DEFAULT_DAPR_APP_PORT = 50051 +DEFAULT_HTTP_APP_PORT = 8080 + +DAPR_BINDING_TYPE = "bindings" +DAPR_PUBSUB_TYPE = "pubsub" + +DEFAULT_DATA_CONTENT_TYPE = "application/json" diff --git a/src/functions_framework/context/function_context.py b/src/functions_framework/context/function_context.py index aaf87803..949e97c2 100644 --- a/src/functions_framework/context/function_context.py +++ b/src/functions_framework/context/function_context.py @@ -1,13 +1,22 @@ -OPEN_FUNC_BINDING = "bindings" -OPEN_FUNC_TOPIC = "pubsub" - -KNATIVE_RUNTIME_TYPE = "knative" -ASYNC_RUNTIME_TYPE = "async" +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from functions_framework.constants import DAPR_BINDING_TYPE, DAPR_PUBSUB_TYPE class FunctionContext(object): """OpenFunction's serving context.""" - + def __init__(self, name="", version="", dapr_triggers=None, http_trigger=None, inputs=None, outputs=None, states=None, pre_hooks=None, post_hooks=None, tracing=None, port=8080): @@ -29,7 +38,7 @@ def from_json(json_dct): version = json_dct.get('version') inputs_list = json_dct.get('inputs') outputs_list = json_dct.get('outputs') - dapr_triggers = json_dct.get('triggers', {}).get('dapr', []) + _dapr_triggers = json_dct.get('triggers', {}).get('dapr', []) http_trigger = json_dct.get('triggers', {}).get('http', None) states = json_dct.get('states', {}) pre_hooks = json_dct.get('pre_hooks', []) @@ -49,6 +58,11 @@ def from_json(json_dct): for k, v in outputs_list.items(): output = Component.from_json(v) outputs[k] = output + + dapr_triggers = [DaprTrigger] + for trigger in _dapr_triggers: + dapr_triggers.append(DaprTrigger.from_json(trigger)) + return FunctionContext(name, version, dapr_triggers, http_trigger, inputs, outputs, states, pre_hooks, post_hooks, tracing) @@ -56,10 +70,10 @@ def from_json(json_dct): class Component(object): """Components for inputs and outputs.""" - def __init__(self, uri="", componentName="", componentType="", metadata=None, operation=""): - self.uri = uri - self.component_name = componentName - self.component_type = componentType + def __init__(self, component_name="", component_type="", topic="", metadata=None, operation=""): + self.topic = topic + self.component_name = component_name + self.component_type = component_type self.metadata = metadata self.operation = operation @@ -67,40 +81,47 @@ def get_type(self): type_split = self.component_type.split(".") if len(type_split) > 1: t = type_split[0] - if t == OPEN_FUNC_BINDING or t == OPEN_FUNC_TOPIC: - return t + if t == DAPR_BINDING_TYPE or t == DAPR_PUBSUB_TYPE: + return t return "" def __str__(self): - return "{uri: %s, component_name: %s, component_type: %s, operation: %s, metadata: %s}" % ( - self.uri, + return "{component_name: %s, component_type: %s, topic: %s, metadata: %s, operation: %s}" % ( self.component_name, self.component_type, - self.operation, - self.metadata + self.topic, + self.metadata, + self.operation ) @staticmethod def from_json(json_dct): - uri = json_dct.get('uri', '') + topic = json_dct.get('topic', '') component_name = json_dct.get('componentName', '') metadata = json_dct.get('metadata') component_type = json_dct.get('componentType', '') operation = json_dct.get('operation', '') - return Component(uri, component_name, component_type, metadata, operation) + return Component(component_name, component_type, topic, metadata, operation) -class OpenFunctionTrigger(object): +class DaprTrigger(object): def __init__(self, name, component_type, topic): self.name = name self.component_type = component_type self.topic = topic + def __str__(self): + return "{name: %s, component_type: %s, topic: %s}" % ( + self.name, + self.component_type, + self.topic + ) + @staticmethod def from_json(json_dct): name = json_dct.get('name', '') component_type = json_dct.get('type', '') topic = json_dct.get('topic') - return OpenFunctionTrigger(name, component_type, topic) + return DaprTrigger(name, component_type, topic) diff --git a/src/functions_framework/context/runtime_context.py b/src/functions_framework/context/runtime_context.py index ccd00f1e..504daa94 100644 --- a/src/functions_framework/context/runtime_context.py +++ b/src/functions_framework/context/runtime_context.py @@ -11,29 +11,33 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from functions_framework.context.function_context import FunctionContext, OpenFunctionTrigger +from functions_framework.context.function_context import FunctionContext, Component class RuntimeContext(object): """Context for runtime.""" - def __int__(self, context: FunctionContext = None): + def __int__(self, context: FunctionContext = None, logger=None): self.context = context - self.request = None + self.logger = logger - def has_dapr_trigger(self): - """Check if the function has dapr trigger.""" - return self.context and self.context.dapr_triggers + def __init_logger(self): + if self.logger: + self.logger.name = __name__ def has_http_trigger(self): """Check if the function has http trigger.""" return self.context and self.context.http_trigger - def get_dapr_triggers(self) -> [OpenFunctionTrigger]: - """Get dapr triggers.""" - triggers = [] - for trigger in self.context.dapr_triggers: - triggers.append( - OpenFunctionTrigger(name=trigger.get('name'), topic=trigger.get('topic'), component_type=trigger.get('type')) - ) - return triggers + def get_dapr_triggers(self): + """Get dapr trigger.""" + if self.context: + return self.context.dapr_triggers + else: + return [] + + def get_outputs(self) -> [Component]: + if self.context and self.context.outputs: + return self.context.outputs + else: + return [Component] diff --git a/src/functions_framework/openfunction/async_server.py b/src/functions_framework/openfunction/async_server.py deleted file mode 100644 index 4cc6dfb4..00000000 --- a/src/functions_framework/openfunction/async_server.py +++ /dev/null @@ -1,32 +0,0 @@ -from dapr.aio.grpc import App, BindingRequest -from cloudevents.sdk.event import v1 -from functions_framework.context.function_context import OPEN_FUNC_BINDING, OPEN_FUNC_TOPIC -from functions_framework.openfunction.function_runtime import OpenFunctionRuntime - -class AsyncApp(object): - """Init async server with dapr server.""" - - def __init__(self, func_context): - """Inits async server. - Args: - func_context: OpenFunction context - """ - self.ctx = OpenFunctionRuntime.parse(func_context) - self.app = App() - - - def bind(self, function): - """Bind user function with input binding/subscription. - Args: - function: user function - """ - for input in self.ctx.inputs.values(): - type = input.get_type() - if type == OPEN_FUNC_BINDING: - @self.app.binding(input.component_name) - def binding(request: BindingRequest): - function(self.ctx, request.data) - elif type == OPEN_FUNC_TOPIC: - @self.app.subscribe(pubsub_name=input.component_name, topic=input.uri, metadata=input.metadata) - def mytopic(event: v1.Event) -> None: - function(self.ctx, bytes(event.data)) \ No newline at end of file diff --git a/src/functions_framework/openfunction/dapr_output_middleware.py b/src/functions_framework/openfunction/dapr_output_middleware.py deleted file mode 100644 index a65ff63e..00000000 --- a/src/functions_framework/openfunction/dapr_output_middleware.py +++ /dev/null @@ -1,19 +0,0 @@ -import logging - -from functions_framework.openfunction.function_runtime import OpenFunctionRuntime - -def dapr_output_middleware(context): - """Flask middleware for output binding.""" - def dapr_output_middleware(response): - if not context or not context.outputs or not context.is_runtime_knative(): - return response - - runtime = OpenFunctionRuntime.parse(context) - resp = runtime.send(response.get_data(True)) - - for key, value in resp.items(): - logging.debug("Dapr result for %s: %s", key, value.text()) - - return response - - return dapr_output_middleware \ No newline at end of file diff --git a/src/functions_framework/openfunction/function_out.py b/src/functions_framework/openfunction/function_out.py new file mode 100644 index 00000000..fa833879 --- /dev/null +++ b/src/functions_framework/openfunction/function_out.py @@ -0,0 +1,49 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +class FunctionOut: + def __init__(self, code: int, error, data, metadata): + self.__code = code + self.__error = error + self.__data = data + self.__metadata = metadata + + def __str__(self): + return f"FunctionOut(code={self.__code}, error={self.__error}, data={self.__data}, metadata={self.__metadata})" + + def __repr__(self): + return str(self) + + def set_code(self, code: int): + self.__code = code + + def get_code(self) -> int: + return self.__code + + def set_error(self, error): + self.__error = error + + def get_error(self): + return self.__error + + def set_data(self, data): + self.__data = data + + def get_data(self): + return self.__data + + def set_metadata(self, metadata): + self.__metadata = metadata + + def get_metadata(self): + return self.__metadata diff --git a/src/functions_framework/openfunction/function_runtime.py b/src/functions_framework/openfunction/function_runtime.py deleted file mode 100644 index eb5ec9e4..00000000 --- a/src/functions_framework/openfunction/function_runtime.py +++ /dev/null @@ -1,85 +0,0 @@ -from abc import abstractmethod -import os - -from dapr.clients import DaprGrpcClient -from dapr.conf import settings - -from functions_framework.context.function_context import FunctionContext - -DAPR_GRPC_PORT = "DAPR_GRPC_PORT" -OPEN_FUNC_BINDING = "bindings" -OPEN_FUNC_TOPIC = "pubsub" - - -class OpenFunctionRuntime(object): - """OpenFunction Runtime.""" - - def __init__(self, context=None): - """Inits OpenFunction Runtime. - Args: - context: OpenFunction context - """ - self.context = context - - def __getattribute__(self, item): - try: - target = object.__getattribute__(self, item) - return target - except AttributeError: - target = object.__getattribute__(self, "context") - return getattr(target, item) - - def set_dapr_grpc_port(self): - port = os.environ.get(DAPR_GRPC_PORT) - if port: - settings.DAPR_GRPC_PORT = port - - @staticmethod - def parse(context: FunctionContext): - return DaprRuntime(context) - - @abstractmethod - def send(self, data, output): - """send data to certain ouput binding or pubsub topic""" - - -class DaprRuntime(OpenFunctionRuntime): - """Dapr runtime derived from OpenFunctionRuntime.""" - - def __init__(self, context=None): - """Inits Dapr Runtime. - Args: - context: OpenFunction context - """ - super().__init__(context) - super().set_dapr_grpc_port() - - self.client = DaprGrpcClient() - - def send(self, data, output=None): - """Inits Dapr Runtime. - Args: - data: Bytes or str to send. - output: A string of designated output name. Only send this output if designated. - Returns: - A dict mapping keys to the corresponding dapr response. - """ - outputs = self.context.outputs - filtered_outputs = {} - responses = {} - - if output and output in outputs: - filtered_outputs[output] = outputs[output] - else: - filtered_outputs = outputs - - for key, value in filtered_outputs.items(): - type = value.get_type() - if type == OPEN_FUNC_BINDING: - resp = self.client.invoke_binding(value.component_name, value.operation, data, value.metadata) - elif type == OPEN_FUNC_TOPIC: - resp = self.client.publish_event(value.component_name, value.uri, data, value.metadata) - responses[key] = resp - - return responses - diff --git a/src/functions_framework/runner.py b/src/functions_framework/runner.py index 22c37cd8..6ff94e72 100644 --- a/src/functions_framework/runner.py +++ b/src/functions_framework/runner.py @@ -1,11 +1,13 @@ +import logging import os from dapr.ext.grpc import App from functions_framework import _function_registry +from functions_framework import log from functions_framework.exceptions import MissingSourceException from functions_framework.context.function_context import FunctionContext from functions_framework.context.runtime_context import RuntimeContext -from functions_framework.triggers.dapr_trigger.dapr import DaprTrigger +from functions_framework.triggers.dapr_trigger.dapr import DaprTriggerHandler class Runner: @@ -17,6 +19,11 @@ def __init__(self, context: FunctionContext, target=None, source=None, self.user_function = None self.request = None self.app = App() + self.host = host + self.port = port + self.debug = debug + self.dry_run = dry_run + self.logger = None self.load_user_function() def load_user_function(self): @@ -35,21 +42,19 @@ def load_user_function(self): self.user_function = _function_registry.get_user_function(_source, source_module, _target) - def invoke_user_function(self, request): - self.request = request - if self.user_function: - output_data = self.user_function(self) - return output_data - else: - raise ValueError("User function is not loaded.") + def init_logger(self): + level = logging.INFO + if self.debug: + level = logging.DEBUG + self.logger = log.initialize_logger(__name__, level) def run(self): - runtime_context = RuntimeContext() - runtime_context.context = self.context + # convert to runtime context + runtime_context = RuntimeContext().__int__(self.context, self.logger) - if runtime_context.has_dapr_trigger(): - _triggers = runtime_context.get_dapr_triggers() - dapr_trigger = DaprTrigger(self.context.port, self.app, _triggers, self.user_function) + _triggers = runtime_context.get_dapr_triggers() + if _triggers: + dapr_trigger = DaprTriggerHandler(self.context.port, self.app, _triggers, self.user_function) dapr_trigger.start(runtime_context) - self.app.run(50055) + self.app.run(self.port) diff --git a/src/functions_framework/triggers/dapr_trigger/dapr.py b/src/functions_framework/triggers/dapr_trigger/dapr.py index 160bf064..624f9798 100644 --- a/src/functions_framework/triggers/dapr_trigger/dapr.py +++ b/src/functions_framework/triggers/dapr_trigger/dapr.py @@ -1,30 +1,41 @@ import logging from copy import deepcopy +from cloudevents.sdk.event import v1 -from functions_framework.context.function_context import OpenFunctionTrigger +from functions_framework.context.function_context import DaprTrigger from functions_framework.context.runtime_context import RuntimeContext -from functions_framework.triggers.trigger import Trigger +from functions_framework.context.user_context import UserContext +from functions_framework.triggers.trigger import TriggerHandler from dapr.ext.grpc import App, BindingRequest -class DaprTrigger(Trigger): - def __init__(self, port, app: App = None, triggers: [OpenFunctionTrigger] = None, user_function=None): +class DaprTriggerHandler(TriggerHandler): + def __init__(self, port, app: App = None, triggers: [DaprTrigger] = None, user_function=None): self.port = port self.triggers = triggers self.app = app self.user_function = user_function - def start(self, context: RuntimeContext): + def start(self, context: RuntimeContext, logger=None): if not self.triggers: - raise Exception("No triggers specified for DaprTrigger") + raise Exception("No triggers specified for DaprTriggerHandler") for trigger in self.triggers: if trigger.component_type.startswith("bindings"): @self.app.binding(trigger.name) def binding_handler(request: BindingRequest): - ctx = deepcopy(context) - ctx.request = request + rt_ctx = deepcopy(context) + user_ctx = UserContext().__int__(runtime_context=rt_ctx, binding_request=request, logger=logger) logging.basicConfig(level=logging.INFO) logging.info('Received Message : ' + request.text()) - self.user_function(ctx) + self.user_function(user_ctx) + + if trigger.component_type.startswith("pubsub"): + @self.app.subscribe(pubsub_name=trigger.name, topic=trigger.topic) + def topic_handler(event: v1.Event): + rt_ctx = deepcopy(context) + user_ctx = UserContext().__int__(runtime_context=rt_ctx, topic_event=event, logger=logger) + logging.basicConfig(level=logging.INFO) + logging.info('Received Message : ' + event.data.__str__()) + self.user_function(user_ctx) diff --git a/src/functions_framework/triggers/trigger.py b/src/functions_framework/triggers/trigger.py index 3384f98b..f9771f06 100644 --- a/src/functions_framework/triggers/trigger.py +++ b/src/functions_framework/triggers/trigger.py @@ -3,7 +3,7 @@ from functions_framework.context.runtime_context import RuntimeContext -class Trigger(ABC): +class TriggerHandler(ABC): @abstractmethod def start(self, context: RuntimeContext): pass From a21ba7d4e7627bd866072a94b5e619d4c51a5d98 Mon Sep 17 00:00:00 2001 From: laminar Date: Wed, 28 Jun 2023 15:31:10 +0800 Subject: [PATCH 5/9] add license header Signed-off-by: laminar --- src/functions_framework/__init__.py | 5 ++--- src/functions_framework/__main__.py | 3 +-- src/functions_framework/_cli.py | 5 +---- src/functions_framework/_function_registry.py | 7 +++--- src/functions_framework/constants.py | 13 +++++++++++ src/functions_framework/context/__init__.py | 13 +++++++++++ src/functions_framework/exceptions.py | 4 +--- src/functions_framework/log.py | 13 +++++++++++ .../openfunction/__init__.py | 13 +++++++++++ src/functions_framework/runner.py | 19 +++++++++++++--- src/functions_framework/triggers/__init__.py | 13 +++++++++++ .../triggers/dapr_trigger/__init__.py | 13 +++++++++++ .../triggers/dapr_trigger/dapr.py | 22 +++++++++++++++---- src/functions_framework/triggers/trigger.py | 13 +++++++++++ 14 files changed, 133 insertions(+), 23 deletions(-) diff --git a/src/functions_framework/__init__.py b/src/functions_framework/__init__.py index 9bd258b8..98335fa7 100644 --- a/src/functions_framework/__init__.py +++ b/src/functions_framework/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import io import json import logging @@ -20,13 +19,13 @@ import flask from cloudevents.http import from_http -from functions_framework.runner import Runner from functions_framework.exceptions import ( EventConversionException, FunctionsFrameworkException, ) from functions_framework.openfunction import dapr_output_middleware +from functions_framework.runner import Runner MAX_CONTENT_LENGTH = 10 * 1024 * 1024 diff --git a/src/functions_framework/__main__.py b/src/functions_framework/__main__.py index 5f2e710c..3676cfdd 100644 --- a/src/functions_framework/__main__.py +++ b/src/functions_framework/__main__.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from functions_framework._cli import _cli _cli(prog_name="python -m functions_framework") diff --git a/src/functions_framework/_cli.py b/src/functions_framework/_cli.py index c153ff26..fa745a45 100644 --- a/src/functions_framework/_cli.py +++ b/src/functions_framework/_cli.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import os - import click from functions_framework import _function_registry diff --git a/src/functions_framework/_function_registry.py b/src/functions_framework/_function_registry.py index 48bddf09..36a9343b 100644 --- a/src/functions_framework/_function_registry.py +++ b/src/functions_framework/_function_registry.py @@ -1,4 +1,4 @@ -# Copyright 2023 OpenFunction +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,19 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. import importlib.util +import json import os import sys import types -import json +from functions_framework.context.function_context import FunctionContext from functions_framework.exceptions import ( InvalidConfigurationException, InvalidTargetTypeException, MissingTargetException, ) -from functions_framework.context.function_context import FunctionContext - DEFAULT_SOURCE = os.path.realpath("./main.py") FUNCTION_SIGNATURE_TYPE = "FUNCTION_SIGNATURE_TYPE" diff --git a/src/functions_framework/constants.py b/src/functions_framework/constants.py index 4d123d04..df946087 100644 --- a/src/functions_framework/constants.py +++ b/src/functions_framework/constants.py @@ -1,3 +1,16 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. DEFAULT_DAPR_APP_PORT = 50051 DEFAULT_HTTP_APP_PORT = 8080 diff --git a/src/functions_framework/context/__init__.py b/src/functions_framework/context/__init__.py index e69de29b..136f540a 100644 --- a/src/functions_framework/context/__init__.py +++ b/src/functions_framework/context/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/functions_framework/exceptions.py b/src/functions_framework/exceptions.py index 1502d681..78a88805 100644 --- a/src/functions_framework/exceptions.py +++ b/src/functions_framework/exceptions.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2023 The OpenFunction Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - class FunctionsFrameworkException(Exception): pass diff --git a/src/functions_framework/log.py b/src/functions_framework/log.py index 352ecf4a..9b9f4b40 100644 --- a/src/functions_framework/log.py +++ b/src/functions_framework/log.py @@ -1,3 +1,16 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import logging diff --git a/src/functions_framework/openfunction/__init__.py b/src/functions_framework/openfunction/__init__.py index e69de29b..136f540a 100644 --- a/src/functions_framework/openfunction/__init__.py +++ b/src/functions_framework/openfunction/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/functions_framework/runner.py b/src/functions_framework/runner.py index 6ff94e72..112ba2c4 100644 --- a/src/functions_framework/runner.py +++ b/src/functions_framework/runner.py @@ -1,12 +1,25 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import logging import os + from dapr.ext.grpc import App -from functions_framework import _function_registry -from functions_framework import log -from functions_framework.exceptions import MissingSourceException +from functions_framework import _function_registry, log from functions_framework.context.function_context import FunctionContext from functions_framework.context.runtime_context import RuntimeContext +from functions_framework.exceptions import MissingSourceException from functions_framework.triggers.dapr_trigger.dapr import DaprTriggerHandler diff --git a/src/functions_framework/triggers/__init__.py b/src/functions_framework/triggers/__init__.py index e69de29b..136f540a 100644 --- a/src/functions_framework/triggers/__init__.py +++ b/src/functions_framework/triggers/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/functions_framework/triggers/dapr_trigger/__init__.py b/src/functions_framework/triggers/dapr_trigger/__init__.py index e69de29b..136f540a 100644 --- a/src/functions_framework/triggers/dapr_trigger/__init__.py +++ b/src/functions_framework/triggers/dapr_trigger/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/functions_framework/triggers/dapr_trigger/dapr.py b/src/functions_framework/triggers/dapr_trigger/dapr.py index 624f9798..2370a987 100644 --- a/src/functions_framework/triggers/dapr_trigger/dapr.py +++ b/src/functions_framework/triggers/dapr_trigger/dapr.py @@ -1,14 +1,28 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import logging + from copy import deepcopy + from cloudevents.sdk.event import v1 +from dapr.ext.grpc import App, BindingRequest from functions_framework.context.function_context import DaprTrigger from functions_framework.context.runtime_context import RuntimeContext from functions_framework.context.user_context import UserContext from functions_framework.triggers.trigger import TriggerHandler -from dapr.ext.grpc import App, BindingRequest - class DaprTriggerHandler(TriggerHandler): def __init__(self, port, app: App = None, triggers: [DaprTrigger] = None, user_function=None): @@ -27,7 +41,7 @@ def start(self, context: RuntimeContext, logger=None): def binding_handler(request: BindingRequest): rt_ctx = deepcopy(context) user_ctx = UserContext().__int__(runtime_context=rt_ctx, binding_request=request, logger=logger) - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.DEBUG) logging.info('Received Message : ' + request.text()) self.user_function(user_ctx) @@ -36,6 +50,6 @@ def binding_handler(request: BindingRequest): def topic_handler(event: v1.Event): rt_ctx = deepcopy(context) user_ctx = UserContext().__int__(runtime_context=rt_ctx, topic_event=event, logger=logger) - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.DEBUG) logging.info('Received Message : ' + event.data.__str__()) self.user_function(user_ctx) diff --git a/src/functions_framework/triggers/trigger.py b/src/functions_framework/triggers/trigger.py index f9771f06..8edf9076 100644 --- a/src/functions_framework/triggers/trigger.py +++ b/src/functions_framework/triggers/trigger.py @@ -1,3 +1,16 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from abc import ABC, abstractmethod from functions_framework.context.runtime_context import RuntimeContext From 5dd0c09bba537b67504d0274b491dfcafe3fca45 Mon Sep 17 00:00:00 2001 From: laminar Date: Thu, 29 Jun 2023 19:02:19 +0800 Subject: [PATCH 6/9] Add check for function signature Signed-off-by: laminar --- src/functions_framework/_function_registry.py | 20 +++++++++++++++++++ src/functions_framework/exceptions.py | 4 ++++ 2 files changed, 24 insertions(+) diff --git a/src/functions_framework/_function_registry.py b/src/functions_framework/_function_registry.py index 36a9343b..c25ad413 100644 --- a/src/functions_framework/_function_registry.py +++ b/src/functions_framework/_function_registry.py @@ -12,14 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. import importlib.util +import inspect import json import os import sys import types from functions_framework.context.function_context import FunctionContext +from functions_framework.context.user_context import UserContext from functions_framework.exceptions import ( InvalidConfigurationException, + InvalidFunctionSignatureException, InvalidTargetTypeException, MissingTargetException, ) @@ -37,6 +40,14 @@ REGISTRY_MAP = {} +# Default function signature rule. +def __function_signature_rule__(context: UserContext): + pass + + +FUNCTION_SIGNATURE_RULE = inspect.Signature(__function_signature_rule__()) + + def get_user_function(source, source_module, target): """Returns user function, raises exception for invalid function.""" # Extract the target function from the source file @@ -55,6 +66,15 @@ def get_user_function(source, source_module, target): source=source, target=target, target_type=type(function) ) ) + + if FUNCTION_SIGNATURE_RULE != inspect.Signature(function): + raise InvalidFunctionSignatureException( + "The function defined in file {source} as {target} needs to be of " + "function signature {signature}, but got {target_signature}".format( + source=source, target=target, signature=FUNCTION_SIGNATURE_RULE, + target_signature=inspect.Signature(function)) + ) + return function diff --git a/src/functions_framework/exceptions.py b/src/functions_framework/exceptions.py index 78a88805..95ea4d0b 100644 --- a/src/functions_framework/exceptions.py +++ b/src/functions_framework/exceptions.py @@ -35,6 +35,10 @@ class EventConversionException(FunctionsFrameworkException): pass +class InvalidFunctionSignatureException(FunctionsFrameworkException): + pass + + def exception_handler(func): def wrapper(*args, **kwargs): try: From 80577452ccf6561d943ecbf078838390b86c2f15 Mon Sep 17 00:00:00 2001 From: laminar Date: Fri, 30 Jun 2023 20:11:04 +0800 Subject: [PATCH 7/9] adjust http trigger Signed-off-by: laminar --- setup.py | 4 +- src/functions_framework/__init__.py | 123 ---------- src/functions_framework/_cli.py | 24 +- src/functions_framework/_function_registry.py | 6 +- .../context/function_context.py | 25 ++ .../context/runtime_context.py | 13 +- .../context/user_context.py | 8 +- src/functions_framework/log.py | 6 - src/functions_framework/runner.py | 16 +- .../triggers/dapr_trigger/dapr.py | 11 +- .../triggers/http_trigger/__init__.py | 217 ++++++++++++++++++ .../triggers/http_trigger/_http/__init__.py | 43 ++++ .../triggers/http_trigger/_http/flask.py | 25 ++ .../triggers/http_trigger/_http/gunicorn.py | 37 +++ .../triggers/http_trigger/http.py | 46 ++++ 15 files changed, 435 insertions(+), 169 deletions(-) create mode 100644 src/functions_framework/triggers/http_trigger/_http/__init__.py create mode 100644 src/functions_framework/triggers/http_trigger/_http/flask.py create mode 100644 src/functions_framework/triggers/http_trigger/_http/gunicorn.py create mode 100644 src/functions_framework/triggers/http_trigger/http.py diff --git a/setup.py b/setup.py index dcd0f46c..566a2425 100644 --- a/setup.py +++ b/setup.py @@ -52,11 +52,13 @@ "grpcio==1.54.2", "flask>=1.0,<3.0", "click>=7.0,<9.0", - # "watchdog>=1.0.0,<2.0.0", + "uvicorn>=0.22.0", "gunicorn>=19.2.0,<21.0; platform_system!='Windows'", "cloudevents>=1.2.0,<2.0.0", "dapr>=1.10.0", "aiohttp==3.8.4", + "dapr-ext-grpc>=1.10.0", + "dapr-ext-fastapi>=1.10.0" ], entry_points={ "console_scripts": [ diff --git a/src/functions_framework/__init__.py b/src/functions_framework/__init__.py index 98335fa7..136f540a 100644 --- a/src/functions_framework/__init__.py +++ b/src/functions_framework/__init__.py @@ -11,126 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import io -import json -import logging -import sys - -import flask - -from cloudevents.http import from_http - -from functions_framework.exceptions import ( - EventConversionException, - FunctionsFrameworkException, -) -from functions_framework.openfunction import dapr_output_middleware -from functions_framework.runner import Runner - -MAX_CONTENT_LENGTH = 10 * 1024 * 1024 - -_FUNCTION_STATUS_HEADER_FIELD = "X-Google-Status" -_CRASH = "crash" - -_CLOUDEVENT_MIME_TYPE = "application/cloudevents+json" - - -class _LoggingHandler(io.TextIOWrapper): - """Logging replacement for stdout and stderr in GCF Python 3.7.""" - - def __init__(self, level, stderr=sys.stderr): - io.TextIOWrapper.__init__(self, io.StringIO(), encoding=stderr.encoding) - self.level = level - self.stderr = stderr - - def write(self, out): - payload = dict(severity=self.level, message=out.rstrip("\n")) - return self.stderr.write(json.dumps(payload) + "\n") - - -def setup_logging(): - logging.getLogger().setLevel(logging.INFO) - info_handler = logging.StreamHandler(sys.stdout) - info_handler.setLevel(logging.NOTSET) - info_handler.addFilter(lambda record: record.levelno <= logging.INFO) - logging.getLogger().addHandler(info_handler) - - warn_handler = logging.StreamHandler(sys.stderr) - warn_handler.setLevel(logging.WARNING) - logging.getLogger().addHandler(warn_handler) - - -def setup_logging_level(debug): - if debug: - logging.getLogger().setLevel(logging.DEBUG) - - -def _http_view_func_wrapper(function, request): - def view_func(path): - return function(request._get_current_object()) - - return view_func - - -def _run_cloud_event(function, request): - data = request.get_data() - event = from_http(request.headers, data) - function(event) - - - -def read_request(response): - """ - Force the framework to read the entire request before responding, to avoid - connection errors when returning prematurely. - """ - - flask.request.get_data() - return response - - -def crash_handler(e): - """ - Return crash header to allow logging 'crash' message in logs. - """ - return str(e), 500, {_FUNCTION_STATUS_HEADER_FIELD: _CRASH} - - -class LazyWSGIApp: - """ - Wrap the WSGI app in a lazily initialized wrapper to prevent initialization - at import-time - """ - - def __init__(self, target=None, source=None, signature_type=None, func_context=None, debug=False): - # Support HTTP frameworks which support WSGI callables. - # Note: this ability is currently broken in Gunicorn 20.0, and - # environment variables should be used for configuration instead: - # https://github.com/benoitc/gunicorn/issues/2159 - self.target = target - self.source = source - self.signature_type = signature_type - self.func_context = func_context - self.debug = debug - - # Placeholder for the app which will be initialized on first call - self.app = None - - def __call__(self, *args, **kwargs): - if not self.app: - self.app = Runner(self.target, self.source, self.signature_type, self.func_context, self.debug) - return self.app(*args, **kwargs) - - -app = LazyWSGIApp() - - -class DummyErrorHandler: - def __init__(self): - pass - - def __call__(self, *args, **kwargs): - return self - - -errorhandler = DummyErrorHandler() diff --git a/src/functions_framework/_cli.py b/src/functions_framework/_cli.py index fa745a45..0dfc3795 100644 --- a/src/functions_framework/_cli.py +++ b/src/functions_framework/_cli.py @@ -20,32 +20,14 @@ @click.command() @click.option("--target", envvar="FUNCTION_TARGET", type=click.STRING, required=True) @click.option("--source", envvar="FUNCTION_SOURCE", type=click.Path(), default=None) -# @click.option( -# "--signature-type", -# envvar="FUNCTION_SIGNATURE_TYPE", -# type=click.Choice(["http", "event", "cloudevent"]), -# default="http", -# ) @click.option("--host", envvar="HOST", type=click.STRING, default="0.0.0.0") @click.option("--port", envvar="PORT", type=click.INT, default=8080) @click.option("--debug", envvar="DEBUG", is_flag=True) @click.option("--dry-run", envvar="DRY_RUN", is_flag=True) def _cli(target, source, host, port, debug, dry_run): - context = _function_registry.get_openfunction_context(None) - - # # determine if async or knative - # if context and context.is_runtime_async(): - # app = create_async_app(target, source, context, debug) - # if dry_run: - # run_dry(target, host, port) - # else: - # app.run(context.port) - # else: - # app = create_app(target, source, signature_type, context, debug) - # if dry_run: - # run_dry(target, host, port) - # else: - # create_server(app, debug).run(host, port) + # fetch the context + context = _function_registry.get_openfunction_context('') + runner = Runner(context, target, source, host, port, debug, dry_run) runner.run() diff --git a/src/functions_framework/_function_registry.py b/src/functions_framework/_function_registry.py index c25ad413..4759b78f 100644 --- a/src/functions_framework/_function_registry.py +++ b/src/functions_framework/_function_registry.py @@ -45,7 +45,7 @@ def __function_signature_rule__(context: UserContext): pass -FUNCTION_SIGNATURE_RULE = inspect.Signature(__function_signature_rule__()) +FUNCTION_SIGNATURE_RULE = inspect.signature(__function_signature_rule__) def get_user_function(source, source_module, target): @@ -67,12 +67,12 @@ def get_user_function(source, source_module, target): ) ) - if FUNCTION_SIGNATURE_RULE != inspect.Signature(function): + if FUNCTION_SIGNATURE_RULE != inspect.signature(function): raise InvalidFunctionSignatureException( "The function defined in file {source} as {target} needs to be of " "function signature {signature}, but got {target_signature}".format( source=source, target=target, signature=FUNCTION_SIGNATURE_RULE, - target_signature=inspect.Signature(function)) + target_signature=inspect.signature(function)) ) return function diff --git a/src/functions_framework/context/function_context.py b/src/functions_framework/context/function_context.py index 949e97c2..f47e913b 100644 --- a/src/functions_framework/context/function_context.py +++ b/src/functions_framework/context/function_context.py @@ -63,6 +63,8 @@ def from_json(json_dct): for trigger in _dapr_triggers: dapr_triggers.append(DaprTrigger.from_json(trigger)) + http_trigger = HTTPRoute.from_json(http_trigger) + return FunctionContext(name, version, dapr_triggers, http_trigger, inputs, outputs, states, pre_hooks, post_hooks, tracing) @@ -105,6 +107,29 @@ def from_json(json_dct): return Component(component_name, component_type, topic, metadata, operation) +class HTTPRoute(object): + """HTTP route.""" + + def __init__(self, port="", hostname="", rules=None): + self.port = port + self.hostname = hostname + self.rules = rules + + def __str__(self): + return "{port: %s, hostname: %s, rules: %s}" % ( + self.port, + self.hostname, + self.rules + ) + + @staticmethod + def from_json(json_dct): + port = json_dct.get('port', '') + hostnames = json_dct.get('route', {}).get('hostnames', '') + rules = json_dct.get('route', {}).get('rules', []) + return HTTPRoute(port, hostnames, rules) + + class DaprTrigger(object): def __init__(self, name, component_type, topic): diff --git a/src/functions_framework/context/runtime_context.py b/src/functions_framework/context/runtime_context.py index 504daa94..cdcec4d0 100644 --- a/src/functions_framework/context/runtime_context.py +++ b/src/functions_framework/context/runtime_context.py @@ -11,13 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from functions_framework.context.function_context import FunctionContext, Component +from functions_framework.context.function_context import Component, FunctionContext -class RuntimeContext(object): +class RuntimeContext: """Context for runtime.""" - def __int__(self, context: FunctionContext = None, logger=None): + def __init__(self, context: FunctionContext = None, logger=None): self.context = context self.logger = logger @@ -36,6 +36,13 @@ def get_dapr_triggers(self): else: return [] + def get_http_trigger(self): + """Get http trigger.""" + if self.context: + return self.context.http_trigger + else: + return None + def get_outputs(self) -> [Component]: if self.context and self.context.outputs: return self.context.outputs diff --git a/src/functions_framework/context/user_context.py b/src/functions_framework/context/user_context.py index 8d9eea42..cb316373 100644 --- a/src/functions_framework/context/user_context.py +++ b/src/functions_framework/context/user_context.py @@ -25,14 +25,15 @@ class UserContext(object): """Context for user.""" - def __int__(self, runtime_context: RuntimeContext = None, - binding_request=None, topic_event=None, logger=None): + def __init__(self, runtime_context: RuntimeContext = None, + binding_request=None, topic_event=None, http_request=None, logger=None): self.runtime_context = runtime_context self.logger = logger self.out = FunctionOut(0, None, "", {}) self.dapr_client = None self.__binding_request = binding_request self.__topic_event = topic_event + self.__http_request = http_request self.__init_dapr_client() def __init_dapr_client(self): @@ -49,6 +50,9 @@ def get_binding_request(self): def get_topic_event(self): return copy.deepcopy(self.__topic_event) + def get_http_request(self): + return self.__http_request + @exception_handler def send(self, output_name, data): """Send data to specify output component. diff --git a/src/functions_framework/log.py b/src/functions_framework/log.py index 9b9f4b40..477e0a57 100644 --- a/src/functions_framework/log.py +++ b/src/functions_framework/log.py @@ -46,9 +46,3 @@ def initialize_logger(name=None, level=logging.DEBUG): # initialize logger logger = initialize_logger(__name__, logging.INFO) - -# test logger -logger.debug("This is a debug message") -logger.info("This is an info message") -logger.warning("This is a warning message") -logger.error("This is an error message") diff --git a/src/functions_framework/runner.py b/src/functions_framework/runner.py index 112ba2c4..6ad12ea4 100644 --- a/src/functions_framework/runner.py +++ b/src/functions_framework/runner.py @@ -21,6 +21,7 @@ from functions_framework.context.runtime_context import RuntimeContext from functions_framework.exceptions import MissingSourceException from functions_framework.triggers.dapr_trigger.dapr import DaprTriggerHandler +from functions_framework.triggers.http_trigger.http import HTTPTriggerHandler class Runner: @@ -31,13 +32,13 @@ def __init__(self, context: FunctionContext, target=None, source=None, self.context = context self.user_function = None self.request = None - self.app = App() self.host = host self.port = port self.debug = debug self.dry_run = dry_run self.logger = None self.load_user_function() + self.init_logger() def load_user_function(self): _target = _function_registry.get_function_target(self.target) @@ -63,11 +64,14 @@ def init_logger(self): def run(self): # convert to runtime context - runtime_context = RuntimeContext().__int__(self.context, self.logger) + runtime_context = RuntimeContext(self.context, self.logger) + + _trigger = runtime_context.get_http_trigger() + if _trigger: + http_trigger = HTTPTriggerHandler(self.context.port, _trigger, self.source, self.target, self.user_function) + http_trigger.start(runtime_context, logger=self.logger) _triggers = runtime_context.get_dapr_triggers() if _triggers: - dapr_trigger = DaprTriggerHandler(self.context.port, self.app, _triggers, self.user_function) - dapr_trigger.start(runtime_context) - - self.app.run(self.port) + dapr_trigger = DaprTriggerHandler(self.context.port, _triggers, self.user_function) + dapr_trigger.start(runtime_context, logger=self.logger) diff --git a/src/functions_framework/triggers/dapr_trigger/dapr.py b/src/functions_framework/triggers/dapr_trigger/dapr.py index 2370a987..a5aeff7b 100644 --- a/src/functions_framework/triggers/dapr_trigger/dapr.py +++ b/src/functions_framework/triggers/dapr_trigger/dapr.py @@ -25,10 +25,11 @@ class DaprTriggerHandler(TriggerHandler): - def __init__(self, port, app: App = None, triggers: [DaprTrigger] = None, user_function=None): + """Handle dapr trigger.""" + def __init__(self, port, triggers: [DaprTrigger] = None, user_function=None): self.port = port self.triggers = triggers - self.app = app + self.app = App() self.user_function = user_function def start(self, context: RuntimeContext, logger=None): @@ -40,7 +41,7 @@ def start(self, context: RuntimeContext, logger=None): @self.app.binding(trigger.name) def binding_handler(request: BindingRequest): rt_ctx = deepcopy(context) - user_ctx = UserContext().__int__(runtime_context=rt_ctx, binding_request=request, logger=logger) + user_ctx = UserContext(runtime_context=rt_ctx, binding_request=request, logger=logger) logging.basicConfig(level=logging.DEBUG) logging.info('Received Message : ' + request.text()) self.user_function(user_ctx) @@ -49,7 +50,9 @@ def binding_handler(request: BindingRequest): @self.app.subscribe(pubsub_name=trigger.name, topic=trigger.topic) def topic_handler(event: v1.Event): rt_ctx = deepcopy(context) - user_ctx = UserContext().__int__(runtime_context=rt_ctx, topic_event=event, logger=logger) + user_ctx = UserContext(runtime_context=rt_ctx, topic_event=event, logger=logger) logging.basicConfig(level=logging.DEBUG) logging.info('Received Message : ' + event.data.__str__()) self.user_function(user_ctx) + + self.app.run(self.port) diff --git a/src/functions_framework/triggers/http_trigger/__init__.py b/src/functions_framework/triggers/http_trigger/__init__.py index e69de29b..f60f2693 100644 --- a/src/functions_framework/triggers/http_trigger/__init__.py +++ b/src/functions_framework/triggers/http_trigger/__init__.py @@ -0,0 +1,217 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools +import io +import json +import logging +import os.path +import pathlib +import sys + +from copy import deepcopy + +import flask +import werkzeug + +from functions_framework import _function_registry +from functions_framework.context.runtime_context import RuntimeContext +from functions_framework.context.user_context import UserContext +from functions_framework.exceptions import MissingSourceException + +_FUNCTION_STATUS_HEADER_FIELD = "X-OpenFunction-Status" +_CRASH = "crash" + + +class _LoggingHandler(io.TextIOWrapper): + """Logging replacement for stdout and stderr in GCF Python 3.7.""" + + def __init__(self, level, stderr=sys.stderr): + io.TextIOWrapper.__init__(self, io.StringIO(), encoding=stderr.encoding) + self.level = level + self.stderr = stderr + + def write(self, out): + payload = dict(severity=self.level, message=out.rstrip("\n")) + return self.stderr.write(json.dumps(payload) + "\n") + + +def cloud_event(func): + """Decorator that registers cloudevent as user function signature type.""" + _function_registry.REGISTRY_MAP[ + func.__name__ + ] = _function_registry.CLOUDEVENT_SIGNATURE_TYPE + + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper + + +def http(func): + """Decorator that registers http as user function signature type.""" + _function_registry.REGISTRY_MAP[ + func.__name__ + ] = _function_registry.HTTP_SIGNATURE_TYPE + + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper + + +def setup_logging(): + logging.getLogger().setLevel(logging.INFO) + info_handler = logging.StreamHandler(sys.stdout) + info_handler.setLevel(logging.NOTSET) + info_handler.addFilter(lambda record: record.levelno <= logging.INFO) + logging.getLogger().addHandler(info_handler) + + warn_handler = logging.StreamHandler(sys.stderr) + warn_handler.setLevel(logging.WARNING) + logging.getLogger().addHandler(warn_handler) + + +def _http_view_func_wrapper(function, runtime_context: RuntimeContext, request, logger): + @functools.wraps(function) + def view_func(path): + rt_ctx = deepcopy(runtime_context) + user_ctx = UserContext(runtime_context=rt_ctx, http_request=request, logger=logger) + return function(user_ctx) + + return view_func + + +def _configure_app(wsgi_app, runtime_context: RuntimeContext, function, logger): + wsgi_app.url_map.add( + werkzeug.routing.Rule("/", defaults={"path": ""}, endpoint="run") + ) + wsgi_app.url_map.add(werkzeug.routing.Rule("/robots.txt", endpoint="error")) + wsgi_app.url_map.add(werkzeug.routing.Rule("/favicon.ico", endpoint="error")) + wsgi_app.url_map.add(werkzeug.routing.Rule("/", endpoint="run")) + wsgi_app.view_functions["run"] = _http_view_func_wrapper(function, runtime_context, flask.request, logger) + wsgi_app.view_functions["error"] = lambda: flask.abort(404, description="Not Found") + wsgi_app.after_request(read_request) + + +def read_request(response): + """ + Force the framework to read the entire request before responding, to avoid + connection errors when returning prematurely. Skipped on streaming responses + as these may continue to operate on the request after they are returned. + """ + + if not response.is_streamed: + flask.request.get_data() + + return response + + +def crash_handler(e): + """ + Return crash header to allow logging 'crash' message in logs. + """ + return str(e), 500, {_FUNCTION_STATUS_HEADER_FIELD: _CRASH} + + +def create_app(runtime_context: RuntimeContext = None, target=None, source=None, logger=None): + _target = _function_registry.get_function_target(target) + _source = _function_registry.get_function_source(source) + + # Set the template folder relative to the source path + # Python 3.5: join does not support PosixPath + template_folder = str(pathlib.Path(_source).parent / "templates") + + if not os.path.exists(_source): + raise MissingSourceException( + "File {source} that is expected to define function doesn't exist".format( + source=_source + ) + ) + + source_module, spec = _function_registry.load_function_module(_source) + + # Create the application + _app = flask.Flask(_target, template_folder=template_folder) + _app.register_error_handler(500, crash_handler) + global errorhandler + errorhandler = _app.errorhandler + + # Handle legacy GCF Python 3.7 behavior + if os.environ.get("ENTRY_POINT"): + os.environ["FUNCTION_NAME"] = os.environ.get("K_SERVICE", _target) + _app.make_response_original = _app.make_response + + def handle_none(rv): + if rv is None: + rv = "OK" + return _app.make_response_original(rv) + + _app.make_response = handle_none + + # Handle log severity backwards compatibility + sys.stdout = _LoggingHandler("INFO", sys.stderr) + sys.stderr = _LoggingHandler("ERROR", sys.stderr) + setup_logging() + + # Execute the module, within the application context + with _app.app_context(): + spec.loader.exec_module(source_module) + + # Get the configured function signature type + function = _function_registry.get_user_function(_source, source_module, _target) + + _configure_app(_app, runtime_context, function, logger) + + return _app + + +class LazyWSGIApp: + """ + Wrap the WSGI app in a lazily initialized wrapper to prevent initialization + at import-time + """ + + def __init__(self, target=None, source=None, signature_type=None): + # Support HTTP frameworks which support WSGI callables. + # Note: this ability is currently broken in Gunicorn 20.0, and + # environment variables should be used for configuration instead: + # https://github.com/benoitc/gunicorn/issues/2159 + self.target = target + self.source = source + self.signature_type = signature_type + + # Placeholder for the app which will be initialized on first call + self.app = None + + def __call__(self, *args, **kwargs): + if not self.app: + self.app = create_app(self.target, self.source, self.signature_type) + return self.app(*args, **kwargs) + + +app = LazyWSGIApp() + + +class DummyErrorHandler: + def __init__(self): + pass + + def __call__(self, *args, **kwargs): + return self + + +errorhandler = DummyErrorHandler() diff --git a/src/functions_framework/triggers/http_trigger/_http/__init__.py b/src/functions_framework/triggers/http_trigger/_http/__init__.py new file mode 100644 index 00000000..6f7694b3 --- /dev/null +++ b/src/functions_framework/triggers/http_trigger/_http/__init__.py @@ -0,0 +1,43 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from functions_framework.triggers.http_trigger._http.flask import FlaskApplication + + +class HTTPServer: + def __init__(self, app, debug, **options): + self.app = app + self.debug = debug + self.options = options + + if self.debug: + self.server_class = FlaskApplication + else: + try: + from functions_framework.triggers.http_trigger._http.gunicorn import ( + GunicornApplication, + ) + + self.server_class = GunicornApplication + except ImportError as e: + self.server_class = FlaskApplication + + def run(self, host, port): + http_server = self.server_class( + self.app, host, port, self.debug, **self.options + ) + http_server.run() + + +def create_server(wsgi_app, debug, **options): + return HTTPServer(wsgi_app, debug, **options) \ No newline at end of file diff --git a/src/functions_framework/triggers/http_trigger/_http/flask.py b/src/functions_framework/triggers/http_trigger/_http/flask.py new file mode 100644 index 00000000..8cc5987d --- /dev/null +++ b/src/functions_framework/triggers/http_trigger/_http/flask.py @@ -0,0 +1,25 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class FlaskApplication: + def __init__(self, app, host, port, debug, **options): + self.app = app + self.host = host + self.port = port + self.debug = debug + self.options = options + + def run(self): + self.app.run(self.host, self.port, debug=self.debug, **self.options) \ No newline at end of file diff --git a/src/functions_framework/triggers/http_trigger/_http/gunicorn.py b/src/functions_framework/triggers/http_trigger/_http/gunicorn.py new file mode 100644 index 00000000..f522b67f --- /dev/null +++ b/src/functions_framework/triggers/http_trigger/_http/gunicorn.py @@ -0,0 +1,37 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gunicorn.app.base + + +class GunicornApplication(gunicorn.app.base.BaseApplication): + def __init__(self, app, host, port, debug, **options): + self.options = { + "bind": "%s:%s" % (host, port), + "workers": 1, + "threads": 1024, + "timeout": 0, + "loglevel": "error", + "limit_request_line": 0, + } + self.options.update(options) + self.app = app + super().__init__() + + def load_config(self): + for key, value in self.options.items(): + self.cfg.set(key, value) + + def load(self): + return self.app diff --git a/src/functions_framework/triggers/http_trigger/http.py b/src/functions_framework/triggers/http_trigger/http.py new file mode 100644 index 00000000..02ce8c29 --- /dev/null +++ b/src/functions_framework/triggers/http_trigger/http.py @@ -0,0 +1,46 @@ +# Copyright 2023 The OpenFunction Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import uvicorn + +from functions_framework import constants +from functions_framework.context.function_context import HTTPRoute +from functions_framework.context.runtime_context import RuntimeContext +from functions_framework.triggers.http_trigger import create_app +from functions_framework.triggers.http_trigger._http import create_server +from functions_framework.triggers.trigger import TriggerHandler + + +class HTTPTriggerHandler(TriggerHandler): + """Handle http trigger.""" + def __init__(self, port, trigger: HTTPRoute, source=None, target=None, user_function=None, debug=False): + self.port = trigger.port if trigger.port else port + self.source = source + self.target = target + self.trigger = trigger + self.hostname = trigger.hostname + self.route_rules = trigger.rules + self.user_function = user_function + self.debug = debug + if self.port is None: + self.port = constants.DEFAULT_HTTP_APP_PORT + + def start(self, context: RuntimeContext, logger=None): + if not self.trigger: + raise Exception("No trigger specified for HTTPTriggerHandler") + + app = create_app(context, self.target, self.source, logger) + create_server(app, self.debug).run("0.0.0.0", self.port) + + + From 62a772f2d6f1ba143c8679be96080a890b808de9 Mon Sep 17 00:00:00 2001 From: laminar Date: Thu, 6 Jul 2023 12:45:25 +0800 Subject: [PATCH 8/9] fix Signed-off-by: laminar --- src/functions_framework/context/function_context.py | 10 ++++++---- src/functions_framework/context/runtime_context.py | 13 +++++++++---- .../triggers/dapr_trigger/dapr.py | 3 +++ .../triggers/http_trigger/http.py | 2 +- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/functions_framework/context/function_context.py b/src/functions_framework/context/function_context.py index f47e913b..534afbf1 100644 --- a/src/functions_framework/context/function_context.py +++ b/src/functions_framework/context/function_context.py @@ -19,7 +19,7 @@ class FunctionContext(object): def __init__(self, name="", version="", dapr_triggers=None, http_trigger=None, inputs=None, outputs=None, states=None, - pre_hooks=None, post_hooks=None, tracing=None, port=8080): + pre_hooks=None, post_hooks=None, tracing=None, port=0): self.name = name self.version = version self.dapr_triggers = dapr_triggers @@ -44,6 +44,7 @@ def from_json(json_dct): pre_hooks = json_dct.get('pre_hooks', []) post_hooks = json_dct.get('post_hooks', []) tracing = json_dct.get('tracing', {}) + port = json_dct.get('port', 0) inputs = None if inputs_list: @@ -59,14 +60,15 @@ def from_json(json_dct): output = Component.from_json(v) outputs[k] = output - dapr_triggers = [DaprTrigger] + dapr_triggers = [] for trigger in _dapr_triggers: dapr_triggers.append(DaprTrigger.from_json(trigger)) - http_trigger = HTTPRoute.from_json(http_trigger) + if http_trigger: + http_trigger = HTTPRoute.from_json(http_trigger) return FunctionContext(name, version, dapr_triggers, http_trigger, - inputs, outputs, states, pre_hooks, post_hooks, tracing) + inputs, outputs, states, pre_hooks, post_hooks, tracing, port) class Component(object): diff --git a/src/functions_framework/context/runtime_context.py b/src/functions_framework/context/runtime_context.py index cdcec4d0..d69afdd9 100644 --- a/src/functions_framework/context/runtime_context.py +++ b/src/functions_framework/context/runtime_context.py @@ -11,7 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from functions_framework.context.function_context import Component, FunctionContext +from functions_framework.context.function_context import ( + Component, + DaprTrigger, + FunctionContext, + HTTPRoute, +) class RuntimeContext: @@ -29,14 +34,14 @@ def has_http_trigger(self): """Check if the function has http trigger.""" return self.context and self.context.http_trigger - def get_dapr_triggers(self): + def get_dapr_triggers(self) -> [DaprTrigger]: """Get dapr trigger.""" if self.context: return self.context.dapr_triggers else: return [] - def get_http_trigger(self): + def get_http_trigger(self) -> HTTPRoute: """Get http trigger.""" if self.context: return self.context.http_trigger @@ -47,4 +52,4 @@ def get_outputs(self) -> [Component]: if self.context and self.context.outputs: return self.context.outputs else: - return [Component] + return [] diff --git a/src/functions_framework/triggers/dapr_trigger/dapr.py b/src/functions_framework/triggers/dapr_trigger/dapr.py index a5aeff7b..9ee151db 100644 --- a/src/functions_framework/triggers/dapr_trigger/dapr.py +++ b/src/functions_framework/triggers/dapr_trigger/dapr.py @@ -18,6 +18,7 @@ from cloudevents.sdk.event import v1 from dapr.ext.grpc import App, BindingRequest +from functions_framework import constants from functions_framework.context.function_context import DaprTrigger from functions_framework.context.runtime_context import RuntimeContext from functions_framework.context.user_context import UserContext @@ -31,6 +32,8 @@ def __init__(self, port, triggers: [DaprTrigger] = None, user_function=None): self.triggers = triggers self.app = App() self.user_function = user_function + if self.port == 0: + self.port = constants.DEFAULT_DAPR_APP_PORT def start(self, context: RuntimeContext, logger=None): if not self.triggers: diff --git a/src/functions_framework/triggers/http_trigger/http.py b/src/functions_framework/triggers/http_trigger/http.py index 02ce8c29..4d35e67a 100644 --- a/src/functions_framework/triggers/http_trigger/http.py +++ b/src/functions_framework/triggers/http_trigger/http.py @@ -32,7 +32,7 @@ def __init__(self, port, trigger: HTTPRoute, source=None, target=None, user_func self.route_rules = trigger.rules self.user_function = user_function self.debug = debug - if self.port is None: + if self.port == 0: self.port = constants.DEFAULT_HTTP_APP_PORT def start(self, context: RuntimeContext, logger=None): From 9bb6b1325022a8268820ea257ae899735ebfa198 Mon Sep 17 00:00:00 2001 From: laminar Date: Mon, 10 Jul 2023 09:26:16 +0800 Subject: [PATCH 9/9] fix Signed-off-by: laminar --- .../context/function_context.py | 30 ++++++++----------- .../context/user_context.py | 2 +- .../triggers/dapr_trigger/dapr.py | 6 ---- 3 files changed, 13 insertions(+), 25 deletions(-) diff --git a/src/functions_framework/context/function_context.py b/src/functions_framework/context/function_context.py index 534afbf1..687c9603 100644 --- a/src/functions_framework/context/function_context.py +++ b/src/functions_framework/context/function_context.py @@ -36,8 +36,8 @@ def __init__(self, name="", version="", dapr_triggers=None, http_trigger=None, def from_json(json_dct): name = json_dct.get('name') version = json_dct.get('version') - inputs_list = json_dct.get('inputs') - outputs_list = json_dct.get('outputs') + inputs_map = json_dct.get('inputs') + outputs_map = json_dct.get('outputs') _dapr_triggers = json_dct.get('triggers', {}).get('dapr', []) http_trigger = json_dct.get('triggers', {}).get('http', None) states = json_dct.get('states', {}) @@ -47,16 +47,16 @@ def from_json(json_dct): port = json_dct.get('port', 0) inputs = None - if inputs_list: + if inputs_map: inputs = {} - for k, v in inputs_list.items(): - input = Component.from_json(v) - inputs[k] = input + for k, v in inputs_map.items(): + _input = Component.from_json(v) + inputs[k] = _input outputs = None - if outputs_list: + if outputs_map: outputs = {} - for k, v in outputs_list.items(): + for k, v in outputs_map.items(): output = Component.from_json(v) outputs[k] = output @@ -112,24 +112,18 @@ def from_json(json_dct): class HTTPRoute(object): """HTTP route.""" - def __init__(self, port="", hostname="", rules=None): + def __init__(self, port=""): self.port = port - self.hostname = hostname - self.rules = rules def __str__(self): - return "{port: %s, hostname: %s, rules: %s}" % ( - self.port, - self.hostname, - self.rules + return "{port: %s}" % ( + self.port ) @staticmethod def from_json(json_dct): port = json_dct.get('port', '') - hostnames = json_dct.get('route', {}).get('hostnames', '') - rules = json_dct.get('route', {}).get('rules', []) - return HTTPRoute(port, hostnames, rules) + return HTTPRoute(port) class DaprTrigger(object): diff --git a/src/functions_framework/context/user_context.py b/src/functions_framework/context/user_context.py index cb316373..4d543d42 100644 --- a/src/functions_framework/context/user_context.py +++ b/src/functions_framework/context/user_context.py @@ -26,7 +26,7 @@ class UserContext(object): """Context for user.""" def __init__(self, runtime_context: RuntimeContext = None, - binding_request=None, topic_event=None, http_request=None, logger=None): + binding_request=None, topic_event=None, http_request=None, logger=None): self.runtime_context = runtime_context self.logger = logger self.out = FunctionOut(0, None, "", {}) diff --git a/src/functions_framework/triggers/dapr_trigger/dapr.py b/src/functions_framework/triggers/dapr_trigger/dapr.py index 9ee151db..01a88976 100644 --- a/src/functions_framework/triggers/dapr_trigger/dapr.py +++ b/src/functions_framework/triggers/dapr_trigger/dapr.py @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging - from copy import deepcopy from cloudevents.sdk.event import v1 @@ -45,8 +43,6 @@ def start(self, context: RuntimeContext, logger=None): def binding_handler(request: BindingRequest): rt_ctx = deepcopy(context) user_ctx = UserContext(runtime_context=rt_ctx, binding_request=request, logger=logger) - logging.basicConfig(level=logging.DEBUG) - logging.info('Received Message : ' + request.text()) self.user_function(user_ctx) if trigger.component_type.startswith("pubsub"): @@ -54,8 +50,6 @@ def binding_handler(request: BindingRequest): def topic_handler(event: v1.Event): rt_ctx = deepcopy(context) user_ctx = UserContext(runtime_context=rt_ctx, topic_event=event, logger=logger) - logging.basicConfig(level=logging.DEBUG) - logging.info('Received Message : ' + event.data.__str__()) self.user_function(user_ctx) self.app.run(self.port)