Skip to content

Commit 4045298

Browse files
WenyXu and fengjiachun authored
feat: add region_statistics table (#4771)
* refactor: introduce `region_statistic` * refactor: move DatanodeStat related structs to common_meta * chore: add comments * feat: implement `list_region_stats` for `ClusterInfo` trait * feat: add `region_statistics` table * feat: add table_id and region_number fields * chore: rename unused snafu * chore: udpate sqlness results * chore: avoid to print source in error msg * chore: move `procedure_info` under `greptime` catalog * chore: apply suggestions from CR * Update src/common/meta/src/datanode.rs Co-authored-by: jeremyhi <jiachun_feng@proton.me> --------- Co-authored-by: jeremyhi <jiachun_feng@proton.me>
1 parent cc4106c commit 4045298

39 files changed

Lines changed: 939 additions & 474 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/catalog/src/error.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,20 @@ pub enum Error {
5050
source: BoxedError,
5151
},
5252

53-
#[snafu(display("Failed to list nodes in cluster: {source}"))]
53+
#[snafu(display("Failed to list nodes in cluster"))]
5454
ListNodes {
5555
#[snafu(implicit)]
5656
location: Location,
5757
source: BoxedError,
5858
},
5959

60+
#[snafu(display("Failed to list region stats in cluster"))]
61+
ListRegionStats {
62+
#[snafu(implicit)]
63+
location: Location,
64+
source: BoxedError,
65+
},
66+
6067
#[snafu(display("Failed to list flows in catalog {catalog}"))]
6168
ListFlows {
6269
#[snafu(implicit)]
@@ -314,6 +321,7 @@ impl ErrorExt for Error {
314321
| Error::ListTables { source, .. }
315322
| Error::ListFlows { source, .. }
316323
| Error::ListProcedures { source, .. }
324+
| Error::ListRegionStats { source, .. }
317325
| Error::ConvertProtoData { source, .. } => source.status_code(),
318326

319327
Error::CreateTable { source, .. } => source.status_code(),

src/catalog/src/kvbackend/manager.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,17 @@ use crate::CatalogManager;
6464
#[derive(Clone)]
6565
pub struct KvBackendCatalogManager {
6666
mode: Mode,
67+
/// Only available in `Distributed` mode.
6768
meta_client: Option<Arc<MetaClient>>,
69+
/// Manages partition rules.
6870
partition_manager: PartitionRuleManagerRef,
71+
/// Manages table metadata.
6972
table_metadata_manager: TableMetadataManagerRef,
7073
/// A sub-CatalogManager that handles system tables
7174
system_catalog: SystemCatalog,
75+
/// Cache registry for all caches.
7276
cache_registry: LayeredCacheRegistryRef,
77+
/// Only available in `Standalone` mode.
7378
procedure_manager: Option<ProcedureManagerRef>,
7479
}
7580

src/catalog/src/system_schema/information_schema.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub mod key_column_usage;
2020
mod partitions;
2121
mod procedure_info;
2222
mod region_peers;
23+
mod region_statistics;
2324
mod runtime_metrics;
2425
pub mod schemata;
2526
mod table_constraints;
@@ -194,6 +195,11 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
194195
self.catalog_manager.clone(),
195196
)) as _,
196197
),
198+
REGION_STATISTICS => Some(Arc::new(
199+
region_statistics::InformationSchemaRegionStatistics::new(
200+
self.catalog_manager.clone(),
201+
),
202+
) as _),
197203
_ => None,
198204
}
199205
}
@@ -241,6 +247,14 @@ impl InformationSchemaProvider {
241247
CLUSTER_INFO.to_string(),
242248
self.build_table(CLUSTER_INFO).unwrap(),
243249
);
250+
tables.insert(
251+
PROCEDURE_INFO.to_string(),
252+
self.build_table(PROCEDURE_INFO).unwrap(),
253+
);
254+
tables.insert(
255+
REGION_STATISTICS.to_string(),
256+
self.build_table(REGION_STATISTICS).unwrap(),
257+
);
244258
}
245259

