Merge pull request #4056 from tgxworld/redis_failover
FEATURE: Master-Slave Redis configuration with fallback and switch over.
This commit is contained in:
commit
b1e0da2b50
|
@ -43,6 +43,8 @@ class GlobalSetting
|
||||||
c = {}
|
c = {}
|
||||||
c[:host] = redis_host if redis_host
|
c[:host] = redis_host if redis_host
|
||||||
c[:port] = redis_port if redis_port
|
c[:port] = redis_port if redis_port
|
||||||
|
c[:slave_host] = redis_slave_host if redis_slave_host
|
||||||
|
c[:slave_port] = redis_slave_port if redis_slave_port
|
||||||
c[:password] = redis_password if redis_password.present?
|
c[:password] = redis_password if redis_password.present?
|
||||||
c[:db] = redis_db if redis_db != 0
|
c[:db] = redis_db if redis_db != 0
|
||||||
c[:db] = 1 if Rails.env == "test"
|
c[:db] = 1 if Rails.env == "test"
|
||||||
|
@ -52,6 +54,7 @@ class GlobalSetting
|
||||||
{host: host, port: port}
|
{host: host, port: port}
|
||||||
end.to_a
|
end.to_a
|
||||||
end
|
end
|
||||||
|
c[:connector] = DiscourseRedis::Connector
|
||||||
c.freeze
|
c.freeze
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -95,6 +95,12 @@ redis_host = localhost
|
||||||
# redis server port
|
# redis server port
|
||||||
redis_port = 6379
|
redis_port = 6379
|
||||||
|
|
||||||
|
# redis slave server address
|
||||||
|
redis_slave_host =
|
||||||
|
|
||||||
|
# redis slave server port
|
||||||
|
redis_slave_port =
|
||||||
|
|
||||||
# redis database
|
# redis database
|
||||||
redis_db = 0
|
redis_db = 0
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,18 @@
|
||||||
|
# https://github.com/redis/redis-rb/pull/591
|
||||||
|
class Redis
|
||||||
|
class Client
|
||||||
|
alias_method :old_initialize, :initialize
|
||||||
|
|
||||||
|
def initialize(options = {})
|
||||||
|
old_initialize(options)
|
||||||
|
|
||||||
|
if options.include?(:connector) && options[:connector].is_a?(Class)
|
||||||
|
@connector = options[:connector].new(@options)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
if Rails.env.development? && ENV['DISCOURSE_FLUSH_REDIS']
|
if Rails.env.development? && ENV['DISCOURSE_FLUSH_REDIS']
|
||||||
puts "Flushing redis (development mode)"
|
puts "Flushing redis (development mode)"
|
||||||
$redis.flushall
|
$redis.flushall
|
||||||
|
|
|
@ -3,6 +3,93 @@
|
||||||
#
|
#
|
||||||
require_dependency 'cache'
|
require_dependency 'cache'
|
||||||
class DiscourseRedis
|
class DiscourseRedis
|
||||||
|
class FallbackHandler
|
||||||
|
include Singleton
|
||||||
|
|
||||||
|
MASTER_LINK_STATUS = "master_link_status:up".freeze
|
||||||
|
|
||||||
|
def initialize
|
||||||
|
@master = true
|
||||||
|
@running = false
|
||||||
|
@mutex = Mutex.new
|
||||||
|
@slave_config = DiscourseRedis.slave_config
|
||||||
|
end
|
||||||
|
|
||||||
|
def verify_master
|
||||||
|
synchronize do
|
||||||
|
return if @running && !recently_checked?
|
||||||
|
@running = true
|
||||||
|
end
|
||||||
|
|
||||||
|
Thread.new { initiate_fallback_to_master }
|
||||||
|
end
|
||||||
|
|
||||||
|
def initiate_fallback_to_master
|
||||||
|
begin
|
||||||
|
slave_client = ::Redis::Client.new(@slave_config)
|
||||||
|
|
||||||
|
if slave_client.call([:info]).split("\r\n").include?(MASTER_LINK_STATUS)
|
||||||
|
slave_client.call([:client, [:kill, 'type', 'normal']])
|
||||||
|
Discourse.clear_readonly!
|
||||||
|
Discourse.request_refresh!
|
||||||
|
@master = true
|
||||||
|
end
|
||||||
|
ensure
|
||||||
|
@running = false
|
||||||
|
@last_checked = Time.zone.now
|
||||||
|
slave_client.disconnect
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def master
|
||||||
|
synchronize { @master }
|
||||||
|
end
|
||||||
|
|
||||||
|
def master=(args)
|
||||||
|
synchronize { @master = args }
|
||||||
|
end
|
||||||
|
|
||||||
|
def recently_checked?
|
||||||
|
if @last_checked
|
||||||
|
Time.zone.now > (@last_checked + 5.seconds)
|
||||||
|
else
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def synchronize
|
||||||
|
@mutex.synchronize { yield }
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class Connector < Redis::Client::Connector
|
||||||
|
MASTER = 'master'.freeze
|
||||||
|
SLAVE = 'slave'.freeze
|
||||||
|
|
||||||
|
def initialize(options)
|
||||||
|
super(options)
|
||||||
|
@slave_options = DiscourseRedis.slave_config(options)
|
||||||
|
@fallback_handler = DiscourseRedis::FallbackHandler.instance
|
||||||
|
end
|
||||||
|
|
||||||
|
def resolve
|
||||||
|
begin
|
||||||
|
options = @options.dup
|
||||||
|
options.delete(:connector)
|
||||||
|
client = ::Redis::Client.new(options)
|
||||||
|
client.call([:role])[0]
|
||||||
|
@options
|
||||||
|
rescue Redis::ConnectionError, Redis::CannotConnectError => ex
|
||||||
|
return @slave_options if !@fallback_handler.master
|
||||||
|
@fallback_handler.master = false
|
||||||
|
raise ex
|
||||||
|
ensure
|
||||||
|
client.disconnect
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def self.raw_connection(config = nil)
|
def self.raw_connection(config = nil)
|
||||||
config ||= self.config
|
config ||= self.config
|
||||||
|
@ -13,11 +100,19 @@ class DiscourseRedis
|
||||||
GlobalSetting.redis_config
|
GlobalSetting.redis_config
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def self.slave_config(options = config)
|
||||||
|
options.dup.merge!({ host: options[:slave_host], port: options[:slave_port] })
|
||||||
|
end
|
||||||
|
|
||||||
def initialize(config=nil)
|
def initialize(config=nil)
|
||||||
@config = config || DiscourseRedis.config
|
@config = config || DiscourseRedis.config
|
||||||
@redis = DiscourseRedis.raw_connection(@config)
|
@redis = DiscourseRedis.raw_connection(@config)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def self.fallback_handler
|
||||||
|
@fallback_handler ||= DiscourseRedis::FallbackHandler.instance
|
||||||
|
end
|
||||||
|
|
||||||
def without_namespace
|
def without_namespace
|
||||||
# Only use this if you want to store and fetch data that's shared between sites
|
# Only use this if you want to store and fetch data that's shared between sites
|
||||||
@redis
|
@redis
|
||||||
|
@ -30,6 +125,8 @@ class DiscourseRedis
|
||||||
unless Discourse.recently_readonly?
|
unless Discourse.recently_readonly?
|
||||||
STDERR.puts "WARN: Redis is in a readonly state. Performed a noop"
|
STDERR.puts "WARN: Redis is in a readonly state. Performed a noop"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
fallback_handler.verify_master if !fallback_handler.master
|
||||||
Discourse.received_readonly!
|
Discourse.received_readonly!
|
||||||
else
|
else
|
||||||
raise ex
|
raise ex
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
require 'rails_helper'
|
||||||
|
|
||||||
|
describe DiscourseRedis do
|
||||||
|
let(:slave_host) { 'testhost' }
|
||||||
|
let(:slave_port) { 1234 }
|
||||||
|
|
||||||
|
let(:config) do
|
||||||
|
DiscourseRedis.config.dup.merge({
|
||||||
|
slave_host: 'testhost', slave_port: 1234, connector: DiscourseRedis::Connector
|
||||||
|
})
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:fallback_handler) { DiscourseRedis::FallbackHandler.instance }
|
||||||
|
|
||||||
|
context '.slave_host' do
|
||||||
|
it 'should return the right config' do
|
||||||
|
slave_config = DiscourseRedis.slave_config(config)
|
||||||
|
expect(slave_config[:host]).to eq(slave_host)
|
||||||
|
expect(slave_config[:port]).to eq(slave_port)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when redis connection is to a slave redis server' do
|
||||||
|
it 'should check the status of the master server' do
|
||||||
|
begin
|
||||||
|
fallback_handler.master = false
|
||||||
|
$redis.without_namespace.expects(:get).raises(Redis::CommandError.new("READONLY"))
|
||||||
|
fallback_handler.expects(:verify_master).once
|
||||||
|
$redis.get('test')
|
||||||
|
ensure
|
||||||
|
fallback_handler.master = true
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe DiscourseRedis::Connector do
|
||||||
|
let(:connector) { DiscourseRedis::Connector.new(config) }
|
||||||
|
|
||||||
|
it 'should return the master config when master is up' do
|
||||||
|
expect(connector.resolve).to eq(config)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'should return the slave config when master is down' do
|
||||||
|
begin
|
||||||
|
Redis::Client.any_instance.expects(:call).raises(Redis::CannotConnectError).twice
|
||||||
|
expect { connector.resolve }.to raise_error(Redis::CannotConnectError)
|
||||||
|
|
||||||
|
config = connector.resolve
|
||||||
|
|
||||||
|
expect(config[:host]).to eq(slave_host)
|
||||||
|
expect(config[:port]).to eq(slave_port)
|
||||||
|
ensure
|
||||||
|
fallback_handler.master = true
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe DiscourseRedis::FallbackHandler do
|
||||||
|
describe '#initiate_fallback_to_master' do
|
||||||
|
it 'should fallback to the master server once it is up' do
|
||||||
|
begin
|
||||||
|
fallback_handler.master = false
|
||||||
|
Redis::Client.any_instance.expects(:call).with([:info]).returns(DiscourseRedis::FallbackHandler::MASTER_LINK_STATUS)
|
||||||
|
Redis::Client.any_instance.expects(:call).with([:client, [:kill, 'type', 'normal']])
|
||||||
|
|
||||||
|
fallback_handler.initiate_fallback_to_master
|
||||||
|
|
||||||
|
expect(fallback_handler.master).to eq(true)
|
||||||
|
expect(Discourse.recently_readonly?).to eq(false)
|
||||||
|
ensure
|
||||||
|
fallback_handler.master = true
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
Loading…
Reference in New Issue