Skip to content

Commit 18f8a33

Browse files
author
Ibrahim Jarif
authored
Merge function should process items in order (#841)
With this commit, the merge operator processes items in the order in which they were inserted. Earlier, they were passed to the merge function in reverse order. For instance, if `A`, `B`, `C` were inserted, and the merge function was a simple `append` operation, they would be passed to the merge function as
```
mergeFunc(C, B) => result CB
mergeFunc(CB, A) => result CBA
```
With this change, they're passed to the merge function as
```
mergeFunc(B, C) => result BC
mergeFunc(A, BC) => result ABC
```
1 parent 633a8fa commit 18f8a33

File tree

3 files changed

+51
-30
lines changed

3 files changed

+51
-30
lines changed

README.md

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -278,11 +278,41 @@ for {
278278
```
279279

280280
### Merge Operations
281-
Badger provides support for unordered merge operations. You can define a func
281+
Badger provides support for ordered merge operations. You can define a func
282282
of type `MergeFunc` which takes in an existing value, and a value to be
283283
_merged_ with it. It returns a new value which is the result of the _merge_
284284
operation. All values are specified in byte arrays. For example, here is a merge
285-
function (`add`) which adds a `uint64` value to an existing `uint64` value.
285+
function (`add`) which appends a `[]byte` value to an existing `[]byte` value.
286+
287+
```Go
288+
// Merge function to append one byte slice to another
289+
func add(originalValue, newValue []byte) []byte {
290+
return append(originalValue, newValue...)
291+
}
292+
```
293+
294+
This function can then be passed to the `DB.GetMergeOperator()` method, along
295+
with a key, and a duration value. The duration specifies how often the merge
296+
function is run on values that have been added using the `MergeOperator.Add()`
297+
method.
298+
299+
The `MergeOperator.Get()` method can be used to retrieve the cumulative value of the key
300+
associated with the merge operation.
301+
302+
```Go
303+
key := []byte("merge")
304+
305+
m := db.GetMergeOperator(key, add, 200*time.Millisecond)
306+
defer m.Stop()
307+
308+
m.Add([]byte("A"))
309+
m.Add([]byte("B"))
310+
m.Add([]byte("C"))
311+
312+
res, _ := m.Get() // res should have value ABC encoded
313+
```
314+
315+
Example: Merge operator which increments a counter
286316

287317
```Go
288318
func uint64ToBytes(i uint64) []byte {
@@ -300,26 +330,18 @@ func add(existing, new []byte) []byte {
300330
return uint64ToBytes(bytesToUint64(existing) + bytesToUint64(new))
301331
}
302332
```
303-
304-
This function can then be passed to the `DB.GetMergeOperator()` method, along
305-
with a key, and a duration value. The duration specifies how often the merge
306-
function is run on values that have been added using the `MergeOperator.Add()`
307-
method.
308-
309-
`MergeOperator.Get()` method can be used to retrieve the cumulative value of the key
310-
associated with the merge operation.
311-
333+
It can be used as follows:
312334
```Go
313335
key := []byte("merge")
336+
314337
m := db.GetMergeOperator(key, add, 200*time.Millisecond)
315338
defer m.Stop()
316339

317340
m.Add(uint64ToBytes(1))
318341
m.Add(uint64ToBytes(2))
319342
m.Add(uint64ToBytes(3))
320343

321-
res, err := m.Get() // res should have value 6 encoded
322-
fmt.Println(bytesToUint64(res))
344+
res, _ := m.Get() // res should have value 6 encoded
323345
```
324346

325347
### Setting Time To Live(TTL) and User Metadata on Keys

merge.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,8 @@ type MergeOperator struct {
3737
// another representing a new value that needs to be ‘merged’ into it. MergeFunc
3838
// contains the logic to perform the ‘merge’ and return an updated value.
3939
// MergeFunc could perform operations like integer addition, list appends etc.
40-
// Note that the ordering of the operands is unspecified, so the merge func
41-
// should either be agnostic to ordering or do additional handling if ordering
42-
// is required.
43-
type MergeFunc func(existing, val []byte) []byte
40+
// Note that the ordering of the operands is maintained.
41+
type MergeFunc func(existingVal, newVal []byte) []byte
4442

4543
// GetMergeOperator creates a new MergeOperator for a given key and returns a
4644
// pointer to it. It also fires off a goroutine that performs a compaction using
@@ -60,7 +58,7 @@ func (db *DB) GetMergeOperator(key []byte,
6058

6159
var errNoMerge = errors.New("No need for merge")
6260

63-
func (op *MergeOperator) iterateAndMerge() (val []byte, latest uint64, err error) {
61+
func (op *MergeOperator) iterateAndMerge() (newVal []byte, latest uint64, err error) {
6462
txn := op.db.NewTransaction(false)
6563
defer txn.Discard()
6664
opt := DefaultIteratorOptions
@@ -73,14 +71,17 @@ func (op *MergeOperator) iterateAndMerge() (val []byte, latest uint64, err error
7371
item := it.Item()
7472
numVersions++
7573
if numVersions == 1 {
76-
val, err = item.ValueCopy(val)
74+
// This should be the newVal, considering this is the latest version.
75+
newVal, err = item.ValueCopy(newVal)
7776
if err != nil {
7877
return nil, 0, err
7978
}
8079
latest = item.Version()
8180
} else {
82-
if err := item.Value(func(newVal []byte) error {
83-
val = op.f(val, newVal)
81+
if err := item.Value(func(oldVal []byte) error {
82+
// The merge should always be on the newVal considering it has the merge result of
83+
// the latest version. The value read should be the oldVal.
84+
newVal = op.f(oldVal, newVal)
8485
return nil
8586
}); err != nil {
8687
return nil, 0, err
@@ -93,9 +94,9 @@ func (op *MergeOperator) iterateAndMerge() (val []byte, latest uint64, err error
9394
if numVersions == 0 {
9495
return nil, latest, ErrKeyNotFound
9596
} else if numVersions == 1 {
96-
return val, latest, errNoMerge
97+
return newVal, latest, errNoMerge
9798
}
98-
return val, latest, nil
99+
return newVal, latest, nil
99100
}
100101

101102
func (op *MergeOperator) compact() error {

merge_test.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,21 +59,19 @@ func TestGetMergeOperator(t *testing.T) {
5959
t.Run("Add and Get slices", func(t *testing.T) {
6060
// Merge function to merge two byte slices
6161
add := func(originalValue, newValue []byte) []byte {
62-
// We append original value to new value because the values
63-
// are retrieved in reverse order (Last insertion will be the first value)
64-
return append(newValue, originalValue...)
62+
return append(originalValue, newValue...)
6563
}
6664
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
6765
m := db.GetMergeOperator([]byte("fooprefix"), add, 2*time.Millisecond)
6866
defer m.Stop()
6967

70-
require.Nil(t, m.Add([]byte("1")))
71-
require.Nil(t, m.Add([]byte("2")))
72-
require.Nil(t, m.Add([]byte("3")))
68+
require.Nil(t, m.Add([]byte("A")))
69+
require.Nil(t, m.Add([]byte("B")))
70+
require.Nil(t, m.Add([]byte("C")))
7371

7472
value, err := m.Get()
7573
require.Nil(t, err)
76-
require.Equal(t, "123", string(value))
74+
require.Equal(t, "ABC", string(value))
7775
})
7876
})
7977
t.Run("Get Before Compact", func(t *testing.T) {

0 commit comments

Comments
 (0)