Skip to content

feat: add time series statistics profile #17809

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/common/base/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub use thread::Thread;
pub use thread::ThreadJoinHandle;
pub use time_series::compress_time_point;
pub use time_series::get_time_series_profile_desc;
pub use time_series::ProfilePoints;
pub use time_series::QueryTimeSeriesProfile;
pub use time_series::QueryTimeSeriesProfileBuilder;
pub use time_series::TimeSeriesProfileDesc;
Expand Down
5 changes: 4 additions & 1 deletion src/common/base/src/runtime/runtime_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use crate::runtime::metrics::ScopedRegistry;
use crate::runtime::profile::Profile;
use crate::runtime::time_series::QueryTimeSeriesProfile;
use crate::runtime::MemStatBuffer;
use crate::runtime::TimeSeriesProfiles;

// For implemented and needs to call drop, we cannot use the attribute tag thread local.
// https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=ea33533387d401e86423df1a764b5609
Expand Down Expand Up @@ -105,7 +106,8 @@ pub struct TrackingPayload {
pub mem_stat: Option<Arc<MemStat>>,
pub metrics: Option<Arc<ScopedRegistry>>,
pub should_log: bool,
pub time_series_profile: Option<(u32, Arc<QueryTimeSeriesProfile>)>,
pub time_series_profile: Option<Arc<QueryTimeSeriesProfile>>,
pub local_time_series_profile: Option<Arc<TimeSeriesProfiles>>,
}

pub struct TrackingGuard {
Expand Down Expand Up @@ -168,6 +170,7 @@ impl ThreadTracker {
query_id: None,
should_log: true,
time_series_profile: None,
local_time_series_profile: None,
},
}
}
Expand Down
1 change: 1 addition & 0 deletions src/common/base/src/runtime/time_series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod query_profile;

pub use profile::compress_time_point;
pub use profile::get_time_series_profile_desc;
pub use profile::ProfilePoints;
pub use profile::TimeSeriesProfileDesc;
pub use profile::TimeSeriesProfileName;
pub use profile::TimeSeriesProfiles;
Expand Down
131 changes: 84 additions & 47 deletions src/common/base/src/runtime/time_series/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,6 @@ const DEFAULT_INTERVAL: usize = 1000;
// DataPoint is a tuple of (timestamp, value)
type DataPoint = (usize, usize);

pub struct ProfilePoints {
pub points: ConcurrentQueue<DataPoint>,
pub value: AtomicUsize,
pub last_record_timestamp: AtomicUsize,
}

pub struct TimeSeriesProfiles {
pub profiles: Vec<ProfilePoints>,
}

