Merge pull request #2532 from riking/sidekiq
Pass more context from Sidekiq jobs to Logster
This commit is contained in:
commit
1359a02128
|
@ -385,7 +385,7 @@ class TopicsController < ApplicationController
|
||||||
user_id = (current_user.id if current_user)
|
user_id = (current_user.id if current_user)
|
||||||
track_visit = should_track_visit_to_topic?
|
track_visit = should_track_visit_to_topic?
|
||||||
|
|
||||||
Scheduler::Defer.later do
|
Scheduler::Defer.later "Track Visit" do
|
||||||
View.create_for_parent(Topic, topic_id, ip, user_id)
|
View.create_for_parent(Topic, topic_id, ip, user_id)
|
||||||
if track_visit
|
if track_visit
|
||||||
TopicUser.track_visit! topic_id, user_id
|
TopicUser.track_visit! topic_id, user_id
|
||||||
|
|
|
@ -55,6 +55,23 @@ module Jobs
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Construct an error context object for Discourse.handle_exception
|
||||||
|
# Subclasses are encouraged to use this!
|
||||||
|
#
|
||||||
|
# `opts` is the arguments passed to execute().
|
||||||
|
# `code_desc` is a short string describing what the code was doing (optional).
|
||||||
|
# `extra` is for any other context you logged.
|
||||||
|
# Note that, when building your `extra`, that :opts, :job, and :code are used by this method,
|
||||||
|
# and :current_db and :current_hostname are used by handle_exception.
|
||||||
|
def error_context(opts, code_desc = nil, extra = {})
|
||||||
|
ctx = {}
|
||||||
|
ctx[:opts] = opts
|
||||||
|
ctx[:job] = self.class
|
||||||
|
ctx[:message] = code_desc if code_desc
|
||||||
|
ctx.merge!(extra) if extra != nil
|
||||||
|
ctx
|
||||||
|
end
|
||||||
|
|
||||||
def self.delayed_perform(opts={})
|
def self.delayed_perform(opts={})
|
||||||
self.new.perform(opts)
|
self.new.perform(opts)
|
||||||
end
|
end
|
||||||
|
@ -75,6 +92,7 @@ module Jobs
|
||||||
end
|
end
|
||||||
|
|
||||||
def perform(*args)
|
def perform(*args)
|
||||||
|
total_db_time = 0
|
||||||
ensure_db_instrumented
|
ensure_db_instrumented
|
||||||
opts = args.extract_options!.with_indifferent_access
|
opts = args.extract_options!.with_indifferent_access
|
||||||
|
|
||||||
|
@ -88,7 +106,12 @@ module Jobs
|
||||||
if opts.has_key?(:current_site_id) && opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
|
if opts.has_key?(:current_site_id) && opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
|
||||||
raise ArgumentError.new("You can't connect to another database when executing a job synchronously.")
|
raise ArgumentError.new("You can't connect to another database when executing a job synchronously.")
|
||||||
else
|
else
|
||||||
return execute(opts)
|
begin
|
||||||
|
retval = execute(opts)
|
||||||
|
rescue => exc
|
||||||
|
Discourse.handle_exception(exc, error_context(opts))
|
||||||
|
end
|
||||||
|
return retval
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -100,11 +123,10 @@ module Jobs
|
||||||
RailsMultisite::ConnectionManagement.all_dbs
|
RailsMultisite::ConnectionManagement.all_dbs
|
||||||
end
|
end
|
||||||
|
|
||||||
total_db_time = 0
|
|
||||||
exceptions = []
|
exceptions = []
|
||||||
dbs.each do |db|
|
dbs.each do |db|
|
||||||
begin
|
begin
|
||||||
thread_exception = nil
|
thread_exception = {}
|
||||||
# NOTE: This looks odd, in fact it looks crazy but there is a reason
|
# NOTE: This looks odd, in fact it looks crazy but there is a reason
|
||||||
# A bug in therubyracer means that under certain conditions running in a fiber
|
# A bug in therubyracer means that under certain conditions running in a fiber
|
||||||
# can cause the whole v8 context to corrupt so much that it will hang sidekiq
|
# can cause the whole v8 context to corrupt so much that it will hang sidekiq
|
||||||
|
@ -128,9 +150,15 @@ module Jobs
|
||||||
begin
|
begin
|
||||||
RailsMultisite::ConnectionManagement.establish_connection(db: db)
|
RailsMultisite::ConnectionManagement.establish_connection(db: db)
|
||||||
I18n.locale = SiteSetting.default_locale
|
I18n.locale = SiteSetting.default_locale
|
||||||
|
begin
|
||||||
execute(opts)
|
execute(opts)
|
||||||
rescue => e
|
rescue => e
|
||||||
thread_exception = e
|
thread_exception[:ex] = e
|
||||||
|
end
|
||||||
|
rescue => e
|
||||||
|
thread_exception[:ex] = e
|
||||||
|
thread_exception[:message] = "While establishing database connection to #{db}"
|
||||||
|
thread_exception[:other] = { problem_db: db }
|
||||||
ensure
|
ensure
|
||||||
ActiveRecord::Base.connection_handler.clear_active_connections!
|
ActiveRecord::Base.connection_handler.clear_active_connections!
|
||||||
total_db_time += Instrumenter.stats.duration_ms
|
total_db_time += Instrumenter.stats.duration_ms
|
||||||
|
@ -138,17 +166,19 @@ module Jobs
|
||||||
end
|
end
|
||||||
t.join
|
t.join
|
||||||
|
|
||||||
exceptions << thread_exception if thread_exception
|
exceptions << thread_exception unless thread_exception.empty?
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
if exceptions.length > 0
|
if exceptions.length > 0
|
||||||
exceptions[1..-1].each do |exception|
|
exceptions.each do |exception_hash|
|
||||||
Discourse.handle_exception(exception, opts)
|
Discourse.handle_exception(exception_hash[:ex],
|
||||||
|
error_context(opts, exception_hash[:code], exception_hash[:other]))
|
||||||
end
|
end
|
||||||
raise exceptions[0]
|
raise HandledExceptionWrapper.new exceptions[0][:ex]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
nil
|
||||||
ensure
|
ensure
|
||||||
ActiveRecord::Base.connection_handler.clear_active_connections!
|
ActiveRecord::Base.connection_handler.clear_active_connections!
|
||||||
@db_duration = total_db_time
|
@db_duration = total_db_time
|
||||||
|
@ -156,6 +186,14 @@ module Jobs
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
class HandledExceptionWrapper < Exception
|
||||||
|
attr_accessor :wrapped
|
||||||
|
def initialize(ex)
|
||||||
|
super("Wrapped #{ex.class}: #{ex.message}")
|
||||||
|
@wrapped = ex
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
class Scheduled < Base
|
class Scheduled < Base
|
||||||
extend Scheduler::Schedule
|
extend Scheduler::Schedule
|
||||||
end
|
end
|
||||||
|
|
|
@ -29,7 +29,10 @@ module Jobs
|
||||||
|
|
||||||
# Forces rebake of old posts where needed, as long as no system avatars need updating
|
# Forces rebake of old posts where needed, as long as no system avatars need updating
|
||||||
unless UserAvatar.where("last_gravatar_download_attempt IS NULL").limit(1).first
|
unless UserAvatar.where("last_gravatar_download_attempt IS NULL").limit(1).first
|
||||||
Post.rebake_old(250)
|
problems = Post.rebake_old(250)
|
||||||
|
problems.each do |hash|
|
||||||
|
Discourse.handle_exception(hash[:ex], error_context(args, "Rebaking post id #{hash[:post].id}", post_id: hash[:post].id))
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -13,6 +13,7 @@ module Jobs
|
||||||
include Email::BuildEmailHelper
|
include Email::BuildEmailHelper
|
||||||
|
|
||||||
def execute(args)
|
def execute(args)
|
||||||
|
@args = args
|
||||||
if SiteSetting.pop3s_polling_enabled?
|
if SiteSetting.pop3s_polling_enabled?
|
||||||
poll_pop3s
|
poll_pop3s
|
||||||
end
|
end
|
||||||
|
@ -47,7 +48,7 @@ module Jobs
|
||||||
client_message = RejectionMailer.send_rejection(message.from, message.body, message.subject, message.to, message_template)
|
client_message = RejectionMailer.send_rejection(message.from, message.body, message.subject, message.to, message_template)
|
||||||
Email::Sender.new(client_message, message_template).send
|
Email::Sender.new(client_message, message_template).send
|
||||||
else
|
else
|
||||||
Discourse.handle_exception(e, { code: "unknown error for incoming email", mail: mail_string} )
|
Discourse.handle_exception(e, error_context(@args, "Unrecognized error type when processing incoming email", mail: mail_string))
|
||||||
end
|
end
|
||||||
ensure
|
ensure
|
||||||
mail.delete
|
mail.delete
|
||||||
|
@ -70,7 +71,7 @@ module Jobs
|
||||||
pop.finish
|
pop.finish
|
||||||
end
|
end
|
||||||
rescue Net::POPAuthenticationError => e
|
rescue Net::POPAuthenticationError => e
|
||||||
Discourse.handle_exception(e, { code: "signing in for incoming email" } )
|
Discourse.handle_exception(e, error_context(@args, "Signing in to poll incoming email"))
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -319,14 +319,16 @@ class Post < ActiveRecord::Base
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.rebake_old(limit)
|
def self.rebake_old(limit)
|
||||||
|
problems = []
|
||||||
Post.where('baked_version IS NULL OR baked_version < ?', BAKED_VERSION)
|
Post.where('baked_version IS NULL OR baked_version < ?', BAKED_VERSION)
|
||||||
.limit(limit).each do |p|
|
.limit(limit).each do |p|
|
||||||
begin
|
begin
|
||||||
p.rebake!
|
p.rebake!
|
||||||
rescue => e
|
rescue => e
|
||||||
Discourse.handle_exception(e)
|
problems << {post: p, ex: e}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
problems
|
||||||
end
|
end
|
||||||
|
|
||||||
def rebake!(opts={})
|
def rebake!(opts={})
|
||||||
|
|
|
@ -33,7 +33,7 @@ if Sidekiq.server?
|
||||||
manager.tick
|
manager.tick
|
||||||
rescue => e
|
rescue => e
|
||||||
# the show must go on
|
# the show must go on
|
||||||
Discourse.handle_exception(e)
|
Discourse.handle_exception(e, {message: "While ticking scheduling manager"})
|
||||||
end
|
end
|
||||||
sleep 1
|
sleep 1
|
||||||
end
|
end
|
||||||
|
@ -43,18 +43,29 @@ end
|
||||||
|
|
||||||
Sidekiq.logger.level = Logger::WARN
|
Sidekiq.logger.level = Logger::WARN
|
||||||
|
|
||||||
class LogsterErrorHandler
|
class SidekiqLogsterReporter < Sidekiq::ExceptionHandler::Logger
|
||||||
def call(ex, hash={})
|
def call(ex, context = {})
|
||||||
text = "exception: #{ex}\ncontext: #{hash.inspect}\n"
|
# Pass context to Logster
|
||||||
if ex.backtrace
|
fake_env = {}
|
||||||
text << "backtrace: #{ex.backtrace.join("\n")}"
|
context.each do |key, value|
|
||||||
|
Logster.add_to_env(fake_env, key, value)
|
||||||
end
|
end
|
||||||
Rails.logger.error(text)
|
|
||||||
|
text = "Job exception: #{ex}\n"
|
||||||
|
if ex.backtrace
|
||||||
|
Logster.add_to_env(fake_env, :backtrace, ex.backtrace)
|
||||||
|
end
|
||||||
|
|
||||||
|
Thread.current[Logster::Logger::LOGSTER_ENV] = fake_env
|
||||||
|
Logster.logger.error(text)
|
||||||
rescue => e
|
rescue => e
|
||||||
Rails.logger.fatal("Failed to log exception #{ex} #{hash}\nReason: #{e}\n#{e.backtrace.join("\n")}")
|
Logster.logger.fatal("Failed to log exception #{ex} #{hash}\nReason: #{e.class} #{e}\n#{e.backtrace.join("\n")}")
|
||||||
|
ensure
|
||||||
|
Thread.current[Logster::Logger::LOGSTER_ENV] = nil
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
Sidekiq.error_handlers << LogsterErrorHandler.new
|
Sidekiq.error_handlers.clear
|
||||||
|
Sidekiq.error_handlers << SidekiqLogsterReporter.new
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -34,7 +34,7 @@ class Auth::DefaultCurrentUserProvider
|
||||||
|
|
||||||
if current_user && should_update_last_seen?
|
if current_user && should_update_last_seen?
|
||||||
u = current_user
|
u = current_user
|
||||||
Scheduler::Defer.later do
|
Scheduler::Defer.later "Updating Last Seen" do
|
||||||
u.update_last_seen!
|
u.update_last_seen!
|
||||||
u.update_ip_address!(request.ip)
|
u.update_ip_address!(request.ip)
|
||||||
end
|
end
|
||||||
|
|
|
@ -9,7 +9,13 @@ module Discourse
|
||||||
extend Sidekiq::ExceptionHandler
|
extend Sidekiq::ExceptionHandler
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.handle_exception(ex, context=nil, parent_logger = nil)
|
# Log an exception.
|
||||||
|
#
|
||||||
|
# If your code is in a scheduled job, it is recommended to use the
|
||||||
|
# error_context() method in Jobs::Base to pass the job arguments and any
|
||||||
|
# other desired context.
|
||||||
|
# See app/jobs/base.rb for the error_context function.
|
||||||
|
def self.handle_exception(ex, context = {}, parent_logger = nil)
|
||||||
context ||= {}
|
context ||= {}
|
||||||
parent_logger ||= SidekiqExceptionHandler
|
parent_logger ||= SidekiqExceptionHandler
|
||||||
|
|
||||||
|
|
|
@ -128,7 +128,7 @@ module Oneboxer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rescue => e
|
rescue => e
|
||||||
Discourse.handle_exception(e, url: url)
|
Discourse.handle_exception(e, message: "While trying to onebox a URL", url: url)
|
||||||
# return a blank hash, so rest of the code works
|
# return a blank hash, so rest of the code works
|
||||||
{preview: "", onebox: ""}
|
{preview: "", onebox: ""}
|
||||||
end
|
end
|
||||||
|
|
|
@ -14,10 +14,10 @@ module Scheduler
|
||||||
@async = val
|
@async = val
|
||||||
end
|
end
|
||||||
|
|
||||||
def later(&blk)
|
def later(desc = nil, &blk)
|
||||||
if @async
|
if @async
|
||||||
start_thread unless @thread.alive?
|
start_thread unless @thread.alive?
|
||||||
@queue << [RailsMultisite::ConnectionManagement.current_db, blk]
|
@queue << [RailsMultisite::ConnectionManagement.current_db, blk, desc]
|
||||||
else
|
else
|
||||||
blk.call
|
blk.call
|
||||||
end
|
end
|
||||||
|
@ -46,11 +46,15 @@ module Scheduler
|
||||||
end
|
end
|
||||||
|
|
||||||
def do_work
|
def do_work
|
||||||
db, job = @queue.deq
|
db, job, desc = @queue.deq
|
||||||
|
begin
|
||||||
RailsMultisite::ConnectionManagement.establish_connection(db: db)
|
RailsMultisite::ConnectionManagement.establish_connection(db: db)
|
||||||
job.call
|
job.call
|
||||||
rescue => ex
|
rescue => ex
|
||||||
Discourse.handle_exception(ex)
|
Discourse.handle_exception(ex, {message: "Running deferred code '#{desc}'"})
|
||||||
|
end
|
||||||
|
rescue => ex
|
||||||
|
Discourse.handle_exception(ex, {message: "Processing deferred code queue"})
|
||||||
ensure
|
ensure
|
||||||
ActiveRecord::Base.connection_handler.clear_active_connections!
|
ActiveRecord::Base.connection_handler.clear_active_connections!
|
||||||
end
|
end
|
||||||
|
|
|
@ -42,13 +42,13 @@ module Scheduler
|
||||||
def keep_alive
|
def keep_alive
|
||||||
@manager.keep_alive
|
@manager.keep_alive
|
||||||
rescue => ex
|
rescue => ex
|
||||||
Discourse.handle_exception(ex)
|
Discourse.handle_exception(ex, {message: "Scheduling manager keep-alive"})
|
||||||
end
|
end
|
||||||
|
|
||||||
def reschedule_orphans
|
def reschedule_orphans
|
||||||
@manager.reschedule_orphans!
|
@manager.reschedule_orphans!
|
||||||
rescue => ex
|
rescue => ex
|
||||||
Discourse.handle_exception(ex)
|
Discourse.handle_exception(ex, {message: "Scheduling manager orphan rescheduler"})
|
||||||
end
|
end
|
||||||
|
|
||||||
def process_queue
|
def process_queue
|
||||||
|
@ -62,8 +62,11 @@ module Scheduler
|
||||||
info.prev_result = "RUNNING"
|
info.prev_result = "RUNNING"
|
||||||
@mutex.synchronize { info.write! }
|
@mutex.synchronize { info.write! }
|
||||||
klass.new.perform
|
klass.new.perform
|
||||||
|
rescue Jobs::HandledExceptionWrapper
|
||||||
|
# Discourse.handle_exception was already called, and we don't have any extra info to give
|
||||||
|
failed = true
|
||||||
rescue => e
|
rescue => e
|
||||||
Discourse.handle_exception(e)
|
Discourse.handle_exception(e, {message: "Running a scheduled job", job: klass})
|
||||||
failed = true
|
failed = true
|
||||||
end
|
end
|
||||||
duration = ((Time.now.to_f - start) * 1000).to_i
|
duration = ((Time.now.to_f - start) * 1000).to_i
|
||||||
|
@ -74,7 +77,7 @@ module Scheduler
|
||||||
@mutex.synchronize { info.write! }
|
@mutex.synchronize { info.write! }
|
||||||
end
|
end
|
||||||
rescue => ex
|
rescue => ex
|
||||||
Discourse.handle_exception(ex)
|
Discourse.handle_exception(ex, {message: "Processing scheduled job queue"})
|
||||||
ensure
|
ensure
|
||||||
@running = false
|
@running = false
|
||||||
end
|
end
|
||||||
|
|
|
@ -117,22 +117,37 @@ describe Discourse do
|
||||||
end
|
end
|
||||||
|
|
||||||
context "#handle_exception" do
|
context "#handle_exception" do
|
||||||
class TempLogger
|
|
||||||
|
class TempSidekiqLogger < Sidekiq::ExceptionHandler::Logger
|
||||||
attr_accessor :exception, :context
|
attr_accessor :exception, :context
|
||||||
def handle_exception(exception, context)
|
def call(ex, ctx)
|
||||||
self.exception = exception
|
self.exception = ex
|
||||||
self.context = context
|
self.context = ctx
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
let!(:logger) { TempSidekiqLogger.new }
|
||||||
|
|
||||||
|
before do
|
||||||
|
Sidekiq.error_handlers.clear
|
||||||
|
Sidekiq.error_handlers << logger
|
||||||
|
end
|
||||||
|
|
||||||
it "should not fail when called" do
|
it "should not fail when called" do
|
||||||
logger = TempLogger.new
|
|
||||||
exception = StandardError.new
|
exception = StandardError.new
|
||||||
|
|
||||||
Discourse.handle_exception(exception, nil, logger)
|
Discourse.handle_exception(exception, nil, nil)
|
||||||
logger.exception.should == exception
|
logger.exception.should == exception
|
||||||
logger.context.keys.should == [:current_db, :current_hostname]
|
logger.context.keys.should == [:current_db, :current_hostname]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it "correctly passes extra context" do
|
||||||
|
exception = StandardError.new
|
||||||
|
|
||||||
|
Discourse.handle_exception(exception, {message: "Doing a test", post_id: 31}, nil)
|
||||||
|
logger.exception.should == exception
|
||||||
|
logger.context.keys.sort.should == [:current_db, :current_hostname, :message, :post_id].sort
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -27,13 +27,13 @@ describe Jobs::Base do
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'handles errors in multisite' do
|
it 'handles errors in multisite' do
|
||||||
RailsMultisite::ConnectionManagement.expects(:all_dbs).returns(['default','default'])
|
RailsMultisite::ConnectionManagement.expects(:all_dbs).returns(['default','default','default'])
|
||||||
# just stub so logs are not noisy
|
# one exception per database
|
||||||
Discourse.expects(:handle_exception).returns(nil)
|
Discourse.expects(:handle_exception).times(3)
|
||||||
|
|
||||||
bad = BadJob.new
|
bad = BadJob.new
|
||||||
expect{bad.perform({})}.to raise_error
|
expect{bad.perform({})}.to raise_error
|
||||||
bad.fail_count.should == 2
|
bad.fail_count.should == 3
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'delegates the process call to execute' do
|
it 'delegates the process call to execute' do
|
||||||
|
|
Loading…
Reference in New Issue