Skip to content

Commit 155d675

Browse files
authored
Possibility to provide version and version epoch fields for publication (#472)
1 parent 9be0c1a commit 155d675

File tree

9 files changed

+285
-45
lines changed

9 files changed

+285
-45
lines changed

broker.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,14 @@ type PublishOptions struct {
120120
IdempotentResultTTL time.Duration
121121
// UseDelta enables using delta encoding for the publication.
122122
UseDelta bool
123+
// Version of Publication. This is a tip to Centrifuge to skip non-actual
124+
// publications. Mostly useful for cases when Publication contains the entire
125+
// state. Version only used when history is configured.
126+
Version uint64
127+
// VersionEpoch is a string that is used to identify the epoch of version of the
128+
// publication. Use it if version may be reused in the future. For example, if
129+
// version comes from in-memory system which can lose data, or due to eviction, etc.
130+
VersionEpoch string
123131
}
124132

125133
// Broker is responsible for PUB/SUB mechanics.

broker_memory.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,14 @@ func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (Str
110110
if opts.HistorySize > 0 && opts.HistoryTTL > 0 {
111111
var err error
112112
var streamTop StreamPosition
113-
streamTop, prevPub, err = b.historyHub.add(ch, pub, opts)
113+
var skip bool
114+
streamTop, prevPub, skip, err = b.historyHub.add(ch, pub, opts)
114115
if err != nil {
115116
return StreamPosition{}, false, err
116117
}
118+
if skip {
119+
return streamTop, false, nil
120+
}
117121
pub.Offset = streamTop.Offset
118122
if opts.IdempotencyKey != "" {
119123
resultExpireSeconds := int64(defaultIdempotentResultExpireSeconds)
@@ -328,7 +332,7 @@ func (h *historyHub) expireStreams() {
328332
}
329333
}
330334

331-
func (h *historyHub) add(ch string, pub *Publication, opts PublishOptions) (StreamPosition, *Publication, error) {
335+
func (h *historyHub) add(ch string, pub *Publication, opts PublishOptions) (StreamPosition, *Publication, bool, error) {
332336
h.Lock()
333337
defer h.Unlock()
334338

@@ -339,7 +343,7 @@ func (h *historyHub) add(ch string, pub *Publication, opts PublishOptions) (Stre
339343
Reverse: true,
340344
}, MetaTTL: opts.HistoryMetaTTL})
341345
if err != nil {
342-
return StreamPosition{}, nil, fmt.Errorf("error getting previous publication from stream: %w", err)
346+
return StreamPosition{}, nil, false, fmt.Errorf("error getting previous publication from stream: %w", err)
343347
}
344348
if len(pubs) > 0 {
345349
prevPub = pubs[0]
@@ -374,18 +378,30 @@ func (h *historyHub) add(ch string, pub *Publication, opts PublishOptions) (Stre
374378
}
375379
}
376380

381+
if opts.Version > 0 {
382+
if stream, ok := h.streams[ch]; ok {
383+
topVersion := stream.TopVersion()
384+
topVersionEpoch := stream.TopVersionEpoch()
385+
if (opts.VersionEpoch == "" || opts.VersionEpoch == topVersionEpoch) &&
386+
opts.Version <= topVersion {
387+
// We can skip the unordered publication.
388+
return StreamPosition{Offset: stream.Top(), Epoch: stream.Epoch()}, nil, true, nil
389+
}
390+
}
391+
}
392+
377393
if stream, ok := h.streams[ch]; ok {
378-
offset, _ = stream.Add(pub, opts.HistorySize)
394+
offset, _ = stream.Add(pub, opts.HistorySize, opts.Version, opts.VersionEpoch)
379395
epoch = stream.Epoch()
380396
} else {
381397
stream := memstream.New()
382-
offset, _ = stream.Add(pub, opts.HistorySize)
398+
offset, _ = stream.Add(pub, opts.HistorySize, opts.Version, opts.VersionEpoch)
383399
epoch = stream.Epoch()
384400
h.streams[ch] = stream
385401
}
386402
pub.Offset = offset
387403

388-
return StreamPosition{Offset: offset, Epoch: epoch}, prevPub, nil
404+
return StreamPosition{Offset: offset, Epoch: epoch}, prevPub, false, nil
389405
}
390406

391407
// Lock must be held outside.

broker_memory_test.go

Lines changed: 83 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package centrifuge
22

33
import (
44
"context"
5+
"github.com/google/uuid"
56
"os"
67
"strconv"
78
"testing"
@@ -147,7 +148,7 @@ func TestMemoryBrokerPublishIdempotent(t *testing.T) {
147148
},
148149
}
149150

150-
// Test publish with history and with idempotency key.
151+
// Test publish with idempotency key.
151152
_, _, err := e.Publish("channel", testPublicationData(), PublishOptions{
152153
IdempotencyKey: "test",
153154
})
@@ -212,6 +213,67 @@ func TestMemoryBrokerPublishIdempotentWithHistory(t *testing.T) {
212213
require.Equal(t, 1, numPubs)
213214
}
214215

216+
func TestMemoryBrokerPublishSkipOldVersion(t *testing.T) {
217+
e := testMemoryBroker()
218+
defer func() { _ = e.node.Shutdown(context.Background()) }()
219+
220+
numPubs := 0
221+
222+
e.eventHandler = &testBrokerEventHandler{
223+
HandlePublicationFunc: func(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error {
224+
numPubs++
225+
return nil
226+
},
227+
}
228+
229+
channel1 := uuid.NewString()
230+
231+
// Test publish with history and with version.
232+
_, _, err := e.Publish(channel1, testPublicationData(), PublishOptions{
233+
HistorySize: 1,
234+
HistoryTTL: time.Second,
235+
Version: 1,
236+
})
237+
require.NoError(t, err)
238+
// Publish with same version.
239+
_, _, err = e.Publish(channel1, testPublicationData(), PublishOptions{
240+
HistorySize: 1,
241+
HistoryTTL: time.Second,
242+
Version: 1,
243+
})
244+
require.NoError(t, err)
245+
require.Equal(t, 1, numPubs)
246+
247+
numPubs = 0
248+
channel2 := uuid.NewString()
249+
// Test publish with history and with version and version epoch.
250+
_, _, err = e.Publish(channel2, testPublicationData(), PublishOptions{
251+
HistorySize: 1,
252+
HistoryTTL: time.Second,
253+
Version: 1,
254+
VersionEpoch: "xyz",
255+
})
256+
require.NoError(t, err)
257+
// Publish with same version and epoch.
258+
_, _, err = e.Publish(channel2, testPublicationData(), PublishOptions{
259+
HistorySize: 1,
260+
HistoryTTL: time.Second,
261+
Version: 1,
262+
VersionEpoch: "xyz",
263+
})
264+
require.NoError(t, err)
265+
require.Equal(t, 1, numPubs)
266+
// Publish with same version and different epoch.
267+
_, _, err = e.Publish(channel2, testPublicationData(), PublishOptions{
268+
HistorySize: 1,
269+
HistoryTTL: time.Second,
270+
Version: 1,
271+
VersionEpoch: "aaa",
272+
})
273+
require.NoError(t, err)
274+
require.Equal(t, 2, numPubs)
275+
}
276+
215277
func TestMemoryEngineSubscribeUnsubscribe(t *testing.T) {
216278
e := testMemoryBroker()
217279
defer func() { _ = e.node.Shutdown(context.Background()) }()
@@ -229,10 +291,10 @@ func TestMemoryHistoryHub(t *testing.T) {
229291
ch1 := "channel1"
230292
ch2 := "channel2"
231293
pub := newTestPublication()
232-
_, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second})
233-
_, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second})
234-
_, _, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second})
235-
_, _, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second})
294+
_, _, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second})
295+
_, _, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second})
296+
_, _, _, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second})
297+
_, _, _, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second})
236298

