From 16cdfcbfd2278ed44edded6618f0a5f1e0158d60 Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Sat, 21 Jul 2018 19:05:32 +0800 Subject: [PATCH 01/19] Added SSEClient library --- firebase_admin/sseclient.py | 198 ++++++++++++++++++++++++++++++++++++ 1 file changed, 198 insertions(+) create mode 100644 firebase_admin/sseclient.py diff --git a/firebase_admin/sseclient.py b/firebase_admin/sseclient.py new file mode 100644 index 000000000..6f04ff2c2 --- /dev/null +++ b/firebase_admin/sseclient.py @@ -0,0 +1,198 @@ +"""SSEClient module to handle streaming of realtime changes on the database +to the firebase-admin-sdk +""" + +import re +import time +import warnings +import six + +import requests + + +# Technically, we should support streams that mix line endings. This regex, +# however, assumes that a system will provide consistent line endings. +end_of_field = re.compile(r'\r\n\r\n|\r\r|\n\n') + + +class KeepAuthSession(requests.Session): + """A session that does not drop Authentication on redirects between domains""" + def rebuild_auth(self, prepared_request, response): + pass + + +class SSEClient(object): + """SSE Client Class""" + def __init__(self, url, session, build_headers, last_id=None, retry=3000, **kwargs): + self.should_connect = True + self.url = url + self.last_id = last_id + self.retry = retry + self.running = True + # Optional support for passing in a requests.Session() + self.session = session + # function for building auth header when token expires + self.build_headers = build_headers + self.start_time = None + # Any extra kwargs will be fed into the requests.get call later. + self.requests_kwargs = kwargs + + # The SSE spec requires making requests with Cache-Control: nocache + if 'headers' not in self.requests_kwargs: + self.requests_kwargs['headers'] = {} + self.requests_kwargs['headers']['Cache-Control'] = 'no-cache' + + # The 'Accept' header is not required, but explicit > implicit + self.requests_kwargs['headers']['Accept'] = 'text/event-stream' + + # Keep data here as it streams in + self.buf = u'' + + self._connect() + + def close(self): + """Close the SSE Client instance""" + # TODO: check if AttributeError is needed to catch here + self.should_connect = False + self.retry = 0 + self.resp.close() + # self.resp.raw._fp.fp.raw._sock.shutdown(socket.SHUT_RDWR) + # self.resp.raw._fp.fp.raw._sock.close() + + + def _connect(self): + """connects to the server using requests""" + if self.should_connect: + success = False + while not success: + if self.last_id: + self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id + headers = self.build_headers() + self.requests_kwargs['headers'].update(headers) + # Use session if set. Otherwise fall back to requests module. + self.requester = self.session or requests + self.resp = self.requester.get(self.url, stream=True, **self.requests_kwargs) + + self.resp_iterator = self.resp.iter_content(decode_unicode=True) + + # TODO: Ensure we're handling redirects. Might also stick the 'origin' + # attribute on Events like the Javascript spec requires. + self.resp.raise_for_status() + success = True + else: + raise StopIteration() + + def _event_complete(self): + return re.search(end_of_field, self.buf) is not None + + def __iter__(self): + return self + + def __next__(self): + while not self._event_complete(): + try: + nextchar = next(self.resp_iterator) + self.buf += nextchar + except (StopIteration, requests.RequestException): + time.sleep(self.retry / 1000.0) + self._connect() + + # The SSE spec only supports resuming from a whole message, so + # if we have half a message we should throw it out. + head, sep, tail = self.buf.rpartition('\n') + self.buf = head + sep + continue + + split = re.split(end_of_field, self.buf) + head = split[0] + tail = "".join(split[1:]) + + self.buf = tail + msg = Event.parse(head) + + if msg.data == "credential is no longer valid": + self._connect() + return None + + if msg.data == 'null': + return None + + # If the server requests a specific retry delay, we need to honor it. + if msg.retry: + self.retry = msg.retry + + # last_id should only be set if included in the message. It's not + # forgotten if a message omits it. + if msg.event_id: + self.last_id = msg.event_id + + return msg + + if six.PY2: + next = __next__ + + +class Event(object): + """Event class to handle the events fired by SSE""" + + sse_line_pattern = re.compile('(?P[^:]*):?( ?(?P.*))?') + + def __init__(self, data='', event='message', event_id=None, retry=None): + self.data = data + self.event = event + self.event_id = event_id + self.retry = retry + + def dump(self): + """Dumps the event data""" + lines = [] + if self.event_id: + lines.append('id: %s' % self.event_id) + + # Only include an event line if it's not the default already. + if self.event != 'message': + lines.append('event: %s' % self.event) + + if self.retry: + lines.append('retry: %s' % self.retry) + + lines.extend('data: %s' % d for d in self.data.split('\n')) + return '\n'.join(lines) + '\n\n' + + @classmethod + def parse(cls, raw): + """Given a possibly-multiline string representing an SSE message, parse it + and return a Event object. + """ + msg = cls() + for line in raw.split('\n'): + match = cls.sse_line_pattern.match(line) + if match is None: + # Malformed line. Discard but warn. + warnings.warn('Invalid SSE line: "%s"' % line, SyntaxWarning) + continue + + name = match.groupdict()['name'] + value = match.groupdict()['value'] + if name == '': + # line began with a ":", so is a comment. Ignore + continue + + if name == 'data': + # If we already have some data, then join to it with a newline. + # Else this is it. + if msg.data: + msg.data = '%s\n%s' % (msg.data, value) + else: + msg.data = value + elif name == 'event': + msg.event = value + elif name == 'id': + msg.event_id = value + elif name == 'retry': + msg.retry = int(value) + + return msg + + def __str__(self): + return self.data From b6f2eae37ba99a8beb48546873d5415b46cba679 Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Sat, 21 Jul 2018 19:06:10 +0800 Subject: [PATCH 02/19] Added Streaming functionality to db.py --- firebase_admin/db.py | 69 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/firebase_admin/db.py b/firebase_admin/db.py index 5613a1e67..4324252fe 100644 --- a/firebase_admin/db.py +++ b/firebase_admin/db.py @@ -23,16 +23,25 @@ import collections import json import sys +import threading +import time import requests import six from six.moves import urllib +from google.auth import transport import firebase_admin from firebase_admin import _http_client from firebase_admin import _utils +from firebase_admin.sseclient import SSEClient, KeepAuthSession +try: + from urllib.parse import urlencode +except ImportError: + from urllib import urlencode + _DB_ATTRIBUTE = '_database' _INVALID_PATH_CHARACTERS = '[].#$' _RESERVED_FILTERS = ('$key', '$value', '$priority') @@ -69,6 +78,49 @@ def _parse_path(path): return [seg for seg in path.split('/') if seg] +class Stream(object): + """Class that handles the streaming of data node changes from server""" + def __init__(self, url, build_headers, stream_handler, stream_id): + """Initialize the streaming object""" + self.url = url + self.build_headers = build_headers + self.stream_handler = stream_handler + self.stream_id = stream_id + self.sse = None + self.thread = None + self.start() + + def start(self): + """Start the streaming by spawning a thread""" + self.thread = threading.Thread(target=self.start_stream) + self.thread.start() + return self + + def start_stream(self): + """Streaming function for the spawned thread to run""" + self.sse = SSEClient( + self.url, + session=KeepAuthSession(), + build_headers=self.build_headers + ) + + for msg in self.sse: + # iterate the sse client's generator + if msg: + msg_data = json.loads(msg.data) + msg_data["event"] = msg.event + if self.stream_id: + msg_data["stream_id"] = self.stream_id + self.stream_handler(msg_data) + + def close(self): + while not self.sse and not hasattr(self.sse, "resp"): + time.sleep(0.001) + self.sse.running = False + self.sse.close() + self.thread.join() + + class Reference(object): """Reference represents a node in the Firebase realtime database.""" @@ -101,6 +153,23 @@ def parent(self): return Reference(client=self._client, segments=self._segments[:-1]) return None + def build_headers(self, token=None): + headers = {'content-type' : 'application/json; charset=UTF-8'} + if not token and self._client._session.credentials: + request = transport.requests.Request() + self._client._session.credentials.refresh(request) + access_token = self._client._session.credentials.token + headers['Authorization'] = 'Bearer ' + access_token + return headers + + def stream(self, stream_handler, stream_id=None): + parameters = {} + # reset path and build_query for next query + request_ref = '{}{}.json?{}'.format( + self._client.base_url, self._pathurl, urlencode(parameters) + ) + return Stream(request_ref, self.build_headers, stream_handler, stream_id) + def child(self, path): """Returns a Reference to the specified child node. From 561f4eb1890ce5c4a0b4516fc91d3fce61912b07 Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Sat, 21 Jul 2018 19:11:15 +0800 Subject: [PATCH 03/19] Added ignore directive 'protected-access' for db.py in lint.sh --- lint.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lint.sh b/lint.sh index 603b78f92..bb410d85d 100755 --- a/lint.sh +++ b/lint.sh @@ -33,6 +33,7 @@ set -o nounset SKIP_FOR_TESTS="redefined-outer-name,protected-access,missing-docstring,too-many-lines" SKIP_FOR_SNIPPETS="${SKIP_FOR_TESTS},reimported,unused-variable" +SKIP_FOR_DB="protected-access" if [[ "$#" -eq 1 && "$1" = "all" ]] then @@ -47,12 +48,12 @@ fi if [[ "$CHECK_ALL" = true ]] then - lintAllFiles "firebase_admin" "" + lintAllFiles "firebase_admin" "$SKIP_FOR_DB" lintAllFiles "tests" "$SKIP_FOR_TESTS" lintAllFiles "integration" "$SKIP_FOR_TESTS" lintAllFiles "snippets" "$SKIP_FOR_SNIPPETS" else - lintChangedFiles "firebase_admin" "" + lintChangedFiles "firebase_admin" "$SKIP_FOR_DB" lintChangedFiles "tests" "$SKIP_FOR_TESTS" lintChangedFiles "integration" "$SKIP_FOR_TESTS" lintChangedFiles "snippets" "$SKIP_FOR_SNIPPETS" From e6944c1be460ef9fa7bf0f24e2f20260ad2b7daf Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Sat, 21 Jul 2018 21:24:06 +0800 Subject: [PATCH 04/19] Fixed typo --- tests/test_db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_db.py b/tests/test_db.py index 1bbd4c722..ef3b53be9 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -523,7 +523,7 @@ def test_range_query(self): assert recorder[0].headers['User-Agent'] == db._USER_AGENT -class TestDatabseInitialization(object): +class TestDatabaseInitialization(object): """Test cases for database initialization.""" def teardown_method(self): From 4dccb8c1c05ee210182282f793b06dc293c60502 Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Tue, 24 Jul 2018 09:29:32 +0800 Subject: [PATCH 05/19] Renamed file to internal module --- firebase_admin/{sseclient.py => _sseclient.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename firebase_admin/{sseclient.py => _sseclient.py} (100%) diff --git a/firebase_admin/sseclient.py b/firebase_admin/_sseclient.py similarity index 100% rename from firebase_admin/sseclient.py rename to firebase_admin/_sseclient.py From f8ca12cbacef28a3400317876cc5e075d0132781 Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Tue, 24 Jul 2018 09:31:37 +0800 Subject: [PATCH 06/19] Reverted lint.sh, added ignore directive in db.py --- firebase_admin/db.py | 6 +++--- lint.sh | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/firebase_admin/db.py b/firebase_admin/db.py index 4324252fe..10e8eaf8a 100644 --- a/firebase_admin/db.py +++ b/firebase_admin/db.py @@ -155,10 +155,10 @@ def parent(self): def build_headers(self, token=None): headers = {'content-type' : 'application/json; charset=UTF-8'} - if not token and self._client._session.credentials: + if not token and self._client._session.credentials: # pylint: disable=protected-access request = transport.requests.Request() - self._client._session.credentials.refresh(request) - access_token = self._client._session.credentials.token + self._client._session.credentials.refresh(request) # pylint: disable=protected-access + access_token = self._client._session.credentials.token # pylint: disable=protected-access headers['Authorization'] = 'Bearer ' + access_token return headers diff --git a/lint.sh b/lint.sh index bb410d85d..603b78f92 100755 --- a/lint.sh +++ b/lint.sh @@ -33,7 +33,6 @@ set -o nounset SKIP_FOR_TESTS="redefined-outer-name,protected-access,missing-docstring,too-many-lines" SKIP_FOR_SNIPPETS="${SKIP_FOR_TESTS},reimported,unused-variable" -SKIP_FOR_DB="protected-access" if [[ "$#" -eq 1 && "$1" = "all" ]] then @@ -48,12 +47,12 @@ fi if [[ "$CHECK_ALL" = true ]] then - lintAllFiles "firebase_admin" "$SKIP_FOR_DB" + lintAllFiles "firebase_admin" "" lintAllFiles "tests" "$SKIP_FOR_TESTS" lintAllFiles "integration" "$SKIP_FOR_TESTS" lintAllFiles "snippets" "$SKIP_FOR_SNIPPETS" else - lintChangedFiles "firebase_admin" "$SKIP_FOR_DB" + lintChangedFiles "firebase_admin" "" lintChangedFiles "tests" "$SKIP_FOR_TESTS" lintChangedFiles "integration" "$SKIP_FOR_TESTS" lintChangedFiles "snippets" "$SKIP_FOR_SNIPPETS" From 6a829d028713f69693ae892f2441c6aa103d6b58 Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Tue, 24 Jul 2018 10:31:39 +0800 Subject: [PATCH 07/19] Changed import module name to internal name --- firebase_admin/db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firebase_admin/db.py b/firebase_admin/db.py index cc3b9a885..cd5daedaa 100644 --- a/firebase_admin/db.py +++ b/firebase_admin/db.py @@ -34,7 +34,7 @@ import firebase_admin from firebase_admin import _http_client from firebase_admin import _utils -from firebase_admin.sseclient import SSEClient, KeepAuthSession +from firebase_admin._sseclient import SSEClient, KeepAuthSession try: From 4b9a952dfab745b72b45e6af79866e5e357a1a0a Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Wed, 25 Jul 2018 07:31:23 +0800 Subject: [PATCH 08/19] Fixed pylint protected-access by not calling protected member --- firebase_admin/db.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/firebase_admin/db.py b/firebase_admin/db.py index cd5daedaa..6afdfc5b7 100644 --- a/firebase_admin/db.py +++ b/firebase_admin/db.py @@ -155,10 +155,10 @@ def parent(self): def build_headers(self, token=None): headers = {'content-type' : 'application/json; charset=UTF-8'} - if not token and self._client._session.credentials: # pylint: disable=protected-access + if not token and self._client.session: request = transport.requests.Request() - self._client._session.credentials.refresh(request) # pylint: disable=protected-access - access_token = self._client._session.credentials.token # pylint: disable=protected-access + self._client.session.credentials.refresh(request) + access_token = self._client.session.credentials.token headers['Authorization'] = 'Bearer ' + access_token return headers From db96e05b23698f6cddb986cd6a7bb6a9d4b95bdd Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Thu, 26 Jul 2018 22:59:05 +0800 Subject: [PATCH 09/19] Added test_sseclient.py --- tests/test_sseclient.py | 60 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 tests/test_sseclient.py diff --git a/tests/test_sseclient.py b/tests/test_sseclient.py new file mode 100644 index 000000000..24848e061 --- /dev/null +++ b/tests/test_sseclient.py @@ -0,0 +1,60 @@ +"""Tests for firebase_admin.sseclient.""" +import json +import six +import requests + +from firebase_admin._sseclient import SSEClient, KeepAuthSession +from tests.testutils import MockAdapter + + +class MockSSEClient(MockAdapter): + def __init__(self, payload, status, recorder): + super().__init__(payload, status, recorder) + + def send(self, request, **kwargs): + resp = requests.models.Response() + resp.url = request.url + resp.status_code = self._status + resp.raw = six.BytesIO(self._data.encode()) + return resp + + +class TestSSEClient(object): + """Test cases for the SSEClient""" + + test_url = "https://test.firebaseio.com" + + def build_headers(self): + """Returns a mock header for SSEClient test""" + return { + "content-type": "application/json; charset=UTF-8", + "Authorization" : "Bearer MOCK_ACCESS_TOKEN" + } + + def init_sse(self): + payload = 'event: put\ndata: {"path":"/","data":"testevent"}\n\n' + status = 200 + recorder = [] + + adapter = MockSSEClient(payload, status, recorder) + session = KeepAuthSession() + session.mount(self.test_url, adapter) + + sseclient = SSEClient(url=self.test_url, session=session, build_headers=self.build_headers) + return sseclient + + + def test_init_sseclient(self): + sseclient = self.init_sse() + + assert sseclient.url == self.test_url + assert sseclient.running + assert sseclient.session != None + + def test_event(self): + sseclient = self.init_sse() + for msg in sseclient: + event = json.loads(msg.data) + break + assert event["data"] == "testevent" + assert event["path"] == "/" From 0906a7923afd95809cb3978393245b330b74a3cf Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Fri, 27 Jul 2018 19:46:56 +0800 Subject: [PATCH 10/19] python2,3 compatibility, fixed encoding issue --- tests/test_sseclient.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/test_sseclient.py b/tests/test_sseclient.py index 24848e061..19150b73a 100644 --- a/tests/test_sseclient.py +++ b/tests/test_sseclient.py @@ -8,14 +8,17 @@ class MockSSEClient(MockAdapter): - def __init__(self, payload, status, recorder): - super().__init__(payload, status, recorder) + def __init__(self, payload): + status = 200 + recorder = [] + MockAdapter.__init__(self, payload, status, recorder) def send(self, request, **kwargs): resp = requests.models.Response() resp.url = request.url resp.status_code = self._status resp.raw = six.BytesIO(self._data.encode()) + resp.encoding = "utf-8" return resp @@ -33,10 +36,8 @@ def build_headers(self): def init_sse(self): payload = 'event: put\ndata: {"path":"/","data":"testevent"}\n\n' - status = 200 - recorder = [] - adapter = MockSSEClient(payload, status, recorder) + adapter = MockSSEClient(payload) session = KeepAuthSession() session.mount(self.test_url, adapter) From ea58b5882e9c1bea8f1e272f31772fceac67dfa9 Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Fri, 27 Jul 2018 20:51:27 +0800 Subject: [PATCH 11/19] Added tests for Event() class --- tests/test_sseclient.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/tests/test_sseclient.py b/tests/test_sseclient.py index 19150b73a..2ab7c6ea0 100644 --- a/tests/test_sseclient.py +++ b/tests/test_sseclient.py @@ -3,7 +3,7 @@ import six import requests -from firebase_admin._sseclient import SSEClient, KeepAuthSession +from firebase_admin._sseclient import SSEClient, KeepAuthSession, Event from tests.testutils import MockAdapter @@ -59,3 +59,21 @@ def test_event(self): break assert event["data"] == "testevent" assert event["path"] == "/" + + +class TestEvent(object): + """Test cases for Events""" + + def test_normal(self): + data = 'event: put\ndata: {"path":"/","data":"testdata"}' + output = 'event: put\ndata: {"path":"/","data":"testdata"}\n\n' + event = Event.parse(data) + assert event.dump() == output + assert event.event == "put" + assert event.data == '{"path":"/","data":"testdata"}' + + def test_invalid(self): + data = 'event: invalid_event' + event = Event.parse(data) + assert event.event == "invalid_event" + assert event.data == '' From 104cc4e156ee59f42ab6ee74f2154be4cdc7ae2a Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Sat, 28 Jul 2018 10:37:00 +0800 Subject: [PATCH 12/19] removed build_headers() function --- firebase_admin/_sseclient.py | 55 ++++++++++++++++-------------------- firebase_admin/db.py | 17 ++--------- tests/test_sseclient.py | 9 ++---- 3 files changed, 29 insertions(+), 52 deletions(-) diff --git a/firebase_admin/_sseclient.py b/firebase_admin/_sseclient.py index 6f04ff2c2..c7dfecdac 100644 --- a/firebase_admin/_sseclient.py +++ b/firebase_admin/_sseclient.py @@ -6,7 +6,6 @@ import time import warnings import six - import requests @@ -23,27 +22,30 @@ def rebuild_auth(self, prepared_request, response): class SSEClient(object): """SSE Client Class""" - def __init__(self, url, session, build_headers, last_id=None, retry=3000, **kwargs): + def __init__(self, url, session, last_id=None, retry=3000, **kwargs): + """Initialize the SSEClient + Args: + url: the url to connect to + session: the requests.session() + last_id: optional id + retry: the interval in ms + **kwargs: extra kwargs will be sent to requests.get + """ self.should_connect = True self.url = url self.last_id = last_id self.retry = retry self.running = True - # Optional support for passing in a requests.Session() self.session = session - # function for building auth header when token expires - self.build_headers = build_headers - self.start_time = None - # Any extra kwargs will be fed into the requests.get call later. self.requests_kwargs = kwargs + headers = self.requests_kwargs.get('headers', {}) # The SSE spec requires making requests with Cache-Control: nocache - if 'headers' not in self.requests_kwargs: - self.requests_kwargs['headers'] = {} - self.requests_kwargs['headers']['Cache-Control'] = 'no-cache' - + headers['Cache-Control'] = 'no-cache' # The 'Accept' header is not required, but explicit > implicit - self.requests_kwargs['headers']['Accept'] = 'text/event-stream' + headers['Accept'] = 'text/event-stream' + + self.requests_kwargs['headers'] = headers # Keep data here as it streams in self.buf = u'' @@ -54,6 +56,7 @@ def close(self): """Close the SSE Client instance""" # TODO: check if AttributeError is needed to catch here self.should_connect = False + self.running = False self.retry = 0 self.resp.close() # self.resp.raw._fp.fp.raw._sock.shutdown(socket.SHUT_RDWR) @@ -62,13 +65,11 @@ def close(self): def _connect(self): """connects to the server using requests""" - if self.should_connect: + if self.should_connect and self.running: success = False while not success: if self.last_id: self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id - headers = self.build_headers() - self.requests_kwargs['headers'].update(headers) # Use session if set. Otherwise fall back to requests module. self.requester = self.session or requests self.resp = self.requester.get(self.url, stream=True, **self.requests_kwargs) @@ -83,6 +84,7 @@ def _connect(self): raise StopIteration() def _event_complete(self): + """Checks if the event is completed by matching regular expression""" return re.search(end_of_field, self.buf) is not None def __iter__(self): @@ -97,6 +99,7 @@ def __next__(self): time.sleep(self.retry / 1000.0) self._connect() + # The SSE spec only supports resuming from a whole message, so # if we have half a message we should throw it out. head, sep, tail = self.buf.rpartition('\n') @@ -143,26 +146,16 @@ def __init__(self, data='', event='message', event_id=None, retry=None): self.event_id = event_id self.retry = retry - def dump(self): - """Dumps the event data""" - lines = [] - if self.event_id: - lines.append('id: %s' % self.event_id) - - # Only include an event line if it's not the default already. - if self.event != 'message': - lines.append('event: %s' % self.event) - - if self.retry: - lines.append('retry: %s' % self.retry) - - lines.extend('data: %s' % d for d in self.data.split('\n')) - return '\n'.join(lines) + '\n\n' - @classmethod def parse(cls, raw): """Given a possibly-multiline string representing an SSE message, parse it and return a Event object. + + Args: + raw: the raw data to parse + + Returns: + Event: newly intialized Event() object with the parameters initialized """ msg = cls() for line in raw.split('\n'): diff --git a/firebase_admin/db.py b/firebase_admin/db.py index 6afdfc5b7..a9776995b 100644 --- a/firebase_admin/db.py +++ b/firebase_admin/db.py @@ -80,10 +80,9 @@ def _parse_path(path): class Stream(object): """Class that handles the streaming of data node changes from server""" - def __init__(self, url, build_headers, stream_handler, stream_id): + def __init__(self, url, stream_handler, stream_id): """Initialize the streaming object""" self.url = url - self.build_headers = build_headers self.stream_handler = stream_handler self.stream_id = stream_id self.sse = None @@ -100,8 +99,7 @@ def start_stream(self): """Streaming function for the spawned thread to run""" self.sse = SSEClient( self.url, - session=KeepAuthSession(), - build_headers=self.build_headers + session=KeepAuthSession() ) for msg in self.sse: @@ -153,22 +151,13 @@ def parent(self): return Reference(client=self._client, segments=self._segments[:-1]) return None - def build_headers(self, token=None): - headers = {'content-type' : 'application/json; charset=UTF-8'} - if not token and self._client.session: - request = transport.requests.Request() - self._client.session.credentials.refresh(request) - access_token = self._client.session.credentials.token - headers['Authorization'] = 'Bearer ' + access_token - return headers - def stream(self, stream_handler, stream_id=None): parameters = {} # reset path and build_query for next query request_ref = '{}{}.json?{}'.format( self._client.base_url, self._pathurl, urlencode(parameters) ) - return Stream(request_ref, self.build_headers, stream_handler, stream_id) + return Stream(request_ref, stream_handler, stream_id) def child(self, path): """Returns a Reference to the specified child node. diff --git a/tests/test_sseclient.py b/tests/test_sseclient.py index 2ab7c6ea0..1c772bfa2 100644 --- a/tests/test_sseclient.py +++ b/tests/test_sseclient.py @@ -4,6 +4,7 @@ import requests from firebase_admin._sseclient import SSEClient, KeepAuthSession, Event +from firebase_admin import _sseclient from tests.testutils import MockAdapter @@ -27,12 +28,6 @@ class TestSSEClient(object): test_url = "https://test.firebaseio.com" - def build_headers(self): - """Returns a mock header for SSEClient test""" - return { - "content-type": "application/json; charset=UTF-8", - "Authorization" : "Bearer MOCK_ACCESS_TOKEN" - } def init_sse(self): payload = 'event: put\ndata: {"path":"/","data":"testevent"}\n\n' @@ -41,7 +36,7 @@ def init_sse(self): session = KeepAuthSession() session.mount(self.test_url, adapter) - sseclient = SSEClient(url=self.test_url, session=session, build_headers=self.build_headers) + sseclient = SSEClient(url=self.test_url, session=session) return sseclient From 8c0c3e7f95333777d72b27b9e8e5478df03e43a6 Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Sat, 28 Jul 2018 10:45:49 +0800 Subject: [PATCH 13/19] Removed Event().dump() test code --- firebase_admin/_sseclient.py | 17 +++++++++++------ tests/test_sseclient.py | 2 -- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/firebase_admin/_sseclient.py b/firebase_admin/_sseclient.py index c7dfecdac..c45d033cb 100644 --- a/firebase_admin/_sseclient.py +++ b/firebase_admin/_sseclient.py @@ -22,14 +22,15 @@ def rebuild_auth(self, prepared_request, response): class SSEClient(object): """SSE Client Class""" + def __init__(self, url, session, last_id=None, retry=3000, **kwargs): """Initialize the SSEClient Args: - url: the url to connect to - session: the requests.session() - last_id: optional id - retry: the interval in ms - **kwargs: extra kwargs will be sent to requests.get + url: the url to connect to + session: the requests.session() + last_id: optional id + retry: the interval in ms + **kwargs: extra kwargs will be sent to requests.get """ self.should_connect = True self.url = url @@ -84,7 +85,11 @@ def _connect(self): raise StopIteration() def _event_complete(self): - """Checks if the event is completed by matching regular expression""" + """Checks if the event is completed by matching regular expression + + Returns: + boolean: True if the regex matched meaning end of event, else False + """ return re.search(end_of_field, self.buf) is not None def __iter__(self): diff --git a/tests/test_sseclient.py b/tests/test_sseclient.py index 1c772bfa2..279531f0d 100644 --- a/tests/test_sseclient.py +++ b/tests/test_sseclient.py @@ -61,9 +61,7 @@ class TestEvent(object): def test_normal(self): data = 'event: put\ndata: {"path":"/","data":"testdata"}' - output = 'event: put\ndata: {"path":"/","data":"testdata"}\n\n' event = Event.parse(data) - assert event.dump() == output assert event.event == "put" assert event.data == '{"path":"/","data":"testdata"}' From 0186ed9a06461734883859af2d57526fe282b8a7 Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Sat, 28 Jul 2018 10:53:00 +0800 Subject: [PATCH 14/19] Changed import to match code style --- firebase_admin/db.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/firebase_admin/db.py b/firebase_admin/db.py index a9776995b..a1e9530c7 100644 --- a/firebase_admin/db.py +++ b/firebase_admin/db.py @@ -29,12 +29,11 @@ import requests import six from six.moves import urllib -from google.auth import transport import firebase_admin from firebase_admin import _http_client from firebase_admin import _utils -from firebase_admin._sseclient import SSEClient, KeepAuthSession +from firebase_admin import _sseclient try: @@ -97,9 +96,9 @@ def start(self): def start_stream(self): """Streaming function for the spawned thread to run""" - self.sse = SSEClient( + self.sse = _sseclient.SSEClient( self.url, - session=KeepAuthSession() + session=_sseclient.KeepAuthSession() ) for msg in self.sse: From 37ae8577abd8d0540643f9fcbe31db6e65b08b95 Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Sat, 28 Jul 2018 11:13:00 +0800 Subject: [PATCH 15/19] init sseclient to Stream().start() and removed sleep --- firebase_admin/db.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/firebase_admin/db.py b/firebase_admin/db.py index a1e9530c7..38c4a2081 100644 --- a/firebase_admin/db.py +++ b/firebase_admin/db.py @@ -24,7 +24,6 @@ import json import sys import threading -import time import requests import six @@ -90,17 +89,16 @@ def __init__(self, url, stream_handler, stream_id): def start(self): """Start the streaming by spawning a thread""" + self.sse = _sseclient.SSEClient( + self.url, + session=_sseclient.KeepAuthSession() + ) self.thread = threading.Thread(target=self.start_stream) self.thread.start() return self def start_stream(self): """Streaming function for the spawned thread to run""" - self.sse = _sseclient.SSEClient( - self.url, - session=_sseclient.KeepAuthSession() - ) - for msg in self.sse: # iterate the sse client's generator if msg: @@ -111,8 +109,6 @@ def start_stream(self): self.stream_handler(msg_data) def close(self): - while not self.sse and not hasattr(self.sse, "resp"): - time.sleep(0.001) self.sse.running = False self.sse.close() self.thread.join() From 9a0d4c3b6fbbaf6e24b192b701f69682c02735e3 Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Sat, 28 Jul 2018 11:46:00 +0800 Subject: [PATCH 16/19] Added apache license 2.0 header --- firebase_admin/_sseclient.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/firebase_admin/_sseclient.py b/firebase_admin/_sseclient.py index c45d033cb..c3e46bf0a 100644 --- a/firebase_admin/_sseclient.py +++ b/firebase_admin/_sseclient.py @@ -1,3 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + """SSEClient module to handle streaming of realtime changes on the database to the firebase-admin-sdk """ From d9142ad0e531be0aef6a2795862c061f8961787e Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Sat, 28 Jul 2018 11:46:22 +0800 Subject: [PATCH 17/19] changed import style, removed for loop in event in test_sseclient.TestEvent --- tests/test_sseclient.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/tests/test_sseclient.py b/tests/test_sseclient.py index 279531f0d..601da86f3 100644 --- a/tests/test_sseclient.py +++ b/tests/test_sseclient.py @@ -3,7 +3,6 @@ import six import requests -from firebase_admin._sseclient import SSEClient, KeepAuthSession, Event from firebase_admin import _sseclient from tests.testutils import MockAdapter @@ -33,10 +32,10 @@ def init_sse(self): payload = 'event: put\ndata: {"path":"/","data":"testevent"}\n\n' adapter = MockSSEClient(payload) - session = KeepAuthSession() + session = _sseclient.KeepAuthSession() session.mount(self.test_url, adapter) - sseclient = SSEClient(url=self.test_url, session=session) + sseclient = _sseclient.SSEClient(url=self.test_url, session=session) return sseclient @@ -49,9 +48,8 @@ def test_init_sseclient(self): def test_event(self): sseclient = self.init_sse() - for msg in sseclient: - event = json.loads(msg.data) - break + msg = next(sseclient) + event = json.loads(msg.data) assert event["data"] == "testevent" assert event["path"] == "/" @@ -61,12 +59,12 @@ class TestEvent(object): def test_normal(self): data = 'event: put\ndata: {"path":"/","data":"testdata"}' - event = Event.parse(data) + event = _sseclient.Event.parse(data) assert event.event == "put" assert event.data == '{"path":"/","data":"testdata"}' def test_invalid(self): data = 'event: invalid_event' - event = Event.parse(data) + event = _sseclient.Event.parse(data) assert event.event == "invalid_event" assert event.data == '' From bfb7c699c36ff2d415313db7da3ac711afa96d8b Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Tue, 31 Jul 2018 09:04:37 +0800 Subject: [PATCH 18/19] Removed self.running from firebase_admin._sseclient --- firebase_admin/_sseclient.py | 4 +--- tests/test_sseclient.py | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/firebase_admin/_sseclient.py b/firebase_admin/_sseclient.py index c3e46bf0a..26dd66977 100644 --- a/firebase_admin/_sseclient.py +++ b/firebase_admin/_sseclient.py @@ -48,7 +48,6 @@ def __init__(self, url, session, last_id=None, retry=3000, **kwargs): self.url = url self.last_id = last_id self.retry = retry - self.running = True self.session = session self.requests_kwargs = kwargs @@ -69,7 +68,6 @@ def close(self): """Close the SSE Client instance""" # TODO: check if AttributeError is needed to catch here self.should_connect = False - self.running = False self.retry = 0 self.resp.close() # self.resp.raw._fp.fp.raw._sock.shutdown(socket.SHUT_RDWR) @@ -78,7 +76,7 @@ def close(self): def _connect(self): """connects to the server using requests""" - if self.should_connect and self.running: + if self.should_connect: success = False while not success: if self.last_id: diff --git a/tests/test_sseclient.py b/tests/test_sseclient.py index 601da86f3..422fef172 100644 --- a/tests/test_sseclient.py +++ b/tests/test_sseclient.py @@ -43,7 +43,6 @@ def test_init_sseclient(self): sseclient = self.init_sse() assert sseclient.url == self.test_url - assert sseclient.running assert sseclient.session != None def test_event(self): From 76896a5784de1e187c43dbd412d870c6ac74045a Mon Sep 17 00:00:00 2001 From: the-c0d3r Date: Wed, 8 Aug 2018 09:19:42 +0800 Subject: [PATCH 19/19] Renamed Stream class to ListenerRegistration, stream() to listen(), removed 'stream_id' and added more documentation --- firebase_admin/db.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/firebase_admin/db.py b/firebase_admin/db.py index d38e4c2ad..508d7db4b 100644 --- a/firebase_admin/db.py +++ b/firebase_admin/db.py @@ -82,13 +82,17 @@ def _parse_path(path): return [seg for seg in path.split('/') if seg] -class Stream(object): +class ListenerRegistration(object): """Class that handles the streaming of data node changes from server""" - def __init__(self, url, stream_handler, stream_id): - """Initialize the streaming object""" + def __init__(self, url, stream_handler): + """Initialize a new ListenerRegistration object with given parameters + + Args: + url: the data node url to listen for changes + stream_handler: the callback function to fire in case of event + """ self.url = url self.stream_handler = stream_handler - self.stream_id = stream_id self.sse = None self.thread = None self.start() @@ -110,11 +114,10 @@ def start_stream(self): if msg: msg_data = json.loads(msg.data) msg_data["event"] = msg.event - if self.stream_id: - msg_data["stream_id"] = self.stream_id self.stream_handler(msg_data) def close(self): + """Terminates SSE server connection and joins the thread""" self.sse.running = False self.sse.close() self.thread.join() @@ -152,13 +155,21 @@ def parent(self): return Reference(client=self._client, segments=self._segments[:-1]) return None - def stream(self, stream_handler, stream_id=None): + def listen(self, stream_handler): + """Function to setup the streaming of data from server data node changes + + Args: + stream_handler: A function to callback in the event of data node change detected + + Returns: + object: Returns a ListenerRegistration object which handles the stream + """ parameters = {} # reset path and build_query for next query request_ref = '{}{}.json?{}'.format( self._client.base_url, self._pathurl, urlencode(parameters) ) - return Stream(request_ref, stream_handler, stream_id) + return ListenerRegistration(request_ref, stream_handler) def child(self, path): """Returns a Reference to the specified child node.