Skip to content

Commit 14ffe13

Browse files
committed
vendor streaming_algorithm
2 parents 3d343cc + 8d0b99c commit 14ffe13

File tree

18 files changed

+3699
-12
lines changed

18 files changed

+3699
-12
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ amadeus-commoncrawl = { version = "=0.3.7", path = "amadeus-commoncrawl", option
4444
amadeus-parquet = { version = "=0.3.7", path = "amadeus-parquet", optional = true }
4545
amadeus-postgres = { version = "=0.3.7", path = "amadeus-postgres", optional = true }
4646
amadeus-serde = { version = "=0.3.7", path = "amadeus-serde", optional = true }
47+
amadeus-streaming = { version = "=0.3.7", path = "amadeus-streaming" }
4748
async-channel = "1.1"
4849
bincode = { version = "1.3", optional = true }
4950
constellation-rs = { version = "0.2.0-alpha.2", default-features = false, optional = true }
@@ -72,7 +73,6 @@ doc-comment = "0.3"
7273
either = { version = "1.5", features = ["serde"] }
7374
rand = "0.7"
7475
serde_json = "1.0"
75-
streaming_algorithms = "0.3"
7676
tokio = { version = "0.2", features = ["macros", "time"] }
7777

7878
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]

amadeus-core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2
1919
maintenance = { status = "actively-developed" }
2020

2121
[dependencies]
22+
amadeus-streaming = { version = "=0.3.7", path = "../amadeus-streaming" }
2223
async-trait = "0.1"
2324
derive-new = "0.5"
2425
educe = "0.4"
@@ -33,7 +34,6 @@ rand = "0.7"
3334
replace_with = "0.1"
3435
serde = { version = "1.0", features = ["derive"] }
3536
serde_closure = "0.3"
36-
streaming_algorithms = "0.3"
3737
sum = { version = "0.1", features = ["futures", "serde"] }
3838
tokio = { version = "0.2", features = ["blocking", "rt-core"] }
3939
walkdir = "2.2"

amadeus-core/src/par_sink/sample.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#![allow(clippy::type_complexity)]
22

3+
use amadeus_streaming::{HyperLogLogMagnitude, SampleUnstable as SASampleUnstable, Top};
34
use derive_new::new;
45
use rand::thread_rng;
56
use serde::{Deserialize, Serialize};
67
use std::hash::Hash;
7-
use streaming_algorithms::{HyperLogLogMagnitude, SampleUnstable as SASampleUnstable, Top};
88

