Skip to content

Retry policy support for v2 programming model #1268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jul 6, 2023
45 changes: 21 additions & 24 deletions azure_functions_worker/bindings/retrycontext.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,28 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from dataclasses import dataclass
from enum import Enum

from . import rpcexception


class RetryPolicy(Enum):
    """Names of the retry policy settings for a function invocation.

    Each member's value is the string key under which that setting is
    looked up in a function's retry policy settings dictionary.
    """

    # Key of the maximum retry count setting.
    MAX_RETRY_COUNT = "max_retry_count"
    # Key of the retry strategy setting.
    STRATEGY = "strategy"
    # Key of the delay interval setting.
    DELAY_INTERVAL = "delay_interval"
    # Key of the minimum interval setting.
    MINIMUM_INTERVAL = "minimum_interval"
    # Key of the maximum interval setting.
    MAXIMUM_INTERVAL = "maximum_interval"


@dataclass
class RetryContext:
    """Retry information attached to a function invocation.

    Check https://docs.microsoft.com/en-us/azure/azure-functions/
    functions-bindings-error-pages?tabs=python#retry-policies-preview

    The generated constructor takes ``retry_count``, ``max_retry_count``
    and ``rpc_exception`` in that order, matching the former hand-written
    ``__init__``.
    """

    # Current retry count from the retry context.
    retry_count: int

    # Max retry count from the retry context.
    max_retry_count: int

    # RPC exception stored on the retry context. The annotation is quoted
    # so it is not evaluated when the class body runs.
    rpc_exception: 'rpcexception.RpcException'

    @property
    def exception(self) -> 'rpcexception.RpcException':
        """Return ``rpc_exception``.

        Kept so code written against the former ``exception`` property
        continues to work.
        """
        return self.rpc_exception
3 changes: 3 additions & 0 deletions azure_functions_worker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,6 @@
SCRIPT_FILE_NAME = "function_app.py"

PYTHON_LANGUAGE_RUNTIME = "python"

# Settings for V2 programming model
RETRY_POLICY = "retry_policy"
8 changes: 4 additions & 4 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,10 @@ def index_functions(self, function_path: str):
len(indexed_functions))

if indexed_functions:
fx_metadata_results = loader.process_indexed_function(
self._functions,
indexed_functions)

indexed_function_logs: List[str] = []
for func in indexed_functions:
function_log = "Function Name: {}, Function Binding: {}" \
Expand All @@ -621,10 +625,6 @@ def index_functions(self, function_path: str):
'Successfully processed FunctionMetadataRequest for '
'functions: %s', " ".join(indexed_function_logs))

fx_metadata_results = loader.process_indexed_function(
self._functions,
indexed_functions)

return fx_metadata_results

async def _handle__close_shared_memory_resources_request(self, request):
Expand Down
48 changes: 47 additions & 1 deletion azure_functions_worker/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@
import os.path
import pathlib
import sys
import time
from datetime import timedelta
from os import PathLike, fspath
from typing import Optional, Dict

from google.protobuf.duration_pb2 import Duration

from . import protos, functions
from .bindings.retrycontext import RetryPolicy
from .constants import MODULE_NOT_FOUND_TS_URL, SCRIPT_FILE_NAME, \
PYTHON_LANGUAGE_RUNTIME
PYTHON_LANGUAGE_RUNTIME, RETRY_POLICY
from .utils.wrappers import attach_message_to_exception

_AZURE_NAMESPACE = '__app__'
Expand Down Expand Up @@ -45,6 +50,12 @@ def install() -> None:
sys.modules[_AZURE_NAMESPACE] = ns_pkg


def convert_to_seconds(timestr: str):
    """Return the number of whole seconds in an ``HH:MM:SS`` string.

    The string is parsed with ``time.strptime`` using ``'%H:%M:%S'``, so
    anything that does not match that format raises ``ValueError``.
    """
    parsed = time.strptime(timestr, '%H:%M:%S')
    # 3600 seconds per hour, 60 seconds per minute.
    return parsed.tm_hour * 3600 + parsed.tm_min * 60 + parsed.tm_sec


def uninstall() -> None:
    """Do nothing; this function has no body beyond returning ``None``."""
    return None

Expand All @@ -60,6 +71,39 @@ def build_binding_protos(indexed_function) -> Dict:
return binding_protos


