Skip to content

Commit eb1e273

Browse files
committed
SwtScheduler that honors the Scheduler/Worker contracts
1 parent 77509e1 commit eb1e273

File tree

1 file changed

+214
-78
lines changed

1 file changed

+214
-78
lines changed

src/com/diffplug/common/swt/SwtExec.java

Lines changed: 214 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -15,42 +15,27 @@
1515
*/
1616
package com.diffplug.common.swt;
1717

18+
import java.util.*;
1819
import java.util.List;
19-
import java.util.Objects;
20-
import java.util.concurrent.AbstractExecutorService;
21-
import java.util.concurrent.Callable;
22-
import java.util.concurrent.CompletionStage;
23-
import java.util.concurrent.Delayed;
24-
import java.util.concurrent.ExecutionException;
25-
import java.util.concurrent.Executor;
26-
import java.util.concurrent.RejectedExecutionException;
27-
import java.util.concurrent.RunnableFuture;
28-
import java.util.concurrent.ScheduledExecutorService;
29-
import java.util.concurrent.ScheduledFuture;
30-
import java.util.concurrent.TimeUnit;
31-
import java.util.concurrent.TimeoutException;
20+
import java.util.concurrent.*;
21+
import java.util.concurrent.atomic.*;
3222
import java.util.function.Supplier;
3323

3424
import org.eclipse.swt.SWT;
35-
import org.eclipse.swt.widgets.Display;
36-
import org.eclipse.swt.widgets.Widget;
37-
38-
import rx.Observable;
39-
import rx.Scheduler;
40-
import rx.Subscription;
41-
import rx.functions.Action0;
42-
import rx.subscriptions.BooleanSubscription;
43-
import rx.subscriptions.Subscriptions;
25+
import org.eclipse.swt.widgets.*;
4426

27+
import com.diffplug.common.base.Box.Nullable;
28+
import com.diffplug.common.base.Unhandled;
29+
import com.diffplug.common.rx.*;
4530
import com.google.common.base.Preconditions;
4631
import com.google.common.primitives.Ints;
4732
import com.google.common.util.concurrent.ListenableFuture;
4833

49-
import com.diffplug.common.base.Box.Nullable;
50-
import com.diffplug.common.base.Unhandled;
51-
import com.diffplug.common.rx.Rx;
52-
import com.diffplug.common.rx.RxSubscriber;
5334
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
35+
import rx.*;
36+
import rx.Observable;
37+
import rx.functions.Action0;
38+
import rx.subscriptions.*;
5439