99
use super::{
1010
folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink, SumFolder, SumZeroFolder

amadeus-core/src/par_stream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ macro_rules! stream {
296296
#[inline]
297297
async fn most_frequent<P>(
298298
self, pool: &P, n: usize, probability: f64, tolerance: f64,
299-
) -> ::streaming_algorithms::Top<Self::Item, usize>
299+
) -> ::amadeus_streaming::Top<Self::Item, usize>
300300
where
301301
P: $pool,
302302
Self::Item: Hash + Eq + Clone + $send + 'static,
@@ -313,7 +313,7 @@ macro_rules! stream {
313313
#[inline]
314314
async fn most_distinct<P, A, B>(
315315
self, pool: &P, n: usize, probability: f64, tolerance: f64, error_rate: f64,
316-
) -> ::streaming_algorithms::Top<A, streaming_algorithms::HyperLogLogMagnitude<B>>
316+
) -> ::amadeus_streaming::Top<A, amadeus_streaming::HyperLogLogMagnitude<B>>
317317
where
318318
P: $pool,
319319
Self: $stream<Item = (A, B)> + Sized,
@@ -337,7 +337,7 @@ macro_rules! stream {
337337
#[inline]
338338
async fn sample_unstable<P>(
339339
self, pool: &P, samples: usize,
340-
) -> ::streaming_algorithms::SampleUnstable<Self::Item>
340+
) -> ::amadeus_streaming::SampleUnstable<Self::Item>
341341
where
342342
P: $pool,
343343
Self::Item: $send + 'static,

amadeus-streaming/Cargo.toml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
[package]
2+
name = "amadeus-streaming"
3+
version = "0.3.7"
4+
license = "Apache-2.0"
5+
authors = ["Alec Mocatta <[email protected]>"]
6+
categories = ["data-structures", "algorithms", "science"]
7+
keywords = ["streaming-algorithm", "probabilistic", "sketch", "data-structure", "hyperloglog"]
8+
description = """
9+
SIMD-accelerated implementations of various streaming algorithms, including Count–min sketch, Top k, HyperLogLog, Reservoir sampling.
10+
"""
11+
repository = "https://github.com/alecmocatta/amadeus"
12+
homepage = "https://github.com/alecmocatta/amadeus"
13+
documentation = "https://docs.rs/amadeus"
14+
readme = "README.md"
15+
edition = "2018"
16+
17+
[badges]
18+
azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" }
19+
maintenance = { status = "actively-developed" }
20+
21+
[features]
22+
nightly = ["packed_simd"]
23+
24+
[dependencies]
25+
twox-hash = "1.1"
26+
serde = { version = "1.0", features = ["derive"] }
27+
rand = { version = "0.7", features = ["small_rng"] }
28+
packed_simd = { version = "0.3", features = ["into_bits"], optional = true }
29+
30+
[build-dependencies]
31+
rustversion = "1.0"

amadeus-streaming/README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# amadeus-streaming
2+
3+
SIMD-accelerated implementations of various [streaming algorithms](https://en.wikipedia.org/wiki/Streaming_algorithm).
4+
5+
This library is a work in progress. PRs are very welcome! Currently implemented algorithms include:
6+
7+
* Count–min sketch
8+
* Top k (Count–min sketch plus a doubly linked hashmap to track heavy hitters / top k keys when ordered by aggregated value)
9+
* HyperLogLog
10+
* Reservoir sampling
11+
12+
A goal of this library is to enable composition of these algorithms; for example Top k + HyperLogLog to enable an approximate version of something akin to `SELECT key FROM table GROUP BY key ORDER BY COUNT(DISTINCT value) DESC LIMIT k`.
13+
14+
Run your application with `RUSTFLAGS="-C target-cpu=native"` and the `nightly` feature to benefit from the SIMD-acceleration like so:
15+
16+
```bash
17+
RUSTFLAGS="-C target-cpu=native" cargo run --features "streaming_algorithms/nightly" --release
18+
```
19+
20+
See [this gist](https://gist.github.com/debasishg/8172796) for a good list of further algorithms to be implemented. Other resources are [Probabilistic data structures – Wikipedia](https://en.wikipedia.org/wiki/Category:Probabilistic_data_structures), [DataSketches – A similar Java library originating at Yahoo](https://datasketches.github.io/), and [Algebird – A similar Java library originating at Twitter](https://github.com/twitter/algebird).
21+
22+
As these implementations are often in hot code paths, unsafe is used, albeit only when necessary to a) achieve the asymptotically optimal algorithm or b) mitigate an observed bottleneck.
23+
24+
## License
25+
Licensed under either of
26+
27+
* Apache License, Version 2.0, ([LICENSE-APACHE.txt](LICENSE-APACHE.txt) or http://www.apache.org/licenses/LICENSE-2.0)
28+
* MIT license ([LICENSE-MIT.txt](LICENSE-MIT.txt) or http://opensource.org/licenses/MIT)
29+
30+
at your option.
31+
32+
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

amadeus-streaming/build.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
fn main() {
2+
println!("cargo:rerun-if-changed=build.rs");
3+
4+
nightly();
5+
}
6+
7+
#[rustversion::nightly]
8+
fn nightly() {
9+
println!("cargo:rustc-cfg=nightly");
10+
}
11+
#[rustversion::not(nightly)]
12+
fn nightly() {}

0 commit comments

Comments
 (0)