Skip to content

31 duplicate awaitable objects in the waker #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jul 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [0.3.0] - TBD

### Added
- **Multiple Callbacks Per Event Support**: Complete redesign of waker trigger system to support multiple callbacks on a single event
- Modified `zend_async_waker_trigger_s` structure to use flexible array member with dynamic capacity
- Added `waker_trigger_create()` and `waker_trigger_add_callback()` helper functions for efficient memory management
- Implemented single-block memory allocation for better performance (trigger + callback array in one allocation)
- Default capacity starts at 1 and doubles as needed (1 → 2 → 4 → 8...)
- Fixed `coroutine_event_callback_dispose()` to remove only specific callbacks instead of entire events
- **Breaking Change**: Events now persist until all associated callbacks are removed
- **Bailout Tests**: Added 15 tests covering memory exhaustion and stack overflow scenarios in async operations
- **Garbage Collection Support**: Implemented comprehensive GC handlers for async objects
- Added `async_coroutine_object_gc()` function to track all ZVALs in coroutine structures
Expand Down
22 changes: 16 additions & 6 deletions async.c
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ PHP_FUNCTION(Async_await)
zend_async_event_t *awaitable_event = ZEND_ASYNC_OBJECT_TO_EVENT(awaitable);
zend_async_event_t *cancellation_event = cancellation != NULL ? ZEND_ASYNC_OBJECT_TO_EVENT(cancellation) : NULL;

// If the awaitable is the same as the cancellation event, we can skip the cancellation check.
if (awaitable_event == cancellation_event) {
cancellation_event = NULL;
}

// If the awaitable is already resolved, we can return the result immediately.
if (ZEND_ASYNC_EVENT_IS_CLOSED(awaitable_event)) {

Expand All @@ -236,7 +241,7 @@ PHP_FUNCTION(Async_await)

// If the cancellation event is already resolved, we can return exception immediately.
if (cancellation_event != NULL && ZEND_ASYNC_EVENT_IS_CLOSED(cancellation_event)) {
if (ZEND_ASYNC_EVENT_EXTRACT_RESULT(awaitable_event, return_value)) {
if (ZEND_ASYNC_EVENT_EXTRACT_RESULT(cancellation_event, return_value)) {
return;
}

Expand Down Expand Up @@ -721,10 +726,12 @@ PHP_FUNCTION(Async_rootContext)
THROW_IF_ASYNC_OFF;
THROW_IF_SCHEDULER_CONTEXT;

/* TODO: Implement root context access */
/* For now, return a new context */
async_context_t *context = async_context_new();
RETURN_OBJ(&context->std);
if (ASYNC_G(root_context) == NULL) {
ASYNC_G(root_context) = (zend_async_context_t *)async_context_new();
}

async_context_t *context = (async_context_t *)ASYNC_G(root_context);
RETURN_OBJ_COPY(&context->std);
}

PHP_FUNCTION(Async_getCoroutines)
Expand Down Expand Up @@ -755,7 +762,7 @@ PHP_FUNCTION(Async_gracefulShutdown)
THROW_IF_ASYNC_OFF;
THROW_IF_SCHEDULER_CONTEXT;

/* TODO: Implement graceful shutdown */
ZEND_ASYNC_SHUTDOWN();
}

/*
Expand Down Expand Up @@ -898,6 +905,9 @@ static PHP_GINIT_FUNCTION(async)
async_globals->signal_handlers = NULL;
async_globals->signal_events = NULL;
async_globals->process_events = NULL;
async_globals->root_context = NULL;
/* Maximum number of coroutines in the concurrent iterator */
async_globals->default_concurrency = 32;

#ifdef PHP_WIN32
async_globals->watcherThread = NULL;
Expand Down
15 changes: 15 additions & 0 deletions async_API.c
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ static void engine_shutdown(void)
circular_buffer_dtor(&ASYNC_G(coroutine_queue));
zend_hash_destroy(&ASYNC_G(coroutines));

if (ASYNC_G(root_context) != NULL) {
async_context_t * root_context = (async_context_t *) ASYNC_G(root_context);
ASYNC_G(root_context) = NULL;
OBJ_RELEASE(&root_context->std);
}

//async_host_name_list_dtor();
}

Expand Down Expand Up @@ -918,6 +924,10 @@ void async_await_futures(
return;
}

if (concurrency == 0) {
concurrency = ASYNC_G(default_concurrency);
}

