DEV: Various behind-the-scenes improvements to PresenceChannel (#14518)

- Allow the `/presence/get` endpoint to return multiple channels in a single request (limited to 50)
- When multiple presence channels are initialized in a single Ember runloop, batch them into a single GET request
- Introduce the `presence-pretender` to allow easy testing of PresenceChannel-related features
- Introduce a `use_cache` boolean (default true) on the the server-side PresenceChannel initializer. Useful during testing.
This commit is contained in:
David Taylor 2021-10-07 15:50:14 +01:00 committed by GitHub
parent ba380c5f52
commit a55642a30a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 251 additions and 81 deletions

View File

@ -2,7 +2,7 @@ import Service from "@ember/service";
import EmberObject, { computed, defineProperty } from "@ember/object"; import EmberObject, { computed, defineProperty } from "@ember/object";
import { readOnly } from "@ember/object/computed"; import { readOnly } from "@ember/object/computed";
import { ajax } from "discourse/lib/ajax"; import { ajax } from "discourse/lib/ajax";
import { cancel, debounce, later, throttle } from "@ember/runloop"; import { cancel, debounce, later, next, once, throttle } from "@ember/runloop";
import Session from "discourse/models/session"; import Session from "discourse/models/session";
import { Promise } from "rsvp"; import { Promise } from "rsvp";
import { isTesting } from "discourse-common/config/environment"; import { isTesting } from "discourse-common/config/environment";
@ -12,6 +12,8 @@ const PRESENCE_INTERVAL_S = 30;
const PRESENCE_DEBOUNCE_MS = isTesting() ? 0 : 500; const PRESENCE_DEBOUNCE_MS = isTesting() ? 0 : 500;
const PRESENCE_THROTTLE_MS = isTesting() ? 0 : 5000; const PRESENCE_THROTTLE_MS = isTesting() ? 0 : 5000;
const PRESENCE_GET_RETRY_MS = 5000;
function createPromiseProxy() { function createPromiseProxy() {
const promiseProxy = {}; const promiseProxy = {};
promiseProxy.promise = new Promise((resolve, reject) => { promiseProxy.promise = new Promise((resolve, reject) => {
@ -121,21 +123,7 @@ class PresenceChannelState extends EmberObject {
} }
if (!initialData) { if (!initialData) {
try { initialData = await this.presenceService._getInitialData(this.name);
initialData = await ajax("/presence/get", {
data: {
channel: this.name,
},
});
} catch (e) {
if (e.jqXHR?.status === 404) {
throw new PresenceChannelNotFound(
`PresenceChannel '${this.name}' not found`
);
} else {
throw e;
}
}
} }
this.set("count", initialData.count); this.set("count", initialData.count);
@ -231,6 +219,7 @@ export default class PresenceService extends Service {
this._presenceChannelStates = EmberObject.create(); this._presenceChannelStates = EmberObject.create();
this._presentProxies = {}; this._presentProxies = {};
this._subscribedProxies = {}; this._subscribedProxies = {};
this._initialDataRequests = {};
window.addEventListener("beforeunload", () => { window.addEventListener("beforeunload", () => {
this._beaconLeaveAll(); this._beaconLeaveAll();
}); });
@ -244,6 +233,64 @@ export default class PresenceService extends Service {
}); });
} }
_getInitialData(channelName) {
let promiseProxy = this._initialDataRequests[channelName];
if (!promiseProxy) {
promiseProxy = this._initialDataRequests[
channelName
] = createPromiseProxy();
}
once(this, this._makeInitialDataRequest);
return promiseProxy.promise;
}
async _makeInitialDataRequest() {
if (this._initialDataAjax) {
// try again next runloop
next(this, () => once(this, this._makeInitialDataRequest));
}
if (Object.keys(this._initialDataRequests).length === 0) {
// Nothing to request
return;
}
this._initialDataAjax = ajax("/presence/get", {
data: {
channels: Object.keys(this._initialDataRequests).slice(0, 50),
},
});
let result;
try {
result = await this._initialDataAjax;
} catch (e) {
later(this, this._makeInitialDataRequest, PRESENCE_GET_RETRY_MS);
throw e;
} finally {
this._initialDataAjax = null;
}
for (const channel in result) {
if (!result.hasOwnProperty(channel)) {
continue;
}
const state = result[channel];
if (state) {
this._initialDataRequests[channel].resolve(state);
} else {
const error = new PresenceChannelNotFound(
`PresenceChannel '${channel}' not found`
);
this._initialDataRequests[channel].reject(error);
}
delete this._initialDataRequests[channel];
}
}
_addPresent(channelProxy) { _addPresent(channelProxy) {
let present = this._presentProxies[channelProxy.name]; let present = this._presentProxies[channelProxy.name];
if (!present) { if (!present) {
@ -459,7 +506,11 @@ export default class PresenceService extends Service {
} else if (this._queuedEvents.length > 0) { } else if (this._queuedEvents.length > 0) {
this._cancelTimer(); this._cancelTimer();
debounce(this, this._throttledUpdateServer, PRESENCE_DEBOUNCE_MS); debounce(this, this._throttledUpdateServer, PRESENCE_DEBOUNCE_MS);
} else if (!this._nextUpdateTimer && !isTesting()) { } else if (
!this._nextUpdateTimer &&
this._presentChannels.size > 0 &&
!isTesting()
) {
this._nextUpdateTimer = later( this._nextUpdateTimer = later(
this, this,
this._throttledUpdateServer, this._throttledUpdateServer,

View File

@ -0,0 +1,84 @@
import { publishToMessageBus } from "discourse/tests/helpers/qunit-helpers";
import User from "discourse/models/user";
import { settled } from "@ember/test-helpers";
let channels = {};
export default function (helper) {
this.post("/presence/update", (request) => {
const params = new URLSearchParams(request.requestBody);
const presentChannels = params.getAll("present_channels[]");
const leaveChannels = params.getAll("leave_channels[]");
const user = User.current();
if (!user) {
return helper.response(403, {});
}
const userInfo = {
id: user.id,
username: user.username,
name: user.name,
avatar_template: "/letter_avatar_proxy/v4/letter/b/35a633/{size}.png",
};
presentChannels.forEach((c) => joinChannel(c, userInfo));
leaveChannels.forEach((c) => leaveChannel(c, userInfo));
return helper.response({ success: "OK" });
});
this.get("/presence/get", (request) => {
const channelNames = request.queryParams.channels;
const response = {};
channelNames.forEach((c) => (response[c] = getChannelInfo(c)));
return helper.response(response);
});
}
export function getChannelInfo(name) {
channels[name] ||= { count: 0, users: [], last_message_id: 0 };
return channels[name];
}
export function joinChannel(name, user) {
const channel = getChannelInfo(name);
if (!channel.users.any((u) => u.id === user.id)) {
channel.users.push(user);
channel.count += 1;
channel.last_message_id += 1;
publishToMessageBus(
`/presence${name}`,
{
entering_users: [user],
},
0,
channel.last_message_id
);
}
return settled();
}
export function leaveChannel(name, user) {
const channel = getChannelInfo(name);
if (channel.users.any((u) => u.id === user.id)) {
channel.users = channel.users.reject((u) => u.id === user.id);
channel.count -= 1;
channel.last_message_id += 1;
publishToMessageBus(
`/presence${name}`,
{
leaving_user_ids: [user.id],
},
0,
channel.last_message_id
);
}
return settled();
}
export function presentUserIds(channelName) {
return getChannelInfo(channelName).users.map((u) => u.id);
}
export function clearState() {
channels = {};
}

View File

@ -33,6 +33,7 @@ import { registerObjects } from "discourse/pre-initializers/inject-discourse-obj
import sinon from "sinon"; import sinon from "sinon";
import { run } from "@ember/runloop"; import { run } from "@ember/runloop";
import { isLegacyEmber } from "discourse-common/config/environment"; import { isLegacyEmber } from "discourse-common/config/environment";
import { clearState as clearPresenceState } from "discourse/tests/helpers/presence-pretender";
const Plugin = $.fn.modal; const Plugin = $.fn.modal;
const Modal = Plugin.Constructor; const Modal = Plugin.Constructor;
@ -308,6 +309,7 @@ function setupTestsCommon(application, container, config) {
QUnit.testDone(function () { QUnit.testDone(function () {
sinon.restore(); sinon.restore();
resetPretender(); resetPretender();
clearPresenceState();
// Destroy any modals // Destroy any modals
$(".modal-backdrop").remove(); $(".modal-backdrop").remove();

View File

@ -27,23 +27,31 @@ function usersFixture() {
}, },
]; ];
} }
acceptance("Presence - Subscribing", function (needs) { acceptance("Presence - Subscribing", function (needs) {
needs.pretender((server, helper) => { needs.pretender((server, helper) => {
server.get("/presence/get", (request) => { server.get("/presence/get", (request) => {
if (request.queryParams.channel?.startsWith("/test/")) { const channels = request.queryParams.channels;
return helper.response({ const response = {};
channels.forEach((c) => {
if (c.startsWith("/test/")) {
response[c] = {
count: 3, count: 3,
last_message_id: 1, last_message_id: 1,
users: usersFixture(), users: usersFixture(),
}); };
} else if (request.queryParams.channel?.startsWith("/countonly/")) { } else if (c.startsWith("/countonly/")) {
return helper.response({ response[c] = {
count: 3, count: 3,
last_message_id: 1, last_message_id: 1,
}); };
} else {
response[c] = null;
} }
});
return helper.response(404, {}); return helper.response(200, response);
}); });
}); });

View File

@ -4,22 +4,35 @@ class PresenceController < ApplicationController
skip_before_action :check_xhr skip_before_action :check_xhr
before_action :ensure_logged_in, only: [:update] before_action :ensure_logged_in, only: [:update]
MAX_CHANNELS_PER_REQUEST ||= 50
def get def get
name = params.require(:channel) names = params.require(:channels)
raise Discourse::InvalidParameters.new(:channels) if !(names.is_a?(Array) && names.all? { |n| n.is_a? String })
begin names.uniq!
raise Discourse::InvalidParameters.new("Too many channels") if names.length > MAX_CHANNELS_PER_REQUEST
user_group_ids = if current_user
GroupUser.where(user_id: current_user.id).pluck("group_id")
else
[]
end
result = {}
names.each do |name|
channel = PresenceChannel.new(name) channel = PresenceChannel.new(name)
if channel.can_view?(user_id: current_user&.id, group_ids: user_group_ids)
result[name] = PresenceChannelStateSerializer.new(channel.state, root: nil)
else
result[name] = nil
end
rescue PresenceChannel::NotFound rescue PresenceChannel::NotFound
raise Discourse::NotFound result[name] = nil
end end
if !channel.can_view?(user_id: current_user&.id) render json: result
# Do not reveal existence of channel
raise Discourse::NotFound
end
state = channel.state
render json: state, serializer: PresenceChannelStateSerializer, root: nil
end end
def update def update
@ -39,7 +52,7 @@ class PresenceController < ApplicationController
raise Discourse::InvalidParameters.new(:leave_channels) raise Discourse::InvalidParameters.new(:leave_channels)
end end
if present_channels && present_channels.length > 50 if present_channels && present_channels.length > MAX_CHANNELS_PER_REQUEST
raise Discourse::InvalidParameters.new("Too many present_channels") raise Discourse::InvalidParameters.new("Too many present_channels")
end end

View File

@ -70,13 +70,13 @@ class PresenceChannel
attr_reader :name, :timeout, :message_bus_channel_name, :config attr_reader :name, :timeout, :message_bus_channel_name, :config
def initialize(name, raise_not_found: true) def initialize(name, raise_not_found: true, use_cache: true)
@name = name @name = name
@timeout = DEFAULT_TIMEOUT @timeout = DEFAULT_TIMEOUT
@message_bus_channel_name = "/presence#{name}" @message_bus_channel_name = "/presence#{name}"
begin begin
@config = fetch_config @config = fetch_config(use_cache: use_cache)
rescue PresenceChannel::NotFound rescue PresenceChannel::NotFound
raise if raise_not_found raise if raise_not_found
@config = Config.new @config = Config.new
@ -85,21 +85,22 @@ class PresenceChannel
# Is this user allowed to view this channel? # Is this user allowed to view this channel?
# Pass `nil` for anonymous viewers # Pass `nil` for anonymous viewers
def can_view?(user_id: nil) def can_view?(user_id: nil, group_ids: nil)
return true if config.public return true if config.public
return true if user_id && config.allowed_user_ids&.include?(user_id) return true if user_id && config.allowed_user_ids&.include?(user_id)
if user_id && config.allowed_group_ids.present? if user_id && config.allowed_group_ids.present?
user_group_ids = GroupUser.where(user_id: user_id).pluck("group_id") group_ids ||= GroupUser.where(user_id: user_id).pluck("group_id")
return true if (user_group_ids & config.allowed_group_ids).present? return true if (group_ids & config.allowed_group_ids).present?
end end
false false
end end
# Is a user allowed to enter this channel? # Is a user allowed to enter this channel?
# Currently equal to the the can_view? permission # Currently equal to the the can_view? permission
def can_enter?(user_id: nil) def can_enter?(user_id: nil, group_ids: nil)
return false if user_id.nil? return false if user_id.nil?
can_view?(user_id: user_id) can_view?(user_id: user_id, group_ids: group_ids)
end end
# Mark a user's client as present in this channel. The client_id should be unique per # Mark a user's client as present in this channel. The client_id should be unique per
@ -297,8 +298,10 @@ class PresenceChannel
private private
def fetch_config def fetch_config(use_cache: true)
cached_config = PresenceChannel.redis.get(redis_key_config) cached_config = if use_cache
PresenceChannel.redis.get(redis_key_config)
end
if cached_config == Config::NOT_FOUND if cached_config == Config::NOT_FOUND
raise PresenceChannel::NotFound raise PresenceChannel::NotFound

View File

@ -122,58 +122,67 @@ describe PresenceController do
let(:user3) { Fabricate(:user) } let(:user3) { Fabricate(:user) }
it "works" do it "works" do
get "/presence/get", params: { channel: ch1.name } get "/presence/get", params: { channels: [ch1.name] }
expect(response.status).to eq(200) expect(response.status).to eq(200)
body = response.parsed_body expect(response.parsed_body).to eq(
expect(body["users"]).to eq([]) ch1.name => {
expect(body["count"]).to eq(0) "users" => [],
expect(body["last_message_id"]).to eq(MessageBus.last_id(ch1.message_bus_channel_name)) "count" => 0,
"last_message_id" => MessageBus.last_id(ch1.message_bus_channel_name)
}
)
ch1.present(user_id: user.id, client_id: SecureRandom.hex) ch1.present(user_id: user.id, client_id: SecureRandom.hex)
ch1.present(user_id: user2.id, client_id: SecureRandom.hex) ch1.present(user_id: user2.id, client_id: SecureRandom.hex)
ch1.present(user_id: user3.id, client_id: SecureRandom.hex) ch1.present(user_id: user3.id, client_id: SecureRandom.hex)
get "/presence/get", params: { channel: ch1.name } get "/presence/get", params: { channels: [ch1.name] }
body = response.parsed_body expect(response.status).to eq(200)
expect(body["users"].map { |u| u["id"] }).to contain_exactly(user.id, user2.id, user3.id) state = response.parsed_body[ch1.name]
expect(body["users"][0].keys).to contain_exactly("avatar_template", "id", "name", "username") expect(state["users"].map { |u| u["id"] }).to contain_exactly(user.id, user2.id, user3.id)
expect(body["count"]).to eq(3) expect(state["users"][0].keys).to contain_exactly("avatar_template", "id", "name", "username")
expect(body["last_message_id"]).to eq(MessageBus.last_id(ch1.message_bus_channel_name)) expect(state["count"]).to eq(3)
expect(state["last_message_id"]).to eq(MessageBus.last_id(ch1.message_bus_channel_name))
end end
it "respects the existence/security of the channel" do it "respects the existence/security of the channel" do
sign_in user sign_in user
get "/presence/get", params: { channel: ch1.name } get "/presence/get", params: {
channels: [
ch1.name,
allowed_user_channel.name,
allowed_group_channel.name,
secure_user_channel.name,
secure_group_channel.name,
"/test/nonexistent"
]
}
expect(response.status).to eq(200) expect(response.status).to eq(200)
get "/presence/get", params: { channel: secure_user_channel.name } expect(response.parsed_body).to include(
expect(response.status).to eq(404) ch1.name => be_truthy,
allowed_user_channel.name => be_truthy,
get "/presence/get", params: { channel: secure_group_channel.name } allowed_group_channel.name => be_truthy,
expect(response.status).to eq(404) secure_user_channel.name => be_nil,
secure_group_channel.name => be_nil,
get "/presence/get", params: { channel: allowed_user_channel.name } "/test/nonexistent" => be_nil,
expect(response.status).to eq(200) )
get "/presence/get", params: { channel: allowed_group_channel.name }
expect(response.status).to eq(200)
get "/presence/get", params: { channel: "/test/nonexistent" }
expect(response.status).to eq(404)
end end
it "works for count_only channels" do it "works for count_only channels" do
get "/presence/get", params: { channel: count_only_channel.name } get "/presence/get", params: { channels: [count_only_channel.name] }
expect(response.status).to eq(200) expect(response.status).to eq(200)
expect(response.parsed_body.keys).to contain_exactly("count", "last_message_id") state = response.parsed_body[count_only_channel.name]
expect(response.parsed_body["count"]).to eq(0) expect(state.keys).to contain_exactly("count", "last_message_id")
expect(state["count"]).to eq(0)
count_only_channel.present(user_id: user.id, client_id: "a") count_only_channel.present(user_id: user.id, client_id: "a")
get "/presence/get", params: { channel: count_only_channel.name } get "/presence/get", params: { channels: [count_only_channel.name] }
expect(response.status).to eq(200) expect(response.status).to eq(200)
expect(response.parsed_body["count"]).to eq(1) expect(response.parsed_body[count_only_channel.name]["count"]).to eq(1)
end end
end end