pub enum TimeSeriesProfileName {
OutputRows,
OutputBytes,
Expand Down Expand Up @@ -66,73 +56,108 @@ pub fn get_time_series_profile_desc() -> Arc<Vec<TimeSeriesProfileDesc>> {
.clone()
}

impl TimeSeriesProfiles {
pub fn new() -> Self {
let type_num = mem::variant_count::<TimeSeriesProfileName>();
TimeSeriesProfiles {
profiles: Self::create_profiles(type_num),
}
}
/// Per-metric accumulator that turns a stream of recorded values into
/// `(timestamp, value)` data points, one per time slot.
pub struct ProfilePoints {
/// Completed `(timestamp, value)` data points waiting to be drained by a flush.
pub points: ConcurrentQueue<DataPoint>,
/// Running total for the time slot currently being filled; it is swapped back
/// to zero whenever that slot is pushed into `points`.
pub value: AtomicUsize,
/// Time slot seen by the most recent successful update in `record_time_slot`.
/// Zero means nothing has been recorded yet.
pub last_check_timestamp: AtomicUsize,
}

fn create_profiles(type_num: usize) -> Vec<ProfilePoints> {
let mut profiles = Vec::with_capacity(type_num);
for _ in 0..type_num {
profiles.push(ProfilePoints {
points: ConcurrentQueue::unbounded(),
last_record_timestamp: AtomicUsize::new(0),
value: AtomicUsize::new(0),
});
/// The set of time series tracked together, one [`ProfilePoints`] per
/// `TimeSeriesProfileName` variant.
pub struct TimeSeriesProfiles {
/// Indexed by `TimeSeriesProfileName as usize`; `new` sizes it with
/// `mem::variant_count::<TimeSeriesProfileName>()`.
pub profiles: Vec<ProfilePoints>,
}

impl ProfilePoints {
pub fn new() -> Self {
ProfilePoints {
points: ConcurrentQueue::unbounded(),
last_check_timestamp: AtomicUsize::new(0),
value: AtomicUsize::new(0),
}
profiles
}

pub fn record(&self, name: TimeSeriesProfileName, value: usize) -> bool {
let profile = &self.profiles[name as usize];
let now = chrono::Local::now().timestamp_millis() as usize / DEFAULT_INTERVAL;
let mut current_last_record = now;
pub fn record_time_slot(&self, now: usize, value: usize) -> bool {
let mut is_record = false;
let mut current_last_check = 0;
loop {
match profile.last_record_timestamp.compare_exchange_weak(
current_last_record,
match self.last_check_timestamp.compare_exchange_weak(
current_last_check,
now,
SeqCst,
SeqCst,
) {
Ok(_) => {
if current_last_record == 0 {
if current_last_check == 0 {
// the first time, we will record it in next time slot
break;
}
if now == current_last_record {
if now == current_last_check {
// still in the same slot
break;
}
let last_value = profile.value.swap(0, SeqCst);
let _ = profile.points.push((current_last_record, last_value));
let last_value = self.value.swap(0, SeqCst);
let _ = self.points.push((current_last_check, last_value));
is_record = true;
break;
}
Err(last_record) => {
current_last_record = last_record;
if now < last_record {
// Under concurrency, `now` can be earlier than `last_record`: another thread has
// already advanced to a later time slot, so the slot for `now` is closed and has
// already been pushed into the points queue. Push this value as an additional
// point for the same slot; points with the same timestamp are merged during flush.
let _ = self.points.push((now, value));
// early return, should avoid adding value into this time slot
return true;
}
current_last_check = last_record;
}
}
}
profile.value.fetch_add(value, SeqCst);
self.value.fetch_add(value, SeqCst);
is_record
}
}

impl Default for ProfilePoints {
fn default() -> Self {
Self::new()
}
}

impl TimeSeriesProfiles {
pub fn new() -> Self {
let type_num = mem::variant_count::<TimeSeriesProfileName>();
TimeSeriesProfiles {
profiles: Self::create_profiles(type_num),
}
}

pub fn flush(&self, finish: bool, quota: &mut i32) -> Vec<(u32, Vec<Vec<usize>>)> {
fn create_profiles(type_num: usize) -> Vec<ProfilePoints> {
let mut profiles = Vec::with_capacity(type_num);
for _ in 0..type_num {
profiles.push(ProfilePoints::new());
}
profiles
}

pub fn record(&self, name: TimeSeriesProfileName, value: usize) -> bool {
let profile = &self.profiles[name as usize];
let now = chrono::Local::now().timestamp_millis() as usize / DEFAULT_INTERVAL;
profile.record_time_slot(now, value)
}

pub fn flush(&self, finish: bool, quota: &mut i32) -> Vec<Vec<Vec<usize>>> {
let mut batch = Vec::with_capacity(self.profiles.len());
for (profile_name, profile) in self.profiles.iter().enumerate() {
for profile in self.profiles.iter() {
if *quota == 0 && !finish {
break;
}
if finish {
// if flush called by finish, we need to flush the last record
let last_timestamp = profile.last_check_timestamp.load(SeqCst);
let last_value = profile.value.swap(0, SeqCst);
let _ = profile
.points
.push((profile.last_record_timestamp.load(SeqCst), last_value));
if last_value != 0 && last_timestamp != 0 {
let _ = profile.points.push((last_timestamp, last_value));
}
}
let mut points = Vec::with_capacity(profile.points.len());
while let Ok(point) = profile.points.pop() {
Expand All @@ -142,7 +167,7 @@ impl TimeSeriesProfiles {
break;
}
}
batch.push((profile_name as u32, compress_time_point(&points)));
batch.push(compress_time_point(&points));
}
batch
}
Expand All @@ -167,6 +192,12 @@ impl Default for TimeSeriesProfiles {
/// `[(1744971865,100), (1744971866,200), (1744971867,50), (1744971868,150), (1744971870,20), (1744971871,40)]`
/// the compressed result will be:
/// `[[1744971865, 100, 200, 50, 150], [1744971870, 20, 40]]`
///
/// Note:
/// Why convert to `[timestamp, value0, value1, value2]` instead of `[timestamp, (value0, value1, value2)]`:
/// Rust serde_json will convert a tuple to a list. [timestamp, (value0, value1, value2)] will be converted to
/// `[timestamp, value0, value1, value2]` after serialization.
/// See: https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=3c153dfcfdde3032c80c05f4010f3d0f
pub fn compress_time_point(points: &[DataPoint]) -> Vec<Vec<usize>> {
let mut result = Vec::new();
let mut i = 0;
Expand All @@ -176,8 +207,14 @@ pub fn compress_time_point(points: &[DataPoint]) -> Vec<Vec<usize>> {
group.push(start_time);
group.push(value);
let mut j = i + 1;
while j < points.len() && points[j].0 == points[j - 1].0 + 1 {
group.push(points[j].1);
while j < points.len()
&& (points[j].0 == points[j - 1].0 + 1 || points[j].0 == points[j - 1].0)
{
let mut v = points[j].1;
if points[j].0 == points[j - 1].0 {
v += group.pop().unwrap();
}
group.push(v);
j += 1;
}
result.push(group);
Expand Down
83 changes: 57 additions & 26 deletions src/common/base/src/runtime/time_series/query_profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,51 @@ const DEFAULT_BATCH_SIZE: usize = 1024;

pub struct QueryTimeSeriesProfile {
pub global_count: AtomicUsize,
pub plans_profiles: Vec<Arc<TimeSeriesProfiles>>,
pub plans_profiles: Vec<(u32, Arc<TimeSeriesProfiles>)>,
pub query_id: String,
}

impl QueryTimeSeriesProfile {
pub fn record_time_series_profile(name: TimeSeriesProfileName, value: usize) {
ThreadTracker::with(|x| match x.borrow().payload.time_series_profile.as_ref() {
None => {}
Some((plan_id, profile)) => {
if let Some(p) = profile.plans_profiles.get(*plan_id as usize) {
if p.record(name, value)
&& profile.global_count.fetch_add(1, SeqCst) == DEFAULT_BATCH_SIZE - 1
{
profile.flush(false);
ThreadTracker::with(
|x| match x.borrow().payload.local_time_series_profile.as_ref() {
None => {}
Some(profile) => {
if profile.record(name, value) {
if let Some(global_profile) =
x.borrow().payload.time_series_profile.as_ref()
{
let should_flush = Self::should_flush(&global_profile.global_count);
if should_flush {
global_profile.flush(false);
}
}
}
}
},
)
}

pub fn should_flush(global_count: &AtomicUsize) -> bool {
let mut prev = 0;
loop {
let next = if prev == DEFAULT_BATCH_SIZE - 1 {
0
} else {
prev + 1
};
match global_count.compare_exchange_weak(prev, next, SeqCst, SeqCst) {
Ok(_) => {
if next == 0 {
return true;
}
return false;
}
Err(next_prev) => {
prev = next_prev;
}
}
})
}
}

pub fn flush(&self, finish: bool) {
Expand All @@ -60,26 +87,23 @@ impl QueryTimeSeriesProfile {
#[derive(Serialize)]
struct PlanTimeSeries {
plan_id: u32,
data: Vec<ProfileTimeSeries>,
data: Vec<Vec<Vec<usize>>>,
}
#[derive(Serialize)]
struct ProfileTimeSeries(u32, Vec<Vec<usize>>);
let mut quota = DEFAULT_BATCH_SIZE as i32;
let mut plans = Vec::with_capacity(self.plans_profiles.len());
for (plan_id, plan_profile) in self.plans_profiles.iter().enumerate() {
for (plan_id, plan_profile) in self.plans_profiles.iter() {
if quota == 0 && !finish {
break;
}
let profile_time_series_vec: Vec<ProfileTimeSeries> = plan_profile
.flush(finish, &mut quota)
.into_iter()
.map(|(id, points)| ProfileTimeSeries(id, points))
.collect();
let profile_time_series_vec = plan_profile.flush(finish, &mut quota);
plans.push(PlanTimeSeries {
plan_id: plan_id as u32,
plan_id: *plan_id,
data: profile_time_series_vec,
});
}
if quota == DEFAULT_BATCH_SIZE as i32 {
return;
}
let query_time_series = QueryTimeSeries {
query_id: self.query_id.clone(),
plans,
Expand Down Expand Up @@ -109,18 +133,25 @@ impl QueryTimeSeriesProfileBuilder {
}
}

pub fn register_time_series_profile(&mut self, plan_id: u32) {
pub fn register_time_series_profile(&mut self, plan_id: u32) -> Arc<TimeSeriesProfiles> {
if !self.plans_profile.contains_key(&plan_id) {
self.plans_profile
.insert(plan_id, Arc::new(TimeSeriesProfiles::new()));
let profile = Arc::new(TimeSeriesProfiles::new());
self.plans_profile.insert(plan_id, profile.clone());
profile
} else {
self.plans_profile.get(&plan_id).unwrap().clone()
}
}

pub fn build(self) -> QueryTimeSeriesProfile {
pub fn build(&self) -> QueryTimeSeriesProfile {
QueryTimeSeriesProfile {
global_count: AtomicUsize::new(0),
plans_profiles: self.plans_profile.into_values().collect(),
query_id: self.query_id,
plans_profiles: self
.plans_profile
.iter()
.map(|(k, v)| (*k, v.clone()))
.collect(),
query_id: self.query_id.clone(),
}
}
}
Loading
Loading