Skip to content

Commit 84c8ab9

Browse files
authored
Help to recover from corrupted levelqueue (#24912)
gitea.com experienced the corrupted LevelQueue bug again. I think the problem is clear now: if the keys in LevelDB go out of sync, the LevelQueue itself has no ability to recover, e.g.:

* LevelQueue.Len() reports 100
* LevelQueue.LPop() reports ErrNotFound = errors.New("no key found")

So it needs to dive into the LevelDB and remove all of the queue's keys to recover the corrupted LevelQueue. More comments are in TestCorruptedLevelQueue.
1 parent 8faf946 commit 84c8ab9

File tree

5 files changed

+162
-55
lines changed

5 files changed

+162
-55
lines changed

modules/queue/base_levelqueue.go

+25-14
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,21 @@ package queue
55

66
import (
77
"context"
8+
"sync/atomic"
89

910
"code.gitea.io/gitea/modules/nosql"
11+
"code.gitea.io/gitea/modules/queue/lqinternal"
1012

1113
"gitea.com/lunny/levelqueue"
14+
"github.com/syndtr/goleveldb/leveldb"
1215
)
1316

1417
type baseLevelQueue struct {
15-
internal *levelqueue.Queue
16-
conn string
17-
cfg *BaseConfig
18+
internal atomic.Pointer[levelqueue.Queue]
19+
20+
conn string
21+
cfg *BaseConfig
22+
db *leveldb.DB
1823
}
1924

2025
var _ baseQueue = (*baseLevelQueue)(nil)
@@ -31,42 +36,48 @@ func newBaseLevelQueueSimple(cfg *BaseConfig) (baseQueue, error) {
3136
if err != nil {
3237
return nil, err
3338
}
34-
q := &baseLevelQueue{conn: conn, cfg: cfg}
35-
q.internal, err = levelqueue.NewQueue(db, []byte(cfg.QueueFullName), false)
39+
q := &baseLevelQueue{conn: conn, cfg: cfg, db: db}
40+
lq, err := levelqueue.NewQueue(db, []byte(cfg.QueueFullName), false)
3641
if err != nil {
3742
return nil, err
3843
}
39-
44+
q.internal.Store(lq)
4045
return q, nil
4146
}
4247

4348
func (q *baseLevelQueue) PushItem(ctx context.Context, data []byte) error {
44-
return baseLevelQueueCommon(q.cfg, q.internal, nil).PushItem(ctx, data)
49+
c := baseLevelQueueCommon(q.cfg, nil, func() baseLevelQueuePushPoper { return q.internal.Load() })
50+
return c.PushItem(ctx, data)
4551
}
4652

4753
func (q *baseLevelQueue) PopItem(ctx context.Context) ([]byte, error) {
48-
return baseLevelQueueCommon(q.cfg, q.internal, nil).PopItem(ctx)
54+
c := baseLevelQueueCommon(q.cfg, nil, func() baseLevelQueuePushPoper { return q.internal.Load() })
55+
return c.PopItem(ctx)
4956
}
5057

5158
func (q *baseLevelQueue) HasItem(ctx context.Context, data []byte) (bool, error) {
5259
return false, nil
5360
}
5461

5562
func (q *baseLevelQueue) Len(ctx context.Context) (int, error) {
56-
return int(q.internal.Len()), nil
63+
return int(q.internal.Load().Len()), nil
5764
}
5865

5966
func (q *baseLevelQueue) Close() error {
60-
err := q.internal.Close()
67+
err := q.internal.Load().Close()
6168
_ = nosql.GetManager().CloseLevelDB(q.conn)
69+
q.db = nil // the db is not managed by us, it's managed by the nosql manager
6270
return err
6371
}
6472

6573
func (q *baseLevelQueue) RemoveAll(ctx context.Context) error {
66-
for q.internal.Len() > 0 {
67-
if _, err := q.internal.LPop(); err != nil {
68-
return err
69-
}
74+
lqinternal.RemoveLevelQueueKeys(q.db, []byte(q.cfg.QueueFullName))
75+
lq, err := levelqueue.NewQueue(q.db, []byte(q.cfg.QueueFullName), false)
76+
if err != nil {
77+
return err
7078
}
79+
old := q.internal.Load()
80+
q.internal.Store(lq)
81+
_ = old.Close() // Not ideal for concurrency. Luckily, the levelqueue only sets its db=nil because it doesn't manage the db, so far so good
7182
return nil
7283
}

modules/queue/base_levelqueue_common.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,17 @@ import (
1717
"github.com/syndtr/goleveldb/leveldb"
1818
)
1919

20+
// baseLevelQueuePushPoper is the common interface for levelqueue.Queue and levelqueue.UniqueQueue
2021
type baseLevelQueuePushPoper interface {
2122
RPush(data []byte) error
2223
LPop() ([]byte, error)
2324
Len() int64
2425
}
2526

2627
type baseLevelQueueCommonImpl struct {
27-
length int
28-
internal baseLevelQueuePushPoper
29-
mu *sync.Mutex
28+
length int
29+
internalFunc func() baseLevelQueuePushPoper
30+
mu *sync.Mutex
3031
}
3132

3233
func (q *baseLevelQueueCommonImpl) PushItem(ctx context.Context, data []byte) error {
@@ -36,11 +37,11 @@ func (q *baseLevelQueueCommonImpl) PushItem(ctx context.Context, data []byte) er
3637
defer q.mu.Unlock()
3738
}
3839

39-
cnt := int(q.internal.Len())
40+
cnt := int(q.internalFunc().Len())
4041
if cnt >= q.length {
4142
return true, nil
4243
}
43-
retry, err = false, q.internal.RPush(data)
44+
retry, err = false, q.internalFunc().RPush(data)
4445
if err == levelqueue.ErrAlreadyInQueue {
4546
err = ErrAlreadyInQueue
4647
}
@@ -55,7 +56,7 @@ func (q *baseLevelQueueCommonImpl) PopItem(ctx context.Context) ([]byte, error)
5556
defer q.mu.Unlock()
5657
}
5758

58-
data, err = q.internal.LPop()
59+
data, err = q.internalFunc().LPop()
5960
if err == levelqueue.ErrNotFound {
6061
return true, nil, nil
6162
}
@@ -66,8 +67,8 @@ func (q *baseLevelQueueCommonImpl) PopItem(ctx context.Context) ([]byte, error)
6667
})
6768
}
6869

