From 87353faac6b90d11e1285907fb665be90ed018d4 Mon Sep 17 00:00:00 2001 From: Jarek Radosz Date: Mon, 11 Jul 2022 14:16:37 +0200 Subject: [PATCH] DEV: Implement distributed mutex in lua (#16228) The rationale behind this was mostly to stop using `redis.synchronize` (now removed in redis gem 4.6) --- lib/distributed_mutex.rb | 104 ++++++++++++----------------- spec/lib/distributed_mutex_spec.rb | 28 +++----- 2 files changed, 51 insertions(+), 81 deletions(-) diff --git a/lib/distributed_mutex.rb b/lib/distributed_mutex.rb index 824a81599d6..b724948d231 100644 --- a/lib/distributed_mutex.rb +++ b/lib/distributed_mutex.rb @@ -1,10 +1,34 @@ # frozen_string_literal: true # Cross-process locking using Redis. -# # Expiration happens when the current time is greater than the expire time class DistributedMutex - DEFAULT_VALIDITY ||= 60 + DEFAULT_VALIDITY = 60 + CHECK_READONLY_ATTEMPTS = 10 + + LOCK_SCRIPT = DiscourseRedis::EvalHelper.new <<~LUA + local now = redis.call("time")[1] + local expire_time = now + ARGV[1] + local current_expire_time = redis.call("get", KEYS[1]) + + if current_expire_time and tonumber(now) <= tonumber(current_expire_time) then + return nil + else + local result = redis.call("setex", KEYS[1], ARGV[1] + 1, tostring(expire_time)) + return expire_time + end + LUA + + UNLOCK_SCRIPT = DiscourseRedis::EvalHelper.new <<~LUA + local current_expire_time = redis.call("get", KEYS[1]) + + if current_expire_time == ARGV[1] then + local result = redis.call("del", KEYS[1]) + return result ~= nil + else + return false + end + LUA def self.synchronize(key, redis: nil, validity: DEFAULT_VALIDITY, &blk) self.new( @@ -22,26 +46,29 @@ class DistributedMutex @validity = validity end - CHECK_READONLY_ATTEMPT ||= 10 - # NOTE wrapped in mutex to maintain its semantics def synchronize + result = nil + @mutex.synchronize do expire_time = get_lock begin - yield + result = yield ensure current_time = redis.time[0] if current_time > expire_time warn("held for too long, expected max: #{@validity} secs, took an extra #{current_time - expire_time} secs") end - if !unlock(expire_time) && current_time <= expire_time + unlocked = UNLOCK_SCRIPT.eval(redis, [prefixed_key], [expire_time.to_s]) + if !unlocked && current_time <= expire_time warn("the redis key appears to have been tampered with before expiration") end end end + + result end private @@ -50,79 +77,32 @@ class DistributedMutex attr_reader :redis attr_reader :validity - def warn(msg) - Rails.logger.warn("DistributedMutex(#{key.inspect}): #{msg}") - end - def get_lock attempts = 0 while true - got_lock, expire_time = try_to_get_lock - if got_lock - return expire_time - end + expire_time = LOCK_SCRIPT.eval(redis, [prefixed_key], [validity]) + + return expire_time if expire_time sleep 0.001 + # in readonly we will never be able to get a lock if @using_global_redis && Discourse.recently_readonly? attempts += 1 - if attempts > CHECK_READONLY_ATTEMPT + if attempts > CHECK_READONLY_ATTEMPTS raise Discourse::ReadOnly end end end end - def try_to_get_lock - got_lock = false - - now = redis.time[0] - expire_time = now + validity - - redis.synchronize do - redis.unwatch - redis.watch key - - current_expire_time = redis.get key - - if current_expire_time && now <= current_expire_time.to_i - redis.unwatch - - got_lock = false - else - result = - redis.multi do |transaction| - transaction.set key, expire_time.to_s - transaction.expireat key, expire_time + 1 - end - - got_lock = !result.nil? - end - - [got_lock, expire_time] - end + def prefixed_key + @prefixed_key ||= redis.respond_to?(:namespace_key) ? redis.namespace_key(key) : key end - def unlock(expire_time) - redis.synchronize do - redis.unwatch - redis.watch key - current_expire_time = redis.get key - - if current_expire_time == expire_time.to_s - # MULTI is the way redis ensures the watched key - # has not changed by the time it is deleted - result = - redis.multi do |transaction| - transaction.del key - end - return !result.nil? - else - redis.unwatch - return false - end - end + def warn(msg) + Rails.logger.warn("DistributedMutex(#{key.inspect}): #{msg}") end end diff --git a/spec/lib/distributed_mutex_spec.rb b/spec/lib/distributed_mutex_spec.rb index 68a31595722..ec6db7f212a 100644 --- a/spec/lib/distributed_mutex_spec.rb +++ b/spec/lib/distributed_mutex_spec.rb @@ -31,38 +31,28 @@ describe DistributedMutex do Discourse.redis.setnx key, Time.now.to_i - 1 - start = Time.now.to_i + start = Time.now m.synchronize do "nop" end # no longer than a second - expect(Time.now.to_i).to be <= start + 1 + expect(Time.now).to be <= start + 1 end - # expected: 1574200319 - # got: 1574200320 - # - # (compared using ==) - # ./spec/components/distributed_mutex_spec.rb:60:in `block (3 levels) in
' - # ./lib/distributed_mutex.rb:33:in `block in synchronize' - xit 'allows the validity of the lock to be configured' do - freeze_time - + it "allows the validity of the lock to be configured" do mutex = DistributedMutex.new(key, validity: 2) mutex.synchronize do - expect(Discourse.redis.ttl(key)).to eq(2) - expect(Discourse.redis.get(key).to_i).to eq(Time.now.to_i + 2) + expect(Discourse.redis.ttl(key)).to be <= 3 + expect(Discourse.redis.get(key).to_i).to be_within(1.second).of(Time.now.to_i + 2) end mutex = DistributedMutex.new(key) mutex.synchronize do - expect(Discourse.redis.ttl(key)).to eq(DistributedMutex::DEFAULT_VALIDITY) - - expect(Discourse.redis.get(key).to_i) - .to eq(Time.now.to_i + DistributedMutex::DEFAULT_VALIDITY) + expect(Discourse.redis.ttl(key)).to be <= DistributedMutex::DEFAULT_VALIDITY + 1 + expect(Discourse.redis.get(key).to_i).to be_within(1.second).of(Time.now.to_i + DistributedMutex::DEFAULT_VALIDITY) end end @@ -78,7 +68,7 @@ describe DistributedMutex do context "readonly redis" do before do - Discourse.redis.slaveof "127.0.0.1", "99991" + Discourse.redis.slaveof "127.0.0.1", "65534" end after do @@ -97,7 +87,7 @@ describe DistributedMutex do }.to raise_error(Discourse::ReadOnly) expect(done).to eq(false) - expect(Time.now - start).to be <= 1.second + expect(Time.now).to be <= start + 1 end end