def build_retry_protos(indexed_function) -> Optional['protos.RpcRetryOptions']:
    """Build the retry options message for an indexed function.

    Reads the function's ``retry_policy`` settings dictionary and turns it
    into a ``protos.RpcRetryOptions``. Interval settings are parsed with
    ``convert_to_seconds`` and wrapped in protobuf ``Duration`` messages.

    A ``"fixed_delay"`` strategy populates ``delay_interval``; any other
    strategy value populates ``minimum_interval`` and ``maximum_interval``.

    Returns:
        The populated ``RpcRetryOptions``, or ``None`` when the function
        has no retry policy settings.
    """
    retry = indexed_function.get_settings_dict(RETRY_POLICY)
    if not retry:
        return None

    def to_duration(setting: RetryPolicy) -> Duration:
        # Interval settings are strings that convert_to_seconds parses.
        return Duration(
            seconds=convert_to_seconds(retry.get(setting.value)))

    strategy = retry.get(RetryPolicy.STRATEGY.value)
    if strategy == "fixed_delay":
        intervals = {
            'delay_interval': to_duration(RetryPolicy.DELAY_INTERVAL),
        }
    else:
        intervals = {
            'minimum_interval': to_duration(RetryPolicy.MINIMUM_INTERVAL),
            'maximum_interval': to_duration(RetryPolicy.MAXIMUM_INTERVAL),
        }

    # Both strategies share the retry count and strategy fields; only the
    # interval fields differ.
    return protos.RpcRetryOptions(
        max_retry_count=int(retry.get(RetryPolicy.MAX_RETRY_COUNT.value)),
        retry_strategy=strategy,
        **intervals
    )


def process_indexed_function(functions_registry: functions.Registry,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test which validates the json being generated?

indexed_functions):
fx_metadata_results = []
Expand All @@ -68,6 +112,7 @@ def process_indexed_function(functions_registry: functions.Registry,
function=indexed_function)

binding_protos = build_binding_protos(indexed_function)
retry_protos = build_retry_protos(indexed_function)

function_metadata = protos.RpcFunctionMetadata(
name=function_info.name,
Expand All @@ -80,6 +125,7 @@ def process_indexed_function(functions_registry: functions.Registry,
language=PYTHON_LANGUAGE_RUNTIME,
bindings=binding_protos,
raw_bindings=indexed_function.get_raw_bindings(),
retry_options=retry_protos,
properties={"worker_indexed": "True"})

fx_metadata_results.append(function_metadata)
Expand Down
3 changes: 2 additions & 1 deletion azure_functions_worker/protos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
CloseSharedMemoryResourcesResponse,
FunctionsMetadataRequest,
FunctionMetadataResponse,
WorkerMetadata)
WorkerMetadata,
RpcRetryOptions)

from .shared.NullableTypes_pb2 import (
NullableString,
Expand Down
46 changes: 46 additions & 0 deletions azure_functions_worker/protos/_src/src/proto/FunctionRpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ message StreamingMessage {

// Host gets the list of function load responses
FunctionLoadResponseCollection function_load_response_collection = 32;

// Host sends required metadata to worker to warmup the worker
WorkerWarmupRequest worker_warmup_request = 33;

// Worker responds after warming up with the warmup result
WorkerWarmupResponse worker_warmup_response = 34;

}
}

Expand Down Expand Up @@ -330,6 +337,9 @@ message RpcFunctionMetadata {
// A flag indicating if managed dependency is enabled or not
bool managed_dependency_enabled = 14;

// The optional function execution retry strategy to use on invocation failures.
RpcRetryOptions retry_options = 15;

// Properties for function metadata
// They're usually specific to a worker and largely passed along to the controller API for use
// outside the host
Expand Down Expand Up @@ -423,6 +433,15 @@ message InvocationResponse {
StatusResult result = 3;
}

// Host sends required metadata to worker to warmup the worker
message WorkerWarmupRequest {
  // Full path of worker.config.json location
  string worker_directory = 1;
}

// Worker responds after warming up with the warmup result
message WorkerWarmupResponse {
  // Status result of the warmup
  StatusResult result = 1;
}

// Used to encapsulate data which could be a variety of types
message TypedData {
oneof data {
Expand Down Expand Up @@ -681,4 +700,31 @@ message ModelBindingData
// Used to encapsulate collection model_binding_data
message CollectionModelBindingData {
repeated ModelBindingData model_binding_data = 1;
}

// Retry policy which the worker sends the host when the worker indexes
// a function.
// Field numbers in this message start at 2; number 1 is not used here.
message RpcRetryOptions
{
  // The retry strategy to use. Valid values are fixed delay or exponential backoff.
  enum RetryStrategy
  {
    exponential_backoff = 0;
    fixed_delay = 1;
  }

  // The maximum number of retries allowed per function execution.
  // -1 means to retry indefinitely.
  int32 max_retry_count = 2;

  // The delay that's used between retries when you're using a fixed delay strategy.
  google.protobuf.Duration delay_interval = 3;

  // The minimum retry delay when you're using an exponential backoff strategy
  google.protobuf.Duration minimum_interval = 4;

  // The maximum retry delay when you're using an exponential backoff strategy
  google.protobuf.Duration maximum_interval = 5;

  // The selected retry strategy, as one of the RetryStrategy values above.
  RetryStrategy retry_strategy = 6;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from azure.functions import FunctionApp, TimerRequest, Context, AuthLevel
import logging

app = FunctionApp(http_auth_level=AuthLevel.ANONYMOUS)


@app.timer_trigger(schedule="*/1 * * * * *", arg_name="mytimer",
                   run_on_startup=False,
                   use_monitor=False)
@app.retry(strategy="exponential_backoff", max_retry_count="3",
           minimum_interval="00:00:01",
           maximum_interval="00:00:02")
def mytimer(mytimer: TimerRequest, context: Context) -> None:
    """Timer function that raises until the retry count hits its maximum.

    Logs the current retry count on every call. While the retry count
    differs from the max retry count it raises an exception; once they are
    equal it logs that the max retries have been reached.
    """
    logging.info(f'Current retry count: {context.retry_context.retry_count}')

    retries_exhausted = (context.retry_context.retry_count
                         == context.retry_context.max_retry_count)
    if not retries_exhausted:
        raise Exception("This is a retryable exception")

    logging.info(
        f"Max retries of {context.retry_context.max_retry_count} for "
        f"function {context.function_name} has been reached")
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from azure.functions import FunctionApp, TimerRequest, Context, AuthLevel
import logging

app = FunctionApp(http_auth_level=AuthLevel.ANONYMOUS)


@app.timer_trigger(schedule="*/1 * * * * *", arg_name="mytimer",
                   run_on_startup=False,
                   use_monitor=False)
@app.retry(strategy="fixed_delay", max_retry_count="3",
           delay_interval="00:00:01")
def mytimer(mytimer: TimerRequest, context: Context) -> None:
    """Timer function that raises until the retry count hits its maximum.

    Logs the current retry count on every call. While the retry count
    differs from the max retry count it raises an exception; once they are
    equal it logs that the max retries have been reached.
    """
    logging.info(f'Current retry count: {context.retry_context.retry_count}')

    retries_exhausted = (context.retry_context.retry_count
                         == context.retry_context.max_retry_count)
    if not retries_exhausted:
        raise Exception("This is a retryable exception")

    logging.info(
        f"Max retries of {context.retry_context.max_retry_count} for "
        f"function {context.function_name} has been reached")
48 changes: 48 additions & 0 deletions tests/endtoend/test_retry_policy_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import time
import typing

from tests.utils import testutils


class TestFixedRetryPolicyFunctions(testutils.WebHostTestCase):
    # End-to-end test running the 'fixed_strategy' retry policy function app.

    @classmethod
    def get_script_dir(cls):
        retry_apps_dir = testutils.E2E_TESTS_FOLDER / 'retry_policy_functions'
        return retry_apps_dir / 'fixed_strategy'

    def test_fixed_retry_policy(self):
        # Checking webhost status.
        time.sleep(5)
        response = self.webhost.request('GET', '', no_prefix=True)
        self.assertTrue(response.ok)

    def check_log_fixed_retry_policy(self, host_out: typing.List[str]):
        # Each expected entry must appear in the captured host output.
        expected_entries = (
            'Current retry count: 0',
            'Current retry count: 1',
            "Max retries of 3 for function mytimer"
            " has been reached",
        )
        for entry in expected_entries:
            self.assertIn(entry, host_out)


class TestExponentialRetryPolicyFunctions(testutils.WebHostTestCase):
    # End-to-end test running the 'exponential_strategy' retry policy
    # function app.

    @classmethod
    def get_script_dir(cls):
        retry_apps_dir = testutils.E2E_TESTS_FOLDER / 'retry_policy_functions'
        return retry_apps_dir / 'exponential_strategy'

    def test_retry_policy(self):
        # Checking webhost status.
        response = self.webhost.request('GET', '', no_prefix=True, timeout=5)
        time.sleep(5)
        self.assertTrue(response.ok)

    def check_log_retry_policy(self, host_out: typing.List[str]):
        # Each expected entry must appear in the captured host output.
        expected_entries = (
            'Current retry count: 1',
            'Current retry count: 2',
            'Current retry count: 3',
            "Max retries of 3 for function mytimer"
            " has been reached",
        )
        for entry in expected_entries:
            self.assertIn(entry, host_out)
11 changes: 11 additions & 0 deletions tests/unittests/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,17 @@ async def test_dispatcher_functions_metadata_request(self):
self.assertEqual(r.response.result.status,
protos.StatusResult.Success)

async def test_dispatcher_functions_metadata_request_with_retry(self):
    """Test that a functions metadata request is answered with a
    successful FunctionMetadataResponse that does not use default
    metadata indexing
    """
    # NOTE(review): nothing below inspects retry options on the returned
    # metadata; only the response type, indexing flag and status are checked.
    async with self._ctrl as host:
        r = await host.get_functions_metadata()
        response = r.response
        self.assertIsInstance(response, protos.FunctionMetadataResponse)
        self.assertFalse(response.use_default_metadata_indexing)
        self.assertEqual(response.result.status,
                         protos.StatusResult.Success)


class TestDispatcherSteinLegacyFallback(testutils.AsyncTestCase):

Expand Down
Loading