FEATURE: ThreadPool implementation (#30364)
This commit introduces a new ThreadPool class that provides efficient worker thread management for background tasks. Key features include: - Dynamic scaling from min to max threads based on workload - Proper database connection management in multisite setup - Graceful shutdown with task completion - Robust error handling and logging - FIFO task processing with a managed queue - Configurable idle timeout for worker threads The implementation is thoroughly tested, including stress tests, error scenarios, and multisite compatibility.
This commit is contained in:
parent
2a3f0f3bef
commit
efa50a4da2
|
@ -0,0 +1,147 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
module Scheduler
|
||||
# ThreadPool manages a pool of worker threads that process tasks from a queue.
|
||||
# It maintains a minimum number of threads and can scale up to a maximum number
|
||||
# when there's more work to be done.
|
||||
#
|
||||
# Usage:
|
||||
# pool = ThreadPool.new(min_threads: 0, max_threads: 4, idle_time: 0.1)
|
||||
# pool.post { do_something }
|
||||
#
|
||||
# (optional)
|
||||
# pool.shutdown
|
||||
|
||||
class ThreadPool
|
||||
class ShutdownError < StandardError
|
||||
end
|
||||
|
||||
def initialize(min_threads:, max_threads:, idle_time:)
|
||||
raise ArgumentError, "min_threads must be 0 or larger" if min_threads < 0
|
||||
raise ArgumentError, "max_threads must be 1 or larger" if max_threads < 1
|
||||
raise ArgumentError, "max_threads must be >= min_threads" if max_threads < min_threads
|
||||
raise ArgumentError, "idle_time must be positive" if idle_time <= 0
|
||||
|
||||
@min_threads = min_threads
|
||||
@max_threads = max_threads
|
||||
@idle_time = idle_time
|
||||
|
||||
@threads = Set.new
|
||||
|
||||
@queue = Queue.new
|
||||
@mutex = Mutex.new
|
||||
@new_work = ConditionVariable.new
|
||||
@shutdown = false
|
||||
|
||||
# Initialize minimum number of threads
|
||||
@min_threads.times { spawn_thread }
|
||||
end
|
||||
|
||||
def post(&block)
|
||||
raise ShutdownError, "Cannot post work to a shutdown ThreadPool" if shutdown?
|
||||
|
||||
db = RailsMultisite::ConnectionManagement.current_db
|
||||
wrapped_block = wrap_block(block, db)
|
||||
|
||||
@mutex.synchronize do
|
||||
@queue << wrapped_block
|
||||
|
||||
spawn_thread if @queue.length > 1 && @threads.length < @max_threads
|
||||
|
||||
@new_work.signal
|
||||
end
|
||||
end
|
||||
|
||||
def shutdown(timeout: 30)
|
||||
@mutex.synchronize do
|
||||
return if @shutdown
|
||||
@shutdown = true
|
||||
@threads.length.times { @queue << :shutdown }
|
||||
@new_work.broadcast
|
||||
end
|
||||
|
||||
threads_to_join = nil
|
||||
@mutex.synchronize { threads_to_join = @threads.to_a }
|
||||
|
||||
failed_to_shutdown = false
|
||||
|
||||
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
|
||||
threads_to_join.each do |thread|
|
||||
remaining_time = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||
break if remaining_time <= 0
|
||||
if !thread.join(remaining_time)
|
||||
Rails.logger.error "ThreadPool: Failed to join thread within timeout\n#{thread.backtrace.join("\n")}"
|
||||
failed_to_shutdown = true
|
||||
end
|
||||
end
|
||||
|
||||
raise ShutdownError, "Failed to shutdown ThreadPool within timeout" if failed_to_shutdown
|
||||
end
|
||||
|
||||
def shutdown?
|
||||
@mutex.synchronize { @shutdown }
|
||||
end
|
||||
|
||||
def stats
|
||||
@mutex.synchronize do
|
||||
{
|
||||
thread_count: @threads.size,
|
||||
queued_tasks: @queue.size,
|
||||
shutdown: @shutdown,
|
||||
min_threads: @min_threads,
|
||||
max_threads: @max_threads,
|
||||
}
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def wrap_block(block, db)
|
||||
proc do
|
||||
begin
|
||||
RailsMultisite::ConnectionManagement.with_connection(db) { block.call }
|
||||
rescue StandardError => e
|
||||
Discourse.warn_exception(
|
||||
e,
|
||||
message: "Discourse Scheduler ThreadPool: Unhandled exception",
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def thread_loop
|
||||
done = false
|
||||
while !done
|
||||
work = nil
|
||||
|
||||
@mutex.synchronize do
|
||||
@new_work.wait(@mutex, @idle_time)
|
||||
|
||||
if @queue.empty?
|
||||
done = @threads.count > @min_threads
|
||||
else
|
||||
work = @queue.pop
|
||||
|
||||
if work == :shutdown
|
||||
work = nil
|
||||
done = true
|
||||
end
|
||||
end
|
||||
|
||||
@threads.delete(Thread.current) if done
|
||||
end
|
||||
|
||||
# could be nil if the thread just needs to idle
|
||||
work&.call if !done
|
||||
end
|
||||
end
|
||||
|
||||
# Outside of constructor usage this is called from a synchronized block
|
||||
# we are already synchronized
|
||||
def spawn_thread
|
||||
thread = Thread.new { thread_loop }
|
||||
thread.abort_on_exception = true
|
||||
@threads << thread
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,179 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
RSpec.describe Scheduler::ThreadPool, type: :multisite do
|
||||
let(:min_threads) { 2 }
|
||||
let(:max_threads) { 4 }
|
||||
let(:idle_time) { 0.1 }
|
||||
|
||||
let(:pool) do
|
||||
described_class.new(min_threads: min_threads, max_threads: max_threads, idle_time: idle_time)
|
||||
end
|
||||
|
||||
after { pool.shutdown(timeout: 1) }
|
||||
|
||||
describe "initialization" do
|
||||
it "creates the minimum number of threads and validates parameters" do
|
||||
expect(pool.stats[:thread_count]).to eq(min_threads)
|
||||
expect(pool.stats[:min_threads]).to eq(min_threads)
|
||||
expect(pool.stats[:max_threads]).to eq(max_threads)
|
||||
expect(pool.stats[:shutdown]).to be false
|
||||
end
|
||||
|
||||
it "raises ArgumentError for invalid parameters" do
|
||||
expect { described_class.new(min_threads: -1, max_threads: 2, idle_time: 1) }.to raise_error(
|
||||
ArgumentError,
|
||||
"min_threads must be 0 or larger",
|
||||
)
|
||||
|
||||
expect { described_class.new(min_threads: 2, max_threads: 1, idle_time: 1) }.to raise_error(
|
||||
ArgumentError,
|
||||
"max_threads must be >= min_threads",
|
||||
)
|
||||
|
||||
expect { described_class.new(min_threads: 1, max_threads: 2, idle_time: 0) }.to raise_error(
|
||||
ArgumentError,
|
||||
"idle_time must be positive",
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
describe "#post" do
|
||||
it "executes submitted tasks" do
|
||||
completion_queue = Queue.new
|
||||
|
||||
pool.post { completion_queue << 1 }
|
||||
pool.post { completion_queue << 2 }
|
||||
|
||||
results = Array.new(2) { completion_queue.pop }
|
||||
expect(results).to contain_exactly(1, 2)
|
||||
end
|
||||
|
||||
it "maintains database connection context" do
|
||||
completion_queue = Queue.new
|
||||
|
||||
test_multisite_connection("second") do
|
||||
pool.post { completion_queue << RailsMultisite::ConnectionManagement.current_db }
|
||||
end
|
||||
|
||||
expect(completion_queue.pop).to eq("second")
|
||||
end
|
||||
|
||||
it "scales up threads when work increases" do
|
||||
completion_queue = Queue.new
|
||||
blocker_queue = Queue.new
|
||||
|
||||
# Create enough blocking tasks to force thread creation
|
||||
(max_threads + 1).times do |i|
|
||||
pool.post do
|
||||
completion_queue << i
|
||||
blocker_queue.pop
|
||||
end
|
||||
end
|
||||
|
||||
expect(pool.stats[:thread_count]).to eq(max_threads)
|
||||
(max_threads + 1).times { blocker_queue << :continue }
|
||||
|
||||
results = Array.new(max_threads + 1) { completion_queue.pop }
|
||||
|
||||
expect(results.sort).to eq((0..max_threads).to_a)
|
||||
end
|
||||
end
|
||||
|
||||
describe "#shutdown" do
|
||||
it "prevents new tasks from being posted" do
|
||||
completion_queue = Queue.new
|
||||
pool.post { completion_queue << 1 }
|
||||
completion_queue.pop # ensure first task completes
|
||||
|
||||
pool.shutdown
|
||||
expect(pool.shutdown?).to be true
|
||||
expect { pool.post { true } }.to raise_error(Scheduler::ThreadPool::ShutdownError)
|
||||
end
|
||||
|
||||
it "completes pending tasks before shutting down" do
|
||||
blocker_queue1 = Queue.new
|
||||
completion_queue1 = Queue.new
|
||||
|
||||
blocker_queue2 = Queue.new
|
||||
completion_queue2 = Queue.new
|
||||
|
||||
3.times do |i|
|
||||
pool.post do
|
||||
blocker_queue1.pop
|
||||
completion_queue1 << i
|
||||
blocker_queue2.pop
|
||||
completion_queue2 << i
|
||||
end
|
||||
end
|
||||
|
||||
3.times { blocker_queue1 << :continue }
|
||||
results1 = Array.new(3) { completion_queue1.pop }
|
||||
|
||||
# this is not perfect, but it close enough
|
||||
# usually spawing the thread will take longer than making the call to shutdown
|
||||
# even if it does not it does not really matter that much
|
||||
results2 = nil
|
||||
|
||||
Thread.new do
|
||||
3.times { blocker_queue2 << :continue }
|
||||
results2 = Array.new(3) { completion_queue2.pop }
|
||||
end
|
||||
|
||||
pool.shutdown(timeout: 1)
|
||||
|
||||
expect(results1.size).to eq(3)
|
||||
expect(results1.sort).to eq([0, 1, 2])
|
||||
|
||||
expect(results2.size).to eq(3)
|
||||
expect(results2.sort).to eq([0, 1, 2])
|
||||
end
|
||||
end
|
||||
|
||||
describe "error handling" do
|
||||
it "captures and logs exceptions without crashing the thread" do
|
||||
completion_queue = Queue.new
|
||||
error_msg = "Test error"
|
||||
|
||||
pool.post { raise StandardError, error_msg }
|
||||
pool.post { completion_queue << :completed }
|
||||
|
||||
# If the error handling works, this second task should complete
|
||||
expect(completion_queue.pop).to eq(:completed)
|
||||
expect(pool.stats[:thread_count]).to eq(min_threads)
|
||||
end
|
||||
end
|
||||
|
||||
describe "queue management" do
|
||||
it "processes tasks in FIFO order" do
|
||||
completion_queue = Queue.new
|
||||
control_queue = Queue.new
|
||||
|
||||
# First task will wait for signal
|
||||
pool.post do
|
||||
control_queue.pop
|
||||
completion_queue << 1
|
||||
end
|
||||
|
||||
# Second task should execute after first
|
||||
pool.post { completion_queue << 2 }
|
||||
|
||||
# Signal first task to complete
|
||||
control_queue << :continue
|
||||
|
||||
results = Array.new(2) { completion_queue.pop }
|
||||
expect(results).to eq([1, 2])
|
||||
end
|
||||
end
|
||||
|
||||
describe "stress test" do
|
||||
it "handles multiple task submissions correctly" do
|
||||
completion_queue = Queue.new
|
||||
task_count = 50
|
||||
|
||||
task_count.times { |i| pool.post { completion_queue << i } }
|
||||
|
||||
results = Array.new(task_count) { completion_queue.pop }
|
||||
expect(results.sort).to eq((0...task_count).to_a)
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue