Skip to content

Commit c99cf66

Browse files
easyCZiQQBot
authored andcommitted
# This is a combination of 2 commits.
# This is the 1st commit message: [usage] Refactor controller package into scheduler # This is the commit message #2: [usage] Refactor controller package into scheduler
1 parent deb1c5a commit c99cf66

File tree

8 files changed

+193
-183
lines changed

8 files changed

+193
-183
lines changed

components/ide-metrics/pkg/server/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func Test_allowListCollector_Reconcile(t *testing.T) {
119119
for _, tt := range tests {
120120
t.Run(tt.name, func(t *testing.T) {
121121
if got := c.Reconcile(tt.args.labels); !reflect.DeepEqual(got, tt.want) {
122-
t.Errorf("allowListCollector.Reconcile() = %v, want %v", got, tt.want)
122+
t.Errorf("allowListCollector.Run() = %v, want %v", got, tt.want)
123123
}
124124
})
125125
}

components/usage/pkg/controller/controller.go

Lines changed: 0 additions & 84 deletions
This file was deleted.

components/usage/pkg/controller/controller_test.go

Lines changed: 0 additions & 55 deletions
This file was deleted.

components/usage/pkg/controller/reconciler.go renamed to components/usage/pkg/scheduler/job.go

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,55 +2,72 @@
22
// Licensed under the GNU Affero General Public License (AGPL).
33
// See License-AGPL.txt in the project root for license information.
44

5-
package controller
5+
package scheduler
66

77
import (
88
"context"
99
"fmt"
1010
"github.com/gitpod-io/gitpod/common-go/log"
1111
v1 "github.com/gitpod-io/gitpod/usage-api/v1"
12+
"github.com/robfig/cron"
1213
"google.golang.org/protobuf/types/known/timestamppb"
1314
"time"
1415
)
1516

16-
type Reconciler interface {
17-
Reconcile() error
17+
type Job interface {
18+
Run() error
1819
}
1920

20-
type ReconcilerFunc func() error
21+
func NewLedgerTriggerJobSpec(schedule time.Duration, job Job) (JobSpec, error) {
22+
parsed, err := cron.Parse(fmt.Sprintf("@every %s", schedule.String()))
23+
if err != nil {
24+
return JobSpec{}, fmt.Errorf("failed to parse ledger job schedule: %w", err)
25+
}
2126

22-
func (f ReconcilerFunc) Reconcile() error {
23-
return f()
27+
return JobSpec{
28+
Job: job,
29+
ID: "ledger",
30+
Schedule: parsed,
31+
}, nil
2432
}
2533

26-
func NewLedgerReconciler(usageClient v1.UsageServiceClient, billingClient v1.BillingServiceClient) *LedgerReconciler {
27-
return &LedgerReconciler{
34+
func NewLedgerTrigger(usageClient v1.UsageServiceClient, billingClient v1.BillingServiceClient) *LedgerJob {
35+
return &LedgerJob{
2836
usageClient: usageClient,
2937
billingClient: billingClient,
38+
39+
running: make(chan struct{}, 1),
3040
}
3141
}
3242

33-
type LedgerReconciler struct {
43+
type LedgerJob struct {
3444
usageClient v1.UsageServiceClient
3545
billingClient v1.BillingServiceClient
46+
47+
running chan struct{}
3648
}
3749

38-
func (r *LedgerReconciler) Reconcile() (err error) {
50+
func (r *LedgerJob) Run() (err error) {
3951
ctx := context.Background()
4052

53+
select {
54+
// attempt a write to signal we want to run
55+
case r.running <- struct{}{}:
56+
// we managed to write, there's no other job executing. Cases are not fall through so we continue executing our main logic.
57+
default:
58+
// we could not write, so another instance is already running. Skip current run.
59+
log.Infof("Skipping ledger run, another run is already in progress.")
60+
return nil
61+
}
62+
4163
now := time.Now().UTC()
4264
hourAgo := now.Add(-1 * time.Hour)
4365

44-
reportUsageReconcileStarted()
45-
defer func() {
46-
reportUsageReconcileFinished(time.Since(now), err)
47-
}()
48-
4966
logger := log.
5067
WithField("from", hourAgo).
5168
WithField("to", now)
5269

53-
logger.Info("Starting ledger reconciliation.")
70+
logger.Info("Running ledger job. Reconciling usage records.")
5471
_, err = r.usageClient.ReconcileUsageWithLedger(ctx, &v1.ReconcileUsageWithLedgerRequest{
5572
From: timestamppb.New(hourAgo),
5673
To: timestamppb.New(now),
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2+
// Licensed under the GNU Affero General Public License (AGPL).
3+
// See License-AGPL.txt in the project root for license information.
4+
5+
package scheduler
6+
7+
import (
8+
"context"
9+
v1 "github.com/gitpod-io/gitpod/usage-api/v1"
10+
"github.com/stretchr/testify/require"
11+
"google.golang.org/grpc"
12+
"google.golang.org/grpc/codes"
13+
"google.golang.org/grpc/status"
14+
"sync"
15+
"sync/atomic"
16+
"testing"
17+
"time"
18+
)
19+
20+
func TestLedgerJob_PreventsConcurrentInvocations(t *testing.T) {
21+
client := &fakeUsageClient{}
22+
job := NewLedgerTrigger(client, nil)
23+
24+
invocations := 3
25+
wg := sync.WaitGroup{}
26+
wg.Add(invocations)
27+
for i := 0; i < invocations; i++ {
28+
go func() {
29+
_ = job.Run()
30+
wg.Done()
31+
}()
32+
}
33+
wg.Wait()
34+
35+
require.Equal(t, 1, int(client.ReconcileUsageWithLedgerCallCount))
36+
}
37+
38+
type fakeUsageClient struct {
39+
ReconcileUsageWithLedgerCallCount int32
40+
}
41+
42+
// GetCostCenter retrieves the active cost center for the given attributionID
43+
func (c *fakeUsageClient) GetCostCenter(ctx context.Context, in *v1.GetCostCenterRequest, opts ...grpc.CallOption) (*v1.GetCostCenterResponse, error) {
44+
return nil, status.Error(codes.Unauthenticated, "not implemented")
45+
}
46+
47+
// SetCostCenter stores the given cost center
48+
func (c *fakeUsageClient) SetCostCenter(ctx context.Context, in *v1.SetCostCenterRequest, opts ...grpc.CallOption) (*v1.SetCostCenterResponse, error) {
49+
return nil, status.Error(codes.Unauthenticated, "not implemented")
50+
}
51+
52+
// Triggers reconciliation of usage with ledger implementation.
53+
func (c *fakeUsageClient) ReconcileUsageWithLedger(ctx context.Context, in *v1.ReconcileUsageWithLedgerRequest, opts ...grpc.CallOption) (*v1.ReconcileUsageWithLedgerResponse, error) {
54+
atomic.AddInt32(&c.ReconcileUsageWithLedgerCallCount, 1)
55+
time.Sleep(1 * time.Second)
56+
57+
return nil, status.Error(codes.Unauthenticated, "not implemented")
58+
}
59+
60+
// ListUsage retrieves all usage for the specified attributionId and theb given time range
61+
func (c *fakeUsageClient) ListUsage(ctx context.Context, in *v1.ListUsageRequest, opts ...grpc.CallOption) (*v1.ListUsageResponse, error) {
62+
return nil, status.Error(codes.Unauthenticated, "not implemented")
63+
}

components/usage/pkg/controller/reporter.go renamed to components/usage/pkg/scheduler/reporter.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// Licensed under the GNU Affero General Public License (AGPL).
33
// See License-AGPL.txt in the project root for license information.
44

5-
package controller
5+
package scheduler
66

77
import (
88
"fmt"
@@ -16,26 +16,26 @@ const (
1616
)
1717

1818
var (
19-
reconcileStartedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
19+
jobStartedSeconds = prometheus.NewCounterVec(prometheus.CounterOpts{
2020
Namespace: namespace,
2121
Subsystem: subsystem,
22-
Name: "reconcile_started_total",
23-
Help: "Number of usage reconciliation runs started",
24-
}, []string{})
22+
Name: "scheduler_job_started",
23+
Help: "Number of jobs started",
24+
}, []string{"job"})
2525

26-
reconcileStartedDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
26+
jobCompletedSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
2727
Namespace: namespace,
2828
Subsystem: subsystem,
29-
Name: "reconcile_completed_duration_seconds",
30-
Help: "Histogram of reconcile duration",
29+
Name: "scheduler_job_completed_seconds",
30+
Help: "Histogram of job duration",
3131
Buckets: prometheus.LinearBuckets(30, 30, 10), // every 30 secs, starting at 30secs
32-
}, []string{"outcome"})
32+
}, []string{"job", "outcome"})
3333
)
3434

3535
func RegisterMetrics(reg *prometheus.Registry) error {
3636
metrics := []prometheus.Collector{
37-
reconcileStartedTotal,
38-
reconcileStartedDurationSeconds,
37+
jobStartedSeconds,
38+
jobCompletedSeconds,
3939
}
4040
for _, metric := range metrics {
4141
err := reg.Register(metric)
@@ -47,14 +47,14 @@ func RegisterMetrics(reg *prometheus.Registry) error {
4747
return nil
4848
}
4949

50-
func reportUsageReconcileStarted() {
51-
reconcileStartedTotal.WithLabelValues().Inc()
50+
func reportJobStarted(id string) {
51+
jobStartedSeconds.WithLabelValues(id).Inc()
5252
}
5353

54-
func reportUsageReconcileFinished(duration time.Duration, err error) {
54+
func reportJobCompleted(id string, duration time.Duration, err error) {
5555
outcome := "success"
5656
if err != nil {
5757
outcome = "error"
5858
}
59-
reconcileStartedDurationSeconds.WithLabelValues(outcome).Observe(duration.Seconds())
59+
jobCompletedSeconds.WithLabelValues(id, outcome).Observe(duration.Seconds())
6060
}

0 commit comments

Comments
 (0)