From 8a7b62b126111ff48e5c79b63bb068cd43d908e7 Mon Sep 17 00:00:00 2001 From: Alan Guo Xiang Tan <gxtan1990@gmail.com> Date: Tue, 10 Jan 2023 13:41:25 +0800 Subject: [PATCH] DEV: Fix threading error when running jobs immediately in system tests (#19811) ``` class Jobs::DummyDelayedJob < Jobs::Base def execute(args = {}) end end RSpec.describe "Jobs.run_immediately!" do before { Jobs.run_immediately! } it "explodes" do current_user = Fabricate(:user) Jobs.enqueue_in(1.seconds, :dummy_delayed_job) sign_in(current_user) end end ``` The test above will fail with the following error if `ActiveRecord::Base.connection_handler.clear_active_connections!` is called before the configured Capybara server checks out a connection from the connection pool. ``` ActiveRecord::ActiveRecordError: Cannot expire connection, it is owned by a different thread: #<Thread:0x00007f437391df58@puma srv tp 001 /home/tgxworld/.asdf/installs/ruby/3.1.3/lib/ruby/gems/3.1.0/gems/puma-6.0.2/lib/puma/thread_pool.rb:106 sleep_forever>. Current thread: #<Thread:0x00007f437d6cfc60 run>. ``` We're not exactly sure if this is an ActiveRecord bug or not but we've invested too much time into investigating this problem. Fundamentally, we also no longer understand why `ActiveRecord::Base.connection_handler.clear_active_connections!` is being called in an ensure block within `Jobs::Base#perform` which was added in ceddb6e0da92e874b3bee9543915726f1d27d60b 10 years ago. This commit moves the logic for running jobs immediately out of the `Jobs::Base#perform` method into another `Jobs::Base#perform_immediately` method such that `ActiveRecord::Base.connection_handler.clear_active_connections!` is not called. This change will only impact the test environment. --- app/jobs/base.rb | 40 +++++++++++++-------------- spec/jobs/create_linked_topic_spec.rb | 2 +- spec/jobs/jobs_base_spec.rb | 4 +-- spec/jobs/jobs_spec.rb | 5 ++-- spec/jobs/process_post_spec.rb | 2 +- 5 files changed, 27 insertions(+), 26 deletions(-) diff --git a/app/jobs/base.rb b/app/jobs/base.rb index 14255c53310..0f671c95a55 100644 --- a/app/jobs/base.rb +++ b/app/jobs/base.rb @@ -197,27 +197,30 @@ module Jobs @db_duration || 0 end + def perform_immediately(*args) + opts = args.extract_options!.with_indifferent_access + + if opts.has_key?(:current_site_id) && + opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db + raise ArgumentError.new( + "You can't connect to another database when executing a job synchronously.", + ) + else + begin + retval = execute(opts) + rescue => exc + Discourse.handle_job_exception(exc, error_context(opts)) + end + + retval + end + end + def perform(*args) opts = args.extract_options!.with_indifferent_access Sidekiq.redis { |r| r.set("last_job_perform_at", Time.now.to_i) } if ::Jobs.run_later? - if opts.delete(:sync_exec) - if opts.has_key?(:current_site_id) && - opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db - raise ArgumentError.new( - "You can't connect to another database when executing a job synchronously.", - ) - else - begin - retval = execute(opts) - rescue => exc - Discourse.handle_job_exception(exc, error_context(opts)) - end - return retval - end - end - dbs = if opts[:current_site_id] [opts[:current_site_id]] @@ -334,9 +337,6 @@ module Jobs DB.after_commit { klass.client_push(hash) } else - # Otherwise execute the job right away - opts["sync_exec"] = true - if Rails.env == "development" Scheduler::Defer.later("job") { klass.new.perform(opts) } else @@ -358,7 +358,7 @@ module Jobs begin until queue.empty? queued_klass, queued_opts = queue.pop(true) - queued_klass.new.perform(queued_opts) + queued_klass.new.perform_immediately(queued_opts) end ensure Thread.current[:discourse_nested_job_queue] = nil diff --git a/spec/jobs/create_linked_topic_spec.rb b/spec/jobs/create_linked_topic_spec.rb index 973492bbcd9..d2c6244de50 100644 --- a/spec/jobs/create_linked_topic_spec.rb +++ b/spec/jobs/create_linked_topic_spec.rb @@ -2,7 +2,7 @@ RSpec.describe Jobs::CreateLinkedTopic do it "returns when the post cannot be found" do - expect { Jobs::CreateLinkedTopic.new.perform(post_id: 1, sync_exec: true) }.not_to raise_error + expect { Jobs::CreateLinkedTopic.new.execute(post_id: 1) }.not_to raise_error end context "with a post" do diff --git a/spec/jobs/jobs_base_spec.rb b/spec/jobs/jobs_base_spec.rb index 19899879bfa..a76fed09e3b 100644 --- a/spec/jobs/jobs_base_spec.rb +++ b/spec/jobs/jobs_base_spec.rb @@ -59,12 +59,12 @@ RSpec.describe ::Jobs::Base do it "delegates the process call to execute" do ::Jobs::Base.any_instance.expects(:execute).with({ "hello" => "world" }) - ::Jobs::Base.new.perform("hello" => "world", "sync_exec" => true) + ::Jobs::Base.new.perform("hello" => "world") end it "converts to an indifferent access hash" do ::Jobs::Base.any_instance.expects(:execute).with(instance_of(HashWithIndifferentAccess)) - ::Jobs::Base.new.perform("hello" => "world", "sync_exec" => true) + ::Jobs::Base.new.perform("hello" => "world") end context "with fake jobs" do diff --git a/spec/jobs/jobs_spec.rb b/spec/jobs/jobs_spec.rb index c20f23a89dd..c26af358cfe 100644 --- a/spec/jobs/jobs_spec.rb +++ b/spec/jobs/jobs_spec.rb @@ -101,8 +101,9 @@ RSpec.describe Jobs do it "executes the job right away" do Jobs::ProcessPost .any_instance - .expects(:perform) - .with({ "post_id" => 1, "sync_exec" => true, "current_site_id" => "default" }) + .expects(:perform_immediately) + .with({ "post_id" => 1, "current_site_id" => "default" }) + Jobs.enqueue(:process_post, post_id: 1) end diff --git a/spec/jobs/process_post_spec.rb b/spec/jobs/process_post_spec.rb index d7f35b3ff14..20c1b1c46fd 100644 --- a/spec/jobs/process_post_spec.rb +++ b/spec/jobs/process_post_spec.rb @@ -2,7 +2,7 @@ RSpec.describe Jobs::ProcessPost do it "returns when the post cannot be found" do - expect { Jobs::ProcessPost.new.perform(post_id: 1, sync_exec: true) }.not_to raise_error + expect { Jobs::ProcessPost.new.execute(post_id: 1) }.not_to raise_error end context "with a post" do