FEATURE: Automatically create chat threads in background (#20132)

Whenever we create a chat message that is `in_reply_to` another
message, we want to lazily populate the thread record for the
message chain.

If there is no thread yet for the root message in the reply chain,
we create a new thread with the appropriate details, and use that
thread ID for every message in the chain that does not yet have
a thread ID.

* Root message (ID 1) - no thread ID
    * Message (ID 2, in_reply_to 1) - no thread ID
    * When I as a user create a message in reply to ID 2, we create a thread and apply it to ID 1, ID 2, and the new message

If there is a thread for the root message in the reply chain, we
do not create one, and use the thread ID for the newly created chat
message.

* Root message (ID 1) - thread ID 700
    * Message (ID 2, in_reply_to 1) - thread ID 700
    * When I as a user create a message in reply to ID 2, we use the existing thread ID 700 for the new message

We also support passing in the `thread_id` to `ChatMessageCreator`,
which will be used when replying to a message that is already part of
a thread, and we validate whether that `thread_id` is okay in the context
of the channel and also the reply chain.

This work is always done, regardless of channel `thread_enabled` settings
or the `enable_experimental_chat_threaded_discussions` site setting.

This commit does not include a large data migration to backfill threads for
all existing reply chains, its unnecessary to do this so early in the project,
we can do this later if necessary.

This commit also includes thread considerations in the `MessageMover` class:

* If the original message and N other messages of a thread is moved,
   the remaining messages in the thread have a new thread created in
   the old channel and are moved to it.
* The reply chain is not preserved for moved messages, so new threads are
   not created in the destination channel.

In addition to this, I added a fix to also clear the `in_reply_to_id` of messages
in the old channel which are moved out of that channel for data cleanliness.
This commit is contained in:
Martin Brennan 2023-02-08 09:50:42 +10:00 committed by GitHub
parent a04201ae01
commit 37e6e3be7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 589 additions and 12 deletions

View File

@ -73,6 +73,7 @@ en:
over_chat_max_direct_message_users:
one: "You can only create a direct message with yourself."
other: "You can't create a direct message with more than %{count} other users."
root_message_not_found: "The ancestor of the message you are replying cannot be found or has been deleted."
reviewables:
message_already_handled: "Thanks, but we've already reviewed this message and determined it does not need to be flagged again."
actions:

View File

@ -11,6 +11,7 @@ class Chat::ChatMessageCreator
def initialize(
chat_channel:,
in_reply_to_id: nil,
thread_id: nil,
user:,
content:,
staged_id: nil,
@ -20,11 +21,15 @@ class Chat::ChatMessageCreator
@chat_channel = chat_channel
@user = user
@guardian = Guardian.new(user)
# NOTE: We confirm this exists and the user can access it in the ChatController,
# but in future the checks should be here
@in_reply_to_id = in_reply_to_id
@content = content
@staged_id = staged_id
@incoming_chat_webhook = incoming_chat_webhook
@upload_ids = upload_ids || []
@thread_id = thread_id
@error = nil
@chat_message =
@ -42,9 +47,13 @@ class Chat::ChatMessageCreator
validate_channel_status!
uploads = get_uploads
validate_message!(has_uploads: uploads.any?)
validate_reply_chain!
validate_existing_thread!
@chat_message.thread_id = @existing_thread&.id
@chat_message.cook
@chat_message.save!
create_chat_webhook_event
create_thread
@chat_message.attach_uploads(uploads)
ChatDraft.where(user_id: @user.id, chat_channel_id: @chat_channel.id).destroy_all
ChatPublisher.publish_new!(@chat_channel, @chat_message, @staged_id)
@ -81,6 +90,56 @@ class Chat::ChatMessageCreator
end
end
def validate_reply_chain!
return if @in_reply_to_id.blank?
@root_message_id = DB.query_single(<<~SQL).last
WITH RECURSIVE root_message_finder( id, in_reply_to_id )
AS (
-- start with the message id we want to find the parents of
SELECT id, in_reply_to_id
FROM chat_messages
WHERE id = #{@in_reply_to_id}
UNION ALL
-- get the chain of direct parents of the message
-- following in_reply_to_id
SELECT cm.id, cm.in_reply_to_id
FROM root_message_finder rm
JOIN chat_messages cm ON rm.in_reply_to_id = cm.id
)
SELECT id FROM root_message_finder
-- this makes it so only the root parent ID is returned, we can
-- exclude this to return all parents in the chain
WHERE in_reply_to_id IS NULL;
SQL
raise StandardError.new(I18n.t("chat.errors.root_message_not_found")) if @root_message_id.blank?
@root_message = ChatMessage.with_deleted.find_by(id: @root_message_id)
raise StandardError.new(I18n.t("chat.errors.root_message_not_found")) if @root_message&.trashed?
end
def validate_existing_thread!
return if @thread_id.blank?
@existing_thread = ChatThread.find(@thread_id)
if @existing_thread.channel_id != @chat_channel.id
raise StandardError.new(I18n.t("chat.errors.thread_invalid_for_channel"))
end
reply_to_thread_mismatch =
@chat_message.in_reply_to&.thread_id &&
@chat_message.in_reply_to.thread_id != @existing_thread.id
root_message_has_no_thread = @root_message && @root_message.thread_id.blank?
root_message_thread_mismatch = @root_message && @root_message.thread_id != @existing_thread.id
if reply_to_thread_mismatch || root_message_has_no_thread || root_message_thread_mismatch
raise StandardError.new(I18n.t("chat.errors.thread_does_not_match_parent"))
end
end
def validate_message!(has_uploads:)
@chat_message.validate_message(has_uploads: has_uploads)
if @chat_message.errors.present?
@ -101,4 +160,39 @@ class Chat::ChatMessageCreator
Upload.where(id: @upload_ids, user_id: @user.id)
end
def create_thread
return if @in_reply_to_id.blank?
return if @chat_message.thread_id.present?
thread =
@root_message.thread ||
ChatThread.create!(
original_message: @chat_message.in_reply_to,
original_message_user: @chat_message.in_reply_to.user,
channel: @chat_message.chat_channel,
)
# NOTE: We intentionally do not try to correct thread IDs within the chain
# if they are incorrect, and only set the thread ID of messages where the
# thread ID is NULL. In future we may want some sync/background job to correct
# any inconsistencies.
DB.exec(<<~SQL)
WITH RECURSIVE thread_updater AS (
SELECT cm.id, cm.in_reply_to_id
FROM chat_messages cm
WHERE cm.in_reply_to_id IS NULL AND cm.id = #{@root_message_id}
UNION ALL
SELECT cm.id, cm.in_reply_to_id
FROM chat_messages cm
JOIN thread_updater ON cm.in_reply_to_id = thread_updater.id
)
UPDATE chat_messages
SET thread_id = #{thread.id}
FROM thread_updater
WHERE thread_id IS NULL AND chat_messages.id = thread_updater.id
SQL
end
end

View File

@ -16,6 +16,19 @@
# all of the references associated to a chat message (e.g. reactions, bookmarks,
# notifications, revisions, mentions, uploads) will be updated to the new
# message IDs via a moved_chat_messages temporary table.
#
# Reply chains are a little complex. No reply chains are preserved when moving
# messages into a new channel. Remaining messages that referenced moved ones
# have their in_reply_to_id cleared so the data makes sense.
#
# Threads are even more complex. No threads are preserved when moving messages
# into a new channel, they end up as just a flat series of messages that are
# not in a chain. If the original message of a thread and N other messages
# in that thread, then any messages left behind just get placed into a new
# thread. Message moving will be disabled in the thread UI while
# enable_experimental_chat_threaded_discussions is present, its too complicated
# to have end users reason about for now, and we may want a standalone
# "Move Thread" UI later on.
class Chat::MessageMover
class NoMessagesFound < StandardError
end
@ -51,6 +64,8 @@ class Chat::MessageMover
bulk_insert_movement_metadata
update_references
delete_source_messages
update_reply_references
update_thread_references
end
add_moved_placeholder(destination_channel, moved_messages.first)
@ -60,7 +75,10 @@ class Chat::MessageMover
private
def find_messages(message_ids, channel)
ChatMessage.where(id: message_ids, chat_channel_id: channel.id).order("created_at ASC, id ASC")
ChatMessage
.includes(thread: %i[original_message original_message_user])
.where(id: message_ids, chat_channel_id: channel.id)
.order("created_at ASC, id ASC")
end
def create_temp_table
@ -154,7 +172,13 @@ class Chat::MessageMover
end
def delete_source_messages
@source_messages.update_all(deleted_at: Time.zone.now, deleted_by_id: @acting_user.id)
# We do this so @source_messages is not nulled out, which is the
# case when using update_all here.
DB.exec(<<~SQL, source_message_ids: @source_message_ids, deleted_by_id: @acting_user.id)
UPDATE chat_messages
SET deleted_at = NOW(), deleted_by_id = :deleted_by_id
WHERE id IN (:source_message_ids)
SQL
ChatPublisher.publish_bulk_delete!(@source_channel, @source_message_ids)
end
@ -172,4 +196,41 @@ class Chat::MessageMover
),
)
end
def update_reply_references
DB.exec(<<~SQL, deleted_reply_to_ids: @source_message_ids)
UPDATE chat_messages
SET in_reply_to_id = NULL
WHERE in_reply_to_id IN (:deleted_reply_to_ids)
SQL
end
def update_thread_references
threads_to_update = []
@source_messages
.select { |message| message.thread_id.present? }
.each do |message_with_thread|
# If one of the messages we are moving is the original message in a thread,
# then all the remaining messages for that thread must be moved to a new one,
# otherwise they will be pointing to a thread in a different channel.
if message_with_thread.thread.original_message_id == message_with_thread.id
threads_to_update << message_with_thread.thread
end
end
threads_to_update.each do |thread|
# NOTE: We may want to do something with the old empty thread at some
# point, maybe close or delete it. For now just leave it dangling.
next if thread.chat_messages.empty?
original_message = thread.chat_messages.first
new_thread =
ChatThread.create(
original_message: original_message,
original_message_user: original_message.user,
channel: @source_channel,
)
thread.chat_messages.update_all(thread_id: new_thread.id)
end
end
end

