Revert "DEV: Centralize user updates to a single MessageBus channel. (#17058)" (#17115)

This reverts commit 94c3bbc2d1.

At this current point in time, we do not have enough data on whether
this centralisation is the trade-offs of coupling features into a single
channel.
This commit is contained in:
Alan Guo Xiang Tan 2022-06-17 12:24:15 +08:00 committed by GitHub
parent 4c810ca121
commit f618fdf17f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 47 additions and 85 deletions

View File

@ -108,19 +108,17 @@ export default {
user.notification_channel_position user.notification_channel_position
); );
bus.subscribe(`/user-updates/${user.id}`, (data) => { bus.subscribe(`/user-drafts/${user.id}`, (data) => {
switch (data.type) { user.updateDraftProperties(data);
case "drafts": });
user.updateDraftProperties(data.payload);
break; bus.subscribe(`/do-not-disturb/${user.get("id")}`, (data) => {
case "do_not_disturb": user.updateDoNotDisturbStatus(data.ends_at);
user.updateDoNotDisturbStatus(data.payload.ends_at); });
break;
case "user_status": bus.subscribe(`/user-status/${user.id}`, (data) => {
user.set("status", data.payload); user.set("status", data);
appEvents.trigger("user-status:changed"); appEvents.trigger("user-status:changed");
break;
}
}); });
const site = container.lookup("site:main"); const site = container.lookup("site:main");

View File

