discourse/spec/jobs/jobs_base_spec.rb

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

191 lines
5.1 KiB
Ruby
Raw Normal View History

# frozen_string_literal: true
RSpec.describe ::Jobs::Base do
class GoodJob < ::Jobs::Base
attr_accessor :count
def execute(args)
self.count ||= 0
self.count += 1
end
end
class BadJob < ::Jobs::Base
class BadJobError < StandardError
end
attr_accessor :fail_count
def execute(args)
@fail_count ||= 0
@fail_count += 1
raise BadJobError
end
end
class ConcurrentJob < ::Jobs::Base
cluster_concurrency 1
def self.stop!
@stop = true
end
def self.stop
@stop
end
def self.running?
@running
end
def self.running=(val)
@running = val
end
def execute(args)
self.class.running = true
sleep 0.0001 while !self.class.stop
ensure
self.class.running = false
end
end
it "handles job concurrency" do
ConcurrentJob.clear_cluster_concurrency_lock!
expect(ConcurrentJob.get_cluster_concurrency).to eq(1)
expect(BadJob.get_cluster_concurrency).to eq(nil)
expect(Sidekiq::Queues["default"].size).to eq(0)
thread = Thread.new { ConcurrentJob.new.perform({ "test" => 100 }) }
wait_for { ConcurrentJob.running? }
ConcurrentJob.new.perform({ "test" => 100 })
DEV: Fix job cluster concurrency spec timing out (#25035) Why this change? On CI, we have been seeing the "handles job concurrency" job timing out on CI after 45 seconds. Upon closer inspection of `Jobs::Base#perform` when cluster concurrency has been set, we see that a thread is spun up to extend the expiring of a redis key by 120 seconds every 60 seconds while the job is still being executed. The thread looks like this before the fix: ``` keepalive_thread = Thread.new do while parent_thread.alive? && !finished Discourse.redis.without_namespace.expire(cluster_concurrency_redis_key, 120) sleep 60 end end ``` In an ensure block of `Jobs::Base#perform`, the thread is stop by doing something like this: ``` finished = true keepalive_thread.wakeup keepalive_thread.join ``` If the thread is sleeping, `keepalive_thread.wakeup` will stop the `sleep` method and run the next iteration causing the thread to complete. However, there is a timing issue at play here. If `keepalive_thread.wakeup` is called at a time when the thread is not sleeping, it will have no effect and the thread may end up sleeping for 60 seconds which is longer than our timeout on CI of 45 seconds. What does this change do? 1. Change `sleep 60` to sleep in intervals of 1 second checking if the job has been finished each time. 2. Add `use_redis_snapshotting` to `Jobs::Base` spec since Redis is involved in scheduling and we want to ensure we don't leak Redis keys. 3. Add `ConcurrentJob.stop!` and `thread.join` to `ensure` block in "handles job concurrency" test since a failing expectation will cause us to not clean up the thread we created in the test.
2023-12-26 01:47:03 -05:00
expect(Sidekiq::Queues["default"].size).to eq(1)
expect(Sidekiq::Queues["default"][0]["args"][0]).to eq("test" => 100)
DEV: Fix job cluster concurrency spec timing out (#25035) Why this change? On CI, we have been seeing the "handles job concurrency" job timing out on CI after 45 seconds. Upon closer inspection of `Jobs::Base#perform` when cluster concurrency has been set, we see that a thread is spun up to extend the expiring of a redis key by 120 seconds every 60 seconds while the job is still being executed. The thread looks like this before the fix: ``` keepalive_thread = Thread.new do while parent_thread.alive? && !finished Discourse.redis.without_namespace.expire(cluster_concurrency_redis_key, 120) sleep 60 end end ``` In an ensure block of `Jobs::Base#perform`, the thread is stop by doing something like this: ``` finished = true keepalive_thread.wakeup keepalive_thread.join ``` If the thread is sleeping, `keepalive_thread.wakeup` will stop the `sleep` method and run the next iteration causing the thread to complete. However, there is a timing issue at play here. If `keepalive_thread.wakeup` is called at a time when the thread is not sleeping, it will have no effect and the thread may end up sleeping for 60 seconds which is longer than our timeout on CI of 45 seconds. What does this change do? 1. Change `sleep 60` to sleep in intervals of 1 second checking if the job has been finished each time. 2. Add `use_redis_snapshotting` to `Jobs::Base` spec since Redis is involved in scheduling and we want to ensure we don't leak Redis keys. 3. Add `ConcurrentJob.stop!` and `thread.join` to `ensure` block in "handles job concurrency" test since a failing expectation will cause us to not clean up the thread we created in the test.
2023-12-26 01:47:03 -05:00
ensure
ConcurrentJob.stop!
thread.join
end
it "handles correct jobs" do
job = GoodJob.new
job.perform({})
expect(job.count).to eq(1)
end
it "handles errors in multisite" do
RailsMultisite::ConnectionManagement.expects(:all_dbs).returns(%w[default default default])
# one exception per database
Discourse.expects(:handle_job_exception).times(3)
bad = BadJob.new
2016-05-29 23:38:04 -04:00
expect { bad.perform({}) }.to raise_error(Jobs::HandledExceptionWrapper)
expect(bad.fail_count).to eq(3)
end
2013-02-05 14:16:51 -05:00
describe "#perform" do
context "when a job raises an error" do
before { Discourse.reset_job_exception_stats! }
after { Discourse.reset_job_exception_stats! }
it "collects stats for failing jobs in Discourse.job_exception_stats" do
bad = BadJob.new
3.times do
# During test env handle_job_exception errors out
# in production this is suppressed
expect { bad.perform({}) }.to raise_error(BadJob::BadJobError)
end
expect(Discourse.job_exception_stats).to eq({ BadJob => 3 })
end
end
end
2013-02-05 14:16:51 -05:00
it "delegates the process call to execute" do
::Jobs::Base.any_instance.expects(:execute).with({ "hello" => "world" })
DEV: Fix threading error when running jobs immediately in system tests (#19811) ``` class Jobs::DummyDelayedJob < Jobs::Base def execute(args = {}) end end RSpec.describe "Jobs.run_immediately!" do before { Jobs.run_immediately! } it "explodes" do current_user = Fabricate(:user) Jobs.enqueue_in(1.seconds, :dummy_delayed_job) sign_in(current_user) end end ``` The test above will fail with the following error if `ActiveRecord::Base.connection_handler.clear_active_connections!` is called before the configured Capybara server checks out a connection from the connection pool. ``` ActiveRecord::ActiveRecordError: Cannot expire connection, it is owned by a different thread: #<Thread:0x00007f437391df58@puma srv tp 001 /home/tgxworld/.asdf/installs/ruby/3.1.3/lib/ruby/gems/3.1.0/gems/puma-6.0.2/lib/puma/thread_pool.rb:106 sleep_forever>. Current thread: #<Thread:0x00007f437d6cfc60 run>. ``` We're not exactly sure if this is an ActiveRecord bug or not but we've invested too much time into investigating this problem. Fundamentally, we also no longer understand why `ActiveRecord::Base.connection_handler.clear_active_connections!` is being called in an ensure block within `Jobs::Base#perform` which was added in ceddb6e0da92e874b3bee9543915726f1d27d60b 10 years ago. This commit moves the logic for running jobs immediately out of the `Jobs::Base#perform` method into another `Jobs::Base#perform_immediately` method such that `ActiveRecord::Base.connection_handler.clear_active_connections!` is not called. This change will only impact the test environment.
2023-01-10 00:41:25 -05:00
::Jobs::Base.new.perform("hello" => "world")
2013-02-05 14:16:51 -05:00
end
it "converts to an indifferent access hash" do
::Jobs::Base.any_instance.expects(:execute).with(instance_of(HashWithIndifferentAccess))
DEV: Fix threading error when running jobs immediately in system tests (#19811) ``` class Jobs::DummyDelayedJob < Jobs::Base def execute(args = {}) end end RSpec.describe "Jobs.run_immediately!" do before { Jobs.run_immediately! } it "explodes" do current_user = Fabricate(:user) Jobs.enqueue_in(1.seconds, :dummy_delayed_job) sign_in(current_user) end end ``` The test above will fail with the following error if `ActiveRecord::Base.connection_handler.clear_active_connections!` is called before the configured Capybara server checks out a connection from the connection pool. ``` ActiveRecord::ActiveRecordError: Cannot expire connection, it is owned by a different thread: #<Thread:0x00007f437391df58@puma srv tp 001 /home/tgxworld/.asdf/installs/ruby/3.1.3/lib/ruby/gems/3.1.0/gems/puma-6.0.2/lib/puma/thread_pool.rb:106 sleep_forever>. Current thread: #<Thread:0x00007f437d6cfc60 run>. ``` We're not exactly sure if this is an ActiveRecord bug or not but we've invested too much time into investigating this problem. Fundamentally, we also no longer understand why `ActiveRecord::Base.connection_handler.clear_active_connections!` is being called in an ensure block within `Jobs::Base#perform` which was added in ceddb6e0da92e874b3bee9543915726f1d27d60b 10 years ago. This commit moves the logic for running jobs immediately out of the `Jobs::Base#perform` method into another `Jobs::Base#perform_immediately` method such that `ActiveRecord::Base.connection_handler.clear_active_connections!` is not called. This change will only impact the test environment.
2023-01-10 00:41:25 -05:00
::Jobs::Base.new.perform("hello" => "world")
2013-02-25 11:42:20 -05:00
end
2013-02-05 14:16:51 -05:00
context "with fake jobs" do
let(:common_state) { [] }
let(:test_job_1) do
Class
.new(Jobs::Base)
.tap do |klass|
state = common_state
klass.define_method(:execute) { |args| state << "job_1_executed" }
end
end
let(:test_job_2) do
Class
.new(Jobs::Base)
.tap do |klass|
state = common_state
job_1 = test_job_1
klass.define_method(:execute) do |args|
state << "job_2_started"
Jobs.enqueue(job_1)
state << "job_2_finished"
end
end
end
it "runs jobs synchronously sequentially in tests" do
Jobs.run_immediately!
Jobs.enqueue(test_job_2)
expect(common_state).to eq(%w[job_2_started job_2_finished job_1_executed])
end
end
context "when `Discourse.enable_sidekiq_logging?` is `true`" do
let(:tmp_log_file) { Tempfile.new("sidekiq.log") }
before do
Discourse.enable_sidekiq_logging
described_class::JobInstrumenter.set_log_path(tmp_log_file.path)
end
after do
Discourse.disable_sidekiq_logging
described_class::JobInstrumenter.reset_log_path
tmp_log_file.close
end
it "should log the job in the sidekiq log file" do
job = GoodJob.new
job.perform({ some_param: "some_value" })
parsed_logline = JSON.parse(File.read(tmp_log_file.path).split("\n").first)
expect(parsed_logline["hostname"]).to be_present
expect(parsed_logline["pid"]).to be_present
expect(parsed_logline["database"]).to eq(RailsMultisite::ConnectionManagement.current_db)
expect(parsed_logline["job_name"]).to eq("GoodJob")
expect(parsed_logline["job_type"]).to eq("regular")
expect(parsed_logline["status"]).to eq("success")
expect(JSON.parse(parsed_logline["opts"])).to eq("some_param" => "some_value")
expect(parsed_logline["duration"]).to be_present
expect(parsed_logline["sql_duration"]).to eq(0)
expect(parsed_logline["sql_calls"]).to eq(0)
expect(parsed_logline["redis_duration"]).to eq(0)
expect(parsed_logline["redis_calls"]).to eq(0)
expect(parsed_logline["net_duration"]).to eq(0)
expect(parsed_logline["net_calls"]).to eq(0)
expect(parsed_logline["live_slots_finish"]).to be_present
expect(parsed_logline["live_slots"]).to be_present
end
end
2013-02-05 14:16:51 -05:00
end