View File

@ -357,6 +357,321 @@ describe Chat::ChatMessageCreator do
}.to change { ChatMention.count }.by(1)
end
describe "replies" do
fab!(:reply_message) do
Fabricate(:chat_message, chat_channel: public_chat_channel, user: user2)
end
fab!(:unrelated_message_1) { Fabricate(:chat_message, chat_channel: public_chat_channel) }
fab!(:unrelated_message_2) { Fabricate(:chat_message, chat_channel: public_chat_channel) }
it "links the message that the user is replying to" do
message =
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
in_reply_to_id: reply_message.id,
).chat_message
expect(message.in_reply_to_id).to eq(reply_message.id)
end
it "creates a thread and includes the original message and the reply" do
message = nil
expect {
message =
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
in_reply_to_id: reply_message.id,
).chat_message
}.to change { ChatThread.count }.by(1)
expect(message.reload.thread).not_to eq(nil)
expect(message.in_reply_to.thread).to eq(message.thread)
expect(message.thread.original_message).to eq(reply_message)
expect(message.thread.original_message_user).to eq(reply_message.user)
end
context "when the thread_id is provided" do
fab!(:existing_thread) { Fabricate(:chat_thread, channel: public_chat_channel) }
it "does not create a thread when one is passed in" do
message = nil
expect {
message =
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
thread_id: existing_thread.id,
).chat_message
}.not_to change { ChatThread.count }
expect(message.reload.thread).to eq(existing_thread)
end
it "errors when the thread ID is for a different channel" do
other_channel_thread = Fabricate(:chat_thread, channel: Fabricate(:chat_channel))
result =
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
thread_id: other_channel_thread.id,
)
expect(result.error.message).to eq(I18n.t("chat.errors.thread_invalid_for_channel"))
end
it "errors when the thread does not match the in_reply_to thread" do
reply_message.update!(thread: existing_thread)
result =
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
in_reply_to_id: reply_message.id,
thread_id: Fabricate(:chat_thread, channel: public_chat_channel).id,
)
expect(result.error.message).to eq(I18n.t("chat.errors.thread_does_not_match_parent"))
end
it "errors when the root message does not have a thread ID" do
reply_message.update!(thread: nil)
result =
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
in_reply_to_id: reply_message.id,
thread_id: existing_thread.id,
)
expect(result.error.message).to eq(I18n.t("chat.errors.thread_does_not_match_parent"))
end
end
context "for missing root messages" do
fab!(:root_message) do
Fabricate(
:chat_message,
chat_channel: public_chat_channel,
user: user2,
created_at: 1.day.ago,
)
end
before { reply_message.update!(in_reply_to: root_message) }
it "raises an error when the root message has been trashed" do
root_message.trash!
result =
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
in_reply_to_id: reply_message.id,
)
expect(result.error.message).to eq(I18n.t("chat.errors.root_message_not_found"))
end
it "uses the next message in the chain as the root when the root is deleted" do
root_message.destroy!
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
in_reply_to_id: reply_message.id,
)
expect(reply_message.reload.thread).not_to eq(nil)
end
end
context "when there is an existing reply chain" do
fab!(:old_message_1) do
Fabricate(
:chat_message,
chat_channel: public_chat_channel,
user: user1,
created_at: 6.hours.ago,
)
end
fab!(:old_message_2) do
Fabricate(
:chat_message,
chat_channel: public_chat_channel,
user: user2,
in_reply_to: old_message_1,
created_at: 4.hours.ago,
)
end
fab!(:old_message_3) do
Fabricate(
:chat_message,
chat_channel: public_chat_channel,
user: user1,
in_reply_to: old_message_2,
created_at: 1.hour.ago,
)
end
before do
reply_message.update!(
created_at: old_message_3.created_at + 1.hour,
in_reply_to: old_message_3,
)
end
it "creates a thread and updates all the messages in the chain" do
thread_count = ChatThread.count
message =
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
in_reply_to_id: reply_message.id,
).chat_message
expect(ChatThread.count).to eq(thread_count + 1)
expect(message.reload.thread).not_to eq(nil)
expect(message.reload.in_reply_to.thread).to eq(message.thread)
expect(old_message_1.reload.thread).to eq(message.thread)
expect(old_message_2.reload.thread).to eq(message.thread)
expect(old_message_3.reload.thread).to eq(message.thread)
expect(message.thread.chat_messages.count).to eq(5)
message =
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
in_reply_to_id: reply_message.id,
).chat_message
end
context "when a thread already exists and the thread_id is passed in" do
let!(:last_message) do
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
in_reply_to_id: reply_message.id,
).chat_message
end
let!(:existing_thread) { last_message.reload.thread }
it "does not create a new thread" do
thread_count = ChatThread.count
message =
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message again",
in_reply_to_id: last_message.id,
thread_id: existing_thread.id,
).chat_message
expect(ChatThread.count).to eq(thread_count)
expect(message.reload.thread).to eq(existing_thread)
expect(message.reload.in_reply_to.thread).to eq(existing_thread)
expect(message.thread.chat_messages.count).to eq(6)
end
it "errors when the thread does not match the root thread" do
old_message_1.update!(thread: Fabricate(:chat_thread, channel: public_chat_channel))
result =
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
in_reply_to_id: reply_message.id,
thread_id: existing_thread.id,
)
expect(result.error.message).to eq(I18n.t("chat.errors.thread_does_not_match_parent"))
end
it "errors when the root message does not have a thread ID" do
old_message_1.update!(thread: nil)
result =
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
in_reply_to_id: reply_message.id,
thread_id: existing_thread.id,
)
expect(result.error.message).to eq(I18n.t("chat.errors.thread_does_not_match_parent"))
end
end
context "when there are hundreds of messages in a reply chain already" do
before do
previous_message = nil
1000.times do |i|
previous_message =
Fabricate(
:chat_message,
chat_channel: public_chat_channel,
user: [user1, user2].sample,
in_reply_to: previous_message,
created_at: i.hours.ago,
)
end
@last_message_in_chain = previous_message
end
xit "works" do
thread_count = ChatThread.count
message = nil
puts Benchmark.measure {
message =
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
in_reply_to_id: @last_message_in_chain.id,
).chat_message
}
expect(ChatThread.count).to eq(thread_count + 1)
expect(message.reload.thread).not_to eq(nil)
expect(message.reload.in_reply_to.thread).to eq(message.thread)
expect(message.thread.chat_messages.count).to eq(1001)
end
end
context "if the root message alread had a thread" do
fab!(:old_thread) { Fabricate(:chat_thread, original_message: old_message_1) }
fab!(:incorrect_thread) { Fabricate(:chat_thread, channel: public_chat_channel) }
before do
old_message_1.update!(thread: old_thread)
old_message_3.update!(thread: incorrect_thread)
end
it "does not change any messages in the chain, assumes they have the correct thread ID" do
thread_count = ChatThread.count
message =
Chat::ChatMessageCreator.create(
chat_channel: public_chat_channel,
user: user1,
content: "this is a message",
in_reply_to_id: reply_message.id,
).chat_message
expect(ChatThread.count).to eq(thread_count)
expect(message.reload.thread).to eq(old_thread)
expect(message.reload.in_reply_to.thread).to eq(old_thread)
expect(old_message_1.reload.thread).to eq(old_thread)
expect(old_message_2.reload.thread).to eq(old_thread)
expect(old_message_3.reload.thread).to eq(incorrect_thread)
expect(message.thread.chat_messages.count).to eq(4)
end
end
end
end
describe "group mentions" do
it "creates chat mentions for group mentions where the group is mentionable" do
expect {

View File

@ -139,3 +139,14 @@ Fabricator(:chat_draft) do
{ value: attrs[:value], replyToMsg: attrs[:reply_to_msg], uploads: attrs[:uploads] }.to_json
end
end
Fabricator(:chat_thread) do
before_create do |thread, transients|
thread.original_message_user = original_message.user
thread.channel = original_message.chat_channel
end
transient :channel
original_message { |attrs| Fabricate(:chat_message, chat_channel: attrs[:channel]) }
end

View File

@ -36,17 +36,13 @@ describe Chat::MessageMover do
fab!(:message6) { Fabricate(:chat_message, chat_channel: destination_channel) }
let(:move_message_ids) { [message1.id, message2.id, message3.id] }
subject do
described_class.new(
acting_user: acting_user,
source_channel: source_channel,
message_ids: move_message_ids,
)
end
describe "#move_to_channel" do
def move!
subject.move_to_channel(destination_channel)
def move!(move_message_ids = [message1.id, message2.id, message3.id])
described_class.new(
acting_user: acting_user,
source_channel: source_channel,
message_ids: move_message_ids,
).move_to_channel(destination_channel)
end
it "raises an error if either the source or destination channels are not public (they cannot be DM channels)" do
@ -126,5 +122,104 @@ describe Chat::MessageMover do
expect(revision.reload.chat_message_id).to eq(moved_messages.third.id)
expect(webhook_event.reload.chat_message_id).to eq(moved_messages.third.id)
end
it "does not preserve reply chains using in_reply_to_id" do
message3.update!(in_reply_to: message2)
message2.update!(in_reply_to: message1)
move!
moved_messages =
ChatMessage.where(chat_channel: destination_channel).order("created_at ASC, id ASC").last(3)
expect(moved_messages.pluck(:in_reply_to_id).uniq).to eq([nil])
end
it "clears in_reply_to_id for remaining messages when the messages they were replying to are moved" do
message3.update!(in_reply_to: message2)
message2.update!(in_reply_to: message1)
move!([message2.id])
expect(message3.reload.in_reply_to_id).to eq(nil)
end
context "when there is a thread" do
fab!(:thread) { Fabricate(:chat_thread, channel: source_channel, original_message: message1) }
before do
message1.update!(thread: thread)
message2.update!(thread: thread)
message3.update!(thread: thread)
end
it "does not preserve thread_ids" do
move!
moved_messages =
ChatMessage
.where(chat_channel: destination_channel)
.order("created_at ASC, id ASC")
.last(3)
expect(moved_messages.pluck(:thread_id).uniq).to eq([nil])
end
it "clears in_reply_to_id for remaining messages when the messages they were replying to are moved but leaves the thread_id" do
message3.update!(in_reply_to: message2)
message2.update!(in_reply_to: message1)
move!([message2.id])
expect(message3.reload.in_reply_to_id).to eq(nil)
expect(message3.reload.thread).to eq(thread)
end
context "when a thread original message is moved" do
it "creates a new thread for the messages left behind in the old channel" do
message4 =
Fabricate(
:chat_message,
chat_channel: source_channel,
message: "the fourth message",
in_reply_to: message3,
thread: thread,
)
message5 =
Fabricate(
:chat_message,
chat_channel: source_channel,
message: "the fifth message",
thread: thread,
)
expect { move! }.to change { ChatThread.count }.by(1)
new_thread = ChatThread.last
expect(message4.reload.thread_id).to eq(new_thread.id)
expect(message5.reload.thread_id).to eq(new_thread.id)
expect(new_thread.channel).to eq(source_channel)
expect(new_thread.original_message).to eq(message4)
end
end
context "when multiple thread original messages are moved" do
it "works the same as when one is" do
message4 =
Fabricate(:chat_message, chat_channel: source_channel, message: "the fourth message")
message5 =
Fabricate(
:chat_message,
chat_channel: source_channel,
in_reply_to: message5,
message: "the fifth message",
)
other_thread =
Fabricate(:chat_thread, channel: source_channel, original_message: message4)
message4.update!(thread: other_thread)
message5.update!(thread: other_thread)
expect { move!([message1.id, message4.id]) }.to change { ChatThread.count }.by(2)
new_threads = ChatThread.order(:created_at).last(2)
expect(message3.reload.thread_id).to eq(new_threads.first.id)
expect(message5.reload.thread_id).to eq(new_threads.second.id)
expect(new_threads.first.channel).to eq(source_channel)
expect(new_threads.second.channel).to eq(source_channel)
expect(new_threads.first.original_message).to eq(message2)
expect(new_threads.second.original_message).to eq(message5)
end
end
end
end
end