discourse/lib/presence_channel.rb

628 lines
20 KiB
Ruby

# frozen_string_literal: true
# The server-side implementation of PresenceChannels. See also {PresenceController}
# and +app/assets/javascripts/discourse/app/services/presence.js+
class PresenceChannel
# Raised when a channel name does not resolve to any configuration.
class NotFound < StandardError; end

# Raised when a user tries to become present in a channel they cannot enter.
class InvalidAccess < StandardError; end

# NOTE(review): not raised anywhere in the visible part of this file - confirm it is still used.
class ConfigNotLoaded < StandardError; end

# Raised when a configuration block returns something other than a Config or nil.
class InvalidConfig < StandardError; end
# Snapshot of a channel's membership, paired with the MessageBus id it was read at.
# Either the list of present user ids, or just their count, must be supplied.
class State
  include ActiveModel::Serialization

  attr_reader :message_bus_last_id, :user_ids, :count

  # @param message_bus_last_id [Integer, nil] last MessageBus id for the channel
  # @param user_ids [Array<Integer>, nil] ids of the present users, when known
  # @param count [Integer, nil] number of present users; derived from user_ids when omitted
  # @raise [RuntimeError] when neither user_ids nor count is given
  def initialize(message_bus_last_id:, user_ids: nil, count: nil)
    raise "user_ids or count required" if [user_ids, count].all?(&:nil?)

    @message_bus_last_id = message_bus_last_id
    @user_ids = user_ids
    @count = count || user_ids.count
  end

  # @return the User records matching user_ids, or nil when only a count is known
  def users
    user_ids.nil? ? nil : User.where(id: user_ids)
  end
end
# Class for managing config of PresenceChannel
# Five keyword parameters can be provided on initialization:
#   public: boolean value. If true, channel information is visible to all users (default false)
#   allowed_user_ids: array of user_ids that can view, and become present in, the channel (default nil)
#   allowed_group_ids: array of group_ids that can view, and become present in, the channel (default nil)
#   count_only: boolean. If true, user identities are never revealed to clients. (default false)
#   timeout: number of seconds a client stays present without checking in again. When nil,
#            PresenceChannel falls back to its DEFAULT_TIMEOUT (default nil)
class Config
  # Marker stored in the config cache for channels which are known not to exist
  NOT_FOUND ||= "notfound"

  attr_accessor :public, :allowed_user_ids, :allowed_group_ids, :count_only, :timeout

  def initialize(
    public: false,
    allowed_user_ids: nil,
    allowed_group_ids: nil,
    count_only: false,
    timeout: nil
  )
    @public = public
    @allowed_user_ids = allowed_user_ids
    @allowed_group_ids = allowed_group_ids
    @count_only = count_only
    @timeout = timeout
  end

  # Rebuild a Config from the output of #to_json. Unknown keys are ignored, and a JSON
  # document which is not an object produces a Config with default values.
  #
  # @param json [String]
  # @return [Config]
  def self.from_json(json)
    data = JSON.parse(json, symbolize_names: true)
    data = {} if !data.is_a? Hash
    new(**data.slice(:public, :allowed_user_ids, :allowed_group_ids, :count_only, :timeout))
  end

  # Serialize to JSON. `public` is always written; the other attributes are written
  # only when they are truthy.
  #
  # Accepts (and forwards) the optional generator arguments, so that a Config nested
  # inside another structure can be serialized by the JSON generator, which calls
  # `to_json(state)` on the objects it encounters.
  #
  # @return [String]
  def to_json(*args)
    data = { public: public }
    data[:allowed_user_ids] = allowed_user_ids if allowed_user_ids
    data[:allowed_group_ids] = allowed_group_ids if allowed_group_ids
    data[:count_only] = count_only if count_only
    data[:timeout] = timeout if timeout
    data.to_json(*args)
  end
