DEV: Fix threading error when running jobs immediately in system tests (#19811)
```
class Jobs::DummyDelayedJob < Jobs::Base
def execute(args = {})
end
end
RSpec.describe "Jobs.run_immediately!" do
before { Jobs.run_immediately! }
it "explodes" do
current_user = Fabricate(:user)
Jobs.enqueue_in(1.seconds, :dummy_delayed_job)
sign_in(current_user)
end
end
```
The test above will fail with the following error if `ActiveRecord::Base.connection_handler.clear_active_connections!` is called before the configured Capybara server checks out a connection from the connection pool.
```
ActiveRecord::ActiveRecordError:
Cannot expire connection, it is owned by a different thread: #<Thread:0x00007f437391df58@puma srv tp 001 /home/tgxworld/.asdf/installs/ruby/3.1.3/lib/ruby/gems/3.1.0/gems/puma-6.0.2/lib/puma/thread_pool.rb:106 sleep_forever>. Current thread: #<Thread:0x00007f437d6cfc60 run>.
```
We're not exactly sure if this is an ActiveRecord bug or not but we've
invested too much time into investigating this problem. Fundamentally,
we also no longer understand why `ActiveRecord::Base.connection_handler.clear_active_connections!` is being called in an ensure block
within `Jobs::Base#perform` which was added in
ceddb6e0da
10 years ago. This
commit moves the logic for running jobs immediately out of the
`Jobs::Base#perform` method into another `Jobs::Base#perform_immediately` method such that
`ActiveRecord::Base.connection_handler.clear_active_connections!` is not
called. This change will only impact the test environment.
This commit is contained in:
parent
d24d27f557
commit
8a7b62b126
|
@ -197,27 +197,30 @@ module Jobs
|
||||||
@db_duration || 0
|
@db_duration || 0
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def perform_immediately(*args)
|
||||||
|
opts = args.extract_options!.with_indifferent_access
|
||||||
|
|
||||||
|
if opts.has_key?(:current_site_id) &&
|
||||||
|
opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
|
||||||
|
raise ArgumentError.new(
|
||||||
|
"You can't connect to another database when executing a job synchronously.",
|
||||||
|
)
|
||||||
|
else
|
||||||
|
begin
|
||||||
|
retval = execute(opts)
|
||||||
|
rescue => exc
|
||||||
|
Discourse.handle_job_exception(exc, error_context(opts))
|
||||||
|
end
|
||||||
|
|
||||||
|
retval
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def perform(*args)
|
def perform(*args)
|
||||||
opts = args.extract_options!.with_indifferent_access
|
opts = args.extract_options!.with_indifferent_access
|
||||||
|
|
||||||
Sidekiq.redis { |r| r.set("last_job_perform_at", Time.now.to_i) } if ::Jobs.run_later?
|
Sidekiq.redis { |r| r.set("last_job_perform_at", Time.now.to_i) } if ::Jobs.run_later?
|
||||||
|
|
||||||
if opts.delete(:sync_exec)
|
|
||||||
if opts.has_key?(:current_site_id) &&
|
|
||||||
opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
|
|
||||||
raise ArgumentError.new(
|
|
||||||
"You can't connect to another database when executing a job synchronously.",
|
|
||||||
)
|
|
||||||
else
|
|
||||||
begin
|
|
||||||
retval = execute(opts)
|
|
||||||
rescue => exc
|
|
||||||
Discourse.handle_job_exception(exc, error_context(opts))
|
|
||||||
end
|
|
||||||
return retval
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
dbs =
|
dbs =
|
||||||
if opts[:current_site_id]
|
if opts[:current_site_id]
|
||||||
[opts[:current_site_id]]
|
[opts[:current_site_id]]
|
||||||
|
@ -334,9 +337,6 @@ module Jobs
|
||||||
|
|
||||||
DB.after_commit { klass.client_push(hash) }
|
DB.after_commit { klass.client_push(hash) }
|
||||||
else
|
else
|
||||||
# Otherwise execute the job right away
|
|
||||||
opts["sync_exec"] = true
|
|
||||||
|
|
||||||
if Rails.env == "development"
|
if Rails.env == "development"
|
||||||
Scheduler::Defer.later("job") { klass.new.perform(opts) }
|
Scheduler::Defer.later("job") { klass.new.perform(opts) }
|
||||||
else
|
else
|
||||||
|
@ -358,7 +358,7 @@ module Jobs
|
||||||
begin
|
begin
|
||||||
until queue.empty?
|
until queue.empty?
|
||||||
queued_klass, queued_opts = queue.pop(true)
|
queued_klass, queued_opts = queue.pop(true)
|
||||||
queued_klass.new.perform(queued_opts)
|
queued_klass.new.perform_immediately(queued_opts)
|
||||||
end
|
end
|
||||||
ensure
|
ensure
|
||||||
Thread.current[:discourse_nested_job_queue] = nil
|
Thread.current[:discourse_nested_job_queue] = nil
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
RSpec.describe Jobs::CreateLinkedTopic do
|
RSpec.describe Jobs::CreateLinkedTopic do
|
||||||
it "returns when the post cannot be found" do
|
it "returns when the post cannot be found" do
|
||||||
expect { Jobs::CreateLinkedTopic.new.perform(post_id: 1, sync_exec: true) }.not_to raise_error
|
expect { Jobs::CreateLinkedTopic.new.execute(post_id: 1) }.not_to raise_error
|
||||||
end
|
end
|
||||||
|
|
||||||
context "with a post" do
|
context "with a post" do
|
||||||
|
|
|
@ -59,12 +59,12 @@ RSpec.describe ::Jobs::Base do
|
||||||
|
|
||||||
it "delegates the process call to execute" do
|
it "delegates the process call to execute" do
|
||||||
::Jobs::Base.any_instance.expects(:execute).with({ "hello" => "world" })
|
::Jobs::Base.any_instance.expects(:execute).with({ "hello" => "world" })
|
||||||
::Jobs::Base.new.perform("hello" => "world", "sync_exec" => true)
|
::Jobs::Base.new.perform("hello" => "world")
|
||||||
end
|
end
|
||||||
|
|
||||||
it "converts to an indifferent access hash" do
|
it "converts to an indifferent access hash" do
|
||||||
::Jobs::Base.any_instance.expects(:execute).with(instance_of(HashWithIndifferentAccess))
|
::Jobs::Base.any_instance.expects(:execute).with(instance_of(HashWithIndifferentAccess))
|
||||||
::Jobs::Base.new.perform("hello" => "world", "sync_exec" => true)
|
::Jobs::Base.new.perform("hello" => "world")
|
||||||
end
|
end
|
||||||
|
|
||||||
context "with fake jobs" do
|
context "with fake jobs" do
|
||||||
|
|
|
@ -101,8 +101,9 @@ RSpec.describe Jobs do
|
||||||
it "executes the job right away" do
|
it "executes the job right away" do
|
||||||
Jobs::ProcessPost
|
Jobs::ProcessPost
|
||||||
.any_instance
|
.any_instance
|
||||||
.expects(:perform)
|
.expects(:perform_immediately)
|
||||||
.with({ "post_id" => 1, "sync_exec" => true, "current_site_id" => "default" })
|
.with({ "post_id" => 1, "current_site_id" => "default" })
|
||||||
|
|
||||||
Jobs.enqueue(:process_post, post_id: 1)
|
Jobs.enqueue(:process_post, post_id: 1)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
RSpec.describe Jobs::ProcessPost do
|
RSpec.describe Jobs::ProcessPost do
|
||||||
it "returns when the post cannot be found" do
|
it "returns when the post cannot be found" do
|
||||||
expect { Jobs::ProcessPost.new.perform(post_id: 1, sync_exec: true) }.not_to raise_error
|
expect { Jobs::ProcessPost.new.execute(post_id: 1) }.not_to raise_error
|
||||||
end
|
end
|
||||||
|
|
||||||
context "with a post" do
|
context "with a post" do
|
||||||
|
|
Loading…
Reference in New Issue