Skip to content

Commit e471971

Browse files
committed
Add TO Go cdns/capacity (apache#2306)
1 parent 5db0cb6 commit e471971

File tree

2 files changed

+340
-1
lines changed

2 files changed

+340
-1
lines changed
Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
package cdn
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import (
23+
"database/sql"
24+
"encoding/json"
25+
"errors"
26+
"net/http"
27+
"net/url"
28+
"strconv"
29+
"strings"
30+
"time"
31+
32+
"github.com/apache/trafficcontrol/lib/go-log"
33+
"github.com/apache/trafficcontrol/lib/go-tc"
34+
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/api"
35+
)
36+
37+
func GetCapacity(w http.ResponseWriter, r *http.Request) {
38+
inf, userErr, sysErr, errCode := api.NewInfo(r, nil, nil)
39+
if userErr != nil || sysErr != nil {
40+
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
41+
return
42+
}
43+
defer inf.Close()
44+
45+
api.RespWriter(w, r, inf.Tx.Tx)(getCapacity(inf.Tx.Tx))
46+
}
47+
48+
const MonitorProxyParameter = "tm.traffic_mon_fwd_proxy"
49+
const MonitorRequestTimeout = time.Second * 10
50+
const MonitorOnlineStatus = "ONLINE"
51+
52+
// CRStates contains the Monitor CRStates members needed for health. It is NOT the full object served by the Monitor, but only the data required by this endpoint.
53+
type CRStates struct {
54+
Caches map[tc.CacheName]Available `json:"caches"`
55+
}
56+
57+
type Available struct {
58+
IsAvailable bool `json:"isAvailable"`
59+
}
60+
61+
// CRConfig contains the Monitor CRConfig members needed for health. It is NOT the full object served by the Monitor, but only the data required by this endpoint.
62+
type CRConfig struct {
63+
ContentServers map[tc.CacheName]CRConfigServer `json:"contentServers"`
64+
}
65+
66+
type CRConfigServer struct {
67+
CacheGroup tc.CacheGroupName `json:"locationId"`
68+
Status tc.CacheStatus `json:"status"`
69+
Type tc.CacheType `json:"type"`
70+
Profile string `json:"profile"`
71+
}
72+
73+
func getCapacity(tx *sql.Tx) (CapacityResp, error) {
74+
monitors, err := getCDNMonitorFQDNs(tx)
75+
if err != nil {
76+
return CapacityResp{}, errors.New("getting monitors: " + err.Error())
77+
}
78+
79+
return getMonitorsCapacity(tx, monitors)
80+
}
81+
82+
type CapacityResp struct {
83+
AvailablePercent float64 `json:"availablePercent"`
84+
UnavailablePercent float64 `json:"unavailablePercent"`
85+
UtilizedPercent float64 `json:utilizedPercent"`
86+
MaintenancePercent float64 `json:maintenancePercent"`
87+
}
88+
89+
type CapData struct {
90+
Available float64
91+
Unavailable float64
92+
Utilized float64
93+
Maintenance float64
94+
Capacity float64
95+
}
96+
97+
func getMonitorsCapacity(tx *sql.Tx, monitors map[tc.CDNName][]string) (CapacityResp, error) {
98+
monitorForwardProxy, monitorForwardProxyExists, err := getGlobalParam(tx, MonitorProxyParameter)
99+
if err != nil {
100+
return CapacityResp{}, errors.New("getting global monitor proxy parameter: " + err.Error())
101+
}
102+
client := &http.Client{Timeout: MonitorRequestTimeout}
103+
if monitorForwardProxyExists {
104+
proxyURI, err := url.Parse(monitorForwardProxy)
105+
if err != nil {
106+
return CapacityResp{}, errors.New("monitor forward proxy '" + monitorForwardProxy + "' in parameter '" + MonitorProxyParameter + "' not a URI: " + err.Error())
107+
}
108+
client = &http.Client{Timeout: MonitorRequestTimeout, Transport: &http.Transport{Proxy: http.ProxyURL(proxyURI)}}
109+
}
110+
111+
thresholds, err := getEdgeProfileHealthThresholdBandwidth(tx)
112+
if err != nil {
113+
return CapacityResp{}, errors.New("getting profile thresholds: " + err.Error())
114+
}
115+
116+
cap, err := getCapacityData(monitors, thresholds, client)
117+
if err != nil {
118+
return CapacityResp{}, errors.New("getting capacity from monitors: " + err.Error())
119+
} else if cap.Capacity == 0 {
120+
return CapacityResp{}, errors.New("capacity was zero!") // avoid divide-by-zero below.
121+
}
122+
123+
return CapacityResp{
124+
UtilizedPercent: (cap.Available * 100) / cap.Capacity,
125+
UnavailablePercent: (cap.Unavailable * 100) / cap.Capacity,
126+
MaintenancePercent: (cap.Maintenance * 100) / cap.Capacity,
127+
AvailablePercent: ((cap.Capacity - cap.Unavailable - cap.Maintenance - cap.Available) * 100) / cap.Capacity,
128+
}, nil
129+
}
130+
131+
// getCapacityData attempts to get the CDN capacity from each monitor. If one fails, it tries the next.
132+
// The first monitor for which all data requests succeed is used.
133+
// Only if all monitors for a CDN fail is an error returned, from the last monitor tried.
134+
func getCapacityData(monitors map[tc.CDNName][]string, thresholds map[string]float64, client *http.Client) (CapData, error) {
135+
cap := CapData{}
136+
for cdn, monitorFQDNs := range monitors {
137+
err := error(nil)
138+
for _, monitorFQDN := range monitorFQDNs {
139+
crStates := CRStates{}
140+
crConfig := CRConfig{}
141+
cacheStats := CacheStats{}
142+
if crStates, err = getCRStates(monitorFQDN, client); err != nil {
143+
err = errors.New("getting CRStates for CDN '" + string(cdn) + "' monitor '" + monitorFQDN + "': " + err.Error())
144+
log.Warnln("getCapacity failed to get CRStates from cdn '" + string(cdn) + " monitor '" + monitorFQDN + "', trying next monitor: " + err.Error())
145+
continue
146+
}
147+
if crConfig, err = getCRConfig(monitorFQDN, client); err != nil {
148+
err = errors.New("getting CRConfig for CDN '" + string(cdn) + "' monitor '" + monitorFQDN + "': " + err.Error())
149+
log.Warnln("getCapacity failed to get CRConfig from cdn '" + string(cdn) + " monitor '" + monitorFQDN + "', trying next monitor: " + err.Error())
150+
continue
151+
}
152+
if err := getCacheStats(monitorFQDN, client, []string{"kbps", "maxKbps"}, &cacheStats); err != nil {
153+
err = errors.New("getting cache stats for CDN '" + string(cdn) + "' monitor '" + monitorFQDN + "': " + err.Error())
154+
log.Warnln("getCapacity failed to get CacheStats from cdn '" + string(cdn) + " monitor '" + monitorFQDN + "', trying next monitor: " + err.Error())
155+
continue
156+
}
157+
cap = addCapacity(cap, cacheStats, crStates, crConfig, thresholds)
158+
break
159+
}
160+
if err != nil {
161+
return CapData{}, err
162+
}
163+
}
164+
return cap, nil
165+
}
166+
167+
func addCapacity(cap CapData, cacheStats CacheStats, crStates CRStates, crConfig CRConfig, thresholds map[string]float64) CapData {
168+
for cacheName, stats := range cacheStats.Caches {
169+
cache, ok := crConfig.ContentServers[cacheName]
170+
if !ok {
171+
continue
172+
}
173+
if !strings.HasPrefix(string(cache.Type), string(tc.CacheTypeEdge)) {
174+
continue
175+
}
176+
if len(stats.KBPS) < 1 || len(stats.MaxKBPS) < 1 {
177+
continue
178+
}
179+
if cache.Status == "REPORTED" || cache.Status == "ONLINE" {
180+
if crStates.Caches[cacheName].IsAvailable {
181+
cap.Available += float64(stats.KBPS[0].Value)
182+
} else {
183+
cap.Unavailable += float64(stats.KBPS[0].Value)
184+
}
185+
} else if cache.Status == "ADMIN_DOWN" {
186+
cap.Maintenance += float64(stats.KBPS[0].Value)
187+
} else {
188+
continue // don't add capacity for OFFLINE or other statuses
189+
}
190+
cap.Capacity += float64(stats.MaxKBPS[0].Value) - thresholds[cache.Profile]
191+
}
192+
return cap
193+
}
194+
195+
func getEdgeProfileHealthThresholdBandwidth(tx *sql.Tx) (map[string]float64, error) {
196+
rows, err := tx.Query(`
197+
SELECT pr.name as profile, pa.name, pa.config_file, pa.value
198+
FROM parameter as pa
199+
JOIN profile_parameter as pp ON pp.parameter = pa.id
200+
JOIN profile as pr ON pp.profile = pr.id
201+
JOIN server as s ON s.profile = pr.id
202+
JOIN cdn as c ON c.id = s.cdn_id
203+
JOIN type as t ON s.type = t.id
204+
WHERE t.name LIKE 'EDGE%'
205+
AND pa.config_file = 'rascal-config.txt'
206+
AND pa.name = 'health.threshold.availableBandwidthInKbps'
207+
`)
208+
if err != nil {
209+
return nil, errors.New("querying thresholds: " + err.Error())
210+
}
211+
defer rows.Close()
212+
profileThresholds := map[string]float64{}
213+
for rows.Next() {
214+
profile := ""
215+
threshStr := ""
216+
if err := rows.Scan(&profile, &threshStr); err != nil {
217+
return nil, errors.New("scanning thresholds: " + err.Error())
218+
}
219+
threshStr = strings.TrimPrefix(threshStr, ">")
220+
thresh, err := strconv.ParseFloat(threshStr, 64)
221+
if err != nil {
222+
return nil, errors.New("profile '" + profile + "' health.threshold.availableBandwidthInKbps is not a number")
223+
}
224+
profileThresholds[profile] = thresh
225+
}
226+
return profileThresholds, nil
227+
}
228+
229+
func getCRStates(monitorFQDN string, client *http.Client) (CRStates, error) {
230+
path := `/publish/CrStates`
231+
resp, err := client.Get("http://" + monitorFQDN + path)
232+
if err != nil {
233+
return CRStates{}, errors.New("getting CRStates from Monitor '" + monitorFQDN + "': " + err.Error())
234+
}
235+
defer resp.Body.Close()
236+
237+
crs := CRStates{}
238+
if err := json.NewDecoder(resp.Body).Decode(&crs); err != nil {
239+
return CRStates{}, errors.New("decoding CRStates from monitor '" + monitorFQDN + "': " + err.Error())
240+
}
241+
return crs, nil
242+
}
243+
244+
func getCRConfig(monitorFQDN string, client *http.Client) (CRConfig, error) {
245+
path := `/publish/CrConfig`
246+
resp, err := client.Get("http://" + monitorFQDN + path)
247+
if err != nil {
248+
return CRConfig{}, errors.New("getting CRConfig from Monitor '" + monitorFQDN + "': " + err.Error())
249+
}
250+
defer resp.Body.Close()
251+
crs := CRConfig{}
252+
if err := json.NewDecoder(resp.Body).Decode(&crs); err != nil {
253+
return CRConfig{}, errors.New("decoding CRConfig from monitor '" + monitorFQDN + "': " + err.Error())
254+
}
255+
return crs, nil
256+
}
257+
258+
// CacheStats contains the Monitor CacheStats needed by Cachedata. It is NOT the full object served by the Monitor, but only the data required by the caches stats endpoint.
259+
type CacheStats struct {
260+
Caches map[tc.CacheName]CacheStat `json:"caches"`
261+
}
262+
263+
type CacheStat struct {
264+
KBPS []CacheStatData `json:"kbps"`
265+
MaxKBPS []CacheStatData `json:"maxKbps"`
266+
}
267+
268+
type CacheStatData struct {
269+
Value float64 `json:"value,string"`
270+
}
271+
272+
// getCacheStats gets the cache stats from the given monitor. It takes stats, a slice of stat names; and cacheStats, an object to deserialize stats into. The cacheStats type must be of the form struct {caches map[tc.CacheName]struct{statName []struct{value float64}}} with the desired stats, with appropriate member names or tags.
273+
func getCacheStats(monitorFQDN string, client *http.Client, stats []string, cacheStats interface{}) error {
274+
path := `/publish/CacheStats`
275+
if len(stats) > 0 {
276+
path += `?stats=` + strings.Join(stats, `,`)
277+
}
278+
resp, err := client.Get("http://" + monitorFQDN + path)
279+
if err != nil {
280+
return errors.New("getting CacheStats from Monitor '" + monitorFQDN + "': " + err.Error())
281+
}
282+
defer resp.Body.Close()
283+
if err := json.NewDecoder(resp.Body).Decode(cacheStats); err != nil {
284+
return errors.New("decoding CacheStats from monitor '" + monitorFQDN + "': " + err.Error())
285+
}
286+
return nil
287+
}
288+
289+
// getCDNMonitors returns an FQDN, including port, of an online monitor for each CDN. If a CDN has no online monitors, that CDN will not have an entry in the map. If a CDN has multiple online monitors, an arbitrary one will be returned.
290+
func getCDNMonitorFQDNs(tx *sql.Tx) (map[tc.CDNName][]string, error) {
291+
rows, err := tx.Query(`
292+
SELECT s.host_name, s.domain_name, s.tcp_port, c.name as cdn
293+
FROM server as s
294+
JOIN type as t ON s.type = t.id
295+
JOIN status as st ON st.id = s.status
296+
JOIN cdn as c ON c.id = s.cdn_id
297+
WHERE t.name = '` + tc.MonitorTypeName + `'
298+
AND st.name = '` + MonitorOnlineStatus + `'
299+
`)
300+
if err != nil {
301+
return nil, errors.New("querying monitors: " + err.Error())
302+
}
303+
defer rows.Close()
304+
monitors := map[tc.CDNName][]string{}
305+
for rows.Next() {
306+
host := ""
307+
domain := ""
308+
port := sql.NullInt64{}
309+
cdn := tc.CDNName("")
310+
if err := rows.Scan(&host, &domain, &port, &cdn); err != nil {
311+
return nil, errors.New("scanning monitors: " + err.Error())
312+
}
313+
fqdn := host + "." + domain
314+
if port.Valid {
315+
fqdn += ":" + strconv.FormatInt(port.Int64, 10)
316+
}
317+
monitors[cdn] = append(monitors[cdn], fqdn)
318+
}
319+
return monitors, nil
320+
}
321+
322+
// getGlobalParams returns the value of the global param, whether it existed, or any error
323+
func getGlobalParam(tx *sql.Tx, name string) (string, bool, error) {
324+
return getParam(tx, name, "global")
325+
}
326+
327+
// getGlobalParams returns the value of the param, whether it existed, or any error.
328+
func getParam(tx *sql.Tx, name string, configFile string) (string, bool, error) {
329+
val := ""
330+
if err := tx.QueryRow(`select value from parameter where name = $1 and config_file = $2`, name, configFile).Scan(&val); err != nil {
331+
if err == sql.ErrNoRows {
332+
return "", false, nil
333+
}
334+
return "", false, errors.New("Error querying global paramter '" + name + "': " + err.Error())
335+
}
336+
return val, true, nil
337+
}

traffic_ops/traffic_ops_golang/routing/routes.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,10 @@ func Routes(d ServerData) ([]Route, []RawRoute, http.Handler, error) {
123123
//CDN
124124
{1.1, http.MethodGet, `cdns/name/{name}/sslkeys/?(\.json)?$`, cdn.GetSSLKeys, auth.PrivLevelAdmin, Authenticated, nil},
125125
{1.1, http.MethodGet, `cdns/metric_types`, notImplementedHandler, 0, NoAuth, nil}, // MUST NOT end in $, because the 1.x route is longer
126-
{1.1, http.MethodGet, `cdns/capacity$`, handlerToFunc(proxyHandler), 0, NoAuth, []Middleware{}},
126+
127+
{1.1, http.MethodGet, `cdns/capacity$`, cdn.GetCapacity, auth.PrivLevelReadOnly, Authenticated, nil},
127128
{1.1, http.MethodGet, `cdns/configs/?(\.json)?$`, cdn.GetConfigs, auth.PrivLevelReadOnly, Authenticated, nil},
129+
128130
{1.1, http.MethodGet, `cdns/domains/?(\.json)?$`, cdn.DomainsHandler, auth.PrivLevelReadOnly, Authenticated, nil},
129131
{1.1, http.MethodGet, `cdns/health$`, handlerToFunc(proxyHandler), 0, NoAuth, []Middleware{}},
130132
{1.1, http.MethodGet, `cdns/routing$`, handlerToFunc(proxyHandler), 0, NoAuth, []Middleware{}},

0 commit comments

Comments
 (0)