Skip to content

feat(integrateep): Integrate Event processor #194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 36 commits into from
Aug 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
28ea204
feat(eventProcessor): Adds EventProcessor and BatchEventProcessor
rashidsp Aug 1, 2019
bcfca16
feat(forwarding-EP): Implements forwarding event processor
rashidsp Aug 2, 2019
ccbea99
feat(notification-center): Adds LogEvent notification
rashidsp Jul 31, 2019
f7a5b97
log event in forward-EP
rashidsp Aug 2, 2019
f5db35b
Addressing feedback
rashidsp Aug 2, 2019
32a566e
Addressing feedback
rashidsp Aug 2, 2019
c8ed3f7
Addressing feedback
rashidsp Aug 2, 2019
c0946ea
fixes log event issue
rashidsp Aug 2, 2019
cfcf5a8
Addresses review
rashidsp Aug 5, 2019
717adb0
resolves test issue
rashidsp Aug 5, 2019
6eec6bd
fixes logger issue
rashidsp Aug 5, 2019
7ac5896
feat(eventprocess): Integrate Event Processor with Optimizely
rashidsp Aug 6, 2019
899f3a6
fixes: failing test
rashidsp Aug 6, 2019
a31a9da
tests: adds SizedQueue
rashidsp Aug 7, 2019
93562a1
resolves conflicts
rashidsp Aug 7, 2019
0b7e53d
resolves conflicts
rashidsp Aug 7, 2019
e1f6715
Merge branch 'rashid/log-event' into rashid/optly-EP-integration
rashidsp Aug 7, 2019
309ee3a
Merge branch 'master' into rashid/batch-event-processor
rashidsp Aug 8, 2019
096ae95
Addresses feedback
rashidsp Aug 8, 2019
1341177
Addresses feedback
rashidsp Aug 9, 2019
ededaa7
reverts log-event changes
rashidsp Aug 9, 2019
2512412
resolves conflicts
rashidsp Aug 9, 2019
61e3426
config manager update
rashidsp Aug 9, 2019
b409edf
Removed verbose log.
msohailhussain Aug 15, 2019
9c019c7
Merge branch 'master' into rashid/batch-event-processor
rashidsp Aug 16, 2019
2fd378f
Resolves unit test
rashidsp Aug 16, 2019
efab9f4
removes log
rashidsp Aug 16, 2019
3c4a980
adds positive number conditions
rashidsp Aug 16, 2019
29fc092
resolves commits
rashidsp Aug 18, 2019
cbdce5e
fixes visitor_attributes bug
rashidsp Aug 19, 2019
6021aef
fixes visitor_attributes bug
rashidsp Aug 19, 2019
184fd06
Addressed review
rashidsp Aug 20, 2019
432933c
Merge branch 'rashid/batch-event-processor' into rashid/optly-EP-inte…
rashidsp Aug 20, 2019
0e12040
resolves conflicts
rashidsp Aug 23, 2019
d3a7495
Fixes event bug
rashidsp Aug 21, 2019
e802860
review changes
rashidsp Aug 27, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions lib/optimizely.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
require_relative 'optimizely/decision_service'
require_relative 'optimizely/error_handler'
require_relative 'optimizely/event_builder'
require_relative 'optimizely/event/forwarding_event_processor'
require_relative 'optimizely/event/event_factory'
require_relative 'optimizely/event/user_event_factory'
require_relative 'optimizely/event_dispatcher'
require_relative 'optimizely/exceptions'
require_relative 'optimizely/helpers/constants'
Expand All @@ -35,8 +38,8 @@ module Optimizely
class Project
attr_reader :notification_center
# @api no-doc
attr_reader :config_manager, :decision_service, :error_handler,
:event_builder, :event_dispatcher, :logger
attr_reader :config_manager, :decision_service, :error_handler, :event_dispatcher,
:event_processor, :logger, :stopped

# Constructor for Projects.
#
Expand All @@ -51,6 +54,7 @@ class Project
# Must provide at least one of datafile or sdk_key.
# @param config_manager - Optional Responds to get_config.
# @param notification_center - Optional Instance of NotificationCenter.
# @param event_processor - Optional Responds to process.

