DEV: Improve thread-safety of sidekiq logging
This commit is contained in:
parent
ac8425ad30
commit
e2510d79cc
|
@ -19,50 +19,53 @@ module Jobs
|
|||
class JobInstrumenter
|
||||
def initialize(job_class:, opts:, db:, jid:)
|
||||
return unless enabled?
|
||||
@data = {}
|
||||
self.class.mutex.synchronize do
|
||||
@data = {}
|
||||
|
||||
@data["hostname"] = `hostname`.strip # Hostname
|
||||
@data["pid"] = Process.pid # Pid
|
||||
@data["database"] = db # DB name - multisite db name it ran on
|
||||
@data["job_id"] = jid # Job unique ID
|
||||
@data["job_name"] = job_class.name # Job Name - eg: Jobs::AboutStats
|
||||
@data["job_type"] = job_class.try(:scheduled?) ? "scheduled" : "regular" # Job Type - either s for scheduled or r for regular
|
||||
@data["opts"] = opts.to_json # Params - json encoded params for the job
|
||||
@data["hostname"] = `hostname`.strip # Hostname
|
||||
@data["pid"] = Process.pid # Pid
|
||||
@data["database"] = db # DB name - multisite db name it ran on
|
||||
@data["job_id"] = jid # Job unique ID
|
||||
@data["job_name"] = job_class.name # Job Name - eg: Jobs::AboutStats
|
||||
@data["job_type"] = job_class.try(:scheduled?) ? "scheduled" : "regular" # Job Type - either s for scheduled or r for regular
|
||||
@data["opts"] = opts.to_json # Params - json encoded params for the job
|
||||
|
||||
@data["status"] = 'pending'
|
||||
@start_timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||
@data["status"] = 'pending'
|
||||
@start_timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||
|
||||
self.class.ensure_interval_logging!
|
||||
@@active_jobs ||= []
|
||||
@@active_jobs << self
|
||||
self.class.ensure_interval_logging!
|
||||
@@active_jobs ||= []
|
||||
@@active_jobs << self
|
||||
|
||||
MethodProfiler.ensure_discourse_instrumentation!
|
||||
MethodProfiler.start
|
||||
MethodProfiler.ensure_discourse_instrumentation!
|
||||
MethodProfiler.start
|
||||
end
|
||||
end
|
||||
|
||||
def stop(exception:)
|
||||
return unless enabled?
|
||||
self.class.mutex.synchronize do
|
||||
profile = MethodProfiler.stop
|
||||
|
||||
profile = MethodProfiler.stop
|
||||
@@active_jobs.delete(self)
|
||||
|
||||
@@active_jobs.delete(self)
|
||||
@data["duration"] = profile[:total_duration] # Duration - length in seconds it took to run
|
||||
@data["sql_duration"] = profile.dig(:sql, :duration) || 0 # Sql Duration (s)
|
||||
@data["sql_calls"] = profile.dig(:sql, :calls) || 0 # Sql Statements - how many statements ran
|
||||
@data["redis_duration"] = profile.dig(:redis, :duration) || 0 # Redis Duration (s)
|
||||
@data["redis_calls"] = profile.dig(:redis, :calls) || 0 # Redis commands
|
||||
@data["net_duration"] = profile.dig(:net, :duration) || 0 # Redis Duration (s)
|
||||
@data["net_calls"] = profile.dig(:net, :calls) || 0 # Redis commands
|
||||
|
||||
@data["duration"] = profile[:total_duration] # Duration - length in seconds it took to run
|
||||
@data["sql_duration"] = profile.dig(:sql, :duration) || 0 # Sql Duration (s)
|
||||
@data["sql_calls"] = profile.dig(:sql, :calls) || 0 # Sql Statements - how many statements ran
|
||||
@data["redis_duration"] = profile.dig(:redis, :duration) || 0 # Redis Duration (s)
|
||||
@data["redis_calls"] = profile.dig(:redis, :calls) || 0 # Redis commands
|
||||
@data["net_duration"] = profile.dig(:net, :duration) || 0 # Redis Duration (s)
|
||||
@data["net_calls"] = profile.dig(:net, :calls) || 0 # Redis commands
|
||||
if exception.present?
|
||||
@data["exception"] = exception # Exception - if job fails a json encoded exception
|
||||
@data["status"] = 'failed'
|
||||
else
|
||||
@data["status"] = 'success' # Status - fail, success, pending
|
||||
end
|
||||
|
||||
if exception.present?
|
||||
@data["exception"] = exception # Exception - if job fails a json encoded exception
|
||||
@data["status"] = 'failed'
|
||||
else
|
||||
@data["status"] = 'success' # Status - fail, success, pending
|
||||
write_to_log
|
||||
end
|
||||
|
||||
write_to_log
|
||||
end
|
||||
|
||||
def self.raw_log(message)
|
||||
|
@ -97,14 +100,21 @@ module Jobs
|
|||
ENV["DISCOURSE_LOG_SIDEKIQ"] == "1"
|
||||
end
|
||||
|
||||
def self.mutex
|
||||
@@mutex ||= Mutex.new
|
||||
end
|
||||
|
||||
def self.ensure_interval_logging!
|
||||
interval = ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
|
||||
return if !interval
|
||||
interval = interval.to_i
|
||||
@@interval_thread ||= Thread.new do
|
||||
begin
|
||||
loop do
|
||||
sleep interval.to_i
|
||||
@@active_jobs.each { |j| j.write_to_log if j.current_duration > interval.to_i }
|
||||
sleep interval
|
||||
mutex.synchronize do
|
||||
@@active_jobs.each { |j| j.write_to_log if j.current_duration > interval }
|
||||
end
|
||||
end
|
||||
rescue Exception => e
|
||||
Discourse.warn_exception(e, message: "Sidekiq interval logging thread terminated unexpectedly")
|
||||
|
|
Loading…
Reference in New Issue