Skip to content

Commit 3c5f433

Browse files
authored
Merge pull request #603 from roman-vynar/processlist-new
Rewrite processlist collector
2 parents 492c004 + 17460a4 commit 3c5f433

File tree

2 files changed

+186
-150
lines changed

2 files changed

+186
-150
lines changed

collector/info_schema_processlist.go

100644100755
Lines changed: 89 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
"context"
2020
"database/sql"
2121
"fmt"
22+
"reflect"
23+
"sort"
2224
"strings"
2325

2426
"github.com/go-kit/log"
@@ -30,16 +32,15 @@ const infoSchemaProcesslistQuery = `
3032
SELECT
3133
user,
3234
SUBSTRING_INDEX(host, ':', 1) AS host,
33-
COALESCE(command,'') AS command,
34-
COALESCE(state,'') AS state,
35-
count(*) AS processes,
36-
sum(time) AS seconds
35+
COALESCE(command, '') AS command,
36+
COALESCE(state, '') AS state,
37+
COUNT(*) AS processes,
38+
SUM(time) AS seconds
3739
FROM information_schema.processlist
3840
WHERE ID != connection_id()
3941
AND TIME >= %d
40-
GROUP BY user,SUBSTRING_INDEX(host, ':', 1),command,state
41-
ORDER BY null
42-
`
42+
GROUP BY user, SUBSTRING_INDEX(host, ':', 1), command, state
43+
`
4344

4445
// Tunable flags.
4546
var (
@@ -60,104 +61,23 @@ var (
6061
// Metric descriptors.
6162
var (
6263
processlistCountDesc = prometheus.NewDesc(
63-
prometheus.BuildFQName(namespace, informationSchema, "threads"),
64-
"The number of threads (connections) split by current state.",
65-
[]string{"state"}, nil)
64+
prometheus.BuildFQName(namespace, informationSchema, "processlist_threads"),
65+
"The number of threads split by current state.",
66+
[]string{"command", "state"}, nil)
6667
processlistTimeDesc = prometheus.NewDesc(
67-
prometheus.BuildFQName(namespace, informationSchema, "threads_seconds"),
68-
"The number of seconds threads (connections) have used split by current state.",
69-
[]string{"state"}, nil)
68+
prometheus.BuildFQName(namespace, informationSchema, "processlist_seconds"),
69+
"The number of seconds threads have used split by current state.",
70+
[]string{"command", "state"}, nil)
7071
processesByUserDesc = prometheus.NewDesc(
71-
prometheus.BuildFQName(namespace, informationSchema, "processes_by_user"),
72+
prometheus.BuildFQName(namespace, informationSchema, "processlist_processes_by_user"),
7273
"The number of processes by user.",
7374
[]string{"mysql_user"}, nil)
7475
processesByHostDesc = prometheus.NewDesc(
75-
prometheus.BuildFQName(namespace, informationSchema, "processes_by_host"),
76+
prometheus.BuildFQName(namespace, informationSchema, "processlist_processes_by_host"),
7677
"The number of processes by host.",
7778
[]string{"client_host"}, nil)
7879
)
7980

80-
// whitelist for connection/process states in SHOW PROCESSLIST
81-
// tokudb uses the state column for "Queried about _______ rows"
82-
var (
83-
// TODO: might need some more keys for other MySQL versions or other storage engines
84-
// see https://dev.mysql.com/doc/refman/5.7/en/general-thread-states.html
85-
threadStateCounterMap = map[string]uint32{
86-
"after create": uint32(0),
87-
"altering table": uint32(0),
88-
"analyzing": uint32(0),
89-
"checking permissions": uint32(0),
90-
"checking table": uint32(0),
91-
"cleaning up": uint32(0),
92-
"closing tables": uint32(0),
93-
"converting heap to myisam": uint32(0),
94-
"copying to tmp table": uint32(0),
95-
"creating sort index": uint32(0),
96-
"creating table": uint32(0),
97-
"creating tmp table": uint32(0),
98-
"deleting": uint32(0),
99-
"executing": uint32(0),
100-
"execution of init_command": uint32(0),
101-
"end": uint32(0),
102-
"freeing items": uint32(0),
103-
"flushing tables": uint32(0),
104-
"fulltext initialization": uint32(0),
105-
"idle": uint32(0),
106-
"init": uint32(0),
107-
"killed": uint32(0),
108-
"waiting for lock": uint32(0),
109-
"logging slow query": uint32(0),
110-
"login": uint32(0),
111-
"manage keys": uint32(0),
112-
"opening tables": uint32(0),
113-
"optimizing": uint32(0),
114-
"preparing": uint32(0),
115-
"reading from net": uint32(0),
116-
"removing duplicates": uint32(0),
117-
"removing tmp table": uint32(0),
118-
"reopen tables": uint32(0),
119-
"repair by sorting": uint32(0),
120-
"repair done": uint32(0),
121-
"repair with keycache": uint32(0),
122-
"replication master": uint32(0),
123-
"rolling back": uint32(0),
124-
"searching rows for update": uint32(0),
125-
"sending data": uint32(0),
126-
"sorting for group": uint32(0),
127-
"sorting for order": uint32(0),
128-
"sorting index": uint32(0),
129-
"sorting result": uint32(0),
130-
"statistics": uint32(0),
131-
"updating": uint32(0),
132-
"waiting for tables": uint32(0),
133-
"waiting for table flush": uint32(0),
134-
"waiting on cond": uint32(0),
135-
"writing to net": uint32(0),
136-
"other": uint32(0),
137-
}
138-
threadStateMapping = map[string]string{
139-
"user sleep": "idle",
140-
"creating index": "altering table",
141-
"committing alter table to storage engine": "altering table",
142-
"discard or import tablespace": "altering table",
143-
"rename": "altering table",
144-
"setup": "altering table",
145-
"renaming result table": "altering table",
146-
"preparing for alter table": "altering table",
147-
"copying to group table": "copying to tmp table",
148-
"copy to tmp table": "copying to tmp table",
149-
"query end": "end",
150-
"update": "updating",
151-
"updating main table": "updating",
152-
"updating reference tables": "updating",
153-
"system lock": "waiting for lock",
154-
"user lock": "waiting for lock",
155-
"table lock": "waiting for lock",
156-
"deleting from main table": "deleting",
157-
"deleting from reference tables": "deleting",
158-
}
159-
)
160-
16181
// ScrapeProcesslist collects from `information_schema.processlist`.
16282
type ScrapeProcesslist struct{}
16383

@@ -189,83 +109,102 @@ func (ScrapeProcesslist) Scrape(ctx context.Context, db *sql.DB, ch chan<- prome
189109
defer processlistRows.Close()
190110

191111
var (
192-
user string
193-
host string
194-
command string
195-
state string
196-
processes uint32
197-
time uint32
112+
user string
113+
host string
114+
command string
115+
state string
116+
count uint32
117+
time uint32
198118
)
199-
stateCounts := make(map[string]uint32, len(threadStateCounterMap))
200-
stateTime := make(map[string]uint32, len(threadStateCounterMap))
201-
hostCount := make(map[string]uint32)
202-
userCount := make(map[string]uint32)
203-
for k, v := range threadStateCounterMap {
204-
stateCounts[k] = v
205-
stateTime[k] = v
206-
}
119+
// Define maps
120+
stateCounts := make(map[string]map[string]uint32)
121+
stateTime := make(map[string]map[string]uint32)
122+
stateHostCounts := make(map[string]uint32)
123+
stateUserCounts := make(map[string]uint32)
207124

208125
for processlistRows.Next() {
209-
err = processlistRows.Scan(&user, &host, &command, &state, &processes, &time)
126+
err = processlistRows.Scan(&user, &host, &command, &state, &count, &time)
210127
if err != nil {
211128
return err
212129
}
213-
realState := deriveThreadState(command, state)
214-
stateCounts[realState] += processes
215-
stateTime[realState] += time
216-
hostCount[host] = hostCount[host] + processes
217-
userCount[user] = userCount[user] + processes
218-
}
130+
command = sanitizeState(command)
131+
state = sanitizeState(state)
132+
if host == "" {
133+
host = "unknown"
134+
}
219135

220-
if *processesByHostFlag {
221-
for host, processes := range hostCount {
222-
ch <- prometheus.MustNewConstMetric(processesByHostDesc, prometheus.GaugeValue, float64(processes), host)
136+
// Init maps
137+
if _, ok := stateCounts[command]; !ok {
138+
stateCounts[command] = make(map[string]uint32)
139+
stateTime[command] = make(map[string]uint32)
140+
}
141+
if _, ok := stateCounts[command][state]; !ok {
142+
stateCounts[command][state] = 0
143+
stateTime[command][state] = 0
144+
}
145+
if _, ok := stateHostCounts[host]; !ok {
146+
stateHostCounts[host] = 0
223147
}
148+
if _, ok := stateUserCounts[user]; !ok {
149+
stateUserCounts[user] = 0
150+
}
151+
152+
stateCounts[command][state] += count
153+
stateTime[command][state] += time
154+
stateHostCounts[host] += count
155+
stateUserCounts[user] += count
224156
}
225157

226-
if *processesByUserFlag {
227-
for user, processes := range userCount {
228-
ch <- prometheus.MustNewConstMetric(processesByUserDesc, prometheus.GaugeValue, float64(processes), user)
158+
for _, command := range sortedMapKeys(stateCounts) {
159+
for _, state := range sortedMapKeys(stateCounts[command]) {
160+
ch <- prometheus.MustNewConstMetric(processlistCountDesc, prometheus.GaugeValue, float64(stateCounts[command][state]), command, state)
161+
ch <- prometheus.MustNewConstMetric(processlistTimeDesc, prometheus.GaugeValue, float64(stateTime[command][state]), command, state)
229162
}
230163
}
231164

232-
for state, processes := range stateCounts {
233-
ch <- prometheus.MustNewConstMetric(processlistCountDesc, prometheus.GaugeValue, float64(processes), state)
165+
if *processesByHostFlag {
166+
for _, host := range sortedMapKeys(stateHostCounts) {
167+
ch <- prometheus.MustNewConstMetric(processesByHostDesc, prometheus.GaugeValue, float64(stateHostCounts[host]), host)
168+
}
234169
}
235-
for state, time := range stateTime {
236-
ch <- prometheus.MustNewConstMetric(processlistTimeDesc, prometheus.GaugeValue, float64(time), state)
170+
if *processesByUserFlag {
171+
for _, user := range sortedMapKeys(stateUserCounts) {
172+
ch <- prometheus.MustNewConstMetric(processesByUserDesc, prometheus.GaugeValue, float64(stateUserCounts[user]), user)
173+
}
237174
}
238175

239176
return nil
240177
}
241178

242-
func deriveThreadState(command string, state string) string {
243-
var normCmd = strings.Replace(strings.ToLower(command), "_", " ", -1)
244-
var normState = strings.Replace(strings.ToLower(state), "_", " ", -1)
245-
// check if it's already a valid state
246-
_, knownState := threadStateCounterMap[normState]
247-
if knownState {
248-
return normState
249-
}
250-
// check if plain mapping applies
251-
mappedState, canMap := threadStateMapping[normState]
252-
if canMap {
253-
return mappedState
254-
}
255-
// check special waiting for XYZ lock
256-
if strings.Contains(normState, "waiting for") && strings.Contains(normState, "lock") {
257-
return "waiting for lock"
179+
func sortedMapKeys(m interface{}) []string {
180+
v := reflect.ValueOf(m)
181+
keys := make([]string, 0, len(v.MapKeys()))
182+
for _, key := range v.MapKeys() {
183+
keys = append(keys, key.String())
258184
}
259-
if normCmd == "sleep" && normState == "" {
260-
return "idle"
185+
sort.Strings(keys)
186+
return keys
187+
}
188+
189+
func sanitizeState(state string) string {
190+
if state == "" {
191+
state = "unknown"
261192
}
262-
if normCmd == "query" {
263-
return "executing"
193+
state = strings.ToLower(state)
194+
replacements := map[string]string{
195+
";": "",
196+
",": "",
197+
":": "",
198+
".": "",
199+
"(": "",
200+
")": "",
201+
" ": "_",
202+
"-": "_",
264203
}
265-
if normCmd == "binlog dump" {
266-
return "replication master"
204+
for r := range replacements {
205+
state = strings.Replace(state, r, replacements[r], -1)
267206
}
268-
return "other"
207+
return state
269208
}
270209

271210
// check interface
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright 2021 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package collector
15+
16+
import (
17+
"context"
18+
"fmt"
19+
"testing"
20+
21+
"github.com/DATA-DOG/go-sqlmock"
22+
"github.com/go-kit/log"
23+
"github.com/prometheus/client_golang/prometheus"
24+
dto "github.com/prometheus/client_model/go"
25+
"github.com/smartystreets/goconvey/convey"
26+
"gopkg.in/alecthomas/kingpin.v2"
27+
)
28+
29+
func TestScrapeProcesslist(t *testing.T) {
30+
_, err := kingpin.CommandLine.Parse([]string{
31+
"--collect.info_schema.processlist.processes_by_user",
32+
"--collect.info_schema.processlist.processes_by_host",
33+
})
34+
if err != nil {
35+
t.Fatal(err)
36+
}
37+
38+
db, mock, err := sqlmock.New()
39+
if err != nil {
40+
t.Fatalf("error opening a stub database connection: %s", err)
41+
}
42+
defer db.Close()
43+
44+
query := fmt.Sprintf(infoSchemaProcesslistQuery, 0)
45+
columns := []string{"user", "host", "command", "state", "processes", "seconds"}
46+
rows := sqlmock.NewRows(columns).
47+
AddRow("manager", "10.0.7.234", "Sleep", "", 10, 87).
48+
AddRow("feedback", "10.0.7.154", "Sleep", "", 8, 842).
49+
AddRow("root", "10.0.7.253", "Sleep", "", 1, 20).
50+
AddRow("feedback", "10.0.7.179", "Sleep", "", 2, 14).
51+
AddRow("system user", "", "Connect", "waiting for handler commit", 1, 7271248).
52+
AddRow("manager", "10.0.7.234", "Sleep", "", 4, 62).
53+
AddRow("system user", "", "Query", "Slave has read all relay log; waiting for more updates", 1, 7271248).
54+
AddRow("event_scheduler", "localhost", "Daemon", "Waiting on empty queue", 1, 7271248)
55+
mock.ExpectQuery(sanitizeQuery(query)).WillReturnRows(rows)
56+
57+
ch := make(chan prometheus.Metric)
58+
go func() {
59+
if err = (ScrapeProcesslist{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
60+
t.Errorf("error calling function on test: %s", err)
61+
}
62+
close(ch)
63+
}()
64+
65+
expected := []MetricResult{
66+
{labels: labelMap{"command": "connect", "state": "waiting_for_handler_commit"}, value: 1, metricType: dto.MetricType_GAUGE},
67+
{labels: labelMap{"command": "connect", "state": "waiting_for_handler_commit"}, value: 7271248, metricType: dto.MetricType_GAUGE},
68+
{labels: labelMap{"command": "daemon", "state": "waiting_on_empty_queue"}, value: 1, metricType: dto.MetricType_GAUGE},
69+
{labels: labelMap{"command": "daemon", "state": "waiting_on_empty_queue"}, value: 7271248, metricType: dto.MetricType_GAUGE},
70+
{labels: labelMap{"command": "query", "state": "slave_has_read_all_relay_log_waiting_for_more_updates"}, value: 1, metricType: dto.MetricType_GAUGE},
71+
{labels: labelMap{"command": "query", "state": "slave_has_read_all_relay_log_waiting_for_more_updates"}, value: 7271248, metricType: dto.MetricType_GAUGE},
72+
{labels: labelMap{"command": "sleep", "state": "unknown"}, value: 25, metricType: dto.MetricType_GAUGE},
73+
{labels: labelMap{"command": "sleep", "state": "unknown"}, value: 1025, metricType: dto.MetricType_GAUGE},
74+
{labels: labelMap{"client_host": "10.0.7.154"}, value: 8, metricType: dto.MetricType_GAUGE},
75+
{labels: labelMap{"client_host": "10.0.7.179"}, value: 2, metricType: dto.MetricType_GAUGE},
76+
{labels: labelMap{"client_host": "10.0.7.234"}, value: 14, metricType: dto.MetricType_GAUGE},
77+
{labels: labelMap{"client_host": "10.0.7.253"}, value: 1, metricType: dto.MetricType_GAUGE},
78+
{labels: labelMap{"client_host": "localhost"}, value: 1, metricType: dto.MetricType_GAUGE},
79+
{labels: labelMap{"client_host": "unknown"}, value: 2, metricType: dto.MetricType_GAUGE},
80+
{labels: labelMap{"mysql_user": "event_scheduler"}, value: 1, metricType: dto.MetricType_GAUGE},
81+
{labels: labelMap{"mysql_user": "feedback"}, value: 10, metricType: dto.MetricType_GAUGE},
82+
{labels: labelMap{"mysql_user": "manager"}, value: 14, metricType: dto.MetricType_GAUGE},
83+
{labels: labelMap{"mysql_user": "root"}, value: 1, metricType: dto.MetricType_GAUGE},
84+
{labels: labelMap{"mysql_user": "system user"}, value: 2, metricType: dto.MetricType_GAUGE},
85+
}
86+
convey.Convey("Metrics comparison", t, func() {
87+
for _, expect := range expected {
88+
got := readMetric(<-ch)
89+
convey.So(expect, convey.ShouldResemble, got)
90+
}
91+
})
92+
93+
// Ensure all SQL queries were executed
94+
if err := mock.ExpectationsWereMet(); err != nil {
95+
t.Errorf("there were unfulfilled exceptions: %s", err)
96+
}
97+
}

0 commit comments

Comments
 (0)