discourse/app/jobs/base.rb

489 lines
14 KiB
Ruby

# frozen_string_literal: true
module Jobs
# Number of jobs currently enqueued, as reported by Sidekiq::Stats.
def self.queued
  stats = Sidekiq::Stats.new
  stats.enqueued
end
# True unless immediate mode is on (the @run_immediately flag is truthy).
def self.run_later?
  @run_immediately ? false : true
end
# True when immediate mode is on; always returns a strict boolean.
def self.run_immediately?
  @run_immediately ? true : false
end
# Switches on immediate mode: afterwards run_immediately? is true and run_later? is false.
def self.run_immediately!
@run_immediately = true
end
# Switches off immediate mode: afterwards run_later? is true and run_immediately? is false.
def self.run_later!
@run_immediately = false
end
# Runs the given block with immediate mode switched on, then restores whatever
# mode was active before, even if the block raises. Returns the block's value.
def self.with_immediate_jobs
  previous_mode = @run_immediately
  begin
    run_immediately!
    yield
  ensure
    @run_immediately = previous_mode
  end
end
# Reads the "last_job_perform_at" key through Sidekiq's redis connection and
# returns it as a Time, or nil when the key is absent.
def self.last_job_performed_at
  Sidekiq.redis do |conn|
    stored = conn.get("last_job_perform_at")
    Time.at(stored.to_i) if stored
  end
end
# Counts entries in Sidekiq's retry set whose class name ends in "Email".
def self.num_email_retry_jobs
  Sidekiq::RetrySet.new.count { |retry_job| /Email\z/.match?(retry_job.klass) }
end
class Base
class JobInstrumenter
# Records static metadata about one job run and starts profiling it.
#
# Does nothing (leaving the instance without state) unless enabled? is true.
# Otherwise, while holding the class mutex, it:
#   * builds the @data hash that is later serialized by write_to_log,
#   * writes a "starting" line when DISCOURSE_LOG_SIDEKIQ_INTERVAL is set,
#   * marks the job "pending", registers it in @@active_jobs and starts MethodProfiler.
#
# job_class - the job's class (its name is logged)
# opts      - the job arguments (logged JSON encoded)
# db        - name of the multisite database the job runs on
# jid       - the job's unique id
def initialize(job_class:, opts:, db:, jid:)
  return unless enabled?

  self.class.mutex.synchronize do
    @data = {
      "hostname" => Discourse.os_hostname,
      "pid" => Process.pid,
      "database" => db,
      "job_id" => jid,
      "job_name" => job_class.name, # eg: Jobs::AboutStats
      # "scheduled" when the class answers truthy to scheduled?, otherwise "regular"
      "job_type" => (job_class.try(:scheduled?) ? "scheduled" : "regular"),
      "opts" => opts.to_json,
    }

    if ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
      @data["status"] = "starting"
      write_to_log
    end

    @data["status"] = "pending"
    @start_timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    self.class.ensure_interval_logging!
    @@active_jobs ||= []
    @@active_jobs << self

    MethodProfiler.ensure_discourse_instrumentation!
    MethodProfiler.start
    # Baseline used by write_to_log to report how many heap slots the job left live.
    @data["live_slots_start"] = GC.stat[:heap_live_slots]
  end
end
# Finishes instrumentation for the job and writes the final log line.
#
# Does nothing unless enabled? is true. While holding the class mutex it stops
# MethodProfiler, removes the job from @@active_jobs, copies the profile figures
# into @data, sets the final status and calls write_to_log.
#
# exception - failure details for the run; Base#perform passes a hash that is
#             empty when the job succeeded. Anything `present?` marks the job failed.
def stop(exception:)
  return unless enabled?

  self.class.mutex.synchronize do
    profile = MethodProfiler.stop
    @@active_jobs.delete(self)

    @data["duration"] = profile[:total_duration] # length in seconds it took to run
    @data["sql_duration"] = profile.dig(:sql, :duration) || 0 # SQL duration (s)
    @data["sql_calls"] = profile.dig(:sql, :calls) || 0 # number of SQL statements
    @data["redis_duration"] = profile.dig(:redis, :duration) || 0 # Redis duration (s)
    @data["redis_calls"] = profile.dig(:redis, :calls) || 0 # number of Redis commands
    @data["net_duration"] = profile.dig(:net, :duration) || 0 # net duration (s)
    @data["net_calls"] = profile.dig(:net, :calls) || 0 # number of net calls
    @data["live_slots_finish"] = GC.stat[:heap_live_slots]

    failed = exception.present?
    @data["exception"] = exception if failed
    @data["status"] = failed ? "failed" : "success"

    write_to_log
  end
end
# Queues +message+ to be appended to log/sidekiq.log by a background thread,
# so the caller does not perform the file write itself.
#
# The logger (file opened in append mode with sync enabled), the queue and the
# writer thread are all created lazily and kept in class variables.
def self.raw_log(message)
@@logger ||=
begin
f = File.open "#{Rails.root}/log/sidekiq.log", "a"
f.sync = true
Logger.new f
end
@@log_queue ||= Queue.new
# Start the writer thread on first use, or start a new one if the previous thread is no longer alive.
if !defined?(@@log_thread) || !@@log_thread.alive?
@@log_thread =
Thread.new do
loop do
# Blocks until a message is available, then writes it verbatim.
@@logger << @@log_queue.pop
# The rescue belongs to the loop's block, so a failure is reported and the loop moves on to the next message.
rescue Exception => e
Discourse.warn_exception(
e,
message: "Exception encountered while logging Sidekiq job",
)
end
end
end
@@log_queue.push(message)
end
# Seconds elapsed on the monotonic clock since @start_timestamp was recorded.
def current_duration
  now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  now - @start_timestamp
end
# Serializes @data as one JSON line and hands it to raw_log.
#
# Does nothing unless enabled? is true. While the job is still "pending" the
# running duration is refreshed first. When DISCOURSE_LIVE_SLOTS_SIDEKIQ_LIMIT
# is positive and both heap-slot samples are present, a warning is also sent to
# Rails.logger if the job's live-slot growth reached that limit.
#
# Also called from the interval logging thread for jobs that are still running.
def write_to_log
  return unless enabled?

  @data["@timestamp"] = Time.now
  @data["duration"] = current_duration if @data["status"] == "pending"
  self.class.raw_log("#{@data.to_json}\n")

  return unless live_slots_limit > 0

  slots_at_start = @data["live_slots_start"]
  slots_at_finish = @data["live_slots_finish"]
  return unless slots_at_start.present? && slots_at_finish.present?

  allocated = slots_at_finish - slots_at_start
  if allocated >= live_slots_limit
    Rails.logger.warn(
      "Sidekiq Job '#{@data["job_name"]}' allocated #{allocated} objects in the heap: #{@data.inspect}",
    )
  end
end
# Whether job instrumentation is active; delegates to Discourse.enable_sidekiq_logging?.
# initialize, stop and write_to_log all return early when this is false.
def enabled?
Discourse.enable_sidekiq_logging?
end
# Threshold of heap live-slot growth above which write_to_log warns.
# Read once from DISCOURSE_LIVE_SLOTS_SIDEKIQ_LIMIT and memoized per instance;
# 0 when the variable is unset.
def live_slots_limit
  return @live_slots_limit if @live_slots_limit

  @live_slots_limit = ENV["DISCOURSE_LIVE_SLOTS_SIDEKIQ_LIMIT"].to_i
end
# Mutex shared by all instrumenter instances; initialize, stop and the interval
# logging thread synchronize on it around @@active_jobs and @data updates.
# NOTE(review): the lazy `||=` creation is itself unsynchronized — confirm the
# first call cannot race between threads.
def self.mutex
@@mutex ||= Mutex.new
end
# Starts (at most once) a background thread that periodically logs jobs that
# are still running.
#
# No-op unless DISCOURSE_LOG_SIDEKIQ_INTERVAL is set. The value is converted
# with to_i and used as the sleep period in seconds.
def self.ensure_interval_logging!
interval = ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
return if !interval
interval = interval.to_i
# `||=` keeps the first thread object. NOTE(review): if that thread exits via the
# rescue below, the variable stays set and no replacement thread is started.
@@interval_thread ||=
Thread.new do
begin
loop do
sleep interval
mutex.synchronize do
# Only jobs that have been running longer than one interval are logged.
@@active_jobs.each { |j| j.write_to_log if j.current_duration > interval }
end
end
rescue Exception => e
Discourse.warn_exception(
e,
message: "Sidekiq interval logging thread terminated unexpectedly",
)
end
end
end
end
include Sidekiq::Worker
# Class-level setting: pass 1 to make #perform take a cluster-wide redis lock
# before running, or nil to disable that.
#
# Raises ArgumentError for any other value.
def self.cluster_concurrency(val)
  unless val.nil? || val == 1
    raise ArgumentError, "cluster_concurrency must be 1 or nil"
  end

  @cluster_concurrency = val
end
# Returns the value stored by cluster_concurrency (nil when it was never set).
def self.get_cluster_concurrency
@cluster_concurrency
end
# Writes each argument to Rails.logger at info level as its own line, prefixed
# with a :db formatted timestamp and the upcased job class name. Always returns true.
def log(*args)
  args.each do |message|
    timestamp = Time.now.to_formatted_s(:db)
    tag = self.class.name.upcase
    Rails.logger.info("#{timestamp}: [#{tag}] #{message}")
  end

  true
end
# Construct an error context object for Discourse.handle_exception
# Subclasses are encouraged to use this!
#
# `opts` is the arguments passed to execute().
# `code_desc` is a short string describing what the code was doing (optional);
# when given it is stored under :message.
# `extra` is for any other context you logged; it is merged in last, so its keys win.
# Note that, when building your `extra`, that :opts, :job, and :message are set by this method,
# and :current_db and :current_hostname are used by handle_exception.
def error_context(opts, code_desc = nil, extra = {})
  context = { opts: opts, job: self.class }
  context[:message] = code_desc if code_desc
  context.merge!(extra) unless extra.nil?
  context
end
# Instantiates the job and calls #perform on it with +opts+.
def self.delayed_perform(opts = {})
  job = new
  job.perform(opts)
end
# The job's actual work. Subclasses must override this; the base
# implementation always raises RuntimeError.
def execute(opts = {})
  raise RuntimeError, "Overwrite me!"
end
# Returns @db_duration, or 0 when it is unset.
# NOTE(review): nothing in the visible part of this file assigns @db_duration —
# confirm where (or whether) it is still set.
def last_db_duration
@db_duration || 0
end
# Runs the job synchronously in the calling thread by invoking #execute directly.
#
# The trailing options hash is extracted from +args+. If it names a
# :current_site_id different from the current multisite database, an
# ArgumentError is raised instead of switching connections.
#
# Returns the value of #execute. A StandardError raised by #execute is passed
# to Discourse.handle_job_exception and nil is returned.
def perform_immediately(*args)
  opts = args.extract_options!.with_indifferent_access

  if opts.has_key?(:current_site_id) &&
       opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
    raise ArgumentError.new(
            "You can't connect to another database when executing a job synchronously.",
          )
  end

  result = nil
  begin
    result = execute(opts)
  rescue => error
    Discourse.handle_job_exception(error, error_context(opts))
  end
  result
end
# Redis key used for this job class's cluster concurrency lock.
def self.cluster_concurrency_redis_key
  class_label = to_s
  "cluster_concurrency:#{class_label}"
end
# Releases the cluster concurrency lock by deleting its key from the
# un-namespaced redis connection.
def self.clear_cluster_concurrency_lock!
  redis = Discourse.redis.without_namespace
  redis.del(cluster_concurrency_redis_key)
end
# Tries to take the cluster concurrency lock: sets the key only if it does not
# already exist (nx), with a 120 second expiry. Returns true when the lock was
# taken and false otherwise.
def self.acquire_cluster_concurrency_lock!
  redis = Discourse.redis.without_namespace
  reply = redis.set(cluster_concurrency_redis_key, 0, nx: true, ex: 120)
  reply ? true : false
