DEV: Change Topic Timer from enqueue_at scheduled jobs to incrementally executed jobs (#11698)

Moves the topic timer jobs from being scheduled ahead of time with enqueue_at to a 5 minute scheduled run like bookmark reminders, in a new job called Jobs::EnqueueTopicTimers. Backwards compatibility is maintained by checking if an existing topic timer job is enqueued in sidekiq for the timer, and if it is not running it inside the new job.

The functionality to close/open a topic if it is in the opposite state still remains in the after_save block of TopicTimer, with further commentary, which is used for Open/Close Temporarily.

This also removes the ensure_consistency! functionality of topic timers as it is no longer needed; the new job will always pick up the timers because they are not stored in a fragile state of sidekiq.
This commit is contained in:
Martin Brennan 2021-01-19 13:30:58 +10:00 committed by GitHub
parent 5fd1001bfd
commit 0034cbda8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 278 additions and 244 deletions

View File

@ -1,22 +1,13 @@
# frozen_string_literal: true
module Jobs
class BumpTopic < ::Jobs::Base
def execute(args)
topic_timer = TopicTimer.find_by(id: args[:topic_timer_id])
topic = topic_timer&.topic
if topic_timer.blank? || topic.blank? || topic_timer.execute_at > Time.zone.now
return
end
class BumpTopic < ::Jobs::TopicTimerBase
def execute_timer_action(topic_timer, topic)
if Guardian.new(topic_timer.user).can_create_post_on_topic?(topic)
topic.add_small_action(Discourse.system_user, "autobumped", nil, bump: true)
end
topic_timer.trash!(Discourse.system_user)
end
end
end

View File

@ -1,22 +1,8 @@
# frozen_string_literal: true
module Jobs
class ClearSlowMode < ::Jobs::Base
def execute(args)
topic_timer = TopicTimer.find_by(id: args[:topic_timer_id])
if topic_timer.nil? || topic_timer.execute_at > Time.zone.now
return
end
topic = topic_timer&.topic
if topic.nil?
topic_timer.destroy!
return
end
class ClearSlowMode < ::Jobs::TopicTimerBase
def execute_timer_action(topic_timer, topic)
topic.update!(slow_mode_seconds: 0)
topic_timer.trash!(Discourse.system_user)
end

View File

@ -1,16 +1,12 @@
# frozen_string_literal: true
module Jobs
class CloseTopic < ::Jobs::Base
def execute(args)
topic_timer = TopicTimer.find_by(id: args[:topic_timer_id])
return if !topic_timer&.runnable?
topic = topic_timer.topic
class CloseTopic < ::Jobs::TopicTimerBase
def execute_timer_action(topic_timer, topic)
silent = @args[:silent]
user = topic_timer.user
silent = args[:silent]
if topic.blank? || topic.closed?
if topic.closed?
topic_timer.destroy!
return
end

View File

@ -1,17 +1,8 @@
# frozen_string_literal: true
module Jobs
class DeleteReplies < ::Jobs::Base
def execute(args)
topic_timer = TopicTimer.find_by(id: args[:topic_timer_id])
topic = topic_timer&.topic
if topic_timer.blank? || topic.blank? || topic_timer.execute_at > Time.zone.now
return
end
class DeleteReplies < ::Jobs::TopicTimerBase
def execute_timer_action(topic_timer, topic)
unless Guardian.new(topic_timer.user).is_staff?
topic_timer.trash!(Discourse.system_user)
return

View File

@ -1,23 +1,17 @@
# frozen_string_literal: true
module Jobs
class DeleteTopic < ::Jobs::Base
def execute(args)
topic_timer = TopicTimer.find_by(id: args[:topic_timer_id])
topic = topic_timer&.topic
if topic_timer.blank? || topic.blank? || topic_timer.execute_at > Time.zone.now
return
end
class DeleteTopic < ::Jobs::TopicTimerBase
def execute_timer_action(topic_timer, topic)
if Guardian.new(topic_timer.user).can_delete?(topic)
first_post = topic.ordered_posts.first
PostDestroyer.new(topic_timer.user, first_post, context: I18n.t("topic_statuses.auto_deleted_by_timer")).destroy
PostDestroyer.new(
topic_timer.user, first_post, context: I18n.t("topic_statuses.auto_deleted_by_timer")
).destroy
topic_timer.trash!(Discourse.system_user)
end
end
end
end

View File

@ -1,19 +1,10 @@
# frozen_string_literal: true
module Jobs
class OpenTopic < ::Jobs::Base
def execute(args)
topic_timer = TopicTimer.find_by(id: args[:topic_timer_id])
return if !topic_timer&.runnable?
topic = topic_timer.topic
class OpenTopic < ::Jobs::TopicTimerBase
def execute_timer_action(topic_timer, topic)
user = topic_timer.user
if topic.blank?
topic_timer.destroy!
return
end
if !Guardian.new(user).can_open_topic?(topic) || topic.open?
topic_timer.destroy!
topic.reload

View File

@ -1,14 +1,8 @@
# frozen_string_literal: true
module Jobs
class PublishTopicToCategory < ::Jobs::Base
def execute(args)
topic_timer = TopicTimer.find_by(id: args[:topic_timer_id])
return if topic_timer.blank?
topic = topic_timer.topic
return if topic.blank?
class PublishTopicToCategory < ::Jobs::TopicTimerBase
def execute_timer_action(topic_timer, topic)
return unless Guardian.new(topic_timer.user).can_see?(topic)
TopicTimer.transaction do

View File

@ -0,0 +1,20 @@
# frozen_string_literal: true
module Jobs
class TopicTimerBase < ::Jobs::Base
def execute(args)
@args = args
topic_timer = TopicTimer.find_by(id: args[:topic_timer_id])
return if !topic_timer&.runnable?
topic = topic_timer.topic
if topic.blank?
topic_timer.destroy!
return
end
execute_timer_action(topic_timer, topic)
end
end
end

View File

@ -24,9 +24,6 @@ module Jobs
args[:max_topic_length] = 500 unless self.class.should_update_long_topics?
ScoreCalculator.new.calculate(args)
# Re-run stuff that we missed
TopicTimer.ensure_consistency!
# Forces rebake of old posts where needed, as long as no system avatars need updating
if !SiteSetting.automatically_download_gravatars || !UserAvatar.where("last_gravatar_download_attempt IS NULL").limit(1).first
problems = Post.rebake_old(SiteSetting.rebake_old_posts_count, priority: :ultra_low)

View File

@ -0,0 +1,25 @@
# frozen_string_literal: true
module Jobs
# Runs periodically to look through topic timers that are ready to execute,
# and enqueues their related jobs.
#
# Any leftovers will be caught in the next run, because execute_at will
# be < now, and topic timers that have run are deleted on completion or
# otherwise have their execute_at time modified.
class TopicTimerEnqueuer < ::Jobs::Scheduled
every 1.minute
def execute(_args = nil)
timers = TopicTimer.pending_timers
timers.find_each do |timer|
# the typed job may not enqueue if it has already
# been scheduled with enqueue_at
timer.enqueue_typed_job
end
end
end
end

View File

@ -1357,6 +1357,8 @@ class Topic < ActiveRecord::Base
end
if self.persisted?
# See TopicTimer.after_save for additional context; the topic
# status may be changed by saving.
topic_timer.save!
else
self.topic_timers << topic_timer

View File

@ -18,6 +18,9 @@ class TopicTimer < ActiveRecord::Base
validate :executed_at_in_future?
scope :scheduled_bump_topics, -> { where(status_type: TopicTimer.types[:bump], deleted_at: nil).pluck(:topic_id) }
scope :pending_timers, ->(before_time = Time.now.utc) do
where("execute_at <= :before_time AND deleted_at IS NULL", before_time: before_time)
end
before_save do
self.created_at ||= Time.zone.now if execute_at
@ -27,21 +30,68 @@ class TopicTimer < ActiveRecord::Base
!attribute_in_database(:execute_at).nil?) ||
will_save_change_to_user_id?
# private implementation detail have to use send
# TODO(martin - 2021-05-01) - Remove this backwards compatability for outstanding
# jobs once they have all been run and after Jobs::TopicTimerEnqueuer is in place
self.send("cancel_auto_#{self.class.types[status_type]}_job")
end
end
# These actions are in place to make sure the topic is in the correct
# state at the point in time where the timer is saved. It does not
# guarantee that the topic will be in the correct state when the timer
# job is executed, but each timer job handles deleted topics etc. gracefully.
#
# This is also important for the Open Temporarily and Close Temporarily timers,
# which change the topic's status straight away and set a timer to do the
# opposite action in the future.
after_save do
if (saved_change_to_execute_at? || saved_change_to_user_id?)
now = Time.zone.now
time = execute_at < now ? now : execute_at
# private implementation detail have to use send
self.send("schedule_auto_#{self.class.types[status_type]}_job", time)
if status_type == TopicTimer.types[:silent_close] || status_type == TopicTimer.types[:close]
topic.update_status('closed', false, user) if topic.closed?
end
if status_type == TopicTimer.types[:open]
topic.update_status('closed', true, user) if topic.open?
end
end
end
def status_type_name
self.class.types[status_type]
end
def enqueue_typed_job(time: nil)
return if typed_job_scheduled?
self.send("schedule_auto_#{status_type_name}_job")
end
# TODO(martin - 2021-05-01) - Remove this backwards compatability for outstanding
# jobs once they have all been run and after Jobs::TopicTimerEnqueuer is in place
def typed_job_scheduled?
scheduled = Jobs.scheduled_for(
TopicTimer.type_job_map[status_type_name], topic_timer_id: id
).any?
if [:close, :silent_close, :open].include?(status_type_name)
return scheduled || Jobs.scheduled_for(:toggle_topic_closed, topic_timer_id: id).any?
end
scheduled
end
def self.type_job_map
{
close: :close_topic,
open: :open_topic,
publish_to_category: :publish_topic_to_category,
delete: :delete_topic,
reminder: :topic_reminder,
bump: :bump_topic,
delete_replies: :delete_replies,
silent_close: :close_topic,
clear_slow_mode: :clear_slow_mode
}
end
def self.types
@types ||= Enum.new(
close: 1,
@ -64,18 +114,6 @@ class TopicTimer < ActiveRecord::Base
@_private_types ||= types.only(:reminder, :clear_slow_mode)
end
def self.ensure_consistency!
TopicTimer.where("topic_timers.execute_at < ?", Time.zone.now)
.find_each do |topic_timer|
# private implementation detail scoped to class
topic_timer.send(
"schedule_auto_#{self.types[topic_timer.status_type]}_job",
topic_timer.execute_at
)
end
end
def public_type?
!!self.class.public_types[self.status_type]
end
@ -100,6 +138,10 @@ class TopicTimer < ActiveRecord::Base
))
end
def publishing_to_category?
self.status_type.to_i == TopicTimer.types[:publish_to_category]
end
# TODO(martin - 2021-05-01) - Remove cancels for toggle_topic_closed once topic timer revamp completed.
def cancel_auto_close_job
Jobs.cancel_scheduled_job(:toggle_topic_closed, topic_timer_id: id)
@ -119,73 +161,63 @@ class TopicTimer < ActiveRecord::Base
end
def cancel_auto_publish_to_category_job
Jobs.cancel_scheduled_job(:publish_topic_to_category, topic_timer_id: id)
Jobs.cancel_scheduled_job(TopicTimer.type_job_map[:publish_to_category], topic_timer_id: id)
end
def cancel_auto_delete_job
Jobs.cancel_scheduled_job(:delete_topic, topic_timer_id: id)
Jobs.cancel_scheduled_job(TopicTimer.type_job_map[:delete], topic_timer_id: id)
end
def cancel_auto_reminder_job
Jobs.cancel_scheduled_job(:topic_reminder, topic_timer_id: id)
Jobs.cancel_scheduled_job(TopicTimer.type_job_map[:reminder], topic_timer_id: id)
end
def cancel_auto_bump_job
Jobs.cancel_scheduled_job(:bump_topic, topic_timer_id: id)
Jobs.cancel_scheduled_job(TopicTimer.type_job_map[:bump], topic_timer_id: id)
end
def cancel_auto_delete_replies_job
Jobs.cancel_scheduled_job(:delete_replies, topic_timer_id: id)
Jobs.cancel_scheduled_job(TopicTimer.type_job_map[:delete_replies], topic_timer_id: id)
end
def cancel_auto_clear_slow_mode_job
Jobs.cancel_scheduled_job(:clear_slow_mode, topic_timer_id: id)
Jobs.cancel_scheduled_job(TopicTimer.type_job_map[:clear_slow_mode], topic_timer_id: id)
end
def schedule_auto_delete_replies_job(time)
Jobs.enqueue_at(time, :delete_replies, topic_timer_id: id)
def schedule_auto_delete_replies_job
Jobs.enqueue(TopicTimer.type_job_map[:delete_replies], topic_timer_id: id)
end
def schedule_auto_bump_job(time)
Jobs.enqueue_at(time, :bump_topic, topic_timer_id: id)
def schedule_auto_bump_job
Jobs.enqueue(TopicTimer.type_job_map[:bump], topic_timer_id: id)
end
def schedule_auto_open_job(time)
topic.update_status('closed', true, user) if topic && !topic.closed
Jobs.enqueue_at(time, :open_topic, topic_timer_id: id)
def schedule_auto_open_job
Jobs.enqueue(TopicTimer.type_job_map[:open], topic_timer_id: id)
end
def schedule_auto_close_job(time)
topic.update_status('closed', false, user) if topic&.closed
Jobs.enqueue_at(time, :close_topic, topic_timer_id: id)
def schedule_auto_close_job
Jobs.enqueue(TopicTimer.type_job_map[:close], topic_timer_id: id)
end
def schedule_auto_silent_close_job(time)
topic.update_status('closed', false, user) if topic&.closed
Jobs.enqueue_at(time, :close_topic, topic_timer_id: id, silent: true)
def schedule_auto_silent_close_job
Jobs.enqueue(TopicTimer.type_job_map[:close], topic_timer_id: id, silent: true)
end
def schedule_auto_publish_to_category_job(time)
Jobs.enqueue_at(time, :publish_topic_to_category, topic_timer_id: id)
def schedule_auto_publish_to_category_job
Jobs.enqueue(TopicTimer.type_job_map[:publish_to_category], topic_timer_id: id)
end
def publishing_to_category?
self.status_type.to_i == TopicTimer.types[:publish_to_category]
def schedule_auto_delete_job
Jobs.enqueue(TopicTimer.type_job_map[:delete], topic_timer_id: id)
end
def schedule_auto_delete_job(time)
Jobs.enqueue_at(time, :delete_topic, topic_timer_id: id)
end
def schedule_auto_reminder_job(time)
def schedule_auto_reminder_job
# noop, TODO(martin 2021-03-11): Remove this after timers migrated and outstanding jobs cancelled
end
def schedule_auto_clear_slow_mode_job(time)
Jobs.enqueue_at(time, :clear_slow_mode, topic_timer_id: id)
def schedule_auto_clear_slow_mode_job
Jobs.enqueue(TopicTimer.type_job_map[:clear_slow_mode], topic_timer_id: id)
end
end