@ -242,11 +242,8 @@ acceptance("Sidebar - Topics Section", function (needs) {
async function (assert) { async function (assert) {
await visit("/t/280"); await visit("/t/280");
publishToMessageBus(`/user-updates/${loggedInUser().id}`, { publishToMessageBus(`/user-drafts/${loggedInUser().id}`, {
type: "drafts",
payload: {
draft_count: 1, draft_count: 1,
},
}); });
await settled(); await settled();

View File

@ -17,17 +17,13 @@ acceptance("User Status", function (needs) {
needs.pretender((server, helper) => { needs.pretender((server, helper) => {
server.put("/user-status.json", () => { server.put("/user-status.json", () => {
publishToMessageBus(`/user-updates/${userId}`, { publishToMessageBus(`/user-status/${userId}`, {
type: "user_status", description: userStatus,
payload: { description: userStatus },
}); });
return helper.response({ success: true }); return helper.response({ success: true });
}); });
server.delete("/user-status.json", () => { server.delete("/user-status.json", () => {
publishToMessageBus(`/user-updates/${userId}`, { publishToMessageBus(`/user-status/${userId}`, null);
type: "user_status",
payload: null,
});
return helper.response({ success: true }); return helper.response({ success: true });
}); });
}); });

View File

@ -657,38 +657,16 @@ class User < ActiveRecord::Base
MessageBus.publish("/notification/#{id}", payload, user_ids: [id]) MessageBus.publish("/notification/#{id}", payload, user_ids: [id])
end end
PUBLISH_USER_STATUS_TYPE = "user_status"
PUBLISH_DO_NOT_STATUS_TYPE = "do_not_disturb"
PUBLISH_DRAFTS_TYPE = "drafts"
def self.publish_updates_channel(user_id)
"/user-updates/#{user_id}"
end
def self.publish_updates(user_id:, type:, payload:)
MessageBus.publish(
publish_updates_channel(user_id),
{
type: type,
payload: payload
},
user_ids: [user_id]
)
end
def publish_updates(type:, payload:)
self.class.publish_updates(user_id: id, type: type, payload: payload)
end
def publish_do_not_disturb(ends_at: nil) def publish_do_not_disturb(ends_at: nil)
publish_updates(type: PUBLISH_DO_NOT_STATUS_TYPE, payload: { ends_at: ends_at&.httpdate }) MessageBus.publish("/do-not-disturb/#{id}", { ends_at: ends_at&.httpdate }, user_ids: [id])
end end
def publish_user_status(status) def publish_user_status(status)
publish_updates( payload = status ?
type: PUBLISH_USER_STATUS_TYPE, { description: status.description, emoji: status.emoji } :
payload: status ? { description: status.description, emoji: status.emoji } : nil nil
)
MessageBus.publish("/user-status/#{id}", payload, user_ids: [id])
end end
def password=(password) def password=(password)

View File

@ -214,13 +214,13 @@ class UserStat < ActiveRecord::Base
RETURNING draft_count, (SELECT 1 FROM drafts WHERE user_id = :user_id AND draft_key = :new_topic) RETURNING draft_count, (SELECT 1 FROM drafts WHERE user_id = :user_id AND draft_key = :new_topic)
SQL SQL
User.publish_updates( MessageBus.publish(
user_id: user_id, "/user-drafts/#{user_id}",
type: User::PUBLISH_DRAFTS_TYPE, {
payload: {
draft_count: draft_count, draft_count: draft_count,
has_topic_draft: !!has_topic_draft has_topic_draft: !!has_topic_draft
} },
user_ids: [user_id]
) )
else else
DB.exec <<~SQL DB.exec <<~SQL

View File

@ -174,9 +174,9 @@ describe PostCreator do
"/latest", "/latest",
"/topic/#{created_post.topic_id}", "/topic/#{created_post.topic_id}",
"/topic/#{created_post.topic_id}", "/topic/#{created_post.topic_id}",
User.publish_updates_channel(admin.id), "/user-drafts/#{admin.id}",
User.publish_updates_channel(admin.id), "/user-drafts/#{admin.id}",
User.publish_updates_channel(admin.id), "/user-drafts/#{admin.id}",
].sort ].sort
) )
@ -205,7 +205,7 @@ describe PostCreator do
user_action = messages.find { |m| m.channel == "/u/#{p.user.username}" } user_action = messages.find { |m| m.channel == "/u/#{p.user.username}" }
expect(user_action).not_to eq(nil) expect(user_action).not_to eq(nil)
draft_count = messages.find { |m| m.channel == User.publish_updates_channel(p.user_id) } draft_count = messages.find { |m| m.channel == "/user-drafts/#{p.user_id}" }
expect(draft_count).not_to eq(nil) expect(draft_count).not_to eq(nil)
expect(messages.filter { |m| m.channel != "/distributed_hash" }.length).to eq(7) expect(messages.filter { |m| m.channel != "/distributed_hash" }.length).to eq(7)

View File

@ -179,19 +179,21 @@ describe Draft do
it 'updates draft count when a draft is created or destroyed' do it 'updates draft count when a draft is created or destroyed' do
Draft.set(Fabricate(:user), Draft::NEW_TOPIC, 0, "data") Draft.set(Fabricate(:user), Draft::NEW_TOPIC, 0, "data")
messages = MessageBus.track_publish("/user-updates/#{user.id}") do messages = MessageBus.track_publish("/user-drafts/#{user.id}") do
Draft.set(user, Draft::NEW_TOPIC, 0, "data") Draft.set(user, Draft::NEW_TOPIC, 0, "data")
end end
expect(messages.first.data[:payload][:draft_count]).to eq(1) expect(messages.first.data[:draft_count]).to eq(1)
expect(messages.first.data[:payload][:has_topic_draft]).to eq(true) expect(messages.first.data[:has_topic_draft]).to eq(true)
expect(messages.first.user_ids).to contain_exactly(user.id)
messages = MessageBus.track_publish("/user-updates/#{user.id}") do messages = MessageBus.track_publish("/user-drafts/#{user.id}") do
Draft.where(user: user).destroy_all Draft.where(user: user).destroy_all
end end
expect(messages.first.data[:payload][:draft_count]).to eq(0) expect(messages.first.data[:draft_count]).to eq(0)
expect(messages.first.data[:payload][:has_topic_draft]).to eq(false) expect(messages.first.data[:has_topic_draft]).to eq(false)
expect(messages.first.user_ids).to contain_exactly(user.id)
end end
describe '#stream' do describe '#stream' do

View File

@ -50,15 +50,11 @@ describe UserStatusController do
it "publishes to message bus" do it "publishes to message bus" do
status = "off to dentist" status = "off to dentist"
messages = MessageBus.track_publish { put "/user-status.json", params: { description: status } }
messages = MessageBus.track_publish(User.publish_updates_channel(user.id)) do
put "/user-status.json", params: { description: status }
expect(response.status).to eq(200)
end
expect(messages.size).to eq(1) expect(messages.size).to eq(1)
expect(messages[0].data[:type]).to eq(User::PUBLISH_USER_STATUS_TYPE) expect(messages[0].channel).to eq("/user-status/#{user.id}")
expect(messages[0].data[:payload][:description]).to eq(status) expect(messages[0].data[:description]).to eq(status)
expect(messages[0].user_ids).to eq([user.id]) expect(messages[0].user_ids).to eq([user.id])
end end
end end
@ -97,15 +93,11 @@ describe UserStatusController do
end end
it "publishes to message bus" do it "publishes to message bus" do
messages = MessageBus.track_publish(User.publish_updates_channel(user.id)) do messages = MessageBus.track_publish { delete "/user-status.json" }
delete "/user-status.json"
expect(response.status).to eq(200)
end
expect(messages.size).to eq(1) expect(messages.size).to eq(1)
expect(messages[0].data[:type]).to eq(User::PUBLISH_USER_STATUS_TYPE) expect(messages[0].channel).to eq("/user-status/#{user.id}")
expect(messages[0].data[:payload]).to eq(nil) expect(messages[0].data).to eq(nil)
expect(messages[0].user_ids).to eq([user.id]) expect(messages[0].user_ids).to eq([user.id])
end end
end end

View File

@ -143,13 +143,12 @@ describe UserNotificationScheduleProcessor do
user.user_option.update(timezone: "UTC") user.user_option.update(timezone: "UTC")
schedule = standard_schedule schedule = standard_schedule
travel_to Time.new(2020, 12, 31, 1, 0, 0, "+00:00") do travel_to Time.new(2020, 12, 31, 1, 0, 0, "+00:00") do
messages = MessageBus.track_publish(User.publish_updates_channel(user.id)) do messages = MessageBus.track_publish("/do-not-disturb/#{user.id}") do
UserNotificationScheduleProcessor.create_do_not_disturb_timings_for(schedule) UserNotificationScheduleProcessor.create_do_not_disturb_timings_for(schedule)
end end
expect(messages.size).to eq(1) expect(messages.size).to eq(1)
expect(messages[0].data[:type]).to eq(User::PUBLISH_DO_NOT_STATUS_TYPE) expect(messages[0].data[:ends_at]).to eq(Time.new(2020, 12, 31, 7, 59, 0, "+00:00").httpdate)
expect(messages[0].data[:payload][:ends_at]).to eq(Time.new(2020, 12, 31, 7, 59, 0, "+00:00").httpdate)
expect(messages[0].user_ids).to contain_exactly(user.id) expect(messages[0].user_ids).to contain_exactly(user.id)
end end
end end