Skip to content

Commit c147399

Browse files
Replace parts of futures-util with std APIs
1 parent 2098ac2 commit c147399

14 files changed

+60
-63
lines changed

postgres/src/connection.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::{Error, Notification};
2-
use futures_util::{future, pin_mut, Stream};
2+
use futures_util::Stream;
33
use std::collections::VecDeque;
4-
use std::future::Future;
4+
use std::future::{self, Future};
55
use std::ops::{Deref, DerefMut};
6-
use std::pin::Pin;
6+
use std::pin::{pin, Pin};
77
use std::sync::Arc;
88
use std::task::{Context, Poll};
99
use tokio::io::{AsyncRead, AsyncWrite};
@@ -52,7 +52,7 @@ impl Connection {
5252
where
5353
F: Future<Output = Result<T, Error>>,
5454
{
55-
pin_mut!(future);
55+
let mut future = pin!(future);
5656
self.poll_block_on(|cx, _, _| future.as_mut().poll(cx))
5757
}
5858

postgres/src/notifications.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
use crate::connection::ConnectionRef;
44
use crate::{Error, Notification};
55
use fallible_iterator::FallibleIterator;
6-
use futures_util::{ready, FutureExt};
6+
use futures_util::FutureExt;
77
use std::pin::Pin;
8-
use std::task::Poll;
8+
use std::task::{ready, Poll};
99
use std::time::Duration;
1010
use tokio::time::{self, Instant, Sleep};
1111

tokio-postgres/src/binary_copy.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::types::{FromSql, IsNull, ToSql, Type, WrongType};
44
use crate::{slice_iter, CopyInSink, CopyOutStream, Error};
55
use byteorder::{BigEndian, ByteOrder};
66
use bytes::{Buf, BufMut, Bytes, BytesMut};
7-
use futures_util::{ready, SinkExt, Stream};
7+
use futures_util::{SinkExt, Stream};
88
use pin_project_lite::pin_project;
99
use postgres_types::BorrowToSql;
1010
use std::convert::TryFrom;
@@ -13,7 +13,7 @@ use std::io::Cursor;
1313
use std::ops::Range;
1414
use std::pin::Pin;
1515
use std::sync::Arc;
16-
use std::task::{Context, Poll};
16+
use std::task::{ready, Context, Poll};
1717

1818
const MAGIC: &[u8] = b"PGCOPY\n\xff\r\n\0";
1919
const HEADER_LEN: usize = MAGIC.len() + 4 + 4;

tokio-postgres/src/client.rs

+12-11
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,18 @@ use crate::{
1919
use bytes::{Buf, BytesMut};
2020
use fallible_iterator::FallibleIterator;
2121
use futures_channel::mpsc;
22-
use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
22+
use futures_util::{ready, StreamExt, TryStreamExt};
2323
use parking_lot::Mutex;
2424
use postgres_protocol::message::backend::Message;
2525
use postgres_types::BorrowToSql;
2626
use std::collections::HashMap;
2727
use std::fmt;
28+
use std::future;
2829
#[cfg(feature = "runtime")]
2930
use std::net::IpAddr;
3031
#[cfg(feature = "runtime")]
3132
use std::path::PathBuf;
33+
use std::pin::pin;
3234
use std::sync::Arc;
3335
use std::task::{Context, Poll};
3436
#[cfg(feature = "runtime")]
@@ -300,8 +302,7 @@ impl Client {
300302
where
301303
T: ?Sized + ToStatement,
302304
{
303-
let stream = self.query_raw(statement, slice_iter(params)).await?;
304-
pin_mut!(stream);
305+
let mut stream = pin!(self.query_raw(statement, slice_iter(params)).await?);
305306

306307
let mut first = None;
307308

@@ -336,18 +337,18 @@ impl Client {
336337
///
337338
/// ```no_run
338339
/// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
339-
/// use futures_util::{pin_mut, TryStreamExt};
340+
/// use std::pin::pin;
341+
/// use futures_util::TryStreamExt;
340342
///
341343
/// let params: Vec<String> = vec![
342344
/// "first param".into(),
343345
/// "second param".into(),
344346
/// ];
345-
/// let mut it = client.query_raw(
347+
/// let mut it = pin!(client.query_raw(
346348
/// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
347349
/// params,
348-
/// ).await?;
350+
/// ).await?);
349351
///
350-
/// pin_mut!(it);
351352
/// while let Some(row) = it.try_next().await? {
352353
/// let foo: i32 = row.get("foo");
353354
/// println!("foo: {}", foo);
@@ -402,19 +403,19 @@ impl Client {
402403
///
403404
/// ```no_run
404405
/// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
405-
/// use futures_util::{pin_mut, TryStreamExt};
406+
/// use std::pin::pin;
407+
/// use futures_util::{TryStreamExt};
406408
/// use tokio_postgres::types::Type;
407409
///
408410
/// let params: Vec<(String, Type)> = vec![
409411
/// ("first param".into(), Type::TEXT),
410412
/// ("second param".into(), Type::TEXT),
411413
/// ];
412-
/// let mut it = client.query_typed_raw(
414+
/// let mut it = pin!(client.query_typed_raw(
413415
/// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
414416
/// params,
415-
/// ).await?;
417+
/// ).await?);
416418
///
417-
/// pin_mut!(it);
418419
/// while let Some(row) = it.try_next().await? {
419420
/// let foo: i32 = row.get("foo");
420421
/// println!("foo: {}", foo);

tokio-postgres/src/connect.rs

+13-11
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ use crate::connect_raw::connect_raw;
44
use crate::connect_socket::connect_socket;
55
use crate::tls::MakeTlsConnect;
66
use crate::{Client, Config, Connection, Error, SimpleQueryMessage, Socket};
7-
use futures_util::{future, pin_mut, Future, FutureExt, Stream};
7+
use futures_util::{FutureExt, Stream};
88
use rand::seq::SliceRandom;
9+
use std::future::{self, Future};
10+
use std::pin::pin;
911
use std::task::Poll;
1012
use std::{cmp, io};
1113
use tokio::net;
@@ -161,18 +163,18 @@ where
161163
let (mut client, mut connection) = connect_raw(socket, tls, has_hostname, config).await?;
162164

163165
if config.target_session_attrs != TargetSessionAttrs::Any {
164-
let rows = client.simple_query_raw("SHOW transaction_read_only");
165-
pin_mut!(rows);
166+
let mut rows = pin!(client.simple_query_raw("SHOW transaction_read_only"));
166167

167-
let rows = future::poll_fn(|cx| {
168-
if connection.poll_unpin(cx)?.is_ready() {
169-
return Poll::Ready(Err(Error::closed()));
170-
}
168+
let mut rows = pin!(
169+
future::poll_fn(|cx| {
170+
if connection.poll_unpin(cx)?.is_ready() {
171+
return Poll::Ready(Err(Error::closed()));
172+
}
171173

172-
rows.as_mut().poll(cx)
173-
})
174-
.await?;
175-
pin_mut!(rows);
174+
rows.as_mut().poll(cx)
175+
})
176+
.await?
177+
);
176178

177179
loop {
178180
let next = future::poll_fn(|cx| {

tokio-postgres/src/connect_raw.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{Client, Connection, Error};
77
use bytes::BytesMut;
88
use fallible_iterator::FallibleIterator;
99
use futures_channel::mpsc;
10-
use futures_util::{ready, Sink, SinkExt, Stream, TryStreamExt};
10+
use futures_util::{Sink, SinkExt, Stream, TryStreamExt};
1111
use postgres_protocol::authentication;
1212
use postgres_protocol::authentication::sasl;
1313
use postgres_protocol::authentication::sasl::ScramSha256;
@@ -17,7 +17,7 @@ use std::borrow::Cow;
1717
use std::collections::{HashMap, VecDeque};
1818
use std::io;
1919
use std::pin::Pin;
20-
use std::task::{Context, Poll};
20+
use std::task::{ready, Context, Poll};
2121
use tokio::io::{AsyncRead, AsyncWrite};
2222
use tokio_util::codec::Framed;
2323

tokio-postgres/src/connection.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ use crate::{AsyncMessage, Error, Notification};
66
use bytes::BytesMut;
77
use fallible_iterator::FallibleIterator;
88
use futures_channel::mpsc;
9-
use futures_util::{ready, stream::FusedStream, Sink, Stream, StreamExt};
9+
use futures_util::{stream::FusedStream, Sink, Stream, StreamExt};
1010
use log::{info, trace};
1111
use postgres_protocol::message::backend::Message;
1212
use postgres_protocol::message::frontend;
1313
use std::collections::{HashMap, VecDeque};
1414
use std::future::Future;
1515
use std::pin::Pin;
16-
use std::task::{Context, Poll};
16+
use std::task::{ready, Context, Poll};
1717
use tokio::io::{AsyncRead, AsyncWrite};
1818
use tokio_util::codec::Framed;
1919

tokio-postgres/src/copy_in.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@ use crate::query::extract_row_affected;
55
use crate::{query, slice_iter, Error, Statement};
66
use bytes::{Buf, BufMut, BytesMut};
77
use futures_channel::mpsc;
8-
use futures_util::{future, ready, Sink, SinkExt, Stream, StreamExt};
8+
use futures_util::{Sink, SinkExt, Stream, StreamExt};
99
use log::debug;
1010
use pin_project_lite::pin_project;
1111
use postgres_protocol::message::backend::Message;
1212
use postgres_protocol::message::frontend;
1313
use postgres_protocol::message::frontend::CopyData;
14+
use std::future;
1415
use std::marker::{PhantomData, PhantomPinned};
1516
use std::pin::Pin;
16-
use std::task::{Context, Poll};
17+
use std::task::{ready, Context, Poll};
1718

1819
enum CopyInMessage {
1920
Message(FrontendMessage),

tokio-postgres/src/copy_out.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
44
use crate::{query, slice_iter, Error, Statement};
55
use bytes::Bytes;
6-
use futures_util::{ready, Stream};
6+
use futures_util::Stream;
77
use log::debug;
88
use pin_project_lite::pin_project;
99
use postgres_protocol::message::backend::Message;
1010
use std::marker::PhantomPinned;
1111
use std::pin::Pin;
12-
use std::task::{Context, Poll};
12+
use std::task::{ready, Context, Poll};
1313

1414
pub async fn copy_out(client: &InnerClient, statement: Statement) -> Result<CopyOutStream, Error> {
1515
debug!("executing copy out statement {}", statement.name());

tokio-postgres/src/prepare.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ use crate::{query, slice_iter};
77
use crate::{Column, Error, Statement};
88
use bytes::Bytes;
99
use fallible_iterator::FallibleIterator;
10-
use futures_util::{pin_mut, TryStreamExt};
10+
use futures_util::TryStreamExt;
1111
use log::debug;
1212
use postgres_protocol::message::backend::Message;
1313
use postgres_protocol::message::frontend;
1414
use std::future::Future;
15-
use std::pin::Pin;
15+
use std::pin::{pin, Pin};
1616
use std::sync::atomic::{AtomicUsize, Ordering};
1717
use std::sync::Arc;
1818

@@ -142,8 +142,7 @@ pub(crate) async fn get_type(client: &Arc<InnerClient>, oid: Oid) -> Result<Type
142142

143143
let stmt = typeinfo_statement(client).await?;
144144

145-
let rows = query::query(client, stmt, slice_iter(&[&oid])).await?;
146-
pin_mut!(rows);
145+
let mut rows = pin!(query::query(client, stmt, slice_iter(&[&oid])).await?);
147146

148147
let row = match rows.try_next().await? {
149148
Some(row) => row,

tokio-postgres/src/query.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::types::{BorrowToSql, IsNull};
66
use crate::{Column, Error, Portal, Row, Statement};
77
use bytes::{Bytes, BytesMut};
88
use fallible_iterator::FallibleIterator;
9-
use futures_util::{ready, Stream};
9+
use futures_util::Stream;
1010
use log::{debug, log_enabled, Level};
1111
use pin_project_lite::pin_project;
1212
use postgres_protocol::message::backend::{CommandCompleteBody, Message};
@@ -16,7 +16,7 @@ use std::fmt;
1616
use std::marker::PhantomPinned;
1717
use std::pin::Pin;
1818
use std::sync::Arc;
19-
use std::task::{Context, Poll};
19+
use std::task::{ready, Context, Poll};
2020

2121
struct BorrowToSqlParamsDebug<'a, T>(&'a [T]);
2222

tokio-postgres/src/simple_query.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@ use crate::query::extract_row_affected;
55
use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
66
use bytes::Bytes;
77
use fallible_iterator::FallibleIterator;
8-
use futures_util::{ready, Stream};
8+
use futures_util::Stream;
99
use log::debug;
1010
use pin_project_lite::pin_project;
1111
use postgres_protocol::message::backend::Message;
1212
use postgres_protocol::message::frontend;
1313
use std::marker::PhantomPinned;
1414
use std::pin::Pin;
1515
use std::sync::Arc;
16-
use std::task::{Context, Poll};
16+
use std::task::{ready, Context, Poll};
1717

1818
/// Information about a column of a single query row.
1919
#[derive(Debug)]

tokio-postgres/tests/test/binary_copy.rs

+5-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::connect;
2-
use futures_util::{pin_mut, TryStreamExt};
2+
use futures_util::TryStreamExt;
3+
use std::pin::pin;
34
use tokio_postgres::binary_copy::{BinaryCopyInWriter, BinaryCopyOutStream};
45
use tokio_postgres::types::Type;
56

@@ -16,8 +17,7 @@ async fn write_basic() {
1617
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
1718
.await
1819
.unwrap();
19-
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]);
20-
pin_mut!(writer);
20+
let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]));
2121
writer.as_mut().write(&[&1i32, &"foobar"]).await.unwrap();
2222
writer
2323
.as_mut()
@@ -50,8 +50,7 @@ async fn write_many_rows() {
5050
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
5151
.await
5252
.unwrap();
53-
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]);
54-
pin_mut!(writer);
53+
let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]));
5554

5655
for i in 0..10_000i32 {
5756
writer
@@ -86,8 +85,7 @@ async fn write_big_rows() {
8685
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
8786
.await
8887
.unwrap();
89-
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::BYTEA]);
90-
pin_mut!(writer);
88+
let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::BYTEA]));
9189

9290
for i in 0..2i32 {
9391
writer

tokio-postgres/tests/test/main.rs

+6-10
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22

33
use bytes::{Bytes, BytesMut};
44
use futures_channel::mpsc;
5-
use futures_util::{
6-
future, join, pin_mut, stream, try_join, Future, FutureExt, SinkExt, StreamExt, TryStreamExt,
7-
};
5+
use futures_util::{join, stream, try_join, FutureExt, SinkExt, StreamExt, TryStreamExt};
86
use pin_project_lite::pin_project;
97
use std::fmt::Write;
10-
use std::pin::Pin;
8+
use std::future::{self, Future};
9+
use std::pin::{pin, Pin};
1110
use std::task::{Context, Poll};
1211
use std::time::Duration;
1312
use tokio::net::TcpStream;
@@ -589,8 +588,7 @@ async fn copy_in() {
589588
.into_iter()
590589
.map(Ok::<_, Error>),
591590
);
592-
let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
593-
pin_mut!(sink);
591+
let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap());
594592
sink.send_all(&mut stream).await.unwrap();
595593
let rows = sink.finish().await.unwrap();
596594
assert_eq!(rows, 2);
@@ -636,8 +634,7 @@ async fn copy_in_large() {
636634
.map(Ok::<_, Error>),
637635
);
638636

639-
let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
640-
pin_mut!(sink);
637+
let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap());
641638
sink.send_all(&mut stream).await.unwrap();
642639
let rows = sink.finish().await.unwrap();
643640
assert_eq!(rows, 10_000);
@@ -658,8 +655,7 @@ async fn copy_in_error() {
658655
.unwrap();
659656

660657
{
661-
let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
662-
pin_mut!(sink);
658+
let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap());
663659
sink.send(Bytes::from_static(b"1\tsteven")).await.unwrap();
664660
}
665661

0 commit comments

Comments
 (0)