def initialize(
datafile = nil,
Expand All @@ -61,7 +65,8 @@ def initialize(
user_profile_service = nil,
sdk_key = nil,
config_manager = nil,
notification_center = nil
notification_center = nil,
event_processor = nil
)
@logger = logger || NoOpLogger.new
@error_handler = error_handler || NoOpErrorHandler.new
Expand Down Expand Up @@ -91,8 +96,14 @@ def initialize(
else
StaticProjectConfigManager.new(datafile, @logger, @error_handler, skip_json_validation)
end

@decision_service = DecisionService.new(@logger, @user_profile_service)
@event_builder = EventBuilder.new(@logger)

@event_processor = if event_processor.respond_to?(:process)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does it mean respond_to ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checks instance method process exist, similarly we are checking for config_manager; config_manager.respond_to?(:get_config)

event_processor
else
ForwardingEventProcessor.new(@event_dispatcher, @logger)
end
end

# Buckets visitor and sends impression event to Optimizely.
Expand Down Expand Up @@ -243,19 +254,14 @@ def track(event_key, user_id, attributes = nil, event_tags = nil)
return nil
end

conversion_event = @event_builder.create_conversion_event(config, event, user_id, attributes, event_tags)
user_event = UserEventFactory.create_conversion_event(config, event, user_id, attributes, event_tags)
@event_processor.process(user_event)
@logger.log(Logger::INFO, "Tracking event '#{event_key}' for user '#{user_id}'.")
@logger.log(Logger::INFO,
"Dispatching conversion event to URL #{conversion_event.url} with params #{conversion_event.params}.")
begin
@event_dispatcher.dispatch_event(conversion_event)
rescue => e
@logger.log(Logger::ERROR, "Unable to dispatch conversion event. Error: #{e}")
end

log_event = EventFactory.create_log_event(user_event, @logger)
@notification_center.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:TRACK],
event_key, user_id, attributes, event_tags, conversion_event
event_key, user_id, attributes, event_tags, log_event
)
nil
end
Expand Down Expand Up @@ -507,6 +513,14 @@ def is_valid
config.is_a?(Optimizely::ProjectConfig)
end

def close
return if @stopped

@stopped = true
@config_manager.stop! if @config_manager.respond_to?(:stop!)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how you are making sure, that stop property exists for both event_processor and config_manager

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest stop should be treated as required for any event_processor or config_manager passed in. We could validate them in the constructor. When stop doesn't exist, we can log a message and ignore it.

@event_processor.stop! if @event_processor.respond_to?(:stop!)
end

private

def get_variation_with_config(experiment_key, user_id, attributes, config)
Expand Down Expand Up @@ -692,18 +706,15 @@ def validate_instantiation_options
def send_impression(config, experiment, variation_key, user_id, attributes = nil)
experiment_key = experiment['key']
variation_id = config.get_variation_id_from_key(experiment_key, variation_key)
impression_event = @event_builder.create_impression_event(config, experiment, variation_id, user_id, attributes)
@logger.log(Logger::INFO,
"Dispatching impression event to URL #{impression_event.url} with params #{impression_event.params}.")
begin
@event_dispatcher.dispatch_event(impression_event)
rescue => e
@logger.log(Logger::ERROR, "Unable to dispatch impression event. Error: #{e}")
end
user_event = UserEventFactory.create_impression_event(config, experiment, variation_id, user_id, attributes)
@event_processor.process(user_event)

@logger.log(Logger::INFO, "Activating user '#{user_id}' in experiment '#{experiment_key}'.")
variation = config.get_variation_from_id(experiment_key, variation_id)
log_event = EventFactory.create_log_event(user_event, @logger)
@notification_center.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
experiment, user_id, attributes, variation, impression_event
experiment, user_id, attributes, variation, log_event
)
end

Expand Down
16 changes: 15 additions & 1 deletion lib/optimizely/config_manager/http_project_config_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module Optimizely
class HTTPProjectConfigManager < ProjectConfigManager
# Config manager that polls for the datafile and updated ProjectConfig based on an update interval.