end
# Presence timeout, in seconds, used when a channel's config does not set its own
DEFAULT_TIMEOUT ||= 60
# Expiry, in seconds, of the channel config which fetch_config caches in redis
CONFIG_CACHE_SECONDS ||= 10
# Added to a client's expiry timestamp by the :present lua script to decide when the
# channel's zlist and hash keys expire
GC_SECONDS ||= 24.hours.to_i
# Expiry, in seconds, which the lua scripts set on the per-channel mutex key
MUTEX_TIMEOUT_SECONDS ||= 10
# Error text raised by the lua scripts when the mutex is held. retry_on_mutex_error
# matches on this text to decide whether to retry
MUTEX_LOCKED_ERROR ||= "PresenceChannel mutex is locked"
# Maps a registered channel prefix to its configuration block (see register_prefix)
@@configuration_blocks ||= {}
attr_reader :name, :timeout, :message_bus_channel_name, :config
# Build a handle for the channel called +name+ and resolve its configuration.
#
# @param name [String] full channel name; the MessageBus channel is "/presence" + name
# @param raise_not_found [Boolean] when false, an unknown channel gets a default Config
#   instead of raising
# @param use_cache [Boolean] forwarded to fetch_config
# @raise [PresenceChannel::NotFound] when the channel is unknown and raise_not_found is true
def initialize(name, raise_not_found: true, use_cache: true)
  @name = name
  @message_bus_channel_name = "/presence#{name}"
  @config =
    begin
      fetch_config(use_cache: use_cache)
    rescue PresenceChannel::NotFound
      raise if raise_not_found
      Config.new
    end
  @timeout = config.timeout || DEFAULT_TIMEOUT
end
# Is this user allowed to view this channel?
# Pass `nil` for anonymous viewers, who can only see public channels.
#
# @param user_id [Integer, nil]
# @param group_ids [Array<Integer>, nil] the user's group ids; looked up via GroupUser
#   when omitted and needed
# @return [Boolean]
def can_view?(user_id: nil, group_ids: nil)
  return true if config.public
  return false unless user_id
  return true if config.allowed_user_ids&.include?(user_id)

  allowed_groups = config.allowed_group_ids
  return false unless allowed_groups.present?
  return true if allowed_groups.include?(Group::AUTO_GROUPS[:everyone])

  group_ids ||= GroupUser.where(user_id: user_id).pluck("group_id")
  (group_ids & allowed_groups).present?
end
# Is a user allowed to enter this channel?
# Anonymous users (nil user_id) can never enter; everyone else needs the
# same permission as can_view?
def can_enter?(user_id: nil, group_ids: nil)
  user_id.nil? ? false : can_view?(user_id: user_id, group_ids: group_ids)
end
# Mark a user's client as present in this channel. The client_id should be unique per
# browser tab. The client's entry expires `timeout` seconds from now, so this method
# should be called repeatedly (more often than that) while the user is present.
#
# When the :present script reports that the user has newly become present, an
# 'entering' message is published and the mutex taken by the script is released.
#
# @raise [PresenceChannel::InvalidAccess] if the user is not allowed to enter the channel
def present(user_id:, client_id:)
  raise PresenceChannel::InvalidAccess unless can_enter?(user_id: user_id)

  mutex_token = SecureRandom.hex
  newly_present =
    retry_on_mutex_error do
      expires_at = (Time.zone.now + timeout).to_i
      script_args = [name, user_id, client_id, expires_at, mutex_token]
      PresenceChannel.redis_eval(:present, redis_keys, script_args)
    end
  return unless newly_present == 1

  begin
    publish_message(entering_user_ids: [user_id])
  ensure
    release_mutex(mutex_token)
  end
end
# Immediately mark a user's client as leaving the channel.
#
# When the :leave script reports that the user is no longer present at all, a
# 'leaving' message is published and the mutex taken by the script is released.
def leave(user_id:, client_id:)
  mutex_token = SecureRandom.hex
  user_gone =
    retry_on_mutex_error do
      script_args = [name, user_id, client_id, nil, mutex_token]
      PresenceChannel.redis_eval(:leave, redis_keys, script_args)
    end
  return unless user_gone == 1

  begin
    publish_message(leaving_user_ids: [user_id])
  ensure
    release_mutex(mutex_token)
  end
end
# Fetch a {PresenceChannel::State} instance representing the current state of this
# channel.
#
# @param [Boolean] count_only set true to skip fetching the list of user ids from redis
# @return [PresenceChannel::State]
def state(count_only: config.count_only)
  script = count_only ? :count : :user_ids
  last_id, payload = retry_on_mutex_error { PresenceChannel.redis_eval(script, redis_keys) }

  # The :count script replies with a number, the :user_ids script with the ids
  ids = count_only ? nil : payload
  count = count_only ? payload : nil
  count ||= ids&.count

  # The scripts reply with -1 when there is no MessageBus id for the channel
  last_id = nil if last_id == -1

  if Rails.env.test? && MessageBus.backend == :memory
    # Doing it this way is not atomic, but we have no other option when
    # messagebus is not using the redis backend
    last_id = MessageBus.last_id(message_bus_channel_name)
  end

  State.new(message_bus_last_id: last_id, user_ids: ids, count: count)
end
# Ids of the users currently present, taken from a fresh #state lookup.
# The lookup honours the channel's count_only config, in which case this is nil.
def user_ids
  snapshot = state
  snapshot.user_ids
end
# Number of users currently present, fetched without loading the list of user ids.
def count
  snapshot = state(count_only: true)
  snapshot.count
end
# Automatically expire all clients whose expiry timestamp has passed, and publish a
# 'leaving' message for every user who has no clients left as a result.
def auto_leave
  mutex_token = SecureRandom.hex
  departed_user_ids =
    retry_on_mutex_error do
      script_args = [name, Time.zone.now.to_i, mutex_token]
      PresenceChannel.redis_eval(:auto_leave, redis_keys, script_args)
    end
  return if departed_user_ids.empty?

  begin
    publish_message(leaving_user_ids: departed_user_ids)
  ensure
    release_mutex(mutex_token)
  end
end
# Clear all members of the channel, along with its cached config and mutex, and drop
# it from the global channel list. This is intended for debugging/development only
def clear
  [redis_key_zlist, redis_key_hash, redis_key_config, redis_key_mutex].each do |key|
    PresenceChannel.redis.del(key)
  end
  PresenceChannel.redis.zrem(self.class.redis_key_channel_list, name)
end
# Designed to be run periodically. Reads, from the global channel list, every channel
# whose score is not later than the current time, and runs auto_leave on each of them.
# Channels which no longer resolve to a config are still processed.
def self.auto_leave_all
  due_channel_names =
    PresenceChannel.redis.zrangebyscore(redis_key_channel_list, "-inf", Time.zone.now.to_i)
  due_channel_names.each do |channel_name|
    channel = new(channel_name, raise_not_found: false)
    channel.auto_leave
  end
end
# Clear all known channels, then delete any config cache keys which are left over.
# This is intended for debugging/development only
def self.clear_all!
  known_channel_names = PresenceChannel.redis.zrangebyscore(redis_key_channel_list, "-inf", "+inf")
  known_channel_names.each do |channel_name|
    new(channel_name, raise_not_found: false).clear
  end

  config_key_pattern = Discourse.redis.namespace_key("_presence_*_config")
  config_cache_keys = PresenceChannel.redis.scan_each(match: config_key_pattern).to_a
  PresenceChannel.redis.del(*config_cache_keys) if config_cache_keys.present?
end
# Shortcut to access a redis client for all PresenceChannel activities.
# PresenceChannel must use the same Redis server as MessageBus, so that
# actions can be applied atomically. For the vast majority of Discourse
# installations, this is the same Redis server as `Discourse.redis`.
#
# @raise [RuntimeError] outside the test environment, when MessageBus is not
#   using its redis backend
def self.redis
  # TODO: avoid a private API?
  return MessageBus.backend_instance.send(:pub_redis) if MessageBus.backend == :redis
  return Discourse.redis.without_namespace if Rails.env.test?

  raise "PresenceChannel is unable to access MessageBus's Redis instance"
end
# Run the lua script registered under +key+ in LUA_SCRIPTS against the PresenceChannel
# redis client, passing any further arguments straight through to the script helper.
def self.redis_eval(key, *args)
  script = LUA_SCRIPTS[key]
  script.eval(redis, *args)
end
# Register a callback to configure channels with a given prefix
# The whole prefix must match [a-zA-Z0-9_-]+
#
# For example, this registration will be used for
# all channels starting /topic-reply/...:
#
#   register_prefix("topic-reply") do |channel_name|
#     PresenceChannel::Config.new(public: true)
#   end
#
# At runtime, the block will be passed a full channel name. If the channel
# should not exist, the block should return `nil`. If the channel should exist,
# the block should return a PresenceChannel::Config object.
#
# Return values may be cached for up to 10 seconds.
#
# Plugins should use the {Plugin::Instance.register_presence_channel_prefix} API instead
#
# @raise [RuntimeError] if the prefix contains invalid characters, or is already registered
def self.register_prefix(prefix, &block)
  # Anchor the pattern to the whole string. An unanchored pattern accepts any prefix
  # which merely contains one valid character, and such a prefix could never be
  # matched when a channel name is resolved.
  unless prefix.match?(/\A[a-zA-Z0-9_-]+\z/)
    raise "PresenceChannel prefix #{prefix} must match [a-zA-Z0-9_-]+"
  end
  if @@configuration_blocks&.[](prefix)
    raise "PresenceChannel prefix #{prefix} already registered"
  end
  @@configuration_blocks[prefix] = block
