From 3a637f1a7ee0a98733555a2b5730bf82e25cb1b0 Mon Sep 17 00:00:00 2001 From: Io Date: Fri, 27 Dec 2019 23:04:19 +0000 Subject: [PATCH 1/3] Add support for connection closed listeners --- asyncpg/connection.py | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/asyncpg/connection.py b/asyncpg/connection.py index 5fa2ddae..3511b633 100644 --- a/asyncpg/connection.py +++ b/asyncpg/connection.py @@ -46,8 +46,8 @@ class Connection(metaclass=ConnectionMeta): '_listeners', '_server_version', '_server_caps', '_intro_query', '_reset_query', '_proxy', '_stmt_exclusive_section', '_config', '_params', '_addr', - '_log_listeners', '_cancellations', '_source_traceback', - '__weakref__') + '_log_listeners', '_close_listeners', '_cancellations', + '_source_traceback', '__weakref__') def __init__(self, protocol, transport, loop, addr: (str, int) or str, @@ -78,6 +78,7 @@ def __init__(self, protocol, transport, loop, self._listeners = {} self._log_listeners = set() self._cancellations = set() + self._close_listeners = set() settings = self._protocol.get_settings() ver_string = settings.server_version @@ -178,6 +179,19 @@ def remove_log_listener(self, callback): """ self._log_listeners.discard(callback) + def add_close_listener(self, callback): + """Add a listener that will be called when the the connection is closing. + + :param callable callback: + A callable receiving one argument: + **connection**: a Connection the callback is registered with. + """ + self._close_listeners.add(callback) + + def remove_close_listener(self, callback): + """Remove a listening callback for the connection closing.""" + self._close_listeners.discard(callback) + def get_server_pid(self): """Return the PID of the Postgres server the connection is bound to.""" return self._protocol.get_server_pid() @@ -1120,6 +1134,7 @@ def _abort(self): self._protocol = None def _cleanup(self): + self._call_close_listeners() # Free the resources associated with this connection. # This must be called when a connection is terminated. @@ -1237,6 +1252,23 @@ def _call_log_listener(self, cb, con_ref, message): 'exception': ex }) + def _call_close_listeners(self): + if not self._close_listeners: + return + + con_ref = self._unwrap() + for cb in self._close_listeners: + try: + cb(con_ref) + except Exception as ex: + self._loop.call_exception_handler({ + 'message': 'Unhandled exception in asyncpg connection ' + 'connection closed callback {!r}'.format(cb), + 'exception': ex + }) + + self._close_listeners.clear() + def _process_notification(self, pid, channel, payload): if channel not in self._listeners: return From 92a66236b6e668f4e48e8079f3c8eadb3f9719be Mon Sep 17 00:00:00 2001 From: Elvis Pranskevichus Date: Sat, 2 May 2020 10:27:04 -0700 Subject: [PATCH 2/3] Add tests for connection closure listeners --- asyncpg/_testbase/fuzzer.py | 28 ++++++++++++++++--------- asyncpg/connection.py | 7 ++++++- tests/test_listeners.py | 42 +++++++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 11 deletions(-) diff --git a/asyncpg/_testbase/fuzzer.py b/asyncpg/_testbase/fuzzer.py index 88f6e5c1..5c0b870c 100644 --- a/asyncpg/_testbase/fuzzer.py +++ b/asyncpg/_testbase/fuzzer.py @@ -145,6 +145,10 @@ def _close_connection(self, connection): if conn_task is not None: conn_task.cancel() + def close_all_connections(self): + for conn in list(self.connections): + self.loop.call_soon_threadsafe(self._close_connection, conn) + class Connection: def __init__(self, client_sock, backend_sock, proxy): @@ -215,10 +219,11 @@ async def _read(self, sock, n): else: return read_task.result() finally: - if not read_task.done(): - read_task.cancel() - if not conn_event_task.done(): - conn_event_task.cancel() + if not self.loop.is_closed(): + if not read_task.done(): + read_task.cancel() + if not conn_event_task.done(): + conn_event_task.cancel() async def _write(self, sock, data): write_task = asyncio.ensure_future( @@ -236,10 +241,11 @@ async def _write(self, sock, data): else: return write_task.result() finally: - if not write_task.done(): - write_task.cancel() - if not conn_event_task.done(): - conn_event_task.cancel() + if not self.loop.is_closed(): + if not write_task.done(): + write_task.cancel() + if not conn_event_task.done(): + conn_event_task.cancel() async def proxy_to_backend(self): buf = None @@ -264,7 +270,8 @@ async def proxy_to_backend(self): pass finally: - self.loop.call_soon(self.close) + if not self.loop.is_closed(): + self.loop.call_soon(self.close) async def proxy_from_backend(self): buf = None @@ -289,4 +296,5 @@ async def proxy_from_backend(self): pass finally: - self.loop.call_soon(self.close) + if not self.loop.is_closed(): + self.loop.call_soon(self.close) diff --git a/asyncpg/connection.py b/asyncpg/connection.py index 3511b633..f311d1e3 100644 --- a/asyncpg/connection.py +++ b/asyncpg/connection.py @@ -185,11 +185,16 @@ def add_close_listener(self, callback): :param callable callback: A callable receiving one argument: **connection**: a Connection the callback is registered with. + + .. versionadded:: 0.21.0 """ self._close_listeners.add(callback) def remove_close_listener(self, callback): - """Remove a listening callback for the connection closing.""" + """Remove a listening callback for the connection closing. + + .. versionadded:: 0.21.0 + """ self._close_listeners.discard(callback) def get_server_pid(self): diff --git a/tests/test_listeners.py b/tests/test_listeners.py index 4879cd88..ebf698f3 100644 --- a/tests/test_listeners.py +++ b/tests/test_listeners.py @@ -6,6 +6,10 @@ import asyncio +import os +import platform +import sys +import unittest from asyncpg import _testbase as tb from asyncpg import exceptions @@ -272,3 +276,41 @@ def listener1(*args): pass con.add_log_listener(listener1) + + +@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing') +@unittest.skipIf( + platform.system() == 'Windows' and + sys.version_info >= (3, 8), + 'not compatible with ProactorEventLoop which is default in Python 3.8') +class TestConnectionCloseListener(tb.ProxiedClusterTestCase): + + async def test_connection_close_callback_called_on_remote(self): + + called = False + + def close_cb(con): + nonlocal called + called = True + + con = await self.connect() + con.add_close_listener(close_cb) + self.proxy.close_all_connections() + try: + await con.fetchval('SELECT 1') + except Exception: + pass + self.assertTrue(called) + + async def test_connection_close_callback_called_on_local(self): + + called = False + + def close_cb(con): + nonlocal called + called = True + + con = await self.connect() + con.add_close_listener(close_cb) + await con.close() + self.assertTrue(called) From 451e3a649aec097af85d06d59fc7677082a6a9ac Mon Sep 17 00:00:00 2001 From: Elvis Pranskevichus Date: Sun, 3 May 2020 22:27:51 -0700 Subject: [PATCH 3/3] Rename "close_listener" to "termination_listener" Termination listener reads better. --- asyncpg/connection.py | 36 +++++++++++++++++++++--------------- tests/test_listeners.py | 10 +++++----- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/asyncpg/connection.py b/asyncpg/connection.py index f311d1e3..825eb156 100644 --- a/asyncpg/connection.py +++ b/asyncpg/connection.py @@ -46,7 +46,7 @@ class Connection(metaclass=ConnectionMeta): '_listeners', '_server_version', '_server_caps', '_intro_query', '_reset_query', '_proxy', '_stmt_exclusive_section', '_config', '_params', '_addr', - '_log_listeners', '_close_listeners', '_cancellations', + '_log_listeners', '_termination_listeners', '_cancellations', '_source_traceback', '__weakref__') def __init__(self, protocol, transport, loop, @@ -78,7 +78,7 @@ def __init__(self, protocol, transport, loop, self._listeners = {} self._log_listeners = set() self._cancellations = set() - self._close_listeners = set() + self._termination_listeners = set() settings = self._protocol.get_settings() ver_string = settings.server_version @@ -179,8 +179,8 @@ def remove_log_listener(self, callback): """ self._log_listeners.discard(callback) - def add_close_listener(self, callback): - """Add a listener that will be called when the the connection is closing. + def add_termination_listener(self, callback): + """Add a listener that will be called when the connection is closed. :param callable callback: A callable receiving one argument: @@ -188,14 +188,18 @@ def add_close_listener(self, callback): .. versionadded:: 0.21.0 """ - self._close_listeners.add(callback) + self._termination_listeners.add(callback) - def remove_close_listener(self, callback): - """Remove a listening callback for the connection closing. + def remove_termination_listener(self, callback): + """Remove a listening callback for connection termination. + + :param callable callback: + The callable that was passed to + :meth:`Connection.add_termination_listener`. .. versionadded:: 0.21.0 """ - self._close_listeners.discard(callback) + self._termination_listeners.discard(callback) def get_server_pid(self): """Return the PID of the Postgres server the connection is bound to.""" @@ -1139,7 +1143,7 @@ def _abort(self): self._protocol = None def _cleanup(self): - self._call_close_listeners() + self._call_termination_listeners() # Free the resources associated with this connection. # This must be called when a connection is terminated. @@ -1257,22 +1261,24 @@ def _call_log_listener(self, cb, con_ref, message): 'exception': ex }) - def _call_close_listeners(self): - if not self._close_listeners: + def _call_termination_listeners(self): + if not self._termination_listeners: return con_ref = self._unwrap() - for cb in self._close_listeners: + for cb in self._termination_listeners: try: cb(con_ref) except Exception as ex: self._loop.call_exception_handler({ - 'message': 'Unhandled exception in asyncpg connection ' - 'connection closed callback {!r}'.format(cb), + 'message': ( + 'Unhandled exception in asyncpg connection ' + 'termination listener callback {!r}'.format(cb) + ), 'exception': ex }) - self._close_listeners.clear() + self._termination_listeners.clear() def _process_notification(self, pid, channel, payload): if channel not in self._listeners: diff --git a/tests/test_listeners.py b/tests/test_listeners.py index ebf698f3..a4726e2d 100644 --- a/tests/test_listeners.py +++ b/tests/test_listeners.py @@ -283,9 +283,9 @@ def listener1(*args): platform.system() == 'Windows' and sys.version_info >= (3, 8), 'not compatible with ProactorEventLoop which is default in Python 3.8') -class TestConnectionCloseListener(tb.ProxiedClusterTestCase): +class TestConnectionTerminationListener(tb.ProxiedClusterTestCase): - async def test_connection_close_callback_called_on_remote(self): + async def test_connection_termination_callback_called_on_remote(self): called = False @@ -294,7 +294,7 @@ def close_cb(con): called = True con = await self.connect() - con.add_close_listener(close_cb) + con.add_termination_listener(close_cb) self.proxy.close_all_connections() try: await con.fetchval('SELECT 1') @@ -302,7 +302,7 @@ def close_cb(con): pass self.assertTrue(called) - async def test_connection_close_callback_called_on_local(self): + async def test_connection_termination_callback_called_on_local(self): called = False @@ -311,6 +311,6 @@ def close_cb(con): called = True con = await self.connect() - con.add_close_listener(close_cb) + con.add_termination_listener(close_cb) await con.close() self.assertTrue(called)