Skip to content

Commit e7ca5e9

Browse files
committed
[supervisor] analyze image file changes
1 parent 8eb9d8c commit e7ca5e9

File tree

5 files changed

+287
-73
lines changed

5 files changed

+287
-73
lines changed

components/supervisor/pkg/config/gitpod-config.go

Lines changed: 147 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ package config
77
import (
88
"context"
99
"os"
10+
"path/filepath"
1011
"sync"
1112
"time"
1213

1314
"github.com/fsnotify/fsnotify"
14-
"github.com/sirupsen/logrus"
15-
"gopkg.in/yaml.v2"
15+
"gopkg.in/yaml.v3"
1616

17+
"github.com/gitpod-io/gitpod/common-go/log"
1718
gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"
1819
)
1920

@@ -23,142 +24,231 @@ type ConfigInterface interface {
2324
Watch(ctx context.Context)
2425
// Observe provides channels triggered whenever the config is changed
2526
Observe(ctx context.Context) <-chan *gitpod.GitpodConfig
27+
// Observe provides channels triggered whenever the image file is changed
28+
ObserveImageFile(ctx context.Context) <-chan *struct{}
2629
}
2730

2831
// ConfigService provides access to the gitpod config file.
2932
type ConfigService struct {
30-
location string
3133
locationReady <-chan struct{}
3234

33-
cond *sync.Cond
34-
config *gitpod.GitpodConfig
35+
configLocation string
36+
configWatcher *fileWatcher[gitpod.GitpodConfig]
3537

36-
pollTimer *time.Timer
38+
imageWatcher *fileWatcher[struct{}]
39+
}
3740

38-
log *logrus.Entry
41+
// NewConfigService creates a new instance of ConfigService.
42+
func NewConfigService(configLocation string, locationReady <-chan struct{}) *ConfigService {
43+
return &ConfigService{
44+
locationReady: locationReady,
45+
configLocation: configLocation,
46+
configWatcher: newFileWatcher(func(data []byte) (*gitpod.GitpodConfig, error) {
47+
var config *gitpod.GitpodConfig
48+
err := yaml.Unmarshal(data, &config)
49+
return config, err
50+
}),
51+
imageWatcher: newFileWatcher(func(data []byte) (*struct{}, error) {
52+
return &struct{}{}, nil
53+
}),
54+
}
55+
}
56+
57+
// Observe provides channels triggered whenever the config is changed.
58+
func (service *ConfigService) Observe(ctx context.Context) <-chan *gitpod.GitpodConfig {
59+
return service.configWatcher.observe(ctx)
60+
}
61+
62+
// Observe provides channels triggered whenever the image file is changed
63+
func (service *ConfigService) ObserveImageFile(ctx context.Context) <-chan *struct{} {
64+
return service.imageWatcher.observe(ctx)
65+
}
66+
67+
// Watch starts the config watching.
68+
func (service *ConfigService) Watch(ctx context.Context) {
69+
select {
70+
case <-service.locationReady:
71+
case <-ctx.Done():
72+
return
73+
}
74+
go service.watchImageFile(ctx)
75+
service.configWatcher.watch(ctx, service.configLocation)
76+
}
77+
78+
func (service *ConfigService) watchImageFile(ctx context.Context) {
79+
var (
80+
imageLocation string
81+
cancelWatch func()
82+
)
83+
defer func() {
84+
if cancelWatch != nil {
85+
cancelWatch()
86+
}
87+
}()
88+
cfgs := service.configWatcher.observe(ctx)
89+
for {
90+
select {
91+
case cfg, ok := <-cfgs:
92+
if !ok {
93+
return
94+
}
95+
var currentImageLocation string
96+
if cfg != nil {
97+
switch img := cfg.Image.(type) {
98+
case map[string]interface{}:
99+
currentImageLocation = filepath.Join(filepath.Dir(service.configLocation), img["file"].(string))
100+
}
101+
}
102+
if imageLocation == currentImageLocation {
103+
continue
104+
}
105+
if cancelWatch != nil {
106+
cancelWatch()
107+
cancelWatch = nil
108+
service.imageWatcher.reset()
109+
}
110+
imageLocation = currentImageLocation
111+
if imageLocation == "" {
112+
continue
113+
}
114+
watchCtx, cancel := context.WithCancel(ctx)
115+
cancelWatch = cancel
116+
go service.imageWatcher.watch(watchCtx, imageLocation)
117+
case <-ctx.Done():
118+
return
119+
}
120+
}
121+
}
122+
123+
type fileWatcher[T any] struct {
124+
unmarshal func(data []byte) (*T, error)
125+
126+
cond *sync.Cond
127+
data *T
128+
129+
pollTimer *time.Timer
39130

40131
ready chan struct{}
41132
readyOnce sync.Once
42133

43134
debounceDuration time.Duration
44135
}
45136

