Skip to content

Moved some fields from the GlobalCtx to a new class, _UpdateCtx #44

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/computed/lib/computed.dart
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ abstract interface class Computed<T> {
/// gains a value or throws an exception for the
/// first time or when the result of the computation changes.
/// [onError] has the same semantics as in [Stream.listen].
///
/// Cannot be used inside computations. Instead, see [use].
ComputedSubscription<T> listen(void Function(T event)? onData,
[Function? onError]);

Expand Down
217 changes: 128 additions & 89 deletions packages/computed/lib/src/computed.dart
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,16 @@ const _noReactivityInsideReact =
"`use`, `useWeak` and `react` not allowed inside react callbacks.";
const _noReactivityOutsideComputations =
"`use`, `useWeak`, `react` and `prev` are only allowed inside computations.";
const _noListenInsideComputations =
"`listen` is not allowed inside computations.";

class GlobalCtx {
static ComputedImpl? _currentComputation;
static ComputedImpl get currentComputation {
if (_currentComputation == null) {
final node = _updateCtx?._currentComputation;
if (node == null) {
throw StateError(_noReactivityOutsideComputations);
}
return _currentComputation!;
return node;
}

@visibleForTesting
Expand Down Expand Up @@ -127,50 +129,54 @@ class GlobalCtx {
static final _routerExpando = Expando<_RouterValueOrException>('computed');

static var _currentUpdate = _Token(); // Guaranteed to be unique thanks to GC
static Set<ComputedImpl> _currentUpdateNodes = {};
static Expando<bool> _currentUpdateNodeDirty = Expando();
static Expando<Map<ComputedImpl, _WeakMemoizedValueOrException>>
_currentUpdateUpstreamComputations = Expando();

static var _reacting = false;
static _UpdateCtx? _updateCtx;
}

void _injectNodesToDAG(Set<Computed> nodes) {
for (var c in nodes) {
GlobalCtx._currentUpdateNodeDirty[c] = true;
class _UpdateCtx {
ComputedImpl? _currentComputation;
final _currentUpdateNodes = <ComputedImpl>{};
final _currentUpdateNodeDirty = Expando<bool>();
final _currentUpdateUpstreamComputations =
Expando<Map<ComputedImpl, _WeakMemoizedValueOrException>>();
var _reacting = false;

void _injectNodesToDAG(Set<Computed> nodes) {
for (var c in nodes) {
_currentUpdateNodeDirty[c] = true;
}
_currentUpdateNodes.addAll(nodes.cast<ComputedImpl>());
}
GlobalCtx._currentUpdateNodes.addAll(nodes.cast<ComputedImpl>());
}

void _rerunGraph(Set<ComputedImpl> roots) {
GlobalCtx._currentUpdateNodes = {};
GlobalCtx._currentUpdateNodeDirty = Expando();
GlobalCtx._currentUpdateUpstreamComputations = Expando();
_injectNodesToDAG(roots);

void evalAfterEnsureUpstreamEvald(ComputedImpl node) {
for (var c in node._lastUpstreamComputations.keys) {
if (c._lastUpdate != GlobalCtx._currentUpdate) {
evalAfterEnsureUpstreamEvald(c);
final updateCtx = _UpdateCtx();
try {
GlobalCtx._updateCtx = updateCtx;
updateCtx._injectNodesToDAG(roots);

void evalAfterEnsureUpstreamEvald(ComputedImpl node) {
for (var c in node._lastUpstreamComputations.keys) {
if (c._lastUpdate != GlobalCtx._currentUpdate) {
evalAfterEnsureUpstreamEvald(c);
}
}
}
// It is possible that this node has been forced to be evaluated by another
// In this case, do not re-compute it again
if (GlobalCtx._currentUpdateNodeDirty[node] == true) {
try {
// It is possible that this node has been forced to be evaluated by another
// In this case, do not re-compute it again
if (updateCtx._currentUpdateNodeDirty[node] == true) {
final downstream = node.onDependencyUpdated();
_injectNodesToDAG(downstream);
} on NoValueException {
// Pass. We must still consider the downstream.
updateCtx._injectNodesToDAG(downstream);
}
}
}

while (GlobalCtx._currentUpdateNodes.isNotEmpty) {
final cur = GlobalCtx._currentUpdateNodes.first;
GlobalCtx._currentUpdateNodes.remove(cur);
if (cur._lastUpdate == GlobalCtx._currentUpdate) continue;
evalAfterEnsureUpstreamEvald(cur);
while (updateCtx._currentUpdateNodes.isNotEmpty) {
final cur = updateCtx._currentUpdateNodes.first;
updateCtx._currentUpdateNodes.remove(cur);
if (cur._lastUpdate == GlobalCtx._currentUpdate) continue;
evalAfterEnsureUpstreamEvald(cur);
}
} finally {
GlobalCtx._updateCtx = null;
}
}

Expand All @@ -186,7 +192,7 @@ class ComputedImpl<T> implements Computed<T> {
<ComputedImpl, _WeakMemoizedValueOrException>{};

bool get _computing =>
GlobalCtx._currentUpdateUpstreamComputations[this] != null;
GlobalCtx._updateCtx?._currentUpdateUpstreamComputations[this] != null;

final _memoizedDownstreamComputations = <ComputedImpl>{};
final _weakDownstreamComputations =
Expand Down Expand Up @@ -214,11 +220,12 @@ class ComputedImpl<T> implements Computed<T> {
if (_computing) throw CyclicUseException();

final caller = GlobalCtx.currentComputation;
if (GlobalCtx._reacting) {
final updateCtx = GlobalCtx._updateCtx!;
if (updateCtx._reacting == true) {
throw StateError(_noReactivityInsideReact);
}
// Make sure the caller is subscribed, upgrade to non-weak if needed
GlobalCtx._currentUpdateUpstreamComputations[caller]!.update(
updateCtx._currentUpdateUpstreamComputations[caller]!.update(
this,
(v) =>
_WeakMemoizedValueOrException(weak && v._weak, true, _lastResult),
Expand All @@ -234,7 +241,7 @@ class ComputedImpl<T> implements Computed<T> {
_use(false);
if (_lastUpdate != GlobalCtx._currentUpdate && _lastResult == null) {
final downstream = eval();
_injectNodesToDAG(downstream);
GlobalCtx._updateCtx!._injectNodesToDAG(downstream);
}

switch (_lastResult) {
Expand Down Expand Up @@ -306,15 +313,13 @@ class ComputedImpl<T> implements Computed<T> {
@override
ComputedSubscription<T> listen(void Function(T event)? onData,
[Function? onError]) {
if (GlobalCtx._updateCtx?._currentComputation != null) {
throw StateError(_noListenInsideComputations);
}
_validateOnError(onError);
final sub = _ComputedSubscriptionImpl<T>(this, onData, onError);
if (_novalue) {
try {
eval();
// Might set lastResult, won't notify the listener just yet (as that is against the Stream contract)
} on NoValueException {
// It is fine if we don't have a value yet
}
_rerunGraph({this});
}
_listeners[sub] = false;

Expand Down Expand Up @@ -399,7 +404,22 @@ class ComputedImpl<T> implements Computed<T> {
// Returns the set of downstream nodes to be re-computed.
// This is public so that it can be customized by subclasses
Set<Computed> onDependencyUpdated() {
return eval();
// Create a new update context if needed
final newContext = GlobalCtx._updateCtx == null;
if (newContext) {
GlobalCtx._updateCtx = _UpdateCtx();
}
try {
return eval();
} on NoValueException {
// Do not notify the downstream if this computation
// did not produce a value
return {};
} finally {
if (newContext) {
GlobalCtx._updateCtx = null;
}
}
}

T _evalFInZone() {
Expand Down Expand Up @@ -433,13 +453,14 @@ class ComputedImpl<T> implements Computed<T> {
Set<Computed> eval() {
const idempotencyFailureMessage =
"Computed expressions must be purely functional. Please use listeners for side effects. For computations creating asynchronous operations, make sure to use `Computed.async`.";
GlobalCtx._currentUpdateNodeDirty[this] = null;
final oldComputation = GlobalCtx._currentComputation;
final updateCtx = GlobalCtx._updateCtx!;
final oldComputation = updateCtx._currentComputation;
updateCtx._currentUpdateNodeDirty[this] = null;
bool shouldNotify = false;
try {
_prevResult = _lastResult;
GlobalCtx._currentUpdateUpstreamComputations[this] = {};
GlobalCtx._currentComputation = this;
updateCtx._currentUpdateUpstreamComputations[this] = {};
updateCtx._currentComputation = this;
var newResult = _evalFGuarded();
if (_assertIdempotent &&
switch (newResult) {
Expand Down Expand Up @@ -479,12 +500,12 @@ class ComputedImpl<T> implements Computed<T> {

// Commit the changes to the DAG
for (var e
in GlobalCtx._currentUpdateUpstreamComputations[this]!.entries) {
in updateCtx._currentUpdateUpstreamComputations[this]!.entries) {
final up = e.key;
up._addDownstreamComputation(this, e.value._memoized, e.value._weak);
}
final oldDiffNew = _lastUpstreamComputations.keys.toSet().difference(
GlobalCtx._currentUpdateUpstreamComputations[this]!.keys.toSet());
updateCtx._currentUpdateUpstreamComputations[this]!.keys.toSet());
for (var up in oldDiffNew) {
up._removeDownstreamComputation(this);
}
Expand All @@ -493,8 +514,8 @@ class ComputedImpl<T> implements Computed<T> {
// So that we can memoize that f threw NoValueException
// when ran with a specific set of dependencies, for example.
_lastUpstreamComputations =
GlobalCtx._currentUpdateUpstreamComputations[this]!;
GlobalCtx._currentUpdateUpstreamComputations[this] = null;
updateCtx._currentUpdateUpstreamComputations[this]!;
updateCtx._currentUpdateUpstreamComputations[this] = null;
// Bookkeep the fact the we ran/tried to run this computation
// so that we can unlock its downstream during the DAG walk
_lastUpdate = GlobalCtx._currentUpdate;
Expand Down Expand Up @@ -522,10 +543,10 @@ class ComputedImpl<T> implements Computed<T> {
} finally {
assert(_lastUpdate == GlobalCtx._currentUpdate);
if (shouldNotify) {
GlobalCtx._currentComputation = null;
updateCtx._currentComputation = null;
_notifyListeners();
}
GlobalCtx._currentComputation = oldComputation;
updateCtx._currentComputation = oldComputation;
}
}

Expand Down Expand Up @@ -685,62 +706,80 @@ final class RouterImpl<T> extends ComputedImpl<T> {
super._hasDownstreamComputations() ||
_nonMemoizedDownstreamComputations.isNotEmpty;

void _maybeDelay(void Function() f) async {
if (GlobalCtx._updateCtx != null) {
// An update is already going on. A data source
// must have notified us reentrantly. Process this
// event in a new microtask.
await Future.value();
// Dag runs never span across microtasks
assert(GlobalCtx._updateCtx == null);
}

f();
}

void onDataSourceData(T data) {
if (_dss == null) return;
GlobalCtx._currentUpdate = _Token();
_dss!._lastEmit = GlobalCtx._currentUpdate;
final rvoe =
GlobalCtx._routerExpando[_dss!._ds] as _RouterValueOrException<T>;
if (switch (rvoe._voe) {
Value<T>(value: final value) => value != data,
_ => true
}) {
// Update the global last value cache
rvoe._voe = ValueOrException<T>.value(data);
} else if (_nonMemoizedDownstreamComputations.isEmpty) {
return;
}
_maybeDelay(() {
GlobalCtx._currentUpdate = _Token();
_dss!._lastEmit = GlobalCtx._currentUpdate;
final rvoe =
GlobalCtx._routerExpando[_dss!._ds] as _RouterValueOrException<T>;
if (switch (rvoe._voe) {
Value<T>(value: final value) => value != data,
_ => true
}) {
// Update the global last value cache
rvoe._voe = ValueOrException<T>.value(data);
} else if (_nonMemoizedDownstreamComputations.isEmpty) {
return;
}

_rerunGraph({this});
_rerunGraph({this});
});
}

void onDataSourceError(Object err, StackTrace st) {
if (_dss == null) return;
GlobalCtx._currentUpdate = _Token();

_dss!._lastEmit = GlobalCtx._currentUpdate;
final rvoe =
GlobalCtx._routerExpando[_dss!._ds] as _RouterValueOrException<T>;
if (switch (rvoe._voe) {
Exception(exc: final exc) => exc != err,
_ => true
}) {
// Update the global last value cache
rvoe._voe = ValueOrException<T>.exc(err, st);
} else if (_nonMemoizedDownstreamComputations.isEmpty) {
return;
}
_maybeDelay(() {
GlobalCtx._currentUpdate = _Token();

_dss!._lastEmit = GlobalCtx._currentUpdate;
final rvoe =
GlobalCtx._routerExpando[_dss!._ds] as _RouterValueOrException<T>;
if (switch (rvoe._voe) {
Exception(exc: final exc) => exc != err,
_ => true
}) {
// Update the global last value cache
rvoe._voe = ValueOrException<T>.exc(err, st);
} else if (_nonMemoizedDownstreamComputations.isEmpty) {
return;
}

_rerunGraph({this});
_rerunGraph({this});
});
}

void _react(void Function(T) onData, Function? onError) {
// Only routers can be .react-ed to
assert(_dss != null);
final caller = GlobalCtx.currentComputation;
if (GlobalCtx._reacting) {
final updateCtx = GlobalCtx._updateCtx!;
if (updateCtx._reacting) {
throw StateError(_noReactivityInsideReact);
}
// Make sure the caller is subscribed
GlobalCtx._currentUpdateUpstreamComputations[caller]![this] =
updateCtx._currentUpdateUpstreamComputations[caller]![this] =
_WeakMemoizedValueOrException(false, false, _lastResult);

if (_dss!._lastEmit != GlobalCtx._currentUpdate) {
// Don't call the functions
return;
}

GlobalCtx._reacting = true;
updateCtx._reacting = true;
try {
switch (_lastResult!) {
case Value<T>(value: final value):
Expand All @@ -756,7 +795,7 @@ final class RouterImpl<T> extends ComputedImpl<T> {
}
}
} finally {
GlobalCtx._reacting = false;
updateCtx._reacting = false;
}
}

Expand Down
5 changes: 4 additions & 1 deletion packages/computed/lib/src/computed_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ class ComputedStreamExtensionImpl<T> {
// No onPause and onResume, as Computed doesn't support these.
}

void _onListen() {
void _onListen() async {
// StreamController can call onListen synchronously,
// so call .listen in a separate microtask.
await Future.value();
_computedSubscription ??= _parent.listen((event) => _controller!.add(event),
(error) => _controller!.addError(error));
}
Expand Down
5 changes: 4 additions & 1 deletion packages/computed/test/computed_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1438,7 +1438,10 @@ void main() {
}, (e) {
expect(flag, false);
flag = true;
expect(e, isA<ComputedAsyncError>());
expect(
e,
isA<StateError>().having((e) => e.message, 'message',
"`listen` is not allowed inside computations."));
});

await Future.value();
Expand Down
Loading