237299
hist, _, err := h.get(ch1, HistoryOptions{
238300
Filter: HistoryFilter{
@@ -271,10 +333,10 @@ func TestMemoryHistoryHub(t *testing.T) {
271333
require.Equal(t, 0, len(hist))
272334

273335
// test history messages limit
274-
_, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 10, HistoryTTL: time.Second})
275-
_, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 10, HistoryTTL: time.Second})
276-
_, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 10, HistoryTTL: time.Second})
277-
_, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 10, HistoryTTL: time.Second})
336+
_, _, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 10, HistoryTTL: time.Second})
337+
_, _, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 10, HistoryTTL: time.Second})
338+
_, _, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 10, HistoryTTL: time.Second})
339+
_, _, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 10, HistoryTTL: time.Second})
278340
hist, _, err = h.get(ch1, HistoryOptions{
279341
Filter: HistoryFilter{
280342
Limit: -1,
@@ -291,8 +353,8 @@ func TestMemoryHistoryHub(t *testing.T) {
291353
require.Equal(t, 1, len(hist))
292354

293355
// test history limit greater than history size
294-
_, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second})
295-
_, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second})
356+
_, _, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second})
357+
_, _, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second})
296358
hist, _, err = h.get(ch1, HistoryOptions{
297359
Filter: HistoryFilter{
298360
Limit: 2,
@@ -312,10 +374,10 @@ func TestMemoryHistoryHubMetaTTL(t *testing.T) {
312374
h.RLock()
313375
require.Equal(t, int64(0), h.nextRemoveCheck)
314376
h.RUnlock()
315-
_, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second})
316-
_, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second})
317-
_, _, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second})
318-
_, _, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second})
377+
_, _, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second})
378+
_, _, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second})
379+
_, _, _, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second})
380+
_, _, _, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second})
319381
h.RLock()
320382
require.True(t, h.nextRemoveCheck > 0)
321383
require.Equal(t, 2, len(h.streams))
@@ -350,10 +412,10 @@ func TestMemoryHistoryHubMetaTTLPerChannel(t *testing.T) {
350412
h.RLock()
351413
require.Equal(t, int64(0), h.nextRemoveCheck)
352414
h.RUnlock()
353-
_, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second, HistoryMetaTTL: time.Second})
354-
_, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second, HistoryMetaTTL: time.Second})
355-
_, _, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second, HistoryMetaTTL: time.Second})
356-
_, _, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second, HistoryMetaTTL: time.Second})
415+
_, _, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second, HistoryMetaTTL: time.Second})
416+
_, _, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second, HistoryMetaTTL: time.Second})
417+
_, _, _, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second, HistoryMetaTTL: time.Second})
418+
_, _, _, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second, HistoryMetaTTL: time.Second})
357419
h.RLock()
358420
require.True(t, h.nextRemoveCheck > 0)
359421
require.Equal(t, 2, len(h.streams))
@@ -784,8 +846,8 @@ func TestMemoryHistoryHubPrevPub(t *testing.T) {
784846
defer h.close()
785847
ch1 := "channel1"
786848
pub := newTestPublication()
787-
_, prevPub, _ := h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second, UseDelta: true})
849+
_, prevPub, _, _ := h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second, UseDelta: true})
788850
require.Nil(t, prevPub)
789-
_, prevPub, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second, UseDelta: true})
851+
_, prevPub, _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second, UseDelta: true})
790852
require.NotNil(t, prevPub)
791853
}

