diff --git a/Cargo.lock b/Cargo.lock index 9c408ef..8c10247 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -47,6 +47,12 @@ dependencies = [ "libc", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.88" @@ -568,8 +574,10 @@ name = "ngx" version = "0.5.0" dependencies = [ "allocator-api2", + "async-task", "lock_api", "nginx-sys", + "pin-project-lite", "target-triple", ] diff --git a/Cargo.toml b/Cargo.toml index 0daa1bd..7799118 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,11 +26,18 @@ rust-version.workspace = true [dependencies] allocator-api2 = { version = "0.2.21", default-features = false } +async-task = { version = "4.7.1", optional = true } lock_api = "0.4.13" nginx-sys = { path = "nginx-sys", default-features=false, version = "0.5.0"} +pin-project-lite = { version = "0.2.16", optional = true } [features] -default = ["vendored","std"] +default = ["std", "vendored"] +async = [ + "alloc", + "dep:async-task", + "dep:pin-project-lite", +] # Enables the components using memory allocation. # If no `std` flag, `alloc` crate is internally used instead. This flag is mainly for `no_std` build. alloc = ["allocator-api2/alloc"] diff --git a/src/async_/mod.rs b/src/async_/mod.rs new file mode 100644 index 0000000..dd801ca --- /dev/null +++ b/src/async_/mod.rs @@ -0,0 +1,6 @@ +//! Async runtime and set of utilities on top of the NGINX event loop. +pub use self::sleep::{sleep, Sleep}; +pub use self::spawn::{spawn, Task}; + +mod sleep; +mod spawn; diff --git a/src/async_/sleep.rs b/src/async_/sleep.rs new file mode 100644 index 0000000..5895490 --- /dev/null +++ b/src/async_/sleep.rs @@ -0,0 +1,138 @@ +use core::future::Future; +use core::mem; +use core::pin::Pin; +use core::ptr::{self, NonNull}; +use core::task::{self, Poll}; +use core::time::Duration; + +use nginx_sys::{ngx_add_timer, ngx_del_timer, ngx_event_t, ngx_log_t, ngx_msec_int_t, ngx_msec_t}; +use pin_project_lite::pin_project; + +use crate::{ngx_container_of, ngx_log_debug}; + +/// Maximum duration that can be achieved using [ngx_add_timer]. +const NGX_TIMER_DURATION_MAX: Duration = Duration::from_millis(ngx_msec_int_t::MAX as _); + +/// Puts the current task to sleep for at least the specified amount of time. +/// +/// The function is a shorthand for [Sleep::new] using the global logger for debug output. +#[inline] +pub fn sleep(duration: Duration) -> Sleep { + Sleep::new(duration, crate::log::ngx_cycle_log()) +} + +pin_project! { +/// Future returned by [sleep]. +pub struct Sleep { + #[pin] + timer: TimerEvent, + duration: Duration, +} +} + +impl Sleep { + /// Creates a new Sleep with the specified duration and logger for debug messages. + pub fn new(duration: Duration, log: NonNull) -> Self { + let timer = TimerEvent::new(log); + ngx_log_debug!(timer.event.log, "async: sleep for {duration:?}"); + Sleep { timer, duration } + } +} + +impl Future for Sleep { + type Output = (); + + #[cfg(not(target_pointer_width = "32"))] + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let msec = self.duration.min(NGX_TIMER_DURATION_MAX).as_millis() as ngx_msec_t; + let this = self.project(); + this.timer.poll_sleep(msec, cx) + } + + #[cfg(target_pointer_width = "32")] + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + if self.duration.is_zero() { + return Poll::Ready(()); + } + let step = self.duration.min(NGX_TIMER_DURATION_MAX); + + let mut this = self.project(); + // Handle ngx_msec_t overflow on 32-bit platforms. + match this.timer.as_mut().poll_sleep(step.as_millis() as _, cx) { + // Last step + Poll::Ready(()) if this.duration == &step => Poll::Ready(()), + Poll::Ready(()) => { + *this.duration = this.duration.saturating_sub(step); + this.timer.event.set_timedout(0); // rearm + this.timer.as_mut().poll_sleep(step.as_millis() as _, cx) + } + x => x, + } + } +} + +struct TimerEvent { + event: ngx_event_t, + waker: Option, +} + +// SAFETY: Timer will only be used in a single-threaded environment +unsafe impl Send for TimerEvent {} +unsafe impl Sync for TimerEvent {} + +impl TimerEvent { + pub fn new(log: NonNull) -> Self { + static IDENT: [usize; 4] = [ + 0, 0, 0, 0x4153594e, // ASYN + ]; + + let mut ev: ngx_event_t = unsafe { mem::zeroed() }; + // The data is only used for `ngx_event_ident` and will not be mutated. + ev.data = ptr::addr_of!(IDENT).cast_mut().cast(); + ev.handler = Some(Self::timer_handler); + ev.log = log.as_ptr(); + ev.set_cancelable(1); + + Self { + event: ev, + waker: None, + } + } + + pub fn poll_sleep( + mut self: Pin<&mut Self>, + duration: ngx_msec_t, + context: &mut task::Context<'_>, + ) -> Poll<()> { + if self.event.timedout() != 0 { + Poll::Ready(()) + } else if self.event.timer_set() != 0 { + if let Some(waker) = self.waker.as_mut() { + waker.clone_from(context.waker()); + } else { + self.waker = Some(context.waker().clone()); + } + Poll::Pending + } else { + unsafe { ngx_add_timer(ptr::addr_of_mut!(self.event), duration) }; + self.waker = Some(context.waker().clone()); + Poll::Pending + } + } + + unsafe extern "C" fn timer_handler(ev: *mut ngx_event_t) { + let timer = ngx_container_of!(ev, Self, event); + + if let Some(waker) = (*timer).waker.take() { + waker.wake(); + } + } +} + +impl Drop for TimerEvent { + fn drop(&mut self) { + if self.event.timer_set() != 0 { + unsafe { ngx_del_timer(ptr::addr_of_mut!(self.event)) }; + } + } +} diff --git a/src/async_/spawn.rs b/src/async_/spawn.rs new file mode 100644 index 0000000..f67230b --- /dev/null +++ b/src/async_/spawn.rs @@ -0,0 +1,148 @@ +use core::cell::UnsafeCell; +use core::future::Future; +use core::mem; +use core::ptr::{self, NonNull}; + +#[cfg(all(not(feature = "std"), feature = "alloc"))] +use alloc::collections::vec_deque::VecDeque; +#[cfg(feature = "std")] +use std::collections::vec_deque::VecDeque; + +pub use async_task::Task; +use async_task::{Runnable, ScheduleInfo, WithInfo}; +use nginx_sys::{ + ngx_del_timer, ngx_delete_posted_event, ngx_event_t, ngx_post_event, ngx_posted_next_events, +}; + +use crate::log::ngx_cycle_log; +use crate::{ngx_container_of, ngx_log_debug}; + +static SCHEDULER: Scheduler = Scheduler::new(); + +struct Scheduler(UnsafeCell); + +// SAFETY: Scheduler must only be used from the main thread of a worker process. +unsafe impl Send for Scheduler {} +unsafe impl Sync for Scheduler {} + +impl Scheduler { + const fn new() -> Self { + Self(UnsafeCell::new(SchedulerInner::new())) + } + + pub fn schedule(&self, runnable: Runnable) { + // SAFETY: the cell is not empty, and we have exclusive access due to being a + // single-threaded application. + let inner = unsafe { &mut *UnsafeCell::raw_get(&self.0) }; + inner.send(runnable) + } +} + +#[repr(C)] +struct SchedulerInner { + _ident: [usize; 4], // `ngx_event_ident` compatibility + event: ngx_event_t, + queue: VecDeque, +} + +impl SchedulerInner { + const fn new() -> Self { + let mut event: ngx_event_t = unsafe { mem::zeroed() }; + event.handler = Some(Self::scheduler_event_handler); + + Self { + _ident: [ + 0, 0, 0, 0x4153594e, // ASYN + ], + event, + queue: VecDeque::new(), + } + } + + pub fn send(&mut self, runnable: Runnable) { + // Cached `ngx_cycle.log` can be invalidated when reloading configuration in a single + // process mode. Update `log` every time to avoid using stale log pointer. + self.event.log = ngx_cycle_log().as_ptr(); + + // While this event is not used as a timer at the moment, we still want to ensure that it is + // compatible with `ngx_event_ident`. + if self.event.data.is_null() { + self.event.data = ptr::from_mut(self).cast(); + } + + // FIXME: VecDeque::push could panic on an allocation failure, switch to a datastructure + // which will not and propagate the failure. + self.queue.push_back(runnable); + unsafe { ngx_post_event(&mut self.event, ptr::addr_of_mut!(ngx_posted_next_events)) } + } + + /// This event handler is called by ngx_event_process_posted at the end of + /// ngx_process_events_and_timers. + extern "C" fn scheduler_event_handler(ev: *mut ngx_event_t) { + let mut runnables = { + // SAFETY: + // This handler always receives a non-null pointer to an event embedded into a + // SchedulerInner instance. + // We modify the contents of `UnsafeCell`, but we ensured that the access is unique due + // to being single-threaded and dropping the reference before we start processing queued + // runnables. + let this = + unsafe { ngx_container_of!(NonNull::new_unchecked(ev), Self, event).as_mut() }; + + ngx_log_debug!( + this.event.log, + "async: processing {} deferred wakeups", + this.queue.len() + ); + + // Move runnables to a new queue to avoid borrowing from the SchedulerInner and limit + // processing to already queued wakeups. This ensures that we correctly handle tasks + // that keep scheduling themselves (e.g. using yield_now() in a loop). + // We can't use drain() as it borrows from self and breaks aliasing rules. + mem::take(&mut this.queue) + }; + + for runnable in runnables.drain(..) { + runnable.run(); + } + } +} + +impl Drop for SchedulerInner { + fn drop(&mut self) { + if self.event.posted() != 0 { + unsafe { ngx_delete_posted_event(&mut self.event) }; + } + + if self.event.timer_set() != 0 { + unsafe { ngx_del_timer(&mut self.event) }; + } + } +} + +fn schedule(runnable: Runnable, info: ScheduleInfo) { + if info.woken_while_running { + SCHEDULER.schedule(runnable); + ngx_log_debug!( + ngx_cycle_log().as_ptr(), + "async: task scheduled while running" + ); + } else { + runnable.run(); + } +} + +/// Creates a new task running on the NGINX event loop. +pub fn spawn(future: F) -> Task +where + F: Future + 'static, + T: 'static, +{ + ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: spawning new task"); + let scheduler = WithInfo(schedule); + // Safety: single threaded embedding takes care of send/sync requirements for future and + // scheduler. Future and scheduler are both 'static. + let (runnable, task) = unsafe { async_task::spawn_unchecked(future, scheduler) }; + runnable.schedule(); + task +} diff --git a/src/lib.rs b/src/lib.rs index adecb78..0dd4fc3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,6 +41,8 @@ extern crate alloc; extern crate std; pub mod allocator; +#[cfg(feature = "async")] +pub mod async_; /// The core module. /// diff --git a/src/log.rs b/src/log.rs index b146df0..e257b71 100644 --- a/src/log.rs +++ b/src/log.rs @@ -1,6 +1,7 @@ use core::cmp; use core::fmt::{self, Write}; use core::mem::MaybeUninit; +use core::ptr::NonNull; use crate::ffi::{self, ngx_err_t, ngx_log_t, ngx_uint_t, NGX_MAX_ERROR_STR}; @@ -11,6 +12,19 @@ use crate::ffi::{self, ngx_err_t, ngx_log_t, ngx_uint_t, NGX_MAX_ERROR_STR}; pub const LOG_BUFFER_SIZE: usize = NGX_MAX_ERROR_STR as usize - b"1970/01/01 00:00:00 [info] 1#1: ".len(); +/// Obtains a pointer to the global (cycle) log object. +/// +/// The returned pointer is tied to the current cycle lifetime, and will be invalidated by a +/// configuration reload in the master process or in a single-process mode. If you plan to store it, +/// make sure that your storage is also tied to the cycle lifetime (e.g. module configuration or +/// connection/request data). +/// +/// The function may panic if you call it before the main() in nginx creates an initial cycle. +#[inline(always)] +pub fn ngx_cycle_log() -> NonNull { + NonNull::new(unsafe { (*nginx_sys::ngx_cycle).log }).expect("global logger") +} + /// Utility function to provide typed checking of the mask's field state. #[inline(always)] pub fn check_mask(mask: DebugMask, log_level: usize) -> bool {