diff --git a/Cargo.toml b/Cargo.toml index 424f33a..f22fe64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,38 +19,33 @@ is-it-maintained-issue-resolution = { repository = "async-email/async-imap" } is-it-maintained-open-issues = { repository = "async-email/async-imap" } [features] -default = [] +default = ["runtime-async-std"] + +runtime-async-std = ["async-std", "async-native-tls/runtime-async-std"] +runtime-tokio = ["tokio", "async-native-tls/runtime-tokio"] [dependencies] -imap-proto = "0.14.3" -nom = "6.0" +imap-proto = "0.16.1" +nom = "7.0" base64 = "0.13" chrono = "0.4" -async-native-tls = { version = "0.3.3" } -async-std = { version = "1.8.0", default-features = false, features = ["std"] } pin-utils = "0.1.0-alpha.4" futures = "0.3.15" -ouroboros = "0.9" +ouroboros = "0.15" stop-token = "0.7" byte-pool = "0.2.2" once_cell = "1.8.0" log = "0.4.8" thiserror = "1.0.9" +async-channel = "1.6.1" + +async-native-tls = { version = "0.4", default-features = false } +async-std = { version = "1.8.0", default-features = false, features = ["std"], optional = true } +tokio = { version = "1", features = ["net", "sync", "time"], optional = true } + [dev-dependencies] -lettre_email = "0.9" -pretty_assertions = "0.6.1" -async-smtp = { version = "0.3.0" } -async-std = { version = "1.8.0", default-features = false, features = ["std", "attributes"] } - -[[example]] -name = "basic" -required-features = ["default"] - -[[example]] -name = "gmail_oauth2" -required-features = ["default"] - -[[test]] -name = "imap_integration" -required-features = ["default"] +pretty_assertions = "1.2" +async-std = { version = "1.8.0", features = ["std", "attributes"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } + diff --git a/examples/Cargo.toml b/examples/Cargo.toml new file mode 100644 index 0000000..32038e4 --- /dev/null +++ b/examples/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "async-imap-examples" +version = "0.1.0" +publish = false +authors = ["dignifiedquire "] +license = "Apache-2.0/MIT" +edition = "2018" + +[features] +default = ["runtime-async-std"] + +runtime-async-std = ["async-std", "async-native-tls/runtime-async-std", "async-smtp/runtime-async-std", "async-imap/runtime-async-std"] +runtime-tokio = ["tokio", "async-native-tls/runtime-tokio", "async-smtp/runtime-tokio", "async-imap/runtime-tokio"] + +[dependencies] +async-imap = { path = "../", default-features = false } +async-native-tls = { version = "0.4", default-features = false } +async-smtp = { version = "0.4", default-features = false, features = ["smtp-transport"] } + +async-std = { version = "1.8.0", features = ["std", "attributes"], optional = true } +futures = "0.3.21" +tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true } + +[patch.crates-io] +async-smtp = { git = "https://github.com/async-email/async-smtp", branch = "tokio" } diff --git a/examples/README.md b/examples/README.md index fe9abb6..241a71d 100644 --- a/examples/README.md +++ b/examples/README.md @@ -7,7 +7,9 @@ Examples: * basic - This is a very basic example of using the client. * idle - This is a basic example of how to perform an IMAP IDLE call - and interrupt it based on typing a line into stdin. + and interrupt it based on typing a line into stdin. * gmail_oauth2 - This is an example using oauth2 for logging into - gmail via the OAUTH2 mechanism. + gmail via the OAUTH2 mechanism. + +* futures - The basic example, but using the `futures` executor. diff --git a/examples/gmail_oauth2.rs b/examples/gmail_oauth2.rs deleted file mode 100644 index f5c382e..0000000 --- a/examples/gmail_oauth2.rs +++ /dev/null @@ -1,63 +0,0 @@ -use async_imap::error::Result; -use async_std::prelude::*; -use async_std::task; - -struct GmailOAuth2 { - user: String, - access_token: String, -} - -impl async_imap::Authenticator for &GmailOAuth2 { - type Response = String; - - fn process(&mut self, _data: &[u8]) -> Self::Response { - format!( - "user={}\x01auth=Bearer {}\x01\x01", - self.user, self.access_token - ) - } -} - -fn main() -> Result<()> { - task::block_on(async move { - let gmail_auth = GmailOAuth2 { - user: String::from("sombody@gmail.com"), - access_token: String::from(""), - }; - let domain = "imap.gmail.com"; - let port = 993; - let socket_addr = (domain, port); - let tls = async_native_tls::TlsConnector::new(); - let client = async_imap::connect(socket_addr, domain, tls).await?; - - let mut imap_session = match client.authenticate("XOAUTH2", &gmail_auth).await { - Ok(c) => c, - Err((e, _unauth_client)) => { - println!("error authenticating: {}", e); - return Err(e); - } - }; - - match imap_session.select("INBOX").await { - Ok(mailbox) => println!("{}", mailbox), - Err(e) => println!("Error selecting INBOX: {}", e), - }; - - { - let mut msgs = imap_session.fetch("2", "body[text]").await.map_err(|e| { - eprintln!("Error Fetching email 2: {}", e); - e - })?; - - // TODO: get rid of this - let mut msgs = unsafe { std::pin::Pin::new_unchecked(&mut msgs) }; - - while let Some(msg) = msgs.next().await { - print!("{:?}", msg?); - } - } - - imap_session.logout().await?; - Ok(()) - }) -} diff --git a/examples/src/bin/basic.rs b/examples/src/bin/basic.rs new file mode 100644 index 0000000..25ce577 --- /dev/null +++ b/examples/src/bin/basic.rs @@ -0,0 +1,59 @@ +use std::env; + +use async_imap::error::{Error, Result}; +use futures::TryStreamExt; + +#[cfg_attr(feature = "runtime-tokio", tokio::main)] +#[cfg_attr(feature = "runtime-async-std", async_std::main)] +async fn main() -> Result<()> { + let args: Vec = env::args().collect(); + if args.len() != 4 { + eprintln!("need three arguments: imap-server login password"); + Err(Error::Bad("need three arguments".into())) + } else { + let res = fetch_inbox_top(&args[1], &args[2], &args[3]).await?; + println!("**result:\n{}", res.unwrap()); + Ok(()) + } +} + +async fn fetch_inbox_top(imap_server: &str, login: &str, password: &str) -> Result> { + let tls = async_native_tls::TlsConnector::new(); + let imap_addr = (imap_server, 993); + + // we pass in the imap_server twice to check that the server's TLS + // certificate is valid for the imap_server we're connecting to. + let client = async_imap::connect(imap_addr, imap_server, tls).await?; + println!("-- connected to {}:{}", imap_addr.0, imap_addr.1); + + // the client we have here is unauthenticated. + // to do anything useful with the e-mails, we need to log in + let mut imap_session = client.login(login, password).await.map_err(|e| e.0)?; + println!("-- logged in a {}", login); + + // we want to fetch the first email in the INBOX mailbox + imap_session.select("INBOX").await?; + println!("-- INBOX selected"); + + // fetch message number 1 in this mailbox, along with its RFC822 field. + // RFC 822 dictates the format of the body of e-mails + let messages_stream = imap_session.fetch("1", "RFC822").await?; + let messages: Vec<_> = messages_stream.try_collect().await?; + let message = if let Some(m) = messages.first() { + m + } else { + return Ok(None); + }; + + // extract the message's body + let body = message.body().expect("message did not have a body!"); + let body = std::str::from_utf8(body) + .expect("message was not valid utf-8") + .to_string(); + println!("-- 1 message received, logging out"); + + // be nice to the server and log out + imap_session.logout().await?; + + Ok(Some(body)) +} diff --git a/examples/basic.rs b/examples/src/bin/futures.rs similarity index 84% rename from examples/basic.rs rename to examples/src/bin/futures.rs index 333ceab..5a898e1 100644 --- a/examples/basic.rs +++ b/examples/src/bin/futures.rs @@ -1,10 +1,10 @@ -use async_imap::error::{Error, Result}; -use async_std::prelude::*; -use async_std::task; use std::env; +use async_imap::error::{Error, Result}; +use futures::TryStreamExt; + fn main() -> Result<()> { - task::block_on(async { + futures::executor::block_on(async { let args: Vec = env::args().collect(); if args.len() != 4 { eprintln!("need three arguments: imap-server login password"); @@ -19,12 +19,11 @@ fn main() -> Result<()> { async fn fetch_inbox_top(imap_server: &str, login: &str, password: &str) -> Result> { let tls = async_native_tls::TlsConnector::new(); - let imap_addr = (imap_server, 993); // we pass in the imap_server twice to check that the server's TLS // certificate is valid for the imap_server we're connecting to. - let client = async_imap::connect(imap_addr, imap_server, tls).await?; - println!("-- connected to {}:{}", imap_addr.0, imap_addr.1); + let client = async_imap::connect((imap_server, 993), imap_server, tls).await?; + println!("-- connected to {}:{}", imap_server, 993); // the client we have here is unauthenticated. // to do anything useful with the e-mails, we need to log in @@ -38,7 +37,7 @@ async fn fetch_inbox_top(imap_server: &str, login: &str, password: &str) -> Resu // fetch message number 1 in this mailbox, along with its RFC822 field. // RFC 822 dictates the format of the body of e-mails let messages_stream = imap_session.fetch("1", "RFC822").await?; - let messages: Vec<_> = messages_stream.collect::>().await?; + let messages: Vec<_> = messages_stream.try_collect().await?; let message = if let Some(m) = messages.first() { m } else { diff --git a/examples/src/bin/gmail_oauth2.rs b/examples/src/bin/gmail_oauth2.rs new file mode 100644 index 0000000..da68bd2 --- /dev/null +++ b/examples/src/bin/gmail_oauth2.rs @@ -0,0 +1,59 @@ +use async_imap::error::Result; +use futures::StreamExt; + +struct GmailOAuth2 { + user: String, + access_token: String, +} + +impl async_imap::Authenticator for &GmailOAuth2 { + type Response = String; + + fn process(&mut self, _data: &[u8]) -> Self::Response { + format!( + "user={}\x01auth=Bearer {}\x01\x01", + self.user, self.access_token + ) + } +} + +#[cfg_attr(feature = "runtime-tokio", tokio::main)] +#[cfg_attr(feature = "runtime-async-std", async_std::main)] +async fn main() -> Result<()> { + let gmail_auth = GmailOAuth2 { + user: String::from("sombody@gmail.com"), + access_token: String::from(""), + }; + let domain = "imap.gmail.com"; + let port = 993; + let socket_addr = (domain, port); + let tls = async_native_tls::TlsConnector::new(); + let client = async_imap::connect(socket_addr, domain, tls).await?; + + let mut imap_session = match client.authenticate("XOAUTH2", &gmail_auth).await { + Ok(c) => c, + Err((e, _unauth_client)) => { + println!("error authenticating: {}", e); + return Err(e); + } + }; + + match imap_session.select("INBOX").await { + Ok(mailbox) => println!("{}", mailbox), + Err(e) => println!("Error selecting INBOX: {}", e), + }; + + { + let mut msgs = imap_session.fetch("2", "body[text]").await.map_err(|e| { + eprintln!("Error Fetching email 2: {}", e); + e + })?; + + while let Some(msg) = msgs.next().await { + print!("{:?}", msg?); + } + } + + imap_session.logout().await?; + Ok(()) +} diff --git a/examples/idle.rs b/examples/src/bin/idle.rs similarity index 78% rename from examples/idle.rs rename to examples/src/bin/idle.rs index 4aa83f9..60af895 100644 --- a/examples/idle.rs +++ b/examples/src/bin/idle.rs @@ -1,22 +1,27 @@ -use async_imap::error::{Error, Result}; -use async_std::prelude::*; -// use async_std::io; -use async_imap::extensions::idle::IdleResponse::*; -use async_std::task; use std::env; use std::time::Duration; -fn main() -> Result<()> { - task::block_on(async { - let args: Vec = env::args().collect(); - if args.len() != 4 { - eprintln!("need three arguments: imap-server login password"); - Err(Error::Bad("need three arguments".into())) - } else { - fetch_and_idle(&args[1], &args[2], &args[3]).await?; - Ok(()) - } - }) +use async_imap::error::{Error, Result}; +use async_imap::extensions::idle::IdleResponse::*; +use futures::StreamExt; + +#[cfg(feature = "runtime-async-std")] +use async_std::{task, task::sleep}; + +#[cfg(feature = "runtime-tokio")] +use tokio::{task, time::sleep}; + +#[cfg_attr(feature = "runtime-tokio", tokio::main)] +#[cfg_attr(feature = "runtime-async-std", async_std::main)] +async fn main() -> Result<()> { + let args: Vec = env::args().collect(); + if args.len() != 4 { + eprintln!("need three arguments: imap-server login password"); + Err(Error::Bad("need three arguments".into())) + } else { + fetch_and_idle(&args[1], &args[2], &args[3]).await?; + Ok(()) + } } async fn fetch_and_idle(imap_server: &str, login: &str, password: &str) -> Result<()> { @@ -59,7 +64,7 @@ async fn fetch_and_idle(imap_server: &str, login: &str, password: &str) -> Resul task::spawn(async move { println!("-- thread: waiting for 30s"); - task::sleep(Duration::from_secs(30)).await; + sleep(Duration::from_secs(30)).await; println!("-- thread: waited 30 secs, now interrupting idle"); drop(interrupt); }); diff --git a/examples/src/bin/integration.rs b/examples/src/bin/integration.rs new file mode 100644 index 0000000..a6882f7 --- /dev/null +++ b/examples/src/bin/integration.rs @@ -0,0 +1,301 @@ +use std::borrow::Cow; +use std::time::Duration; + +use async_imap::Session; +use async_native_tls::TlsConnector; +use async_smtp::ServerAddress; +#[cfg(feature = "runtime-async-std")] +use async_std::{net::TcpStream, task, task::sleep}; +use futures::{StreamExt, TryStreamExt}; +#[cfg(feature = "runtime-tokio")] +use tokio::{net::TcpStream, task, time::sleep}; + +fn native_tls() -> async_native_tls::TlsConnector { + async_native_tls::TlsConnector::new() + .danger_accept_invalid_certs(true) + .danger_accept_invalid_hostnames(true) +} + +fn tls() -> TlsConnector { + TlsConnector::new() + .danger_accept_invalid_hostnames(true) + .danger_accept_invalid_certs(true) +} + +fn test_host() -> String { + std::env::var("TEST_HOST").unwrap_or_else(|_| "127.0.0.1".into()) +} + +async fn session(user: &str) -> Session> { + async_imap::connect(&format!("{}:3993", test_host()), "imap.example.com", tls()) + .await + .unwrap() + .login(user, user) + .await + .ok() + .unwrap() +} + +async fn smtp(user: &str) -> async_smtp::SmtpTransport { + let creds = + async_smtp::smtp::authentication::Credentials::new(user.to_string(), user.to_string()); + async_smtp::SmtpClient::with_security( + ServerAddress::new(test_host(), 3465), + async_smtp::ClientSecurity::Wrapper(async_smtp::ClientTlsParameters { + connector: native_tls(), + domain: "localhost".to_string(), + }), + ) + .credentials(creds) + .into_transport() +} + +async fn _connect_insecure_then_secure() { + let stream = TcpStream::connect((test_host().as_ref(), 3143)) + .await + .unwrap(); + + // ignored because of https://github.com/greenmail-mail-test/greenmail/issues/135 + async_imap::Client::new(stream) + .secure("imap.example.com", tls()) + .await + .unwrap(); +} + +async fn _connect_secure() { + async_imap::connect(&format!("{}:3993", test_host()), "imap.example.com", tls()) + .await + .unwrap(); +} + +async fn login() { + session("readonly-test@localhost").await; +} + +async fn logout() { + let mut s = session("readonly-test@localhost").await; + s.logout().await.unwrap(); +} + +async fn inbox_zero() { + // https://github.com/greenmail-mail-test/greenmail/issues/265 + let mut s = session("readonly-test@localhost").await; + s.select("INBOX").await.unwrap(); + let inbox = s.search("ALL").await.unwrap(); + assert_eq!(inbox.len(), 0); +} + +fn make_email(to: &str) -> async_smtp::SendableEmail { + let message_id = "abc"; + async_smtp::SendableEmail::new( + async_smtp::Envelope::new( + Some("sender@localhost".parse().unwrap()), + vec![to.parse().unwrap()], + ) + .unwrap(), + message_id, + format!("To: <{}>\r\nFrom: \r\nMessage-ID: <{}.msg@localhost>\r\nSubject: My first e-mail\r\n\r\nHello world from SMTP", to, message_id), + ) +} + +async fn inbox() { + let to = "inbox@localhost"; + + // first log in so we'll see the unsolicited e-mails + let mut c = session(to).await; + c.select("INBOX").await.unwrap(); + + println!("sending"); + let mut s = smtp(to).await; + + // then send the e-mail + let mail = make_email(to); + s.connect_and_send(mail).await.unwrap(); + + println!("searching"); + + // now we should see the e-mail! + let inbox = c.search("ALL").await.unwrap(); + // and the one message should have the first message sequence number + assert_eq!(inbox.len(), 1); + assert!(inbox.contains(&1)); + + // we should also get two unsolicited responses: Exists and Recent + c.noop().await.unwrap(); + println!("noop done"); + let mut unsolicited = Vec::new(); + while !c.unsolicited_responses.is_empty() { + unsolicited.push(c.unsolicited_responses.recv().await.unwrap()); + } + + assert_eq!(unsolicited.len(), 2); + assert!(unsolicited + .iter() + .any(|m| m == &async_imap::types::UnsolicitedResponse::Exists(1))); + assert!(unsolicited + .iter() + .any(|m| m == &async_imap::types::UnsolicitedResponse::Recent(1))); + + println!("fetching"); + + // let's see that we can also fetch the e-mail + let fetch: Vec<_> = c + .fetch("1", "(ALL UID)") + .await + .unwrap() + .try_collect() + .await + .unwrap(); + assert_eq!(fetch.len(), 1); + let fetch = &fetch[0]; + assert_eq!(fetch.message, 1); + assert_ne!(fetch.uid, None); + assert_eq!(fetch.size, Some(21)); + let e = fetch.envelope().unwrap(); + assert_eq!(e.subject, Some(Cow::Borrowed(&b"My first e-mail"[..]))); + assert_ne!(e.from, None); + assert_eq!(e.from.as_ref().unwrap().len(), 1); + let from = &e.from.as_ref().unwrap()[0]; + assert_eq!(from.mailbox, Some(Cow::Borrowed(&b"sender"[..]))); + assert_eq!(from.host, Some(Cow::Borrowed(&b"localhost"[..]))); + assert_ne!(e.to, None); + assert_eq!(e.to.as_ref().unwrap().len(), 1); + let to = &e.to.as_ref().unwrap()[0]; + assert_eq!(to.mailbox, Some(Cow::Borrowed(&b"inbox"[..]))); + assert_eq!(to.host, Some(Cow::Borrowed(&b"localhost"[..]))); + let date_opt = fetch.internal_date(); + assert!(date_opt.is_some()); + + // and let's delete it to clean up + c.store("1", "+FLAGS (\\Deleted)") + .await + .unwrap() + .collect::>() + .await; + c.expunge().await.unwrap().collect::>().await; + + // the e-mail should be gone now + let inbox = c.search("ALL").await.unwrap(); + assert_eq!(inbox.len(), 0); +} + +async fn inbox_uid() { + let to = "inbox-uid@localhost"; + + // first log in so we'll see the unsolicited e-mails + let mut c = session(to).await; + c.select("INBOX").await.unwrap(); + + // then send the e-mail + let mut s = smtp(to).await; + let e = make_email(to); + s.connect_and_send(e).await.unwrap(); + + // now we should see the e-mail! + let inbox = c.uid_search("ALL").await.unwrap(); + // and the one message should have the first message sequence number + assert_eq!(inbox.len(), 1); + let uid = inbox.into_iter().next().unwrap(); + + // we should also get two unsolicited responses: Exists and Recent + c.noop().await.unwrap(); + let mut unsolicited = Vec::new(); + while !c.unsolicited_responses.is_empty() { + unsolicited.push(c.unsolicited_responses.recv().await.unwrap()); + } + + assert_eq!(unsolicited.len(), 2); + assert!(unsolicited + .iter() + .any(|m| m == &async_imap::types::UnsolicitedResponse::Exists(1))); + assert!(unsolicited + .iter() + .any(|m| m == &async_imap::types::UnsolicitedResponse::Recent(1))); + + // let's see that we can also fetch the e-mail + let fetch: Vec<_> = c + .uid_fetch(format!("{}", uid), "(ALL UID)") + .await + .unwrap() + .try_collect() + .await + .unwrap(); + assert_eq!(fetch.len(), 1); + let fetch = &fetch[0]; + assert_eq!(fetch.uid, Some(uid)); + let e = fetch.envelope().unwrap(); + assert_eq!(e.subject, Some(Cow::Borrowed(&b"My first e-mail"[..]))); + let date_opt = fetch.internal_date(); + assert!(date_opt.is_some()); + + // and let's delete it to clean up + c.uid_store(format!("{}", uid), "+FLAGS (\\Deleted)") + .await + .unwrap() + .collect::>() + .await; + c.expunge().await.unwrap().collect::>().await; + + // the e-mail should be gone now + let inbox = c.search("ALL").await.unwrap(); + assert_eq!(inbox.len(), 0); +} + +async fn _list() { + let mut s = session("readonly-test@localhost").await; + s.select("INBOX").await.unwrap(); + let subdirs: Vec<_> = s.list(None, Some("%")).await.unwrap().collect().await; + assert_eq!(subdirs.len(), 0); + + // TODO: make a subdir +} + +// Greenmail does not support IDLE :( +async fn _idle() -> async_imap::error::Result<()> { + let mut session = session("idle-test@localhost").await; + + // get that inbox + let res = session.select("INBOX").await?; + println!("selected: {:#?}", res); + + // fetchy fetch + let msg_stream = session.fetch("1:3", "(FLAGS BODY.PEEK[])").await?; + let msgs = msg_stream.collect::>().await; + println!("msgs: {:?}", msgs.len()); + + // Idle session + println!("starting idle"); + let mut idle = session.idle(); + idle.init().await?; + + let (idle_wait, interrupt) = idle.wait_with_timeout(std::time::Duration::from_secs(30)); + println!("idle wait"); + + task::spawn(async move { + println!("waiting for 1s"); + sleep(Duration::from_secs(2)).await; + println!("interrupting idle"); + drop(interrupt); + }); + + let idle_result = idle_wait.await; + println!("idle result: {:#?}", &idle_result); + + // return the session after we are done with it + let mut session = idle.done().await?; + + println!("logging out"); + session.logout().await?; + + Ok(()) +} + +#[cfg_attr(feature = "runtime-tokio", tokio::main)] +#[cfg_attr(feature = "runtime-async-std", async_std::main)] +async fn main() { + login().await; + logout().await; + inbox_zero().await; + inbox().await; + inbox_uid().await; +} diff --git a/src/client.rs b/src/client.rs index b72b0e9..cc53087 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,13 +4,21 @@ use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::str; +use async_channel::{self as channel, bounded}; use async_native_tls::{TlsConnector, TlsStream}; -use async_std::channel; -use async_std::io::{self, Read, Write}; -use async_std::net::{TcpStream, ToSocketAddrs}; -use async_std::prelude::*; +#[cfg(feature = "runtime-async-std")] +use async_std::{ + io::{Read, Write, WriteExt}, + net::{TcpStream, ToSocketAddrs}, +}; use extensions::quota::parse_get_quota_root; +use futures::{io, Stream, StreamExt}; use imap_proto::{RequestId, Response}; +#[cfg(feature = "runtime-tokio")] +use tokio::{ + io::{AsyncRead as Read, AsyncWrite as Write, AsyncWriteExt}, + net::{TcpStream, ToSocketAddrs}, +}; use super::authenticator::Authenticator; use super::error::{Error, ParseError, Result, ValidateError}; @@ -359,7 +367,7 @@ impl Session { // not public, just to avoid duplicating the channel creation code fn new(conn: Connection) -> Self { - let (tx, rx) = channel::bounded(100); + let (tx, rx) = bounded(100); Session { conn, unsolicited_responses: rx, @@ -814,12 +822,15 @@ impl Session { /// /// ```no_run /// use async_imap::{types::Seq, Session, error::Result}; - /// use async_std::prelude::*; + /// #[cfg(feature = "runtime-async-std")] /// use async_std::net::TcpStream; + /// #[cfg(feature = "runtime-tokio")] + /// use tokio::net::TcpStream; + /// use futures::TryStreamExt; /// /// async fn delete(seq: Seq, s: &mut Session) -> Result<()> { /// let updates_stream = s.store(format!("{}", seq), "+FLAGS (\\Deleted)").await?; - /// let _updates: Vec<_> = updates_stream.collect::>().await?; + /// let _updates: Vec<_> = updates_stream.try_collect().await?; /// s.expunge().await?; /// Ok(()) /// } @@ -1464,6 +1475,7 @@ mod tests { use super::super::mock_stream::MockStream; use super::*; use std::borrow::Cow; + use std::future::Future; use async_std::sync::{Arc, Mutex}; use imap_proto::Status; @@ -1490,7 +1502,8 @@ mod tests { }; } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn fetch_body() { let response = "a0 OK Logged in.\r\n\ * 2 FETCH (BODY[TEXT] {3}\r\nfoo)\r\n\ @@ -1500,7 +1513,8 @@ mod tests { session.read_response().await.unwrap().unwrap(); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn readline_delay_read() { let greeting = "* OK Dovecot ready.\r\n"; let mock_stream = MockStream::default() @@ -1519,7 +1533,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn readline_eof() { let mock_stream = MockStream::default().with_eof(); let mut client = mock_client!(mock_stream); @@ -1527,7 +1542,8 @@ mod tests { assert!(res.is_none()); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] #[should_panic] async fn readline_err() { // TODO Check the error test @@ -1536,7 +1552,8 @@ mod tests { client.read_response().await.unwrap().unwrap(); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn authenticate() { let response = b"+ YmFy\r\n\ A0001 OK Logged in\r\n" @@ -1567,7 +1584,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn login() { let response = b"A0001 OK Logged in\r\n".to_vec(); let username = "username"; @@ -1586,7 +1604,8 @@ mod tests { } } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn logout() { let response = b"A0001 OK Logout completed.\r\n".to_vec(); let command = "A0001 LOGOUT\r\n"; @@ -1599,7 +1618,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn rename() { let response = b"A0001 OK RENAME completed\r\n".to_vec(); let current_mailbox_name = "INBOX"; @@ -1621,7 +1641,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn subscribe() { let response = b"A0001 OK SUBSCRIBE completed\r\n".to_vec(); let mailbox = "INBOX"; @@ -1635,7 +1656,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn unsubscribe() { let response = b"A0001 OK UNSUBSCRIBE completed\r\n".to_vec(); let mailbox = "INBOX"; @@ -1649,7 +1671,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn expunge() { let response = b"A0001 OK EXPUNGE completed\r\n".to_vec(); let mock_stream = MockStream::new(response); @@ -1661,7 +1684,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn uid_expunge() { let response = b"* 2 EXPUNGE\r\n\ * 3 EXPUNGE\r\n\ @@ -1682,7 +1706,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn check() { let response = b"A0001 OK CHECK completed\r\n".to_vec(); let mock_stream = MockStream::new(response); @@ -1694,7 +1719,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn examine() { let response = b"* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\ * OK [PERMANENTFLAGS ()] Read-only mailbox.\r\n\ @@ -1733,7 +1759,8 @@ mod tests { assert_eq!(mailbox, expected_mailbox); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn select() { let response = b"* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\ * OK [PERMANENTFLAGS (\\* \\Answered \\Flagged \\Deleted \\Draft \\Seen)] \ @@ -1781,7 +1808,8 @@ mod tests { assert_eq!(mailbox, expected_mailbox); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn search() { let response = b"* SEARCH 1 2 3 4 5\r\n\ A0001 OK Search completed\r\n" @@ -1797,7 +1825,8 @@ mod tests { assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect()); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn uid_search() { let response = b"* SEARCH 1 2 3 4 5\r\n\ A0001 OK Search completed\r\n" @@ -1813,7 +1842,8 @@ mod tests { assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect()); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn uid_search_unordered() { let response = b"* SEARCH 1 2 3 4 5\r\n\ A0002 OK CAPABILITY completed\r\n\ @@ -1830,7 +1860,8 @@ mod tests { assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect()); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn capability() { let response = b"* CAPABILITY IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n\ A0001 OK CAPABILITY completed\r\n" @@ -1849,7 +1880,8 @@ mod tests { } } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn create() { let response = b"A0001 OK CREATE completed\r\n".to_vec(); let mailbox_name = "INBOX"; @@ -1863,7 +1895,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn delete() { let response = b"A0001 OK DELETE completed\r\n".to_vec(); let mailbox_name = "INBOX"; @@ -1877,7 +1910,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn noop() { let response = b"A0001 OK NOOP completed\r\n".to_vec(); let mock_stream = MockStream::new(response); @@ -1889,7 +1923,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn close() { let response = b"A0001 OK CLOSE completed\r\n".to_vec(); let mock_stream = MockStream::new(response); @@ -1901,7 +1936,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn store() { generic_store(" ", |c, set, query| async move { c.lock() @@ -1915,7 +1951,8 @@ mod tests { .await; } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn uid_store() { generic_store(" UID ", |c, set, query| async move { c.lock() @@ -1942,7 +1979,8 @@ mod tests { generic_with_uid(res, "STORE", "2.4", "+FLAGS (\\Deleted)", prefix, op).await; } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn copy() { generic_copy(" ", |c, set, query| async move { c.lock().await.copy(set, query).await?; @@ -1951,7 +1989,8 @@ mod tests { .await; } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn uid_copy() { generic_copy(" UID ", |c, set, query| async move { c.lock().await.uid_copy(set, query).await?; @@ -1976,7 +2015,8 @@ mod tests { .await; } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn mv() { let response = b"* OK [COPYUID 1511554416 142,399 41:42] Moved UIDs.\r\n\ * 2 EXPUNGE\r\n\ @@ -1994,7 +2034,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn uid_mv() { let response = b"* OK [COPYUID 1511554416 142,399 41:42] Moved UIDs.\r\n\ * 2 EXPUNGE\r\n\ @@ -2012,7 +2053,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn fetch() { generic_fetch(" ", |c, seq, query| async move { c.lock() @@ -2027,7 +2069,8 @@ mod tests { .await; } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn uid_fetch() { generic_fetch(" UID ", |c, seq, query| async move { c.lock() diff --git a/src/error.rs b/src/error.rs index fe686ea..a385818 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,13 +1,12 @@ //! IMAP error types. use std::io::Error as IoError; -use std::result; use std::str::Utf8Error; use base64::DecodeError; /// A convenience wrapper around `Result` for `imap::Error`. -pub type Result = result::Result; +pub type Result = std::result::Result; /// A set of errors that can occur in the IMAP client #[derive(thiserror::Error, Debug)] diff --git a/src/extensions/idle.rs b/src/extensions/idle.rs index 1dc49bb..05a71d7 100644 --- a/src/extensions/idle.rs +++ b/src/extensions/idle.rs @@ -4,12 +4,20 @@ use std::fmt; use std::pin::Pin; use std::time::Duration; -use async_std::io::{self, Read, Write}; -use async_std::prelude::*; -use async_std::stream::Stream; +#[cfg(feature = "runtime-async-std")] +use async_std::{ + future::timeout, + io::{Read, Write}, +}; +use futures::prelude::*; use futures::task::{Context, Poll}; use imap_proto::{RequestId, Response, Status}; use stop_token::prelude::*; +#[cfg(feature = "runtime-tokio")] +use tokio::{ + io::{AsyncRead as Read, AsyncWrite as Write}, + time::timeout, +}; use crate::client::Session; use crate::error::Result; @@ -40,7 +48,7 @@ pub struct Handle { impl Unpin for Handle {} impl Stream for Handle { - type Item = io::Result; + type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.as_mut().session().get_stream().poll_next(cx) @@ -141,7 +149,7 @@ impl Handle { /// Must be called after [Handle::init]. pub fn wait_with_timeout( &mut self, - timeout: Duration, + dur: Duration, ) -> ( impl Future> + '_, stop_token::StopSource, @@ -153,7 +161,7 @@ impl Handle { let (waiter, interrupt) = self.wait(); let fut = async move { - match async_std::future::timeout(timeout, waiter).await { + match timeout(dur, waiter).await { Ok(res) => res, Err(_err) => Ok(IdleResponse::Timeout), } @@ -180,8 +188,8 @@ impl Handle { } => { if tag == self.id.as_ref().unwrap() { if let Status::Bad = status { - return Err(io::Error::new( - io::ErrorKind::ConnectionRefused, + return Err(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, information.as_ref().unwrap().to_string(), ) .into()); @@ -195,7 +203,7 @@ impl Handle { } } - Err(io::Error::new(io::ErrorKind::ConnectionRefused, "").into()) + Err(std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "").into()) } /// Signal that we want to exit the idle connection, by sending the `DONE` diff --git a/src/extensions/quota.rs b/src/extensions/quota.rs index 7d4abc4..64469af 100644 --- a/src/extensions/quota.rs +++ b/src/extensions/quota.rs @@ -1,15 +1,14 @@ //! Adds support for the GETQUOTA and GETQUOTAROOT commands specificed in [RFC2087](https://tools.ietf.org/html/rfc2087). -use async_std::channel; -use async_std::io; -use async_std::prelude::*; -use async_std::stream::Stream; +use async_channel as channel; +use futures::io; +use futures::prelude::*; use imap_proto::{self, RequestId, Response}; use crate::types::*; use crate::{ error::Result, - parse::{filter_sync, handle_unilateral}, + parse::{filter, handle_unilateral}, }; use crate::{ error::{Error, ParseError}, @@ -23,7 +22,7 @@ pub(crate) async fn parse_get_quota> + ) -> Result { let mut quota = None; while let Some(resp) = stream - .take_while(|res| filter_sync(res, &command_tag)) + .take_while(|res| filter(res, &command_tag)) .next() .await { @@ -53,7 +52,7 @@ pub(crate) async fn parse_get_quota_root = Vec::new(); while let Some(resp) = stream - .take_while(|res| filter_sync(res, &command_tag)) + .take_while(|res| filter(res, &command_tag)) .next() .await { diff --git a/src/imap_stream.rs b/src/imap_stream.rs index 492f0a5..b97e005 100644 --- a/src/imap_stream.rs +++ b/src/imap_stream.rs @@ -1,14 +1,17 @@ use std::fmt; use std::pin::Pin; +use std::sync::Arc; -use async_std::io::{self, Read, Write}; -use async_std::prelude::*; -use async_std::stream::Stream; -use async_std::sync::Arc; +#[cfg(feature = "runtime-async-std")] +use async_std::io::{Read, Write, WriteExt}; use byte_pool::{Block, BytePool}; +use futures::stream::Stream; use futures::task::{Context, Poll}; +use futures::{io, ready}; use nom::Needed; use once_cell::sync::Lazy; +#[cfg(feature = "runtime-tokio")] +use tokio::io::{AsyncRead as Read, AsyncWrite as Write, AsyncWriteExt}; use crate::types::{Request, ResponseData}; @@ -287,13 +290,19 @@ impl Stream for ImapStream { } loop { this.buffer.ensure_capacity(this.decode_needs)?; - let num_bytes_read = - match Pin::new(&mut this.inner).poll_read(cx, this.buffer.free_as_mut_slice()) { - Poll::Ready(result) => result?, - Poll::Pending => { - return Poll::Pending; - } - }; + let buf = this.buffer.free_as_mut_slice(); + + #[cfg(feature = "runtime-async-std")] + let num_bytes_read = ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?; + + #[cfg(feature = "runtime-tokio")] + let num_bytes_read = { + let buf = &mut tokio::io::ReadBuf::new(buf); + let start = buf.filled().len(); + ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?; + buf.filled().len() - start + }; + if num_bytes_read == 0 { this.closed = true; return Poll::Ready(this.stream_eof_value()); diff --git a/src/lib.rs b/src/lib.rs index 91be90f..4b51f81 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,7 @@ //! Below is a basic client example. See the `examples/` directory for more. //! //! ```no_run -//! use async_std::prelude::*; +//! use futures::prelude::*; //! use async_imap::error::Result; //! //! async fn fetch_inbox_top() -> Result> { @@ -40,7 +40,7 @@ //! // fetch message number 1 in this mailbox, along with its RFC822 field. //! // RFC 822 dictates the format of the body of e-mails //! let messages_stream = imap_session.fetch("1", "RFC822").await?; -//! let messages: Vec<_> = messages_stream.collect::>().await?; +//! let messages: Vec<_> = messages_stream.try_collect().await?; //! let message = if let Some(m) = messages.first() { //! m //! } else { @@ -62,6 +62,11 @@ #![warn(missing_docs)] #![deny(rust_2018_idioms, unsafe_code)] +#[cfg(not(any(feature = "runtime-tokio", feature = "runtime-async-std")))] +compile_error!("one of 'runtime-async-std' or 'runtime-tokio' features must be enabled"); + +#[cfg(all(feature = "runtime-tokio", feature = "runtime-async-std"))] +compile_error!("only one of 'runtime-async-std' or 'runtime-tokio' features must be enabled"); #[macro_use] extern crate pin_utils; diff --git a/src/mock_stream.rs b/src/mock_stream.rs index fb19667..bcf12fd 100644 --- a/src/mock_stream.rs +++ b/src/mock_stream.rs @@ -1,10 +1,14 @@ use std::cmp::min; +use std::io::{Error, ErrorKind, Result}; use std::pin::Pin; +use std::task::{Context, Poll}; -use async_std::io::{Error, ErrorKind, Read, Result, Write}; -use futures::task::{Context, Poll}; +#[cfg(feature = "runtime-async-std")] +use async_std::io::{Read, Write}; +#[cfg(feature = "runtime-tokio")] +use tokio::io::{AsyncRead as Read, AsyncWrite as Write}; -#[derive(Clone, Debug, Eq, PartialEq, Hash)] +#[derive(Default, Clone, Debug, Eq, PartialEq, Hash)] pub struct MockStream { read_buf: Vec, read_pos: usize, @@ -14,19 +18,6 @@ pub struct MockStream { read_delay: usize, } -impl Default for MockStream { - fn default() -> Self { - MockStream { - read_buf: Vec::new(), - read_pos: 0, - written_buf: Vec::new(), - err_on_read: false, - eof_on_read: false, - read_delay: 0, - } - } -} - impl MockStream { pub fn new(read_buf: Vec) -> MockStream { MockStream::default().with_buf(read_buf) @@ -53,6 +44,55 @@ impl MockStream { } } +#[cfg(feature = "runtime-tokio")] +impl Read for MockStream { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + if self.eof_on_read { + return Poll::Ready(Ok(())); + } + if self.err_on_read { + return Poll::Ready(Err(Error::new(ErrorKind::Other, "MockStream Error"))); + } + if self.read_pos >= self.read_buf.len() { + return Poll::Ready(Err(Error::new(ErrorKind::UnexpectedEof, "EOF"))); + } + let mut write_len = min(buf.remaining(), self.read_buf.len() - self.read_pos); + if self.read_delay > 0 { + self.read_delay -= 1; + write_len = min(write_len, 1); + } + let max_pos = self.read_pos + write_len; + buf.put_slice(&self.read_buf[self.read_pos..max_pos]); + self.read_pos += write_len; + Poll::Ready(Ok(())) + } +} + +#[cfg(feature = "runtime-tokio")] +impl Write for MockStream { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.written_buf.extend_from_slice(buf); + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +#[cfg(feature = "runtime-async-std")] impl Read for MockStream { fn poll_read( mut self: Pin<&mut Self>, @@ -82,6 +122,7 @@ impl Read for MockStream { } } +#[cfg(feature = "runtime-async-std")] impl Write for MockStream { fn poll_write( mut self: Pin<&mut Self>, diff --git a/src/parse.rs b/src/parse.rs index 07663de..7913b92 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -1,9 +1,9 @@ use std::collections::HashSet; -use async_std::channel; -use async_std::io; -use async_std::prelude::*; -use async_std::stream::Stream; +use async_channel as channel; +use futures::io; +use futures::prelude::*; +use futures::stream::Stream; use imap_proto::{self, MailboxDatum, RequestId, Response}; use crate::error::{Error, Result}; @@ -41,7 +41,10 @@ pub(crate) fn parse_names> + Unpin + S ) } -fn filter(res: &io::Result, command_tag: &RequestId) -> impl Future { +pub(crate) fn filter( + res: &io::Result, + command_tag: &RequestId, +) -> impl Future { let val = filter_sync(res, command_tag); futures::future::ready(val) } @@ -121,7 +124,7 @@ pub(crate) async fn parse_capabilities let mut caps: HashSet = HashSet::new(); while let Some(resp) = stream - .take_while(|res| filter_sync(res, &command_tag)) + .take_while(|res| filter(res, &command_tag)) .next() .await { @@ -147,7 +150,7 @@ pub(crate) async fn parse_noop> + Unpi command_tag: RequestId, ) -> Result<()> { while let Some(resp) = stream - .take_while(|res| filter_sync(res, &command_tag)) + .take_while(|res| filter(res, &command_tag)) .next() .await { @@ -294,7 +297,7 @@ pub(crate) async fn parse_ids> + Unpin let mut ids: HashSet = HashSet::new(); while let Some(resp) = stream - .take_while(|res| filter_sync(res, &command_tag)) + .take_while(|res| filter(res, &command_tag)) .next() .await { @@ -365,6 +368,7 @@ pub(crate) async fn handle_unilateral( #[cfg(test)] mod tests { use super::*; + use async_channel::bounded; fn input_stream(data: &[&str]) -> Vec> { data.iter() @@ -380,14 +384,15 @@ mod tests { .collect() } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_capability_test() { let expected_capabilities = &["IMAP4rev1", "STARTTLS", "AUTH=GSSAPI", "LOGINDISABLED"]; let responses = input_stream(&["* CAPABILITY IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n"]); let mut stream = async_std::stream::from_iter(responses); - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let id = RequestId("A0001".into()); let capabilities = parse_capabilities(&mut stream, send, id).await.unwrap(); // shouldn't be any unexpected responses parsed @@ -398,14 +403,15 @@ mod tests { } } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_capability_case_insensitive_test() { // Test that "IMAP4REV1" (instead of "IMAP4rev1") is accepted let expected_capabilities = &["IMAP4rev1", "STARTTLS"]; let responses = input_stream(&["* CAPABILITY IMAP4REV1 STARTTLS\r\n"]); let mut stream = async_std::stream::from_iter(responses); - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let id = RequestId("A0001".into()); let capabilities = parse_capabilities(&mut stream, send, id).await.unwrap(); @@ -417,10 +423,11 @@ mod tests { } } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] #[should_panic] async fn parse_capability_invalid_test() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&["* JUNK IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n"]); let mut stream = async_std::stream::from_iter(responses); @@ -431,45 +438,48 @@ mod tests { assert!(recv.is_empty()); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_names_test() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&["* LIST (\\HasNoChildren) \".\" \"INBOX\"\r\n"]); let mut stream = async_std::stream::from_iter(responses); let id = RequestId("A0001".into()); let names: Vec<_> = parse_names(&mut stream, send, id) - .collect::>>() + .try_collect::>() .await .unwrap(); assert!(recv.is_empty()); assert_eq!(names.len(), 1); assert_eq!( names[0].attributes(), - &[NameAttribute::from("\\HasNoChildren")] + &[NameAttribute::Extension("\\HasNoChildren".into())] ); assert_eq!(names[0].delimiter(), Some(".")); assert_eq!(names[0].name(), "INBOX"); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_fetches_empty() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&[]); let mut stream = async_std::stream::from_iter(responses); let id = RequestId("a".into()); let fetches = parse_fetches(&mut stream, send, id) - .collect::>>() + .try_collect::>() .await .unwrap(); assert!(recv.is_empty()); assert!(fetches.is_empty()); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_fetches_test() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&[ "* 24 FETCH (FLAGS (\\Seen) UID 4827943)\r\n", "* 25 FETCH (FLAGS (\\Seen))\r\n", @@ -478,7 +488,7 @@ mod tests { let id = RequestId("a".into()); let fetches = parse_fetches(&mut stream, send, id) - .collect::>>() + .try_collect::>() .await .unwrap(); assert!(recv.is_empty()); @@ -496,16 +506,17 @@ mod tests { assert_eq!(fetches[1].header(), None); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_fetches_w_unilateral() { // https://github.com/mattnenterprise/rust-imap/issues/81 - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&["* 37 FETCH (UID 74)\r\n", "* 1 RECENT\r\n"]); let mut stream = async_std::stream::from_iter(responses); let id = RequestId("a".into()); let fetches = parse_fetches(&mut stream, send, id) - .collect::>>() + .try_collect::>() .await .unwrap(); assert_eq!(recv.recv().await.unwrap(), UnsolicitedResponse::Recent(1)); @@ -515,9 +526,10 @@ mod tests { assert_eq!(fetches[0].uid, Some(74)); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_names_w_unilateral() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&[ "* LIST (\\HasNoChildren) \".\" \"INBOX\"\r\n", "* 4 EXPUNGE\r\n", @@ -526,7 +538,7 @@ mod tests { let id = RequestId("A0001".into()); let names = parse_names(&mut stream, send, id) - .collect::>>() + .try_collect::>() .await .unwrap(); @@ -535,15 +547,16 @@ mod tests { assert_eq!(names.len(), 1); assert_eq!( names[0].attributes(), - &[NameAttribute::from("\\HasNoChildren")] + &[NameAttribute::Extension("\\HasNoChildren".into())] ); assert_eq!(names[0].delimiter(), Some(".")); assert_eq!(names[0].name(), "INBOX"); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_capabilities_w_unilateral() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&[ "* CAPABILITY IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n", "* STATUS dev.github (MESSAGES 10 UIDNEXT 11 UIDVALIDITY 1408806928 UNSEEN 0)\r\n", @@ -576,9 +589,10 @@ mod tests { assert_eq!(recv.recv().await.unwrap(), UnsolicitedResponse::Exists(4)); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_ids_w_unilateral() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&[ "* SEARCH 23 42 4711\r\n", "* 1 RECENT\r\n", @@ -606,9 +620,10 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_ids_test() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&[ "* SEARCH 1600 1698 1739 1781 1795 1885 1891 1892 1893 1898 1899 1901 1911 1926 1932 1933 1993 1994 2007 2032 2033 2041 2053 2062 2063 2065 2066 2072 2078 2079 2082 2084 2095 2100 2101 2102 2103 2104 2107 2116 2120 2135 2138 2154 2163 2168 2172 2189 2193 2198 2199 2205 2212 2213 2221 2227 2267 2275 2276 2295 2300 2328 2330 2332 2333 2334\r\n", "* SEARCH 2335 2336 2337 2338 2339 2341 2342 2347 2349 2350 2358 2359 2362 2369 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2390 2392 2397 2400 2401 2403 2405 2409 2411 2414 2417 2419 2420 2424 2426 2428 2439 2454 2456 2467 2468 2469 2490 2515 2519 2520 2521\r\n", @@ -639,9 +654,10 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_ids_search() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&["* SEARCH\r\n"]); let mut stream = async_std::stream::from_iter(responses); @@ -653,9 +669,10 @@ mod tests { assert_eq!(ids, HashSet::::new()); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_mailbox_does_not_exist_error() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&[ "A0003 NO Mailbox doesn't exist: DeltaChat (0.001 + 0.140 + 0.139 secs).\r\n", ]); diff --git a/src/types/name.rs b/src/types/name.rs index 43374b7..be978ed 100644 --- a/src/types/name.rs +++ b/src/types/name.rs @@ -1,5 +1,4 @@ -use std::borrow::Cow; - +pub use imap_proto::types::NameAttribute; use imap_proto::{MailboxDatum, Response}; use crate::types::ResponseData; @@ -21,74 +20,15 @@ pub struct InnerName<'a> { name: &'a str, } -/// An attribute set for an IMAP name. -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub enum NameAttribute<'a> { - /// It is not possible for any child levels of hierarchy to exist - /// under this name; no child levels exist now and none can be - /// created in the future. - NoInferiors, - - /// It is not possible to use this name as a selectable mailbox. - NoSelect, - - /// The mailbox has been marked "interesting" by the server; the - /// mailbox probably contains messages that have been added since - /// the last time the mailbox was selected. - Marked, - - /// The mailbox does not contain any additional messages since the - /// last time the mailbox was selected. - Unmarked, - - /// A non-standard user- or server-defined name attribute. - Custom(Cow<'a, str>), -} - -impl NameAttribute<'static> { - fn system(s: &str) -> Option { - match s { - "\\Noinferiors" => Some(NameAttribute::NoInferiors), - "\\Noselect" => Some(NameAttribute::NoSelect), - "\\Marked" => Some(NameAttribute::Marked), - "\\Unmarked" => Some(NameAttribute::Unmarked), - _ => None, - } - } -} - -impl<'a> From for NameAttribute<'a> { - fn from(s: String) -> Self { - if let Some(f) = NameAttribute::system(&s) { - f - } else { - NameAttribute::Custom(Cow::Owned(s)) - } - } -} - -impl<'a> From<&'a str> for NameAttribute<'a> { - fn from(s: &'a str) -> Self { - if let Some(f) = NameAttribute::system(s) { - f - } else { - NameAttribute::Custom(Cow::Borrowed(s)) - } - } -} - impl Name { pub(crate) fn from_mailbox_data(resp: ResponseData) -> Self { Name::new(Box::new(resp), |response| match response.parsed() { Response::MailboxData(MailboxDatum::List { - flags, + name_attributes, delimiter, name, }) => InnerName { - attributes: flags - .iter() - .map(|s| NameAttribute::from(s.as_ref())) - .collect(), + attributes: name_attributes.to_owned(), delimiter: delimiter.as_deref(), name, }, diff --git a/tests/imap_integration.rs b/tests/imap_integration.rs deleted file mode 100644 index 817cbe1..0000000 --- a/tests/imap_integration.rs +++ /dev/null @@ -1,322 +0,0 @@ -use std::borrow::Cow; -use std::time::Duration; - -use async_imap::Session; -use async_native_tls::TlsConnector; -use async_std::net::TcpStream; -use async_std::prelude::*; -use async_std::task; - -fn native_tls() -> async_native_tls::TlsConnector { - async_native_tls::TlsConnector::new() - .danger_accept_invalid_certs(true) - .danger_accept_invalid_hostnames(true) -} - -fn tls() -> TlsConnector { - TlsConnector::new() - .danger_accept_invalid_hostnames(true) - .danger_accept_invalid_certs(true) -} - -fn test_host() -> String { - std::env::var("TEST_HOST").unwrap_or_else(|_| "127.0.0.1".into()) -} - -async fn session(user: &str) -> Session> { - async_imap::connect(&format!("{}:3993", test_host()), "imap.example.com", tls()) - .await - .unwrap() - .login(user, user) - .await - .ok() - .unwrap() -} - -async fn smtp(user: &str) -> async_smtp::SmtpTransport { - let creds = - async_smtp::smtp::authentication::Credentials::new(user.to_string(), user.to_string()); - async_smtp::SmtpClient::with_security( - &format!("{}:3465", test_host()), - async_smtp::ClientSecurity::Wrapper(async_smtp::ClientTlsParameters { - connector: native_tls(), - domain: "localhost".to_string(), - }), - ) - .await - .expect("Failed to connect to smtp server") - .credentials(creds) - .into_transport() -} - -// #[test] -fn _connect_insecure_then_secure() { - task::block_on(async { - let stream = TcpStream::connect((test_host().as_ref(), 3143)) - .await - .unwrap(); - - // ignored because of https://github.com/greenmail-mail-test/greenmail/issues/135 - async_imap::Client::new(stream) - .secure("imap.example.com", tls()) - .await - .unwrap(); - }); -} - -#[test] -#[ignore] -fn connect_secure() { - task::block_on(async { - async_imap::connect(&format!("{}:3993", test_host()), "imap.example.com", tls()) - .await - .unwrap(); - }); -} - -#[test] -#[ignore] -fn login() { - task::block_on(async { - session("readonly-test@localhost").await; - }); -} - -#[test] -#[ignore] -fn logout() { - task::block_on(async { - let mut s = session("readonly-test@localhost").await; - s.logout().await.unwrap(); - }); -} - -#[test] -#[ignore] -fn inbox_zero() { - task::block_on(async { - // https://github.com/greenmail-mail-test/greenmail/issues/265 - let mut s = session("readonly-test@localhost").await; - s.select("INBOX").await.unwrap(); - let inbox = s.search("ALL").await.unwrap(); - assert_eq!(inbox.len(), 0); - }); -} -fn make_email(to: &str) -> async_smtp::SendableEmail { - let message_id = "abc"; - async_smtp::SendableEmail::new( - async_smtp::Envelope::new( - Some("sender@localhost".parse().unwrap()), - vec![to.parse().unwrap()], - ) - .unwrap(), - message_id.to_string(), - format!("To: <{}>\r\nFrom: \r\nMessage-ID: <{}.msg@localhost>\r\nSubject: My first e-mail\r\n\r\nHello world from SMTP", to, message_id), - ) -} - -#[test] -#[ignore] -fn inbox() { - task::block_on(async { - let to = "inbox@localhost"; - - // first log in so we'll see the unsolicited e-mails - let mut c = session(to).await; - c.select("INBOX").await.unwrap(); - - println!("sending"); - let mut s = smtp(to).await; - - // then send the e-mail - let mail = make_email(to); - s.connect_and_send(mail).await.unwrap(); - - println!("searching"); - - // now we should see the e-mail! - let inbox = c.search("ALL").await.unwrap(); - // and the one message should have the first message sequence number - assert_eq!(inbox.len(), 1); - assert!(inbox.contains(&1)); - - // we should also get two unsolicited responses: Exists and Recent - c.noop().await.unwrap(); - println!("noop done"); - let mut unsolicited = Vec::new(); - while !c.unsolicited_responses.is_empty() { - unsolicited.push(c.unsolicited_responses.recv().await.unwrap()); - } - - assert_eq!(unsolicited.len(), 2); - assert!(unsolicited - .iter() - .any(|m| m == &async_imap::types::UnsolicitedResponse::Exists(1))); - assert!(unsolicited - .iter() - .any(|m| m == &async_imap::types::UnsolicitedResponse::Recent(1))); - - println!("fetching"); - - // let's see that we can also fetch the e-mail - let fetch: Vec<_> = c - .fetch("1", "(ALL UID)") - .await - .unwrap() - .collect::, _>>() - .await - .unwrap(); - assert_eq!(fetch.len(), 1); - let fetch = &fetch[0]; - assert_eq!(fetch.message, 1); - assert_ne!(fetch.uid, None); - assert_eq!(fetch.size, Some(21)); - let e = fetch.envelope().unwrap(); - assert_eq!(e.subject, Some(Cow::Borrowed(&b"My first e-mail"[..]))); - assert_ne!(e.from, None); - assert_eq!(e.from.as_ref().unwrap().len(), 1); - let from = &e.from.as_ref().unwrap()[0]; - assert_eq!(from.mailbox, Some(Cow::Borrowed(&b"sender"[..]))); - assert_eq!(from.host, Some(Cow::Borrowed(&b"localhost"[..]))); - assert_ne!(e.to, None); - assert_eq!(e.to.as_ref().unwrap().len(), 1); - let to = &e.to.as_ref().unwrap()[0]; - assert_eq!(to.mailbox, Some(Cow::Borrowed(&b"inbox"[..]))); - assert_eq!(to.host, Some(Cow::Borrowed(&b"localhost"[..]))); - let date_opt = fetch.internal_date(); - assert!(date_opt.is_some()); - - // and let's delete it to clean up - c.store("1", "+FLAGS (\\Deleted)") - .await - .unwrap() - .collect::>() - .await; - c.expunge().await.unwrap().collect::>().await; - - // the e-mail should be gone now - let inbox = c.search("ALL").await.unwrap(); - assert_eq!(inbox.len(), 0); - }); -} - -#[test] -#[ignore] -fn inbox_uid() { - task::block_on(async { - let to = "inbox-uid@localhost"; - - // first log in so we'll see the unsolicited e-mails - let mut c = session(to).await; - c.select("INBOX").await.unwrap(); - - // then send the e-mail - let mut s = smtp(to).await; - let e = make_email(to); - s.connect_and_send(e).await.unwrap(); - - // now we should see the e-mail! - let inbox = c.uid_search("ALL").await.unwrap(); - // and the one message should have the first message sequence number - assert_eq!(inbox.len(), 1); - let uid = inbox.into_iter().next().unwrap(); - - // we should also get two unsolicited responses: Exists and Recent - c.noop().await.unwrap(); - let mut unsolicited = Vec::new(); - while !c.unsolicited_responses.is_empty() { - unsolicited.push(c.unsolicited_responses.recv().await.unwrap()); - } - - assert_eq!(unsolicited.len(), 2); - assert!(unsolicited - .iter() - .any(|m| m == &async_imap::types::UnsolicitedResponse::Exists(1))); - assert!(unsolicited - .iter() - .any(|m| m == &async_imap::types::UnsolicitedResponse::Recent(1))); - - // let's see that we can also fetch the e-mail - let fetch: Vec<_> = c - .uid_fetch(format!("{}", uid), "(ALL UID)") - .await - .unwrap() - .collect::>() - .await - .unwrap(); - assert_eq!(fetch.len(), 1); - let fetch = &fetch[0]; - assert_eq!(fetch.uid, Some(uid)); - let e = fetch.envelope().unwrap(); - assert_eq!(e.subject, Some(Cow::Borrowed(&b"My first e-mail"[..]))); - let date_opt = fetch.internal_date(); - assert!(date_opt.is_some()); - - // and let's delete it to clean up - c.uid_store(format!("{}", uid), "+FLAGS (\\Deleted)") - .await - .unwrap() - .collect::>() - .await; - c.expunge().await.unwrap().collect::>().await; - - // the e-mail should be gone now - let inbox = c.search("ALL").await.unwrap(); - assert_eq!(inbox.len(), 0); - }); -} - -// #[test] -fn _list() { - task::block_on(async { - let mut s = session("readonly-test@localhost").await; - s.select("INBOX").await.unwrap(); - let subdirs: Vec<_> = s.list(None, Some("%")).await.unwrap().collect().await; - assert_eq!(subdirs.len(), 0); - - // TODO: make a subdir - }); -} - -// Greenmail does not support IDLE :( -// #[test] -fn _idle() -> async_imap::error::Result<()> { - task::block_on(async { - let mut session = session("idle-test@localhost").await; - - // get that inbox - let res = session.select("INBOX").await?; - println!("selected: {:#?}", res); - - // fetchy fetch - let msg_stream = session.fetch("1:3", "(FLAGS BODY.PEEK[])").await?; - let msgs = msg_stream.collect::>().await; - println!("msgs: {:?}", msgs.len()); - - // Idle session - println!("starting idle"); - let mut idle = session.idle(); - idle.init().await?; - - let (idle_wait, interrupt) = idle.wait_with_timeout(std::time::Duration::from_secs(30)); - println!("idle wait"); - - task::spawn(async move { - println!("waiting for 1s"); - task::sleep(Duration::from_secs(2)).await; - println!("interrupting idle"); - drop(interrupt); - }); - - let idle_result = idle_wait.await; - println!("idle result: {:#?}", &idle_result); - - // return the session after we are done with it - let mut session = idle.done().await?; - - println!("logging out"); - session.logout().await?; - - Ok(()) - }) -}