Skip to content

Commit 455203e

Browse files
committed
RubySingleThreadExecutor < RubyThreadPoolExecutor
1 parent 7c2909f commit 455203e

File tree

4 files changed

+34
-79
lines changed

4 files changed

+34
-79
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
### Upcoming Release v1.0.0.pre2 (TBD)
2+
3+
* Simplification of `RubySingleThreadExecutor`
4+
15
## Current Release v1.0.0.pre1 (19 Aug 2015)
26

37
* Merged in the `thread_safe` gem

lib/concurrent/executor/java_single_thread_executor.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
module Concurrent
77

88
# @!macro single_thread_executor
9-
# @!macro thread_pool_options
109
# @!macro abstract_executor_service_public_api
1110
# @!visibility private
1211
class JavaSingleThreadExecutor < JavaExecutorService
Lines changed: 10 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,80 +1,22 @@
1-
require 'thread'
2-
require 'concurrent/executor/ruby_executor_service'
3-
require 'concurrent/executor/serial_executor_service'
1+
require 'concurrent/executor/ruby_thread_pool_executor'
42

53
module Concurrent
64

75
# @!macro single_thread_executor
8-
# @!macro thread_pool_options
96
# @!macro abstract_executor_service_public_api
107
# @!visibility private
11-
class RubySingleThreadExecutor < RubyExecutorService
12-
include SerialExecutorService
8+
class RubySingleThreadExecutor < RubyThreadPoolExecutor
139

1410
# @!macro single_thread_executor_method_initialize
1511
def initialize(opts = {})
16-
super
17-
end
18-
19-
private
20-
21-
def ns_initialize(opts)
22-
@queue = Queue.new
23-
@thread = nil
24-
@fallback_policy = opts.fetch(:fallback_policy, :discard)
25-
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
26-
self.auto_terminate = opts.fetch(:auto_terminate, true)
27-
end
28-
29-
# @!visibility private
30-
def ns_execute(*args, &task)
31-
supervise
32-
@queue << [args, task]
33-
end
34-
35-
# @!visibility private
36-
def ns_shutdown_execution
37-
@queue << :stop
38-
stopped_event.set unless alive?
39-
end
40-
41-
# @!visibility private
42-
def ns_kill_execution
43-
@queue.clear
44-
@thread.kill if alive?
45-
end
46-
47-
# @!visibility private
48-
def alive?
49-
@thread && @thread.alive?
50-
end
51-
52-
# @!visibility private
53-
def supervise
54-
@thread = new_worker_thread unless alive?
55-
end
56-
57-
# @!visibility private
58-
def new_worker_thread
59-
Thread.new do
60-
Thread.current.abort_on_exception = false
61-
work
62-
end
63-
end
64-
65-
# @!visibility private
66-
def work
67-
loop do
68-
task = @queue.pop
69-
break if task == :stop
70-
begin
71-
task.last.call(*task.first)
72-
rescue => ex
73-
# let it fail
74-
log DEBUG, ex
75-
end
76-
end
77-
stopped_event.set
12+
super(
13+
min_threads: 1,
14+
max_threads: 1,
15+
max_queue: 0,
16+
idletime: DEFAULT_THREAD_IDLETIMEOUT,
17+
fallback_policy: opts.fetch(:fallback_policy, :discard),
18+
auto_terminate: opts.fetch(:auto_terminate, true)
19+
)
7820
end
7921
end
8022
end

lib/concurrent/executor/single_thread_executor.rb

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,35 @@ module Concurrent
1616

1717
# @!macro [attach] single_thread_executor
1818
#
19-
# A thread pool with a set number of threads. The number of threads in the pool
20-
# is set on construction and remains constant. When all threads are busy new
21-
# tasks `#post` to the thread pool are enqueued until a thread becomes available.
22-
# Should a thread crash for any reason the thread will immediately be removed
23-
# from the pool and replaced.
19+
# A thread pool with a single thread an unlimited queue. Should the thread
20+
# die for any reason it will be removed and replaced, thus ensuring that
21+
# the executor will always remain viable and available to process jobs.
2422
#
25-
# The API and behavior of this class are based on Java's `SingleThreadExecutor`
23+
# A common pattern for background processing is to create a single thread
24+
# on which an infinite loop is run. The thread's loop blocks on an input
25+
# source (perhaps blocking I/O or a queue) and processes each input as it
26+
# is received. This pattern has several issues. The thread itself is highly
27+
# susceptible to errors during processing. Also, the thread itself must be
28+
# constantly monitored and restarted should it die. `SingleThreadExecutor`
29+
# encapsulates all these bahaviors. The task processor is highly resilient
30+
# to errors from within tasks. Also, should the thread die it will
31+
# automatically be restarted.
32+
#
33+
# The API and behavior of this class are based on Java's `SingleThreadExecutor`.
2634
#
27-
# @!macro thread_pool_options
2835
# @!macro abstract_executor_service_public_api
2936
class SingleThreadExecutor < SingleThreadExecutorImplementation
3037

3138
# @!macro [new] single_thread_executor_method_initialize
3239
#
3340
# Create a new thread pool.
3441
#
35-
# @option opts [Symbol] :fallback_policy (:discard) the policy for
36-
# handling new tasks that are received when the queue size has
37-
# reached `max_queue` or after the executor has shut down
42+
# @option opts [Symbol] :fallback_policy (:discard) the policy for handling new
43+
# tasks that are received when the queue size has reached
44+
# `max_queue` or the executor has shut down
45+
#
46+
# @raise [ArgumentError] if `:fallback_policy` is not one of the values specified
47+
# in `FALLBACK_POLICIES`
3848
#
3949
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
4050
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html

0 commit comments

Comments
 (0)