Revert "Revert "Revert "FIX: Heartbeat check per sidekiq process (#7873)"""

This reverts commit c3497559be.
This commit is contained in:
David Taylor 2019-08-30 11:26:16 +01:00
parent 5d4fcc62bb
commit e2449f9f23
6 changed files with 34 additions and 76 deletions

View File

@ -2,8 +2,19 @@
module Jobs
class RunHeartbeat < Jobs::Base
sidekiq_options queue: 'critical'
def self.heartbeat_key
'heartbeat_last_run'
end
def execute(args)
Demon::Sidekiq.trigger_heartbeat(args[:queue_name])
$redis.set(self.class.heartbeat_key, Time.new.to_i.to_s)
end
def self.last_heartbeat
$redis.get(heartbeat_key).to_i
end
end
end

View File

@ -7,9 +7,7 @@ module Jobs
every 3.minute
def execute(args)
Demon::Sidekiq::QUEUE_IDS.each do |identifier|
Jobs.enqueue(:run_heartbeat, queue_name: identifier, queue: identifier)
end
Jobs.enqueue(:run_heartbeat, {})
end
end
end

View File

@ -144,32 +144,25 @@ before_fork do |server, worker|
@sidekiq_next_heartbeat_check ||= Time.new.to_i + @sidekiq_heartbeat_interval
if @sidekiq_next_heartbeat_check < Time.new.to_i
@sidekiq_next_heartbeat_check = Time.new.to_i + @sidekiq_heartbeat_interval
restarted = false
last_heartbeat = Jobs::RunHeartbeat.last_heartbeat
restart = false
if out_of_memory?
Rails.logger.warn("Sidekiq is consuming too much memory (using: %0.2fM) for '%s', restarting" % [(max_rss.to_f / 1.megabyte), ENV["DISCOURSE_HOSTNAME"]])
Demon::Sidekiq.restart
restarted = true
restart = true
end
if !restarted
Demon::Sidekiq::QUEUE_IDS.each do |identifier|
last_heartbeat = Demon::Sidekiq.get_queue_last_heartbeat(identifier)
if last_heartbeat < Time.new.to_i - @sidekiq_heartbeat_interval
if demon = Demon::Sidekiq.demons.values.find { |d| d.identifier == identifier }
STDERR.puts "Sidekiq heartbeat test for worker #{demon.pid} failed, restarting"
Rails.logger.warn "Sidekiq heartbeat test for worker #{demon.pid} failed, restarting"
demon.stop
demon.start
restarted = true
end
end
end
end
STDERR.puts "Sidekiq heartbeat test failed, restarting"
Rails.logger.warn "Sidekiq heartbeat test failed, restarting"
if restarted
restart = true
end
@sidekiq_next_heartbeat_check = Time.new.to_i + @sidekiq_heartbeat_interval
if restart
Demon::Sidekiq.restart
sleep 10
force_kill_rogue_sidekiq
end

View File

@ -11,7 +11,6 @@ class Demon::Base
def self.start(count = 1, verbose: false)
@demons ||= {}
before_start(count)
count.times do |i|
(@demons["#{prefix}_#{i}"] ||= new(i, verbose: verbose)).start
end
@ -38,10 +37,7 @@ class Demon::Base
end
end
def self.before_start(count)
end
attr_reader :pid, :parent_pid, :started, :index, :identifier
attr_reader :pid, :parent_pid, :started, :index
attr_accessor :stop_timeout
def initialize(index, rails_root: nil, parent_pid: nil, verbose: false)

View File

@ -3,38 +3,6 @@
require "demon/base"
class Demon::Sidekiq < Demon::Base
RANDOM_HEX = SecureRandom.hex
QUEUE_IDS = []
def self.queues_last_heartbeat_hash_key
@@queues_last_heartbeat_hash_key ||= "#{RANDOM_HEX}_queues_last_heartbeat_hash"
end
def self.trigger_heartbeat(name)
$redis.hset(queues_last_heartbeat_hash_key, name, Time.new.to_i.to_s)
extend_expiry(queues_last_heartbeat_hash_key)
end
def self.get_queue_last_heartbeat(name)
extend_expiry(queues_last_heartbeat_hash_key)
$redis.hget(queues_last_heartbeat_hash_key, name).to_i
end
def self.clear_heartbeat_queues!
$redis.del(queues_last_heartbeat_hash_key)
end
def self.before_start(count)
# cleans up heartbeat queues from previous boot up
Sidekiq::Queue.all.each { |queue| queue.clear if queue.name[/^\h{32}$/] }
count.times do
QUEUE_IDS << SecureRandom.hex
end
end
def self.extend_expiry(key)
$redis.expire(key, 60 * 60)
end
def self.prefix
"sidekiq"
@ -44,11 +12,6 @@ class Demon::Sidekiq < Demon::Base
blk ? (@blk = blk) : @blk
end
def run
@identifier = QUEUE_IDS[@index]
super
end
private
def suppress_stdout
@ -73,7 +36,7 @@ class Demon::Sidekiq < Demon::Base
options = ["-c", GlobalSetting.sidekiq_workers.to_s]
[['critical', 8], [@identifier, 8], ['default', 4], ['low', 2], ['ultra_low', 1]].each do |queue_name, weight|
[['critical', 8], ['default', 4], ['low', 2], ['ultra_low', 1]].each do |queue_name, weight|
custom_queue_hostname = ENV["UNICORN_SIDEKIQ_#{queue_name.upcase}_QUEUE_HOSTNAME"]
if !custom_queue_hostname || custom_queue_hostname.split(',').include?(`hostname`.strip)

View File

@ -2,7 +2,6 @@
require 'rails_helper'
require_dependency 'jobs/base'
require_dependency 'demon/sidekiq'
describe Jobs::Heartbeat do
after do
@ -11,14 +10,12 @@ describe Jobs::Heartbeat do
it "still enqueues heartbeats in readonly mode" do
freeze_time 1.week.from_now
Demon::Sidekiq.clear_heartbeat_queues!
Jobs.run_immediately!
Discourse.enable_readonly_mode
queue = SecureRandom.hex
Demon::Sidekiq::QUEUE_IDS << queue
Sidekiq::Testing.inline! do
Jobs::Heartbeat.new.perform(nil)
expect(Demon::Sidekiq.get_queue_last_heartbeat(queue)).to eq(Time.new.to_i)
expect(Jobs::RunHeartbeat.last_heartbeat).to eq(Time.new.to_i)
end
end
end