discourse/app/jobs/base.rb

255 lines
6.7 KiB
Ruby
Raw Normal View History

require 'scheduler/scheduler'
2013-02-05 14:16:51 -05:00
module Jobs
# Number of jobs Sidekiq::Stats reports as enqueued.
def self.queued
  sidekiq_stats = Sidekiq::Stats.new
  sidekiq_stats.enqueued
end
# Reads the 'last_job_perform_at' key from Sidekiq's Redis connection.
#
# Returns a Time built from the stored integer value, or nil when the key
# holds nothing.
def self.last_job_performed_at
  Sidekiq.redis do |redis|
    stamp = redis.get('last_job_perform_at')
    next nil unless stamp

    Time.at(stamp.to_i)
  end
end
# Counts the entries in Sidekiq's retry set whose job class name ends in "Email".
def self.num_email_retry_jobs
  retries = Sidekiq::RetrySet.new
  retries.count do |entry|
    entry.klass =~ /Email$/
  end
end
2013-02-05 14:16:51 -05:00
class Base
# Subscriber object that tallies query counts and durations per thread.
# An instance is registered for 'sql.active_record' notifications by
# ensure_db_instrumented.
class Instrumenter
  # Running totals: number of queries seen and their summed duration in
  # whole milliseconds.
  class Stats
    attr_accessor :query_count, :duration_ms

    def initialize
      @query_count = 0
      @duration_ms = 0
    end
  end

  # The Stats object stored under Thread.current[:db_stats], created on
  # first use.
  def self.stats
    Thread.current[:db_stats] ||= Stats.new
  end

  # Notification callback. Only `start` and `finish` are used: their
  # difference is converted to milliseconds (truncated to an integer) and
  # added to the current thread's totals.
  def call(name, start, finish, message_id, values)
    tally = Instrumenter.stats
    tally.query_count += 1
    elapsed_ms = ((finish - start).to_f * 1000).to_i
    tally.duration_ms += elapsed_ms
  end
end
2013-02-05 14:16:51 -05:00
include Sidekiq::Worker
# Starts with zero recorded database time; #perform overwrites @db_duration
# when it finishes.
def initialize
@db_duration = 0
end
2013-06-30 14:00:23 -04:00
# Writes each argument to Rails.logger at info level as its own line,
# prefixed with the current time (:db format) and the upcased class name.
# Always returns true.
def log(*args)
  args.each do |entry|
    stamp = Time.now.to_formatted_s(:db)
    Rails.logger.info "#{stamp}: [#{self.class.name.upcase}] #{entry}"
  end

  true
end
# Construct an error context object for Discourse.handle_exception
# Subclasses are encouraged to use this!
#
# `opts` is the arguments passed to execute().
# `code_desc` is a short string describing what the code was doing (optional).
# `extra` is for any other context you logged (may be nil).
#
# Returns a Hash. Note that, when building your `extra`, :opts, :job and
# :message are set by this method (an `extra` entry with the same key
# overrides them, because `extra` is merged last), and :current_db and
# :current_hostname are used by handle_exception.
def error_context(opts, code_desc = nil, extra = {})
  ctx = { opts: opts, job: self.class }
  # :message is only present when a description was supplied.
  ctx[:message] = code_desc if code_desc
  ctx.merge!(extra) unless extra.nil?
  ctx
end
2017-07-27 21:20:09 -04:00
# Builds a fresh instance of the job class and runs #perform with `opts`,
# returning whatever #perform returns.
def self.delayed_perform(opts = {})
  new.perform(opts)
end
2017-07-27 21:20:09 -04:00
# The job's actual work. This base implementation only raises; #perform
# calls it with the job's options, so subclasses must override it.
#
# Raises RuntimeError ("Overwrite me!") when not overridden.
def execute(opts = {})
  raise "Overwrite me!"
end
# Database time stored by the most recent #perform, or 0 when none is set.
def last_db_duration
  return 0 unless @db_duration

  @db_duration
end
# Subscribes an Instrumenter to 'sql.active_record' notifications the first
# time it is called. The @@instrumented class variable remembers that the
# subscription exists, so later calls just return it.
def ensure_db_instrumented
  already_subscribed = defined?(@@instrumented) && @@instrumented
  return @@instrumented if already_subscribed

  ActiveSupport::Notifications.subscribe('sql.active_record', Instrumenter.new)
  @@instrumented = true
