diff --git a/lib/optimizely.rb b/lib/optimizely.rb index 84b914c5..7eb634f7 100644 --- a/lib/optimizely.rb +++ b/lib/optimizely.rb @@ -22,6 +22,9 @@ require_relative 'optimizely/decision_service' require_relative 'optimizely/error_handler' require_relative 'optimizely/event_builder' +require_relative 'optimizely/event/forwarding_event_processor' +require_relative 'optimizely/event/event_factory' +require_relative 'optimizely/event/user_event_factory' require_relative 'optimizely/event_dispatcher' require_relative 'optimizely/exceptions' require_relative 'optimizely/helpers/constants' @@ -35,8 +38,8 @@ module Optimizely class Project attr_reader :notification_center # @api no-doc - attr_reader :config_manager, :decision_service, :error_handler, - :event_builder, :event_dispatcher, :logger + attr_reader :config_manager, :decision_service, :error_handler, :event_dispatcher, + :event_processor, :logger, :stopped # Constructor for Projects. # @@ -51,6 +54,7 @@ class Project # Must provide at least one of datafile or sdk_key. # @param config_manager - Optional Responds to get_config. # @param notification_center - Optional Instance of NotificationCenter. + # @param event_processor - Optional Responds to process. def initialize( datafile = nil, @@ -61,7 +65,8 @@ def initialize( user_profile_service = nil, sdk_key = nil, config_manager = nil, - notification_center = nil + notification_center = nil, + event_processor = nil ) @logger = logger || NoOpLogger.new @error_handler = error_handler || NoOpErrorHandler.new @@ -91,8 +96,14 @@ def initialize( else StaticProjectConfigManager.new(datafile, @logger, @error_handler, skip_json_validation) end + @decision_service = DecisionService.new(@logger, @user_profile_service) - @event_builder = EventBuilder.new(@logger) + + @event_processor = if event_processor.respond_to?(:process) + event_processor + else + ForwardingEventProcessor.new(@event_dispatcher, @logger) + end end # Buckets visitor and sends impression event to Optimizely. @@ -243,19 +254,14 @@ def track(event_key, user_id, attributes = nil, event_tags = nil) return nil end - conversion_event = @event_builder.create_conversion_event(config, event, user_id, attributes, event_tags) + user_event = UserEventFactory.create_conversion_event(config, event, user_id, attributes, event_tags) + @event_processor.process(user_event) @logger.log(Logger::INFO, "Tracking event '#{event_key}' for user '#{user_id}'.") - @logger.log(Logger::INFO, - "Dispatching conversion event to URL #{conversion_event.url} with params #{conversion_event.params}.") - begin - @event_dispatcher.dispatch_event(conversion_event) - rescue => e - @logger.log(Logger::ERROR, "Unable to dispatch conversion event. Error: #{e}") - end + log_event = EventFactory.create_log_event(user_event, @logger) @notification_center.send_notifications( NotificationCenter::NOTIFICATION_TYPES[:TRACK], - event_key, user_id, attributes, event_tags, conversion_event + event_key, user_id, attributes, event_tags, log_event ) nil end @@ -507,6 +513,14 @@ def is_valid config.is_a?(Optimizely::ProjectConfig) end + def close + return if @stopped + + @stopped = true + @config_manager.stop! if @config_manager.respond_to?(:stop!) + @event_processor.stop! if @event_processor.respond_to?(:stop!) + end + private def get_variation_with_config(experiment_key, user_id, attributes, config) @@ -692,18 +706,15 @@ def validate_instantiation_options def send_impression(config, experiment, variation_key, user_id, attributes = nil) experiment_key = experiment['key'] variation_id = config.get_variation_id_from_key(experiment_key, variation_key) - impression_event = @event_builder.create_impression_event(config, experiment, variation_id, user_id, attributes) - @logger.log(Logger::INFO, - "Dispatching impression event to URL #{impression_event.url} with params #{impression_event.params}.") - begin - @event_dispatcher.dispatch_event(impression_event) - rescue => e - @logger.log(Logger::ERROR, "Unable to dispatch impression event. Error: #{e}") - end + user_event = UserEventFactory.create_impression_event(config, experiment, variation_id, user_id, attributes) + @event_processor.process(user_event) + + @logger.log(Logger::INFO, "Activating user '#{user_id}' in experiment '#{experiment_key}'.") variation = config.get_variation_from_id(experiment_key, variation_id) + log_event = EventFactory.create_log_event(user_event, @logger) @notification_center.send_notifications( NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE], - experiment, user_id, attributes, variation, impression_event + experiment, user_id, attributes, variation, log_event ) end diff --git a/lib/optimizely/config_manager/http_project_config_manager.rb b/lib/optimizely/config_manager/http_project_config_manager.rb index 97630092..da92d0e4 100644 --- a/lib/optimizely/config_manager/http_project_config_manager.rb +++ b/lib/optimizely/config_manager/http_project_config_manager.rb @@ -30,7 +30,7 @@ module Optimizely class HTTPProjectConfigManager < ProjectConfigManager # Config manager that polls for the datafile and updated ProjectConfig based on an update interval. - attr_reader :config + attr_reader :config, :closed # Initialize config manager. One of sdk_key or url has to be set to be able to use. # @@ -72,6 +72,7 @@ def initialize( @last_modified = nil @async_scheduler = AsyncScheduler.new(method(:fetch_datafile_config), @polling_interval, auto_update, @logger) @async_scheduler.start! if start_by_default == true + @closed = false @skip_json_validation = skip_json_validation @notification_center = notification_center.is_a?(Optimizely::NotificationCenter) ? notification_center : NotificationCenter.new(@logger, @error_handler) @config = datafile.nil? ? nil : DatafileProjectConfig.create(datafile, @logger, @error_handler, @skip_json_validation) @@ -84,11 +85,24 @@ def ready? end def start! + if @closed + @logger.log(Logger::WARN, 'Not starting. Already closed.') + return + end + @async_scheduler.start! + @closed = false end def stop! + if @closed + @logger.log(Logger::WARN, 'Not pausing. Manager has not been started.') + return + end + @async_scheduler.stop! + @config = nil + @closed = true end def get_config diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index eeb730d4..6575bc32 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -25,7 +25,7 @@ class BatchEventProcessor < EventProcessor # the BlockingQueue and buffers them for either a configured batch size or for a # maximum duration before the resulting LogEvent is sent to the NotificationCenter. - attr_reader :event_queue, :current_batch, :batch_size, :flush_interval + attr_reader :event_queue, :current_batch, :started, :batch_size, :flush_interval DEFAULT_BATCH_SIZE = 10 DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds @@ -54,18 +54,18 @@ def initialize( @mutex = Mutex.new @received = ConditionVariable.new @current_batch = [] - @is_started = false + @started = false start! end def start! - if @is_started == true + if @started == true @logger.log(Logger::WARN, 'Service already started.') return end @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval @thread = Thread.new { run } - @is_started = true + @started = true end def flush @@ -78,7 +78,7 @@ def flush def process(user_event) @logger.log(Logger::DEBUG, "Received userEvent: #{user_event}") - unless @thread.alive? + if !@started || !@thread.alive? @logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.') return end @@ -95,14 +95,14 @@ def process(user_event) end def stop! - return unless @thread.alive? + return unless @started @mutex.synchronize do @event_queue << SHUTDOWN_SIGNAL @received.signal end - @is_started = false + @started = false @logger.log(Logger::WARN, 'Stopping scheduler.') @thread.exit end @@ -153,7 +153,7 @@ def flush_queue! begin @event_dispatcher.dispatch_event(log_event) rescue StandardError => e - @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}") + @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.") end @current_batch = [] end @@ -167,7 +167,7 @@ def add_to_batch(user_event) # Reset the deadline if starting a new batch. @flushing_interval_deadline = (Helpers::DateTimeUtils.create_timestamp + @flush_interval) if @current_batch.empty? - @logger.log(Logger::DEBUG, "Adding user event: #{user_event.event['key']} to batch.") + @logger.log(Logger::DEBUG, "Adding user event: #{user_event} to batch.") @current_batch << user_event return unless @current_batch.length >= @batch_size diff --git a/lib/optimizely/event/forwarding_event_processor.rb b/lib/optimizely/event/forwarding_event_processor.rb new file mode 100644 index 00000000..679174ae --- /dev/null +++ b/lib/optimizely/event/forwarding_event_processor.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +# +# Copyright 2019, Optimizely and contributors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +require_relative 'event_processor' +module Optimizely + class ForwardingEventProcessor < EventProcessor + # ForwardingEventProcessor is a basic transformation stage for converting + # the event batch into a LogEvent to be dispatched. + def initialize(event_dispatcher, logger = nil) + @event_dispatcher = event_dispatcher + @logger = logger || NoOpLogger.new + end + + def process(user_event) + log_event = Optimizely::EventFactory.create_log_event(user_event, @logger) + + begin + @event_dispatcher.dispatch_event(log_event) + rescue StandardError => e + @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.") + end + end + end +end diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index 4794e075..a5351ca4 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -111,7 +111,6 @@ expect(@event_dispatcher).to have_received(:dispatch_event).with( Optimizely::EventFactory.create_log_event(expected_batch, spy_logger) ).once - expect(spy_logger).to have_received(:log).with(Logger::DEBUG, "Adding user event: #{event['key']} to batch.").exactly(10).times expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!').once end diff --git a/spec/event/forwarding_event_processor_spec.rb b/spec/event/forwarding_event_processor_spec.rb new file mode 100644 index 00000000..7be502f0 --- /dev/null +++ b/spec/event/forwarding_event_processor_spec.rb @@ -0,0 +1,103 @@ +# frozen_string_literal: true + +# +# Copyright 2019, Optimizely and contributors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +require 'spec_helper' +require 'optimizely/event/forwarding_event_processor' +require 'optimizely/event/user_event_factory' +require 'optimizely/error_handler' +require 'optimizely/helpers/date_time_utils' +require 'optimizely/logger' +describe Optimizely::ForwardingEventProcessor do + let(:config_body_JSON) { OptimizelySpec::VALID_CONFIG_BODY_JSON } + let(:error_handler) { Optimizely::NoOpErrorHandler.new } + let(:spy_logger) { spy('logger') } + let(:project_config) { Optimizely::DatafileProjectConfig.new(config_body_JSON, spy_logger, error_handler) } + let(:event) { project_config.get_event_from_key('test_event') } + let(:log_url) { 'https://logx.optimizely.com/v1/events' } + let(:post_headers) { {'Content-Type' => 'application/json'} } + + before(:example) do + time_now = Time.now + allow(Time).to receive(:now).and_return(time_now) + allow(SecureRandom).to receive(:uuid).and_return('a68cf1ad-0393-4e18-af87-efe8f01a7c9c') + + @event_dispatcher = Optimizely::EventDispatcher.new + allow(@event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) + @conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + + @expected_endpoint = 'https://logx.optimizely.com/v1/events' + @expected_conversion_params = { + account_id: '12001', + project_id: '111001', + visitors: [{ + attributes: [{ + entity_id: Optimizely::Helpers::Constants::CONTROL_ATTRIBUTES['BOT_FILTERING'], + key: Optimizely::Helpers::Constants::CONTROL_ATTRIBUTES['BOT_FILTERING'], + type: 'custom', + value: true + }], + visitor_id: 'test_user', + snapshots: [{ + events: [{ + entity_id: '111095', + timestamp: Optimizely::Helpers::DateTimeUtils.create_timestamp, + uuid: 'a68cf1ad-0393-4e18-af87-efe8f01a7c9c', + key: 'test_event' + }] + }] + }], + anonymize_ip: false, + revision: '42', + client_name: Optimizely::CLIENT_ENGINE, + enrich_decisions: true, + client_version: Optimizely::VERSION + } + end + + describe '.process' do + it 'should dispatch log event when valid event is provided' do + forwarding_event_processor = Optimizely::ForwardingEventProcessor.new( + @event_dispatcher, spy_logger + ) + + forwarding_event_processor.process(@conversion_event) + + expect(@event_dispatcher).to have_received(:dispatch_event).with( + Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers) + ).once + end + + it 'should log an error when dispatch event raises timeout exception' do + log_event = Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers) + allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event) + + timeout_error = Timeout::Error.new + allow(@event_dispatcher).to receive(:dispatch_event).and_raise(timeout_error) + + forwarding_event_processor = Optimizely::ForwardingEventProcessor.new( + @event_dispatcher, spy_logger + ) + + forwarding_event_processor.process(@conversion_event) + + expect(spy_logger).to have_received(:log).once.with( + Logger::ERROR, + "Error dispatching event: #{log_event} Timeout::Error." + ) + end + end +end diff --git a/spec/project_spec.rb b/spec/project_spec.rb index ab538d7d..ece7b934 100644 --- a/spec/project_spec.rb +++ b/spec/project_spec.rb @@ -19,8 +19,10 @@ require 'optimizely' require 'optimizely/audience' require 'optimizely/config_manager/http_project_config_manager' -require 'optimizely/helpers/validator' +require 'optimizely/event_dispatcher' +require 'optimizely/event/batch_event_processor' require 'optimizely/exceptions' +require 'optimizely/helpers/validator' require 'optimizely/version' describe 'Optimizely' do @@ -585,7 +587,6 @@ class InvalidErrorHandler; end end it 'should log and send activate notification when an impression event is dispatched' do - params = @expected_activate_params variation_to_return = project_instance.config_manager.config.get_variation_from_id('test_experiment', '111128') allow(project_instance.decision_service.bucketer).to receive(:bucket).and_return(variation_to_return) allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) @@ -595,7 +596,9 @@ class InvalidErrorHandler; end experiment = project_instance.config_manager.config.get_experiment_from_key('test_experiment') # Decision listener - expect(project_instance.notification_center).to receive(:send_notifications).ordered + expect(project_instance.notification_center).to receive(:send_notifications).with( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:DECISION], any_args + ).ordered # Activate listener expect(project_instance.notification_center).to receive(:send_notifications).with( @@ -606,16 +609,20 @@ class InvalidErrorHandler; end project_instance.activate('test_experiment', 'test_user') - expect(spy_logger).to have_received(:log).once.with(Logger::INFO, include('Dispatching impression event to' \ - " URL #{impression_log_url} with params #{params}")) + expect(spy_logger).to have_received(:log).once.with(Logger::INFO, "Activating user 'test_user' in experiment 'test_experiment'.") end it 'should log when an exception has occurred during dispatching the impression event' do + params = @expected_activate_params + stub_request(:post, impression_log_url).with(query: params) + log_event = Optimizely::Event.new(:post, impression_log_url, params, post_headers) + allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event) + variation_to_return = project_instance.config_manager.config.get_variation_from_id('test_experiment', '111128') allow(project_instance.decision_service.bucketer).to receive(:bucket).and_return(variation_to_return) allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(any_args).and_raise(RuntimeError) project_instance.activate('test_experiment', 'test_user') - expect(spy_logger).to have_received(:log).once.with(Logger::ERROR, 'Unable to dispatch impression event. Error: RuntimeError') + expect(spy_logger).to have_received(:log).once.with(Logger::ERROR, "Error dispatching event: #{log_event} RuntimeError.") end it 'should raise an exception when called with invalid attributes' do @@ -865,9 +872,14 @@ class InvalidErrorHandler; end end it 'should log a message if an exception has occurred during dispatching of the event' do + params = @expected_track_event_params + stub_request(:post, conversion_log_url).with(query: params) + log_event = Optimizely::Event.new(:post, conversion_log_url, params, post_headers) + allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event) allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(any_args).and_raise(RuntimeError) + project_instance.track('test_event', 'test_user') - expect(spy_logger).to have_received(:log).once.with(Logger::ERROR, 'Unable to dispatch conversion event. Error: RuntimeError') + expect(spy_logger).to have_received(:log).once.with(Logger::ERROR, "Error dispatching event: #{log_event} RuntimeError.") end it 'should send track notification and properly track an event by calling dispatch_event with right params with revenue provided' do @@ -1014,13 +1026,13 @@ class InvalidErrorHandler; end it 'should log when a conversion event is dispatched' do params = @expected_track_event_params - params[:visitors][0][:snapshots][0][:events][0].merge!(revenue: 42, - tags: {'revenue' => 42}) - + params[:visitors][0][:snapshots][0][:events][0].merge!( + revenue: 42, + tags: {'revenue' => 42} + ) allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) project_instance.track('test_event', 'test_user', nil, 'revenue' => 42) - expect(spy_logger).to have_received(:log).with(Logger::INFO, include('Dispatching conversion event to' \ - " URL #{conversion_log_url} with params #{params}")) + expect(spy_logger).to have_received(:log).with(Logger::INFO, "Tracking event 'test_event' for user 'test_user'.") end it 'should raise an exception when called with attributes in an invalid format' do @@ -1467,14 +1479,15 @@ class InvalidErrorHandler; end instance_of(Optimizely::Event) ).ordered - expect(project_instance.notification_center).to receive(:send_notifications).ordered + expect(project_instance.notification_center).to receive(:send_notifications) + .with( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:DECISION], any_args + ).ordered allow(project_instance.decision_service).to receive(:get_variation_for_feature).and_return(decision_to_return) - expected_params = @expected_bucketed_params - expect(project_instance.is_feature_enabled('multi_variate_feature', 'test_user')).to be true - expect(spy_logger).to have_received(:log).once.with(Logger::INFO, "Dispatching impression event to URL https://logx.optimizely.com/v1/events with params #{expected_params}.") + expect(spy_logger).to have_received(:log).once.with(Logger::INFO, "Activating user 'test_user' in experiment 'test_experiment_multivariate'.") expect(spy_logger).to have_received(:log).once.with(Logger::INFO, "Feature 'multi_variate_feature' is enabled for user 'test_user'.") end @@ -2742,4 +2755,110 @@ class InvalidErrorHandler; end expect(invalid_project.is_valid).to be false end end + + describe '.close' do + it 'should stop config manager and event processor when optimizely close is called' do + config_manager = Optimizely::HTTPProjectConfigManager.new( + sdk_key: 'QBw9gFM8oTn7ogY9ANCC1z', + start_by_default: true + ) + + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: Optimizely::EventDispatcher.new) + + Optimizely::Project.new(config_body_JSON, nil, spy_logger, error_handler) + + project_instance = Optimizely::Project.new(nil, nil, nil, nil, true, nil, nil, config_manager, nil, event_processor) + + expect(config_manager.closed).to be false + expect(event_processor.started).to be true + + project_instance.close + + expect(config_manager.closed).to be true + expect(event_processor.started).to be false + expect(project_instance.stopped).to be true + end + + it 'should stop invalid object' do + http_project_config_manager = Optimizely::HTTPProjectConfigManager.new( + sdk_key: 'QBw9gFM8oTn7ogY9ANCC1z' + ) + + project_instance = Optimizely::Project.new( + nil, nil, spy_logger, error_handler, + false, nil, nil, http_project_config_manager + ) + + project_instance.close + expect(project_instance.is_valid).to be false + end + + it 'shoud return optimizely as invalid for an API when close is called' do + WebMock.allow_net_connect! + http_project_config_manager = Optimizely::HTTPProjectConfigManager.new( + sdk_key: 'QBw9gFM8oTn7ogY9ANCC1z' + ) + + project_instance = Optimizely::Project.new( + config_body_JSON, nil, spy_logger, error_handler, + false, nil, nil, http_project_config_manager + ) + + until http_project_config_manager.ready?; end + + expect(project_instance.activate('checkout_flow_experiment', 'test_user')).not_to eq(nil) + expect(project_instance.is_valid).to be true + + project_instance.close + + expect(project_instance.is_valid).to be false + expect(project_instance.activate('checkout_flow_experiment', 'test_user')).to eq(nil) + end + + it 'should not raise exception for static config manager' do + static_project_config_manager = Optimizely::StaticProjectConfigManager.new( + config_body_JSON, spy_logger, error_handler, false + ) + + project_instance = Optimizely::Project.new( + nil, nil, spy_logger, error_handler, + false, nil, nil, static_project_config_manager + ) + + project_instance.close + expect(project_instance.stopped).to be true + end + + it 'should not raise exception in any API using static config manager' do + static_project_config_manager = Optimizely::StaticProjectConfigManager.new( + config_body_JSON, spy_logger, error_handler, false + ) + + project_instance = Optimizely::Project.new( + nil, nil, spy_logger, error_handler, + false, nil, nil, static_project_config_manager + ) + + project_instance.close + + expect(project_instance.stopped).to be true + expect(project_instance.activate('checkout_flow_experiment', 'test_user')).to eq(nil) + expect(project_instance.get_variation('checkout_flow_experiment', 'test_user')).to eq(nil) + expect(project_instance.track('test_event', 'test_user')).to eq(nil) + expect(project_instance.is_feature_enabled('boolean_single_variable_feature', 'test_user')).to be false + expect(project_instance.get_enabled_features('test_user')).to be_empty + expect(project_instance.set_forced_variation('test_experiment', 'test', 'variation')).to eq(nil) + expect(project_instance.get_forced_variation('test_experiment', 'test_user')).to eq(nil) + expect(project_instance.get_feature_variable('integer_single_variable_feature', 'integer_variable', 'test_user', nil)) + .to eq(nil) + expect(project_instance.get_feature_variable_string('string_single_variable_feature', 'string_variable', 'test_user', nil)) + .to eq(nil) + expect(project_instance.get_feature_variable_boolean('boolean_single_variable_feature', 'boolean_variable', 'test_user', nil)) + .to eq(nil) + expect(project_instance.get_feature_variable_double('double_single_variable_feature', 'double_variable', 'test_user', nil)) + .to eq(nil) + expect(project_instance.get_feature_variable_integer('integer_single_variable_feature', 'integer_variable', 'test_user', nil)) + .to eq(nil) + end + end end