Skip to content

Commit 4c3a6d9

Browse files
committed
Refactored retry config to _retry.py and added support for backoff and Retry-After
1 parent a4bc029 commit 4c3a6d9

File tree

2 files changed

+255
-42
lines changed

2 files changed

+255
-42
lines changed

firebase_admin/_retry.py

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
# Copyright 2025 Google Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Internal retry logic module
16+
17+
This module provides utilities for adding retry logic to HTTPX requests
18+
"""
19+
20+
from __future__ import annotations
21+
import copy
22+
import email.utils
23+
import random
24+
import re
25+
import time
26+
from types import CoroutineType
27+
from typing import Any, Callable, List, Optional, Tuple
28+
import logging
29+
import asyncio
30+
import httpx
31+
32+
logger = logging.getLogger(__name__)
33+
34+
35+
class HttpxRetry:
36+
"""HTTPX based retry config"""
37+
# TODO: Decide
38+
# urllib3.Retry ignores the status_forcelist and only respects Retry-After header
39+
# for 413, 429 and 503 errors.
40+
# Should we do the same?
41+
# Default status codes to be used for ``status_forcelist``
42+
RETRY_AFTER_STATUS_CODES = frozenset([413, 429, 503])
43+
44+
#: Default maximum backoff time.
45+
DEFAULT_BACKOFF_MAX = 120
46+
47+
def __init__(
48+
self,
49+
status: int = 10,
50+
status_forcelist: Optional[List[int]] = None,
51+
backoff_factor: float = 0,
52+
backoff_max: float = DEFAULT_BACKOFF_MAX,
53+
raise_on_status: bool = False,
54+
backoff_jitter: float = 0,
55+
history: Optional[List[Tuple[
56+
httpx.Request,
57+
Optional[httpx.Response],
58+
Optional[Exception]
59+
]]] = None,
60+
respect_retry_after_header: bool = False,
61+
) -> None:
62+
self.status = status
63+
self.status_forcelist = status_forcelist
64+
self.backoff_factor = backoff_factor
65+
self.backoff_max = backoff_max
66+
self.raise_on_status = raise_on_status
67+
self.backoff_jitter = backoff_jitter
68+
if history:
69+
self.history = history
70+
else:
71+
self.history = []
72+
self.respect_retry_after_header = respect_retry_after_header
73+
74+
def copy(self) -> HttpxRetry:
75+
"""Creates a deep copy of this instance."""
76+
return copy.deepcopy(self)
77+
78+
def is_retryable_response(self, response: httpx.Response) -> bool:
79+
"""Determine if a response implies that the request should be retried if possible."""
80+
if self.status_forcelist and response.status_code in self.status_forcelist:
81+
return True
82+
83+
has_retry_after = bool(response.headers.get("Retry-After"))
84+
if (
85+
self.respect_retry_after_header
86+
and has_retry_after
87+
and response.status_code in self.RETRY_AFTER_STATUS_CODES
88+
):
89+
return True
90+
91+
return False
92+
93+
# Placeholder for exception retrying
94+
def is_retryable_error(self, error: Exception):
95+
"""Determine if the error implies that the request should be retired if possible."""
96+
logger.debug(error)
97+
return False
98+
99+
def is_exhausted(self) -> bool:
100+
"""Determine if there are anymore more retires."""
101+
# status count is negative
102+
return self.status < 0
103+
104+
# Identical implementation of `urllib3.Retry.parse_retry_after()`
105+
def _parse_retry_after(self, retry_after_header: str) -> float | None:
106+
"""Parses Retry-After string into a float with unit seconds."""
107+
seconds: float
108+
# Whitespace: https://tools.ietf.org/html/rfc7230#section-3.2.4
109+
if re.match(r"^\s*[0-9]+\s*$", retry_after_header):
110+
seconds = int(retry_after_header)
111+
else:
112+
retry_date_tuple = email.utils.parsedate_tz(retry_after_header)
113+
if retry_date_tuple is None:
114+
# TODO: Verify if this is the appropriate way to handle this.
115+
raise httpx.RemoteProtocolError(f"Invalid Retry-After header: {retry_after_header}")
116+
117+
retry_date = email.utils.mktime_tz(retry_date_tuple)
118+
seconds = retry_date - time.time()
119+
120+
seconds = max(seconds, 0)
121+
122+
return seconds
123+
124+
def get_retry_after(self, response: httpx.Response) -> float | None:
125+
"""Determine the Retry-After time needed before sending the next request."""
126+
retry_after_header = response.headers.get('Retry_After', None)
127+
if retry_after_header:
128+
# Convert retry header to a float in seconds
129+
return self._parse_retry_after(retry_after_header)
130+
return None
131+
132+
def get_backoff_time(self):
133+
"""Determine the backoff time needed before sending the next request."""
134+
# request_count is the number of previous request attempts
135+
request_count = len(self.history)
136+
backoff = self.backoff_factor * (2 ** (request_count-1))
137+
if self.backoff_jitter:
138+
backoff += random.random() * self.backoff_jitter
139+
return float(max(0, min(self.backoff_max, backoff)))
140+
141+
async def sleep_for_backoff(self) -> None:
142+
"""Determine and wait the backoff time needed before sending the next request."""
143+
backoff = self.get_backoff_time()
144+
logger.debug('Sleeping for %f seconds following failed request', backoff)
145+
await asyncio.sleep(backoff)
146+
147+
async def sleep(self, response: httpx.Response) -> None:
148+
"""Determine and wait the time needed before sending the next request."""
149+
if self.respect_retry_after_header:
150+
retry_after = self.get_retry_after(response)
151+
if retry_after:
152+
await asyncio.sleep(retry_after)
153+
return
154+
await self.sleep_for_backoff()
155+
156+
def increment(
157+
self,
158+
request: httpx.Request,
159+
response: Optional[httpx.Response] = None,
160+
error: Optional[Exception] = None
161+
) -> None:
162+
"""Update the retry state based on request attempt."""
163+
if response and self.is_retryable_response(response):
164+
self.status -= 1
165+
self.history.append((request, response, error))
166+
167+
168+
# TODO: Remove comments
169+
# Note - This implementation currently covers:
170+
# - basic retires for pre-defined status errors
171+
# - applying retry backoff and backoff jitter
172+
# - ability to respect a response's retry-after header
173+
class HttpxRetryTransport(httpx.AsyncBaseTransport):
174+
"""HTTPX transport with retry logic."""
175+
176+
# DEFAULT_RETRY = HttpxRetry(
177+
# connect=1, read=1, status=4, status_forcelist=[500, 503],
178+
# raise_on_status=False, backoff_factor=0.5, allowed_methods=None
179+
# )
180+
DEFAULT_RETRY = HttpxRetry(status=4, status_forcelist=[500, 503], backoff_factor=0.5)
181+
182+
# We could also support passing kwargs here
183+
def __init__(self, retry: HttpxRetry = DEFAULT_RETRY, **kwargs) -> None:
184+
self._retry = retry
185+
186+
transport_kwargs = kwargs.copy()
187+
transport_kwargs.update({'retries': 0, 'http2': True})
188+
# We should use a full AsyncHTTPTransport under the hood since that is
189+
# fully implemented. We could consider making this class extend a
190+
# AsyncHTTPTransport instead and use the parent class's methods to handle
191+
# requests. We sould also ensure that that transport's internal retry is
192+
# not enabled.
193+
self._wrapped_transport = httpx.AsyncHTTPTransport(**transport_kwargs)
194+
195+
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
196+
return await self._dispatch_with_retry(
197+
request, self._wrapped_transport.handle_async_request)
198+
199+
# Two types of retries
200+
# - Status code (500s, redirect)
201+
# - Error code (read, connect, other)
202+
async def _dispatch_with_retry(
203+
self,
204+
request: httpx.Request,
205+
dispatch_method: Callable[[httpx.Request], CoroutineType[Any, Any, httpx.Response]]
206+
) -> httpx.Response:
207+
"""Sends a request with retry logic using a provided dispatch method."""
208+
# This request config is used across all requests that use this transport and therefore
209+
# needs to be copied to be used for just this request ans it's retries.
210+
retry = self._retry.copy()
211+
# First request
212+
response, error = None, None
213+
214+
while not retry.is_exhausted():
215+
216+
# First retry
217+
if response:
218+
await retry.sleep(response)
219+
220+
# Need to reset here so only last attempt's error or response is saved.
221+
response, error = None, None
222+
223+
try:
224+
logger.debug('Sending request in _dispatch_with_retry(): %r', request)
225+
response = await dispatch_method(request)
226+
logger.debug('Received response: %r', response)
227+
except httpx.HTTPError as err:
228+
logger.debug('Received error: %r', err)
229+
error = err
230+
231+
if response and not retry.is_retryable_response(response):
232+
return response
233+
234+
if error and not retry.is_retryable_error(error):
235+
raise error
236+
237+
retry.increment(request, response)
238+
239+
if response:
240+
return response
241+
if error:
242+
raise error
243+
raise Exception('_dispatch_with_retry() ended with no response or exception')
244+
245+
async def aclose(self) -> None:
246+
await self._wrapped_transport.aclose()

firebase_admin/messaging.py

Lines changed: 9 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import json
2121
import warnings
2222
import asyncio
23+
import logging
2324
import requests
2425
import httpx
2526

@@ -38,7 +39,9 @@
3839
exceptions,
3940
App
4041
)
42+
from firebase_admin._retry import HttpxRetryTransport
4143

44+
logger = logging.getLogger(__name__)
4245

4346
_MESSAGING_ATTRIBUTE = '_messaging'
4447

@@ -410,6 +413,9 @@ def auth_flow(self, request: httpx.Request):
410413
# copy original headers
411414
request.headers = _original_headers.copy()
412415
# mutates request headers
416+
logger.debug(
417+
'Refreshing credentials for request attempt %d',
418+
_credential_refresh_attempt + 1)
413419
self.apply_auth_headers(request)
414420

415421
# Continue to perform the request
@@ -420,6 +426,9 @@ def auth_flow(self, request: httpx.Request):
420426
# on refreshable status codes. Current transport.requests.AuthorizedSession()
421427
# only does this on 401 errors. We should do the same.
422428
if response.status_code in self._refresh_status_codes:
429+
logger.debug(
430+
'Request attempt %d failed due to unauthorized credentials',
431+
_credential_refresh_attempt + 1)
423432
_credential_refresh_attempt += 1
424433
else:
425434
break
@@ -715,45 +724,3 @@ def _build_fcm_error(cls, error_dict) -> Optional[Callable[..., exceptions.Fireb
715724
fcm_code = detail.get('errorCode')
716725
break
717726
return _MessagingService.FCM_ERROR_TYPES.get(fcm_code) if fcm_code else None
718-
719-
720-
# TODO: Remove comments
721-
# Notes:
722-
# This implementation currently only covers basic retires for pre-defined status errors
723-
class HttpxRetryTransport(httpx.AsyncBaseTransport):
724-
"""HTTPX transport with retry logic."""
725-
# We could also support passing kwargs here
726-
def __init__(self, **kwargs) -> None:
727-
# Hardcoded settings for now
728-
self._retryable_status_codes = (500, 503,)
729-
self._max_retry_count = 4
730-
731-
# - We use a full AsyncHTTPTransport under the hood to make use of it's
732-
# fully implemented `handle_async_request()`.
733-
# - We could consider making the `HttpxRetryTransport`` class extend a
734-
# `AsyncHTTPTransport` instead and use the parent class's methods to handle
735-
# requests.
736-
# - We should also ensure that that transport's internal retry is
737-
# not enabled.
738-
transport_kwargs = kwargs.copy()
739-
transport_kwargs.update({'retries': 0, 'http2': True})
740-
self._wrapped_transport = httpx.AsyncHTTPTransport(**transport_kwargs)
741-
742-
743-
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
744-
_retry_count = 0
745-
746-
while True:
747-
# Dispatch request
748-
# Let exceptions pass through for now
749-
response = await self._wrapped_transport.handle_async_request(request)
750-
751-
# Check if request is retryable
752-
if response.status_code in self._retryable_status_codes:
753-
_retry_count += 1
754-
755-
# Return if retries exhausted
756-
if _retry_count > self._max_retry_count:
757-
return response
758-
else:
759-
return response

0 commit comments

Comments
 (0)