Skip to content

Update async-std and stop-token dependencies, migrate to stable channels #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ nom = "5.0"
base64 = "0.13"
chrono = "0.4"
async-native-tls = { version = "0.3.3" }
async-std = { version = "1.6.0", default-features = false, features = ["std"] }
async-std = { version = "1.8.0", default-features = false, features = ["std"] }
pin-utils = "0.1.0-alpha.4"
futures = "0.3.0"
rental = "0.5.5"
stop-token = { version = "0.1.1", features = ["unstable"] }
stop-token = "0.2"
byte-pool = "0.2.2"
lazy_static = "1.4.0"
log = "0.4.8"
Expand All @@ -41,7 +41,7 @@ thiserror = "1.0.9"
lettre_email = "0.9"
pretty_assertions = "0.6.1"
async-smtp = { version = "0.3.0" }
async-std = { version = "1.6.0", default-features = false, features = ["std", "attributes"] }
async-std = { version = "1.8.0", default-features = false, features = ["std", "attributes"] }

[[example]]
name = "basic"
Expand Down
16 changes: 8 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use std::pin::Pin;
use std::str;

use async_native_tls::{TlsConnector, TlsStream};
use async_std::channel;
use async_std::io::{self, Read, Write};
use async_std::net::{TcpStream, ToSocketAddrs};
use async_std::prelude::*;
use async_std::sync;
use imap_proto::{RequestId, Response};