View File

@ -42,10 +42,6 @@ describe Topic do
expect(topic_status_update.topic).to eq(topic)
expect(topic.public_topic_timer.execute_at).to eq_time(2.hours.from_now)
args = job_klass.jobs.last['args'].first
expect(args["topic_timer_id"]).to eq(topic.public_topic_timer.id)
end
context 'topic was created by staff user' do
@ -60,10 +56,6 @@ describe Topic do
expect(topic_status_update.topic).to eq(staff_topic)
expect(topic_status_update.execute_at).to eq_time(2.hours.from_now)
expect(topic_status_update.user).to eq(Discourse.system_user)
args = job_klass.jobs.last['args'].first
expect(args["topic_timer_id"]).to eq(topic_status_update.id)
end
context 'topic is closed manually' do
@ -90,10 +82,6 @@ describe Topic do
expect(topic_status_update.topic).to eq(regular_user_topic)
expect(topic_status_update.execute_at).to eq_time(2.hours.from_now)
expect(topic_status_update.user).to eq(Discourse.system_user)
args = job_klass.jobs.last['args'].first
expect(args["topic_timer_id"]).to eq(topic_status_update.id)
end
end
end

View File

@ -12,7 +12,9 @@ RSpec.describe Jobs::PublishTopicToCategory do
Fabricate(:topic_timer,
status_type: TopicTimer.types[:publish_to_category],
category_id: another_category.id,
topic: topic
topic: topic,
execute_at: 1.minute.ago,
created_at: 5.minutes.ago
)
Fabricate(:post, topic: topic)

View File

@ -0,0 +1,50 @@
# frozen_string_literal: true
require 'rails_helper'
RSpec.describe Jobs::TopicTimerEnqueuer do
subject { described_class.new }
fab!(:timer1) do
Fabricate(:topic_timer, execute_at: 1.minute.ago, created_at: 1.hour.ago, status_type: TopicTimer.types[:close])
end
fab!(:timer2) do
Fabricate(:topic_timer, execute_at: 1.minute.ago, created_at: 1.hour.ago, status_type: TopicTimer.types[:open])
end
fab!(:future_timer) do
Fabricate(:topic_timer, execute_at: 1.hours.from_now, created_at: 1.hour.ago, status_type: TopicTimer.types[:close])
end
fab!(:deleted_timer) do
Fabricate(:topic_timer, execute_at: 1.minute.ago, created_at: 1.hour.ago, status_type: TopicTimer.types[:close])
end
before do
deleted_timer.trash!
end
it "does not enqueue deleted timers" do
expect_not_enqueued_with(job: :close_topic, args: { topic_timer_id: deleted_timer.id })
subject.execute
expect(deleted_timer.topic.reload.closed?).to eq(false)
end
it "does not enqueue future timers" do
expect_not_enqueued_with(job: :close_topic, args: { topic_timer_id: future_timer.id })
subject.execute
expect(future_timer.topic.reload.closed?).to eq(false)
end
it "enqueues the related job" do
expect_not_enqueued_with(job: :close_topic, args: { topic_timer_id: deleted_timer.id })
expect_not_enqueued_with(job: :close_topic, args: { topic_timer_id: future_timer.id })
subject.execute
expect_job_enqueued(job: :close_topic, args: { topic_timer_id: timer1.id })
expect_job_enqueued(job: :open_topic, args: { topic_timer_id: timer2.id })
end
it "does not re-enqueue a job that has already been scheduled ahead of time in sidekiq (legacy topic timers)" do
expect_not_enqueued_with(job: :close_topic, args: { topic_timer_id: timer1.id })
Jobs.enqueue_at(1.hours.from_now, :close_topic, topic_timer_id: timer1.id)
subject.execute
end
end

