# frozen_string_literal: true # Like a hash, just does its best to stay in sync across the farm # On boot all instances are blank, but they populate as various processes # fill it up require 'weakref' require 'base64' class DistributedCache class Manager def initialize(message_bus = nil) @subscribers = [] @subscribed = false @lock = Mutex.new @message_bus = message_bus || MessageBus end def subscribers @subscribers end def process_message(message) i = @subscribers.length - 1 payload = message.data while i >= 0 begin current = @subscribers[i] next if payload["origin"] == current.identity next if current.key != payload["hash_key"] next if payload["discourse_version"] != Discourse.git_version hash = current.hash(message.site_id) case payload["op"] when "set" then hash[payload["key"]] = payload["marshalled"] ? Marshal.load(Base64.decode64(payload["value"])) : payload["value"] when "delete" then hash.delete(payload["key"]) when "clear" then hash.clear end rescue WeakRef::RefError @subscribers.delete_at(i) ensure i -= 1 end end end def channel_name "/distributed_hash".freeze end def ensure_subscribe! return if @subscribed @lock.synchronize do return if @subscribed @message_bus.subscribe(channel_name) do |message| @lock.synchronize do process_message(message) end end @subscribed = true end end def publish(hash, message) message[:origin] = hash.identity message[:hash_key] = hash.key message[:discourse_version] = Discourse.git_version @message_bus.publish(channel_name, message, user_ids: [-1]) end def set(hash, key, value) # special support for set marshal = (Set === value || Hash === value) value = Base64.encode64(Marshal.dump(value)) if marshal publish(hash, op: :set, key: key, value: value, marshalled: marshal) end def delete(hash, key) publish(hash, op: :delete, key: key) end def clear(hash) publish(hash, op: :clear) end def register(hash) @lock.synchronize do @subscribers << WeakRef.new(hash) end end end @default_manager = Manager.new def self.default_manager @default_manager end attr_reader :key def initialize(key, manager = nil) @key = key @data = {} @manager = manager || DistributedCache.default_manager @manager.ensure_subscribe! @manager.register(self) end def identity # fork resilient / multi machine identity (@seed_id ||= SecureRandom.hex) + "#{Process.pid}" end def []=(k, v) k = k.to_s if Symbol === k @manager.set(self, k, v) hash[k] = v end def [](k) k = k.to_s if Symbol === k hash[k] end def delete(k) k = k.to_s if Symbol === k @manager.delete(self, k) hash.delete(k) end def clear @manager.clear(self) hash.clear end def hash(db = nil) db ||= RailsMultisite::ConnectionManagement.current_db @data[db] ||= ThreadSafe::Hash.new end end