# frozen_string_literal: true # # A wrapper around redis that namespaces keys with the current site id # class DiscourseRedis class FallbackHandler include Singleton MASTER_ROLE_STATUS = "role:master" MASTER_LOADING_STATUS = "loading:1" MASTER_LOADED_STATUS = "loading:0" CONNECTION_TYPES = %w{normal pubsub} def initialize @master = true @running = false @mutex = Mutex.new @slave_config = DiscourseRedis.slave_config @message_bus_keepalive_interval = MessageBus.keepalive_interval end def verify_master synchronize do return if @thread && @thread.alive? @thread = Thread.new do loop do begin thread = Thread.new { initiate_fallback_to_master } thread.join break if synchronize { @master } sleep 5 ensure thread.kill end end end end end def initiate_fallback_to_master success = false begin redis_config = DiscourseRedis.config.dup redis_config.delete(:connector) master_client = ::Redis::Client.new(redis_config) logger.warn "#{log_prefix}: Checking connection to master server..." info = master_client.call([:info]) if info.include?(MASTER_LOADED_STATUS) && info.include?(MASTER_ROLE_STATUS) begin logger.warn "#{log_prefix}: Master server is active, killing all connections to slave..." self.master = true slave_client = ::Redis::Client.new(@slave_config) CONNECTION_TYPES.each do |connection_type| slave_client.call([:client, [:kill, 'type', connection_type]]) end MessageBus.keepalive_interval = @message_bus_keepalive_interval Discourse.clear_readonly! Discourse.request_refresh! success = true ensure slave_client&.disconnect end end rescue => e logger.warn "#{log_prefix}: Connection to Master server failed with '#{e.message}'" ensure master_client&.disconnect end success end def master synchronize { @master } end def master=(args) synchronize do @master = args # Disables MessageBus keepalive when Redis is in readonly mode MessageBus.keepalive_interval = 0 if !@master end end private def synchronize @mutex.synchronize { yield } end def logger Rails.logger end def log_prefix "#{self.class}" end end class Connector < Redis::Client::Connector def initialize(options) super(options) @slave_options = DiscourseRedis.slave_config(options) @fallback_handler = DiscourseRedis::FallbackHandler.instance end def resolve(client = nil) if !@fallback_handler.master @fallback_handler.verify_master return @slave_options end begin options = @options.dup options.delete(:connector) client ||= Redis::Client.new(options) loading = client.call([:info, :persistence]).include?( DiscourseRedis::FallbackHandler::MASTER_LOADING_STATUS ) loading ? @slave_options : @options rescue Redis::ConnectionError, Redis::CannotConnectError, RuntimeError => ex raise ex if ex.class == RuntimeError && ex.message != "Name or service not known" @fallback_handler.master = false @fallback_handler.verify_master raise ex ensure client.disconnect end end end def self.raw_connection(config = nil) config ||= self.config Redis.new(config) end def self.config GlobalSetting.redis_config end def self.slave_config(options = config) options.dup.merge!(host: options[:slave_host], port: options[:slave_port]) end def initialize(config = nil, namespace: true) @config = config || DiscourseRedis.config @redis = DiscourseRedis.raw_connection(@config.dup) @namespace = namespace end def self.fallback_handler @fallback_handler ||= DiscourseRedis::FallbackHandler.instance end def without_namespace # Only use this if you want to store and fetch data that's shared between sites @redis end def self.ignore_readonly yield rescue Redis::CommandError => ex if ex.message =~ /READONLY/ if !ENV["REDIS_RAILS_FAILOVER"] fallback_handler.verify_master if !fallback_handler.master end Discourse.received_redis_readonly! nil else raise ex end end # prefix the key with the namespace def method_missing(meth, *args, &block) if @redis.respond_to?(meth) DiscourseRedis.ignore_readonly { @redis.public_send(meth, *args, &block) } else super end end # Proxy key methods through, but prefix the keys with the namespace [:append, :blpop, :brpop, :brpoplpush, :decr, :decrby, :expire, :expireat, :get, :getbit, :getrange, :getset, :hdel, :hexists, :hget, :hgetall, :hincrby, :hincrbyfloat, :hkeys, :hlen, :hmget, :hmset, :hset, :hsetnx, :hvals, :incr, :incrby, :incrbyfloat, :lindex, :linsert, :llen, :lpop, :lpush, :lpushx, :lrange, :lrem, :lset, :ltrim, :mapped_hmset, :mapped_hmget, :mapped_mget, :mapped_mset, :mapped_msetnx, :move, :mset, :msetnx, :persist, :pexpire, :pexpireat, :psetex, :pttl, :rename, :renamenx, :rpop, :rpoplpush, :rpush, :rpushx, :sadd, :scard, :sdiff, :set, :setbit, :setex, :setnx, :setrange, :sinter, :sismember, :smembers, :sort, :spop, :srandmember, :srem, :strlen, :sunion, :ttl, :type, :watch, :zadd, :zcard, :zcount, :zincrby, :zrange, :zrangebyscore, :zrank, :zrem, :zremrangebyrank, :zremrangebyscore, :zrevrange, :zrevrangebyscore, :zrevrank, :zrangebyscore ].each do |m| define_method m do |*args| args[0] = "#{namespace}:#{args[0]}" if @namespace DiscourseRedis.ignore_readonly { @redis.public_send(m, *args) } end end # Implement our own because https://github.com/redis/redis-rb/issues/698 has stalled def exists(*keys) keys.map! { |a| "#{namespace}:#{a}" } if @namespace DiscourseRedis.ignore_readonly do @redis.synchronize do |client| client.call([:exists, *keys]) do |value| value > 0 end end end end def mget(*args) args.map! { |a| "#{namespace}:#{a}" } if @namespace DiscourseRedis.ignore_readonly { @redis.mget(*args) } end def del(k) DiscourseRedis.ignore_readonly do k = "#{namespace}:#{k}" if @namespace @redis.del k end end def scan_each(options = {}, &block) DiscourseRedis.ignore_readonly do match = options[:match].presence || '*' options[:match] = if @namespace "#{namespace}:#{match}" else match end if block @redis.scan_each(options) do |key| key = remove_namespace(key) if @namespace block.call(key) end else @redis.scan_each(options).map do |key| key = remove_namespace(key) if @namespace key end end end end def keys(pattern = nil) DiscourseRedis.ignore_readonly do pattern = pattern || '*' pattern = "#{namespace}:#{pattern}" if @namespace keys = @redis.keys(pattern) if @namespace len = namespace.length + 1 keys.map! { |k| k[len..-1] } end keys end end def delete_prefixed(prefix) DiscourseRedis.ignore_readonly do keys("#{prefix}*").each { |k| Discourse.redis.del(k) } end end def reconnect @redis._client.reconnect end def namespace_key(key) if @namespace "#{namespace}:#{key}" else key end end def namespace RailsMultisite::ConnectionManagement.current_db end def self.namespace Rails.logger.warn("DiscourseRedis.namespace is going to be deprecated, do not use it!") RailsMultisite::ConnectionManagement.current_db end def self.new_redis_store Cache.new end private def remove_namespace(key) key[(namespace.length + 1)..-1] end end