5540
/**
5641
* {link ExecutorService}s which execute on the SWT UI thread.
@@ -326,58 +311,7 @@ public Scheduler getRxScheduler() {
326311

327312
private SwtExec(Display display) {
328313
this.display = display;
329-
this.scheduler = new Scheduler() {
330-
@Override
331-
public Worker createWorker() {
332-
return new Worker() {
333-
private BooleanSubscription workerSub = BooleanSubscription.create();
334-
335-
@Override
336-
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
337-
if (isUnsubscribed()) {
338-
return Subscriptions.unsubscribed();
339-
}
340-
if (delayTime <= 0) {
341-
return schedule(action);
342-
} else {
343-
ScheduledFuture<?> future = SwtExec.this.schedule(() -> {
344-
if (!workerSub.isUnsubscribed()) {
345-
action.call();
346-
}
347-
}, delayTime, unit);
348-
Subscription sub = Subscriptions.create(() -> {
349-
future.cancel(true);
350-
});
351-
return sub;
352-
}
353-
}
354-
355-
@Override
356-
public Subscription schedule(Action0 action) {
357-
if (isUnsubscribed()) {
358-
return Subscriptions.unsubscribed();
359-
}
360-
BooleanSubscription sub = BooleanSubscription.create();
361-
execute(() -> {
362-
if (!sub.isUnsubscribed() && !workerSub.isUnsubscribed()) {
363-
action.call();
364-
}
365-
});
366-
return sub;
367-
}
368-
369-
@Override
370-
public void unsubscribe() {
371-
workerSub.unsubscribe();
372-
}
373-
374-
@Override
375-
public boolean isUnsubscribed() {
376-
return workerSub.isUnsubscribed();
377-
}
378-
};
379-
}
380-
};
314+
this.scheduler = new SwtScheduler(this);
381315
this.rxExecutor = Rx.on(this, scheduler);
382316
}
383317

@@ -673,4 +607,206 @@ public boolean isDone() {
673607
return runnableFuture.isDone();
674608
}
675609
}
610+
611+
/** Scheduler that runs tasks on Swt's event dispatch thread. */
612+
static final class SwtScheduler extends Scheduler {
613+
final SwtExec exec;
614+
615+
public SwtScheduler(SwtExec exec) {
616+
this.exec = exec;
617+
}
618+
619+
@Override
620+
public Worker createWorker() {
621+
return new SwtWorker(exec);
622+
}
623+
624+
static final class SwtWorker extends Scheduler.Worker {
625+
final SwtExec exec;
626+
627+
volatile boolean unsubscribed;
628+
629+
/** Set of active tasks, guarded by this. */
630+
Set<SwtScheduledAction> tasks;
631+
632+
public SwtWorker(SwtExec exec) {
633+
this.exec = exec;
634+
this.tasks = new HashSet<>();
635+
}
636+
637+
@Override
638+
public void unsubscribe() {
639+
if (unsubscribed) {
640+
return;
641+
}
642+
unsubscribed = true;
643+
644+
Set<SwtScheduledAction> set;
645+
synchronized (this) {
646+
set = tasks;
647+
tasks = null;
648+
}
649+
650+
if (set != null) {
651+
for (SwtScheduledAction a : set) {
652+
a.cancelFuture();
653+
}
654+
}
655+
}
656+
657+
void remove(SwtScheduledAction a) {
658+
if (unsubscribed) {
659+
return;
660+
}
661+
synchronized (this) {
662+
if (unsubscribed) {
663+
return;
664+
}
665+
666+
tasks.remove(a);
667+
}
668+
}
669+
670+
@Override
671+
public boolean isUnsubscribed() {
672+
return unsubscribed;
673+
}
674+
675+
@Override
676+
public Subscription schedule(Action0 action) {
677+
if (unsubscribed) {
678+
return Subscriptions.unsubscribed();
679+
}
680+
681+
SwtScheduledAction a = new SwtScheduledAction(action, this);
682+
683+
synchronized (this) {
684+
if (unsubscribed) {
685+
return Subscriptions.unsubscribed();
686+
}
687+
688+
tasks.add(a);
689+
}
690+
691+
exec.execute(a);
692+
693+
if (unsubscribed) {
694+
a.cancel();
695+
return Subscriptions.unsubscribed();
696+
}
697+
698+
return a;
699+
}
700+
701+
@Override
702+
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
703+
if (unsubscribed) {
704+
return Subscriptions.unsubscribed();
705+
}
706+
707+
SwtScheduledAction a = new SwtScheduledAction(action, this);
708+
709+
synchronized (this) {
710+
if (unsubscribed) {
711+
return Subscriptions.unsubscribed();
712+
}
713+
714+
tasks.add(a);
715+
}
716+
717+
Future<?> f = exec.schedule(a, delayTime, unit);
718+
719+
if (unsubscribed) {
720+
a.cancel();
721+
f.cancel(true);
722+
return Subscriptions.unsubscribed();
723+
}
724+
725+
a.setFuture(f);
726+
727+
return a;
728+
}
729+
730+
/**
731+
* Represents a cancellable asynchronous Runnable that wraps an action
732+
* and manages the associated Worker lifecycle.
733+
*/
734+
static final class SwtScheduledAction implements Runnable, Subscription {
735+
final Action0 action;
736+
737+
final SwtWorker parent;
738+
739+
volatile ScheduledFuture<?> future;
740+
@SuppressWarnings("rawtypes")
741+
static final AtomicReferenceFieldUpdater<SwtScheduledAction, Future> FUTURE =
742+
AtomicReferenceFieldUpdater.newUpdater(SwtScheduledAction.class, Future.class, "future");
743+
744+
static final Future<?> CANCELLED = new FutureTask<>(() -> { }, null);
745+
746+
static final Future<?> FINISHED = new FutureTask<>(() -> { }, null);
747+
748+
volatile int state;
749+
static final AtomicIntegerFieldUpdater<SwtScheduledAction> STATE =
750+
AtomicIntegerFieldUpdater.newUpdater(SwtScheduledAction.class, "state");
751+
752+
static final int STATE_ACTIVE = 0;
753+
static final int STATE_FINISHED = 1;
754+
static final int STATE_CANCELLED = 2;
755+
756+
public SwtScheduledAction(Action0 action, SwtWorker parent) {
757+
this.action = action;
758+
this.parent = parent;
759+
}
760+
761+
@Override
762+
public void run() {
763+
if (!parent.unsubscribed && state == STATE_ACTIVE) {
764+
try {
765+
action.call();
766+
} finally {
767+
FUTURE.lazySet(this, FINISHED);
768+
if (STATE.compareAndSet(this, STATE_ACTIVE, STATE_FINISHED)) {
769+
parent.remove(this);
770+
}
771+
}
772+
}
773+
}
774+
775+
@Override
776+
public boolean isUnsubscribed() {
777+
return state != STATE_ACTIVE;
778+
}
779+
780+
@Override
781+
public void unsubscribe() {
782+
if (STATE.compareAndSet(this, STATE_ACTIVE, STATE_CANCELLED)) {
783+
parent.remove(this);
784+
}
785+
cancelFuture();
786+
}
787+
788+
void setFuture(Future<?> f) {
789+
if (FUTURE.compareAndSet(this, null, f)) {
790+
if (future != FINISHED) {
791+
f.cancel(true);
792+
}
793+
}
794+
}
795+
796+
void cancelFuture() {
797+
Future<?> f = future;
798+
if (f != CANCELLED && f != FINISHED) {
799+
f = FUTURE.getAndSet(this, CANCELLED);
800+
if (f != null && f != CANCELLED && f != FINISHED) {
801+
f.cancel(true);
802+
}
803+
}
804+
}
805+
806+
void cancel() {
807+
state = STATE_CANCELLED;
808+
}
809+
}
810+
}
811+
}
676812
}

0 commit comments

Comments
 (0)