Skip to content

Commit 8d9befa

Browse files
committed
adding stats functions
1 parent e2e7d72 commit 8d9befa

File tree

9 files changed

+103
-45
lines changed

9 files changed

+103
-45
lines changed

amadeus-core/src/par_sink/combine.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ where
2020
F: FnMut<(A, A), Output = A>,
2121
Item: Into<Option<A>>,
2222
{
23-
type Done = Option<A>;
23+
type State = Option<A>;
24+
type Done = Self::State;
2425

25-
fn zero(&mut self) -> Self::Done {
26+
fn zero(&mut self) -> Self::State {
2627
None
2728
}
28-
fn push(&mut self, state: &mut Self::Done, item: Item) {
29+
fn push(&mut self, state: &mut Self::State, item: Item) {
2930
if let Some(item) = item.into() {
3031
*state = Some(if let Some(state) = state.take() {
3132
self.0.call_mut((state, item))
@@ -34,6 +35,8 @@ where
3435
});
3536
}
3637
}
38+
fn done(&mut self, state: Self::State) -> Self::Done { state }
39+
3740
}
3841

3942
#[derive(new)]

amadeus-core/src/par_sink/combiner.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,13 @@ where
5757
C: CombinerSync<Done = B>,
5858
Item: Into<Option<B>>,
5959
{
60-
type Done = Option<B>;
60+
type State = Option<B>;
61+
type Done = Self::State;
6162

62-
fn zero(&mut self) -> Self::Done {
63+
fn zero(&mut self) -> Self::State {
6364
None
6465
}
65-
fn push(&mut self, state: &mut Self::Done, item: Item) {
66+
fn push(&mut self, state: &mut Self::State, item: Item) {
6667
if let Some(item) = item.into() {
6768
*state = Some(if let Some(state) = state.take() {
6869
self.combine(state, item)
@@ -71,4 +72,6 @@ where
7172
});
7273
}
7374
}
75+
fn done(&mut self, state: Self::State) -> Self::Done { state }
76+
7477
}

amadeus-core/src/par_sink/count.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,18 @@ impl_par_dist! {
2727
pub struct CountFolder;
2828

2929
impl<Item> FolderSync<Item> for CountFolder {
30-
type Done = usize;
30+
type State = usize;
31+
type Done = Self::State;
3132

3233
#[inline(always)]
33-
fn zero(&mut self) -> Self::Done {
34+
fn zero(&mut self) -> Self::State {
3435
0
3536
}
3637
#[inline(always)]
37-
fn push(&mut self, state: &mut Self::Done, _item: Item) {
38+
fn push(&mut self, state: &mut Self::State, _item: Item) {
3839
*state += 1;
3940
}
41+
#[inline(always)]
42+
fn done(&mut self, state: Self::State) -> Self::Done { state }
43+
4044
}

amadeus-core/src/par_sink/fold.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,28 +50,35 @@ where
5050
ID: FnMut<(), Output = B>,
5151
F: FnMut<(B, Either<Item, B>), Output = B>,
5252
{
53-
type Done = B;
53+
type State = B;
54+
type Done = Self::State;
5455

55-
fn zero(&mut self) -> Self::Done {
56+
fn zero(&mut self) -> Self::State {
5657
self.identity.call_mut(())
5758
}
58-
fn push(&mut self, state: &mut Self::Done, item: Item) {
59+
fn push(&mut self, state: &mut Self::State, item: Item) {
5960
replace_with_or_abort(state, |state| self.op.call_mut((state, Either::Left(item))))
6061
}
62+
fn done(&mut self, state: Self::State) -> Self::Done { state }
63+
6164
}
6265
impl<A, ID, F, Item> FolderSync<Item> for FoldFolder<A, ID, F, Item, StepB>
6366
where
6467
ID: FnMut<(), Output = Item>,
6568
F: FnMut<(Item, Either<A, Item>), Output = Item>,
6669
{
67-
type Done = Item;
70+
type State = Item;
71+
type Done = Self::State;
6872

69-
fn zero(&mut self) -> Self::Done {
73+
fn zero(&mut self) -> Self::State {
7074
self.identity.call_mut(())
7175
}
72-
fn push(&mut self, state: &mut Self::Done, item: Item) {
76+
fn push(&mut self, state: &mut Self::State, item: Item) {
7377
replace_with_or_abort(state, |state| {
7478
self.op.call_mut((state, Either::Right(item)))
7579
})
7680
}
81+
#[inline(always)]
82+
fn done(&mut self, state: Self::State) -> Self::Done { state }
83+
7784
}

amadeus-core/src/par_sink/folder.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,13 @@ mod macros {
6060
pub(crate) use macros::{folder_dist_sink, folder_par_sink};
6161

6262
pub trait FolderSync<Item> {
63+
type State;
6364
type Done;
6465

65-
fn zero(&mut self) -> Self::Done;
66-
fn push(&mut self, state: &mut Self::Done, item: Item);
66+
fn zero(&mut self) -> Self::State;
67+
fn push(&mut self, state: &mut Self::State, item: Item);
68+
fn done(&mut self, state: Self::State) -> Self::Done;
69+
6770
}
6871

6972
#[derive(Educe, Serialize, Deserialize, new)]

amadeus-core/src/par_sink/histogram.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,25 +38,29 @@ impl<Item> FolderSync<Item> for HistogramFolder<Item, StepA>
3838
where
3939
Item: Hash + Ord,
4040
{
41-
type Done = HashMap<Item, usize>;
41+
type State = HashMap<Item, usize>;
42+
type Done = Self::State;
4243

43-
fn zero(&mut self) -> Self::Done {
44+
fn zero(&mut self) -> Self::State {
4445
HashMap::new()
4546
}
4647
fn push(&mut self, state: &mut Self::Done, item: Item) {
4748
*state.entry(item).or_insert(0) += 1;
4849
}
50+
fn done(&mut self, state: Self::State) -> Self::Done { state }
51+
4952
}
5053
impl<B> FolderSync<HashMap<B, usize>> for HistogramFolder<B, StepB>
5154
where
5255
B: Hash + Ord,
5356
{
54-
type Done = Vec<(B, usize)>;
57+
type State = Vec<(B, usize)>;
58+
type Done = Self::State;
5559

56-
fn zero(&mut self) -> Self::Done {
60+
fn zero(&mut self) -> Self::State {
5761
Vec::new()
5862
}
59-
fn push(&mut self, state: &mut Self::Done, b: HashMap<B, usize>) {
63+
fn push(&mut self, state: &mut Self::State, b: HashMap<B, usize>) {
6064
let mut b = b.into_iter().collect::<Vec<_>>();
6165
b.sort_by(|a, b| a.0.cmp(&b.0));
6266
replace_with_or_default(state, |state| {
@@ -73,17 +77,20 @@ where
7377
.collect()
7478
})
7579
}
80+
fn done(&mut self, state: Self::State) -> Self::Done { state }
81+
7682
}
7783
impl<B> FolderSync<Vec<(B, usize)>> for HistogramFolder<B, StepB>
7884
where
7985
B: Hash + Ord,
8086
{
81-
type Done = Vec<(B, usize)>;
87+
type State = Vec<(B, usize)>;
88+
type Done = Self::State;
8289

83-
fn zero(&mut self) -> Self::Done {
90+
fn zero(&mut self) -> Self::State {
8491
Vec::new()
8592
}
86-
fn push(&mut self, state: &mut Self::Done, b: Vec<(B, usize)>) {
93+
fn push(&mut self, state: &mut Self::State, b: Vec<(B, usize)>) {
8794
replace_with_or_default(state, |state| {
8895
state
8996
.into_iter()
@@ -98,4 +105,7 @@ where
98105
.collect()
99106
})
100107
}
108+
#[inline(always)]
109+
fn done(&mut self, state: Self::State) -> Self::Done { state }
110+
101111
}

amadeus-core/src/par_sink/sample.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,17 @@ pub struct SampleUnstableFolder {
3838
}
3939

4040
impl<Item> FolderSync<Item> for SampleUnstableFolder {
41-
type Done = SASampleUnstable<Item>;
41+
type State = SASampleUnstable<Item>;
42+
type Done = Self::State;
4243

43-
fn zero(&mut self) -> Self::Done {
44+
fn zero(&mut self) -> Self::State {
4445
SASampleUnstable::new(self.samples)
4546
}
46-
fn push(&mut self, state: &mut Self::Done, item: Item) {
47+
fn push(&mut self, state: &mut Self::State, item: Item) {
4748
state.push(item, &mut thread_rng())
4849
}
50+
fn done(&mut self, state: Self::State) -> Self::Done { state }
51+
4952
}
5053

5154
#[derive(new)]
@@ -83,14 +86,17 @@ impl<Item> FolderSync<Item> for MostFrequentFolder
8386
where
8487
Item: Clone + Hash + Eq + Send + 'static,
8588
{
86-
type Done = Top<Item, usize>;
89+
type State = Top<Item, usize>;
90+
type Done = Self::State;
8791

88-
fn zero(&mut self) -> Self::Done {
92+
fn zero(&mut self) -> Self::State {
8993
Top::new(self.n, self.probability, self.tolerance, ())
9094
}
91-
fn push(&mut self, state: &mut Self::Done, item: Item) {
95+
fn push(&mut self, state: &mut Self::State, item: Item) {
9296
state.push(item, &1)
9397
}
98+
fn done(&mut self, state: Self::State) -> Self::Done { state }
99+
94100
}
95101

96102
#[derive(new)]
@@ -137,12 +143,15 @@ where
137143
A: Clone + Hash + Eq + Send + 'static,
138144
B: Hash + 'static,
139145
{
140-
type Done = Top<A, HyperLogLogMagnitude<B>>;
146+
type State = Top<A, HyperLogLogMagnitude<B>>;
147+
type Done = Self::State;
141148

142-
fn zero(&mut self) -> Self::Done {
149+
fn zero(&mut self) -> Self::State {
143150
Top::new(self.n, self.probability, self.tolerance, self.error_rate)
144151
}
145-
fn push(&mut self, state: &mut Self::Done, item: (A, B)) {
152+
fn push(&mut self, state: &mut Self::State, item: (A, B)) {
146153
state.push(item.0, &item.1)
147154
}
155+
fn done(&mut self, state: Self::State) -> Self::Done { state }
156+
148157
}

amadeus-core/src/par_sink/stats.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,22 +44,26 @@ impl<B, Item> FolderSync<Item> for MeanFolder<B, StepA>
4444
where
4545
B: iter::Sum<Item> + iter::Sum<B>,
4646
{
47-
type Done = (B, usize);
47+
type State = (B, usize);
48+
type Done = Self::State;
4849

4950
#[inline(always)]
50-
fn zero(&mut self) -> Self::Done {
51+
fn zero(&mut self) -> Self::State {
5152
(iter::empty::<B>().sum(),0)
5253
}
5354

5455
#[inline(always)]
55-
fn push(&mut self, state: &mut Self::Done, item: Item) {
56+
fn push(&mut self, state: &mut Self::State, item: Item) {
5657
let zero = iter::empty::<B>().sum();
5758
let left = mem::replace(&mut state.0, zero);
5859
let right = iter::once(item).sum::<B>();
5960

6061
state.0 = B::sum(iter::once(left).chain(iter::once(right)));
6162
state.1 += 1;
6263
}
64+
65+
#[inline(always)]
66+
fn done(&mut self, state: Self::State) -> Self::Done { state }
6367
}
6468

6569

@@ -68,20 +72,25 @@ impl<B> FolderSync<(B, usize)> for MeanFolder<B, StepB>
6872
where
6973
B: iter::Sum<B>
7074
{
71-
type Done = (B, usize);
75+
type State = (B, usize);
76+
type Done = Self::State;
7277

7378
#[inline(always)]
74-
fn zero(&mut self) -> Self::Done {
79+
fn zero(&mut self) -> Self::State {
7580
(iter::empty().sum(),0)
7681
}
7782

7883
#[inline(always)]
79-
fn push(&mut self, state: &mut Self::Done, item: (B, usize)) {
84+
fn push(&mut self, state: &mut Self::State, item: (B, usize)) {
8085
let zero = iter::empty().sum();
8186
let left = mem::replace(&mut state.0, zero);
8287
let right = iter::once(item.0).sum();
8388

8489
state.0 = B::sum(iter::once(left).chain(iter::once(right)));
8590
state.1 += 1;
8691
}
92+
93+
#[inline(always)]
94+
fn done(&mut self, state: Self::State) -> Self::Done { state }
95+
8796
}

amadeus-core/src/par_sink/sum.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,24 @@ impl<Item, B> FolderSync<Item> for SumFolder<B>
3939
where
4040
B: iter::Sum<Item> + iter::Sum<B>,
4141
{
42-
type Done = B;
42+
type State = B;
43+
type Done = Self::State;
4344

4445
#[inline(always)]
4546
fn zero(&mut self) -> Self::Done {
4647
iter::empty::<B>().sum()
4748
}
4849
#[inline(always)]
49-
fn push(&mut self, state: &mut Self::Done, item: Item) {
50+
fn push(&mut self, state: &mut Self::State, item: Item) {
5051
let zero = iter::empty::<B>().sum();
5152
let left = mem::replace(state, zero);
5253
let right = iter::once(item).sum::<B>();
5354
*state = B::sum(iter::once(left).chain(iter::once(right)));
5455
}
56+
57+
#[inline(always)]
58+
fn done(&mut self, state: Self::State) -> Self::Done { state }
59+
5560
}
5661

5762
#[derive(Clone, Serialize, Deserialize)]
@@ -69,18 +74,23 @@ impl<Item> FolderSync<Item> for SumZeroFolder<Item>
6974
where
7075
Option<Item>: iter::Sum<Item>,
7176
{
72-
type Done = Item;
77+
type State = Item;
78+
type Done = Self::State;
7379

7480
#[inline(always)]
75-
fn zero(&mut self) -> Self::Done {
81+
fn zero(&mut self) -> Self::State {
7682
self.zero.take().unwrap()
7783
}
7884
#[inline(always)]
79-
fn push(&mut self, state: &mut Self::Done, item: Item) {
85+
fn push(&mut self, state: &mut Self::State, item: Item) {
8086
replace_with_or_abort(state, |left| {
8187
let right = iter::once(item).sum::<Option<Item>>().unwrap();
8288
<Option<Item> as iter::Sum<Item>>::sum(iter::once(left).chain(iter::once(right)))
8389
.unwrap()
8490
})
8591
}
92+
93+
#[inline(always)]
94+
fn done(&mut self, state: Self::State) -> Self::Done { state }
95+
8696
}

0 commit comments

Comments
 (0)