end
# Remove a prefix registered via register_prefix. For use in a test environment only
#
# @raise [RuntimeError] when called outside the test environment
def self.unregister_prefix(prefix)
  raise "Only allowed in test environment" unless Rails.env.test?

  @@configuration_blocks&.delete(prefix)
end
private
# Resolve the Config for this channel.
#
# A cached value (either a serialized Config, or the NOT_FOUND marker) is used when
# available and use_cache is true. Otherwise the prefix is extracted from the channel
# name, the matching configuration block (registered here, or via the plugin registry)
# is called, and its answer is cached for CONFIG_CACHE_SECONDS.
#
# @param use_cache [Boolean] set false to skip reading the cache (it is still written)
# @return [Config]
# @raise [PresenceChannel::NotFound] if the name has no prefix, the prefix has no
#   configuration block, or the block returned nil
# @raise [PresenceChannel::InvalidConfig] if the block returned anything other than a
#   Config or nil
def fetch_config(use_cache: true)
  cached = use_cache ? PresenceChannel.redis.get(redis_key_config) : nil
  raise PresenceChannel::NotFound if cached == Config::NOT_FOUND
  return Config.from_json(cached) if cached

  prefix = name[%r{/([a-zA-Z0-9_-]+)/.*}, 1]
  raise PresenceChannel::NotFound if prefix.nil?

  builder =
    @@configuration_blocks[prefix] ||
      DiscoursePluginRegistry.presence_channel_prefixes.find { |t| t[0] == prefix }&.[](1)
  raise PresenceChannel::NotFound if builder.nil?

  built = builder.call(name)
  cache_value =
    case built
    when Config
      built.to_json
    when nil
      Config::NOT_FOUND
    else
      raise InvalidConfig.new "Expected PresenceChannel::Config or nil. Got a #{built.class.name}"
    end

  DiscourseRedis.ignore_readonly do
    PresenceChannel.redis.set(redis_key_config, cache_value, ex: CONFIG_CACHE_SECONDS)
  end

  raise PresenceChannel::NotFound if built.nil?
  built
end
# Publish a change in channel membership to MessageBus.
#
# For count_only channels the message carries only a "count_delta", and nothing is
# published when the delta is zero. Otherwise it carries "leaving_user_ids" and/or
# serialized "entering_users". The message is restricted to the channel's allowed
# users/groups unless the channel is public; when nobody is allowed, nothing is sent.
#
# @param entering_user_ids [Array<Integer>, nil]
# @param leaving_user_ids [Array<Integer>, nil]
def publish_message(entering_user_ids: nil, leaving_user_ids: nil)
  payload = {}

  if config.count_only
    delta = (entering_user_ids&.count || 0) - (leaving_user_ids&.count || 0)
    payload["count_delta"] = delta
    return if delta == 0
  else
    payload["leaving_user_ids"] = leaving_user_ids if leaving_user_ids.present?

    if entering_user_ids.present?
      entering = User.where(id: entering_user_ids).includes(:user_option)
      payload["entering_users"] = ActiveModel::ArraySerializer.new(
        entering,
        each_serializer: BasicUserSerializer,
      )
    end
  end

  audience = {}
  unless config.public
    # nobody is allowed... don't publish anything
    return unless config.allowed_user_ids || config.allowed_group_ids

    audience[:user_ids] = config.allowed_user_ids
    audience[:group_ids] = config.allowed_group_ids
  end

  MessageBus.publish(message_bus_channel_name, payload.as_json, **audience)
end
# Most atomic actions are achieved via lua scripts. However, when a lua action
# results in a messagebus message being published, the atomicity is broken.
#
# For example, if one process is handling a 'user enter' event, and another is
# handling a 'user leave' event, the messagebus messages must be published in the
# same sequence in which the PresenceChannel lua scripts ran.
#
# The present/leave/auto_leave lua scripts acquire this mutex themselves when they
# make such a change. If their return value indicates a change has occurred, the
# mutex should be released via #release_mutex after the messagebus message is sent.
#
# If they need to make a change, and the mutex is not available, they raise an error
# and should be retried periodically.
#
# @return the namespaced redis key of this channel's mutex
def redis_key_mutex
  key = "_presence_#{name}_mutex"
  Discourse.redis.namespace_key(key)
