FEATURE: introduce ultra_low priority queue
This commit introduces an ultra low priority queue for post rebakes. This way rebakes can never interfere with regular sidekiq processing for cases where we perform a large scale rebake. Additionally it allows Post.rebake_old to be run with rate_limiter: false to avoid triggering the limiter when rebaking. This is handy for cases where you want to just force the full rebake and not wait for it to trickle
This commit is contained in:
parent
67a7670bab
commit
384135845b
|
@ -29,7 +29,7 @@ module Jobs
|
||||||
|
|
||||||
# Forces rebake of old posts where needed, as long as no system avatars need updating
|
# Forces rebake of old posts where needed, as long as no system avatars need updating
|
||||||
if !SiteSetting.automatically_download_gravatars || !UserAvatar.where("last_gravatar_download_attempt IS NULL").limit(1).first
|
if !SiteSetting.automatically_download_gravatars || !UserAvatar.where("last_gravatar_download_attempt IS NULL").limit(1).first
|
||||||
problems = Post.rebake_old(SiteSetting.rebake_old_posts_count, priority: :low)
|
problems = Post.rebake_old(SiteSetting.rebake_old_posts_count, priority: :ultra_low)
|
||||||
problems.each do |hash|
|
problems.each do |hash|
|
||||||
post_id = hash[:post].id
|
post_id = hash[:post].id
|
||||||
Discourse.handle_job_exception(hash[:ex], error_context(args, "Rebaking post id #{post_id}", post_id: post_id))
|
Discourse.handle_job_exception(hash[:ex], error_context(args, "Rebaking post id #{post_id}", post_id: post_id))
|
||||||
|
|
|
@ -515,7 +515,7 @@ class Post < ActiveRecord::Base
|
||||||
PostRevisor.new(self).revise!(updated_by, changes, opts)
|
PostRevisor.new(self).revise!(updated_by, changes, opts)
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.rebake_old(limit, priority: :normal)
|
def self.rebake_old(limit, priority: :normal, rate_limiter: true)
|
||||||
|
|
||||||
limiter = RateLimiter.new(
|
limiter = RateLimiter.new(
|
||||||
nil,
|
nil,
|
||||||
|
@ -537,7 +537,7 @@ class Post < ActiveRecord::Base
|
||||||
post.rebake!(priority: priority)
|
post.rebake!(priority: priority)
|
||||||
|
|
||||||
begin
|
begin
|
||||||
limiter.performed!
|
limiter.performed! if rate_limiter
|
||||||
rescue RateLimiter::LimitExceeded
|
rescue RateLimiter::LimitExceeded
|
||||||
break
|
break
|
||||||
end
|
end
|
||||||
|
@ -700,8 +700,8 @@ class Post < ActiveRecord::Base
|
||||||
args[:invalidate_oneboxes] = true if invalidate_oneboxes.present?
|
args[:invalidate_oneboxes] = true if invalidate_oneboxes.present?
|
||||||
args[:cooking_options] = self.cooking_options
|
args[:cooking_options] = self.cooking_options
|
||||||
|
|
||||||
if priority == :low
|
if priority && priority != :normal
|
||||||
args[:queue] = 'low'
|
args[:queue] = priority.to_s
|
||||||
end
|
end
|
||||||
|
|
||||||
Jobs.enqueue(:process_post, args)
|
Jobs.enqueue(:process_post, args)
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
|
||||||
PARAMS="$@"
|
PARAMS="$@"
|
||||||
CMD="cd /src && USER=discourse RAILS_ENV=${RAILS_ENV:=development} bundle exec sidekiq -q critical -q low -q default"
|
CMD="cd /src && USER=discourse RAILS_ENV=${RAILS_ENV:=development} bundle exec sidekiq -q critical -q low -q default -q ultra_low"
|
||||||
docker exec -it -u discourse:discourse discourse_dev /bin/bash -c "$CMD"
|
docker exec -it -u discourse:discourse discourse_dev /bin/bash -c "$CMD"
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
development:
|
development:
|
||||||
:concurrency: 5
|
:concurrency: 5
|
||||||
:queues:
|
:queues:
|
||||||
- [critical,4]
|
- [critical, 8]
|
||||||
- [default, 2]
|
- [default, 4]
|
||||||
- [low]
|
- [low, 2]
|
||||||
|
- [ultra_low]
|
||||||
|
|
|
@ -34,7 +34,7 @@ class Demon::Sidekiq < Demon::Base
|
||||||
|
|
||||||
options = ["-c", GlobalSetting.sidekiq_workers.to_s]
|
options = ["-c", GlobalSetting.sidekiq_workers.to_s]
|
||||||
|
|
||||||
[['critical', 4], ['default', 2], ['low', 1]].each do |queue_name, weight|
|
[['critical', 8], ['default', 4], ['low', 2], ['ultra_low', 1]].each do |queue_name, weight|
|
||||||
custom_queue_hostname = ENV["UNICORN_SIDEKIQ_#{queue_name.upcase}_QUEUE_HOSTNAME"]
|
custom_queue_hostname = ENV["UNICORN_SIDEKIQ_#{queue_name.upcase}_QUEUE_HOSTNAME"]
|
||||||
|
|
||||||
if !custom_queue_hostname || custom_queue_hostname.split(',').include?(`hostname`.strip)
|
if !custom_queue_hostname || custom_queue_hostname.split(',').include?(`hostname`.strip)
|
||||||
|
|
|
@ -22,7 +22,7 @@ describe Jobs::PeriodicalUpdates do
|
||||||
jobs = Jobs::ProcessPost.jobs
|
jobs = Jobs::ProcessPost.jobs
|
||||||
expect(jobs.length).to eq(1)
|
expect(jobs.length).to eq(1)
|
||||||
|
|
||||||
expect(jobs[0]["queue"]).to eq("low")
|
expect(jobs[0]["queue"]).to eq("ultra_low")
|
||||||
end
|
end
|
||||||
|
|
||||||
post.reload
|
post.reload
|
||||||
|
|
Loading…
Reference in New Issue