Skip to content

Ken/use alt appliances #39078

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
296 changes: 281 additions & 15 deletions pkg/collector/corechecks/network-devices/versa/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"strconv"
"sync"
"time"

"github.com/DataDog/datadog-agent/pkg/util/log"
)

const (
defaultMaxAttempts = 3
defaultMaxPages = 100
defaultMaxCount = "2000"
defaultLookback = 30 * time.Minute
defaultLookback = "30minutesAgo"
defaultHTTPTimeout = 10
defaultHTTPScheme = "https"
)
Expand All @@ -47,14 +49,14 @@ type Client struct {
maxAttempts int
maxPages int
maxCount string // Stored as string to be passed as an HTTP param
lookback time.Duration
lookback string
}

// ClientOptions are the functional options for the Versa client
type ClientOptions func(*Client)

// NewClient creates a new Versa HTTP client.
func NewClient(directorEndpoint, analyticsEndpoint, username, password string, useHTTP bool, options ...ClientOptions) (*Client, error) {
func NewClient(directorEndpoint string, directorPort int, analyticsEndpoint string, username string, password string, useHTTP bool, skipCertVerification bool, options ...ClientOptions) (*Client, error) {
err := validateParams(directorEndpoint, analyticsEndpoint, username, password)
if err != nil {
return nil, err
Expand All @@ -65,9 +67,21 @@ func NewClient(directorEndpoint, analyticsEndpoint, username, password string, u
return nil, err
}

// TODO: remove this in favor of allowing certs to be
// added, this is just for triage/testing
if skipCertVerification {
log.Warnf("TLS Certificate Verification disabled!")
}
httpTransport := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: skipCertVerification,
},
}

httpClient := &http.Client{
Timeout: defaultHTTPTimeout * time.Second,
Jar: cookieJar,
Transport: httpTransport,
Timeout: defaultHTTPTimeout * time.Second,
Jar: cookieJar,
}

scheme := defaultHTTPScheme
Expand All @@ -88,7 +102,7 @@ func NewClient(directorEndpoint, analyticsEndpoint, username, password string, u
client := &Client{
httpClient: httpClient,
directorEndpoint: directorEndpointURL.String(),
directorAPIPort: 9182, // TODO: make configurable based on auth type
directorAPIPort: directorPort,
analyticsEndpoint: analyticsEndpointURL.String(),
username: username,
password: password,
Expand Down Expand Up @@ -175,9 +189,9 @@ func WithMaxPages(maxPages int) ClientOptions {
}

// WithLookback is a functional option to set the client lookback interval
func WithLookback(lookback time.Duration) ClientOptions {
func WithLookback(lookback int) ClientOptions {
return func(c *Client) {
c.lookback = lookback
c.lookback = createLookbackString(lookback)
}
}

Expand Down Expand Up @@ -251,6 +265,163 @@ func (client *Client) GetChildAppliancesDetail(tenant string) ([]Appliance, erro
return appliances, nil
}

// GetAppliances retrieves a list of appliances using the general appliance endpoint
func (client *Client) GetAppliances() ([]Appliance, error) {
var allAppliances []Appliance
params := map[string]string{
"limit": client.maxCount,
"offset": "0",
}

// Make the first request to get the first page and total count
resp, err := get[ApplianceListResponse](client, "/vnms/appliance/appliance", params, false)
if err != nil {
return nil, fmt.Errorf("failed to get appliances: %v", err)
}
if resp == nil {
return nil, errors.New("failed to get appliances: returned nil")
}

// Add the first page of appliances
allAppliances = append(allAppliances, resp.Appliances...)

// Calculate remaining pages needed
maxCount, _ := strconv.Atoi(client.maxCount)
if maxCount <= 0 {
return nil, fmt.Errorf("invalid max count: %d", maxCount)
}

totalPages := (resp.TotalCount + maxCount - 1) / maxCount // calculate total pages, rounding up

// Paginate through the remaining pages
for i := 1; i < totalPages; i++ {
params["offset"] = strconv.Itoa(i * maxCount)

pageResp, err := get[ApplianceListResponse](client, "/vnms/appliance/appliance", params, false)
if err != nil {
return nil, fmt.Errorf("failed to get appliances page %d: %v", i+1, err)
}
if pageResp == nil {
return nil, fmt.Errorf("failed to get appliances page %d: returned nil", i+1)
}

allAppliances = append(allAppliances, pageResp.Appliances...)
}

// Verify we got the expected number of appliances
if len(allAppliances) != resp.TotalCount {
return nil, fmt.Errorf("failed to get all appliances: expected %d, got %d", resp.TotalCount, len(allAppliances))
}

return allAppliances, nil
}

// GetInterfaces retrieves a list of interfaces for a specific tenant
func (client *Client) GetInterfaces(tenantName string) ([]Interface, error) {
if tenantName == "" {
return nil, fmt.Errorf("tenantName cannot be empty")
}

params := map[string]string{
"tenantName": tenantName,
}

resp, err := get[InterfaceListResponse](client, "/vnms/dashboard/health/interface", params, false)
if err != nil {
return nil, fmt.Errorf("failed to get interfaces: %v", err)
}
if resp == nil {
return nil, errors.New("failed to get interfaces: returned nil")
}

return resp.List.Value, nil
}

// GetInterfaceMetrics retrieves interface metrics for a specific appliance and tenant using pagination
func (client *Client) GetInterfaceMetrics(applianceName string, tenantName string) ([]InterfaceMetrics, error) {
if applianceName == "" {
return nil, fmt.Errorf("applianceName cannot be empty")
}
if tenantName == "" {
return nil, fmt.Errorf("tenantName cannot be empty")
}

var allMetrics []InterfaceMetrics

// make the initial request to get the first page and query-id
initialEndpoint := fmt.Sprintf("/vnms/dashboard/appliance/%s/pageable_interfaces", applianceName)
params := map[string]string{
"orgName": tenantName,
"limit": client.maxCount,
}

resp, err := get[InterfaceMetricsResponse](client, initialEndpoint, params, false)
if err != nil {
return nil, fmt.Errorf("failed to get interface metrics (initial request): %v", err)
}
if resp == nil {
return nil, errors.New("failed to get interface metrics: initial response was nil")
}

queryID := resp.QueryID
if queryID == "" {
return nil, errors.New("failed to get interface metrics: no query-id returned")
}
// always attempt to clean up the query
defer client.closeQuery(queryID)

// add the first page of data
allMetrics = append(allMetrics, resp.Collection.Interfaces...)

// if the first page has less interfaces than the limit,
// there's no need to paginate
maxCount, _ := strconv.Atoi(client.maxCount)
if len(resp.Collection.Interfaces) < maxCount {
return allMetrics, nil
}

// paginate through remaining data using the query-id
nextPageEndpoint := "/vnms/dashboard/appliance/next_page_data"
for range client.maxPages {
pageParams := map[string]string{
"queryId": queryID,
}

pageResp, err := get[InterfaceMetricsResponse](client, nextPageEndpoint, pageParams, false)
if err != nil {
return nil, fmt.Errorf("failed to get interface metrics (pagination): %v", err)
}
if pageResp == nil {
return nil, errors.New("failed to get interface metrics: page response was nil")
}

// Add the page data to our results
allMetrics = append(allMetrics, pageResp.Collection.Interfaces...)

// If we get a response with less than the limit,
// we've hit the last page
if len(pageResp.Collection.Interfaces) < maxCount {
break
}
}

return allMetrics, nil
}

// closeQuery closes a pageable query to clean up resources
func (client *Client) closeQuery(queryID string) {
closeEndpoint := "/vnms/dashboard/appliance/close_query"
params := map[string]string{
"queryId": queryID,
}

// We don't expect any meaningful response from the close endpoint
_, err := client.get(closeEndpoint, params, false)
if err != nil {
log.Warnf("failed to close query %s: %v", queryID, err)
}
}

// GetDirectorStatus retrieves the director status
func (client *Client) GetDirectorStatus() (*DirectorStatus, error) {
resp, err := get[DirectorStatus](client, "/vnms/dashboard/vdStatus", nil, false)
Expand All @@ -262,7 +433,7 @@ func (client *Client) GetDirectorStatus() (*DirectorStatus, error) {
}

// TODO: clean this up to be more generalizable
func parseAaData(data [][]interface{}) ([]SLAMetrics, error) {
func parseSLAMetrics(data [][]interface{}) ([]SLAMetrics, error) {
var rows []SLAMetrics
for _, row := range data {
m := SLAMetrics{}
Expand Down Expand Up @@ -308,8 +479,8 @@ func parseAaData(data [][]interface{}) ([]SLAMetrics, error) {
}

// GetSLAMetrics retrieves SLA metrics from the Versa Analytics API
func (client *Client) GetSLAMetrics() ([]SLAMetrics, error) {
analyticsURL := buildAnalyticsPath("datadog", "SDWAN", "15minutesAgo", "slam(localsite,remotesite,localaccckt,remoteaccckt,fc)", "tableData", []string{
func (client *Client) GetSLAMetrics(tenant string) ([]SLAMetrics, error) {
analyticsURL := client.buildAnalyticsPath(tenant, "SDWAN", "slam(localsite,remotesite,localaccckt,remoteaccckt,fc)", "tableData", []string{
"delay",
"fwdDelayVar",
"revDelayVar",
Expand All @@ -318,18 +489,109 @@ func (client *Client) GetSLAMetrics() ([]SLAMetrics, error) {
"pduLossRatio",
})

resp, err := get[SLAMetricsResponse](client, analyticsURL, nil, true)
resp, err := get[AnalyticsMetricsResponse](client, analyticsURL, nil, true)
if err != nil {
return nil, fmt.Errorf("failed to get SLA metrics: %v", err)
}
aaData := resp.AaData
metrics, err := parseAaData(aaData)
metrics, err := parseSLAMetrics(aaData)
if err != nil {
return nil, fmt.Errorf("failed to parse SLA metrics: %v", err)
}
return metrics, nil
}

// GetLinkStatusMetrics retrieves link metrics from the Versa Analytics API
func (client *Client) GetLinkStatusMetrics(tenant string) error {
analyticsURL := client.buildAnalyticsPath(tenant, "SDWAN", "linkstatus(site,accckt)", "table", []string{
"availability",
})

resp, err := client.get(analyticsURL, nil, true)
if err != nil {
return fmt.Errorf("failed to get Link Metrics: %v", err)
}

log.Tracef("Link Metrics Response: %s", string(resp))

return nil
}

// parseLinkUsageMetrics parses the raw AaData response into LinkUsageMetrics structs
func parseLinkUsageMetrics(data [][]interface{}) ([]LinkUsageMetrics, error) {
var rows []LinkUsageMetrics
for _, row := range data {
m := LinkUsageMetrics{}
if len(row) < 13 {
return nil, fmt.Errorf("missing columns in row: got %d columns, expected 13", len(row))
}
// Type assertions for each value
var ok bool
if m.DrillKey, ok = row[0].(string); !ok {
return nil, fmt.Errorf("expected string for DrillKey")
}
if m.Site, ok = row[1].(string); !ok {
return nil, fmt.Errorf("expected string for Site")
}
if m.AccessCircuit, ok = row[2].(string); !ok {
return nil, fmt.Errorf("expected string for AccessCircuit")
}
if m.UplinkBandwidth, ok = row[3].(string); !ok {
return nil, fmt.Errorf("expected string for UplinkBandwidth")
}
if m.DownlinkBandwidth, ok = row[4].(string); !ok {
return nil, fmt.Errorf("expected string for DownlinkBandwidth")
}
if m.Type, ok = row[5].(string); !ok {
return nil, fmt.Errorf("expected string for Type")
}
if m.Media, ok = row[6].(string); !ok {
return nil, fmt.Errorf("expected string for Media")
}
if m.IP, ok = row[7].(string); !ok {
return nil, fmt.Errorf("expected string for IP")
}
if m.ISP, ok = row[8].(string); !ok {
return nil, fmt.Errorf("expected string for ISP")
}

// Floats from index 9–12
floatFields := []*float64{
&m.VolumeTx, &m.VolumeRx, &m.BandwidthTx, &m.BandwidthRx,
}
for i, ptr := range floatFields {
if val, ok := row[i+9].(float64); ok {
*ptr = val
} else {
return nil, fmt.Errorf("expected float64 at index %d", i+9)
}
}
rows = append(rows, m)
}
return rows, nil
}

// GetLinkUsageMetrics gets link metrics for a Versa tenant
func (client *Client) GetLinkUsageMetrics(tenant string) ([]LinkUsageMetrics, error) {
analyticsURL := client.buildAnalyticsPath(tenant, "SDWAN", "linkusage(site,accckt,accckt.uplinkBW,accckt.downlinkBW,accckt.type,accckt.media,accckt.ip,accckt.isp)", "tableData", []string{
"volume-tx",
"volume-rx",
"bw-tx",
"bw-rx",
})

resp, err := get[AnalyticsMetricsResponse](client, analyticsURL, nil, true)
if err != nil {
return nil, fmt.Errorf("failed to get link usage metrics: %v", err)
}
aaData := resp.AaData
metrics, err := parseLinkUsageMetrics(aaData)
if err != nil {
return nil, fmt.Errorf("failed to parse link usage metrics: %v", err)
}
return metrics, nil
}

// buildAnalyticsPath constructs a Versa Analytics query path in a cleaner way so multiple metrics can be added.
//
// Parameters:
Expand All @@ -341,11 +603,11 @@ func (client *Client) GetSLAMetrics() ([]SLAMetrics, error) {
// - metrics: list of metric strings (e.g., "delay", "fwdLossRatio").
//
// Returns the full encoded URL string.
func buildAnalyticsPath(tenant string, feature string, startDate string, query string, queryType string, metrics []string) string {
func (client *Client) buildAnalyticsPath(tenant string, feature string, query string, queryType string, metrics []string) string {
baseAnalyticsPath := "/versa/analytics/v1.0.0/data/provider"
path := fmt.Sprintf("%s/tenants/%s/features/%s", baseAnalyticsPath, tenant, feature)
params := url.Values{
"start-date": []string{startDate},
"start-date": []string{client.lookback},
"qt": []string{queryType},
"q": []string{query},
"ds": []string{"aggregate"}, // this seems to be the only datastore supported (from docs)
Expand All @@ -355,3 +617,7 @@ func buildAnalyticsPath(tenant string, feature string, startDate string, query s
}
return path + "?" + params.Encode()
}

func createLookbackString(lookbackMinutes int) string {
return fmt.Sprintf("%dminutesAgo", lookbackMinutes)
}
Loading