DEV: Log a warning message when a MiniScheduler scheduled job is stuck (#28258)
This commit adds a `MiniSchedulerLongRunningJobLogger` class which will poll every 60 seconds for mini_scheduler jobs which are stuck. When it detects that a job is stuck, it will log a warning message with the current backtrace of the thread that is executing the job. Note that for scheduled jobs which are executed at a frequency of less than 30 minutes, we will log when the job has been executing for 30 minutes. For scheduled jobs executed at a frequency of less than 2 hours, we will log when the job has been executing for a duration greater than its specified frequency. For scheduled jobs executed at a frequency greater than 2 hours, we will log as long as the job has been executing for more than 2 hours.
This commit is contained in:
parent
814d2e6286
commit
4c0af24173
|
@ -242,7 +242,7 @@ GEM
|
|||
mini_mime (1.1.5)
|
||||
mini_racer (0.9.0)
|
||||
libv8-node (~> 18.19.0.0)
|
||||
mini_scheduler (0.16.0)
|
||||
mini_scheduler (0.17.0)
|
||||
sidekiq (>= 4.2.3, < 7.0)
|
||||
mini_sql (1.5.0)
|
||||
mini_suffix (0.3.3)
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
require "sidekiq/pausable"
|
||||
require "sidekiq_logster_reporter"
|
||||
require "sidekiq_long_running_job_logger"
|
||||
require "mini_scheduler_long_running_job_logger"
|
||||
|
||||
Sidekiq.configure_client { |config| config.redis = Discourse.sidekiq_redis_config }
|
||||
|
||||
|
@ -47,6 +48,11 @@ if Sidekiq.server?
|
|||
if !scheduler_hostname || scheduler_hostname.split(",").include?(Discourse.os_hostname)
|
||||
begin
|
||||
MiniScheduler.start(workers: GlobalSetting.mini_scheduler_workers)
|
||||
|
||||
MiniSchedulerLongRunningJobLogger.new(
|
||||
poll_interval_seconds:
|
||||
ENV["DISCOURSE_MINI_SCHEDULER_LONG_RUNNING_JOB_LOGGER_POLL_INTERVAL_SECONDS"],
|
||||
).start
|
||||
rescue MiniScheduler::DistributedMutex::Timeout
|
||||
sleep 5
|
||||
retry
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class MiniSchedulerLongRunningJobLogger
|
||||
DEFAULT_POLL_INTERVAL_SECONDS = 6
|
||||
|
||||
attr_reader :thread
|
||||
|
||||
def initialize(poll_interval_seconds: nil)
|
||||
@mutex = Mutex.new
|
||||
@stop_requested = false
|
||||
|
||||
@poll_interval_seconds =
|
||||
if poll_interval_seconds
|
||||
begin
|
||||
Integer(poll_interval_seconds)
|
||||
rescue ArgumentError
|
||||
DEFAULT_POLL_INTERVAL_SECONDS
|
||||
end
|
||||
else
|
||||
DEFAULT_POLL_INTERVAL_SECONDS
|
||||
end
|
||||
end
|
||||
|
||||
def start
|
||||
@thread ||=
|
||||
Thread.new do
|
||||
hostname = Discourse.os_hostname
|
||||
|
||||
loop do
|
||||
break if self.stop_requested?
|
||||
|
||||
current_long_running_jobs = Set.new
|
||||
|
||||
begin
|
||||
MiniScheduler::Manager.discover_running_scheduled_jobs.each do |job|
|
||||
job_class = job[:class]
|
||||
job_started_at = job[:started_at]
|
||||
mini_scheduler_worker_thread_id = job[:thread_id]
|
||||
|
||||
job_frequency_minutes =
|
||||
if job_class.daily
|
||||
1.day.in_minutes.minutes
|
||||
else
|
||||
job_class.every.in_minutes.minutes
|
||||
end
|
||||
|
||||
warning_duration =
|
||||
begin
|
||||
if job_frequency_minutes < 30.minutes
|
||||
30.minutes
|
||||
elsif job_frequency_minutes < 2.hours
|
||||
job_frequency_minutes
|
||||
else
|
||||
2.hours
|
||||
end
|
||||
end
|
||||
|
||||
next if job_started_at >= (Time.zone.now - warning_duration)
|
||||
|
||||
running_thread =
|
||||
Thread.list.find do |thread|
|
||||
thread[:mini_scheduler_worker_thread_id] == mini_scheduler_worker_thread_id
|
||||
end
|
||||
|
||||
next if running_thread.nil?
|
||||
|
||||
current_long_running_jobs << job_class
|
||||
|
||||
next if @seen_long_running_jobs&.include?(job_class)
|
||||
|
||||
Rails.logger.warn(<<~MSG)
|
||||
Sidekiq scheduled job `#{job_class}` has been running for more than #{warning_duration.in_minutes.to_i} minutes
|
||||
#{running_thread.backtrace.join("\n")}
|
||||
MSG
|
||||
end
|
||||
|
||||
@seen_long_running_jobs = current_long_running_jobs
|
||||
|
||||
yield if block_given?
|
||||
rescue => error
|
||||
Discourse.warn_exception(
|
||||
error,
|
||||
message: "Unexpected error in MiniSchedulerLongRunningJobLogger thread",
|
||||
)
|
||||
end
|
||||
|
||||
sleep @poll_interval_seconds
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Used for testing to stop the thread. In production, the thread is expected to live for the lifetime of the process.
|
||||
def stop
|
||||
@mutex.synchronize { @stop_requested = true }
|
||||
|
||||
if @thread
|
||||
@thread.wakeup
|
||||
@thread.join
|
||||
@thread = nil
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def stop_requested?
|
||||
@mutex.synchronize { @stop_requested }
|
||||
end
|
||||
end
|
|
@ -0,0 +1,116 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
require "mini_scheduler_long_running_job_logger"
|
||||
|
||||
RSpec.describe MiniSchedulerLongRunningJobLogger do
|
||||
use_redis_snapshotting
|
||||
|
||||
class Every10MinutesJob
|
||||
extend ::MiniScheduler::Schedule
|
||||
|
||||
every 10.minutes
|
||||
|
||||
def perform
|
||||
sleep 10_000
|
||||
end
|
||||
end
|
||||
|
||||
class DailyJob
|
||||
extend ::MiniScheduler::Schedule
|
||||
|
||||
daily at: 4.hours
|
||||
|
||||
def perform
|
||||
sleep 10_000
|
||||
end
|
||||
end
|
||||
|
||||
def with_running_scheduled_job(job_class)
|
||||
manager = MiniScheduler::Manager.new(enable_stats: false)
|
||||
|
||||
info = manager.schedule_info(job_class)
|
||||
info.next_run = Time.now.to_i - 1
|
||||
info.write!
|
||||
manager.tick
|
||||
|
||||
wait_for { manager.schedule_info(job_class).prev_result == "RUNNING" }
|
||||
|
||||
yield
|
||||
ensure
|
||||
manager.stop!
|
||||
end
|
||||
|
||||
before do
|
||||
@orig_logger = Rails.logger
|
||||
Rails.logger = @fake_logger = FakeLogger.new
|
||||
end
|
||||
|
||||
after { Rails.logger = @orig_logger }
|
||||
|
||||
it "logs long running jobs" do
|
||||
with_running_scheduled_job(Every10MinutesJob) do
|
||||
freeze_time(31.minutes.from_now)
|
||||
|
||||
begin
|
||||
checker = described_class.new
|
||||
|
||||
loops = 0
|
||||
|
||||
checker.start { loops += 1 }
|
||||
|
||||
wait_for { loops == 1 }
|
||||
|
||||
expect(@fake_logger.warnings.size).to eq(1)
|
||||
|
||||
expect(@fake_logger.warnings.first).to match(
|
||||
"Sidekiq scheduled job `Every10MinutesJob` has been running for more than 30 minutes",
|
||||
)
|
||||
|
||||
# Matches the backtrace
|
||||
expect(@fake_logger.warnings.first).to match("sleep")
|
||||
|
||||
# Check that the logger doesn't log repeated warnings after 2 loops
|
||||
expect do
|
||||
checker.thread.wakeup # Force the thread to run the next loop
|
||||
|
||||
wait_for { loops == 2 }
|
||||
end.not_to change { @fake_logger.warnings.size }
|
||||
|
||||
# Check that the logger doesn't log repeated warnings after 3 loops
|
||||
expect do
|
||||
checker.thread.wakeup # Force the thread to run the next loop
|
||||
|
||||
wait_for { loops == 3 }
|
||||
end.not_to change { @fake_logger.warnings.size }
|
||||
ensure
|
||||
checker.stop
|
||||
expect(checker.thread).to eq(nil)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
it "logs long running jobs with daily schedule" do
|
||||
with_running_scheduled_job(DailyJob) do
|
||||
freeze_time(3.hours.from_now)
|
||||
|
||||
begin
|
||||
checker = described_class.new
|
||||
|
||||
loops = 0
|
||||
|
||||
checker.start { loops += 1 }
|
||||
|
||||
wait_for { loops == 1 }
|
||||
|
||||
expect(@fake_logger.warnings.size).to eq(1)
|
||||
|
||||
expect(@fake_logger.warnings.first).to match(
|
||||
"Sidekiq scheduled job `DailyJob` has been running for more than 120 minutes",
|
||||
)
|
||||
ensure
|
||||
checker.stop
|
||||
expect(checker.thread).to eq(nil)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue