Merge pull request #6209 from discourse/mini_scheduler

REFACTOR: extract scheduler to the mini_scheduler gem
This commit is contained in:
Neil Lalonde 2018-08-01 10:28:24 -04:00 committed by GitHub
commit b829452c75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 28 additions and 1106 deletions

View File

@ -88,6 +88,7 @@ gem 'thor', require: false
gem 'rinku'
gem 'sanitize'
gem 'sidekiq'
gem 'mini_scheduler'
# for sidekiq web
gem 'tilt', require: false

View File

@ -200,6 +200,7 @@ GEM
mini_portile2 (2.3.0)
mini_racer (0.2.0)
libv8 (>= 6.3)
mini_scheduler (0.8.1)
mini_sql (0.1.10)
mini_suffix (0.3.0)
ffi (~> 1.9)
@ -490,6 +491,7 @@ DEPENDENCIES
message_bus
mini_mime
mini_racer
mini_scheduler
mini_sql
mini_suffix
minitest

View File

@ -1,5 +1,3 @@
require 'scheduler/scheduler'
module Jobs
def self.queued
@ -173,7 +171,7 @@ module Jobs
end
class Scheduled < Base
extend Scheduler::Schedule
extend MiniScheduler::Schedule
def perform(*args)
return if Discourse.readonly_mode?

View File

@ -1,4 +1,5 @@
require "sidekiq/pausable"
require "sidekiq/web"
Sidekiq.configure_client do |config|
config.redis = Discourse.sidekiq_redis_config
@ -12,6 +13,24 @@ Sidekiq.configure_server do |config|
end
end
MiniScheduler.configure do |config|
config.redis = $redis
config.job_exception_handler do |ex, context|
Discourse.handle_job_exception(ex, context)
end
config.job_ran do |stat|
DiscourseEvent.trigger(:scheduled_job_ran, stat)
end
config.before_sidekiq_web_request do
RailsMultisite::ConnectionManagement.establish_connection(db: 'default')
end
end
if Sidekiq.server?
# defer queue should simply run in sidekiq
Scheduler::Defer.async = false
@ -27,22 +46,7 @@ if Sidekiq.server?
scheduler_hostname = ENV["UNICORN_SCHEDULER_HOSTNAME"]
if !scheduler_hostname || scheduler_hostname.split(',').include?(`hostname`.strip)
require 'scheduler/scheduler'
manager = Scheduler::Manager.new($redis.without_namespace)
Scheduler::Manager.discover_schedules.each do |schedule|
manager.ensure_schedule!(schedule)
end
Thread.new do
while true
begin
manager.tick
rescue => e
# the show must go on
Discourse.handle_job_exception(e, message: "While ticking scheduling manager")
end
sleep 1
end
end
MiniScheduler.start
end
end
end

View File

@ -1,5 +1,5 @@
require "sidekiq/web"
require_dependency "scheduler/web"
require "mini_scheduler/web"
require_dependency "admin_constraint"
require_dependency "staff_constraint"
require_dependency "homepage_constraint"

View File

@ -43,6 +43,8 @@ module Discourse
# other desired context.
# See app/jobs/base.rb for the error_context function.
def self.handle_job_exception(ex, context = {}, parent_logger = nil)
return if ex.class == Jobs::HandledExceptionWrapper
context ||= {}
parent_logger ||= SidekiqExceptionHandler

View File

@ -1,360 +0,0 @@
# Initially we used sidetiq, this was a problem:
#
# 1. No mechnism to add "randomisation" into job execution
# 2. No stats about previous runs or failures
# 3. Dependency on ice_cube gem causes runaway CPU
require_dependency 'distributed_mutex'
module Scheduler
class Manager
attr_accessor :random_ratio, :redis, :enable_stats
class Runner
def initialize(manager)
@stopped = false
@mutex = Mutex.new
@queue = Queue.new
@manager = manager
@reschedule_orphans_thread = Thread.new do
while !@stopped
sleep 1.minute
@mutex.synchronize do
reschedule_orphans
end
end
end
@keep_alive_thread = Thread.new do
while !@stopped
@mutex.synchronize do
keep_alive
end
sleep (@manager.keep_alive_duration / 2)
end
end
@thread = Thread.new do
while !@stopped
process_queue
end
end
end
def keep_alive
@manager.keep_alive
rescue => ex
Discourse.handle_job_exception(ex, message: "Scheduling manager keep-alive")
end
def reschedule_orphans
@manager.reschedule_orphans!
rescue => ex
Discourse.handle_job_exception(ex, message: "Scheduling manager orphan rescheduler")
end
def hostname
@hostname ||= begin
`hostname`
rescue
"unknown"
end
end
def process_queue
klass = @queue.deq
return unless klass
# hack alert, I need to both deq and set @running atomically.
@running = true
failed = false
start = Time.now.to_f
info = @mutex.synchronize { @manager.schedule_info(klass) }
stat = nil
error = nil
begin
info.prev_result = "RUNNING"
@mutex.synchronize { info.write! }
if @manager.enable_stats
RailsMultisite::ConnectionManagement.with_connection("default") do
stat = SchedulerStat.create!(
name: klass.to_s,
hostname: hostname,
pid: Process.pid,
started_at: Time.zone.now,
live_slots_start: GC.stat[:heap_live_slots]
)
end
end
klass.new.perform
rescue => e
if e.class != Jobs::HandledExceptionWrapper
Discourse.handle_job_exception(e, message: "Running a scheduled job", job: klass)
end
error = "#{e.class}: #{e.message} #{e.backtrace.join("\n")}"
failed = true
end
duration = ((Time.now.to_f - start) * 1000).to_i
info.prev_duration = duration
info.prev_result = failed ? "FAILED" : "OK"
info.current_owner = nil
if stat
RailsMultisite::ConnectionManagement.with_connection("default") do
stat.update!(
duration_ms: duration,
live_slots_finish: GC.stat[:heap_live_slots],
success: !failed,
error: error
)
DiscourseEvent.trigger(:scheduled_job_ran, stat)
end
end
attempts(3) do
@mutex.synchronize { info.write! }
end
rescue => ex
Discourse.handle_job_exception(ex, message: "Processing scheduled job queue")
ensure
@running = false
ActiveRecord::Base.connection_handler.clear_active_connections!
end
def stop!
return if @stopped
@mutex.synchronize do
@stopped = true
@keep_alive_thread.kill
@reschedule_orphans_thread.kill
@keep_alive_thread.join
@reschedule_orphans_thread.join
enq(nil)
kill_thread = Thread.new do
sleep 0.5
@thread.kill
end
@thread.join
kill_thread.kill
kill_thread.join
end
end
def enq(klass)
@queue << klass
end
def wait_till_done
while !@queue.empty? && !(@queue.num_waiting > 0)
sleep 0.001
end
# this is a hack, but is only used for test anyway
sleep 0.001
while @running
sleep 0.001
end
end
def attempts(n)
n.times {
begin
yield; break
rescue
sleep Random.rand
end
}
end
end
def self.without_runner(redis = nil)
self.new(redis, skip_runner: true)
end
def initialize(redis = nil, options = nil)
@redis = $redis || redis
@random_ratio = 0.1
unless options && options[:skip_runner]
@runner = Runner.new(self)
self.class.current = self
end
@hostname = options && options[:hostname]
@manager_id = SecureRandom.hex
if options && options.key?(:enable_stats)
@enable_stats = options[:enable_stats]
else
@enable_stats = true
end
end
def self.current
@current
end
def self.current=(manager)
@current = manager
end
def hostname
@hostname ||= `hostname`.strip
end
def schedule_info(klass)
ScheduleInfo.new(klass, self)
end
def next_run(klass)
schedule_info(klass).next_run
end
def ensure_schedule!(klass)
lock do
schedule_info(klass).schedule!
end
end
def remove(klass)
lock do
schedule_info(klass).del!
end
end
def reschedule_orphans!
lock do
reschedule_orphans_on!
reschedule_orphans_on!(hostname)
end
end
def reschedule_orphans_on!(hostname = nil)
redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key|
klass = get_klass(key)
next unless klass
info = schedule_info(klass)
if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
(info.current_owner.blank? || !redis.get(info.current_owner))
info.prev_result = 'ORPHAN'
info.next_run = Time.now.to_i
info.write!
end
end
end
def get_klass(name)
name.constantize
rescue NameError
nil
end
def tick
lock do
schedule_next_job
schedule_next_job(hostname)
end
end
def schedule_next_job(hostname = nil)
(key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true
return unless key
if due.to_i <= Time.now.to_i
klass = get_klass(key)
unless klass
# corrupt key, nuke it (renamed job or something)
redis.zrem Manager.queue_key(hostname), key
return
end
info = schedule_info(klass)
info.prev_run = Time.now.to_i
info.prev_result = "QUEUED"
info.prev_duration = -1
info.next_run = nil
info.current_owner = identity_key
info.schedule!
@runner.enq(klass)
end
end
def blocking_tick
tick
@runner.wait_till_done
end
def stop!
@runner.stop!
self.class.current = nil
end
def keep_alive_duration
60
end
def keep_alive
redis.setex identity_key, keep_alive_duration, ""
end
def lock
DistributedMutex.new(Manager.lock_key).synchronize do
yield
end
end
def self.discover_schedules
# hack for developemnt reloader is crazytown
# multiple classes with same name can be in
# object space
unique = Set.new
schedules = []
ObjectSpace.each_object(Scheduler::Schedule) do |schedule|
if schedule.scheduled?
next if unique.include?(schedule.to_s)
schedules << schedule
unique << schedule.to_s
end
end
schedules
end
@mutex = Mutex.new
def self.seq
@mutex.synchronize do
@i ||= 0
@i += 1
end
end
def identity_key
@identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
end
def self.lock_key
"_scheduler_lock_"
end
def self.queue_key(hostname = nil)
if hostname
"_scheduler_queue_#{hostname}_"
else
"_scheduler_queue_"
end
end
def self.schedule_key(klass, hostname = nil)
if hostname
"_scheduler_#{klass}_#{hostname}"
else
"_scheduler_#{klass}"
end
end
end
end

View File

@ -1,37 +0,0 @@
module Scheduler::Schedule
def daily(options = nil)
if options
@daily = options
end
@daily
end
def every(duration = nil)
if duration
@every = duration
if manager = Scheduler::Manager.current
manager.ensure_schedule!(self)
end
end
@every
end
# schedule job indepndently on each host (looking at hostname)
def per_host
@per_host = true
end
def is_per_host
@per_host
end
def schedule_info
manager = Scheduler::Manager.without_runner
manager.schedule_info self
end
def scheduled?
!!@every || !!@daily
end
end

View File

@ -1,138 +0,0 @@
module Scheduler
class ScheduleInfo
attr_accessor :next_run,
:prev_run,
:prev_duration,
:prev_result,
:current_owner
def initialize(klass, manager)
@klass = klass
@manager = manager
data = nil
if data = @manager.redis.get(key)
data = JSON.parse(data)
end
if data
@next_run = data["next_run"]
@prev_run = data["prev_run"]
@prev_result = data["prev_result"]
@prev_duration = data["prev_duration"]
@current_owner = data["current_owner"]
end
rescue
# corrupt redis
@next_run = @prev_run = @prev_result = @prev_duration = @current_owner = nil
end
# this means the schedule is going to fire, it is setup correctly
# invalid schedules are fixed by running "schedule!"
# this happens automatically after if fire by the manager
def valid?
return false unless @next_run
(!@prev_run && @next_run < Time.now.to_i + 5.minutes) || valid_every? || valid_daily?
end
def valid_every?
return false unless @klass.every
!!@prev_run &&
@prev_run <= Time.now.to_i &&
@next_run < @prev_run + @klass.every * (1 + @manager.random_ratio)
end
def valid_daily?
return false unless @klass.daily
return true if !@prev_run && @next_run && @next_run <= (Time.zone.now + 1.day).to_i
!!@prev_run &&
@prev_run <= Time.zone.now.to_i &&
@next_run < @prev_run + 1.day
end
def schedule_every!
if !valid? && @prev_run
mixup = @klass.every * @manager.random_ratio
mixup = (mixup * Random.rand - mixup / 2).to_i
@next_run = @prev_run + mixup + @klass.every
end
if !valid?
@next_run = Time.now.to_i + 5.minutes * Random.rand
end
end
def schedule_daily!
return if valid?
at = @klass.daily[:at] || 0
today_begin = Time.zone.now.midnight.to_i
today_offset = DateTime.now.seconds_since_midnight
# If it's later today
if at > today_offset
@next_run = today_begin + at
else
# Otherwise do it tomorrow
@next_run = today_begin + 1.day + at
end
end
def schedule!
if @klass.every
schedule_every!
elsif @klass.daily
schedule_daily!
end
write!
end
def write!
clear!
redis.set key, {
next_run: @next_run,
prev_run: @prev_run,
prev_duration: @prev_duration,
prev_result: @prev_result,
current_owner: @current_owner
}.to_json
redis.zadd queue_key, @next_run, @klass if @next_run
end
def del!
clear!
@next_run = @prev_run = @prev_result = @prev_duration = @current_owner = nil
end
def key
if @klass.is_per_host
Manager.schedule_key(@klass, @manager.hostname)
else
Manager.schedule_key(@klass)
end
end
def queue_key
if @klass.is_per_host
Manager.queue_key(@manager.hostname)
else
Manager.queue_key
end
end
def redis
@manager.redis
end
private
def clear!
redis.del key
redis.zrem queue_key, @klass
end
end
end

View File

@ -1,7 +0,0 @@
module Scheduler
end
require_dependency 'scheduler/schedule'
require_dependency 'scheduler/schedule_info'
require_dependency 'scheduler/manager'
require_dependency 'scheduler/defer'

View File

@ -1,47 +0,0 @@
<header class="row">
<div class="col-sm-12">
<h3>Scheduler History</h3>
</div>
</header>
<div class="container">
<div class="row">
<div class="col-md-9">
<% if @scheduler_stats.length > 0 %>
<table class="table table-striped table-bordered table-white" style="width: 100%; margin: 0; table-layout:fixed;">
<thead>
<th style="width: 30%">Job Name</th>
<th style="width: 15%">Hostname:Pid</th>
<th style="width: 15%">Live Slots delta</th>
<th style="width: 15%">Started At</th>
<th style="width: 15%">Duration</th>
<th style="width: 15%"></th>
</thead>
<tbody>
<% @scheduler_stats.each do |stat| %>
<tr>
<td><%= stat.name %></td>
<td><%= stat.hostname %>:<%= stat.pid %></td>
<td>
<% if stat.live_slots_start && stat.live_slots_finish %>
<%= stat.live_slots_finish - stat.live_slots_start %>
<% end %>
</td>
<td><%= sane_time stat.started_at %></td>
<td><%= sane_duration stat.duration_ms %></td>
<td>
<% if stat.success.nil? %>
RUNNING
<% elsif !stat.success %>
FAILED
<% end %>
</td>
</tr>
<% end %>
</tbody>
</table>
<% end %>
</div>
</div>
</div>

View File

@ -1,73 +0,0 @@
<header class="row">
<% if Sidekiq.paused? %>
<div class="col-sm-12">
<div class="alert alert-danger text-center">
<h2>SIDEKIQ IS PAUSED!</h2>
</div>
</div>
<% end %>
<div class="col-sm-12">
<h3>Recurring Jobs <a style='font-size:50%; margin-left: 30px' href='scheduler/history'>history</a></h3>
</div>
</header>
<div class="container">
<div class="row">
<div class="col-md-9">
<% if @schedules.length > 0 %>
<table class="table table-striped table-bordered table-white" style="width: 100%; margin: 0; table-layout:fixed;">
<thead>
<th style="width: 30%">Worker</th>
<th style="width: 15%">Last Run</th>
<th style="width: 15%">Last Result</th>
<th style="width: 15%">Last Duration</th>
<th style="width: 15%">Last Owner</th>
<th style="width: 15%">Next Run Due</th>
<th style="width: 10%">Actions</th>
</thead>
<% @schedules.each do |schedule| %>
<% @info = schedule.schedule_info %>
<tr>
<td>
<%= schedule %>
<td>
<% prev = @info.prev_run %>
<% if prev.nil? %>
Never
<% else %>
<%= relative_time(Time.at(prev)) %>
<% end %>
</td>
<td>
<%= @info.prev_result %>
</td>
<td>
<%= sane_duration @info.prev_duration %>
</td>
<td>
<%= @info.current_owner %>
</td>
<td>
<% next_run = @info.next_run %>
<% if next_run.nil? %>
Not Scheduled Yet
<% else %>
<%= relative_time(Time.at(next_run)) %>
<% end %>
</td>
<td>
<form action="<%= "#{root_path}scheduler/#{schedule}/trigger" %>" method="post">
<%= csrf_tag if respond_to?(:csrf_tag) %>
<input class="btn btn-danger btn-small" type="submit" name="trigger" value="Trigger" data-confirm="Are you sure you want to trigger this job?" />
</form>
</td>
</tr>
<% end %>
</table>
<% else %>
<div class="alert alert-success">No recurring jobs found.</div>
<% end %>
</div>
</div>
</div>

View File

@ -1,65 +0,0 @@
# Based off sidetiq https://github.com/tobiassvn/sidetiq/blob/master/lib/sidetiq/web.rb
module Scheduler
module Web
VIEWS = File.expand_path('views', File.dirname(__FILE__)) unless defined? VIEWS
def self.registered(app)
app.helpers do
def sane_time(time)
return unless time
time
end
def sane_duration(duration)
return unless duration
if duration < 1000
"#{duration}ms"
elsif duration < 60 * 1000
"#{'%.2f' % (duration / 1000.0)} secs"
end
end
end
app.get "/scheduler" do
RailsMultisite::ConnectionManagement.with_connection("default") do
@manager = Scheduler::Manager.without_runner
@schedules = Scheduler::Manager.discover_schedules.sort do |a, b|
a_next = a.schedule_info.next_run
b_next = b.schedule_info.next_run
if a_next && b_next
a_next <=> b_next
elsif a_next
-1
else
1
end
end
erb File.read(File.join(VIEWS, 'scheduler.erb')), locals: { view_path: VIEWS }
end
end
app.get "/scheduler/history" do
@scheduler_stats = SchedulerStat.order('started_at desc').limit(200)
erb File.read(File.join(VIEWS, 'history.erb')), locals: { view_path: VIEWS }
end
app.post "/scheduler/:name/trigger" do
halt 404 unless (name = params[:name])
RailsMultisite::ConnectionManagement.with_connection("default") do
klass = name.constantize
info = klass.schedule_info
info.next_run = Time.now.to_i
info.write!
redirect "#{root_path}scheduler"
end
end
end
end
end
Sidekiq::Web.register(Scheduler::Web)
Sidekiq::Web.tabs["Scheduler"] = "scheduler"

View File

@ -28,7 +28,7 @@ end
desc "run every task the scheduler knows about in that order, use only for debugging"
task 'scheduler:run_all' => :environment do
Scheduler::Manager.discover_schedules.each do |schedule|
MiniScheduler::Manager.discover_schedules.each do |schedule|
puts "Running #{schedule}"
time { schedule.new.execute({}) }
end

View File

@ -1,255 +0,0 @@
# encoding: utf-8
require 'rails_helper'
require 'scheduler/scheduler'
describe Scheduler::Manager do
module Testing
class RandomJob
extend ::Scheduler::Schedule
def self.runs=(val)
@runs = val
end
def self.runs
@runs ||= 0
end
every 5.minutes
def perform
self.class.runs += 1
sleep 0.001
end
end
class SuperLongJob
extend ::Scheduler::Schedule
every 10.minutes
def perform
sleep 1000
end
end
class PerHostJob
extend ::Scheduler::Schedule
per_host
every 10.minutes
def self.runs=(val)
@runs = val
end
def self.runs
@runs ||= 0
end
def perform
self.class.runs += 1
end
end
end
let(:manager) {
Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false)
}
before do
expect(ActiveRecord::Base.connection_pool.connections.length).to eq(1)
@thread_count = Thread.list.count
@backtraces = {}
Thread.list.each do |t|
@backtraces[t.object_id] = t.backtrace
end
end
after do
manager.stop!
manager.remove(Testing::RandomJob)
manager.remove(Testing::SuperLongJob)
manager.remove(Testing::PerHostJob)
$redis.flushall
# connections that are not in use must be removed
# otherwise active record gets super confused
ActiveRecord::Base.connection_pool.connections.reject { |c| c.in_use? }.each do |c|
ActiveRecord::Base.connection_pool.remove(c)
end
expect(ActiveRecord::Base.connection_pool.connections.length).to (be <= 1)
on_thread_mismatch = lambda do
current = Thread.list.map { |t| t.object_id }
old_threads = @backtraces.keys
extra = current - old_threads
missing = old_threads - current
if missing.length > 0
STDERR.puts "\nMissing Threads #{missing.length} thread/s"
missing.each do |id|
STDERR.puts @backtraces[id]
STDERR.puts
end
end
if extra.length > 0
Thread.list.each do |thread|
if extra.include?(thread.object_id)
STDERR.puts "\nExtra Thread Backtrace:"
STDERR.puts thread.backtrace
STDERR.puts
end
end
end
end
wait_for(on_fail: on_thread_mismatch) do
@thread_count == Thread.list.count
end
end
it 'can disable stats' do
manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false)
expect(manager.enable_stats).to eq(false)
manager.stop!
manager = Scheduler::Manager.new(DiscourseRedis.new)
expect(manager.enable_stats).to eq(true)
manager.stop!
end
describe 'per host jobs' do
it "correctly schedules on multiple hosts" do
freeze_time
Testing::PerHostJob.runs = 0
hosts = ['a', 'b', 'c']
hosts.map do |host|
manager = Scheduler::Manager.new(DiscourseRedis.new, hostname: host, enable_stats: false)
manager.ensure_schedule!(Testing::PerHostJob)
info = manager.schedule_info(Testing::PerHostJob)
info.next_run = Time.now.to_i - 10
info.write!
manager
end.each do |manager|
manager.blocking_tick
manager.stop!
end
expect(Testing::PerHostJob.runs).to eq(3)
end
end
describe '#sync' do
it 'increases' do
expect(Scheduler::Manager.seq).to eq(Scheduler::Manager.seq - 1)
end
end
describe '#tick' do
it 'should nuke missing jobs' do
$redis.zadd Scheduler::Manager.queue_key, Time.now.to_i - 1000, "BLABLA"
manager.tick
expect($redis.zcard(Scheduler::Manager.queue_key)).to eq(0)
end
it 'should recover from crashed manager' do
info = manager.schedule_info(Testing::SuperLongJob)
info.next_run = Time.now.to_i - 1
info.write!
manager.tick
manager.stop!
$redis.del manager.identity_key
manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false)
manager.reschedule_orphans!
info = manager.schedule_info(Testing::SuperLongJob)
expect(info.next_run).to be <= Time.now.to_i
manager.stop!
end
it 'should log when job finishes running' do
Testing::RandomJob.runs = 0
info = manager.schedule_info(Testing::RandomJob)
info.next_run = Time.now.to_i - 1
info.write!
# with stats so we must be careful to cleanup
manager = Scheduler::Manager.new(DiscourseRedis.new)
manager.blocking_tick
manager.stop!
stat = SchedulerStat.first
expect(stat).to be_present
expect(stat.duration_ms).to be > 0
expect(stat.success).to be true
SchedulerStat.destroy_all
end
it 'should only run pending job once' do
Testing::RandomJob.runs = 0
info = manager.schedule_info(Testing::RandomJob)
info.next_run = Time.now.to_i - 1
info.write!
(0..5).map do
Thread.new do
manager = Scheduler::Manager.new(DiscourseRedis.new, enable_stats: false)
manager.blocking_tick
manager.stop!
end
end.map(&:join)
expect(Testing::RandomJob.runs).to eq(1)
info = manager.schedule_info(Testing::RandomJob)
expect(info.prev_run).to be <= Time.now.to_i
expect(info.prev_duration).to be > 0
expect(info.prev_result).to eq("OK")
end
end
describe '#discover_schedules' do
it 'Discovers Testing::RandomJob' do
expect(Scheduler::Manager.discover_schedules).to include(Testing::RandomJob)
end
end
describe '#next_run' do
it 'should be within the next 5 mins if it never ran' do
manager.remove(Testing::RandomJob)
manager.ensure_schedule!(Testing::RandomJob)
expect(manager.next_run(Testing::RandomJob))
.to be_within(5.minutes.to_i).of(Time.now.to_i + 5.minutes)
end
end
end

View File

@ -1,103 +0,0 @@
# encoding: utf-8
require 'rails_helper'
require 'scheduler/scheduler'
describe Scheduler::ScheduleInfo do
let(:manager) { Scheduler::Manager.new }
context "every" do
class RandomJob
extend ::Scheduler::Schedule
every 1.hour
def perform
# work_it
end
end
before do
@info = manager.schedule_info(RandomJob)
@info.del!
end
after do
manager.stop!
$redis.del manager.class.queue_key
end
it "is a scheduled job" do
expect(RandomJob).to be_scheduled
end
it 'starts off invalid' do
expect(@info.valid?).to eq(false)
end
it 'will have a due date in the next 5 minutes if it was blank' do
@info.schedule!
expect(@info.valid?).to eq(true)
expect(@info.next_run).to be_within(5.minutes).of(Time.now.to_i)
end
it 'will have a due date within the next hour if it just ran' do
@info.prev_run = Time.now.to_i
@info.schedule!
expect(@info.valid?).to eq(true)
expect(@info.next_run).to be_within(1.hour * manager.random_ratio).of(Time.now.to_i + 1.hour)
end
it 'is invalid if way in the future' do
@info.next_run = Time.now.to_i + 1.year
expect(@info.valid?).to eq(false)
end
end
context "daily" do
class DailyJob
extend ::Scheduler::Schedule
daily at: 11.hours
def perform
end
end
before do
freeze_time Time.parse("2010-01-10 10:00:00")
@info = manager.schedule_info(DailyJob)
@info.del!
end
after do
manager.stop!
$redis.del manager.class.queue_key
end
it "is a scheduled job" do
expect(DailyJob).to be_scheduled
end
it "starts off invalid" do
expect(@info.valid?).to eq(false)
end
it "will have a due date at the appropriate time if blank" do
expect(@info.next_run).to eq(nil)
@info.schedule!
expect(JSON.parse($redis.get(@info.key))["next_run"])
.to eq((Time.zone.now.midnight + 11.hours).to_i)
expect(@info.valid?).to eq(true)
end
it 'is invalid if way in the future' do
@info.next_run = Time.now.to_i + 1.year
expect(@info.valid?).to eq(false)
end
end
end