DEV: Limit concurrency of NotifyReviewables job ()

Under scenarios of extremely high load where large numbers of `Reviewable*` items are being created, it has been observed that multiple instances of the `NotifyReviewable` job may run simultaneously.

These jobs will work satisfactorily if the concurrency is limited to 1, and the different types of jobs (items reviewable by admins, vs moderators, vs particular groups, etc.) are run eventually.

This change introduces a new option to `DistributedMutex` which allows the `max_get_lock_attempts` to be specified. If the number is exceeded an error will be raised, which will cause Sidekiq to requeue the job. Sidekiq has existing logic to back-off on retry times for jobs that have failed multiple times.
This commit is contained in:
jbrw 2023-01-25 15:19:11 -05:00 committed by GitHub
parent e6a41150e2
commit f8863b0f98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 42 deletions

View File

@ -22,58 +22,60 @@ class Jobs::NotifyReviewable < ::Jobs::Base
end end
end end
counts = Hash.new(0) DistributedMutex.synchronize("notify_reviewable_job", validity: 10, max_get_lock_attempts: 1) do
counts = Hash.new(0)
Reviewable.default_visible.pending.each do |r| Reviewable.default_visible.pending.each do |r|
counts[:admins] += 1 counts[:admins] += 1
counts[:moderators] += 1 if r.reviewable_by_moderator? counts[:moderators] += 1 if r.reviewable_by_moderator?
counts[r.reviewable_by_group_id] += 1 if r.reviewable_by_group_id counts[r.reviewable_by_group_id] += 1 if r.reviewable_by_group_id
end end
if SiteSetting.legacy_navigation_menu?
notify_legacy(
User.real.admins.pluck(:id),
count: counts[:admins],
updates: all_updates[:admins],
)
else
notify_users(User.real.admins, all_updates[:admins])
end
if reviewable.reviewable_by_moderator?
if SiteSetting.legacy_navigation_menu? if SiteSetting.legacy_navigation_menu?
notify_legacy( notify_legacy(
User.real.moderators.where("id NOT IN (?)", @contacted).pluck(:id), User.real.admins.pluck(:id),
count: counts[:moderators], count: counts[:admins],
updates: all_updates[:moderators], updates: all_updates[:admins],
) )
else else
notify_users( notify_users(User.real.admins, all_updates[:admins])
User.real.moderators.where("id NOT IN (?)", @contacted),
all_updates[:moderators],
)
end end
end
if SiteSetting.enable_category_group_moderation? && (group = reviewable.reviewable_by_group)
users = group.users.includes(:group_users).where("users.id NOT IN (?)", @contacted)
users.find_each do |user|
count = 0
updates = {}
user.group_users.each do |gu|
updates.merge!(all_updates[gu.group_id])
count += counts[gu.group_id]
end
if reviewable.reviewable_by_moderator?
if SiteSetting.legacy_navigation_menu? if SiteSetting.legacy_navigation_menu?
notify_legacy([user.id], count: count, updates: updates) notify_legacy(
User.real.moderators.where("id NOT IN (?)", @contacted).pluck(:id),
count: counts[:moderators],
updates: all_updates[:moderators],
)
else else
notify_user(user, updates) notify_users(
User.real.moderators.where("id NOT IN (?)", @contacted),
all_updates[:moderators],
)
end end
end end
@contacted += users.pluck(:id) if SiteSetting.enable_category_group_moderation? && (group = reviewable.reviewable_by_group)
users = group.users.includes(:group_users).where("users.id NOT IN (?)", @contacted)
users.find_each do |user|
count = 0
updates = {}
user.group_users.each do |gu|
updates.merge!(all_updates[gu.group_id])
count += counts[gu.group_id]
end
if SiteSetting.legacy_navigation_menu?
notify_legacy([user.id], count: count, updates: updates)
else
notify_user(user, updates)
end
end
@contacted += users.pluck(:id)
end
end end
end end

View File

@ -30,16 +30,28 @@ class DistributedMutex
end end
LUA LUA
def self.synchronize(key, redis: nil, validity: DEFAULT_VALIDITY, &blk) def self.synchronize(
self.new(key, redis: redis, validity: validity).synchronize(&blk) key,
redis: nil,
validity: DEFAULT_VALIDITY,
max_get_lock_attempts: nil,
&blk
)
self.new(
key,
redis: redis,
validity: validity,
max_get_lock_attempts: max_get_lock_attempts,
).synchronize(&blk)
end end
def initialize(key, redis: nil, validity: DEFAULT_VALIDITY) def initialize(key, redis: nil, validity: DEFAULT_VALIDITY, max_get_lock_attempts: nil)
@key = key @key = key
@using_global_redis = true if !redis @using_global_redis = true if !redis
@redis = redis || Discourse.redis @redis = redis || Discourse.redis
@mutex = Mutex.new @mutex = Mutex.new
@validity = validity @validity = validity
@max_get_lock_attempts = max_get_lock_attempts
end end
# NOTE wrapped in mutex to maintain its semantics # NOTE wrapped in mutex to maintain its semantics
@ -69,11 +81,15 @@ class DistributedMutex
result result
end end
class MaximumAttemptsExceeded < StandardError
end
private private
attr_reader :key attr_reader :key
attr_reader :redis attr_reader :redis
attr_reader :validity attr_reader :validity
attr_reader :max_get_lock_attempts
def get_lock def get_lock
attempts = 0 attempts = 0
@ -92,6 +108,10 @@ class DistributedMutex
if @using_global_redis && Discourse.recently_readonly? && attempts > CHECK_READONLY_ATTEMPTS if @using_global_redis && Discourse.recently_readonly? && attempts > CHECK_READONLY_ATTEMPTS
raise Discourse::ReadOnly raise Discourse::ReadOnly
end end
if max_get_lock_attempts && attempts > max_get_lock_attempts
raise DistributedMutex::MaximumAttemptsExceeded
end
end end
end end