mirror of
https://github.com/discourse/discourse.git
synced 2025-02-07 20:08:26 +00:00
Spawn a single thread that checks for PostgreSQL fallback.
This commit is contained in:
parent
8c6d8c85db
commit
e8a3043129
@ -6,23 +6,46 @@ class PostgreSQLFallbackHandler
|
|||||||
include Singleton
|
include Singleton
|
||||||
|
|
||||||
def initialize
|
def initialize
|
||||||
@master = {}
|
@masters_down = {}
|
||||||
@running = {}
|
@mutex = Mutex.new
|
||||||
@mutex = {}
|
|
||||||
@last_check = {}
|
|
||||||
|
|
||||||
setup!
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def verify_master
|
def verify_master
|
||||||
@mutex[namespace].synchronize do
|
synchronize { return if @thread && @thread.alive? }
|
||||||
return if running || recently_checked?
|
|
||||||
@running[namespace] = true
|
@thread = Thread.new do
|
||||||
|
while true do
|
||||||
|
begin
|
||||||
|
thread = Thread.new { initiate_fallback_to_master }
|
||||||
|
thread.join
|
||||||
|
break if synchronize { @masters_down.empty? }
|
||||||
|
sleep 10
|
||||||
|
ensure
|
||||||
|
thread.kill
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
current_namespace = namespace
|
def master_down?
|
||||||
Thread.new do
|
synchronize { @masters_down[namespace] }
|
||||||
RailsMultisite::ConnectionManagement.with_connection(current_namespace) do
|
end
|
||||||
|
|
||||||
|
def master_down=(args)
|
||||||
|
synchronize { @masters_down[namespace] = args }
|
||||||
|
end
|
||||||
|
|
||||||
|
def master_up(namespace)
|
||||||
|
synchronize { @masters_down.delete(namespace) }
|
||||||
|
end
|
||||||
|
|
||||||
|
def running?
|
||||||
|
synchronize { @thread.alive? }
|
||||||
|
end
|
||||||
|
|
||||||
|
def initiate_fallback_to_master
|
||||||
|
@masters_down.keys.each do |key|
|
||||||
|
RailsMultisite::ConnectionManagement.with_connection(key) do
|
||||||
begin
|
begin
|
||||||
logger.warn "#{log_prefix}: Checking master server..."
|
logger.warn "#{log_prefix}: Checking master server..."
|
||||||
connection = ActiveRecord::Base.postgresql_connection(config)
|
connection = ActiveRecord::Base.postgresql_connection(config)
|
||||||
@ -32,54 +55,20 @@ class PostgreSQLFallbackHandler
|
|||||||
ActiveRecord::Base.clear_all_connections!
|
ActiveRecord::Base.clear_all_connections!
|
||||||
logger.warn "#{log_prefix}: Master server is active. Reconnecting..."
|
logger.warn "#{log_prefix}: Master server is active. Reconnecting..."
|
||||||
|
|
||||||
if namespace == RailsMultisite::ConnectionManagement::DEFAULT
|
self.master_up(key)
|
||||||
ActiveRecord::Base.establish_connection(config)
|
|
||||||
else
|
|
||||||
RailsMultisite::ConnectionManagement.establish_connection(db: namespace)
|
|
||||||
end
|
|
||||||
|
|
||||||
Discourse.disable_readonly_mode
|
Discourse.disable_readonly_mode
|
||||||
self.master = true
|
|
||||||
end
|
end
|
||||||
rescue => e
|
rescue => e
|
||||||
if e.message.include?("could not connect to server")
|
byebug
|
||||||
logger.warn "#{log_prefix}: Connection to master PostgreSQL server failed with '#{e.message}'"
|
logger.warn "#{log_prefix}: Connection to master PostgreSQL server failed with '#{e.message}'"
|
||||||
else
|
|
||||||
raise e
|
|
||||||
end
|
|
||||||
ensure
|
|
||||||
@mutex[namespace].synchronize do
|
|
||||||
@last_check[namespace] = Time.zone.now
|
|
||||||
@running[namespace] = false
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def master
|
# Use for testing
|
||||||
@master[namespace]
|
|
||||||
end
|
|
||||||
|
|
||||||
def master=(args)
|
|
||||||
@master[namespace] = args
|
|
||||||
end
|
|
||||||
|
|
||||||
def running
|
|
||||||
@running[namespace]
|
|
||||||
end
|
|
||||||
|
|
||||||
def setup!
|
def setup!
|
||||||
RailsMultisite::ConnectionManagement.all_dbs.each do |db|
|
@masters_down = {}
|
||||||
@master[db] = true
|
|
||||||
@running[db] = false
|
|
||||||
@mutex[db] = Mutex.new
|
|
||||||
@last_check[db] = nil
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def verify?
|
|
||||||
!master && !running
|
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
@ -96,17 +85,13 @@ class PostgreSQLFallbackHandler
|
|||||||
"#{self.class} [#{namespace}]"
|
"#{self.class} [#{namespace}]"
|
||||||
end
|
end
|
||||||
|
|
||||||
def recently_checked?
|
|
||||||
if @last_check[namespace]
|
|
||||||
Time.zone.now <= (@last_check[namespace] + 5.seconds)
|
|
||||||
else
|
|
||||||
false
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def namespace
|
def namespace
|
||||||
RailsMultisite::ConnectionManagement.current_db
|
RailsMultisite::ConnectionManagement.current_db
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def synchronize
|
||||||
|
@mutex.synchronize { yield }
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
module ActiveRecord
|
module ActiveRecord
|
||||||
@ -115,7 +100,7 @@ module ActiveRecord
|
|||||||
fallback_handler = ::PostgreSQLFallbackHandler.instance
|
fallback_handler = ::PostgreSQLFallbackHandler.instance
|
||||||
config = config.symbolize_keys
|
config = config.symbolize_keys
|
||||||
|
|
||||||
if fallback_handler.verify?
|
if fallback_handler.master_down?
|
||||||
connection = postgresql_connection(config.dup.merge({
|
connection = postgresql_connection(config.dup.merge({
|
||||||
host: config[:replica_host], port: config[:replica_port]
|
host: config[:replica_host], port: config[:replica_port]
|
||||||
}))
|
}))
|
||||||
@ -126,7 +111,8 @@ module ActiveRecord
|
|||||||
begin
|
begin
|
||||||
connection = postgresql_connection(config)
|
connection = postgresql_connection(config)
|
||||||
rescue PG::ConnectionBad => e
|
rescue PG::ConnectionBad => e
|
||||||
fallback_handler.master = false
|
fallback_handler.master_down = true
|
||||||
|
fallback_handler.verify_master
|
||||||
raise e
|
raise e
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@ -141,20 +127,4 @@ module ActiveRecord
|
|||||||
raise "Replica database server is not in recovery mode." if value == 'f'
|
raise "Replica database server is not in recovery mode." if value == 'f'
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
module ConnectionAdapters
|
|
||||||
class PostgreSQLAdapter
|
|
||||||
set_callback :checkout, :before, :switch_back?
|
|
||||||
|
|
||||||
private
|
|
||||||
|
|
||||||
def fallback_handler
|
|
||||||
@fallback_handler ||= ::PostgreSQLFallbackHandler.instance
|
|
||||||
end
|
|
||||||
|
|
||||||
def switch_back?
|
|
||||||
fallback_handler.verify_master if fallback_handler.verify?
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
@ -228,6 +228,7 @@ module Discourse
|
|||||||
|
|
||||||
def self.keep_readonly_mode
|
def self.keep_readonly_mode
|
||||||
# extend the expiry by 1 minute every 30 seconds
|
# extend the expiry by 1 minute every 30 seconds
|
||||||
|
unless Rails.env.test?
|
||||||
Thread.new do
|
Thread.new do
|
||||||
while readonly_mode?
|
while readonly_mode?
|
||||||
$redis.expire(READONLY_MODE_KEY, READONLY_MODE_KEY_TTL)
|
$redis.expire(READONLY_MODE_KEY, READONLY_MODE_KEY_TTL)
|
||||||
@ -235,6 +236,7 @@ module Discourse
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def self.disable_readonly_mode(user_enabled: false)
|
def self.disable_readonly_mode(user_enabled: false)
|
||||||
key = user_enabled ? USER_READONLY_MODE_KEY : READONLY_MODE_KEY
|
key = user_enabled ? USER_READONLY_MODE_KEY : READONLY_MODE_KEY
|
||||||
|
@ -48,8 +48,9 @@ describe ActiveRecord::ConnectionHandling do
|
|||||||
end
|
end
|
||||||
|
|
||||||
it 'should failover to a replica server' do
|
it 'should failover to a replica server' do
|
||||||
|
current_threads = Thread.list
|
||||||
|
|
||||||
RailsMultisite::ConnectionManagement.stubs(:all_dbs).returns(['default', multisite_db])
|
RailsMultisite::ConnectionManagement.stubs(:all_dbs).returns(['default', multisite_db])
|
||||||
::PostgreSQLFallbackHandler.instance.setup!
|
|
||||||
|
|
||||||
[config, multisite_config].each do |configuration|
|
[config, multisite_config].each do |configuration|
|
||||||
ActiveRecord::Base.expects(:postgresql_connection).with(configuration).raises(PG::ConnectionBad)
|
ActiveRecord::Base.expects(:postgresql_connection).with(configuration).raises(PG::ConnectionBad)
|
||||||
@ -60,7 +61,7 @@ describe ActiveRecord::ConnectionHandling do
|
|||||||
})).returns(@replica_connection)
|
})).returns(@replica_connection)
|
||||||
end
|
end
|
||||||
|
|
||||||
expect(postgresql_fallback_handler.master).to eq(true)
|
expect(postgresql_fallback_handler.master_down?).to eq(nil)
|
||||||
|
|
||||||
expect { ActiveRecord::Base.postgresql_fallback_connection(config) }
|
expect { ActiveRecord::Base.postgresql_fallback_connection(config) }
|
||||||
.to raise_error(PG::ConnectionBad)
|
.to raise_error(PG::ConnectionBad)
|
||||||
@ -68,10 +69,10 @@ describe ActiveRecord::ConnectionHandling do
|
|||||||
expect{ ActiveRecord::Base.postgresql_fallback_connection(config) }
|
expect{ ActiveRecord::Base.postgresql_fallback_connection(config) }
|
||||||
.to change{ Discourse.readonly_mode? }.from(false).to(true)
|
.to change{ Discourse.readonly_mode? }.from(false).to(true)
|
||||||
|
|
||||||
expect(postgresql_fallback_handler.master).to eq(false)
|
expect(postgresql_fallback_handler.master_down?).to eq(true)
|
||||||
|
|
||||||
with_multisite_db(multisite_db) do
|
with_multisite_db(multisite_db) do
|
||||||
expect(postgresql_fallback_handler.master).to eq(true)
|
expect(postgresql_fallback_handler.master_down?).to eq(nil)
|
||||||
|
|
||||||
expect { ActiveRecord::Base.postgresql_fallback_connection(multisite_config) }
|
expect { ActiveRecord::Base.postgresql_fallback_connection(multisite_config) }
|
||||||
.to raise_error(PG::ConnectionBad)
|
.to raise_error(PG::ConnectionBad)
|
||||||
@ -79,30 +80,18 @@ describe ActiveRecord::ConnectionHandling do
|
|||||||
expect{ ActiveRecord::Base.postgresql_fallback_connection(multisite_config) }
|
expect{ ActiveRecord::Base.postgresql_fallback_connection(multisite_config) }
|
||||||
.to change{ Discourse.readonly_mode? }.from(false).to(true)
|
.to change{ Discourse.readonly_mode? }.from(false).to(true)
|
||||||
|
|
||||||
expect(postgresql_fallback_handler.master).to eq(false)
|
expect(postgresql_fallback_handler.master_down?).to eq(true)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
postgresql_fallback_handler.master_up(multisite_db)
|
||||||
|
|
||||||
ActiveRecord::Base.unstub(:postgresql_connection)
|
ActiveRecord::Base.unstub(:postgresql_connection)
|
||||||
|
|
||||||
current_threads = Thread.list
|
postgresql_fallback_handler.initiate_fallback_to_master
|
||||||
|
|
||||||
expect{ ActiveRecord::Base.connection_pool.checkout }
|
|
||||||
.to change{ Thread.list.size }.by(1)
|
|
||||||
|
|
||||||
# Ensure that we don't try to connect back to the replica when a thread
|
|
||||||
# is running
|
|
||||||
begin
|
|
||||||
ActiveRecord::Base.postgresql_fallback_connection(config)
|
|
||||||
rescue PG::ConnectionBad => e
|
|
||||||
# This is expected if the thread finishes before the above is called.
|
|
||||||
end
|
|
||||||
|
|
||||||
# Wait for the thread to finish execution
|
|
||||||
(Thread.list - current_threads).each(&:join)
|
|
||||||
|
|
||||||
expect(Discourse.readonly_mode?).to eq(false)
|
expect(Discourse.readonly_mode?).to eq(false)
|
||||||
|
|
||||||
expect(PostgreSQLFallbackHandler.instance.master).to eq(true)
|
expect(postgresql_fallback_handler.master_down?).to eq(nil)
|
||||||
|
|
||||||
expect(ActiveRecord::Base.connection_pool.connections.count).to eq(0)
|
expect(ActiveRecord::Base.connection_pool.connections.count).to eq(0)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user