end
# Entry point used when the job is run from the queue.
#
# 1. When the class has cluster_concurrency set, take the cluster-wide redis
#    lock first. If it is already held, re-enqueue the same args in 10 seconds
#    and return. Otherwise start a keepalive thread that keeps extending the
#    lock's 120 second expiry while the job runs.
# 2. Extract the options hash, and record "last_job_perform_at" when jobs are
#    running in the background.
# 3. Run #execute once per target database: only opts[:current_site_id] when
#    given, otherwise every multisite database. Each run is wrapped in a
#    JobInstrumenter and failures are collected rather than raised immediately.
# 4. Report every collected failure through Discourse.handle_job_exception, then
#    raise a HandledExceptionWrapper around the first one.
#
# Returns nil on success. The ensure section always stops the keepalive thread,
# releases the lock (unless the job was re-enqueued) and clears active
# ActiveRecord connections.
def perform(*args)
  requeued = false
  keepalive_thread = nil
  finished = false

  if self.class.get_cluster_concurrency
    if !self.class.acquire_cluster_concurrency_lock!
      self.class.perform_in(10.seconds, *args)
      requeued = true
      return
    end

    parent_thread = Thread.current
    cluster_concurrency_redis_key = self.class.cluster_concurrency_redis_key

    keepalive_thread =
      Thread.new do
        while parent_thread.alive? && !finished
          Discourse.redis.without_namespace.expire(cluster_concurrency_redis_key, 120)

          # Sleep for 60 seconds, but wake up every second to check if the job has been completed
          60.times do
            break if finished
            sleep 1
          end
        end
      end
  end

  opts = args.extract_options!.with_indifferent_access

  Sidekiq.redis { |r| r.set("last_job_perform_at", Time.now.to_i) } if ::Jobs.run_later?

  dbs =
    if opts[:current_site_id]
      [opts[:current_site_id]]
    else
      RailsMultisite::ConnectionManagement.all_dbs
    end

  exceptions = []
  dbs.each do |db|
    begin
      # Stays empty when the run on this database succeeds.
      exception = {}

      RailsMultisite::ConnectionManagement.with_connection(db) do
        job_instrumenter =
          JobInstrumenter.new(job_class: self.class, opts: opts, db: db, jid: jid)
        begin
          I18n.locale =
            SiteSetting.default_locale || SiteSettings::DefaultsProvider::DEFAULT_LOCALE
          I18n.ensure_all_loaded!
          begin
            logster_env = {}
            Logster.add_to_env(logster_env, :job, self.class.to_s)
            Logster.add_to_env(logster_env, :db, db)
            Thread.current[Logster::Logger::LOGSTER_ENV] = logster_env

            execute(opts)
          rescue => e
            exception[:ex] = e
            exception[:other] = { problem_db: db }
          end
        rescue => e
          # Failures before the job body runs (the locale setup above).
          exception[:ex] = e
          exception[:message] = "While establishing database connection to #{db}"
          exception[:other] = { problem_db: db }
        ensure
          job_instrumenter.stop(exception: exception)
        end
      end

      exceptions << exception unless exception.empty?
    end
  end

  Thread.current[Logster::Logger::LOGSTER_ENV] = nil

  if exceptions.length > 0
    exceptions.each do |exception_hash|
      Discourse.handle_job_exception(
        exception_hash[:ex],
        # The description is stored under :message above; reading :code here
        # dropped it from the reported context.
        error_context(opts, exception_hash[:message], exception_hash[:other]),
      )
    end
    raise HandledExceptionWrapper.new(exceptions[0][:ex])
  end

  nil
ensure
  if self.class.get_cluster_concurrency && !requeued
    finished = true
    # NOTE(review): Thread#wakeup raises ThreadError on a thread that has already
    # exited; presumably rare here, but confirm the keepalive thread cannot have
    # finished before this point.
    keepalive_thread.wakeup
    keepalive_thread.join
    self.class.clear_cluster_concurrency_lock!
  end

  ActiveRecord::Base.connection_handler.clear_active_connections!
end
end
# Wraps another exception. Base#perform raises it around the first collected
# failure after every failure has been passed to Discourse.handle_job_exception.
# The original exception is available through #wrapped.
class HandledExceptionWrapper < StandardError
  attr_accessor :wrapped

  def initialize(ex)
    description = "Wrapped #{ex.class}: #{ex.message}"
    super(description)
    @wrapped = ex
  end
end
# Base class for scheduled jobs (scheduling DSL comes from MiniScheduler::Schedule).
class Scheduled < Base
  extend MiniScheduler::Schedule

  # Skips the run (returning nil) while the site is in readonly mode, except
  # for Jobs::Heartbeat instances, which always run.
  def perform(*args)
    return if !(::Jobs::Heartbeat === self) && Discourse.readonly_mode?

    super
  end