end
2013-08-07 13:25:05 -04:00
# Runs the job: calls #execute with the options hash taken from the end of
# `args` (e.g. `klass.new.perform(opts)` for inline execution).
#
# Option keys read here:
#   :sync_exec       - removed from the options. When truthy, #execute runs
#                      once without switching connections and its return
#                      value is returned; an exception is passed to
#                      Discourse.handle_job_exception and NOT re-raised
#                      (nil is returned instead).
#   :current_site_id - the only site to run against. When absent, #execute
#                      runs once per entry of
#                      RailsMultisite::ConnectionManagement.all_dbs.
#
# Returns nil on the non-sync path.
#
# Raises ArgumentError when :sync_exec is combined with a :current_site_id
# that is not the current database. Raises HandledExceptionWrapper (wrapping
# the first failure) when any site failed; every failure has been reported
# to Discourse.handle_job_exception by then.
#
# Always clears active ActiveRecord connections and stores the collected
# database time in @db_duration (see #last_db_duration).
def perform(*args)
  total_db_time = 0
  ensure_db_instrumented
  opts = args.extract_options!.with_indifferent_access

  # Record when a job last started (read back by Jobs.last_job_performed_at).
  if SiteSetting.queue_jobs?
    Sidekiq.redis do |r|
      r.set('last_job_perform_at', Time.now.to_i)
    end
  end

  if opts.delete(:sync_exec)
    if opts.has_key?(:current_site_id) && opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
      raise ArgumentError, "You can't connect to another database when executing a job synchronously."
    end

    begin
      retval = execute(opts)
    rescue => exc
      Discourse.handle_job_exception(exc, error_context(opts))
    end
    return retval
  end

  dbs =
    if opts[:current_site_id]
      [opts[:current_site_id]]
    else
      RailsMultisite::ConnectionManagement.all_dbs
    end

  # One hash per failed site: :ex, optional :message, and :other (extra context).
  exceptions = []

  dbs.each do |db|
    exception = {}

    RailsMultisite::ConnectionManagement.with_connection(db) do
      begin
        I18n.locale = SiteSetting.default_locale || "en"
        I18n.ensure_all_loaded!
        begin
          execute(opts)
        rescue => e
          exception[:ex] = e
          exception[:other] = { problem_db: db }
        end
      rescue => e
        # Failure in the per-site setup above, before #execute ran.
        exception[:ex] = e
        exception[:message] = "While establishing database connection to #{db}"
        exception[:other] = { problem_db: db }
      ensure
        # NOTE(review): Instrumenter.stats is a per-thread running total and
        # is not reset in this method, so this adds the whole total once per
        # site - confirm whether it is reset elsewhere.
        total_db_time += Instrumenter.stats.duration_ms
      end
    end

    exceptions << exception unless exception.empty?
  end

  if exceptions.length > 0
    exceptions.each do |exception_hash|
      # The description is stored under :message above; it was previously
      # looked up under :code, which is never set, so it was always dropped.
      Discourse.handle_job_exception(
        exception_hash[:ex],
        error_context(opts, exception_hash[:message], exception_hash[:other])
      )
    end
    raise HandledExceptionWrapper.new(exceptions[0][:ex])
  end

  nil
ensure
  ActiveRecord::Base.connection_handler.clear_active_connections!
  @db_duration = total_db_time
end
2013-02-05 14:16:51 -05:00
end
# Raised by Base#perform after a failure has already been reported; keeps
# the original exception available through #wrapped.
class HandledExceptionWrapper < StandardError
  attr_accessor :wrapped

  # `ex` is the exception being wrapped; its class and message are embedded
  # in this error's own message.
  def initialize(ex)
    @wrapped = ex
    super("Wrapped #{ex.class}: #{ex.message}")
  end
end
2013-08-07 13:25:05 -04:00
# Job base class that extends Scheduler::Schedule (defined outside this file).
class Scheduled < Base
  extend Scheduler::Schedule

  # Does nothing (returns nil) while Discourse.readonly_mode? is true;
  # otherwise defers to Base#perform with the same arguments.
  def perform(*args)
    return if Discourse.readonly_mode?

    super
  end
