From fa1e241b3b6a75424b67da591ff6fffe48f2bc5e Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Sat, 24 May 2025 12:38:37 +0200 Subject: [PATCH] refactor: reduce dependency on `futures-util` --- Cargo.toml | 2 +- src/client/legacy/client.rs | 3 +-- src/client/legacy/connect/dns.rs | 2 +- src/client/legacy/connect/http.rs | 11 ++++------- src/client/legacy/pool.rs | 3 +-- src/common/future.rs | 30 ----------------------------- src/common/mod.rs | 2 -- src/rt/io.rs | 3 +-- src/server/conn/auto/mod.rs | 3 +-- src/service/oneshot.rs | 3 +-- tests/legacy_client.rs | 32 ++++++++++++------------------- tests/test_utils/mod.rs | 4 ++-- 12 files changed, 25 insertions(+), 73 deletions(-) delete mode 100644 src/common/future.rs diff --git a/Cargo.toml b/Cargo.toml index 6c2b986a..1981d44a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ base64 = { version = "0.22", optional = true } bytes = "1.7.1" futures-channel = { version = "0.3", optional = true } futures-core = { version = "0.3" } -futures-util = { version = "0.3.16", default-features = false, optional = true } +futures-util = { version = "0.3.16", default-features = false, features = ["alloc"], optional = true } http = "1.0" http-body = "1.0.0" hyper = "1.6.0" diff --git a/src/client/legacy/client.rs b/src/client/legacy/client.rs index c6dc7788..5180dc47 100644 --- a/src/client/legacy/client.rs +++ b/src/client/legacy/client.rs @@ -6,7 +6,7 @@ use std::error::Error as StdError; use std::fmt; -use std::future::Future; +use std::future::{poll_fn, Future}; use std::pin::Pin; use std::task::{self, Poll}; use std::time::Duration; @@ -25,7 +25,6 @@ use super::connect::HttpConnector; use super::connect::{Alpn, Connect, Connected, Connection}; use super::pool::{self, Ver}; -use crate::common::future::poll_fn; use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper}; type BoxSendFuture = Pin + Send>>; diff --git a/src/client/legacy/connect/dns.rs b/src/client/legacy/connect/dns.rs index abeb2cca..13beb542 100644 --- a/src/client/legacy/connect/dns.rs +++ b/src/client/legacy/connect/dns.rs @@ -291,7 +291,7 @@ pub(super) async fn resolve(resolver: &mut R, name: Name) -> Result { match self.fallback { None => self.preferred.connect(self.config).await, Some(mut fallback) => { - let preferred_fut = self.preferred.connect(self.config); - futures_util::pin_mut!(preferred_fut); + let preferred_fut = pin!(self.preferred.connect(self.config)); - let fallback_fut = fallback.remote.connect(self.config); - futures_util::pin_mut!(fallback_fut); + let fallback_fut = pin!(fallback.remote.connect(self.config)); - let fallback_delay = fallback.delay; - futures_util::pin_mut!(fallback_delay); + let fallback_delay = pin!(fallback.delay); let (result, future) = match futures_util::future::select(preferred_fut, fallback_delay).await { diff --git a/src/client/legacy/pool.rs b/src/client/legacy/pool.rs index e0d6f2ff..61fe406d 100644 --- a/src/client/legacy/pool.rs +++ b/src/client/legacy/pool.rs @@ -9,12 +9,11 @@ use std::hash::Hash; use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::sync::{Arc, Mutex, Weak}; -use std::task::{self, Poll}; +use std::task::{self, ready, Poll}; use std::time::{Duration, Instant}; use futures_channel::oneshot; -use futures_core::ready; use tracing::{debug, trace}; use hyper::rt::Timer as _; diff --git a/src/common/future.rs b/src/common/future.rs deleted file mode 100644 index 47897f24..00000000 --- a/src/common/future.rs +++ /dev/null @@ -1,30 +0,0 @@ -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - -// TODO: replace with `std::future::poll_fn` once MSRV >= 1.64 -pub(crate) fn poll_fn(f: F) -> PollFn -where - F: FnMut(&mut Context<'_>) -> Poll, -{ - PollFn { f } -} - -pub(crate) struct PollFn { - f: F, -} - -impl Unpin for PollFn {} - -impl Future for PollFn -where - F: FnMut(&mut Context<'_>) -> Poll, -{ - type Output = T; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - (self.f)(cx) - } -} diff --git a/src/common/mod.rs b/src/common/mod.rs index b45cd0b2..63b82885 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -15,5 +15,3 @@ pub(crate) use exec::Exec; pub(crate) use lazy::{lazy, Started as Lazy}; #[cfg(feature = "client")] pub(crate) use sync::SyncWrapper; - -pub(crate) mod future; diff --git a/src/rt/io.rs b/src/rt/io.rs index 888756f6..3158e585 100644 --- a/src/rt/io.rs +++ b/src/rt/io.rs @@ -1,3 +1,4 @@ +use std::future::poll_fn; use std::marker::Unpin; use std::pin::Pin; use std::task::Poll; @@ -5,8 +6,6 @@ use std::task::Poll; use futures_core::ready; use hyper::rt::{Read, ReadBuf, Write}; -use crate::common::future::poll_fn; - pub(crate) async fn read(io: &mut T, buf: &mut [u8]) -> Result where T: Read + Unpin, diff --git a/src/server/conn/auto/mod.rs b/src/server/conn/auto/mod.rs index 7b887ce2..5e334163 100644 --- a/src/server/conn/auto/mod.rs +++ b/src/server/conn/auto/mod.rs @@ -7,11 +7,10 @@ use std::future::Future; use std::marker::PhantomPinned; use std::mem::MaybeUninit; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use std::{error::Error as StdError, io, time::Duration}; use bytes::Bytes; -use futures_core::ready; use http::{Request, Response}; use http_body::Body; use hyper::{ diff --git a/src/service/oneshot.rs b/src/service/oneshot.rs index 2cc3e6e9..2b15e72f 100644 --- a/src/service/oneshot.rs +++ b/src/service/oneshot.rs @@ -1,8 +1,7 @@ -use futures_core::ready; use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use tower_service::Service; // Vendored from tower::util to reduce dependencies, the code is small enough. diff --git a/tests/legacy_client.rs b/tests/legacy_client.rs index f0ff90c1..5a3b9be8 100644 --- a/tests/legacy_client.rs +++ b/tests/legacy_client.rs @@ -1,8 +1,9 @@ mod test_utils; +use std::future::Future; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpListener}; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::Poll; @@ -10,9 +11,9 @@ use std::thread; use std::time::Duration; use futures_channel::{mpsc, oneshot}; +use futures_core::Stream; use futures_util::future::{self, FutureExt, TryFutureExt}; use futures_util::stream::StreamExt; -use futures_util::{self, Stream}; use http_body_util::BodyExt; use http_body_util::{Empty, Full, StreamBody}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -132,7 +133,7 @@ async fn drop_client_closes_idle_connections() { res.unwrap(); // not closed yet, just idle - future::poll_fn(|ctx| { + std::future::poll_fn(|ctx| { assert!(Pin::new(&mut closes).poll_next(ctx).is_pending()); Poll::Ready(()) }) @@ -142,8 +143,7 @@ async fn drop_client_closes_idle_connections() { drop(client); // and wait a few ticks for the connections to close - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); - futures_util::pin_mut!(t); + let t = pin!(tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"))); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; t1.await.unwrap(); @@ -192,8 +192,7 @@ async fn drop_response_future_closes_in_progress_connection() { future::select(res, rx1).await; // res now dropped - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); - futures_util::pin_mut!(t); + let t = pin!(tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"))); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; } @@ -248,8 +247,7 @@ async fn drop_response_body_closes_in_progress_connection() { res.unwrap(); // and wait a few ticks to see the connection drop - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); - futures_util::pin_mut!(t); + let t = pin!(tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"))); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; } @@ -301,8 +299,7 @@ async fn no_keep_alive_closes_connection() { let (res, _) = future::join(res, rx).await; res.unwrap(); - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); - futures_util::pin_mut!(t); + let t = pin!(tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"))); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(close, t).await; } @@ -348,8 +345,7 @@ async fn socket_disconnect_closes_idle_conn() { let (res, _) = future::join(res, rx).await; res.unwrap(); - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); - futures_util::pin_mut!(t); + let t = pin!(tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"))); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; } @@ -561,7 +557,7 @@ async fn client_keep_alive_when_response_before_request_body_ends() { }); future::join(res, rx2).await.0.unwrap(); - future::poll_fn(|ctx| { + std::future::poll_fn(|ctx| { assert!(Pin::new(&mut closes).poll_next(ctx).is_pending()); Poll::Ready(()) }) @@ -570,8 +566,7 @@ async fn client_keep_alive_when_response_before_request_body_ends() { assert_eq!(connects.load(Ordering::Relaxed), 1); drop(client); - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); - futures_util::pin_mut!(t); + let t = pin!(tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"))); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; } @@ -1254,10 +1249,7 @@ impl tower_service::Service for MockConnector { type Response = crate::MockConnection; type Error = std::io::Error; type Future = std::pin::Pin< - Box< - dyn futures_util::Future> - + Send, - >, + Box> + Send>, >; // Polls the connector to check if it’s ready to handle a request. diff --git a/tests/test_utils/mod.rs b/tests/test_utils/mod.rs index df3a65d4..ee062b36 100644 --- a/tests/test_utils/mod.rs +++ b/tests/test_utils/mod.rs @@ -1,10 +1,10 @@ +use std::future::Future; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::task::{Context, Poll}; use futures_channel::mpsc; -use futures_util::task::{Context, Poll}; -use futures_util::Future; use futures_util::TryFutureExt; use hyper::Uri; use tokio::io::{self, AsyncRead, AsyncWrite, ReadBuf};