FIX: add support for pipelined and multi redis commands (#16682)

Latest redis interoduces a block form of multi / pipelined, this was incorrectly
passed through and not namespaced.

Fix also updates logster, we held off on upgrading it due to missing functions
This commit is contained in:
Sam 2022-05-10 08:19:02 +10:00 committed by GitHub
parent 919f71537e
commit 2df3c65ba9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 89 additions and 28 deletions

View File

@ -230,7 +230,7 @@ gem 'cppjieba_rb', require: false
gem 'lograge', require: false
gem 'logstash-event', require: false
gem 'logstash-logger', require: false
gem 'logster', '2.11.0'
gem 'logster'
# NOTE: later versions of sassc are causing a segfault, possibly dependent on processer architecture
# and until resolved should be locked at 2.0.1

View File

@ -215,7 +215,7 @@ GEM
logstash-event (1.2.02)
logstash-logger (0.26.1)
logstash-event (~> 1.2)
logster (2.11.0)
logster (2.11.2)
loofah (2.17.0)
crass (~> 1.0.2)
nokogiri (>= 1.5.9)
@ -560,7 +560,7 @@ DEPENDENCIES
lograge
logstash-event
logstash-logger
logster (= 2.11.0)
logster
loofah
lru_redux
lz4-ruby
@ -638,4 +638,4 @@ DEPENDENCIES
yaml-lint
BUNDLED WITH
2.3.5
2.3.13

View File

@ -181,10 +181,10 @@ class Site
json = MultiJson.dump(SiteSerializer.new(site, root: false, scope: guardian))
if guardian.anonymous?
Discourse.redis.multi do
Discourse.redis.setex 'site_json', 1800, json
Discourse.redis.set 'site_json_seq', seq
Discourse.redis.set 'site_json_version', Discourse.git_version
Discourse.redis.multi do |transaction|
transaction.setex 'site_json', 1800, json
transaction.set 'site_json_seq', seq
transaction.set 'site_json_version', Discourse.git_version
end
end

View File

@ -40,9 +40,9 @@ class RandomTopicSelector
key = cache_key(category)
if results.present?
Discourse.redis.multi do
Discourse.redis.rpush(key, results)
Discourse.redis.expire(key, 2.days)
Discourse.redis.multi do |transaction|
transaction.rpush(key, results)
transaction.expire(key, 2.days)
end
end
@ -56,9 +56,9 @@ class RandomTopicSelector
return results if count < 1
results = Discourse.redis.multi do
Discourse.redis.lrange(key, 0, count - 1)
Discourse.redis.ltrim(key, count, -1)
results = Discourse.redis.multi do |transaction|
transaction.lrange(key, 0, count - 1)
transaction.ltrim(key, count, -1)
end
if !results.is_a?(Array) # Redis is in readonly mode

View File

@ -14,9 +14,9 @@ class DiscourseRedis
GlobalSetting.redis_config
end
def initialize(config = nil, namespace: true)
def initialize(config = nil, namespace: true, raw_redis: nil)
@config = config || DiscourseRedis.config
@redis = DiscourseRedis.raw_connection(@config.dup)
@redis = raw_redis || DiscourseRedis.raw_connection(@config.dup)
@namespace = namespace
end
@ -150,6 +150,26 @@ class DiscourseRedis
Cache.new
end
def multi
if block_given?
@redis.multi do |transaction|
yield DiscourseRedis.new(@config, namespace: @namespace, raw_redis: transaction)
end
else
@redis.multi
end
end
def pipelined
if block_given?
@redis.pipelined do |transaction|
yield DiscourseRedis.new(@config, namespace: @namespace, raw_redis: transaction)
end
else
@redis.pipelined
end
end
private
def remove_namespace(key)

View File

@ -93,9 +93,9 @@ class DistributedMutex
got_lock = false
else
result =
redis.multi do
redis.set key, expire_time.to_s
redis.expireat key, expire_time + 1
redis.multi do |transaction|
transaction.set key, expire_time.to_s
transaction.expireat key, expire_time + 1
end
got_lock = !result.nil?
@ -112,9 +112,11 @@ class DistributedMutex
current_expire_time = redis.get key
if current_expire_time == expire_time.to_s
# MULTI is the way redis ensures the watched key
# has not changed by the time it is deleted
result =
redis.multi do
redis.del key
redis.multi do |transaction|
transaction.del key
end
return !result.nil?
else

View File

@ -14,9 +14,9 @@ class RedisSnapshot
keys = redis.keys
values =
redis.pipelined do
redis.pipelined do |batch|
keys.each do |key|
redis.dump(key)
batch.dump(key)
end
end
@ -28,11 +28,11 @@ class RedisSnapshot
end
def restore(redis = Discourse.redis)
redis.pipelined do
redis.flushdb
redis.pipelined do |batch|
batch.flushdb
@dump.each do |key, value|
redis.restore(key, 0, value)
batch.restore(key, 0, value)
end
end

View File

@ -15,13 +15,13 @@ task 'redis:clean_up' => ['environment'] do
cursor, keys = redis.scan(cursor)
cursor = cursor.to_i
redis.multi do
redis.multi do |transaction|
keys.each do |key|
if match = key.match(regexp)
db_name = match[:message_bus] || match[:namespace]
if !dbs.include?(db_name)
redis.del(key)
transaction.del(key)
end
end
end

View File

@ -17,6 +17,45 @@ describe DiscourseRedis do
raw_redis.flushdb
end
describe 'pipelined / multi' do
let(:redis) { DiscourseRedis.new }
it 'should support multi commands' do
val = redis.multi do |transaction|
transaction.set 'foo', 'bar'
transaction.set 'bar', 'foo'
transaction.get 'bar'
end
expect(raw_redis.get('foo')).to eq(nil)
expect(raw_redis.get('bar')).to eq(nil)
expect(redis.get('foo')).to eq('bar')
expect(redis.get('bar')).to eq('foo')
expect(val).to eq(["OK", "OK", "foo"])
end
it 'should support pipelined commands' do
set, incr = nil
val = redis.pipelined do |pipeline|
set = pipeline.set "foo", "baz"
incr = pipeline.incr "baz"
end
expect(val).to eq(["OK", 1])
expect(set.value).to eq("OK")
expect(incr.value).to eq(1)
expect(raw_redis.get('foo')).to eq(nil)
expect(raw_redis.get('baz')).to eq(nil)
expect(redis.get('foo')).to eq("baz")
expect(redis.get('baz')).to eq("1")
end
end
describe 'when namespace is enabled' do
let(:redis) { DiscourseRedis.new }