end
# Release this channel's mutex via the :release_mutex script, which only deletes the
# key while it still holds +mutex_value+.
def release_mutex(mutex_value)
  keys = [redis_key_mutex]
  PresenceChannel.redis_eval(:release_mutex, keys, [mutex_value])
end
# Run the given block, retrying it (after a 1ms pause) for as long as it fails with a
# redis command error carrying MUTEX_LOCKED_ERROR. After 1000 retries, or on any other
# redis command error, the error is re-raised.
#
# @return whatever the block returns
def retry_on_mutex_error
  tries ||= 0
  yield
rescue ::Redis::CommandError => error
  mutex_locked = error.to_s =~ /#{MUTEX_LOCKED_ERROR}/
  raise unless mutex_locked && tries < 1000

  tries += 1
  sleep 0.001
  retry
end
# The redis key which MessageBus uses to store the 'last_id' for the channel
# associated with this PresenceChannel. In the test environment, when MessageBus
# runs on its memory backend, there is no such key and an empty string is returned.
def message_bus_last_id_key
  memory_backend_in_test = Rails.env.test? && MessageBus.backend == :memory
  return "" if memory_backend_in_test

  # TODO: Avoid using private MessageBus methods here
  encoded_name = MessageBus.send(:encode_channel_name, message_bus_channel_name)
  MessageBus.backend_instance.send(:backlog_id_key, encoded_name)
end
# The redis keys handed to the lua scripts. The order is significant: the scripts read
# them positionally, as KEYS[1] (zlist), KEYS[2] (hash), KEYS[3] (global channel list),
# KEYS[4] (MessageBus last id) and KEYS[5] (mutex).
def redis_keys
  channel_list_key = self.class.redis_key_channel_list
  [redis_key_zlist, redis_key_hash, channel_list_key, message_bus_last_id_key, redis_key_mutex]
end
# The zlist is a sorted set of "user_id client_id" entries, scored by their expiration
# timestamp. The lowest-scored entries are periodically removed once they have expired,
# which is governed by the `timeout` of the channel.
def redis_key_zlist
  key = "_presence_#{name}_zlist"
  Discourse.redis.namespace_key(key)
end
# The hash maps user_id => session_count. Once the count for a user reaches 0 the
# field is deleted, so the size of the hash is the number of present users.
def redis_key_hash
  key = "_presence_#{name}_hash"
  Discourse.redis.namespace_key(key)
end
# The config key caches the outcome of resolving this channel's configuration: either
# the serialized Config, or the Config::NOT_FOUND marker. fetch_config writes it with
# an expiry of CONFIG_CACHE_SECONDS.
def redis_key_config
  key = "_presence_#{name}_config"
  Discourse.redis.namespace_key(key)
end
# This sorted set contains all active presence channels, scored with the expiration
# timestamp of their least-recently-seen client_id. The lowest-scored channels are
# periodically checked for expired clients (see auto_leave_all).
def self.redis_key_channel_list
  key = "_presence_channels"
  Discourse.redis.namespace_key(key)
