FIX: Add multisite support to Sidekiq::Pausable. (#6960)
Having a global Sidekiq pause switch is problematic because a site in the cluster can pause Sidekiq for the entire cluster.
This commit is contained in:
parent
ba2fb2024f
commit
53d592ad3b
|
@ -1,15 +1,19 @@
|
|||
require 'thread'
|
||||
|
||||
class SidekiqPauser
|
||||
TTL = 60
|
||||
PAUSED_KEY = "sidekiq_is_paused_v2"
|
||||
|
||||
def initialize
|
||||
@mutex = Mutex.new
|
||||
@dbs ||= Set.new
|
||||
end
|
||||
|
||||
def pause!
|
||||
redis.setex paused_key, 60, "paused"
|
||||
$redis.setex PAUSED_KEY, TTL, "paused"
|
||||
|
||||
@mutex.synchronize do
|
||||
@extend_lease_thread ||= extend_lease_thread
|
||||
extend_lease_thread
|
||||
sleep 0.001 while !paused?
|
||||
end
|
||||
|
||||
|
@ -17,38 +21,38 @@ class SidekiqPauser
|
|||
end
|
||||
|
||||
def paused?
|
||||
!!redis.get(paused_key)
|
||||
!!$redis.get(PAUSED_KEY)
|
||||
end
|
||||
|
||||
def unpause!
|
||||
@mutex.synchronize do
|
||||
@extend_lease_thread = nil
|
||||
@dbs.delete(RailsMultisite::ConnectionManagement.current_db)
|
||||
@extend_lease_thread = nil if @dbs.size == 0
|
||||
end
|
||||
|
||||
redis.del(paused_key)
|
||||
$redis.del(PAUSED_KEY)
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def extend_lease_thread
|
||||
Thread.new do
|
||||
@dbs << RailsMultisite::ConnectionManagement.current_db
|
||||
|
||||
@extend_lease_thread ||= Thread.new do
|
||||
while true do
|
||||
break unless @mutex.synchronize { @extend_lease_thread }
|
||||
redis.expire paused_key, 60
|
||||
sleep(Rails.env.test? ? 0.01 : 30)
|
||||
|
||||
@dbs.each do |db|
|
||||
RailsMultisite::ConnectionManagement.with_connection(db) do
|
||||
$redis.expire PAUSED_KEY, TTL
|
||||
end
|
||||
end
|
||||
|
||||
sleep(Rails.env.test? ? 0.01 : TTL / 2)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def redis
|
||||
$redis.without_namespace
|
||||
end
|
||||
|
||||
def paused_key
|
||||
"sidekiq_is_paused_v2"
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
module Sidekiq
|
||||
|
@ -74,7 +78,7 @@ class Sidekiq::Pausable
|
|||
end
|
||||
|
||||
def call(worker, msg, queue)
|
||||
if Sidekiq.paused? && !(Jobs::RunHeartbeat === worker)
|
||||
if sidekiq_paused?(msg) && !(Jobs::RunHeartbeat === worker)
|
||||
worker.class.perform_in(@delay, *msg['args'])
|
||||
else
|
||||
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||
|
@ -85,4 +89,14 @@ class Sidekiq::Pausable
|
|||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def sidekiq_paused?(msg)
|
||||
if site_id = msg["args"]&.first&.dig("current_site_id")
|
||||
RailsMultisite::ConnectionManagement.with_connection(site_id) do
|
||||
Sidekiq.paused?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
require 'rails_helper'
|
||||
require_dependency 'sidekiq/pausable'
|
||||
|
||||
describe Sidekiq do
|
||||
after do
|
||||
Sidekiq.unpause!
|
||||
end
|
||||
|
||||
it "can pause and unpause" do
|
||||
Sidekiq.pause!
|
||||
expect(Sidekiq.paused?).to eq(true)
|
||||
Sidekiq.unpause!
|
||||
expect(Sidekiq.paused?).to eq(false)
|
||||
end
|
||||
|
||||
it "can still run heartbeats when paused" do
|
||||
Sidekiq.pause!
|
||||
|
||||
freeze_time 1.week.from_now
|
||||
|
||||
jobs = Sidekiq::ScheduledSet.new
|
||||
|
||||
Sidekiq::Testing.disable! do
|
||||
jobs.clear
|
||||
|
||||
middleware = Sidekiq::Pausable.new
|
||||
middleware.call(Jobs::RunHeartbeat.new, { "args" => [{}] }, "critical") do
|
||||
"done"
|
||||
end
|
||||
|
||||
jobs = Sidekiq::ScheduledSet.new
|
||||
expect(jobs.size).to eq(0)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
|
@ -0,0 +1,89 @@
|
|||
require 'rails_helper'
|
||||
require_dependency 'sidekiq/pausable'
|
||||
|
||||
RSpec.describe "Pausing/Unpausing Sidekiq", type: :multisite do
|
||||
after do
|
||||
$redis.flushall
|
||||
end
|
||||
|
||||
describe '#pause!, #unpause! and #paused?' do
|
||||
it "can pause and unpause" do
|
||||
Sidekiq.pause!
|
||||
expect(Sidekiq.paused?).to eq(true)
|
||||
|
||||
test_multisite_connection('second') do
|
||||
expect(Sidekiq.paused?).to eq(false)
|
||||
end
|
||||
|
||||
Sidekiq.unpause!
|
||||
|
||||
expect(Sidekiq.paused?).to eq(false)
|
||||
|
||||
test_multisite_connection('second') do
|
||||
Sidekiq.pause!
|
||||
expect(Sidekiq.paused?).to eq(true)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
RSpec.describe Sidekiq::Pausable do
|
||||
after do
|
||||
$redis.flushall
|
||||
end
|
||||
|
||||
it "can still run heartbeats when paused" do
|
||||
Sidekiq.pause!
|
||||
|
||||
freeze_time 1.week.from_now
|
||||
|
||||
jobs = Sidekiq::ScheduledSet.new
|
||||
jobs.clear
|
||||
middleware = Sidekiq::Pausable.new
|
||||
|
||||
middleware.call(Jobs::RunHeartbeat.new, { "args" => [{}] }, "critical") do
|
||||
"done"
|
||||
end
|
||||
|
||||
jobs = Sidekiq::ScheduledSet.new
|
||||
expect(jobs.size).to eq(0)
|
||||
end
|
||||
|
||||
describe 'when sidekiq is paused', type: :multisite do
|
||||
let(:middleware) { Sidekiq::Pausable.new }
|
||||
|
||||
def call_middleware(db = RailsMultisite::ConnectionManagement::DEFAULT)
|
||||
middleware.call(Jobs::PostAlert.new, {
|
||||
"args" => [{ "current_site_id" => db }]
|
||||
}, "critical") do
|
||||
yield
|
||||
end
|
||||
end
|
||||
|
||||
it 'should delay the job' do
|
||||
Sidekiq.pause!
|
||||
|
||||
called = false
|
||||
called2 = false
|
||||
call_middleware { called = true }
|
||||
|
||||
expect(called).to eq(false)
|
||||
|
||||
test_multisite_connection('second') do
|
||||
call_middleware('second') { called2 = true }
|
||||
expect(called2).to eq(true)
|
||||
end
|
||||
|
||||
Sidekiq.unpause!
|
||||
call_middleware { called = true }
|
||||
|
||||
expect(called).to eq(true)
|
||||
|
||||
test_multisite_connection('second') do
|
||||
Sidekiq.pause!
|
||||
call_middleware('second') { called2 = false }
|
||||
expect(called2).to eq(true)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue