DEV: Make DistributedMemoizer use DistributedMutex (#16229)
Its implementation was already distributed-mutex-like, with slight differences that did not seem necessary.
This commit is contained in:
parent
14778ba52e
commit
3c44bed545
|
@ -1,72 +1,31 @@
|
||||||
# frozen_string_literal: true
|
# frozen_string_literal: true
|
||||||
|
|
||||||
class DistributedMemoizer
|
class DistributedMemoizer
|
||||||
|
|
||||||
# never wait for longer that 1 second for a cross process lock
|
# never wait for longer that 1 second for a cross process lock
|
||||||
MAX_WAIT = 2
|
MAX_WAIT = 1
|
||||||
LOCK = Mutex.new
|
|
||||||
|
|
||||||
# memoize a key across processes and machines
|
# memoize a key across processes and machines
|
||||||
def self.memoize(key, duration = 60 * 60 * 24, redis = nil)
|
def self.memoize(key, duration = 60 * 60 * 24, redis = Discourse.redis)
|
||||||
redis ||= Discourse.redis
|
redis_lock_key = self.redis_lock_key(key)
|
||||||
|
|
||||||
redis_key = self.redis_key(key)
|
redis_key = self.redis_key(key)
|
||||||
|
|
||||||
unless result = redis.get(redis_key)
|
DistributedMutex.synchronize(redis_lock_key, redis: redis, validity: MAX_WAIT) do
|
||||||
redis_lock_key = self.redis_lock_key(key)
|
result = redis.get(redis_key)
|
||||||
|
|
||||||
start = Time.now
|
unless result
|
||||||
got_lock = false
|
result = yield
|
||||||
|
redis.setex(redis_key, duration, result)
|
||||||
begin
|
|
||||||
while Time.now < start + MAX_WAIT && !got_lock
|
|
||||||
LOCK.synchronize do
|
|
||||||
got_lock = get_lock(redis, redis_lock_key)
|
|
||||||
end
|
|
||||||
sleep 0.001
|
|
||||||
end
|
|
||||||
|
|
||||||
unless result = redis.get(redis_key)
|
|
||||||
result = yield
|
|
||||||
redis.setex(redis_key, duration, result)
|
|
||||||
end
|
|
||||||
|
|
||||||
ensure
|
|
||||||
# NOTE: delete regardless so next one in does not need to wait MAX_WAIT again
|
|
||||||
redis.del(redis_lock_key)
|
|
||||||
end
|
end
|
||||||
end
|
|
||||||
|
|
||||||
result
|
result
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.redis_lock_key(key)
|
def self.redis_lock_key(key)
|
||||||
+"memoize_lock_" << key
|
"memoize_lock_#{key}"
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.redis_key(key)
|
def self.redis_key(key)
|
||||||
+"memoize_" << key
|
"memoize_#{key}"
|
||||||
end
|
|
||||||
|
|
||||||
# Used for testing
|
|
||||||
def self.flush!
|
|
||||||
Discourse.redis.scan_each(match: "memoize_*").each { |key| Discourse.redis.del(key) }
|
|
||||||
end
|
|
||||||
|
|
||||||
protected
|
|
||||||
|
|
||||||
def self.get_lock(redis, redis_lock_key)
|
|
||||||
redis.watch(redis_lock_key)
|
|
||||||
current = redis.get(redis_lock_key)
|
|
||||||
return false if current
|
|
||||||
|
|
||||||
unique = SecureRandom.hex
|
|
||||||
|
|
||||||
result = redis.multi do
|
|
||||||
redis.setex(redis_lock_key, MAX_WAIT, unique)
|
|
||||||
end
|
|
||||||
|
|
||||||
redis.unwatch
|
|
||||||
result == ["OK"]
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,35 +1,25 @@
|
||||||
# frozen_string_literal: true
|
# frozen_string_literal: true
|
||||||
|
|
||||||
describe DistributedMemoizer do
|
describe DistributedMemoizer do
|
||||||
|
after do
|
||||||
before do
|
|
||||||
Discourse.redis.del(DistributedMemoizer.redis_key("hello"))
|
Discourse.redis.del(DistributedMemoizer.redis_key("hello"))
|
||||||
Discourse.redis.del(DistributedMemoizer.redis_lock_key("hello"))
|
Discourse.redis.del(DistributedMemoizer.redis_lock_key("hello"))
|
||||||
Discourse.redis.unwatch
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# NOTE we could use a mock redis here, but I think it makes sense to test the real thing
|
|
||||||
# let(:mock_redis) { MockRedis.new }
|
|
||||||
|
|
||||||
def memoize(&block)
|
def memoize(&block)
|
||||||
DistributedMemoizer.memoize("hello", duration = 120, &block)
|
DistributedMemoizer.memoize("hello", duration = 120, &block)
|
||||||
end
|
end
|
||||||
|
|
||||||
it "returns the value of a block" do
|
it "returns the value of a block" do
|
||||||
expect(memoize do
|
expect(memoize { "abc" }).to eq("abc")
|
||||||
"abc"
|
|
||||||
end).to eq("abc")
|
|
||||||
end
|
end
|
||||||
|
|
||||||
it "return the old value once memoized" do
|
it "return the old value once memoized" do
|
||||||
|
|
||||||
memoize do
|
memoize do
|
||||||
"abc"
|
"abc"
|
||||||
end
|
end
|
||||||
|
|
||||||
expect(memoize do
|
expect(memoize { "world" }).to eq("abc")
|
||||||
"world"
|
|
||||||
end).to eq("abc")
|
|
||||||
end
|
end
|
||||||
|
|
||||||
it "memoizes correctly when used concurrently" do
|
it "memoizes correctly when used concurrently" do
|
||||||
|
@ -48,7 +38,5 @@ describe DistributedMemoizer do
|
||||||
threads.each(&:join)
|
threads.each(&:join)
|
||||||
expect(results.uniq.length).to eq(1)
|
expect(results.uniq.length).to eq(1)
|
||||||
expect(results.count).to eq(5)
|
expect(results.count).to eq(5)
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -11,12 +11,10 @@ describe StaticController do
|
||||||
UploadCreator.new(file, filename).create_for(Discourse.system_user.id)
|
UploadCreator.new(file, filename).create_for(Discourse.system_user.id)
|
||||||
end
|
end
|
||||||
|
|
||||||
before_all do
|
|
||||||
DistributedMemoizer.flush!
|
|
||||||
end
|
|
||||||
|
|
||||||
after do
|
after do
|
||||||
DistributedMemoizer.flush!
|
Discourse.redis.scan_each(match: "memoize_*").each do |key|
|
||||||
|
Discourse.redis.del(key)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe 'local store' do
|
describe 'local store' do
|
||||||
|
|
Loading…
Reference in New Issue