Skip to content

Commit 0e8064b

Browse files
authored
only report wait times from clients currently waiting to match behavior of pgbouncer (#655)
* Change maxwait to only report wait times from clients currently waiting to match behavior of pgbouncer * Fix tests
1 parent 4dbef49 commit 0e8064b

File tree

3 files changed

+33
-12
lines changed

3 files changed

+33
-12
lines changed

src/stats/client.rs

+10-5
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@ pub struct ClientStats {
3838
/// Total time spent waiting for a connection from pool, measures in microseconds
3939
pub total_wait_time: Arc<AtomicU64>,
4040

41-
/// Maximum time spent waiting for a connection from pool, measures in microseconds
42-
pub max_wait_time: Arc<AtomicU64>,
41+
/// When this client started waiting.
42+
/// Stored as microseconds since connect_time so it can fit in an AtomicU64 instead
43+
/// of us using an "AtomicInstant"
44+
pub wait_start: Arc<AtomicU64>,
4345

4446
/// Current state of the client
4547
pub state: Arc<AtomicClientState>,
@@ -63,7 +65,7 @@ impl Default for ClientStats {
6365
username: String::new(),
6466
pool_name: String::new(),
6567
total_wait_time: Arc::new(AtomicU64::new(0)),
66-
max_wait_time: Arc::new(AtomicU64::new(0)),
68+
wait_start: Arc::new(AtomicU64::new(0)),
6769
state: Arc::new(AtomicClientState::new(ClientState::Idle)),
6870
transaction_count: Arc::new(AtomicU64::new(0)),
6971
query_count: Arc::new(AtomicU64::new(0)),
@@ -111,6 +113,11 @@ impl ClientStats {
111113

112114
/// Reports a client is waiting for a connection
113115
pub fn waiting(&self) {
116+
// safe to truncate, we only lose info if duration is greater than ~585,000 years
117+
self.wait_start.store(
118+
Instant::now().duration_since(self.connect_time).as_micros() as u64,
119+
Ordering::Relaxed,
120+
);
114121
self.state.store(ClientState::Waiting, Ordering::Relaxed);
115122
}
116123

@@ -134,8 +141,6 @@ impl ClientStats {
134141
pub fn checkout_time(&self, microseconds: u64) {
135142
self.total_wait_time
136143
.fetch_add(microseconds, Ordering::Relaxed);
137-
self.max_wait_time
138-
.fetch_max(microseconds, Ordering::Relaxed);
139144
}
140145

141146
/// Report a query executed by a client against a server

src/stats/pool.rs

+11-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use super::{ClientState, ServerState};
44
use crate::{config::PoolMode, messages::DataType, pool::PoolIdentifier};
55
use std::collections::HashMap;
66
use std::sync::atomic::*;
7+
use tokio::time::Instant;
78

89
use crate::pool::get_all_pools;
910

@@ -53,6 +54,7 @@ impl PoolStats {
5354
);
5455
}
5556

57+
let now = Instant::now();
5658
for client in client_map.values() {
5759
match map.get_mut(&PoolIdentifier {
5860
db: client.pool_name(),
@@ -62,10 +64,16 @@ impl PoolStats {
6264
match client.state.load(Ordering::Relaxed) {
6365
ClientState::Active => pool_stats.cl_active += 1,
6466
ClientState::Idle => pool_stats.cl_idle += 1,
65-
ClientState::Waiting => pool_stats.cl_waiting += 1,
67+
ClientState::Waiting => {
68+
pool_stats.cl_waiting += 1;
69+
// wait_start is measured as microseconds since connect_time
70+
// so compute wait_time as (now() - connect_time) - (wait_start - connect_time)
71+
let duration_since_connect = now.duration_since(client.connect_time());
72+
let wait_time = (duration_since_connect.as_micros() as u64)
73+
- client.wait_start.load(Ordering::Relaxed);
74+
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, wait_time);
75+
}
6676
}
67-
let max_wait = client.max_wait_time.load(Ordering::Relaxed);
68-
pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, max_wait);
6977
}
7078
None => debug!("Client from an obselete pool"),
7179
}

tests/ruby/stats_spec.rb

+12-4
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@
233233
sleep(1.1) # Allow time for stats to update
234234
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
235235
results = admin_conn.async_exec("SHOW POOLS")[0]
236-
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
236+
%w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login].each do |s|
237237
raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
238238
end
239239

@@ -260,12 +260,20 @@
260260
threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
261261
end
262262

263-
sleep(2.5) # Allow time for stats to update
264263
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
265-
results = admin_conn.async_exec("SHOW POOLS")[0]
266264

265+
# two connections waiting => they report wait time
266+
sleep(1.1) # Allow time for stats to update
267+
results = admin_conn.async_exec("SHOW POOLS")[0]
267268
expect(results["maxwait"]).to eq("1")
268-
expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000)
269+
expect(results["maxwait_us"].to_i).to be_within(200_000).of(100_000)
270+
271+
sleep(2.5) # Allow time for stats to update
272+
results = admin_conn.async_exec("SHOW POOLS")[0]
273+
274+
# no connections waiting => no reported wait time
275+
expect(results["maxwait"]).to eq("0")
276+
expect(results["maxwait_us"]).to eq("0")
269277
connections.map(&:close)
270278

271279
sleep(4.5) # Allow time for stats to update

0 commit comments

Comments
 (0)