Skip to content

Commit 578d979

Browse files
authored
orig-proto: Avoid emiting HTTP/2 errors for upgraded requests (#1245)
Our error handlers need to account for the fact that HTTP/1 requests may fail with HTTP/2 errors due to orig-proto upgrading. To simplify error handling (in an upcoming change), the `orig_proto::Upgrade` service introduces a new error type, `orig_proto::DowngradedH2Error` that hides the original error source so that these errors are not detected as being an `h2::Error`.
1 parent b9e61f6 commit 578d979

File tree

4 files changed

+135
-62
lines changed

4 files changed

+135
-62
lines changed

linkerd/proxy/http/src/client.rs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
//! HTTP/1 messages over an H2 transport; however, some requests cannot be
66
//! proxied via this method, so it also maintains a fallback HTTP/1 client.
77
8-
use crate::{glue::UpgradeBody, h1, h2, orig_proto};
8+
use crate::{h1, h2, orig_proto};
99
use futures::prelude::*;
10-
use linkerd_error::Error;
10+
use linkerd_error::{Error, Result};
11+
use linkerd_http_box::BoxBody;
1112
use linkerd_stack::{layer, Param};
1213
use std::{
1314
marker::PhantomData,
@@ -61,8 +62,7 @@ impl From<crate::Version> for Settings {
6162

6263
// === impl MakeClient ===
6364

64-
type MakeFuture<C, T, B> =
65-
Pin<Box<dyn Future<Output = Result<Client<C, T, B>, Error>> + Send + 'static>>;
65+
type MakeFuture<C, T, B> = Pin<Box<dyn Future<Output = Result<Client<C, T, B>>> + Send + 'static>>;
6666

6767
impl<C, T, B> tower::Service<T> for MakeClient<C, B>
6868
where
@@ -80,7 +80,8 @@ where
8080
type Error = Error;
8181
type Future = MakeFuture<C, T, B>;
8282

83-
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
83+
#[inline]
84+
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
8485
Poll::Ready(Ok(()))
8586
}
8687

@@ -128,9 +129,7 @@ impl<C: Clone, B> Clone for MakeClient<C, B> {
128129

129130
// === impl Client ===
130131

131-
type RspFuture = Pin<
132-
Box<dyn Future<Output = Result<http::Response<UpgradeBody>, hyper::Error>> + Send + 'static>,
133-
>;
132+
type RspFuture = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> + Send + 'static>>;
134133

135134
impl<C, T, B> tower::Service<http::Request<B>> for Client<C, T, B>
136135
where
@@ -143,18 +142,17 @@ where
143142
B::Data: Send,
144143
B::Error: Into<Error> + Send + Sync,
145144
{
146-
type Response = http::Response<UpgradeBody>;
147-
type Error = hyper::Error;
145+
type Response = http::Response<BoxBody>;
146+
type Error = Error;
148147
type Future = Instrumented<RspFuture>;
149148

150-
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
151-
let res = match self {
152-
Self::H2(ref mut svc) => futures::ready!(svc.poll_ready(cx)),
153-
Self::OrigProtoUpgrade(ref mut svc) => futures::ready!(svc.poll_ready(cx)),
154-
Self::Http1(_) => Ok(()),
155-
};
156-
157-
Poll::Ready(res)
149+
#[inline]
150+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
151+
match self {
152+
Self::H2(ref mut svc) => svc.poll_ready(cx).map_err(Into::into),
153+
Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx),
154+
Self::Http1(_) => Poll::Ready(Ok(())),
155+
}
158156
}
159157

160158
fn call(&mut self, req: http::Request<B>) -> Self::Future {
@@ -174,9 +172,11 @@ where
174172
match self {
175173
Self::Http1(ref mut h1) => h1.request(req),
176174
Self::OrigProtoUpgrade(ref mut svc) => svc.call(req),
177-
Self::H2(ref mut svc) => {
178-
Box::pin(svc.call(req).map_ok(|rsp| rsp.map(UpgradeBody::from))) as RspFuture
179-
}
175+
Self::H2(ref mut svc) => Box::pin(
176+
svc.call(req)
177+
.err_into::<Error>()
178+
.map_ok(|rsp| rsp.map(BoxBody::new)),
179+
) as RspFuture,
180180
}
181181
})
182182
.instrument(span)

linkerd/proxy/http/src/h1.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use crate::{
2-
glue::{HyperConnect, UpgradeBody},
2+
glue::HyperConnect,
33
upgrade::{Http11Upgrade, HttpConnect},
44
};
55
use futures::prelude::*;
66
use http::{
77
header::{CONNECTION, HOST, UPGRADE},
88
uri::{Authority, Parts, Scheme, Uri},
99
};
10-
use linkerd_error::Error;
10+
use linkerd_error::{Error, Result};
11+
use linkerd_http_box::BoxBody;
1112
use std::{future::Future, mem, pin::Pin, time::Duration};
1213
use tracing::{debug, trace};
1314

@@ -59,9 +60,7 @@ impl<C: Clone, T: Clone, B> Clone for Client<C, T, B> {
5960
}
6061
}
6162

62-
type RspFuture = Pin<
63-
Box<dyn Future<Output = Result<http::Response<UpgradeBody>, hyper::Error>> + Send + 'static>,
64-
>;
63+
type RspFuture = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> + Send + 'static>>;
6564

6665
impl<C, T, B> Client<C, T, B>
6766
where
@@ -134,7 +133,7 @@ where
134133
client.as_ref().unwrap().request(req)
135134
};
136135

137-
Box::pin(rsp_fut.map_ok(move |mut rsp| {
136+
Box::pin(rsp_fut.err_into().map_ok(move |mut rsp| {
138137
if is_http_connect {
139138
debug_assert!(
140139
upgrade.is_some(),
@@ -152,7 +151,7 @@ where
152151
strip_connection_headers(rsp.headers_mut());
153152
}
154153

155-
rsp.map(UpgradeBody::from)
154+
rsp.map(BoxBody::new)
156155
}))
157156
}
158157
}

linkerd/proxy/http/src/h2.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use crate::trace;
22
use futures::prelude::*;
3+
pub use h2::{Error as H2Error, Reason};
34
use hyper::{
45
body::HttpBody,
56
client::conn::{self, SendRequest},
67
};
7-
use linkerd_error::Error;
8+
use linkerd_error::{Error, Result};
89
use std::time::Duration;
910
use std::{
1011
future::Future,
@@ -57,8 +58,7 @@ impl<C: Clone, B> Clone for Connect<C, B> {
5758
}
5859
}
5960

60-
type ConnectFuture<B> =
61-
Pin<Box<dyn Future<Output = Result<Connection<B>, Error>> + Send + 'static>>;
61+
type ConnectFuture<B> = Pin<Box<dyn Future<Output = Result<Connection<B>>> + Send + 'static>>;
6262

6363
impl<C, B, T> tower::Service<T> for Connect<C, B>
6464
where
@@ -74,6 +74,7 @@ where
7474
type Error = Error;
7575
type Future = ConnectFuture<B>;
7676

77+
#[inline]
7778
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
7879
self.connect.poll_ready(cx).map_err(Into::into)
7980
}

linkerd/proxy/http/src/orig_proto.rs

Lines changed: 104 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1-
use super::{glue::UpgradeBody, h1, h2, upgrade};
1+
use super::{h1, h2, upgrade};
22
use futures::{future, prelude::*};
33
use http::header::{HeaderValue, TRANSFER_ENCODING};
4-
use linkerd_error::Error;
4+
use hyper::body::HttpBody;
5+
use linkerd_error::{Error, Result};
6+
use linkerd_http_box::BoxBody;
57
use linkerd_stack::layer;
68
use std::{
79
future::Future,
810
pin::Pin,
911
task::{Context, Poll},
1012
};
13+
use thiserror::Error;
1114
use tracing::{debug, trace, warn};
1215

1316
pub const L5D_ORIG_PROTO: &str = "l5d-orig-proto";
@@ -19,14 +22,24 @@ pub struct Upgrade<C, T, B> {
1922
h2: h2::Connection<B>,
2023
}
2124

25+
#[derive(Clone, Copy, Debug, Error)]
26+
#[error("upgraded connection failed with HTTP/2 reset: {0}")]
27+
pub struct DowngradedH2Error(h2::Reason);
28+
29+
#[pin_project::pin_project]
30+
#[derive(Debug)]
31+
pub struct UpgradeResponseBody {
32+
inner: hyper::Body,
33+
}
34+
2235
/// Downgrades HTTP2 requests that were previousl upgraded to their original
2336
/// protocol.
2437
#[derive(Clone, Debug)]
2538
pub struct Downgrade<S> {
2639
inner: S,
2740
}
2841

29-
// ==== impl Upgrade =====
42+
// === impl Upgrade ===
3043

3144
impl<C, T, B> Upgrade<C, T, B> {
3245
pub(crate) fn new(http1: h1::Client<C, T, B>, h2: h2::Connection<B>) -> Self {
@@ -45,23 +58,20 @@ where
4558
B::Data: Send,
4659
B::Error: Into<Error> + Send + Sync,
4760
{
48-
type Response = http::Response<UpgradeBody>;
49-
type Error = hyper::Error;
50-
type Future = Pin<
51-
Box<
52-
dyn Future<Output = Result<http::Response<UpgradeBody>, hyper::Error>> + Send + 'static,
53-
>,
54-
>;
61+
type Response = http::Response<BoxBody>;
62+
type Error = Error;
63+
type Future = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> + Send + 'static>>;
5564

65+
#[inline]
5666
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
57-
self.h2.poll_ready(cx)
67+
self.h2.poll_ready(cx).map_err(downgrade_h2_error)
5868
}
5969

6070
fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
6171
debug_assert!(req.version() != http::Version::HTTP_2);
6272
if req.extensions().get::<upgrade::Http11Upgrade>().is_some() {
6373
debug!("Skipping orig-proto upgrade due to HTTP/1.1 upgrade");
64-
return self.http1.request(req);
74+
return Box::pin(self.http1.request(req).map_ok(|rsp| rsp.map(BoxBody::new)));
6575
}
6676

6777
let orig_version = req.version();
@@ -89,28 +99,91 @@ where
8999

90100
*req.version_mut() = http::Version::HTTP_2;
91101

92-
Box::pin(self.h2.call(req).map_ok(move |mut rsp| {
93-
let version = rsp
94-
.headers_mut()
95-
.remove(L5D_ORIG_PROTO)
96-
.and_then(|orig_proto| {
97-
if orig_proto == "HTTP/1.1" {
98-
Some(http::Version::HTTP_11)
99-
} else if orig_proto == "HTTP/1.0" {
100-
Some(http::Version::HTTP_10)
101-
} else {
102-
None
103-
}
104-
})
105-
.unwrap_or(orig_version);
106-
trace!(?version, "Downgrading response");
107-
*rsp.version_mut() = version;
108-
rsp.map(UpgradeBody::from)
109-
}))
102+
Box::pin(
103+
self.h2
104+
.call(req)
105+
.map_err(downgrade_h2_error)
106+
.map_ok(move |mut rsp| {
107+
let version = rsp
108+
.headers_mut()
109+
.remove(L5D_ORIG_PROTO)
110+
.and_then(|orig_proto| {
111+
if orig_proto == "HTTP/1.1" {
112+
Some(http::Version::HTTP_11)
113+
} else if orig_proto == "HTTP/1.0" {
114+
Some(http::Version::HTTP_10)
115+
} else {
116+
None
117+
}
118+
})
119+
.unwrap_or(orig_version);
120+
trace!(?version, "Downgrading response");
121+
*rsp.version_mut() = version;
122+
rsp.map(|inner| BoxBody::new(UpgradeResponseBody { inner }))
123+
}),
124+
)
125+
}
126+
}
127+
128+
/// Handles HTTP/2 client errors for HTTP/1.1 requests by wrapping the error type. This
129+
/// simplifies error handling elsewhere so that HTTP/2 errors can only be encountered when the
130+
/// original request was HTTP/2.
131+
fn downgrade_h2_error(error: hyper::Error) -> Error {
132+
use std::error::Error;
133+
134+
let mut cause = error.source();
135+
while let Some(e) = cause {
136+
if let Some(e) = e.downcast_ref::<h2::H2Error>() {
137+
if let Some(reason) = e.reason() {
138+
return DowngradedH2Error(reason).into();
139+
}
140+
}
141+
142+
cause = error.source();
143+
}
144+
145+
error.into()
146+
}
147+
148+
// === impl UpgradeResponseBody ===
149+
150+
impl Default for UpgradeResponseBody {
151+
fn default() -> Self {
152+
UpgradeResponseBody {
153+
inner: Default::default(),
154+
}
155+
}
156+
}
157+
158+
impl HttpBody for UpgradeResponseBody {
159+
type Data = bytes::Bytes;
160+
type Error = Error;
161+
162+
#[inline]
163+
fn is_end_stream(&self) -> bool {
164+
self.inner.is_end_stream()
165+
}
166+
167+
fn poll_data(
168+
self: Pin<&mut Self>,
169+
cx: &mut Context<'_>,
170+
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
171+
Pin::new(self.project().inner)
172+
.poll_data(cx)
173+
.map_err(downgrade_h2_error)
174+
}
175+
176+
fn poll_trailers(
177+
self: Pin<&mut Self>,
178+
cx: &mut Context<'_>,
179+
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
180+
Pin::new(self.project().inner)
181+
.poll_trailers(cx)
182+
.map_err(downgrade_h2_error)
110183
}
111184
}
112185

113-
// ===== impl Downgrade =====
186+
// === impl Downgrade ===
114187

115188
impl<S> Downgrade<S> {
116189
pub fn layer() -> impl layer::Layer<S, Service = Self> + Copy + Clone {

0 commit comments

Comments
 (0)