DEV: Improve MessageBus subscriptions for TopicTrackingState (#19767)

## Why do we need this change? 

When loading the ember app, [MessageBus does not start polling immediately](f31f0b70f8/app/assets/javascripts/discourse/app/initializers/message-bus.js (L71-L81)) and instead waits for `document.readyState` to be `complete`. What this means is that if there are new messages being created while we have yet to start polling, those messages will not be received by the client.

With sidebar being the default navigation menu, the counts derived from `topic-tracking-state.js` on the client side is prominently displayed on every page. Therefore, we want to ensure that we are not dropping any messages on the channels that `topic-tracking-state.js` subscribes to.  

## What does this change do? 

This includes the `MessageBus.last_id`s for the MessageBus channels which `topic-tracking-state.js` subscribes to as part of the preloaded data when loading a page. The last ids are then used when we subscribe the MessageBus channels so that messages which are published before MessageBus starts polling will not be missed.

## Review Notes

1. See https://github.com/discourse/message_bus#client-support for documentation about subscribing from a given message id.
This commit is contained in:
Alan Guo Xiang Tan 2023-02-01 07:18:45 +08:00 committed by GitHub
parent f1ea2a2509
commit 07ef828db9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 164 additions and 81 deletions

View File

@ -83,20 +83,53 @@ const TopicTrackingState = EmberObject.extend({
*
* @method establishChannels
*/
establishChannels() {
this.messageBus.subscribe("/latest", this._processChannelPayload);
establishChannels(meta) {
meta ??= {};
const messageBusDefaultNewMessageId = -1;
this.messageBus.subscribe(
"/latest",
this._processChannelPayload,
meta["/latest"] || messageBusDefaultNewMessageId
);
if (this.currentUser) {
this.messageBus.subscribe("/new", this._processChannelPayload);
this.messageBus.subscribe(`/unread`, this._processChannelPayload);
this.messageBus.subscribe(
"/new",
this._processChannelPayload,
meta["/new"] || messageBusDefaultNewMessageId
);
this.messageBus.subscribe(
`/unread`,
this._processChannelPayload,
meta["/unread"] || messageBusDefaultNewMessageId
);
this.messageBus.subscribe(
`/unread/${this.currentUser.id}`,
this._processChannelPayload
this._processChannelPayload,
meta[`/unread/${this.currentUser.id}`] || messageBusDefaultNewMessageId
);
}
this.messageBus.subscribe("/delete", this.onDeleteMessage);
this.messageBus.subscribe("/recover", this.onRecoverMessage);
this.messageBus.subscribe("/destroy", this.onDestroyMessage);
this.messageBus.subscribe(
"/delete",
this.onDeleteMessage,
meta["/delete"] || messageBusDefaultNewMessageId
);
this.messageBus.subscribe(
"/recover",
this.onRecoverMessage,
meta["/recover"] || messageBusDefaultNewMessageId
);
this.messageBus.subscribe(
"/destroy",
this.onDestroyMessage,
meta["/destroy"] || messageBusDefaultNewMessageId
);
},
@bind
@ -1005,10 +1038,13 @@ const TopicTrackingState = EmberObject.extend({
});
export function startTracking(tracking) {
const data = PreloadStore.get("topicTrackingStates");
tracking.loadStates(data);
tracking.establishChannels();
PreloadStore.remove("topicTrackingStates");
PreloadStore.getAndRemove("topicTrackingStates").then((data) =>
tracking.loadStates(data)
);
PreloadStore.getAndRemove("topicTrackingStateMeta").then((meta) =>
tracking.establishChannels(meta)
);
}
export default TopicTrackingState;

View File

@ -651,15 +651,10 @@ class ApplicationController < ActionController::Base
)
report = TopicTrackingState.report(current_user)
serializer = TopicTrackingStateSerializer.new(report, scope: guardian, root: false)
serializer =
ActiveModel::ArraySerializer.new(
report,
each_serializer: TopicTrackingStateSerializer,
scope: guardian,
)
store_preloaded("topicTrackingStates", MultiJson.dump(serializer))
store_preloaded("topicTrackingStates", MultiJson.dump(serializer.as_json[:data]))
store_preloaded("topicTrackingStateMeta", MultiJson.dump(serializer.as_json[:meta]))
end
def custom_html_json

View File

@ -394,14 +394,9 @@ class UsersController < ApplicationController
guardian.ensure_can_edit!(user)
report = TopicTrackingState.report(user)
serializer =
ActiveModel::ArraySerializer.new(
report,
each_serializer: TopicTrackingStateSerializer,
scope: guardian,
)
serializer = TopicTrackingStateSerializer.new(report, scope: guardian, root: false)
render json: MultiJson.dump(serializer)
render json: MultiJson.dump(serializer.as_json[:data])
end
def private_message_topic_tracking_state

View File

@ -31,6 +31,13 @@ class TopicTrackingState
DISMISS_NEW_MESSAGE_TYPE = "dismiss_new"
MAX_TOPICS = 5000
NEW_MESSAGE_BUS_CHANNEL = "/new"
LATEST_MESSAGE_BUS_CHANNEL = "/latest"
UNREAD_MESSAGE_BUS_CHANNEL = "/unread"
RECOVER_MESSAGE_BUS_CHANNEL = "/recover"
DELETE_MESSAGE_BUS_CHANNEL = "/delete"
DESTROY_MESSAGE_BUS_CHANNEL = "/destroy"
def self.publish_new(topic)
return unless topic.regular?
@ -55,7 +62,7 @@ class TopicTrackingState
group_ids = secure_category_group_ids(topic)
MessageBus.publish("/new", message.as_json, group_ids: group_ids)
MessageBus.publish(NEW_MESSAGE_BUS_CHANNEL, message.as_json, group_ids: group_ids)
publish_read(topic.id, 1, topic.user)
end
@ -86,8 +93,7 @@ class TopicTrackingState
else
secure_category_group_ids(topic)
end
MessageBus.publish("/latest", message.as_json, group_ids: group_ids)
MessageBus.publish(LATEST_MESSAGE_BUS_CHANNEL, message.as_json, group_ids: group_ids)
end
def self.unread_channel_key(user_id)
@ -107,8 +113,10 @@ class TopicTrackingState
.limit(100)
.pluck(:user_id)
return if user_ids.blank?
message = { topic_id: topic.id, message_type: MUTED_MESSAGE_TYPE }
MessageBus.publish("/latest", message.as_json, user_ids: user_ids)
MessageBus.publish(LATEST_MESSAGE_BUS_CHANNEL, message.as_json, user_ids: user_ids)
end
def self.publish_unmuted(topic)
@ -122,8 +130,10 @@ class TopicTrackingState
.limit(100)
.pluck(:id)
return if user_ids.blank?
message = { topic_id: topic.id, message_type: UNMUTED_MESSAGE_TYPE }
MessageBus.publish("/latest", message.as_json, user_ids: user_ids)
MessageBus.publish(LATEST_MESSAGE_BUS_CHANNEL, message.as_json, user_ids: user_ids)
end
def self.publish_unread(post)
@ -172,7 +182,7 @@ class TopicTrackingState
message = { topic_id: post.topic_id, message_type: UNREAD_MESSAGE_TYPE, payload: payload }
MessageBus.publish("/unread", message.as_json, user_ids: user_ids)
MessageBus.publish(UNREAD_MESSAGE_BUS_CHANNEL, message.as_json, user_ids: user_ids)
end
def self.publish_recover(topic)
@ -182,7 +192,7 @@ class TopicTrackingState
message = { topic_id: topic.id, message_type: RECOVER_MESSAGE_TYPE }
MessageBus.publish("/recover", message.as_json, group_ids: group_ids)
MessageBus.publish(RECOVER_MESSAGE_BUS_CHANNEL, message.as_json, group_ids: group_ids)
end
def self.publish_delete(topic)
@ -202,7 +212,7 @@ class TopicTrackingState
message = { topic_id: topic.id, message_type: DESTROY_MESSAGE_TYPE }
MessageBus.publish("/destroy", message.as_json, group_ids: group_ids)
MessageBus.publish(DESTROY_MESSAGE_BUS_CHANNEL, message.as_json, group_ids: group_ids)
end
def self.publish_read(topic_id, last_read_post_number, user, notification_level = nil)

View File

@ -0,0 +1,22 @@
# frozen_string_literal: true
class TopicTrackingStateItemSerializer < ApplicationSerializer
attributes :topic_id,
:highest_post_number,
:last_read_post_number,
:created_at,
:category_id,
:notification_level,
:created_in_new_period,
:treat_as_new_topic_start_date,
:tags
def created_in_new_period
return true if !scope
object.created_at >= treat_as_new_topic_start_date
end
def include_tags?
object.respond_to?(:tags)
end
end

View File

@ -1,22 +1,23 @@
# frozen_string_literal: true
class TopicTrackingStateSerializer < ApplicationSerializer
attributes :topic_id,
:highest_post_number,
:last_read_post_number,
:created_at,
:category_id,
:notification_level,
:created_in_new_period,
:treat_as_new_topic_start_date,
:tags
attributes :data, :meta
def created_in_new_period
return true if !scope
object.created_at >= treat_as_new_topic_start_date
def data
object.map do |item|
TopicTrackingStateItemSerializer.new(item, scope: scope, root: false).as_json
end
end
def include_tags?
object.respond_to?(:tags)
def meta
MessageBus.last_ids(
TopicTrackingState::LATEST_MESSAGE_BUS_CHANNEL,
TopicTrackingState::RECOVER_MESSAGE_BUS_CHANNEL,
TopicTrackingState::DELETE_MESSAGE_BUS_CHANNEL,
TopicTrackingState::DESTROY_MESSAGE_BUS_CHANNEL,
TopicTrackingState::NEW_MESSAGE_BUS_CHANNEL,
TopicTrackingState::UNREAD_MESSAGE_BUS_CHANNEL,
TopicTrackingState.unread_channel_key(scope.user.id),
)
end
end

View File

@ -0,0 +1,44 @@
# frozen_string_literal: true
RSpec.describe TopicTrackingStateItemSerializer do
fab!(:user) { Fabricate(:user) }
fab!(:post) { create_post }
before do
SiteSetting.navigation_menu = "legacy"
SiteSetting.chat_enabled = false if defined?(::Chat)
end
it "serializes topic tracking state reports" do
report = TopicTrackingState.report(user)
serialized = described_class.new(report[0], scope: Guardian.new(user), root: false).as_json
expect(serialized[:topic_id]).to eq(post.topic_id)
expect(serialized[:highest_post_number]).to eq(post.topic.highest_post_number)
expect(serialized[:last_read_post_number]).to eq(nil)
expect(serialized[:created_at]).to be_present
expect(serialized[:notification_level]).to eq(nil)
expect(serialized[:created_in_new_period]).to eq(true)
expect(serialized[:treat_as_new_topic_start_date]).to be_present
expect(serialized.has_key?(:tags)).to eq(false)
end
it "includes tags attribute when tags are present" do
TopicTrackingState.include_tags_in_report = true
post.topic.notifier.watch_topic!(post.topic.user_id)
DiscourseTagging.tag_topic_by_names(
post.topic,
Guardian.new(Discourse.system_user),
%w[bananas apples],
)
report = TopicTrackingState.report(user)
serialized = described_class.new(report[0], scope: Guardian.new(user), root: false).as_json
expect(serialized[:tags]).to contain_exactly("bananas", "apples")
ensure
TopicTrackingState.include_tags_in_report = false
end
end

View File

@ -4,41 +4,21 @@ RSpec.describe TopicTrackingStateSerializer do
fab!(:user) { Fabricate(:user) }
fab!(:post) { create_post }
before do
SiteSetting.navigation_menu = "legacy"
SiteSetting.chat_enabled = false if defined?(::Chat)
end
it "serializes topic tracking state reports" do
it "serializes topic tracking state report correctly for a normal user" do
report = TopicTrackingState.report(user)
serialized = described_class.new(report[0], scope: Guardian.new(user), root: false).as_json
serialized = described_class.new(report, scope: Guardian.new(user), root: false).as_json
expect(serialized[:topic_id]).to eq(post.topic_id)
expect(serialized[:highest_post_number]).to eq(post.topic.highest_post_number)
expect(serialized[:last_read_post_number]).to eq(nil)
expect(serialized[:created_at]).to be_present
expect(serialized[:notification_level]).to eq(nil)
expect(serialized[:created_in_new_period]).to eq(true)
expect(serialized[:treat_as_new_topic_start_date]).to be_present
expect(serialized.has_key?(:tags)).to eq(false)
end
expect(serialized[:data].length).to eq(1)
expect(serialized[:data].first[:topic_id]).to eq(post.topic_id)
it "includes tags attribute when tags are present" do
TopicTrackingState.include_tags_in_report = true
post.topic.notifier.watch_topic!(post.topic.user_id)
DiscourseTagging.tag_topic_by_names(
post.topic,
Guardian.new(Discourse.system_user),
%w[bananas apples],
expect(serialized[:meta].keys).to contain_exactly(
TopicTrackingState::LATEST_MESSAGE_BUS_CHANNEL,
TopicTrackingState::RECOVER_MESSAGE_BUS_CHANNEL,
TopicTrackingState::DELETE_MESSAGE_BUS_CHANNEL,
TopicTrackingState::DESTROY_MESSAGE_BUS_CHANNEL,
TopicTrackingState::NEW_MESSAGE_BUS_CHANNEL,
TopicTrackingState::UNREAD_MESSAGE_BUS_CHANNEL,
TopicTrackingState.unread_channel_key(user.id),
)
report = TopicTrackingState.report(user)
serialized = described_class.new(report[0], scope: Guardian.new(user), root: false).as_json
expect(serialized[:tags]).to contain_exactly("bananas", "apples")
ensure
TopicTrackingState.include_tags_in_report = false
end
end