end
# Lua preamble shared by the :present and :leave scripts. It names the positional
# ARGV/KEYS values (in the order built by #present/#leave and #redis_keys), records
# whether the mutex key currently exists, and builds the "user_id client_id" string
# under which a client is stored in the zlist.
COMMON_PRESENT_LEAVE_LUA = <<~LUA
local channel = ARGV[1]
local user_id = ARGV[2]
local client_id = ARGV[3]
local expires = ARGV[4]
local mutex_value = ARGV[5]
local zlist_key = KEYS[1]
local hash_key = KEYS[2]
local channels_key = KEYS[3]
local message_bus_id_key = KEYS[4]
local mutex_key = KEYS[5]
local mutex_locked = redis.call('EXISTS', mutex_key) == 1
local zlist_elem = tostring(user_id) .. " " .. tostring(client_id)
LUA
# Lua fragment embedded in the :leave and :auto_leave scripts. It expects the locals
# zlist_key, channels_key and channel to be defined by the surrounding script, and
# re-scores the channel in the global channel list with the expiry of its
# lowest-scored remaining client, or removes the channel when no clients remain.
UPDATE_GLOBAL_CHANNELS_LUA = <<~LUA
-- Update the global channels list with the timestamp of the oldest client
local oldest_client = redis.call('ZRANGE', zlist_key, 0, 0, 'WITHSCORES')
if table.getn(oldest_client) > 0 then
local oldest_client_expire_timestamp = oldest_client[2]
redis.call('ZADD', channels_key, tonumber(oldest_client_expire_timestamp), tostring(channel))
else
-- The channel is now empty, delete from global list
redis.call('ZREM', channels_key, tostring(channel))
end
LUA
# Registry of the lua scripts run via PresenceChannel.redis_eval, keyed by name
LUA_SCRIPTS ||= {}
# Script behind #present.
#
# Fails with MUTEX_LOCKED_ERROR when the mutex is held and the user has no entry in the
# hash yet (i.e. this call would make them newly present). Otherwise it adds the client
# to the zlist, or refreshes its expiry. For a newly added client the user's session
# count is incremented and the channel is added to the global channel list if missing;
# when the count becomes 1 the mutex is taken with +mutex_value+. Finally the expiry of
# the hash and zlist keys is pushed out to the client's expiry plus GC_SECONDS.
#
# Returns 1 when the user has newly become present, 0 otherwise.
LUA_SCRIPTS[:present] = DiscourseRedis::EvalHelper.new <<~LUA
#{COMMON_PRESENT_LEAVE_LUA}
if mutex_locked then
local mutex_required = redis.call('HGET', hash_key, tostring(user_id)) == false
if mutex_required then
error("#{MUTEX_LOCKED_ERROR}")
end
end
local added_clients = redis.call('ZADD', zlist_key, expires, zlist_elem)
local added_users = 0
if tonumber(added_clients) > 0 then
local new_count = redis.call('HINCRBY', hash_key, tostring(user_id), 1)
if new_count == 1 then
added_users = 1
redis.call('SET', mutex_key, mutex_value, 'EX', #{MUTEX_TIMEOUT_SECONDS})
end
-- Add the channel to the global channel list. 'NX' means the value will
-- only be set if doesn't already exist
redis.call('ZADD', channels_key, "NX", expires, tostring(channel))
end
redis.call('EXPIREAT', hash_key, expires + #{GC_SECONDS})
redis.call('EXPIREAT', zlist_key, expires + #{GC_SECONDS})
return added_users
LUA
# Script behind #leave.
#
# Fails with MUTEX_LOCKED_ERROR when the mutex is held and removing this client would
# make the user leave the channel entirely (it is their only session, and the client is
# still in the zlist). Otherwise it removes the client from the zlist and, if it was
# there, refreshes the global channel list and decrements the user's session count.
# When the count reaches 0 the user is deleted from the hash and the mutex is taken
# with +mutex_value+.
#
# Returns 1 when the user has left the channel entirely, 0 otherwise.
LUA_SCRIPTS[:leave] = DiscourseRedis::EvalHelper.new <<~LUA
  #{COMMON_PRESENT_LEAVE_LUA}

  if mutex_locked then
    -- HGET replies with a string (or false), which never equals the number 1
    -- in lua. Convert it before comparing, otherwise the mutex is never honoured
    local user_session_count = tonumber(redis.call('HGET', hash_key, tostring(user_id)))
    local mutex_required = user_session_count == 1 and redis.call('ZRANK', zlist_key, zlist_elem) ~= false
    if mutex_required then
      error("#{MUTEX_LOCKED_ERROR}")
    end
  end

  -- Remove the user from the channel zlist
  local removed_clients = redis.call('ZREM', zlist_key, zlist_elem)

  local removed_users = 0
  if tonumber(removed_clients) > 0 then
    #{UPDATE_GLOBAL_CHANNELS_LUA}

    -- Update the user session count in the channel hash. The decrement is a plain -1:
    -- it must not be derived from an unrelated constant which happens to equal -1
    local val = redis.call('HINCRBY', hash_key, user_id, -1)
    if val <= 0 then
      redis.call('HDEL', hash_key, user_id)
      removed_users = 1
      redis.call('SET', mutex_key, mutex_value, 'EX', #{MUTEX_TIMEOUT_SECONDS})
    end
  end

  return removed_users
LUA
# Script behind #release_mutex. Deletes the mutex key (KEYS[1]) only while it still
# holds the expected value (ARGV[1]), so a mutex which has since been taken with a
# different value is left alone.
LUA_SCRIPTS[:release_mutex] = DiscourseRedis::EvalHelper.new <<~LUA
local mutex_key = KEYS[1]
local expected_value = ARGV[1]
if redis.call("GET", mutex_key) == expected_value then
redis.call("DEL", mutex_key)
end
LUA
# Script behind #state when user ids are wanted. Fails with MUTEX_LOCKED_ERROR while
# the mutex key exists. Otherwise returns { message_bus_id, user_ids }: the numeric
# value stored under the MessageBus id key (-1 when that cannot be read as a number),
# and the fields of the hash converted to numbers.
LUA_SCRIPTS[:user_ids] = DiscourseRedis::EvalHelper.new <<~LUA
local zlist_key = KEYS[1]
local hash_key = KEYS[2]
local message_bus_id_key = KEYS[4]
local mutex_key = KEYS[5]
if redis.call('EXISTS', mutex_key) > 0 then
error('#{MUTEX_LOCKED_ERROR}')
end
local user_ids = redis.call('HKEYS', hash_key)
table.foreach(user_ids, function(k,v) user_ids[k] = tonumber(v) end)
local message_bus_id = tonumber(redis.call('GET', message_bus_id_key))
if message_bus_id == nil then
message_bus_id = -1
end
return { message_bus_id, user_ids }
LUA
# Script behind #state when only a count is wanted. Fails with MUTEX_LOCKED_ERROR while
# the mutex key exists. Otherwise returns { message_bus_id, count }: the numeric value
# stored under the MessageBus id key (-1 when that cannot be read as a number), and the
# number of fields in the hash.
LUA_SCRIPTS[:count] = DiscourseRedis::EvalHelper.new <<~LUA
local zlist_key = KEYS[1]
local hash_key = KEYS[2]
local message_bus_id_key = KEYS[4]
local mutex_key = KEYS[5]
if redis.call('EXISTS', mutex_key) > 0 then
error('#{MUTEX_LOCKED_ERROR}')
end
local message_bus_id = tonumber(redis.call('GET', message_bus_id_key))
if message_bus_id == nil then
message_bus_id = -1
end
local count = redis.call('HLEN', hash_key)
return { message_bus_id, count }
LUA
# Script behind #auto_leave.
#
# Removes every client in the zlist whose expiry is not later than ARGV[2], and
# decrements the session count of the user it belongs to. Before removing a user's
# last session the mutex is taken with ARGV[3]; if it is already held, the script
# fails with MUTEX_LOCKED_ERROR. Users whose count reaches 0 are deleted from the
# hash. Finally the global channel list is refreshed.
#
# Returns the ids of the users who have left the channel entirely.
LUA_SCRIPTS[:auto_leave] = DiscourseRedis::EvalHelper.new <<~LUA
  local zlist_key = KEYS[1]
  local hash_key = KEYS[2]
  local channels_key = KEYS[3]
  local mutex_key = KEYS[5]
  local channel = ARGV[1]
  local time = ARGV[2]
  local mutex_value = ARGV[3]

  local expire = redis.call('ZRANGEBYSCORE', zlist_key, '-inf', time)

  local has_mutex = false

  local get_mutex = function()
    if redis.call('SETNX', mutex_key, mutex_value) == 0 then
      error("#{MUTEX_LOCKED_ERROR}")
    end
    redis.call('EXPIRE', mutex_key, #{MUTEX_TIMEOUT_SECONDS})
    has_mutex = true
  end

  local expired_user_ids = {}

  local expireOld = function(k, v)
    -- zlist entries are "user_id client_id"; take everything before the first space
    local user_id = v:match("[^ ]+")

    if (not has_mutex) and (tonumber(redis.call('HGET', hash_key, user_id)) == 1) then
      get_mutex()
    end

    -- The decrement is a plain -1: it must not be derived from an unrelated
    -- constant which happens to equal -1
    local val = redis.call('HINCRBY', hash_key, user_id, -1)
    if val <= 0 then
      table.insert(expired_user_ids, tonumber(user_id))
      redis.call('HDEL', hash_key, user_id)
    end
    redis.call('ZREM', zlist_key, v)
  end

  table.foreach(expire, expireOld)

  #{UPDATE_GLOBAL_CHANNELS_LUA}

  return expired_user_ids
LUA
end