69-
func baseLevelQueueCommon(cfg *BaseConfig, internal baseLevelQueuePushPoper, mu *sync.Mutex) *baseLevelQueueCommonImpl {
70-
return &baseLevelQueueCommonImpl{length: cfg.Length, internal: internal}
70+
func baseLevelQueueCommon(cfg *BaseConfig, mu *sync.Mutex, internalFunc func() baseLevelQueuePushPoper) *baseLevelQueueCommonImpl {
71+
return &baseLevelQueueCommonImpl{length: cfg.Length, mu: mu, internalFunc: internalFunc}
7172
}
7273

7374
func prepareLevelDB(cfg *BaseConfig) (conn string, db *leveldb.DB, err error) {

modules/queue/base_levelqueue_test.go

+55
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ package queue
66
import (
77
"testing"
88

9+
"code.gitea.io/gitea/modules/queue/lqinternal"
910
"code.gitea.io/gitea/modules/setting"
1011

12+
"gitea.com/lunny/levelqueue"
1113
"github.com/stretchr/testify/assert"
14+
"github.com/syndtr/goleveldb/leveldb"
1215
)
1316

1417
func TestBaseLevelDB(t *testing.T) {
@@ -21,3 +24,55 @@ func TestBaseLevelDB(t *testing.T) {
2124
testQueueBasic(t, newBaseLevelQueueSimple, toBaseConfig("baseLevelQueue", setting.QueueSettings{Datadir: t.TempDir() + "/queue-test", Length: 10}), false)
2225
testQueueBasic(t, newBaseLevelQueueUnique, toBaseConfig("baseLevelQueueUnique", setting.QueueSettings{ConnStr: "leveldb://" + t.TempDir() + "/queue-test", Length: 10}), true)
2326
}
27+
28+
func TestCorruptedLevelQueue(t *testing.T) {
29+
// sometimes the levelqueue could be in a corrupted state, this test is to make sure it can recover from it
30+
dbDir := t.TempDir() + "/levelqueue-test"
31+
db, err := leveldb.OpenFile(dbDir, nil)
32+
if !assert.NoError(t, err) {
33+
return
34+
}
35+
defer db.Close()
36+
37+
assert.NoError(t, db.Put([]byte("other-key"), []byte("other-value"), nil))
38+
39+
nameQueuePrefix := []byte("queue_name")
40+
nameSetPrefix := []byte("set_name")
41+
lq, err := levelqueue.NewUniqueQueue(db, nameQueuePrefix, nameSetPrefix, false)
42+
assert.NoError(t, err)
43+
assert.NoError(t, lq.RPush([]byte("item-1")))
44+
45+
itemKey := lqinternal.QueueItemKeyBytes(nameQueuePrefix, 1)
46+
itemValue, err := db.Get(itemKey, nil)
47+
assert.NoError(t, err)
48+
assert.Equal(t, []byte("item-1"), itemValue)
49+
50+
// there should be 5 keys in db: queue low, queue high, 1 queue item, 1 set item, and "other-key"
51+
keys := lqinternal.ListLevelQueueKeys(db)
52+
assert.Len(t, keys, 5)
53+
54+
// delete the queue item key, to corrupt the queue
55+
assert.NoError(t, db.Delete(itemKey, nil))
56+
// now the queue is corrupted, it never works again
57+
_, err = lq.LPop()
58+
assert.ErrorIs(t, err, levelqueue.ErrNotFound)
59+
assert.NoError(t, lq.Close())
60+
61+
// remove all the queue related keys to reset the queue
62+
lqinternal.RemoveLevelQueueKeys(db, nameQueuePrefix)
63+
lqinternal.RemoveLevelQueueKeys(db, nameSetPrefix)
64+
// now there should be only 1 key in db: "other-key"
65+
keys = lqinternal.ListLevelQueueKeys(db)
66+
assert.Len(t, keys, 1)
67+
assert.Equal(t, []byte("other-key"), keys[0])
68+
69+
// re-create a queue from db
70+
lq, err = levelqueue.NewUniqueQueue(db, nameQueuePrefix, nameSetPrefix, false)
71+
assert.NoError(t, err)
72+
assert.NoError(t, lq.RPush([]byte("item-new-1")))
73+
// now the queue works again
74+
itemValue, err = lq.LPop()
75+
assert.NoError(t, err)
76+
assert.Equal(t, []byte("item-new-1"), itemValue)
77+
assert.NoError(t, lq.Close())
78+
}

modules/queue/base_levelqueue_unique.go

+25-33
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,21 @@ package queue
66
import (
77
"context"
88
"sync"
9-
"unsafe"
9+
"sync/atomic"
1010

1111
"code.gitea.io/gitea/modules/nosql"
12+
"code.gitea.io/gitea/modules/queue/lqinternal"
1213

1314
"gitea.com/lunny/levelqueue"
1415
"github.com/syndtr/goleveldb/leveldb"
1516
)
1617

1718
type baseLevelQueueUnique struct {
18-
internal *levelqueue.UniqueQueue
19-
conn string
20-
cfg *BaseConfig
19+
internal atomic.Pointer[levelqueue.UniqueQueue]
20+
21+
conn string
22+
cfg *BaseConfig
23+
db *leveldb.DB
2124

2225
mu sync.Mutex // the levelqueue.UniqueQueue is not thread-safe, there is no mutex protecting the underlying queue&set together
2326
}
@@ -29,68 +32,57 @@ func newBaseLevelQueueUnique(cfg *BaseConfig) (baseQueue, error) {
2932
if err != nil {
3033
return nil, err
3134
}
32-
q := &baseLevelQueueUnique{conn: conn, cfg: cfg}
33-
q.internal, err = levelqueue.NewUniqueQueue(db, []byte(cfg.QueueFullName), []byte(cfg.SetFullName), false)
35+
q := &baseLevelQueueUnique{conn: conn, cfg: cfg, db: db}
36+
lq, err := levelqueue.NewUniqueQueue(db, []byte(cfg.QueueFullName), []byte(cfg.SetFullName), false)
3437
if err != nil {
3538
return nil, err
3639
}
37-
40+
q.internal.Store(lq)
3841
return q, nil
3942
}
4043

4144
func (q *baseLevelQueueUnique) PushItem(ctx context.Context, data []byte) error {
42-
return baseLevelQueueCommon(q.cfg, q.internal, &q.mu).PushItem(ctx, data)
45+
c := baseLevelQueueCommon(q.cfg, &q.mu, func() baseLevelQueuePushPoper { return q.internal.Load() })
46+
return c.PushItem(ctx, data)
4347
}
4448

4549
func (q *baseLevelQueueUnique) PopItem(ctx context.Context) ([]byte, error) {
46-
return baseLevelQueueCommon(q.cfg, q.internal, &q.mu).PopItem(ctx)
50+
c := baseLevelQueueCommon(q.cfg, &q.mu, func() baseLevelQueuePushPoper { return q.internal.Load() })
51+
return c.PopItem(ctx)
4752
}
4853

4954
func (q *baseLevelQueueUnique) HasItem(ctx context.Context, data []byte) (bool, error) {
5055
q.mu.Lock()
5156
defer q.mu.Unlock()
52-
return q.internal.Has(data)
57+
return q.internal.Load().Has(data)
5358
}
5459

5560
func (q *baseLevelQueueUnique) Len(ctx context.Context) (int, error) {
5661
q.mu.Lock()
5762
defer q.mu.Unlock()
58-
return int(q.internal.Len()), nil
63+
return int(q.internal.Load().Len()), nil
5964
}
6065

6166
func (q *baseLevelQueueUnique) Close() error {
6267
q.mu.Lock()
6368
defer q.mu.Unlock()
64-
err := q.internal.Close()
69+
err := q.internal.Load().Close()
70+
q.db = nil // the db is not managed by us, it's managed by the nosql manager
6571
_ = nosql.GetManager().CloseLevelDB(q.conn)
6672
return err
6773
}
6874

6975
func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error {
7076
q.mu.Lock()
7177
defer q.mu.Unlock()
72-
73-
type levelUniqueQueue struct {
74-
q *levelqueue.Queue
75-
set *levelqueue.Set
76-
db *leveldb.DB
77-
}
78-
lq := (*levelUniqueQueue)(unsafe.Pointer(q.internal))
79-
80-
for lq.q.Len() > 0 {
81-
if _, err := lq.q.LPop(); err != nil {
82-
return err
83-
}
84-
}
85-
86-
// the "set" must be cleared after the "list" because there is no transaction.
87-
// it's better to have duplicate items than losing items.
88-
members, err := lq.set.Members()
78+
lqinternal.RemoveLevelQueueKeys(q.db, []byte(q.cfg.QueueFullName))
79+
lqinternal.RemoveLevelQueueKeys(q.db, []byte(q.cfg.SetFullName))
80+
lq, err := levelqueue.NewUniqueQueue(q.db, []byte(q.cfg.QueueFullName), []byte(q.cfg.SetFullName), false)
8981
if err != nil {
90-
return err // seriously corrupted
91-
}
92-
for _, v := range members {
93-
_, _ = lq.set.Remove(v)
82+
return err
9483
}
84+
old := q.internal.Load()
85+
q.internal.Store(lq)
86+
_ = old.Close() // Not ideal for concurrency. Luckily, the levelqueue only sets its db=nil because it doesn't manage the db, so far so good
9587
return nil
9688
}
+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2023 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package lqinternal
5+
6+
import (
7+
"bytes"
8+
"encoding/binary"
9+
10+
"github.com/syndtr/goleveldb/leveldb"
11+
"github.com/syndtr/goleveldb/leveldb/opt"
12+
)
13+
14+
// QueueItemIDBytes encodes a queue item id as the fixed 8-byte id part of a
// level queue item key.
//
// The id is written with binary.PutVarint at the start of a zeroed 8-byte
// buffer and the whole buffer is returned, so short encodings are followed by
// zero bytes. binary.PutVarint panics when the buffer is too small, which here
// means ids whose varint encoding needs more than 8 bytes.
func QueueItemIDBytes(id int64) []byte {
	var encoded [8]byte
	binary.PutVarint(encoded[:], id)
	return encoded[:]
}
19+
20+
func QueueItemKeyBytes(prefix []byte, id int64) []byte {
21+
key := make([]byte, len(prefix), len(prefix)+1+8)
22+
copy(key, prefix)
23+
key = append(key, '-')
24+
return append(key, QueueItemIDBytes(id)...)
25+
}
26+
27+
func RemoveLevelQueueKeys(db *leveldb.DB, namePrefix []byte) {
28+
keyPrefix := make([]byte, len(namePrefix)+1)
29+
copy(keyPrefix, namePrefix)
30+
keyPrefix[len(namePrefix)] = '-'
31+
32+
it := db.NewIterator(nil, &opt.ReadOptions{Strict: opt.NoStrict})
33+
defer it.Release()
34+
for it.Next() {
35+
if bytes.HasPrefix(it.Key(), keyPrefix) {
36+
_ = db.Delete(it.Key(), nil)
37+
}
38+
}
39+
}
40+
41+
func ListLevelQueueKeys(db *leveldb.DB) (res [][]byte) {
42+
it := db.NewIterator(nil, &opt.ReadOptions{Strict: opt.NoStrict})
43+
defer it.Release()
44+
for it.Next() {
45+
res = append(res, it.Key())
46+
}
47+
return res
48+
}

0 commit comments

Comments
 (0)