attr_reader :config
attr_reader :config, :closed

# Initialize config manager. One of sdk_key or url has to be set to be able to use.
#
Expand Down Expand Up @@ -72,6 +72,7 @@ def initialize(
@last_modified = nil
@async_scheduler = AsyncScheduler.new(method(:fetch_datafile_config), @polling_interval, auto_update, @logger)
@async_scheduler.start! if start_by_default == true
@closed = false
@skip_json_validation = skip_json_validation
@notification_center = notification_center.is_a?(Optimizely::NotificationCenter) ? notification_center : NotificationCenter.new(@logger, @error_handler)
@config = datafile.nil? ? nil : DatafileProjectConfig.create(datafile, @logger, @error_handler, @skip_json_validation)
Expand All @@ -84,11 +85,24 @@ def ready?
end

def start!
if @closed
@logger.log(Logger::WARN, 'Not starting. Already closed.')
return
end

@async_scheduler.start!
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must check if it is stopped, then no need to start again.

@closed = false
end

def stop!
if @closed
@logger.log(Logger::WARN, 'Not pausing. Manager has not been started.')
return
end

@async_scheduler.stop!
@config = nil
@closed = true
end

def get_config
Expand Down
18 changes: 9 additions & 9 deletions lib/optimizely/event/batch_event_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class BatchEventProcessor < EventProcessor
# the BlockingQueue and buffers them for either a configured batch size or for a
# maximum duration before the resulting LogEvent is sent to the NotificationCenter.

attr_reader :event_queue, :current_batch, :batch_size, :flush_interval
attr_reader :event_queue, :current_batch, :started, :batch_size, :flush_interval

DEFAULT_BATCH_SIZE = 10
DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds
Expand Down Expand Up @@ -54,18 +54,18 @@ def initialize(
@mutex = Mutex.new
@received = ConditionVariable.new
@current_batch = []
@is_started = false
@started = false
start!
end

def start!
if @is_started == true
if @started == true
@logger.log(Logger::WARN, 'Service already started.')
return
end
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
@thread = Thread.new { run }
@is_started = true
@started = true
end

def flush
Expand All @@ -78,7 +78,7 @@ def flush
def process(user_event)
@logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")

unless @thread.alive?
if !@started || !@thread.alive?
@logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
return
end
Expand All @@ -95,14 +95,14 @@ def process(user_event)
end

def stop!
return unless @thread.alive?
return unless @started

@mutex.synchronize do
@event_queue << SHUTDOWN_SIGNAL
@received.signal
end

@is_started = false
@started = false
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if stop! is called several times quickly? Would it be more correct this way, with checking @started first?

def stop!
  return unless @started
  return unless @thread.alive? # not sure if this is still necessary?
  
  @logger.log(Logger::WARN, 'Stopping scheduler.')
  @started = false
  @mutex.synchronize do
    event_queue << SHUTDOWN_SIGNAL
    @received.signal
  end

  @thread.exit
end

@logger.log(Logger::WARN, 'Stopping scheduler.')
@thread.exit
end
Expand Down Expand Up @@ -153,7 +153,7 @@ def flush_queue!
begin
@event_dispatcher.dispatch_event(log_event)
rescue StandardError => e
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}")
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
end
@current_batch = []
end
Expand All @@ -167,7 +167,7 @@ def add_to_batch(user_event)
# Reset the deadline if starting a new batch.
@flushing_interval_deadline = (Helpers::DateTimeUtils.create_timestamp + @flush_interval) if @current_batch.empty?

@logger.log(Logger::DEBUG, "Adding user event: #{user_event.event['key']} to batch.")
@logger.log(Logger::DEBUG, "Adding user event: #{user_event} to batch.")
@current_batch << user_event
return unless @current_batch.length >= @batch_size

Expand Down
38 changes: 38 additions & 0 deletions lib/optimizely/event/forwarding_event_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# frozen_string_literal: true

#
# Copyright 2019, Optimizely and contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require_relative 'event_processor'
module Optimizely
class ForwardingEventProcessor < EventProcessor
# ForwardingEventProcessor is a basic transformation stage for converting
# the event batch into a LogEvent to be dispatched.
def initialize(event_dispatcher, logger = nil)
@event_dispatcher = event_dispatcher
@logger = logger || NoOpLogger.new
end

def process(user_event)
log_event = Optimizely::EventFactory.create_log_event(user_event, @logger)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the log event notification implemented yet? I think it should be triggered here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented in PR: #196

begin
@event_dispatcher.dispatch_event(log_event)
rescue StandardError => e
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
end
end
end
end
1 change: 0 additions & 1 deletion spec/event/batch_event_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@
expect(@event_dispatcher).to have_received(:dispatch_event).with(
Optimizely::EventFactory.create_log_event(expected_batch, spy_logger)
).once
expect(spy_logger).to have_received(:log).with(Logger::DEBUG, "Adding user event: #{event['key']} to batch.").exactly(10).times
expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!').once
end

Expand Down
103 changes: 103 additions & 0 deletions spec/event/forwarding_event_processor_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# frozen_string_literal: true

#
# Copyright 2019, Optimizely and contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'spec_helper'
require 'optimizely/event/forwarding_event_processor'
require 'optimizely/event/user_event_factory'
require 'optimizely/error_handler'
require 'optimizely/helpers/date_time_utils'
require 'optimizely/logger'
describe Optimizely::ForwardingEventProcessor do
let(:config_body_JSON) { OptimizelySpec::VALID_CONFIG_BODY_JSON }
let(:error_handler) { Optimizely::NoOpErrorHandler.new }
let(:spy_logger) { spy('logger') }
let(:project_config) { Optimizely::DatafileProjectConfig.new(config_body_JSON, spy_logger, error_handler) }
let(:event) { project_config.get_event_from_key('test_event') }
let(:log_url) { 'https://logx.optimizely.com/v1/events' }
let(:post_headers) { {'Content-Type' => 'application/json'} }

before(:example) do
time_now = Time.now
allow(Time).to receive(:now).and_return(time_now)
allow(SecureRandom).to receive(:uuid).and_return('a68cf1ad-0393-4e18-af87-efe8f01a7c9c')

@event_dispatcher = Optimizely::EventDispatcher.new
allow(@event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))
@conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)

@expected_endpoint = 'https://logx.optimizely.com/v1/events'
@expected_conversion_params = {
account_id: '12001',
project_id: '111001',
visitors: [{
attributes: [{
entity_id: Optimizely::Helpers::Constants::CONTROL_ATTRIBUTES['BOT_FILTERING'],
key: Optimizely::Helpers::Constants::CONTROL_ATTRIBUTES['BOT_FILTERING'],
type: 'custom',
value: true
}],
visitor_id: 'test_user',
snapshots: [{
events: [{
entity_id: '111095',
timestamp: Optimizely::Helpers::DateTimeUtils.create_timestamp,
uuid: 'a68cf1ad-0393-4e18-af87-efe8f01a7c9c',
key: 'test_event'
}]
}]
}],
anonymize_ip: false,
revision: '42',
client_name: Optimizely::CLIENT_ENGINE,
enrich_decisions: true,
client_version: Optimizely::VERSION
}
end

describe '.process' do
it 'should dispatch log event when valid event is provided' do
forwarding_event_processor = Optimizely::ForwardingEventProcessor.new(
@event_dispatcher, spy_logger
)

forwarding_event_processor.process(@conversion_event)

expect(@event_dispatcher).to have_received(:dispatch_event).with(
Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers)
).once
end

it 'should log an error when dispatch event raises timeout exception' do
log_event = Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers)
allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event)

timeout_error = Timeout::Error.new
allow(@event_dispatcher).to receive(:dispatch_event).and_raise(timeout_error)

forwarding_event_processor = Optimizely::ForwardingEventProcessor.new(
@event_dispatcher, spy_logger
)

forwarding_event_processor.process(@conversion_event)

expect(spy_logger).to have_received(:log).once.with(
Logger::ERROR,
"Error dispatching event: #{log_event} Timeout::Error."
)
end
end
end
Loading