Implement enqueue_after
This commit is contained in:
parent
db83473f68
commit
57bee37db3
|
@ -461,6 +461,45 @@ module Jobs
|
|||
enqueue_in(secs, job_name, opts)
|
||||
end
|
||||
|
||||
@@enqueue_after = []
|
||||
|
||||
def self.process_enqueue_after
|
||||
@@enqueue_after.filter! do |job_ids, job_name, opts|
|
||||
pending_job_ids = Set.new
|
||||
|
||||
Sidekiq::Queue.all.each do |queue|
|
||||
queue.each { |job| pending_job_ids << job.jid if job_ids.include?(job.jid) }
|
||||
end
|
||||
|
||||
if pending_job_ids.empty?
|
||||
enqueue(job_name, opts)
|
||||
false
|
||||
else
|
||||
true
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.ensure_enqueue_after_thread!
|
||||
@enqueue_after_thread ||=
|
||||
Thread.new do
|
||||
loop do
|
||||
process_enqueue_after
|
||||
sleep 10
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.enqueue_after(job_ids, job_name, opts = {})
|
||||
@@enqueue_after << [Set.new(job_ids), job_name, opts]
|
||||
|
||||
if run_immediately?
|
||||
process_enqueue_after
|
||||
else
|
||||
ensure_enqueue_after_thread!
|
||||
end
|
||||
end
|
||||
|
||||
def self.cancel_scheduled_job(job_name, opts = {})
|
||||
scheduled_for(job_name, opts).each(&:delete)
|
||||
end
|
||||
|
|
|
@ -17,14 +17,6 @@ module Jobs
|
|||
error_context({}, code_desc: "Exception granting badges", extra: { badge_id: badge.id }),
|
||||
)
|
||||
end
|
||||
|
||||
# If this instance is the last job to be processed, schedule the
|
||||
# EnsureBadgeConsistency job. This guarantees it runs only once after all
|
||||
# badges have been granted.
|
||||
if Discourse.redis.decr("grant_badge_remaining") <= 0
|
||||
Discourse.redis.del("grant_badge_remaining")
|
||||
Jobs.enqueue(:ensure_badge_consistency)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -7,10 +7,8 @@ module Jobs
|
|||
def execute(args)
|
||||
return unless SiteSetting.enable_badges
|
||||
|
||||
enabled_badges = Badge.enabled
|
||||
|
||||
Discourse.redis.set("grant_badge_remaining", enabled_badges.count)
|
||||
enabled_badges.find_each { |b| Jobs.enqueue(:grant_badge, badge_id: b.id) }
|
||||
job_ids = Badge.enabled.map { |b| Jobs.enqueue(:grant_badge, badge_id: b.id) }
|
||||
Jobs.enqueue_after(job_ids, :ensure_badge_consistency)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -6,4 +6,12 @@ RSpec.describe Jobs::GrantAllBadges do
|
|||
|
||||
expect(Jobs::GrantBadge.jobs.size).to eq(Badge.enabled.size)
|
||||
end
|
||||
|
||||
it "schedules a EnsureBadgeConsistency job after all GrantBadge jobs" do
|
||||
Jobs.run_immediately!
|
||||
|
||||
Jobs::EnsureBadgeConsistency.any_instance.expects(:execute).once
|
||||
|
||||
described_class.new.execute({})
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,23 +0,0 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
RSpec.describe Jobs::GrantBadge do
|
||||
subject(:job) { described_class.new }
|
||||
|
||||
it "schedules a EnsureBadgeConsistency job" do
|
||||
# Keep the test fast by only enabling 2 badges
|
||||
badge_ids = Badge.enabled.limit(2).pluck(:id)
|
||||
Badge.where.not(id: badge_ids).update_all(enabled: false)
|
||||
|
||||
# Ensures it starts a new batch of GrantBadge jobs
|
||||
Jobs::GrantAllBadges.new.execute({})
|
||||
expect(Jobs::GrantBadge.jobs.map { |job| job["args"][0]["badge_id"] }).to eq(badge_ids)
|
||||
|
||||
# First GrantBadge job should not enqueue EnsureBadgeConsistency
|
||||
Jobs::GrantBadge.new.execute(badge_id: badge_ids.first)
|
||||
expect(Jobs::EnsureBadgeConsistency.jobs).to be_empty
|
||||
|
||||
# Last GrantBadge job should enqueue EnsureBadgeConsistency
|
||||
Jobs::GrantBadge.new.execute(badge_id: badge_ids.last)
|
||||
expect(Jobs::EnsureBadgeConsistency.jobs).not_to be_empty
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue