Correct flaky distributed cache test

make distributed cache more testable
This commit is contained in:
Sam 2017-08-01 16:11:48 -04:00
parent 513b7bf706
commit 71ad3a48c2
2 changed files with 121 additions and 95 deletions

View File

@ -1,3 +1,5 @@
# frozen_string_literal: true
# Like a hash, just does its best to stay in sync across the farm
# On boot all instances are blank, but they populate as various processes
# fill it up
@ -6,96 +8,110 @@ require 'weakref'
require 'base64'
class DistributedCache
@subscribers = []
@subscribed = false
@lock = Mutex.new
class Manager
def initialize(message_bus = nil)
@subscribers = []
@subscribed = false
@lock = Mutex.new
@message_bus = message_bus || MessageBus
end
def subscribers
@subscribers
end
def process_message(message)
i = @subscribers.length - 1
payload = message.data
while i >= 0
begin
current = @subscribers[i]
next if payload["origin"] == current.identity
next if current.key != payload["hash_key"]
next if payload["discourse_version"] != Discourse.git_version
hash = current.hash(message.site_id)
case payload["op"]
when "set" then hash[payload["key"]] = payload["marshalled"] ? Marshal.load(Base64.decode64(payload["value"])) : payload["value"]
when "delete" then hash.delete(payload["key"])
when "clear" then hash.clear
end
rescue WeakRef::RefError
@subscribers.delete_at(i)
ensure
i -= 1
end
end
end
def channel_name
"/distributed_hash".freeze
end
def ensure_subscribe!
return if @subscribed
@lock.synchronize do
return if @subscribed
@message_bus.subscribe(channel_name) do |message|
@lock.synchronize do
process_message(message)
end
end
@subscribed = true
end
end
def publish(hash, message)
message[:origin] = hash.identity
message[:hash_key] = hash.key
message[:discourse_version] = Discourse.git_version
@message_bus.publish(channel_name, message, user_ids: [-1])
end
def set(hash, key, value)
# special support for set
marshal = (Set === value || Hash === value)
value = Base64.encode64(Marshal.dump(value)) if marshal
publish(hash, op: :set, key: key, value: value, marshalled: marshal)
end
def delete(hash, key)
publish(hash, op: :delete, key: key)
end
def clear(hash)
publish(hash, op: :clear)
end
def register(hash)
@lock.synchronize do
@subscribers << WeakRef.new(hash)
end
end
end
@default_manager = Manager.new
def self.default_manager
@default_manager
end
attr_reader :key
def self.subscribers
@subscribers
end
def self.process_message(message)
i = @subscribers.length - 1
payload = message.data
while i >= 0
begin
current = @subscribers[i]
next if payload["origin"] == current.identity
next if current.key != payload["hash_key"]
next if payload["discourse_version"] != Discourse.git_version
hash = current.hash(message.site_id)
case payload["op"]
when "set" then hash[payload["key"]] = payload["marshalled"] ? Marshal.load(Base64.decode64(payload["value"])) : payload["value"]
when "delete" then hash.delete(payload["key"])
when "clear" then hash.clear
end
rescue WeakRef::RefError
@subscribers.delete_at(i)
ensure
i -= 1
end
end
end
def self.channel_name
"/distributed_hash".freeze
end
def self.ensure_subscribe!
return if @subscribed
@lock.synchronize do
return if @subscribed
MessageBus.subscribe(channel_name) do |message|
@lock.synchronize do
process_message(message)
end
end
@subscribed = true
end
end
def self.publish(hash, message)
message[:origin] = hash.identity
message[:hash_key] = hash.key
message[:discourse_version] = Discourse.git_version
MessageBus.publish(channel_name, message, user_ids: [-1])
end
def self.set(hash, key, value)
# special support for set
marshal = (Set === value || Hash === value)
value = Base64.encode64(Marshal.dump(value)) if marshal
publish(hash, op: :set, key: key, value: value, marshalled: marshal)
end
def self.delete(hash, key)
publish(hash, op: :delete, key: key)
end
def self.clear(hash)
publish(hash, op: :clear)
end
def self.register(hash)
@lock.synchronize do
@subscribers << WeakRef.new(hash)
end
end
def initialize(key)
DistributedCache.ensure_subscribe!
DistributedCache.register(self)
def initialize(key, manager = nil)
@key = key
@data = {}
@manager = manager || DistributedCache.default_manager
@manager.ensure_subscribe!
@manager.register(self)
end
def identity
@ -105,7 +121,7 @@ class DistributedCache
def []=(k, v)
k = k.to_s if Symbol === k
DistributedCache.set(self, k, v)
@manager.set(self, k, v)
hash[k] = v
end
@ -116,12 +132,12 @@ class DistributedCache
def delete(k)
k = k.to_s if Symbol === k
DistributedCache.delete(self, k)
@manager.delete(self, k)
hash.delete(k)
end
def clear
DistributedCache.clear(self)
@manager.clear(self)
hash.clear
end

View File

@ -4,20 +4,30 @@ require 'distributed_cache'
describe DistributedCache do
before :all do
$redis.flushall
@bus = MessageBus::Instance.new
@bus.configure(backend: :memory)
@manager = DistributedCache::Manager.new(@bus)
end
after :all do
@bus.destroy
end
def cache(name)
DistributedCache.new(name, @manager)
end
let! :cache1 do
DistributedCache.new("test")
cache("test")
end
let! :cache2 do
DistributedCache.new("test")
cache("test")
end
it 'allows us to store Set' do
c1 = DistributedCache.new("test1")
c2 = DistributedCache.new("test1")
c1 = cache("test1")
c2 = cache("test1")
set = Set.new
set << 1
@ -45,8 +55,8 @@ describe DistributedCache do
end
it 'does not leak state across caches' do
c2 = DistributedCache.new("test1")
c3 = DistributedCache.new("test1")
c2 = cache("test1")
c3 = cache("test1")
c2["hi"] = "hi"
wait_for do
c3["hi"] == "hi"