FEATURE: new scheduler

Removed sidetiq, introduced new scheduler

- add basic UI
- add schedule discover
- add scheduling in initializer
This commit is contained in:
Sam 2014-02-06 10:14:41 +11:00
parent 447816fe39
commit e1f293ad66
24 changed files with 193 additions and 52 deletions

View File

@ -84,7 +84,6 @@ gem 'rails_multisite', path: 'vendor/gems/rails_multisite'
gem 'redcarpet', require: false
gem 'airbrake', '3.1.2', require: false # errbit is broken with 3.1.3 for now
gem 'sidetiq', '>= 0.3.6'
gem 'eventmachine'
gem 'fast_xs'

View File

@ -138,7 +138,6 @@ GEM
hiredis (0.4.5)
httpauth (0.2.0)
i18n (0.6.9)
ice_cube (0.11.1)
image_optim (0.9.1)
exifr (~> 1.1.3)
fspath (~> 2.0.5)
@ -336,10 +335,6 @@ GEM
redis-namespace (>= 1.3.1)
sidekiq-failures (0.2.2)
sidekiq (>= 2.9.0)
sidetiq (0.4.3)
celluloid (>= 0.14.1)
ice_cube (~> 0.11.0)
sidekiq (~> 2.15.0)
simple-rss (1.3.1)
simplecov (0.7.1)
multi_json (~> 1.0)
@ -477,7 +472,6 @@ DEPENDENCIES
shoulda
sidekiq (= 2.15.1)
sidekiq-failures
sidetiq (>= 0.3.6)
simple-rss
simplecov
sinatra

View File

@ -1,3 +1,5 @@
require 'scheduler/scheduler'
module Jobs
def self.queued
@ -147,7 +149,7 @@ module Jobs
end
class Scheduled < Base
include Sidetiq::Schedulable
extend Scheduler::Schedule
end
def self.enqueue(job_name, opts={})

View File

@ -1,7 +1,7 @@
module Jobs
class CategoryStats < Jobs::Scheduled
recurrence { daily.hour_of_day(4) }
every 4.hours
def execute(args)
Category.update_stats
@ -9,4 +9,4 @@ module Jobs
end
end
end

View File

@ -1,7 +1,7 @@
module Jobs
class CleanUpUploads < Jobs::Scheduled
recurrence { hourly }
every 1.hour
def execute(args)
return unless SiteSetting.clean_up_uploads?

View File

@ -1,6 +1,6 @@
module Jobs
class DashboardStats < Jobs::Scheduled
recurrence { hourly.minute_of_hour(0,30) }
every 30.minutes
def execute(args)
stats_json = AdminDashboardData.fetch_stats.as_json

View File

@ -1,7 +1,7 @@
module Jobs
# various consistency checks
class DestroyOldDeletionStubs < Jobs::Scheduled
recurrence { hourly.minute_of_hour(0, 30) }
every 30.minutes
def execute(args)
PostDestroyer.destroy_stubs

View File

@ -3,7 +3,7 @@ require_dependency 'avatar_detector'
module Jobs
class DetectAvatars < Jobs::Scheduled
recurrence { daily.hour_of_day(8) }
every 8.hours
def execute(args)
return unless SiteSetting.detect_custom_avatars?
@ -27,4 +27,4 @@ module Jobs
end
end
end

View File

@ -2,7 +2,7 @@ module Jobs
# A daily job that will enqueue digest emails to be sent to users
class EnqueueDigestEmails < Jobs::Scheduled
recurrence { daily.hour_of_day(6) }
every 6.hours
def execute(args)
target_user_ids.each do |user_id|

View File

@ -1,7 +1,7 @@
module Jobs
# various consistency checks
class EnsureDbConsistency < Jobs::Scheduled
recurrence { daily.hour_of_day(2) }
every 1.day
def execute(args)
TopicUser.ensure_consistency!

View File

@ -3,8 +3,7 @@ require_dependency 'admin_user_index_query'
module Jobs
class PendingUsersReminder < Jobs::Scheduled
recurrence { daily.hour_of_day(9) }
every 9.hours
def execute(args)
if SiteSetting.must_approve_users

View File

