From efa50a4da2db35445d469835e1475f7827f8d7de Mon Sep 17 00:00:00 2001
From: Sam
Date: Fri, 20 Dec 2024 07:37:12 +1100
Subject: [PATCH] FEATURE: ThreadPool implementation (#30364)

This commit introduces a new ThreadPool class that provides efficient worker
thread management for background tasks. Key features include:

- Dynamic scaling from min to max threads based on workload
- Proper database connection management in multisite setup
- Graceful shutdown with task completion
- Robust error handling and logging
- FIFO task processing with a managed queue
- Configurable idle timeout for worker threads

The implementation is thoroughly tested, including stress tests, error
scenarios, and multisite compatibility.
---
 lib/scheduler/thread_pool.rb           | 191 +++++++++++++++++++++++++
 spec/lib/scheduler/thread_pool_spec.rb | 202 ++++++++++++++++++++++++++
 2 files changed, 393 insertions(+)
 create mode 100644 lib/scheduler/thread_pool.rb
 create mode 100644 spec/lib/scheduler/thread_pool_spec.rb

diff --git a/lib/scheduler/thread_pool.rb b/lib/scheduler/thread_pool.rb
new file mode 100644
index 00000000000..a6872704355
--- /dev/null
+++ b/lib/scheduler/thread_pool.rb
@@ -0,0 +1,191 @@
+# frozen_string_literal: true
+
+module Scheduler
+  # ThreadPool manages a pool of worker threads that process tasks from a queue.
+  # It maintains a minimum number of threads and can scale up to a maximum number
+  # when there's more work to be done.
+  #
+  # Tasks run in the RailsMultisite database context that was current when they
+  # were posted. A StandardError raised by a task is reported through
+  # Discourse.warn_exception and does not terminate the worker thread.
+  #
+  # Usage:
+  #  pool = ThreadPool.new(min_threads: 0, max_threads: 4, idle_time: 0.1)
+  #  pool.post { do_something }
+  #
+  #  (optional)
+  #  pool.shutdown
+
+  class ThreadPool
+    class ShutdownError < StandardError
+    end
+
+    # @param min_threads [Integer] workers kept alive while idle (0 or more)
+    # @param max_threads [Integer] upper bound on workers (1 or more, >= min_threads)
+    # @param idle_time [Numeric] seconds a worker waits for new work before it
+    #   checks whether it may exit (must be positive)
+    # @raise [ArgumentError] if any argument is out of range
+    def initialize(min_threads:, max_threads:, idle_time:)
+      raise ArgumentError, "min_threads must be 0 or larger" if min_threads < 0
+      raise ArgumentError, "max_threads must be 1 or larger" if max_threads < 1
+      raise ArgumentError, "max_threads must be >= min_threads" if max_threads < min_threads
+      raise ArgumentError, "idle_time must be positive" if idle_time <= 0
+
+      @min_threads = min_threads
+      @max_threads = max_threads
+      @idle_time = idle_time
+
+      @threads = Set.new
+      # Number of workers currently executing a task; guarded by @mutex
+      @busy_threads = 0
+
+      @queue = Queue.new
+      @mutex = Mutex.new
+      @new_work = ConditionVariable.new
+      @shutdown = false
+
+      # Initialize minimum number of threads. Workers access @threads under
+      # @mutex, so the initial registration takes the lock as well.
+      @mutex.synchronize { @min_threads.times { spawn_thread } }
+    end
+
+    # Queues a block for execution on a worker thread. An extra worker is
+    # spawned (up to max_threads) when no idle worker is available to run it.
+    #
+    # @raise [ShutdownError] if the pool has been shut down
+    def post(&block)
+      @mutex.synchronize do
+        # Checked under the lock so a task can never be queued behind the
+        # :shutdown markers, where no worker would ever run it
+        raise ShutdownError, "Cannot post work to a shutdown ThreadPool" if @shutdown
+
+        db = RailsMultisite::ConnectionManagement.current_db
+        @queue << wrap_block(block, db)
+
+        # Idle workers (including ones spawned but not yet running) drain the
+        # queue on their own, so only grow the pool when the backlog exceeds
+        # them. This also covers an empty pool (min_threads: 0).
+        idle_threads = @threads.length - @busy_threads
+        spawn_thread if @queue.length > idle_threads && @threads.length < @max_threads
+
+        @new_work.signal
+      end
+    end
+
+    # Stops accepting work, lets already queued tasks run and waits for the
+    # workers to exit. Calling it again is a no-op.
+    #
+    # @param timeout [Numeric] total seconds to wait for all workers
+    # @raise [ShutdownError] if a worker is still running after the timeout
+    def shutdown(timeout: 30)
+      threads_to_join = nil
+
+      @mutex.synchronize do
+        return if @shutdown
+        @shutdown = true
+        threads_to_join = @threads.to_a
+        # One marker per worker, queued behind any pending tasks
+        threads_to_join.length.times { @queue << :shutdown }
+        @new_work.broadcast
+      end
+
+      failed_to_shutdown = false
+
+      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
+      threads_to_join.each do |thread|
+        remaining_time = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
+        # Past the deadline join(0) still reaps workers that already finished
+        # and flags the ones that did not, instead of silently skipping them
+        next if thread.join([remaining_time, 0].max)
+
+        # backtrace is nil when the thread finished right after join timed out
+        backtrace = thread.backtrace&.join("\n")
+        Rails.logger.error "ThreadPool: Failed to join thread within timeout\n#{backtrace}"
+        failed_to_shutdown = true
+      end
+
+      raise ShutdownError, "Failed to shutdown ThreadPool within timeout" if failed_to_shutdown
+    end
+
+    # @return [Boolean] whether shutdown has been requested
+    def shutdown?
+      @mutex.synchronize { @shutdown }
+    end
+
+    # @return [Hash] snapshot of the pool state taken under the lock
+    def stats
+      @mutex.synchronize do
+        {
+          thread_count: @threads.size,
+          queued_tasks: @queue.size,
+          shutdown: @shutdown,
+          min_threads: @min_threads,
+          max_threads: @max_threads,
+        }
+      end
+    end
+
+    private
+
+    # Wraps a task so it runs against the given multisite db and so that a
+    # StandardError is logged instead of propagating into the worker loop.
+    def wrap_block(block, db)
+      proc do
+        begin
+          RailsMultisite::ConnectionManagement.with_connection(db) { block.call }
+        rescue StandardError => e
+          Discourse.warn_exception(
+            e,
+            message: "Discourse Scheduler ThreadPool: Unhandled exception",
+          )
+        end
+      end
+    end
+
+    # Worker body: run queued tasks until a :shutdown marker is received or the
+    # worker has been idle while the pool is above min_threads.
+    def thread_loop
+      done = false
+      while !done
+        work = nil
+
+        @mutex.synchronize do
+          # Only wait when there is nothing to do. post may signal before this
+          # worker starts waiting, so waiting with work queued would delay that
+          # work by up to idle_time.
+          @new_work.wait(@mutex, @idle_time) if @queue.empty?
+
+          if @queue.empty?
+            done = @threads.count > @min_threads
+          else
+            work = @queue.pop
+
+            if work == :shutdown
+              work = nil
+              done = true
+            end
+          end
+
+          @busy_threads += 1 if work
+          @threads.delete(Thread.current) if done
+        end
+
+        # could be nil if the thread just needs to idle
+        next if !work
+
+        begin
+          work.call
+        ensure
+          @mutex.synchronize { @busy_threads -= 1 }
+        end
+      end
+    end
+
+    # Must be called with @mutex held: it mutates @threads
+    def spawn_thread
+      thread = Thread.new { thread_loop }
+      thread.abort_on_exception = true
+      @threads << thread
+    end
+  end
+end
diff --git a/spec/lib/scheduler/thread_pool_spec.rb b/spec/lib/scheduler/thread_pool_spec.rb
new file mode 100644
index 00000000000..76bd662932c
--- /dev/null
+++ b/spec/lib/scheduler/thread_pool_spec.rb
@@ -0,0 +1,202 @@
+# frozen_string_literal: true
+
+RSpec.describe Scheduler::ThreadPool, type: :multisite do
+  let(:min_threads) { 2 }
+  let(:max_threads) { 4 }
+  let(:idle_time) { 0.1 }
+
+  let(:pool) do
+    described_class.new(min_threads: min_threads, max_threads: max_threads, idle_time: idle_time)
+  end
+
+  after { pool.shutdown(timeout: 1) }
+
+  describe "initialization" do
+    it "creates the minimum number of threads and validates parameters" do
+      expect(pool.stats[:thread_count]).to eq(min_threads)
+      expect(pool.stats[:min_threads]).to eq(min_threads)
+      expect(pool.stats[:max_threads]).to eq(max_threads)
+      expect(pool.stats[:shutdown]).to be false
+    end
+
+    it "raises ArgumentError for invalid parameters" do
+      expect { described_class.new(min_threads: -1, max_threads: 2, idle_time: 1) }.to raise_error(
+        ArgumentError,
+        "min_threads must be 0 or larger",
+      )
+
+      expect { described_class.new(min_threads: 0, max_threads: 0, idle_time: 1) }.to raise_error(
+        ArgumentError,
+        "max_threads must be 1 or larger",
+      )
+
+      expect { described_class.new(min_threads: 2, max_threads: 1, idle_time: 1) }.to raise_error(
+        ArgumentError,
+        "max_threads must be >= min_threads",
+      )
+
+      expect { described_class.new(min_threads: 1, max_threads: 2, idle_time: 0) }.to raise_error(
+        ArgumentError,
+        "idle_time must be positive",
+      )
+    end
+  end
+
+  describe "#post" do
+    it "executes submitted tasks" do
+      completion_queue = Queue.new
+
+      pool.post { completion_queue << 1 }
+      pool.post { completion_queue << 2 }
+
+      results = Array.new(2) { completion_queue.pop }
+      expect(results).to contain_exactly(1, 2)
+    end
+
+    it "maintains database connection context" do
+      completion_queue = Queue.new
+
+      test_multisite_connection("second") do
+        pool.post { completion_queue << RailsMultisite::ConnectionManagement.current_db }
+      end
+
+      expect(completion_queue.pop).to eq("second")
+    end
+
+    it "scales up threads when work increases" do
+      completion_queue = Queue.new
+      blocker_queue = Queue.new
+
+      # Create enough blocking tasks to force thread creation
+      (max_threads + 1).times do |i|
+        pool.post do
+          completion_queue << i
+          blocker_queue.pop
+        end
+      end
+
+      expect(pool.stats[:thread_count]).to eq(max_threads)
+      (max_threads + 1).times { blocker_queue << :continue }
+
+      results = Array.new(max_threads + 1) { completion_queue.pop }
+
+      expect(results.sort).to eq((0..max_threads).to_a)
+    end
+
+    context "when the pool starts without threads" do
+      let(:min_threads) { 0 }
+
+      it "spawns a worker for the first task" do
+        completion_queue = Queue.new
+
+        pool.post { completion_queue << :done }
+
+        expect(completion_queue.pop).to eq(:done)
+      end
+    end
+  end
+
+  describe "#shutdown" do
+    it "prevents new tasks from being posted" do
+      completion_queue = Queue.new
+      pool.post { completion_queue << 1 }
+      completion_queue.pop # ensure first task completes
+
+      pool.shutdown
+      expect(pool.shutdown?).to be true
+      expect { pool.post { true } }.to raise_error(Scheduler::ThreadPool::ShutdownError)
+    end
+
+    it "completes pending tasks before shutting down" do
+      blocker_queue1 = Queue.new
+      completion_queue1 = Queue.new
+
+      blocker_queue2 = Queue.new
+      completion_queue2 = Queue.new
+
+      3.times do |i|
+        pool.post do
+          blocker_queue1.pop
+          completion_queue1 << i
+          blocker_queue2.pop
+          completion_queue2 << i
+        end
+      end
+
+      3.times { blocker_queue1 << :continue }
+      results1 = Array.new(3) { completion_queue1.pop }
+
+      # this is not perfect, but it is close enough
+      # usually spawning the thread will take longer than making the call to shutdown
+      # and even if it does not, it does not really matter that much
+      unblocker =
+        Thread.new do
+          3.times { blocker_queue2 << :continue }
+          Array.new(3) { completion_queue2.pop }
+        end
+
+      pool.shutdown(timeout: 1)
+
+      # value joins the helper thread, so results2 is complete when it is read
+      results2 = unblocker.value
+
+      expect(results1.size).to eq(3)
+      expect(results1.sort).to eq([0, 1, 2])
+
+      expect(results2.size).to eq(3)
+      expect(results2.sort).to eq([0, 1, 2])
+    end
+  end
+
+  describe "error handling" do
+    it "captures and logs exceptions without crashing the thread" do
+      completion_queue = Queue.new
+      error_msg = "Test error"
+
+      pool.post { raise StandardError, error_msg }
+      pool.post { completion_queue << :completed }
+
+      # If the error handling works, this second task should complete
+      expect(completion_queue.pop).to eq(:completed)
+      expect(pool.stats[:thread_count]).to eq(min_threads)
+    end
+  end
+
+  describe "queue management" do
+    # A single worker makes the execution order equal to the queue order
+    let(:min_threads) { 1 }
+    let(:max_threads) { 1 }
+
+    it "processes tasks in FIFO order" do
+      completion_queue = Queue.new
+      control_queue = Queue.new
+
+      # First task will wait for signal
+      pool.post do
+        control_queue.pop
+        completion_queue << 1
+      end
+
+      # Second task should execute after first
+      pool.post { completion_queue << 2 }
+
+      # Signal first task to complete
+      control_queue << :continue
+
+      results = Array.new(2) { completion_queue.pop }
+      expect(results).to eq([1, 2])
+    end
+  end
+
+  describe "stress test" do
+    it "handles multiple task submissions correctly" do
+      completion_queue = Queue.new
+      task_count = 50
+
+      task_count.times { |i| pool.post { completion_queue << i } }
+
+      results = Array.new(task_count) { completion_queue.pop }
+      expect(results.sort).to eq((0...task_count).to_a)
+    end
+  end
+end