require 'thread' class SidekiqPauser TTL = 60 PAUSED_KEY = "sidekiq_is_paused_v2" def initialize @mutex = Mutex.new @dbs ||= Set.new end def pause!(value = "paused") $redis.setex PAUSED_KEY, TTL, value extend_lease_thread true end def paused? !!$redis.get(PAUSED_KEY) end def unpause_all! @mutex.synchronize do @dbs = Set.new stop_extend_lease_thread end RailsMultisite::ConnectionManagement.each_connection do unpause! if paused? end end def paused_dbs dbs = [] RailsMultisite::ConnectionManagement.each_connection do dbs << RailsMultisite::ConnectionManagement.current_db if paused? end dbs end def unpause! @mutex.synchronize do @dbs.delete(RailsMultisite::ConnectionManagement.current_db) stop_extend_lease_thread if @dbs.size == 0 end $redis.del(PAUSED_KEY) true end private def stop_extend_lease_thread # should always be called from a mutex if t = @extend_lease_thread @extend_lease_thread = nil while t.alive? begin t.wakeup rescue ThreadError => e unless e.message =~ /killed thread/ raise e end end sleep 0 end end end def extend_lease_thread @mutex.synchronize do @dbs << RailsMultisite::ConnectionManagement.current_db @extend_lease_thread ||= Thread.new do while true do break if !@extend_lease_thread @mutex.synchronize do @dbs.each do |db| RailsMultisite::ConnectionManagement.with_connection(db) do if !$redis.expire(PAUSED_KEY, TTL) # if it was unpaused in another process we got to remove the # bad key @dbs.delete(db) end end end end sleep(Rails.env.test? ? 0.01 : TTL / 2) end end end end end module Sidekiq @pauser = SidekiqPauser.new def self.pause!(key = nil) key ? @pauser.pause!(key) : @pauser.pause! end def self.paused? @pauser.paused? end def self.unpause! @pauser.unpause! end def self.unpause_all! @pauser.unpause_all! end def self.paused_dbs @pauser.paused_dbs end end # server middleware that will reschedule work whenever Sidekiq is paused class Sidekiq::Pausable def initialize(delay = 5.seconds) @delay = delay end def call(worker, msg, queue) if sidekiq_paused?(msg) && !(Jobs::RunHeartbeat === worker) worker.class.perform_in(@delay, *msg['args']) else start = Process.clock_gettime(Process::CLOCK_MONOTONIC) result = yield duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start DiscourseEvent.trigger(:sidekiq_job_ran, worker, msg, queue, duration) result end end private def sidekiq_paused?(msg) if site_id = msg["args"]&.first&.dig("current_site_id") RailsMultisite::ConnectionManagement.with_connection(site_id) do Sidekiq.paused? end end end end