Skip to content

Process RBS streaming notification #748

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 47 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
ba303d8
Persistent storage
gthea Feb 25, 2025
1ade063
WIP
gthea Feb 25, 2025
2b2b424
SqLite persistent storage tests
gthea Feb 25, 2025
e292005
Tests
gthea Feb 25, 2025
bae1f83
Fixes
gthea Feb 25, 2025
c1c96dd
Add exception handling
gthea Feb 25, 2025
ad23322
More tests
gthea Feb 25, 2025
2fc3f2e
Add nullability annotations
gthea Feb 25, 2025
96bca34
Merge branch 'SDKS-9357_baseline' into SDKS-9439_1
gthea Feb 27, 2025
a8bff33
Prep for ParserCommons
gthea Feb 27, 2025
537ed77
Tests for lazy storage provider
gthea Feb 27, 2025
7519aca
Fix impl
gthea Feb 27, 2025
724c681
Storage change & matcher implementation
gthea Feb 27, 2025
2d221fb
Temp parser for RBS storage
gthea Feb 27, 2025
24b8c6b
Fix
gthea Feb 27, 2025
f042de3
Fix test
gthea Feb 27, 2025
cfa0c6c
In RBS matcher test
gthea Feb 27, 2025
0cbde4c
RBS parser
gthea Feb 27, 2025
4ddb4aa
Simplify initialization
gthea Feb 27, 2025
509204e
Fix tests
gthea Feb 27, 2025
83d4550
Merge branch 'SDKS-9357_baseline' into SDKS-9439_3
gthea Feb 28, 2025
2672fbf
WIP
gthea Feb 28, 2025
fa22170
Manual test updates
gthea Mar 5, 2025
0aabcaa
Change mock .json
gthea Mar 5, 2025
d714903
UT migrated
gthea Mar 5, 2025
9f29fb3
WIP tests migration
gthea Mar 5, 2025
7ffc626
Tests migration continued
gthea Mar 5, 2025
c0c380d
Fix
gthea Mar 5, 2025
5cf6449
Merge branch 'SDKS-9357_baseline' into SDKS-9437
gthea Mar 5, 2025
056141a
WIP
gthea Mar 5, 2025
3a6b873
Move storage init
gthea Mar 6, 2025
0d75096
Producer tests
gthea Mar 6, 2025
e4beef9
Merge branch 'SDKS-9357_baseline' into SDKS-9437_2
gthea Mar 6, 2025
738cdc7
Additional test check
gthea Mar 6, 2025
04ccc45
WIP rbSince
gthea Mar 6, 2025
4de3a79
WIP
gthea Mar 6, 2025
98f4ffc
Fixes
gthea Mar 7, 2025
ac8b250
Update tests
gthea Mar 7, 2025
259ea3f
Merge branch 'SDKS-9357_baseline' into SDKS-9441
gthea Mar 7, 2025
7cb981f
Alternate json field names for SplitChange
gthea Mar 7, 2025
d157529
WIP notification processing
gthea Mar 10, 2025
e8eab34
Update tests
gthea Mar 10, 2025
6a358f8
Fix androidTest compilation
gthea Mar 10, 2025
0451225
Merge branch 'SDKS-9357_baseline' into SDKS-9438
gthea Mar 10, 2025
c282496
Call RBS update task
gthea Mar 10, 2025
e98cea3
New notification parsing & processing
gthea Mar 10, 2025
02fe280
Remove comment
gthea Mar 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/androidTest/java/fake/SynchronizerSpyImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public void synchronizeSplits(long since) {
mSynchronizer.synchronizeSplits();
}

@Override
public void synchronizeRuleBasedSegments(long changeNumber) {
mSynchronizer.synchronizeRuleBasedSegments(changeNumber);
}

