Merge pull request #4539 from tgxworld/use_a_time_task_for_redis_failover
PERF: Spawn a seperate timer task to check if Redis master is up.
This commit is contained in:
commit
a1a7094604
|
@ -14,18 +14,20 @@ class DiscourseRedis
|
||||||
@running = false
|
@running = false
|
||||||
@mutex = Mutex.new
|
@mutex = Mutex.new
|
||||||
@slave_config = DiscourseRedis.slave_config
|
@slave_config = DiscourseRedis.slave_config
|
||||||
|
@timer_task = init_timer_task
|
||||||
end
|
end
|
||||||
|
|
||||||
def verify_master
|
def verify_master
|
||||||
synchronize do
|
synchronize do
|
||||||
return if @running || recently_checked?
|
return if @timer_task.running?
|
||||||
@running = true
|
|
||||||
end
|
end
|
||||||
|
|
||||||
Thread.new { initiate_fallback_to_master }
|
@timer_task.execute
|
||||||
end
|
end
|
||||||
|
|
||||||
def initiate_fallback_to_master
|
def initiate_fallback_to_master
|
||||||
|
success = false
|
||||||
|
|
||||||
begin
|
begin
|
||||||
slave_client = ::Redis::Client.new(@slave_config)
|
slave_client = ::Redis::Client.new(@slave_config)
|
||||||
logger.info "#{log_prefix}: Checking connection to master server..."
|
logger.info "#{log_prefix}: Checking connection to master server..."
|
||||||
|
@ -39,13 +41,14 @@ class DiscourseRedis
|
||||||
|
|
||||||
Discourse.clear_readonly!
|
Discourse.clear_readonly!
|
||||||
Discourse.request_refresh!
|
Discourse.request_refresh!
|
||||||
@master = true
|
self.master = true
|
||||||
|
success = true
|
||||||
end
|
end
|
||||||
ensure
|
ensure
|
||||||
@running = false
|
|
||||||
@last_checked = Time.zone.now
|
|
||||||
slave_client.disconnect
|
slave_client.disconnect
|
||||||
end
|
end
|
||||||
|
|
||||||
|
success
|
||||||
end
|
end
|
||||||
|
|
||||||
def master
|
def master
|
||||||
|
@ -56,23 +59,18 @@ class DiscourseRedis
|
||||||
synchronize { @master = args }
|
synchronize { @master = args }
|
||||||
end
|
end
|
||||||
|
|
||||||
def recently_checked?
|
def running?
|
||||||
if @last_checked
|
synchronize { @timer_task.running? }
|
||||||
Time.zone.now <= (@last_checked + 5.seconds)
|
|
||||||
else
|
|
||||||
false
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
# Used for testing
|
|
||||||
def reset!
|
|
||||||
@master = true
|
|
||||||
@last_checked = nil
|
|
||||||
@running = false
|
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
|
def init_timer_task
|
||||||
|
Concurrent::TimerTask.new(execution_interval: 10) do |task|
|
||||||
|
task.shutdown if initiate_fallback_to_master
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def synchronize
|
def synchronize
|
||||||
@mutex.synchronize { yield }
|
@mutex.synchronize { yield }
|
||||||
end
|
end
|
||||||
|
@ -87,9 +85,6 @@ class DiscourseRedis
|
||||||
end
|
end
|
||||||
|
|
||||||
class Connector < Redis::Client::Connector
|
class Connector < Redis::Client::Connector
|
||||||
MASTER = 'master'.freeze
|
|
||||||
SLAVE = 'slave'.freeze
|
|
||||||
|
|
||||||
def initialize(options)
|
def initialize(options)
|
||||||
super(options)
|
super(options)
|
||||||
@slave_options = DiscourseRedis.slave_config(options)
|
@slave_options = DiscourseRedis.slave_config(options)
|
||||||
|
@ -97,21 +92,18 @@ class DiscourseRedis
|
||||||
end
|
end
|
||||||
|
|
||||||
def resolve
|
def resolve
|
||||||
|
return @slave_options if !@fallback_handler.master
|
||||||
return @options unless @slave_options[:host]
|
|
||||||
|
|
||||||
begin
|
begin
|
||||||
options = @options.dup
|
options = @options.dup
|
||||||
options.delete(:connector)
|
options.delete(:connector)
|
||||||
client = ::Redis::Client.new(options)
|
client = Redis::Client.new(options)
|
||||||
client.call([:role])[0]
|
loading = client.call([:info]).split("\r\n").include?("loading:1")
|
||||||
@options
|
loading ? @slave_options : @options
|
||||||
rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex
|
rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex
|
||||||
# A consul service name may be deregistered for a redis container setup
|
|
||||||
raise ex if ex.class == RuntimeError && ex.message != "Name or service not known"
|
raise ex if ex.class == RuntimeError && ex.message != "Name or service not known"
|
||||||
|
|
||||||
return @slave_options if !@fallback_handler.master
|
|
||||||
@fallback_handler.master = false
|
@fallback_handler.master = false
|
||||||
|
@fallback_handler.verify_master unless @fallback_handler.running?
|
||||||
raise ex
|
raise ex
|
||||||
ensure
|
ensure
|
||||||
client.disconnect
|
client.disconnect
|
||||||
|
|
|
@ -42,7 +42,7 @@ describe DiscourseRedis do
|
||||||
|
|
||||||
it 'should return the slave config when master is down' do
|
it 'should return the slave config when master is down' do
|
||||||
begin
|
begin
|
||||||
Redis::Client.any_instance.expects(:call).raises(Redis::CannotConnectError).twice
|
Redis::Client.any_instance.expects(:call).raises(Redis::CannotConnectError).once
|
||||||
expect { connector.resolve }.to raise_error(Redis::CannotConnectError)
|
expect { connector.resolve }.to raise_error(Redis::CannotConnectError)
|
||||||
|
|
||||||
config = connector.resolve
|
config = connector.resolve
|
||||||
|
@ -58,7 +58,7 @@ describe DiscourseRedis do
|
||||||
begin
|
begin
|
||||||
error = RuntimeError.new('Name or service not known')
|
error = RuntimeError.new('Name or service not known')
|
||||||
|
|
||||||
Redis::Client.any_instance.expects(:call).raises(error).twice
|
Redis::Client.any_instance.expects(:call).raises(error).once
|
||||||
expect { connector.resolve }.to raise_error(error)
|
expect { connector.resolve }.to raise_error(error)
|
||||||
|
|
||||||
config = connector.resolve
|
config = connector.resolve
|
||||||
|
@ -70,6 +70,18 @@ describe DiscourseRedis do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it "should return the slave config when master is still loading data" do
|
||||||
|
begin
|
||||||
|
Redis::Client.any_instance.expects(:call).with([:info]).returns("someconfig:haha\r\nloading:1")
|
||||||
|
config = connector.resolve
|
||||||
|
|
||||||
|
expect(config[:host]).to eq(slave_host)
|
||||||
|
expect(config[:port]).to eq(slave_port)
|
||||||
|
ensure
|
||||||
|
fallback_handler.master = true
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
it "should raise the right error" do
|
it "should raise the right error" do
|
||||||
error = RuntimeError.new('test error')
|
error = RuntimeError.new('test error')
|
||||||
Redis::Client.any_instance.expects(:call).raises(error).twice
|
Redis::Client.any_instance.expects(:call).raises(error).twice
|
||||||
|
@ -79,12 +91,17 @@ describe DiscourseRedis do
|
||||||
|
|
||||||
describe DiscourseRedis::FallbackHandler do
|
describe DiscourseRedis::FallbackHandler do
|
||||||
after do
|
after do
|
||||||
fallback_handler.reset!
|
fallback_handler.master = true
|
||||||
end
|
end
|
||||||
|
|
||||||
describe '#initiate_fallback_to_master' do
|
describe '#initiate_fallback_to_master' do
|
||||||
|
it 'should return the right value if the master server is still down' do
|
||||||
|
fallback_handler.master = false
|
||||||
|
Redis::Client.any_instance.expects(:call).with([:info]).returns("Some other stuff")
|
||||||
|
expect(fallback_handler.initiate_fallback_to_master).to eq(false)
|
||||||
|
end
|
||||||
|
|
||||||
it 'should fallback to the master server once it is up' do
|
it 'should fallback to the master server once it is up' do
|
||||||
begin
|
|
||||||
fallback_handler.master = false
|
fallback_handler.master = false
|
||||||
Redis::Client.any_instance.expects(:call).with([:info]).returns(DiscourseRedis::FallbackHandler::MASTER_LINK_STATUS)
|
Redis::Client.any_instance.expects(:call).with([:info]).returns(DiscourseRedis::FallbackHandler::MASTER_LINK_STATUS)
|
||||||
|
|
||||||
|
@ -92,21 +109,9 @@ describe DiscourseRedis do
|
||||||
Redis::Client.any_instance.expects(:call).with([:client, [:kill, 'type', connection_type]])
|
Redis::Client.any_instance.expects(:call).with([:client, [:kill, 'type', connection_type]])
|
||||||
end
|
end
|
||||||
|
|
||||||
fallback_handler.initiate_fallback_to_master
|
expect(fallback_handler.initiate_fallback_to_master).to eq(true)
|
||||||
|
|
||||||
expect(fallback_handler.master).to eq(true)
|
expect(fallback_handler.master).to eq(true)
|
||||||
expect(Discourse.recently_readonly?).to eq(false)
|
expect(Discourse.recently_readonly?).to eq(false)
|
||||||
ensure
|
|
||||||
fallback_handler.master = true
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
it "should restrict the number of checks" do
|
|
||||||
expect { fallback_handler.verify_master }.to change { Thread.list.count }.by(1)
|
|
||||||
expect(fallback_handler.master).to eq(true)
|
|
||||||
|
|
||||||
fallback_handler.master = false
|
|
||||||
expect { fallback_handler.verify_master }.to_not change { Thread.list.count }
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue