Skip to content

Commit 34620e6

Browse files
committed
And now work around rust-lang/rust#53443
1 parent b4c3fa2 commit 34620e6

File tree

3 files changed

+115
-119
lines changed

3 files changed

+115
-119
lines changed

noria-benchmarks/vote/clients/localsoup/mod.rs

Lines changed: 68 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ pub(crate) struct LocalNoria {
2323
unsafe impl Send for LocalNoria {}
2424

2525
impl VoteClient for LocalNoria {
26-
existential type NewFuture: Future<Item = Self, Error = failure::Error>;
27-
existential type ReadFuture: Future<Item = (), Error = failure::Error>;
28-
existential type WriteFuture: Future<Item = (), Error = failure::Error>;
26+
// TODO: existential once https://github.com/rust-lang/rust/issues/53546 is fixed
27+
type NewFuture = Box<Future<Item = Self, Error = failure::Error> + Send>;
28+
type ReadFuture = Box<Future<Item = (), Error = failure::Error> + Send>;
29+
type WriteFuture = Box<Future<Item = (), Error = failure::Error> + Send>;
2930

3031
fn spawn(
3132
ex: tokio::runtime::TaskExecutor,
@@ -72,48 +73,50 @@ impl VoteClient for LocalNoria {
7273
println!("Prepopulating with {} articles", params.articles);
7374
}
7475

75-
g.and_then(|mut g| g.graph.handle().table("Article").map(move |a| (g, a)))
76-
.and_then(move |(g, mut a)| {
77-
if fudge {
78-
a.i_promise_dst_is_same_process();
79-
}
80-
81-
a.perform_all((0..params.articles).map(|i| {
82-
vec![
83-
((i + 1) as i32).into(),
84-
format!("Article #{}", i + 1).into(),
85-
]
86-
}))
87-
.map(move |_| g)
88-
.then(|r| {
89-
r.context("failed to do article prepopulation")
90-
.map_err(Into::into)
76+
Box::new(
77+
g.and_then(|mut g| g.graph.handle().table("Article").map(move |a| (g, a)))
78+
.and_then(move |(g, mut a)| {
79+
if fudge {
80+
a.i_promise_dst_is_same_process();
81+
}
82+
83+
a.perform_all((0..params.articles).map(|i| {
84+
vec![
85+
((i + 1) as i32).into(),
86+
format!("Article #{}", i + 1).into(),
87+
]
88+
}))
89+
.map(move |_| g)
90+
.then(|r| {
91+
r.context("failed to do article prepopulation")
92+
.map_err(Into::into)
93+
})
9194
})
92-
})
93-
.and_then(move |mut g| {
94-
if verbose {
95-
println!("Done with prepopulation");
96-
}
97-
98-
// TODO: allow writes to propagate
99-
100-
g.graph
101-
.handle()
102-
.view("ArticleWithVoteCount")
103-
.and_then(move |r| {
104-
g.graph.handle().table("Vote").map(move |mut w| {
105-
if fudge {
106-
// fudge write rpcs by sending just the pointer over tcp
107-
w.i_promise_dst_is_same_process();
108-
}
109-
LocalNoria {
110-
_g: Arc::new(g),
111-
r: Some(r),
112-
w: Some(w),
113-
}
95+
.and_then(move |mut g| {
96+
if verbose {
97+
println!("Done with prepopulation");
98+
}
99+
100+
// TODO: allow writes to propagate
101+
102+
g.graph
103+
.handle()
104+
.view("ArticleWithVoteCount")
105+
.and_then(move |r| {
106+
g.graph.handle().table("Vote").map(move |mut w| {
107+
if fudge {
108+
// fudge write rpcs by sending just the pointer over tcp
109+
w.i_promise_dst_is_same_process();
110+
}
111+
LocalNoria {
112+
_g: Arc::new(g),
113+
r: Some(r),
114+
w: Some(w),
115+
}
116+
})
114117
})
115-
})
116-
})
118+
}),
119+
)
117120
}
118121

119122
fn handle_writes(&mut self, ids: &[i32]) -> Self::WriteFuture {
@@ -122,11 +125,13 @@ impl VoteClient for LocalNoria {
122125
.map(|&article_id| vec![(article_id as usize).into(), 0.into()])
123126
.collect();
124127

125-
self.w
126-
.as_mut()
127-
.unwrap()
128-
.perform_all(data)
129-
.map_err(failure::Error::from)
128+
Box::new(
129+
self.w
130+
.as_mut()
131+
.unwrap()
132+
.perform_all(data)
133+
.map_err(failure::Error::from),
134+
)
130135
}
131136

132137
fn handle_reads(&mut self, ids: &[i32]) -> Self::ReadFuture {
@@ -136,19 +141,21 @@ impl VoteClient for LocalNoria {
136141
.collect();
137142

138143
let len = ids.len();
139-
self.r
140-
.as_mut()
141-
.unwrap()
142-
.multi_lookup(arg, true)
143-
.map(|rows| {
144-
// TODO
145-
//assert_eq!(rows.map(|rows| rows.len()), Ok(1));
146-
rows.len()
147-
})
148-
.map(move |rows| {
149-
assert_eq!(rows, len);
150-
})
151-
.map_err(failure::Error::from)
144+
Box::new(
145+
self.r
146+
.as_mut()
147+
.unwrap()
148+
.multi_lookup(arg, true)
149+
.map(|rows| {
150+
// TODO
151+
//assert_eq!(rows.map(|rows| rows.len()), Ok(1));
152+
rows.len()
153+
})
154+
.map(move |rows| {
155+
assert_eq!(rows, len);
156+
})
157+
.map_err(failure::Error::from),
158+
)
152159
}
153160
}
154161

noria-benchmarks/vote/clients/netsoup.rs

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ pub(crate) struct Conn {
1313
}
1414

1515
impl VoteClient for Conn {
16-
existential type NewFuture: Future<Item = Self, Error = failure::Error>;
17-
existential type ReadFuture: Future<Item = (), Error = failure::Error>;
18-
existential type WriteFuture: Future<Item = (), Error = failure::Error>;
16+
// TODO: existential once https://github.com/rust-lang/rust/issues/53546 is fixed
17+
type NewFuture = Box<Future<Item = Self, Error = failure::Error> + Send>;
18+
type ReadFuture = Box<Future<Item = (), Error = failure::Error> + Send>;
19+
type WriteFuture = Box<Future<Item = (), Error = failure::Error> + Send>;
1920

2021
fn spawn(
2122
_: tokio::runtime::TaskExecutor,
@@ -28,37 +29,42 @@ impl VoteClient for Conn {
2829
args.value_of("deployment").unwrap()
2930
);
3031

31-
ZookeeperAuthority::new(&zk)
32-
.into_future()
33-
.and_then(ControllerHandle::new)
34-
.and_then(move |mut c| {
35-
if params.prime {
36-
// for prepop, we need a mutator
37-
future::Either::A(
38-
c.install_recipe(RECIPE)
39-
.and_then(move |_| c.table("Article").map(move |a| (c, a)))
40-
.and_then(move |(c, mut a)| {
41-
a.perform_all((0..params.articles).map(|i| {
42-
vec![((i + 1) as i32).into(), format!("Article #{}", i).into()]
43-
}))
44-
.map(move |_| c)
45-
.then(|r| {
46-
r.context("failed to do article prepopulation")
47-
.map_err(Into::into)
48-
})
49-
}),
50-
)
51-
} else {
52-
future::Either::B(future::ok(c))
53-
}
54-
})
55-
.and_then(|mut c| c.table("Vote").map(move |v| (c, v)))
56-
.and_then(|(mut c, v)| c.view("ArticleWithVoteCount").map(move |awvc| (c, v, awvc)))
57-
.map(|(c, v, awvc)| Conn {
58-
ch: c,
59-
r: Some(awvc),
60-
w: Some(v),
61-
})
32+
Box::new(
33+
ZookeeperAuthority::new(&zk)
34+
.into_future()
35+
.and_then(ControllerHandle::new)
36+
.and_then(move |mut c| {
37+
if params.prime {
38+
// for prepop, we need a mutator
39+
future::Either::A(
40+
c.install_recipe(RECIPE)
41+
.and_then(move |_| c.table("Article").map(move |a| (c, a)))
42+
.and_then(move |(c, mut a)| {
43+
a.perform_all((0..params.articles).map(|i| {
44+
vec![
45+
((i + 1) as i32).into(),
46+
format!("Article #{}", i).into(),
47+
]
48+
}))
49+
.map(move |_| c)
50+
.then(|r| {
51+
r.context("failed to do article prepopulation")
52+
.map_err(Into::into)
53+
})
54+
}),
55+
)
56+
} else {
57+
future::Either::B(future::ok(c))
58+
}
59+
})
60+
.and_then(|mut c| c.table("Vote").map(move |v| (c, v)))
61+
.and_then(|(mut c, v)| c.view("ArticleWithVoteCount").map(move |awvc| (c, v, awvc)))
62+
.map(|(c, v, awvc)| Conn {
63+
ch: c,
64+
r: Some(awvc),
65+
w: Some(v),
66+
}),
67+
)
6268
}
6369

6470
fn handle_writes(&mut self, ids: &[i32]) -> Self::WriteFuture {

noria/src/table.rs

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -213,30 +213,11 @@ pub struct Table {
213213
shard_addrs: Vec<SocketAddr>,
214214
}
215215

216-
// b/c of https://github.com/rust-lang/rust/issues/53546
217-
pub fn map1(
218-
_: (),
219-
_: Tagged<()>,
220-
) -> Result<
221-
(),
222-
tower_buffer::Error<
223-
tower_balance::Error<
224-
tower_buffer::Error<
225-
tokio_tower::multiplex::client::Error<
226-
tokio_tower::multiplex::MultiplexTransport<Transport, Tagger>,
227-
>,
228-
>,
229-
tokio_tower::multiplex::client::SpawnError<std::io::Error>,
230-
>,
231-
>,
232-
> {
233-
Ok(())
234-
}
235-
236216
impl Service<Input> for Table {
237217
type Error = TableError;
238218
type Response = <TableRpc as Service<Tagged<LocalOrNot<Input>>>>::Response;
239-
existential type Future: Future<Item = Tagged<()>, Error = TableError>;
219+
// existential once https://github.com/rust-lang/rust/issues/53443 is fixed
220+
type Future = Box<Future<Item = Tagged<()>, Error = TableError> + Send>;
240221

241222
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
242223
for s in &mut self.shards {
@@ -251,7 +232,8 @@ impl Service<Input> for Table {
251232
// TODO: check each row's .len() against self.columns.len() -> WrongColumnCount
252233

253234
if self.shards.len() == 1 {
254-
future::Either::A(
235+
// when Box goes away, this becomes future::Either::A
236+
Box::new(
255237
self.shards[0]
256238
.call(
257239
if self.dst_is_local {
@@ -310,9 +292,10 @@ impl Service<Input> for Table {
310292
}
311293
}
312294

313-
future::Either::B(
295+
// when Box goes away, this becomes future::Either::B
296+
Box::new(
314297
wait_for
315-
.fold((), map1)
298+
.fold((), |_, _| Ok(()))
316299
.map_err(TableError::from)
317300
.map(Tagged::from),
318301
)

0 commit comments

Comments
 (0)