broker_redis.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -898,6 +898,12 @@ func (b *RedisBroker) publish(s *shardWrapper, ch string, data []byte, opts Publ
898898
useDelta = "1"
899899
}
900900

901+
version := "0"
902+
if opts.Version > 0 {
903+
version = strconv.Itoa(int(opts.Version))
904+
}
905+
versionEpoch := opts.VersionEpoch
906+
901907
replies, err := script.Exec(
902908
context.Background(),
903909
s.shard.client,
@@ -912,12 +918,14 @@ func (b *RedisBroker) publish(s *shardWrapper, ch string, data []byte, opts Publ
912918
publishCommand,
913919
resultExpire,
914920
useDelta,
921+
version,
922+
versionEpoch,
915923
},
916924
).ToArray()
917925
if err != nil {
918926
return StreamPosition{}, false, err
919927
}
920-
if len(replies) != 2 && len(replies) != 3 {
928+
if len(replies) != 2 && len(replies) != 3 && len(replies) != 4 {
921929
return StreamPosition{}, false, errors.New("wrong Redis reply")
922930
}
923931
offset, err := replies[0].AsInt64()
@@ -936,6 +944,14 @@ func (b *RedisBroker) publish(s *shardWrapper, ch string, data []byte, opts Publ
936944
}
937945
fromCache = fromCacheStr == "1"
938946
}
947+
//skipped := false
948+
//if len(replies) == 4 {
949+
// skippedStr, err := replies[3].ToString()
950+
// if err != nil {
951+
// return StreamPosition{}, false, errors.New("wrong Redis reply skipped flag")
952+
// }
953+
// skipped = skippedStr == "1"
954+
//}
939955

