PERF: allow background jobs to flush between requests in same thread
This commit is contained in:
parent
6c09b6739d
commit
cdef67667a
|
@ -1,4 +1,12 @@
|
|||
require_dependency 'scheduler/defer'
|
||||
|
||||
if defined? Unicorn::HttpServer
|
||||
ObjectSpace.each_object(Unicorn::HttpServer) do |s|
|
||||
s.extend(Scheduler::Defer::Unicorn)
|
||||
end
|
||||
|
||||
if ENV['UNICORN_ENABLE_OOBGC'] == '1'
|
||||
require 'middleware/unicorn_oobgc'
|
||||
Middleware::UnicornOobgc.init
|
||||
end
|
||||
end
|
||||
|
|
|
@ -4,9 +4,17 @@ module Scheduler
|
|||
@async = !Rails.env.test?
|
||||
@queue = Queue.new
|
||||
@mutex = Mutex.new
|
||||
@paused = false
|
||||
@thread = nil
|
||||
start_thread
|
||||
end
|
||||
|
||||
def pause
|
||||
stop!
|
||||
@paused = true
|
||||
end
|
||||
|
||||
def resume
|
||||
@paused = false
|
||||
end
|
||||
|
||||
# for test
|
||||
|
@ -16,7 +24,7 @@ module Scheduler
|
|||
|
||||
def later(desc = nil, db=RailsMultisite::ConnectionManagement.current_db, &blk)
|
||||
if @async
|
||||
start_thread unless @thread.alive?
|
||||
start_thread unless (@thread && @thread.alive?) || @paused
|
||||
@queue << [db, blk, desc]
|
||||
else
|
||||
blk.call
|
||||
|
@ -24,12 +32,19 @@ module Scheduler
|
|||
end
|
||||
|
||||
def stop!
|
||||
@thread.kill
|
||||
@thread.kill if @thread && @thread.alive?
|
||||
@thread = nil
|
||||
end
|
||||
|
||||
# test only
|
||||
def stopped?
|
||||
!@thread.alive?
|
||||
!(@thread && @thread.alive?)
|
||||
end
|
||||
|
||||
def do_all_work
|
||||
while !@queue.empty?
|
||||
do_work(_non_block=true)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
@ -45,8 +60,9 @@ module Scheduler
|
|||
end
|
||||
end
|
||||
|
||||
def do_work
|
||||
db, job, desc = @queue.deq
|
||||
# using non_block to match Ruby #deq
|
||||
def do_work(non_block=false)
|
||||
db, job, desc = @queue.deq(non_block)
|
||||
begin
|
||||
RailsMultisite::ConnectionManagement.establish_connection(db: db) if db
|
||||
job.call
|
||||
|
@ -62,6 +78,16 @@ module Scheduler
|
|||
end
|
||||
|
||||
class Defer
|
||||
|
||||
module Unicorn
|
||||
def process_client(client)
|
||||
Defer.pause
|
||||
super(client)
|
||||
Defer.do_all_work
|
||||
Defer.resume
|
||||
end
|
||||
end
|
||||
|
||||
extend Deferrable
|
||||
initialize
|
||||
end
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# encoding: utf-8
|
||||
require 'spec_helper'
|
||||
require 'scheduler/scheduler'
|
||||
require_dependency 'scheduler/defer'
|
||||
|
||||
describe Scheduler::Defer do
|
||||
class DeferInstance
|
||||
|
@ -23,6 +23,32 @@ describe Scheduler::Defer do
|
|||
@defer.stop!
|
||||
end
|
||||
|
||||
it "can pause and resume" do
|
||||
x = 1
|
||||
@defer.pause
|
||||
|
||||
@defer.later do
|
||||
x = 2
|
||||
end
|
||||
|
||||
@defer.do_all_work
|
||||
|
||||
expect(x).to eq(2)
|
||||
|
||||
@defer.resume
|
||||
|
||||
|
||||
@defer.later do
|
||||
x = 3
|
||||
end
|
||||
|
||||
wait_for(10) do
|
||||
x == 3
|
||||
end
|
||||
|
||||
expect(x).to eq(3)
|
||||
end
|
||||
|
||||
it "recovers from a crash / fork" do
|
||||
s = nil
|
||||
@defer.stop!
|
||||
|
|
Loading…
Reference in New Issue