Skip to content

Commit e2e7d72

Browse files
committed
adding stats functions
1 parent e118942 commit e2e7d72

File tree

2 files changed

+88
-0
lines changed

2 files changed

+88
-0
lines changed

amadeus-core/src/par_sink.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ mod pipe;
1515
mod sample;
1616
mod sum;
1717
mod tuple;
18+
mod stats;
1819

1920
use super::par_pipe::*;
2021
use crate::{pipe::Sink, pool::ProcessSend};

amadeus-core/src/par_sink/stats.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use derive_new::new;
2+
use educe::Educe;
3+
use serde::{Deserialize, Serialize};
4+
use std::{iter, marker::PhantomData, mem};
5+
6+
use super::{
7+
folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink
8+
};
9+
10+
#[derive(new)]
11+
#[must_use]
12+
pub struct Mean<P, B> {
13+
pipe: P,
14+
marker: PhantomData<fn() -> B>,
15+
}
16+
17+
impl_par_dist! {
18+
impl<P: ParallelPipe<Item>, Item, B> ParallelSink<Item> for Mean<P, B>
19+
where
20+
B: iter::Sum<P::Output> + iter::Sum<B> + Send + 'static,
21+
{
22+
folder_par_sink!(
23+
MeanFolder<B, StepA>,
24+
MeanFolder<B, StepB>,
25+
self,
26+
MeanFolder::new(),
27+
MeanFolder::new()
28+
);
29+
}
30+
}
31+
32+
#[derive(Educe, Serialize, Deserialize, new)]
33+
#[educe(Clone)]
34+
#[serde(bound = "")]
35+
36+
pub struct MeanFolder<B, Step> {
37+
marker: PhantomData<fn() -> (B, Step)>,
38+
}
39+
40+
pub struct StepA;
41+
pub struct StepB;
42+
43+
impl<B, Item> FolderSync<Item> for MeanFolder<B, StepA>
44+
where
45+
B: iter::Sum<Item> + iter::Sum<B>,
46+
{
47+
type Done = (B, usize);
48+
49+
#[inline(always)]
50+
fn zero(&mut self) -> Self::Done {
51+
(iter::empty::<B>().sum(),0)
52+
}
53+
54+
#[inline(always)]
55+
fn push(&mut self, state: &mut Self::Done, item: Item) {
56+
let zero = iter::empty::<B>().sum();
57+
let left = mem::replace(&mut state.0, zero);
58+
let right = iter::once(item).sum::<B>();
59+
60+
state.0 = B::sum(iter::once(left).chain(iter::once(right)));
61+
state.1 += 1;
62+
}
63+
}
64+
65+
66+
67+
impl<B> FolderSync<(B, usize)> for MeanFolder<B, StepB>
68+
where
69+
B: iter::Sum<B>
70+
{
71+
type Done = (B, usize);
72+
73+
#[inline(always)]
74+
fn zero(&mut self) -> Self::Done {
75+
(iter::empty().sum(),0)
76+
}
77+
78+
#[inline(always)]
79+
fn push(&mut self, state: &mut Self::Done, item: (B, usize)) {
80+
let zero = iter::empty().sum();
81+
let left = mem::replace(&mut state.0, zero);
82+
let right = iter::once(item.0).sum();
83+
84+
state.0 = B::sum(iter::once(left).chain(iter::once(right)));
85+
state.1 += 1;
86+
}
87+
}

0 commit comments

Comments
 (0)