Revert "Revert "FIX: Heartbeat check per sidekiq process (#7873)""
This reverts commit e805d44965
.
We now have mechanisms in place to ensure heartbeat will always
be scheduled even if the scheduler is overloaded per: 098f938b
This commit is contained in:
parent
ff8cc244d8
commit
c3497559be
|
@ -2,19 +2,8 @@
|
||||||
|
|
||||||
module Jobs
|
module Jobs
|
||||||
class RunHeartbeat < Jobs::Base
|
class RunHeartbeat < Jobs::Base
|
||||||
|
|
||||||
sidekiq_options queue: 'critical'
|
|
||||||
|
|
||||||
def self.heartbeat_key
|
|
||||||
'heartbeat_last_run'
|
|
||||||
end
|
|
||||||
|
|
||||||
def execute(args)
|
def execute(args)
|
||||||
$redis.set(self.class.heartbeat_key, Time.new.to_i.to_s)
|
Demon::Sidekiq.trigger_heartbeat(args[:queue_name])
|
||||||
end
|
|
||||||
|
|
||||||
def self.last_heartbeat
|
|
||||||
$redis.get(heartbeat_key).to_i
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -7,7 +7,9 @@ module Jobs
|
||||||
every 3.minute
|
every 3.minute
|
||||||
|
|
||||||
def execute(args)
|
def execute(args)
|
||||||
Jobs.enqueue(:run_heartbeat, {})
|
Demon::Sidekiq::QUEUE_IDS.each do |identifier|
|
||||||
|
Jobs.enqueue(:run_heartbeat, queue_name: identifier, queue: identifier)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -144,25 +144,32 @@ before_fork do |server, worker|
|
||||||
@sidekiq_next_heartbeat_check ||= Time.new.to_i + @sidekiq_heartbeat_interval
|
@sidekiq_next_heartbeat_check ||= Time.new.to_i + @sidekiq_heartbeat_interval
|
||||||
|
|
||||||
if @sidekiq_next_heartbeat_check < Time.new.to_i
|
if @sidekiq_next_heartbeat_check < Time.new.to_i
|
||||||
|
@sidekiq_next_heartbeat_check = Time.new.to_i + @sidekiq_heartbeat_interval
|
||||||
last_heartbeat = Jobs::RunHeartbeat.last_heartbeat
|
restarted = false
|
||||||
restart = false
|
|
||||||
|
|
||||||
if out_of_memory?
|
if out_of_memory?
|
||||||
Rails.logger.warn("Sidekiq is consuming too much memory (using: %0.2fM) for '%s', restarting" % [(max_rss.to_f / 1.megabyte), ENV["DISCOURSE_HOSTNAME"]])
|
Rails.logger.warn("Sidekiq is consuming too much memory (using: %0.2fM) for '%s', restarting" % [(max_rss.to_f / 1.megabyte), ENV["DISCOURSE_HOSTNAME"]])
|
||||||
restart = true
|
Demon::Sidekiq.restart
|
||||||
|
restarted = true
|
||||||
end
|
end
|
||||||
|
|
||||||
|
if !restarted
|
||||||
|
Demon::Sidekiq::QUEUE_IDS.each do |identifier|
|
||||||
|
last_heartbeat = Demon::Sidekiq.get_queue_last_heartbeat(identifier)
|
||||||
|
|
||||||
if last_heartbeat < Time.new.to_i - @sidekiq_heartbeat_interval
|
if last_heartbeat < Time.new.to_i - @sidekiq_heartbeat_interval
|
||||||
STDERR.puts "Sidekiq heartbeat test failed, restarting"
|
if demon = Demon::Sidekiq.demons.values.find { |d| d.identifier == identifier }
|
||||||
Rails.logger.warn "Sidekiq heartbeat test failed, restarting"
|
STDERR.puts "Sidekiq heartbeat test for worker #{demon.pid} failed, restarting"
|
||||||
|
Rails.logger.warn "Sidekiq heartbeat test for worker #{demon.pid} failed, restarting"
|
||||||
restart = true
|
demon.stop
|
||||||
|
demon.start
|
||||||
|
restarted = true
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
@sidekiq_next_heartbeat_check = Time.new.to_i + @sidekiq_heartbeat_interval
|
|
||||||
|
|
||||||
if restart
|
if restarted
|
||||||
Demon::Sidekiq.restart
|
|
||||||
sleep 10
|
sleep 10
|
||||||
force_kill_rogue_sidekiq
|
force_kill_rogue_sidekiq
|
||||||
end
|
end
|
||||||
|
|
|
@ -11,6 +11,7 @@ class Demon::Base
|
||||||
|
|
||||||
def self.start(count = 1, verbose: false)
|
def self.start(count = 1, verbose: false)
|
||||||
@demons ||= {}
|
@demons ||= {}
|
||||||
|
before_start(count)
|
||||||
count.times do |i|
|
count.times do |i|
|
||||||
(@demons["#{prefix}_#{i}"] ||= new(i, verbose: verbose)).start
|
(@demons["#{prefix}_#{i}"] ||= new(i, verbose: verbose)).start
|
||||||
end
|
end
|
||||||
|
@ -37,7 +38,10 @@ class Demon::Base
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
attr_reader :pid, :parent_pid, :started, :index
|
def self.before_start(count)
|
||||||
|
end
|
||||||
|
|
||||||
|
attr_reader :pid, :parent_pid, :started, :index, :identifier
|
||||||
attr_accessor :stop_timeout
|
attr_accessor :stop_timeout
|
||||||
|
|
||||||
def initialize(index, rails_root: nil, parent_pid: nil, verbose: false)
|
def initialize(index, rails_root: nil, parent_pid: nil, verbose: false)
|
||||||
|
|
|
@ -3,6 +3,38 @@
|
||||||
require "demon/base"
|
require "demon/base"
|
||||||
|
|
||||||
class Demon::Sidekiq < Demon::Base
|
class Demon::Sidekiq < Demon::Base
|
||||||
|
RANDOM_HEX = SecureRandom.hex
|
||||||
|
QUEUE_IDS = []
|
||||||
|
|
||||||
|
def self.queues_last_heartbeat_hash_key
|
||||||
|
@@queues_last_heartbeat_hash_key ||= "#{RANDOM_HEX}_queues_last_heartbeat_hash"
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.trigger_heartbeat(name)
|
||||||
|
$redis.hset(queues_last_heartbeat_hash_key, name, Time.new.to_i.to_s)
|
||||||
|
extend_expiry(queues_last_heartbeat_hash_key)
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.get_queue_last_heartbeat(name)
|
||||||
|
extend_expiry(queues_last_heartbeat_hash_key)
|
||||||
|
$redis.hget(queues_last_heartbeat_hash_key, name).to_i
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.clear_heartbeat_queues!
|
||||||
|
$redis.del(queues_last_heartbeat_hash_key)
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.before_start(count)
|
||||||
|
# cleans up heartbeat queues from previous boot up
|
||||||
|
Sidekiq::Queue.all.each { |queue| queue.clear if queue.name[/^\h{32}$/] }
|
||||||
|
count.times do
|
||||||
|
QUEUE_IDS << SecureRandom.hex
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.extend_expiry(key)
|
||||||
|
$redis.expire(key, 60 * 60)
|
||||||
|
end
|
||||||
|
|
||||||
def self.prefix
|
def self.prefix
|
||||||
"sidekiq"
|
"sidekiq"
|
||||||
|
@ -12,6 +44,11 @@ class Demon::Sidekiq < Demon::Base
|
||||||
blk ? (@blk = blk) : @blk
|
blk ? (@blk = blk) : @blk
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def run
|
||||||
|
@identifier = QUEUE_IDS[@index]
|
||||||
|
super
|
||||||
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def suppress_stdout
|
def suppress_stdout
|
||||||
|
@ -36,7 +73,7 @@ class Demon::Sidekiq < Demon::Base
|
||||||
|
|
||||||
options = ["-c", GlobalSetting.sidekiq_workers.to_s]
|
options = ["-c", GlobalSetting.sidekiq_workers.to_s]
|
||||||
|
|
||||||
[['critical', 8], ['default', 4], ['low', 2], ['ultra_low', 1]].each do |queue_name, weight|
|
[['critical', 8], [@identifier, 8], ['default', 4], ['low', 2], ['ultra_low', 1]].each do |queue_name, weight|
|
||||||
custom_queue_hostname = ENV["UNICORN_SIDEKIQ_#{queue_name.upcase}_QUEUE_HOSTNAME"]
|
custom_queue_hostname = ENV["UNICORN_SIDEKIQ_#{queue_name.upcase}_QUEUE_HOSTNAME"]
|
||||||
|
|
||||||
if !custom_queue_hostname || custom_queue_hostname.split(',').include?(`hostname`.strip)
|
if !custom_queue_hostname || custom_queue_hostname.split(',').include?(`hostname`.strip)
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
require 'rails_helper'
|
require 'rails_helper'
|
||||||
require_dependency 'jobs/base'
|
require_dependency 'jobs/base'
|
||||||
|
require_dependency 'demon/sidekiq'
|
||||||
|
|
||||||
describe Jobs::Heartbeat do
|
describe Jobs::Heartbeat do
|
||||||
after do
|
after do
|
||||||
|
@ -10,12 +11,14 @@ describe Jobs::Heartbeat do
|
||||||
|
|
||||||
it "still enqueues heartbeats in readonly mode" do
|
it "still enqueues heartbeats in readonly mode" do
|
||||||
freeze_time 1.week.from_now
|
freeze_time 1.week.from_now
|
||||||
|
Demon::Sidekiq.clear_heartbeat_queues!
|
||||||
|
Jobs.run_immediately!
|
||||||
|
|
||||||
Discourse.enable_readonly_mode
|
Discourse.enable_readonly_mode
|
||||||
|
|
||||||
Sidekiq::Testing.inline! do
|
queue = SecureRandom.hex
|
||||||
|
Demon::Sidekiq::QUEUE_IDS << queue
|
||||||
Jobs::Heartbeat.new.perform(nil)
|
Jobs::Heartbeat.new.perform(nil)
|
||||||
expect(Jobs::RunHeartbeat.last_heartbeat).to eq(Time.new.to_i)
|
expect(Demon::Sidekiq.get_queue_last_heartbeat(queue)).to eq(Time.new.to_i)
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue