Skip to content

Commit 8b85ee4

Browse files
committed
USHIFT-5297: Nits
1 parent f8b477a commit 8b85ee4

File tree

4 files changed

+129
-40
lines changed

4 files changed

+129
-40
lines changed

pkg/controllers/clusterid.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,11 @@ func initClusterIDFile(clusterID string) error {
9898
klog.Infof("Writing MicroShift Cluster ID '%v' to '%v'", clusterID, ClusterIDFilePath)
9999
return os.WriteFile(ClusterIDFilePath, []byte(clusterID), 0400)
100100
}
101+
102+
func GetClusterId() (string, error) {
103+
data, err := os.ReadFile(ClusterIDFilePath)
104+
if err != nil {
105+
return "", fmt.Errorf("failed to read file: %v", err)
106+
}
107+
return string(data), nil
108+
}

pkg/controllers/telemetry.go

Lines changed: 33 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -49,58 +49,51 @@ func (t *TelemetryManager) Dependencies() []string {
4949
}
5050

5151
// Run is the telemetry service entry point.
//
// NOTE(review): this block is a rendered diff; removed ("-") and added ("+")
// lines are interleaved. The description below follows the added/unchanged lines.
//
// The stopped channel is closed when Run returns. The ready channel is closed
// before returning when telemetry is disabled, before returning the error when
// the cluster ID cannot be read, and otherwise right before the client is created.
// After that, metrics are collected and sent once, then again each time
// time.After(time.Hour) fires, and one last time when ctx is done, at which
// point Run returns nil.
func (t *TelemetryManager) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error {
52+
// The service manager expects a service to get ready eventually before a timeout or else
53+
// MicroShift fails to start correctly. Stopping a service is relevant only when stopping
54+
// MicroShift, therefore if Telemetry is disabled we need to signal a fake readiness (even
55+
// though it is technically correct, the service is ready even if it is not going to do anything).
56+
// For stopping the service we need to close the stopped channel or else MicroShift fails
57+
// to stop gracefully.
5258
defer close(stopped)
53-
defer close(ready)
54-
5559
if t.config.Telemetry.Status == config.StatusDisabled {
5660
klog.Info("Telemetry is disabled")
61+
close(ready)
5762
return nil
5863
}
59-
60-
clusterId, err := getClusterId()
64+
clusterId, err := GetClusterId()
6165
if err != nil {
66+
close(ready)
6267
return fmt.Errorf("unable to get cluster id: %v", err)
6368
}
64-
65-
go func() {
66-
client := telemetry.NewTelemetryClient(t.config.Telemetry.Endpoint, clusterId)
67-
collectAndSend := func() {
68-
pullSecret, err := readPullSecret()
69-
if err != nil {
70-
klog.Errorf("Unable to get pull secret: %v", err)
71-
return
72-
}
73-
//TODO swap with collected metrics from client.
74-
metrics := []telemetry.Metric{}
75-
if err := client.Send(ctx, pullSecret, metrics); err != nil {
76-
klog.Errorf("Failed to send metrics: %v", err)
77-
}
69+
close(ready)
70+
client := telemetry.NewTelemetryClient(t.config.Telemetry.Endpoint, clusterId)
71+
collectAndSend := func() {
72+
pullSecret, err := readPullSecret()
73+
if err != nil {
74+
klog.Errorf("Unable to get pull secret: %v", err)
75+
return
7876
}
79-
80-
klog.Infof("First metrics collection")
81-
collectAndSend()
82-
83-
for {
84-
select {
85-
case <-ctx.Done():
86-
klog.Infof("collect and send for the last time")
87-
collectAndSend()
88-
return
89-
case <-time.After(time.Hour):
90-
klog.Infof("collect and send again")
91-
collectAndSend()
92-
}
77+
//TODO swap with collected metrics from client.
78+
metrics := []telemetry.Metric{}
79+
if err := client.Send(ctx, pullSecret, metrics); err != nil {
80+
klog.Errorf("Failed to send metrics: %v", err)
9381
}
94-
}()
95-
return nil
96-
}
82+
}
9783

98-
func getClusterId() (string, error) {
99-
data, err := os.ReadFile(ClusterIDFilePath)
100-
if err != nil {
101-
return "", fmt.Errorf("failed to read file: %v", err)
84+
klog.Infof("First metrics collection")
85+
collectAndSend()
86+
for {
87+
select {
88+
case <-ctx.Done():
89+
klog.Infof("Collect and send for the last time")
90+
// NOTE(review): ctx is already done here and is the same context handed to
// client.Send inside collectAndSend — confirm this final send can still complete.
collectAndSend()
91+
return nil
92+
case <-time.After(time.Hour):
93+
klog.Infof("Collect and send again")
94+
collectAndSend()
95+
}
10296
}
103-
return string(data), nil
10497
}
10598

10699
func readPullSecret() (string, error) {

pkg/telemetry/telemetry.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,15 @@ func (t *TelemetryClient) Send(ctx context.Context, pullSecret string, metrics [
8989
return fmt.Errorf("unable to do the request: %v", err)
9090
}
9191
defer func() {
92+
// Discard the body to close the TCP socket right away instead of waiting for
93+
// the timeout in TIME_WAIT.
9294
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
9395
klog.Error(err, "error discarding body")
9496
}
9597
resp.Body.Close()
9698
}()
9799
if resp.StatusCode == http.StatusOK {
100+
klog.Infof("Metrics sent successfully")
98101
return nil
99102
}
100103
body, err := io.ReadAll(resp.Body)

pkg/telemetry/telemetry_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package telemetry
2+
3+
import (
4+
"context"
5+
"encoding/base64"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"net/http/httptest"
10+
"reflect"
11+
"testing"
12+
"time"
13+
14+
proto "github.com/gogo/protobuf/proto"
15+
"github.com/golang/snappy"
16+
"github.com/prometheus/prometheus/prompb"
17+
)
18+
19+
func TestTelemetryClient_Send(t *testing.T) {
20+
clusterId := "fake-cluster-id"
21+
pullSecret := "fake-pull-secret"
22+
expectedBearer := fmt.Sprintf("Bearer %s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(authString, pullSecret, clusterId))))
23+
sampleMetrics := []Metric{
24+
{
25+
Name: "fake-metric",
26+
Labels: []MetricLabel{{Name: "fake-label", Value: "fake-value"}},
27+
Timestamp: 1234567890,
28+
Value: 42,
29+
},
30+
}
31+
expectedWriteRequest := prompb.WriteRequest{
32+
Metadata: []prompb.MetricMetadata{
33+
{MetricFamilyName: "fake-metric", Type: prompb.MetricMetadata_COUNTER},
34+
},
35+
Timeseries: []prompb.TimeSeries{
36+
{
37+
Labels: []prompb.Label{
38+
{Name: "__name__", Value: "fake-metric"},
39+
{Name: "fake-label", Value: "fake-value"},
40+
},
41+
Samples: []prompb.Sample{
42+
{Timestamp: 1234567890, Value: 42},
43+
},
44+
},
45+
},
46+
}
47+
48+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
49+
if r.Header.Get("Content-Type") != "application/x-protobuf" {
50+
t.Fatalf("Expected Content-Type application/x-protobuf, got %s", r.Header.Get("Content-Type"))
51+
}
52+
if r.Header.Get("Content-Encoding") != "snappy" {
53+
t.Fatalf("Expected Content-Encoding snappy, got %s", r.Header.Get("Content-Encoding"))
54+
}
55+
if r.Header.Get("Authorization") != expectedBearer {
56+
t.Fatalf("Expected Authorization '%s', got %s", expectedBearer, r.Header.Get("Authorization"))
57+
}
58+
59+
bodyBytes, err := io.ReadAll(r.Body)
60+
if err != nil {
61+
t.Fatalf("Failed to read request body: %v", err)
62+
}
63+
writeBytes, err := snappy.Decode(nil, bodyBytes)
64+
if err != nil {
65+
t.Fatalf("Failed to decode snappy body: %v", err)
66+
}
67+
var req prompb.WriteRequest
68+
if err := proto.Unmarshal(writeBytes, &req); err != nil {
69+
t.Fatalf("failed to unmarshal WriteRequest: %v", err)
70+
}
71+
if !reflect.DeepEqual(req, expectedWriteRequest) {
72+
t.Fatalf("Expected WriteRequest %v, got %v", expectedWriteRequest, req)
73+
}
74+
w.WriteHeader(http.StatusOK)
75+
}))
76+
defer server.Close()
77+
78+
client := NewTelemetryClient(server.URL, clusterId)
79+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
80+
defer cancel()
81+
err := client.Send(ctx, pullSecret, sampleMetrics)
82+
if err != nil {
83+
t.Fatalf("Send method failed: %v", err)
84+
}
85+
}

0 commit comments

Comments
 (0)