use super::authenticator::Authenticator;
Expand Down Expand Up @@ -36,11 +36,11 @@ macro_rules! quote {
#[derive(Debug)]
pub struct Session<T: Read + Write + Unpin + fmt::Debug> {
pub(crate) conn: Connection<T>,
pub(crate) unsolicited_responses_tx: sync::Sender<UnsolicitedResponse>,
pub(crate) unsolicited_responses_tx: channel::Sender<UnsolicitedResponse>,

/// Server responses that are not related to the current command. See also the note on
/// [unilateral server responses in RFC 3501](https://tools.ietf.org/html/rfc3501#section-7).
pub unsolicited_responses: sync::Receiver<UnsolicitedResponse>,
pub unsolicited_responses: channel::Receiver<UnsolicitedResponse>,
}

impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Session<T> {}
Expand Down Expand Up @@ -358,7 +358,7 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {

// not public, just to avoid duplicating the channel creation code
fn new(conn: Connection<T>) -> Self {
let (tx, rx) = sync::channel(100);
let (tx, rx) = channel::bounded(100);
Session {
conn,
unsolicited_responses: rx,
Expand Down Expand Up @@ -1313,7 +1313,7 @@ impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
pub async fn run_command_and_check_ok(
&mut self,
command: &str,
unsolicited: Option<sync::Sender<UnsolicitedResponse>>,
unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
) -> Result<()> {
let id = self.run_command(command).await?;
self.check_done_ok(&id, unsolicited).await?;
Expand All @@ -1324,7 +1324,7 @@ impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
pub(crate) async fn check_done_ok(
&mut self,
id: &RequestId,
unsolicited: Option<sync::Sender<UnsolicitedResponse>>,
unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
) -> Result<()> {
if let Some(first_res) = self.stream.next().await {
self.check_done_ok_from(id, unsolicited, first_res?).await
Expand All @@ -1336,7 +1336,7 @@ impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
pub(crate) async fn check_done_ok_from(
&mut self,
id: &RequestId,
unsolicited: Option<sync::Sender<UnsolicitedResponse>>,
unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
mut response: ResponseData,
) -> Result<()> {
loop {
Expand Down Expand Up @@ -1495,7 +1495,7 @@ mod tests {
let client = mock_client!(mock_stream);
enum Authenticate {
Auth,
};
}
impl Authenticator for &Authenticate {
type Response = Vec<u8>;
fn process(&mut self, challenge: &[u8]) -> Self::Response {
Expand Down
87 changes: 50 additions & 37 deletions src/parse.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use std::collections::HashSet;

use async_std::channel;
use async_std::io;
use async_std::prelude::*;
use async_std::stream::Stream;
use async_std::sync;
use imap_proto::{self, MailboxDatum, RequestId, Response};

use crate::error::{Error, Result};
use crate::types::ResponseData;
use crate::types::*;

pub(crate) fn parse_names<'a, T: Stream<Item = io::Result<ResponseData>> + Unpin + Send>(
stream: &'a mut T,
unsolicited: sync::Sender<UnsolicitedResponse>,
pub(crate) fn parse_names<T: Stream<Item = io::Result<ResponseData>> + Unpin + Send>(
stream: &mut T,
unsolicited: channel::Sender<UnsolicitedResponse>,
command_tag: RequestId,
) -> impl Stream<Item = Result<Name>> + 'a + Send + Unpin {
) -> impl Stream<Item = Result<Name>> + '_ + Send + Unpin {
use futures::{FutureExt, StreamExt};

StreamExt::filter_map(
Expand Down Expand Up @@ -56,11 +56,11 @@ fn filter_sync(res: &io::Result<ResponseData>, command_tag: &RequestId) -> bool
}
}

pub(crate) fn parse_fetches<'a, T: Stream<Item = io::Result<ResponseData>> + Unpin + Send>(
stream: &'a mut T,
unsolicited: sync::Sender<UnsolicitedResponse>,
pub(crate) fn parse_fetches<T: Stream<Item = io::Result<ResponseData>> + Unpin + Send>(
stream: &mut T,
unsolicited: channel::Sender<UnsolicitedResponse>,
command_tag: RequestId,
) -> impl Stream<Item = Result<Fetch>> + 'a + Send + Unpin {
) -> impl Stream<Item = Result<Fetch>> + '_ + Send + Unpin {
use futures::{FutureExt, StreamExt};

StreamExt::filter_map(
Expand All @@ -85,11 +85,11 @@ pub(crate) fn parse_fetches<'a, T: Stream<Item = io::Result<ResponseData>> + Unp
)
}

pub(crate) fn parse_expunge<'a, T: Stream<Item = io::Result<ResponseData>> + Unpin + Send>(
stream: &'a mut T,
unsolicited: sync::Sender<UnsolicitedResponse>,
pub(crate) fn parse_expunge<T: Stream<Item = io::Result<ResponseData>> + Unpin + Send>(
stream: &mut T,
unsolicited: channel::Sender<UnsolicitedResponse>,
command_tag: RequestId,
) -> impl Stream<Item = Result<u32>> + 'a + Send {
) -> impl Stream<Item = Result<u32>> + '_ + Send {
use futures::StreamExt;

StreamExt::filter_map(
Expand All @@ -113,9 +113,9 @@ pub(crate) fn parse_expunge<'a, T: Stream<Item = io::Result<ResponseData>> + Unp
)
}

pub(crate) async fn parse_capabilities<'a, T: Stream<Item = io::Result<ResponseData>> + Unpin>(
stream: &'a mut T,
unsolicited: sync::Sender<UnsolicitedResponse>,
pub(crate) async fn parse_capabilities<T: Stream<Item = io::Result<ResponseData>> + Unpin>(
stream: &mut T,
unsolicited: channel::Sender<UnsolicitedResponse>,
command_tag: RequestId,
) -> Result<Capabilities> {
let mut caps: HashSet<Capability> = HashSet::new();
Expand Down Expand Up @@ -143,7 +143,7 @@ pub(crate) async fn parse_capabilities<'a, T: Stream<Item = io::Result<ResponseD

pub(crate) async fn parse_noop<T: Stream<Item = io::Result<ResponseData>> + Unpin>(
stream: &mut T,
unsolicited: sync::Sender<UnsolicitedResponse>,
unsolicited: channel::Sender<UnsolicitedResponse>,
command_tag: RequestId,
) -> Result<()> {
while let Some(resp) = stream
Expand All @@ -160,7 +160,7 @@ pub(crate) async fn parse_noop<T: Stream<Item = io::Result<ResponseData>> + Unpi

pub(crate) async fn parse_mailbox<T: Stream<Item = io::Result<ResponseData>> + Unpin>(
stream: &mut T,
unsolicited: sync::Sender<UnsolicitedResponse>,
unsolicited: channel::Sender<UnsolicitedResponse>,
command_tag: RequestId,
) -> Result<Mailbox> {
let mut mailbox = Mailbox::default();
Expand Down Expand Up @@ -252,7 +252,7 @@ pub(crate) async fn parse_mailbox<T: Stream<Item = io::Result<ResponseData>> + U

pub(crate) async fn parse_ids<T: Stream<Item = io::Result<ResponseData>> + Unpin>(
stream: &mut T,
unsolicited: sync::Sender<UnsolicitedResponse>,
unsolicited: channel::Sender<UnsolicitedResponse>,
command_tag: RequestId,
) -> Result<HashSet<u32>> {
let mut ids: HashSet<u32> = HashSet::new();
Expand Down Expand Up @@ -282,7 +282,7 @@ pub(crate) async fn parse_ids<T: Stream<Item = io::Result<ResponseData>> + Unpin
// (see Section 7 of RFC 3501):
pub(crate) async fn handle_unilateral(
res: ResponseData,
unsolicited: sync::Sender<UnsolicitedResponse>,
unsolicited: channel::Sender<UnsolicitedResponse>,
) {
// ignore these if they are not being consumed
if unsolicited.is_full() {
Expand All @@ -307,19 +307,32 @@ pub(crate) async fn handle_unilateral(
})
.collect(),
})
.await;
.await
.expect("Channel closed unexpectedly");
}
Response::MailboxData(MailboxDatum::Recent(n)) => {
unsolicited.send(UnsolicitedResponse::Recent(*n)).await;
unsolicited
.send(UnsolicitedResponse::Recent(*n))
.await
.expect("Channel closed unexpectedly");
}
Response::MailboxData(MailboxDatum::Exists(n)) => {
unsolicited.send(UnsolicitedResponse::Exists(*n)).await;
unsolicited
.send(UnsolicitedResponse::Exists(*n))
.await
.expect("Channel closed unexpectedly");
}
Response::Expunge(n) => {
unsolicited.send(UnsolicitedResponse::Expunge(*n)).await;
unsolicited
.send(UnsolicitedResponse::Expunge(*n))
.await
.expect("Channel closed unexpectedly");
}
_ => {
unsolicited.send(UnsolicitedResponse::Other(res)).await;
unsolicited
.send(UnsolicitedResponse::Other(res))
.await
.expect("Channel closed unexpectedly");
}
}
}
Expand Down Expand Up @@ -350,7 +363,7 @@ mod tests {
input_stream(&["* CAPABILITY IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n"]);

let mut stream = async_std::stream::from_iter(responses);
let (send, recv) = sync::channel(10);
let (send, recv) = channel::bounded(10);
let id = RequestId("A0001".into());
let capabilities = parse_capabilities(&mut stream, send, id).await.unwrap();
// shouldn't be any unexpected responses parsed
Expand All @@ -368,7 +381,7 @@ mod tests {
let responses = input_stream(&["* CAPABILITY IMAP4REV1 STARTTLS\r\n"]);
let mut stream = async_std::stream::from_iter(responses);

let (send, recv) = sync::channel(10);
let (send, recv) = channel::bounded(10);
let id = RequestId("A0001".into());
let capabilities = parse_capabilities(&mut stream, send, id).await.unwrap();

Expand All @@ -383,7 +396,7 @@ mod tests {
#[async_std::test]
#[should_panic]
async fn parse_capability_invalid_test() {
let (send, recv) = sync::channel(10);
let (send, recv) = channel::bounded(10);
let responses = input_stream(&["* JUNK IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n"]);
let mut stream = async_std::stream::from_iter(responses);

Expand All @@ -396,7 +409,7 @@ mod tests {

#[async_std::test]
async fn parse_names_test() {
let (send, recv) = sync::channel(10);
let (send, recv) = channel::bounded(10);
let responses = input_stream(&["* LIST (\\HasNoChildren) \".\" \"INBOX\"\r\n"]);
let mut stream = async_std::stream::from_iter(responses);

Expand All @@ -417,7 +430,7 @@ mod tests {

#[async_std::test]
async fn parse_fetches_empty() {
let (send, recv) = sync::channel(10);
let (send, recv) = channel::bounded(10);
let responses = input_stream(&[]);
let mut stream = async_std::stream::from_iter(responses);
let id = RequestId("a".into());
Expand All @@ -432,7 +445,7 @@ mod tests {

#[async_std::test]
async fn parse_fetches_test() {
let (send, recv) = sync::channel(10);
let (send, recv) = channel::bounded(10);
let responses = input_stream(&[
"* 24 FETCH (FLAGS (\\Seen) UID 4827943)\r\n",
"* 25 FETCH (FLAGS (\\Seen))\r\n",
Expand Down Expand Up @@ -462,7 +475,7 @@ mod tests {
#[async_std::test]
async fn parse_fetches_w_unilateral() {
// https://github.com/mattnenterprise/rust-imap/issues/81
let (send, recv) = sync::channel(10);
let (send, recv) = channel::bounded(10);
let responses = input_stream(&["* 37 FETCH (UID 74)\r\n", "* 1 RECENT\r\n"]);
let mut stream = async_std::stream::from_iter(responses);
let id = RequestId("a".into());
Expand All @@ -480,7 +493,7 @@ mod tests {

#[async_std::test]
async fn parse_names_w_unilateral() {
let (send, recv) = sync::channel(10);
let (send, recv) = channel::bounded(10);
let responses = input_stream(&[
"* LIST (\\HasNoChildren) \".\" \"INBOX\"\r\n",
"* 4 EXPUNGE\r\n",
Expand All @@ -506,7 +519,7 @@ mod tests {

#[async_std::test]
async fn parse_capabilities_w_unilateral() {
let (send, recv) = sync::channel(10);
let (send, recv) = channel::bounded(10);
let responses = input_stream(&[
"* CAPABILITY IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n",
"* STATUS dev.github (MESSAGES 10 UIDNEXT 11 UIDVALIDITY 1408806928 UNSEEN 0)\r\n",
Expand Down Expand Up @@ -541,7 +554,7 @@ mod tests {

#[async_std::test]
async fn parse_ids_w_unilateral() {
let (send, recv) = sync::channel(10);
let (send, recv) = channel::bounded(10);
let responses = input_stream(&[
"* SEARCH 23 42 4711\r\n",
"* 1 RECENT\r\n",
Expand Down Expand Up @@ -571,7 +584,7 @@ mod tests {

#[async_std::test]
async fn parse_ids_test() {
let (send, recv) = sync::channel(10);
let (send, recv) = channel::bounded(10);
let responses = input_stream(&[
"* SEARCH 1600 1698 1739 1781 1795 1885 1891 1892 1893 1898 1899 1901 1911 1926 1932 1933 1993 1994 2007 2032 2033 2041 2053 2062 2063 2065 2066 2072 2078 2079 2082 2084 2095 2100 2101 2102 2103 2104 2107 2116 2120 2135 2138 2154 2163 2168 2172 2189 2193 2198 2199 2205 2212 2213 2221 2227 2267 2275 2276 2295 2300 2328 2330 2332 2333 2334\r\n",
"* SEARCH 2335 2336 2337 2338 2339 2341 2342 2347 2349 2350 2358 2359 2362 2369 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2390 2392 2397 2400 2401 2403 2405 2409 2411 2414 2417 2419 2420 2424 2426 2428 2439 2454 2456 2467 2468 2469 2490 2515 2519 2520 2521\r\n",
Expand Down Expand Up @@ -604,7 +617,7 @@ mod tests {

#[async_std::test]
async fn parse_ids_search() {
let (send, recv) = sync::channel(10);
let (send, recv) = channel::bounded(10);
let responses = input_stream(&["* SEARCH\r\n"]);
let mut stream = async_std::stream::from_iter(responses);

Expand Down