46-
// NewConfigService creates a new instance of ConfigService.
47-
func NewConfigService(configLocation string, locationReady <-chan struct{}, log *logrus.Entry) *ConfigService {
48-
return &ConfigService{
49-
location: configLocation,
50-
locationReady: locationReady,
137+
func newFileWatcher[T any](unmarshal func(data []byte) (*T, error)) *fileWatcher[T] {
138+
return &fileWatcher[T]{
139+
unmarshal: unmarshal,
51140
cond: sync.NewCond(&sync.Mutex{}),
52-
log: log.WithField("location", configLocation),
53141
ready: make(chan struct{}),
54142
debounceDuration: 100 * time.Millisecond,
55143
}
56144
}
57145

58-
// Observe provides channels triggered whenever the config is changed.
59-
func (service *ConfigService) Observe(ctx context.Context) <-chan *gitpod.GitpodConfig {
60-
configs := make(chan *gitpod.GitpodConfig)
146+
func (service *fileWatcher[T]) observe(ctx context.Context) <-chan *T {
147+
results := make(chan *T)
61148
go func() {
62-
defer close(configs)
149+
defer close(results)
63150

64151
<-service.ready
65152

66153
service.cond.L.Lock()
67154
defer service.cond.L.Unlock()
68155
for {
69-
configs <- service.config
156+
results <- service.data
70157

71158
service.cond.Wait()
72159
if ctx.Err() != nil {
73160
return
74161
}
75162
}
76163
}()
77-
return configs
164+
return results
78165
}
79166

80-
// Watch starts the config watching.
81-
func (service *ConfigService) Watch(ctx context.Context) {
82-
service.log.Info("gitpod config watcher: starting...")
167+
func (service *fileWatcher[T]) markReady() {
168+
service.readyOnce.Do(func() {
169+
close(service.ready)
170+
})
171+
}
83172

84-
select {
85-
case <-service.locationReady:
86-
case <-ctx.Done():
87-
return
88-
}
173+
func (service *fileWatcher[T]) reset() {
174+
service.cond.L.Lock()
175+
defer service.cond.L.Unlock()
89176

90-
_, err := os.Stat(service.location)
91-
if os.IsNotExist(err) {
92-
service.poll(ctx)
177+
if service.data != nil {
178+
service.data = nil
179+
service.cond.Broadcast()
93180
}
94-
service.watch(ctx)
95181
}
96182

97-
func (service *ConfigService) markReady() {
98-
service.readyOnce.Do(func() {
99-
close(service.ready)
100-
})
183+
func (service *fileWatcher[T]) watch(ctx context.Context, location string) {
184+
log.WithField("location", location).Info("file watcher: starting...")
185+
186+
_, err := os.Stat(location)
187+
if os.IsNotExist(err) {
188+
service.poll(ctx, location)
189+
}
190+
service.doWatch(ctx, location)
101191
}
102192

103-
func (service *ConfigService) watch(ctx context.Context) {
193+
func (service *fileWatcher[T]) doWatch(ctx context.Context, location string) {
104194
watcher, err := fsnotify.NewWatcher()
105195
defer func() {
106196
if err != nil {
107-
service.log.WithError(err).Error("gitpod config watcher: failed to start")
197+
log.WithField("location", location).WithError(err).Error("file watcher: failed to start")
108198
return
109199
}
110200

111-
service.log.Info("gitpod config watcher: started")
201+
log.WithField("location", location).Info("file watcher: started")
112202
}()
113203
if err != nil {
114204
return
115205
}
116206

117-
err = watcher.Add(service.location)
207+
err = watcher.Add(location)
118208
if err != nil {
119209
watcher.Close()
120210
return
121211
}
122212

123213
go func() {
124-
defer service.log.Info("gitpod config watcher: stopped")
214+
defer log.WithField("location", location).Info("file watcher: stopped")
125215
defer watcher.Close()
126216

127217
polling := make(chan struct{}, 1)
128-
service.scheduleUpdateConfig(ctx, polling)
218+
service.scheduleUpdate(ctx, polling, location)
129219
for {
130220
select {
131221
case <-polling:
132222
return
133223
case <-ctx.Done():
134224
return
135225
case err := <-watcher.Errors:
136-
service.log.WithError(err).Error("gitpod config watcher: failed to watch")
226+
log.WithField("location", location).WithError(err).Error("file watcher: failed to watch")
137227
case <-watcher.Events:
138-
service.scheduleUpdateConfig(ctx, polling)
228+
service.scheduleUpdate(ctx, polling, location)
139229
}
140230
}
141231
}()
142232
}
143233

144-
func (service *ConfigService) scheduleUpdateConfig(ctx context.Context, polling chan<- struct{}) {
234+
func (service *fileWatcher[T]) scheduleUpdate(ctx context.Context, polling chan<- struct{}, location string) {
145235
service.cond.L.Lock()
146236
defer service.cond.L.Unlock()
147237
if service.pollTimer != nil {
148238
service.pollTimer.Stop()
149239
}
150240
service.pollTimer = time.AfterFunc(service.debounceDuration, func() {
151-
err := service.updateConfig()
241+
err := service.update(location)
152242
if os.IsNotExist(err) {
153243
polling <- struct{}{}
154-
go service.poll(ctx)
244+
go service.poll(ctx, location)
155245
} else if err != nil {
156-
service.log.WithError(err).Error("gitpod config watcher: failed to parse")
246+
log.WithField("location", location).WithError(err).Error("file watcher: failed to parse")
157247
}
158248
})
159249
}
160250

161-
func (service *ConfigService) poll(ctx context.Context) {
251+
func (service *fileWatcher[T]) poll(ctx context.Context, location string) {
162252
service.markReady()
163253

164254
timer := time.NewTicker(2 * time.Second)
@@ -171,35 +261,33 @@ func (service *ConfigService) poll(ctx context.Context) {
171261
case <-timer.C:
172262
}
173263

174-
if _, err := os.Stat(service.location); !os.IsNotExist(err) {
175-
service.watch(ctx)
264+
if _, err := os.Stat(location); !os.IsNotExist(err) {
265+
service.doWatch(ctx, location)
176266
return
177267
}
178268
}
179269
}
180270

181-
func (service *ConfigService) updateConfig() error {
271+
func (service *fileWatcher[T]) update(location string) error {
182272
service.cond.L.Lock()
183273
defer service.cond.L.Unlock()
184274

185-
config, err := service.parse()
275+
data, err := service.parse(location)
186276
if err == nil || os.IsNotExist(err) {
187-
service.config = config
277+
service.data = data
188278
service.markReady()
189279
service.cond.Broadcast()
190280

191-
service.log.WithField("config", service.config).Debug("gitpod config watcher: updated")
281+
log.WithField("location", location).WithField("data", service.data).Debug("file watcher: updated")
192282
}
193283

194284
return err
195285
}
196286

197-
func (service *ConfigService) parse() (*gitpod.GitpodConfig, error) {
198-
data, err := os.ReadFile(service.location)
287+
func (service *fileWatcher[T]) parse(location string) (*T, error) {
288+
data, err := os.ReadFile(location)
199289
if err != nil {
200290
return nil, err
201291
}
202-
var config *gitpod.GitpodConfig
203-
err = yaml.Unmarshal(data, &config)
204-
return config, err
292+
return service.unmarshal(data)
205293
}

components/supervisor/pkg/config/gitpod-config_analytics_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/google/go-cmp/cmp"
1212

13+
"github.com/gitpod-io/gitpod/common-go/log"
1314
gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"
1415
)
1516

@@ -65,7 +66,7 @@ func TestAnalyzeGitpodConfig(t *testing.T) {
6566
for _, test := range tests {
6667
t.Run(test.Desc, func(t *testing.T) {
6768
var fields []string
68-
analyzer := NewConfigAnalyzer(log, 100*time.Millisecond, func(field string) {
69+
analyzer := NewConfigAnalyzer(log.Log, 100*time.Millisecond, func(field string) {
6970
fields = append(fields, field)
7071
}, test.Prev)
7172
<-analyzer.Analyse(test.Current)

0 commit comments

Comments
 (0)