246260
tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
@@ -256,10 +270,6 @@ impl InformationSchemaProvider {
256270
self.build_table(TABLE_CONSTRAINTS).unwrap(),
257271
);
258272
tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
259-
tables.insert(
260-
PROCEDURE_INFO.to_string(),
261-
self.build_table(PROCEDURE_INFO).unwrap(),
262-
);
263273
// Add memory tables
264274
for name in MEMORY_TABLES.iter() {
265275
tables.insert((*name).to_string(), self.build_table(name).expect(name));
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::{Arc, Weak};
16+
17+
use arrow_schema::SchemaRef as ArrowSchemaRef;
18+
use common_catalog::consts::INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID;
19+
use common_config::Mode;
20+
use common_error::ext::BoxedError;
21+
use common_meta::cluster::ClusterInfo;
22+
use common_meta::datanode::RegionStat;
23+
use common_recordbatch::adapter::RecordBatchStreamAdapter;
24+
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
25+
use common_telemetry::tracing::warn;
26+
use datafusion::execution::TaskContext;
27+
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
28+
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
29+
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
30+
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
31+
use datatypes::value::Value;
32+
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder};
33+
use snafu::ResultExt;
34+
use store_api::storage::{ScanRequest, TableId};
35+
36+
use super::{InformationTable, REGION_STATISTICS};
37+
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, ListRegionStatsSnafu, Result};
38+
use crate::information_schema::Predicates;
39+
use crate::system_schema::utils;
40+
use crate::CatalogManager;
41+
42+
// Column names of the `information_schema.region_statistics` table.
const REGION_ID: &str = "region_id";
const TABLE_ID: &str = "table_id";
const REGION_NUMBER: &str = "region_number";
const MEMTABLE_SIZE: &str = "memtable_size";
const MANIFEST_SIZE: &str = "manifest_size";
const SST_SIZE: &str = "sst_size";
const ENGINE: &str = "engine";
const REGION_ROLE: &str = "region_role";

// Initial capacity of the per-column vector builders. This is only a hint for
// the first allocation (a rough guess at a typical region count); the builders
// grow as needed.
const INIT_CAPACITY: usize = 42;
52+
53+
/// The `REGION_STATISTICS` table provides information about the region statistics. Including fields:
///
/// - `region_id`: The region id.
/// - `table_id`: The table id.
/// - `region_number`: The region number.
/// - `memtable_size`: The memtable size in bytes.
/// - `manifest_size`: The manifest size in bytes.
/// - `sst_size`: The sst size in bytes.
/// - `engine`: The engine type.
/// - `region_role`: The region role.
pub(super) struct InformationSchemaRegionStatistics {
    // Fixed table schema, built once in `new` via `Self::schema()`.
    schema: SchemaRef,
    // Weak handle to the catalog manager; used to look up the running mode
    // and the meta client when collecting statistics.
    catalog_manager: Weak<dyn CatalogManager>,
}
68+
69+
impl InformationSchemaRegionStatistics {
70+
pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
71+
Self {
72+
schema: Self::schema(),
73+
catalog_manager,
74+
}
75+
}
76+
77+
pub(crate) fn schema() -> SchemaRef {
78+
Arc::new(Schema::new(vec![
79+
ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
80+
ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false),
81+
ColumnSchema::new(REGION_NUMBER, ConcreteDataType::uint32_datatype(), false),
82+
ColumnSchema::new(MEMTABLE_SIZE, ConcreteDataType::uint64_datatype(), true),
83+
ColumnSchema::new(MANIFEST_SIZE, ConcreteDataType::uint64_datatype(), true),
84+
ColumnSchema::new(SST_SIZE, ConcreteDataType::uint64_datatype(), true),
85+
ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
86+
ColumnSchema::new(REGION_ROLE, ConcreteDataType::string_datatype(), true),
87+
]))
88+
}
89+
90+
fn builder(&self) -> InformationSchemaRegionStatisticsBuilder {
91+
InformationSchemaRegionStatisticsBuilder::new(
92+
self.schema.clone(),
93+
self.catalog_manager.clone(),
94+
)
95+
}
96+
}
97+
98+
impl InformationTable for InformationSchemaRegionStatistics {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        REGION_STATISTICS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Builds a record-batch stream for a scan of this table. The statistics
    /// are collected lazily when the stream is first polled, and the scan
    /// `request` is forwarded so it can be turned into row-level predicates.
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();

        // One-shot stream: the whole table is emitted as a single record batch.
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_region_statistics(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));

        // Wrap the DataFusion stream back into this crate's stream type.
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}
133+
134+
/// Accumulates one column vector per output field while collecting region
/// statistics; `finish` assembles them into a `RecordBatch`.
struct InformationSchemaRegionStatisticsBuilder {
    schema: SchemaRef,
    catalog_manager: Weak<dyn CatalogManager>,

