Skip to content

Commit 67042c4

Browse files
committed
feat: close ImapStream after an error
This prevents calls to poll_next() from reaching the underlying stream after an error is returned once. Previously calls to poll_next() to ImapStream built on top of a network connection could have returned infinite stream of Some(Err(_)) values. This is dangerous for the code that looks for the end of stream without processing errors such as `while stream.next().await.is_some() {}` because it may result in infinite loop. It is still better to process errors by writing `while stream.try_next().await?.is_some() {}`, but the change protects in case of incorrect library user code.
1 parent 4d2d23f commit 67042c4

File tree

1 file changed

+66
-38
lines changed

1 file changed

+66
-38
lines changed

src/imap_stream.rs

Lines changed: 66 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,15 @@ pub struct ImapStream<R: Read + Write> {
2525
decode_needs: usize,
2626
/// The buffer.
2727
buffer: Buffer,
28+
29+
/// True if the stream should not return any more items.
30+
///
31+
/// This is set when reading from a stream
32+
/// returns an error.
33+
/// Afterwards the stream returns only `None`
34+
/// and `poll_next()` does not read
35+
/// from the underlying stream.
36+
read_closed: bool,
2837
}
2938

3039
impl<R: Read + Write + Unpin> ImapStream<R> {
@@ -34,6 +43,7 @@ impl<R: Read + Write + Unpin> ImapStream<R> {
3443
inner,
3544
buffer: Buffer::new(),
3645
decode_needs: 0,
46+
read_closed: false,
3747
}
3848
}
3949

@@ -132,6 +142,52 @@ impl<R: Read + Write + Unpin> ImapStream<R> {
132142
}
133143
}
134144
}
145+
146+
fn do_poll_next(
147+
mut self: Pin<&mut Self>,
148+
cx: &mut Context<'_>,
149+
) -> Poll<Option<io::Result<ResponseData>>> {
150+
let this = &mut *self;
151+
if let Some(response) = this.decode()? {
152+
return Poll::Ready(Some(Ok(response)));
153+
}
154+
loop {
155+
this.buffer.ensure_capacity(this.decode_needs)?;
156+
let buf = this.buffer.free_as_mut_slice();
157+
158+
// The buffer should have at least one byte free
159+
// before we try reading into it
160+
// so we can treat 0 bytes read as EOF.
161+
// This is guaranteed by `ensure_capacity()` above
162+
// even if it is called with 0 as an argument.
163+
debug_assert!(!buf.is_empty());
164+
165+
#[cfg(feature = "runtime-async-std")]
166+
let num_bytes_read = ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?;
167+
168+
#[cfg(feature = "runtime-tokio")]
169+
let num_bytes_read = {
170+
let buf = &mut tokio::io::ReadBuf::new(buf);
171+
let start = buf.filled().len();
172+
ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?;
173+
buf.filled().len() - start
174+
};
175+
176+
if num_bytes_read == 0 {
177+
if this.buffer.used() > 0 {
178+
return Poll::Ready(Some(Err(io::Error::new(
179+
io::ErrorKind::UnexpectedEof,
180+
"bytes remaining in stream",
181+
))));
182+
}
183+
return Poll::Ready(None);
184+
}
185+
this.buffer.extend_used(num_bytes_read);
186+
if let Some(response) = this.decode()? {
187+
return Poll::Ready(Some(Ok(response)));
188+
}
189+
}
190+
}
135191
}
136192

137193
/// Abstraction around needed buffer management.
@@ -273,46 +329,18 @@ impl<R: Read + Write + Unpin> Stream for ImapStream<R> {
273329
type Item = io::Result<ResponseData>;
274330

275331
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
276-
let this = &mut *self;
277-
if let Some(response) = this.decode()? {
278-
return Poll::Ready(Some(Ok(response)));
332+
if self.read_closed {
333+
return Poll::Ready(None);
279334
}
280-
loop {
281-
this.buffer.ensure_capacity(this.decode_needs)?;
282-
let buf = this.buffer.free_as_mut_slice();
283-
284-
// The buffer should have at least one byte free
285-
// before we try reading into it
286-
// so we can treat 0 bytes read as EOF.
287-
// This is guaranteed by `ensure_capacity()` above
288-
// even if it is called with 0 as an argument.
289-
debug_assert!(!buf.is_empty());
290-
291-
#[cfg(feature = "runtime-async-std")]
292-
let num_bytes_read = ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?;
293-
294-
#[cfg(feature = "runtime-tokio")]
295-
let num_bytes_read = {
296-
let buf = &mut tokio::io::ReadBuf::new(buf);
297-
let start = buf.filled().len();
298-
ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?;
299-
buf.filled().len() - start
300-
};
301-
302-
if num_bytes_read == 0 {
303-
if this.buffer.used() > 0 {
304-
return Poll::Ready(Some(Err(io::Error::new(
305-
io::ErrorKind::UnexpectedEof,
306-
"bytes remaining in stream",
307-
))));
308-
}
309-
return Poll::Ready(None);
310-
}
311-
this.buffer.extend_used(num_bytes_read);
312-
if let Some(response) = this.decode()? {
313-
return Poll::Ready(Some(Ok(response)));
335+
let res = match ready!(self.as_mut().do_poll_next(cx)) {
336+
None => None,
337+
Some(Err(err)) => {
338+
self.read_closed = true;
339+
Some(Err(err))
314340
}
315-
}
341+
Some(Ok(item)) => Some(Ok(item)),
342+
};
343+
Poll::Ready(res)
316344
}
317345
}
318346

0 commit comments

Comments
 (0)