# Like a hash, but does its best to stay in sync across the farm.
# On boot all instances are blank; they populate as the various processes
# fill them up.

require 'weakref'
require 'base64'
require 'securerandom'
require 'set'

# A per-site key/value store whose writes are broadcast over MessageBus so
# that other instances created with the same key can mirror them.
class DistributedCache
  # Weak references to every live instance, the "already subscribed" flag and
  # the mutex that guards both. These are class-level instance variables.
  @subscribers = []
  @subscribed = false
  @lock = Mutex.new

  # The name shared by all instances that should mirror each other.
  attr_reader :key

  # Returns the array of WeakRef-wrapped instances registered so far.
  def self.subscribers
    @subscribers
  end

  # Applies one incoming bus message to every registered instance it targets.
  #
  # message - object responding to #data (a Hash with String keys "origin",
  #           "hash_key", "discourse_version", "op", and, depending on the
  #           op, "key", "value" and "marshalled") and #site_id.
  #
  # An instance is skipped when the message originated from it, when its key
  # differs from "hash_key", or when the sender's version differs from
  # Discourse.git_version. Instances whose WeakRef has been collected are
  # removed from the subscriber list.
  def self.process_message(message)
    payload = message.data

    # Walk backwards so delete_at(i) never shifts an index still to be visited.
    (@subscribers.length - 1).downto(0) do |i|
      begin
        current = @subscribers[i]

        next if payload["origin"] == current.identity
        next if current.key != payload["hash_key"]
        next if payload["discourse_version"] != Discourse.git_version

        hash = current.hash(message.site_id)

        case payload["op"]
        when "set"
          value = payload["value"]
          # NOTE(review): Marshal.load is unsafe on untrusted input. This
          # assumes only trusted server processes can publish on this
          # channel - confirm.
          value = Marshal.load(Base64.decode64(value)) if payload["marshalled"]
          hash[payload["key"]] = value
        when "delete"
          hash.delete(payload["key"])
        when "clear"
          hash.clear
        end
      rescue WeakRef::RefError
        # The referenced instance has been garbage collected; forget it.
        @subscribers.delete_at(i)
      end
    end
  end

  # MessageBus channel used for all cache traffic.
  def self.channel_name
    "/distributed_hash".freeze
  end

  # Subscribes to the channel the first time it is called; later calls are
  # no-ops. The flag is checked again under the lock so that concurrent
  # callers subscribe only once. Incoming messages are processed while
  # holding the same lock that guards the subscriber list.
  def self.ensure_subscribe!
    return if @subscribed

    @lock.synchronize do
      return if @subscribed

      MessageBus.subscribe(channel_name) do |message|
        @lock.synchronize do
          process_message(message)
        end
      end

      @subscribed = true
    end
  end

  # Stamps message (mutating it) with the sender's identity, key and the
  # current Discourse.git_version, then publishes it on the channel.
  #
  # hash    - the DistributedCache instance the operation came from.
  # message - Hash describing the operation.
  def self.publish(hash, message)
    message[:origin] = hash.identity
    message[:hash_key] = hash.key
    message[:discourse_version] = Discourse.git_version

    # NOTE(review): user_ids: [-1] presumably limits delivery to server-side
    # subscribers - confirm against the MessageBus documentation.
    MessageBus.publish(channel_name, message, { user_ids: [-1] })
  end

  # Broadcasts a "set" of key to value. Set and Hash values are sent as a
  # Base64-encoded Marshal dump and flagged as marshalled; any other value is
  # sent as is.
  def self.set(hash, key, value)
    # special support for set
    marshal = (Set === value || Hash === value)
    value = Base64.encode64(Marshal.dump(value)) if marshal

    publish(hash, { op: :set, key: key, value: value, marshalled: marshal })
  end

  # Broadcasts removal of key.
  def self.delete(hash, key)
    publish(hash, { op: :delete, key: key })
  end

  # Broadcasts removal of every entry.
  def self.clear(hash)
    publish(hash, { op: :clear })
  end

  # Adds hash to the subscriber list, held through a WeakRef.
  def self.register(hash)
    @lock.synchronize do
      @subscribers << WeakRef.new(hash)
    end
  end

  # key - name shared by the instances that should mirror each other.
  def initialize(key)
    # State is assigned before the instance is registered, so a message that
    # arrives right after registration never sees a half-built object.
    @key = key
    @data = {}

    DistributedCache.ensure_subscribe!
    DistributedCache.register(self)
  end

  # Identifier used to recognise (and ignore) this instance's own messages.
  # The random seed is generated once per instance; the pid is appended on
  # every call.
  def identity
    # fork resilient / multi machine identity
    (@seed_id ||= SecureRandom.hex) + "#{Process.pid}"
  end

  # Stores v under k locally and broadcasts the change. Symbol keys are
  # converted to strings.
  def []=(k, v)
    k = k.to_s if Symbol === k
    DistributedCache.set(self, k, v)
    hash[k] = v
  end

  # Returns the locally stored value for k (Symbol keys are converted to
  # strings), or nil when absent.
  def [](k)
    k = k.to_s if Symbol === k
    hash[k]
  end

  # Removes k locally, broadcasts the removal and returns the removed value.
  def delete(k)
    k = k.to_s if Symbol === k
    DistributedCache.delete(self, k)
    hash.delete(k)
  end

  # Empties the current site's store locally and broadcasts the clear.
  def clear
    DistributedCache.clear(self)
    hash.clear
  end

  # Returns the backing store for db, creating it on first use. db defaults
  # to RailsMultisite::ConnectionManagement.current_db.
  #
  # Note: this method shadows Object#hash.
  def hash(db = nil)
    db ||= RailsMultisite::ConnectionManagement.current_db
    @data[db] ||= ThreadSafe::Hash.new
  end
end