Skip to content

Commit 6dd16e9

Browse files
authored
transport: Improve TCP server error messages (#1372)
The TCP server can fail connections in a few situations: * `accept(2)` can fail * setting thte TCP keepalive can fail * getting the client peer address can fail It's not easy to differentiate these errors when inspecting proxy logs. This change adds wrapper error types with custom error messages that disambiguate the error cause and updates the `Bind` trait to use boxed error types (instead of `io::Error`). Additionally, this change includes the client address when logging information/warnings about connection errors in general. This is redundant when logging is set at debug, but will be helpful when logging is set at the normal level.
1 parent 611b8ed commit 6dd16e9

File tree

8 files changed

+46
-26
lines changed

8 files changed

+46
-26
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1447,9 +1447,11 @@ version = "0.1.0"
14471447
dependencies = [
14481448
"futures",
14491449
"libc",
1450+
"linkerd-error",
14501451
"linkerd-io",
14511452
"linkerd-stack",
14521453
"socket2 0.4.2",
1454+
"thiserror",
14531455
"tokio",
14541456
"tokio-stream",
14551457
"tracing",

linkerd/app/core/src/serve.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::{
22
io,
33
svc::{self, Param},
44
transport::{ClientAddr, Remote},
5+
Result,
56
};
67
use futures::prelude::*;
78
use linkerd_error::Error;
@@ -12,7 +13,7 @@ use tracing::{debug, debug_span, info, instrument::Instrument, warn};
1213
///
1314
/// The task is driven until shutdown is signaled.
1415
pub async fn serve<M, S, I, A>(
15-
listen: impl Stream<Item = std::io::Result<(A, I)>>,
16+
listen: impl Stream<Item = Result<(A, I)>>,
1617
new_accept: M,
1718
shutdown: impl Future,
1819
) where
@@ -39,7 +40,8 @@ pub async fn serve<M, S, I, A>(
3940
};
4041

4142
// The local addr should be instrumented from the listener's context.
42-
let span = debug_span!("accept", client.addr = %addrs.param()).entered();
43+
let Remote(ClientAddr(client_addr)) = addrs.param();
44+
let span = debug_span!("accept", client.addr = %client_addr).entered();
4345
let accept = new_accept.new_service(addrs);
4446

4547
// Dispatch all of the work for a given connection onto a
@@ -57,15 +59,17 @@ pub async fn serve<M, S, I, A>(
5759
Err(reason) if is_io(&*reason) => {
5860
debug!(%reason, "Connection closed")
5961
}
60-
Err(error) => info!(%error, "Connection closed"),
62+
Err(error) => {
63+
info!(%error, client.addr = %client_addr, "Connection closed")
64+
}
6165
}
6266
// Hold the service until the connection is complete. This
6367
// helps tie any inner cache lifetimes to the services they
6468
// return.
6569
drop(accept);
6670
}
6771
Err(error) => {
68-
warn!(%error, "Server failed to become ready");
72+
warn!(%error, client.addr = %client_addr, "Server failed to become ready");
6973
}
7074
}
7175
}

linkerd/app/inbound/src/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use futures::Stream;
33
use linkerd_app_core::{
44
dns, io, metrics, profiles, serve, svc,
55
transport::{self, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
6-
Error,
6+
Error, Result,
77
};
88
use std::fmt::Debug;
99
use tracing::debug_span;
@@ -30,7 +30,7 @@ impl Inbound<()> {
3030
pub async fn serve<A, I, G, GSvc, P>(
3131
self,
3232
addr: Local<ServerAddr>,
33-
listen: impl Stream<Item = io::Result<(A, I)>> + Send + Sync + 'static,
33+
listen: impl Stream<Item = Result<(A, I)>> + Send + Sync + 'static,
3434
policies: impl policy::CheckPolicy + Clone + Send + Sync + 'static,
3535
profiles: P,
3636
gateway: G,

linkerd/app/integration/src/proxy.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use super::*;
2-
use app_core::transport::OrigDstAddr;
32
use linkerd_app_core::{
43
svc::Param,
4+
transport::OrigDstAddr,
55
transport::{listen, orig_dst, Keepalive, ListenAddr},
6+
Result,
67
};
78
use std::{fmt, future::Future, net::SocketAddr, pin::Pin, task::Poll, thread};
89
use tokio::net::TcpStream;
@@ -65,22 +66,18 @@ where
6566
{
6667
type Addrs = orig_dst::Addrs;
6768
type Io = tokio::net::TcpStream;
68-
type Incoming = Pin<
69-
Box<dyn Stream<Item = io::Result<(orig_dst::Addrs, TcpStream)>> + Send + Sync + 'static>,
70-
>;
69+
type Incoming =
70+
Pin<Box<dyn Stream<Item = Result<(orig_dst::Addrs, TcpStream)>> + Send + Sync + 'static>>;
7171

72-
fn bind(self, params: &T) -> io::Result<listen::Bound<Self::Incoming>> {
72+
fn bind(self, params: &T) -> Result<listen::Bound<Self::Incoming>> {
7373
let (bound, incoming) = listen::BindTcp::default().bind(params)?;
7474
let incoming = Box::pin(incoming.map(move |res| {
7575
let (inner, tcp) = res?;
7676
let orig_dst = match self {
7777
Self::Addr(addr) => OrigDstAddr(addr),
7878
Self::Direct => OrigDstAddr(inner.server.into()),
7979
Self::None => {
80-
return Err(io::Error::new(
81-
io::ErrorKind::Other,
82-
"No mocked SO_ORIG_DST",
83-
))
80+
return Err("No mocked SO_ORIG_DST".into());
8481
}
8582
};
8683
let addrs = orig_dst::Addrs { inner, orig_dst };

linkerd/app/outbound/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use linkerd_app_core::{
3333
svc::{self, stack::Param},
3434
tls,
3535
transport::{self, addrs::*},
36-
AddrMatch, Error, ProxyRuntime,
36+
AddrMatch, Error, ProxyRuntime, Result,
3737
};
3838
use std::{
3939
collections::{HashMap, HashSet},
@@ -149,7 +149,7 @@ impl<S> Outbound<S> {
149149
impl Outbound<()> {
150150
pub async fn serve<A, I, P, R>(
151151
self,
152-
listen: impl Stream<Item = io::Result<(A, I)>> + Send + Sync + 'static,
152+
listen: impl Stream<Item = Result<(A, I)>> + Send + Sync + 'static,
153153
profiles: P,
154154
resolve: R,
155155
) where

linkerd/proxy/transport/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ Transport-level implementations that rely on core proxy infrastructure
1111

1212
[dependencies]
1313
futures = { version = "0.3", default-features = false }
14+
linkerd-error = { path = "../../error" }
1415
linkerd-io = { path = "../../io" }
1516
linkerd-stack = { path = "../../stack" }
1617
socket2 = "0.4"
18+
thiserror = "1"
1719
tokio = { version = "1", features = ["macros", "net"] }
1820
tokio-stream = { version = "0.1", features = ["net"] }
1921
tracing = "0.1.29"

linkerd/proxy/transport/src/listen.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use crate::{addrs::*, Keepalive};
22
use futures::prelude::*;
3+
use linkerd_error::Result;
34
use linkerd_io as io;
45
use linkerd_stack::Param;
56
use std::{fmt, pin::Pin};
7+
use thiserror::Error;
68
use tokio::net::TcpStream;
79
use tokio_stream::wrappers::TcpListenerStream;
810

@@ -21,9 +23,9 @@ pub trait Bind<T> {
2123
+ Sync
2224
+ 'static;
2325
type Addrs: Clone + Send + Sync + 'static;
24-
type Incoming: Stream<Item = io::Result<(Self::Addrs, Self::Io)>> + Send + Sync + 'static;
26+
type Incoming: Stream<Item = Result<(Self::Addrs, Self::Io)>> + Send + Sync + 'static;
2527

26-
fn bind(self, params: &T) -> io::Result<Bound<Self::Incoming>>;
28+
fn bind(self, params: &T) -> Result<Bound<Self::Incoming>>;
2729
}
2830

2931
pub type Bound<I> = (Local<ServerAddr>, I);
@@ -37,6 +39,18 @@ pub struct Addrs {
3739
pub client: Remote<ClientAddr>,
3840
}
3941

42+
#[derive(Debug, Error)]
43+
#[error("failed to accept socket: {0}")]
44+
struct AcceptError(#[source] io::Error);
45+
46+
#[derive(Debug, Error)]
47+
#[error("failed to set TCP keepalive: {0}")]
48+
struct KeepaliveError(#[source] io::Error);
49+
50+
#[derive(Debug, Error)]
51+
#[error("failed to obtain peer address: {0}")]
52+
struct PeerAddrError(#[source] io::Error);
53+
4054
// === impl BindTcp ===
4155

4256
impl BindTcp {
@@ -50,10 +64,10 @@ where
5064
T: Param<ListenAddr> + Param<Keepalive>,
5165
{
5266
type Addrs = Addrs;
53-
type Incoming = Pin<Box<dyn Stream<Item = io::Result<(Self::Addrs, Self::Io)>> + Send + Sync>>;
67+
type Incoming = Pin<Box<dyn Stream<Item = Result<(Self::Addrs, Self::Io)>> + Send + Sync>>;
5468
type Io = TcpStream;
5569

56-
fn bind(self, params: &T) -> io::Result<Bound<Self::Incoming>> {
70+
fn bind(self, params: &T) -> Result<Bound<Self::Incoming>> {
5771
let listen = {
5872
let ListenAddr(addr) = params.param();
5973
let l = std::net::TcpListener::bind(addr)?;
@@ -64,10 +78,10 @@ where
6478
let server = Local(ServerAddr(listen.local_addr()?));
6579
let Keepalive(keepalive) = params.param();
6680
let accept = TcpListenerStream::new(listen).map(move |res| {
67-
let tcp = res?;
81+
let tcp = res.map_err(AcceptError)?;
6882
super::set_nodelay_or_warn(&tcp);
69-
let tcp = super::set_keepalive_or_warn(tcp, keepalive)?;
70-
let client = Remote(ClientAddr(tcp.peer_addr()?));
83+
let tcp = super::set_keepalive_or_warn(tcp, keepalive).map_err(KeepaliveError)?;
84+
let client = Remote(ClientAddr(tcp.peer_addr().map_err(PeerAddrError)?));
7185
Ok((Addrs { server, client }, tcp))
7286
});
7387

linkerd/proxy/transport/src/orig_dst.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::{
33
listen::{self, Bind, Bound},
44
};
55
use futures::prelude::*;
6+
use linkerd_error::Result;
67
use linkerd_io as io;
78
use linkerd_stack::Param;
89
use std::pin::Pin;
@@ -60,9 +61,9 @@ where
6061
type Addrs = Addrs<B::Addrs>;
6162
type Io = TcpStream;
6263
type Incoming =
63-
Pin<Box<dyn Stream<Item = io::Result<(Self::Addrs, TcpStream)>> + Send + Sync + 'static>>;
64+
Pin<Box<dyn Stream<Item = Result<(Self::Addrs, TcpStream)>> + Send + Sync + 'static>>;
6465

65-
fn bind(self, t: &T) -> io::Result<Bound<Self::Incoming>> {
66+
fn bind(self, t: &T) -> Result<Bound<Self::Incoming>> {
6667
let (addr, incoming) = self.inner.bind(t)?;
6768

6869
let incoming = incoming.map(|res| {

0 commit comments

Comments
 (0)