Skip to content

Commit e26bae6

Browse files
committed
GC now has concurrent mark phase
Introduce GC rework with concurrent mark phase, tricolor set and rework that in future will allow to incremental and/or concurrent mark and sweep phases. Currently it is made a bit difficult by direct pins. It will also allow for reuse of colorset for improved performance and no set reallocation. From observations pre-rewrok it should reduce GC lock time by half as about 50% of the time is spent in mark phase. This is consistent with observations made after the implementation. License: MIT Signed-off-by: Jakub Sztandera <[email protected]>
1 parent 5146b34 commit e26bae6

File tree

11 files changed

+651
-136
lines changed

11 files changed

+651
-136
lines changed

core/commands/repo.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,11 @@ order to reclaim hard disk space.
6969

7070
streamErrors, _, _ := res.Request().Option("stream-errors").Bool()
7171

72-
gcOutChan := corerepo.GarbageCollectAsync(n, req.Context())
72+
gcOutChan, err := corerepo.GarbageCollectAsync(req.Context(), n)
73+
if err != nil {
74+
res.SetError(err, cmds.ErrNormal)
75+
return
76+
}
7377

7478
outChan := make(chan interface{}, cap(gcOutChan))
7579
res.SetOutput((<-chan interface{})(outChan))

core/corerepo/gc.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,13 @@ func BestEffortRoots(filesRoot *mfs.Root) ([]*cid.Cid, error) {
7979
return []*cid.Cid{rootDag.Cid()}, nil
8080
}
8181

82-
func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
82+
func GarbageCollect(ctx context.Context, n *core.IpfsNode) error {
8383
ctx, cancel := context.WithCancel(ctx)
84-
defer cancel() // in case error occurs during operation
85-
roots, err := BestEffortRoots(n.FilesRoot)
84+
defer cancel()
85+
rmed, err := GarbageCollectAsync(ctx, n)
8686
if err != nil {
8787
return err
8888
}
89-
rmed := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
9089

9190
return CollectResult(ctx, rmed, nil)
9291
}
@@ -145,16 +144,22 @@ func (e *MultiError) Error() string {
145144
return buf.String()
146145
}
147146

148-
func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) <-chan gc.Result {
149-
roots, err := BestEffortRoots(n.FilesRoot)
147+
func GarbageCollectAsync(ctx context.Context, n *core.IpfsNode) (<-chan gc.Result, error) {
148+
g, err := gc.NewGC(n.Blockstore, n.DAG)
150149
if err != nil {
151-
out := make(chan gc.Result)
152-
out <- gc.Result{Error: err}
153-
close(out)
154-
return out
150+
return nil, err
151+
}
152+
153+
err = g.AddPinSource(n.Pinning.PinSources()...)
154+
if err != nil {
155+
return nil, err
156+
}
157+
err = g.AddPinSource(*n.FilesRoot.PinSource())
158+
if err != nil {
159+
return nil, err
155160
}
156161

157-
return gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
162+
return g.Run(ctx), nil
158163
}
159164

160165
func PeriodicGC(ctx context.Context, node *core.IpfsNode) error {
@@ -217,7 +222,7 @@ func (gc *GC) maybeGC(ctx context.Context, offset uint64) error {
217222
log.Info("Watermark exceeded. Starting repo GC...")
218223
defer log.EventBegin(ctx, "repoGC").Done()
219224

220-
if err := GarbageCollect(gc.Node, ctx); err != nil {
225+
if err := GarbageCollect(ctx, gc.Node); err != nil {
221226
return err
222227
}
223228
log.Infof("Repo GC done. See `ipfs repo stat` to see how much space got freed.\n")

core/coreunix/add_test.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ import (
2020
"github.com/ipfs/go-ipfs/repo/config"
2121
ds2 "github.com/ipfs/go-ipfs/thirdparty/datastore2"
2222
pi "github.com/ipfs/go-ipfs/thirdparty/posinfo"
23-
"gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
2423

2524
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
25+
blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
2626
)
2727

2828
func TestAddRecursive(t *testing.T) {
@@ -46,6 +46,9 @@ func TestAddRecursive(t *testing.T) {
4646
}
4747

4848
func TestAddGCLive(t *testing.T) {
49+
ctx, cancel := context.WithCancel(context.Background())
50+
defer cancel()
51+
4952
r := &repo.Mock{
5053
C: config.Config{
5154
Identity: config.Identity{
@@ -54,13 +57,13 @@ func TestAddGCLive(t *testing.T) {
5457
},
5558
D: ds2.ThreadSafeCloserMapDatastore(),
5659
}
57-
node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
60+
node, err := core.NewNode(ctx, &core.BuildCfg{Repo: r})
5861
if err != nil {
5962
t.Fatal(err)
6063
}
6164

6265
out := make(chan interface{})
63-
adder, err := NewAdder(context.Background(), node.Pinning, node.Blockstore, node.DAG)
66+
adder, err := NewAdder(ctx, node.Pinning, node.Blockstore, node.DAG)
6467
if err != nil {
6568
t.Fatal(err)
6669
}
@@ -98,11 +101,18 @@ func TestAddGCLive(t *testing.T) {
98101
t.Fatal("add shouldnt complete yet")
99102
}
100103

104+
g, err := gc.NewGC(node.Blockstore, node.DAG)
105+
if err != nil {
106+
t.Fatal(err)
107+
}
108+
g.AddPinSource(node.Pinning.PinSources()...)
109+
101110
var gcout <-chan gc.Result
102111
gcstarted := make(chan struct{})
112+
103113
go func() {
104114
defer close(gcstarted)
105-
gcout = gc.GC(context.Background(), node.Blockstore, node.DAG, node.Pinning, nil)
115+
gcout = g.Run(ctx)
106116
}()
107117

108118
// gc shouldnt start until we let the add finish its current file.
@@ -126,6 +136,7 @@ func TestAddGCLive(t *testing.T) {
126136
<-gcstarted
127137

128138
for r := range gcout {
139+
t.Logf("gc res: %v", r)
129140
if r.Error != nil {
130141
t.Fatal(err)
131142
}
@@ -144,11 +155,11 @@ func TestAddGCLive(t *testing.T) {
144155
last = c
145156
}
146157

147-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
158+
ctx2, cancel := context.WithTimeout(ctx, time.Second*5)
148159
defer cancel()
149160

150161
set := cid.NewSet()
151-
err = dag.EnumerateChildren(ctx, node.DAG.GetLinks, last, set.Visit)
162+
err = dag.EnumerateChildren(ctx2, node.DAG.GetLinks, last, set.Visit)
152163
if err != nil {
153164
t.Fatal(err)
154165
}

mfs/system.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"time"
1818

1919
dag "github.com/ipfs/go-ipfs/merkledag"
20+
pin "github.com/ipfs/go-ipfs/pin"
2021
ft "github.com/ipfs/go-ipfs/unixfs"
2122

2223
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
@@ -123,6 +124,24 @@ func (kr *Root) Flush() error {
123124
return nil
124125
}
125126

127+
// PinSource returns information about pinning requirements.
128+
func (kr *Root) PinSource() *pin.Source {
129+
return &pin.Source{
130+
Get: func() ([]*cid.Cid, error) {
131+
err := kr.Flush()
132+
if err != nil {
133+
return nil, err
134+
}
135+
136+
nd, err := kr.GetValue().GetNode()
137+
if err != nil {
138+
return nil, err
139+
}
140+
return []*cid.Cid{nd.Cid()}, nil
141+
},
142+
}
143+
}
144+
126145
// closeChild implements the childCloser interface, and signals to the publisher that
127146
// there are changes ready to be published
128147
func (kr *Root) closeChild(name string, nd node.Node, sync bool) error {

0 commit comments

Comments
 (0)