From 0c974a91398e41e1c154e6f0ba54ed37ceccd4d9 Mon Sep 17 00:00:00 2001 From: Jonathan Edey Date: Mon, 26 Aug 2024 17:55:42 -0400 Subject: [PATCH 1/4] fix: Remove possible deadlock with nested `callAsync()` --- .../firebase/messaging/FirebaseMessaging.java | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/google/firebase/messaging/FirebaseMessaging.java b/src/main/java/com/google/firebase/messaging/FirebaseMessaging.java index e16b6ac9d..1718a3fe7 100644 --- a/src/main/java/com/google/firebase/messaging/FirebaseMessaging.java +++ b/src/main/java/com/google/firebase/messaging/FirebaseMessaging.java @@ -26,6 +26,7 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; import com.google.firebase.ErrorCode; import com.google.firebase.FirebaseApp; import com.google.firebase.ImplFirebaseTrampolines; @@ -159,7 +160,7 @@ protected String execute() throws FirebaseMessagingException { * no failures are only indicated by a {@link BatchResponse}. */ public BatchResponse sendEach(@NonNull List messages) throws FirebaseMessagingException { - return sendEachOp(messages, false).call(); + return sendEach(messages, false); } @@ -186,7 +187,11 @@ public BatchResponse sendEach(@NonNull List messages) throws FirebaseMe */ public BatchResponse sendEach( @NonNull List messages, boolean dryRun) throws FirebaseMessagingException { - return sendEachOp(messages, dryRun).call(); + try { + return sendEachOpAsync(messages, dryRun).get(); + } catch (InterruptedException | ExecutionException e) { + throw new FirebaseMessagingException(ErrorCode.CANCELLED, SERVICE_ID); + } } /** @@ -197,7 +202,7 @@ public BatchResponse sendEach( * the messages have been sent. */ public ApiFuture sendEachAsync(@NonNull List messages) { - return sendEachOp(messages, false).callAsync(app); + return sendEachOpAsync(messages, false); } /** @@ -209,32 +214,29 @@ public ApiFuture sendEachAsync(@NonNull List messages) { * the messages have been sent. */ public ApiFuture sendEachAsync(@NonNull List messages, boolean dryRun) { - return sendEachOp(messages, dryRun).callAsync(app); + return sendEachOpAsync(messages, dryRun); } - private CallableOperation sendEachOp( + private ApiFuture sendEachOpAsync( final List messages, final boolean dryRun) { final List immutableMessages = ImmutableList.copyOf(messages); checkArgument(!immutableMessages.isEmpty(), "messages list must not be empty"); checkArgument(immutableMessages.size() <= 500, "messages list must not contain more than 500 elements"); - return new CallableOperation() { - @Override - protected BatchResponse execute() throws FirebaseMessagingException { - List> list = new ArrayList<>(); - for (Message message : immutableMessages) { - ApiFuture messageId = sendOpForSendResponse(message, dryRun).callAsync(app); - list.add(messageId); - } - try { - List responses = ApiFutures.allAsList(list).get(); - return new BatchResponseImpl(responses); - } catch (InterruptedException | ExecutionException e) { - throw new FirebaseMessagingException(ErrorCode.CANCELLED, SERVICE_ID); - } - } - }; + List> list = new ArrayList<>(); + for (Message message : immutableMessages) { + ApiFuture messageId = sendOpForSendResponse(message, dryRun).callAsync(app); + list.add(messageId); + } + + ApiFuture> responsesFuture = ApiFutures.allAsList(list); + return ApiFutures.transform( + responsesFuture, + (responses) -> { + return new BatchResponseImpl(responses); + }, + MoreExecutors.directExecutor()); } private CallableOperation sendOpForSendResponse( From 3afbc3e72f26c24bfcde9b006cf2c3b2ada69e25 Mon Sep 17 00:00:00 2001 From: Jonathan Edey Date: Mon, 26 Aug 2024 17:57:01 -0400 Subject: [PATCH 2/4] Set default `ThreadPoolExecutor` to a fixed thread pool --- .../com/google/firebase/internal/FirebaseThreadManagers.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/google/firebase/internal/FirebaseThreadManagers.java b/src/main/java/com/google/firebase/internal/FirebaseThreadManagers.java index 877450fcc..2bb292ad7 100644 --- a/src/main/java/com/google/firebase/internal/FirebaseThreadManagers.java +++ b/src/main/java/com/google/firebase/internal/FirebaseThreadManagers.java @@ -84,7 +84,8 @@ private static class DefaultThreadManager extends GlobalThreadManager { protected ExecutorService doInit() { ThreadFactory threadFactory = FirebaseScheduledExecutor.getThreadFactoryWithName( getThreadFactory(), "firebase-default-%d"); - return Executors.newCachedThreadPool(threadFactory); + + return Executors.newFixedThreadPool(100, threadFactory); } @Override From 59e1516102e42d66209bda8c9defca02371aff85 Mon Sep 17 00:00:00 2001 From: Jonathan Edey Date: Fri, 30 Aug 2024 12:58:15 -0400 Subject: [PATCH 3/4] fix: add keepAliveTime to close idle threads --- .../google/firebase/internal/FirebaseThreadManagers.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/google/firebase/internal/FirebaseThreadManagers.java b/src/main/java/com/google/firebase/internal/FirebaseThreadManagers.java index 2bb292ad7..c14fbd611 100644 --- a/src/main/java/com/google/firebase/internal/FirebaseThreadManagers.java +++ b/src/main/java/com/google/firebase/internal/FirebaseThreadManagers.java @@ -23,7 +23,10 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,7 +88,10 @@ protected ExecutorService doInit() { ThreadFactory threadFactory = FirebaseScheduledExecutor.getThreadFactoryWithName( getThreadFactory(), "firebase-default-%d"); - return Executors.newFixedThreadPool(100, threadFactory); + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 60L, + TimeUnit.SECONDS, new LinkedBlockingQueue(), threadFactory); + threadPoolExecutor.allowCoreThreadTimeOut(true); + return threadPoolExecutor; } @Override From d2a76d3914d6eea1eafa21f7eff46322c243b10b Mon Sep 17 00:00:00 2001 From: Jonathan Edey Date: Fri, 30 Aug 2024 13:00:32 -0400 Subject: [PATCH 4/4] Added comments explainging the sendEachAsync change --- .../com/google/firebase/messaging/FirebaseMessaging.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/main/java/com/google/firebase/messaging/FirebaseMessaging.java b/src/main/java/com/google/firebase/messaging/FirebaseMessaging.java index 1718a3fe7..870940f77 100644 --- a/src/main/java/com/google/firebase/messaging/FirebaseMessaging.java +++ b/src/main/java/com/google/firebase/messaging/FirebaseMessaging.java @@ -217,6 +217,8 @@ public ApiFuture sendEachAsync(@NonNull List messages, b return sendEachOpAsync(messages, dryRun); } + // Returns an ApiFuture directly since this function is non-blocking. Individual child send + // requests are still called async and run in background threads. private ApiFuture sendEachOpAsync( final List messages, final boolean dryRun) { final List immutableMessages = ImmutableList.copyOf(messages); @@ -226,11 +228,17 @@ private ApiFuture sendEachOpAsync( List> list = new ArrayList<>(); for (Message message : immutableMessages) { + // Make async send calls per message ApiFuture messageId = sendOpForSendResponse(message, dryRun).callAsync(app); list.add(messageId); } + // Gather all futures and combine into a list ApiFuture> responsesFuture = ApiFutures.allAsList(list); + + // Chain this future to wrap the eventual responses in a BatchResponse without blocking + // the main thread. This uses the current thread to execute, but since the transformation + // function is non-blocking the transformation itself is also non-blocking. return ApiFutures.transform( responsesFuture, (responses) -> {