Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

Commit 2288735

Browse files
committed
Process safe events concurrently
1 parent bb101e0 commit 2288735

File tree

4 files changed

+124
-43
lines changed

4 files changed

+124
-43
lines changed

Diff for: internal/sync/eventcache.go

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package sync
2+
3+
import (
4+
"os"
5+
"time"
6+
7+
"github.com/rjeczalik/notify"
8+
"go.coder.com/flog"
9+
)
10+
11+
type timedEvent struct {
12+
CreatedAt time.Time
13+
notify.EventInfo
14+
}
15+
16+
type eventCache map[string]timedEvent
17+
18+
func (cache eventCache) Add(ev timedEvent) {
19+
log := flog.New()
20+
log.Prefix = ev.Path() + ": "
21+
lastEvent, ok := cache[ev.Path()]
22+
if ok {
23+
switch {
24+
// If the file was quickly created and then destroyed, pretend nothing ever happened.
25+
case lastEvent.Event() == notify.Create && ev.Event() == notify.Remove:
26+
delete(cache, ev.Path())
27+
log.Info("ignored Create then Remove")
28+
return
29+
}
30+
}
31+
if ok {
32+
log.Info("ignored duplicate event (%s replaced by %s)", lastEvent.Event(), ev.Event())
33+
}
34+
// Only let the latest event for a path have action.
35+
cache[ev.Path()] = ev
36+
}
37+
38+
// DirectoryEvents returns the list of events that pertain to directories.
39+
// The set of returns events is disjoint with FileEvents.
40+
func (cache eventCache) DirectoryEvents() []timedEvent {
41+
var r []timedEvent
42+
for _, ev := range cache {
43+
info, err := os.Stat(ev.Path())
44+
if err != nil {
45+
continue
46+
}
47+
if !info.IsDir() {
48+
continue
49+
}
50+
r = append(r, ev)
51+
52+
}
53+
return r
54+
}
55+
56+
// FileEvents returns the list of events that pertain to files.
57+
// The set of returns events is disjoint with DirectoryEvents.
58+
func (cache eventCache) FileEvents() []timedEvent {
59+
var r []timedEvent
60+
for _, ev := range cache {
61+
info, err := os.Stat(ev.Path())
62+
if err != nil {
63+
continue
64+
}
65+
if info.IsDir() {
66+
continue
67+
}
68+
r = append(r, ev)
69+
70+
}
71+
return r
72+
}

Diff for: internal/sync/sync.go

+31-31
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ import (
99
"os/exec"
1010
"path"
1111
"path/filepath"
12+
"sync"
1213
"time"
1314

1415
"github.com/gorilla/websocket"
1516
"github.com/rjeczalik/notify"
1617
"go.coder.com/flog"
17-
"golang.org/x/crypto/ssh/terminal"
1818
"golang.org/x/xerrors"
1919

2020
"cdr.dev/coder-cli/internal/entclient"
@@ -167,51 +167,51 @@ func (s Sync) work(ev timedEvent) {
167167
}
168168
}
169169

170-
func setConsoleTitle(title string) {
171-
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
172-
return
173-
}
174-
fmt.Printf("\033]0;%s\007", title)
175-
}
176-
177170

178171
var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it")
179172

180173
// workEventGroup converges a group of events to prevent duplicate work.
181174
func (s Sync) workEventGroup(evs []timedEvent) {
182-
cache := map[string]timedEvent{}
175+
cache := make(eventCache)
183176
for _, ev := range evs {
184-
log := flog.New()
185-
log.Prefix = ev.Path() + ": "
186-
lastEvent, ok := cache[ev.Path()]
187-
if ok {
188-
switch {
189-
// If the file was quickly created and then destroyed, pretend nothing ever happened.
190-
case lastEvent.Event() == notify.Create && ev.Event() == notify.Remove:
191-
delete(cache, ev.Path())
192-
log.Info("ignored Create then Remove")
193-
continue
194-
}
195-
}
196-
if ok {
197-
log.Info("ignored duplicate event (%s replaced by %s)", lastEvent.Event(), ev.Event())
198-
}
199-
// Only let the latest event for a path have action.
200-
cache[ev.Path()] = ev
177+
cache.Add(ev)
201178
}
202-
for _, ev := range cache {
203-
setConsoleTitle("🚀 updating " + filepath.Base(ev.Path()))
179+
180+
// We want to process events concurrently but safely for speed.
181+
// Because the event cache prevents duplicate events for the same file, race conditions of that type
182+
// are impossible.
183+
// What is possible is a dependency on a previous Rename or Create. For example, if a directory is renamed
184+
// and then a file is moved to it. AFAIK this dependecy only exists with Directories.
185+
// So, we sequentially process the list of directory Renames and Creates, and then concurrently
186+
// perform all Writes.
187+
for _, ev := range cache.DirectoryEvents() {
204188
s.work(ev)
205189
}
206-
}
207190

191+
var wg sync.WaitGroup
192+
for _, ev := range cache.FileEvents() {
193+
setConsoleTitle(fmtUpdateTitle(ev.Path()))
194+
195+
wg.Add(1)
196+
ev := ev
197+
go func() {
198+
defer wg.Done()
199+
s.work(ev)
200+
}()
201+
}
202+
203+
wg.Wait()
204+
}
208205

209206
const (
210207
// maxinflightInotify sets the maximum number of inotifies before the sync just restarts.
211208
// Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly
212209
// with individual rsyncs.
213210
maxInflightInotify = 8
214-
maxEventDelay = time.Second * 7
211+
maxEventDelay = time.Second * 7
212+
// maxAcceptableDispatch is the maximum amount of time before an event should begin its journey to the server.
213+
// This sets a lower bound for perceivable latency, but the higher it is, the better the optimization.
214+
maxAcceptableDispatch = time.Millisecond * 50
215215
)
216216

217217
func (s Sync) Run() error {
@@ -250,7 +250,7 @@ func (s Sync) Run() error {
250250

251251
var (
252252
eventGroup []timedEvent
253-
dispatchEventGroup = time.NewTicker(time.Millisecond * 10)
253+
dispatchEventGroup = time.NewTicker(maxAcceptableDispatch)
254254
)
255255
defer dispatchEventGroup.Stop()
256256
for {

Diff for: internal/sync/timedevent.go

-12
This file was deleted.

Diff for: internal/sync/title.go

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package sync
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"path/filepath"
7+
8+
"golang.org/x/crypto/ssh/terminal"
9+
)
10+
11+
func setConsoleTitle(title string) {
12+
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
13+
return
14+
}
15+
fmt.Printf("\033]0;%s\007", title)
16+
}
17+
18+
19+
func fmtUpdateTitle(path string) string {
20+
return "🚀 updating " + filepath.Base(path)
21+
}

0 commit comments

Comments
 (0)