Skip to content

Commit 2bd17a3

Browse files
author
刘黎
committed
Merge branch 'master' into 'master'
基准测试数据和数据量统计留存 See merge request noah/argo_engine!28
2 parents 1f53c3c + 40e16d7 commit 2bd17a3

File tree

254 files changed

+3141
-67
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

254 files changed

+3141
-67
lines changed

benchmarks/examples/ssb_query_example.rs

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@ use chrono::prelude::*;
22
use datafusion::arrow::util::pretty::print_batches;
33
use datafusion::datasource::listing::ListingTable;
44
use datafusion::logical_plan::{count_distinct, Expr};
5-
use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
6-
use datafusion::optimizer::constant_folding::ConstantFolding;
7-
use datafusion::optimizer::eliminate_limit::EliminateLimit;
8-
use datafusion::optimizer::filter_push_down::FilterPushDown;
9-
use datafusion::optimizer::limit_push_down::LimitPushDown;
10-
use datafusion::optimizer::projection_push_down::ProjectionPushDown;
11-
use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
12-
use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
13-
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
14-
use datafusion::physical_optimizer::hash_build_probe_order::HashBuildProbeOrder;
15-
use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
5+
// use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
6+
// use datafusion::optimizer::constant_folding::ConstantFolding;
7+
// use datafusion::optimizer::eliminate_limit::EliminateLimit;
8+
// use datafusion::optimizer::filter_push_down::FilterPushDown;
9+
// use datafusion::optimizer::limit_push_down::LimitPushDown;
10+
// use datafusion::optimizer::projection_push_down::ProjectionPushDown;
11+
// use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
12+
// use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
13+
// use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
14+
// use datafusion::physical_optimizer::hash_build_probe_order::HashBuildProbeOrder;
15+
// use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
1616
use datafusion::physical_optimizer::repartition::Repartition;
1717
use datafusion::physical_plan::{collect, displayable};
1818
use datafusion::prelude::*;
@@ -34,22 +34,22 @@ async fn main() -> datafusion::error::Result<()> {
3434
let execution_config = ExecutionConfig::new()
3535
.with_optimizer_rules(
3636
vec![
37-
Arc::new(ConstantFolding::new()),
38-
Arc::new(CommonSubexprEliminate::new()),
39-
Arc::new(EliminateLimit::new()),
40-
Arc::new(ProjectionPushDown::new()),
41-
Arc::new(FilterPushDown::new()),
42-
Arc::new(SimplifyExpressions::new()),
43-
Arc::new(LimitPushDown::new()),
37+
// Arc::new(ConstantFolding::new()),
38+
// Arc::new(CommonSubexprEliminate::new()),
39+
// Arc::new(EliminateLimit::new()),
40+
// Arc::new(ProjectionPushDown::new()),
41+
// Arc::new(FilterPushDown::new()),
42+
// Arc::new(SimplifyExpressions::new()),
43+
// Arc::new(LimitPushDown::new()),
4444
]
4545
)
4646
.with_physical_optimizer_rules(
4747
vec![
48-
Arc::new(AggregateStatistics::new()),
49-
Arc::new(HashBuildProbeOrder::new()),
50-
Arc::new(CoalesceBatches::new()),
51-
Arc::new(Repartition::new()),
52-
Arc::new(AddCoalescePartitionsExec::new()),
48+
// Arc::new(AggregateStatistics::new()),
49+
// Arc::new(HashBuildProbeOrder::new()),
50+
// Arc::new(CoalesceBatches::new()),
51+
// Arc::new(Repartition::new()),
52+
// Arc::new(AddCoalescePartitionsExec::new()),
5353
]
5454
)
5555
//.with_target_partitions(8)

benchmarks/examples/ssb_sql_example.rs

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,17 @@ use datafusion::arrow::util::pretty::print_batches;
2222
use std::sync::Arc;
2323

2424
use datafusion::error::Result;
25-
use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
26-
use datafusion::optimizer::constant_folding::ConstantFolding;
27-
use datafusion::optimizer::eliminate_limit::EliminateLimit;
28-
use datafusion::optimizer::filter_push_down::FilterPushDown;
29-
use datafusion::optimizer::limit_push_down::LimitPushDown;
30-
use datafusion::optimizer::projection_push_down::ProjectionPushDown;
31-
use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
32-
use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
33-
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
34-
use datafusion::physical_optimizer::hash_build_probe_order::HashBuildProbeOrder;
35-
use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
25+
// use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
26+
// use datafusion::optimizer::constant_folding::ConstantFolding;
27+
// use datafusion::optimizer::eliminate_limit::EliminateLimit;
28+
// use datafusion::optimizer::filter_push_down::FilterPushDown;
29+
// use datafusion::optimizer::limit_push_down::LimitPushDown;
30+
// use datafusion::optimizer::projection_push_down::ProjectionPushDown;
31+
// use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
32+
// use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
33+
// use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
34+
// use datafusion::physical_optimizer::hash_build_probe_order::HashBuildProbeOrder;
35+
// use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
3636
use datafusion::prelude::*;
3737
use utils::test_util;
3838

@@ -51,23 +51,23 @@ async fn main() -> Result<()> {
5151
let execution_config = ExecutionConfig::new()
5252
.with_optimizer_rules(
5353
vec![
54-
Arc::new(ConstantFolding::new()),
55-
Arc::new(CommonSubexprEliminate::new()),
56-
Arc::new(EliminateLimit::new()),
57-
Arc::new(ProjectionPushDown::new()),
58-
Arc::new(FilterPushDown::new()),
59-
Arc::new(SimplifyExpressions::new()),
60-
Arc::new(LimitPushDown::new()),
54+
// Arc::new(ConstantFolding::new()),
55+
// Arc::new(CommonSubexprEliminate::new()),
56+
// Arc::new(EliminateLimit::new()),
57+
// Arc::new(ProjectionPushDown::new()),
58+
// Arc::new(FilterPushDown::new()),
59+
// Arc::new(SimplifyExpressions::new()),
60+
// Arc::new(LimitPushDown::new()),
6161
//Arc::new(SingleDistinctAggregationToGroupBy::new()),
6262
]
6363
)
6464
.with_physical_optimizer_rules(
6565
vec![
66-
Arc::new(AggregateStatistics::new()),
67-
Arc::new(HashBuildProbeOrder::new()),
68-
Arc::new(CoalesceBatches::new()),
69-
// Arc::new(Repartition::new()),
70-
Arc::new(AddCoalescePartitionsExec::new()),
66+
// Arc::new(AggregateStatistics::new()),
67+
// Arc::new(HashBuildProbeOrder::new()),
68+
// Arc::new(CoalesceBatches::new()),
69+
// // Arc::new(Repartition::new()),
70+
// Arc::new(AddCoalescePartitionsExec::new()),
7171
]
7272
)
7373
//.with_target_partitions(8)

benchmarks/src/bin/bench_count.rs

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
use datafusion::execution::context::ExecutionContext;
2+
use datafusion::arrow::record_batch::RecordBatch;
3+
use datafusion::arrow::util::display::array_value_to_string;
4+
use std::convert::From;
5+
use std::error::Error;
6+
use utils::test_util::{BenchType, BenchDataType};
7+
use utils::test_util;
8+
9+
10+
#[tokio::main]
11+
async fn main() -> datafusion::error::Result<()> {
12+
// 读取event count的answer
13+
let answer_vec: Vec<CountAnswer> = read_count_answer().unwrap();
14+
15+
// 加载数据 执行查询 并返回 Vec<CountAnswer>
16+
let query_vec: Vec<CountAnswer> = query_count().await.unwrap();
17+
18+
// 比对查询结果
19+
let err_info_vec = test_result(&answer_vec, &query_vec).unwrap();
20+
21+
// 打印测试的结果
22+
if err_info_vec.len() > 0 {
23+
println!("事件量统计测试未通过\n错误详情如下:");
24+
for x in err_info_vec {
25+
println!("{}", x);
26+
}
27+
} else {
28+
println!("事件量统计测试通过!!!");
29+
}
30+
31+
Ok(())
32+
}
33+
34+
// 读取event count的answer
35+
fn read_count_answer() -> Result<Vec<CountAnswer>, Box<dyn Error>> {
36+
let answer_file = test_util::bench_answer_file(BenchType::EventCount);
37+
let answer_content: Vec<String> = *(test_util::read_answer_file(answer_file.as_ref()));
38+
let mut answer_vec: Vec<CountAnswer> = Vec::new();
39+
for line in answer_content {
40+
let answer = CountAnswer::from(line);
41+
answer_vec.push(answer);
42+
}
43+
44+
Ok(answer_vec)
45+
}
46+
47+
// 加载数据 执行查询 并返回 Vec<CountAnswer>
48+
async fn query_count() -> Result<Vec<CountAnswer>, Box<dyn Error>> {
49+
let mut ctx = ExecutionContext::new();
50+
51+
ctx.register_parquet("event", test_util::bench_data_file(BenchDataType::Event).as_str()).await?;
52+
ctx.register_parquet("profile", test_util::bench_data_file(BenchDataType::Profile).as_str()).await?;
53+
54+
let detail_sql = r#"select xwhat,count(1) as cn ,count(distinct distinct_id) as uv
55+
from event
56+
join profile
57+
on event.distinct_id = profile.distinct_id
58+
group by xwhat
59+
order by xwhat"#;
60+
let detail_f = ctx.sql(detail_sql).await?;
61+
let detail_results = detail_f.collect().await?;
62+
let mut query_vec = CountAnswer::collect_answer(&detail_results);
63+
64+
let total_sql = r#"select 'null' as xwhat,count(1) as cn ,count(distinct distinct_id) as uv
65+
from event
66+
join profile
67+
on event.distinct_id = profile.distinct_id"#;
68+
let total_df = ctx.sql(total_sql).await?;
69+
let total_results = total_df.collect().await?;
70+
let mut total_query_vec = CountAnswer::collect_answer(&total_results);
71+
72+
query_vec.append(&mut total_query_vec);
73+
74+
Ok(query_vec)
75+
}
76+
77+
// 比对查询结果
78+
fn test_result(answer_vec: &[CountAnswer], query_vec: &[CountAnswer]) -> Result<Vec<String>, Box<dyn Error>> {
79+
let mut err_info_vec: Vec<String> = Vec::new();
80+
for index in 0..answer_vec.len() {
81+
let answer_item: &CountAnswer = answer_vec.get(index).unwrap();
82+
let query_item: &CountAnswer = query_vec.get(index).unwrap();
83+
84+
if query_item != answer_item {
85+
err_info_vec.push(format!("期望结果:{}\n查询结果:{}\n", answer_item.to_string(), query_item.to_string()));
86+
}
87+
}
88+
89+
Ok(err_info_vec)
90+
}
91+
92+
/// One row of the event-count statistics.
///
/// The row is parsed from and rendered as the pipe-separated text
/// `xwhat|count|uv`.
///
/// Equality is derived and therefore field-wise: two rows are equal exactly
/// when `xwhat`, `count` and `uv` all match. Deriving replaces a manual
/// `PartialEq` implementation that also overrode `ne` with its default.
#[derive(Debug, PartialEq, Eq)]
struct CountAnswer {
    /// First field of the row; the `xwhat` column of the count queries.
    xwhat: String,
    /// Second field of the row; the `count(1)` column of the count queries.
    count: u32,
    /// Third field of the row; the `count(distinct distinct_id)` column.
    uv: u32,
}
110+
111+
impl From<String> for CountAnswer {
112+
fn from(content: String) -> Self {
113+
let words: Vec<&str> = content.split(r#"|"#).collect();
114+
if words.len() != 3 {
115+
panic!("size of spliting {} by | is not 3", content);
116+
}
117+
118+
let tmp_xwhat = String::from(words[0]);
119+
let tmp_count = words[1].parse::<u32>().unwrap();
120+
let tmp_uv = words[2].parse::<u32>().unwrap();
121+
let answer = CountAnswer {
122+
xwhat: tmp_xwhat,
123+
count: tmp_count,
124+
uv: tmp_uv,
125+
};
126+
127+
answer
128+
}
129+
}
130+
131+
impl CountAnswer {
132+
fn to_string(&self) -> String {
133+
format!("{}|{}|{}", self.xwhat, self.count, self.uv)
134+
}
135+
136+
pub fn collect_answer(records: &[RecordBatch]) -> Vec<CountAnswer> {
137+
let mut result = Vec::new();
138+
139+
if !records.is_empty() {
140+
for batch in records {
141+
for row in 0..batch.num_rows() {
142+
let mut cells = Vec::new();
143+
for col in 0..batch.num_columns() {
144+
let column = batch.column(col);
145+
cells.push(array_value_to_string(&column, row).unwrap());
146+
}
147+
148+
result.push(CountAnswer::from(cells.join("|")));
149+
}
150+
}
151+
}
152+
153+
result
154+
}
155+
}
156+
157+
158+
#[cfg(test)]
mod tests {
    use super::*;

    /// Equality holds only when all three fields match.
    #[test]
    fn event_count_answer_eq() {
        let answer = CountAnswer { xwhat: String::from("startUp"), count: 12, uv: 100 };

        let same = CountAnswer { xwhat: String::from("startUp"), count: 12, uv: 100 };
        assert!(!(answer != same), "event_count_answer_eq");

        let different = [
            // Different `uv`.
            CountAnswer { xwhat: String::from("startUp"), count: 12, uv: 200 },
            // Different `count`.
            CountAnswer { xwhat: String::from("startUp"), count: 2, uv: 100 },
            // Same values as the previous entry; this case is checked twice.
            CountAnswer { xwhat: String::from("startUp"), count: 2, uv: 100 },
            // Different `xwhat`.
            CountAnswer { xwhat: String::from("ABC"), count: 12, uv: 100 },
        ];
        for other in &different {
            assert!(!(answer == *other), "event_count_answer_eq");
        }
    }

    /// A pipe-separated line converts into the matching row.
    #[test]
    fn from_test() {
        let parsed = CountAnswer::from(String::from("SignIn_isSuccess|26488|3387"));
        let expected = CountAnswer {
            xwhat: String::from("SignIn_isSuccess"),
            count: 26488,
            uv: 3387,
        };

        assert!(!(parsed != expected));
    }

    /// A row renders back to its pipe-separated line.
    #[test]
    fn to_string_test() {
        let row = CountAnswer {
            xwhat: String::from("SignIn_isSuccess"),
            count: 26488,
            uv: 3387,
        };

        let rendered = row.to_string();
        assert!(!(rendered != String::from("SignIn_isSuccess|26488|3387")));
    }

    /// Runs the count queries and prints every returned row.
    #[tokio::test]
    async fn query_count_test() {
        let rows = query_count().await.unwrap();
        for row in rows {
            println!("{}", row.to_string());
        }
    }

    /// Reads the expected answers and prints every row.
    #[test]
    fn read_count_answer_test() {
        let rows = read_count_answer().unwrap();
        for row in rows {
            println!("{}", row.to_string());
        }
    }

    /// Compares query output with the expected answers and prints the mismatches.
    #[tokio::test]
    async fn test_result_test() {
        let query_vec = query_count().await.unwrap();
        let answer_vec = read_count_answer().unwrap();
        let messages = test_result(&answer_vec, &query_vec).unwrap();
        for message in messages {
            println!("{}", message);
        }
    }
}
246+

0 commit comments

Comments
 (0)