end
# Enqueues (or, in immediate mode, runs) a job.
#
# job  - a job class, or a name resolved to ::Jobs::<CamelCasedName>
# opts - job arguments, plus these routing keys which are removed before the
#        arguments are serialized:
#          :all_sites - when truthy, do not pin the job to the current site
#          :delay_for - seconds to wait before the job runs
#          :queue     - queue name to push to
#        Unless :all_sites is given, :current_site_id defaults to the current
#        multisite database.
#
# The caller's hash is never modified; a copy is used internally so the same
# options hash (including a frozen one) can safely be reused across calls.
#
# When Jobs.run_later? the job is pushed through klass.client_push inside
# DB.after_commit. Otherwise it is deferred via Scheduler::Defer in development,
# and run synchronously through perform_immediately in other environments.
def self.enqueue(job, opts = {})
  # Work on a copy: the routing keys below are deleted from / added to this hash.
  opts = opts.dup

  if job.instance_of?(Class)
    klass = job
  else
    klass = "::Jobs::#{job.to_s.camelcase}".constantize
  end

  # Unless we want to work on all sites
  unless opts.delete(:all_sites)
    opts[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db
  end

  delay = opts.delete(:delay_for)
  queue = opts.delete(:queue)

  # Only string keys are allowed in JSON. We call `.with_indifferent_access`
  # in Jobs::Base#perform, so this is invisible to developers
  opts = opts.deep_stringify_keys

  # Simulate the args being dumped/parsed through JSON
  parsed_opts = JSON.parse(JSON.dump(opts))
  if opts != parsed_opts
    Discourse.deprecate(<<~TEXT.squish, since: "2.9", drop_from: "3.0", output_in_test: true)
      #{klass.name} was enqueued with argument values which do not cleanly serialize to/from JSON.
      This means that the job will be run with slightly different values than the ones supplied to `enqueue`.
      Argument values should be strings, booleans, numbers, or nil (or arrays/hashes of those value types).
    TEXT
  end
  opts = parsed_opts

  if ::Jobs.run_later?
    hash = { "class" => klass, "args" => [opts] }

    if delay
      # Non-positive delays are pushed without a scheduled time.
      hash["at"] = Time.now.to_f + delay.to_f if delay.to_f > 0
    end

    hash["queue"] = queue if queue

    DB.after_commit { klass.client_push(hash) }
  else
    if Rails.env == "development"
      Scheduler::Defer.later("job") { klass.new.perform(opts) }
    else
      # Run the job synchronously
      # But never run a job inside another job
      # That could cause deadlocks during test runs
      queue = Thread.current[:discourse_nested_job_queue]
      outermost_job = !queue

      if outermost_job
        queue = Queue.new
        Thread.current[:discourse_nested_job_queue] = queue
      end

      queue.push([klass, opts])

      if outermost_job
        # responsible for executing the queue
        begin
          until queue.empty?
            queued_klass, queued_opts = queue.pop(true)
            queued_klass.new.perform_immediately(queued_opts)
          end
        ensure
          Thread.current[:discourse_nested_job_queue] = nil
        end
      end
    end
  end
end
# Enqueues +job_name+ to run +secs+ seconds from now by passing :delay_for to enqueue.
#
# The delay is merged into a copy of +opts+, so the caller's hash is left
# untouched and may be frozen or reused for further calls.
def self.enqueue_in(secs, job_name, opts = {})
  enqueue(job_name, opts.merge(delay_for: secs))
end
# Enqueues +job_name+ to run at +datetime+. The delay is the difference between
# +datetime+ and Time.zone.now in seconds, floored at 0 for times in the past.
def self.enqueue_at(datetime, job_name, opts = {})
  target = datetime.to_f
  current = Time.zone.now.to_f
  delay = [target - current, 0].max
  enqueue_in(delay, job_name, opts)
end
# Deletes every scheduled job matched by scheduled_for(job_name, opts) and
# returns the collection of matches.
def self.cancel_scheduled_job(job_name, opts = {})
  scheduled_for(job_name, opts).each { |scheduled_job| scheduled_job.delete }
end
# Returns the entries of Sidekiq's scheduled set that belong to the job named
# +job_name+ and whose first argument hash matches every key/value in +opts+.
#
# Unless opts[:all_sites] is truthy, :current_site_id defaults to the current
# multisite database and takes part in the match. +opts+ is copied (via
# with_indifferent_access) before being adjusted.
def self.scheduled_for(job_name, opts = {})
  criteria = opts.with_indifferent_access
  unless criteria.delete(:all_sites)
    criteria[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db
  end

  target_class = "Jobs::#{job_name.to_s.camelcase}"

  Sidekiq::ScheduledSet.new.select do |scheduled_job|
    next false unless scheduled_job.klass.to_s == target_class

    job_params = scheduled_job.item["args"][0].with_indifferent_access
    # A job matches when no criterion differs from its parameters.
    criteria.none? { |key, value| job_params[key] != value }
  end
end
end