View File

@ -764,9 +764,7 @@ describe PostAction do
freeze_time timer.execute_at
expect_enqueued_with(job: :open_topic, args: { topic_timer_id: timer.id }, at: Time.zone.now + 1.hour) do
Jobs::OpenTopic.new.execute(topic_timer_id: timer.id)
end
Jobs::OpenTopic.new.execute(topic_timer_id: timer.id)
expect(topic.reload.closed).to eq(true)
expect(timer.reload.execute_at).to eq_time(1.hour.from_now)

View File

@ -1876,7 +1876,7 @@ describe Topic do
freeze_time 3.hours.from_now
TopicTimer.ensure_consistency!
Jobs::TopicTimerEnqueuer.new.execute
expect(topic.reload.closed).to eq(true)
end
end

View File

@ -10,6 +10,20 @@ RSpec.describe TopicTimer, type: :model do
before { freeze_time }
context "validations" do
describe "pending_timers scope" do
it "does not return deleted timers" do
topic_timer.trash!
expect(TopicTimer.pending_timers.pluck(:id)).not_to include(topic_timer.id)
end
it "does not return timers in the future of the provided before time" do
topic_timer.update!(execute_at: 3.days.from_now)
expect(TopicTimer.pending_timers.pluck(:id)).not_to include(topic_timer.id)
expect(TopicTimer.pending_timers(2.days.from_now).pluck(:id)).not_to include(topic_timer.id)
topic_timer.update!(execute_at: 1.minute.ago, created_at: 10.minutes.ago)
expect(TopicTimer.pending_timers.pluck(:id)).to include(topic_timer.id)
end
end
describe '#status_type' do
it 'should ensure that only one active public topic status update exists' do
topic_timer.update!(topic: topic)
@ -20,6 +34,44 @@ RSpec.describe TopicTimer, type: :model do
end
end
describe "#typed_job_scheduled?" do
let(:scheduled_jobs) { Sidekiq::ScheduledSet.new }
after do
scheduled_jobs.clear
end
it "returns true if the job is scheduled" do
Sidekiq::Testing.disable! do
scheduled_jobs.clear
Jobs.enqueue_at(3.hours.from_now, :close_topic, topic_timer_id: topic_timer.id)
expect(topic_timer.typed_job_scheduled?).to eq(true)
end
end
it "returns false if the job is not already scheduled" do
Sidekiq::Testing.disable! do
scheduled_jobs.clear
expect(topic_timer.typed_job_scheduled?).to eq(false)
end
end
it "returns true if the toggle_topic_closed job is scheduled for close,open,silent_close types" do
Sidekiq::Testing.disable! do
scheduled_jobs.clear
topic_timer1 = Fabricate(:topic_timer, status_type: TopicTimer.types[:close])
Jobs.enqueue_at(3.hours.from_now, :toggle_topic_closed, topic_timer_id: topic_timer1.id)
topic_timer2 = Fabricate(:topic_timer, status_type: TopicTimer.types[:open])
Jobs.enqueue_at(3.hours.from_now, :toggle_topic_closed, topic_timer_id: topic_timer2.id)
topic_timer3 = Fabricate(:topic_timer, status_type: TopicTimer.types[:silent_close])
Jobs.enqueue_at(3.hours.from_now, :toggle_topic_closed, topic_timer_id: topic_timer3.id)
expect(topic_timer1.typed_job_scheduled?).to eq(true)
expect(topic_timer2.typed_job_scheduled?).to eq(true)
expect(topic_timer3.typed_job_scheduled?).to eq(true)
end
end
end
describe '#execute_at' do
describe 'when #execute_at is greater than #created_at' do
it 'should be valid' do
@ -95,20 +147,7 @@ RSpec.describe TopicTimer, type: :model do
:close_topic, topic_timer_id: topic_timer.id
)
expect_enqueued_with(job: :close_topic, args: { topic_timer_id: topic_timer.id }, at: 3.days.from_now) do
topic_timer.update!(execute_at: 3.days.from_now, created_at: Time.zone.now)
end
end
describe 'when execute_at is smaller than the current time' do
it 'should enqueue the job immediately' do
expect_enqueued_with(job: :close_topic, args: { topic_timer_id: topic_timer.id }, at: Time.zone.now) do
topic_timer.update!(
execute_at: Time.zone.now - 1.hour,
created_at: Time.zone.now - 2.hour
)
end
end
topic_timer.update!(execute_at: 3.days.from_now, created_at: Time.zone.now)
end
end
@ -121,9 +160,7 @@ RSpec.describe TopicTimer, type: :model do
:close_topic, topic_timer_id: topic_timer.id
)
expect_enqueued_with(job: :close_topic, args: { topic_timer_id: topic_timer.id }, at: topic_timer.execute_at) do
topic_timer.update!(user: admin)
end
topic_timer.update!(user: admin)
end
end
@ -141,18 +178,9 @@ RSpec.describe TopicTimer, type: :model do
end
it 'should close the topic' do
topic_timer
topic_timer.send(:schedule_auto_open_job)
expect(topic.reload.closed).to eq(true)
end
describe 'when topic has been deleted' do
it 'should not queue the job' do
topic.trash!
topic_timer
expect(Jobs::ToggleTopicClosed.jobs).to eq([])
end
end
end
describe 'when a close topic status update is created for a closed topic' do
@ -169,18 +197,9 @@ RSpec.describe TopicTimer, type: :model do
end
it 'should open the topic' do
topic_timer
topic_timer.send(:schedule_auto_close_job)
expect(topic.reload.closed).to eq(false)
end
describe 'when topic has been deleted' do
it 'should not queue the job' do
topic.trash!
topic_timer
expect(Jobs::ToggleTopicClosed.jobs).to eq([])
end
end
end
describe '#public_type' do
@ -205,62 +224,6 @@ RSpec.describe TopicTimer, type: :model do
end
end
describe '.ensure_consistency!' do
it 'should enqueue jobs that have been missed' do
close_topic_timer = Fabricate(:topic_timer,
execute_at: Time.zone.now - 1.hour,
created_at: Time.zone.now - 2.hour
)
open_topic_timer = Fabricate(:topic_timer,
status_type: described_class.types[:open],
execute_at: Time.zone.now - 1.hour,
created_at: Time.zone.now - 2.hour,
topic: Fabricate(:topic, closed: true)
)
Fabricate(:topic_timer, execute_at: Time.zone.now + 1.hour)
trashed_close_topic_timer = Fabricate(:topic_timer,
execute_at: Time.zone.now - 1.hour,
created_at: Time.zone.now - 2.hour
)
trashed_close_topic_timer.topic.trash!
trashed_open_topic_timer = Fabricate(:topic_timer,
execute_at: Time.zone.now - 1.hour,
created_at: Time.zone.now - 2.hour,
status_type: described_class.types[:open]
)
trashed_open_topic_timer.topic.trash!
# creating topic timers already enqueues jobs
# let's delete them to test ensure_consistency!
Sidekiq::Worker.clear_all
expect { described_class.ensure_consistency! }
.to change { Jobs::CloseTopic.jobs.count }.by(2).and change { Jobs::OpenTopic.jobs.count }.by(2)
expect(job_enqueued?(job: :close_topic, args: {
topic_timer_id: close_topic_timer.id
})).to eq(true)
expect(job_enqueued?(job: :open_topic, args: {
topic_timer_id: open_topic_timer.id
})).to eq(true)
expect(job_enqueued?(job: :close_topic, args: {
topic_timer_id: trashed_close_topic_timer.id
})).to eq(true)
expect(job_enqueued?(job: :open_topic, args: {
topic_timer_id: trashed_open_topic_timer.id
})).to eq(true)
end
end
describe "runnable?" do
it "returns false if execute_at > now" do
topic_timer = Fabricate.build(:topic_timer,

View File

@ -47,7 +47,7 @@ module SidekiqHelpers
# end
def expect_not_enqueued_with(job:, args: {}, at: nil)
expect_enqueued_with(job: job, args: args, at: at, expectation: false) do
yield
yield if block_given?
end
end
@ -61,6 +61,20 @@ module SidekiqHelpers
match_jobs(jobs: klass.jobs, args: args, at: at)
end
# Same as job_enqueued? except it checks the expectation is true.
# Use this if you need to check if more than one job is enqueued from
# a single command, unlike expect_enqueued_with which needs a block
# to run code for the expectation to work. E.g.
#
# expect_not_enqueued_with(job: :close_topic, args: { topic_timer_id: deleted_timer.id })
# expect_not_enqueued_with(job: :close_topic, args: { topic_timer_id: future_timer.id })
# subject.execute
# expect_job_enqueued(job: :close_topic, args: { topic_timer_id: timer1.id })
# expect_job_enqueued(job: :open_topic, args: { topic_timer_id: timer2.id })
def expect_job_enqueued(job:, args: {}, at: nil)
expect(job_enqueued?(job: job, args: args, at: at)).to eq(true)
end
private
def match_jobs(jobs:, args:, at:)