Skip to content

Commit 0cea1b6

Browse files
authored
Merge pull request #33 from true-async/31-duplicate-awaitable-objects-in-the-waker
31 duplicate awaitable objects in the waker
2 parents 82d9de4 + e51ef7e commit 0cea1b6

10 files changed

+164
-8
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
## [0.3.0] - TBD
1111

1212
### Added
13+
- **Multiple Callbacks Per Event Support**: Complete redesign of waker trigger system to support multiple callbacks on a single event
14+
- Modified `zend_async_waker_trigger_s` structure to use flexible array member with dynamic capacity
15+
- Added `waker_trigger_create()` and `waker_trigger_add_callback()` helper functions for efficient memory management
16+
- Implemented single-block memory allocation for better performance (trigger + callback array in one allocation)
17+
- Default capacity starts at 1 and doubles as needed (1 → 2 → 4 → 8...)
18+
- Fixed `coroutine_event_callback_dispose()` to remove only specific callbacks instead of entire events
19+
- **Breaking Change**: Events now persist until all associated callbacks are removed
1320
- **Bailout Tests**: Added 15 tests covering memory exhaustion and stack overflow scenarios in async operations
1421
- **Garbage Collection Support**: Implemented comprehensive GC handlers for async objects
1522
- Added `async_coroutine_object_gc()` function to track all ZVALs in coroutine structures

async.c

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,11 @@ PHP_FUNCTION(Async_await)
219219
zend_async_event_t *awaitable_event = ZEND_ASYNC_OBJECT_TO_EVENT(awaitable);
220220
zend_async_event_t *cancellation_event = cancellation != NULL ? ZEND_ASYNC_OBJECT_TO_EVENT(cancellation) : NULL;
221221

222+
// If the awaitable is the same as the cancellation event, we can skip the cancellation check.
223+
if (awaitable_event == cancellation_event) {
224+
cancellation_event = NULL;
225+
}
226+
222227
// If the awaitable is already resolved, we can return the result immediately.
223228
if (ZEND_ASYNC_EVENT_IS_CLOSED(awaitable_event)) {
224229

@@ -236,7 +241,7 @@ PHP_FUNCTION(Async_await)
236241

237242
// If the cancellation event is already resolved, we can return exception immediately.
238243
if (cancellation_event != NULL && ZEND_ASYNC_EVENT_IS_CLOSED(cancellation_event)) {
239-
if (ZEND_ASYNC_EVENT_EXTRACT_RESULT(awaitable_event, return_value)) {
244+
if (ZEND_ASYNC_EVENT_EXTRACT_RESULT(cancellation_event, return_value)) {
240245
return;
241246
}
242247

@@ -721,10 +726,12 @@ PHP_FUNCTION(Async_rootContext)
721726
THROW_IF_ASYNC_OFF;
722727
THROW_IF_SCHEDULER_CONTEXT;
723728

724-
/* TODO: Implement root context access */
725-
/* For now, return a new context */
726-
async_context_t *context = async_context_new();
727-
RETURN_OBJ(&context->std);
729+
if (ASYNC_G(root_context) == NULL) {
730+
ASYNC_G(root_context) = (zend_async_context_t *)async_context_new();
731+
}
732+
733+
async_context_t *context = (async_context_t *)ASYNC_G(root_context);
734+
RETURN_OBJ_COPY(&context->std);
728735
}
729736

730737
PHP_FUNCTION(Async_getCoroutines)
@@ -755,7 +762,7 @@ PHP_FUNCTION(Async_gracefulShutdown)
755762
THROW_IF_ASYNC_OFF;
756763
THROW_IF_SCHEDULER_CONTEXT;
757764

758-
/* TODO: Implement graceful shutdown */
765+
ZEND_ASYNC_SHUTDOWN();
759766
}
760767

761768
/*
@@ -898,6 +905,9 @@ static PHP_GINIT_FUNCTION(async)
898905
async_globals->signal_handlers = NULL;
899906
async_globals->signal_events = NULL;
900907
async_globals->process_events = NULL;
908+
async_globals->root_context = NULL;
909+
/* Maximum number of coroutines in the concurrent iterator */
910+
async_globals->default_concurrency = 32;
901911

902912
#ifdef PHP_WIN32
903913
async_globals->watcherThread = NULL;

async_API.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,12 @@ static void engine_shutdown(void)
215215
circular_buffer_dtor(&ASYNC_G(coroutine_queue));
216216
zend_hash_destroy(&ASYNC_G(coroutines));
217217

218+
if (ASYNC_G(root_context) != NULL) {
219+
async_context_t * root_context = (async_context_t *) ASYNC_G(root_context);
220+
ASYNC_G(root_context) = NULL;
221+
OBJ_RELEASE(&root_context->std);
222+
}
223+
218224
//async_host_name_list_dtor();
219225
}
220226

@@ -918,6 +924,10 @@ void async_await_futures(
918924
return;
919925
}
920926

927+
if (concurrency == 0) {
928+
concurrency = ASYNC_G(default_concurrency);
929+
}
930+
921931
await_context = ecalloc(1, sizeof(async_await_context_t));
922932
await_context->total = futures != NULL ? (int) zend_hash_num_elements(futures) : 0;
923933
await_context->futures_count = 0;
@@ -994,6 +1004,11 @@ void async_await_futures(
9941004
zend_async_resume_when(coroutine, awaitable, false, NULL, &callback->callback);
9951005

9961006
if (UNEXPECTED(EG(exception))) {
1007+
if (tmp_results != NULL) {
1008+
zend_array_destroy(tmp_results);
1009+
tmp_results = NULL;
1010+
}
1011+
9971012
await_context->dtor(await_context);
9981013
return;
9991014
}

php_async.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ ZEND_BEGIN_MODULE_GLOBALS(async)
7777
zend_fiber_transfer *main_transfer;
7878
/* The main flow stack */
7979
zend_vm_stack main_vm_stack;
80+
/* System root context */
81+
zend_async_context_t *root_context;
82+
/* The default concurrency */
83+
int default_concurrency;
8084

8185
#ifdef PHP_ASYNC_LIBUV
8286
/* The reactor */

run-tests.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ BASE_PATH="$(cd "$(dirname "$0")/tests" && pwd)"
44
RUN_TESTS_PATH="$(cd "$(dirname "$0")/../../" && pwd)/run-tests.php"
55
PHP_EXECUTABLE="$(cd "$(dirname "$0")/../../" && pwd)/sapi/cli/php"
66
export VALGRIND_OPTS="--leak-check=full --track-origins=yes"
7+
export MYSQL_TEST_HOST="127.0.0.1"
8+
export MYSQL_TEST_PORT="3306"
9+
export MYSQL_TEST_USER="root"
10+
export MYSQL_TEST_PASSWD="root"
11+
export MYSQL_TEST_DB="php_test"
712

813
if [ -z "$1" ]; then
914
TEST_PATH="$BASE_PATH"
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
--TEST--
2+
awaitAll() - The object used to cancel the wait is simultaneously the object being awaited.
3+
--FILE--
4+
<?php
5+
6+
use function Async\spawn;
7+
use function Async\awaitAll;
8+
9+
echo "start\n";
10+
11+
$coroutine1 = spawn(function() {
12+
return "first";
13+
});
14+
15+
$coroutine2 = spawn(function() {
16+
return "second";
17+
});
18+
19+
try {
20+
$result = awaitAll([$coroutine1, $coroutine2], $coroutine2);
21+
var_dump($result);
22+
} catch (\Async\CancellationException $e) {
23+
}
24+
25+
echo "end\n";
26+
?>
27+
--EXPECTF--
28+
start
29+
end
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
--TEST--
2+
awaitAll() - Attempt to wait for two identical objects.
3+
--FILE--
4+
<?php
5+
6+
use function Async\spawn;
7+
use function Async\awaitAll;
8+
9+
echo "start\n";
10+
11+
$coroutine1 = spawn(function() {
12+
return "first";
13+
});
14+
15+
$coroutine2 = spawn(function() {
16+
return "second";
17+
});
18+
19+
$result = awaitAll([$coroutine1, $coroutine2, $coroutine1, $coroutine2]);
20+
var_dump($result);
21+
22+
echo "end\n";
23+
?>
24+
--EXPECTF--
25+
start
26+
array(2) {
27+
[0]=>
28+
array(4) {
29+
[0]=>
30+
string(5) "first"
31+
[1]=>
32+
string(6) "second"
33+
[2]=>
34+
string(5) "first"
35+
[3]=>
36+
string(6) "second"
37+
}
38+
[1]=>
39+
array(0) {
40+
}
41+
}
42+
end

tests/edge_cases/005-scheduler_shutdown_basic.phpt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ echo "coroutines spawned\n";
2929

3030
// Trigger graceful shutdown
3131
try {
32-
gracefulShutdown();
3332
awaitAll([$coroutine1, $coroutine2]);
3433
} catch (Throwable $e) {
3534
echo "shutdown exception: " . $e->getMessage() . "\n";

tests/edge_cases/006-scheduler_graceful_shutdown_exceptions.phpt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ echo "coroutines spawned\n";
2929
// Trigger graceful shutdown with custom cancellation
3030
try {
3131
$cancellation = new \Async\CancellationException("Custom shutdown message");
32-
gracefulShutdown($cancellation);
3332
awaitAll([$error_coroutine, $cleanup_coroutine]);
3433
echo "graceful shutdown with custom cancellation completed\n";
3534
} catch (\Async\CancellationException $e) {
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
--TEST--
2+
Stream: stream_socket_accept() + timeout
3+
--FILE--
4+
<?php
5+
6+
use function Async\spawn;
7+
use function Async\awaitAll;
8+
use function Async\delay;
9+
use function Async\suspend;
10+
11+
echo "Start\n";
12+
13+
$server_port = null;
14+
15+
// Server coroutine
16+
$server = spawn(function() {
17+
echo "Server: starting\n";
18+
19+
$socket = stream_socket_server("tcp://127.0.0.1:0", $errno, $errstr);
20+
if (!$socket) {
21+
echo "Server: failed to start - $errstr\n";
22+
return;
23+
}
24+
25+
$address = stream_socket_get_name($socket, false);
26+
$server_port = (int)substr($address, strrpos($address, ':') + 1);
27+
echo "Server: listening on port $server_port\n";
28+
29+
echo "Server: accepting connections\n";
30+
$client = stream_socket_accept($socket, 1);
31+
32+
echo "Server end\n";
33+
});
34+
35+
echo "End\n";
36+
37+
?>
38+
--EXPECTF--
39+
Start
40+
End
41+
Server: starting
42+
Server: listening on port %d
43+
Server: accepting connections
44+
45+
Warning: stream_socket_accept(): %s
46+
Server end

0 commit comments

Comments
 (0)