Skip to content

[FSSDK-8939] fix: make batch event processor default #325

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions lib/optimizely.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
require_relative 'optimizely/decision_service'
require_relative 'optimizely/error_handler'
require_relative 'optimizely/event_builder'
require_relative 'optimizely/event/forwarding_event_processor'
require_relative 'optimizely/event/batch_event_processor'
require_relative 'optimizely/event/event_factory'
require_relative 'optimizely/event/user_event_factory'
require_relative 'optimizely/event_dispatcher'
Expand Down Expand Up @@ -67,6 +67,7 @@ class Project
# @param notification_center - Optional Instance of NotificationCenter.
# @param event_processor - Optional Responds to process.
# @param default_decide_options: Optional default decision options.
# @param event_processor_options: Optional hash of options to be passed to the default batch event processor.
# @param settings: Optional instance of OptimizelySdkSettings for sdk configuration.

def initialize( # rubocop:disable Metrics/ParameterLists
Expand All @@ -81,6 +82,7 @@ def initialize( # rubocop:disable Metrics/ParameterLists
notification_center = nil,
event_processor = nil,
default_decide_options = [],
event_processor_options = {},
settings = nil
)
@logger = logger || NoOpLogger.new
Expand All @@ -97,6 +99,11 @@ def initialize( # rubocop:disable Metrics/ParameterLists
@default_decide_options = []
end

unless event_processor_options.is_a? Hash
@logger.log(Logger::DEBUG, 'Provided event processor options is not a hash.')
event_processor_options = {}
end

begin
validate_instantiation_options
rescue InvalidInputError => e
Expand Down Expand Up @@ -128,7 +135,13 @@ def initialize( # rubocop:disable Metrics/ParameterLists
@event_processor = if event_processor.respond_to?(:process)
event_processor
else
ForwardingEventProcessor.new(@event_dispatcher, @logger, @notification_center)
BatchEventProcessor.new(
event_dispatcher: @event_dispatcher,
logger: @logger,
notification_center: @notification_center,
batch_size: event_processor_options[:batch_size] || BatchEventProcessor::DEFAULT_BATCH_SIZE,
flush_interval: event_processor_options[:flush_interval] || BatchEventProcessor::DEFAULT_BATCH_INTERVAL
)
end
end

Expand Down
9 changes: 3 additions & 6 deletions lib/optimizely/odp/odp_event_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@

module Optimizely
class OdpEventManager
# BatchEventProcessor is a batched implementation of the Interface EventProcessor.
# Events passed to the BatchEventProcessor are immediately added to an EventQueue.
# The BatchEventProcessor maintains a single consumer thread that pulls events off of
# Events passed to the OdpEventManager are immediately added to an EventQueue.
# The OdpEventManager maintains a single consumer thread that pulls events off of
# the BlockingQueue and buffers them for either a configured batch size or for a
# maximum duration before the resulting LogEvent is sent to the NotificationCenter.
# maximum duration before the resulting OdpEvent is sent to Odp.

attr_reader :batch_size, :api_manager, :logger
attr_accessor :odp_config
Expand All @@ -37,8 +36,6 @@ def initialize(
request_timeout: nil,
flush_interval: nil
)
super()

@odp_config = nil
@api_host = nil
@api_key = nil
Expand Down
1 change: 1 addition & 0 deletions lib/optimizely/optimizely_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def self.custom_instance( # rubocop:disable Metrics/ParameterLists
notification_center,
event_processor,
[],
{},
settings
)
end
Expand Down
14 changes: 13 additions & 1 deletion spec/optimizely_user_context_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
let(:error_handler) { Optimizely::RaiseErrorHandler.new }
let(:spy_logger) { spy('logger') }
let(:project_instance) { Optimizely::Project.new(config_body_JSON, nil, spy_logger, error_handler) }
let(:forced_decision_project_instance) { Optimizely::Project.new(forced_decision_JSON, nil, spy_logger, error_handler) }
let(:forced_decision_project_instance) { Optimizely::Project.new(forced_decision_JSON, nil, spy_logger, error_handler, false, nil, nil, nil, nil, nil, [], {batch_size: 1}) }
let(:integration_project_instance) { Optimizely::Project.new(integration_JSON, nil, spy_logger, error_handler) }
let(:impression_log_url) { 'https://logx.optimizely.com/v1/events' }
let(:good_response_data) do
Expand Down Expand Up @@ -258,6 +258,10 @@
forced_decision = Optimizely::OptimizelyUserContext::OptimizelyForcedDecision.new('3324490562')
user_context_obj.set_forced_decision(context, forced_decision)
decision = user_context_obj.decide(feature_key)

# wait for batch processing thread to send event
sleep 0.1 until forced_decision_project_instance.event_processor.event_queue.empty?

expect(forced_decision_project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, impression_log_url, expected_params, post_headers))
expect(decision.variation_key).to eq('3324490562')
expect(decision.rule_key).to be_nil
Expand Down Expand Up @@ -350,6 +354,10 @@
forced_decision = Optimizely::OptimizelyUserContext::OptimizelyForcedDecision.new('b')
user_context_obj.set_forced_decision(context, forced_decision)
decision = user_context_obj.decide(feature_key, [Optimizely::Decide::OptimizelyDecideOption::INCLUDE_REASONS])

# wait for batch processing thread to send event
sleep 0.1 until forced_decision_project_instance.event_processor.event_queue.empty?

expect(forced_decision_project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, impression_log_url, expected_params, post_headers))
expect(decision.variation_key).to eq('b')
expect(decision.rule_key).to eq('exp_with_audience')
Expand Down Expand Up @@ -471,6 +479,10 @@
user_context_obj.remove_forced_decision(context_with_rule)
# decision should be based on flag forced decision
decision = user_context_obj.decide(feature_key)

# wait for batch processing thread to send event
sleep 0.1 until forced_decision_project_instance.event_processor.event_queue.empty?

expect(forced_decision_project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, impression_log_url, expected_params, post_headers))
expect(decision.variation_key).to eq('3324490562')
expect(decision.rule_key).to be_nil
Expand Down
Loading