@ -5,7 +5,7 @@ module Jobs
# This job will run on a regular basis to update statistics and denormalized data.
# If it does not run, the site will not function properly.
class PeriodicalUpdates < Jobs::Scheduled
recurrence { hourly.minute_of_hour(3, 18, 33, 48) }
every 15.minutes
def execute(args)
# Update the average times

View File

@ -8,7 +8,8 @@ require 'open-uri'
module Jobs
class PollFeed < Jobs::Scheduled
recurrence { hourly }
every 1.hour
sidekiq_options retry: false
def execute(args)

View File

@ -6,7 +6,7 @@ require_dependency 'email/receiver'
module Jobs
class PollMailbox < Jobs::Scheduled
recurrence { hourly.minute_of_hour(0,5,10,15,20,25,30,35,40,45,50,55) }
every 5.minutes
sidekiq_options retry: false
def execute(args)

View File

@ -1,7 +1,7 @@
module Jobs
class PurgeDeletedUploads < Jobs::Scheduled
recurrence { daily }
every 1.day
def execute(args)
grace_period = SiteSetting.purge_deleted_uploads_grace_period_days

View File

@ -3,7 +3,7 @@ require_dependency 'discourse_updates'
module Jobs
class VersionCheck < Jobs::Scheduled
recurrence { daily }
every 1.day
def execute(args)
if SiteSetting.version_checks? and (DiscourseUpdates.updated_at.nil? or DiscourseUpdates.updated_at < 1.minute.ago)
@ -30,4 +30,4 @@ module Jobs
end
end
end
end

View File

@ -4,11 +4,26 @@ Sidekiq.configure_server do |config|
config.redis = sidekiq_redis
end
Sidetiq.configure do |config|
# we only check for new jobs once every 5 seconds
# to cut down on cpu cost
config.resolution = 5
if Sidekiq.server?
require 'scheduler/scheduler'
manager = Scheduler::Manager.new
Scheduler::Manager.discover_schedules.each do |schedule|
manager.ensure_schedule!(schedule)
end
Thread.new do
while true
begin
manager.tick
rescue => e
# the show must go on
Scheduler::Manager.handle_exception(e)
end
sleep 1
end
end
end
Sidekiq.configure_client { |config| config.redis = sidekiq_redis }
Sidekiq.logger.level = Logger::WARN

View File

@ -1,5 +1,5 @@
require "sidekiq/web"
require "sidetiq/web"
require_dependency "scheduler/web"
require_dependency "admin_constraint"
require_dependency "staff_constraint"

View File

@ -1,16 +0,0 @@
module IceCube
class MinutelyRule < ValidatedRule
def initialize(interval = 1, week_start = :sunday)
super
unless interval == 1
raise "Due to a gigantic awful bug in ice_cube, don't specify an interval for minutely. Use `hourly.minute_of_hour`"
end
interval(interval)
schedule_lock(:sec)
reset
end
end
end

View File

@ -6,6 +6,7 @@
module Scheduler
class Manager
extend Sidekiq::ExceptionHandler
attr_accessor :random_ratio, :redis
@ -18,13 +19,16 @@ module Scheduler
klass = @queue.deq
failed = false
start = Time.now.to_f
info = @manager.schedule_info(klass)
begin
info.prev_result = "RUNNING"
info.write!
klass.new.perform
rescue
rescue => e
Scheduler::Manager.handle_exception(e)
failed = true
end
duration = ((Time.now.to_f - start) * 1000).to_i
info = @manager.schedule_info(klass)
info.prev_duration = duration
info.prev_result = failed ? "FAILED" : "OK"
info.write!
@ -47,13 +51,28 @@ module Scheduler
end
end
def initialize(redis = nil)
def self.without_runner(redis=nil)
self.new(redis, true)
end
def initialize(redis = nil, skip_runner = false)
@redis = $redis || redis
@random_ratio = 0.1
@runner = Runner.new(self)
unless skip_runner
@runner = Runner.new(self)
self.class.current = self
end
@manager_id = SecureRandom.hex
end
def self.current
@current
end
def self.current=(manager)
@current = manager
end
def schedule_info(klass)
ScheduleInfo.new(klass, self)
end
@ -78,10 +97,18 @@ module Scheduler
def tick
lock do
(key, due), _ = redis.zrange Manager.queue_key, 0, 0, withscores: true
return unless key
if due.to_i <= Time.now.to_i
klass = key.constantize
klass = begin
key.constantize
rescue NameError
nil
end
return unless klass
info = schedule_info(klass)
info.prev_run = Time.now.to_i
info.prev_result = "QUEUED"
info.prev_duration = -1
info.next_run = nil
info.schedule!
@runner.enq(klass)
@ -96,6 +123,7 @@ module Scheduler
def stop!
@runner.stop!
self.class.current = nil
end
@ -129,6 +157,14 @@ module Scheduler
redis.del Manager.lock_key
end
def self.discover_schedules
schedules = []
ObjectSpace.each_object(Scheduler::Schedule) do |schedule|
schedules << schedule if schedule.scheduled?
end
schedules
end
def self.lock_key
"_scheduler_lock_"
end

View File

@ -1,5 +1,20 @@
module Scheduler::Schedule
def every(duration=nil)
@every ||= duration
if duration
@every = duration
if manager = Scheduler::Manager.current
manager.ensure_schedule!(self)
end
end
@every
end
def schedule_info
manager = Scheduler::Manager.without_runner
manager.schedule_info self
end
def scheduled?
!!@every
end
end

View File

@ -0,0 +1,61 @@
<header class="row">
<div class="col-sm-5">
<h3>Recurring Jobs</h3>
</div>
</header>
<div class="container">
<div class="row">
<div class="col-md-9">
<% if @schedules.length > 0 %>
<table class="table table-striped table-bordered table-white" style="width: 100%; margin: 0; table-layout:fixed;">
<thead>
<th style="width: 30%">Worker</th>
<th style="width: 15%">Last Run</th>
<th style="width: 15%">Last Result</th>
<th style="width: 15%">Last Duration</th>
<th style="width: 15%">Next Run Due</th>
<th style="width: 10%">Actions</th>
</thead>
<% @schedules.each do |schedule| %>
<% @info = schedule.schedule_info %>
<tr>
<td>
<%= schedule %>
<td>
<% prev = @info.prev_run %>
<% if prev.nil? %>
Never
<% else %>
<%= relative_time(Time.at(prev)) %>
<% end %>
</td>
<td>
<%= @info.prev_result %>
</td>
<td>
<%= @info.prev_duration %>
</td>
<td>
<% next_run = @info.next_run %>
<% if next_run.nil? %>
Not Scheduled Yet
<% else %>
<%= relative_time(Time.at(next_run)) %>
<% end %>
</td>
<td>
<form action="<%= "#{root_path}scheduler/#{schedule}/trigger" %>" method="post">
<input class="btn btn-danger btn-small" type="submit" name="trigger" value="Trigger" data-confirm="Are you sure you want to trigger this job?" />
</form>
</td>
</tr>
<% end %>
</table>
<% else %>
<div class="alert alert-success">No recurring jobs found.</div>
<% end %>
</div>
</div>
</div>

29
lib/scheduler/web.rb Normal file
View File

@ -0,0 +1,29 @@
# Based off sidetiq https://github.com/tobiassvn/sidetiq/blob/master/lib/sidetiq/web.rb
module Scheduler
module Web
VIEWS = File.expand_path('views', File.dirname(__FILE__))
def self.registered(app)
app.get "/scheduler" do
@schedules = Scheduler::Manager.discover_schedules
@manager = Scheduler::Manager.without_runner
erb File.read(File.join(VIEWS, 'scheduler.erb')), locals: {view_path: VIEWS}
end
app.post "/scheduler/:name/trigger" do
halt 404 unless (name = params[:name])
klass = name.constantize
info = klass.schedule_info
info.next_run = Time.now.to_f
info.write!
redirect "#{root_path}scheduler"
end
end
end
end
Sidekiq::Web.register(Scheduler::Web)
Sidekiq::Web.tabs["Scheduler"] = "scheduler"

View File

@ -61,6 +61,12 @@ describe Scheduler::Manager do
end
describe '#discover_schedules' do
it 'Discovers Testing::RandomJob' do
Scheduler::Manager.discover_schedules.should include(Testing::RandomJob)
end
end
describe '#next_run' do
it 'should be within the next 5 mins if it never ran' do