Skip to content

Commit 0da78c8

Browse files
committed
Remove low-level read timeouts from the Parser, now handled in the Connection
1 parent d5fc899 commit 0da78c8

File tree

1 file changed

+35
-88
lines changed

1 file changed

+35
-88
lines changed

redis/asyncio/connection.py

Lines changed: 35 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import contextlib
33
import copy
44
import enum
5-
import errno
65
import inspect
76
import io
87
import os
@@ -69,16 +68,6 @@ async def __aexit__(self, *args):
6968

7069
nullcontext = NullContext()
7170

72-
NONBLOCKING_EXCEPTION_ERROR_NUMBERS = {
73-
BlockingIOError: errno.EWOULDBLOCK,
74-
ssl.SSLWantReadError: 2,
75-
ssl.SSLWantWriteError: 2,
76-
ssl.SSLError: 2,
77-
}
78-
79-
NONBLOCKING_EXCEPTIONS = tuple(NONBLOCKING_EXCEPTION_ERROR_NUMBERS.keys())
80-
81-
8271
SYM_STAR = b"*"
8372
SYM_DOLLAR = b"$"
8473
SYM_CRLF = b"\r\n"
@@ -243,11 +232,9 @@ def __init__(
243232
self,
244233
stream_reader: asyncio.StreamReader,
245234
socket_read_size: int,
246-
socket_timeout: Optional[float],
247235
):
248236
self._stream: Optional[asyncio.StreamReader] = stream_reader
249237
self.socket_read_size = socket_read_size
250-
self.socket_timeout = socket_timeout
251238
self._buffer: Optional[io.BytesIO] = io.BytesIO()
252239
# number of bytes written to the buffer from the socket
253240
self.bytes_written = 0
@@ -258,52 +245,35 @@ def __init__(
258245
def length(self):
259246
return self.bytes_written - self.bytes_read
260247

261-
async def _read_from_socket(
262-
self,
263-
length: Optional[int] = None,
264-
timeout: Union[float, None, _Sentinel] = SENTINEL,
265-
raise_on_timeout: bool = True,
266-
) -> bool:
248+
async def _read_from_socket(self, length: Optional[int] = None) -> bool:
267249
buf = self._buffer
268250
if buf is None or self._stream is None:
269251
raise RedisError("Buffer is closed.")
270252
buf.seek(self.bytes_written)
271253
marker = 0
272-
timeout = timeout if timeout is not SENTINEL else self.socket_timeout
273254

274-
try:
275-
while True:
276-
async with async_timeout.timeout(timeout):
277-
data = await self._stream.read(self.socket_read_size)
278-
# an empty string indicates the server shutdown the socket
279-
if isinstance(data, bytes) and len(data) == 0:
280-
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
281-
buf.write(data)
282-
data_length = len(data)
283-
self.bytes_written += data_length
284-
marker += data_length
285-
286-
if length is not None and length > marker:
287-
continue
288-
return True
289-
except (socket.timeout, asyncio.TimeoutError):
290-
if raise_on_timeout:
291-
raise TimeoutError("Timeout reading from socket")
292-
return False
293-
except NONBLOCKING_EXCEPTIONS as ex:
294-
# if we're in nonblocking mode and the recv raises a
295-
# blocking error, simply return False indicating that
296-
# there's no data to be read. otherwise raise the
297-
# original exception.
298-
allowed = NONBLOCKING_EXCEPTION_ERROR_NUMBERS.get(ex.__class__, -1)
299-
if not raise_on_timeout and ex.errno == allowed:
300-
return False
301-
raise ConnectionError(f"Error while reading from socket: {ex.args}")
255+
while True:
256+
data = await self._stream.read(self.socket_read_size)
257+
# an empty string indicates the server shutdown the socket
258+
if isinstance(data, bytes) and len(data) == 0:
259+
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
260+
buf.write(data)
261+
data_length = len(data)
262+
self.bytes_written += data_length
263+
marker += data_length
264+
265+
if length is not None and length > marker:
266+
continue
267+
return True
302268

303269
async def can_read_destructive(self) -> bool:
304-
return bool(self.length) or await self._read_from_socket(
305-
timeout=0, raise_on_timeout=False
306-
)
270+
if self.length:
271+
return True
272+
try:
273+
async with async_timeout.timeout(0):
274+
return await self._read_from_socket()
275+
except asyncio.TimeoutError:
276+
return False
307277

308278
async def read(self, length: int) -> bytes:
309279
length = length + 2 # make sure to read the \r\n terminator
@@ -386,9 +356,7 @@ def on_connect(self, connection: "Connection"):
386356
if self._stream is None:
387357
raise RedisError("Buffer is closed.")
388358

389-
self._buffer = SocketBuffer(
390-
self._stream, self._read_size, connection.socket_timeout
391-
)
359+
self._buffer = SocketBuffer(self._stream, self._read_size)
392360
self.encoder = connection.encoder
393361

394362
def on_disconnect(self):
@@ -458,14 +426,13 @@ async def read_response(
458426
class HiredisParser(BaseParser):
459427
"""Parser class for connections using Hiredis"""
460428

461-
__slots__ = BaseParser.__slots__ + ("_reader", "_socket_timeout")
429+
__slots__ = BaseParser.__slots__ + ("_reader",)
462430

463431
def __init__(self, socket_read_size: int):
464432
if not HIREDIS_AVAILABLE:
465433
raise RedisError("Hiredis is not available.")
466434
super().__init__(socket_read_size=socket_read_size)
467435
self._reader: Optional[hiredis.Reader] = None
468-
self._socket_timeout: Optional[float] = None
469436

470437
def on_connect(self, connection: "Connection"):
471438
self._stream = connection._reader
@@ -478,7 +445,6 @@ def on_connect(self, connection: "Connection"):
478445
kwargs["errors"] = connection.encoder.encoding_errors
479446

480447
self._reader = hiredis.Reader(**kwargs)
481-
self._socket_timeout = connection.socket_timeout
482448

483449
def on_disconnect(self):
484450
self._stream = None
@@ -489,39 +455,20 @@ async def can_read_destructive(self):
489455
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
490456
if self._reader.gets():
491457
return True
492-
return await self.read_from_socket(timeout=0, raise_on_timeout=False)
493-
494-
async def read_from_socket(
495-
self,
496-
timeout: Union[float, None, _Sentinel] = SENTINEL,
497-
raise_on_timeout: bool = True,
498-
):
499-
timeout = self._socket_timeout if timeout is SENTINEL else timeout
500458
try:
501-
if timeout is None:
502-
buffer = await self._stream.read(self._read_size)
503-
else:
504-
async with async_timeout.timeout(timeout):
505-
buffer = await self._stream.read(self._read_size)
506-
if not buffer or not isinstance(buffer, bytes):
507-
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
508-
self._reader.feed(buffer)
509-
# data was read from the socket and added to the buffer.
510-
# return True to indicate that data was read.
511-
return True
512-
except (socket.timeout, asyncio.TimeoutError):
513-
if raise_on_timeout:
514-
raise TimeoutError("Timeout reading from socket") from None
459+
async with async_timeout.timeout(0):
460+
return await self.read_from_socket()
461+
except asyncio.TimeoutError:
515462
return False
516-
except NONBLOCKING_EXCEPTIONS as ex:
517-
# if we're in nonblocking mode and the recv raises a
518-
# blocking error, simply return False indicating that
519-
# there's no data to be read. otherwise raise the
520-
# original exception.
521-
allowed = NONBLOCKING_EXCEPTION_ERROR_NUMBERS.get(ex.__class__, -1)
522-
if not raise_on_timeout and ex.errno == allowed:
523-
return False
524-
raise ConnectionError(f"Error while reading from socket: {ex.args}")
463+
464+
async def read_from_socket(self):
465+
buffer = await self._stream.read(self._read_size)
466+
if not buffer or not isinstance(buffer, bytes):
467+
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
468+
self._reader.feed(buffer)
469+
# data was read from the socket and added to the buffer.
470+
# return True to indicate that data was read.
471+
return True
525472

526473
async def read_response(
527474
self, disable_decoding: bool = False

0 commit comments

Comments
 (0)