PERF: Use a shared message for replies to tracked topics (#16022)

Previously we were publishing one messagebus message per user which was 'tracking' a topic. On large sites, this can easily be 1000+ messages. The important information in the message is common between all users, so we can manage with a single message on a shared channel, which will be much more efficient.

For user-specific values (notification_level and last_read_post_number), the JS app can infer values which are 'good enough'. Correct values will be loaded as soon as a topic-list containing the topic is visited.
This commit is contained in:
David Taylor 2022-02-22 15:27:46 +00:00 committed by GitHub
parent 50da1375ca
commit f6c852bf8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 82 additions and 54 deletions

View File

@ -68,6 +68,7 @@ const TopicTrackingState = EmberObject.extend({
this.messageBus.subscribe("/new", this._processChannelPayload); this.messageBus.subscribe("/new", this._processChannelPayload);
this.messageBus.subscribe("/latest", this._processChannelPayload); this.messageBus.subscribe("/latest", this._processChannelPayload);
if (this.currentUser) { if (this.currentUser) {
this.messageBus.subscribe(`/unread`, this._processChannelPayload);
this.messageBus.subscribe( this.messageBus.subscribe(
`/unread/${this.currentUser.id}`, `/unread/${this.currentUser.id}`,
this._processChannelPayload this._processChannelPayload
@ -813,22 +814,34 @@ const TopicTrackingState = EmberObject.extend({
if (["new_topic", "unread", "read"].includes(data.message_type)) { if (["new_topic", "unread", "read"].includes(data.message_type)) {
this.notifyIncoming(data); this.notifyIncoming(data);
if (!deepEqual(old, data.payload)) { if (!deepEqual(old, data.payload)) {
if (data.message_type === "read") { // The 'unread' and 'read' payloads are deliberately incomplete
let mergeData = {}; // for efficiency. We rebuild them by using any existing state
// we have, and then substitute inferred values for last_read_post_number
// and notification_level. Any errors will be corrected when a
// topic-list is loaded which includes the topic.
// we have to do this because the "read" event does not let payload = data.payload;
// include tags; we don't want them to be overridden
if (old) { if (old) {
mergeData = { payload = deepMerge(old, data.payload);
tags: old.tags, }
topic_tag_ids: old.topic_tag_ids,
}; if (data.message_type === "unread") {
if (payload.last_read_post_number === undefined) {
// If we didn't already have state for this topic,
// we're probably only 1 post behind.
payload.last_read_post_number = payload.highest_post_number - 1;
} }
this.modifyState(data, deepMerge(data.payload, mergeData)); if (payload.notification_level === undefined) {
} else { // /unread messages will only have been published to us
this.modifyState(data, data.payload); // if we are tracking or watching the topic.
// Let's guess TRACKING for now:
payload.notification_level = NotificationLevels.TRACKING;
}
} }
this.modifyState(data, payload);
this.incrementMessageCount(); this.incrementMessageCount();
} }
} }

View File

@ -1,5 +1,5 @@
import QUnit, { module, skip, test } from "qunit"; import QUnit, { module, skip, test } from "qunit";
import { deepMerge } from "discourse-common/lib/object"; import { cloneJSON, deepMerge } from "discourse-common/lib/object";
import MessageBus from "message-bus-client"; import MessageBus from "message-bus-client";
import { import {
clearCache as clearOutletCache, clearCache as clearOutletCache,
@ -487,6 +487,7 @@ export function exists(selector) {
} }
export function publishToMessageBus(channelPath, ...args) { export function publishToMessageBus(channelPath, ...args) {
args = cloneJSON(args);
MessageBus.callbacks MessageBus.callbacks
.filterBy("channel", channelPath) .filterBy("channel", channelPath)
.forEach((c) => c.func(...args)); .forEach((c) => c.func(...args));

View File

@ -426,7 +426,7 @@ discourseModule("Unit | Model | topic-tracking-state", function (hooks) {
}); });
discourseModule( discourseModule(
"establishChannels - /unread/:userId MessageBus channel payloads processed", "establishChannels - /unread MessageBus channel payloads processed",
function (unreadHooks) { function (unreadHooks) {
let trackingState; let trackingState;
let unreadTopicPayload = { let unreadTopicPayload = {
@ -436,11 +436,9 @@ discourseModule("Unit | Model | topic-tracking-state", function (hooks) {
category_id: 123, category_id: 123,
topic_tag_ids: [44], topic_tag_ids: [44],
tags: ["pending"], tags: ["pending"],
last_read_post_number: 4,
highest_post_number: 10, highest_post_number: 10,
created_at: "2012-11-31 12:00:00 UTC", created_at: "2012-11-31 12:00:00 UTC",
archetype: "regular", archetype: "regular",
notification_level: NotificationLevels.TRACKING,
}, },
}; };
let currentUser; let currentUser;
@ -468,7 +466,7 @@ discourseModule("Unit | Model | topic-tracking-state", function (hooks) {
}); });
test("message count is incremented", function (assert) { test("message count is incremented", function (assert) {
publishToMessageBus(`/unread/${currentUser.id}`, unreadTopicPayload); publishToMessageBus(`/unread`, unreadTopicPayload);
assert.strictEqual( assert.strictEqual(
trackingState.messageCount, trackingState.messageCount,
@ -482,10 +480,11 @@ discourseModule("Unit | Model | topic-tracking-state", function (hooks) {
trackingState.onStateChange(() => { trackingState.onStateChange(() => {
stateCallbackCalled = true; stateCallbackCalled = true;
}); });
publishToMessageBus(`/unread/${currentUser.id}`, unreadTopicPayload); publishToMessageBus(`/unread`, unreadTopicPayload);
assert.deepEqual( assert.deepEqual(
trackingState.findState(111), trackingState.findState(111),
{ {
topic_id: 111,
category_id: 123, category_id: 123,
topic_tag_ids: [44], topic_tag_ids: [44],
tags: ["pending"], tags: ["pending"],
@ -506,7 +505,7 @@ discourseModule("Unit | Model | topic-tracking-state", function (hooks) {
test("adds incoming so it is counted in topic lists", function (assert) { test("adds incoming so it is counted in topic lists", function (assert) {
trackingState.trackIncoming("all"); trackingState.trackIncoming("all");
publishToMessageBus(`/unread/${currentUser.id}`, unreadTopicPayload); publishToMessageBus(`/unread`, unreadTopicPayload);
assert.deepEqual( assert.deepEqual(
trackingState.newIncoming, trackingState.newIncoming,
[111], [111],
@ -541,6 +540,27 @@ discourseModule("Unit | Model | topic-tracking-state", function (hooks) {
assert.strictEqual(trackingState.filter, "latest"); assert.strictEqual(trackingState.filter, "latest");
}); });
test("correctly infers missing information", function (assert) {
publishToMessageBus(`/unread`, {
...unreadTopicPayload,
topic_id: 999,
});
assert.deepEqual(
trackingState.findState(999),
{
category_id: 123,
topic_tag_ids: [44],
tags: ["pending"],
last_read_post_number: 9,
highest_post_number: 10,
notification_level: NotificationLevels.TRACKING,
created_at: "2012-11-31 12:00:00 UTC",
archetype: "regular",
},
"topic state updated with guesses for last_read_post_number and notification_level"
);
});
test("adds incoming in the categories latest topics list", function (assert) { test("adds incoming in the categories latest topics list", function (assert) {
trackingState.trackIncoming("categories"); trackingState.trackIncoming("categories");
const unreadCategoriesLatestTopicsPayload = { const unreadCategoriesLatestTopicsPayload = {

View File

@ -159,38 +159,32 @@ class TopicTrackingState
.where("gu.group_id IN (?)", group_ids) .where("gu.group_id IN (?)", group_ids)
end end
scope user_ids = scope.pluck(:user_id)
.select([:user_id, :last_read_post_number, :notification_level]) return if user_ids.empty?
.each do |tu|
payload = { payload = {
last_read_post_number: tu.last_read_post_number, highest_post_number: post.post_number,
highest_post_number: post.post_number, updated_at: post.topic.updated_at,
updated_at: post.topic.updated_at, created_at: post.created_at,
created_at: post.created_at, category_id: post.topic.category_id,
category_id: post.topic.category_id, archetype: post.topic.archetype,
notification_level: tu.notification_level, unread_not_too_old: true
archetype: post.topic.archetype, }
first_unread_at: tu.user.user_stat&.first_unread_at,
unread_not_too_old: true
}
if tags if tags
payload[:tags] = tags payload[:tags] = tags
payload[:topic_tag_ids] = tag_ids payload[:topic_tag_ids] = tag_ids
end
message = {
topic_id: post.topic_id,
message_type: UNREAD_MESSAGE_TYPE,
payload: payload
}
MessageBus.publish(self.unread_channel_key(tu.user_id), message.as_json,
user_ids: [tu.user_id]
)
end end
message = {
topic_id: post.topic_id,
message_type: UNREAD_MESSAGE_TYPE,
payload: payload
}
MessageBus.publish("/unread", message.as_json,
user_ids: user_ids
)
end end
def self.publish_recover(topic) def self.publish_recover(topic)

View File

@ -165,7 +165,7 @@ describe PostCreator do
"/new", "/new",
"/u/#{admin.username}", "/u/#{admin.username}",
"/u/#{admin.username}", "/u/#{admin.username}",
"/unread/#{admin.id}", "/unread",
"/unread/#{admin.id}", "/unread/#{admin.id}",
"/latest", "/latest",
"/latest", "/latest",

View File

@ -77,7 +77,7 @@ describe TopicTrackingState do
describe '#publish_unread' do describe '#publish_unread' do
it "can correctly publish unread" do it "can correctly publish unread" do
message = MessageBus.track_publish(described_class.unread_channel_key(post.user.id)) do message = MessageBus.track_publish("/unread") do
TopicTrackingState.publish_unread(post) TopicTrackingState.publish_unread(post)
end.first end.first
@ -92,7 +92,7 @@ describe TopicTrackingState do
it "is not erroring when user_stat is missing" do it "is not erroring when user_stat is missing" do
post.user.user_stat.destroy! post.user.user_stat.destroy!
message = MessageBus.track_publish(described_class.unread_channel_key(post.user.id)) do message = MessageBus.track_publish("/unread") do
TopicTrackingState.publish_unread(post) TopicTrackingState.publish_unread(post)
end.first end.first
@ -104,7 +104,7 @@ describe TopicTrackingState do
it "does not publish whisper post to non-staff users" do it "does not publish whisper post to non-staff users" do
post.update!(post_type: Post.types[:whisper]) post.update!(post_type: Post.types[:whisper])
messages = MessageBus.track_publish(described_class.unread_channel_key(post.user_id)) do messages = MessageBus.track_publish("/unread") do
TopicTrackingState.publish_unread(post) TopicTrackingState.publish_unread(post)
end end
@ -112,7 +112,7 @@ describe TopicTrackingState do
post.user.grant_admin! post.user.grant_admin!
message = MessageBus.track_publish(described_class.unread_channel_key(post.user_id)) do message = MessageBus.track_publish("/unread") do
TopicTrackingState.publish_unread(post) TopicTrackingState.publish_unread(post)
end.first end.first
@ -126,7 +126,7 @@ describe TopicTrackingState do
post.topic.update!(category: category) post.topic.update!(category: category)
messages = MessageBus.track_publish(described_class.unread_channel_key(post.user_id)) do messages = MessageBus.track_publish("/unread") do
TopicTrackingState.publish_unread(post) TopicTrackingState.publish_unread(post)
end end
@ -134,7 +134,7 @@ describe TopicTrackingState do
group.add(post.user) group.add(post.user)
message = MessageBus.track_publish(described_class.unread_channel_key(post.user_id)) do message = MessageBus.track_publish("/unread") do
TopicTrackingState.publish_unread(post) TopicTrackingState.publish_unread(post)
end.first end.first