end
2013-02-05 14:16:51 -05:00
2017-07-27 21:20:09 -04:00
# Queues the job class named by `job_name`, or runs it immediately when
# SiteSetting.queue_jobs? is false.
#
# `job_name` is camel-cased and resolved to the constant "Jobs::<Name>".
# `opts` is passed on to the job. Keys handled here:
#   :all_sites - removed. Unless it was truthy, :current_site_id defaults to
#                RailsMultisite::ConnectionManagement.current_db.
#   :delay_for - when present and jobs are queued, it is removed and used as
#                the delay for `perform_in`; when running inline it is
#                removed and ignored.
#
# Inline runs get :sync_exec => true added to `opts`; in the development
# environment the run is handed to Scheduler::Defer.later instead of being
# executed directly.
#
# NOTE: `opts` is modified in place.
def self.enqueue(job_name, opts = {})
  klass = "Jobs::#{job_name.to_s.camelcase}".constantize

  # Unless we want to work on all sites
  unless opts.delete(:all_sites)
    opts[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db
  end

  # If we are able to queue a job, do it
  if SiteSetting.queue_jobs?
    if opts[:delay_for].present?
      klass.perform_in(opts.delete(:delay_for), opts)
    else
      Sidekiq::Client.enqueue(klass, opts)
    end
  else
    # Otherwise execute the job right away
    opts.delete(:delay_for)
    opts[:sync_exec] = true
    if Rails.env.development?
      Scheduler::Defer.later("job") do
        klass.new.perform(opts)
      end
    else
      klass.new.perform(opts)
    end
  end
end
2017-07-27 21:20:09 -04:00
# Enqueues `job_name` with a delay: adds :delay_for => `secs` to `opts`
# (in place) and hands off to Jobs.enqueue.
def self.enqueue_in(secs, job_name, opts = {})
  enqueue(job_name, opts.merge!(delay_for: secs))
end
2017-07-27 21:20:09 -04:00
# Enqueues `job_name` to run at `datetime`.
#
# The delay is the whole number of seconds between Time.zone.now and
# `datetime`; a `datetime` in the past is clamped to a delay of 0.
def self.enqueue_at(datetime, job_name, opts = {})
  secs = [(datetime - Time.zone.now).to_i, 0].max
  enqueue_in(secs, job_name, opts)
end
2017-07-27 21:20:09 -04:00
# Deletes every scheduled entry that Jobs.scheduled_for finds for
# `job_name` and `opts`.
def self.cancel_scheduled_job(job_name, opts = {})
  pending = scheduled_for(job_name, opts)
  pending.each { |scheduled_job| scheduled_job.delete }
end
2017-07-27 21:20:09 -04:00
# Finds entries in Sidekiq's scheduled set for the job class named by
# `job_name` whose first argument hash matches every key/value in `opts`.
#
# :all_sites is removed from `opts` before matching; unless it was truthy,
# :current_site_id defaults to RailsMultisite::ConnectionManagement.current_db
# and therefore takes part in the match.
#
# Returns the matching entries.
def self.scheduled_for(job_name, opts = {})
  opts = opts.with_indifferent_access
  restrict_to_site = !opts.delete(:all_sites)
  if restrict_to_site
    opts[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db
  end

  job_class = "Jobs::#{job_name.to_s.camelcase}"

  Sidekiq::ScheduledSet.new.select do |scheduled_job|
    next false unless scheduled_job.klass.to_s == job_class

    job_params = scheduled_job.item["args"][0].with_indifferent_access
    # An entry matches when no requested option differs from its stored argument.
    opts.none? { |key, value| job_params[key] != value }
  end
end
2013-02-05 14:16:51 -05:00
end
2017-07-27 21:20:09 -04:00
# Load every job file under app/jobs: onceoff first, then regular, then scheduled.
[
  "#{Rails.root}/app/jobs/onceoff/*.rb",
  "#{Rails.root}/app/jobs/regular/*.rb",
  "#{Rails.root}/app/jobs/scheduled/*.rb"
].each do |pattern|
  Dir[pattern].each { |file| require_dependency file }
end