Skip to content

Commit b601901

Browse files
committed
get FolderSync::done to run only in the final reduce
1 parent c9f6989 commit b601901

File tree

3 files changed

+74
-24
lines changed

3 files changed

+74
-24
lines changed

amadeus-core/src/par_sink/combiner.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ mod macros {
88
($combiner:ty, $self:ident, $init:expr) => {
99
type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done;
1010
type Pipe = P;
11-
type ReduceA = FolderSyncReducer<P::Output, $combiner>;
12-
type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $combiner>;
11+
type ReduceA = FolderSyncReducer<P::Output, $combiner, crate::par_sink::Inter>;
12+
type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $combiner, crate::par_sink::Final>;
1313

1414
fn reducers($self) -> (P, Self::ReduceA, Self::ReduceC) {
1515
let init = $init;
@@ -26,9 +26,9 @@ mod macros {
2626
($combiner:ty, $self:ident, $init:expr) => {
2727
type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done>>::Done;
2828
type Pipe = P;
29-
type ReduceA = FolderSyncReducer<P::Output, $combiner>;
30-
type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $combiner>;
31-
type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done, $combiner>;
29+
type ReduceA = FolderSyncReducer<P::Output, $combiner, crate::par_sink::Inter>;
30+
type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $combiner, crate::par_sink::Inter>;
31+
type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done, $combiner, crate::par_sink::Final>;
3232

3333
fn reducers($self) -> (P, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
3434
let init = $init;

amadeus-core/src/par_sink/folder.rs

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ mod macros {
1818
($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
1919
type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done;
2020
type Pipe = P;
21-
type ReduceA = FolderSyncReducer<P::Output, $folder_a>;
22-
type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $folder_b>;
21+
type ReduceA = FolderSyncReducer<P::Output, $folder_a, crate::par_sink::Inter>;
22+
type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $folder_b, crate::par_sink::Final>;
2323

2424
fn reducers($self) -> (P, Self::ReduceA, Self::ReduceC) {
2525
let init_a = $init_a;
@@ -37,9 +37,9 @@ mod macros {
3737
($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
3838
type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done>>::Done;
3939
type Pipe = P;
40-
type ReduceA = FolderSyncReducer<P::Output, $folder_a>;
41-
type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $folder_b>;
42-
type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done, $folder_b>;
40+
type ReduceA = FolderSyncReducer<P::Output, $folder_a, crate::par_sink::Inter>;
41+
type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $folder_b, crate::par_sink::Inter>;
42+
type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done, $folder_b, crate::par_sink::Final>;
4343

4444
fn reducers($self) -> (P, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
4545
let init_a = $init_a;
@@ -68,23 +68,56 @@ pub trait FolderSync<Item> {
6868
fn done(&mut self, state: Self::State) -> Self::Done;
6969
}
7070

71+
pub struct Inter;
72+
pub struct Final;
73+
7174
#[derive(Educe, Serialize, Deserialize, new)]
7275
#[educe(Clone(bound = "F: Clone"))]
7376
#[serde(
7477
bound(serialize = "F: Serialize"),
7578
bound(deserialize = "F: Deserialize<'de>")
7679
)]
77-
pub struct FolderSyncReducer<Item, F> {
80+
pub struct FolderSyncReducer<Item, F, Final> {
7881
folder: F,
79-
marker: PhantomData<fn() -> Item>,
82+
marker: PhantomData<fn() -> (Item, Final)>,
83+
}
84+
85+
impl<Item, F> Reducer<Item> for FolderSyncReducer<Item, F, Inter>
86+
where
87+
F: FolderSync<Item>,
88+
{
89+
type Done = F::State;
90+
type Async = FolderSyncReducerAsync<Item, F, F::State, Inter>;
91+
92+
fn into_async(mut self) -> Self::Async {
93+
FolderSyncReducerAsync {
94+
state: Some(self.folder.zero()),
95+
folder: self.folder,
96+
marker: PhantomData,
97+
}
98+
}
99+
}
100+
impl<Item, F> ReducerProcessSend<Item> for FolderSyncReducer<Item, F, Inter>
101+
where
102+
F: FolderSync<Item>,
103+
F::State: ProcessSend + 'static,
104+
{
105+
type Done = F::State;
106+
}
107+
impl<Item, F> ReducerSend<Item> for FolderSyncReducer<Item, F, Inter>
108+
where
109+
F: FolderSync<Item>,
110+
F::State: Send + 'static,
111+
{
112+
type Done = F::State;
80113
}
81114

82-
impl<Item, F> Reducer<Item> for FolderSyncReducer<Item, F>
115+
impl<Item, F> Reducer<Item> for FolderSyncReducer<Item, F, Final>
83116
where
84117
F: FolderSync<Item>,
85118
{
86119
type Done = F::Done;
87-
type Async = FolderSyncReducerAsync<Item, F, F::State>;
120+
type Async = FolderSyncReducerAsync<Item, F, F::State, Final>;
88121

89122
fn into_async(mut self) -> Self::Async {
90123
FolderSyncReducerAsync {
@@ -94,14 +127,14 @@ where
94127
}
95128
}
96129
}
97-
impl<Item, F> ReducerProcessSend<Item> for FolderSyncReducer<Item, F>
130+
impl<Item, F> ReducerProcessSend<Item> for FolderSyncReducer<Item, F, Final>
98131
where
99132
F: FolderSync<Item>,
100133
F::Done: ProcessSend + 'static,
101134
{
102135
type Done = F::Done;
103136
}
104-
impl<Item, F> ReducerSend<Item> for FolderSyncReducer<Item, F>
137+
impl<Item, F> ReducerSend<Item> for FolderSyncReducer<Item, F, Final>
105138
where
106139
F: FolderSync<Item>,
107140
F::Done: Send + 'static,
@@ -110,12 +143,31 @@ where
110143
}
111144

112145
#[pin_project]
113-
pub struct FolderSyncReducerAsync<Item, F, S> {
146+
pub struct FolderSyncReducerAsync<Item, F, S, Final> {
114147
state: Option<S>,
115148
folder: F,
116-
marker: PhantomData<fn() -> Item>,
149+
marker: PhantomData<fn() -> (Item, Final)>,
150+
}
151+
impl<Item, F> Sink<Item> for FolderSyncReducerAsync<Item, F, F::State, Inter>
152+
where
153+
F: FolderSync<Item>,
154+
{
155+
type Done = F::State;
156+
157+
#[inline(always)]
158+
fn poll_forward(
159+
self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
160+
) -> Poll<Self::Done> {
161+
let self_ = self.project();
162+
let folder = self_.folder;
163+
let state = self_.state.as_mut().unwrap();
164+
while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
165+
folder.push(state, item);
166+
}
167+
Poll::Ready(self_.state.take().unwrap())
168+
}
117169
}
118-
impl<Item, F> Sink<Item> for FolderSyncReducerAsync<Item, F, F::State>
170+
impl<Item, F> Sink<Item> for FolderSyncReducerAsync<Item, F, F::State, Final>
119171
where
120172
F: FolderSync<Item>,
121173
{

amadeus-core/src/par_sink/stats.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub struct Mean<P, B> {
1818
impl_par_dist! {
1919
impl<P: ParallelPipe<Item>, Item, B> ParallelSink<Item> for Mean<P, B>
2020
where
21-
B: iter::Sum<P::Output> + iter::Sum<B> + Send + 'static,
21+
B: iter::Sum<P::Output> + iter::Sum<B> + ToPrimitive + Send + 'static,
2222
{
2323
folder_par_sink!(
2424
MeanFolder<B, StepA>,
@@ -46,7 +46,7 @@ where
4646
B: iter::Sum<Item> + iter::Sum<B> + ToPrimitive,
4747
{
4848
type State = (B, usize);
49-
type Done = f64;
49+
type Done = Self::State;
5050

5151
#[inline(always)]
5252
fn zero(&mut self) -> Self::State {
@@ -65,9 +65,7 @@ where
6565

6666
#[inline(always)]
6767
fn done(&mut self, state: Self::State) -> Self::Done {
68-
let sum = state.0;
69-
let count = state.1 as f64;
70-
B::to_f64(&sum).map(|sum| sum / count).unwrap()
68+
state
7169
}
7270
}
7371

0 commit comments

Comments
 (0)