DEV: Run jobs sequentially in test mode (#9897)
When running jobs in tests, we use `Jobs.run_immediately!`. This means that jobs are run synchronously when they are enqueued. Jobs sometimes enqueue other jobs, which are also executed synchronously. This means that the outermost job will block until the inner jobs have finished executing. In some cases (e.g. process_post with hotlinked images) this can lead to a deadlock. This commit changes the behavior slightly. Now we will never run jobs inside other jobs. Instead, we will queue them up and run them sequentially in the order they were enqueued. As a whole, they are still executed synchronously. Consider the example ```ruby class Jobs::InnerJob < Jobs::Base def execute(args) puts "Running inner job" end end class Jobs::OuterJob < Jobs::Base def execute(args) puts "Starting outer job" Jobs.enqueue(:inner_job) puts "Finished outer job" end end Jobs.enqueue(:outer_job) puts "All jobs complete" ``` The old behavior would result in: ``` Starting outer job Running inner job Finished outer job All jobs complete ``` The new behavior will result in: ``` Starting outer job Finished outer job Running inner job All jobs complete ```
This commit is contained in:
parent
6491db579b
commit
8a3d9d7036
|
@ -281,8 +281,12 @@ module Jobs
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.enqueue(job_name, opts = {})
|
def self.enqueue(job, opts = {})
|
||||||
klass = "::Jobs::#{job_name.to_s.camelcase}".constantize
|
if job.instance_of?(Class)
|
||||||
|
klass = job
|
||||||
|
else
|
||||||
|
klass = "::Jobs::#{job.to_s.camelcase}".constantize
|
||||||
|
end
|
||||||
|
|
||||||
# Unless we want to work on all sites
|
# Unless we want to work on all sites
|
||||||
unless opts.delete(:all_sites)
|
unless opts.delete(:all_sites)
|
||||||
|
@ -319,7 +323,30 @@ module Jobs
|
||||||
klass.new.perform(opts)
|
klass.new.perform(opts)
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
klass.new.perform(opts)
|
# Run the job synchronously
|
||||||
|
# But never run a job inside another job
|
||||||
|
# That could cause deadlocks during test runs
|
||||||
|
queue = Thread.current[:discourse_nested_job_queue]
|
||||||
|
outermost_job = !queue
|
||||||
|
|
||||||
|
if outermost_job
|
||||||
|
queue = Queue.new
|
||||||
|
Thread.current[:discourse_nested_job_queue] = queue
|
||||||
|
end
|
||||||
|
|
||||||
|
queue.push([klass, opts])
|
||||||
|
|
||||||
|
if outermost_job
|
||||||
|
# responsible for executing the queue
|
||||||
|
begin
|
||||||
|
until queue.empty?
|
||||||
|
queued_klass, queued_opts = queue.pop(true)
|
||||||
|
queued_klass.new.perform(queued_opts)
|
||||||
|
end
|
||||||
|
ensure
|
||||||
|
Thread.current[:discourse_nested_job_queue] = nil
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -47,4 +47,41 @@ describe ::Jobs::Base do
|
||||||
::Jobs::Base.new.perform('hello' => 'world', 'sync_exec' => true)
|
::Jobs::Base.new.perform('hello' => 'world', 'sync_exec' => true)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context "with fake jobs" do
|
||||||
|
let(:common_state) { [] }
|
||||||
|
|
||||||
|
let(:test_job_1) {
|
||||||
|
Class.new(Jobs::Base).tap do |klass|
|
||||||
|
state = common_state
|
||||||
|
klass.define_method(:execute) do |args|
|
||||||
|
state << "job_1_executed"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
}
|
||||||
|
|
||||||
|
let(:test_job_2) {
|
||||||
|
Class.new(Jobs::Base).tap do |klass|
|
||||||
|
state = common_state
|
||||||
|
job_1 = test_job_1
|
||||||
|
klass.define_method(:execute) do |args|
|
||||||
|
state << "job_2_started"
|
||||||
|
Jobs.enqueue(job_1)
|
||||||
|
state << "job_2_finished"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
}
|
||||||
|
|
||||||
|
it "runs jobs synchronously sequentially in tests" do
|
||||||
|
Jobs.run_immediately!
|
||||||
|
Jobs.enqueue(test_job_2)
|
||||||
|
|
||||||
|
expect(common_state).to eq([
|
||||||
|
"job_2_started",
|
||||||
|
"job_2_finished",
|
||||||
|
"job_1_executed"
|
||||||
|
])
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue