From 310c050469cf9cea54279fdf99f8ac4b953e6ca9 Mon Sep 17 00:00:00 2001 From: Manuel Trezza <5673677+mtrezza@users.noreply.github.com> Date: Wed, 2 Jul 2025 14:05:04 +0200 Subject: [PATCH 1/8] add throttle and prio --- README.md | 37 ++++++++++++++++ package-lock.json | 51 +++++++++++++++++++++ package.json | 1 + spec/.eslintrc.json | 2 +- spec/ParsePushAdapter.spec.js | 83 +++++++++++++++++++++++++++++++++++ spec/ThrottleQueue.spec.js | 66 ++++++++++++++++++++++++++++ spec/helper.js | 10 ++++- src/ParsePushAdapter.js | 8 ++++ src/ThrottleQueue.js | 41 +++++++++++++++++ 9 files changed, 296 insertions(+), 3 deletions(-) create mode 100644 spec/ThrottleQueue.spec.js create mode 100644 src/ThrottleQueue.js diff --git a/README.md b/README.md index d2297eb6..49d1f891 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ The official Push Notification adapter for Parse Server. See [Parse Server Push - [HTTP/1.1 Legacy Option](#http11-legacy-option) - [Firebase Client Error](#firebase-client-error) - [Expo Push Options](#expo-push-options) +- [Push Throttling \& Prioritization](#push-throttling--prioritization) - [Bundled with Parse Server](#bundled-with-parse-server) - [Logging](#logging) @@ -181,6 +182,42 @@ expo: { For more information see the [Expo docs](https://docs.expo.dev/push-notifications/overview/). +## Push Throttling & Prioritization + +Push providers usually throttle their APIs, so that sending too many pushes notifications within a short time may cause the API not accept any more requests. To address this, push sending can be throttled per provider by adding a `throttle` option to the respective push configuration. The option `maxPerSecond` defines the maximum number of pushes sent per second. If not throttle is configured, pushes are sent as quickly as possible. + +Example throttle configuration: + +```js +const parseServerOptions = { + push: { + adapter: new ParsePushAdapter({ + ios: { + // ... + throttle: { maxPerSecond: 100 } + } + }) + } +}; +``` + +Each push request may additionally specify the following parameters. + +| Parameter | Description | +|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `queueTtl` | The TTL in seconds. If a queued push expires before it is sent, it is discarded. | +| `queuePriority` | The priority of the push in the queue. When processing items in the queue, pushes with higher priority values are sent first. The priority value 1 means a higher priority than the value 0. The default priority value is 0. | + +Example push payload: + +```js +pushData = { + queueTtl: 10, // discard after 10 seconds if not yet sent + queuePriority: 1, // send with higher priority than default pushes + data: { alert: 'Hello' } +}; +``` + ## Bundled with Parse Server Parse Server already comes bundled with a specific version of the push adapter. This installation is only necessary when customizing the push adapter version that should be used by Parse Server. When using a customized version of the push adapter, ensure that it's compatible with the version of Parse Server you are using. diff --git a/package-lock.json b/package-lock.json index 801d43c7..fb337a7d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,6 +14,7 @@ "expo-server-sdk": "3.15.0", "firebase-admin": "13.4.0", "npmlog": "7.0.1", + "p-queue": "8.1.0", "parse": "6.1.1", "web-push": "3.6.7" }, @@ -3406,6 +3407,11 @@ "node": ">=6" } }, + "node_modules/eventemitter3": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==" + }, "node_modules/events": { "version": "3.3.0", "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", @@ -8185,6 +8191,21 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-queue": { + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.1.0.tgz", + "integrity": "sha512-mxLDbbGIBEXTJL0zEx8JIylaj3xQ7Z/7eEVjcF9fJX4DBiH9oqe+oahYnlKKxm0Ci9TlWTyhSHgygxMxjIB2jw==", + "dependencies": { + "eventemitter3": "^5.0.1", + "p-timeout": "^6.1.2" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-reduce": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/p-reduce/-/p-reduce-2.1.0.tgz", @@ -8194,6 +8215,17 @@ "node": ">=8" } }, + "node_modules/p-timeout": { + "version": "6.1.4", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.4.tgz", + "integrity": "sha512-MyIV3ZA/PmyBN/ud8vV9XzwTrNtR4jFrObymZYnZqMmW0zA8Z17vnT0rBgFE/TlohB+YCHqXMgZzb3Csp49vqg==", + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/package-json-from-dist": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/package-json-from-dist/-/package-json-from-dist-1.0.0.tgz", @@ -12694,6 +12726,11 @@ "resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-5.0.1.tgz", "integrity": "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==" }, + "eventemitter3": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==" + }, "events": { "version": "3.3.0", "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", @@ -16056,12 +16093,26 @@ "integrity": "sha512-z4cYYMMdKHzw4O5UkWJImbZynVIo0lSGTXc7bzB1e/rrDqkgGUNysK/o4bTr+0+xKvvLoTyGqYC4Fgljy9qe1Q==", "dev": true }, + "p-queue": { + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.1.0.tgz", + "integrity": "sha512-mxLDbbGIBEXTJL0zEx8JIylaj3xQ7Z/7eEVjcF9fJX4DBiH9oqe+oahYnlKKxm0Ci9TlWTyhSHgygxMxjIB2jw==", + "requires": { + "eventemitter3": "^5.0.1", + "p-timeout": "^6.1.2" + } + }, "p-reduce": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/p-reduce/-/p-reduce-2.1.0.tgz", "integrity": "sha512-2USApvnsutq8uoxZBGbbWM0JIYLiEMJ9RlaN7fAzVNb9OZN0SHjjTTfIcb667XynS5Y1VhwDJVDa72TnPzAYWw==", "dev": true }, + "p-timeout": { + "version": "6.1.4", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.4.tgz", + "integrity": "sha512-MyIV3ZA/PmyBN/ud8vV9XzwTrNtR4jFrObymZYnZqMmW0zA8Z17vnT0rBgFE/TlohB+YCHqXMgZzb3Csp49vqg==" + }, "package-json-from-dist": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/package-json-from-dist/-/package-json-from-dist-1.0.0.tgz", diff --git a/package.json b/package.json index 450d158d..f44d9de0 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "expo-server-sdk": "3.15.0", "firebase-admin": "13.4.0", "npmlog": "7.0.1", + "p-queue": "8.1.0", "parse": "6.1.1", "web-push": "3.6.7" }, diff --git a/spec/.eslintrc.json b/spec/.eslintrc.json index 44ad90cd..af3c16bb 100644 --- a/spec/.eslintrc.json +++ b/spec/.eslintrc.json @@ -1,6 +1,6 @@ { "env": { - "jasmine": true + "jasmine": true }, "globals": { "Parse": true diff --git a/spec/ParsePushAdapter.spec.js b/spec/ParsePushAdapter.spec.js index 40d8d0bf..4ea15c1d 100644 --- a/spec/ParsePushAdapter.spec.js +++ b/spec/ParsePushAdapter.spec.js @@ -10,6 +10,8 @@ import GCM from '../src/GCM.js'; import WEB from '../src/WEB.js'; import FCM from '../src/FCM.js'; import EXPO from '../src/EXPO.js'; +import { wait } from './helper.js'; +import PQueue from 'p-queue'; describe('ParsePushAdapter', () => { @@ -642,6 +644,87 @@ describe('ParsePushAdapter', () => { }); }); + + it('throttles push sends per provider', async () => { + const pushConfig = { + android: { + senderId: 'id', + apiKey: 'key', + throttle: { maxPerSecond: 1 } + } + }; + const parsePushAdapter = new ParsePushAdapter(pushConfig); + const times = []; + parsePushAdapter.senderMap['android'].send = jasmine.createSpy('send').and.callFake(() => { + times.push(Date.now()); + return Promise.resolve([]); + }); + const installs = [{ deviceType: 'android', deviceToken: 'token' }]; + await Promise.all([ + parsePushAdapter.send({}, installs), + parsePushAdapter.send({}, installs), + ]); + expect(times.length).toBe(2); + expect(times[1] - times[0]).toBeGreaterThanOrEqual(900); + expect(times[1] - times[0]).toBeLessThanOrEqual(1100); + }); + + it('skips queued pushes after ttl expires', async () => { + const pushConfig = { + android: { + senderId: 'id', + apiKey: 'key', + throttle: { maxPerSecond: 1 } + } + }; + const parsePushAdapter = new ParsePushAdapter(pushConfig); + parsePushAdapter.senderMap['android'].send = jasmine.createSpy('send').and.callFake(async () => { + await wait(1_200); + return []; + }); + const installs = [{ deviceType: 'android', deviceToken: 'token' }]; + await Promise.all([ + parsePushAdapter.send({}, installs), + parsePushAdapter.send({ queueTtl: 1 }, installs) + ]); + expect(parsePushAdapter.senderMap['android'].send.calls.count()).toBe(1); + }); + + it('sends higher priority pushes before lower priority ones', async () => { + const pushConfig = { + android: { + senderId: 'id', + apiKey: 'key', + throttle: { maxPerSecond: 1 } + } + }; + const parsePushAdapter = new ParsePushAdapter(pushConfig); + const callOrder = []; + parsePushAdapter.senderMap['android'].send = jasmine.createSpy('send').and.callFake(async (data) => { + callOrder.push(data.queuePriority); + await wait(100); + return []; + }); + const installs = [{ deviceType: 'android', deviceToken: 'token' }]; + + // Block queue with task so that the queue scheduler doesn't start processing enqueued items + // immediately; afterwards the scheduler picks the next enqueued item according to priority; + const pBlock = parsePushAdapter.queues.android.enqueue({ task: () => wait(500) }); + // Wait to ensure block item in queue has started + await wait(100); + + await Promise.all([ + pBlock, + parsePushAdapter.send({ queuePriority: 3 }, installs), + parsePushAdapter.send({ queuePriority: 4 }, installs), + parsePushAdapter.send({ queuePriority: 2 }, installs), + parsePushAdapter.send({ queuePriority: 0 }, installs), + parsePushAdapter.send({ queuePriority: 1 }, installs), + ]); + expect(callOrder).toEqual([4, 3, 2, 1, 0]); + }); + + it('random string throws with size <=0', () => { expect(() => randomString(0)).toThrow(); }); diff --git a/spec/ThrottleQueue.spec.js b/spec/ThrottleQueue.spec.js new file mode 100644 index 00000000..5b006d6b --- /dev/null +++ b/spec/ThrottleQueue.spec.js @@ -0,0 +1,66 @@ +import ThrottleQueue from '../src/ThrottleQueue.js'; +import { wait } from './helper.js'; + +describe('ThrottleQueue', () => { + it('processes items respecting rate limit', async () => { + const q = new ThrottleQueue(1); + const times = []; + const p1 = q.enqueue({ task: () => { times.push(Date.now()) } }); + const p2 = q.enqueue({ task: () => { times.push(Date.now()) } }); + await Promise.all([p1, p2]); + expect(times.length).toBe(2); + expect(times[1] - times[0]).toBeGreaterThanOrEqual(900); + expect(times[1] - times[0]).toBeLessThanOrEqual(1100); + }); + + it('drops expired items', async () => { + const q = new ThrottleQueue(1); + const results = []; + const p1 = q.enqueue({ task: () => wait(1_200) }); + const p2 = q.enqueue({ task: () => { throw new Error('should not run'); }, ttl: 1 }); + const p3 = q.enqueue({ task: () => { results.push('run'); } }); + await Promise.all([p1, p2, p3]); + expect(results).toEqual(['run']); + }); + + it('processes higher priority tasks first', async () => { + const q = new ThrottleQueue(1); + const results = []; + + // Block queue with task so that the queue scheduler doesn't start processing enqueued items + // immediately; afterwards the scheduler picks the next enqueued item according to priority + const pBlock = q.enqueue({ task: () => wait(500) }); + // Wait to ensure block item in queue has started + await wait(100); + + const p7 = q.enqueue({ task: () => results.push('priority7'), priority: 7 }); + const p4 = q.enqueue({ task: () => results.push('priority4'), priority: 4 }); + const p2 = q.enqueue({ task: () => results.push('priority2'), priority: 2 }); + const p0 = q.enqueue({ task: () => results.push('priority0'), priority: 0 }); + const p6 = q.enqueue({ task: () => results.push('priority6'), priority: 6 }); + const p1 = q.enqueue({ task: () => results.push('priority1'), priority: 1 }); + const p3 = q.enqueue({ task: () => results.push('priority3'), priority: 3 }); + const p5 = q.enqueue({ task: () => results.push('priority5'), priority: 5 }); + await Promise.all([ + pBlock, + p3, + p2, + p4, + p7, + p0, + p6, + p5, + p1, + ]); + expect(results).toEqual([ + 'priority7', + 'priority6', + 'priority5', + 'priority4', + 'priority3', + 'priority2', + 'priority1', + 'priority0', + ]); + }); +}); diff --git a/spec/helper.js b/spec/helper.js index f01497ba..107098ae 100644 --- a/spec/helper.js +++ b/spec/helper.js @@ -1,12 +1,18 @@ import { SpecReporter } from 'jasmine-spec-reporter'; -import { fileURLToPath } from 'url'; import { dirname } from 'path'; +import { fileURLToPath } from 'url'; const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); global.__dirname = __dirname; -jasmine.DEFAULT_TIMEOUT_INTERVAL = process.env.PARSE_SERVER_TEST_TIMEOUT || 5000; +jasmine.DEFAULT_TIMEOUT_INTERVAL = process.env.PARSE_SERVER_TEST_TIMEOUT || 50000; jasmine.getEnv().clearReporters(); jasmine.getEnv().addReporter(new SpecReporter()); + +const wait = (ms) => new Promise(resolve => setTimeout(resolve, ms)); + +export { + wait, +}; diff --git a/src/ParsePushAdapter.js b/src/ParsePushAdapter.js index 51867697..8493fbe7 100644 --- a/src/ParsePushAdapter.js +++ b/src/ParsePushAdapter.js @@ -7,6 +7,7 @@ import FCM from './FCM.js'; import WEB from './WEB.js'; import EXPO from './EXPO.js'; import { classifyInstallations } from './PushAdapterUtils.js'; +import ThrottleQueue from './ThrottleQueue.js'; const LOG_PREFIX = 'parse-server-push-adapter'; @@ -17,6 +18,7 @@ export default class ParsePushAdapter { constructor(pushConfig = {}) { this.validPushTypes = ['ios', 'osx', 'tvos', 'watchos', 'android', 'fcm', 'web', 'expo']; this.senderMap = {}; + this.queues = {}; // used in PushController for Dashboard Features this.feature = { immediatePush: true @@ -55,6 +57,10 @@ export default class ParsePushAdapter { } break; } + if (pushConfig[pushType].throttle) { + const rate = pushConfig[pushType].throttle.maxPerSecond; + this.queues[pushType] = new ThrottleQueue(rate); + } } } @@ -84,6 +90,8 @@ export default class ParsePushAdapter { }) }); sendPromises.push(Promise.all(results)); + } else if (this.queues[pushType]) { + sendPromises.push(this.queues[pushType].enqueue({ task: () => sender.send(data, devices), ttl: data.queueTtl, priority: data.queuePriority })); } else { sendPromises.push(sender.send(data, devices)); } diff --git a/src/ThrottleQueue.js b/src/ThrottleQueue.js new file mode 100644 index 00000000..72daa11c --- /dev/null +++ b/src/ThrottleQueue.js @@ -0,0 +1,41 @@ +import PQueue from 'p-queue'; + +export default class ThrottleQueue { + + /** + * Creates an instance of ThrottleQueue. + * + * @param {number} [maxPerSecond=Infinity] The maximum number of tasks to process per second. + * Optional, defaults to Infinity (no limit). + */ + constructor(maxPerSecond = Infinity) { + if (maxPerSecond === Infinity) { + this.queue = new PQueue({ concurrency: Infinity }); + } else { + const interval = Math.ceil(1000 / maxPerSecond); + this.queue = new PQueue({ concurrency: 1, intervalCap: 1, interval }); + } + } + + /** + * Enqueue a task to be processed by the throttle queue. + * + * @param {Object} options The options for the task. + * @param {Function} options.task The task to be enqueued. + * @param {number} [options.ttl] The time-to-live for the task in seconds. Optional, if provided, + * the task will only be processed if it is still valid when dequeued. + * @param {number} [options.priority=0] The priority of the task. Optional, defaults to 0. Higher + * priority tasks will be processed before lower priority ones. For example, the value 1 has a + * higher priority than 0. + * @returns {Promise} A promise that resolves when the task is processed. + */ + enqueue({ task, ttl, priority = 0 }) { + const expireAt = ttl ? Date.now() + ttl * 1_000 : null; + return this.queue.add(() => { + if (expireAt && Date.now() > expireAt) { + return null; + } + return task(); + }, { priority }); + } +} From 46b10cd500fd3dd91a788d757862b88d11dbd174 Mon Sep 17 00:00:00 2001 From: Manuel Trezza <5673677+mtrezza@users.noreply.github.com> Date: Wed, 2 Jul 2025 14:32:52 +0200 Subject: [PATCH 2/8] refactor --- README.md | 32 ++++++++++++++++------- spec/ParsePushAdapter.spec.js | 49 +++++++++++++++++++---------------- src/ParsePushAdapter.js | 4 ++- 3 files changed, 52 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 49d1f891..bc63a538 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,9 @@ The official Push Notification adapter for Parse Server. See [Parse Server Push - [HTTP/1.1 Legacy Option](#http11-legacy-option) - [Firebase Client Error](#firebase-client-error) - [Expo Push Options](#expo-push-options) -- [Push Throttling \& Prioritization](#push-throttling--prioritization) +- [Push Queue](#push-queue) + - [Throttling](#throttling) + - [Push Options](#push-options) - [Bundled with Parse Server](#bundled-with-parse-server) - [Logging](#logging) @@ -182,7 +184,9 @@ expo: { For more information see the [Expo docs](https://docs.expo.dev/push-notifications/overview/). -## Push Throttling & Prioritization +## Push Queue + +### Throttling Push providers usually throttle their APIs, so that sending too many pushes notifications within a short time may cause the API not accept any more requests. To address this, push sending can be throttled per provider by adding a `throttle` option to the respective push configuration. The option `maxPerSecond` defines the maximum number of pushes sent per second. If not throttle is configured, pushes are sent as quickly as possible. @@ -194,26 +198,34 @@ const parseServerOptions = { adapter: new ParsePushAdapter({ ios: { // ... - throttle: { maxPerSecond: 100 } + queue: { + throttle: { maxPerSecond: 100 } + } } }) } }; ``` -Each push request may additionally specify the following parameters. +### Push Options + +Each push request may specify the following options for handling in the queue. -| Parameter | Description | -|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `queueTtl` | The TTL in seconds. If a queued push expires before it is sent, it is discarded. | -| `queuePriority` | The priority of the push in the queue. When processing items in the queue, pushes with higher priority values are sent first. The priority value 1 means a higher priority than the value 0. The default priority value is 0. | +| Parameter | Default | Optional | Description | +|------------------|------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `queue.ttl` | `Infinity` | Yes | The time-to-live of the push in the queue in seconds. If a queued push expires before it is sent to the push provider, it is discarded. Default is `Infinity`, meaning pushes never expire. | +| `queue.priority` | `0` | Yes | The priority of the push in the queue. When processing the queue, pushes are sent in order of their priority. For example, a push with priority `1` is sent before a push with priority `0`. | Example push payload: ```js pushData = { - queueTtl: 10, // discard after 10 seconds if not yet sent - queuePriority: 1, // send with higher priority than default pushes + queue: { + // Discard after 10 seconds from queue if push has not been sent to push provider yet + ttl: 10, + // Send with higher priority than default pushes + priority: 1, + }, data: { alert: 'Hello' } }; ``` diff --git a/spec/ParsePushAdapter.spec.js b/spec/ParsePushAdapter.spec.js index 4ea15c1d..8e6d933b 100644 --- a/spec/ParsePushAdapter.spec.js +++ b/spec/ParsePushAdapter.spec.js @@ -1,17 +1,16 @@ -import { join } from 'path'; -import log from 'npmlog'; import apn from '@parse/node-apn'; -import ParsePushAdapterPackage, { ParsePushAdapter as _ParsePushAdapter, APNS as _APNS, GCM as _GCM, WEB as _WEB, EXPO as _EXPO, utils } from '../src/index.js'; -const ParsePushAdapter = _ParsePushAdapter; -import { randomString } from '../src/PushAdapterUtils.js'; -import MockAPNProvider from './MockAPNProvider.js'; +import log from 'npmlog'; +import { join } from 'path'; import APNS from '../src/APNS.js'; +import EXPO from '../src/EXPO.js'; +import FCM from '../src/FCM.js'; import GCM from '../src/GCM.js'; +import ParsePushAdapterPackage, { APNS as _APNS, EXPO as _EXPO, GCM as _GCM, ParsePushAdapter as _ParsePushAdapter, WEB as _WEB, utils } from '../src/index.js'; +import { randomString } from '../src/PushAdapterUtils.js'; import WEB from '../src/WEB.js'; -import FCM from '../src/FCM.js'; -import EXPO from '../src/EXPO.js'; import { wait } from './helper.js'; -import PQueue from 'p-queue'; +import MockAPNProvider from './MockAPNProvider.js'; +const ParsePushAdapter = _ParsePushAdapter; describe('ParsePushAdapter', () => { @@ -650,8 +649,10 @@ describe('ParsePushAdapter', () => { android: { senderId: 'id', apiKey: 'key', - throttle: { maxPerSecond: 1 } - } + queue: { + throttle: { maxPerSecond: 1 } + }, + }, }; const parsePushAdapter = new ParsePushAdapter(pushConfig); const times = []; @@ -674,8 +675,10 @@ describe('ParsePushAdapter', () => { android: { senderId: 'id', apiKey: 'key', - throttle: { maxPerSecond: 1 } - } + queue: { + throttle: { maxPerSecond: 1 } + }, + }, }; const parsePushAdapter = new ParsePushAdapter(pushConfig); parsePushAdapter.senderMap['android'].send = jasmine.createSpy('send').and.callFake(async () => { @@ -685,7 +688,7 @@ describe('ParsePushAdapter', () => { const installs = [{ deviceType: 'android', deviceToken: 'token' }]; await Promise.all([ parsePushAdapter.send({}, installs), - parsePushAdapter.send({ queueTtl: 1 }, installs) + parsePushAdapter.send({ queue: { ttl: 1 } }, installs) ]); expect(parsePushAdapter.senderMap['android'].send.calls.count()).toBe(1); }); @@ -695,13 +698,15 @@ describe('ParsePushAdapter', () => { android: { senderId: 'id', apiKey: 'key', - throttle: { maxPerSecond: 1 } - } + queue: { + throttle: { maxPerSecond: 1 } + }, + }, }; const parsePushAdapter = new ParsePushAdapter(pushConfig); const callOrder = []; parsePushAdapter.senderMap['android'].send = jasmine.createSpy('send').and.callFake(async (data) => { - callOrder.push(data.queuePriority); + callOrder.push(data.id); await wait(100); return []; }); @@ -715,11 +720,11 @@ describe('ParsePushAdapter', () => { await Promise.all([ pBlock, - parsePushAdapter.send({ queuePriority: 3 }, installs), - parsePushAdapter.send({ queuePriority: 4 }, installs), - parsePushAdapter.send({ queuePriority: 2 }, installs), - parsePushAdapter.send({ queuePriority: 0 }, installs), - parsePushAdapter.send({ queuePriority: 1 }, installs), + parsePushAdapter.send({ id: 3, queue: { priority: 3 }}, installs), + parsePushAdapter.send({ id: 4, queue: { priority: 4 }}, installs), + parsePushAdapter.send({ id: 2, queue: { priority: 2 }}, installs), + parsePushAdapter.send({ id: 0, queue: { priority: 0 }}, installs), + parsePushAdapter.send({ id: 1, queue: { priority: 1 }}, installs), ]); expect(callOrder).toEqual([4, 3, 2, 1, 0]); }); diff --git a/src/ParsePushAdapter.js b/src/ParsePushAdapter.js index 8493fbe7..a98210eb 100644 --- a/src/ParsePushAdapter.js +++ b/src/ParsePushAdapter.js @@ -91,7 +91,9 @@ export default class ParsePushAdapter { }); sendPromises.push(Promise.all(results)); } else if (this.queues[pushType]) { - sendPromises.push(this.queues[pushType].enqueue({ task: () => sender.send(data, devices), ttl: data.queueTtl, priority: data.queuePriority })); + const { ttl, priority } = data?.queue || {}; + delete data?.queue; + sendPromises.push(this.queues[pushType].enqueue({ task: () => sender.send(data, devices), ttl, priority })); } else { sendPromises.push(sender.send(data, devices)); } From 1712ccde74f080f822a6df6dc0ac2dda72130217 Mon Sep 17 00:00:00 2001 From: Manuel Trezza <5673677+mtrezza@users.noreply.github.com> Date: Wed, 2 Jul 2025 23:46:46 +0200 Subject: [PATCH 3/8] expose detailed queue params --- README.md | 39 +++++++++++++++++++++++++++++++---- spec/ParsePushAdapter.spec.js | 12 ++++++++--- spec/ThrottleQueue.spec.js | 6 +++--- src/ParsePushAdapter.js | 7 ++++--- src/ThrottleQueue.js | 21 ++++++++++--------- 5 files changed, 62 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index bc63a538..989914f3 100644 --- a/README.md +++ b/README.md @@ -188,9 +188,15 @@ For more information see the [Expo docs](https://docs.expo.dev/push-notification ### Throttling -Push providers usually throttle their APIs, so that sending too many pushes notifications within a short time may cause the API not accept any more requests. To address this, push sending can be throttled per provider by adding a `throttle` option to the respective push configuration. The option `maxPerSecond` defines the maximum number of pushes sent per second. If not throttle is configured, pushes are sent as quickly as possible. +By default, pushes are sent as fast as possible. However, push providers usually throttle their APIs, so that sending too many pushes notifications within a short time may cause the API to reject requests. To address this, push sending can be throttled per provider by adding the `queue` option to the respective push configuration. -Example throttle configuration: +| Parameter | Default | Optional | Description | +|--------------------------|------------|----------|--------------------------------------------------------------------------------------------| +| `queue.concurrency` | `Infinity` | Yes | The maximum number of pushes to process concurrently. | +| `queue.intervalCapacity` | `Infinity` | Yes | The interval capacity, meaning the maximum number of tasks to process in a given interval. | +| `queue.interval` | `0` | Yes | The interval in milliseconds for the interval capacity. | + +Example configuration to throttle the queue to max. 1 push every 100ms, equivalent to max. 10 pushes per second: ```js const parseServerOptions = { @@ -199,14 +205,39 @@ const parseServerOptions = { ios: { // ... queue: { - throttle: { maxPerSecond: 100 } - } + concurrency: 1, + intervalCapacity: 1, + interval: 100, + }, } }) } }; ``` +Keep in mind that `concurrency: 1` means that pushes are sent in serial. For example, if sending a request to the push provider takes up to 500ms to complete, then the configuration above may be limited to only 2 pushes per second if every request takes 500ms. To address this, you can send pushes in parallel by setting the concurrency to a value greater than `1`, and increasing `intervalCapacity` and `interval` to fully utilize parallelism. + +Example configuration sending pushes in parallel: + +```js +const parseServerOptions = { + push: { + adapter: new ParsePushAdapter({ + ios: { + // ... + queue: { + concurrency: 5, + intervalCapacity: 5, + interval: 500, + }, + } + }) + } +}; +``` + +In the example above, pushes will be sent in bursts of 5 at once, with max. 10 pushes within 1s. On a timeline that means at `t=0ms`, 5 pushes will be sent in parallel. If sending the pushes take less than 500ms, then `intervalCapacity` will still limit to 5 pushes within the first 500ms. At `t=500ms` the second interval begins and another max. 5 pushes are sent in parallel. That effectively means a throughput of up to 10 pushes per second. + ### Push Options Each push request may specify the following options for handling in the queue. diff --git a/spec/ParsePushAdapter.spec.js b/spec/ParsePushAdapter.spec.js index 8e6d933b..0401c57e 100644 --- a/spec/ParsePushAdapter.spec.js +++ b/spec/ParsePushAdapter.spec.js @@ -650,7 +650,9 @@ describe('ParsePushAdapter', () => { senderId: 'id', apiKey: 'key', queue: { - throttle: { maxPerSecond: 1 } + concurrency: 1, + intervalCapacity: 1, + interval: 1_000, }, }, }; @@ -676,7 +678,9 @@ describe('ParsePushAdapter', () => { senderId: 'id', apiKey: 'key', queue: { - throttle: { maxPerSecond: 1 } + concurrency: 1, + intervalCapacity: 1, + interval: 1_000, }, }, }; @@ -699,7 +703,9 @@ describe('ParsePushAdapter', () => { senderId: 'id', apiKey: 'key', queue: { - throttle: { maxPerSecond: 1 } + concurrency: 1, + intervalCapacity: 1, + interval: 1_000, }, }, }; diff --git a/spec/ThrottleQueue.spec.js b/spec/ThrottleQueue.spec.js index 5b006d6b..0307cb15 100644 --- a/spec/ThrottleQueue.spec.js +++ b/spec/ThrottleQueue.spec.js @@ -3,7 +3,7 @@ import { wait } from './helper.js'; describe('ThrottleQueue', () => { it('processes items respecting rate limit', async () => { - const q = new ThrottleQueue(1); + const q = new ThrottleQueue({ concurrency: 1, intervalCap: 1, interval: 1_000 }); const times = []; const p1 = q.enqueue({ task: () => { times.push(Date.now()) } }); const p2 = q.enqueue({ task: () => { times.push(Date.now()) } }); @@ -14,7 +14,7 @@ describe('ThrottleQueue', () => { }); it('drops expired items', async () => { - const q = new ThrottleQueue(1); + const q = new ThrottleQueue({ concurrency: 1, intervalCap: 1, interval: 1_000 }); const results = []; const p1 = q.enqueue({ task: () => wait(1_200) }); const p2 = q.enqueue({ task: () => { throw new Error('should not run'); }, ttl: 1 }); @@ -24,7 +24,7 @@ describe('ThrottleQueue', () => { }); it('processes higher priority tasks first', async () => { - const q = new ThrottleQueue(1); + const q = new ThrottleQueue({ concurrency: 1, intervalCap: 1, interval: 1_000 }); const results = []; // Block queue with task so that the queue scheduler doesn't start processing enqueued items diff --git a/src/ParsePushAdapter.js b/src/ParsePushAdapter.js index a98210eb..73ced691 100644 --- a/src/ParsePushAdapter.js +++ b/src/ParsePushAdapter.js @@ -57,9 +57,10 @@ export default class ParsePushAdapter { } break; } - if (pushConfig[pushType].throttle) { - const rate = pushConfig[pushType].throttle.maxPerSecond; - this.queues[pushType] = new ThrottleQueue(rate); + const queue = pushConfig[pushType].queue; + if (queue) { + const { concurrency, intervalCapacity, interval } = queue || {}; + this.queues[pushType] = new ThrottleQueue({ concurrency, intervalCap: intervalCapacity, interval }); } } } diff --git a/src/ThrottleQueue.js b/src/ThrottleQueue.js index 72daa11c..3423b4f5 100644 --- a/src/ThrottleQueue.js +++ b/src/ThrottleQueue.js @@ -3,18 +3,19 @@ import PQueue from 'p-queue'; export default class ThrottleQueue { /** - * Creates an instance of ThrottleQueue. + * Creates an instance of ThrottleQueue. If no parameters are provided, then the queue will have no + * throttling and will process tasks as fast as possible. * - * @param {number} [maxPerSecond=Infinity] The maximum number of tasks to process per second. - * Optional, defaults to Infinity (no limit). + * @param {Object} [options] The options. + * @param {number} [options.concurrency=Infinity] The maximum number of tasks to process concurrently. + * Optional, defaults to `Infinity`, meaning no limit on concurrency. + * @param {number} [options.intervalCap=Infinity] The interval capacity, meaning the maximum number of + * tasks to process in a given interval. Optional, defaults to `Infinity`, meaning no interval limit. + * @param {number} [options.interval=0] The interval in milliseconds for the interval capacity. + * Optional, defaults to `0`, meaning no interval limit. */ - constructor(maxPerSecond = Infinity) { - if (maxPerSecond === Infinity) { - this.queue = new PQueue({ concurrency: Infinity }); - } else { - const interval = Math.ceil(1000 / maxPerSecond); - this.queue = new PQueue({ concurrency: 1, intervalCap: 1, interval }); - } + constructor({ concurrency = Infinity, intervalCap = Infinity, interval = 0 } = {}) { + this.queue = new PQueue({ concurrency, intervalCap, interval }); } /** From 1e45202e5ee3f6740e122b9e926c0551ef010c3f Mon Sep 17 00:00:00 2001 From: Manuel Trezza <5673677+mtrezza@users.noreply.github.com> Date: Thu, 3 Jul 2025 00:05:31 +0200 Subject: [PATCH 4/8] undo timeout change --- spec/helper.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/helper.js b/spec/helper.js index 107098ae..cdb52a8b 100644 --- a/spec/helper.js +++ b/spec/helper.js @@ -6,7 +6,7 @@ const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); global.__dirname = __dirname; -jasmine.DEFAULT_TIMEOUT_INTERVAL = process.env.PARSE_SERVER_TEST_TIMEOUT || 50000; +jasmine.DEFAULT_TIMEOUT_INTERVAL = process.env.PARSE_SERVER_TEST_TIMEOUT || 5000; jasmine.getEnv().clearReporters(); jasmine.getEnv().addReporter(new SpecReporter()); From 22bd46ab93a5a0975059225407b4e628866c4300 Mon Sep 17 00:00:00 2001 From: Manuel Trezza <5673677+mtrezza@users.noreply.github.com> Date: Thu, 3 Jul 2025 00:08:19 +0200 Subject: [PATCH 5/8] increase timeout --- spec/helper.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/helper.js b/spec/helper.js index cdb52a8b..9a227e6c 100644 --- a/spec/helper.js +++ b/spec/helper.js @@ -6,7 +6,7 @@ const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); global.__dirname = __dirname; -jasmine.DEFAULT_TIMEOUT_INTERVAL = process.env.PARSE_SERVER_TEST_TIMEOUT || 5000; +jasmine.DEFAULT_TIMEOUT_INTERVAL = process.env.PARSE_SERVER_TEST_TIMEOUT || 10_000; jasmine.getEnv().clearReporters(); jasmine.getEnv().addReporter(new SpecReporter()); From b7fc5bc62e27d9e999ff582efab0a50da7b28c86 Mon Sep 17 00:00:00 2001 From: Manuel Trezza <5673677+mtrezza@users.noreply.github.com> Date: Thu, 3 Jul 2025 00:09:54 +0200 Subject: [PATCH 6/8] fix lint --- spec/helper.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spec/helper.js b/spec/helper.js index 9a227e6c..e3c445e4 100644 --- a/spec/helper.js +++ b/spec/helper.js @@ -1,5 +1,6 @@ import { SpecReporter } from 'jasmine-spec-reporter'; import { dirname } from 'path'; +import { setTimeout } from 'timers'; import { fileURLToPath } from 'url'; const __filename = fileURLToPath(import.meta.url); @@ -14,5 +15,5 @@ jasmine.getEnv().addReporter(new SpecReporter()); const wait = (ms) => new Promise(resolve => setTimeout(resolve, ms)); export { - wait, + wait }; From 849e014796565ebd09f1ed898b47985e15e1f539 Mon Sep 17 00:00:00 2001 From: Manuel Trezza <5673677+mtrezza@users.noreply.github.com> Date: Thu, 3 Jul 2025 02:26:25 +0200 Subject: [PATCH 7/8] fix if push config is array --- src/ParsePushAdapter.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ParsePushAdapter.js b/src/ParsePushAdapter.js index 73ced691..d6b6aba9 100644 --- a/src/ParsePushAdapter.js +++ b/src/ParsePushAdapter.js @@ -57,7 +57,8 @@ export default class ParsePushAdapter { } break; } - const queue = pushConfig[pushType].queue; + const config = pushConfig[pushType]; + const queue = Array.isArray(config) ? config.find(c => c && c.queue)?.queue : config.queue; if (queue) { const { concurrency, intervalCapacity, interval } = queue || {}; this.queues[pushType] = new ThrottleQueue({ concurrency, intervalCap: intervalCapacity, interval }); From b0bb3ec3c35bd51ba9a44633b185a7ce95bc8611 Mon Sep 17 00:00:00 2001 From: Manuel Trezza <5673677+mtrezza@users.noreply.github.com> Date: Thu, 3 Jul 2025 02:35:14 +0200 Subject: [PATCH 8/8] remove unnecessary delete --- src/ParsePushAdapter.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ParsePushAdapter.js b/src/ParsePushAdapter.js index d6b6aba9..8a8786d7 100644 --- a/src/ParsePushAdapter.js +++ b/src/ParsePushAdapter.js @@ -94,7 +94,6 @@ export default class ParsePushAdapter { sendPromises.push(Promise.all(results)); } else if (this.queues[pushType]) { const { ttl, priority } = data?.queue || {}; - delete data?.queue; sendPromises.push(this.queues[pushType].enqueue({ task: () => sender.send(data, devices), ttl, priority })); } else { sendPromises.push(sender.send(data, devices));