    // Per-column builders. Rows are appended in lock-step by
    // `add_region_statistic`, so all builders always have equal length.
    region_ids: UInt64VectorBuilder,
    table_ids: UInt32VectorBuilder,
    region_numbers: UInt32VectorBuilder,
    memtable_sizes: UInt64VectorBuilder,
    manifest_sizes: UInt64VectorBuilder,
    sst_sizes: UInt64VectorBuilder,
    engines: StringVectorBuilder,
    region_roles: StringVectorBuilder,
}
147+
148+
impl InformationSchemaRegionStatisticsBuilder {
149+
fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
150+
Self {
151+
schema,
152+
catalog_manager,
153+
region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
154+
table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
155+
region_numbers: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
156+
memtable_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
157+
manifest_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
158+
sst_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
159+
engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
160+
region_roles: StringVectorBuilder::with_capacity(INIT_CAPACITY),
161+
}
162+
}
163+
164+
/// Construct a new `InformationSchemaRegionStatistics` from the collected data.
165+
async fn make_region_statistics(
166+
&mut self,
167+
request: Option<ScanRequest>,
168+
) -> Result<RecordBatch> {
169+
let predicates = Predicates::from_scan_request(&request);
170+
let mode = utils::running_mode(&self.catalog_manager)?.unwrap_or(Mode::Standalone);
171+
172+
match mode {
173+
Mode::Standalone => {
174+
// TODO(weny): implement it
175+
}
176+
Mode::Distributed => {
177+
if let Some(meta_client) = utils::meta_client(&self.catalog_manager)? {
178+
let region_stats = meta_client
179+
.list_region_stats()
180+
.await
181+
.map_err(BoxedError::new)
182+
.context(ListRegionStatsSnafu)?;
183+
for region_stat in region_stats {
184+
self.add_region_statistic(&predicates, region_stat);
185+
}
186+
} else {
187+
warn!("Meta client is not available");
188+
}
189+
}
190+
}
191+
192+
self.finish()
193+
}
194+
195+
fn add_region_statistic(&mut self, predicate: &Predicates, region_stat: RegionStat) {
196+
let row = [
197+
(REGION_ID, &Value::from(region_stat.id.as_u64())),
198+
(TABLE_ID, &Value::from(region_stat.id.table_id())),
199+
(REGION_NUMBER, &Value::from(region_stat.id.region_number())),
200+
(MEMTABLE_SIZE, &Value::from(region_stat.memtable_size)),
201+
(MANIFEST_SIZE, &Value::from(region_stat.manifest_size)),
202+
(SST_SIZE, &Value::from(region_stat.sst_size)),
203+
(ENGINE, &Value::from(region_stat.engine.as_str())),
204+
(REGION_ROLE, &Value::from(region_stat.role.to_string())),
205+
];
206+
207+
if !predicate.eval(&row) {
208+
return;
209+
}
210+
211+
self.region_ids.push(Some(region_stat.id.as_u64()));
212+
self.table_ids.push(Some(region_stat.id.table_id()));
213+
self.region_numbers
214+
.push(Some(region_stat.id.region_number()));
215+
self.memtable_sizes.push(Some(region_stat.memtable_size));
216+
self.manifest_sizes.push(Some(region_stat.manifest_size));
217+
self.sst_sizes.push(Some(region_stat.sst_size));
218+
self.engines.push(Some(&region_stat.engine));
219+
self.region_roles.push(Some(&region_stat.role.to_string()));
220+
}
221+
222+
fn finish(&mut self) -> Result<RecordBatch> {
223+
let columns: Vec<VectorRef> = vec![
224+
Arc::new(self.region_ids.finish()),
225+
Arc::new(self.table_ids.finish()),
226+
Arc::new(self.region_numbers.finish()),
227+
Arc::new(self.memtable_sizes.finish()),
228+
Arc::new(self.manifest_sizes.finish()),
229+
Arc::new(self.sst_sizes.finish()),
230+
Arc::new(self.engines.finish()),
231+
Arc::new(self.region_roles.finish()),
232+
];
233+
234+
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
235+
}
236+
}
237+
238+
impl DfPartitionStream for InformationSchemaRegionStatistics {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    /// Executes the scan without predicate push-down (`None` request): all
    /// region statistics are emitted as a single record batch.
    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_region_statistics(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}

src/catalog/src/system_schema/information_schema/table_names.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,4 @@ pub const CLUSTER_INFO: &str = "cluster_info";
4646
pub const VIEWS: &str = "views";
4747
pub const FLOWS: &str = "flows";
4848
pub const PROCEDURE_INFO: &str = "procedure_info";
49+
pub const REGION_STATISTICS: &str = "region_statistics";

src/common/catalog/src/consts.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ pub const INFORMATION_SCHEMA_VIEW_TABLE_ID: u32 = 32;
100100
pub const INFORMATION_SCHEMA_FLOW_TABLE_ID: u32 = 33;
101101
/// id for information_schema.procedure_info
102102
pub const INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID: u32 = 34;
103+
/// id for information_schema.region_statistics
104+
pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35;
105+
103106
/// ----- End of information_schema tables -----
104107
105108
/// ----- Begin of pg_catalog tables -----

src/common/meta/src/cluster.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use regex::Regex;
2020
use serde::{Deserialize, Serialize};
2121
use snafu::{ensure, OptionExt, ResultExt};
2222

23+
use crate::datanode::RegionStat;
2324
use crate::error::{
2425
DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu,
2526
InvalidRoleSnafu, ParseNumSnafu, Result,
@@ -47,6 +48,9 @@ pub trait ClusterInfo {
4748
role: Option<Role>,
4849
) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
4950

51+
/// List all region stats in the cluster.
52+
async fn list_region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
53+
5054
// TODO(jeremy): Other info, like region status, etc.
5155
}
5256

0 commit comments

Comments
 (0)