await_context = ecalloc(1, sizeof(async_await_context_t));
await_context->total = futures != NULL ? (int) zend_hash_num_elements(futures) : 0;
await_context->futures_count = 0;
Expand Down Expand Up @@ -994,6 +1004,11 @@ void async_await_futures(
zend_async_resume_when(coroutine, awaitable, false, NULL, &callback->callback);

if (UNEXPECTED(EG(exception))) {
if (tmp_results != NULL) {
zend_array_destroy(tmp_results);
tmp_results = NULL;
}

await_context->dtor(await_context);
return;
}
Expand Down
4 changes: 4 additions & 0 deletions php_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ ZEND_BEGIN_MODULE_GLOBALS(async)
zend_fiber_transfer *main_transfer;
/* The main flow stack */
zend_vm_stack main_vm_stack;
/* System root context */
zend_async_context_t *root_context;
/* The default concurrency */
int default_concurrency;

#ifdef PHP_ASYNC_LIBUV
/* The reactor */
Expand Down
5 changes: 5 additions & 0 deletions run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ BASE_PATH="$(cd "$(dirname "$0")/tests" && pwd)"
RUN_TESTS_PATH="$(cd "$(dirname "$0")/../../" && pwd)/run-tests.php"
PHP_EXECUTABLE="$(cd "$(dirname "$0")/../../" && pwd)/sapi/cli/php"
export VALGRIND_OPTS="--leak-check=full --track-origins=yes"
export MYSQL_TEST_HOST="127.0.0.1"
export MYSQL_TEST_PORT="3306"
export MYSQL_TEST_USER="root"
export MYSQL_TEST_PASSWD="root"
export MYSQL_TEST_DB="php_test"

if [ -z "$1" ]; then
TEST_PATH="$BASE_PATH"
Expand Down
29 changes: 29 additions & 0 deletions tests/await/071-awaitAll_with_cancellation_simultaneously.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
--TEST--
awaitAll() - The object used to cancel the wait is simultaneously the object being awaited.
--FILE--
<?php

use function Async\spawn;
use function Async\awaitAll;

echo "start\n";

$coroutine1 = spawn(function() {
return "first";
});

$coroutine2 = spawn(function() {
return "second";
});

try {
$result = awaitAll([$coroutine1, $coroutine2], $coroutine2);
var_dump($result);
} catch (\Async\CancellationException $e) {
}

echo "end\n";
?>
--EXPECTF--
start
end
42 changes: 42 additions & 0 deletions tests/await/072-awaitAll_with_simultaneously.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
--TEST--
awaitAll() - Attempt to wait for two identical objects.
--FILE--
<?php

use function Async\spawn;
use function Async\awaitAll;

echo "start\n";

$coroutine1 = spawn(function() {
return "first";
});

$coroutine2 = spawn(function() {
return "second";
});

$result = awaitAll([$coroutine1, $coroutine2, $coroutine1, $coroutine2]);
var_dump($result);

echo "end\n";
?>
--EXPECTF--
start
array(2) {
[0]=>
array(4) {
[0]=>
string(5) "first"
[1]=>
string(6) "second"
[2]=>
string(5) "first"
[3]=>
string(6) "second"
}
[1]=>
array(0) {
}
}
end
1 change: 0 additions & 1 deletion tests/edge_cases/005-scheduler_shutdown_basic.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ echo "coroutines spawned\n";

// Trigger graceful shutdown
try {
gracefulShutdown();
awaitAll([$coroutine1, $coroutine2]);
} catch (Throwable $e) {
echo "shutdown exception: " . $e->getMessage() . "\n";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ echo "coroutines spawned\n";
// Trigger graceful shutdown with custom cancellation
try {
$cancellation = new \Async\CancellationException("Custom shutdown message");
gracefulShutdown($cancellation);
awaitAll([$error_coroutine, $cleanup_coroutine]);
echo "graceful shutdown with custom cancellation completed\n";
} catch (\Async\CancellationException $e) {
Expand Down
46 changes: 46 additions & 0 deletions tests/stream/016-tcp_stream_socket_accept_timeout.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
--TEST--
Stream: stream_socket_accept() + timeout
--FILE--
<?php

use function Async\spawn;
use function Async\awaitAll;
use function Async\delay;
use function Async\suspend;

echo "Start\n";

$server_port = null;

// Server coroutine
$server = spawn(function() {
echo "Server: starting\n";

$socket = stream_socket_server("tcp://127.0.0.1:0", $errno, $errstr);
if (!$socket) {
echo "Server: failed to start - $errstr\n";
return;
}

$address = stream_socket_get_name($socket, false);
$server_port = (int)substr($address, strrpos($address, ':') + 1);
echo "Server: listening on port $server_port\n";

echo "Server: accepting connections\n";
$client = stream_socket_accept($socket, 1);

echo "Server end\n";
});

echo "End\n";

?>
--EXPECTF--
Start
End
Server: starting
Server: listening on port %d
Server: accepting connections

Warning: stream_socket_accept(): %s
Server end
Loading