Skip to content

Commit 259e98b

Browse files
authored
Process RBS streaming notification (#748)
1 parent 2f05ef2 commit 259e98b

30 files changed

+413
-145
lines changed

src/androidTest/java/fake/SynchronizerSpyImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ public void synchronizeSplits(long since) {
4646
mSynchronizer.synchronizeSplits();
4747
}
4848

49+
@Override
50+
public void synchronizeRuleBasedSegments(long changeNumber) {
51+
mSynchronizer.synchronizeRuleBasedSegments(changeNumber);
52+
}
53+
4954
@Override
5055
public void synchronizeSplits() {
5156
mSynchronizer.synchronizeSplits();

src/main/java/io/split/android/client/SplitFactoryHelper.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import io.split.android.client.service.sseclient.ReconnectBackoffCounter;
4646
import io.split.android.client.service.sseclient.SseJwtParser;
4747
import io.split.android.client.service.sseclient.feedbackchannel.PushManagerEventBroadcaster;
48+
import io.split.android.client.service.sseclient.notifications.InstantUpdateChangeNotification;
4849
import io.split.android.client.service.sseclient.notifications.MySegmentsV2PayloadDecoder;
4950
import io.split.android.client.service.sseclient.notifications.NotificationParser;
5051
import io.split.android.client.service.sseclient.notifications.NotificationProcessor;
@@ -86,6 +87,7 @@
8687
import io.split.android.client.storage.events.PersistentEventsStorage;
8788
import io.split.android.client.storage.general.GeneralInfoStorage;
8889
import io.split.android.client.storage.impressions.PersistentImpressionsStorage;
90+
import io.split.android.client.storage.rbs.RuleBasedSegmentStorage;
8991
import io.split.android.client.storage.splits.SplitsStorage;
9092
import io.split.android.client.telemetry.TelemetrySynchronizer;
9193
import io.split.android.client.telemetry.TelemetrySynchronizerImpl;
@@ -353,7 +355,7 @@ public StreamingComponents buildStreamingComponents(@NonNull SplitTaskExecutor s
353355
return new StreamingComponents();
354356
}
355357

356-
BlockingQueue<SplitsChangeNotification> splitsUpdateNotificationQueue = new LinkedBlockingDeque<>();
358+
BlockingQueue<InstantUpdateChangeNotification> splitsUpdateNotificationQueue = new LinkedBlockingDeque<>();
357359
NotificationParser notificationParser = new NotificationParser();
358360

359361
NotificationProcessor notificationProcessor = new NotificationProcessor(splitTaskExecutor, splitTaskFactory,
@@ -419,13 +421,15 @@ SplitUpdatesWorker getSplitUpdatesWorker(SplitClientConfig config,
419421
SplitTaskExecutor splitTaskExecutor,
420422
SplitTaskFactory splitTaskFactory,
421423
Synchronizer mSynchronizer,
422-
BlockingQueue<SplitsChangeNotification> splitsUpdateNotificationQueue,
424+
BlockingQueue<InstantUpdateChangeNotification> splitsUpdateNotificationQueue,
423425
SplitsStorage splitsStorage,
426+
RuleBasedSegmentStorage ruleBasedSegmentStorage,
424427
CompressionUtilProvider compressionProvider) {
425428
if (config.syncEnabled()) {
426429
return new SplitUpdatesWorker(mSynchronizer,
427430
splitsUpdateNotificationQueue,
428431
splitsStorage,
432+
ruleBasedSegmentStorage,
429433
compressionProvider,
430434
splitTaskExecutor,
431435
splitTaskFactory);

src/main/java/io/split/android/client/SplitFactoryImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ private SplitFactoryImpl(@NonNull String apiToken, @NonNull Key key, @NonNull Sp
241241
mSynchronizer,
242242
streamingComponents.getSplitsUpdateNotificationQueue(),
243243
mStorageContainer.getSplitsStorage(),
244+
mStorageContainer.getRuleBasedSegmentStorage(),
244245
compressionProvider),
245246
streamingComponents.getSyncGuardian());
246247

src/main/java/io/split/android/client/events/SplitInternalEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@ public enum SplitInternalEvent {
1616
ATTRIBUTES_LOADED_FROM_STORAGE,
1717
ENCRYPTION_MIGRATION_DONE,
1818
MY_LARGE_SEGMENTS_UPDATED,
19+
RULE_BASED_SEGMENTS_UPDATED,
1920
}

src/main/java/io/split/android/client/service/executor/SplitTaskFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package io.split.android.client.service.executor;
22

3+
import io.split.android.client.dtos.RuleBasedSegment;
34
import io.split.android.client.dtos.Split;
45
import io.split.android.client.service.CleanUpDatabaseTask;
56
import io.split.android.client.service.events.EventsRecorderTask;
67
import io.split.android.client.service.impressions.ImpressionsTaskFactory;
78
import io.split.android.client.service.rules.LoadRuleBasedSegmentsTask;
89
import io.split.android.client.service.splits.FilterSplitsInCacheTask;
910
import io.split.android.client.service.splits.LoadSplitsTask;
11+
import io.split.android.client.service.splits.RuleBasedSegmentInPlaceUpdateTask;
1012
import io.split.android.client.service.splits.SplitInPlaceUpdateTask;
1113
import io.split.android.client.service.splits.SplitKillTask;
1214
import io.split.android.client.service.splits.SplitsSyncTask;
@@ -37,4 +39,6 @@ public interface SplitTaskFactory extends TelemetryTaskFactory, ImpressionsTaskF
3739
CleanUpDatabaseTask createCleanUpDatabaseTask(long maxTimestamp);
3840

3941
EncryptionMigrationTask createEncryptionMigrationTask(String sdkKey, SplitRoomDatabase splitRoomDatabase, boolean encryptionEnabled, SplitCipher splitCipher);
42+
43+
RuleBasedSegmentInPlaceUpdateTask createRuleBasedSegmentUpdateTask(RuleBasedSegment ruleBasedSegment, long changeNumber);
4044
}

src/main/java/io/split/android/client/service/executor/SplitTaskFactoryImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.split.android.client.SplitClientConfig;
1717
import io.split.android.client.SplitFilter;
1818
import io.split.android.client.TestingConfig;
19+
import io.split.android.client.dtos.RuleBasedSegment;
1920
import io.split.android.client.dtos.Split;
2021
import io.split.android.client.events.ISplitEventsManager;
2122
import io.split.android.client.service.CleanUpDatabaseTask;
@@ -32,8 +33,10 @@
3233
import io.split.android.client.service.impressions.unique.UniqueKeysRecorderTask;
3334
import io.split.android.client.service.impressions.unique.UniqueKeysRecorderTaskConfig;
3435
import io.split.android.client.service.rules.LoadRuleBasedSegmentsTask;
36+
import io.split.android.client.service.rules.RuleBasedSegmentChangeProcessor;
3537
import io.split.android.client.service.splits.FilterSplitsInCacheTask;
3638
import io.split.android.client.service.splits.LoadSplitsTask;
39+
import io.split.android.client.service.splits.RuleBasedSegmentInPlaceUpdateTask;
3740
import io.split.android.client.service.splits.SplitChangeProcessor;
3841
import io.split.android.client.service.splits.SplitInPlaceUpdateTask;
3942
import io.split.android.client.service.splits.SplitKillTask;
@@ -64,6 +67,7 @@ public class SplitTaskFactoryImpl implements SplitTaskFactory {
6467
private final ISplitEventsManager mEventsManager;
6568
private final TelemetryTaskFactory mTelemetryTaskFactory;
6669
private final SplitChangeProcessor mSplitChangeProcessor;
70+
private final RuleBasedSegmentChangeProcessor mRuleBasedSegmentChangeProcessor;
6771
private final TelemetryRuntimeProducer mTelemetryRuntimeProducer;
6872
private final List<SplitFilter> mFilters;
6973

@@ -85,6 +89,7 @@ public SplitTaskFactoryImpl(@NonNull SplitClientConfig splitClientConfig,
8589
mFlagsSpecFromConfig = flagsSpecFromConfig;
8690
mEventsManager = eventsManager;
8791
mSplitChangeProcessor = new SplitChangeProcessor(filters, flagSetsFilter);
92+
mRuleBasedSegmentChangeProcessor = new RuleBasedSegmentChangeProcessor();
8893
RuleBasedSegmentStorageProducer ruleBasedSegmentStorageProducer = mSplitsStorageContainer.getRuleBasedSegmentStorage();
8994

9095
TelemetryStorage telemetryStorage = mSplitsStorageContainer.getTelemetryStorage();
@@ -93,6 +98,7 @@ public SplitTaskFactoryImpl(@NonNull SplitClientConfig splitClientConfig,
9398
mSplitsSyncHelper = new SplitsSyncHelper(mSplitApiFacade.getSplitFetcher(),
9499
mSplitsStorageContainer.getSplitsStorage(),
95100
mSplitChangeProcessor,
101+
mRuleBasedSegmentChangeProcessor,
96102
ruleBasedSegmentStorageProducer,
97103
mTelemetryRuntimeProducer,
98104
new ReconnectBackoffCounter(1, testingConfig.getCdnBackoffTime()),
@@ -101,6 +107,7 @@ public SplitTaskFactoryImpl(@NonNull SplitClientConfig splitClientConfig,
101107
mSplitsSyncHelper = new SplitsSyncHelper(mSplitApiFacade.getSplitFetcher(),
102108
mSplitsStorageContainer.getSplitsStorage(),
103109
mSplitChangeProcessor,
110+
mRuleBasedSegmentChangeProcessor,
104111
ruleBasedSegmentStorageProducer,
105112
mTelemetryRuntimeProducer,
106113
flagsSpecFromConfig);
@@ -222,6 +229,11 @@ public EncryptionMigrationTask createEncryptionMigrationTask(String sdkKey, Spli
222229
return new EncryptionMigrationTask(sdkKey, splitRoomDatabase, encryptionEnabled, splitCipher);
223230
}
224231

232+
@Override
233+
public RuleBasedSegmentInPlaceUpdateTask createRuleBasedSegmentUpdateTask(RuleBasedSegment ruleBasedSegment, long changeNumber) {
234+
return new RuleBasedSegmentInPlaceUpdateTask(mSplitsStorageContainer.getRuleBasedSegmentStorage(), mRuleBasedSegmentChangeProcessor, mEventsManager, ruleBasedSegment, changeNumber);
235+
}
236+
225237
@NonNull
226238
private TelemetryTaskFactory initializeTelemetryTaskFactory(@NonNull SplitClientConfig splitClientConfig, @Nullable Map<SplitFilter.Type, SplitFilter> filters, TelemetryStorage telemetryStorage) {
227239
final TelemetryTaskFactory mTelemetryTaskFactory;

src/main/java/io/split/android/client/service/executor/SplitTaskType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ public enum SplitTaskType {
99
TELEMETRY_CONFIG_TASK, TELEMETRY_STATS_TASK,
1010
SAVE_UNIQUE_KEYS_TASK, UNIQUE_KEYS_RECORDER_TASK,
1111
MY_LARGE_SEGMENTS_UPDATE, LOAD_LOCAL_RULE_BASED_SEGMENTS,
12+
RULE_BASED_SEGMENT_SYNC,
1213
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.split.android.client.service.rules;
2+
3+
import java.util.Set;
4+
5+
import io.split.android.client.dtos.RuleBasedSegment;
6+
7+
public class ProcessedRuleBasedSegmentChange {
8+
private final Set<RuleBasedSegment> mActive;
9+
private final Set<RuleBasedSegment> mArchived;
10+
private final long mChangeNumber;
11+
private final long mUpdateTimestamp;
12+
13+
public ProcessedRuleBasedSegmentChange(Set<RuleBasedSegment> active,
14+
Set<RuleBasedSegment> archived,
15+
long changeNumber,
16+
long updateTimestamp) {
17+
mActive = active;
18+
mArchived = archived;
19+
mChangeNumber = changeNumber;
20+
mUpdateTimestamp = updateTimestamp;
21+
}
22+
23+
public Set<RuleBasedSegment> getActive() {
24+
return mActive;
25+
}
26+
27+
public Set<RuleBasedSegment> getArchived() {
28+
return mArchived;
29+
}
30+
31+
public long getChangeNumber() {
32+
return mChangeNumber;
33+
}
34+
35+
public long getUpdateTimestamp() {
36+
return mUpdateTimestamp;
37+
}
38+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.split.android.client.service.rules;
2+
3+
import java.util.Collections;
4+
import java.util.HashSet;
5+
import java.util.List;
6+
import java.util.Set;
7+
8+
import io.split.android.client.dtos.RuleBasedSegment;
9+
import io.split.android.client.dtos.Status;
10+
11+
public class RuleBasedSegmentChangeProcessor {
12+
13+
public ProcessedRuleBasedSegmentChange process(List<RuleBasedSegment> segments, long changeNumber) {
14+
Set<RuleBasedSegment> toAdd = new HashSet<>();
15+
Set<RuleBasedSegment> toRemove = new HashSet<>();
16+
for (RuleBasedSegment segment : segments) {
17+
if (segment.getStatus() == Status.ACTIVE) {
18+
toAdd.add(segment);
19+
} else {
20+
toRemove.add(segment);
21+
}
22+
}
23+
24+
return new ProcessedRuleBasedSegmentChange(toAdd, toRemove, changeNumber, System.currentTimeMillis());
25+
}
26+
27+
public ProcessedRuleBasedSegmentChange process(RuleBasedSegment segment, long changeNumber) {
28+
return process(Collections.singletonList(segment), changeNumber);
29+
}
30+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.split.android.client.service.splits;
2+
3+
import static io.split.android.client.utils.Utils.checkNotNull;
4+
5+
import androidx.annotation.NonNull;
6+
7+
import io.split.android.client.dtos.RuleBasedSegment;
8+
import io.split.android.client.events.ISplitEventsManager;
9+
import io.split.android.client.events.SplitInternalEvent;
10+
import io.split.android.client.service.executor.SplitTask;
11+
import io.split.android.client.service.executor.SplitTaskExecutionInfo;
12+
import io.split.android.client.service.executor.SplitTaskType;
13+
import io.split.android.client.service.rules.ProcessedRuleBasedSegmentChange;
14+
import io.split.android.client.service.rules.RuleBasedSegmentChangeProcessor;
15+
import io.split.android.client.storage.rbs.RuleBasedSegmentStorage;
16+
import io.split.android.client.utils.logger.Logger;
17+
18+
public class RuleBasedSegmentInPlaceUpdateTask implements SplitTask {
19+
20+
private final RuleBasedSegmentStorage mRuleBasedSegmentStorage;
21+
private final long mChangeNumber;
22+
private final RuleBasedSegment mRuleBasedSegment;
23+
private final RuleBasedSegmentChangeProcessor mChangeProcessor;
24+
private final ISplitEventsManager mEventsManager;
25+
26+
public RuleBasedSegmentInPlaceUpdateTask(@NonNull RuleBasedSegmentStorage ruleBasedSegmentStorage,
27+
@NonNull RuleBasedSegmentChangeProcessor changeProcessor,
28+
@NonNull ISplitEventsManager eventsManager,
29+
@NonNull RuleBasedSegment ruleBasedSegment,
30+
long changeNumber) {
31+
mRuleBasedSegmentStorage = checkNotNull(ruleBasedSegmentStorage);
32+
mRuleBasedSegment = checkNotNull(ruleBasedSegment);
33+
mChangeProcessor = checkNotNull(changeProcessor);
34+
mEventsManager = eventsManager;
35+
mChangeNumber = changeNumber;
36+
}
37+
38+
@NonNull
39+
@Override
40+
public SplitTaskExecutionInfo execute() {
41+
try {
42+
ProcessedRuleBasedSegmentChange processedChange = mChangeProcessor.process(mRuleBasedSegment, mChangeNumber);
43+
boolean triggerSdkUpdate = mRuleBasedSegmentStorage.update(processedChange.getActive(), processedChange.getArchived(), mChangeNumber);
44+
45+
if (triggerSdkUpdate) {
46+
mEventsManager.notifyInternalEvent(SplitInternalEvent.RULE_BASED_SEGMENTS_UPDATED);
47+
}
48+
49+
Logger.v("Updated rule based segment");
50+
return SplitTaskExecutionInfo.success(SplitTaskType.RULE_BASED_SEGMENT_SYNC);
51+
} catch (Exception ex) {
52+
Logger.e("Could not update rule based segment");
53+
54+
return SplitTaskExecutionInfo.error(SplitTaskType.RULE_BASED_SEGMENT_SYNC);
55+
}
56+
}
57+
}

src/main/java/io/split/android/client/service/splits/SplitsSyncHelper.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,12 @@
88
import androidx.annotation.VisibleForTesting;
99

1010
import java.util.Collections;
11-
import java.util.HashSet;
1211
import java.util.LinkedHashMap;
1312
import java.util.Map;
14-
import java.util.Set;
1513
import java.util.concurrent.TimeUnit;
1614

17-
import io.split.android.client.dtos.RuleBasedSegment;
1815
import io.split.android.client.dtos.RuleBasedSegmentChange;
1916
import io.split.android.client.dtos.SplitChange;
20-
import io.split.android.client.dtos.Status;
2117
import io.split.android.client.dtos.TargetingRulesChange;
2218
import io.split.android.client.network.SplitHttpHeadersBuilder;
2319
import io.split.android.client.service.ServiceConstants;
@@ -26,6 +22,8 @@
2622
import io.split.android.client.service.http.HttpFetcher;
2723
import io.split.android.client.service.http.HttpFetcherException;
2824
import io.split.android.client.service.http.HttpStatus;
25+
import io.split.android.client.service.rules.ProcessedRuleBasedSegmentChange;
26+
import io.split.android.client.service.rules.RuleBasedSegmentChangeProcessor;
2927
import io.split.android.client.service.sseclient.BackoffCounter;
3028
import io.split.android.client.service.sseclient.ReconnectBackoffCounter;
3129
import io.split.android.client.storage.rbs.RuleBasedSegmentStorageProducer;
@@ -44,6 +42,7 @@ public class SplitsSyncHelper {
4442
private final HttpFetcher<TargetingRulesChange> mSplitFetcher;
4543
private final SplitsStorage mSplitsStorage;
4644
private final SplitChangeProcessor mSplitChangeProcessor;
45+
private final RuleBasedSegmentChangeProcessor mRuleBasedSegmentChangeProcessor;
4746
private final RuleBasedSegmentStorageProducer mRuleBasedSegmentStorage;
4847
private final TelemetryRuntimeProducer mTelemetryRuntimeProducer;
4948
private final BackoffCounter mBackoffCounter;
@@ -52,12 +51,14 @@ public class SplitsSyncHelper {
5251
public SplitsSyncHelper(@NonNull HttpFetcher<TargetingRulesChange> splitFetcher,
5352
@NonNull SplitsStorage splitsStorage,
5453
@NonNull SplitChangeProcessor splitChangeProcessor,
54+
@NonNull RuleBasedSegmentChangeProcessor ruleBasedSegmentChangeProcessor,
5555
@NonNull RuleBasedSegmentStorageProducer ruleBasedSegmentStorage,
5656
@NonNull TelemetryRuntimeProducer telemetryRuntimeProducer,
5757
@Nullable String flagsSpec) {
5858
this(splitFetcher,
5959
splitsStorage,
6060
splitChangeProcessor,
61+
ruleBasedSegmentChangeProcessor,
6162
ruleBasedSegmentStorage,
6263
telemetryRuntimeProducer,
6364
new ReconnectBackoffCounter(1, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT),
@@ -68,13 +69,15 @@ public SplitsSyncHelper(@NonNull HttpFetcher<TargetingRulesChange> splitFetcher,
6869
public SplitsSyncHelper(@NonNull HttpFetcher<TargetingRulesChange> splitFetcher,
6970
@NonNull SplitsStorage splitsStorage,
7071
@NonNull SplitChangeProcessor splitChangeProcessor,
72+
@NonNull RuleBasedSegmentChangeProcessor ruleBasedSegmentChangeProcessor,
7173
@NonNull RuleBasedSegmentStorageProducer ruleBasedSegmentStorage,
7274
@NonNull TelemetryRuntimeProducer telemetryRuntimeProducer,
7375
@NonNull BackoffCounter backoffCounter,
7476
@Nullable String flagsSpec) {
7577
mSplitFetcher = checkNotNull(splitFetcher);
7678
mSplitsStorage = checkNotNull(splitsStorage);
7779
mSplitChangeProcessor = checkNotNull(splitChangeProcessor);
80+
mRuleBasedSegmentChangeProcessor = checkNotNull(ruleBasedSegmentChangeProcessor);
7881
mRuleBasedSegmentStorage = checkNotNull(ruleBasedSegmentStorage);
7982
mTelemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
8083
mBackoffCounter = checkNotNull(backoffCounter);
@@ -205,17 +208,8 @@ private void updateStorage(boolean clearBeforeUpdate, SplitChange splitChange, R
205208
}
206209

207210
private void updateRbsStorage(RuleBasedSegmentChange ruleBasedSegmentChange) {
208-
long changeNumber = ruleBasedSegmentChange.getTill();
209-
Set<RuleBasedSegment> toAdd = new HashSet<>();
210-
Set<RuleBasedSegment> toRemove = new HashSet<>();
211-
for (RuleBasedSegment segment : ruleBasedSegmentChange.getSegments()) {
212-
if (segment.getStatus() == Status.ACTIVE) {
213-
toAdd.add(segment);
214-
} else {
215-
toRemove.add(segment);
216-
}
217-
}
218-
mRuleBasedSegmentStorage.update(toAdd, toRemove, changeNumber);
211+
ProcessedRuleBasedSegmentChange change = mRuleBasedSegmentChangeProcessor.process(ruleBasedSegmentChange.getSegments(), ruleBasedSegmentChange.getTill());
212+
mRuleBasedSegmentStorage.update(change.getActive(), change.getArchived(), change.getChangeNumber());
219213
}
220214

221215
private void logError(String message) {

0 commit comments

Comments
 (0)