From 26e267478d785e2f32ee7da4613e2cf4a65ff182 Mon Sep 17 00:00:00 2001 From: Daniel Waterworth Date: Fri, 28 Jul 2023 12:53:51 +0100 Subject: [PATCH] SECURITY: Don't allow a particular site to monopolize the defer queue --- lib/hijack.rb | 17 ++- lib/scheduler/defer.rb | 17 ++- lib/work_queue.rb | 111 ++++++++++++++++++ spec/lib/hijack_spec.rb | 8 ++ spec/lib/work_queue_spec.rb | 216 ++++++++++++++++++++++++++++++++++++ 5 files changed, 361 insertions(+), 8 deletions(-) create mode 100644 lib/work_queue.rb create mode 100644 spec/lib/work_queue_spec.rb diff --git a/lib/hijack.rb b/lib/hijack.rb index eb9b5ce2163..775ad3b7d57 100644 --- a/lib/hijack.rb +++ b/lib/hijack.rb @@ -23,13 +23,25 @@ module Hijack transfer_timings = MethodProfiler.transfer - io = hijack.call + scheduled = Concurrent::Promises.resolvable_event + + begin + Scheduler::Defer.later( + "hijack #{params["controller"]} #{params["action"]} #{info}", + force: false, + &scheduled.method(:resolve) + ) + rescue WorkQueue::WorkQueueFull + return render plain: "", status: 503 + end # duplicate headers so other middleware does not mess with it # on the way down the stack original_headers = response.headers.dup - Scheduler::Defer.later("hijack #{params["controller"]} #{params["action"]} #{info}") do + io = hijack.call + + scheduled.on_resolution! do MethodProfiler.start(transfer_timings) begin Thread.current[Logster::Logger::LOGSTER_ENV] = env @@ -148,6 +160,7 @@ module Hijack tempfiles&.each(&:close!) end end + # not leaked out, we use 418 ... I am a teapot to denote that we are hijacked render plain: "", status: 418 else diff --git a/lib/scheduler/defer.rb b/lib/scheduler/defer.rb index 176ae1a62a3..0389fa2571a 100644 --- a/lib/scheduler/defer.rb +++ b/lib/scheduler/defer.rb @@ -8,7 +8,11 @@ module Scheduler def initialize @async = !Rails.env.test? - @queue = Queue.new + @queue = + WorkQueue::ThreadSafeWrapper.new( + WorkQueue::FairQueue.new(500) { WorkQueue::BoundedQueue.new(10) }, + ) + @mutex = Mutex.new @stats_mutex = Mutex.new @paused = false @@ -23,7 +27,7 @@ module Scheduler end def length - @queue.length + @queue.size end def stats @@ -44,7 +48,7 @@ module Scheduler @async = val end - def later(desc = nil, db = RailsMultisite::ConnectionManagement.current_db, &blk) + def later(desc = nil, db = RailsMultisite::ConnectionManagement.current_db, force: true, &blk) @stats_mutex.synchronize do stats = (@stats[desc] ||= { queued: 0, finished: 0, duration: 0, errors: 0 }) stats[:queued] += 1 @@ -52,7 +56,7 @@ module Scheduler if @async start_thread if !@thread&.alive? && !@paused - @queue << [db, blk, desc] + @queue.push({ key: db, task: [db, blk, desc] }, force: force) else blk.call end @@ -71,7 +75,7 @@ module Scheduler end def do_all_work - do_work(_non_block = true) while !@queue.empty? + do_work(non_block = true) while !@queue.empty? end private @@ -89,7 +93,8 @@ module Scheduler # using non_block to match Ruby #deq def do_work(non_block = false) - db, job, desc = @queue.deq(non_block) + db, job, desc = @queue.shift(block: !non_block)[:task] + start = Process.clock_gettime(Process::CLOCK_MONOTONIC) db ||= RailsMultisite::ConnectionManagement::DEFAULT diff --git a/lib/work_queue.rb b/lib/work_queue.rb new file mode 100644 index 00000000000..d4df4207908 --- /dev/null +++ b/lib/work_queue.rb @@ -0,0 +1,111 @@ +# frozen_string_literal: true + +require "monitor" + +module WorkQueue + class WorkQueueFull < StandardError + end + + class ThreadSafeWrapper + include MonitorMixin + + def initialize(queue) + mon_initialize + + @queue = queue + @has_items = new_cond + end + + def push(task, force:) + synchronize do + previously_empty = @queue.empty? + @queue.push(task, force: force) + + @has_items.signal if previously_empty + end + end + + def shift(block:) + synchronize do + loop do + if task = @queue.shift + break task + elsif block + @has_items.wait + else + break nil + end + end + end + end + + def empty? + synchronize { @queue.empty? } + end + + def size + synchronize { @queue.size } + end + end + + class FairQueue + attr_reader :size + + def initialize(limit, &blk) + @limit = limit + @size = 0 + @elements = Hash.new { |h, k| h[k] = blk.call } + end + + def push(task, force:) + raise WorkQueueFull if !force && @size >= @limit + key, task = task.values_at(:key, :task) + @elements[key].push(task, force: force) + @size += 1 + nil + end + + def shift + unless @elements.empty? + key, queue = @elements.shift + + task = queue.shift + + @elements[key] = queue unless queue.empty? + + @size -= 1 + + { key: key, task: task } + end + end + + def empty? + @elements.empty? + end + end + + class BoundedQueue + def initialize(limit) + @limit = limit + @elements = [] + end + + def push(task, force:) + raise WorkQueueFull if !force && @elements.size >= @limit + @elements << task + nil + end + + def shift + @elements.shift + end + + def empty? + @elements.empty? + end + + def size + @elements.size + end + end +end diff --git a/spec/lib/hijack_spec.rb b/spec/lib/hijack_spec.rb index 87e1518a176..8e38061c1e7 100644 --- a/spec/lib/hijack_spec.rb +++ b/spec/lib/hijack_spec.rb @@ -225,4 +225,12 @@ RSpec.describe Hijack do expect(ran).to eq(false) end + + it "handles the queue being full" do + Scheduler::Defer.stubs(:later).raises(WorkQueue::WorkQueueFull.new) + + tester.hijack_test {} + + expect(tester.response.status).to eq(503) + end end diff --git a/spec/lib/work_queue_spec.rb b/spec/lib/work_queue_spec.rb new file mode 100644 index 00000000000..964212c77d6 --- /dev/null +++ b/spec/lib/work_queue_spec.rb @@ -0,0 +1,216 @@ +# frozen_string_literal: true + +RSpec.describe WorkQueue::BoundedQueue do + subject(:queue) { WorkQueue::BoundedQueue.new(3) } + + let(:task1) { "Task 1" } + let(:task2) { "Task 2" } + let(:task3) { "Task 3" } + let(:task4) { "Task 4" } + + describe "#push" do + context "when the queue is not full" do + it "adds the task to the queue" do + queue.push(task1, force: false) + expect(queue.size).to eq(1) + end + end + + context "when the queue is full" do + before do + queue.push(task1, force: false) + queue.push(task2, force: false) + queue.push(task3, force: false) + end + + it "adds the task to the queue if force parameter is true" do + expect { queue.push(task4, force: true) }.not_to raise_error + expect(queue.size).to eq(4) + end + + it "raises an error if the force parameter is false" do + expect { queue.push(task4, force: false) }.to raise_error(WorkQueue::WorkQueueFull) + end + end + end + + describe "#shift" do + it "removes and returns the first task from the queue" do + queue.push(task1, force: false) + queue.push(task2, force: false) + + expect(queue.shift).to eq(task1) + expect(queue.shift).to eq(task2) + + expect(queue.size).to eq(0) + expect(queue).to be_empty + end + + it "returns nil when the queue is empty" do + shifted_task = queue.shift + expect(shifted_task).to be_nil + end + end + + describe "#empty?" do + it "returns true if the queue is empty" do + expect(queue).to be_empty + end + + it "returns false if the queue is not empty" do + queue.push(task1, force: false) + expect(queue).not_to be_empty + end + end + + describe "#size" do + it "returns the number of tasks in the queue" do + queue.push(task1, force: false) + queue.push(task2, force: false) + expect(queue.size).to eq(2) + end + end +end + +RSpec.describe WorkQueue::FairQueue do + subject(:queue) do + WorkQueue::FairQueue.new(global_limit) { WorkQueue::BoundedQueue.new(per_key_limit) } + end + + let(:global_limit) { 5 } + let(:per_key_limit) { 3 } + let(:key1) { :key1 } + let(:key2) { :key2 } + let(:key3) { :key3 } + let(:task1) { "task1" } + let(:task2) { "task2" } + let(:task3) { "task3" } + let(:task4) { "task4" } + let(:task5) { "task5" } + let(:task6) { "task6" } + + describe "#push" do + context "when no previous tasks exist for the key" do + it "adds the task to the queue" do + queue.push({ key: key1, task: task1 }, force: false) + expect(queue.size).to eq(1) + end + end + + context "when the global limit is reached" do + before do + queue.push({ key: key1, task: task1 }, force: false) + queue.push({ key: key2, task: task2 }, force: false) + queue.push({ key: key3, task: task3 }, force: false) + queue.push({ key: key1, task: task4 }, force: false) + queue.push({ key: key2, task: task5 }, force: false) + end + + it "raises an error if the force parameter is false" do + expect { queue.push({ key: key3, task: task6 }, force: false) }.to raise_error( + WorkQueue::WorkQueueFull, + ) + end + + it "adds the task to the queue if the force parameter is true" do + queue.push({ key: key3, task: task6 }, force: true) + expect(queue.size).to eq(6) + end + end + end + + describe "#shift" do + it "removes and returns tasks in FIFO order when the keys are different" do + queue.push({ key: key1, task: task1 }, force: false) + queue.push({ key: key2, task: task2 }, force: false) + queue.push({ key: key3, task: task3 }, force: false) + + expect(queue.shift).to eq({ key: key1, task: task1 }) + expect(queue.shift).to eq({ key: key2, task: task2 }) + expect(queue.shift).to eq({ key: key3, task: task3 }) + + expect(queue.size).to eq(0) + expect(queue).to be_empty + end + + it "removes and returns tasks in FIFO order by key when the keys are the same" do + queue.push({ key: key1, task: task1 }, force: false) + queue.push({ key: key1, task: task3 }, force: false) + queue.push({ key: key2, task: task2 }, force: false) + queue.push({ key: key2, task: task4 }, force: false) + + expect(queue.shift).to eq({ key: key1, task: task1 }) + expect(queue.shift).to eq({ key: key2, task: task2 }) + expect(queue.shift).to eq({ key: key1, task: task3 }) + expect(queue.shift).to eq({ key: key2, task: task4 }) + + expect(queue.size).to eq(0) + expect(queue).to be_empty + end + + it "returns nil when the queue is empty" do + shifted_task = queue.shift + expect(shifted_task).to be_nil + end + end + + describe "#empty?" do + it "returns true if the queue is empty" do + expect(queue).to be_empty + end + + it "returns false if the queue is not empty" do + queue.push({ key: key1, task: task1 }, force: false) + expect(queue).not_to be_empty + end + end + + describe "#size" do + it "returns the number of tasks in the queue" do + queue.push({ key: key1, task: task1 }, force: false) + queue.push({ key: key2, task: task2 }, force: false) + expect(queue.size).to eq(2) + end + end +end + +RSpec.describe WorkQueue::ThreadSafeWrapper do + subject(:queue) { WorkQueue::ThreadSafeWrapper.new(WorkQueue::BoundedQueue.new(3)) } + + let(:task) { "task1" } + + describe "#push" do + it "delegates the push operation to the inner queue" do + queue.push(task, force: false) + expect(queue).not_to be_empty + end + end + + describe "#shift" do + context "when block is true" do + it "waits until an item is available and then returns it" do + result = nil + thread = Thread.new { result = queue.shift(block: true) } + expect(thread).to be_alive + + queue.push(task, force: false) + thread.join + + expect(result).to eq(task) + end + end + + context "when block is false" do + it "returns nil immediately if no item is available" do + shifted_task = queue.shift(block: false) + expect(shifted_task).to be_nil + end + + it "returns the first available item if one is present" do + queue.push(task, force: false) + shifted_task = queue.shift(block: false) + expect(shifted_task).to eq(task) + end + end + end +end