From 525df17221d8be701fbe344577ad0030fa29d321 Mon Sep 17 00:00:00 2001 From: Jon Evans Date: Thu, 11 Jan 2024 10:35:09 -0700 Subject: [PATCH] Introduce ForkTracker to automatically restart threads --- .../config_manager/async_scheduler.rb | 25 +++--- lib/optimizely/event/batch_event_processor.rb | 2 + lib/optimizely/fork_tracker.rb | 80 +++++++++++++++++++ lib/optimizely/odp/odp_event_manager.rb | 2 + 4 files changed, 99 insertions(+), 10 deletions(-) create mode 100644 lib/optimizely/fork_tracker.rb diff --git a/lib/optimizely/config_manager/async_scheduler.rb b/lib/optimizely/config_manager/async_scheduler.rb index f6318248..d95c5731 100644 --- a/lib/optimizely/config_manager/async_scheduler.rb +++ b/lib/optimizely/config_manager/async_scheduler.rb @@ -15,6 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +require_relative '../fork_tracker' + module Optimizely class AsyncScheduler attr_reader :running @@ -48,16 +50,8 @@ def start! return end - begin - @running = true - @thread = Thread.new { execution_wrapper(@callback) } - rescue StandardError => e - @logger.log( - Logger::ERROR, - "Couldn't create a new thread for async scheduler. #{e.message}" - ) - @error_handler.handle_error(e) - end + force_start! + ForkTracker.after_fork { force_start! } end def stop! @@ -72,6 +66,17 @@ def stop! private + def force_start! + @running = true + @thread = Thread.new { execution_wrapper(@callback) } + rescue StandardError => e + @logger.log( + Logger::ERROR, + "Couldn't create a new thread for async scheduler. #{e.message}" + ) + @error_handler.handle_error(e) + end + def execution_wrapper(callback) # Executes the given callback periodically diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index 52ec0533..aad3f8c8 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -16,6 +16,7 @@ # limitations under the License. # require_relative 'event_processor' +require_relative '../fork_tracker' require_relative '../helpers/validator' module Optimizely class BatchEventProcessor < EventProcessor @@ -77,6 +78,7 @@ def start! @resource = ConditionVariable.new end @thread = Thread.new { run_queue } + ForkTracker.after_fork { @thread = Thread.new { run_queue } } @started = true @stopped = false end diff --git a/lib/optimizely/fork_tracker.rb b/lib/optimizely/fork_tracker.rb new file mode 100644 index 00000000..b9495b67 --- /dev/null +++ b/lib/optimizely/fork_tracker.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +# This class is used for re-starting of threads after a fork +# Sourced from https://github.com/rails/rails/blob/a44559679aa26a54ea9867a68a78c5a4a55a3b9f/activesupport/lib/active_support/fork_tracker.rb + +module Optimizely + module ForkTracker + module ModernCoreExt + def _fork + pid = super + ForkTracker.after_fork_callback if pid.zero? + pid + end + end + + module CoreExt + def fork(...) + if block_given? + super do + ForkTracker.check! + yield + end + else + unless (pid = super) + ForkTracker.check! + end + pid + end + end + end + + module CoreExtPrivate + include CoreExt + private :fork + end + + @pid = Process.pid + @callbacks = [] + + class << self + def after_fork_callback + new_pid = Process.pid + return unless @pid != new_pid + + @callbacks.each(&:call) + @pid = new_pid + end + + if Process.respond_to?(:_fork) # Ruby 3.1+ + def check! + # We trust the `_fork` callback + end + else + alias check! after_fork_callback + end + + def hook! + if Process.respond_to?(:_fork) # Ruby 3.1+ + ::Process.singleton_class.prepend(ModernCoreExt) + elsif Process.respond_to?(:fork) + ::Object.prepend(CoreExtPrivate) if RUBY_VERSION < '3.0' + ::Kernel.prepend(CoreExtPrivate) + ::Kernel.singleton_class.prepend(CoreExt) + ::Process.singleton_class.prepend(CoreExt) + end + end + + def after_fork(&block) + @callbacks << block + block + end + + def unregister(callback) + @callbacks.delete(callback) + end + end + end +end + +Optimizely::ForkTracker.hook! diff --git a/lib/optimizely/odp/odp_event_manager.rb b/lib/optimizely/odp/odp_event_manager.rb index fc9084a1..b5d7c29d 100644 --- a/lib/optimizely/odp/odp_event_manager.rb +++ b/lib/optimizely/odp/odp_event_manager.rb @@ -18,6 +18,7 @@ require_relative 'odp_event_api_manager' require_relative '../helpers/constants' require_relative 'odp_event' +require_relative '../fork_tracker' module Optimizely class OdpEventManager @@ -68,6 +69,7 @@ def start!(odp_config) @api_key = odp_config.api_key @thread = Thread.new { run } + ForkTracker.after_fork { @thread = Thread.new { run } } @logger.log(Logger::INFO, 'Starting scheduler.') end