@Override
public void synchronizeSplits() {
mSynchronizer.synchronizeSplits();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.split.android.client.service.sseclient.ReconnectBackoffCounter;
import io.split.android.client.service.sseclient.SseJwtParser;
import io.split.android.client.service.sseclient.feedbackchannel.PushManagerEventBroadcaster;
import io.split.android.client.service.sseclient.notifications.InstantUpdateChangeNotification;
import io.split.android.client.service.sseclient.notifications.MySegmentsV2PayloadDecoder;
import io.split.android.client.service.sseclient.notifications.NotificationParser;
import io.split.android.client.service.sseclient.notifications.NotificationProcessor;
Expand Down Expand Up @@ -86,6 +87,7 @@
import io.split.android.client.storage.events.PersistentEventsStorage;
import io.split.android.client.storage.general.GeneralInfoStorage;
import io.split.android.client.storage.impressions.PersistentImpressionsStorage;
import io.split.android.client.storage.rbs.RuleBasedSegmentStorage;
import io.split.android.client.storage.splits.SplitsStorage;
import io.split.android.client.telemetry.TelemetrySynchronizer;
import io.split.android.client.telemetry.TelemetrySynchronizerImpl;
Expand Down Expand Up @@ -353,7 +355,7 @@ public StreamingComponents buildStreamingComponents(@NonNull SplitTaskExecutor s
return new StreamingComponents();
}

BlockingQueue<SplitsChangeNotification> splitsUpdateNotificationQueue = new LinkedBlockingDeque<>();
BlockingQueue<InstantUpdateChangeNotification> splitsUpdateNotificationQueue = new LinkedBlockingDeque<>();
NotificationParser notificationParser = new NotificationParser();

NotificationProcessor notificationProcessor = new NotificationProcessor(splitTaskExecutor, splitTaskFactory,
Expand Down Expand Up @@ -419,13 +421,15 @@ SplitUpdatesWorker getSplitUpdatesWorker(SplitClientConfig config,
SplitTaskExecutor splitTaskExecutor,
SplitTaskFactory splitTaskFactory,
Synchronizer mSynchronizer,
BlockingQueue<SplitsChangeNotification> splitsUpdateNotificationQueue,
BlockingQueue<InstantUpdateChangeNotification> splitsUpdateNotificationQueue,
SplitsStorage splitsStorage,
RuleBasedSegmentStorage ruleBasedSegmentStorage,
CompressionUtilProvider compressionProvider) {
if (config.syncEnabled()) {
return new SplitUpdatesWorker(mSynchronizer,
splitsUpdateNotificationQueue,
splitsStorage,
ruleBasedSegmentStorage,
compressionProvider,
splitTaskExecutor,
splitTaskFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ private SplitFactoryImpl(@NonNull String apiToken, @NonNull Key key, @NonNull Sp
mSynchronizer,
streamingComponents.getSplitsUpdateNotificationQueue(),
mStorageContainer.getSplitsStorage(),
mStorageContainer.getRuleBasedSegmentStorage(),
compressionProvider),
streamingComponents.getSyncGuardian());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ public enum SplitInternalEvent {
ATTRIBUTES_LOADED_FROM_STORAGE,
ENCRYPTION_MIGRATION_DONE,
MY_LARGE_SEGMENTS_UPDATED,
RULE_BASED_SEGMENTS_UPDATED,
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.split.android.client.service.executor;

import io.split.android.client.dtos.RuleBasedSegment;
import io.split.android.client.dtos.Split;
import io.split.android.client.service.CleanUpDatabaseTask;
import io.split.android.client.service.events.EventsRecorderTask;
import io.split.android.client.service.impressions.ImpressionsTaskFactory;
import io.split.android.client.service.rules.LoadRuleBasedSegmentsTask;
import io.split.android.client.service.splits.FilterSplitsInCacheTask;
import io.split.android.client.service.splits.LoadSplitsTask;
import io.split.android.client.service.splits.RuleBasedSegmentInPlaceUpdateTask;
import io.split.android.client.service.splits.SplitInPlaceUpdateTask;
import io.split.android.client.service.splits.SplitKillTask;
import io.split.android.client.service.splits.SplitsSyncTask;
Expand Down Expand Up @@ -37,4 +39,6 @@ public interface SplitTaskFactory extends TelemetryTaskFactory, ImpressionsTaskF
CleanUpDatabaseTask createCleanUpDatabaseTask(long maxTimestamp);

EncryptionMigrationTask createEncryptionMigrationTask(String sdkKey, SplitRoomDatabase splitRoomDatabase, boolean encryptionEnabled, SplitCipher splitCipher);

RuleBasedSegmentInPlaceUpdateTask createRuleBasedSegmentUpdateTask(RuleBasedSegment ruleBasedSegment, long changeNumber);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.split.android.client.SplitClientConfig;
import io.split.android.client.SplitFilter;
import io.split.android.client.TestingConfig;
import io.split.android.client.dtos.RuleBasedSegment;
import io.split.android.client.dtos.Split;
import io.split.android.client.events.ISplitEventsManager;
import io.split.android.client.service.CleanUpDatabaseTask;
Expand All @@ -32,8 +33,10 @@
import io.split.android.client.service.impressions.unique.UniqueKeysRecorderTask;
import io.split.android.client.service.impressions.unique.UniqueKeysRecorderTaskConfig;
import io.split.android.client.service.rules.LoadRuleBasedSegmentsTask;
import io.split.android.client.service.rules.RuleBasedSegmentChangeProcessor;
import io.split.android.client.service.splits.FilterSplitsInCacheTask;
import io.split.android.client.service.splits.LoadSplitsTask;
import io.split.android.client.service.splits.RuleBasedSegmentInPlaceUpdateTask;
import io.split.android.client.service.splits.SplitChangeProcessor;
import io.split.android.client.service.splits.SplitInPlaceUpdateTask;
import io.split.android.client.service.splits.SplitKillTask;
Expand Down Expand Up @@ -64,6 +67,7 @@ public class SplitTaskFactoryImpl implements SplitTaskFactory {
private final ISplitEventsManager mEventsManager;
private final TelemetryTaskFactory mTelemetryTaskFactory;
private final SplitChangeProcessor mSplitChangeProcessor;
private final RuleBasedSegmentChangeProcessor mRuleBasedSegmentChangeProcessor;
private final TelemetryRuntimeProducer mTelemetryRuntimeProducer;
private final List<SplitFilter> mFilters;

Expand All @@ -85,6 +89,7 @@ public SplitTaskFactoryImpl(@NonNull SplitClientConfig splitClientConfig,
mFlagsSpecFromConfig = flagsSpecFromConfig;
mEventsManager = eventsManager;
mSplitChangeProcessor = new SplitChangeProcessor(filters, flagSetsFilter);
mRuleBasedSegmentChangeProcessor = new RuleBasedSegmentChangeProcessor();
RuleBasedSegmentStorageProducer ruleBasedSegmentStorageProducer = mSplitsStorageContainer.getRuleBasedSegmentStorage();

TelemetryStorage telemetryStorage = mSplitsStorageContainer.getTelemetryStorage();
Expand All @@ -93,6 +98,7 @@ public SplitTaskFactoryImpl(@NonNull SplitClientConfig splitClientConfig,
mSplitsSyncHelper = new SplitsSyncHelper(mSplitApiFacade.getSplitFetcher(),
mSplitsStorageContainer.getSplitsStorage(),
mSplitChangeProcessor,
mRuleBasedSegmentChangeProcessor,
ruleBasedSegmentStorageProducer,
mTelemetryRuntimeProducer,
new ReconnectBackoffCounter(1, testingConfig.getCdnBackoffTime()),
Expand All @@ -101,6 +107,7 @@ public SplitTaskFactoryImpl(@NonNull SplitClientConfig splitClientConfig,
mSplitsSyncHelper = new SplitsSyncHelper(mSplitApiFacade.getSplitFetcher(),
mSplitsStorageContainer.getSplitsStorage(),
mSplitChangeProcessor,
mRuleBasedSegmentChangeProcessor,
ruleBasedSegmentStorageProducer,
mTelemetryRuntimeProducer,
flagsSpecFromConfig);
Expand Down Expand Up @@ -222,6 +229,11 @@ public EncryptionMigrationTask createEncryptionMigrationTask(String sdkKey, Spli
return new EncryptionMigrationTask(sdkKey, splitRoomDatabase, encryptionEnabled, splitCipher);
}

@Override
public RuleBasedSegmentInPlaceUpdateTask createRuleBasedSegmentUpdateTask(RuleBasedSegment ruleBasedSegment, long changeNumber) {
return new RuleBasedSegmentInPlaceUpdateTask(mSplitsStorageContainer.getRuleBasedSegmentStorage(), mRuleBasedSegmentChangeProcessor, mEventsManager, ruleBasedSegment, changeNumber);
}

@NonNull
private TelemetryTaskFactory initializeTelemetryTaskFactory(@NonNull SplitClientConfig splitClientConfig, @Nullable Map<SplitFilter.Type, SplitFilter> filters, TelemetryStorage telemetryStorage) {
final TelemetryTaskFactory mTelemetryTaskFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ public enum SplitTaskType {
TELEMETRY_CONFIG_TASK, TELEMETRY_STATS_TASK,
SAVE_UNIQUE_KEYS_TASK, UNIQUE_KEYS_RECORDER_TASK,
MY_LARGE_SEGMENTS_UPDATE, LOAD_LOCAL_RULE_BASED_SEGMENTS,
RULE_BASED_SEGMENT_SYNC,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.split.android.client.service.rules;

import java.util.Set;

import io.split.android.client.dtos.RuleBasedSegment;

public class ProcessedRuleBasedSegmentChange {
private final Set<RuleBasedSegment> mActive;
private final Set<RuleBasedSegment> mArchived;
private final long mChangeNumber;
private final long mUpdateTimestamp;

public ProcessedRuleBasedSegmentChange(Set<RuleBasedSegment> active,
Set<RuleBasedSegment> archived,
long changeNumber,
long updateTimestamp) {
mActive = active;
mArchived = archived;
mChangeNumber = changeNumber;
mUpdateTimestamp = updateTimestamp;
}

public Set<RuleBasedSegment> getActive() {
return mActive;
}

public Set<RuleBasedSegment> getArchived() {
return mArchived;
}

public long getChangeNumber() {
return mChangeNumber;
}

public long getUpdateTimestamp() {
return mUpdateTimestamp;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.split.android.client.service.rules;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import io.split.android.client.dtos.RuleBasedSegment;
import io.split.android.client.dtos.Status;

public class RuleBasedSegmentChangeProcessor {

public ProcessedRuleBasedSegmentChange process(List<RuleBasedSegment> segments, long changeNumber) {
Set<RuleBasedSegment> toAdd = new HashSet<>();
Set<RuleBasedSegment> toRemove = new HashSet<>();
for (RuleBasedSegment segment : segments) {
if (segment.getStatus() == Status.ACTIVE) {
toAdd.add(segment);
} else {
toRemove.add(segment);
}
}

return new ProcessedRuleBasedSegmentChange(toAdd, toRemove, changeNumber, System.currentTimeMillis());
}

public ProcessedRuleBasedSegmentChange process(RuleBasedSegment segment, long changeNumber) {
return process(Collections.singletonList(segment), changeNumber);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.split.android.client.service.splits;

import static io.split.android.client.utils.Utils.checkNotNull;

import androidx.annotation.NonNull;

import io.split.android.client.dtos.RuleBasedSegment;
import io.split.android.client.events.ISplitEventsManager;
import io.split.android.client.events.SplitInternalEvent;
import io.split.android.client.service.executor.SplitTask;
import io.split.android.client.service.executor.SplitTaskExecutionInfo;
import io.split.android.client.service.executor.SplitTaskType;
import io.split.android.client.service.rules.ProcessedRuleBasedSegmentChange;
import io.split.android.client.service.rules.RuleBasedSegmentChangeProcessor;
import io.split.android.client.storage.rbs.RuleBasedSegmentStorage;
import io.split.android.client.utils.logger.Logger;

public class RuleBasedSegmentInPlaceUpdateTask implements SplitTask {

private final RuleBasedSegmentStorage mRuleBasedSegmentStorage;
private final long mChangeNumber;
private final RuleBasedSegment mRuleBasedSegment;
private final RuleBasedSegmentChangeProcessor mChangeProcessor;
private final ISplitEventsManager mEventsManager;

public RuleBasedSegmentInPlaceUpdateTask(@NonNull RuleBasedSegmentStorage ruleBasedSegmentStorage,
@NonNull RuleBasedSegmentChangeProcessor changeProcessor,
@NonNull ISplitEventsManager eventsManager,
@NonNull RuleBasedSegment ruleBasedSegment,
long changeNumber) {
mRuleBasedSegmentStorage = checkNotNull(ruleBasedSegmentStorage);
mRuleBasedSegment = checkNotNull(ruleBasedSegment);
mChangeProcessor = checkNotNull(changeProcessor);
mEventsManager = eventsManager;
mChangeNumber = changeNumber;
}

@NonNull
@Override
public SplitTaskExecutionInfo execute() {
try {
ProcessedRuleBasedSegmentChange processedChange = mChangeProcessor.process(mRuleBasedSegment, mChangeNumber);
boolean triggerSdkUpdate = mRuleBasedSegmentStorage.update(processedChange.getActive(), processedChange.getArchived(), mChangeNumber);

if (triggerSdkUpdate) {
mEventsManager.notifyInternalEvent(SplitInternalEvent.RULE_BASED_SEGMENTS_UPDATED);
}

Logger.v("Updated rule based segment");
return SplitTaskExecutionInfo.success(SplitTaskType.RULE_BASED_SEGMENT_SYNC);
} catch (Exception ex) {
Logger.e("Could not update rule based segment");

return SplitTaskExecutionInfo.error(SplitTaskType.RULE_BASED_SEGMENT_SYNC);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,12 @@
import androidx.annotation.VisibleForTesting;

import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import io.split.android.client.dtos.RuleBasedSegment;
import io.split.android.client.dtos.RuleBasedSegmentChange;
import io.split.android.client.dtos.SplitChange;
import io.split.android.client.dtos.Status;
import io.split.android.client.dtos.TargetingRulesChange;
import io.split.android.client.network.SplitHttpHeadersBuilder;
import io.split.android.client.service.ServiceConstants;
Expand All @@ -26,6 +22,8 @@
import io.split.android.client.service.http.HttpFetcher;
import io.split.android.client.service.http.HttpFetcherException;
import io.split.android.client.service.http.HttpStatus;
import io.split.android.client.service.rules.ProcessedRuleBasedSegmentChange;
import io.split.android.client.service.rules.RuleBasedSegmentChangeProcessor;
import io.split.android.client.service.sseclient.BackoffCounter;
import io.split.android.client.service.sseclient.ReconnectBackoffCounter;
import io.split.android.client.storage.rbs.RuleBasedSegmentStorageProducer;
Expand All @@ -44,6 +42,7 @@ public class SplitsSyncHelper {
private final HttpFetcher<TargetingRulesChange> mSplitFetcher;
private final SplitsStorage mSplitsStorage;
private final SplitChangeProcessor mSplitChangeProcessor;
private final RuleBasedSegmentChangeProcessor mRuleBasedSegmentChangeProcessor;
private final RuleBasedSegmentStorageProducer mRuleBasedSegmentStorage;
private final TelemetryRuntimeProducer mTelemetryRuntimeProducer;
private final BackoffCounter mBackoffCounter;
Expand All @@ -52,12 +51,14 @@ public class SplitsSyncHelper {
public SplitsSyncHelper(@NonNull HttpFetcher<TargetingRulesChange> splitFetcher,
@NonNull SplitsStorage splitsStorage,
@NonNull SplitChangeProcessor splitChangeProcessor,
@NonNull RuleBasedSegmentChangeProcessor ruleBasedSegmentChangeProcessor,
@NonNull RuleBasedSegmentStorageProducer ruleBasedSegmentStorage,
@NonNull TelemetryRuntimeProducer telemetryRuntimeProducer,
@Nullable String flagsSpec) {
this(splitFetcher,
splitsStorage,
splitChangeProcessor,
ruleBasedSegmentChangeProcessor,
ruleBasedSegmentStorage,
telemetryRuntimeProducer,
new ReconnectBackoffCounter(1, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT),
Expand All @@ -68,13 +69,15 @@ public SplitsSyncHelper(@NonNull HttpFetcher<TargetingRulesChange> splitFetcher,
public SplitsSyncHelper(@NonNull HttpFetcher<TargetingRulesChange> splitFetcher,
@NonNull SplitsStorage splitsStorage,
@NonNull SplitChangeProcessor splitChangeProcessor,
@NonNull RuleBasedSegmentChangeProcessor ruleBasedSegmentChangeProcessor,
@NonNull RuleBasedSegmentStorageProducer ruleBasedSegmentStorage,
@NonNull TelemetryRuntimeProducer telemetryRuntimeProducer,
@NonNull BackoffCounter backoffCounter,
@Nullable String flagsSpec) {
mSplitFetcher = checkNotNull(splitFetcher);
mSplitsStorage = checkNotNull(splitsStorage);
mSplitChangeProcessor = checkNotNull(splitChangeProcessor);
mRuleBasedSegmentChangeProcessor = checkNotNull(ruleBasedSegmentChangeProcessor);
mRuleBasedSegmentStorage = checkNotNull(ruleBasedSegmentStorage);
mTelemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
mBackoffCounter = checkNotNull(backoffCounter);
Expand Down Expand Up @@ -205,17 +208,8 @@ private void updateStorage(boolean clearBeforeUpdate, SplitChange splitChange, R
}

private void updateRbsStorage(RuleBasedSegmentChange ruleBasedSegmentChange) {
long changeNumber = ruleBasedSegmentChange.getTill();
Set<RuleBasedSegment> toAdd = new HashSet<>();
Set<RuleBasedSegment> toRemove = new HashSet<>();
for (RuleBasedSegment segment : ruleBasedSegmentChange.getSegments()) {
if (segment.getStatus() == Status.ACTIVE) {
toAdd.add(segment);
} else {
toRemove.add(segment);
}
}
mRuleBasedSegmentStorage.update(toAdd, toRemove, changeNumber);
ProcessedRuleBasedSegmentChange change = mRuleBasedSegmentChangeProcessor.process(ruleBasedSegmentChange.getSegments(), ruleBasedSegmentChange.getTill());
mRuleBasedSegmentStorage.update(change.getActive(), change.getArchived(), change.getChangeNumber());
}

private void logError(String message) {
Expand Down
Loading
Loading