940956
return StreamPosition{Offset: uint64(offset), Epoch: epoch}, fromCache, nil
941957
}

broker_redis_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,85 @@ func TestRedisBrokerPublishIdempotent(t *testing.T) {
401401
}
402402
}
403403

404+
func TestRedisBrokerPublishSkipOldVersion(t *testing.T) {
405+
for _, tt := range historyRedisTests {
406+
t.Run(tt.Name, func(t *testing.T) {
407+
if !tt.UseStreams {
408+
// Does not work with lists.
409+
t.Skip("Skip test for Redis lists - not implemented")
410+
}
411+
412+
node := testNode(t)
413+
414+
b := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster, tt.Port)
415+
defer func() { _ = node.Shutdown(context.Background()) }()
416+
defer stopRedisBroker(b)
417+
418+
channel := uuid.New().String()
419+
420+
_, _, err := b.Publish(channel, testPublicationData(), PublishOptions{
421+
HistorySize: 2,
422+
HistoryTTL: 5 * time.Second,
423+
Version: 1,
424+
})
425+
require.NoError(t, err)
426+
_, _, err = b.Publish(channel, testPublicationData(), PublishOptions{
427+
HistorySize: 2,
428+
HistoryTTL: 5 * time.Second,
429+
Version: 1,
430+
})
431+
require.NoError(t, err)
432+
pubs, _, err := b.History(channel, HistoryOptions{
433+
Filter: HistoryFilter{
434+
Limit: -1,
435+
},
436+
})
437+
require.NoError(t, err)
438+
require.Equal(t, 1, len(pubs))
439+
440+
channel2 := uuid.NewString()
441+
// Test publish with history and with version and version epoch.
442+
_, _, err = b.Publish(channel2, testPublicationData(), PublishOptions{
443+
HistorySize: 2,
444+
HistoryTTL: 5 * time.Second,
445+
Version: 1,
446+
VersionEpoch: "xyz",
447+
})
448+
require.NoError(t, err)
449+
// Publish with same version and epoch.
450+
_, _, err = b.Publish(channel2, testPublicationData(), PublishOptions{
451+
HistorySize: 2,
452+
HistoryTTL: 5 * time.Second,
453+
Version: 1,
454+
VersionEpoch: "xyz",
455+
})
456+
require.NoError(t, err)
457+
pubs, _, err = b.History(channel2, HistoryOptions{
458+
Filter: HistoryFilter{
459+
Limit: -1,
460+
},
461+
})
462+
require.NoError(t, err)
463+
require.Equal(t, 1, len(pubs))
464+
// Publish with same version and different epoch.
465+
_, _, err = b.Publish(channel2, testPublicationData(), PublishOptions{
466+
HistorySize: 2,
467+
HistoryTTL: time.Second,
468+
Version: 1,
469+
VersionEpoch: "aaa",
470+
})
471+
require.NoError(t, err)
472+
pubs, _, err = b.History(channel2, HistoryOptions{
473+
Filter: HistoryFilter{
474+
Limit: -1,
475+
},
476+
})
477+
require.NoError(t, err)
478+
require.Equal(t, 2, len(pubs))
479+
})
480+
}
481+
}
482+
404483
func TestRedisCurrentPosition(t *testing.T) {
405484
for _, tt := range historyRedisTests {
406485
t.Run(tt.Name, func(t *testing.T) {

0 commit comments

Comments
 (0)