discourse/app/jobs/regular/notify_reviewable.rb
jbrw f8863b0f98
DEV: Limit concurrency of NotifyReviewables job (#19968)
Under scenarios of extremely high load where large numbers of `Reviewable*` items are being created, it has been observed that multiple instances of the `NotifyReviewable` job may run simultaneously.

These jobs will work satisfactorily if the concurrency is limited to 1, and the different types of jobs (items reviewable by admins, vs moderators, vs particular groups, etc.) are run eventually.

This change introduces a new option to `DistributedMutex` which allows the `max_get_lock_attempts` to be specified. If the number is exceeded an error will be raised, which will cause Sidekiq to requeue the job. Sidekiq has existing logic to back-off on retry times for jobs that have failed multiple times.
2023-01-25 15:19:11 -05:00

103 lines
3.1 KiB
Ruby

# frozen_string_literal: true
# Background job that tells the people able to act on a reviewable that the
# review queue changed: admins first, then moderators (when the reviewable is
# reviewable by moderators), then members of the reviewable's group (when
# category group moderation is enabled).
class Jobs::NotifyReviewable < ::Jobs::Base
  # NOTE: everything guarded by `SiteSetting.legacy_navigation_menu?` (and
  # `notify_legacy` itself) is legacy. The original note here said to remove
  # the legacy code "when redesigned_user_menu_enabled is removed"; the code
  # now checks `legacy_navigation_menu?`, so confirm which setting actually
  # gates the removal.
  #
  # @param args [Hash] job arguments; keys read here:
  #   - :reviewable_id          id of the reviewable that triggered the job
  #   - :updated_reviewable_ids optional ids whose status should be pushed
  #   - :performing_username    forwarded as `last_performing_username`
  def execute(args)
    return unless reviewable = Reviewable.find_by(id: args[:reviewable_id])

    # Ids of users already notified during this run, so nobody is told twice.
    @contacted = Set.new

    # audience (:admins, :moderators or a group id) => { reviewable id => payload }
    all_updates = Hash.new { |h, k| h[k] = {} }

    if args[:updated_reviewable_ids].present?
      Reviewable
        .where(id: args[:updated_reviewable_ids])
        .each do |r|
          payload = { last_performing_username: args[:performing_username], status: r.status }

          all_updates[:admins][r.id] = payload
          all_updates[:moderators][r.id] = payload if r.reviewable_by_moderator?
          all_updates[r.reviewable_by_group_id][r.id] = payload if r.reviewable_by_group_id
        end
    end

    # Only one instance of this job should notify at a time. According to the
    # change that introduced `max_get_lock_attempts`, failing to obtain the
    # lock within that many attempts raises, and Sidekiq then retries the job.
    DistributedMutex.synchronize("notify_reviewable_job", validity: 10, max_get_lock_attempts: 1) do
      legacy = SiteSetting.legacy_navigation_menu?

      # Pending counts are only sent on the legacy path, so avoid loading
      # every pending reviewable when they would go unused.
      counts = legacy ? pending_counts : Hash.new(0)

      if legacy
        notify_legacy(
          User.real.admins.pluck(:id),
          count: counts[:admins],
          updates: all_updates[:admins],
        )
      else
        notify_users(User.real.admins, all_updates[:admins])
      end

      if reviewable.reviewable_by_moderator?
        moderators = not_yet_contacted(User.real.moderators)

        if legacy
          notify_legacy(
            moderators.pluck(:id),
            count: counts[:moderators],
            updates: all_updates[:moderators],
          )
        else
          notify_users(moderators, all_updates[:moderators])
        end
      end

      if SiteSetting.enable_category_group_moderation? && (group = reviewable.reviewable_by_group)
        users = not_yet_contacted(group.users.includes(:group_users))

        users.find_each do |user|
          count = 0
          updates = {}

          # A user may belong to several groups; combine what each one sees.
          user.group_users.each do |gu|
            updates.merge!(all_updates[gu.group_id])
            count += counts[gu.group_id]
          end

          if legacy
            notify_legacy([user.id], count: count, updates: updates)
          else
            notify_user(user, updates)
          end
        end

        @contacted += users.pluck(:id)
      end
    end
  end

  protected

  # Tallies pending, default-visible reviewables per audience.
  #
  # @return [Hash] :admins, :moderators and group ids => count (default 0)
  def pending_counts
    counts = Hash.new(0)

    Reviewable.default_visible.pending.each do |r|
      counts[:admins] += 1
      counts[:moderators] += 1 if r.reviewable_by_moderator?
      counts[r.reviewable_by_group_id] += 1 if r.reviewable_by_group_id
    end

    counts
  end

  # Restricts +scope+ to users that have not been notified yet.
  #
  # Uses `where.not(id: [...])` rather than a raw `NOT IN (?)` fragment: with
  # an empty collection the raw fragment is rendered as `NOT IN (NULL)`, which
  # matches no rows and would silently skip everyone when nobody has been
  # contacted so far (e.g. there are no real admins).
  #
  # @param scope [ActiveRecord::Relation] relation of users
  # @return [ActiveRecord::Relation]
  def not_yet_contacted(scope)
    scope.where.not(id: @contacted.to_a)
  end

  # Legacy path: publishes the pending count (and any status updates) on the
  # "/reviewable_counts" MessageBus channel for the given users and records
  # them as contacted. Does nothing when +user_ids+ is blank.
  #
  # @param user_ids [Array<Integer>] recipients
  # @param count [Integer] value sent as `reviewable_count`
  # @param updates [Hash] per-reviewable payloads; omitted when blank
  def notify_legacy(user_ids, count:, updates:)
    return if user_ids.blank?

    data = { reviewable_count: count }
    data[:updates] = updates if updates.present?

    MessageBus.publish("/reviewable_counts", data, user_ids: user_ids)
    @contacted += user_ids
  end

  # Notifies every user in +users+ and records them as contacted.
  #
  # @param users [ActiveRecord::Relation] users to notify
  # @param updates [Hash] per-reviewable payloads
  def notify_users(users, updates)
    users.find_each { |user| notify_user(user, updates) }
    @contacted += users.pluck(:id)
  end

  # Asks the user record to publish its reviewable counts, passing the updates
  # along only when there are any.
  #
  # @param user [User]
  # @param updates [Hash] per-reviewable payloads
  def notify_user(user, updates)
    user.publish_reviewable_counts(updates.present? ? { updates: updates } : nil)
  end
end